Kafka 生产消息，Spark Streaming 消费
2024-10-20 06:29:34
package com.bd.useranalysis.spark.streaming.kafka2es; import com.alibaba.fastjson.JSON;
import com.bd.useranalysis.common.config.ConfigUtil;
import com.bd.useranalysis.common.project.datatype.DataTypeProperties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

import java.util.*;

/**
 * Consumes JSON records from Kafka with Spark Streaming and, for each known
 * data type, filters the records whose "table" field matches that type.
 * Matching records are currently only printed to stdout — the Elasticsearch
 * sink implied by the class/package name is not implemented in this block.
 */
public class Kafka2EsJava {

    // NOTE(review): instance field unused by the static main() below; it still
    // loads the config file on instantiation — consider removing or making static.
    Properties properties = ConfigUtil.getInstance().getProperties("kafka/kafka-server-config.properties");

    // All configured data type names (e.g. "wechat"), used as "table" filters.
    static Set<String> dataTypes = DataTypeProperties.dataTypeMap.keySet();

    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("sparkstreaming_kafka2es").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.setLogLevel("WARN");
        // 2-second micro-batches.
        JavaStreamingContext jss = new JavaStreamingContext(jsc, Durations.seconds(2L));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "quyf:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "test_20190815");
        kafkaParams.put("auto.offset.reset", "latest");
        // FIX: was true. With the direct stream, auto-commit can commit offsets
        // before a batch is processed, losing records on failure; the Spark
        // Kafka integration guide recommends disabling it.
        kafkaParams.put("enable.auto.commit", false);

        List<String> topicList = Arrays.asList("test", "test2");
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jss,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicList, kafkaParams));

        // Parse each record's JSON value into a flat String->String map.
        JavaDStream<HashMap<String, String>> recordDS = stream.map(
                new Function<ConsumerRecord<String, String>, HashMap<String, String>>() {
                    @Override
                    public HashMap<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                        return JSON.parseObject(record.value(), HashMap.class);
                    }
                });

        // One filtered/printed sub-stream per data type.
        for (String type : dataTypes) {
            recordDS.filter(new Function<HashMap<String, String>, Boolean>() {
                @Override
                public Boolean call(HashMap<String, String> resultMap) throws Exception {
                    // FIX: constant-first equals — a record with no "table" key
                    // previously threw NullPointerException here.
                    return type.equals(resultMap.get("table"));
                }
            }).foreachRDD(new VoidFunction<JavaRDD<HashMap<String, String>>>() {
                @Override
                public void call(JavaRDD<HashMap<String, String>> mapJavaRDD) throws Exception {
                    mapJavaRDD.foreach(new VoidFunction<HashMap<String, String>>() {
                        @Override
                        public void call(HashMap<String, String> recordMap) throws Exception {
                            // Placeholder sink: print the matched record.
                            System.out.println(recordMap.toString());
                        }
                    });
                }
            });
        }

        jss.start();
        jss.awaitTermination();
    }
}
/**
 * Reads tab-separated "wechat" sample lines from a local file and publishes
 * each line as a JSON object to the Kafka topic "test2". Each line is sent
 * 10 times, once per second, to simulate a continuous stream for the consumer.
 */
public class GenKafkaData {

    public static void main(String[] args) throws Exception {
        // FIX: the FileReader was never closed (resource leak); close it with
        // try-with-resources. NOTE(review): FileReader uses the platform
        // default charset — confirm the source file's encoding.
        List<String> lines;
        try (FileReader reader = new FileReader(new File("E:\\wechat\\wechat_source1_1111153.txt"))) {
            lines = IOUtils.readLines(reader);
        }

        Producer<String, String> producer = getProducer();
        try {
            // Column names for the "wechat" data type, in field order.
            ArrayList<String> columns = DataTypeProperties.dataTypeMap.get("wechat");
            Map<String, String> dataMap = new HashMap<>();
            dataMap.put("table", "wechat");

            for (String line : lines) {
                String[] fields = line.split("\t");
                // Map each tab-separated field to its column name.
                for (int i = 0; i < fields.length; i++) {
                    dataMap.put(columns.get(i), fields[i]);
                }
                String lineRecord = JSON.toJSONString(dataMap);
                // Bounded for-loop replaces the original while(true)+counter;
                // same 10 sends per line, one per second.
                for (int index = 0; index < 10; index++) {
                    producer.send(new ProducerRecord<>("test2", null, lineRecord));
                    Thread.sleep(1000);
                    System.out.println("send->" + lineRecord);
                }
            }
        } finally {
            // FIX: the producer was never closed; close() flushes buffered
            // records so the final sends are not lost when the JVM exits.
            producer.close();
        }
    }

    /** Builds a String/String Kafka producer from the fixed config below. */
    public static Producer<String, String> getProducer() {
        return new KafkaProducer<String, String>(createProducerProperties());
    }

    /** Producer settings for the local broker quyf:9092. */
    private static Properties createProducerProperties() {
        Properties props = new Properties();
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        props.put("bootstrap.servers", "quyf:9092");
        props.put("acks", "all");
        // Maximum number of send retries.
        props.put("retries", 0);
        // Batch size in bytes.
        props.put("batch.size", 16384);
        // FIX: "linger.ms" was set twice with the same value; set it once.
        props.put("linger.ms", 1);
        // Send buffer memory in bytes.
        props.put("buffer.memory", 33554432);
        return props;
    }
}
最新文章
- 移动开发框架剖析(二) Hammer专业的手势控制
- iOS中关于NavigationController中preferredStatusBarStyle一直不执行的问题
- HTML5拖拽实例
- 【CodeVS 1198】【NOIP 2012】国王游戏
- MySQL Workbench 6 不能删除数据等问题(“Error Code: 1175”) 和入门教程
- EF工作中踩过的坑.
- CentOS 5.X安装LAMP最高版本环境
- FZU 2127 养鸡场
- 16个最棒的jQuery视差滚动效果教程
- Fragmen横竖屏切换,导致页面混乱,oncreateView重复调用
- sleep函数——Gevent源码分析
- Uva 552 Prime Ring Problem(dfs)
- java war 打包、解压命令(转载)
- 关于extjs表单布局的几种方式
- Jmeter(三)_配置元件
- 智表(ZCELL)专业版收费说明
- inode 与black 特点与简介
- Android 百度sdk5.0定位
- 19.python设置单线程和多线程
- hdu 5182 结构体排序
热门文章
- Spring框架学习笔记(7)——Spring Boot 实现上传和下载
- hdu 1028 Sample Ignatius and the Princess III (母函数)
- nyoj 259-茵茵的第一课 (python, input, print)
- Kotlin Coroutines不复杂, 我来帮你理一理
- 领扣(LeetCode)回文链表 个人题解
- Future模式的学习以及JDK内置Future模式的源码分析
- ubuntu server 1604 设置笔记本盒盖 不操作
- PHP提高SESSION响应速度的方法有哪些
- Redis 的底层数据结构(对象)
- 从Netty EventLoop实现上可以学到什么