大数据:从基础理论到最佳实践
上QQ阅读APP看书,第一时间看更新

3.1 HDFS接口与编程

HDFS提供了多种用户操作和编程接口,既通过Shell命令管理文件与目录、管理作业调度、控制与优化集群性能等,也提供了Java、C语言等的编程接口,用户可以通过编写程序对HDFS进行扩展。

3.1.1 Shell命令

HDFS资源URI的格式如下:

        scheme://authority/path

其中scheme是协议名,一般是file或hdfs; authority是授权访问的主机名或IP; path是访问路径。例如:

        hdfs://localhost:9000/user/chunk/test.txt

如果已经在core-site.xml里配置了fs.default.name=hdfs://localhost:9000,则仅使用/user/chunk/test.txt即可。

在HDFS的所有接口中,Shell命令行接口最简单,也是开发者比较熟悉的方式。我们通过使用“hdfs -help”命令,可以看到HDFS支持的文件系统命令,如图3-1所示。

图3-1 HDFS支持的文件系统命令

HDFS支持的文件系统命令主要有两类。

(1)用户命令:用于管理HDFS日常操作,如dfs、fsck、fetchdt等。

(2)系统管理命令:主要用于控制和管理HDFS集群,如balancer、namenode、datanode、dfsadmin、secondarynamenode等。限于篇幅,这里只介绍几种常用的命令模块。

1. hdfs dfs [GENERIC_OPTIONS][COMMAND_OPTIONS]

“hdfs dfs”提供了类似于Linux Shell一样的命令集,其用法与Linux Shell基本一致。下面详细介绍各个命令。

(1)appendToFile。

说明:将一个或者多个本地文件追加到目的文件。成功返回0,错误返回1。

格式:hdfs dfs -appendToFile <localsrc> ... <dst>

示例:

        hdfs dfs -appendToFile localfile /user/hadoop/hadoopfile
        hdfs dfs -appendToFile localfile1 localfile2 /user/hadoop/hadoopfile
        hdfs dfs -appendToFile localfile hdfs://nn.example.com/hadoop/hadoopfile

(2)cat。

说明:将路径指定文件的内容输出到stdout。成功返回0,错误返回-1。

格式:hdfs dfs -cat URI [URI ...]

示例:

        hdfs dfs -cat hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2
        hdfs dfs -cat file:///file3 /user/hadoop/file4

(3)chgrp。

说明:改变文件所属的用户组。如果使用-R选项,则这一操作对整个目录结构递归执行。使用这一命令的用户必须是文件的所属用户,或者是超级用户。

格式:hdfs dfs -chgrp [-R]GROUP URI [URI ...]

(4)chmod。

说明:改变文件的权限。使用-R将使改变在目录结构下递归进行。命令的使用者必须是文件的所有者或者超级用户。

格式:hdfs dfs -chmod [-R]<MODE[, MODE]... | OCTALMODE> URI [URI ...]

(5)chown。

说明:改变文件的所属用户。如果使用-R选项,则这一操作对整个目录结构递归执行。使用这一命令的用户必须是文件在命令变更之前的所属用户,或者是超级用户。

格式:hdfs dfs -chown [-R][OWNER][:[GROUP]]URI [URI ]

(6)copyFromLocal。

说明:从本地复制,与put命令相似,但限定源路径是本地的。

格式:hdfs dfs -copyFromLocal <localsrc> URI

(7)copyToLocal。

说明:复制到本地,与get命令相似,但限定目的路径是本地的。

格式:hdfs dfs -copyToLocal [-ignorecrc][-crc]URI <localdst>

(8)count。

说明:计算文件、目录的数量。成功返回0,错误返回-1。

格式:hdfs dfs -count [-q][-h]<paths>

示例:

        hdfs dfs -count hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2
        hdfs dfs -count -q hdfs://nn1.example.com/file1
        hdfs dfs -count -q -h hdfs://nn1.example.com/file1

(9)cp。

说明:将文件从源路径复制到目标路径。这个命令允许有多个源路径,但同时,目标路径必须是一个目录。成功返回0,错误返回-1。

格式:hdfs dfs -cp [-f][-p | -p[topax]]URI [URI ...]<dest>

示例:

        hdfs dfs -cp /user/hadoop/file1 /user/hadoop/file2
        hdfs dfs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir

(10)du。

说明:显示目录中所有文件的大小,或者当只指定一个文件时,显示此文件的大小。成功返回0,错误返回-1。

格式:hdfs dfs -du [-s][-h]URI [URI ...]

示例:

        hdfs dfs -du /user/hadoop/dir1 /user/hadoop/file1
          hdfs://nn.example.com/user/hadoop/dir1

(11)dus。

说明:显示文件的大小。此命令可以用“du -s”替代。

格式:hdfs dfs -dus <args>

(12)expunge。

作用:清空回收站。

格式:hdfs dfs -expunge

(13)get。

说明:复制文件到本地文件系统。可用“-ignorecrc”选项复制CRC校验失败的文件。使用“-crc”选项复制文件以及CRC信息。成功返回0,错误返回-1。

格式:hdfs dfs -get [-ignorecrc][-crc]<src> <localdst>

示例:

        hdfs dfs -get /user/hadoop/file localfile
        hdfs dfs -get hdfs://nn.example.com/user/hadoop/file localfile

(14)getfacl。

说明:显示文件或者目录的权限控制列表。成功返回0,错误返回非零值。

格式:hdfs dfs -getfacl [-R]<path>

示例:

        hdfs dfs -getfacl /file
        hdfs dfs -getfacl -R /dir

(15)getfattr。

说明:显示文件或者目录的扩展属性。成功返回0,错误返回非零值。

格式:hdfs dfs -getfattr [-R]-n name | -d [-e en]<path>

示例:

        hdfs dfs -getfattr -d /file
        hdfs dfs -getfattr -R -n user.myAttr /dir

(16)getmerge。

说明:接受一个源目录和一个目标文件作为输入,并且将源目录中所有的文件连接成本地目标文件。addnl是可选的,用于指定在每个文件结尾添加一个换行符。

格式:hdfs dfs -getmerge <src> <localdst> [addnl]

(17)ls。

说明:与Linux中一样,返回子目录或子文件列表。成功返回0,错误返回-1。

格式:hdfs dfs -ls [-R]<args>

示例:

        hdfs dfs -ls /user/hadoop/file1

(18)lsr。

说明:ls命令的递归版本,一般使用“ls -R”代替。

格式:hdfs dfs -lsr <args>

(19)mkdir。

说明:创建目录,加-p选项创建多层目录。成功返回0,错误返回-1。

格式:hdfs dfs -mkdir [-p]<paths>

示例:

        hdfs dfs -mkdir /user/hadoop/dir1 /user/hadoop/dir2
        hdfs dfs -mkdir hdfs://nn1.example.com/user/hadoop/dir
          hdfs://nn2.example.com/user/hadoop/dir

(20)moveFromLocal。

说明:类似put,区别在于put操作完成后删除。

格式:hdfs dfs -moveFromLocal <localsrc> <dst>

(21)mv。

说明:将文件从源路径移动到目标路径。这个命令允许有多个源路径,此时,目标路径必须是一个目录。不允许在不同的文件系统间移动文件。成功返回0,错误返回-1。

格式:hdfs dfs -mv URI [URI ...]<dest>

示例:

        hdfs dfs -mv /user/hadoop/file1 /user/hadoop/file2
        hdfs dfs -mv hdfs://nn.example.com/file1 hdfs://nn.example.com/file2
          hdfs://nn.example.com/file3 hdfs://nn.example.com/dir1

(22)put。

说明:从本地文件系统中复制单个或多个源路径到目标文件系统。也支持从标准输入设备中读取输入,写入目标文件系统。成功返回0,错误返回-1。

格式:hdfs dfs -put <localsrc> ... <dst>

示例:

        hdfs dfs -put localfile /user/hadoop/hadoopfile
        hdfs dfs -put localfile1 localfile2 /user/hadoop/hadoopdir
        hdfs dfs -put localfile hdfs://nn.example.com/hadoop/hadoopfile

(23)rm。

说明:删除指定的文件或目录。成功返回0,错误返回-1。

格式:hdfs dfs -rm [-f][-r|-R][-skipTrash]URI [URI ...]

示例:

        hdfs dfs -rm hdfs://nn.example.com/file /user/hadoop/emptydir

(24)rmr。

说明:rm的递归版本,已过时,一般使用“rm -r”代替。

格式:hdfs dfs -rmr [-skipTrash]URI [URI ...]

(25)setfacl。

说明:设置文件或者目录的权限控制列表。成功返回0,错误返回非零值。

格式:hdfs dfs -setfacl [-R][-b|-k -m|-x <acl_spec> <path>]|[--set <acl_spec> <path>]

示例:

        hdfs dfs -setfacl -m user:hadoop:rw- /file
        hdfs dfs -setfacl -x user:hadoop /file
        hdfs dfs -setfacl -b /file
        hdfs dfs -setfacl -k /dir
        hdfs dfs -setfacl --set user::rw-, user:hadoop:rw-, group::r--, other::r--
          /file
        hdfs dfs -setfacl -R -m user:hadoop:r-x /dir
        hdfs dfs -setfacl -m default:user:hadoop:r-x /dir

(26)setfattr。

说明:设置文件或者目录的扩展属性。成功返回0,错误返回非零值。

格式:hdfs dfs -setfattr -n name [-v value]| -x name <path>

示例:

        hdfs dfs -setfattr -n user.myAttr -v myValue /file
        hdfs dfs -setfattr -n user.noValue /file
        hdfs dfs -setfattr -x user.myAttr /file

(27)setrep。

说明:改变文件和目录的复制因子。成功返回0,错误返回-1。

格式:hdfs dfs -setrep [-R][-w]<numReplicas> <path>

示例:

        hdfs dfs -setrep -w 3 /user/hadoop/dir1

(28)stat。

说明:返回指定路径的统计信息。成功返回0,错误返回-1。

格式:hdfs dfs -stat URI [URI ...]

示例:

        hdfs dfs -stat path

(29)tail。

说明:将文件尾部1KB的内容输出到stdout。成功返回0,错误返回-1。

格式:hdfs dfs -tail [-f]URI

示例:

        hdfs dfs -tail pathname

(30)test

说明:检查文件。选项“-e”检查文件是否存在,如果存在则返回0;选项“-z”检查文件是否为0字节,如果是则返回0;选项“-d”检查路径是否为目录,如果是则返回1,否则返回0。

格式:hdfs dfs -test -[ezd]URI

示例:

        hdfs dfs -test -e filename

(31)text。

说明:将源文件输出为文本格式。允许的格式是zip和TextRecordInputStream。

格式:hdfs dfs -text <src>

(32)touchz。

说明:创建一个空文件。成功返回0,错误返回-1。

格式:hdfs dfs -touchz URI [URI ...]

示例:

        hdfs dfs -touchz pathname

小提示

“hadoop dfs”与“hdfs dfs”都是操作HDFS文件系统的命令,“hadoop dfs”属于早期版本的格式,已经过时,一般使用“hdfs dfs”。

“hadoop fs”也是文件系统操作命令,但使用范围更广,能够操作其他格式文件系统,如local、HDFS等,可以在本地与Hadoop分布式文件系统的交互操作中使用。

2. hdfs fsck [GENERIC_OPTIONS]<path> [-list-corruptfileblocks | [-move | -delete| -openforwrite][-files [-blocks [-locations | -racks]]]][-includeSnapshots]

fsck是一个文件系统健康状况检查工具,用来检查各类问题,比如,文件块丢失等(如图3-2所示)。但是,注意它不会主动恢复备份缺失的block,这个是由NameNode单独的线程异步处理的。

图3-2 fsck命令的运行结果

fsck命令的参数说明见表3-1。

表3-1 fsck参数的说明

3. hdfs datanode [-regular | -rollback | -rollingupgrade rollback]

运行一个HDFS集群的数据节点。参数说明见表3-2。

表3-2 hdfs datanode命令参数的说明

4. hdfs namenode [GENERIC_OPTIONS]

“hdfs namenode”是运行NameNode的命令,是一个比较核心的工具。该命令的主要参数说明见表3-3。

表3-3 hdfs namenode命令参数的说明

5. hdfs dfsadmin [GENERIC_OPTIONS]

dfsadmin是一个多任务的工具,我们可以使用它来获取HDFS的状态信息,以及在HDFS上执行的一系列管理操作。该命令的主要参数说明见表3-4。

表3-4 hdfs dfsadmin命令参数的说明

续表

dfsadmin命令的使用示例如图3-3所示。

图3-3 dfsadmin命令的使用示例

6. hdfs cacheadmin

管理员和用户通过“hdfs cacheadmin”命令管理缓存资源。

缓存指令由一个唯一的无重复的64位整数ID来标识。即使缓存指令后来被删除了,ID也不会重复使用。缓存池由一个唯一的字符串名称来标识。

(1)增加缓存:addDirective。

用法:hdfs cacheadmin -addDirective -path <path> -pool <pool-name> [-force][-replication<replication>][-ttl <time-to-live>]

参数说明见表3-5。

表3-5 addDirective的参数说明

(2)删除一个缓存:removeDirective。

用法:hdfs cacheadmin -removeDirective <id>

参数id指定要删除的缓存指令的ID。删除时,必须对该指令的缓存池拥有写权限。

(3)删除指定路径下的每一个缓存:removeDirectives。

用法:hdfs cacheadmin -removeDirectives <path>

参数path中设置要删除的缓存指令的路径。删除时必须对该指令的缓存池拥有写权限。

(4)缓存列表:listDirectives。

用法:hdfs cacheadmin -listDirectives [-stats][-path <path>][-pool <pool>]

参数说明见表3-6。

表3-6 listDirectives的参数说明

(5)新增缓存池:addPool。

用法:hdfs cacheadmin -addPool <name> [-owner <owner>][-group <group>][-mode<mode>][-limit <limit>][-maxTtl <maxTtl>

参数说明见表3-7。

表3-7 addPool的参数说明

(6)修改缓存池:modifyPool。

用法:hdfs cacheadmin -modifyPool <name> [-owner <owner>][-group <group>][-mode<mode>][-limit <limit>][-maxTtl <maxTtl>]

参数说明见表3-8。

表3-8 modifyPool的参数说明

(7)删除缓存池:removePool。

用法:hdfs cacheadmin -removePool <name>

参数name指定要删除的缓存池的名称。

(8)缓存池列表:listPools。

用法:hdfs cacheadmin -listPools [-stats][<name>]

参数说明见表3-9。

表3-9 addPool的参数说明

7. hdfs balancer [-threshold <threshold>][-policy <policy>]

HDFS集群非常容易出现机器与机器之间磁盘利用率不平衡的情况,尤其是增加新的数据节点时。保证HDFS中的数据平衡非常重要。HDFS出现不平衡的状况将引发很多问题,比如MapReduce程序无法很好地利用本地计算的优势、机器之间无法达到更好的网络带宽使用率等。

在Hadoop中,包含一个Balancer程序,可以调节HDFS集群平衡的状态。启动Balancer服务时,界面如图3-4所示。

图3-4 启动Balancer服务

服务启动后,集群管理人员可用balancer命令进行分析和再平衡数据,如图3-5所示。

图3-5 可用balancer命令进行分析和再平衡数据

参数threshold是判断集群是否平衡的目标参数,表示HDFS达到平衡状态的磁盘使用率偏差值。默认设置为10,参数取值范围是0~100。如果机器之间磁盘使用率偏差小于10%,我们就认为HDFS集群已经达到了平衡的状态。

8. hdfs version

hdfs version命令用于查看当前系统的版本,运行示例如图3-6所示。

图3-6 使用hdfs version命令查看当前系统的版本

3.1.2 Java接口操作

由于Hadoop本身就是使用Java语言编写的,理论上,通过Java API能够调用所有的Hadoop文件系统的操作接口。

Hadoop有一个抽象的文件系统概念,在Java抽象类org.apache.hadoop.fs中定义了接口。只要某个文件系统实现了这个接口,那么,它就可以作为Hadoop支持的文件系统。目前Hadoop能够支持的文件系统如表3-10所示。

表3-10 Hadoop文件类的实现

在Hadoop中,主要是定义了一组分布式文件系统和通用的I/O组件和接口,Hadoop的文件系统准确地应该称作Hadoop I/O。而HDFS是实现该文件接口的Hadoop自带的分布式文件项目,是对Hadoop I/O接口的实现。在处理大数据集时,为实现最优性能,通常使用HDFS存储。

org.apache.hadoop.fs包由接口类(FsConstants、Syncable等)、Java类(AbstractFileSystem、BlockLocation、FileSystem、FileUtil、FSDataInputStream等)、枚举类型(如CreateFlag)、异常类(ChecksumException、InvalidPathException等)和错误类(如FSError)组成。每个子对象中都定义了相应的方法,通过对org.apache.hadoop.fs包的封装与调用,可以拓展HDFS应用,更好地帮助用户使用集群海量存储。

在介绍Java接口操作之前,先介绍几个常用的Java类。

(1)FileSystem。

org.apache.hadoop.fs.FileSystem,通用文件系统基类,用于与HDFS文件系统交互,编写的HDFS程序都需要重写FileSystem类。通过FileSystem,可以非常方便地像操作本地文件系统一样操作HDFS集群文件。

FileSystem提供了get方法,一个是通过配置文件获取与HDFS的连接;一个是通过URL指定配置文件,获取与HDFS的连接,URL的格式为hdfs://namenode/xxx.xml。

方法的原型如下:

        public static FileSystem get(Configuration conf) throws IOException;
        public static FileSystem get(URI uri, Configuration conf) throws IOException;
        public static FileSystem get(final URI uri, final Configuration conf, final
          String user) throws IOException, InterruptedException;

其中,Configuration(org.apache.hadoop.conf.Configuration)类对象封装了客户端或服务器的配置;URI是指文件在HDFS里存放的路径。

(2)FSDataInputStream。

org.apache.hadoop.fs.FSDataInputStream,文件输入流,用于读取HDFS文件,它是Java中DataInputStream的派生类,支持从任意位置读取流式数据。

常用的读取方法是从指定的位置,读取指定大小的数据至缓存区。方法如下所示:

        int read(long position, byte[]buffer, int offset, int length)

还有用于随时定位的方法,可以定位到指定的读取点,如下所示:

        void seek(long desired)

通过long getPos()方法,还可以获取当前的读取点。

(3)FSDataOutputStream。

org.apache.hadoop.fs.FSDataOutputStream,文件输出流,是DataOutputStream的派生类,通过这个类,能够向HDFS顺序写入数据流。

通常的写入方法为write,如下所示:

        public void write(int b)

获取当前写入点的函数为long getPos()。

(4)Path。

org.apache.hadoop.fs.Path,文件与目录定位类,用于定义HDFS集群中指定的目录与文件绝对或相对路径。

可以通过多种方式构造Path,如通过URL的模式,通常编写方式为:

        hdfs://ip:port/directory/filename

Path可以与FileSystem的open函数相关联,通过Path构造访问路径,用FileSystem进行访问。

(5)FileStatus。

org.apache.hadoop.fs.FileStatus,文件状态显示类,可以获取文件与目录的元数据、长度、块大小、所属用户、编辑时间等信息,同时,可以设置文件用户、权限等内容。

FileStatus有很多get与set方法,如获取文件长度的long getLen()方法、设置文件权限的setPermission(FsPermission permission)方法等。

下面,我们开始Hadoop的Java操作之旅。

1.创建文件

FileSystem类里提供了很多API,用来创建文件,其中,最简单的一个是:

        public FSDataOutputStream create(Path f) throws IOException;

它创建一个Path类代表的文件,并返回一个输出流。这个方法有多个重载方法,可以用来设置是否覆盖已有文件、该文件复制的份数、写入时的缓冲区大小、文件块大小(block)、权限等。默认情况下,如果Path中文件的父目录(或者更上一级目录)不存在,这些目录会被自动创建。

2.读取数据

通过调用FileSystem实例的open方法打开文件,得到一个输入流。下面是使用FileSystem类读取HDFS中文件内容的完整程序:

        import java.net.URI;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FSDataInputStream;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.IOUtils;
        public class FileSystemCat {
            public static void main(String[]args) throws Exception {
              String uri = args[0];
              Configuration conf = new Configuration();
              FileSystem fs = FileSystem.get(URI.create(uri), conf);
              FSDataInputStream in = null;
              try {
                  in = fs.open(new Path(uri));
                  IOUtils.copyBytes(in, System.out, 4096, false);
              } finally {
                  IOUtils.closeStream(in);
              }
            }
        }

此外,FSDataInputStream类同时也实现了PositionedReadable(org.apache.hadoop.fs. PositionedReadable)接口,接口中定义的三个方法允许在任意位置读取文件的内容:

        public int read(long position, byte[]buffer, int offset, int length) throws IOException;
        public void readFully(long position, byte[]buffer, int offset, int length)
          throws IOException;
        public void readFully(long position, byte[]buffer) throws IOException;

结合第2章内容,下面我们结合程序实现深入剖析HDFS读文件时的数据流向过程。

(1)客户端通过调用FileSystem.open()方法打开一个文件,对于HDFS来讲,其实是调用DistributedFileSystem实例的open方法。

(2)DistributedFileSystem通过远程方法调用访问NameNode,获取该文件的前几个blocks所在的位置信息;针对每个block, NameNode都会返回有该block数据信息的所有DataNodes节点,比如配置的dfs.replication为3,就会每个block返回3个DataNodes节点信息,这些节点是按距离客户端的远近排序的,如果发起读文件的客户端就在包含该block的DataNode上,那么这个DataNode就排第一位(这种情况在做Map任务时常见),客户端就会从本机读取数据。

DistributedFileSystem的open方法返回一个FSDataInputStream, FSDataInputStream里包装着一个DFSInputStream, DFSInputStream真正管理DataNodes和NameNode的I/O。

(3)客户端调用FSDataInputStream.read()方法,FSDataInputStream里已经缓存了该文件前几个block所在的DataNode的地址,于是,从第一个block的第一个地址(也就是最近的DataNode)开始连接读取。

(4)反复调用read()方法,数据不断地从DataNode流向客户端。

(5)当一个block的数据读完时,DFSInputStream会关闭当前DataNode的连接,打开下一个block所在的最优DataNode的连接继续读取;这些对客户端是透明的,在客户端看来,就是在读一个连续的流。

(6)这样,一个block一个block地读下去,当需要使用更多block的存储信息时,DFSInputStream会再次调用NameNode,获取下一批block的存储位置信息,直到客户端停止读取,调用FSDataInputStream.close()方法,整个读取过程结束。

小提示

文件操作还可以使用Hadoop URL的方式,示例代码如下:

        import java.io.InputStream;
        import java.net.URL;
        import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
        import org.apache.hadoop.io.IOUtils;
        public class URLCat {
            static {
              URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
            }
            public static void main(String[]args) throws Exception {
              InputStream in = null;
              try {
                  in = new URL(args[0]).openStream();
                  IOUtils.copyBytes(in, System.out, 4096, false);
              } finally {
                  IOUtils.closeStream(in);
              }
            }
        }

在上面的程序中,先设置URLStreamHandlerFactory,然后通过URL打开一个流,读取流,就得到了文件的内容,通过IOUtils.copyBytes()把读到的内容写出到标准输出流里,也就是控制台上,从而实现了类似于Linux里的cat命令的功能。最后关闭输入流。

3.写入数据

与读操作类似,Hadoop对于写操作也提供了一个类:FSDataOutputStream,这个类重载了很多java.io.DataOutputStream的write方法,用于写入很多类型的数据,比如int、char、字节数组等。

HDFS写文件的示例代码如下:

        FileSystem hdfs = FileSystem.get(new Configuration());
        Path path = new Path("/testfile");
        FSDataOutputStream dos = hdfs.create(path);
        byte[]readBuf = "Hello World".getBytes("UTF-8");
        dos.write(readBuf, 0, readBuf.length);
        dos.close();
        hdfs.close();

如果希望向已有文件追加内容,可以调用:

        public FSDataOutputStream append(Path f) throws IOException;

如果文件不存在时,append方法也可以用来新建一个文件。

下面,我们结合以上的程序,深入剖析HDFS写文件时的数据流向过程。

(1)客户端调用DistributedFileSystem.create()方法创建一个文件。

(2)DistributedFileSystem向NameNode发起远程方法调用,创建一个文件,但是,NameNode没有把它关联到任何block上去;NameNode在这一步做了很多检查工作,保证该文件当前不存在,客户端有创建该文件的权限等。如果这些检查都通过了,NameNode创建一条新文件记录;否则,创建失败,客户端返回IOException。DistributedFileSystem返回一个FSDataOutputStream,像读文件时一样,这个FSDataOutputStream里包装着一个DFSOutputStream,由它来实际处理与DataNodes和NameNode的通信。

(3)客户端向DFSOutputStream里写数据,DFSOutputStream把数据分成包,丢进一个称为data queue的队列中。DataStreamer负责向NameNode申请新的block,新的block被分配在了一个或多个(默认为3个)节点上,这些节点就形成一个管道。

(4)DataStreamer把data queue里的包拿出来,通过管道输送给第1个节点,第1个节点再通过管道输送给第2个节点,第2个再输送给第3个。以此类推。

(5)DFSOutputStream同时还在内部维护一个通知队列,名叫ack queue,里面保存发过的数据包。一个包只有被所有管道上的DataNodes通知收到了,才会被移除。如果任意一个DataNode接收失败了,首先,管道关闭,然后把ack queue里的包都放回到data queue的头部,以便使失败节点的下游节点不会丢失这些数据。打开管道,把坏节点移除,数据会继续向其他好节点输送,直到管道上的节点都完成了。如果少复制了一个节点,向NameNode报告一下,说现在这个block没有达到设定的副本数,然后就返回成功了,后期,NameNode会组织一个异步的任务,把副本数恢复到设定值。然后,接下来的数据包和数据块正常写入。

如果多个DataNodes都失败了,会检测hdfs-site.xml里的dfs.replication.min参数,默认值是1,意思是只要有1个DataNode接收成功,就认为数据写入成功了。客户端就会收到写入成功的返回。后期,Hadoop会发起异步任务把副本数恢复到dfs.replication设置的值。

以上操作对客户端都是透明的,客户端不知道发生了这些事情,只知道写文件成功了。

(6)当客户端完成数据写入后,调用流的close()方法,这个操作把data queue里的所有剩余的包都发给管道。

(7)等所有包都收到了写成功的反馈后,客户端通知NameNode写文件完成了。因为DataStream写文件前就先向NameNode申请block的位置信息了,所以写文件完成时,NameNode已知道每个block的位置信息,它只需等最小的副本数写成功,就可以返回成功。

4.文件读写位置

读取文件时(FSDataInputStream),允许使用seek()方法在文件中定位。支持随机访问,理论上,可以从流的任何位置读取数据,但调用seek()方法的开销是相当巨大的,应该尽量少调用,尽可能地使程序做到顺序读。

由于HDFS只允许对一个打开的文件顺序写入,或向一个已有文件的尾部追加,不允许在任意位置写,FSDataOutputStream没有seek方法。但FSDataOutputStream类提供了一个getPos()方法,可以查询当前在往文件的哪个位置写的写入偏移量:

        public long getPos() throws IOException;

5.重命名

通过FileSystem.rename()方法,可为指定的HDFS文件重命名:

        protected void rename(Path src, Path dst, Options) throws IOException;

示例代码实现如下:

        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        Path frpath = new Path("/test");   //旧的文件名
        Path topath = new Path("/testNew");   //新的文件名
        boolean isRename = hdfs.rename(frpath, topath);
        String result = isRename? "成功" : "失败";

6.删除操作

通过FileSystem.delete()方法删除指定的HDFS文件或目录(永久删除):

        public boolean delete(Path f, boolean recursive) throws IOException;

其中,f为需要删除文件的完整路径,recursive用来确定是否进行递归删除。如果f是一个文件或空目录,则不论recursive是何值,都删除。如果f是一个非空目录,则recursive为true时,目录下内容全部删除;如果recursive为false,不删除,并抛出IOException。

示例代码实现如下:

        Path f = new Path(fileName);
        boolean isExists = hdfs.exists(f);
        if (isExists) { //if exists, delete
            boolean isDel = hdfs.delete(f, true);
            System.out.println(fileName + "  delete? \t" + isDel);
        } else {
            System.out.println(fileName + "  exist? \t" + isExists);
        }

7.文件夹操作

FileSystem中创建文件夹的方法如下:

        public boolean mkdirs(Path f) throws IOException;

与java.io.File.mkdirs方法一样,创建目录的同时,默认地创建缺失的父目录。我们一般不需要创建目录,一般在创建文件时,默认地就把所需的目录都创建好了。

目录创建的示例代码实现如下:

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path srcPath = new Path(path);
        boolean isok = fs.mkdirs(srcPath);
        if(isok) {
            System.out.println("create dir ok! ");
        } else {
            System.out.println("create dir failure");
        }
        fs.close();

使用FileSystem的listStatus()方法能够列出某个目录中的所有文件:

        public FileStatus[]listStatus(Path f) throws IOException
        public FileStatus[]listStatus(Path f, PathFilter filter) throws IOException
        public FileStatus[]listStatus(Path[]files) throws IOException
        public FileStatus[]listStatus(Path[]files, PathFilter filter)
          throws IOException

这一组方法都接收Path参数,如果Path是一个文件,返回值是一个数组,数组里只有一个元素,是这个Path代表的文件的FileStatus对象;如果Path是一个目录,返回值数组是该目录下的所有文件和目录的FileStatus组成的数组,有可能是一个0长数组;如果参数是Path[],则返回值相当于多次调用单Path,然后把返回值整合到一个数组里;如果参数中包含PathFilter,则PathFilter会对返回的文件或目录进行过滤,返回满足条件的文件或目录,条件由开发者自行定义。

FileSystem的globStatus方法利用通配符来列出文件和目录:

        public FileStatus[]globStatus(Path pathPattern) throws IOException;
        public FileStatus[]globStatus(Path pathPattern, PathFilter filter)
          throws IOException;

文件夹删除操作与文件删除类似。

其他关于文件夹的操作方法还有FileSystem.getWorkingDirectory(返回当前工作目录)、FileSystem.setWorkingDirectory(更改当前工作目录)等。

8.属性操作

FileSystem类中的getFileStatus()方法返回一个FileStatus实例,该FileStatus实例中,包含了该Path(文件或目录)的元数据信息:文件大小、block大小、复制的份数、最后修改时间、所有者、权限等。示例代码实现如下:

        FileStatus status = fs.getFileStatus(path);
        System.out.println("path = " + status.getPath());
        System.out.println("owner = " + status.getOwner());        System.out.println("block size = " + status.getBlockSize());
        System.out.println("permission = " + status.getPermission());
        System.out.println("replication = " + status.getReplication());

3.1.3 WebHDFS

Hadoop提供的Java Native API支持对文件或目录的操作,为开发者提供了极大的便利。为满足许多外部应用程序操作HDFS文件系统的需求,Hadoop提供了两种基于HTTP方式的接口:一是用于浏览文件系统的Web界面;另一个是WebHDFS REST API接口。

启动HDFS时,NameNode和DataNode各自启动了一个内置的Web服务器,显示了集群当前的基本状态和信息。默认配置下NameNode的首页地址是http://namenode-name:50070/。这个页面列出了集群里的所有DataNode和集群的基本状态。

这个Web界面也可以用来浏览整个文件系统。使用NameNode首页上的Browse the file system链接,输入需要查看的目录地址,即可看到,如图3-7所示。

图3-7 Web界面

WebHDFS基于HTTP,通过GET、PUT、POST和DELETE等操作,支持FileSystem/FileContext的全部API。具体操作类型见表3-11。

表3-11 WebHDFS的操作

在使用WebHDFS REST API接口前,要先对Hadoop进行配置和授权认证。编辑hdfs-site.xml文件,添加启用WebHDFS(dfs.webhdfs.enabled)、kerberos验证(dfs.web. authentication.kerberos.principal、dfs.web.authentication.kerberos.keytab)等属性配置。配置完成后,启动WebHDFS服务即可,如图3-8所示。

图3-8 启动WebHDFS服务

WebHDFS默认的HTTP服务端口是14000。需要说明的是,WebHDFS的FileSystem模式是“webhdfs://”, URI的格式如下:

        webhdfs://<HOST>:<HTTP_PORT>/<PATH>

与之对应的HDFS URI格式如下:

        hdfs://<HOST>:<RPC_PORT>/<PATH>

在REST API接口中,在path之前插入前缀“/webhdfs/v1”,操作语句被追加到最后,相应的HTTP URL格式如下:

        http://<HOST>:<HTTP_PORT>/webhdfs/v1/<PATH>? op=...

下面我们以具体实例,来测试一下WebHDFS的功能。使用curl命令工具在HDFS根目录下创建一个名为“webdir”的目录,如图3-9所示。

图3-9 WebHDFS创建目录的运行结果

3.1.4 其他接口

HDFS支持的使用接口除了前面介绍过的Java等以外,还有C、Thrift、HttpFS、HFTP、NFS等。下面简单介绍几种。

1. C接口

HDFS基于Java编写,并没有提供原生的C语言访问接口,但HDFS提供了基于JNI(Java Native Interface)的C调用接口libhdfs,使C语言访问HDFS成为可能。

libhdfs接口的头文件和库文件已包含在Hadoop发行版本中,可以直接使用。它的头文件hdfs.h一般位于{HADOOP_HOME}/include目录中,而其库文件libhdfs.so通常则位于{HADOOP_HOME}/lib/native目录中。不同的版本,库文件所在位置稍有不同。

通过libhdfs访问HDFS文件系统与使用C语言API访问普通操作系统的文件系统类似。C++访问HDFS的方式也与C语言类似。接口主要如下。

(1)建立、关闭与HDFS连接:hdfsConnect()、hdfsConnectAsUser()、hdfsDisconnect()。

(2)打开、关闭HDFS文件:hdfsOpenFile()、hdfsCloseFile()。当用hdfsOpenFile()创建文件时,可以指定replication和blocksize参数。

(3)读HDFS文件:hdfsRead()、hdfsPread()。

(4)写HDFS文件:hdfsWrite()。HDFS不支持随机写,只能是从文件头顺序写入。

(5)查询HDFS文件信息:hdfsGetPathInfo()。

(6)查询数据块所在节点信息:hdfsGetHosts()。返回一个或多个数据块所在数据节点的信息,一个数据块可能存在于多个数据节点上。

libhdfs中的函数是通过JNI调用Java虚拟机的,在虚拟机中构造对应的HDFS的Java类,然后反射调用该类的功能函数,占用内存较多,不适合对虚拟要求较高的场景。

下面是一个简单的例子:

        #include "hdfs.h"
        int main(int argc, char **argv) {
            hdfsFS fs = hdfsConnect("default", 0);
            const char *writePath = "/tmp/testfile.txt";
            hdfsFile writeFile =
              hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
            if(! writeFile) {
              fprintf(stderr, "Failed to open %s for writing! \n", writePath);
              exit(-1);
            }
            char *buffer = "Hello, World! ";
            tSize num_written_bytes =
              hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1);
            if (hdfsFlush(fs, writeFile)) {
              fprintf(stderr, "Failed to 'flush' %s\n", writePath);
              exit(-1);
            }
            hdfsCloseFile(fs, writeFile);
        }

2. HFTP

HFTP是一个可以实现从远程HDFS集群读取Hadoop文件系统数据的接口。HFTP默认是打开的,数据读取通过HTTP协议,允许以浏览器的方式访问和下载所有文件。这种方式带来便利的同时,也存在一定的安全隐患。

HFTP是一个只读的文件系统,如果试图用它写或者修改文件系统的状态,将会抛出一个错误。如果使用多个不同版本的HDFS集群时,需要在集群之间移动数据,HFTP是非常有用的。HFTP在不同HDFS版本之间都是兼容的,通常与distcp结合使用实现并行复制。

HSFTP是HFTP的一个扩展,默认使用HTTPS在传输时加密数据。

3. HttpFS

HttpFS是Cloudera公司提供的一个Web应用,一般部署在内嵌的Web服务器中,但独立于Hadoop的NameNode。

HttpFS是提供REST HTTP接口的服务器,可以支持全部HDFS文件系统操作。通过WebHDFS REST API,可以对HDFS进行读写等访问操作。与WebHDFS的区别是,不需要客户端,就可以访问Hadoop集群的每一个节点。

通过HttpFS,可以访问放置在防火墙后面的Hadoop集群数据。HttpFS可以作为一个网关角色,是唯一可以穿过防火墙访问内部集群数据的系统。

HttpFS的内置安全特性支持Hadoop伪身份验证和HTTP SPNEGO Kerberos及其他插件式(pluggable)验证机制。它还提供了对Hadoop代理用户的支持。