Spark Machine Learning

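1 Online Learning

The model updates itself continuously as new messages arrive, instead of being retrained from scratch over and over as in offline training.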
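To make the contrast concrete, here is a minimal, framework-free Scala sketch of online learning (not Spark code): a one-feature linear model y ≈ w·x + b that takes one stochastic gradient descent step per incoming example. The names OnlineLearningSketch, Model, update and train, the learning rate 0.05 and the synthetic stream drawn from y = 2x + 1 are all illustrative assumptions.

```scala
// Illustrative sketch of online learning: a 1-D linear model y ≈ w * x + b
// that is updated by one stochastic-gradient step per incoming example.
// All names and values here are hypothetical, chosen for the example.
object OnlineLearningSketch {

  final case class Model(w: Double, b: Double) {
    def predict(x: Double): Double = w * x + b
  }

  /** One SGD step on the squared error 0.5 * (prediction - y)^2. */
  def update(m: Model, x: Double, y: Double, lr: Double): Model = {
    val err = m.predict(x) - y
    Model(m.w - lr * err * x, m.b - lr * err)
  }

  /** Consumes the stream one example at a time; no example is revisited. */
  def train(start: Model, examples: Iterator[(Double, Double)], lr: Double): Model =
    examples.foldLeft(start) { case (m, (x, y)) => update(m, x, y, lr) }

  def main(args: Array[String]): Unit = {
    // Simulated stream: 2000 examples drawn from y = 2x + 1
    val xs = Iterator.fill(500)(Seq(0.0, 1.0, 2.0, 3.0)).flatMap(_.iterator)
    val examples = xs.map(x => (x, 2 * x + 1))
    val model = train(Model(0.0, 0.0), examples, lr = 0.05)
    println(f"w = ${model.w}%.3f, b = ${model.b}%.3f")
  }
}
```

Because each update needs only the current example, the same loop works on an unbounded stream; an offline trainer would instead need the whole dataset and would rerun from the start whenever new data arrived.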

2 Spark Streaming

3 MLlib + Streaming Application

3.0 build.sbt

The project depends on Spark MLlib and Spark Streaming:

name := "scala-spark-streaming-app"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.5.1"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.5.1"
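
To download dependencies through mirror repositories in China, list them in ~/.sbt/repositories (the OSChina mirror below has reportedly been shut down; replace it with a mirror that is still maintained):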

[repositories]
local
osc: http://maven.oschina.net/content/groups/public/
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots

3.1 Producing Messages

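import java.io.PrintWriter
import java.net.ServerSocket

import scala.util.Random

/**
 * Simulated event source: listens on port 9999 and sends every connected
 * client a random number (0 to MaxEvents - 1) of "user,product,price"
 * lines each second.
 */
object StreamingProducer {

  def main(args: Array[String]): Unit = {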

    val random = new Random()

    // Exclusive upper bound on the number of events sent per second
    val MaxEvents = 6

    // Read the list of possible names
    val namesResource = this.getClass.getResourceAsStream("/names.csv")
    val names = scala.io.Source.fromInputStream(namesResource)
      .getLines()
      .toList
      .head
      .split(",")
      .toSeq

    // Generate a sequence of possible products
    val products = Seq(
      "iPhone Cover" -> 9.99,
      "Headphones" -> 5.49,
      "Samsung Galaxy Cover" -> 8.95,
      "iPad Cover" -> 7.49
    )

    /** Generate a number of random product events */
    def generateProductEvents(n: Int) = {
      (1 to n).map { i =>
        val (product, price) = products(random.nextInt(products.size))
        val user = random.shuffle(names).head
        (user, product, price)
      }
    }

    // create a network producer
    val listener = new ServerSocket(9999)
    println("Listening on port: 9999")

    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)

          while (true) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val productEvents = generateProductEvents(num)
            productEvents.foreach{ event =>
              out.write(event.productIterator.mkString(","))
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events...")
          }
          socket.close()
        }
      }.start()
    }
  }
}
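Start the producer with sbt. The project contains several main classes, so sbt asks which one to run; enter 6 to select StreamingProducer:

sbt run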

Multiple main classes detected, select one to run:

 [1] MonitoringStreamingModel
 [2] SimpleStreamingApp
 [3] SimpleStreamingModel
 [4] StreamingAnalyticsApp
 [5] StreamingModelProducer
 [6] StreamingProducer
 [7] StreamingStateApp

Enter number: 6
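The producer writes each event as one line, joining the (user, product, price) tuple with commas through productIterator.mkString(","), so a line looks like Alice,Headphones,5.49. Whatever consumes the stream has to turn these lines back into typed records. A minimal sketch, assuming a hypothetical PurchaseEvent case class and parseEvent helper (neither is part of the original project); the sample lines in main are made up:

```scala
import scala.util.Try

// Hypothetical record type for one event sent by StreamingProducer
final case class PurchaseEvent(user: String, product: String, price: Double)

object EventParsing {

  /** Parses a "user,product,price" line; returns None for malformed input. */
  def parseEvent(line: String): Option[PurchaseEvent] =
    line.split(",") match {
      case Array(user, product, price) =>
        Try(price.trim.toDouble).toOption.map(p => PurchaseEvent(user, product, p))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample lines in the producer's format, plus one malformed line
    val lines = Seq("Alice,iPhone Cover,9.99", "Bob,Headphones,5.49", "garbage")
    val events = lines.flatMap(line => parseEvent(line))
    val revenue = events.map(_.price).sum
    println(f"${events.size} valid events, total revenue $revenue%.2f")
  }
}
```

Returning Option lets malformed lines be dropped with flatMap instead of throwing an exception inside a long-running streaming job.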

3.2 Printing Messages
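The rest of this section is cut off in the source. As a hedged sketch of what printing the messages can look like with the Spark Streaming 1.5 API (not the author's code), the consumer below connects to the producer on localhost:9999, receives the lines in 10-second batches and prints each batch. The object name PrintMessagesSketch, the local[2] master and the 10-second batch interval are assumptions.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Hypothetical consumer sketch: reads the producer's text stream from
 * localhost:9999 in 10-second batches and prints every batch.
 */
object PrintMessagesSketch {

  def main(args: Array[String]): Unit = {
    // local[2]: the socket receiver occupies one thread, so at least one
    // more is needed to process the received batches
    val conf = new SparkConf().setMaster("local[2]").setAppName("PrintMessagesSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Each element of this DStream is one "user,product,price" line
    val stream = ssc.socketTextStream("localhost", 9999)

    // Print the first elements of every batch on the driver
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Start StreamingProducer first, then run this object from a second terminal with sbt run; print() with no argument shows the first 10 elements of each batch.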
