package com.bd.useranalysis.spark.streaming.kafka2es;

import com.alibaba.fastjson.JSON;
import com.bd.useranalysis.common.config.ConfigUtil;
import com.bd.useranalysis.common.project.datatype.DataTypeProperties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

import java.util.*;

public class Kafka2EsJava {

    Properties properties = ConfigUtil.getInstance().getProperties("kafka/kafka-server-config.properties");

    static Set<String> dataTypes = DataTypeProperties.dataTypeMap.keySet();

    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("sparkstreaming_kafka2es").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.setLogLevel("WARN");
        // 2-second micro-batches
        JavaStreamingContext jss = new JavaStreamingContext(jsc, Durations.seconds(2L));

        Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers","quyf:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "test_20190815");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", true);
List<String> topicList = Arrays.asList("test","test2");
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jss,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicList, kafkaParams)
); JavaDStream<HashMap<String, String>> recordDS = stream.map(new Function<ConsumerRecord<String, String>, HashMap<String, String>>() { @Override
public HashMap<String, String> call(ConsumerRecord<String, String> record) throws Exception {
//System.out.println("consumer==>"+record.value());
return JSON.parseObject(record.value(), HashMap.class);
}
}); for (String type : dataTypes) {
            recordDS.filter(new Function<HashMap<String, String>, Boolean>() {
                @Override
                public Boolean call(HashMap<String, String> resultMap) throws Exception {
                    // type.equals(...) avoids a NullPointerException when the
                    // "table" key is missing from a record
                    return type.equals(resultMap.get("table"));
                }
            }).foreachRDD(new VoidFunction<JavaRDD<HashMap<String, String>>>() {
                @Override
                public void call(JavaRDD<HashMap<String, String>> mapJavaRDD) throws Exception {
                    mapJavaRDD.foreach(new VoidFunction<HashMap<String, String>>() {
                        @Override
                        public void call(HashMap<String, String> recordMap) throws Exception {
                            System.out.println(recordMap.toString());
                        }
                    });
                }
            });
        }

        jss.start();
        jss.awaitTermination();
    }
}
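Despite the Kafka2EsJava name, the foreachRDD above only prints each record. A minimal sketch of the missing Elasticsearch write, assuming the elasticsearch-spark (elasticsearch-hadoop) dependency is on the classpath and an index layout of <type>/docs — both are assumptions, not part of the original code:

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// The ES connection would also need to be configured on the SparkConf, e.g.
// sparkConf.set("es.nodes", "quyf").set("es.port", "9200");  (assumed host/port)
recordDS.filter(resultMap -> type.equals(resultMap.get("table")))
        .foreachRDD(mapJavaRDD ->
                // serializes each map as a JSON document into the <type>/docs index
                JavaEsSpark.saveToEs(mapJavaRDD, type + "/docs"));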


import com.alibaba.fastjson.JSON;
import com.bd.useranalysis.common.project.datatype.DataTypeProperties;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.File;
import java.io.FileReader;
import java.util.*;

public class GenKafkaData {

    public static void main(String[] args) throws Exception {
        List<String> lines = IOUtils.readLines(new FileReader(
                new File("E:\\wechat\\wechat_source1_1111153.txt")));
        Producer<String, String> producer = getProducer();
        ArrayList<String> columns = DataTypeProperties.dataTypeMap.get("wechat");
Map<String, String> dataMap = new HashMap<>();
dataMap.put("table","wechat");
for(String line : lines){
String[] fields = line.split("\t");
for (int i = 0; i < fields.length; i++) {
dataMap.put(columns.get(i), fields[i]);
}
int index = 0;
while(true){
String lineRecord = JSON.toJSONString(dataMap);
producer.send(new ProducerRecord<>("test2",null, lineRecord));
Thread.sleep(1000);
index++;
System.out.println("send->"+lineRecord);
if(index==10){
break;
}
}
//System.out.println("send->"+lineRecord);
//StringProducer.producer("test", lineRecord);
}
        // close() flushes pending records; with retries=0 anything still buffered
        // at JVM exit would otherwise be lost
        producer.close();
    }

    public static Producer<String, String> getProducer() {
        return new KafkaProducer<>(createProducerProperties());
    }

    private static Properties createProducerProperties() {
        Properties props = new Properties();
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        props.put("bootstrap.servers", "quyf:9092");
        props.put("acks", "all");
        // maximum number of attempts to send a message
        props.put("retries", 0);
        // batch size in bytes
        props.put("batch.size", 16384);
        // small send delay so records can be batched
        props.put("linger.ms", 1);
        // total memory for buffering unsent records
        props.put("buffer.memory", 33554432);
        return props;
    }
}
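Both programs assume the topics test and test2 already exist on the broker. A sketch for creating them with Kafka's AdminClient (the single partition and replication factor 1 are assumptions suited to a local test broker):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Arrays;
import java.util.Properties;

public class CreateDemoTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "quyf:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // one partition, replication factor 1: enough for a local demo
            admin.createTopics(Arrays.asList(
                    new NewTopic("test", 1, (short) 1),
                    new NewTopic("test2", 1, (short) 1))).all().get();
        }
    }
}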

  
