package iie.udps.example.spark.mllib;

import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

/**
 * Example using MLlib KMeans from Java.
 *
 * spark-submit --class iie.udps.example.spark.mllib.JavaKMeans --master
 * yarn-cluster --num-executors 15 --driver-memory 512m --executor-memory 2g
 * --executor-cores 2 /home/xdf/test2.jar /user/xdf/Example.txt 10 2
 */
public final class JavaKMeans { @SuppressWarnings("serial")
private static class ParsePoint implements Function<String, Vector> {
private static final Pattern SPACE = Pattern.compile(","); @Override
public Vector call(String line) {
String[] tok = SPACE.split(line);
// 统一数据维度为3,此处没有考虑其他异常数据情况
if (tok.length < 3) {
tok = SPACE.split(line + ",0");
for (int i = tok.length; i < 3; i++) {
tok[i] = "0";
}
}
if (tok.length > 3) {
tok = SPACE.split("0,0,0");
}
double[] point = new double[tok.length];
for (int i = 0; i < tok.length; ++i) {
point[i] = Double.parseDouble(tok[i]);
}
return Vectors.dense(point);
} } public static void main(String[] args) {
if (args.length < 3) {
System.err
.println("Usage: JavaKMeans <input_file> <k> <max_iterations> [<runs>]");
System.exit(1);
}
String inputFile = args[0]; // 要读取的文件
int k = Integer.parseInt(args[1]); // 聚类个数
int iterations = Integer.parseInt(args[2]); // 迭代次数
int runs = 1; // 运行算法次数 if (args.length >= 4) {
runs = Integer.parseInt(args[3]);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaKMeans");
// sparkConf.set("spark.default.parallelism", "4");
// sparkConf.set("spark.akka.frameSize", "1024");
System.setProperty(
"dfs.client.block.write.replace-datanode-on-failure.enable",
"true");
System.setProperty(
"dfs.client.block.write.replace-datanode-on-failure.policy",
"never");
// sparkConf.set(
// "dfs.client.block.write.replace-datanode-on-failure.enable",
// "true");
// sparkConf.set(
// "dfs.client.block.write.replace-datanode-on-failure.policy",
// "never");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// 指定文件分片数
JavaRDD<String> lines = sc.textFile(inputFile,2400);// ,1264 , 1872,2400
JavaRDD<Vector> points = lines.map(new ParsePoint()); KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs,
KMeans.K_MEANS_PARALLEL()); // System.out.println("Vector 98, 345, 90 belongs to clustering :"
// + model.predict(Vectors.dense(98, 345, 90)));
// System.out.println("Vector 748, 965, 202 belongs to clustering :"
// + model.predict(Vectors.dense(748, 965, 202)));
// System.out.println("Vector 310, 554, 218 belongs to clustering :"
// + model.predict(Vectors.dense(310, 554, 218))); System.out.println("Cluster centers:");
for (Vector center : model.clusterCenters()) {
System.out.println(" " + center); }
double cost = model.computeCost(points.rdd());
System.out.println("Cost: " + cost); sc.stop();
}
}

  
