由于需要测试阿里云Datahub功能,因此测了一下Datahub的一些功能

DATAHUB:
简介:
阿里云的流式数据(streaming)处理平台
对流式数据的发布(publish)订阅(subscribe)和分发功能
 
主要功能:
采集实时数据,如移动设备,传感器,网站服务等
使用脚本或流计算引擎来处理写入datahub的数据
最后生成实时图表/报警信息等
 
术语:
project:项目,包含多个topic
topic:可以表示一种类型的流,订阅和发布单位
shard:topic的并发通道
record:用户数据与datahub端交互的基本单位
recordtype:topic的数据类型,支持tuple和blob
DataConnect:把datahub中的流式数据同步到其他云产品中的功能,现在支持odps/oss/es/mysql
 
操作过程
首先在新建project,注意管理员账号
注意授权信息
参考<授权信息管理>
<https://help.aliyun.com/document_detail/47442.html?spm=a2c4g.11186623.6.544.371f1a12NmNa1w>
 
然后进入后,创建topic
schema是指column,可以选择多种数据类型
 
新建DataConnect,来设置下游数据
 
设置maxcompute连接
 
使用python来插入数据到topic
import sys
import traceback
from datahub import DataHub
from datahub.exceptions import ResourceExistException
from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType access_id =
access_key =
endpoint = 'https://dh-cn-shanghai.aliyuncs.com'
dh = DataHub(access_id, access_key, endpoint) ##写入
project_name=
topic_name =
try:
# block等待所有shard状态ready
dh.wait_shards_ready(project_name, topic_name)
print("shards all ready!!!")
print("=======================================\n\n")
topic_result = dh.get_topic(project_name, topic_name)
print(topic_result)
if topic_result.record_type != RecordType.TUPLE:
print("topic type illegal!")
sys.exit(-1)
print("topic type normal")
print("=======================================\n\n")
record_schema = topic_result.record_schema
records0 = []
record0 = TupleRecord(schema=record_schema, values=['1', '2yc1', '30.01', '4True', '5455869335000000','6','1455869335000000'])
record0.shard_id = '0'
record0.put_attribute('AK', '47')
records0.append(record0)
for i in range (1,10000):
record2 = TupleRecord(schema=record_schema)
record2.set_value(0, str(i))
record2.set_value(1, str(i)+'yc3')
record2.set_value(2, str(i+1.1))
record2.set_value(3, str(i))
record2.set_value(4, '1455869335000011')
record2.set_value(5, '20180913_1115')
record2.set_value(6, int(time.time())*1000000)
record2.attributes = {'key': 'value'}
record2.partition_key = 'EVENT_TIME'
records0.append(record2) put_result = dh.put_records(project_name, topic_name, records0)
print(put_result)
print("put tuple %d records, failed count: %d" %(len(records0), put_result.failed_record_count))
# failed_record_count如果大于0最好对failed record再进行重试
print('结束')
print
print("=======================================\n\n")
except DatahubException as e:
print(e)
sys.exit(-1)

进行验证数据导入

maxcompute默认是五分钟或者50M触发一次同步,如果需要实时的就要rds登场了
这样就测试完成了.后期进行压测,待续..
 
 
 
 
 

最新文章

  1. Spring集成MyBatis
  2. Starling中通过PivotX 和 PivotY 修改原点
  3. 浅谈JAVA设计模式
  4. Ionic 入门
  5. document的createDocumentFragment()方法
  6. sql openxml sp_xml_preparedocument xml 中文乱码
  7. nginx 反向代理设置
  8. TCP Socket的通讯框架
  9. 从无到有开发连麦直播技术&lt;转&gt;
  10. C#实现树的双亲表示法
  11. ASP.NET Core 源码学习之 Options[2]:IOptions
  12. Iptables详解七层过滤
  13. 详解k8s一个完整的监控方案(Heapster+Grafana+InfluxDB) - kubernetes
  14. Vue+koa2开发一款全栈小程序(6.个人中心)
  15. Three ways to detect outliers
  16. [疑难杂症]__当你的Cortana搜索无法使用,显示纯白界面(ps:已解决).
  17. python2.7环境下的flask项目导入模块失败解决办法
  18. C#双缓冲代码
  19. 解决从客户端(Content=&quot;&lt;div&gt;&lt;p &gt;&lt;p&gt;12312...&quot;)中检测到有潜在危险的Request.Form 值。
  20. 使用pm2管理node.js应用

热门文章

  1. 知识图谱顶会论文(KDD-2022) kgTransformer:复杂逻辑查询的预训练知识图谱Transformer
  2. 7.pyagem-游戏背景
  3. 24.-Django生成csv文件及下载
  4. 一键部署haproxy脚本
  5. 【单元测试】Junit 4(三)--Junit4断言
  6. VirtualBox 下 CentOS7 静态 IP 的配置 → 多次踩坑总结,蚌埠住了!
  7. Codeforces Round #791(Div 2)——D
  8. day14-HTTP01
  9. go GMP
  10. C#多线程之高级篇(上)