IDEA Spark Streaming Kafka数据源-Producer
2024-08-26 13:16:53
import java.util import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
/**
* Created by soyo on 17-10-17.
* 运行kafka程序 1.需要启动Zookeeper服务:./bin/zookeeper-server-start.sh config/zookeeper.properties
* 2.启动Kafka服务:./bin/kafka-server-start.sh config/server.properties
* 3.执行 DStream_Kafa_Producer
* 4.执行 DStream_Kafa_Consumer
*/
object DStream_Kafa_Producer {
def main(args: Array[String]): Unit = {
val brokers="localhost:9092"
val topic="wordsender"
val messagePerSec= //行数
val wordsPerMessage= //列数
//配置Zookeeper
val props= new util.HashMap[String,Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer") val producer=new KafkaProducer[String,String](props)
while (true){
( to messagePerSec).foreach({x=>
val str=( to wordsPerMessage).map(x=>scala.util.Random.nextInt().toString).mkString(" ")
println(str)
val message=new ProducerRecord[String,String](topic,null,str)
producer.send(message)
val s2="ni hao 我的 测试 的 字符串" //可以与message一起发送
val message2=new ProducerRecord[String,String](topic,null,s2)
producer.send(message2)
})
Thread.sleep()
}
}
}
结果:
4 6 9 7 6 8 3
0 0 8 3 9 3 4
2 2 1 9 2 2 3
6 2 5 8 1 0 7
6 9 6 8 5 8 0
7 8 6 5 3 4 4
3 7 9 1 3 1 9
9 0 0 9 0 6 9
2 5 2 8 3 6 5
9 3 2 6 2 6 8
2 1 2 7 2 7 3
/**
* Created by soyo on 17-10-17.
* 运行kafka程序 1.需要启动Zookeeper服务:./bin/zookeeper-server-start.sh config/zookeeper.properties
* 2.启动Kafka服务:./bin/kafka-server-start.sh config/server.properties
* 3.执行 DStream_Kafa_Producer
* 4.执行 DStream_Kafa_Consumer
*/
最新文章
- iOS 实现转盘的效果
- 【Windows编程】系列第七篇:Menubar的创建和使用
- ZOJ2314 Reactor Cooling
- dll return a string
- iOS7 中的JavaScriptCore简单介绍
- Think Python - Chapter 17 - Classes and methods
- 非常实用的10个PHP高级应用技巧
- SGU131 - Hardwood floor(状态压缩DP)
- Swift的74标准功能
- UC编程:输入输出重定向(系统调用)
- MySQL XtraBackup自动恢复脚本
- 【全面总结】js获取元素位置大小
- mongodb 复制集
- Codeforces878 A. Short Program
- FireDAC 连接SQL Server一些要注意的地方(转)
- 斯坦福大学公开课机器学习: advice for applying machine learning | deciding what to try next(revisited)(针对高偏差、高方差问题的解决方法以及隐藏层数的选择)
- 【原创】大叔经验分享(30)CM开启kerberos
- PS学习之小猪佩奇身上纹,掌声送给社会人
- laravel 一表關聯二表,二表關聯三表,通過一表controller拿三表數據
- IDA动态调试so文件出现SIGILL