FlinkX的安装与简单使用

FlinkX的安装

安装unzip:yum install unzip

1、上传并解压

unzip flinkx-1.10.zip -d /usr/local/soft/

2、配置环境变量

3、给bin/flinkx这个文件加上执行权限

chmod a+x flinkx

4、修改配置文件,设置运行端口

vim flinkconf/flink-conf.yaml
## web服务端口,不指定的话会随机生成一个
rest.bind-port: 8888

FlinkX的简单使用

MySQLToHDFS

  • 配置文件
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?characterEncoding=utf8"
],
"table": [
"student"
]
}
],
"column": [
"*"
],
"customSql": "",
"where": "clazz = '理科二班'",
"splitPk": "",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hdfswriter",
"parameter": {
"path": "hdfs://master:9000/data/flinkx/student",
"defaultFS": "hdfs://master:9000",
"column": [
{
"name": "col1",
"index": 0,
"type": "string"
},
{
"name": "col2",
"index": 1,
"type": "string"
},
{
"name": "col3",
"index": 2,
"type": "string"
},
{
"name": "col4",
"index": 3,
"type": "string"
},
{
"name": "col5",
"index": 4,
"type": "string"
},
{
"name": "col6",
"index": 5,
"type": "string"
}
],
"fieldDelimiter": ",",
"fileType": "text",
"writeMode": "overwrite"
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 1
}
}
}
}
  • 启动任务
    flinkx -mode local -job /usr/local/soft/flinkx-1.10/jsonConf/mysqlToHDFS.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
  • 监听日志

flinkx 任务启动后,会在执行命令的目录下生成一个nohup.out文件

tail -f nohup.out
  • 通过web界面查看任务运行情况
http://master:8888

MySQLToHive

  • 配置文件
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?characterEncoding=utf8"
],
"table": [
"student"
]
}
],
"column": [
"*"
],
"customSql": "",
"where": "clazz = '文科二班'",
"splitPk": "id",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hivewriter",
"parameter": {
"jdbcUrl": "jdbc:hive2://master:10000/testflinkx",
"username": "",
"password": "",
"fileType": "text",
"fieldDelimiter": ",",
"writeMode": "overwrite",
"compress": "",
"charsetName": "UTF-8",
"maxFileSize": 1073741824,
"tablesColumn": "{\"student\":[{\"key\":\"id\",\"type\":\"string\"},{\"key\":\"name\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"string\"}]}",
"defaultFS": "hdfs://master:9000"
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 3
}
}
}
}
  • 在hive中创建testflinkx数据库,并创建student分区表
create database testflinkx;
use testflinkx;
CREATE TABLE `student`(
`id` string,
`name` string,
`age` string)
PARTITIONED BY (
`pt` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
  • 启动hiveserver2
# 第一种方式:
hiveserver2
# 第二种方式:
hive --service hiveserver2
  • 启动任务
flinkx -mode local -job /usr/local/soft/flinkx-1.10/jsonConf/mysqlToHive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
  • 查看日志及运行情况同上

MySQLToHBase

  • 配置文件
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?characterEncoding=utf8"
],
"table": [
"score"
]
}
],
"column": [
"*"
],
"customSql": "",
"splitPk": "student_id",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hbasewriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.property.clientPort": "2181",
"hbase.rootdir": "hdfs://master:9000/hbase",
"hbase.cluster.distributed": "true",
"hbase.zookeeper.quorum": "master,node1,node2",
"zookeeper.znode.parent": "/hbase"
},
"table": "testFlinkx",
"rowkeyColumn": "$(cf1:student_id)_$(cf1:course_id)",
"column": [
{
"name": "cf1:student_id",
"type": "string"
},
{
"name": "cf1:course_id",
"type": "string"
},
{
"name": "cf1:score",
"type": "string"
}
]
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 3
}
}
}
}
  • 启动hbase 并创建testflinkx表
create 'testFlinkx','cf1'
  • 启动任务
flinkx -mode local -job /usr/local/soft/flinkx-1.10/jsonConf/mysqlToHBase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
  • 查看日志及运行情况同上

MySQLToMySQL

  • 配置文件
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
}
],
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?useSSL=false"
],
"table": [
"student"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/student?useSSL=false",
"table": [
"student2"
]
}
],
"writeMode": "insert",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 1,
"bytes": 0
}
}
}
}

最新文章

  1. 利用setns()将进程加入一个新的network namespace
  2. 【Unity3D】利用Shader以及更改Mesh实现2D游戏的动态阴影效果
  3. JUnit4参数的使用
  4. iOS关于CoreAnimation动画知识总结
  5. vs2013 遇到的web性能记录器无法使用问题
  6. CSS控制XML与通过js解析xml然后通过html显示xml中的数据
  7. 2015ACM/ICPC亚洲区长春站 J hdu 5536 Chip Factory
  8. mysql源码重启
  9. UserMapper.selectByPrimaryKey-Inline 报错的解决办法
  10. 通过 itms-services 协议,发布或者分享 iOS 应用程序
  11. 【多路复用】I/O多路复用
  12. pmf,cpmf,pdf,cdf,iid的解释
  13. POJ 1258 Agri-Net(Prim)
  14. linux命令分块总结---多操作才是真理
  15. SQLCODE和SQLERRM .
  16. Windows技巧
  17. asp.net core 2.0发布到IIS流程及报错解决方案
  18. tensorflow中使用指定的GPU及GPU显存 CUDA_VISIBLE_DEVICES
  19. java struts2入门学习--基于xml文件的声明式验证
  20. 如何在Oracle中一次执行多条sql语句 (.net C#)

热门文章

  1. Vue.js之计算属性(computed)、属性监听(watch)与方法选项(methods)
  2. 「NOI十联测」黑暗
  3. Spring学习一: Ioc容器
  4. Mysql Json函数创建 (二)
  5. hr虚线
  6. Java内存分析简单介绍
  7. 猪齿鱼平台常用前端css实现方案
  8. java+selenium自动化脚本编写
  9. Ceres 四重奏 之 入门简介
  10. Python基础(Day1)