对于MapReduce等框架来说,需要有一套更底层的API来获取某个指定文件中的一部分数据,而不是一整个文件

因此使用流的方式来操作 HDFS上的文件,可以实现读取指定偏移量范围的数据

1.客户端测试类代码:

package cn.bigdata.hdfs;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Before; public class HdfsStreamAcess {
//获取客户端操作hdfs的实例对象
private FileSystem fs = null;
Configuration conf = null;
@Before
public void inin() throws IOException, InterruptedException, URISyntaxException{
conf = new Configuration();
//拿到一个文件系统操作的客户端实例对象,最后一个参数为用户名
fs = FileSystem.get(new URI("hdfs://shizhan2:9000"),conf,"root");
}
}

2.流式上传文件:

    //流式上传文件
@Test
public void testUploadWithStream() throws IllegalArgumentException, IOException{
//true:该文件夹存在就覆盖 IOUtils:工具类
FSDataOutputStream outputstream = fs.create(new Path("/angelababy.love"), true);
FileInputStream input = new FileInputStream("c:/xxx.txt");
IOUtils.copy(input, outputstream);
}

3.流式下载文件:

    //流式下载文件
@Test
public void testDownloadWithStream() throws Exception{
FSDataInputStream in = fs.open(new Path("/angelababy.love"));
FileOutputStream out = new FileOutputStream("d:/access_stream.log");
IOUtils.copy(in, out);
}

4.流式读取指定长度的文件:

//文件的随机读写
@Test
public void testRandomAccess() throws Exception{
FSDataInputStream in = fs.open(new Path("/regist-copy.log"));
FileOutputStream out = new FileOutputStream("d:/random_stream.log");
IOUtils.copyLarge(in, out, 1*1024*1024, 1*1024*1024); // 从1M位置开始读,读1M
}

hdfs支持随机定位进行文件读取,而且可以方便地读取指定长度,用于上层分布式运算框架并发处理数据

5.控制台打印HDFS文件内容:

@Test
public void testCat() throws Exception{
FSDataInputStream in = fs.open(new Path("/angelababy.love"));
IOUtils.copy(in,System.out);
}

6.递归列出指定目录下所有子文件夹中的文件:

@Test
public void testLs() throws Exception {
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true); while(listFiles.hasNext()){
LocatedFileStatus fileStatus = listFiles.next();
System.out.println("blocksize: " +fileStatus.getBlockSize());
System.out.println("owner: " +fileStatus.getOwner());
System.out.println("Replication: " +fileStatus.getReplication());
System.out.println("Permission: " +fileStatus.getPermission());
System.out.println("Name: " +fileStatus.getPath().getName());
System.out.println("------------------");
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
for(BlockLocation b:blockLocations){
System.out.println("块起始偏移量: " +b.getOffset());
System.out.println("块长度:" + b.getLength());
//块所在的datanode节点
String[] datanodes = b.getHosts();
for(String dn:datanodes){
System.out.println("datanode:" + dn);
}
}
}
}

7.获取文件块信息:

    @Test
public void testGetFileBlock() throws Exception{
FileStatus fileStatus = fs.getFileStatus(new Path("/pcre-8.35.tar.gz"));
BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
for (BlockLocation bl : blockLocations) {
System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
String[] hosts = bl.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
}

最新文章

  1. Homework 3
  2. 深入webx框架(li)
  3. VB的注释
  4. 在RAC中,当私有网线拔了后,会怎么样?
  5. hibernate 多对多
  6. 移动平台WEB前端开发技巧汇总(转)
  7. java--从控制台读入一些数据
  8. github + SourceTree管理自己的库并上传到cocoapods及各种坑的解决办法
  9. 用python做小说网站
  10. Nginx调用远程php-fpm
  11. JQuery AJAX 全局设置
  12. codevs 3981 动态最大子段和
  13. 【BZOJ 3534】: [Sdoi2014]重建
  14. C# ConcurrentQueue实现
  15. 编程调节Win7/Win8系统音量的一种方法
  16. js的匿名函数 和普通函数
  17. python对象序列化pickle
  18. 【Java并发编程】:线程挂起、恢复与终止
  19. 服务发现与消费 --&gt; Spring Cloud Eureka
  20. Launchy – 快速调出你的程序

热门文章

  1. Markov logic network
  2. Jenkins - Tips
  3. 根据文本内容确定UILabel的高度
  4. Linux添加日常任务监控文件或日志大小
  5. webdriver的八种定位
  6. C学习笔记-数据类型
  7. 二分查找算法C++实现
  8. PHP抽奖代码。亲测可用
  9. POJ1041 John&#39;s trip 【字典序输出欧拉回路】
  10. 2019牛客暑期多校训练营(第八场)-A All-one Matrices (单调栈+前缀和)