1 public class TopK extends Configured implements Tool {

     public static class TopKMapper extends Mapper<Object, Text, NullWritable, LongWritable> {

         public static final int K = 100;
private TreeMap<Long, Long> tm = new TreeMap<Long, Long>(); @Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
try {
long k = Integer.parseInt(value.toString().substring(0, 9));
tm.put(k, k);
if (tm.size() > K) {
tm.remove(tm.firstKey());
}
} catch (Exception e) {
context.getCounter("TopK", "errorlog").increment(1);
}
} @Override
protected void cleanup(Context context) throws IOException, InterruptedException {
for (Long text : tm.values()) {
context.write(NullWritable.get(), new LongWritable(text));
}
}
} public static class TopKReducer extends Reducer<NullWritable, LongWritable, NullWritable, LongWritable> { public static final int K = 100;
private TreeMap<Long, Long> mt = new TreeMap<Long, Long>(); @Override
protected void reduce(NullWritable key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
for (LongWritable value : values) {
mt.put(value.get(), value.get());
if (mt.size() > K) {
mt.remove(mt.firstKey());
}
}
for (Long val : mt.descendingKeySet()) {
context.write(NullWritable.get(), new LongWritable(val));
}
} } @Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, "TopKNum");
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(LongWritable.class);
job.setMapperClass(TopKMapper.class);
job.setReducerClass(TopKReducer.class);
job.setJarByClass(TopK.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class); return job.waitForCompletion(true) ? 0 : 1;
} public static void main(String[] args) throws IOException, InterruptedException {
try {
if (args.length < 2) {
System.err.println("ERROR: Parameter format length ");
System.exit(0);
}
int ret = ToolRunner.run(new TopK(), args);
System.exit(ret);
} catch (Exception e) {
e.printStackTrace();
}
}
}

上面的代码求的是最大的 100 个数。如果要求最小的 100 个数,把 map 中的 tm.remove(tm.firstKey()); 和 reduce 中的 mt.remove(mt.firstKey()); 分别改为 tm.remove(tm.lastKey()); 和 mt.remove(mt.lastKey()); 即可。

来自:http://blog.csdn.net/liuzhoulong/article/details/11175381

最新文章

  1. AngularJs的$http发送POST请求,php无法接收Post的数据解决方案
  2. web form 复合控件
  3. Vue.js进阶
  4. 【网络流24题】No. 17 运输问题 (费用流)
  5. 移植openssh到nuc951 evb板
  6. Node填坑教程——HelloWorld
  7. ListNode线性表
  8. 如何解决Visual Studio2012 与此版本的Windows不兼容
  9. 如何在一个项目中同时包含mvc建站、webapi接口
  10. HTTP 错误码
  11. 走进javascript——重拾数组
  12. VMware 15 pro虚拟机
  13. 打开控制台F12弹出弹窗
  14. pyspider爬一批文章保存到word中
  15. 任意格式视频转MP4格式
  16. mysql linux 安装卸载
  17. Oracle 12C -- Plug in a Non-CDB as a PDB
  18. Python学习-1.安装Python
  19. 最全的HTTP 响应状态码列表!
  20. vlc源码分析(三) 调用live555接收RTSP数据

热门文章

  1. PostgreSQL各命令行工具功能说明
  2. XStream转换Java对象与XML
  3. 用VC资源动态链接库解决国际化问题
  4. 使用 NuGet 更新套件時將 jQuery 升級到 2.0.2 應該如何降級
  5. Python脚本报错AttributeError: 'module' object has no attribute 'maketrans'
  6. Spring 反射注入+全注解注入
  7. FFmpeg深入分析之零-基础
  8. ubuntu12.04-server版 倒腾
  9. Win8 下配置Java开发环境
  10. Asp.net FileUpload+Image制作头像效果