gobblin 0.10

There are many ways to persist Kafka data to HDFS, for example Flume, Logstash, and Gobblin. Flume and Logstash are streaming tools, while Gobblin is batch-oriented: persistence is triggered by scheduled jobs, and no reading or writing happens between runs. That is the biggest difference from Flume and Logstash.

Gobblin can be deployed in several ways:

1) standalone + cron;

2) MR (MapReduce) + Oozie/Azkaban, etc.;

3) Docker;

Of these, the third is the most convenient. Because Gobblin writes all job state to HDFS, it makes no difference which node Gobblin is started on; and since the metadata (offsets) is only updated after the data has actually been synced, a failure of Kafka, HDFS, or Gobblin itself will not cause data loss.

1 Configuration

#job
job.name=test_job
job.group=test_group
job.schedule=0 0 */1 * * ?
job.lock.enabled=false

#source
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
kafka.brokers=$kafka_brokers
bootstrap.with.offset=latest
topic.whitelist=$kafka_topics
mr.job.max.mappers=1

#writer
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
writer.partitioner.class=gobblin.writer.partitioner.TimeBasedWriterPartitioner
writer.partition.columns=time
writer.partition.level=hourly
writer.partition.pattern=yyyyMMdd/HH
writer.partition.timezone=Asia/Shanghai
data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher

#metrics
metrics.reporting.file.enabled=true
metrics.reporting.file.suffix=txt

#fs
fs.uri=hdfs://$name_node:8020
writer.fs.uri=${fs.uri}
state.store.fs.uri=${fs.uri}
data.publisher.final.dir=${env:GOBBLIN_WORK_DIR}/job-output
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
state.store.dir=${env:GOBBLIN_WORK_DIR}/state-store
mr.job.root.dir=${env:GOBBLIN_WORK_DIR}/working
task.data.root.dir=${env:GOBBLIN_WORK_DIR}/task-data

Just replace $kafka_brokers, $kafka_topics, and $name_node with your own values.

This configuration runs Gobblin in standalone mode once per hour; on each run, records are partitioned by the time field in the data and written into time-partitioned directories at the specified location on HDFS.
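Because writer.file.path.type=tablename, each Kafka topic gets its own directory, and the hourly pattern yyyyMMdd/HH adds one sub-directory per hour, so the published files end up in a layout roughly like the one below (illustrative only; the actual file names are generated by Gobblin):

${GOBBLIN_WORK_DIR}/job-output/<topic_name>/20180101/00/<output_file>.txt
${GOBBLIN_WORK_DIR}/job-output/<topic_name>/20180101/01/<output_file>.txt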

2 Startup

export GOBBLIN_JOB_CONFIG_DIR=/opt/gobblin/gobblin-dist/job_conf
export GOBBLIN_WORK_DIR=/opt/gobblin/gobblin-dist/work_dir
bin/gobblin-standalone.sh start
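The same launcher script can also stop or restart the daemon after changing job configs (a sketch; the exact set of supported commands may differ slightly between Gobblin releases):

bin/gobblin-standalone.sh stop
bin/gobblin-standalone.sh restart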

3 Customization

1) Partition by the current time instead of the time carried in the data

package gobblin.writer.partitioner;

import gobblin.configuration.State;

public class DefaultTimeBasedWriterPartitioner extends TimeBasedWriterPartitioner {

    public DefaultTimeBasedWriterPartitioner(State state, int numBranches, int branchId) {
        super(state, numBranches, branchId);
    }

    @Override
    public long getRecordTimestamp(Object record) {
        // Ignore the record content and always partition by the current wall-clock time.
        return System.currentTimeMillis();
    }
}

Configuration:

writer.partitioner.class=gobblin.writer.partitioner.DefaultTimeBasedWriterPartitioner
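Conversely, to take the timestamp from a field inside the record instead of the wall clock, the same hook can parse the payload. The sketch below is illustrative only: the class name JsonFieldTimeBasedWriterPartitioner is hypothetical, it assumes the records reaching the writer are UTF-8 JSON byte[] payloads carrying a millisecond epoch field named time, and it assumes Gson is available on the classpath.

package gobblin.writer.partitioner;

import com.google.gson.JsonParser;

import gobblin.configuration.State;

import java.nio.charset.StandardCharsets;

public class JsonFieldTimeBasedWriterPartitioner extends TimeBasedWriterPartitioner {

    public JsonFieldTimeBasedWriterPartitioner(State state, int numBranches, int branchId) {
        super(state, numBranches, branchId);
    }

    @Override
    public long getRecordTimestamp(Object record) {
        try {
            // Assumes the record is a UTF-8 JSON byte[] with a millisecond epoch field named "time".
            String json = new String((byte[]) record, StandardCharsets.UTF_8);
            return new JsonParser().parse(json).getAsJsonObject().get("time").getAsLong();
        } catch (Exception e) {
            // Fall back to the current time for records that cannot be parsed.
            return System.currentTimeMillis();
        }
    }
}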

2) Keep only JSON records and append a newline to each

package gobblin.source.extractor.extract.kafka;

import gobblin.configuration.WorkUnitState;
import gobblin.source.extractor.Extractor;

import java.io.IOException;

public class JsonKafkaSimpleSource extends KafkaSimpleSource {

    public JsonKafkaSimpleSource() {}

    @Override
    public Extractor<String, byte[]> getExtractor(WorkUnitState state) throws IOException {
        // Hand out the JSON-aware extractor instead of the default KafkaSimpleExtractor.
        return new JsonKafkaSimpleExtractor(state);
    }
}

package gobblin.source.extractor.extract.kafka;

import gobblin.configuration.WorkUnitState;
import gobblin.kafka.client.ByteArrayBasedKafkaRecord;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class JsonKafkaSimpleExtractor extends KafkaSimpleExtractor {

    public JsonKafkaSimpleExtractor(WorkUnitState state) {
        super(state);
    }

    @Override
    protected byte[] decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException {
        byte[] resultBytes = kafkaConsumerRecord.getMessageBytes();
        String result = new String(resultBytes, "UTF-8");
        // Keep only records that look like a JSON object, and append a newline so that
        // each record occupies exactly one line in the output file.
        if (result.length() > 2 && result.charAt(0) == '{' && result.charAt(result.length() - 1) == '}') {
            return (result + "\n").getBytes("UTF-8");
        } else {
            System.out.println("[" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                + "] found invalid json : " + result);
            return "".getBytes();
        }
    }
}

Configuration:

source.class=gobblin.source.extractor.extract.kafka.JsonKafkaSimpleSource
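These custom classes are not part of the Gobblin distribution, so they have to be compiled against the Gobblin jars and placed on the daemon's classpath before the source.class and writer.partitioner.class settings above can resolve them. A rough sketch, assuming the sources live under src/ and a standard gobblin-dist layout:

mkdir -p classes
javac -cp "gobblin-dist/lib/*" -d classes $(find src -name "*.java")
jar cf gobblin-dist/lib/gobblin-custom.jar -C classes .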

4 Docker image

https://hub.docker.com/r/gobblin/gobblin-standalone

docker run -d gobblin/gobblin-standalone:ubuntu-gobblin-0.10.0
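The plain docker run above starts the container with the image's built-in defaults. To run the job configured earlier, the container needs the job configs and a work dir; the sketch below assumes the image honors the same GOBBLIN_JOB_CONFIG_DIR and GOBBLIN_WORK_DIR environment variables as the tarball scripts (the mount points are illustrative; check the image's README on Docker Hub):

docker run -d \
  -e GOBBLIN_JOB_CONFIG_DIR=/etc/gobblin/job_conf \
  -e GOBBLIN_WORK_DIR=/var/gobblin/work_dir \
  -v /opt/gobblin/gobblin-dist/job_conf:/etc/gobblin/job_conf \
  -v /opt/gobblin/gobblin-dist/work_dir:/var/gobblin/work_dir \
  gobblin/gobblin-standalone:ubuntu-gobblin-0.10.0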

References:

https://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/

https://gobblin.readthedocs.io/en/latest/user-guide/Configuration-Properties-Glossary/
