Code Walkthroughs DataStream API
上级:https://www.cnblogs.com/hackerxiaoyon/p/12747387.html
DataStream API
DataStreamApi 提供了健壮,有状态的流应用,提供了细力度的控制基于状态和时间,事件驱动系统中可以高级实现。通过DataStreamApi 一步一步的向导我们可以学习到一个有状态的流应用。
What Are You Building ?
通过信用卡交易监控例子,用一个简单的规则set,让我们可以看到flink是如何实现实时业务操作的。
Prerequisites 前提
这段代码的演示需要你具备了java 或者 scala的知识,但是你如果你有其他的编程语言能力也是可以的。
Help,I’m Stuck! 寻求帮助
如果你卡住了,可以 求助 https://flink.apache.org/gettinghelp.html 。https://flink.apache.org/community.html#mailing-lists 用户邮件列表是一个活跃快速提供帮助的地方。
How to Follow Along 如何跟进
有一些电脑配置:
Java 8 或者 11 这里官网没有说别的
Maven
然后是构建项目:java 和 scala 各一份
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-walkthrough-datastream-java \
-DarchetypeVersion=1.10.0 \
-DgroupId=frauddetection \
-DartifactId=frauddetection \
-Dversion=0.1 \
-Dpackage=spendreport \
-DinteractiveMode=false
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-walkthrough-datastream-scala \
-DarchetypeVersion=1.10.0 \
-DgroupId=frauddetection \
-DartifactId=frauddetection \
-Dversion=0.1 \
-Dpackage=spendreport \
-DinteractiveMode=false
之后代码生成了,我们直接看代码。那么如何跟进的演示代码就这样结束了。
FraudDetectionJob.java
package spendreport;
import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.walkthrough.common.sink.AlertSink;import org.apache.flink.walkthrough.common.entity.Alert;import org.apache.flink.walkthrough.common.entity.Transaction;import org.apache.flink.walkthrough.common.source.TransactionSource;
public class FraudDetectionJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetector())
.name("fraud-detector");
alerts
.addSink(new AlertSink())
.name("send-alerts");
env.execute("Fraud Detection");
}}
FraudDetector.java
package spendreport;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;import org.apache.flink.util.Collector;import org.apache.flink.walkthrough.common.entity.Alert;import org.apache.flink.walkthrough.common.entity.Transaction;
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
private static final double SMALL_AMOUNT = 1.00;
private static final double LARGE_AMOUNT = 500.00;
private static final long ONE_MINUTE = 60 * 1000;
@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}}
FraudDetectionJob.scala
package spendreport
import org.apache.flink.streaming.api.scala._import org.apache.flink.walkthrough.common.sink.AlertSinkimport org.apache.flink.walkthrough.common.entity.Alertimport org.apache.flink.walkthrough.common.entity.Transaction
最新文章
- C# 通过Selecnuim WebDriver操作非IE浏览器
- div+css中clear用法
- Leetcode Substring with Concatenation of All Words
- 关于Entity Framework中的Attached报错的完美解决方案
- python装饰器初探
- Discuz! X3.1去除内置门户导航/portal.php尾巴的方法
- start stack
- jacksonall的使用,解析json
- SQL数据库的一些操作
- windows已激活,但提示:windows 7 内部版本7601 此windows副本不是正版
- python 10道面试陷阱题目
- Perl的浅拷贝和深度拷贝
- 【工匠大道】升级Mac下的svn,解决命令行不能使用svn的问题
- 使用Jupyter Notebook编写技术文档
- 配置Activiti Explorer使用MYSQL
- 黑苹果 之 神舟战神Z7M-SL7D2
- 深入HBase架构解析(二)【转】
- 【玩转Golang】 自定义json序列化对象时,非法字符错误原因
- Mac百度云盘不限速操作步骤
- 关于java泛型的使用方式。。。。
热门文章
- 关于自己配置有关webpack.config.js和vue项目搭建相关步骤
- Rocket - debug - TLDebugModuleInnerAsync
- Spring Boot笔记(七) springboot 集成 JavaMail 实现邮箱认证
- C# winform 学习(一)
- Java实现 LeetCode 650 只有两个键的键盘(递归 || 数学)
- Java实现 蓝桥杯 算法提高 三角形
- 第四届蓝桥杯JavaC组国(决)赛真题
- Java实现 蓝桥杯 算法训练 数据交换
- Linux 文件特殊权限-SetGID
- (九)不安全的HTTP方法