7. 基于MLlib的机器学习
*以下内容由《Spark快速大数据分析》整理所得。
读书笔记的第七部分是讲的是如何使用Spark中提供机器学习函数的MLlib库,在集群中并行运行机器学习算法。
MLlib是Spark中提供机器学习函数的,MLlib就是RDD上一系列可供调用的函数的集合。需要注意的是:MLlib 中只包含能够在集群上运行良好的并行算法,不支持不能并行执行的算法。
一、操作向量
二、特征提取
三、特征标准化
四、分类
五、回归
六、聚类
七、降维
一、操作向量
MLlib 中有个最常用的数据类型Vector类:稠密向量与稀疏向量。
例子:用 Python 创建向量
from numpy import array
from pyspark.mllib.linalg import Vectors # 创建稠密向量<1.0, 2.0, 3.0>
# 方法1:NumPy数组可以直接传给MLlib
denseVec1 = array([1.0, 2.0, 3.0])
# 方法2:或者使用Vectors类来创建
denseVec2 = Vectors.dense([1.0, 2.0, 3.0]) # 创建稀疏向量<1.0, 0.0, 2.0, 0.0>;该方法只接收
# 向量的维度(4)以及非零位的位置和对应的值
# 这些数据可以用一个dictionary来传递,或使用两个分别代表位置和值的list
sparseVec1 = Vectors.sparse(4, {0: 1.0, 2: 2.0})
sparseVec2 = Vectors.sparse(4, [0, 2], [1.0, 2.0])
二、特征提取
MLlib 有两个算法可以用来计算 TF-IDF:HashingTF和IDF,都在 mllib.feature 包内。 HashingTF从一个文档中计算出给定大小的词频向量。
例子:在 Python 中使用 TF-IDF
from pyspark.mllib.feature import HashingTF, IDF # 将若干文本文件读取为TF向量
rdd = sc.wholeTextFiles("data").map(lambda (name, text): text.split())
tf = HashingTF()
tfVectors = tf.transform(rdd).cache() # 计算IDF,然后计算TF-IDF向量 idf = IDF()
idfModel = idf.fit(tfVectors)
tfIdfVectors = idfModel.transform(tfVectors)
还有其它方法: Word2Vec.fit_transform(rdd)
三、特征标准化
使用MLlib中的StandardScaler类来进行这样的缩放,同时控制均值和标准差(例如所有的特征平均值为0,标准差为1)。
例子:在Python中缩放向量
from pyspark.mllib.feature import StandardScaler vectors = [Vectors.dense([-2.0, 5.0, 1.0]), Vectors.dense([2.0, 0.0, 1.0])]
dataset = sc.parallelize(vectors)
scaler = StandardScaler(withMean=True, withStd=True)
model = scaler.fit(dataset)
result = model.transform(dataset)
四、分类
例子:Python 版垃圾邮件分类器
from pyspark.mllib.regression import LabeledPoint # 方便准备带标签的数据点
from pyspark.mllib.feature import HashingTF # 词频特征的提取
from pyspark.mllib.classification import LogisticRegressionWithSGD # SGD spam = sc.textFile("spam.txt") normal = sc.textFile("normal.txt") # 创建一个HashingTF实例来把邮件文本映射为包含10000个特征的向量
tf = HashingTF(numFeatures = 10000)
# 各邮件都被切分为单词,每个单词被映射为一个特征
spamFeatures = spam.map(lambda email: tf.transform(email.split(" ")))
normalFeatures = normal.map(lambda email: tf.transform(email.split(" "))) # 创建LabeledPoint数据集分别存放阳性(垃圾邮件)和阴性(正常邮件)的例子
positiveExamples = spamFeatures.map(lambda features: LabeledPoint(1, features))
negativeExamples = normalFeatures.map(lambda features: LabeledPoint(0, features))
trainingData = positiveExamples.union(negativeExamples) trainingData.cache() # 因为逻辑回归是迭代算法,所以缓存训练数据RDD # 使用SGD算法运行逻辑回归
model = LogisticRegressionWithSGD.train(trainingData) # 以阳性(垃圾邮件)和阴性(正常邮件)的例子分别进行测试。首先使用
# 一样的HashingTF特征来得到特征向量,然后对该向量应用得到的模型
posTest = tf.transform("O M G GET cheap stuff by sending money to ...".split(" "))
negTest = tf.transform("Hi Dad, I started studying Spark the other ...".split(" "))
print "Prediction for positive test example: %g" % model.predict(posTest)
print "Prediction for negative test example: %g" % model.predict(negTest)
五、回归
例子:Python 中的线性回归
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD points = # (创建LabeledPoint组成的RDD)
model = LinearRegressionWithSGD.train(points, iterations=200, intercept=True)
print "weights: %s, intercept: %s" % (model.weights, model.intercept)
六、聚类
当你要调用K-means算法时,你需要创建 mllib.clustering.KMeans 对象(在Java/Scala中)或者调用 KMeans.train (在Python中)。它接收一个Vector组成的RDD作为参数。K-means返回一个KMeansModel对象,该对象允许你访问其clusterCenters属性(聚类中心,是一个向量的数组)或者调用 predict() 来对一个新的向量返回它所属的聚类。
七、降维
PCA目前只在Java 和Scala(MLlib 1.2)中可用。要调用PCA,你首先要使用mllib. linalg.distributed.RowMatrix类来表示你的矩阵,然后存储一个由Vector 组成的RDD每行一个。
例子:Scala 中的 PCA
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix val points: RDD[Vector] = // ...
val mat: RowMatrix = new RowMatrix(points)
val pc: Matrix = mat.computePrincipalComponents(2) // 将点投影到低维空间中
val projected = mat.multiply(pc).rows // 在投影出的二维数据上训练k-means模型
val model = KMeans.train(projected, 10)
最新文章
- PostgreSQL 与 MySQL 相比,优势何在?
- http错误码大全?
- App主界面Tab实现方法
- robot API笔记4
- JAVA 图形界面 JFrame容器
- C#使用EmguCV实现视频读取和播放,及多个视频一起播放的问题
- Hadoop 1.1.2 Eclipse 插件使用——异常解决
- Android 动画学习笔记
- WPF学习小记
- 工欲善其事必先利其器之Xcode高效插件和舒适配色
- haskell,lisp,erlang你们更喜欢哪个?
- java_log4j----java 日志管理
- [强烈推荐]ORACLE PL/SQL编程详解之七:程序包的创建与应用(聪明在于学习,天才在于积累!)
- svn检出服务器项目中出现的could not connect to server
- AJAX数据请求
- MySQL统计函数记录——按月、按季度、按日、时间段统计
- 啥?客户叫在DataGridView的左上角添加CheckBox?
- 常见的HTTP状态码(HTTP Status Code)说明
- C#.NET 中的 Timer 计时器及 3 种使用方法
- 安装UEStudio以及破解
热门文章
- Acticiti流程引擎在已知当前流程定义id的情况下获取当前流程的所有信息(包括:节点和连线)
- vue 项目打包后静态资源加载不到
- Get提交方式中文乱码
- 【原创】xenomai内核解析--双核系统调用(二)--应用如何区分xenomai/linux系统调用或服务
- linux(centos8):禁用selinux(临时关闭/永久关闭)
- vue知识点15
- Jmeter入门(1)- 什么是Jmeter以及Jmeter的安装和环境配置
- Linux操作系统的介绍和安装教程(Centos6.4)
- 自定义View(进度条)
- Retrofit学习