I. spark-streaming-kafka-0-8_2.11-2.0.2.jar

1. pom.xml


<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.0.2</version>
</dependency>

2. Kafka Consumer class


package com.spark.main;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class KafkaConsumer {

    public static void main(String[] args) throws InterruptedException {
        /**
         * setMaster("local[2]"): at least two threads must be allocated,
         * one to receive messages and one to process them.
         * The batch interval is Durations.milliseconds(500), so Kafka is
         * read every 500 ms.
         */
        SparkConf sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500));
        jssc.checkpoint("hdfs://192.168.168.200:9000/checkpoint/KafkaConsumer");

        /**
         * Kafka connection parameters.
         */
        Set<String> topicsSet = new HashSet<String>(Arrays.asList("TestTopic"));
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "192.168.168.200:9092");
        // smallest: start from the earliest offset; largest: start from the latest
        kafkaParams.put("auto.offset.reset", "smallest");
        kafkaParams.put("fetch.message.max.bytes", "524288");

        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParams, topicsSet);

        /**
         * _2() extracts the message value, the second element of the (key, value) tuple.
         */
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            public void call(JavaRDD<String> rdd) throws Exception {
                rdd.foreach(new VoidFunction<String>() {
                    public void call(String s) throws Exception {
                        System.out.println(s);
                    }
                });
            }
        });

        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }
}
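
For a quick end-to-end test of the consumer above, a small producer can push a few messages onto TestTopic. The sketch below is not part of the original post: the class name TestProducer and the message payloads are invented for illustration, and it assumes the kafka-clients jar (pulled in transitively by spark-streaming-kafka-0-8) is on the classpath.

package com.spark.main;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Hypothetical test helper, not part of the original post: publishes a few
 * messages to TestTopic so the streaming consumer above has data to print.
 */
public class TestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Same broker the consumer reads from.
        props.put("bootstrap.servers", "192.168.168.200:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 10; i++) {
            // No key is set; the value is what tuple2._2() returns on the consumer side.
            producer.send(new ProducerRecord<String, String>("TestTopic", "message-" + i));
        }
        producer.close();
    }
}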

II. spark-streaming-kafka-0-10_2.11-2.0.2.jar

1. pom.xml


<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.0.2</version>
</dependency>

2. Kafka Consumer class


package com.spark.main;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class Kafka10Consumer {

    public static void main(String[] args) throws InterruptedException {
        /**
         * setMaster("local[2]"): at least two threads must be allocated,
         * one to receive messages and one to process them.
         * The batch interval is Durations.milliseconds(500), so Kafka is
         * read every 500 ms.
         */
        SparkConf sparkConf = new SparkConf().setAppName("Kafka10Consumer").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500));
        jssc.checkpoint("hdfs://192.168.168.200:9000/checkpoint/Kafka10Consumer");

        /**
         * Kafka connection parameters.
         */
        Set<String> topicsSet = new HashSet<String>(Arrays.asList("TestTopic"));
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", "192.168.168.200:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "Kafka10Consumer");
        // earliest: start from the earliest offset; latest: start from the latest
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false);

        // Obtain the Kafka stream via KafkaUtils.createDirectStream(...);
        // the Kafka settings are supplied through kafkaParams.
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams)
        );

        /**
         * value() extracts the message payload from each ConsumerRecord.
         */
        JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                return consumerRecord.value();
            }
        });

        lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            public void call(JavaRDD<String> rdd) throws Exception {
                rdd.foreach(new VoidFunction<String>() {
                    public void call(String s) throws Exception {
                        System.out.println(s);
                    }
                });
            }
        });

        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }
}
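
One thing worth noting: enable.auto.commit is set to false above, but the job never commits offsets itself, so after a restart the group's position falls back to whatever auto.offset.reset dictates. The 0-10 integration exposes a CanCommitOffsets interface for committing offsets manually once a batch has been processed. The following is a minimal sketch of that pattern, not part of the original code: it would replace the lines.foreachRDD(...) block above and assumes the same messages stream (declare it final if compiling under Java 7).

// Additional imports required:
// import org.apache.spark.streaming.kafka010.CanCommitOffsets;
// import org.apache.spark.streaming.kafka010.HasOffsetRanges;
// import org.apache.spark.streaming.kafka010.OffsetRange;

messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
    public void call(JavaRDD<ConsumerRecord<String, String>> rdd) throws Exception {
        // Offset ranges covered by this micro-batch; the cast works because the
        // RDDs produced by the direct stream implement HasOffsetRanges.
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        rdd.foreach(new VoidFunction<ConsumerRecord<String, String>>() {
            public void call(ConsumerRecord<String, String> record) throws Exception {
                System.out.println(record.value());
            }
        });
        // Commit asynchronously, and only after the batch has been processed.
        ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
    }
});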
