这里使用的是低级API,因为高级API非常不好用,需要繁琐的配置,也不够自动化,却和低级API的效果一样,所以这里以低级API做演示

你得有zookeeper和kafka

我这里是3台节点主机

架构图

与高级API的区别,简单并行(不需要创造多个输入流,它会自动并行读取kafka的数据),高效(不会像receiver数据被copy两次),一次性语义(缺点:无法使用zookeeper的监控工具)

1.创建maven工程

首先添加pom依赖,其它运行依赖请参考 sparkStreaming整合WordCount

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.0.2</version>
</dependency>

2.启动zookeeper集群

我把zookeeper集群弄成了个脚本,直接执行脚本启动所有zookeeper

启动成功

3.启动kafka集群

我这里是3台主机,三台都需要

进入目录

cd /export/servers/kafka/bin/

启动

kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties 

成功

4.测试kafka

创建topic

cd /export/servers/kafka_2.11-0.10.2.1
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic kafka_spark

通过生产者发送消息

cd /export/servers/kafka_2.11-0.10.2.1
bin/kafka-console-producer.sh --broker-list node01:9092 --topic  kafka_spark

想发啥,发啥。此时通过创建AP接收生产者发送的数据

编写代码

package SparkStreaming

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext} object SparkStreamingKafka {
def main(args: Array[String]): Unit = {
// 1.创建SparkConf对象
val conf: SparkConf = new SparkConf()
.setAppName("SparkStreamingKafka_Direct")
.setMaster("local[2]") // 2.创建SparkContext对象
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN") // 3.创建StreamingContext对象
/**
* 参数说明:
* 参数一:SparkContext对象
* 参数二:每个批次的间隔时间
*/
val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
//设置checkpoint目录 ssc.checkpoint("./Kafka_Direct") // 4.通过KafkaUtils.createDirectStream对接kafka(采用是kafka低级api偏移量不受zk管理)
// 4.1.配置kafka相关参数
val kafkaParams=Map("metadata.broker.list"->"192.168.52.110:9092,192.168.52.120:9092,192.168.52.130:9092","group.id"->"kafka_Direct")
// 4.2.定义topic
val topics=Set("kafka_spark") val dstream: InputDStream[(String, String)] = KafkaUtils
.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics) // 5.获取topic中的数据
val topicData: DStream[String] = dstream.map(_._2) // 6.切分每一行,每个单词计为1
val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1)) // 7.相同单词出现的次数累加
val resultDS: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_) // 8.通过Output Operations操作打印数据
resultDS.print() // 9.开启流式计算
ssc.start() // 阻塞一直运行
ssc.awaitTermination() }
}

生产者生产数据

API接收控制台打印计算结果

最新文章

  1. Python for Infomatics 第12章 网络编程五(译)
  2. DataSanp的控制老大-DSServer
  3. 淘宝(阿里百川)手机客户端开发日记第六篇 Service详解(二)
  4. 传感器(2)常用api简介及列出当前设备支持的传感器代码
  5. JamCam创业故事:辞掉工作,去开发一个应用
  6. Redis教程01——命令
  7. Android面试,与Service交互方式
  8. kaggle之人脸特征识别
  9. cocos2dx进阶学习之CCTMXLayer
  10. 【一天一道LeetCode】#155. Min Stack
  11. Android6.0 源码修改之 仿IOS添加全屏可拖拽浮窗返回按钮
  12. The server time zone value &#39;&#214;&#208;&#185;&#250;&#177;&#234;&#215;&#188;&#202;&#177;&#188;&#228;&#39; is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration
  13. 【一:定义】python 简介
  14. QML学习笔记(四)-TabView-竖直方向
  15. System.Data.Entity.Internal.AppConfig&quot;的类型初始值设定项引发异常
  16. 误操作yum导致error: rpmdb
  17. Python匿名函数详解
  18. mysql通过mysqldump工具,对某个库下的表进行备份
  19. django URL的补充 默认值 传多个参数
  20. promise封装小程序的请求类(request,清爽易懂)

热门文章

  1. PHP http_response_code 网络函数
  2. JS基础语法---函数作为参数使用---回调函数
  3. vue slot内容分发
  4. PHP删除数组中重复的元素
  5. 一个驱动导致的内存泄漏问题的分析过程(meminfo-&gt;pmap-&gt;slabtop-&gt;alloc_calls)
  6. Linux:搭建samba服务器
  7. 预览本地图片原生js
  8. 8.jenkins 远程管理
  9. 数据库导出--Oracle-dmp格式
  10. 解决:Unable to acquire the dpkg frontend lock (/var/lib/dpkg/lock-frontend), is another process using it?