BUG:在往目录中copy大文件时,没有复制完,flume就开始读-->导致报错

在代码中体现为:
org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile()方法内

解决方案:
等文件完全拷贝完成,再开始读这个文件

1.5版本:

private Optional<FileInfo> getNextFile() {
7 /* Filter to exclude finished or hidden files */
8 FileFilter filter = new FileFilter() {
9 public boolean accept(File candidate) {
10 String fileName = candidate.getName();
11 if ((candidate.isDirectory()) ||
12 (fileName.endsWith(completedSuffix)) ||
13 (fileName.startsWith(".")) ||
14 ignorePattern.matcher(fileName).matches()) {
15 return false;
16 }
17 return true;
18 }
19 };
20 List<File> candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter)); //获取spoolDirectory下满足条件的文件
21 if (candidateFiles.isEmpty()) {
22 return Optional.absent();
23 } else {
24 Collections.sort(candidateFiles, new Comparator<File>() { //按最后修改时间排序文件
25 public int compare(File a, File b) {
26 int timeComparison = new Long(a.lastModified()).compareTo(
27 new Long(b.lastModified()));
28 if (timeComparison != 0) {
29 return timeComparison;
30 }
31 else {
32 return a.getName().compareTo(b.getName());
33 }
34 }
35 });
36 File nextFile = candidateFiles.get(0); //因为每次获取到的文件处理完都会被标记为已完成,所以直接取拍完序的第一个
37 //修复传输大文件报错文件被修改的BUG
38 this.checkFileCpIsOver(nextFile);//此处被阻塞,直到文件拷贝文件或者超过20秒
39
40 try {
41 // roll the meta file, if needed
42 String nextPath = nextFile.getPath()

1.7版本 :

  private Optional<FileInfo> getNextFile() {
List<File> candidateFiles = Collections.emptyList(); if (consumeOrder != ConsumeOrder.RANDOM ||
candidateFileIter == null ||
!candidateFileIter.hasNext()) {
candidateFiles = getCandidateFiles(spoolDirectory.toPath());
listFilesCount++;
candidateFileIter = candidateFiles.iterator();
} if (!candidateFileIter.hasNext()) { // No matching file in spooling directory.
return Optional.absent();
} File selectedFile = candidateFileIter.next();
if (consumeOrder == ConsumeOrder.RANDOM) { // Selected file is random.
return openFile(selectedFile);
} else if (consumeOrder == ConsumeOrder.YOUNGEST) {
for (File candidateFile : candidateFiles) {
long compare = selectedFile.lastModified() -
candidateFile.lastModified();
if (compare == 0) { // ts is same pick smallest lexicographically.
selectedFile = smallerLexicographical(selectedFile, candidateFile);
} else if (compare < 0) { // candidate is younger (cand-ts > selec-ts)
selectedFile = candidateFile;
}
}
} else { // default order is OLDEST
for (File candidateFile : candidateFiles) {
long compare = selectedFile.lastModified() -
candidateFile.lastModified();
if (compare == 0) { // ts is same pick smallest lexicographically.
selectedFile = smallerLexicographical(selectedFile, candidateFile);
} else if (compare > 0) { // candidate is older (cand-ts < selec-ts).
selectedFile = candidateFile;
}
}
} firstTimeRead = true; //修复传输大文件报错文件被修改的BUG
this.checkFileCpIsOver(selectedFile);//此处被阻塞,直到文件拷贝文件或者超过20秒
return openFile(selectedFile);
}

解决代码:

 /**
*
* @Title: checkFileCpIsOver
* @Description: TODO(用来检查文件拷贝是否完成)
* @param @param currentFile 设定文件
* @return void 返回类型
* @throws
*/
private void checkFileCpIsOver(File file) {
long modified = file.lastModified();//目前文件的修改时间
long length = file.length();//目前文件的大小
try {
Thread.sleep(1000);//等待1秒钟
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
File currentFile = new File(file.getAbsolutePath());
int count = 0;//记录循环次数,超过20次,也就是10秒后抛出异常
while(currentFile.lastModified() != modified || currentFile.length() != length) {
if(count > 20) {
String message = "File Copy time too long. please check copy whether exception!" + "\n"
+ "File at :" + file.getAbsolutePath() + "\n"
+ "File current length is:" + currentFile.lastModified();
new IllegalStateException(message);
}
count++;
modified = currentFile.lastModified();
length = currentFile.length();
try {
Thread.sleep(500);//等待500毫秒
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
currentFile = new File(file.getAbsolutePath()); }
//一直到文件传输完成就可以退出
}

最新文章

  1. JAVA Socket 编程学习笔记(一)
  2. UDP 一个封锁操作被对 WSACancelBlockingCall 的调用中断
  3. Java中hashCode的作用
  4. android:ToolBar详解(手把手教程)
  5. fscanf的返回值未成功输入的元素个数 .xml
  6. js获取url及url参数的方法
  7. windows10UWP:Segoe MDL2 图标指南
  8. iis6 下发布MVC2项目的方法
  9. (转载)JS中的prototype
  10. python 之遍历目录树(可匹配输出特定后缀的文件)
  11. Java操作PDF之itext入门
  12. splay详解(三)
  13. 下载caffe慢
  14. 电商项目-商品表(spu)、规格表(sku)设计
  15. 【php增删改查实例】第二十五节 - 在main.php中显示头像
  16. group by具有去重的功能
  17. sql 指定时间 所在的周、月、季、年
  18. loj#2574. 「TJOI2018」智力竞赛 (路径覆盖)
  19. 更改AVD默认路径
  20. echarts - 特殊需求实现代码汇总之【饼图】篇

热门文章

  1. Android开发点点滴滴——一些基础的但实用的知识(2)
  2. wrote a programming language
  3. HDU 2686 Matrix(最大费用流)
  4. 深入理解 Java中的 流 (Stream)
  5. [Unity Editor]10行代码搞定Hierarchy排序
  6. 【maven】排除maven中jar包依赖的解决过程 例子:spring cloud启动zipkin,报错maven依赖jar包冲突 Class path contains multiple SLF4J bindings.
  7. 判断一个请求是否为Ajax请求
  8. linux下限制ip访问
  9. 解决hue/hiveserver2对于hive date类型显示为NULL的问题
  10. linux C 多线程/线程池编程 同步实例