HDFS
HDFS的组成
- NameNode:管理HDFS的名称空间,配置副本策略,管理数据块的映射信息,处理客户端的读写请求。
- DataNode:存储实际的数据块,执行数据块的读写操作。
- Client:客户端。文件上传时Client将文件切分成数据块然后上传,与NameNode交互获取文件位置信息,与DataNode交互读写数据,Client提供一些命令来管理HDFS例如NameNode格式化,通过一些命令来访问HDFS。
- Secondary NameNode:辅助NameNode定期合并Fsimage和Edits,紧急情况下辅助恢复NameNode。(不是NameNode的热备份)
热备份:备份处于联机状态,当前应用系统通过高速通信设备将实时数据传递到备份系统,保持备份系统与当前应用系统数据的同步。(实时备份)
温备份:将备份系统已安装配置成与当前使用的系统相同或者相似的系统和网络运行环境,安装了应用系统业务定期备份数据。(定期备份)
冷备份:备份系统未安装或未配置成与当前使用的系统相似或相同的运行环境,应用系统数据没有及时装入备份系统。(软件安装配置时备份)
NameNode与SecondaryNameNode
NN与2NN工作机制
- NameNode启动:
- 第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存;
- 客户端对元数据进行增删改的请求;
- NameNode记录操作日志,更新滚动日志;
- NameNode在内存中对元数据进行增删改。
- SecondaryNameNode工作:
- SecondaryNameNode 询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果;
- SecondaryNameNode 请求执行CheckPoint;
- NameNode滚动正在写的Edits日志;
- 将滚动前的编辑日志和镜像文件拷贝到SecondaryNameNode;
- 生成新的镜像文件Fsimage.ckpoint;
- 拷贝Fsimage.ckpoint到 NameNode;
- NameNode将fsimage.ckpoint到NameNode;
- NameNode将fsimage.ckpoint重新命名成fsimage。
Fsimage和Edits解析
- Fsimage文件:HDFS文件系统元数据的一个永久性的检查点,其中包含HDFS文件系统的所有目录和文件的序列化信息。
- Edits文件:存放HDFS文件系统的所有更新操作的路径,文件系统客户端执行的所有写操作首先会被记录到Edits文件中。
- 每次NameNode启动时都会将Fsimage文件读入内存,加载Edits里面的更新操作,保证内存中的元数据信息是最新的、同步的,可以看成NameNode启动时就将Fsimage和Edits文件进行了合并。
CheckPoint时间设置
通常情况下,SecondaryNameNode每隔一小时执行一次
一分钟检查一次操作次数,当操作次数达到一百万时,SecondaryNameNode执行一次。
DataNode
DataNode工作机制
- 一个数据块在DataNode上以文件的形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和以及时间戳。
- DataNode启动后先向NameNode注册,之后每隔6小时向NameNode上报所有的块信息。
- DataNode通过心跳向NameNode报告自己还在工作,心跳是3秒一次,心跳的返回结果带有NameNode给数据节点的命令。
- 如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。
掉线时限参数设置
- 超时时长:DataNode进程死亡或者网络故障造成DataNode 与NameNode无法通信,NameNode要经过一段时间才会把该节点判定为死亡,这段时间称为超时时长。
- 超时时长 = 2 *dfs.namenode.heartbeat.recheck-interval + 10 *dfs.heartbeat.interval
默认:dfs.namenode.heartbeat.recheck-interval = 5分钟
dfs.heartbeat.interval = 3秒
文件块
- HDFS 中的文件在物理上是分块存储的,默认大小是128M。
- HDFS块大小的设置主要取决于磁盘传输速率。块太小会增加寻址时间,块太大从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。
HDFS读写流程
节点距离计算:两个节点到达最近的共同祖先的距离总和。
即:同一节点上的进程距离为0,同一机架上的不同节点为2,同一数据中心不同机架上的节点为4,不同数据中心的节点为6.
机架感知(副本存储节点选择):第一副本在Client所处的节点上,如果Client在集群外,随机选一个;第二副本在另一个机架的随机一个节点;第三副本在第二副本所在机架的随机节点。
HDFS读数据
- 客户端通过DistributedFileSystem 向 NameNode请求下载文件,NameNode通过查询元数据找到文件块所在的DataNode地址;
- 挑选一台DataNode服务器,请求读取数据;
- DataNode开始传输数据给客户端;
- 客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。
HDFS写数据
- 客户端通过DistributedFileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已经存在,父目录是否存在;
- NameNode返回是否可以上传;
- 客户端请求第一个Block上传到哪几个DataNode服务器上;
- NameNode返回3个DataNode节点,分别dn1、dn2、dn3;
- 客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成;
- dn1、dn2、dn3逐级应答客户端;
- 客户端开始往dn1上传第一个Block,以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答;
- 当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。
HDFS编程
Shell编程
- 基本语法:hadoop fs 具体命令 或者 hdfs dfs 具体命令
- 命令:
- 上传:
# 从本地剪切粘贴到HDFS
hadoop fs -moveFromLocal filePath
# 从本地文件系统中拷贝文件到HDFS
hadoop fs -copyFromLocal filePath
# 从本地文件系统中拷贝文件到HDFS
hadoop fs -put filePath
#追加一个文件到已经存在的文件末尾
hadoop fs -appendToFile filePath
- 下载:
# 从HDFS拷贝到本地
hadoop fs -copyToLocal filePath
# 从HDFS拷贝到本地
hadoop fs -get filePath
- HDFS直接操作
# 显示目录信息
hadoop fs -ls path
#显示文件内容
hadoop fs -cat filename
# 修改文件权限
hadoop fs -chmod/-chown/-chgrp 777 filename
# 创建路径
hadoop fs -mkdir path
# 拷贝文件
hadoop fs -cp sourcePath targetPath
# 移动文件
hadoop fs -mv sourcePath targetPath
# 显示文件末尾数据
hadoop fs -tail filePath
# 删除文件
hadoop fs -rm filePath
# 递归删除文件
hadoop fs -rm -r filePath
# 统计文件夹大小信息
hadoop fs -du -s -h filePath
API编程(一般使用Java)
-
使用Java集成开发环境,(笔者使用的是IntelliJ IDEA)创建maven工程
-
添加依赖:在pom.xml中添加以下内容
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
- 设置日志:在resources下创建文件 log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
- 创建包,新建类
- 判断文件是否存在
@Test
public void testExsits() throws URISyntaxException, IOException, InterruptedException {
URI uri = new URI("hdfs://hadoop102:8020"); // HDFS运行节点的名称和端口号
Configuration configuration = new Configuration();
String user = "yss"; //设置自己的用户名
//获取客户端对象
FileSystem fs = FileSystem.get(uri,configuration,user);
//判断文件是否存在
if (fs.exists(new Path("/putfile"))) {
System.out.println("putfile存在");
}else {
System.out.println("putfile不存在");
}
fs.close();
}
- 上传文件
@Test
public void testput() throws URISyntaxException, IOException, InterruptedException {
URI uri = new URI("hdfs://hadoop102:8020");
Configuration configuration = new Configuration();
String user = "yss";
//获取一个客户端对象
FileSystem fs = FileSystem.get(uri,configuration,user);
//上传文件
fs.copyFromLocalFile(false,false,new Path("E:\\putfile\\mapred_input.txt"),new Path("\\"));
//关闭客户端
fs.close();
- 下载文件
@Test
public void testGet() throws URISyntaxException, IOException, InterruptedException {
URI uri = new URI("hdfs://hadoop102:8020");
Configuration configuration = new Configuration();
String user = "yss";
FileSystem fs = FileSystem.get(uri,configuration,user);
fs.copyToLocalFile(false,new Path("/hadoop_hdfs"),new Path("E:\\putfile"),false);
fs.close();
}
- 读取文件内容
@Test
public void testResd() throws URISyntaxException, IOException, InterruptedException {
URI uri = new URI("hdfs://hadoop102:8020");
Configuration configuration = new Configuration();
String user = "yss";
FileSystem fs = FileSystem.get(uri,configuration,user);
FSDataInputStream dataInputStream = fs.open(new Path("/putfile/putfile.txt"));
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(dataInputStream));
String line = null;
while ((line = bufferedReader.readLine()) != null) {
System.out.println(line);
}
bufferedReader.close();
dataInputStream.close();
fs.close();
}
- 写文件
@Test
public void testWrite() throws URISyntaxException, IOException, InterruptedException {
URI uri = new URI("hdfs://hadoop102:8020");
Configuration configuration = new Configuration();
String user = "yss";
FileSystem fs = FileSystem.get(uri,configuration,user);
byte[] buff = "这里是文件内容".getBytes(StandardCharsets.UTF_8);
FSDataOutputStream fsDataOutputStream = fs.create(new Path("/text.txt"));
fsDataOutputStream.write(buff,0,buff.length);
fsDataOutputStream.close();
fs.close();
}
- 显示文件的详细内容
@Test
public void testFiledetails() throws URISyntaxException, IOException, InterruptedException {
URI uri = new URI("hdfs://hadoop102:8020");
Configuration configuration = new Configuration();
String user = "yss";
FileSystem fs = FileSystem.get(uri,configuration,user);
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/putfile"), true);
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
System.out.println("文件名字:"+fileStatus.getPath().getName());
System.out.println("读写权限:"+fileStatus.getPermission());
System.out.println("大小:"+fileStatus.getBlockSize());
System.out.println("创建时间:"+fileStatus.getAccessTime());
System.out.println("路径:"+fileStatus.getPath());
}
fs.close();
}
可以在网页查看HDFS目录