SparkStreaming简单例子

◆ 构建第一个Streaming程序: (wordCount) 

  ◆ Spark Streaming 程序最好以使用Maven或者sbt编译出来的独立应用的形式运行。

  ◆ 准备工作:
  1.引入Spark Streaming的jar
  2.scala流计算import声明
  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.StreamingContext._
  import org.apache.spark.streaming.dstream.DStream
  import org.apache.spark.streaming.Duration
  import org.apache.spark.streaming.Seconds

1.初始化StreamingContext对象

   //创建一个本地StreamingContext两个工作线程和批间隔1秒。
   val conf = new SparkConf()
   conf.setMaster(“local[2]")
   conf.setAppName(“ NetworkWordCount")
   val ssc = new StreamingContext(conf, Seconds(1))

2.获取DStream对象 

  //创建一个连接到主机名的DStream,像localhost:9999

   val lines = ssc.socketTextStream("localhost", 9999)

3.操作DStream对象

  //将每一行接收到的数据通过空格分割成单词

  val words = lines.flatMap(_.split(" “))
  //导入StreamingContext中的隐式转换
  import org.apache.spark.streaming.StreamingContext._

   // 对每一批次的单词进行转化求和

  val pairs = words.map(word => (word, 1))
  val wordCounts = pairs.reduceByKey(_ + _)
  // 每个批次中默认打印前十个元素到控制台
  wordCounts.print()

4.启动流处理程序

  ssc.start// 开始计算

  ssc.awaitTermination() // 等待计算终止

  ssc.stop() //结束应用

启动网络端口,模拟发送数据

  1.借助于nc命令,手动输入数据

    Linux/Mac :nc

    Windows:cat

      nc -lk 9999

  2.借助于代码,编写一个模拟数据发生器  

package com.briup.streaming

import java.io.PrintWriter
import java.net.ServerSocket import scala.io.Source object MassageServer { // 定义随机获取整数的方法
def index(length: Int) = {
import java.util.Random
val rdm = new Random
rdm.nextInt(length)
} def main(args: Array[String]) {
println("模拟数据器启动!!!")
// 获取指定文件总的行数
val filename ="Spark/ihaveadream.txt";
val lines = Source.fromFile(filename).getLines.toList
val filerow = lines.length // 指定监听某端口,当外部程序请求时建立连接
val serversocket = new ServerSocket(9999); while (true) {
//监听9999端口,获取socket对象
val socket = serversocket.accept()
// println(socket)
new Thread() {
override def run = {
println("Got client connected from: " + socket.getInetAddress) val out = new PrintWriter(socket.getOutputStream(), true) while (true) {
Thread.sleep(1000)
// 当该端口接受请求时,随机获取某行数据发送给对方
val content = lines(index(filerow)) println (content) out.write(content + '\n') out.flush()
}
socket.close()
}
}.start()
}
}
}

模拟发送数据

注意事项:

◆ 1.启动 Spark Streaming 之前所作的所有步骤只是创建了执行流程, 程序没有真正
连接上数据源,也没有对数据进行任何操作,只是设定好了所有的执行计划
◆ 2.当 ssc.start()启动后程序才真正进行所有预期的操作
◆ 3.执行会在另一个线程中进行,所以需要调用awaitTermination来等待流计算完成
◆ 4.一个Streaming context只能启动一次
◆ 5.如果模式是本地模式,那么请务必设置local[n] ,n>=2   1个用于接收,1个用于处理


package com.briup.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext} object MyTestOldAPI {
def main(args: Array[String]): Unit = {
//设置日志级别
Logger.getLogger("org").setLevel(Level.WARN) //1 获取DS
val conf = new SparkConf().setAppName("MyTestOldAPI").setMaster("local[*]")
val dss = new StreamingContext(conf, Duration(1000))
val ds = dss.socketTextStream("localhost", 9999) //2 逻辑处理 //统计
val res = ds.filter(_ != "").flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) res.print() //3 开启实时处理任务
dss.start()
dss.awaitTermination()
dss.stop()
}
}

最新文章

  1. 读书笔记_Effective_C++_条款四十九:了解new_handler的行为
  2. 微软职位内部推荐-Software Engineer II
  3. 图片的copy,从一个目录复制到另一个目录
  4. 黄聪:如何使用WebKitBrowser调用元素点击事件(C#)
  5. 2014年互联网IT待遇【转载】
  6. Windows - 子系统(subsystem)错误
  7. MVC应用程序显示上传的图片
  8. 关于vue2用vue-cli搭建环境后域名代理的http-proxy-middleware
  9. MySQL Innodb如何找出阻塞事务源头SQL
  10. Coding theano under remote ubuntu server from local Mac (在本地mac机器上,写、跑、调试、看-远程ubuntu上的theano代码)
  11. 网络编程_tcp与dup协议简单应用
  12. web页面实现文件下载的几种方法
  13. shell for 循环
  14. Razor视图基本语法
  15. SQLite基本操作-----IOS(如有雷同,纯属巧合)
  16. Linux ssh将命令放入后台
  17. fasttext与Linear SVC 分类测试结果
  18. PowerDesigner最基础的使用方法入门学习(一)
  19. storm报错:Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout])
  20. KDD 2018 | 最佳论文:首个面向Facebook、arXiv网络图类的对抗攻击研究

热门文章

  1. 大数据篇:一文读懂@数据仓库(PPT文字版)
  2. NanoHTTPD服务
  3. 你不知道的Java引用
  4. PHP XML DOM:DOM 是什么?
  5. PHP zip_read() 函数
  6. 【BZOJ4631】踩气球 题解(线段树)
  7. Pytest单元测试框架-学习
  8. 【Python 实例】面向对象 | 按逗号分割列表
  9. 使用MPI进行分布式内存编程(2)
  10. HTTP POST 请求的两种编码格式:application/x-www-form-urlencoded 和 multipart/form-data