近期项目组有需求点击流日志须要自己收集,学习了一下flume而且成功安装了。相关信息记录一下。

1)下载flume1.5版本号 

wget http://www.apache.org/dyn/closer.cgi/flume/1.5.0.1/apache-flume-1.5.0.1-bin.tar.gz

2) 解压flume1.5

tar -zxvf apache-flume-1.5.0.1-bin.tar.gz

3) 配置环境变量

jdk已装

export FLUME_HOME=/XXX/XX/apache-flume-1.5.0.1-bin

export PATH=$FLUME_HOME/bin:$PATH

4) 配置conf相关文件

4.1) 配置flume-env.sh 主要设置一下JAVA_HOME

4.2) 配置log4j.properties  

假设是測试环境凝视掉flume.root.logger=INFO,LOGFILE选择flume.root.logger=DEBUG,console把日志打印到控制台

4.3) 配置flume-conf.properties 这个文件名称能够随便改 执行命令时指定你自己创建的属性文件就可以

#set agent 名字为a1  sources名字为r1   sinks名字为k1    channels名字为c1

a1.sources = r1

a1.sinks = k1

a1.channels = c1

sources组件类型为exec 运行linux命令

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/flume/flume/conf/source.txt (大小tail -f有非常大差别,攻克了我们一个大问题)





sinks组件类型为logger

a1.sinks.k1.type = logger

channels组件类型为内存

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100





把sources、sinks与管道连通起来

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1





5) 在flume文件夹下执行命令

bin/flume-ng agent -n a1 -f test/source-tail-sink-logger.properties --conf conf





初步的样例完毕了。眼下我们生产环境是有两个节点往metaq里面生产数据。 metaq自己定义一个sink(自己定义sink见后面代码)

记得把metaq相关jar放到flume/lib下 gecko-1.1.4.jar metamorphosis-client-1.4.6.2.jar metamorphosis-commons-1.4.6.2.jar zkclient-0.3.jar zookeeper-3.4.3.jar





a1.sources = r1

a1.sinks = k1

a1.channels = c1





a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/flume/flume/conf/source.txt

a1.sinks.k1.type = com.XX.flume.sink.MetaQSink

a1.sinks.k1.sink.zkConnect = 0.0.0.0:2181,0.0.0.0:2181,0.0.0.0:2181

a1.sinks.k1.sink.zkRoot = /meta(此文件夹必须写死)

a1.sinks.k1.sink.topic = XXXX

a1.sinks.k1.sink.batchSize = 20000

#a1.channels.c1.type = memory

#a1.channels.c1.capacity = 1000000

#a1.channels.c1.transactionCapacity = 100000

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /home/hadoop/flume/flume/channel/checkpoint 

a1.channels.c1.dataDirs = /home/hadoop/flume/flume/channel/data





a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

自己定义sink代码

</pre><pre name="code" class="java">package com.jd.flume.sink;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger; /**
* 功能描写叙述:
* <p/>
* 这个类主要是将flume收集的数据发送到metaq消息队列中
* <p/>
* ----------------------------
*/
public class MetaQSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(MetaQSink.class);
private MessageSessionFactory sessionFactory;
private MessageProducer producer; private String zkConnect;
private String zkRoot;
private String topic;
private int batchSize;
private int threadNum;
private ExecutorService executor; public MetaQSink() {
} @Override
public void configure(Context context) {
this.zkConnect = context.getString("sink.zkConnect");
this.zkRoot = context.getString("sink.zkRoot");
this.topic = context.getString("sink.topic");
this.batchSize = context.getInteger("sink.batchSize", 10000);
this.threadNum = context.getInteger("sink.threadNum", 50);
executor = Executors.newCachedThreadPool(); MetaClientConfig metaClientConfig = new MetaClientConfig();
ZkUtils.ZKConfig zkConfig = new ZkUtils.ZKConfig();
zkConfig.zkConnect = zkConnect;
zkConfig.zkRoot = zkRoot;
metaClientConfig.setZkConfig(zkConfig);
try {
sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
} catch (MetaClientException e) {
e.printStackTrace();
logger.error("", e);
throw new RuntimeException("init error");
}
producer = sessionFactory.createProducer();
logger.info("zkConnect:" + zkConnect + ", zkRoot:" + zkRoot
+ ", topic:" + topic);
} @Override
public Status process() throws EventDeliveryException {
long start = System.currentTimeMillis();
producer.publish(topic);
Status result = Status.READY;
final Channel channel = getChannel();
final AtomicInteger al = new AtomicInteger(0);
final CountDownLatch cdl = new CountDownLatch(threadNum);
for (int t = 0; t < threadNum; t++) {
executor.execute(new Runnable() {
@Override
public void run() { Event event = null;
Transaction transaction = null;
int i = 0;
try {
transaction = channel.getTransaction();
transaction.begin();
boolean startTransaction = false;
for (i = 0; i < batchSize; i++) {
event = channel.take();
if (event != null) {
if (i == 0) {
producer.beginTransaction();
startTransaction = true;
}
final SendResult sendResult = producer
.sendMessage(new Message(topic, event
.getBody()));
// check result
if (!sendResult.isSuccess()) {
logger.error("Send message failed,error message:"
+ sendResult.getErrorMessage());
throw new RuntimeException(
"Send message failed,error message:"
+ sendResult
.getErrorMessage());
} else {
logger.debug("Send message successfully,sent to "
+ sendResult.getPartition());
}
} else {
// No event found, request back-off semantics
// from the sink
// runner
// result = Status.BACKOFF;
break;
} }
if (startTransaction) {
producer.commit();
}
al.addAndGet(i);
transaction.commit();
} catch (Exception ex) {
logger.error("error while rollback:", ex);
try {
producer.rollback();
} catch (Exception e) {
e.printStackTrace();
}
transaction.rollback();
} finally {
cdl.countDown();
transaction.close();
}
}
});
}
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (al.get() == 0) {
result = Status.BACKOFF;
} logger.info("metaqSink_new,process:{},time:{},queue_size:{}",
new Object[] { al.get(), System.currentTimeMillis() - start });
return result;
}
}

最新文章

  1. tomcat集群学习记录1--初识apache http server
  2. appledoc 使用brew命令安装使用
  3. mssql2000 身份证号码验证
  4. 利用jdk自带的运行监控工具JConsole观察分析Java程序的运行
  5. ssh框架开发问题
  6. 设置Eclipse中的tab键为4个空格的完整方法
  7. MySQL(22):事务管理之 事务回滚
  8. Android Spinner(级联 天气预报)
  9. Ofbiz 10.04 + eclipse 安装与配置
  10. EF 存储过程(上)
  11. Maven之多模块打包成一个jar包及assembly
  12. Ninja介绍
  13. ASP.NET Core之跨平台的实时性能监控(2.健康检查)
  14. SpringMVC过程中@RequestBody接收Json的问题 总是报415
  15. locust压测websocket协议
  16. &lt;转载&gt;XML操作
  17. centos7进入单用户模式
  18. MediatR 中介模式
  19. Python学习-40.Python中的迭代
  20. 【Python】理想论坛帖子读取爬虫1.04版

热门文章

  1. NET平台和C#
  2. Xcode7之后常见问题整理-b
  3. 浅析Linux操作系统工作的基础
  4. 【关于php】Appserv的安装注意事项
  5. nutch 二次开发
  6. Contest20140906 反思
  7. Early 80386 CPUs
  8. android逐行读取文件内容以及保存为文件
  9. android新建项目时 出现appcompat_v7工程错误和红色感叹号
  10. 网络流(最大流)CodeForces 512C:Fox And Dinner