最近flink真是风生水起,但是浪院长看来这不过是阿里错过了创造spark影响力之后,想要在flink领域创建绝对的影响力。但是,不可否认flink在实时领域确实目前来看独树一帜,当然也有它不适合的地方,比如今天要推荐的第一个基于flink开发的项目,流表和维表的join,还有很多地方还是用spark streaming更合适,但是整体的流处理而言flink确实很优秀,虽然目前测出了一些bug,后面会发文说明一下flink开发时候常见的坑和已有的自身bug。接下来转入正题。

flinkStreamSQL

熟悉flink的应该都了解,flink支持流表之间的join,但到1.6为止都不支持流表和维表的join。浪尖最近,也在开发流平台,需要到flink流表和维表的join。那么针对这个大家第一印象,可以写个算子去实现,比如map等。但是浪尖这里开发的流平台不是说自己写api,而是用户通过sql去实现创建source,sink,udf,sql等,这个时候要进行维表join,大家可能是想到了udf。是的对于只有一个维表的情况下使用udf比较方便,但是多个维表,相对就麻烦很多了。

而基于flink开发的flinkStreamSQL主要是实现了flink 流表和维表的join,其主要功能如下:

自定义create table 语法(包括源表,输出表,维表)

自定义create function 语法

实现了流与维表的join

浪尖花了个把小时看了一下源码,源码思路很清晰,主要是两个步骤:

用flink api实现维表的功能: 要实现维表功能就要用到 flink Aysnc I/O 这个功能,是由阿里巴巴贡献给apache flink的。关于异步IO的介绍,可以参考:https://yq.aliyun.com/articles/457385

解析流与维表join的sql语法转化成底层的flinkAPI

源码下载地址:

https://github.com/DTStack/flinkStreamSQL

为了方便大家阅读,这里浪尖也把维表转化的过程主要函数贴出来吧:

主函数
Main#main

SQL解析
SqlTree sqlTree = SqlParser.parseSql(sql)
拆读
SqlParser#parseSql
TableInfoParserFactory#parseWithTableType

注册表
registerTable

存在维表的话,维表转换与逻辑sql执行
SideSqlExec#exec
也即是
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);

不存在维表的话
tableEnv.sqlUpdate(result.getExecSql());

SqlSession sqlSession=null;
List<User> userList=new ArrayList<User>(); try{
    sqlSession=MyBatisUtil.createSqlSession();
    User user=new User(www.furggw.com);
    user.setUserName("赵");
    user.setUserRole(www.mingrenf178.com);
    userList=sqlSession.getMapper(UserMapper.class).getUserListByUser(user);
}catch (Exception ex){
    ex.printStackTrace();
}finally {
    MyBatisUtil.closeSqlSession(sqlSession);
}
for (User user:
        userList) {
    System.out.println(user.getUserName()+"\t"+user.getUserRole());
}

使用Map入参编写接口

List<User> getUserListByMap(Map<www.ysyl157.com String,String> userMap);

编写UserMapper.xml文件

<select id="getUserListByMap" resultType="User" parameterType=www.mcyllpt.com"Map">
    SELECT * FROM USER www.meiwanyule.cn WHERE userName LIKE concat('%',#{userName},'%')
    and userRole=#{userRole}

FlinkX

FlinkX主要是用来做数据同步的,实现了多种异构数据源之间高效的数据迁移。

不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行

最新文章

  1. C++ std::deque
  2. (学)解决诡异的 Exception type: SocketException 127.0.0.1:80
  3. poj 1182:食物链(种类并查集,食物链问题)
  4. nginx配置文件中的location中文详解
  5. sidePagination: &quot;server&quot;和responseHandler: responseHandler
  6. Python标准库:迭代器Itertools
  7. java中的单例模式与doublecheck
  8. Spring Data Elasticsearch
  9. CDN云主机与传统虚拟主机功能对比
  10. java中可以出现的中文乱码的集中解决
  11. 基于python第三方requests 模块的HTTP请求类
  12. 【linux】linux下网络的配置
  13. Android学习——NDK交叉编译
  14. Spring Session event事件分析
  15. A - K进制下的大数
  16. Java知多少(70)面向字节流的应用
  17. mysql 8.0 错误The server requested authentication method unknown to the client
  18. vue 给v-html中的元素设置样式
  19. Android拖动和缩放
  20. Beta冲刺——day1

热门文章

  1. ls参数
  2. 洛谷 P1334 瑞瑞的木板==P2664 【题目待添加】
  3. Android计算器布局
  4. spfa模板+讲解
  5. Codeforces Round #274 (Div. 2)-C. Exams
  6. Instance Methods are Curried Functions in Swift
  7. React初识整理(三)--受控组件解决方法
  8. 关于HTML设置访问密码。
  9. LeetCode 最大子序和
  10. 七:MYSQL之常用操作符