`

Hadoop-balancer执行原理

阅读更多

 

核心类在

org.apache.hadoop.hdfs.server.balancer.Balancer

 

均衡算法 伪代码

while(true) {
 1.获取需要迁移的字节数
 if(需要迁移字节数 == 0) {
  return "成功,无需迁移";
 }
 
 2.选择需要迁移的节点
 if(需要移动的数据 == 0) {
  return "没有需要移动的块"
 }
 
 3.开始并行迁移
 4.清空列表
 5.Thread.sleep(2*conf.getLong("dfs.heartbeat.interval", 3));
}

 

获取所有的data node节点,计算

initNodes(client.getDatanodeReport(DatanodeReportType.LIVE));

initNodes()函数如下:

计算平均使用量
    long totalCapacity=0L, totalUsedSpace=0L;
    for (DatanodeInfo datanode : datanodes) {
      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
        continue; // ignore decommissioning or decommissioned nodes
      }
      totalCapacity += datanode.getCapacity();
      totalUsedSpace += datanode.getDfsUsed();
    }

 

当前集群的平均使用率(是当前使用的空间/总空间*100),注意这个是百分比计算后再乘100的值,不是百分比

this.avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;

 

 

四个队列

1.aboveAvgUtilizedDatanodes(超过集群平均使用率 && 低于集群平均使用率+阀值)

2.overUtilizedDatanodes(超过集群平均使用率+阀值)

3.belowAvgUtilizedDatanodes(低于集群平均使用率 && 超过集群平均使用率-阀值)

4.underUtilizedDatanodes(低于集群平均使用率-阀值)

 

2个参数

overLoadedBytes 超过负载值的字节

underLoadedBytes低于负载值的字节

 
//注意这里的阈值默认是10D,这里不是百分比计算集群平均使用率如果为0.5不是50%,而相当于0.5%
//所以如果是0.5-10D就变成负数了,一般来说肯定是小于当前节点使用率的,除非当前节点使用率特别大
//比如当前节点使用率为20,则用百分比来说就是使用了20%,这肯定就超于阈值了,于是这个节点的数据
//就需要均衡了
for (DatanodeInfo datanode : datanodes) {
    if(当前节点使用率 > 集群平均使用率) {
  if(当前节点使用率 <=(集群平均使用率+阀值) && 当前节点使用率 > 集群平均使用率) {
   创建一个BalancerDatanode
   aboveAvgUtilizedDatanodes.save(当前节点)
  }
  else {
   overUtilizedDatanodes.save(当前节点)
   overLoadedBytes += (当前节点使用率-集群平均使用率-阀值)*当前节点总数据量/100
  }
    }
   
    else {
     创建一个BalancerDatanode
     if(当前节点使用率>=(集群平均使用率-阀值) && 当前节点使用率<集群平均使用率) {
      belowAvgUtilizedDatanodes.save(当前节点)
     }
     else {
      underUtilizedDatanodes.save(当前节点)
      underLoadedBytes += (集群平均使用率-阀值-当前节点使用率)*当前节点总数据量/100
     }
    }
}

均衡器只会执行 overUtilizedDatanodes 和 underUtilizedDatanodes队列中的集群

 

 

BalancerDatanode()构造函数

if(当前节点使用率 >= 集群平均使用率+阀值 || 当前节点使用率 <= 集群平均使用率-阀值) {
    一次移动的数据量 = 阀值*当前节点总容量/100
}
else {
    一次移动的数据量 = (集群平均使用率-当前节点使用率) * 当前节点总容量/100
}
一次移动的数据量 = min(当前节点剩余使用量,一次移动的数据量)
一次移动的数据量 = (一次移动数据量上限10G,一次移动的数据量)

 

chooseNodes()函数

chooseNodes(true);	 //首先在相同机架中迁移
chooseNodes(false);	 //在不同机架中迁移

chooseNodes(boolean onRack) {
 chooseTargets(underUtilizedDatanodes.iterator(), onRack);
 chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
 chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
}

chooseTargets() {
 for(源节点 source : overUtilizedDatanodes列表) {
  选择目标节点(source)
 }
}
 
选择目标节点(source) {
 while() {
  1.从候选队列中找到一个节点
  2.如果这个可转移的数据已经满了continue
  3.if(在相同机架中转移)
  4.if(在不同机架中转移)
  5.创建NodeTask
 }
}

//和chooseTargets函数类似
chooseSources() {
 for(目标节点 target : underUtilizedDatanodes) {
  选择源节点()
 }
}

选择源节点(target) {
 while() {
  1.从候选队列中找到一个节点
  2.如果这个节点可转移的数据已经满了continue
  3.if(在相同机架中转移)
  4.if(在不同机架中转移)
  5.创建NodeTask
 }
}

控制台或者日志上会显示  Decided to move 3.55 GB bytes from source_host:50010 to target_host:50010

 

开始并行迁移数据

    for (Source source : sources) {
      futures[i++] = dispatcherExecutor.submit(source.new ());
    }

 

BlockMoveDispatcher线程

1.选择要迁移的节点 chooseNextBlockToMove()
2.if(要迁移的节点 != null) {
 //启动数据迁移,创建一个新线程发送接收数据
 scheduleBlockMove()
 
}
3.获取block列表,继续下一轮迁移

 

发送和接收数据块的dispatch()函数

//使用阻塞IO的方式发送数据并接收返回的结果
        sock.connect(NetUtils.createSocketAddr(
            target.datanode.getName()), HdfsConstants.READ_TIMEOUT);
        sock.setKeepAlive(true);
        out = new DataOutputStream( new BufferedOutputStream(
            sock.getOutputStream(), FSConstants.BUFFER_SIZE));
        sendRequest(out);
        in = new DataInputStream( new BufferedInputStream(
            sock.getInputStream(), FSConstants.BUFFER_SIZE));
        receiveResponse(in);
        bytesMoved.inc(block.getNumBytes());

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics