官方 http://docs.mongodb.org/ecosystem/tutorial/getting-started-with-hadoop/

mongo-hadoop项目地址 https://github.com/mongodb/mongo-hadoop

该代码托管 https://github.com/cclient/mongo_hadoop_map-reduce

原分析 由nodejs+async编写

用游标迭代查询mongo数据库,分析数据

因数据量较大,目前执行分析任务耗时4个小时,这只是极限数据量的1%

为优化,采用hadoop-mongo 方案

优点:mongo只能单机单线程(不作shard的情况),hadoop-mongo可以集群处理。

完成代码

近期一直写的脚本语言,再回头写点JAVA,好悲催,感觉很受限制。

初步代码 很粗糙

MAIN 入口

 package group.artifactid;

 // Job driver: counts the URL hosts requested by each client of each access point,
// reading from and writing to MongoDB through mongo-hadoop.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.mongodb.hadoop.MongoConfig;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoTool; import com.mongodb.hadoop.MongoConfig;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.util.MapredMongoConfigUtil;
import com.mongodb.hadoop.util.MongoConfigUtil;
import com.mongodb.hadoop.util.MongoTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver for the MongoDB-backed MapReduce job.
 *
 * <p>All job settings are fixed in the constructor: documents are read from
 * {@code db1.collection1} and results are written to {@code db2.collection2}
 * on the MongoDB instance at {@code localhost:27017}.
 */
public class MongoMaxTemperature extends MongoTool {

    /**
     * Creates the tool and wires the input/output formats, the
     * mapper/combiner/reducer classes, the key/value types and the
     * MongoDB URIs into its Hadoop configuration.
     */
    public MongoMaxTemperature() {
        Configuration hadoopConf = new Configuration();
        MongoConfig mongoConfig = new MongoConfig(hadoopConf);
        setConf(hadoopConf);

        // Read from and write to MongoDB rather than HDFS files.
        MongoConfigUtil.setInputFormat(getConf(), MongoInputFormat.class);
        MongoConfigUtil.setOutputFormat(getConf(), MongoOutputFormat.class);

        mongoConfig.setInputURI("mongodb://localhost:27017/db1.collection1");

        // Processing pipeline: mapper -> combiner -> reducer.
        mongoConfig.setMapper(MongoMaxTemperatureMapper.class);
        mongoConfig.setCombiner(MongoMaxTemperatureCombine.class);
        mongoConfig.setReducer(MongoMaxTemperatureReducerCombine.class);

        // Intermediate records are Text/Text; final records are Text/BSON.
        mongoConfig.setMapperOutputKey(Text.class);
        mongoConfig.setMapperOutputValue(Text.class);
        mongoConfig.setOutputKey(Text.class);
        mongoConfig.setOutputValue(BSONWritable.class);

        mongoConfig.setOutputURI("mongodb://localhost:27017/db2.collection2");
    }

    /**
     * Command-line entry point: runs the tool through {@link ToolRunner} and
     * exits the JVM with the status it returns.
     *
     * @param args command-line arguments handed to {@link ToolRunner}
     * @throws Exception if the job fails to run
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MongoMaxTemperature(), args);
        System.exit(exitCode);
    }
}

MAPPER代码

package group.artifactid;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;
import com.mongodb.hadoop.io.BSONWritable;

/**
 * Maps one MongoDB document to an {@code (apMac, clientMac + host)} pair.
 *
 * <p>Only documents whose {@code proto} field equals {@code "http"} and whose
 * {@code url} field is non-empty are emitted. The host is obtained from the
 * URL by dropping a leading {@code "http://"} and everything from the first
 * {@code '/'} onwards; every {@code '.'} in it is then replaced by
 * {@code '}'}.
 *
 * <p>Documents that lack {@code apMac}, {@code clientMac}, {@code url} or
 * {@code proto}, or whose URL yields an empty host, are skipped instead of
 * failing the task.
 */
public class MongoMaxTemperatureMapper extends
        Mapper<Object, BSONObject, Text, Text> {

    /** Scheme prefix stripped from the URL before the host is extracted. */
    private static final String HTTP_PREFIX = "http://";

    /**
     * Emits {@code (apMac, clientMac + host)} for an HTTP record.
     *
     * @param key     the input key (unused)
     * @param val     the input document; read for {@code apMac},
     *                {@code clientMac}, {@code url} and {@code proto}
     * @param context the Hadoop context the pair is written to
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void map(final Object key, BSONObject val, Context context)
            throws IOException, InterruptedException {
        if (val == null) {
            return;
        }
        String apmac = (String) val.get("apMac");
        String clientmac = (String) val.get("clientMac");
        String url = (String) val.get("url");
        String proto = (String) val.get("proto");

        // Constant-first equals keeps a missing "proto" from throwing.
        if (!"http".equals(proto) || url == null || url.isEmpty()) {
            return;
        }
        // A null apMac would crash new Text(..); a null clientMac would be
        // concatenated as the literal "null" and break the reducers'
        // fixed-width split of the value.
        if (apmac == null || clientmac == null) {
            return;
        }

        String host = url;
        if (host.startsWith(HTTP_PREFIX)) {
            host = host.substring(HTTP_PREFIX.length());
        }
        int pathStart = host.indexOf('/');
        if (pathStart > -1) {
            host = host.substring(0, pathStart);
        }
        // URLs such as "http://" or "/path" leave nothing to count.
        if (host.isEmpty()) {
            return;
        }

        // The host is used as a BSON field name by the reducers; the original
        // author notes that a '.' is rejected there, so it is temporarily
        // replaced by '}'.
        host = host.replace('.', '}');
        context.write(new Text(apmac), new Text(clientmac + host));
    }
}

REDUCER代码(配合COMBINER使用的版本:MongoMaxTemperatureReducerCombine)

package group.artifactid;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import com.mongodb.hadoop.io.BSONWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;

/**
 * Reducer that accepts both raw and pre-aggregated values.
 *
 * <p>Every incoming value starts with a 17-character client MAC followed by a
 * host, optionally followed by {@code '|'} and a count (a value without the
 * suffix counts as one occurrence). For each access point (the key) the
 * reducer writes one BSON document of the form
 * {@code { clientMac: { host: count, ... }, ... }} that keeps only the
 * highest-count hosts of each client.
 *
 * <p>Differences from the previous revision:
 * <ul>
 *   <li>The {@code '|'} separator is no longer kept as part of the host key,
 *       so a host is tallied under a single key whether or not its value
 *       carried a count suffix.</li>
 *   <li>The per-client top-N truncation is actually applied to the written
 *       document; it used to be assigned to a local variable and lost.</li>
 *   <li>The one-shot mid-stream compaction, which had no effect, was removed;
 *       counts stay exact until the final truncation.</li>
 *   <li>Blank hosts are skipped and malformed values are counted in a job
 *       counter instead of throwing.</li>
 * </ul>
 */
public class MongoMaxTemperatureReducerCombine extends
        Reducer<Text, Text, Text, BSONWritable> {

    /** Number of leading characters of a value that hold the client MAC. */
    private static final int CLIENT_MAC_LENGTH = 17;

    /** Maximum number of hosts kept per client in the output document. */
    private static final int TOP_URLS_PER_CLIENT = 100;

    /** Separates the host from a pre-aggregated count inside a value. */
    private static final char COUNT_SEPARATOR = '|';

    /** Counter group and name used to report values that cannot be parsed. */
    private static final String COUNTER_GROUP = "MongoMaxTemperature";
    private static final String MALFORMED_COUNTER = "MALFORMED_VALUES";

    /** A host together with the number of times it was seen. */
    public class UrlCount {
        public UrlCount(String url, int count) {
            this.Url = url;
            this.Count = count;
        }

        String Url;
        int Count;
    }

    /**
     * Returns the entries of {@code topobj} ordered by descending count and
     * limited to at most {@code topnum} entries.
     *
     * @param topobj map from host to count; each value's {@code toString()}
     *               must parse as an integer
     * @param topnum maximum number of entries to return
     * @return the highest-count entries, largest first
     * @throws NumberFormatException if a value is not an integer
     */
    public List<UrlCount> compresstopobj(BasicBSONObject topobj, int topnum) {
        List<UrlCount> ranked = new ArrayList<UrlCount>();
        for (Map.Entry<String, Object> entry : topobj.entrySet()) {
            int parsedCount = Integer.parseInt(entry.getValue().toString());
            ranked.add(new UrlCount(entry.getKey(), parsedCount));
        }
        // Descending by count. The ordering is only needed to pick the top
        // entries; the original author observed that the stored document
        // does not keep it.
        Collections.sort(ranked, new Comparator<UrlCount>() {
            @Override
            public int compare(UrlCount o1, UrlCount o2) {
                if (o1.Count > o2.Count) {
                    return -1;
                }
                if (o1.Count < o2.Count) {
                    return 1;
                }
                return 0;
            }
        });
        if (ranked.size() > topnum) {
            ranked = ranked.subList(0, topnum);
        }
        return ranked;
    }

    /**
     * Aggregates the host counts of every client seen for one access point
     * and writes the per-client top hosts as a single BSON document.
     *
     * @param apmac   the access-point MAC (output key)
     * @param values  values of the form {@code clientMac + host} or
     *                {@code clientMac + host + '|' + count}
     * @param context the Hadoop context the result is written to
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void reduce(Text apmac, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        BasicBSONObject clientmacmap = new BasicBSONObject();

        for (Text value : values) {
            String subline = value.toString();
            if (subline.length() < CLIENT_MAC_LENGTH) {
                // Too short to contain a client MAC; report and move on.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                continue;
            }
            String clientmac = subline.substring(0, CLIENT_MAC_LENGTH);

            String url;
            int increment = 1;
            // The count is appended last, so search from the end and ignore
            // any separator that falls inside the MAC prefix.
            int separatorIndex = subline.lastIndexOf(COUNT_SEPARATOR);
            if (separatorIndex >= CLIENT_MAC_LENGTH) {
                url = subline.substring(CLIENT_MAC_LENGTH, separatorIndex);
                try {
                    increment = Integer.parseInt(subline.substring(separatorIndex + 1));
                } catch (NumberFormatException malformedCount) {
                    context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                    continue;
                }
            } else {
                url = subline.substring(CLIENT_MAC_LENGTH);
            }

            // The previous code meant to exclude blank hosts but dereferenced
            // a null count for them; skip them outright.
            if (url.trim().isEmpty()) {
                continue;
            }

            BasicBSONObject urlmap = (BasicBSONObject) clientmacmap.get(clientmac);
            if (urlmap == null) {
                urlmap = new BasicBSONObject();
                clientmacmap.put(clientmac, urlmap);
            }
            Object previous = urlmap.get(url);
            int total = increment;
            if (previous != null) {
                total += ((Number) previous).intValue();
            }
            urlmap.put(url, total);
        }

        // Build the output document from the truncated per-client maps.
        BasicBSONObject result = new BasicBSONObject();
        for (Map.Entry<String, Object> entry : clientmacmap.entrySet()) {
            BasicBSONObject allUrls = (BasicBSONObject) entry.getValue();
            BasicBSONObject topUrls = new BasicBSONObject();
            for (UrlCount urlCount : compresstopobj(allUrls, TOP_URLS_PER_CLIENT)) {
                topUrls.put(urlCount.Url, urlCount.Count);
            }
            result.put(entry.getKey(), topUrls);
        }
        context.write(apmac, new BSONWritable(result));
    }
}

REDUCER代码(早期版本 MongoMaxTemperatureReducer,MAIN 中对应的 setReducer 已被注释掉)

package group.artifactid;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet; import com.mongodb.hadoop.io.BSONWritable; import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.bson.BasicBSONObject;

/**
 * Reducer for values that each represent a single occurrence.
 *
 * <p>Every incoming value is a 17-character client MAC immediately followed
 * by a host. For each access point (the key) the reducer writes one BSON
 * document of the form {@code { clientMac: { host: count, ... }, ... }} that
 * keeps only the highest-count hosts of each client.
 *
 * <p>Differences from the previous revision:
 * <ul>
 *   <li>{@code SortByCount} is now a proper descending comparator; it used to
 *       return only 1 or 0, which breaks the {@link Comparator} contract and
 *       does not put the largest counts first.</li>
 *   <li>The per-client top-N truncation is applied to the written document;
 *       the earlier mid-stream compaction was assigned to a local variable
 *       and had no effect.</li>
 *   <li>Blank hosts are skipped and too-short values are counted in a job
 *       counter instead of throwing.</li>
 *   <li>The debug print inside {@code compresstopobj} was removed.</li>
 * </ul>
 */
public class MongoMaxTemperatureReducer extends
        Reducer<Text, Text, Text, BSONWritable> {

    /** Number of leading characters of a value that hold the client MAC. */
    private static final int CLIENT_MAC_LENGTH = 17;

    /** Maximum number of hosts kept per client in the output document. */
    private static final int TOP_URLS_PER_CLIENT = 100;

    /** Counter group and name used to report values that cannot be parsed. */
    private static final String COUNTER_GROUP = "MongoMaxTemperature";
    private static final String MALFORMED_COUNTER = "MALFORMED_VALUES";

    /** A host together with the number of times it was seen. */
    public class UrlCount {
        public UrlCount(String url, int count) {
            this.Url = url;
            this.Count = count;
        }

        String Url;
        int Count;
    }

    /** Orders {@link UrlCount} values by descending count. */
    class SortByCount implements Comparator<UrlCount> {
        @Override
        public int compare(UrlCount o1, UrlCount o2) {
            if (o1.Count > o2.Count) {
                return -1;
            }
            if (o1.Count < o2.Count) {
                return 1;
            }
            return 0;
        }
    }

    /**
     * Returns the entries of {@code topobj} ordered by descending count and
     * limited to at most {@code topnum} entries.
     *
     * @param topobj map from host to count; each value's {@code toString()}
     *               must parse as an integer
     * @param topnum maximum number of entries to return
     * @return the highest-count entries, largest first
     * @throws NumberFormatException if a value is not an integer
     */
    public List<UrlCount> compresstopobj(BasicBSONObject topobj, int topnum) {
        List<UrlCount> ranked = new ArrayList<UrlCount>();
        for (Map.Entry<String, Object> entry : topobj.entrySet()) {
            int parsedCount = Integer.parseInt(entry.getValue().toString());
            ranked.add(new UrlCount(entry.getKey(), parsedCount));
        }
        Collections.sort(ranked, new SortByCount());
        if (ranked.size() > topnum) {
            ranked = ranked.subList(0, topnum);
        }
        return ranked;
    }

    /**
     * Counts the hosts of every client seen for one access point and writes
     * the per-client top hosts as a single BSON document.
     *
     * @param apmac   the access-point MAC (output key)
     * @param values  values of the form {@code clientMac + host}
     * @param context the Hadoop context the result is written to
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void reduce(Text apmac, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        BasicBSONObject clientmacmap = new BasicBSONObject();

        for (Text value : values) {
            String subline = value.toString();
            if (subline.length() < CLIENT_MAC_LENGTH) {
                // Too short to contain a client MAC; report and move on.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                continue;
            }
            String clientmac = subline.substring(0, CLIENT_MAC_LENGTH);
            String url = subline.substring(CLIENT_MAC_LENGTH);

            // The previous code meant to exclude blank hosts but dereferenced
            // a null count for them; skip them outright.
            if (url.trim().isEmpty()) {
                continue;
            }

            BasicBSONObject urlmap = (BasicBSONObject) clientmacmap.get(clientmac);
            if (urlmap == null) {
                urlmap = new BasicBSONObject();
                clientmacmap.put(clientmac, urlmap);
            }
            Object previous = urlmap.get(url);
            int total = 1;
            if (previous != null) {
                total += ((Number) previous).intValue();
            }
            urlmap.put(url, total);
        }

        // Build the output document from the truncated per-client maps.
        BasicBSONObject result = new BasicBSONObject();
        for (Map.Entry<String, Object> entry : clientmacmap.entrySet()) {
            BasicBSONObject allUrls = (BasicBSONObject) entry.getValue();
            BasicBSONObject topUrls = new BasicBSONObject();
            for (UrlCount urlCount : compresstopobj(allUrls, TOP_URLS_PER_CLIENT)) {
                topUrls.put(urlCount.Url, urlCount.Count);
            }
            result.put(entry.getKey(), topUrls);
        }
        context.write(apmac, new BSONWritable(result));
    }
}

Mongo collection 数据格式

{
"_id" : ObjectId("54d83f3548c9bc218e056ce6"),"apMac" : "aa:bb:cc:dd:ee:ff","proto" : "http",
"url" : "extshort.weixin.qq.comhttp",
"clientMac" : "ff:ee:dd:cc:bb:aa"
}

clientMac和url 先拼在一起,再按mac长度分割

数据流程

origin->map

map:[{"aa:bb:cc:dd:ee:ff":[ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp]}]

假如是多条数据则

map:[{"aa:bb:cc:dd:ee:ff":["ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp","ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp1","ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp2"]}]

map->combine

如果有相同的client+url 则统计个数,以|分隔

combine:[{"aa:bb:cc:dd:ee:ff":[ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp|100]}]

combine->reducer

reducer中 按mac长度分割出 clientMac url 再按“|”分割出 个数

统计每个clientMac的前100条

reduce:

{
"_id": "00:21:26:00:0A:FF",
"aa:bb:cc:1c:b9:8f": {
"c}tieba}baidu}com|": 1,
"short}weixin}qq}comhttp:|": 1,
"get}sogou}com|": 1,
"md}openapi}360}cn|": 1,
"74}125}235}224|": 1,
"mmbiz}qpic}cn|": 1,
"tb}himg}baidu}com|": 1
},
"cc:bb:aa:d5:30:8a": {
"captive}apple}com|": 2,
"www}airport}us|": 1,
"www}itools}info|": 2,
"www}thinkdifferent}us|": 1,
"www}ibook}info|": 1
},
"ee:ee:bb:78:31:74": {
"www}itools}info|": 1,
"www}ibook}info|": 1
} }

最新文章

  1. 使用纯CSS实现带箭头的提示框
  2. 删除mysql binlog日志
  3. WindowsService 安装 cmd
  4. [UWP]涨姿势UWP源码——Unit Test
  5. leetcode 74. Search a 2D Matrix
  6. CL.exe的 /D 选项, Preprocessor Macro预处理器宏定义
  7. javascript代码注意事项
  8. 代码实现native2assci
  9. bg-route
  10. JavaScript实现八大内部排序算法
  11. mysql插件的初始化
  12. 前端传递给后端且通过cookie方式,尽量传递id
  13. 【bfs】献给阿尔吉侬的花束
  14. StringUtils工具类常用方法汇总(判空、转换、移除、替换、反转)
  15. WebService服务发布与使用(JDK自带WebService)
  16. 清除win下连接的linux的samba服务缓存 用户名和密码
  17. what’s this?
  18. [LeetCode] 785. Is Graph Bipartite?_Medium tag: DFS, BFS
  19. 转载-MyBatis学习总结
  20. HashMap+双向链表手写LRU缓存算法/页面置换算法

热门文章

  1. centos 7 相关的一些记录
  2. PIC32MZ tutorial -- UART Communication
  3. PHP中类自动加载的方式
  4. R12_专题知识总结提炼-AR模块
  5. iframe跨域cookie问题
  6. EUI ViewStack实现选项卡组件
  7. WPF绘制矢量图形模糊的问题
  8. HostOnly Cookie和HttpOnly Cookie
  9. python的rename原来这么用
  10. 前端资源构建-Grunt环境搭建