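Hadoop provides an abstract file system model, FileSystem; HDFS is one of its implementations.

FileSystem is the abstract parent class of all file systems in Hadoop. It defines the basic characteristics and operations that every file system has.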

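The FileSystem class lives in the org.apache.hadoop.fs package. In Eclipse, press Ctrl+Shift+T to search for it and import the sources jar when prompted. Eclipse may suggest hadoop-hdfs-client-3.0.0-sources.jar; FileSystem itself is packaged in hadoop-common, so hadoop-common-3.0.0-sources.jar is the one that contains FileSystem.java.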

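I. Member variables

  1. The configuration key for Hadoop's default file system; its default value is defined in core-default.xml

public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;  // the key is "fs.defaultFS"

  2. The file system cache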

/** FileSystem cache. */
static final Cache CACHE = new Cache();

  3. The key under which this file system instance is stored in the Cache

  /** The key this instance is stored under in the cache. */
private Cache.Key key;

  4. Statistics recorded per FileSystem class

  /** Recording statistics per a FileSystem class. */
private static final Map<Class<? extends FileSystem>, Statistics>
statisticsTable = new IdentityHashMap<>();

  5. The statistics for this file system

  /**
* The statistics for this file system.
*/
protected Statistics statistics;

  6. The set of files that must be deleted when the file system is closed or the JVM exits

  /**
* A cache of files that should be deleted when the FileSystem is closed
* or the JVM is exited.
*/
private final Set<Path> deleteOnExit = new TreeSet<>();

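II. Inner classes

Inner class Cache: caches FileSystem objects

  1. Hadoop stores FileSystem objects as key-value pairs in a HashMap. The key type, Key, is a static nested class of Cache.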

    private final Map<Key, FileSystem> map = new HashMap<>();

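   2. Build a Key from the file system's URI and the configuration, then look up the FileSystem instance stored under that key. If none is cached, create and initialize a new one.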

    FileSystem get(URI uri, Configuration conf) throws IOException {
      Key key = new Key(uri, conf);
      return getInternal(uri, conf, key);
    }

    private FileSystem getInternal(URI uri, Configuration conf, Key key)
        throws IOException {
      FileSystem fs;
      synchronized (this) {
        fs = map.get(key);
      }
      if (fs != null) {
        return fs;
      }

      fs = createFileSystem(uri, conf);
      synchronized (this) { // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs; // return the old file system
        }

        // now insert the new file system into the map
        if (map.isEmpty()
            && !ShutdownHookManager.get().isShutdownInProgress()) {
          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
        }
        fs.key = key;
        map.put(key, fs);
        if (conf.getBoolean(
            FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
          toAutoClose.add(key);
        }
        return fs;
      }
    }
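
The lookup, create, re-check sequence in getInternal can be modeled without Hadoop. The following is only a minimal sketch: CacheSketch and its nested Resource class are hypothetical stand-ins (not Hadoop classes), and a plain String plays the role of Key. It shows the object being created outside the lock and the second lookup that discards a duplicate when another thread has already inserted one.

```java
import java.util.HashMap;
import java.util.Map;

final class CacheSketch {
  /** Hypothetical stand-in for a cached file system object. */
  static final class Resource {
    final String key;
    boolean closed;

    Resource(String key) {
      this.key = key;
    }

    void close() {
      closed = true;
    }
  }

  private final Map<String, Resource> map = new HashMap<>();

  Resource get(String key) {
    Resource r;
    synchronized (this) {
      r = map.get(key);
    }
    if (r != null) {
      return r;
    }
    // Created outside the lock, so another thread may insert first.
    r = new Resource(key);
    synchronized (this) {
      Resource old = map.get(key);
      if (old != null) {
        r.close();   // discard the duplicate
        return old;  // every caller sees the same instance
      }
      map.put(key, r);
      return r;
    }
  }

  synchronized int size() {
    return map.size();
  }

  public static void main(String[] args) {
    CacheSketch cache = new CacheSketch();
    Resource a = cache.get("hdfs://nn1");
    Resource b = cache.get("hdfs://nn1");
    System.out.println(a == b);       // both lookups hit the same entry
    System.out.println(cache.size());
  }
}
```

Creating the object outside the lock keeps a slow initialization from blocking other lookups, at the cost of occasionally building an instance that is thrown away.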

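   3. Remove the FileSystem instance cached under the given key, but only if it is the instance passed in; if the key maps to a different instance, that entry is put back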

    synchronized void remove(Key key, FileSystem fs) {
FileSystem cachedFs = map.remove(key);
if (fs == cachedFs) {
toAutoClose.remove(key);
} else if (cachedFs != null) {
map.put(key, cachedFs);
}
}
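
The identity check in remove can be illustrated in isolation. This is a simplified sketch with a hypothetical RemoveSketch class, plain Object values, and String keys; it leaves out the toAutoClose bookkeeping.

```java
import java.util.HashMap;
import java.util.Map;

final class RemoveSketch {
  private final Map<String, Object> map = new HashMap<>();

  synchronized void put(String key, Object value) {
    map.put(key, value);
  }

  synchronized Object get(String key) {
    return map.get(key);
  }

  /** Removes the entry only when it still maps to the given instance. */
  synchronized void remove(String key, Object expected) {
    Object cached = map.remove(key);
    if (cached != null && cached != expected) {
      map.put(key, cached); // a different instance owns this key: restore it
    }
  }

  public static void main(String[] args) {
    RemoveSketch cache = new RemoveSketch();
    Object current = new Object();
    Object stale = new Object();
    cache.put("k", current);
    cache.remove("k", stale);
    System.out.println(cache.get("k") == current);
    cache.remove("k", current);
    System.out.println(cache.get("k") == null);
  }
}
```

The comparison uses == rather than equals, so a closed, stale object cannot evict the live instance that replaced it under the same key.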

  4. Remove the FileSystem objects from the cache and close them. When onlyAutomatic is true, only the instances marked for automatic closing are closed.

    /**
     * Close all FileSystem instances in the Cache.
     * @param onlyAutomatic only close those that are marked for automatic closing
     * @throws IOException a problem arose closing one or more FileSystem.
     */
    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
      List<IOException> exceptions = new ArrayList<>();

      // Make a copy of the keys in the map since we'll be modifying
      // the map while iterating over it, which isn't safe.
      List<Key> keys = new ArrayList<>();
      keys.addAll(map.keySet());

      for (Key key : keys) {
        final FileSystem fs = map.get(key);

        if (onlyAutomatic && !toAutoClose.contains(key)) {
          continue;
        }

        //remove from cache
        map.remove(key);
        toAutoClose.remove(key);

        if (fs != null) {
          try {
            fs.close();
          }
          catch(IOException ioe) {
            exceptions.add(ioe);
          }
        }
      }

      if (!exceptions.isEmpty()) {
        throw MultipleIOException.createIOException(exceptions);
      }
    }
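
Two ideas in closeAll are worth seeing on their own: iterating over a copy of the key set while the map shrinks, and collecting failures so that one bad close() does not prevent the rest from being closed. The sketch below is an assumption-laden simplification: CloseAllSketch is a hypothetical class, java.io.Closeable stands in for FileSystem, and the failures are returned as a list instead of being wrapped with MultipleIOException.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CloseAllSketch {
  private final Map<String, Closeable> map = new LinkedHashMap<>();

  void put(String key, Closeable c) {
    map.put(key, c);
  }

  int size() {
    return map.size();
  }

  /** Closes every entry and returns the failures instead of stopping at the first. */
  List<IOException> closeAll() {
    List<IOException> failures = new ArrayList<>();
    // Iterate over a copy of the keys because the map is modified in the loop.
    for (String key : new ArrayList<>(map.keySet())) {
      Closeable c = map.remove(key);
      try {
        c.close();
      } catch (IOException e) {
        failures.add(e);
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    CloseAllSketch cache = new CloseAllSketch();
    cache.put("ok", () -> { });
    cache.put("bad", () -> { throw new IOException("close failed"); });
    List<IOException> failures = cache.closeAll();
    System.out.println(failures.size() + " failure(s), " + cache.size() + " entries left");
  }
}
```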

  5. Close the file systems that belong to the given user (UserGroupInformation)

    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
List<FileSystem> targetFSList = new ArrayList<>(map.entrySet().size());
//Make a pass over the list and collect the FileSystems to close
//we cannot close inline since close() removes the entry from the Map
for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
final Key key = entry.getKey();
final FileSystem fs = entry.getValue();
if (ugi.equals(key.ugi) && fs != null) {
targetFSList.add(fs);
}
}
List<IOException> exceptions = new ArrayList<>();
//now make a pass over the target list and close each
for (FileSystem fs : targetFSList) {
try {
fs.close();
}
catch(IOException ioe) {
exceptions.add(ioe);
}
}
if (!exceptions.isEmpty()) {
throw MultipleIOException.createIOException(exceptions);
}
}

Inner class Key:

  1. Member variables

      final String scheme;            // URI scheme
      final String authority;         // URI authority (typically host:port)
      final UserGroupInformation ugi; // user and group information
      final long unique;              // an artificial way to make a key unique

  2. Two constructors

      Key(URI uri, Configuration conf) throws IOException {
        this(uri, conf, 0);
      }

      Key(URI uri, Configuration conf, long unique) throws IOException {
        scheme = uri.getScheme()==null ?
            "" : StringUtils.toLowerCase(uri.getScheme());
        authority = uri.getAuthority()==null ?
            "" : StringUtils.toLowerCase(uri.getAuthority());
        this.unique = unique;

        this.ugi = UserGroupInformation.getCurrentUser();
      }
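
Because the constructor lower-cases the scheme and authority and ignores the path, URIs that differ only in case or in path map to the same cache entry for a given user. The sketch below illustrates that idea only. KeySketch is a hypothetical class, a plain String stands in for UserGroupInformation, and the equals/hashCode shown are assumptions written for the example, not Hadoop's code.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Objects;

final class KeySketch {
  final String scheme;
  final String authority;
  final String user; // stand-in for UserGroupInformation

  KeySketch(URI uri, String user) {
    scheme = uri.getScheme() == null
        ? "" : uri.getScheme().toLowerCase(Locale.ENGLISH);
    authority = uri.getAuthority() == null
        ? "" : uri.getAuthority().toLowerCase(Locale.ENGLISH);
    this.user = user;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof KeySketch)) {
      return false;
    }
    KeySketch k = (KeySketch) o;
    return scheme.equals(k.scheme)
        && authority.equals(k.authority)
        && user.equals(k.user);
  }

  @Override
  public int hashCode() {
    return Objects.hash(scheme, authority, user);
  }

  public static void main(String[] args) {
    KeySketch a = new KeySketch(URI.create("hdfs://NameNode:8020/data/a"), "alice");
    KeySketch b = new KeySketch(URI.create("HDFS://namenode:8020/logs"), "alice");
    KeySketch c = new KeySketch(URI.create("hdfs://namenode:8020/"), "bob");
    System.out.println(a.equals(b)); // path ignored, case normalized
    System.out.println(a.equals(c)); // different user
  }
}
```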

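Inner class Statistics: statistics for a file system

  1. Member variables

      private final String scheme;        // scheme of the file system URI
      private volatile long bytesRead;    // number of bytes read from the file system
      private volatile long bytesWritten; // number of bytes written to the file system
      private volatile int readOps;       // number of read operations
      private volatile int largeReadOps;  // number of large read operations
      private volatile int writeOps;      // number of write operations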

III. Member methods

The abstract methods are:

  1. Get the URI that uniquely identifies the file system

  /**
* Returns a URI which identifies this FileSystem.
*
* @return the URI of this filesystem.
*/
public abstract URI getUri();

  2. Open an FSDataInputStream on the file at the given Path

  /**
* Opens an FSDataInputStream at the indicated Path.
* @param f the file name to open
* @param bufferSize the size of the buffer to be used.
* @throws IOException IO failure
*/
public abstract FSDataInputStream open(Path f, int bufferSize)
throws IOException;

  3. Create an FSDataOutputStream at the given path with write-progress reporting

  /**
* Create an FSDataOutputStream at the indicated Path with write-progress
* reporting.
* @param f the file name to open
* @param permission file permission
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
* @param blockSize block size
* @param progress the progress reporter
* @throws IOException IO failure
* @see #setPermission(Path, FsPermission)
*/
public abstract FSDataOutputStream create(Path f,
FsPermission permission,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException;

  4. Append to an existing file

  /**
* Append to an existing file (optional operation).
* @param f the existing file to be appended.
* @param bufferSize the size of the buffer to be used.
* @param progress for reporting progress if it is not null.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
*/
public abstract FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException;

  5. Rename the path src to dst

  /**
* Renames Path src to Path dst.
* @param src path to be renamed
* @param dst new path after rename
* @throws IOException on failure
* @return true if rename is successful
*/
public abstract boolean rename(Path src, Path dst) throws IOException;

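  6. Delete a file or directory. If the path is a directory, recursive must be true for it to be deleted (along with its contents); otherwise an exception is thrown. For a file, recursive may be either true or false.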

  /** Delete a file.
*
* @param f the path to delete.
* @param recursive if path is a directory and set to
* true, the directory is deleted else throws an exception. In
* case of a file the recursive can be set to either true or false.
* @return true if delete is successful else false.
* @throws IOException IO failure
*/
public abstract boolean delete(Path f, boolean recursive) throws IOException;

  7. List the statuses of the files and subdirectories in a directory

  /**
* List the statuses of the files/directories in the given path if the path is
* a directory.
* <p>
* Does not guarantee to return the List of files/directories status in a
* sorted order.
* <p>
* Will not return null. Expect IOException upon access error.
* @param f given path
* @return the statuses of the files/directories in the given patch
* @throws FileNotFoundException when the path does not exist
* @throws IOException see specific implementation
*/
public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,IOException;

  8. Set the current working directory of the file system

  /**
* Set the current working directory for the given FileSystem. All relative
* paths will be resolved relative to it.
*
* @param new_dir Path of new working directory
*/
public abstract void setWorkingDirectory(Path new_dir);

  9. Get the current working directory of the file system

  /**
* Get the current working directory for the given FileSystem
* @return the directory pathname
*/
public abstract Path getWorkingDirectory();

  10. Create a directory with the given permission, together with any missing parent directories

  /**
* Make the given file and all non-existent parents into
* directories. Has roughly the semantics of Unix @{code mkdir -p}.
* Existence of the directory hierarchy is not an error.
* @param f path to create
* @param permission to apply to f
* @throws IOException IO failure
*/
public abstract boolean mkdirs(Path f, FsPermission permission
) throws IOException;

  11. Get the status of the file or directory at the given path

  /**
* Return a file status object that represents the path.
* @param f The path we want information from
* @return a FileStatus object
* @throws FileNotFoundException when the path does not exist
* @throws IOException see specific implementation
*/
public abstract FileStatus getFileStatus(Path f) throws IOException;

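Those are the abstract methods of FileSystem.

The following are some of the overloads that FileSystem provides on top of them.

  1. Overload of open. IO_FILE_BUFFER_SIZE_KEY is "io.file.buffer.size" and IO_FILE_BUFFER_SIZE_DEFAULT is 4096, i.e. 4 KB.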

  /**
* Opens an FSDataInputStream at the indicated Path.
* @param f the file to open
* @throws IOException IO failure
*/
public FSDataInputStream open(Path f) throws IOException {
return open(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT));
}

  2. Overloads of create

    2.1 Overwrites an existing file by default.

  /**
* Create an FSDataOutputStream at the indicated Path.
* Files are overwritten by default.
* @param f the file to create
* @throws IOException IO failure
*/
public FSDataOutputStream create(Path f) throws IOException {
return create(f, true);
}

      2.1.1 Create the FSDataOutputStream using the configured buffer size and the default replication and block size for the path

  /**
* Create an FSDataOutputStream at the indicated Path.
* @param f the file to create
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an exception will be thrown.
* @throws IOException IO failure
*/
public FSDataOutputStream create(Path f, boolean overwrite)
throws IOException {
return create(f, overwrite,
getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT),
getDefaultReplication(f),
getDefaultBlockSize(f));
}

  The default replication factor in this base class is 1

  /**
* Get the default replication.
* @return the replication; the default value is "1".
* @deprecated use {@link #getDefaultReplication(Path)} instead
*/
@Deprecated
public short getDefaultReplication() { return 1; }

/**
* Get the default replication for a path.
* The given path will be used to locate the actual FileSystem to query.
* The full path does not have to exist.
* @param path of the file
* @return default replication for the path's filesystem
*/
public short getDefaultReplication(Path path) {
return getDefaultReplication();
}

  The default block size is 32 MB

  /**
* Return the number of bytes that large input files should be optimally
* be split into to minimize I/O time.
* @deprecated use {@link #getDefaultBlockSize(Path)} instead
*/
@Deprecated
public long getDefaultBlockSize() {
// default to 32MB: large enough to minimize the impact of seeks
return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
}

/**
* Return the number of bytes that large input files should be optimally
* be split into to minimize I/O time. The given path will be used to
* locate the actual filesystem. The full path does not have to exist.
* @param f path of file
* @return the default block size for the path's filesystem
*/
public long getDefaultBlockSize(Path f) {
return getDefaultBlockSize();
}
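
The 32 MB fallback is easier to reason about in bytes. The sketch below is purely illustrative arithmetic: BlockSizeSketch and blockCount are hypothetical names, and the 100 MB file length is an assumed example value. It computes the byte value of the default and how many blocks of that size a file would occupy.

```java
final class BlockSizeSketch {
  /** 32 MB expressed in bytes, the fallback used by getDefaultBlockSize(). */
  static final long DEFAULT_BLOCK_SIZE = 32L * 1024 * 1024;

  /** Number of blocks needed to hold fileLength bytes (ceiling division). */
  static long blockCount(long fileLength, long blockSize) {
    if (blockSize <= 0) {
      throw new IllegalArgumentException("blockSize must be positive");
    }
    return (fileLength + blockSize - 1) / blockSize;
  }

  public static void main(String[] args) {
    System.out.println(DEFAULT_BLOCK_SIZE);                                 // 33554432
    System.out.println(blockCount(100L * 1024 * 1024, DEFAULT_BLOCK_SIZE)); // 4
  }
}
```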

   2.2 Create the FSDataOutputStream and report write progress through the supplied Progressable

  /**
* Create an FSDataOutputStream at the indicated Path with write-progress
* reporting.
* Files are overwritten by default.
* @param f the file to create
* @param progress to report progress
* @throws IOException IO failure
*/
public FSDataOutputStream create(Path f, Progressable progress)
throws IOException {
return create(f, true,
getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT),
getDefaultReplication(f),
getDefaultBlockSize(f), progress);
}

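  The other create overloads follow the same pattern: they create an FSDataOutputStream from explicit or default values for the buffer size, replication factor, and block size, and there is also a static variant that creates a file on a given FileSystem. They are not listed one by one here.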

  3. Overloads of append

    3.1 Open the file with the default buffer size and append to the end of it

  /**
* Append to an existing file (optional operation).
* Same as
* {@code append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
* IO_FILE_BUFFER_SIZE_DEFAULT), null)}
* @param f the existing file to be appended.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
*/
public FSDataOutputStream append(Path f) throws IOException {
return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT), null);
}

    3.2 Open the file with a caller-specified buffer size and append to the end of it

  /**
* Append to an existing file (optional operation).
* Same as append(f, bufferSize, null).
* @param f the existing file to be appended.
* @param bufferSize the size of the buffer to be used.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
*/
public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
return append(f, bufferSize, null);
}

  4. Overloads of mkdirs

    4.1 Create the directory with the default directory permission, which is 00777

  /**
* Call {@link #mkdirs(Path, FsPermission)} with default permission.
* @param f path
* @return true if the directory was created
* @throws IOException IO failure
*/
public boolean mkdirs(Path f) throws IOException {
return mkdirs(f, FsPermission.getDirDefault());
}

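    4.2 Static variant that operates on a given FileSystem. It first creates the directory with the default permission, then sets its permission to the one supplied by the caller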

  /**
* Create a directory with the provided permission.
* The permission of the directory is set to be the provided permission as in
* setPermission, not permission&~umask
*
* @see #create(FileSystem, Path, FsPermission)
*
* @param fs FileSystem handle
* @param dir the name of the directory to be created
* @param permission the permission of the directory
* @return true if the directory creation succeeds; false otherwise
* @throws IOException A problem creating the directories.
*/
public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
throws IOException {
// create the directory using the default permission
boolean result = fs.mkdirs(dir);
// set its permission to be the supplied one
fs.setPermission(dir, permission);
return result;
}
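
The Javadoc above contrasts the supplied permission with permission&~umask. A small worked example makes the difference concrete. This is only a sketch: UmaskSketch is a hypothetical class, and the umask of 022 is an assumed example value, not something stated in the text.

```java
final class UmaskSketch {
  /** Effective mode when a umask is applied: permission & ~umask. */
  static int applyUmask(int permission, int umask) {
    return permission & ~umask & 0777;
  }

  public static void main(String[] args) {
    int requested = 0777; // octal literal
    int umask = 0022;     // assumed example umask
    // Mode after the umask is applied
    System.out.println(Integer.toOctalString(applyUmask(requested, umask))); // 755
    // Mode when the permission is set explicitly, as the static mkdirs does
    System.out.println(Integer.toOctalString(requested));                    // 777
  }
}
```

Calling setPermission after the directory exists is what lets the static variant end up with exactly the requested bits.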

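  5. Overloads of listStatus

    5.1 Filter the entries under the given path with the supplied filter and add the statuses of the accepted entries to the ArrayList passed in by the caller.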

  /**
* Filter files/directories in the given path using the user-supplied path
* filter. Results are added to the given array <code>results</code>.
* @throws FileNotFoundException when the path does not exist
* @throws IOException see specific implementation
*/
private void listStatus(ArrayList<FileStatus> results, Path f,
PathFilter filter) throws FileNotFoundException, IOException {
FileStatus listing[] = listStatus(f);
Preconditions.checkNotNull(listing, "listStatus should not return NULL");
for (int i = 0; i < listing.length; i++) {
if (filter.accept(listing[i].getPath())) {
results.add(listing[i]);
}
}
}

    5.2 Same as above, except that the list holding the statuses is created internally rather than supplied by the caller.

  public FileStatus[] listStatus(Path f, PathFilter filter)
throws FileNotFoundException, IOException {
ArrayList<FileStatus> results = new ArrayList<>();
listStatus(results, f, filter);
return results.toArray(new FileStatus[results.size()]);
}

    5.3 Apply the default filter to every path in the given array and collect the statuses of the accepted entries in an internally created list.

  /**
* Filter files/directories in the given list of paths using default
* path filter.
* <p>
* Does not guarantee to return the List of files/directories status in a
* sorted order.
*
* @param files
* a list of paths
* @return a list of statuses for the files under the given paths after
* applying the filter default Path filter
* @throws FileNotFoundException when the path does not exist
* @throws IOException see specific implementation
*/
public FileStatus[] listStatus(Path[] files)
throws FileNotFoundException, IOException {
return listStatus(files, DEFAULT_FILTER);
}

    5.4 Apply the supplied filter to every path in the given array and collect the statuses of the accepted entries in an internally created list.

  public FileStatus[] listStatus(Path[] files, PathFilter filter)
throws FileNotFoundException, IOException {
ArrayList<FileStatus> results = new ArrayList<FileStatus>();
for (int i = 0; i < files.length; i++) {
listStatus(results, files[i], filter);
}
return results.toArray(new FileStatus[results.size()]);
}
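
The listStatus overloads share one shape: a private helper appends to a list it is handed, and the public variants create the list, loop over the inputs, and convert the result to an array. The sketch below mirrors that shape with stand-ins. ListStatusSketch is a hypothetical class, an in-memory map replaces the file system, strings replace Path and FileStatus, and java.util.function.Predicate replaces PathFilter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class ListStatusSketch {
  // Hypothetical in-memory directory tree: directory -> child paths.
  private final Map<String, List<String>> dirs = new HashMap<>();

  void addDir(String dir, String... children) {
    dirs.put(dir, Arrays.asList(children));
  }

  /** Private helper: appends the accepted children of one directory to results. */
  private void list(List<String> results, String dir, Predicate<String> filter) {
    List<String> children = dirs.get(dir);
    if (children == null) {
      throw new IllegalArgumentException("no such directory: " + dir);
    }
    for (String child : children) {
      if (filter.test(child)) {
        results.add(child);
      }
    }
  }

  /** Public overload: one internally created list shared by all directories. */
  String[] list(String[] dirNames, Predicate<String> filter) {
    List<String> results = new ArrayList<>();
    for (String dir : dirNames) {
      list(results, dir, filter);
    }
    return results.toArray(new String[0]);
  }

  /** Overload without a filter: delegates with an accept-everything filter. */
  String[] list(String[] dirNames) {
    return list(dirNames, p -> true);
  }

  public static void main(String[] args) {
    ListStatusSketch fs = new ListStatusSketch();
    fs.addDir("/a", "/a/x.txt", "/a/y.log");
    fs.addDir("/b", "/b/z.txt");
    String[] dirs = {"/a", "/b"};
    System.out.println(Arrays.toString(fs.list(dirs, p -> p.endsWith(".txt"))));
    System.out.println(fs.list(dirs).length);
  }
}
```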

  6. Overloads of copyFromLocalFile

    6.1 Copy the file at src on the local disk to dst, keeping the source file

  /**
* The src file is on the local disk. Add it to filesystem at
* the given dst name and the source is kept intact afterwards
* @param src path
* @param dst path
* @throws IOException IO failure
*/
public void copyFromLocalFile(Path src, Path dst)
throws IOException {
copyFromLocalFile(false, src, dst);
}

    6.2 The delSrc argument decides whether the source file is deleted

  /**
* The src file is on the local disk. Add it to the filesystem at
* the given dst name.
* delSrc indicates if the source should be removed
* @param delSrc whether to delete the src
* @param src path
* @param dst path
*/
public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
throws IOException {
copyFromLocalFile(delSrc, true, src, dst);
}

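    6.3 delSrc decides whether the sources are deleted, and overwrite decides whether an existing file at dst is overwritten. This variant takes an array of source paths.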

  /**
* The src files are on the local disk. Add it to the filesystem at
* the given dst name.
* delSrc indicates if the source should be removed
* @param delSrc whether to delete the src
* @param overwrite whether to overwrite an existing file
* @param srcs array of paths which are source
* @param dst path
* @throws IOException IO failure
*/
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
Path[] srcs, Path dst)
throws IOException {
Configuration conf = getConf();
FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
}
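
The copyFromLocalFile overloads form a delegation chain in which each shorter signature fills in one default: 6.1 passes delSrc = false and 6.2 passes overwrite = true. The sketch below reproduces only that chain. OverloadSketch and its copy methods are hypothetical, and the final method records its arguments instead of copying anything.

```java
final class OverloadSketch {
  String lastCall; // the fully resolved arguments of the most recent call

  /** Shortest form: keeps the source file. */
  void copy(String src, String dst) {
    copy(false, src, dst);
  }

  /** Adds delSrc; an existing destination is overwritten by default. */
  void copy(boolean delSrc, String src, String dst) {
    copy(delSrc, true, src, dst);
  }

  /** Full form: every option is explicit. */
  void copy(boolean delSrc, boolean overwrite, String src, String dst) {
    lastCall = "delSrc=" + delSrc + ", overwrite=" + overwrite
        + ", " + src + " -> " + dst;
  }

  public static void main(String[] args) {
    OverloadSketch fs = new OverloadSketch();
    fs.copy("/tmp/in.txt", "/data/in.txt");
    System.out.println(fs.lastCall);
    fs.copy(true, "/tmp/in.txt", "/data/in.txt");
    System.out.println(fs.lastCall);
  }
}
```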

  7. moveToLocalFile, implemented by calling copyToLocalFile with delSrc set to true

  /**
* Copy a file to the local filesystem, then delete it from the
* remote filesystem (if successfully copied).
* @param src path src file in the remote filesystem
* @param dst path local destination
* @throws IOException IO failure
*/
public void moveToLocalFile(Path src, Path dst) throws IOException {
copyToLocalFile(true, src, dst);
}

  8. Copy the file at src in the remote file system to dst on the local file system; the delSrc argument decides whether the source file is deleted

  /**
* Copy it a file from a remote filesystem to the local one.
* delSrc indicates if the src will be removed or not.
* @param delSrc whether to delete the src
* @param src path src file in the remote filesystem
* @param dst path local destination
* @throws IOException IO failure
*/
public void copyToLocalFile(boolean delSrc, Path src, Path dst)
throws IOException {
copyToLocalFile(delSrc, src, dst, false);
}
