- 1. Test data
- 2. Define the dataSchema
- 2.1 Specify the datasource name
- 2.2 Specify the __time timestamp
- 2.3 Configure rollup
- 2.3.1 "rollup" : true
- 2.3.2 "rollup" : false
- 2.4 Define the granularitySpec
- 3. Define the task type
- 4. Define the input source
- 5. Other tuning parameters
- 6. Submit the task
- 7. Query the result data
1. Test data
The test data we will ingest is a batch of network flow records, as shown below:
{"ts":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":10, "bytes":1000, "cost": 1.4}
{"ts":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":20, "bytes":2000, "cost": 3.1}
{"ts":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":30, "bytes":3000, "cost": 0.4}
{"ts":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":40, "bytes":4000, "cost": 7.9}
{"ts":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":50, "bytes":5000, "cost": 10.2}
{"ts":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}
{"ts":"2018-01-01T02:33:14Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":100, "bytes":10000, "cost": 22.4}
{"ts":"2018-01-01T02:33:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":200, "bytes":20000, "cost": 34.5}
{"ts":"2018-01-01T02:35:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":300, "bytes":30000, "cost": 46.3}
Save this data to the file quickstart/tutorial/ingestion-tutorial-data.json.
2. Define the dataSchema
The core of an ingestion spec is the dataSchema, which defines how input data is parsed into a set of columns.
Create a new file ingestion-tutorial-index.json under quickstart/tutorial and add an empty dataSchema to it:
[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
"dataSchema" : {}
[root@bigdata001 apache-druid-0.22.1]#
2.1 Specify the datasource name
The datasource name is set via the dataSource field:
[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
"dataSchema" : {
"dataSource" : "ingestion-tutorial"
}
[root@bigdata001 apache-druid-0.22.1]#
2.2 Specify the __time timestamp
The __time timestamp is configured via timestampSpec: column names the source field that __time is derived from, and format declares how that field's time values are encoded.
[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"column" : "ts",
"format" : "iso"
}
}
[root@bigdata001 apache-druid-0.22.1]#
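Our ts values are ISO 8601 strings, hence "format" : "iso". For data encoded differently, format also accepts "millis" (epoch milliseconds), "posix" (epoch seconds), "auto", or a Joda-style pattern. An illustrative variant (the pattern shown is hypothetical and does not match this tutorial's data):
"timestampSpec" : {
  "column" : "ts",
  "format" : "yyyy-MM-dd HH:mm:ss"
}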
2.3 Configure rollup
Rollup is configured under granularitySpec.
- If rollup is enabled, the input fields split into two categories, "dimensions" and "metrics": "dimensions" are the fields that rows are grouped by during rollup, while "metrics" are the fields that are aggregated.
- If rollup is disabled, all fields are treated as "dimensions" and no pre-aggregation takes place.
2.3.1 "rollup" : true
For this example, we enable rollup:
[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"column" : "ts",
"format" : "iso"
},
"granularitySpec" : {
"rollup" : true
}
}
[root@bigdata001 apache-druid-0.22.1]#
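To make rollup concrete: with the minute-level queryGranularity configured later in section 2.4, the first three input rows (01:01:35, 01:01:51, 01:01:59) carry identical dimension values, so they collapse into a single stored row:
{"__time":"2018-01-01T01:01:00Z", "srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol":"6", "packets":60, "bytes":6000, "cost":4.9, "count":3}
Here packets = 10+20+30, bytes = 1000+2000+3000, cost = 1.4+3.1+0.4, and the count metric defined below records that 3 input rows were merged; this is exactly the first row returned by the query in section 7.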
Splitting the fields into dimensions and metrics:
- dimensions: srcIP, srcPort, dstIP, dstPort, protocol
- metrics: packets, bytes, cost
Specifying the dimensions
Dimensions are declared under dimensionsSpec:
[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"column" : "ts",
"format" : "iso"
},
"granularitySpec" : {
"rollup" : true
},
"dimensionsSpec" : {
"dimensions" : [
"srcIP",
{"name" : "srcPort", "type" : "long"},
{"name" : "dstIP", "type" : "string"},
{"name" : "dstPort", "type" : "long"},
{"name" : "protocol", "type" : "string"}
]
}
}
[root@bigdata001 apache-druid-0.22.1]#
- A field's type may be long, float, double, or string
- A string field can be listed by name alone, since string is the default type; "srcIP" above is an example
- The protocol field is numeric in the data file and would naturally be declared as long, but here we declare it as string, which forces the long values to be coerced to strings
- For fields that are numeric in the data file, declaring a numeric type saves disk space and processing overhead, so numeric types are recommended for metrics; declaring string enables bitmap indexes, so string is recommended for dimensions
Specifying the metrics
Metrics are declared under metricsSpec:
[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"column" : "ts",
"format" : "iso"
},
"granularitySpec" : {
"rollup" : true
},
"dimensionsSpec" : {
"dimensions" : [
"srcIP",
{"name" : "srcPort", "type" : "long"},
{"name" : "dstIP", "type" : "string"},
{"name" : "dstPort", "type" : "long"},
{"name" : "protocol", "type" : "string"}
]
},
"metricsSpec" : [
{"type" : "count", "name" : "count"},
{"type" : "longSum", "name" : "packets", "fieldName" : "packets"},
{"type" : "longSum", "name" : "bytes", "fieldName" : "bytes"},
{"type" : "doubleSum", "name" : "cost", "fieldName" : "cost"}
]
}
[root@bigdata001 apache-druid-0.22.1]#
Here we also define a count aggregator named count, which records how many original input rows each rolled-up group contains.
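One consequence worth noting: after rollup, COUNT(*) at query time counts stored (rolled-up) rows, not original input rows; to recover the original row count, sum the count metric instead. A quick check you can run in dsql once the data is loaded in section 7:
-- counts rolled-up rows: returns 5 for this tutorial's data
SELECT COUNT(*) FROM "ingestion-tutorial";
-- counts original input rows: returns 9
SELECT SUM("count") FROM "ingestion-tutorial";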
2.3.2 "rollup" : false
If rollup is false, every field is declared under dimensionsSpec. For example:
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"column" : "ts",
"format" : "iso"
},
"granularitySpec" : {
"rollup" : false
},
"dimensionsSpec" : {
"dimensions" : [
"srcIP",
{"name" : "srcPort", "type" : "long"},
{"name" : "dstIP", "type" : "string"},
{"name" : "dstPort", "type" : "long"},
{"name" : "protocol", "type" : "string"},
{"name" : "packets", "type" : "long"},
{"name" : "bytes", "type" : "long"},
{"name" : "srcPort", "type" : "double"}
]
}
}
2.4 Define the granularitySpec
- The possible values of type are uniform and arbitrary
- uniform: all segments cover a uniform time interval, e.g. every segment holds one hour of data
- segmentGranularity: the time interval covered by each segment, e.g. HOUR, DAY, WEEK
- queryGranularity: the granularity at which the __time field is stored within a segment, also called the bucketing granularity
- intervals: for batch tasks, rows whose timestamps fall outside these intervals are dropped rather than stored
[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"column" : "ts",
"format" : "iso"
},
"granularitySpec" : {
"rollup" : true,
"type" : "uniform",
"segmentGranularity" : "HOUR",
"queryGranularity" : "MINUTE",
"intervals" : ["2018-01-01/2018-01-02"]
},
"dimensionsSpec" : {
"dimensions" : [
"srcIP",
{"name" : "srcPort", "type" : "long"},
{"name" : "dstIP", "type" : "string"},
{"name" : "dstPort", "type" : "long"},
{"name" : "protocol", "type" : "string"}
]
},
"metricsSpec" : [
{"type" : "count", "name" : "count"},
{"type" : "longSum", "name" : "packets", "fieldName" : "packets"},
{"type" : "longSum", "name" : "bytes", "fieldName" : "bytes"},
{"type" : "doubleSum", "name" : "cost", "fieldName" : "cost"}
]
}
[root@bigdata001 apache-druid-0.22.1]#
- Since our data has timestamps in hours 01 and 02, it will be split into two segments
- __time is stored at minute granularity; for example "2018-01-01T01:01:35Z" is stored as "2018-01-01T01:01:00Z"
3. Define the task type
- The dataSchema is the same across all task types
- The format of the other parts of the spec, however, differs per task type
In this example we use the local-batch ingestion spec, which reads data from a local file:
{
"type" : "index_parallel",
"spec" : {
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"column" : "ts",
"format" : "iso"
},
"granularitySpec" : {
"rollup" : true,
"type" : "uniform",
"segmentGranularity" : "HOUR",
"queryGranularity" : "MINUTE",
"intervals" : ["2018-01-01/2018-01-02"]
},
"dimensionsSpec" : {
"dimensions" : [
"srcIP",
{"name" : "srcPort", "type" : "long"},
{"name" : "dstIP", "type" : "string"},
{"name" : "dstPort", "type" : "long"},
{"name" : "protocol", "type" : "string"}
]
},
"metricsSpec" : [
{"type" : "count", "name" : "count"},
{"type" : "longSum", "name" : "packets", "fieldName" : "packets"},
{"type" : "longSum", "name" : "bytes", "fieldName" : "bytes"},
{"type" : "doubleSum", "name" : "cost", "fieldName" : "cost"}
]
}
}
}
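For contrast, a streaming ingestion spec keeps the dataSchema block unchanged but uses a different outer type; a minimal skeleton of a Kafka supervisor spec might look like the following (ioConfig and tuningConfig, elided here, differ entirely from the batch case):
{
  "type" : "kafka",
  "spec" : {
    "dataSchema" : { ... },
    "ioConfig" : { ... },
    "tuningConfig" : { ... }
  }
}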
4. Define the input source
The input source is defined in ioConfig:
- ioConfig also needs the task type
- inputSource specifies where the data comes from
- inputFormat specifies the format of the data files
[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
{
"type" : "index_parallel",
"spec" : {
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"column" : "ts",
"format" : "iso"
},
"granularitySpec" : {
"rollup" : true,
"type" : "uniform",
"segmentGranularity" : "HOUR",
"queryGranularity" : "MINUTE",
"intervals" : ["2018-01-01/2018-01-02"]
},
"dimensionsSpec" : {
"dimensions" : [
"srcIP",
{"name" : "srcPort", "type" : "long"},
{"name" : "dstIP", "type" : "string"},
{"name" : "dstPort", "type" : "long"},
{"name" : "protocol", "type" : "string"}
]
},
"metricsSpec" : [
{"type" : "count", "name" : "count"},
{"type" : "longSum", "name" : "packets", "fieldName" : "packets"},
{"type" : "longSum", "name" : "bytes", "fieldName" : "bytes"},
{"type" : "doubleSum", "name" : "cost", "fieldName" : "cost"}
]
},
"ioConfig" : {
"type" : "index_parallel",
"inputSource" : {
"type" : "local",
"baseDir" : "quickstart/tutorial",
"filter" : "ingestion-tutorial-data.json"
},
"inputFormat" : {
"type" : "json"
}
}
}
}
[root@bigdata001 apache-druid-0.22.1]#
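Here the "local" input source reads every file under baseDir whose name matches filter. The input source is pluggable; for example, pulling the same file over HTTP would only swap the inputSource block (the URI below is illustrative):
"inputSource" : {
  "type" : "http",
  "uris" : ["http://example.com/ingestion-tutorial-data.json"]
}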
5. Other tuning parameters
Tuning parameters are defined in tuningConfig:
- tuningConfig also needs the task type
- followed by tuning parameters specific to that task type
Here we set the tuning parameters for local-file batch ingestion, giving the final spec:
[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
{
"type" : "index_parallel",
"spec" : {
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"column" : "ts",
"format" : "iso"
},
"granularitySpec" : {
"rollup" : true,
"type" : "uniform",
"segmentGranularity" : "HOUR",
"queryGranularity" : "MINUTE",
"intervals" : ["2018-01-01/2018-01-02"]
},
"dimensionsSpec" : {
"dimensions" : [
"srcIP",
{"name" : "srcPort", "type" : "long"},
{"name" : "dstIP", "type" : "string"},
{"name" : "dstPort", "type" : "long"},
{"name" : "protocol", "type" : "string"}
]
},
"metricsSpec" : [
{"type" : "count", "name" : "count"},
{"type" : "longSum", "name" : "packets", "fieldName" : "packets"},
{"type" : "longSum", "name" : "bytes", "fieldName" : "bytes"},
{"type" : "doubleSum", "name" : "cost", "fieldName" : "cost"}
]
},
"ioConfig" : {
"type" : "index_parallel",
"inputSource" : {
"type" : "local",
"baseDir" : "quickstart/tutorial",
"filter" : "ingestion-tutorial-data.json"
},
"inputFormat" : {
"type" : "json"
}
},
"tuningConfig" : {
"type" : "index_parallel",
"maxRowsPerSegment" : 5000000
}
}
}
[root@bigdata001 apache-druid-0.22.1]#
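maxRowsPerSegment caps the number of rows written into a single segment. The index_parallel task accepts further tuning parameters; one commonly adjusted knob is maxNumConcurrentSubTasks, which controls how many ingestion sub-tasks run in parallel (it defaults to 1). A sketch, with an illustrative value:
"tuningConfig" : {
  "type" : "index_parallel",
  "maxRowsPerSegment" : 5000000,
  "maxNumConcurrentSubTasks" : 2
}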
6. Submit the task
Distribute the ingestion-tutorial-data.json and ingestion-tutorial-index.json files to the quickstart/tutorial directory on the other servers of the Druid cluster:
[root@bigdata001 apache-druid-0.22.1]#
[root@bigdata001 apache-druid-0.22.1]# scp quickstart/tutorial/ingestion-tutorial-data.json root@bigdata002:/opt/apache-druid-0.22.1/quickstart/tutorial/
ingestion-tutorial-data.json 100% 1408 956.5KB/s 00:00
[root@bigdata001 apache-druid-0.22.1]# scp quickstart/tutorial/ingestion-tutorial-index.json root@bigdata002:/opt/apache-druid-0.22.1/quickstart/tutorial/
ingestion-tutorial-index.json 100% 1373 1.4MB/s 00:00
[root@bigdata001 apache-druid-0.22.1]#
Run the task from the command line:
[root@bigdata001 apache-druid-0.22.1]# bin/post-index-task --file quickstart/tutorial/ingestion-tutorial-index.json --url http://bigdata003:9081
Beginning indexing data for ingestion-tutorial
Task started: index_parallel_ingestion-tutorial_mclognkd_2022-04-01T02:55:13.038Z
Task log: http://bigdata003:9081/druid/indexer/v1/task/index_parallel_ingestion-tutorial_mclognkd_2022-04-01T02:55:13.038Z/log
Task status: http://bigdata003:9081/druid/indexer/v1/task/index_parallel_ingestion-tutorial_mclognkd_2022-04-01T02:55:13.038Z/status
Task index_parallel_ingestion-tutorial_mclognkd_2022-04-01T02:55:13.038Z still running...
Task index_parallel_ingestion-tutorial_mclognkd_2022-04-01T02:55:13.038Z still running...
Task index_parallel_ingestion-tutorial_mclognkd_2022-04-01T02:55:13.038Z still running...
Task finished with status: SUCCESS
Completed indexing data for ingestion-tutorial. Now loading indexed data onto the cluster...
[root@bigdata001 apache-druid-0.22.1]#
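bin/post-index-task is a convenience wrapper around the Overlord's task API (the same endpoint that appears in the task log/status URLs above); the spec can equally be submitted directly over HTTP:
curl -X POST -H 'Content-Type: application/json' \
  -d @quickstart/tutorial/ingestion-tutorial-index.json \
  http://bigdata003:9081/druid/indexer/v1/task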
7. Query the result data
dsql>
dsql> select * from "ingestion-tutorial";
┌──────────────────────────┬───────┬──────┬───────┬─────────┬─────────┬─────────┬──────────┬─────────┬─────────┐
│ __time │ bytes │ cost │ count │ dstIP │ dstPort │ packets │ protocol │ srcIP │ srcPort │
├──────────────────────────┼───────┼──────┼───────┼─────────┼─────────┼─────────┼──────────┼─────────┼─────────┤
│ 2018-01-01T01:01:00.000Z │ 6000 │ 4.9 │ 3 │ 2.2.2.2 │ 3000 │ 60 │ 6 │ 1.1.1.1 │ 2000 │
│ 2018-01-01T01:02:00.000Z │ 9000 │ 18.1 │ 2 │ 2.2.2.2 │ 7000 │ 90 │ 6 │ 1.1.1.1 │ 5000 │
│ 2018-01-01T01:03:00.000Z │ 6000 │ 4.3 │ 1 │ 2.2.2.2 │ 7000 │ 60 │ 6 │ 1.1.1.1 │ 5000 │
│ 2018-01-01T02:33:00.000Z │ 30000 │ 56.9 │ 2 │ 8.8.8.8 │ 5000 │ 300 │ 17 │ 7.7.7.7 │ 4000 │
│ 2018-01-01T02:35:00.000Z │ 30000 │ 46.3 │ 1 │ 8.8.8.8 │ 5000 │ 300 │ 17 │ 7.7.7.7 │ 4000 │
└──────────────────────────┴───────┴──────┴───────┴─────────┴─────────┴─────────┴──────────┴─────────┴─────────┘
Retrieved 5 rows in 0.24s.
dsql>
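The five rows match the rollup groups worked through in section 2.3.1, and __time has been truncated to the minute as configured. To verify the two-segment claim from section 2.4 (one segment per hour of data), you can also query Druid's system tables; a quick check, assuming the sys schema is available (it is by default):
dsql> select "segment_id", "num_rows" from sys.segments where "datasource" = 'ingestion-tutorial';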