第 1 章 HDFS 概述
定义
是一个文件系统,是分布式的
使用场景
适合一次写入,多次读出的场景
优点
高容错性、适合处理大数据、可构建在廉价机器上
缺点
不适合低延时数据访问、无法高效的对大量小文件进行存储、不支持并发写入和文件随机修改
组成架构
1)NameNode(nn):就是Master,它是一个主管、管理者。
(1)管理HDFS的名称空间;
(2)配置副本策略;
(3)管理数据块(Block)映射信息;
(4)处理客户端读写请求。
2)DataNode:就是Slave。NameNode下达命令,DataNode执行实际的操作。
(1)存储实际的数据块;
(2)执行数据块的读/写操作。
3)Client:就是客户端。
(1)文件切分。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行上传;
(2)与NameNode交互,获取文件的位置信息;
(3)与DataNode交互,读取或者写入数据;
(4)Client提供一些命令来管理HDFS,比如NameNode格式化;
(5)Client可以通过一些命令来访问HDFS,比如对HDFS增删查改操作;
4)Secondary NameNode:并非NameNode的热备。当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务。
(1)辅助NameNode,分担其工作量,比如定期合并Fsimage和Edits,并推送给NameNode ;
(2)在紧急情况下,可辅助恢复NameNode。
HDFS 文件块大小
分块存储:
块的大小( dfs.blocksize ):默认在Hadoop2.x/3.x版本中是128M,1.x版本中是64M。
寻址时间
若约为10ms,即查找到目标block的时间为10ms。
寻址时间为传输时间的1%时,则为最佳状态。
因此,传输时间=10ms/0.01=1000ms=1s
而目前磁盘的传输速率普遍为100MB/s。
block大小=1s*100MB/s=100MB
为什么块的大小不能设置太小,也不能设置太大?
(1)HDFS的块设置太小,会增加寻址时间,程序一直在找块的开始位置
(2)如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。导致程序在处理这块数据时,会非常慢。
总结:HDFS块的大小设置主要取决于磁盘传输速率。
第 2 章 HDFS 的 API 操作
2.1 客户端环境准备
配置 Path 环境变量
配置【 非中文路径\hadoop-3.1.0\bin 】到环境变量
Maven依赖
<dependencies>
# hadoop包
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
配置log4j.properties
在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
2.2 HDFS 的 API 案例实操
参数优先级
越靠近代码的越优先
客户端代码中设置的值 >
ClassPath 下的用户自定义配置文件 >
服务器的自定义配置(xxx-site.xml)>
服务器的默认配置(xxx-default.xml)
@Before 前置操作: 获取文件系统
// HdfsClient: 操作Hdfs文件系统
public class HdfsClient {
private FileSystem fs;
// 前置操作:获取文件系统
@Before
public void init() throws URISyntaxException, IOException, InterruptedException {
Configuration configuration = new Configuration();
// 连接的集群nn地址
fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration,"azula");
}
//
...
}
FileSystem.get()源码
public static FileSystem get(final URI uri, final Configuration conf, String user) throws IOException, InterruptedException {
String ticketCachePath = conf.get("hadoop.security.kerberos.ticket.cache.path");
UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, user);
return (FileSystem)ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws IOException {
return FileSystem.get(uri, conf);
}
});
}
@After 后置操作: 关闭文件系统
@After
public void close() throws IOException {
// 3 关闭资源
fs.close();
}
目录的创建
@Test
public void testMkdirs() throws IOException{
// 2 执行的操作:创建目录
fs.mkdirs(new Path("/xiyou/huaguoshan/"));
}
文件的上传
@Test
public void testPut() throws IOException {
// void copyFromLocalFile(Path, Path)
// void copyFromLocalFile(boolean, Path, Path)
// void copyFromLocalFile(boolean, boolean, Path[], Path)
// void copyFromLocalFile(boolean, boolean, Path, Path)
// 参数一:表示删除原数据,参数二:是否允许覆盖,参数三:原数据路径,参数四:目的地路径
fs.copyFromLocalFile(false,true,new Path("D:\\sunwukong.txt"),new Path("hdfs://hadoop102/xiyou/huaguoshan"));
}
fs.copyFromLocalFile源码:copyFromLocalFile=>copy
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException {
Configuration conf = this.getConf();
FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
}
public static boolean copy(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, boolean deleteSource, boolean overwrite, Configuration conf) throws IOException {
FileStatus fileStatus = srcFS.getFileStatus(src);
return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf);
}
public static boolean copy(FileSystem srcFS, FileStatus srcStatus, FileSystem dstFS, Path dst, boolean deleteSource, boolean overwrite, Configuration conf) throws IOException {
Path src = srcStatus.getPath();
dst = checkDest(src.getName(), dstFS, dst, overwrite);
FileStatus[] contents;
if (srcStatus.isDirectory()) {
checkDependencies(srcFS, src, dstFS, dst);
if (!dstFS.mkdirs(dst)) {
return false;
}
contents = srcFS.listStatus(src);
for(int i = 0; i < contents.length; ++i) {
copy(srcFS, contents[i], dstFS, new Path(dst, contents[i].getPath().getName()), deleteSource, overwrite, conf);
}
} else {
contents = null;
OutputStream out = null;
try {
InputStream in = srcFS.open(src);
out = dstFS.create(dst, overwrite);
IOUtils.copyBytes(in, out, conf, true);
} catch (IOException var11) {
IOUtils.closeStream(out);
IOUtils.closeStream(contents);
throw var11;
}
}
return deleteSource ? srcFS.delete(src, true) : true;
}
文件下载
@Test
public void testGet() throws IOException {
// void copyToLocalFile(boolean, Path, Path, boolean)
// 原文件是否生成,原文件路径HDFS 目标地址路径Win
fs.copyToLocalFile(false,new Path("hdfs://hadoop102/xiyou/huaguoshan"),new Path("D:\\sunwukong.txt"));
}
fs.copyToLocalFile源码:copyToLocalFile=>copy
public void copyToLocalFile(boolean delSrc, Path src, Path dst, boolean useRawLocalFileSystem) throws IOException {
Configuration conf = this.getConf();
FileSystem local = null;
if (useRawLocalFileSystem) {
local = getLocal(conf).getRawFileSystem();
} else {
local = getLocal(conf);
}
FileUtil.copy(this, src, (FileSystem)local, dst, delSrc, conf);
}
文件的删除
@Test
public void testRm() throws IOException {
// boolean delete(Path, boolean)
//要删除的路径 是否递归删除
fs.delete(new Path("hdfs://hadoop102/jinguo"),true);
}
文件的改名和移动
@Test
public void testmv() throws IOException {
//public abstract boolean rename(
// org.apache.hadoop.fs.Path path,
// org.apache.hadoop.fs.Path path1 )
// 原文件路径 目标文件路径
// fs.rename(new Path("/xiyou/huaguoshan/sunwukong.txt"),new Path("/sunwukong.txt"));
fs.rename(new Path("/sunwukong.txt"),new Path("/xiyou/huaguoshan/swk.txt"));
}
获取文件详细信息
//获取文件详细信息
@Test
public void fileDetail() throws IOException {
//获取所有文件信息
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
//遍历文件
while (listFiles.hasNext()){
LocatedFileStatus next = listFiles.next();
System.out.println("===="+next.getPath()+"====");
System.out.println(next.getPermission());
System.out.println(next.getOwner());
System.out.println(next.getModificationTime());
System.out.println(next.getGroup());
System.out.println(next.getBlockSize());
System.out.println(next.getAccessTime());
BlockLocation[] blockLocations = next.getBlockLocations();
System.out.println(Arrays.toString(blockLocations));
}
}
判断是文件还是文件夹
@Test
public void testListStatus() throws IOException{
// 判断是文件还是文件夹
/**
* public abstract org.apache.hadoop.fs.FileStatus[] listStatus( org.apache.hadoop.fs.Path path )
*/
FileStatus[] listStatus = fs.listStatus(new Path("/"));
for (FileStatus fileStatus : listStatus) {
// 如果是文件
if (fileStatus.isFile()) {
System.out.println("f:"+fileStatus.getPath().getName());
}else {
System.out.println("d:"+fileStatus.getPath().getName());
}
}
}
第 3 章 HDFS 的读写流程
3.1 HDFS 写数据流程
(1)客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。
(2)NameNode 返回是否可以上传。
(3)客户端请求第一个 Block 上传到哪几个 DataNode 服务器上。
(4)NameNode 返回 3 个 DataNode 节点,分别为 dn1、dn2、dn3。
(5)客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用dn2,然后 dn2 调用 dn3,将这个通信管道建立完成。
(6)dn1、dn2、dn3 逐级应答客户端。
(7)客户端开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存),以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个 packet会放入一个应答队列等待应答。
(8)当一个 Block 传输完成之后,客户端再次请求 NameNode 上传第二个 Block 的服务器。(重复执行 3-7 步)。
3.2 节点距离计算
0(同一节点上的进程)
2(同一机架上的不同节点)
4(同一数据中心不同机架上的节点)
6(不同数据中心的节点)
3.3 副本节点选择
- 第一个副本在Client所处的节点上。如果客户端在集群外,随机选一个。
- 第二个副本在另一个机架的随机一个节点
- 第三个副本在第二个副本所在机架的随机节点
3.4 HDFS 读数据流程
(1)客户端通过 DistributedFileSystem 向 NameNode 请求下载文件,NameNode 通过查询元数据,找到文件块所在的 DataNode 地址。
(2)挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据。
(3)DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位来做校验)。
(4)客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。
第 4 章 NameNode 和 SecondaryNameNode
4.1 NN 和 2NN 工作机制
思考:NameNode 中的元数据是存储在哪里的?
-
如果存储在 NameNode 节点的磁盘中,因为经常需要进行随机访问,还有响应客户请求,必然是效率过低。因此,元数据需要存放在内存中。但如果只存在内存中,一旦断电,元数据丢失,整个集群就无法工作了。
==> 因此产生在磁盘中备份元数据的FsImage。
-
当在内存中的元数据更新时,如果同时更新 FsImage,就会导致效率过低,但如果不更新,就会发生一致性问题,一旦 NameNode 节点断电,就会产生数据丢失。
==> 因此,引入 Edits 文件,只进行追加操作,效率很高。每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到 Edits 中。这样,一旦 NameNode 节点断电,以通过 FsImage 和 Edits 的合并,合成元数据。
-
如果长时间添加数据到 Edits 中,会导致该文件数据过大,效率降低,而且一旦断电,恢复元数据需要的时间过长。
==> 因此,需要定期进行 FsImage 和 Edits 的合并
-
如果定期进行 FsImage 和 Edits 的合并的操作由NameNode 节点完成,又会效率过低。
==> 因此,引入一个新的节点 SecondaryNamenode,专门用于 FsImage 和 Edits 的合并。
- 1)第一阶段:NameNode 启动
- NameNode 格式化 => 创建 Fsimage 和 Edits 文件
- 若不是第一次启动 => 加载编辑日志和镜像文件到内存
- => 客户端对元数据进行增删改的请求
- => NameNode 记录操作日志,更新滚动日志
- => NameNode 在内存中对元数据进行增删改
- NameNode 格式化 => 创建 Fsimage 和 Edits 文件
- 2)第二阶段:Secondary NameNode 工作
- 2NN 询问 NN 是否需要 CheckPoint
- 若需要 CheckPoint
- => 2NN 请求执行 CheckPoint
- => NN 滚动正在写的 Edits 日志
- => 将滚动前的编辑日志和镜像文件拷贝到 2NN
- => 2NN 加载编辑日志和镜像文件到内存,并合并
- => 生成新的镜像文件 fsimage.chkpoint
- => 拷贝 fsimage.chkpoint 到 NameNode
- => NameNode 将 fsimage.chkpoint 重新命名成 fsimage
4.2 Fsimage 和 Edits 解析
NameNode被格式化之后,将在/opt/module/hadoop-3.1.3/data/tmp/dfs/name/current目录中产生如下文件:
fsimage_0000000000000000000
fsimage_0000000000000000000.md5
seen_txid
VERSION
(1)Fsimage文件:HDFS文件系统元数据的一个永久性的检查点
=> 其中包含HDFS文件系统的所有目录和文件inode的序列化信息。
(2)Edits文件:存放HDFS文件系统的所有更新操作的路径
=> 文件系统客户端执行的所有写操作首先会被记录到Edits文件中。
(3)seen_txid文件保存的是一个数字
=> 就是最后一个edits_的数字
(4)每次NameNode启动的时候都会将Fsimage文件读入内存,加载Edits里面的更新操作,保证内存中的元数据信息是最新的、同步的
=> 可以看成NameNode启动的时候就将Fsimage和Edits文件进行了合并。
4.3 CheckPoint 时间设置
1)通常情况下,SecondaryNameNode 每隔一小时执行一次。
2)一分钟检查一次操作次数,当操作次数达到 1 百万时,SecondaryNameNode 执行一次。
在 [hdfs-default.xml] 中配置
第 5 章 DataNode
5.1 DataNode 工作机制
-
一个数据块在 DataNode 上以文件形式存储在磁盘上
=> 包括两个文件
一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。 -
DataNode 启动后向 NameNode 注册
-
通过后,周期性(6 小时)的向 NameNode 上报所有的块信息。
DN 向 NN 汇报当前解读信息的时间间隔,默认 6 小时;
DN 扫描自己节点块信息列表的时间,默认 6 小时
-
-
心跳是每 3 秒一次
- 心跳返回结果带有 NameNode 给该 DataNode 的命令
- 如复制块数据到另一台机器,或删除某个数据块。
- 如果超过 10 分钟没有收到某个DataNode 的心跳,则认为该节点不可用。
- 心跳返回结果带有 NameNode 给该 DataNode 的命令
-
集群运行中可以安全加入和退出一些机器。
5.2 数据完整性
DataNode 节点保证数据完整性的方法
- 当 DataNode 读取 Block 的时候,它会计算 CheckSum。
如果计算后的 CheckSum,与 Block 创建时值不一样,说明 Block 已经损坏。 - DataNode 在其文件创建后,定期验证 CheckSum。
- Client 读取其他 DataNode 上的 Block。
- 常见的校验算法 crc(32),md5(128),sha1(160)
5.3 掉线时限参数设置
-
DataNode进程死亡或者网络故障造成DataNode无法与NameNode通信。
NameNode不会立即把该节点判定为死亡,要经过一段时间,称作超时时长。
- HDFS默认的超时时长为10分钟+30秒。
- 如果定义超时时间为TimeOut,则超时时长的计算公式为:
TimeOut = 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval。 - 默认的dfs.namenode.heartbeat.recheck-interval 大小为5分钟,dfs.heartbeat.interval默认为3秒。
- hdfs-site.xml 配置文件中的 heartbeat.recheck.interval 的单位为毫秒,
dfs.heartbeat.interval 的单位为秒。
常见的校验算法 crc(32),md5(128),sha1(160)
5.3 掉线时限参数设置
-
DataNode进程死亡或者网络故障造成DataNode无法与NameNode通信。
NameNode不会立即把该节点判定为死亡,要经过一段时间,称作超时时长。
- HDFS默认的超时时长为10分钟+30秒。
- 如果定义超时时间为TimeOut,则超时时长的计算公式为:
TimeOut = 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval。 - 默认的dfs.namenode.heartbeat.recheck-interval 大小为5分钟,dfs.heartbeat.interval默认为3秒。
- hdfs-site.xml 配置文件中的 heartbeat.recheck.interval 的单位为毫秒,
dfs.heartbeat.interval 的单位为秒。