核心类在
org.apache.hadoop.hdfs.server.balancer.Balancer
均衡算法 伪代码
- while(true){
- 1.获取需要迁移的字节数
- if(需要迁移字节数==0){
- return"成功,无需迁移";
- }
- 2.选择需要迁移的节点
- if(需要移动的数据==0){
- return"没有需要移动的块"
- }
- 3.开始并行迁移
- 4.清空列表
- 5.Thread.sleep(2*conf.getLong("dfs.heartbeat.interval",3));
- }
获取所有的data node节点,计算
initNodes(client.getDatanodeReport(DatanodeReportType.LIVE));
initNodes()函数如下:
- 计算平均使用量
- longtotalCapacity=0L,totalUsedSpace=0L;
- for(DatanodeInfodatanode:datanodes){
- if(datanode.isDecommissioned()||datanode.isDecommissionInProgress()){
- continue;
- }
- totalCapacity+=datanode.getCapacity();
- totalUsedSpace+=datanode.getDfsUsed();
- }
当前集群的平均使用率(是当前使用的空间/总空间*100),注意这个是百分比计算后再乘100的值,不是百分比
this.avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
四个队列
1.aboveAvgUtilizedDatanodes(超过集群平均使用率 && 低于集群平均使用率+阀值)
2.overUtilizedDatanodes(超过集群平均使用率+阀值)
3.belowAvgUtilizedDatanodes(低于集群平均使用率 && 超过集群平均使用率-阀值)
4.underUtilizedDatanodes(低于集群平均使用率-阀值)
2个参数
overLoadedBytes 超过负载值的字节
underLoadedBytes低于负载值的字节
- for(DatanodeInfodatanode:datanodes){
- if(当前节点使用率>集群平均使用率){
- if(当前节点使用率<=(集群平均使用率+阀值)&&当前节点使用率>集群平均使用率){
- 创建一个BalancerDatanode
- aboveAvgUtilizedDatanodes.save(当前节点)
- }
- else{
- overUtilizedDatanodes.save(当前节点)
- overLoadedBytes+=(当前节点使用率-集群平均使用率-阀值)*当前节点总数据量/100
- }
- }
- else{
- 创建一个BalancerDatanode
- if(当前节点使用率>=(集群平均使用率-阀值)&&当前节点使用率<集群平均使用率){
- belowAvgUtilizedDatanodes.save(当前节点)
- }
- else{
- underUtilizedDatanodes.save(当前节点)
- underLoadedBytes+=(集群平均使用率-阀值-当前节点使用率)*当前节点总数据量/100
- }
- }
- }
- 均衡器只会执行overUtilizedDatanodes和underUtilizedDatanodes队列中的集群
BalancerDatanode()构造函数
- if(当前节点使用率>=集群平均使用率+阀值||当前节点使用率<=集群平均使用率-阀值){
- 一次移动的数据量=阀值*当前节点总容量/100
- }
- else{
- 一次移动的数据量=(集群平均使用率-当前节点使用率)*当前节点总容量/100
- }
- 一次移动的数据量=min(当前节点剩余使用量,一次移动的数据量)
- 一次移动的数据量=(一次移动数据量上限10G,一次移动的数据量)
chooseNodes()函数
- chooseNodes(true);
- chooseNodes(false);
- chooseNodes(booleanonRack){
- chooseTargets(underUtilizedDatanodes.iterator(),onRack);
- chooseTargets(belowAvgUtilizedDatanodes.iterator(),onRack);
- chooseSources(aboveAvgUtilizedDatanodes.iterator(),onRack);
- }
- chooseTargets(){
- for(源节点source:overUtilizedDatanodes列表){
- 选择目标节点(source)
- }
- }
- 选择目标节点(source){
- while(){
- 1.从候选队列中找到一个节点
- 2.如果这个可转移的数据已经满了continue
- 3.if(在相同机架中转移)
- 4.if(在不同机架中转移)
- 5.创建NodeTask
- }
- }
- chooseSources(){
- for(目标节点target:underUtilizedDatanodes){
- 选择源节点()
- }
- }
- 选择源节点(target){
- while(){
- 1.从候选队列中找到一个节点
- 2.如果这个节点可转移的数据已经满了continue
- 3.if(在相同机架中转移)
- 4.if(在不同机架中转移)
- 5.创建NodeTask
- }
- }
- 控制台或者日志上会显示Decidedtomove3.55GBbytesfromsource_host:50010totarget_host:50010
开始并行迁移数据
- for(Sourcesource:sources){
- futures[i++]=dispatcherExecutor.submit(source.new());
- }
BlockMoveDispatcher线程
- 1.选择要迁移的节点chooseNextBlockToMove()
- 2.if(要迁移的节点!=null){
- scheduleBlockMove()
- }
- 3.获取block列表,继续下一轮迁移
发送和接收数据块的dispatch()函数
- sock.connect(NetUtils.createSocketAddr(
- target.datanode.getName()),HdfsConstants.READ_TIMEOUT);
- sock.setKeepAlive(true);
- out=newDataOutputStream(newBufferedOutputStream(
- sock.getOutputStream(),FSConstants.BUFFER_SIZE));
- sendRequest(out);
- in=newDataInputStream(newBufferedInputStream(
- sock.getInputStream(),FSConstants.BUFFER_SIZE));
- receiveResponse(in);
- bytesMoved.inc(block.getNumBytes());