ballerina 的streams 使用的是siddhi complex event processing 引擎处理,可以包含的语法有
projection filtering windows join pattern

简单例子

  • 参考代码
import ballerina/io;
import ballerina/runtime;
type StatusCount {
string status;
int totalCount;
}; type Teacher {
string name;
int age;
string status;
string batch;
string school;
};
function testAggregationQuery(
stream<StatusCount> filteredStatusCountStream,
stream<Teacher> teacherStream) {
forever { // cep 处理并发布结果
from teacherStream where age > 18 window lengthBatch(3)
select status, count(status) as totalCount
group by status
having totalCount > 1
=> (StatusCount[] status) {
filteredStatusCountStream.publish(status);
}
}
} function main(string… args) {
stream<StatusCount> filteredStatusCountStream; stream<Teacher> teacherStream; testAggregationQuery(filteredStatusCountStream, teacherStream); Teacher t1 = {name: "Sam", age: 25, status: "single",
batch: "LK2014", school: "Hampden High School"};
Teacher t2 = {name: "Jordan", age: 33, status: "single",
batch: "LK1998", school: "Columbia High School"};
Teacher t3 = {name: "Morgan", age: 45, status: "married",
batch: "LK1988", school: "Central High School"}; filteredStatusCountStream.subscribe(printStatusCount); // 生产数据
teacherStream.publish(t1);
teacherStream.publish(t2);
teacherStream.publish(t3); runtime:sleep(1000);
}
function printStatusCount(StatusCount s) {
io:println("Event received; status: " + s.status +
", total occurrences: " + s.totalCount);
}
  • 输出结果
Event received; status: single, total occurrences: 2

stream join

  • 参考代码

代码就是进行通过http 获取的到数据进行流化,同时进行join 并对于符合业务的数据进行报警

import ballerina/http;
import ballerina/mime;
import ballerina/io;
type ProductMaterial {
string name;
float amount;
};
type MaterialUsage {
string name;
float totalRawMaterial;
float totalConsumption;
};
stream<ProductMaterial> rawMaterialStream;
stream<ProductMaterial> productionInputStream;
stream<MaterialUsage> materialUsageStream;
function initRealtimeProductionAlert() {
materialUsageStream.subscribe(printMaterialUsageAlert);
forever {
from productionInputStream window time(10000) as p
join rawMaterialStream window time(10000) as r
on r.name == p.name
select r.name, sum(r.amount) as totalRawMaterial,
sum(p.amount) as totalConsumption
group by r.name
having ((totalRawMaterial - totalConsumption) * 100.0 /
totalRawMaterial) > 5
=> (MaterialUsage[] materialUsages) {
materialUsageStream.publish(materialUsages);
}
}
}
function printMaterialUsageAlert(MaterialUsage materialUsage) {
float materialUsageDifference = (materialUsage.totalRawMaterial -
materialUsage.totalConsumption) * 100.0 /
(materialUsage.totalRawMaterial); io:println("ALERT!! : Material usage is higher than the expected"
+ " limit for material : " + materialUsage.name +
" , usage difference (%) : " + materialUsageDifference);
}
endpoint http:Listener productMaterialListener {
port: 9090
};
@http:ServiceConfig {
basePath: "/"
}
service productMaterialService bind productMaterialListener {
future ftr = start initRealtimeProductionAlert();
@http:ResourceConfig {
methods: ["POST"],
path: "/rawmaterial"
}
rawmaterialrequests(endpoint outboundEP, http:Request req) {
var jsonMsg = req.getJsonPayload();
io:println(jsonMsg);
match jsonMsg {
json msg => {
var productMaterial = check <ProductMaterial>msg;
rawMaterialStream.publish(productMaterial);
http:Response res = new;
res.setJsonPayload({"message": "Raw material request"
+ " successfully received"});
_ = outboundEP->respond(res);
}
error err => {
http:Response res = new;
res.statusCode = 500;
res.setPayload(err.message);
_ = outboundEP->respond(res);
}
}
}
@http:ResourceConfig {
methods: ["POST"],
path: "/productionmaterial"
}
productionmaterialrequests(endpoint outboundEP,http:Request req) {
var jsonMsg = req.getJsonPayload();
match jsonMsg {
json msg => {
var productMaterial = check <ProductMaterial>msg;
productionInputStream.publish(productMaterial);
http:Response res = new;
res.setJsonPayload({"message": "Production input " +
"request successfully received"});
_ = outboundEP->respond(res);
}
error err => {
http:Response res = new;
res.statusCode = 500;
res.setPayload(err.message);
_ = outboundEP->respond(res);
}
}
}
}

用途

对于事件处理的应用特别方便,比如日志处理,以及响应式系统开发,其中的siddhi 也是一个很不错的工具

参考资料

https://ballerina.io/learn/by-example/hello-world-streams.html

 
 
 
 

最新文章

  1. 游戏服务器菜鸟之C#初探二游戏服务
  2. 使用ABP EntityFramework连接MySQL数据库
  3. JavaScript中,提取子字符串方法:Slice、Substring、Substr的比较。
  4. Razor练习2
  5. 修改php执行用户,并使其拥有root权限
  6. 生产订单修改删除组件BDC
  7. 在PHPstorm编辑器中配置git环境
  8. 【转】用capability 特征加强Linux系统安全
  9. Js冒泡事件和捕获事件
  10. 51单片机C语言学习笔记6:51单片机C语言头文件及其使用
  11. vuejs数据双向绑定原理(get &amp; set)
  12. 喂,前端,你应该知道的chrome插件
  13. Ajax异步信息抓取方式
  14. Tensorflow计算正确率、精确率、召回率
  15. 新年第一个目标一张表盘串讲所有canves的知识点
  16. 查看apk文件包名的一些方法
  17. day 55 前端
  18. AutoCompleteTextView搭配Poi搜索实现多项选择
  19. OLAP和OLTP基础知识
  20. Golang 序列化方式及对比

热门文章

  1. vim编程技巧
  2. Python实现CSV数据的读取--两种方法实现
  3. 【Demo】jQuery 轮播图简单动画效果
  4. Struts2的Convention插件
  5. ubuntu中python2与python3的默认启动切换
  6. echarts在vue中使用的感悟
  7. windows 2008 server R2 服务器docker安装
  8. 一道简单的JavaScript面试题
  9. APUE学习笔记——5缓冲Buffering、流、文件对象
  10. Java虚拟机访问读写其他进程的数据--RandomAccessFile