淘先锋技术网

首页 1 2 3 4 5 6 7

java抽象类import org.apache.hadoop.fs.FileSystem 定义了hadoop中的一个文件系统接口。
一、读取数据
1、从Hadoop URL读取数据
这个方法是通过FsURLStreamHandlerFactory实例调用java.net.URL对象的setURLStreamHandlerFactory方法。每个java虚拟机只能调用一次这个方法,因此通常在静态方法中调用。如果已经声明一个FsURLStreamHandlerFactory实例,你将无法使用这个方法从hadoop读取数据。

import java.io.InputStream;
import java.net.URL;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;

public class URLCat {
	static {
		URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
	}
	public static void main(String[] args) throws Exception{
		InputStream in = null;
		try {
			in = new URL(args[0]).openStream();
			IOUtils.copyBytes(in, System.out, 4096, false);
		}finally {
			IOUtils.closeStream(in);
		}
	}
}

运行结果:
在这里插入图片描述
2、通过FileSystem API读取数据
使用FileSystem API来打开一个文件的输入流,可以避免第一种方法的问题。获取FileSystem实例有下面这几个静态工厂方法。
public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String user) throws IOException
Configration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(如 conf/core-site.xml)。第一个方法返回的是默认文件系统(在conf/core-site.xml中指定的,若没有指定,则是默认的本地文件系统)。第二个方法通过给定的URI方案和权限来确定要使用的文件系统,若没有给定URI方案,则返回默认文件系统。第三,作为给定的用户来访问文件系统。
在某些情况下,需要使用本地文件系统的运行实例,此时可以用getlocal()方法
public static LocalFileSystem getlocal(Configuration conf) throws IOException
有了FileSystem实例之后,我们调用open()函数来获取文件的输入流
public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException

import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileSystemCat {
	public static void main(String[] args) throws Exception{
		String uri = args[0];
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		InputStream in = null;
		try {
			in = fs.open(new Path(uri));
			IOUtils.copyBytes(in, System.out, 4096, false);
		}finally {
			IOUtils.closeStream(in);
		}
	}
}

FSDataInputStream是继承DataInputStream的一个特殊类,可从流的任意位置读取数据。Seekable接口支持在文件中找到指定位置(这里是绝对位置,与skip()相对位置不同),seek(long pos)方法移到文件指定位置,getPos()查询当前位置,还有一个seekToNewSource(long targetPos)。
注意:seek()方法是一个相对高开销的操作,建议用流数据来构建应用的访问模式(如使用mapreduce),而非执行大量的seek()方法。

import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class FileSystemDoubleCat {
	public static void main(String[] args) throws Exception{
		String uri = args[0];
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		FSDataInputStream in = null;
		try {
			in = fs.open(new Path(uri));
			IOUtils.copyBytes(in, System.out, 4096, false);
			in.seek(5);
			IOUtils.copyBytes(in, System.out, 4096, false);
		}finally {
			IOUtils.closeStream(in);
		}
	}
}

二、写入数据
FileSystem有一系列新建文件的方法。最简单的是指定一个Path对象,然后返回一个用于写入数据的输出流:
public FSDataOutputStream create(Path f) throws IOException
此方法有多个重载版本,允许我们指定是否需要强制覆盖现有的文件、文件备份数量、写入文件时所用缓冲区大小、文件块大小以及文件权限。
create()写入时如果文件不存在,会创建父目录,应用exists()方法检查父目录是否存在。
还有一个重载方法Progressable用于传递回调接口,这样子,可以把数据写入datanode的进度通知给应用。每次64kb数据包写入datanode管线后,打印一个时间点来显示整个运行过程,这个操作不是通过API实现的。

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class FileCopyWithProgress {
	public static void main(String[] args) throws Exception{
		String localSrc = args[0];
		String dst = args[1];
		InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
		
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(dst), conf);
		OutputStream out = fs.create(new Path(dst), new Progressable(){
			public void progress() {
				System.out.print(".");
			}
		});
		IOUtils.copyBytes(in, out, 4096, true);
		System.out.println();
	}
}

在这里插入图片描述
FSDataOutputStream对象。有FSDataInputStream类似的方法。但HDFS只允许对一个已打开的文件顺序写入,或者现有文件的末尾追加数据。