我们知道KeywordMessage就是被kafka发送和存储的对象。所以只需要模拟出这个就可以发送自定义消息了。

比如我需要将用户的id,user,age,address和访问ip和访问date记录为一个消息。我就自定义一个消息格式(id-user-age-address-ip-date)。

我立马想到自己定义个javaBean.写一个UserInfo类伪代码。

class UserInfo(){

    id;

   user;

   age;

   address;

   ip;

   date;

   toString(){

    return this.getId()+"-"+this.getUser()+"-"+"..."+this.getDate(); 

}

}

你以为这样就可以了吗?当然不行啊!

还要按照kafka的消息类型进行封装,在这里我们只需要实现Encoder类即可:继续看看代码就好;

public class KeywordMessage implements kafka.serializer.Encoder<UserInfo>{
     
    public static final Logger LOG=LoggerFactory.getLogger(UserInfo.class);
     
    @Override
    public Message toMessage(Keyword words) {
        LOG.info("start in encoding...");
        return new Message(words.toString().getBytes());
    }
}
这样KeywordMessage就是一个可以被kafka发送和存储的对象了。
 
我们再看看producer,producer数据的推送到broker的,所以发起者还是业务系统,下面的代码就能直接发送一次数据。
/**配置producer必要的参数*/

Properties props = new Properties();
必要的一些配置省略。。。
/**选择用哪个类来进行序列化,就是我们自定义的消息类*/
props.put("serializer.class", "org.kafka.message.UserInfo");
ProducerConfig config=new ProducerConfig(props);
/**构造测试数据*/
UserInfo userInfo = new UserInfo();
userInfo.setId(1);
userInfo.setUser("xiaoming");
 ...
List<UserInfo> msg=new ArrayList<UserInfo>();
msg.add(userInfo);
/**构造数据发送对象*/
Producer<String, UserInfo> producer=new Producer<String, UserInfo>(config);      
ProducerData<String,UserInfo> data=new ProducerData<String, UserInfo>("test", msg);
producer.send(data);
 
以上就是自定义封装消息内容。

最新文章

  1. 【目录】 hadoop2.6.0
  2. node exports与 module.exports的区别
  3. jmeter接口测试教程
  4. Android 建立文件夹、生成文件并写入文本文件内容
  5. Memcached解决单台服务器故障问题
  6. opencv + numpy for python
  7. &amp;quot;错: void 值不被忽略,因为预期&amp;quot;解决
  8. 201521123083 《Java程序设计》第10周学习总结
  9. iOS开发xcode报错:&quot;xxxxxx&quot;has been modified since the precompiled header was built
  10. (三)jdk8学习心得之方法引用
  11. Android Connection refused
  12. ICPC中国南昌国家邀请赛和国际丝绸之路规划大赛预选赛 I J
  13. LOJ 2409「THUPC 2017」小 L 的计算题 / Sum
  14. UTF-8的BOM含义
  15. 启发式合并&amp;线段树合并/分裂&amp;treap合并&amp;splay合并
  16. apigateway-kong(三)Proxy规则
  17. eclipse编译zookeeper源码
  18. Unix环境高级编程(二十)伪终端
  19. DataGridView使用技巧十一:DataGridView用户输入时,单元格输入值的设定
  20. 新建用户组、用户、用户密码、删除用户组、用户(适合CentOS、Ubuntu系统)

热门文章

  1. Leetcode: LRU Cache 解题报告
  2. PowerShell中实现人机交互
  3. windows下PIP安装模块编码错误解决
  4. VBA学习笔记(4)--数组和单元格互相转换
  5. Android 自定义Adapter实现多视图Item的ListView
  6. oops_根据epc定位linux_kernel_panic位置
  7. 内核模块module传参
  8. 公司内网成功实现WSUS在不连外网的条件下更新补丁包!
  9. SparkR:数据科学家的新利器
  10. Stay hungry, stay foolish. 求知若饥,虚心若愚。