kafka之c接口常用API------librdkafka
2024-09-28 12:27:36
1 安装方法以及相关库文件
https://github.com/edenhill/librdkafka
2
- High-level producer
- High-level consumer
- Simple (Low-level) consumer
- 压缩:snappy, gzip, lz4
- SSL
- SASL
consumer有两套API,高级(high-level)和底层(simple)的,应该叫底层API或者低级API,它跟高级API的区别是没有自动负载均衡,而高级API会自动进行负载均衡。
3 kafka主要的用途
发数据---->producer
//发一条
rd_kafka_produce()
//发多条
rd_kafka_produce_batch()
收数据---->consumer
在收发数据之前至少需要一个统一的句柄,方便kafka内部准备好链接brokers集群,初始化kafka内部结构等
建立这个kafka句柄需要知道连接到哪个broker
发布消息使用rd_kafka_top_t,
// 对rd_kafka_topic_partition_list_t结构的操作
// 创建
rd_kafka_topic_partition_list_new();
// 增加元素
rd_kafka_topic_partition_list_add();
// 删除元素
rd_kafka_topic_partition_list_del();
// 查找元素
rd_kafka_topic_partition_list_find(); // 对rd_kafka_topic_t的操作
// 创建
rd_kafka_topic_new();
// 删除
rd_kafka_topic_destroy();
// 获取该topic的名字
rd_kafka_topic_name();
// 获取该topic传入的应用参数
rd_kafka_topic_opaque(); // 使用rd_kafka_topic_partition_list_t的时候,topic+partition是连在一起的,
// 所以给kafka句柄的时候只用一个参数就够了
// 订阅消息
rd_kafka_subscribe (rd_kafka_t *rk,
const rd_kafka_topic_partition_list_t *topics);
// 指定消费的partition,可以在运行时更换
rd_kafka_assign (rd_kafka_t *rk,
const rd_kafka_topic_partition_list_t *partitions); // 用rd_kafka_topic_t比较麻烦,需要配合一个partition才行
// 直接启动consumer了
rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition,
int64_t offset);
// 每次接收也要带上partition
rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition,
int timeout_ms); // 发送的时候使用 rd_kafka_topic_t
int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition,
int msgflags,
void *payload, size_t len,
const void *key, size_t keylen,
void *msg_opaque);
后续用了继续做笔记,关于错误处理,topic_partition_list操作等
https://blog.csdn.net/lijinqi1987/article/details/76582067
http://suntus.github.io/2016/07/07/librdkafka--kafka%20C%20api%E4%BB%8B%E7%BB%8D/
最新文章
- js常用工具类.
- 1.15 kickstart rpm yum
- get_magic_quotes_gpc函数
- 【转】What's the difference between simulation and emulation
- 转:java两个jre目录和三个lib目录
- cmd 控制台 提示:请求的操作须要提升!
- Python之路【第十三篇】:jQuery -暂无内容-待更新
- 对sql进行分页处理(Oracle版)
- Swiper.js使用方法
- gperftools对程序进行分析
- Elasticsearch深入搜索之结构化搜索及JavaAPI的使用
- [Java] SpringMVC工作原理之一:DispatcherServlet
- Source Insight 如何将script等文件加入
- 乘风破浪:LeetCode真题_035_Search Insert Position
- netable 禁用拖动
- centos7 安装拼音输入法
- .Net并行编程系列之一:并行基础
- Android四大基本组件之 Activity
- Unknown type name 'UIColor"; 的问题
- C#中关于换行符的记录