数据源类型:数组列表

[{field:value}, {field:value}, {field:value}, {field:value}]

1. 定义http数据源链接

package com.etl.datalink;

import java.util.Map;

public class LinkHttp {

        private String url;
private Map<String,Object> params; public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public Map<String, Object> getParams() {
return params;
}
public void setParams(Map<String, Object> params) {
this.params = params;
} }

2. 定义hdfs链接配置

package com.etl.datalink;

import org.apache.hadoop.conf.Configuration;

public class LinkHdfs {

        private Configuration conf = new Configuration();
private String fsName="fs.defaultFS";
private String fsURI; public LinkHdfs(String fsName, String fsURI) {
this.fsName = fsName;
this.fsURI = fsURI;
conf.set(this.fsName, this.fsURI);
} public LinkHdfs(String fsURI) {
this.fsURI = fsURI;
conf.set(this.fsName, this.fsURI);
} public String getFsName() {
return fsName;
} public void setFsName(String fsName) {
this.fsName = fsName;
} public String getFsURI() {
return fsURI;
} public void setFsURI(String fsURI) {
this.fsURI = fsURI;
} public Configuration getConf() {
return conf;
} public void setConf(Configuration conf) {
this.conf = conf;
} }

3. 定义泛型类用于传送http的内容到hdfs

这里存在一点小问题:由于json是数组列表,所以需要获取每条记录,然后加入换行符号\n写入hdfs。这样在hive中查询才能获取到多个记录。否则会全部当作一条记录。

/**
* 通用的http抽取数据到hdfs文件中
* @author KingWang
* @date 2018-10-15
* @description
*/
public class Api2Hdfs{ private static Logger log = Logger.getLogger(Api2Hdfs.class); public static <T> void run(String[] args, Class<T> clazz) { //http
String url = args[0];
String method = args[1];
String startTime = args[2];
String endTime = args[3]; //hdfs
String fsName = args[4];
String fsURI = args[5];
String targetFilePath = args[6];
//http config
Map<String,Object> params = new HashMap<String,Object>(); //....省略部分参数
params.put("timestamp", System.currentTimeMillis()/1000L);
params.put("start_time", startTime);
params.put("end_time", endTime); LinkHttp http = new LinkHttp();
http.setUrl(url);
http.setParams(params); //hdfs config
LinkHdfs hdfs = new LinkHdfs(fsName, fsURI);
try {
Api2Hdfs.process(http, hdfs, targetFilePath, clazz);
} catch(Exception e) {
e.printStackTrace();
}
} private static <T> void process(LinkHttp http,LinkHdfs hdfs, String hdfsFile, Class<T> clazz) throws Exception{ if(null==http) {
log.error("请求参数http未设置");
throw new Exception("请求参数http未设置");
}
if(null==hdfs) {
log.error("请求参数hdfs未设置");
throw new Exception("请求参数hdfs未设置");
} //创建http请求
String url = http.getUrl();
Map<String,Object> params = http.getParams();
OkHttpClient client = new OkHttpClient(); //添加参数
FormBody.Builder bodyParams=new FormBody.Builder();
if(params!=null && params.size() > 0) {
Iterator<Map.Entry<String,Object>> it = params.entrySet().iterator();
while(it.hasNext()) {
Map.Entry<String, Object> entry = it.next();
bodyParams.add(entry.getKey(), entry.getValue().toString());
}
} final Request request = new Request.Builder().url(url).post(bodyParams.build()).build();
Call call = client.newCall(request);
call.enqueue(new Callback() { //网络错误延迟处理
@Override
public void onFailure(Call call, IOException e) {
e.printStackTrace();
log.error(e.getMessage());
} @Override
public void onResponse(Call call, Response response) throws IOException {
FileSystem fs = null;
try { Path dstPath = new Path(hdfsFile);
fs = FileSystem.get(hdfs.getConf());
DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
if(response.isSuccessful()) { //对后台返回的数据进行处理
System.out.println(df.format(LocalDateTime.now()) +" response.code:" +response.code());
if (200 == response.code()) { //注意:response.body().string()只能有效调用一次
ResponseInfo info = JSONObject.parseObject(response.body().string(), ResponseInfo.class); //error不为空,则错误
if(StringUtils.isNotBlank(info.getError())) {
log.error(info.getError());
} else { String rspcode = info.getResult().getRsp();
//写入hdfs
if(rspcode.equalsIgnoreCase(ResultCode.SUCCESS.getCode())) {
System.out.println(info.getResult().getData());
if(info.getResult().getData().equals("[]")) {
System.out.println(df.format(LocalDateTime.now()) + " " + info.getResult().getMsg());
} else {
List<T> objList = JSON.parseArray(info.getResult().getData(),clazz);
// byte[] bt = info.getResult().getData().getBytes();
FSDataOutputStream outputStream = fs.create(dstPath);
int size = objList.size();
for(int i=0;i<size; i++) {
String orderstr = JSON.toJSONString(objList.get(i)) + '\n';
System.out.println(orderstr);
outputStream.write(orderstr.getBytes());
if(i % 1000==0) {
outputStream.flush();
}
}
outputStream.flush();
outputStream.close();
log.info("create file " + hdfsFile + " success!");
}
} else {
log.error(info.getResult().getMsg());
}
}
}
//对后台返回200~300之间的错误进行处理
else {
log.error(response.message());
} //fs.close();
}
}catch (Exception e){
e.printStackTrace();
log.error(e.getMessage());
}finally {
fs.close();
//关闭
if(response.body()!=null) {
response.body().close();
}
}
log.info("write hdfs file end: " + hdfsFile);
}
}); } }

4. 定义bean用于解析, 由于定义了泛型,可以针对不同到接口定义不同的bean。

类似如下

5. 定义执行的每个接口主类:

public class MemberApi extends Api2Hdfs{

        public static void main(String[] args) {
Api2Hdfs.run(args, Member.class);
}
}
public class OrderApi extends Api2Hdfs{

        public static void main(String[] args) {
Api2Hdfs.run(args, Order.class);
}
}

6. 定义每个接口的shell脚本,执行即可。

java -Djava.ext.dirs=lib com.etl.MemberApi \
${url} ${method} ${startDate} ${endDate} ${fsName} ${fsURI} ${targetFilePath} ${salt} >> ./logs/${table}.log >& &
java -Djava.ext.dirs=lib com.etl.OrderApi \
${url} ${method} ${startDate} ${endDate} ${fsName} ${fsURI} ${targetFilePath} ${salt} >> ./logs/${table}.log >& &

最新文章

  1. 初试Scala解析XML
  2. C#压缩库SharpZipLib的应用
  3. [TCPIP] 分层 Note
  4. [深入浅出Windows 10]应用实战:Bing在线壁纸
  5. AC自动机 - 关于Fail指针
  6. python 连接 mysql
  7. Spring3.2.2之后不赞成使用queryForInt
  8. 关于jquery插件 入门
  9. 《赢在用户:Web人物角色创建和应用实践指南》阅读总结
  10. demo_02 less
  11. C++调用C#之C++DLL调用C# COM控件
  12. UICollectionView中Cell左对齐 居中 右对齐 等间距------你想要的,这里都有
  13. C# MessageBox.Show每隔3秒自动关闭
  14. java笔记02
  15. 测试同学难道要写一辈子的hello world?
  16. Windows下创建ArcGIS Server站点
  17. 教你轻松快速学会用Calibre TXT转MOBI
  18. @WebFilter怎么控制多个filter的执行顺序
  19. DBMS_OUTPUT包学习
  20. Floyd 和 bellman 算法

热门文章

  1. 第2章 Python基础-字符编码&amp;数据类型 字典 练习题
  2. gitlab 迁移、升级打怪之路:8.8.5--&gt; 8.10.8 --&gt; 8.17.8 --&gt; 9.5.9 --&gt; 10.1.4 --&gt; 10.2.5
  3. activiti并行和串行区别
  4. Visual F# Power Tools 简单介绍
  5. [CTCI] 最长合成字符串
  6. Android 开发之修改 app 的字体大小(老人模式)
  7. ConEmu配置task的脚本
  8. Unity投影器细节整理
  9. [tools]hugo&amp;github构建静态网站/百度统计
  10. python List使用