目的

实时监听多个目录下的日志文件,如有新文件切换到新文件,并同步写入kafka,同时记录日志文件的行位置,以应对进程异常退出,能从上次的文件位置开始读取(考虑到效率,这里是每100条记一次,可调整)

源码

  1. import java.io.BufferedReader;
  2. import java.io.BufferedWriter;
  3. import java.io.File;
  4. import java.io.FileInputStream;
  5. import java.io.FileNotFoundException;
  6. import java.io.FileReader;
  7. import java.io.FileWriter;
  8. import java.io.IOException;
  9. import java.io.LineNumberReader;
  10. import java.util.ArrayList;
  11. import java.util.Collection;
  12. import java.util.HashMap;
  13. import java.util.List;
  14. import java.util.Properties;
  15. import java.util.Random;
  16. import kafka.javaapi.producer.Producer;
  17. import kafka.producer.KeyedMessage;
  18. import kafka.producer.ProducerConfig;
  19. ;
  20. public class XTail_Line {
  21. public static class TailFileThread extends Thread
  22. {
  23. File file;
  24. LineNumberReader randomFile=null;
  25. String newfile=null;
  26. String thisfile=null;
  27. String prefile=null;
  28. private long lastTimeFileSize = 0;
  29. private String drname=null;
  30. int ln=0;
  31. int beginln=0;
  32. private Producer<String,String> inner;
  33. java.util.Random ran = new Random();
  34. String topicname=null;
  35. public TailFileThread(String path,String drname,String topicname) throws FileNotFoundException, IOException
  36. {
  37. file=new File(path);
  38. this.drname=drname;
  39. this.topicname=topicname;
  40. Properties properties = new Properties();
  41. //   properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
  42. properties.load(new FileInputStream("producer.properties"));
  43. ProducerConfig config = new ProducerConfig(properties);
  44. inner = new Producer<String, String>(config);
  45. }
  46. public void send(String topicName,String message) {
  47. if(topicName == null || message == null){
  48. return;
  49. }
  50. //   KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
  51. //随机作为key,hash分散到各个分区
  52. KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,String.valueOf(ran.nextInt(9)),message);
  53. //   KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message,message);
  54. inner.send(km);
  55. }
  56. public void send(String topicName,Collection<String> messages) {
  57. if(topicName == null || messages == null){
  58. return;
  59. }
  60. if(messages.isEmpty()){
  61. return;
  62. }
  63. List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
  64. for(String entry : messages){
  65. KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
  66. kms.add(km);
  67. }
  68. inner.send(kms);
  69. }
  70. public void close(){
  71. inner.close();
  72. }
  73. public String getNewFile(File file)
  74. {
  75. File[] fs=file.listFiles();
  76. long maxtime=0;
  77. String newfilename="";
  78. for (int i=0;i<fs.length;i++)
  79. {
  80. if (fs[i].isFile()&&fs[i].lastModified()>maxtime)
  81. {
  82. maxtime=fs[i].lastModified();
  83. newfilename=fs[i].getAbsolutePath();
  84. }
  85. }
  86. return newfilename;
  87. }
  88. //写入文件名及行号
  89. public void writePosition(String path,int rn)
  90. {
  91. try {
  92. BufferedWriter out = new BufferedWriter(new FileWriter(drname+".position"));
  93. out.write(path+","+rn);
  94. out.close();
  95. } catch (IOException e) {
  96. }
  97. }
  98. public void run()
  99. {
  100. thisfile=getNewFile(file);
  101. prefile=thisfile;
  102. //访问position文件,如果记录了文件路径,及行号,则定位,否则使用最新的文件
  103. try {
  104. BufferedReader br=new BufferedReader(new FileReader(drname+".position"));
  105. String line=br.readLine();
  106. if (line!=null &&line.contains(","))
  107. {
  108. thisfile=line.split(",")[0];
  109. prefile=thisfile;
  110. beginln=Integer.parseInt(line.split(",")[1]);
  111. }
  112. } catch (FileNotFoundException e2) {
  113. // TODO Auto-generated catch block
  114. e2.printStackTrace();
  115. }
  116. catch (IOException e2) {
  117. // TODO Auto-generated catch block
  118. e2.printStackTrace();
  119. }
  120. //指定文件可读可写
  121. try {
  122. randomFile = new LineNumberReader(new FileReader(thisfile));
  123. } catch (FileNotFoundException e) {
  124. // TODO Auto-generated catch block
  125. e.printStackTrace();
  126. }
  127. while (true)
  128. {
  129. try {
  130. Thread.sleep(100);
  131. //调用interrupt方法后
  132. if(isInterrupted())
  133. {
  134. System.out.println("Interrupted...");
  135. break;
  136. }
  137. } catch (InterruptedException e1) {
  138. // TODO Auto-generated catch block
  139. e1.printStackTrace();
  140. }
  141. try {
  142. //获得变化部分的
  143. //  randomFile.seek(lastTimeFileSize);
  144. String tmp = "";
  145. while( (tmp = randomFile.readLine())!= null) {
  146. int currln=randomFile.getLineNumber();
  147. //beginln默认为0
  148. if (currln>beginln)
  149. send(topicname,new String(tmp.getBytes("utf8")));
  150. ln++;
  151. //每发生一条写一次影响效率
  152. if (ln>100)
  153. {
  154. writePosition(thisfile,currln);
  155. ln=0;
  156. }
  157. }
  158. thisfile=getNewFile(file);
  159. if(!thisfile.equals(prefile))
  160. {
  161. randomFile.close();
  162. randomFile = new LineNumberReader(new FileReader(thisfile));
  163. prefile=thisfile;
  164. beginln=0;
  165. }
  166. } catch (IOException e) {
  167. throw new RuntimeException(e);
  168. }
  169. }
  170. }
  171. }
  172. public static void main(String[] args) throws Exception {
  173. /*
  174. LogView view = new LogView();
  175. final File tmpLogFile = new File("D:\\test.txt");
  176. view.realtimeShowLog(tmpLogFile);
  177. */
  178. if (args.length!=2)
  179. {
  180. System.out.println("usage:topicname pathname");
  181. System.exit(1);
  182. }
  183. String topicname=args[0];
  184. String pathname=args[1];
  185. HashMap<String,TailFileThread> hm=new HashMap<String,TailFileThread>();
  186. File tmpLogFile = new File(pathname);
  187. File[] fs=tmpLogFile.listFiles();
  188. while (true)
  189. {
  190. fs=tmpLogFile.listFiles();
  191. for (int i=0;i<fs.length;i++)
  192. {
  193. if(fs[i].isDirectory())
  194. {
  195. String path=fs[i].getAbsolutePath();
  196. //以drname作为position文件名
  197. String drname=fs[i].getName();
  198. //如果该目录对应的处理线程已经存在,判断是否存活
  199. if (drname.contains("xx") || drname.contains("yy") || drname.contains("zz") || drname.contains("aa")
  200. )
  201. {
  202. if (hm.containsKey(path))
  203. {
  204. if (!hm.get(path).isAlive())
  205. {
  206. hm.get(path).interrupt();
  207. TailFileThread tt=new TailFileThread(path,drname,topicname);
  208. tt.start();
  209. hm.put(path, tt);
  210. }
  211. }
  212. //如果不存在,新建
  213. else
  214. {
  215. TailFileThread tt=new TailFileThread(path,drname,topicname);
  216. tt.start();
  217. hm.put(path, tt);
  218. }
  219. }
  220. }           //System.out.println(fs[i].getAbsolutePath());
  221. }
  222. Thread.sleep(100);
  223. }
  224. }
  225. }

转:http://blog.csdn.net/u011750989/article/details/21957741

最新文章

  1. [Machine-Learning] matlab 矩阵常见基本操作
  2. The property on could not be set to a &#39;Int16&#39; value.You must set this property to a non-null value of type ‘Int32’.”
  3. BZOJ 1756: Vijos1083 小白逛公园
  4. earlysuspend调用过程
  5. 使用JS实现手风琴效果
  6. 关于Spring总结
  7. 阿里云轻量应用服务器Lamp部署php工程踩过的坑
  8. c语言变量类型联想
  9. 基于Spring Boot,使用JPA操作Sql Server数据库完成CRUD
  10. SQLSERVER 执行过的语句查询
  11. 基于nodejs的流水线式的CRUD服务。依赖注入可以支持插件。
  12. jmeter 之变量传递
  13. 2019/4/18 wen 线程
  14. JDBC driver连接MySQL运行报错The server time zone value &#39;&#214;&#208;&#185;&#250;&#177;&#234;&#215;&#188;&#202;&#177;&#188;&#228;&#39; is unrecognized or represents more than
  15. wpf 状态栏图标背景闪烁提醒 FlashWindowEx
  16. Python_summary
  17. 微服务 - Eureka注册中心
  18. Eclipse中JBoss插件配置
  19. Html页面Dom对象之Element
  20. iso搭建本地源

热门文章

  1. Miner3D 数据分析软件
  2. Azure 媒体服务换新锁,更安全更方便,新钥匙请收好!
  3. xfs管理2T以上大分区
  4. Spring Cloud入门程序
  5. Office加载项对Excel进行读写操作
  6. Bloom Filter (海量数据处理)
  7. MySql 8.0.11 在win10下的zip非安装配置
  8. 【CSS古话今说】-- 01.神奇的CSS-BFC在实战中的应用
  9. Leetcode 46 47 Permutation, 77 combination
  10. 【CF660E】Different Subsets For All Tuples(组合数学)