不使用 es-hadoop 的 saveToEs:与 Scala 版本的依赖冲突太多。
不使用 BulkProcessor:它是异步提交,ES 容易 OOM,速度反而不快。
改用 BulkRequestBuilder 同步提交。

主要代码

/**
 * Entry point: reads JSON user records from the Kafka topic "users" with Spark Streaming and
 * indexes them into the Elasticsearch index "users" (type "info"), sending one synchronous
 * bulk request per RDD partition.
 *
 * <p>NOTE(review): {@code enable.auto.commit} is false and this method never commits offsets
 * back to Kafka, while {@code auto.offset.reset} is "latest". Records that arrive while the
 * job is down therefore look like they will be skipped on restart - confirm this is intended.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
    System.setProperty("hadoop.home.dir", "D:\\hadoop");
    System.setProperty("es.set.netty.runtime.available.processors", "false");

    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SendRecord");
    conf.set("spark.streaming.backpressure.enabled", "true");
    conf.set("spark.streaming.receiver.maxRate", "1000");
    conf.set("spark.streaming.kafka.maxRatePerPartition", "1000");
    conf.set("es.nodes", "eshost");
    conf.set("es.port", "9200");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "kafkahost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "sparkGroup4");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("users");
    JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
            ssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    // Parse every Kafka record value as a JSON-encoded User.
    JavaDStream<User> kafkaDStream = stream.map(new Function<ConsumerRecord<String, String>, User>() {
        @Override
        public User call(ConsumerRecord<String, String> record) throws Exception {
            Gson gson = new Gson();
            return gson.fromJson(record.value(), User.class);
        }
    });

    kafkaDStream.foreachRDD(new VoidFunction<JavaRDD<User>>() {
        @Override
        public void call(JavaRDD<User> userJavaRDD) throws Exception {
            userJavaRDD.foreachPartition(new VoidFunction<Iterator<User>>() {
                @Override
                public void call(Iterator<User> userIterator) throws Exception {
                    TransportClient client = ESClient.getClient();
                    BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
                    while (userIterator.hasNext()) {
                        User user = userIterator.next();
                        if (user == null) {
                            // Gson yields null for an empty or null payload; skip it instead
                            // of failing the whole partition with a NullPointerException.
                            continue;
                        }
                        // Fresh map per record so no request depends on a shared mutable map.
                        Map<String, Object> document = new HashMap<>();
                        document.put("name", user.getName());
                        document.put("age", user.getAge());
                        document.put("desc", user.getDescription());
                        IndexRequest request =
                                client.prepareIndex("users", "info").setSource(document).request();
                        bulkRequestBuilder.add(request);
                    }
                    if (bulkRequestBuilder.numberOfActions() == 0) {
                        return;
                    }
                    // Synchronous submit: block until Elasticsearch has answered the bulk request.
                    BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
                    if (bulkResponse.hasFailures()) {
                        // A bulk call can succeed overall while individual items fail;
                        // report them rather than dropping them silently.
                        System.err.println("Elasticsearch bulk indexing reported failures: "
                                + bulkResponse.buildFailureMessage());
                    }
                }
            });
        }
    });

    ssc.start();
    try {
        // Wait for the computation to terminate.
        ssc.awaitTermination();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
}

  

ESClient:

/**
 * Provides one shared Elasticsearch {@link TransportClient} per JVM.
 *
 * <p>The client is built when the nested {@code Holder} class is initialized, which happens on
 * the first call to {@link #getClient()}.
 */
public class ESClient {

    /**
     * Returns the shared transport client, building it on first use.
     *
     * @return the shared client; never {@code null}
     * @throws ExceptionInInitializerError if building the client fails on first use (later
     *         calls then fail with {@link NoClassDefFoundError})
     */
    public static TransportClient getClient() {
        return Holder.client;
    }

    /** Holds the singleton so that it is only built when first requested. */
    private static class Holder {
        private static final TransportClient client = create();

        /**
         * Builds the transport client for cluster "es" and registers the node eshost:9300.
         *
         * @return the ready-to-use client
         * @throws IllegalStateException if the client cannot be built; the original failure is
         *         kept as the cause
         */
        private static TransportClient create() {
            TransportClient created = null;
            try {
                Settings setting = Settings.builder()
                        .put("cluster.name", "es")
                        .put("client.transport.sniff", false)
                        .put("client.transport.ping_timeout", "60s")
                        .put("client.transport.nodes_sampler_interval", "60s")
                        .build();
                created = new PreBuiltTransportClient(setting);
                created.addTransportAddress(
                        new TransportAddress(new InetSocketAddress("eshost", 9300)));
                return created;
            } catch (Exception e) {
                if (created != null) {
                    // Do not leak a half-configured client.
                    try {
                        created.close();
                    } catch (RuntimeException closeFailure) {
                        e.addSuppressed(closeFailure);
                    }
                }
                // Fail fast with the full cause instead of printing the message and handing
                // callers a null client.
                throw new IllegalStateException(
                        "Failed to create Elasticsearch transport client for eshost:9300", e);
            }
        }
    }
}

  

最新文章

  1. 笔记整理之BCP
  2. mongoVUE的增删改查操作使用说明
  3. 无法打开注册表项 unknown 没有足够的权限访问
  4. kuangbin_ShortPath L (POJ 2502)
  5. 写了一个jquery的 弹出层插件。
  6. Linux驱动开发常用头文件
  7. Jmeter -- 初体验
  8. jquery禁用右键单击、F5刷新
  9. Java学习笔记——I/O流
  10. JSON--stringify() 和 parse() 方法
  11. css选择器语法速查
  12. Unable to connect to MKS;Too many socket connect attempts;giving up
  13. 利用.frm、.ibd恢复数据
  14. 深度学习课程笔记(五)Ensemble
  15. Video Frame Synthesis using Deep Voxel Flow 论文笔记
  16. 洛谷 P4128: bzoj 1815: [SHOI2006]有色图
  17. WMAppManifest.xml
  18. linux 递归删除目录文件
  19. 【BZOJ】1901: Zju2112 Dynamic Rankings
  20. Linux 指令篇:磁盘管理--tree

热门文章

  1. 各种浏览器的兼容css
  2. html5的开发
  3. The 'decorators' plugin requires a 'decoratorsBeforeExport' option, ...(npm start报错)
  4. document.getElementById()
  5. linux下mysql5.7忘记root密码修改
  6. codeforces 1165F1/F2 二分好题
  7. mybatis的核心配置文件
  8. web快速开发框架 WebBuilder 8.7发布
  9. dubbo rest服务(消费者) java.lang.ClassNotFoundException: org.jboss.resteasy.client.jaxrs.engines.ApacheHttpClient4Engine 错误问题
  10. CentOS 7 端口白名单设置