spark-streaming-kafka-0-8 和 0-10的使用区别
2024-10-18 17:45:01
一、spark-streaming-kafka-0-8_2.11-2.0.2.jar
1、pom.xml
- <!-- Scope is "compile" rather than "runtime": the consumer class imports these Spark/Kafka types directly, and Maven leaves runtime-scoped artifacts off the compile classpath. -->
- <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
- <dependency>
-     <groupId>org.apache.spark</groupId>
-     <artifactId>spark-core_2.11</artifactId>
-     <version>2.0.2</version>
-     <scope>compile</scope>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
- <dependency>
-     <groupId>org.apache.spark</groupId>
-     <artifactId>spark-streaming_2.11</artifactId>
-     <version>2.0.2</version>
-     <scope>compile</scope>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
- <dependency>
-     <groupId>org.apache.spark</groupId>
-     <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
-     <version>2.0.2</version>
-     <scope>compile</scope>
- </dependency>
2、Kafka Consumer类
- package com.spark.main;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.Map;
- import java.util.Set;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.api.java.function.VoidFunction;
- import org.apache.spark.streaming.Durations;
- import org.apache.spark.streaming.api.java.JavaDStream;
- import org.apache.spark.streaming.api.java.JavaPairInputDStream;
- import org.apache.spark.streaming.api.java.JavaStreamingContext;
- import org.apache.spark.streaming.kafka.KafkaUtils;
- import kafka.serializer.StringDecoder;
- import scala.Tuple2;
- /**
-  * Minimal Spark Streaming job built on the spark-streaming-kafka-0-8 direct-stream API.
-  * It subscribes to the topic "TestTopic", keeps only the value of each (key, value)
-  * message and prints every value to standard output.
-  */
- public class KafkaConsumer{
-     /** Extracts the message value, i.e. the second element of the (key, value) tuple. */
-     private static final class ValueOf implements Function<Tuple2<String, String>, String> {
-         @Override
-         public String call(Tuple2<String, String> keyAndValue) {
-             return keyAndValue._2();
-         }
-     }
-     /** Writes a single message value to standard output. */
-     private static final class PrintLine implements VoidFunction<String> {
-         @Override
-         public void call(String line) throws Exception {
-             System.out.println(line);
-         }
-     }
-     /** Prints every element of one micro-batch RDD. */
-     private static final class PrintBatch implements VoidFunction<JavaRDD<String>> {
-         @Override
-         public void call(JavaRDD<String> batch) throws Exception {
-             batch.foreach(new PrintLine());
-         }
-     }
-     /**
-      * Builds the streaming context, wires the Kafka direct stream to the printing
-      * pipeline, starts the job and blocks until it terminates.
-      *
-      * @param args unused
-      * @throws InterruptedException if the wait for termination is interrupted
-      */
-     public static void main(String[] args) throws InterruptedException{
-         // "local[2]" runs Spark inside this JVM with two worker threads.
-         SparkConf conf = new SparkConf();
-         conf.setAppName("KafkaConsumer");
-         conf.setMaster("local[2]");
-         // Batch interval: a new micro-batch is formed every 500 ms.
-         JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.milliseconds(500));
-         streamingContext.checkpoint("hdfs://192.168.168.200:9000/checkpoint/KafkaConsumer");
-         // Kafka connection settings (old 0.8 consumer property names).
-         Map<String, String> kafkaSettings = new HashMap<String, String>();
-         kafkaSettings.put("metadata.broker.list", "192.168.168.200:9092");
-         // smallest: start from the oldest available offset; largest: only new messages.
-         kafkaSettings.put("auto.offset.reset", "smallest");
-         kafkaSettings.put("fetch.message.max.bytes", "524288");
-         Set<String> topics = new HashSet<String>(Arrays.asList("TestTopic"));
-         JavaPairInputDStream<String, String> keyedMessages = KafkaUtils.createDirectStream(
-             streamingContext,
-             String.class, String.class,
-             StringDecoder.class, StringDecoder.class,
-             kafkaSettings, topics);
-         JavaDStream<String> values = keyedMessages.map(new ValueOf());
-         values.foreachRDD(new PrintBatch());
-         // Start the computation and wait for it to finish.
-         streamingContext.start();
-         streamingContext.awaitTermination();
-     }
- }
二、spark-streaming-kafka-0-10_2.11-2.0.2.jar
1、pom.xml
- <!-- Scope is "compile" rather than "runtime": the consumer class imports these Spark/Kafka types directly, and Maven leaves runtime-scoped artifacts off the compile classpath. -->
- <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
- <dependency>
-     <groupId>org.apache.spark</groupId>
-     <artifactId>spark-core_2.11</artifactId>
-     <version>2.0.2</version>
-     <scope>compile</scope>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
- <dependency>
-     <groupId>org.apache.spark</groupId>
-     <artifactId>spark-streaming_2.11</artifactId>
-     <version>2.0.2</version>
-     <scope>compile</scope>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11 -->
- <dependency>
-     <groupId>org.apache.spark</groupId>
-     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
-     <version>2.0.2</version>
-     <scope>compile</scope>
- </dependency>
2、Kafka Consumer类
- package com.spark.main;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.Map;
- import java.util.Set;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.api.java.function.VoidFunction;
- import org.apache.spark.streaming.Durations;
- import org.apache.spark.streaming.api.java.JavaDStream;
- import org.apache.spark.streaming.api.java.JavaInputDStream;
- import org.apache.spark.streaming.api.java.JavaPairInputDStream;
- import org.apache.spark.streaming.api.java.JavaStreamingContext;
- import org.apache.spark.streaming.kafka010.ConsumerStrategies;
- import org.apache.spark.streaming.kafka010.KafkaUtils;
- import org.apache.spark.streaming.kafka010.LocationStrategies;
- import kafka.serializer.StringDecoder;
- import scala.Tuple2;
- /**
-  * Minimal Spark Streaming job built on the spark-streaming-kafka-0-10 direct-stream API.
-  * It subscribes to the topic "TestTopic", extracts the value of every consumer record
-  * and prints it to standard output.
-  */
- public class Kafka10Consumer{
-     /**
-      * Builds the streaming context, wires the Kafka direct stream to the printing
-      * pipeline, starts the job and blocks until it terminates.
-      *
-      * @param args unused
-      * @throws InterruptedException if the wait for termination is interrupted
-      */
-     public static void main(String[] args) throws InterruptedException{
-         // "local[2]" runs Spark inside this JVM with two worker threads.
-         SparkConf sparkConf = new SparkConf().setAppName("Kafka10Consumer").setMaster("local[2]");
-         // Batch interval: a new micro-batch is formed every 500 ms.
-         JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500));
-         jssc.checkpoint("hdfs://192.168.168.200:9000/checkpoint/Kafka10Consumer");
-         // Kafka connection settings (new consumer API property names).
-         Set<String> topicsSet = new HashSet<String>(Arrays.asList("TestTopic"));
-         Map<String, Object> kafkaParams = new HashMap<String, Object>();
-         kafkaParams.put("bootstrap.servers", "192.168.168.200:9092");
-         kafkaParams.put("key.deserializer", StringDeserializer.class);
-         kafkaParams.put("value.deserializer", StringDeserializer.class);
-         kafkaParams.put("group.id", "Kafka10Consumer");
-         // earliest: start from the oldest available offset; latest: only new records.
-         kafkaParams.put("auto.offset.reset", "earliest");
-         kafkaParams.put("enable.auto.commit", false);
-         // Both deserializers are StringDeserializer, so the stream is typed <String, String>.
-         // The explicit type witness on Subscribe keeps this compiling on Java 7, where the
-         // key/value types of a nested generic call are not inferred from the assignment.
-         JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
-             jssc,
-             LocationStrategies.PreferConsistent(),
-             ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams)
-         );
-         // Keep only the record value. Returning value() directly (instead of calling
-         // toString() on it) avoids a NullPointerException for records whose value is null;
-         // such records are printed as "null".
-         JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String, String>, String>() {
-             @Override
-             public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
-                 return consumerRecord.value();
-             }
-         });
-         lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
-             @Override
-             public void call(JavaRDD<String> rdd) throws Exception {
-                 rdd.foreach(new VoidFunction<String>() {
-                     @Override
-                     public void call(String s) throws Exception {
-                         System.out.println(s);
-                     }
-                 });
-             }
-         });
-         // Start the computation and wait for it to finish.
-         jssc.start();
-         jssc.awaitTermination();
-     }
- }
最新文章
- MATLAB 物体识别算法说明:vision.ForegroundDetector, vision.BlobAnalysis
- groovy-输入输出
- tcp的三次握手及四次挥手(连接与中断流程)
- C# OpenFileDialog和PictrueBox
- PHP面向对象(OOP):.static和const关键字的使用(self::)
- 关于js对象引用的小例子
- .h 与 .hpp 文件
- poj2635 同余定理 + 素数筛法
- python安装第三方库
- 1013. Battle Over Cities (25)
- Azure系列2.1.11 —— CloudBlobContainer
- Jsp基本语法 第二章
- 小程序视图层(xx.xml)和逻辑层(xx.js)
- Android图片加载框架最全解析(八),带你全面了解Glide 4的用法
- linux 2.6.xx自动加载kvm模块
- Django 1.10文档中文版Part1
- 解析IE, FireFox, Opera 浏览器支持Alpha透明的方法
- 为什么说git比svn好
- VI编辑器、ipython、jupyter及进程
- rmdir 命令
热门文章
- c++下基于windows socket的多线程服务器(基于TCP协议)
- python --flask框架的搭建--(flask,python2,python3都可以运行)
- ubuntu discuz 该函数需要 PHP 支持 XML。请联系空间商,确定开启了此项功能
- mAP的计算
- todolist待办事项
- unity 实现调用Windows窗口/对话框交互
- CodeForces - 616F:Expensive Strings (后缀自动机)
- lesson2-cnn-fastai
- ajax遍历数组对象
- Blender 3D 打印插件Print Toolbox