上级:https://www.cnblogs.com/hackerxiaoyon/p/12747387.html

DataStream API

  DataStreamApi 提供了健壮,有状态的流应用,提供了细力度的控制基于状态和时间,事件驱动系统中可以高级实现。通过DataStreamApi 一步一步的向导我们可以学习到一个有状态的流应用。

What Are You Building ?

  通过信用卡交易监控例子,用一个简单的规则set,让我们可以看到flink是如何实现实时业务操作的。

Prerequisites 前提

  这段代码的演示需要你具备了java 或者 scala的知识,但是你如果你有其他的编程语言能力也是可以的。

Help,I’m Stuck! 寻求帮助

  如果你卡住了,可以 求助 https://flink.apache.org/gettinghelp.html 。https://flink.apache.org/community.html#mailing-lists 用户邮件列表是一个活跃快速提供帮助的地方。

How to Follow Along 如何跟进

  有一些电脑配置:

    Java 8 或者 11 这里官网没有说别的

    Maven

  然后是构建项目:java 和 scala 各一份

$ mvn archetype:generate \

-DarchetypeGroupId=org.apache.flink \

-DarchetypeArtifactId=flink-walkthrough-datastream-java \

-DarchetypeVersion=1.10.0 \

-DgroupId=frauddetection \

-DartifactId=frauddetection \

-Dversion=0.1 \

-Dpackage=spendreport \

-DinteractiveMode=false

$ mvn archetype:generate \

-DarchetypeGroupId=org.apache.flink \

-DarchetypeArtifactId=flink-walkthrough-datastream-scala \

-DarchetypeVersion=1.10.0 \

-DgroupId=frauddetection \

-DartifactId=frauddetection \

-Dversion=0.1 \

-Dpackage=spendreport \

-DinteractiveMode=false

  之后代码生成了,我们直接看代码。那么如何跟进的演示代码就这样结束了。

FraudDetectionJob.java

package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.walkthrough.common.sink.AlertSink;import org.apache.flink.walkthrough.common.entity.Alert;import org.apache.flink.walkthrough.common.entity.Transaction;import org.apache.flink.walkthrough.common.source.TransactionSource;

public class FraudDetectionJob {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Transaction> transactions = env

.addSource(new TransactionSource())

.name("transactions");

DataStream<Alert> alerts = transactions

.keyBy(Transaction::getAccountId)

.process(new FraudDetector())

.name("fraud-detector");

alerts

.addSink(new AlertSink())

.name("send-alerts");

env.execute("Fraud Detection");

}}

FraudDetector.java

package spendreport;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;import org.apache.flink.util.Collector;import org.apache.flink.walkthrough.common.entity.Alert;import org.apache.flink.walkthrough.common.entity.Transaction;

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

private static final long serialVersionUID = 1L;

private static final double SMALL_AMOUNT = 1.00;

private static final double LARGE_AMOUNT = 500.00;

private static final long ONE_MINUTE = 60 * 1000;

@Override

public void processElement(

Transaction transaction,

Context context,

Collector<Alert> collector) throws Exception {

Alert alert = new Alert();

alert.setId(transaction.getAccountId());

collector.collect(alert);

}}

FraudDetectionJob.scala

package spendreport

import org.apache.flink.streaming.api.scala._import org.apache.flink.walkthrough.common.sink.AlertSinkimport org.apache.flink.walkthrough.common.entity.Alertimport org.apache.flink.walkthrough.common.entity.Transaction

最新文章

  1. C# 通过Selecnuim WebDriver操作非IE浏览器
  2. div+css中clear用法
  3. Leetcode Substring with Concatenation of All Words
  4. 关于Entity Framework中的Attached报错的完美解决方案
  5. python装饰器初探
  6. Discuz! X3.1去除内置门户导航/portal.php尾巴的方法
  7. start stack
  8. jacksonall的使用,解析json
  9. SQL数据库的一些操作
  10. windows已激活,但提示:windows 7 内部版本7601 此windows副本不是正版
  11. python 10道面试陷阱题目
  12. Perl的浅拷贝和深度拷贝
  13. 【工匠大道】升级Mac下的svn,解决命令行不能使用svn的问题
  14. 使用Jupyter Notebook编写技术文档
  15. 配置Activiti Explorer使用MYSQL
  16. 黑苹果 之 神舟战神Z7M-SL7D2
  17. 深入HBase架构解析(二)【转】
  18. 【玩转Golang】 自定义json序列化对象时,非法字符错误原因
  19. Mac百度云盘不限速操作步骤
  20. 关于java泛型的使用方式。。。。

热门文章

  1. 关于自己配置有关webpack.config.js和vue项目搭建相关步骤
  2. Rocket - debug - TLDebugModuleInnerAsync
  3. Spring Boot笔记(七) springboot 集成 JavaMail 实现邮箱认证
  4. C# winform 学习(一)
  5. Java实现 LeetCode 650 只有两个键的键盘(递归 || 数学)
  6. Java实现 蓝桥杯 算法提高 三角形
  7. 第四届蓝桥杯JavaC组国(决)赛真题
  8. Java实现 蓝桥杯 算法训练 数据交换
  9. Linux 文件特殊权限-SetGID
  10. (九)不安全的HTTP方法