阿里云kafka使用记录(python版本)
2024-10-15 23:11:52
kafka端
consumer vpc版代码
import socket
from kafka import KafkaConsumer
from kafka.errors import KafkaError # context.check_hostname = True consumer = KafkaConsumer(bootstrap_servers=['192.168.xx.xx:9092'],
group_id='xx',
api_version = (0,10)
) print('consumer start to consuming...')
consumer.subscribe(('xx',))
for message in consumer:
print(message.topic)
print(message.offset)
print(message.key)
print(message.value)
print(message.partition)
producer vpc版代码
#!/usr/bin/env python
# encoding: utf-8 import socket
from kafka import KafkaProducer
from kafka.errors import KafkaError producer = KafkaProducer(bootstrap_servers=['192.168.xx.xx:9092'],
api_version = (0,10),
retries=5) partitions = producer.partitions_for('xx')
print('Topic下分区: %s' % partitions) try:
future = producer.send(topic='xx', value=b'hello aliyun-kafka!')
future.get()
print('send message succeed.')
except KafkaError as e:
print('send message failed.')
print(e)
consumer公网版代码
import ssl
import socket
from kafka import KafkaConsumer
from kafka.errors import KafkaError context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
# context.check_hostname = True
context.load_verify_locations("/tmp/ca-cert") consumer = KafkaConsumer(bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],
group_id='xxx',
sasl_mechanism="PLAIN",
ssl_context=context,
security_protocol='SASL_SSL',
api_version = (0,10),
sasl_plain_username='xxx',
sasl_plain_password='1234567890') print('consumer start to consuming...')
consumer.subscribe(('xxx', ))
for message in consumer:
print(message.topic)
print(message.offset)
print(message.value)
break
producer 公网版代码
#!/usr/bin/env python
# encoding: utf-8 import ssl
import socket
from kafka import KafkaProducer
from kafka.errors import KafkaError context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
# context.check_hostname = True
context.load_verify_locations("/tmp/ca-cert")
#这个文件参考https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-python-demo producer = KafkaProducer(bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],
sasl_mechanism="PLAIN",
ssl_context=context,
security_protocol='SASL_SSL',
api_version = (0,10),
retries=5,
sasl_plain_username='xx',
sasl_plain_password='1234567890'#注意是access-key的最后十位) partitions = producer.partitions_for('xxx')
print ('Topic下分区: %s' % partitions) try:
future = producer.send('xxx', b'hello aliyun-kafka!')
future.get()
print('send message succeed.')
except KafkaError as e:
print('send message failed.')
print(e)
从阿里云控台获得连接信息
最新文章
- angular源码分析:angular中脏活累活的承担者之$interpolate
- HTML标签-【fieldset】-fieldset
- 向modesim中添加alter库 (或者在每次仿真时将库文件加入仿真文件夹一起编译)
- Lagrange插值公式
- 【BZOJ】1104: [POI2007]洪水pow
- 开源安全平台Ossim 4.5系统使用入门(高清视频)
- swift 基于SDK8.0 获取当前时间
- Java基础知识强化之多线程笔记06:Lock接口 (区别于Synchronized块)
- 怎样通过iPhone Safari 来安装测试版ipa
- Hadoop: the definitive guide 第三版 拾遗 第十二章 之Hive分区表、桶
- [01] Preparation - Sitecore Installment
- 反编译app方法
- ffplay播放器移植VC的工程:ffplay for MFC
- Android BLE与终端通信(五)——Google API BLE4.0低功耗蓝牙文档解读之案例初探
- 《http权威指南》读书笔记10
- Linux 内核中的数据结构:基数树(radix tree)
- 阿里云三台CentOS7.2配置安装CDH5.12
- Mac安装HomeBridge适配小米Homekit报错:module未找到解决
- 《算法》第三章部分程序 part 6
- iOS 性能调优