java抽象类import org.apache.hadoop.fs.FileSystem 定义了hadoop中的一个文件系统接口。
一、读取数据
1、从Hadoop URL读取数据
这个方法是通过FsURLStreamHandlerFactory实例调用java.net.URL对象的setURLStreamHandlerFactory方法。每个java虚拟机只能调用一次这个方法,因此通常在静态方法中调用。如果已经声明一个FsURLStreamHandlerFactory实例,你将无法使用这个方法从hadoop读取数据。
import java.io.InputStream;
import java.net.URL;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
public class URLCat {
static {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String[] args) throws Exception{
InputStream in = null;
try {
in = new URL(args[0]).openStream();
IOUtils.copyBytes(in, System.out, 4096, false);
}finally {
IOUtils.closeStream(in);
}
}
}
运行结果:
2、通过FileSystem API读取数据
使用FileSystem API来打开一个文件的输入流,可以避免第一种方法的问题。获取FileSystem实例有下面这几个静态工厂方法。
public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String user) throws IOException
Configration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(如 conf/core-site.xml)。第一个方法返回的是默认文件系统(在conf/core-site.xml中指定的,若没有指定,则是默认的本地文件系统)。第二个方法通过给定的URI方案和权限来确定要使用的文件系统,若没有给定URI方案,则返回默认文件系统。第三,作为给定的用户来访问文件系统。
在某些情况下,需要使用本地文件系统的运行实例,此时可以用getlocal()方法
public static LocalFileSystem getlocal(Configuration conf) throws IOException
有了FileSystem实例之后,我们调用open()函数来获取文件的输入流
public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class FileSystemCat {
public static void main(String[] args) throws Exception{
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
InputStream in = null;
try {
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false);
}finally {
IOUtils.closeStream(in);
}
}
}
FSDataInputStream是继承DataInputStream的一个特殊类,可从流的任意位置读取数据。Seekable接口支持在文件中找到指定位置(这里是绝对位置,与skip()相对位置不同),seek(long pos)方法移到文件指定位置,getPos()查询当前位置,还有一个seekToNewSource(long targetPos)。
注意:seek()方法是一个相对高开销的操作,建议用流数据来构建应用的访问模式(如使用mapreduce),而非执行大量的seek()方法。
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class FileSystemDoubleCat {
public static void main(String[] args) throws Exception{
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
FSDataInputStream in = null;
try {
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false);
in.seek(5);
IOUtils.copyBytes(in, System.out, 4096, false);
}finally {
IOUtils.closeStream(in);
}
}
}
二、写入数据
FileSystem有一系列新建文件的方法。最简单的是指定一个Path对象,然后返回一个用于写入数据的输出流:
public FSDataOutputStream create(Path f) throws IOException
此方法有多个重载版本,允许我们指定是否需要强制覆盖现有的文件、文件备份数量、写入文件时所用缓冲区大小、文件块大小以及文件权限。
create()写入时如果文件不存在,会创建父目录,应用exists()方法检查父目录是否存在。
还有一个重载方法Progressable用于传递回调接口,这样子,可以把数据写入datanode的进度通知给应用。每次64kb数据包写入datanode管线后,打印一个时间点来显示整个运行过程,这个操作不是通过API实现的。
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
public class FileCopyWithProgress {
public static void main(String[] args) throws Exception{
String localSrc = args[0];
String dst = args[1];
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
OutputStream out = fs.create(new Path(dst), new Progressable(){
public void progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(in, out, 4096, true);
System.out.println();
}
}
FSDataOutputStream对象。有FSDataInputStream类似的方法。但HDFS只允许对一个已打开的文件顺序写入,或者现有文件的末尾追加数据。