背景

在某些业务场景下,我们需要一个标志来衡量hudi数据写入的进度,比如:Flink 实时向 Hudi 表写入数据,然后使用这个 Hudi 表来支持批量计算并通过一个 flag 来评估它的分区数据是否完整从而进一步写入分区数据进行分区级别的ETL,这也就是我们通常说的流转批

EventTime计算原理

图中Flink Sink包含了两个算子。第一个writer 算子,它负责把数据写入文件,writer在checkpoint触发时,会把自己写入的最大的一个时间传到commit算子中,然后commit算子从多个上游传过来的时间中选取一个最小值作为这一批提交数据的时间,并写入HUDI表的元数据中。

案例使用

我们的方案是将这个进度值(EventTime)存储为 hudi 提交(版本)元数据的属性里,然后通过访问这个元数据属性获取这个进度值。在下游的批处理任务之前加一个监控任务去监控最新快照元数据。如果它的时间已经超过了当前的分区时间,就认为这个表的数据已经完备了,这个监控任务就会成功触发下游的批处理任务进行计算,这样可以防止在异常场景下数据管道或者批处理任务空跑的情况。

下图是一个flink 1分钟级别入库到HUDI ODS表, 然后通过流转批计算写入HUDI DWD表的一个执行过程。

US调度系统轮询逻辑

如何解决乱序到来问题,  我们可以通过设置spedGapTime来设置允许延迟到来的范围默认是0 不会延迟到来。

Maven pom 依赖

针对此功能特性的Hudi依赖版本如下


<dependencies>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink1.13-bundle</artifactId>
<version>0.12.1</version>
</dependency>
</dependencies> <dependencies>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink1.15-bundle</artifactId>
<version>0.12.1</version>
</dependency>
</dependencies>

如何设置EventTime

能够解析的字段类型及格式如下:

类型 示例
TIMESTAMP(3) 2012-12-12T12:12:12
TIMESTAMP(3) 2012-12-12 12:12:12
DATE 2012-12-12
BIGINT 100L
INT 100

Flink API

用户只需要设置flink conf指定时间字段作为时间推进字段

Map<String, String> options = new HashMap<>();
// 这里省略其他表字段
options.put(FlinkOptions.EVENT_TIME_FIELD.key(), "ts");
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
.column("id int not null")
.column("ts string")
.column("dt string")
.pk("id")
.partition("dt")
.options(options);

Flink SQL

通过设置hoodie.payload.event.time.field指定需要计算的eventtime的字段

create table hudi_cow_01(\n" +
" uuid varchar(20),\n" +
" name varchar(10),\n" +
" age int,\n" +
" ts timestamp(3),\n" +
" PRIMARY KEY(uuid) NOT ENFORCED\n" +
")\n" +
" with (\n" +
// 这里省略其他参数
" 'hoodie.payload.event.time.field' = 'ts'\n"
")

如何读取EventTime

Spark SQL

call show_commit_extra_metadata(table => 'hudi_tauth_test.hudi_cow_01', metadata_key => 'hoodie.payload.event.time.field');

Java API

代码获取片段如下

Option<HoodieCommitMetadata> commitMetadataOption = MetadataConversionUtils.getHoodieCommitMetadata(metaClient, currentInstant);
if (!commitMetadataOption.isPresent()) {
throw new HoodieException(String.format("Commit %s not found commitMetadata in Commits %s.", currentInstant, timeline));
}
// 获取到当前版本的时间进度
String eventTime = commitMetadataOption.get().getExtraMetadata().get(FlinkOptions.EVENT_TIME_FIELD.key());
System.out.println("current eventTime: " + eventTime);

输出结果如下

current eventTime: 1667971364742

最新文章

  1. RedHat6.2搭建FTP服务器
  2. Effective C++ -----条款48:认识template元编程
  3. 扩展KMP --- HDU 3613 Best Reward
  4. Educational Codeforces Round 16---部分题解
  5. MapReduce 重要组件——Recordreader组件
  6. XML2_XML的节点和元素
  7. Scala内部类
  8. Jenkins安装与配置
  9. 研究大华3G设备接入自主视频开发平台
  10. 如何写一个通用的README规范
  11. ImageMagick: win7 | win8 &amp; uac (用户帐户控制) 注册表的一些事
  12. SparkStreaming:关于checkpoint的弊端
  13. python - wmi模块学习(windwos硬件信息获取)
  14. Mac下如何设置Eclipse默认浏览器为chrome
  15. C和C指针小记(一)-字符输入,函数,ASCII扩展表
  16. sort-归并排序
  17. ECR是什么意思
  18. tms web core 与 kbmmw 第一次亲密接触
  19. poj 3278 Catch That Cow(bfs+队列)
  20. AngularJS 模块及provide

热门文章

  1. 6个tips缓解第三方访问风险
  2. C温故补缺(九):字节对齐与排序
  3. 关于pip3 ImportError: cannot import name 'main'的报错的原因及解决办法
  4. MySQL数据库下载以及启动软件的详细步骤
  5. JS基础笔记合集(1-3)
  6. PostgreSQL常用操作合辑:时间日期、系统函数、正则表达式、库表导入导出、元数据查询、自定义函数、常用案例
  7. 4.2:Scrapy爬虫
  8. JavaEE课程复习1--数据库相关操作
  9. Docker容器入门到精通
  10. 微软出品自动化神器【Playwright+Java】系列(六) 之 字符输入、单元素键盘事件操作、上传文件、聚焦、拖拽、悬浮操作