Flume直接对接SaprkStreaming的两种方式
2024-08-28 23:42:55
一、flume对接sparkStreaming的两种方式:
Push推送的方式
Poll拉取的方式
第一种Push方式:
代码如下:
package cn.itcast.spark.day5 import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext} /**
* .
*/
object FlumePushWordCount { def main(args: Array[String]) {
val host = args(0)
val port = args(1).toInt
LoggerLevels.setStreamingLogLevels()
val conf = new SparkConf().setAppName("FlumeWordCount")//.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
//推送方式: flume向spark发送数据
val flumeStream = FlumeUtils.createStream(ssc, host, port)
//flume中的数据通过event.getBody()才能拿到真正的内容
val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1)) val results = words.reduceByKey(_ + _)
results.print()
ssc.start()
ssc.awaitTermination()
}
}
flume配置如下:
#agent名, source、channel、sink的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#具体定义source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/monitor
#具体定义channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#具体定义sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 10.1.9.102 (是本机IP)
a1.sinks.k1.port = 6666
#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume:
/usr/java/flume/bin/flume-ng agent -n a1 -c conf -f /usr/java/flume/mytest/push.properties
第二种Poll的方式:
但是这种方法必须要引入Spark官方的一个jar包,见官方的文档:点击跳转,将jar下载下来放到flume安装包的lib目录下即可,点击直接下载jar包
代码如下:
package cn.itcast.spark.day5 import java.net.InetSocketAddress import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext} object FlumePollWordCount {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
//从flume中拉取数据(flume的地址)
val address = Seq(new InetSocketAddress("172.16.0.11", 8888))
val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK)
val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_,1))
val results = words.reduceByKey(_+_)
results.print()
ssc.start()
ssc.awaitTermination()
}
}
启动flume
#agent名, source、channel、sink的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#具体定义source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/monitor
#具体定义channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#具体定义sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.80.123
a1.sinks.k1.port = 10086
#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume:
/usr/java/flume/bin/flume-ng agent -n a1 -c conf -f /usr/java/flume/mytest/push.properties
最新文章
- spring boot(六):如何优雅的使用mybatis
- iBatis.net 类的继承extends和懒加载
- C和指针 第八章 数组
- Spring和EJB3的技术对比
- Linux运行级详解
- c# 正则表达式 匹配回车
- sqlite3内存不断增加的原因
- c++ 动态分配二维数组 new 二维数组
- windows下设置socket的connect超时
- 【技术宅3】截取文件和url扩展名的N种方法
- fabric 安装及使用
- zoj 2524 并查集裸
- Java 学习使用常见的开源连接池
- xadmin设置
- Reachability from the Capital CodeForces - 999E(强连通分量 缩点 入度为0的点)
- php static 变量用法
- SSAS 收藏
- 01_NIO基本概念
- ftp客户端的创建
- BZOJ4009:[HNOI2015]接水果(整体二分版)
热门文章
- 使用JQuery做一组复选框的功能。
- Eclipse_java项目中导入外部jar文件
- 当当网-前端project师測试题
- 【转】Impala安装json解析udf插件
- ADF中遍历VO中的行数据(Iterator)
- Linux学习总结(十六)系统用户及用户组管理
- Linux 学习总结(五)-linux 文件系统及相关命令
- [转]Asp.Net url中文乱码
- 微软提供的Office在线预览地址
- tomcat启动后报错Bad version number in .class file (unable to load class oracle.jdbc.OracleDriver)