1. 生产者代码

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /**
* Created by p on 2018/10/8.
*/
public class AvroKafkaProducer {
    /**
     * Avro schema (JSON) for the {@code Customer} record written by this producer.
     *
     * <p>{@code email} is a nullable string. Its default is the JSON literal
     * {@code null} (not the string {@code "null"}), because the default of a
     * union field must match the first branch of the union.
     */
    public static final String USER_SCHEMA = "{\n" +
            " \"type\":\"record\",\n" +
            " \"name\":\"Customer\",\n" +
            " \"fields\":[\n" +
            " {\"name\":\"id\",\"type\":\"int\"},\n" +
            " {\"name\":\"name\",\"type\":\"string\"},\n" +
            " {\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}\n" +
            " ]\n" +
            "}";

    /**
     * Sends 1000 Avro-encoded {@code Customer} records to the {@code Customer}
     * topic, closes the producer, then prints the schema to standard output.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "ip:9092");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are Avro binary produced by the Bijection codec below, so they go out as raw bytes.
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // NOTE(review): "MyPartitioner" is not defined in this file; it must be on the classpath.
        kafkaProps.put("partitioner.class", "MyPartitioner");

        Schema schema = new Schema.Parser().parse(USER_SCHEMA);
        Injection<GenericRecord, byte[]> injection = GenericAvroCodecs.toBinary(schema);

        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(kafkaProps);
        try {
            for (int i = 0; i < 1000; i++) {
                GenericData.Record customer = new GenericData.Record(schema);
                customer.put("id", i);
                customer.put("name", "name-" + i);
                customer.put("email", "email-" + i);

                byte[] payload = injection.apply(customer);
                producer.send(new ProducerRecord<String, byte[]>("Customer", "customer-" + i, payload));
            }
        } finally {
            // Close even when encoding or sending fails, so the producer is always released.
            producer.close();
        }
        System.out.println(USER_SCHEMA);
    }
}

2. 消费者代码

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections;
import java.util.Properties; /**
* Created by p on 2018/10/14.
*/
public class AvroKafkaConsumer {
    /**
     * Avro schema (JSON) used to decode {@code Customer} records.
     *
     * <p>{@code email} is a nullable string. Its default is the JSON literal
     * {@code null} (not the string {@code "null"}), because the default of a
     * union field must match the first branch of the union.
     */
    public static final String USER_SCHEMA = "{\n" +
            " \"type\":\"record\",\n" +
            " \"name\":\"Customer\",\n" +
            " \"fields\":[\n" +
            " {\"name\":\"id\",\"type\":\"int\"},\n" +
            " {\"name\":\"name\",\"type\":\"string\"},\n" +
            " {\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}\n" +
            " ]\n" +
            "}";

    /**
     * Subscribes to the {@code Customer} topic and prints each decoded record
     * as {@code key:id<TAB>name<TAB>email}. Loops forever; the consumer is
     * closed if the loop exits with an exception.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "ip:9092");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Values arrive as raw Avro binary and are decoded with the Bijection codec below.
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        kafkaProps.put("group.id", "DemoAvroKafkaConsumer");
        kafkaProps.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(kafkaProps);
        consumer.subscribe(Collections.singletonList("Customer"));

        Schema schema = new Schema.Parser().parse(USER_SCHEMA);
        Injection<GenericRecord, byte[]> injection = GenericAvroCodecs.toBinary(schema);

        try {
            while (true) {
                ConsumerRecords<String, byte[]> batch = consumer.poll(10);
                for (ConsumerRecord<String, byte[]> message : batch) {
                    byte[] payload = message.value();
                    if (payload == null) {
                        // A record may carry no value (e.g. a tombstone); there is nothing to decode.
                        continue;
                    }
                    // get() rethrows the decoding failure if the bytes do not match the schema.
                    GenericRecord customer = injection.invert(payload).get();
                    System.out.println(message.key() + ":" + customer.get("id") + "\t"
                            + customer.get("name") + "\t" + customer.get("email"));
                }
            }
        } finally {
            consumer.close();
        }
    }
}

3. pom依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.7.6-cdh5.9.1</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>bijection-avro_2.11</artifactId>
<version>0.9.6</version>
</dependency>

最新文章

  1. SSL安全证书-概念解析
  2. Linux下Redis安装与PHP扩展(PHP7适用)
  3. linux文件系统模拟
  4. MyBatis学习总结_13_Mybatis查询之resultMap和resultType区别
  5. [Gauss]POJ1222 EXTENDED LIGHTS OUT
  6. HDU 5224 Tom and paper(最小周长)
  7. 写代码质量改善java计划151建议——导航开始
  8. (八)、vpn-pptp部署
  9. FSG报表打印报错,log文件显示java.sql.SQLException: No corresponding LOB data found
  10. 【Python 】selenium 简介
  11. 浅析JavaScript解析赋值、浅拷贝和深拷贝的区别
  12. 开启telnet
  13. C#操作Control异步工具类
  14. istio实现自动sidecar自动注入(k8s1.13.3+istio1.1.1)
  15. 操作dom影响性能的原因
  16. (Go rails)使用Rescue_from(ActiveSupport:Rescuable::ClassMethods)来解决404(ActiveRecord::RecordNotFound)❌
  17. hibernate(一)
  18. MongoDB常用命令总结
  19. nyoj Registration system
  20. c++面试题中经常被面试官面试的小问题总结(一)(本篇偏向基础知识)

热门文章

  1. day02-业务服务监控
  2. 异或加密 - cr2-many-time-secrets(攻防世界) - 异性相吸(buuctf)
  3. 3种办法教你解决Vegas预览画面卡顿问题
  4. 为什么思维导图软件MindManager成为了企业培训必备的工具
  5. Vue 3.0 升级指南
  6. python3 Failed to establish a new connection: [WinError 10061] 由于目标计算机积极拒绝,无法连接
  7. Codeforces Round #674 (Div. 3) C、D 题解
  8. 转载的一篇文章eclipse添加插件
  9. 帆软用工具测试超链接打开弹窗(iframe嵌套),解决js传参带中文传递有乱码问题
  10. GitHub上最火的、最值得前端学习的几个数据结构与算法项目!没有之一!