Kafka中Producer端封装自定义消息
2024-08-22 15:47:09
我们知道KeywordMessage就是被kafka发送和存储的对象。所以只需要模拟出这个就可以发送自定义消息了。
比如我需要将用户的id,user,age,address和访问ip和访问date记录为一个消息。我就自定义一个消息格式(id-user-age-address-ip-date)。
我立马想到自己定义个javaBean.写一个UserInfo类伪代码。
class UserInfo(){
id;
user;
age;
address;
ip;
date;
toString(){
return this.getId()+"-"+this.getUser()+"-"+"..."+this.getDate();
}
}
你以为这样就可以了吗?当然不行啊!
还要按照kafka的消息类型进行封装,在这里我们只需要实现Encoder类即可:继续看看代码就好;
public
class
KeywordMessage
implements
kafka.serializer.Encoder<UserInfo>{
public
static
final
Logger LOG=LoggerFactory.getLogger(UserInfo.
class
);
@Override
public
Message toMessage(Keyword words) {
LOG.info(
"start in encoding..."
);
return
new
Message(words.toString().getBytes());
}
}
这样KeywordMessage就是一个可以被kafka发送和存储的对象了。
我们再看看producer,producer数据的推送到broker的,所以发起者还是业务系统,下面的代码就能直接发送一次数据。
/**配置producer必要的参数*/
Properties props =
new
Properties();
必要的一些配置省略。。。
/**选择用哪个类来进行序列化,就是我们自定义的消息类*/
props.put(
"serializer.class"
,
"org.kafka.message.UserInfo"
);
ProducerConfig config=
new
ProducerConfig(props);
/**构造测试数据*/
UserInfo userInfo
=
new
UserInfo();
userInfo
.setId(
1
);
userInfo
.setUser(
"xiaoming"
);
...
List<UserInfo> msg=
new
ArrayList<UserInfo>();
msg.add(userInfo);
/**构造数据发送对象*/
Producer<String, UserInfo> producer=
new
Producer<String, UserInfo>(config);
ProducerData<String,UserInfo> data=
new
ProducerData<String, UserInfo>(
"test"
, msg);
producer.send(data);
以上就是自定义封装消息内容。
最新文章
- 【目录】 hadoop2.6.0
- node exports与 module.exports的区别
- jmeter接口测试教程
- Android 建立文件夹、生成文件并写入文本文件内容
- Memcached解决单台服务器故障问题
- opencv + numpy for python
- &;quot;错: void 值不被忽略,因为预期&;quot;解决
- 201521123083 《Java程序设计》第10周学习总结
- iOS开发xcode报错:";xxxxxx";has been modified since the precompiled header was built
- (三)jdk8学习心得之方法引用
- Android Connection refused
- ICPC中国南昌国家邀请赛和国际丝绸之路规划大赛预选赛 I J
- LOJ 2409「THUPC 2017」小 L 的计算题 / Sum
- UTF-8的BOM含义
- 启发式合并&;线段树合并/分裂&;treap合并&;splay合并
- apigateway-kong(三)Proxy规则
- eclipse编译zookeeper源码
- Unix环境高级编程(二十)伪终端
- DataGridView使用技巧十一:DataGridView用户输入时,单元格输入值的设定
- 新建用户组、用户、用户密码、删除用户组、用户(适合CentOS、Ubuntu系统)