3.1 HDFS接口与编程
HDFS提供了多种用户操作和编程接口,既通过Shell命令管理文件与目录、管理作业调度、控制与优化集群性能等,也提供了Java、C语言等的编程接口,用户可以通过编写程序对HDFS进行扩展。
3.1.1 Shell命令
HDFS资源URI的格式如下:
scheme://authority/path
其中scheme是协议名,一般是file或hdfs; authority是授权访问的主机名或IP; path是访问路径。例如:
hdfs://localhost:9000/user/chunk/test.txt
如果已经在core-site.xml里配置了fs.default.name=hdfs://localhost:9000,则仅使用/user/chunk/test.txt即可。
在HDFS的所有接口中,Shell命令行接口最简单,也是开发者比较熟悉的方式。我们通过使用“hdfs -help”命令,可以看到HDFS支持的文件系统命令,如图3-1所示。
图3-1 HDFS支持的文件系统命令
HDFS支持的文件系统命令主要有两类。
(1)用户命令:用于管理HDFS日常操作,如dfs、fsck、fetchdt等。
(2)系统管理命令:主要用于控制和管理HDFS集群,如balancer、namenode、datanode、dfsadmin、secondarynamenode等。限于篇幅,这里只介绍几种常用的命令模块。
1. hdfs dfs [GENERIC_OPTIONS][COMMAND_OPTIONS]
“hdfs dfs”提供了类似于Linux Shell一样的命令集,其用法与Linux Shell基本一致。下面详细介绍各个命令。
(1)appendToFile。
说明:将一个或者多个本地文件追加到目的文件。成功返回0,错误返回1。
格式:hdfs dfs -appendToFile <localsrc> ... <dst>
示例:
hdfs dfs -appendToFile localfile /user/hadoop/hadoopfile hdfs dfs -appendToFile localfile1 localfile2 /user/hadoop/hadoopfile hdfs dfs -appendToFile localfile hdfs://nn.example.com/hadoop/hadoopfile
(2)cat。
说明:将路径指定文件的内容输出到stdout。成功返回0,错误返回-1。
格式:hdfs dfs -cat URI [URI ...]
示例:
hdfs dfs -cat hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2 hdfs dfs -cat file:///file3 /user/hadoop/file4
(3)chgrp。
说明:改变文件所属的用户组。如果使用-R选项,则这一操作对整个目录结构递归执行。使用这一命令的用户必须是文件的所属用户,或者是超级用户。
格式:hdfs dfs -chgrp [-R]GROUP URI [URI ...]
(4)chmod。
说明:改变文件的权限。使用-R将使改变在目录结构下递归进行。命令的使用者必须是文件的所有者或者超级用户。
格式:hdfs dfs -chmod [-R]<MODE[, MODE]... | OCTALMODE> URI [URI ...]
(5)chown。
说明:改变文件的所属用户。如果使用-R选项,则这一操作对整个目录结构递归执行。使用这一命令的用户必须是文件在命令变更之前的所属用户,或者是超级用户。
格式:hdfs dfs -chown [-R][OWNER][:[GROUP]]URI [URI ]
(6)copyFromLocal。
说明:从本地复制,与put命令相似,但限定源路径是本地的。
格式:hdfs dfs -copyFromLocal <localsrc> URI
(7)copyToLocal。
说明:复制到本地,与get命令相似,但限定目的路径是本地的。
格式:hdfs dfs -copyToLocal [-ignorecrc][-crc]URI <localdst>
(8)count。
说明:计算文件、目录的数量。成功返回0,错误返回-1。
格式:hdfs dfs -count [-q][-h]<paths>
示例:
hdfs dfs -count hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2 hdfs dfs -count -q hdfs://nn1.example.com/file1 hdfs dfs -count -q -h hdfs://nn1.example.com/file1
(9)cp。
说明:将文件从源路径复制到目标路径。这个命令允许有多个源路径,但同时,目标路径必须是一个目录。成功返回0,错误返回-1。
格式:hdfs dfs -cp [-f][-p | -p[topax]]URI [URI ...]<dest>
示例:
hdfs dfs -cp /user/hadoop/file1 /user/hadoop/file2 hdfs dfs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir
(10)du。
说明:显示目录中所有文件的大小,或者当只指定一个文件时,显示此文件的大小。成功返回0,错误返回-1。
格式:hdfs dfs -du [-s][-h]URI [URI ...]
示例:
hdfs dfs -du /user/hadoop/dir1 /user/hadoop/file1 hdfs://nn.example.com/user/hadoop/dir1
(11)dus。
说明:显示文件的大小。此命令可以用“du -s”替代。
格式:hdfs dfs -dus <args>
(12)expunge。
作用:清空回收站。
格式:hdfs dfs -expunge
(13)get。
说明:复制文件到本地文件系统。可用“-ignorecrc”选项复制CRC校验失败的文件。使用“-crc”选项复制文件以及CRC信息。成功返回0,错误返回-1。
格式:hdfs dfs -get [-ignorecrc][-crc]<src> <localdst>
示例:
hdfs dfs -get /user/hadoop/file localfile hdfs dfs -get hdfs://nn.example.com/user/hadoop/file localfile
(14)getfacl。
说明:显示文件或者目录的权限控制列表。成功返回0,错误返回非零值。
格式:hdfs dfs -getfacl [-R]<path>
示例:
hdfs dfs -getfacl /file hdfs dfs -getfacl -R /dir
(15)getfattr。
说明:显示文件或者目录的扩展属性。成功返回0,错误返回非零值。
格式:hdfs dfs -getfattr [-R]-n name | -d [-e en]<path>
示例:
hdfs dfs -getfattr -d /file hdfs dfs -getfattr -R -n user.myAttr /dir
(16)getmerge。
说明:接受一个源目录和一个目标文件作为输入,并且将源目录中所有的文件连接成本地目标文件。addnl是可选的,用于指定在每个文件结尾添加一个换行符。
格式:hdfs dfs -getmerge <src> <localdst> [addnl]
(17)ls。
说明:与Linux中一样,返回子目录或子文件列表。成功返回0,错误返回-1。
格式:hdfs dfs -ls [-R]<args>
示例:
hdfs dfs -ls /user/hadoop/file1
(18)lsr。
说明:ls命令的递归版本,一般使用“ls -R”代替。
格式:hdfs dfs -lsr <args>
(19)mkdir。
说明:创建目录,加-p选项创建多层目录。成功返回0,错误返回-1。
格式:hdfs dfs -mkdir [-p]<paths>
示例:
hdfs dfs -mkdir /user/hadoop/dir1 /user/hadoop/dir2 hdfs dfs -mkdir hdfs://nn1.example.com/user/hadoop/dir hdfs://nn2.example.com/user/hadoop/dir
(20)moveFromLocal。
说明:类似put,区别在于put操作完成后删除。
格式:hdfs dfs -moveFromLocal <localsrc> <dst>
(21)mv。
说明:将文件从源路径移动到目标路径。这个命令允许有多个源路径,此时,目标路径必须是一个目录。不允许在不同的文件系统间移动文件。成功返回0,错误返回-1。
格式:hdfs dfs -mv URI [URI ...]<dest>
示例:
hdfs dfs -mv /user/hadoop/file1 /user/hadoop/file2 hdfs dfs -mv hdfs://nn.example.com/file1 hdfs://nn.example.com/file2 hdfs://nn.example.com/file3 hdfs://nn.example.com/dir1
(22)put。
说明:从本地文件系统中复制单个或多个源路径到目标文件系统。也支持从标准输入设备中读取输入,写入目标文件系统。成功返回0,错误返回-1。
格式:hdfs dfs -put <localsrc> ... <dst>
示例:
hdfs dfs -put localfile /user/hadoop/hadoopfile hdfs dfs -put localfile1 localfile2 /user/hadoop/hadoopdir hdfs dfs -put localfile hdfs://nn.example.com/hadoop/hadoopfile
(23)rm。
说明:删除指定的文件或目录。成功返回0,错误返回-1。
格式:hdfs dfs -rm [-f][-r|-R][-skipTrash]URI [URI ...]
示例:
hdfs dfs -rm hdfs://nn.example.com/file /user/hadoop/emptydir
(24)rmr。
说明:rm的递归版本,已过时,一般使用“rm -r”代替。
格式:hdfs dfs -rmr [-skipTrash]URI [URI ...]
(25)setfacl。
说明:设置文件或者目录的权限控制列表。成功返回0,错误返回非零值。
格式:hdfs dfs -setfacl [-R][-b|-k -m|-x <acl_spec> <path>]|[--set <acl_spec> <path>]
示例:
hdfs dfs -setfacl -m user:hadoop:rw- /file hdfs dfs -setfacl -x user:hadoop /file hdfs dfs -setfacl -b /file hdfs dfs -setfacl -k /dir hdfs dfs -setfacl --set user::rw-, user:hadoop:rw-, group::r--, other::r-- /file hdfs dfs -setfacl -R -m user:hadoop:r-x /dir hdfs dfs -setfacl -m default:user:hadoop:r-x /dir
(26)setfattr。
说明:设置文件或者目录的扩展属性。成功返回0,错误返回非零值。
格式:hdfs dfs -setfattr -n name [-v value]| -x name <path>
示例:
hdfs dfs -setfattr -n user.myAttr -v myValue /file hdfs dfs -setfattr -n user.noValue /file hdfs dfs -setfattr -x user.myAttr /file
(27)setrep。
说明:改变文件和目录的复制因子。成功返回0,错误返回-1。
格式:hdfs dfs -setrep [-R][-w]<numReplicas> <path>
示例:
hdfs dfs -setrep -w 3 /user/hadoop/dir1
(28)stat。
说明:返回指定路径的统计信息。成功返回0,错误返回-1。
格式:hdfs dfs -stat URI [URI ...]
示例:
hdfs dfs -stat path
(29)tail。
说明:将文件尾部1KB的内容输出到stdout。成功返回0,错误返回-1。
格式:hdfs dfs -tail [-f]URI
示例:
hdfs dfs -tail pathname
(30)test
说明:检查文件。选项“-e”检查文件是否存在,如果存在则返回0;选项“-z”检查文件是否为0字节,如果是则返回0;选项“-d”检查路径是否为目录,如果是则返回1,否则返回0。
格式:hdfs dfs -test -[ezd]URI
示例:
hdfs dfs -test -e filename
(31)text。
说明:将源文件输出为文本格式。允许的格式是zip和TextRecordInputStream。
格式:hdfs dfs -text <src>
(32)touchz。
说明:创建一个空文件。成功返回0,错误返回-1。
格式:hdfs dfs -touchz URI [URI ...]
示例:
hdfs dfs -touchz pathname
小提示
“hadoop dfs”与“hdfs dfs”都是操作HDFS文件系统的命令,“hadoop dfs”属于早期版本的格式,已经过时,一般使用“hdfs dfs”。
“hadoop fs”也是文件系统操作命令,但使用范围更广,能够操作其他格式文件系统,如local、HDFS等,可以在本地与Hadoop分布式文件系统的交互操作中使用。
2. hdfs fsck [GENERIC_OPTIONS]<path> [-list-corruptfileblocks | [-move | -delete| -openforwrite][-files [-blocks [-locations | -racks]]]][-includeSnapshots]
fsck是一个文件系统健康状况检查工具,用来检查各类问题,比如,文件块丢失等(如图3-2所示)。但是,注意它不会主动恢复备份缺失的block,这个是由NameNode单独的线程异步处理的。
图3-2 fsck命令的运行结果
fsck命令的参数说明见表3-1。
表3-1 fsck参数的说明
3. hdfs datanode [-regular | -rollback | -rollingupgrade rollback]
运行一个HDFS集群的数据节点。参数说明见表3-2。
表3-2 hdfs datanode命令参数的说明
4. hdfs namenode [GENERIC_OPTIONS]
“hdfs namenode”是运行NameNode的命令,是一个比较核心的工具。该命令的主要参数说明见表3-3。
表3-3 hdfs namenode命令参数的说明
5. hdfs dfsadmin [GENERIC_OPTIONS]
dfsadmin是一个多任务的工具,我们可以使用它来获取HDFS的状态信息,以及在HDFS上执行的一系列管理操作。该命令的主要参数说明见表3-4。
表3-4 hdfs dfsadmin命令参数的说明
续表
dfsadmin命令的使用示例如图3-3所示。
图3-3 dfsadmin命令的使用示例
6. hdfs cacheadmin
管理员和用户通过“hdfs cacheadmin”命令管理缓存资源。
缓存指令由一个唯一的无重复的64位整数ID来标识。即使缓存指令后来被删除了,ID也不会重复使用。缓存池由一个唯一的字符串名称来标识。
(1)增加缓存:addDirective。
用法:hdfs cacheadmin -addDirective -path <path> -pool <pool-name> [-force][-replication<replication>][-ttl <time-to-live>]
参数说明见表3-5。
表3-5 addDirective的参数说明
(2)删除一个缓存:removeDirective。
用法:hdfs cacheadmin -removeDirective <id>
参数id指定要删除的缓存指令的ID。删除时,必须对该指令的缓存池拥有写权限。
(3)删除指定路径下的每一个缓存:removeDirectives。
用法:hdfs cacheadmin -removeDirectives <path>
参数path中设置要删除的缓存指令的路径。删除时必须对该指令的缓存池拥有写权限。
(4)缓存列表:listDirectives。
用法:hdfs cacheadmin -listDirectives [-stats][-path <path>][-pool <pool>]
参数说明见表3-6。
表3-6 listDirectives的参数说明
(5)新增缓存池:addPool。
用法:hdfs cacheadmin -addPool <name> [-owner <owner>][-group <group>][-mode<mode>][-limit <limit>][-maxTtl <maxTtl>
参数说明见表3-7。
表3-7 addPool的参数说明
(6)修改缓存池:modifyPool。
用法:hdfs cacheadmin -modifyPool <name> [-owner <owner>][-group <group>][-mode<mode>][-limit <limit>][-maxTtl <maxTtl>]
参数说明见表3-8。
表3-8 modifyPool的参数说明
(7)删除缓存池:removePool。
用法:hdfs cacheadmin -removePool <name>
参数name指定要删除的缓存池的名称。
(8)缓存池列表:listPools。
用法:hdfs cacheadmin -listPools [-stats][<name>]
参数说明见表3-9。
表3-9 addPool的参数说明
7. hdfs balancer [-threshold <threshold>][-policy <policy>]
HDFS集群非常容易出现机器与机器之间磁盘利用率不平衡的情况,尤其是增加新的数据节点时。保证HDFS中的数据平衡非常重要。HDFS出现不平衡的状况将引发很多问题,比如MapReduce程序无法很好地利用本地计算的优势、机器之间无法达到更好的网络带宽使用率等。
在Hadoop中,包含一个Balancer程序,可以调节HDFS集群平衡的状态。启动Balancer服务时,界面如图3-4所示。
图3-4 启动Balancer服务
服务启动后,集群管理人员可用balancer命令进行分析和再平衡数据,如图3-5所示。
图3-5 可用balancer命令进行分析和再平衡数据
参数threshold是判断集群是否平衡的目标参数,表示HDFS达到平衡状态的磁盘使用率偏差值。默认设置为10,参数取值范围是0~100。如果机器之间磁盘使用率偏差小于10%,我们就认为HDFS集群已经达到了平衡的状态。
8. hdfs version
hdfs version命令用于查看当前系统的版本,运行示例如图3-6所示。
图3-6 使用hdfs version命令查看当前系统的版本
3.1.2 Java接口操作
由于Hadoop本身就是使用Java语言编写的,理论上,通过Java API能够调用所有的Hadoop文件系统的操作接口。
Hadoop有一个抽象的文件系统概念,在Java抽象类org.apache.hadoop.fs中定义了接口。只要某个文件系统实现了这个接口,那么,它就可以作为Hadoop支持的文件系统。目前Hadoop能够支持的文件系统如表3-10所示。
表3-10 Hadoop文件类的实现
在Hadoop中,主要是定义了一组分布式文件系统和通用的I/O组件和接口,Hadoop的文件系统准确地应该称作Hadoop I/O。而HDFS是实现该文件接口的Hadoop自带的分布式文件项目,是对Hadoop I/O接口的实现。在处理大数据集时,为实现最优性能,通常使用HDFS存储。
org.apache.hadoop.fs包由接口类(FsConstants、Syncable等)、Java类(AbstractFileSystem、BlockLocation、FileSystem、FileUtil、FSDataInputStream等)、枚举类型(如CreateFlag)、异常类(ChecksumException、InvalidPathException等)和错误类(如FSError)组成。每个子对象中都定义了相应的方法,通过对org.apache.hadoop.fs包的封装与调用,可以拓展HDFS应用,更好地帮助用户使用集群海量存储。
在介绍Java接口操作之前,先介绍几个常用的Java类。
(1)FileSystem。
org.apache.hadoop.fs.FileSystem,通用文件系统基类,用于与HDFS文件系统交互,编写的HDFS程序都需要重写FileSystem类。通过FileSystem,可以非常方便地像操作本地文件系统一样操作HDFS集群文件。
FileSystem提供了get方法,一个是通过配置文件获取与HDFS的连接;一个是通过URL指定配置文件,获取与HDFS的连接,URL的格式为hdfs://namenode/xxx.xml。
方法的原型如下:
public static FileSystem get(Configuration conf) throws IOException; public static FileSystem get(URI uri, Configuration conf) throws IOException; public static FileSystem get(final URI uri, final Configuration conf, final String user) throws IOException, InterruptedException;
其中,Configuration(org.apache.hadoop.conf.Configuration)类对象封装了客户端或服务器的配置;URI是指文件在HDFS里存放的路径。
(2)FSDataInputStream。
org.apache.hadoop.fs.FSDataInputStream,文件输入流,用于读取HDFS文件,它是Java中DataInputStream的派生类,支持从任意位置读取流式数据。
常用的读取方法是从指定的位置,读取指定大小的数据至缓存区。方法如下所示:
int read(long position, byte[]buffer, int offset, int length)
还有用于随时定位的方法,可以定位到指定的读取点,如下所示:
void seek(long desired)
通过long getPos()方法,还可以获取当前的读取点。
(3)FSDataOutputStream。
org.apache.hadoop.fs.FSDataOutputStream,文件输出流,是DataOutputStream的派生类,通过这个类,能够向HDFS顺序写入数据流。
通常的写入方法为write,如下所示:
public void write(int b)
获取当前写入点的函数为long getPos()。
(4)Path。
org.apache.hadoop.fs.Path,文件与目录定位类,用于定义HDFS集群中指定的目录与文件绝对或相对路径。
可以通过多种方式构造Path,如通过URL的模式,通常编写方式为:
hdfs://ip:port/directory/filename
Path可以与FileSystem的open函数相关联,通过Path构造访问路径,用FileSystem进行访问。
(5)FileStatus。
org.apache.hadoop.fs.FileStatus,文件状态显示类,可以获取文件与目录的元数据、长度、块大小、所属用户、编辑时间等信息,同时,可以设置文件用户、权限等内容。
FileStatus有很多get与set方法,如获取文件长度的long getLen()方法、设置文件权限的setPermission(FsPermission permission)方法等。
下面,我们开始Hadoop的Java操作之旅。
1.创建文件
FileSystem类里提供了很多API,用来创建文件,其中,最简单的一个是:
public FSDataOutputStream create(Path f) throws IOException;
它创建一个Path类代表的文件,并返回一个输出流。这个方法有多个重载方法,可以用来设置是否覆盖已有文件、该文件复制的份数、写入时的缓冲区大小、文件块大小(block)、权限等。默认情况下,如果Path中文件的父目录(或者更上一级目录)不存在,这些目录会被自动创建。
2.读取数据
通过调用FileSystem实例的open方法打开文件,得到一个输入流。下面是使用FileSystem类读取HDFS中文件内容的完整程序:
import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class FileSystemCat { public static void main(String[]args) throws Exception { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); FSDataInputStream in = null; try { in = fs.open(new Path(uri)); IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); } } }
此外,FSDataInputStream类同时也实现了PositionedReadable(org.apache.hadoop.fs. PositionedReadable)接口,接口中定义的三个方法允许在任意位置读取文件的内容:
public int read(long position, byte[]buffer, int offset, int length) throws IOException; public void readFully(long position, byte[]buffer, int offset, int length) throws IOException; public void readFully(long position, byte[]buffer) throws IOException;
结合第2章内容,下面我们结合程序实现深入剖析HDFS读文件时的数据流向过程。
(1)客户端通过调用FileSystem.open()方法打开一个文件,对于HDFS来讲,其实是调用DistributedFileSystem实例的open方法。
(2)DistributedFileSystem通过远程方法调用访问NameNode,获取该文件的前几个blocks所在的位置信息;针对每个block, NameNode都会返回有该block数据信息的所有DataNodes节点,比如配置的dfs.replication为3,就会每个block返回3个DataNodes节点信息,这些节点是按距离客户端的远近排序的,如果发起读文件的客户端就在包含该block的DataNode上,那么这个DataNode就排第一位(这种情况在做Map任务时常见),客户端就会从本机读取数据。
DistributedFileSystem的open方法返回一个FSDataInputStream, FSDataInputStream里包装着一个DFSInputStream, DFSInputStream真正管理DataNodes和NameNode的I/O。
(3)客户端调用FSDataInputStream.read()方法,FSDataInputStream里已经缓存了该文件前几个block所在的DataNode的地址,于是,从第一个block的第一个地址(也就是最近的DataNode)开始连接读取。
(4)反复调用read()方法,数据不断地从DataNode流向客户端。
(5)当一个block的数据读完时,DFSInputStream会关闭当前DataNode的连接,打开下一个block所在的最优DataNode的连接继续读取;这些对客户端是透明的,在客户端看来,就是在读一个连续的流。
(6)这样,一个block一个block地读下去,当需要使用更多block的存储信息时,DFSInputStream会再次调用NameNode,获取下一批block的存储位置信息,直到客户端停止读取,调用FSDataInputStream.close()方法,整个读取过程结束。
小提示
文件操作还可以使用Hadoop URL的方式,示例代码如下:
import java.io.InputStream; import java.net.URL; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.io.IOUtils; public class URLCat { static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } public static void main(String[]args) throws Exception { InputStream in = null; try { in = new URL(args[0]).openStream(); IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); } } }
在上面的程序中,先设置URLStreamHandlerFactory,然后通过URL打开一个流,读取流,就得到了文件的内容,通过IOUtils.copyBytes()把读到的内容写出到标准输出流里,也就是控制台上,从而实现了类似于Linux里的cat命令的功能。最后关闭输入流。
3.写入数据
与读操作类似,Hadoop对于写操作也提供了一个类:FSDataOutputStream,这个类重载了很多java.io.DataOutputStream的write方法,用于写入很多类型的数据,比如int、char、字节数组等。
HDFS写文件的示例代码如下:
FileSystem hdfs = FileSystem.get(new Configuration()); Path path = new Path("/testfile"); FSDataOutputStream dos = hdfs.create(path); byte[]readBuf = "Hello World".getBytes("UTF-8"); dos.write(readBuf, 0, readBuf.length); dos.close(); hdfs.close();
如果希望向已有文件追加内容,可以调用:
public FSDataOutputStream append(Path f) throws IOException;
如果文件不存在时,append方法也可以用来新建一个文件。
下面,我们结合以上的程序,深入剖析HDFS写文件时的数据流向过程。
(1)客户端调用DistributedFileSystem.create()方法创建一个文件。
(2)DistributedFileSystem向NameNode发起远程方法调用,创建一个文件,但是,NameNode没有把它关联到任何block上去;NameNode在这一步做了很多检查工作,保证该文件当前不存在,客户端有创建该文件的权限等。如果这些检查都通过了,NameNode创建一条新文件记录;否则,创建失败,客户端返回IOException。DistributedFileSystem返回一个FSDataOutputStream,像读文件时一样,这个FSDataOutputStream里包装着一个DFSOutputStream,由它来实际处理与DataNodes和NameNode的通信。
(3)客户端向DFSOutputStream里写数据,DFSOutputStream把数据分成包,丢进一个称为data queue的队列中。DataStreamer负责向NameNode申请新的block,新的block被分配在了一个或多个(默认为3个)节点上,这些节点就形成一个管道。
(4)DataStreamer把data queue里的包拿出来,通过管道输送给第1个节点,第1个节点再通过管道输送给第2个节点,第2个再输送给第3个。以此类推。
(5)DFSOutputStream同时还在内部维护一个通知队列,名叫ack queue,里面保存发过的数据包。一个包只有被所有管道上的DataNodes通知收到了,才会被移除。如果任意一个DataNode接收失败了,首先,管道关闭,然后把ack queue里的包都放回到data queue的头部,以便使失败节点的下游节点不会丢失这些数据。打开管道,把坏节点移除,数据会继续向其他好节点输送,直到管道上的节点都完成了。如果少复制了一个节点,向NameNode报告一下,说现在这个block没有达到设定的副本数,然后就返回成功了,后期,NameNode会组织一个异步的任务,把副本数恢复到设定值。然后,接下来的数据包和数据块正常写入。
如果多个DataNodes都失败了,会检测hdfs-site.xml里的dfs.replication.min参数,默认值是1,意思是只要有1个DataNode接收成功,就认为数据写入成功了。客户端就会收到写入成功的返回。后期,Hadoop会发起异步任务把副本数恢复到dfs.replication设置的值。
以上操作对客户端都是透明的,客户端不知道发生了这些事情,只知道写文件成功了。
(6)当客户端完成数据写入后,调用流的close()方法,这个操作把data queue里的所有剩余的包都发给管道。
(7)等所有包都收到了写成功的反馈后,客户端通知NameNode写文件完成了。因为DataStream写文件前就先向NameNode申请block的位置信息了,所以写文件完成时,NameNode已知道每个block的位置信息,它只需等最小的副本数写成功,就可以返回成功。
4.文件读写位置
读取文件时(FSDataInputStream),允许使用seek()方法在文件中定位。支持随机访问,理论上,可以从流的任何位置读取数据,但调用seek()方法的开销是相当巨大的,应该尽量少调用,尽可能地使程序做到顺序读。
由于HDFS只允许对一个打开的文件顺序写入,或向一个已有文件的尾部追加,不允许在任意位置写,FSDataOutputStream没有seek方法。但FSDataOutputStream类提供了一个getPos()方法,可以查询当前在往文件的哪个位置写的写入偏移量:
public long getPos() throws IOException;
5.重命名
通过FileSystem.rename()方法,可为指定的HDFS文件重命名:
protected void rename(Path src, Path dst, Options) throws IOException;
示例代码实现如下:
Configuration conf = new Configuration(); FileSystem hdfs = FileSystem.get(conf); Path frpath = new Path("/test"); //旧的文件名 Path topath = new Path("/testNew"); //新的文件名 boolean isRename = hdfs.rename(frpath, topath); String result = isRename? "成功" : "失败";
6.删除操作
通过FileSystem.delete()方法删除指定的HDFS文件或目录(永久删除):
public boolean delete(Path f, boolean recursive) throws IOException;
其中,f为需要删除文件的完整路径,recursive用来确定是否进行递归删除。如果f是一个文件或空目录,则不论recursive是何值,都删除。如果f是一个非空目录,则recursive为true时,目录下内容全部删除;如果recursive为false,不删除,并抛出IOException。
示例代码实现如下:
Path f = new Path(fileName); boolean isExists = hdfs.exists(f); if (isExists) { //if exists, delete boolean isDel = hdfs.delete(f, true); System.out.println(fileName + " delete? \t" + isDel); } else { System.out.println(fileName + " exist? \t" + isExists); }
7.文件夹操作
FileSystem中创建文件夹的方法如下:
public boolean mkdirs(Path f) throws IOException;
与java.io.File.mkdirs方法一样,创建目录的同时,默认地创建缺失的父目录。我们一般不需要创建目录,一般在创建文件时,默认地就把所需的目录都创建好了。
目录创建的示例代码实现如下:
Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path srcPath = new Path(path); boolean isok = fs.mkdirs(srcPath); if(isok) { System.out.println("create dir ok! "); } else { System.out.println("create dir failure"); } fs.close();
使用FileSystem的listStatus()方法能够列出某个目录中的所有文件:
public FileStatus[]listStatus(Path f) throws IOException public FileStatus[]listStatus(Path f, PathFilter filter) throws IOException public FileStatus[]listStatus(Path[]files) throws IOException public FileStatus[]listStatus(Path[]files, PathFilter filter) throws IOException
这一组方法都接收Path参数,如果Path是一个文件,返回值是一个数组,数组里只有一个元素,是这个Path代表的文件的FileStatus对象;如果Path是一个目录,返回值数组是该目录下的所有文件和目录的FileStatus组成的数组,有可能是一个0长数组;如果参数是Path[],则返回值相当于多次调用单Path,然后把返回值整合到一个数组里;如果参数中包含PathFilter,则PathFilter会对返回的文件或目录进行过滤,返回满足条件的文件或目录,条件由开发者自行定义。
FileSystem的globStatus方法利用通配符来列出文件和目录:
public FileStatus[]globStatus(Path pathPattern) throws IOException; public FileStatus[]globStatus(Path pathPattern, PathFilter filter) throws IOException;
文件夹删除操作与文件删除类似。
其他关于文件夹的操作方法还有FileSystem.getWorkingDirectory(返回当前工作目录)、FileSystem.setWorkingDirectory(更改当前工作目录)等。
8.属性操作
FileSystem类中的getFileStatus()方法返回一个FileStatus实例,该FileStatus实例中,包含了该Path(文件或目录)的元数据信息:文件大小、block大小、复制的份数、最后修改时间、所有者、权限等。示例代码实现如下:
FileStatus status = fs.getFileStatus(path); System.out.println("path = " + status.getPath()); System.out.println("owner = " + status.getOwner()); System.out.println("block size = " + status.getBlockSize()); System.out.println("permission = " + status.getPermission()); System.out.println("replication = " + status.getReplication());
3.1.3 WebHDFS
Hadoop提供的Java Native API支持对文件或目录的操作,为开发者提供了极大的便利。为满足许多外部应用程序操作HDFS文件系统的需求,Hadoop提供了两种基于HTTP方式的接口:一是用于浏览文件系统的Web界面;另一个是WebHDFS REST API接口。
启动HDFS时,NameNode和DataNode各自启动了一个内置的Web服务器,显示了集群当前的基本状态和信息。默认配置下NameNode的首页地址是http://namenode-name:50070/。这个页面列出了集群里的所有DataNode和集群的基本状态。
这个Web界面也可以用来浏览整个文件系统。使用NameNode首页上的Browse the file system链接,输入需要查看的目录地址,即可看到,如图3-7所示。
图3-7 Web界面
WebHDFS基于HTTP,通过GET、PUT、POST和DELETE等操作,支持FileSystem/FileContext的全部API。具体操作类型见表3-11。
表3-11 WebHDFS的操作
在使用WebHDFS REST API接口前,要先对Hadoop进行配置和授权认证。编辑hdfs-site.xml文件,添加启用WebHDFS(dfs.webhdfs.enabled)、kerberos验证(dfs.web. authentication.kerberos.principal、dfs.web.authentication.kerberos.keytab)等属性配置。配置完成后,启动WebHDFS服务即可,如图3-8所示。
图3-8 启动WebHDFS服务
WebHDFS默认的HTTP服务端口是14000。需要说明的是,WebHDFS的FileSystem模式是“webhdfs://”, URI的格式如下:
webhdfs://<HOST>:<HTTP_PORT>/<PATH>
与之对应的HDFS URI格式如下:
hdfs://<HOST>:<RPC_PORT>/<PATH>
在REST API接口中,在path之前插入前缀“/webhdfs/v1”,操作语句被追加到最后,相应的HTTP URL格式如下:
http://<HOST>:<HTTP_PORT>/webhdfs/v1/<PATH>? op=...
下面我们以具体实例,来测试一下WebHDFS的功能。使用curl命令工具在HDFS根目录下创建一个名为“webdir”的目录,如图3-9所示。
图3-9 WebHDFS创建目录的运行结果
3.1.4 其他接口
HDFS支持的使用接口除了前面介绍过的Java等以外,还有C、Thrift、HttpFS、HFTP、NFS等。下面简单介绍几种。
1. C接口
HDFS基于Java编写,并没有提供原生的C语言访问接口,但HDFS提供了基于JNI(Java Native Interface)的C调用接口libhdfs,使C语言访问HDFS成为可能。
libhdfs接口的头文件和库文件已包含在Hadoop发行版本中,可以直接使用。它的头文件hdfs.h一般位于{HADOOP_HOME}/include目录中,而其库文件libhdfs.so通常则位于{HADOOP_HOME}/lib/native目录中。不同的版本,库文件所在位置稍有不同。
通过libhdfs访问HDFS文件系统与使用C语言API访问普通操作系统的文件系统类似。C++访问HDFS的方式也与C语言类似。接口主要如下。
(1)建立、关闭与HDFS连接:hdfsConnect()、hdfsConnectAsUser()、hdfsDisconnect()。
(2)打开、关闭HDFS文件:hdfsOpenFile()、hdfsCloseFile()。当用hdfsOpenFile()创建文件时,可以指定replication和blocksize参数。
(3)读HDFS文件:hdfsRead()、hdfsPread()。
(4)写HDFS文件:hdfsWrite()。HDFS不支持随机写,只能是从文件头顺序写入。
(5)查询HDFS文件信息:hdfsGetPathInfo()。
(6)查询数据块所在节点信息:hdfsGetHosts()。返回一个或多个数据块所在数据节点的信息,一个数据块可能存在于多个数据节点上。
libhdfs中的函数是通过JNI调用Java虚拟机的,在虚拟机中构造对应的HDFS的Java类,然后反射调用该类的功能函数,占用内存较多,不适合对虚拟要求较高的场景。
下面是一个简单的例子:
#include "hdfs.h" int main(int argc, char **argv) { hdfsFS fs = hdfsConnect("default", 0); const char *writePath = "/tmp/testfile.txt"; hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0); if(! writeFile) { fprintf(stderr, "Failed to open %s for writing! \n", writePath); exit(-1); } char *buffer = "Hello, World! "; tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1); if (hdfsFlush(fs, writeFile)) { fprintf(stderr, "Failed to 'flush' %s\n", writePath); exit(-1); } hdfsCloseFile(fs, writeFile); }
2. HFTP
HFTP是一个可以实现从远程HDFS集群读取Hadoop文件系统数据的接口。HFTP默认是打开的,数据读取通过HTTP协议,允许以浏览器的方式访问和下载所有文件。这种方式带来便利的同时,也存在一定的安全隐患。
HFTP是一个只读的文件系统,如果试图用它写或者修改文件系统的状态,将会抛出一个错误。如果使用多个不同版本的HDFS集群时,需要在集群之间移动数据,HFTP是非常有用的。HFTP在不同HDFS版本之间都是兼容的,通常与distcp结合使用实现并行复制。
HSFTP是HFTP的一个扩展,默认使用HTTPS在传输时加密数据。
3. HttpFS
HttpFS是Cloudera公司提供的一个Web应用,一般部署在内嵌的Web服务器中,但独立于Hadoop的NameNode。
HttpFS是提供REST HTTP接口的服务器,可以支持全部HDFS文件系统操作。通过WebHDFS REST API,可以对HDFS进行读写等访问操作。与WebHDFS的区别是,不需要客户端,就可以访问Hadoop集群的每一个节点。
通过HttpFS,可以访问放置在防火墙后面的Hadoop集群数据。HttpFS可以作为一个网关角色,是唯一可以穿过防火墙访问内部集群数据的系统。
HttpFS的内置安全特性支持Hadoop伪身份验证和HTTP SPNEGO Kerberos及其他插件式(pluggable)验证机制。它还提供了对Hadoop代理用户的支持。