Author: Syn良子. Source: https://www.cnblogs.com/cssdongl/p/9885534.html. Please credit the source when reposting.

Druid's data ingestion task types


Druid supports many types of data ingestion tasks. Tasks are submitted to the Overlord node via an HTTP POST (for example with curl) and are then assigned to MiddleManagers to run.

Segment creation task types


Local batch index task

{
  "type" : "index",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [
        {
          "type" : "count",
          "name" : "count"
        },
        {
          "type" : "doubleSum",
          "name" : "added",
          "fieldName" : "added"
        },
        {
          "type" : "doubleSum",
          "name" : "deleted",
          "fieldName" : "deleted"
        },
        {
          "type" : "doubleSum",
          "name" : "delta",
          "fieldName" : "delta"
        }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "NONE",
        "intervals" : [ "2013-08-31/2013-09-01" ]
      }
    },
    "ioConfig" : {
      "type" : "index",
      "firehose" : {
        "type" : "local",
        "baseDir" : "examples/indexing/",
        "filter" : "wikipedia_data.json"
      }
    },
    "tuningConfig" : {
      "type" : "index",
      "targetPartitionSize" : 5000000,
      "maxRowsInMemory" : 75000
    }
  }
}

The above is the syntax of a local index task. Note that the type must be "index". This task ingests the local file examples/indexing/wikipedia_data.json into a Druid segment. It can be submitted to the Overlord via an HTTP POST and requires no additional Hadoop setup.
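
As a minimal sketch of the submission step: assuming the spec above is saved as index_task.json and the Overlord runs on localhost with its default port 8090 (adjust both to your deployment), the task can be posted like this:

# Submit the local batch index task to the Overlord's task endpoint
curl -X POST -H 'Content-Type: application/json' \
  -d @index_task.json \
  http://localhost:8090/druid/indexer/v1/task

On success the Overlord responds with the id of the newly created task, which can be used to track it.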


Hadoop index task

{
  "type" : "index_hadoop",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "parser" : {
        "type" : "hadoopyString",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [
        {
          "type" : "count",
          "name" : "count"
        },
        {
          "type" : "doubleSum",
          "name" : "added",
          "fieldName" : "added"
        },
        {
          "type" : "doubleSum",
          "name" : "deleted",
          "fieldName" : "deleted"
        },
        {
          "type" : "doubleSum",
          "name" : "delta",
          "fieldName" : "delta"
        }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "NONE",
        "intervals" : [ "2013-08-31/2013-09-01" ]
      }
    },
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "/MyDirectory/example/wikipedia_data.json"
      }
    },
    "tuningConfig" : {
      "type": "hadoop"
    }
  },
  "hadoopDependencyCoordinates": <my_hadoop_version>
}

The above is the syntax of a Hadoop index task. Note that the type must be "index_hadoop". This task ingests the file /MyDirectory/example/wikipedia_data.json into a Druid segment; note that this path is on HDFS. The task can be submitted to the Overlord via an HTTP POST, but it requires a working Hadoop cluster to already be configured, because the ingestion is ultimately executed as a MapReduce job.
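
Because a MapReduce-backed task can run for quite a while, it is useful to poll its status. A minimal sketch, again assuming the Overlord on localhost:8090; <task_id> stands for whatever id the Overlord returned at submission time:

# Check the status of a previously submitted task
curl http://localhost:8090/druid/indexer/v1/task/<task_id>/status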


Kafka indexing task

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "metrics-kafka",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [],
          "dimensionExclusions": [
            "timestamp",
            "value"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "value_sum",
        "fieldName": "value",
        "type": "doubleSum"
      },
      {
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
      },
      {
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "metrics",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  }
}

The above is the syntax of a Kafka indexing spec. Note that the type must be "kafka". The resulting tasks consume data from the Kafka broker at localhost:9092 and ingest it into Druid segments. Note that at the time of writing this Kafka ingestion task type was still experimental and requires Kafka 0.10 support.
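
Unlike the one-shot index tasks above, this Kafka spec is not sent to the task endpoint; it is submitted to the Overlord's supervisor endpoint, and the supervisor then creates and manages the actual indexing tasks continuously. A minimal sketch, assuming the spec is saved as kafka_supervisor.json and the Overlord runs on localhost:8090:

# Register the Kafka ingestion spec as a supervisor; the Overlord
# will then spawn and manage the "kafka" indexing tasks itself
curl -X POST -H 'Content-Type: application/json' \
  -d @kafka_supervisor.json \
  http://localhost:8090/druid/indexer/v1/supervisor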


Streaming push task type

This task type relies on Tranquility to automatically create realtime tasks and submit them to the Overlord for execution. What is Tranquility? Its GitHub repository is here:

https://github.com/druid-io/tranquility

With Tranquility we can consume real-time data and send event streams to Druid while it transparently handles partitioning, replication, and service discovery. For example, you can integrate Tranquility with Storm, Spark Streaming, or Flink to consume a Kafka stream in real time, load it into Druid segments, and query the data as it arrives. This approach requires writing a fair amount of code, but it is relatively mature and gives you a high degree of flexibility. I will cover it in detail in a separate post; a quick taste of the push model follows below.
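
Besides its library API, Tranquility also ships a standalone HTTP server that accepts events via POST and pushes them into Druid. As a rough sketch of the push model, assuming a Tranquility Server running with a default configuration on localhost:8200 and configured for a datasource named metrics-kafka (the host, port, datasource, and event fields here are all assumptions, not part of the original setup):

# Push a single JSON event to a Tranquility Server, which wraps it
# into a realtime task and forwards it to Druid
curl -X POST -H 'Content-Type: application/json' \
  -d '{"timestamp": "2013-08-31T01:02:33Z", "metric": "requests", "value": 1}' \
  http://localhost:8200/v1/post/metrics-kafka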


Compaction task type

{
  "type": "compact",
  "id": <task_id>,
  "dataSource": <task_datasource>,
  "interval": <interval to specify segments to be merged>,
  "dimensions": <custom dimensionsSpec>,
  "tuningConfig": <index task tuningConfig>,
  "context": <task context>
}

Note that the task type must be "compact". This task type compacts all segments within the specified interval into new segments, and lets you specify the partitioning and the dimension set at the same time.
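
As a minimal sketch with the placeholders filled in (the dataSource wikipedia and the interval are hypothetical values reused from the batch example above), a compaction task can be submitted to the same task endpoint; the optional fields may be omitted:

# Compact one day of "wikipedia" segments into new segments;
# host, port, datasource, and interval are all assumptions
curl -X POST -H 'Content-Type: application/json' \
  -d '{"type": "compact", "dataSource": "wikipedia", "interval": "2013-08-31/2013-09-01"}' \
  http://localhost:8090/druid/indexer/v1/task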

Reference: the Druid documentation's overview of task types.
