如需大数据开发整套视频(hadoop\hive\hbase\flume\sqoop\kafka\zookeeper\presto\spark):请联系QQ:1974983704

由于我使用的是kafka_2.10-0.10.0.1,需要下载对应版本的kafka-clients-0.10.1.1.jar包

生产数据KafkaProducerEx:

 1 package test.KafkaTest
2
3 import java.util.Properties
4 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
5 import org.apache.kafka.clients.consumer
6 import org.apache.kafka.clients.consumer.ConsumerConfig
7 import org.apache.kafka.common.serialization
8
13 object KafkaProducerEx {
14 def main(args:Array[String]): Unit= {
15 val topic = "test"
16 val brokers = "localhost:9092"  //Zookeeper地址,两个地址以逗号(,)分割
17 val props = new Properties()
18 props.put("bootstrap.servers", brokers)
19 props.put("acks","all")
20 props.put("retries","0")
21 props.put("batch.size","16384")
22 props.put("linger.ms","1")
23 props.put(ConsumerConfig.GROUP_ID_CONFIG,"test")
24 props.put("buffer.memory","33554432")
25 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
26 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
27
28 val producer = new KafkaProducer[String, String](props)
29 val t = System.currentTimeMillis()
30 val msg = "producer message::::::"
31 var i=0
32 for(i<-Range(1,1000))
33 {
34 println(msg+i.toString())
35 val record = new ProducerRecord[String, String](topic, "kafka_key", msg+i.toString())
36 producer.send(record)
37 }
38 producer.close()
39 }
40 }

消费数据KafkaConsumerEx:

 1 package test.KafkaTest
2 import java.util.{Collections, Properties}
3
4 import scala.collection.JavaConversions._
5 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
6 import org.apache.kafka.common.serialization
7
8 object KafkaConsumerEx {
9
10 def main(args:Array[String]):Unit={
11 val topic = "test"
12 val brokers = "localhost:9092"
13 val props = new Properties()
14 props.put("bootstrap.servers", brokers)
15 props.put("enable.auto.commit","true")
16 props.put("auto.commit.interval.ms","10000")
17 props.put(ConsumerConfig.GROUP_ID_CONFIG,"test")
18 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
19 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
20 val consumer = new KafkaConsumer[String, String](props)
21 consumer.subscribe(Collections.singleton(topic))
22 while (true)
23 {
24 val records:ConsumerRecords[String,String] = consumer.poll(100)
25 for ( record <- records){
26 println(record.value())
27 }
28 }
29 consumer.close()
30 }
31 }

最新文章

  1. 安全测试 - CSRF攻击及防御
  2. JavaScript读二进制文件并用ajax传输二进制流
  3. json,serialize,msgpack比较
  4. C#本质论读书笔记:第一章 C#概述|第二章 数据类型
  5. Java基础-继承-子类与父类执行顺序
  6. Linux 下没有 my.cnf 文件的解决方式,完全是我自己整的,好多教程都是瞎扯的 (zhuan)
  7. div 两列布局,左侧固定宽度px,右侧自适应宽度,满屏
  8. POJ 3070 Fibonacci(矩阵快速幂)
  9. TSQL基础(二)
  10. [swustoj 1021] Submissions of online judge
  11. php+mysql将大数据sql文件导入数据库
  12. 微信开发之门店管理{&quot;errcode&quot;:40097,&quot;errmsg&quot;:&quot;invalid args hint: [xxxxxxx]&quot;}
  13. Java 学习 第三篇;面向对象
  14. FastDFS分布式存储实战
  15. 20162308 实验一《Java开发环境的熟悉》实验报告
  16. ●UVA 10674 Tangents
  17. qt+vs2017环境下XIMEA相机库的配置
  18. fabric-ca1.03安装
  19. Java序列化对象-字符串转换
  20. centos7之使用最新版的kubeadm体验k8s1.12.0

热门文章

  1. 如何把百度地图左边的搜索列表导出成excel里?
  2. JMM(Java内存模型)笔记
  3. JS中有关闭包的相关内容及介绍
  4. ionic 架构
  5. Excel&mdash;&mdash;解除工作表保护
  6. java自定义的异常类
  7. linux改变用户属主
  8. 2022-3-29内部群每日三题-清辉PMP
  9. TCP三次握手和四次挥手的原因所在
  10. B. Going to the Cinema