淘先锋技术网

首页 1 2 3 4 5 6 7

1、将文件复制到HDFS

package com.kevin.hadoop;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileCopy {
	public static void main (String args[])
			throws Exception{
		if(args.length != 2)
		{
			System.err.println("Usage: filecopy <src> <des>");
			System.exit(2);
		}
		Configuration conf = new Configuration();
		InputStream in = new BufferedInputStream(new FileInputStream(args[0]));//specify the input stream with buffer by the first argument
		FileSystem fs = FileSystem.get(URI.create(args[1]), conf);//Returns the FileSystem for this URI's scheme and authority
		OutputStream out = fs.create(new Path(args[1]));//
		IOUtils.copyBytes(in, out, 4096,true);
	}

}

new Configuration():configuration对象封装了服务器端、客户端配置信息,通过读取xml配置文件获取HDFS文件系统

BufferedInputStream(new FileInputStream(args[0])):InputStream不可以读取文件,它是一个Abstract的类,根本不可能实例化,是所有输入流的基类。FileInputStream是字节流,BufferedInputStream是字节缓冲流,使用BufferedInputStream读资源比FileInputStream读取资源的效率高(BufferedInputStream的read方法会读取尽可能多的字节),FileInputStream是读取一个文件来作InputStream。所以你可以把BufferedInputStream套在FileInputStream外,来改善FileInputStream的性能

FileSystem.get(URI.create(args[1]), conf):参考Filesystem.get()

public static FileSystem get(Configuration conf) throws IOException 
public static FileSystem get(URI uri, Configuration conf) throws IOException 
public static FileSystem get(URI uri, Configuration conf, String user) throws IOException
第一个方法返回一个默认的文件系统(在conf/core-site.xml中通过fs.default.name来指定的,如果在conf/core-site.xml中没有设置则返回本地文件系统)
第二个方法通过uri来指定要返回的文件系统(例如,如果uri是上个测试例子中的hdfs://localhost/user/tom/quangle.txt,也即以hdfs标识开头,那么就返回一个hdfs文件系统,如果uri中没有相应的标识则返回本地文件系统)
第三个方法返回文件系统的机理同(2)是相同的,但它同时又限定了该文件系统的用户,这在安全方面是很重要的

IOUtils.copyBytes(in, out, 4096,true):IOUtils.copyBytes(),其中in表示拷贝源,System.out表示拷贝目的地(也就是要拷贝到标准输出中去),4096表示用来拷贝的buffer大小,false表明拷贝完成后我们并不关闭拷贝源可拷贝目的地

注意运行方式:1)通过eclipse来运行,run as->run configurations->new launch configuration->main:project,main class; argument: hdfs://master:9000/user/root hdfs://master:9000/user/root/output,注意输出目录不能是已经存在的以及注意要添加主类名称.2)通过hadoop jar命令行的方式运行,从eclipse导出jar包:右击项目、export、java、jar,命令 hadoop jar exportPackageName packageName.className argument1 argument2

2、从HDFS删除文件

package com.kevin.hadoop;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileDelete {
	public static void main(String[] args)throws Exception{
		if(args.length != 1)
		{
			System.out.println("Usage: filedelete <traget>");
			System.exit(2);
		}
		
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(args[0]), conf);
		fs.delete(new Path(args[0]), true);
	}

}

fs.delete(new Path(args[0]), false):删除目录或文件,第二个参数表示是否递归

3、输出文件内容到标准输出

package com.kevin.hadoop;

import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileCat {
	public static void main(String[] args)throws Exception{
		if(1 != args.length)
		{
			System.out.println("Usage: fileCat <source>");
			System.exit(2);
		}
		
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(args[0]), conf);
		InputStream in = null;
		try{
			in = fs.open(new Path(args[0]));
			IOUtils.copyBytes(in, System.out, 4096, false);
			}finally{
				IOUtils.closeStream(in);
			}
		}
}

4、获取和输出文件长度,块大小,备份,修改时间,所有者,权限等信息

package com.kevin.hadoop;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileInfo {
	public static void main(String[] args)throws Exception
	{
		if(1 != args.length)
		{
			System.err.println("Usage: fileInfo <source>");
			System.exit(2);
		}
		
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(args[0]), conf);
		FileStatus stat = fs.getFileStatus(new Path(args[0]));
		
		System.out.println(stat.getPath());
		System.out.println(stat.getLen());
		System.out.println(stat.getModificationTime());
		System.out.println(stat.getOwner());
		System.out.println(stat.getReplication());
		System.out.println(stat.getBlockSize());
		System.out.println(stat.getGroup());
		System.out.println(stat.getPermission().toString());
	}

}

fs.getFileStatus(new Path(args[0])):返回FileStatus对象,表示该目录或文件的状态,保存有文件的各种详细信息

5、显示当前目录下的文件

package com.kevin.hadoop;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class FileList {
	public static void main(String[] args)throws Exception
	{
		if(1 != args.length)
		{
			System.err.println("Usage: filelist <source>");
			System.exit(2);
		}
		
		Configuration conf = new Configuration(); 
		FileSystem fs = FileSystem.get(URI.create(args[0]),conf);
		FileStatus[] stat = fs.listStatus(new Path(args[0]));
		Path[] listedPaths = FileUtil.stat2Paths(stat);
		for(Path p:listedPaths)
		{
			System.out.println(p);
		}
	}

}
stat2Paths(status):将FileStatus对象转换成Path