StreamSets使用指南
StreamSets使用指南
最近在调研Streamsets,照猫画虎做了几个最简单的Demo鉴于网络上相关资料非常少,做个记录。
1.简介
Streamsets是一款大数据实时采集和ETL工具,可以实现不写一行代码完成数据的采集和流转。通过拖拽式的可视化界面,实现数据管道(Pipelines)的设计和定时任务调度。最大的特点有:
- 可视化界面操作,不写代码完成数据的采集和流转
- 内置监控,可是实时查看数据流传输的基本信息和数据的质量
- 强大的整合力,对现有常用组件全力支持,包括50种数据源、44种数据操作、46种目的地。
对于Streamsets来说,最重要的概念就是数据源(Origins)、操作(Processors)、目的地(Destinations)。创建一个Pipelines管道配置也基本是这三个方面。
常见的Origins有Kafka、HTTP、UDP、JDBC、HDFS等;Processors可以实现对每个字段的过滤、更改、编码、聚合等操作;Destinations跟Origins差不多,可以写入Kafka、Flume、JDBC、HDFS、Redis等。
2.基本安装和基本操作
查看https://cloud.tencent.com/developer/article/1078852
目前网上的中文资料中,也就这个专题介绍的比较详细,几个常用组件的配置介绍的还可以,我也是按照这个入门的。
3.数据源
kafka单主题单进程消费者
- 基本使用,配置broker、zookeeper、consumer group、topic
- kafka的properties可以在kafka configuration设置
- offset管理,offset信息根据kafka版本保存在zookeeper或kafka里
- 如果没保存offset,默认方式是接受通道启动后的数据,如果需要from begining,设置conf为auto.offset.reset:earliest
- 如果之前保存了offset,从保存的offset+1开始处理
- 数据类型包括avro、json、log、text、delimited等
- Produce Single Record 如果勾选,json字符串包含多个对象也只生成一条记录;如果没勾选,会生成多条记录
kafka多主题多线程消费者
- 基本使用,配置broker、consumer group、topic list、num of threads
Hadoop FS Standalone
- 基本使用,配置Hadoop URI、File Directory、num of threads、File name parttern、File name parttern mode、read order等
- 配置时要明确指明文件路径、文件名模式、文件名匹配模式、读取顺序
- 文件名匹配模式有两种,glob mode(形如*.json)和正则表达式
- 文件名模式,例如.json、.txt等
- 读取顺序
- Last Modified Timestamp,按照文件的最后修改时间的升序次序读取数据,而且是嵌套读取子目录的内容
- Lexicographically Ascending File Names,按照文件名的字母升序读取数据
- 多线程处理,一个线程建立一个pipeline实例,并行处理数据
- 可以指定处理的第一份文件名称,那么之前的文件就不会处理了
- Buffer Limit指定每条记录的大小,如果记录大于这个限制,会按该条信息error处理
JDBC
- 首先安装jdbc driver
- 基本使用,配置connection、SQL query、username、password、jdbc driver
- 可以通过在where语句中设置指定列(offset column)的指定值(offset value)开始读取数据,比如从主键id大于10000的记录开始读取
- 两种查询模式
- 增量 从设置好的initial offset value开始读,会按一定的时间批次对append的数据自动更新,适用于append_only的场景,需要写明where和order by
select * from <tableName> where <offset column> > ${OFFSET} order by <offset column>
- 全量 会在时间间隔后重复执行query,会捕捉所有行的变化,不适合大规模的表,可以选择offset column和offset value
select * from <tableName>
4.操作
- Field Master 给指定字符串字段加密,可以选择Mast的类型
- Stream Selector 对数据流做分流处理
- Field Merger 支持数据合并,但只支持map、List结构的数据
- Aggregator 在一段时间间隔内做聚合指标,比如sum、avg、max、min等
- Delay 延迟一段时间
- Field Flattener 拆分map、List结构的数据成没有嵌套的数据结构
- Field Hasher 对指定字段进行encode,策略
5.目的地
基本配置跟数据源差不多。
6.一个简单的Demo
实现kafka读入数据,Stream Selector对数据流做分流,再各自写入到kafka不同的topic中。
整体架构
Streamsets1.PNG配置数据源
配置kafka consumer的基本信息,包括broker、zk、topic、consumer group
Streamsets2.jpg配置Data格式,这里选择的是json格式
Streamsets3.PNG
配置操作
配置Stream Selector,数据根据Version字段分为两类
Streamsets4.PNG
配置目的地
继续配置kafka producer
Streamsets5.jpg
7.参考内容 原文地址 : https://www.jianshu.com/p/870e1bb52da4
最新文章
- 安装Arch Linux
- MVC5 属性路由
- java基础知识回顾之---java String final类普通方法的应用之“两个字符串中最大相同的子串”
- C#中IDisposable学习
- VC++函数(win32_exe)
- MIT-scheme安装
- ASM上的备份集如何转移到文件系统中
- 基于Struts2,Spring4,Hibernate4框架的系统架构设计与示例系统实现
- 4.28Linux(6)
- 【JVM】JVM内存结构 VS Java内存模型 VS Java对象模型
- Springboot@Configuration和@Bean详解
- Linux搭建 SVN 服务器
- Centos6.5 防火墙开放端口
- MySQL数据类型--与MySQL零距离接触2-14MySQL默认约束
- 解决At least one JAR was scanned for TLDs yet contained no TLDs. Enable debug logging for this logger for a complete list of JARs that were scanned but no TLDs were found in them. Skipping unneeded JARs
- day01 计算机的基础知识
- Python_变量命名
- HDU 1079 Calendar Game (博弈或暴搜)
- 题目1447:最短路(Floyd算法)
- Codeforces805 A. Fake NP 2017-05-05 08:30 327人阅读 评论(0) 收藏
热门文章
- BurpSuite 扩展开发[1]-API与HelloWold
- Nginx比SRS做得好的地方
- Git 上传本地项目到远程仓库 (工具篇)
- Java面试题:String、StringBuilder、StringBuffer区别
- redis系列之3----redis高级应用(主从、事务与锁、持久化)
- [计算机视觉]从零开始构建一个微软how-old.net服务/面部属性识别
- 带";反悔";的贪心-超市
- spring 事务管理配置
- mybatis控制台打印执行的sql语句
- 王颖奇 20171010129《面向对象程序设计(java)》第十四周学习总结