Background
A while ago our cluster reported the following error when writing a block:
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
File /tmp/xxxx.tmp could only be replicated to 0 nodes instead of minReplication (=1).
There are 983 datanode(s) running and 983 node(s) are excluded in this operation.
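Although the cluster was indeed more than 90% full, it was implausible that nearly a thousand nodes had all run out of space, so I read through the relevant code and summarized Hadoop's replica placement policy along the way. This cluster runs Hadoop 2.6.
HDFS replica placement policy
- 1st replica: if the client issuing the write runs on a datanode, the replica is stored on that local node; otherwise a datanode is chosen at random from the cluster.
- 2nd replica: stored on a rack different from the first replica's rack.
- 3rd replica: stored on the same rack as the second replica, but on a different node.
- 4th and later replicas: stored on randomly chosen datanodes.
Source code analysis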
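The rules in the list above can be sketched as a toy model. This is not Hadoop code: nodes are plain "/rack/host" strings, and the class and method names (PlacementSketch, place, pick, rackOf) are made up for illustration. It deliberately ignores everything the real policy also checks (free space, load, stale nodes, the per-rack limit).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Predicate;

/** Toy model of the placement rules listed above; this is not Hadoop code. */
final class PlacementSketch {

    /** Rack part of a "/rack/host" path, e.g. "/rack1" for "/rack1/dn1". */
    static String rackOf(String node) {
        return node.substring(0, node.lastIndexOf('/'));
    }

    /** Adds one random, not yet chosen node that satisfies the rule; returns false if none exists. */
    static boolean pick(List<String> cluster, List<String> chosen, Random rnd, Predicate<String> rule) {
        List<String> candidates = new ArrayList<>();
        for (String node : cluster) {
            if (!chosen.contains(node) && rule.test(node)) {
                candidates.add(node);
            }
        }
        if (candidates.isEmpty()) {
            return false;
        }
        chosen.add(candidates.get(rnd.nextInt(candidates.size())));
        return true;
    }

    static List<String> place(List<String> cluster, String writer, int replicas, Random rnd) {
        List<String> chosen = new ArrayList<>();
        if (replicas <= 0 || cluster.isEmpty()) {
            return chosen;
        }
        // 1st replica: the writer itself if it is a datanode, otherwise a random node.
        boolean writerIsDatanode = writer != null && cluster.contains(writer);
        chosen.add(writerIsDatanode ? writer : cluster.get(rnd.nextInt(cluster.size())));
        final String firstRack = rackOf(chosen.get(0));
        // 2nd replica: a node on a different rack than the first replica.
        if (chosen.size() < replicas) {
            pick(cluster, chosen, rnd, n -> !rackOf(n).equals(firstRack));
        }
        // 3rd replica: another node on the second replica's rack.
        if (chosen.size() == 2 && replicas > 2) {
            final String secondRack = rackOf(chosen.get(1));
            pick(cluster, chosen, rnd, n -> rackOf(n).equals(secondRack));
        }
        // Remaining replicas (or any that could not be placed above): any unused node.
        while (chosen.size() < replicas && pick(cluster, chosen, rnd, n -> true)) {
            // keep picking until enough replicas are placed or the cluster is exhausted
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<String> cluster = List.of("/rack1/dn1", "/rack1/dn2", "/rack2/dn3",
                "/rack2/dn4", "/rack3/dn5", "/rack3/dn6");
        System.out.println(place(cluster, "/rack1/dn1", 3, new Random()));
    }
}
```

With the writer on /rack1/dn1 and three replicas, the first entry is always /rack1/dn1 and the other two share a rack other than /rack1; which rack and which nodes depends on the random draw.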
BlockManager.java
The error above is thrown by the method below. After it, we look at the chooseTarget method.
/**
* Choose target datanodes for creating a new block.
*
* @throws IOException
* if the number of targets < minimum replication.
* @see BlockPlacementPolicy#chooseTarget(String, int, Node,
* Set, long, List, BlockStoragePolicy)
*/
public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
final int numOfReplicas, final Node client,
final Set<Node> excludedNodes,
final long blocksize,
final List<String> favoredNodes,
final byte storagePolicyID) throws IOException {
List<DatanodeDescriptor> favoredDatanodeDescriptors =
getDatanodeDescriptors(favoredNodes);
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
// Call the chooseTarget method of blockplacement
final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
numOfReplicas, client, excludedNodes, blocksize,
favoredDatanodeDescriptors, storagePolicy);
// If too few target nodes were chosen, throw an IOException
if (targets.length < minReplication) {
throw new IOException("File " + src + " could only be replicated to "
+ targets.length + " nodes instead of minReplication (="
+ minReplication + "). There are "
+ getDatanodeManager().getNetworkTopology().getNumOfLeaves()
+ " datanode(s) running and "
+ (excludedNodes == null? "no": excludedNodes.size())
+ " node(s) are excluded in this operation.");
}
return targets;
}
BlockPlacementPolicyDefault.java
/** This is the implementation. */
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
Node writer,
List<DatanodeStorageInfo> chosenStorage,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
final BlockStoragePolicy storagePolicy) {
// If no replicas are requested or there are no datanodes, return an empty array
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return DatanodeStorageInfo.EMPTY_ARRAY;
}
// Initialize the set of excluded nodes
if (excludedNodes == null) {
excludedNodes = new HashSet<Node>();
}
// Compute the maximum number of replicas allowed per rack
int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
numOfReplicas = result[0];
int maxNodesPerRack = result[1];
// Initialize the result list with the storages that were already chosen
final List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(chosenStorage);
for (DatanodeStorageInfo storage : chosenStorage) {
// add localMachine and related nodes to excludedNodes
addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
}
boolean avoidStaleNodes = (stats != null
&& stats.isAvoidingStaleDataNodesForWrite());
// Call the chooseTarget overload that performs the actual selection
final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
EnumSet.noneOf(StorageType.class), results.isEmpty());
if (!returnChosenNodes) {
results.removeAll(chosenStorage);
}
// sorting nodes to form a pipeline
return getPipeline(
(writer != null && writer instanceof DatanodeDescriptor) ? writer
: localNode,
results.toArray(new DatanodeStorageInfo[results.size()]));
}
The getMaxNodesPerRack method (Calculate the maximum number of replicas to allocate per rack.)
private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
int clusterSize = clusterMap.getNumOfLeaves();
int totalNumOfReplicas = numOfChosen + numOfReplicas;
if (totalNumOfReplicas > clusterSize) {
numOfReplicas -= (totalNumOfReplicas-clusterSize);
totalNumOfReplicas = clusterSize;
}
// No calculation needed when there is only one rack or picking one node.
int numOfRacks = clusterMap.getNumOfRacks();
if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
return new int[] {numOfReplicas, totalNumOfReplicas};
}
int maxNodesPerRack = (totalNumOfReplicas-1)/numOfRacks + 2;
// At this point, there are more than one racks and more than one replicas
// to store. Avoid all replicas being in the same rack.
//
// maxNodesPerRack has the following properties at this stage.
// 1) maxNodesPerRack >= 2
// 2) (maxNodesPerRack-1) * numOfRacks > totalNumOfReplicas
// when numOfRacks > 1
//
// Thus, the following adjustment will still result in a value that forces
// multi-rack allocation and gives enough number of total nodes.
if (maxNodesPerRack == totalNumOfReplicas) {
maxNodesPerRack--;
}
return new int[] {numOfReplicas, maxNodesPerRack};
}
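To make the formula concrete, here is a standalone copy of the same arithmetic with the cluster size and rack count passed in as plain parameters instead of being read from clusterMap. The class name and the sample rack counts are made up for illustration; only the 983 datanodes come from the error message at the top.

```java
/** Standalone copy of the getMaxNodesPerRack arithmetic, for experimenting with numbers. */
final class MaxNodesPerRackSketch {

    /** Returns {numOfReplicas, maxNodesPerRack}, like the method above. */
    static int[] maxNodesPerRack(int numOfChosen, int numOfReplicas,
                                 int clusterSize, int numOfRacks) {
        int totalNumOfReplicas = numOfChosen + numOfReplicas;
        // Never ask for more replicas than there are datanodes.
        if (totalNumOfReplicas > clusterSize) {
            numOfReplicas -= (totalNumOfReplicas - clusterSize);
            totalNumOfReplicas = clusterSize;
        }
        // One rack or one replica: no per-rack limit is needed.
        if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
            return new int[] {numOfReplicas, totalNumOfReplicas};
        }
        int maxNodesPerRack = (totalNumOfReplicas - 1) / numOfRacks + 2;
        // Make sure a single rack can never hold every replica.
        if (maxNodesPerRack == totalNumOfReplicas) {
            maxNodesPerRack--;
        }
        return new int[] {numOfReplicas, maxNodesPerRack};
    }

    public static void main(String[] args) {
        System.out.println(maxNodesPerRack(0, 3, 983, 40)[1]);  // prints 2
        System.out.println(maxNodesPerRack(0, 3, 10, 2)[1]);    // prints 2
        System.out.println(maxNodesPerRack(0, 10, 100, 3)[1]);  // prints 5
        System.out.println(maxNodesPerRack(0, 3, 10, 1)[1]);    // prints 3
    }
}
```

For the usual three replicas on a multi-rack cluster the limit is 2 per rack, which is what forces the replicas onto at least two racks.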
The chooseTarget method; this is where the main selection logic lives
private Node chooseTarget(int numOfReplicas,
Node writer,
final Set<Node> excludedNodes,
final long blocksize,
final int maxNodesPerRack,
final List<DatanodeStorageInfo> results,
final boolean avoidStaleNodes,
final BlockStoragePolicy storagePolicy,
final EnumSet<StorageType> unavailableStorages,
final boolean newBlock) {
// If no additional replicas are needed, or the cluster has no nodes to choose from
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
// Return writer if it is a datanode, otherwise null
return (writer instanceof DatanodeDescriptor) ? writer : null;
}
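// Number of nodes already chosen
final int numOfResults = results.size();
// Total number of replicas expected
final int totalReplicasExpected = numOfReplicas + numOfResults;
// If writer is null or not a datanode, and this is not a new block, use the first already chosen node as the writer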
if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
writer = results.get(0).getDatanodeDescriptor();
}
// Keep a copy of original excludedNodes
final Set<Node> oldExcludedNodes = new HashSet<Node>(excludedNodes);
// choose storage types; use fallbacks for unavailable storages
final List<StorageType> requiredStorageTypes = storagePolicy
.chooseStorageTypes((short) totalReplicasExpected,
DatanodeStorageInfo.toStorageTypes(results),
unavailableStorages, newBlock);
final EnumMap<StorageType, Integer> storageTypes =
getRequiredStorageTypes(requiredStorageTypes);
if (LOG.isTraceEnabled()) {
LOG.trace("storageTypes=" + storageTypes);
}
try {
// If the list of required storage types is empty, throw an exception
if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
throw new NotEnoughReplicasException(
"All required storage types are unavailable: "
+ " unavailableStorages=" + unavailableStorages
+ ", storagePolicy=" + storagePolicy);
}
// numOfResults == 0 means no replica has been chosen yet, so start with the local node
if (numOfResults == 0) {
// Choose local storage as the first node
writer = chooseLocalStorage(writer, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
.getDatanodeDescriptor();
// If the number of replicas still needed drops to 0, selection is complete; return the first node (writer)
if (--numOfReplicas == 0) {
return writer;
}
}
// Get the first chosen node
final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
if (numOfResults <= 1) {
// Choose a node on a different rack from the first node
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
if (--numOfReplicas == 0) {
return writer;
}
}
if (numOfResults <= 2) {
// Get the second chosen node
final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
if (clusterMap.isOnSameRack(dn0, dn1)) {
// If the first and second nodes are on the same rack, choose the third node on another rack
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
} else if (newBlock){
// The first and second nodes are on different racks and this is a new block: choose the third node on the second node's rack
chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
} else {
// Otherwise choose a node on the writer's local rack
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
}
if (--numOfReplicas == 0) {
return writer;
}
}
// If more than 3 replicas are requested, choose the remaining nodes at random
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
} catch (NotEnoughReplicasException e) {
final String message = "Failed to place enough replicas, still in need of "
+ (totalReplicasExpected - results.size()) + " to reach "
+ totalReplicasExpected
+ " (unavailableStorages=" + unavailableStorages
+ ", storagePolicy=" + storagePolicy
+ ", newBlock=" + newBlock + ")";
if (LOG.isTraceEnabled()) {
LOG.trace(message, e);
} else {
LOG.warn(message + " " + e.getMessage());
}
if (avoidStaleNodes) {
// Retry chooseTarget again, this time not avoiding stale nodes.
// excludedNodes contains the initial excludedNodes and nodes that were
// not chosen because they were stale, decommissioned, etc.
// We need to additionally exclude the nodes that were added to the
// result list in the successful calls to choose*() above.
for (DatanodeStorageInfo resultStorage : results) {
addToExcludedNodes(resultStorage.getDatanodeDescriptor(), oldExcludedNodes);
}
// Set numOfReplicas, since it can get out of sync with the result list
// if the NotEnoughReplicasException was thrown in chooseRandom().
numOfReplicas = totalReplicasExpected - results.size();
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
newBlock);
}
boolean retry = false;
// simply add all the remaining types into unavailableStorages and give
// another try. No best effort is guaranteed here.
for (StorageType type : storageTypes.keySet()) {
if (!unavailableStorages.contains(type)) {
unavailableStorages.add(type);
retry = true;
}
}
if (retry) {
for (DatanodeStorageInfo resultStorage : results) {
addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
oldExcludedNodes);
}
numOfReplicas = totalReplicasExpected - results.size();
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
newBlock);
}
}
return writer;
}
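Fallback when choosing replicas
The selection degrades step by step: first try local storage; if that does not qualify, try other nodes on the local rack; if none of them qualifies, fall back further to nodes on a different rack; and finally choose randomly among the nodes in the cluster. The relationships are shown in the diagram below: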
chooseRandom
Whether a node is being chosen on the local rack or on a remote rack, the chooseRandom method is what ends up being called
protected DatanodeStorageInfo chooseRandom(int numOfReplicas,
String scope,
Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
EnumMap<StorageType, Integer> storageTypes)
throws NotEnoughReplicasException {
// Count the available nodes in scope (those not in excludedNodes)
int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
scope, excludedNodes);
StringBuilder builder = null;
if (LOG.isDebugEnabled()) {
builder = debugLoggingBuilder.get();
builder.setLength(0);
builder.append("[");
}
boolean badTarget = false;
DatanodeStorageInfo firstChosen = null;
// Start looping
while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
// Pick a node at random
DatanodeDescriptor chosenNode =
(DatanodeDescriptor)clusterMap.chooseRandom(scope);
// Add it to excludedNodes
if (excludedNodes.add(chosenNode)) { //was not in the excluded list
if (LOG.isDebugEnabled()) {
builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" [");
}
// One fewer available node
numOfAvailableNodes--;
final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
chosenNode.getStorageInfos());
int i = 0;
boolean search = true;
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
.entrySet().iterator(); search && iter.hasNext(); ) {
Map.Entry<StorageType, Integer> entry = iter.next();
for (i = 0; i < storages.length; i++) {
StorageType type = entry.getKey();
// Check whether this storage is a suitable target
final int newExcludedNodes = addIfIsGoodTarget(storages[i],
excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
avoidStaleNodes, type);
// It is suitable
if (newExcludedNodes >= 0) {
// One fewer replica left to place
numOfReplicas--;
if (firstChosen == null) {
firstChosen = storages[i];
}
numOfAvailableNodes -= newExcludedNodes;
int num = entry.getValue();
if (num == 1) {
iter.remove();
} else {
entry.setValue(num - 1);
}
search = false;
break;
}
}
}
if (LOG.isDebugEnabled()) {
builder.append("\n]");
}
// If no candidate storage was found on this DN then set badTarget.
badTarget = (i == storages.length);
}
}
// If replicas are still needed, throw NotEnoughReplicasException
if (numOfReplicas>0) {
String detail = enableDebugLogging;
if (LOG.isDebugEnabled()) {
if (badTarget && builder != null) {
detail = builder.toString();
builder.setLength(0);
} else {
detail = "";
}
}
throw new NotEnoughReplicasException(detail);
}
return firstChosen;
}
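One plausible reading of the "983 datanode(s) running and 983 node(s) are excluded" message follows from the loop above: every node that is drawn is added to excludedNodes before it is tested, so if no node passes the test, the set ends up containing the whole cluster. The sketch below reproduces only that behaviour. It is a simplified model, not Hadoop code: nodes are strings, the suitability test is a caller-supplied predicate, and NotEnoughTargets is an unchecked stand-in for Hadoop's checked NotEnoughReplicasException.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.function.Predicate;

/** Simplified model of the chooseRandom loop; names are illustrative only. */
final class ChooseRandomSketch {

    /** Unchecked stand-in for Hadoop's NotEnoughReplicasException. */
    static final class NotEnoughTargets extends RuntimeException {
        NotEnoughTargets(String message) {
            super(message);
        }
    }

    static List<String> chooseRandom(int numOfReplicas, List<String> cluster,
                                     Set<String> excludedNodes,
                                     Predicate<String> isGoodTarget, Random rnd) {
        List<String> results = new ArrayList<>();
        int numOfAvailableNodes = 0;
        for (String node : cluster) {
            if (!excludedNodes.contains(node)) {
                numOfAvailableNodes++;
            }
        }
        while (numOfReplicas > 0 && numOfAvailableNodes > 0) {
            String candidate = cluster.get(rnd.nextInt(cluster.size()));
            // The candidate is excluded first, and only then tested.
            if (excludedNodes.add(candidate)) {
                numOfAvailableNodes--;
                if (isGoodTarget.test(candidate)) {
                    results.add(candidate);
                    numOfReplicas--;
                }
            }
        }
        if (numOfReplicas > 0) {
            throw new NotEnoughTargets("still need " + numOfReplicas + " target(s)");
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> cluster = new ArrayList<>();
        for (int i = 0; i < 983; i++) {
            cluster.add("dn" + i);
        }
        Set<String> excluded = new HashSet<>();
        try {
            // Pretend every node fails the suitability test.
            chooseRandom(1, cluster, excluded, node -> false, new Random());
        } catch (NotEnoughTargets e) {
            System.out.println(excluded.size() + " node(s) are excluded"); // prints "983 node(s) are excluded"
        }
    }
}
```

So an excluded count equal to the number of datanodes does not have to mean the client excluded them up front; it is also what this loop leaves behind when every candidate is rejected.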
The addIfIsGoodTarget method (If the given storage is a good target, add it to the result list and update the set of excluded nodes.)
int addIfIsGoodTarget(DatanodeStorageInfo storage,
Set<Node> excludedNodes,
long blockSize,
int maxNodesPerRack,
boolean considerLoad,
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
StorageType storageType) {
// Call isGoodTarget
if (isGoodTarget(storage, blockSize, maxNodesPerRack, considerLoad,
results, avoidStaleNodes, storageType)) {
// If it is suitable, add the storage to results and its node to excludedNodes
results.add(storage);
// add node and related nodes to excludedNode
return addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
} else {
return -1;
}
}
The isGoodTarget method (Determine if a storage is a good target.)
private boolean isGoodTarget(DatanodeStorageInfo storage,
long blockSize, int maxTargetPerRack,
boolean considerLoad,
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
StorageType requiredStorageType) {
// Check that the storage type matches
if (storage.getStorageType() != requiredStorageType) {
logNodeIsNotChosen(storage, "storage types do not match,"
+ " where the required storage type is " + requiredStorageType);
return false;
}
// Is the storage read-only?
if (storage.getState() == State.READ_ONLY_SHARED) {
logNodeIsNotChosen(storage, "storage is read-only");
return false;
}
// Has the storage failed?
if (storage.getState() == State.FAILED) {
logNodeIsNotChosen(storage, "storage has failed");
return false;
}
DatanodeDescriptor node = storage.getDatanodeDescriptor();
// check if the node is (being) decommissioned
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
logNodeIsNotChosen(storage, "the node is (being) decommissioned ");
return false;
}
// Is the node stale?
if (avoidStaleNodes) {
if (node.isStale(this.staleInterval)) {
logNodeIsNotChosen(storage, "the node is stale ");
return false;
}
}
// Check that there is enough capacity
final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE;
final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
final long remaining =
node.getRemaining(storage.getStorageType(), requiredSize);
// Reject the storage if remaining < requiredSize + scheduledSize
if (requiredSize > remaining - scheduledSize) {
logNodeIsNotChosen(storage, "the node does not have enough "
+ storage.getStorageType() + " space"
+ " (required=" + requiredSize
+ ", scheduled=" + scheduledSize
+ ", remaining=" + remaining + ")");
return false;
}
// check the communication traffic of the target machine
if (considerLoad) {
final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
final int nodeLoad = node.getXceiverCount();
// Reject the node if its load exceeds twice the cluster average
if (nodeLoad > maxLoad) {
logNodeIsNotChosen(storage, "the node is too busy (load: " + nodeLoad
+ " > " + maxLoad + ") ");
return false;
}
}
// check if the target rack has chosen too many nodes
String rackname = node.getNetworkLocation();
int counter=1;
for(DatanodeStorageInfo resultStorage : results) {
if (rackname.equals(
resultStorage.getDatanodeDescriptor().getNetworkLocation())) {
counter++;
}
}
if (counter>maxTargetPerRack) {
logNodeIsNotChosen(storage, "the rack has too many chosen nodes ");
return false;
}
return true;
}
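Conditions a replica storage node must satisfy
1. The storage type must match the requested storage type
2. The storage state must not be read-only (READ_ONLY_SHARED)
3. The storage state must not be FAILED
4. The node must not be decommissioned or in the process of being decommissioned
5. The node must not be stale (checked only when avoidStaleNodes is true)
6. The remaining capacity must be at least requiredSize + scheduledSize
7. The node's load must not exceed twice the cluster average (checked only when considerLoad is true)
8. The rack must not exceed its maximum number of replicas (maxNodesPerRack = (totalNumOfReplicas-1)/numOfRacks + 2, reduced by one when it equals totalNumOfReplicas)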
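The capacity condition is the one most relevant to a nearly full cluster, so it is worth working through with numbers. The sketch below isolates just that comparison from isGoodTarget. The class and method names are made up, and the inputs in main (128 MiB blocks, a multiplier of 1 standing in for HdfsConstants.MIN_BLOCKS_FOR_WRITE, 1 GiB remaining) are assumptions chosen for illustration, not values read from the cluster or from the Hadoop source.

```java
/** Isolated version of the space check in isGoodTarget; inputs are illustrative. */
final class SpaceCheckSketch {

    /**
     * @param blockSize         block size in bytes
     * @param minBlocksForWrite stand-in for HdfsConstants.MIN_BLOCKS_FOR_WRITE
     * @param blocksScheduled   blocks already scheduled to be written to this node
     * @param remaining         remaining bytes reported for the storage type
     */
    static boolean hasEnoughSpace(long blockSize, int minBlocksForWrite,
                                  int blocksScheduled, long remaining) {
        long requiredSize = blockSize * minBlocksForWrite;
        long scheduledSize = blockSize * blocksScheduled;
        // Same comparison as the source: reject when requiredSize > remaining - scheduledSize.
        return requiredSize <= remaining - scheduledSize;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        long blockSize = 128 * mib;
        long remaining = 1024 * mib; // 1 GiB still free on the node

        // 2 blocks in flight: 1024 - 256 = 768 MiB left, enough for one more block.
        System.out.println(hasEnoughSpace(blockSize, 1, 2, remaining));  // prints true
        // 10 blocks in flight: 1024 - 1280 < 0, so the node is rejected despite 1 GiB free.
        System.out.println(hasEnoughSpace(blockSize, 1, 10, remaining)); // prints false
    }
}
```

The second case shows how a node that still reports free space can be turned down: the blocks already scheduled for it are counted against the remaining capacity before the new block is considered.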
scheduledSize
scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
DatanodeDescriptor.java
/**
* @return Approximate number of blocks currently scheduled to be written
* to the given storage type of this datanode.
*/
public int getBlocksScheduled(StorageType t) {
return (int)(currApproxBlocksScheduled.get(t)
+ prevApproxBlocksScheduled.get(t));
}
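Returning the pipeline
Once the nodes that will hold the replicas have been chosen, they are ordered into a pipeline by shortest distance and returned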
/**
* Return a pipeline of nodes.
* The pipeline is formed finding a shortest path that
* starts from the writer and traverses all <i>nodes</i>
* This is basically a traveling salesman problem.
*/
private DatanodeStorageInfo[] getPipeline(Node writer,
DatanodeStorageInfo[] storages) {
if (storages.length == 0) {
return storages;
}
synchronized(clusterMap) {
int index=0;
// If the writer is not itself a datanode in the cluster, use the first datanode as the starting node
if (writer == null || !clusterMap.contains(writer)) {
writer = storages[0].getDatanodeDescriptor();
}
for(; index < storages.length; index++) {
// Start by assuming the storage at the current index is the closest one
DatanodeStorageInfo shortestStorage = storages[index];
// Compute its distance from the current source node
int shortestDistance = clusterMap.getDistance(writer,
shortestStorage.getDatanodeDescriptor());
int shortestIndex = index;
for(int i = index + 1; i < storages.length; i++) {
// Scan the remaining storages for a shorter distance
int currentDistance = clusterMap.getDistance(writer,
storages[i].getDatanodeDescriptor());
if (shortestDistance>currentDistance) {
shortestDistance = currentDistance;
shortestStorage = storages[i];
shortestIndex = i;
}
}
// If a closer storage was found, swap it into position index
if (index != shortestIndex) {
storages[shortestIndex] = storages[index];
storages[index] = shortestStorage;
}
// The closest storage of this round becomes the source node for the next round
writer = shortestStorage.getDatanodeDescriptor();
}
}
return storages;
}
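The ordering done by getPipeline above can be tried out with a small standalone sketch. It is a simplified model rather than Hadoop code: nodes are "/rack/host" strings, and the distance function is an assumption that counts the hops from each node up to their closest common ancestor (0 for the same node, 2 for the same rack, 4 for different racks in this two-level layout). The class and method names are made up.

```java
import java.util.Arrays;

/** Greedy nearest-neighbour ordering over "/rack/host" paths; illustrative only. */
final class PipelineSketch {

    /** Hops from each node up to their closest common ancestor, added together. */
    static int distance(String a, String b) {
        if (a.equals(b)) {
            return 0;
        }
        String[] pa = a.split("/");
        String[] pb = b.split("/");
        int common = 0;
        while (common < pa.length && common < pb.length && pa[common].equals(pb[common])) {
            common++;
        }
        return (pa.length - common) + (pb.length - common);
    }

    /** Orders nodes so that each hop goes to the closest node not yet placed. */
    static String[] pipeline(String writer, String[] nodes) {
        String[] out = nodes.clone();
        if (out.length == 0) {
            return out;
        }
        // No usable writer: start from the first chosen node instead.
        String current = (writer == null) ? out[0] : writer;
        for (int index = 0; index < out.length; index++) {
            int shortestIndex = index;
            for (int i = index + 1; i < out.length; i++) {
                if (distance(current, out[i]) < distance(current, out[shortestIndex])) {
                    shortestIndex = i;
                }
            }
            // Swap the closest node into position, then continue from it.
            String closest = out[shortestIndex];
            out[shortestIndex] = out[index];
            out[index] = closest;
            current = closest;
        }
        return out;
    }

    public static void main(String[] args) {
        String[] chosen = {"/rack2/dn4", "/rack3/dn7", "/rack2/dn5"};
        // prints [/rack2/dn4, /rack2/dn5, /rack3/dn7]
        System.out.println(Arrays.toString(pipeline("/rack2/dn9", chosen)));
    }
}
```

In the example the two nodes on /rack2 end up adjacent, so the block crosses racks only once on its way down the pipeline.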
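In short: pick a source node, scan the remaining candidate nodes for the one closest to it, and make that node the source for the next round. Each hop is therefore the shortest one available at the time it is chosen, which keeps the total distance along the pipeline small (this is a greedy heuristic, so the total is not guaranteed to be the minimum). How the distance between two nodes is defined and computed is shown in the figure below: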