Spark Machine Learning: Real-Time Machine Learning
2024-08-29 00:27:14
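1 Online learning
The model updates itself continuously as new messages arrive, instead of being retrained from scratch on the full data set each time, as in offline (batch) training.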
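To make the idea concrete, here is a minimal plain-Scala sketch of an online update. It assumes a linear model with squared-error loss and one stochastic gradient descent step per incoming message. The names OnlineLearningSketch, update and stepSize are hypothetical and are not part of Spark.

```scala
// Hypothetical names throughout; a plain-Scala sketch, not Spark's API.
object OnlineLearningSketch {

  /** One SGD step for squared error on a single (features, label) message. */
  def update(weights: Array[Double],
             features: Array[Double],
             label: Double,
             stepSize: Double): Array[Double] = {
    val prediction = weights.zip(features).map { case (w, x) => w * x }.sum
    val error = prediction - label
    // Move each weight against the gradient of 0.5 * error^2.
    weights.zip(features).map { case (w, x) => w - stepSize * error * x }
  }

  def main(args: Array[String]): Unit = {
    // Messages arrive one at a time; here they follow y = 2 * x.
    val messages = Seq((Array(1.0), 2.0), (Array(2.0), 4.0), (Array(3.0), 6.0))
    var weights = Array(0.0)
    // Replay the small stream several times so the weight can settle.
    for (_ <- 1 to 50; (x, y) <- messages) {
      weights = update(weights, x, y, stepSize = 0.05)
    }
    println(weights.mkString(","))
  }
}
```

Each call only touches the current message and the current weights, so the model never needs the full history in memory.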
2 Spark Streaming
- Discretized stream (DStream)
Input sources: Akka actors, message queues, Flume, Kafka, ...
http://spark.apache.org/docs/latest/streaming-programming-guide.html
Lineage: the set of transformation and action operators that have been applied to an RDD
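Lineage can be inspected on an ordinary RDD. The following is a small sketch, assuming a local master and the Spark 1.5.x dependencies from the build file in section 3.0. The object name LineageSketch and the sample numbers are made up for illustration.

```scala
import org.apache.spark.SparkContext

// Hypothetical object name; assumes spark-core is on the classpath
// (it is pulled in by spark-mllib and spark-streaming).
object LineageSketch {

  def run(): Array[Int] = {
    val sc = new SparkContext("local[2]", "lineage-sketch")
    try {
      // Transformations are lazy: they only extend the lineage.
      val multiples = sc.parallelize(1 to 10).map(_ * 2).filter(_ % 3 == 0)
      // Print the chain of parent RDDs recorded so far.
      println(multiples.toDebugString)
      // An action triggers the actual computation.
      multiples.collect()
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(run().mkString(","))
}
```

Because the lineage records how each RDD was derived, Spark can recompute a lost partition from its parents rather than replicating the data.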
3 MLlib + Streaming application
3.0 build.sbt
The project depends on Spark MLlib and Spark Streaming:
name := "scala-spark-streaming-app"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.5.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.5.1"
To use a mirror repository hosted in China, put the following in:
~/.sbt/repositories
[repositories]
local
osc: http://maven.oschina.net/content/groups/public/
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots
3.1 Producing messages
import java.io.PrintWriter
import java.net.ServerSocket
import scala.util.Random

object StreamingProducer {
  def main(args: Array[String]): Unit = {
    val random = new Random()
    // Exclusive upper bound on the number of events per second
    val MaxEvents = 6
    // Read the list of possible names (first line of names.csv, comma-separated)
    val namesResource = this.getClass.getResourceAsStream("/names.csv")
    val names = scala.io.Source.fromInputStream(namesResource)
      .getLines()
      .toList
      .head
      .split(",")
      .toSeq
    // Generate a sequence of possible products
    val products = Seq(
      "iPhone Cover" -> 9.99,
      "Headphones" -> 5.49,
      "Samsung Galaxy Cover" -> 8.95,
      "iPad Cover" -> 7.49
    )
    /** Generate a number of random product events */
    def generateProductEvents(n: Int) = {
      (1 to n).map { i =>
        val (product, price) = products(random.nextInt(products.size))
        val user = random.shuffle(names).head
        (user, product, price)
      }
    }
    // Create a network producer
    val listener = new ServerSocket(9999)
    println("Listening on port: 9999")
    while (true) {
      val socket = listener.accept()
      // Serve each client on its own thread
      new Thread() {
        override def run(): Unit = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          // Send a batch once per second until a write fails (client gone)
          while (!out.checkError()) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val productEvents = generateProductEvents(num)
            productEvents.foreach { event =>
              // One event per line: user,product,price
              out.write(event.productIterator.mkString(","))
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events...")
          }
          socket.close()
        }
      }.start()
    }
  }
}
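The producer writes each event as one comma-separated line of the form user,product,price. A consumer could turn such a line back into a typed value as sketched below. The names EventParsingSketch, Purchase and parseEvent are hypothetical, and the sketch assumes that neither user names nor product names contain commas, which holds for the products listed above.

```scala
import scala.util.Try

// Hypothetical helper, not part of the original application.
object EventParsingSketch {

  final case class Purchase(user: String, product: String, price: Double)

  /** Parse one "user,product,price" line; malformed lines yield None. */
  def parseEvent(line: String): Option[Purchase] =
    line.split(",") match {
      case Array(user, product, price) =>
        Try(price.toDouble).toOption.map(Purchase(user, product, _))
      case _ => None
    }
}
```

Returning an Option lets a stream drop malformed lines with flatMap instead of failing the whole batch.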
sbt run
Multiple main classes detected, select one to run:
[1] MonitoringStreamingModel
[2] SimpleStreamingApp
[3] SimpleStreamingModel
[4] StreamingAnalyticsApp
[5] StreamingModelProducer
[6] StreamingProducer
[7] StreamingStateApp
Enter number: 6
3.2 Printing messages
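As a rough sketch only, and not the original article's code, a Spark Streaming consumer that prints the events emitted by the producer from section 3.1 might look like this. The object name PrintEventsSketch, the 10-second batch interval and the local[2] master are assumptions. The port matches the one the producer listens on.

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical object name; assumes the Spark 1.5.x dependencies from build.sbt.
object PrintEventsSketch {
  val Host = "localhost"
  val Port = 9999          // same port as StreamingProducer
  val BatchSeconds = 10L   // assumed batch interval

  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the socket receiver, one for processing.
    val ssc = new StreamingContext("local[2]", "Print Events Sketch", Seconds(BatchSeconds))
    // Each line received from the socket becomes one record of the DStream.
    val stream = ssc.socketTextStream(Host, Port)
    // Output operation: print the first few records of every batch.
    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Start StreamingProducer first, then run this object. Every batch interval the console should show a timestamp header followed by the lines received in that batch.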
To read the full article, see: http://click.aliyun.com/m/8713/