import java.util

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
/**
* Created by soyo on 17-10-17.
* 运行kafka程序 1.需要启动Zookeeper服务:./bin/zookeeper-server-start.sh config/zookeeper.properties
* 2.启动Kafka服务:./bin/kafka-server-start.sh config/server.properties
* 3.执行 DStream_Kafa_Producer
* 4.执行 DStream_Kafa_Consumer
*/
object DStream_Kafa_Producer {
def main(args: Array[String]): Unit = {
val brokers="localhost:9092"
val topic="wordsender"
val messagePerSec= //行数
val wordsPerMessage= //列数
//配置Zookeeper
val props= new util.HashMap[String,Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer") val producer=new KafkaProducer[String,String](props)
while (true){
( to messagePerSec).foreach({x=>
val str=( to wordsPerMessage).map(x=>scala.util.Random.nextInt().toString).mkString(" ")
println(str)
val message=new ProducerRecord[String,String](topic,null,str)
producer.send(message)
val s2="ni hao 我的 测试 的 字符串" //可以与message一起发送
val message2=new ProducerRecord[String,String](topic,null,s2)
producer.send(message2)
})
Thread.sleep()
}
}
}

结果:

4 6 9 7 6 8 3
0 0 8 3 9 3 4
2 2 1 9 2 2 3
6 2 5 8 1 0 7
6 9 6 8 5 8 0
7 8 6 5 3 4 4
3 7 9 1 3 1 9
9 0 0 9 0 6 9
2 5 2 8 3 6 5
9 3 2 6 2 6 8
2 1 2 7 2 7 3

/**
* Created by soyo on 17-10-17.
* 运行kafka程序 1.需要启动Zookeeper服务:./bin/zookeeper-server-start.sh config/zookeeper.properties
* 2.启动Kafka服务:./bin/kafka-server-start.sh config/server.properties
* 3.执行 DStream_Kafa_Producer
* 4.执行 DStream_Kafa_Consumer
*/

最新文章

  1. iOS 实现转盘的效果
  2. 【Windows编程】系列第七篇:Menubar的创建和使用
  3. ZOJ2314 Reactor Cooling
  4. dll return a string
  5. iOS7 中的JavaScriptCore简单介绍
  6. Think Python - Chapter 17 - Classes and methods
  7. 非常实用的10个PHP高级应用技巧
  8. SGU131 - Hardwood floor(状态压缩DP)
  9. Swift的74标准功能
  10. UC编程:输入输出重定向(系统调用)
  11. MySQL XtraBackup自动恢复脚本
  12. 【全面总结】js获取元素位置大小
  13. mongodb 复制集
  14. Codeforces878 A. Short Program
  15. FireDAC 连接SQL Server一些要注意的地方(转)
  16. 斯坦福大学公开课机器学习: advice for applying machine learning | deciding what to try next(revisited)(针对高偏差、高方差问题的解决方法以及隐藏层数的选择)
  17. 【原创】大叔经验分享(30)CM开启kerberos
  18. PS学习之小猪佩奇身上纹,掌声送给社会人
  19. laravel 一表關聯二表,二表關聯三表,通過一表controller拿三表數據
  20. IDA动态调试so文件出现SIGILL

热门文章

  1. API Studio 5.1.2 版本更新:加入全局搜索、支持批量测试API测试用例、读取代码注解生成文档支持Github与码云等
  2. Python isdigit() 方法检测字符串是否只由数字组成
  3. 【Redis】二、Redis高级特性
  4. JAVA学习笔记16——线程的创建和启动
  5. 散列(hash)
  6. Mybatis中and和or的细节处理
  7. Servlet的说明及使用案例
  8. LINUX-初始化一个文件系统
  9. Django DTL模板语法中的过滤器
  10. SQL学习笔记:表的约束