kafka端
 
consumer vpc版代码
 
import socket
from kafka import KafkaConsumer
from kafka.errors import KafkaError # context.check_hostname = True consumer = KafkaConsumer(bootstrap_servers=['192.168.xx.xx:9092'],
group_id='xx',
api_version = (0,10)
) print('consumer start to consuming...')
consumer.subscribe(('xx',))
for message in consumer:
print(message.topic)
print(message.offset)
print(message.key)
print(message.value)
print(message.partition)

producer vpc版代码

#!/usr/bin/env python
# encoding: utf-8 import socket
from kafka import KafkaProducer
from kafka.errors import KafkaError producer = KafkaProducer(bootstrap_servers=['192.168.xx.xx:9092'],
api_version = (0,10),
retries=5) partitions = producer.partitions_for('xx')
print('Topic下分区: %s' % partitions) try:
future = producer.send(topic='xx', value=b'hello aliyun-kafka!')
future.get()
print('send message succeed.')
except KafkaError as e:
print('send message failed.')
print(e)

consumer公网版代码

import ssl
import socket
from kafka import KafkaConsumer
from kafka.errors import KafkaError context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
# context.check_hostname = True
context.load_verify_locations("/tmp/ca-cert") consumer = KafkaConsumer(bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],
group_id='xxx',
sasl_mechanism="PLAIN",
ssl_context=context,
security_protocol='SASL_SSL',
api_version = (0,10),
sasl_plain_username='xxx',
sasl_plain_password='1234567890') print('consumer start to consuming...')
consumer.subscribe(('xxx', ))
for message in consumer:
print(message.topic)
print(message.offset)
print(message.value)
break
 
producer 公网版代码
#!/usr/bin/env python
# encoding: utf-8 import ssl
import socket
from kafka import KafkaProducer
from kafka.errors import KafkaError context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
# context.check_hostname = True
context.load_verify_locations("/tmp/ca-cert")
#这个文件参考https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-python-demo producer = KafkaProducer(bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],
sasl_mechanism="PLAIN",
ssl_context=context,
security_protocol='SASL_SSL',
api_version = (0,10),
retries=5,
sasl_plain_username='xx',
sasl_plain_password='1234567890'#注意是access-key的最后十位) partitions = producer.partitions_for('xxx')
print ('Topic下分区: %s' % partitions) try:
future = producer.send('xxx', b'hello aliyun-kafka!')
future.get()
print('send message succeed.')
except KafkaError as e:
print('send message failed.')
print(e)

从阿里云控台获得连接信息

最新文章

  1. angular源码分析:angular中脏活累活的承担者之$interpolate
  2. HTML标签-【fieldset】-fieldset
  3. 向modesim中添加alter库 (或者在每次仿真时将库文件加入仿真文件夹一起编译)
  4. Lagrange插值公式
  5. 【BZOJ】1104: [POI2007]洪水pow
  6. 开源安全平台Ossim 4.5系统使用入门(高清视频)
  7. swift 基于SDK8.0 获取当前时间
  8. Java基础知识强化之多线程笔记06:Lock接口 (区别于Synchronized块)
  9. 怎样通过iPhone Safari 来安装测试版ipa
  10. Hadoop: the definitive guide 第三版 拾遗 第十二章 之Hive分区表、桶
  11. [01] Preparation - Sitecore Installment
  12. 反编译app方法
  13. ffplay播放器移植VC的工程:ffplay for MFC
  14. Android BLE与终端通信(五)——Google API BLE4.0低功耗蓝牙文档解读之案例初探
  15. 《http权威指南》读书笔记10
  16. Linux 内核中的数据结构:基数树(radix tree)
  17. 阿里云三台CentOS7.2配置安装CDH5.12
  18. Mac安装HomeBridge适配小米Homekit报错:module未找到解决
  19. 《算法》第三章部分程序 part 6
  20. iOS 性能调优

热门文章

  1. 如何规范App广告的隐私获取,让用户拥有更多知情权?
  2. 常用Linux命令(常年更新)
  3. 聊聊GPU与CPU的区别
  4. Java安全之Resin2内存马
  5. js高级之对象高级部分
  6. dlv远端调试go的问题
  7. UBOOT编译--- UBOOT顶层Makefile中目标_all和all的关系及背景(四)
  8. c++题目:数迷
  9. C温故补缺(十一):文件读写
  10. 关于js更改编码问题