Over the past few days I have been working out how to map the IP addresses in our logs to specific provinces and cities.

The goal is to write a Pig UDF.

The IP database is the Chunzhen (纯真) IP database file qqwry.dat, which can be downloaded from http://www.cz88.net/.

The tricky part is how to read this file from inside the UDF; it cost me two days, so I am recording the code here for anyone who runs into the same problem.
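Before getting to the UDF, it helps to know how qqwry.dat is laid out. As far as I understand the format, the first 8 bytes are two little-endian 32-bit offsets pointing at the first and last entries of the index area, and each index entry is 7 bytes (a 4-byte start IP plus a 3-byte offset into the record area). A minimal sketch of reading that header, using a synthetic byte array instead of the real file:

```java
// Sketch of parsing the qqwry.dat header, under my understanding of the
// format. The byte values below are synthetic for illustration; the real
// file would be read with RandomAccessFile.
public class QQWryHeader {

    // qqwry.dat stores offsets little-endian, so assemble the 4 bytes
    // starting at pos from least significant to most significant.
    public static long readLittleEndianUint32(byte[] buf, int pos) {
        return (buf[pos] & 0xFFL)
             | (buf[pos + 1] & 0xFFL) << 8
             | (buf[pos + 2] & 0xFFL) << 16
             | (buf[pos + 3] & 0xFFL) << 24;
    }

    public static void main(String[] args) {
        // Synthetic 8-byte header: index area runs from offset 0x10 to 0x17.
        byte[] header = {0x10, 0, 0, 0, 0x17, 0, 0, 0};
        long firstIndex = readLittleEndianUint32(header, 0);
        long lastIndex = readLittleEndianUint32(header, 4);
        long entryCount = (lastIndex - firstIndex) / 7 + 1; // 7 bytes per entry
        System.out.println(firstIndex + " " + lastIndex + " " + entryCount);
    }
}
```

IPSeeker (used in the UDF below) does a binary search over this index area to find the record for a given IP.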

The Pig script:

register /usr/local/pig/mypigudf.jar;
define ip2address my.pig.func.IP2Address('/user/anny/qqwry.dat');
a = load '/user/anny/hdfs/logtestdata/ipdata.log' as (ip:chararray);
b = foreach a generate ip, ip2address(ip) as cc:map[chararray];
c = foreach b generate ip, cc#'province' as province, cc#'city' as city, cc#'region' as region;
dump c;

The Pig UDF, written in Java:

package my.pig.func;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import my.pig.func.IPConvertCity.IPSeeker;
import my.pig.func.IPConvertCity.IPUtil;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class IP2Address extends EvalFunc<Map<String, Object>> {

    private String lookupFile = "";
    private RandomAccessFile objFile = null;

    public IP2Address(String file) {
        this.lookupFile = file;
    }

    @Override
    public Map<String, Object> exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0 || input.get(0) == null)
            return null;
        Map<String, Object> output = new HashMap<String, Object>();
        String str = (String) input.get(0);
        try {
            if (str.length() == 0)
                return output;
            if (objFile == null) {
                try {
                    // The distributed cache symlinks the HDFS file into the
                    // task's working directory as ./qqwry.dat (see
                    // getCacheFiles() below), so open it by relative path.
                    objFile = new RandomAccessFile("./qqwry.dat", "r");
                } catch (FileNotFoundException e1) {
                    System.out.println("IP database file not found: " + lookupFile);
                    return null;
                }
            }
            IPSeeker seeker = new IPSeeker(objFile);
            String country = seeker.getCountry(str);
            output = IPUtil.splitCountry(country);
            return output;
        } catch (Exception e) {
            return output;
        }
    }

    @Override
    public Schema outputSchema(Schema input) {
        return new Schema(new Schema.FieldSchema(null, DataType.MAP));
    }

    @Override
    public List<String> getCacheFiles() {
        // "hdfs_path#local_name": ship lookupFile from HDFS to every node
        // and expose it to the task as ./qqwry.dat.
        List<String> list = new ArrayList<String>(1);
        list.add(lookupFile + "#qqwry.dat");
        return list;
    }
}
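The IPUtil.splitCountry helper is not shown above. As a hypothetical sketch of what it might do: QQWry location strings concatenate province, city, and region (e.g. "广东省广州市天河区"), so one simple approach splits on the "省" and "市" suffix characters. Real data contains many irregular entries (municipalities, autonomous regions, overseas locations), so this is illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of IPUtil.splitCountry (the original helper is not
// shown in this post): split a QQWry location string on the "省" (province)
// and "市" (city) suffix characters. Irregular entries fall through with
// fields simply absent from the map.
public class SplitCountrySketch {

    public static Map<String, Object> splitCountry(String country) {
        Map<String, Object> out = new HashMap<String, Object>();
        int p = country.indexOf('省');            // end of province, if any
        int c = country.indexOf('市', p + 1);     // end of city, if any
        if (p >= 0)
            out.put("province", country.substring(0, p + 1));
        if (c >= 0)
            out.put("city", country.substring(p + 1, c + 1));
        if (c >= 0 && c + 1 < country.length())
            out.put("region", country.substring(c + 1));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(splitCountry("广东省广州市天河区"));
    }
}
```

Whatever the real implementation, the keys it emits ("province", "city", "region") must match the cc#'...' lookups in the Pig script.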

Search for "Distributed Cache" in this page of the Pig docs: http://pig.apache.org/docs/r0.11.0/udf.html

The example there shows how the getCacheFiles() method ensures the file is accessible on all nodes in the cluster.
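The string returned from getCacheFiles() follows Hadoop's "hdfs_path#local_name" convention: the part before '#' is the file to ship, and the part after is the symlink name created in the task's working directory. A tiny sketch of that mapping (the helper name is mine, not a Pig or Hadoop API):

```java
// Sketch of the "hdfsPath#symlink" convention used by getCacheFiles().
// Hadoop's distributed cache copies the HDFS file to each node and symlinks
// it under the name after '#', so the task can open it by relative path.
public class CacheEntry {

    public static String localName(String cacheFileSpec) {
        int hash = cacheFileSpec.indexOf('#');
        if (hash < 0) {
            // Without '#', the local name defaults to the file's base name.
            return cacheFileSpec.substring(cacheFileSpec.lastIndexOf('/') + 1);
        }
        return cacheFileSpec.substring(hash + 1);
    }

    public static void main(String[] args) {
        // "/user/anny/qqwry.dat#qqwry.dat": the task opens "./qqwry.dat".
        System.out.println(localName("/user/anny/qqwry.dat#qqwry.dat"));
    }
}
```

This is why the UDF above opens "./qqwry.dat" rather than the HDFS path passed to the constructor.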

References: http://stackoverflow.com/questions/17514022/access-hdfs-file-from-udf

http://stackoverflow.com/questions/19149839/pig-udf-maxmind-geoip-database-data-file-loading-issue
