*以下内容由《Spark快速大数据分析》整理所得。

读书笔记的第七部分是讲的是如何使用Spark中提供机器学习函数的MLlib库,在集群中并行运行机器学习算法。

MLlib是Spark中提供机器学习函数的,MLlib就是RDD上一系列可供调用的函数的集合。需要注意的是:MLlib 中只包含能够在集群上运行良好的并行算法,不支持不能并行执行的算法。

一、操作向量

二、特征提取

三、特征标准化

四、分类

五、回归

六、聚类

七、降维


一、操作向量

MLlib 中有个最常用的数据类型Vector类:稠密向量稀疏向量

例子:用 Python 创建向量

from numpy import array
from pyspark.mllib.linalg import Vectors # 创建稠密向量<1.0, 2.0, 3.0>
# 方法1:NumPy数组可以直接传给MLlib
denseVec1 = array([1.0, 2.0, 3.0])
# 方法2:或者使用Vectors类来创建
denseVec2 = Vectors.dense([1.0, 2.0, 3.0]) # 创建稀疏向量<1.0, 0.0, 2.0, 0.0>;该方法只接收
# 向量的维度(4)以及非零位的位置和对应的值
# 这些数据可以用一个dictionary来传递,或使用两个分别代表位置和值的list
sparseVec1 = Vectors.sparse(4, {0: 1.0, 2: 2.0})
sparseVec2 = Vectors.sparse(4, [0, 2], [1.0, 2.0])

二、特征提取

MLlib 有两个算法可以用来计算 TF-IDF:HashingTF和IDF,都在 mllib.feature 包内。 HashingTF从一个文档中计算出给定大小的词频向量。
例子:在 Python 中使用 TF-IDF

from pyspark.mllib.feature import HashingTF, IDF 

# 将若干文本文件读取为TF向量
rdd = sc.wholeTextFiles("data").map(lambda (name, text): text.split())
tf = HashingTF()
tfVectors = tf.transform(rdd).cache() # 计算IDF,然后计算TF-IDF向量 idf = IDF()
idfModel = idf.fit(tfVectors)
tfIdfVectors = idfModel.transform(tfVectors)

还有其它方法: Word2Vec.fit_transform(rdd)


三、特征标准化

使用MLlib中的StandardScaler类来进行这样的缩放,同时控制均值和标准差(例如所有的特征平均值为0,标准差为1)。
例子:在Python中缩放向量

from pyspark.mllib.feature import StandardScaler 

vectors = [Vectors.dense([-2.0, 5.0, 1.0]), Vectors.dense([2.0, 0.0, 1.0])]
dataset = sc.parallelize(vectors)
scaler = StandardScaler(withMean=True, withStd=True)
model = scaler.fit(dataset)
result = model.transform(dataset)

四、分类

例子:Python 版垃圾邮件分类器

from pyspark.mllib.regression import LabeledPoint # 方便准备带标签的数据点
from pyspark.mllib.feature import HashingTF # 词频特征的提取
from pyspark.mllib.classification import LogisticRegressionWithSGD # SGD spam = sc.textFile("spam.txt") normal = sc.textFile("normal.txt") # 创建一个HashingTF实例来把邮件文本映射为包含10000个特征的向量
tf = HashingTF(numFeatures = 10000)
# 各邮件都被切分为单词,每个单词被映射为一个特征
spamFeatures = spam.map(lambda email: tf.transform(email.split(" ")))
normalFeatures = normal.map(lambda email: tf.transform(email.split(" "))) # 创建LabeledPoint数据集分别存放阳性(垃圾邮件)和阴性(正常邮件)的例子
positiveExamples = spamFeatures.map(lambda features: LabeledPoint(1, features))
negativeExamples = normalFeatures.map(lambda features: LabeledPoint(0, features))
trainingData = positiveExamples.union(negativeExamples) trainingData.cache() # 因为逻辑回归是迭代算法,所以缓存训练数据RDD # 使用SGD算法运行逻辑回归
model = LogisticRegressionWithSGD.train(trainingData) # 以阳性(垃圾邮件)和阴性(正常邮件)的例子分别进行测试。首先使用
# 一样的HashingTF特征来得到特征向量,然后对该向量应用得到的模型
posTest = tf.transform("O M G GET cheap stuff by sending money to ...".split(" "))
negTest = tf.transform("Hi Dad, I started studying Spark the other ...".split(" "))
print "Prediction for positive test example: %g" % model.predict(posTest)
print "Prediction for negative test example: %g" % model.predict(negTest)

五、回归

例子:Python 中的线性回归

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD points = # (创建LabeledPoint组成的RDD)
model = LinearRegressionWithSGD.train(points, iterations=200, intercept=True)
print "weights: %s, intercept: %s" % (model.weights, model.intercept)

六、聚类

当你要调用K-means算法时,你需要创建 mllib.clustering.KMeans 对象(在Java/Scala中)或者调用 KMeans.train (在Python中)。它接收一个Vector组成的RDD作为参数。K-means返回一个KMeansModel对象,该对象允许你访问其clusterCenters属性(聚类中心,是一个向量的数组)或者调用 predict() 来对一个新的向量返回它所属的聚类。


七、降维

PCA目前只在Java 和Scala(MLlib 1.2)中可用。要调用PCA,你首先要使用mllib. linalg.distributed.RowMatrix类来表示你的矩阵,然后存储一个由Vector 组成的RDD每行一个。
例子:Scala 中的 PCA

import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix val points: RDD[Vector] = // ...
val mat: RowMatrix = new RowMatrix(points)
val pc: Matrix = mat.computePrincipalComponents(2) // 将点投影到低维空间中
val projected = mat.multiply(pc).rows // 在投影出的二维数据上训练k-means模型
val model = KMeans.train(projected, 10)

最新文章

  1. PostgreSQL 与 MySQL 相比,优势何在?
  2. http错误码大全?
  3. App主界面Tab实现方法
  4. robot API笔记4
  5. JAVA 图形界面 JFrame容器
  6. C#使用EmguCV实现视频读取和播放,及多个视频一起播放的问题
  7. Hadoop 1.1.2 Eclipse 插件使用——异常解决
  8. Android 动画学习笔记
  9. WPF学习小记
  10. 工欲善其事必先利其器之Xcode高效插件和舒适配色
  11. haskell,lisp,erlang你们更喜欢哪个?
  12. java_log4j----java 日志管理
  13. [强烈推荐]ORACLE PL/SQL编程详解之七:程序包的创建与应用(聪明在于学习,天才在于积累!)
  14. svn检出服务器项目中出现的could not connect to server
  15. AJAX数据请求
  16. MySQL统计函数记录——按月、按季度、按日、时间段统计
  17. 啥?客户叫在DataGridView的左上角添加CheckBox?
  18. 常见的HTTP状态码(HTTP Status Code)说明
  19. C#.NET 中的 Timer 计时器及 3 种使用方法
  20. 安装UEStudio以及破解

热门文章

  1. Acticiti流程引擎在已知当前流程定义id的情况下获取当前流程的所有信息(包括:节点和连线)
  2. vue 项目打包后静态资源加载不到
  3. Get提交方式中文乱码
  4. 【原创】xenomai内核解析--双核系统调用(二)--应用如何区分xenomai/linux系统调用或服务
  5. linux(centos8):禁用selinux(临时关闭/永久关闭)
  6. vue知识点15
  7. Jmeter入门(1)- 什么是Jmeter以及Jmeter的安装和环境配置
  8. Linux操作系统的介绍和安装教程(Centos6.4)
  9. 自定义View(进度条)
  10. Retrofit学习