HDFS选择数据节点的方式越来越复杂,也越来越考虑到吞吐量,但是同样存在着一些问题。
分配数据节点的所需的参数主要有文件副本数numOfReplicas、writer(客户端所在节点)、excludedNodes(客户端排除的节点)、chosenNodes(已经选择的节点)
方法是BlockPlacementPolicyDefault类的
DatanodeDescriptor[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer,
List<DatanodeDescriptor> chosenNodes,
boolean returnChosenNodes,
HashMap<Node, Node> excludedNodes,
long blocksize)
首先如果文件副本数为0或者集群存活的DataNode数量为0,则选不出节点;
计算出每个机架的节点数,数据节点属于哪个机架是手动配置的,没有配置的数据节点是处于default默认机架上的。
如果已经选择好的节点chosenNodes不为空,那么在选择新节点的时候需要将chosenNodes加入到excludedNodes节点中,不能选择已经存在文件副本的节点;
int numOfResults = results.size();
...
if (numOfResults == 0) {
writer = chooseLocalNode(writer, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
if (--numOfReplicas == 0) {
return writer;
}
}
if (numOfResults <= 1) {
chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
if (--numOfReplicas == 0) {
return writer;
}
}
if (numOfResults <= 2) {
if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
chooseRemoteRack(1, results.get(0), excludedNodes,
blocksize, maxNodesPerRack,
results, avoidStaleNodes);
} else if (newBlock){
chooseLocalRack(results.get(1), excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
} else {
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes);
}
if (--numOfReplicas == 0) {
return writer;
}
}
如果已选节点数为0,则先选择一个节点,调用chooseLocalNode,如果writer是集群数据节点中的一个并且是一个Good的节点,则可以将这个节点作为第一个节点,如果不是一个Good节点,则从和writer同一个机架上随机选择一个节点出来;如果writer不是集群数据节点中的一个,则从集群中随机选择一个数据节点作为第一个节点。
如果已选节点数为1,则开始选择第二个节点,调用chooseRemoteRack,从和第一个节点不同的机架上选择一个节点出来;
选择第三个节点的时候,如果第一个和第二个节点在同一个机架上,则再次从其他机架上去选择一个节点;如果是一个新的Block(不是append的),则选择一个跟第二个节点在同一机架上的节点出来作为第三个节点;如果不是一个新的Block(append),则使用和选择第一个节点一样的方式选择节点;
除去以上三个节点的选择外,如果有更多的副本,则直接使用随机选择方式;
以上选择节点的方式,每个节点选择的时候都会判断是否是Good节点,判断Good节点的条件有很多:
isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
首先是Node的状态,是否是处于下线或者正在下线的节点,如果是则不是一个Good的节点;
然后判断该节点是否是陈旧的节点,判断依据默认是心跳时间超过30s的,如果需要避免陈旧的节点,并且这个节点是陈旧的节点,则这个节点不是一个Good节点;
接下来是节点容量,计算剩余容量
long remaining = node.getRemaining() -
(node.getBlocksScheduled() * blockSize);
node剩余容量是DataNode心跳汇报上来的数据,可用容量需要减去正在调度的Block占去的空间,如果
blockSize*HdfsConstants.MIN_BLOCKS_FOR_WRITE > remaining,则不是一个Good节点,MIN_BLOCKS_FOR_WRITE 默认为5;
选择节点可能还需要考虑节点的负载,是可配置的,负载是DataNode心跳汇报上来的正在接收数据块的线程数,如果节点负载过高,即节点负载大于平均负载的两倍,则不是一个Good节点;
如果选择的同一个机架上的节点数大于各机架最大可选节点数,由getMaxNodesPerRack方法计算出来的,则也不是一个Good节点;
使用负载判断方式可能存在有很多问题:
比如一共有10个节点,其中7个节点的容量已经不足,剩余三个节点在被分配了以后将会处于无法分配的状态;
以上只是比较简单的分析了一下分配数据的方式以及可能存在的问题。