Spark FPGrowth (Frequent Pattern Mining)
2024-08-21 13:35:25
给定交易数据集,FP增长的第一步是计算项目频率并识别频繁项目。与为同样目的设计的类似Apriori的算法不同,FP增长的第二步使用后缀树(FP-tree)结构来编码事务,而不会显式生成候选集,生成的代价通常很高。第二步之后,可以从FP树中提取频繁项集。
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate() // For implicit conversions like converting RDDs to DataFrames
import spark.implicits._ val data = List(
"1,2,5",
"1,2,3,5",
"1,2").toDF("items")
data: org.apache.spark.sql.DataFrame = [items: string] // 注意每行,头部和尾部的[中括号
data.rdd.map { s => s.toString() }.collect().take(3)
res20: Array[String] = Array([1,2,5], [1,2,3,5], [1,2]) val transactions: RDD[Array[String]] = data.rdd.map {
s =>
val str = s.toString().drop(1).dropRight(1)
str.trim().split(",")
} val fpg = new FPGrowth().setMinSupport(0.5).setNumPartitions(8) val model = fpg.run(transactions) /* model.freqItemsets.collect().foreach { itemset =>
println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}*/ val freqItemSets = model.freqItemsets.map { itemset =>
val items = itemset.items.mkString(",")
val freq = itemset.freq
(items, freq)
}.toDF("items", "freq")
freqItemSets: org.apache.spark.sql.DataFrame = [items: string, freq: bigint] freqItemSets.show
+-----+----+
|items|freq|
+-----+----+
| 1| 3|
| 2| 3|
| 2,1| 3|
| 5| 2|
| 5,2| 2|
|5,2,1| 2|
| 5,1| 2|
+-----+----+ val minConfidence = 0.6
minConfidence: Double = 0.6 /*model.generateAssociationRules(minConfidence).collect().foreach { rule =>
println(
rule.antecedent.mkString("[", ",", "]")
+ " => " + rule.consequent.mkString("[", ",", "]")
+ ", " + rule.confidence)
}*/ // 根据置信度生成关联规则
val Rules = model.generateAssociationRules(minConfidence)
Rules: org.apache.spark.rdd.RDD[org.apache.spark.mllib.fpm.AssociationRules.Rule[String]] = MapPartitionsRDD[129] at filter at AssociationRules.scala:80 val df = Rules.map { s =>
val L = s.antecedent.mkString(",")
val R = s.consequent.mkString(",")
val confidence = s.confidence
(L, R, confidence)
}.toDF("left_collect", "right_collect", "confidence")
df: org.apache.spark.sql.DataFrame = [left_collect: string, right_collect: string ... 1 more field] df.show
+------------+-------------+------------------+
|left_collect|right_collect| confidence|
+------------+-------------+------------------+
| 2| 5|0.6666666666666666|
| 2| 1| 1.0|
| 5,2| 1| 1.0|
| 5| 2| 1.0|
| 5| 1| 1.0|
| 1| 5|0.6666666666666666|
| 1| 2| 1.0|
| 2,1| 5|0.6666666666666666|
| 5,1| 2| 1.0|
+------------+-------------+------------------+
最新文章
- rhel7端口开放和查询
- 安装vmall5:从ebak恢复数据,需要配置php.ini
- ExtJS 刷新后,默认选中刷新前最后一次选中的节点
- Lab_5_SysOps_Resources_Linux_v2.5
- CC2541连接BTool教程
- MVC的路由
- 从 Auto Layout 的布局算法谈性能
- Android中attr自定义属性详解
- knockoutjs关键点
- cookie 和 session
- 关于javascript面向对象之闭包
- 实现JQuery_Accordion功能
- ps命令用法详解(转)
- WeQuant交易策略—5日均线
- python自动化开发-[第二天]-基础数据类型与编码(续)
- CentOS7 上以 RPM 包方式安装 Oracle 18c 单实例
- Mysql模糊查询Like传递参数的语句
- Java泛型方法与泛型类的使用------------(五)
- keycloak
- RabbitMQ 消息应答机制(message acknowledgments)