
Giraph Source Code Analysis (9): How Aggregators Work

WBOY, 2016-06-07 15:56:38


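This is original work; please credit the source when reposting. You are welcome to join the Giraph technical discussion group: 228591158

For how to use aggregators in Giraph, see the official documentation: http://giraph.apache.org/aggregators.html. This article focuses on how Giraph implements aggregators.

Basic principle: in each superstep, every worker computes its local aggregated values. When the superstep's computation finishes, the workers send their local aggregated values to the master, which combines them. After MasterCompute.compute() has run, the master sends the global aggregated values back to all workers.

Drawback: when an application (or algorithm) uses many aggregators, the master has to perform the aggregation for all of them. Because the master must receive, process, and send a large amount of data, it becomes the system bottleneck in both computation and network communication.

Improvement: sharded aggregators. At the end of each superstep, each aggregator is assigned to one worker, which receives and aggregates the values that the other workers send for that aggregator. Each worker then sends all the aggregators it owns to the master, so the master performs no aggregation itself and only receives the final value of each aggregator. After MasterCompute.compute() has run, the master does not send all aggregators directly to all workers; it sends each aggregator to the worker that owns it, and each owner then sends its aggregators to all other workers.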
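To make the bottleneck argument concrete, the sketch below counts how many aggregator values the master has to receive per superstep under the plain and the sharded scheme. It is a rough cost model rather than Giraph code: the class and method names are hypothetical, and it assumes that every worker reports one value for every aggregator.

```java
/** Hypothetical cost model for illustration; none of these names exist in Giraph. */
final class AggregatorLoadModel {

    /** Plain scheme: every worker sends a partial value for every aggregator to the master. */
    static long masterInboundPlain(int workers, int aggregators) {
        return (long) workers * aggregators;
    }

    /** Sharded scheme: the master receives one final value per aggregator, from its owner. */
    static long masterInboundSharded(int aggregators) {
        return aggregators;
    }

    /**
     * Sharded scheme: partial values received by all owners together.
     * An owner adds its own partial value locally, so it hears from workers - 1 peers.
     */
    static long ownersInboundSharded(int workers, int aggregators) {
        return (long) (workers - 1) * aggregators;
    }

    public static void main(String[] args) {
        int workers = 100;
        int aggregators = 50;
        System.out.println("plain, master receives:    "
            + masterInboundPlain(workers, aggregators));
        System.out.println("sharded, master receives:  "
            + masterInboundSharded(aggregators));
        System.out.println("sharded, owners receive:   "
            + ownersInboundSharded(workers, aggregators));
    }
}
```

The total traffic does not shrink; the aggregation work is spread across the owner workers instead of being concentrated on the master.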

First, the communication protocol between the master and the workers and among the workers. Each of the following request classes parses and stores the received message in its doRequest(ServerData serverData) method.
1). org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest. Worker --> Worker Owner
Purpose: each worker sends its partial aggregated values for the current superstep to the owner of the aggregator.
2). org.apache.giraph.comm.requests.SendAggregatorsToMasterRequest. Worker Owner --> Master
Purpose: each worker sends the final aggregated values of the aggregators it owns to the master.
3). org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest. Master --> Worker Owner
Purpose: the master sends the final aggregated values (or the aggregators) to the owner of each aggregator.
4). org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest. Worker Owner --> Worker
Purpose: sends the final aggregated values to the other workers. The sender is the owner of the aggregator; the receivers are all workers except the sender.
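The four request types above can be summarized as a small lookup table. The enum below is only an illustrative sketch: AggregatorRequestKind and Role are hypothetical names, and only the request class names and directions are taken from the list above.

```java
/** Hypothetical summary of the four request types; this enum is not part of Giraph. */
enum AggregatorRequestKind {
    SEND_WORKER_AGGREGATORS("SendWorkerAggregatorsRequest", Role.WORKER, Role.OWNER),
    SEND_AGGREGATORS_TO_MASTER("SendAggregatorsToMasterRequest", Role.OWNER, Role.MASTER),
    SEND_AGGREGATORS_TO_OWNER("SendAggregatorsToOwnerRequest", Role.MASTER, Role.OWNER),
    SEND_AGGREGATORS_TO_WORKER("SendAggregatorsToWorkerRequest", Role.OWNER, Role.WORKER);

    /** Who can send or receive a request. OWNER is a worker that owns the aggregator. */
    enum Role { MASTER, OWNER, WORKER }

    final String requestClass;
    final Role sender;
    final Role receiver;

    AggregatorRequestKind(String requestClass, Role sender, Role receiver) {
        this.requestClass = requestClass;
        this.sender = sender;
        this.receiver = receiver;
    }

    public static void main(String[] args) {
        // Print one line per request type: direction and request class name.
        for (AggregatorRequestKind kind : values()) {
            System.out.println(kind.sender + " -> " + kind.receiver
                + " : " + kind.requestClass);
        }
    }
}
```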

In superstep 0, the aggregator values that MasterCompute.compute() obtains are all initial values. Only from superstep 1 onward does MasterCompute.compute() obtain the values aggregated by all Vertex.compute() calls in superstep 0.

1. Starting from superstep 0, BspServiceMaster calls the finishSuperstep(MasterClient masterClient) method of MasterAggregatorHandler to dispatch the aggregators to the workers. The value of each aggregator is the global aggregated value of the previous superstep (the final aggregated value); the first time, it is the initial value. The class hierarchy of MasterAggregatorHandler is as follows:

[Figure: class hierarchy of MasterAggregatorHandler; original image at http://www.2cto.com/uploadfile/Collfiles/20140523/201405230851307.jpg]

The core of the finishSuperstep(MasterClient masterClient) method is as follows:

  /**
   * Finalize aggregators for current superstep and share them with workers
   */
  public void finishSuperstep(MasterClient masterClient) {
    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
      if (aggregator.isChanged()) {
        // if master compute changed the value, use the one he chose
        aggregator.setPreviousAggregatedValue(
            aggregator.getCurrentAggregatedValue());
        // reset aggregator for the next superstep
        aggregator.resetCurrentAggregator();
      }
    }

    /**
     * Send each aggregator to the worker that owns it. What is sent:
     * 1). Name of the aggregator
     * 2). Class of the aggregator
     * 3). Value of the aggregator
     */
    try {
      for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
          aggregatorMap.entrySet()) {
        masterClient.sendAggregator(entry.getKey(),
            entry.getValue().getAggregatorClass(),
            entry.getValue().getPreviousAggregatedValue());
      }
      masterClient.finishSendingAggregatedValues();
    } catch (IOException e) {
      throw new IllegalStateException("finishSuperstep: " +
          "IOException occurred while sending aggregators", e);
    }
  }

Question 1: how is the owner worker of an aggregator determined?

Answer: the owner is derived from the aggregator's name, computed as follows:

/**
 * Computes the worker that owns an aggregator from aggregatorName and the list of all workers.
 * @param aggregatorName Name of the aggregator
 * @param workers List of workers
 * @return Worker which owns the aggregator
 */
public static WorkerInfo getOwner(String aggregatorName, List<WorkerInfo> workers) {
    // Take the hashCode() of aggregatorName modulo the total number of workers
    int index = Math.abs(aggregatorName.hashCode() % workers.size());
    return workers.get(index);  // The worker that owns the aggregator
}

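The owner lookup can be tried outside Giraph with a minimal sketch in which plain strings stand in for WorkerInfo. OwnerLookup and the worker names are hypothetical; only the index formula is taken from getOwner above.

```java
import java.util.Arrays;
import java.util.List;

/** Standalone sketch of the owner lookup; strings stand in for Giraph's WorkerInfo. */
final class OwnerLookup {

    /** Same formula as getOwner above: hash of the name modulo the number of workers. */
    static <W> W getOwner(String aggregatorName, List<W> workers) {
        // The modulo result lies strictly between -size and +size, so Math.abs
        // cannot overflow here, even when hashCode() is Integer.MIN_VALUE.
        int index = Math.abs(aggregatorName.hashCode() % workers.size());
        return workers.get(index);
    }

    public static void main(String[] args) {
        List<String> workers =
            Arrays.asList("worker-1", "worker-2", "worker-3", "worker-4");
        for (String name : Arrays.asList("A1", "A2", "sum", "max")) {
            System.out.println(name + " is owned by " + getOwner(name, workers));
        }
    }
}
```

Because String.hashCode() is fully specified by the Java API, the master and every worker compute the same owner for a given name without exchanging any messages, provided they all see the worker list in the same order.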
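Question 2: how does a worker know that it has received all the aggregators it owns?

Answer: when the master sends aggregators to a worker, it also sends a count (as the doRequest code below shows, the number of requests destined for that worker, carried in a special count aggregator). The SendAggregatorsToOwnerRequest class encapsulates and parses these messages.

2. Each worker receives the aggregators sent by the master and forwards the received aggregated values to all other workers, so that every worker ends up with the global aggregated values of the previous superstep.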

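As described earlier in this series, each worker has a ServerData object. The two aggregator-related member variables of the ServerData class are:

// Holds the aggregators owned by this worker in the current superstep
private final OwnerAggregatorServerData ownerAggregatorData;
// Holds the aggregators from the previous superstep
private final AllAggregatorServerData allAggregatorData;

As shown, ownerAggregatorData stores the aggregators that the master sent to this worker in the current superstep, and allAggregatorData stores the global aggregated values of the previous superstep. Both are initialized in the doRequest(ServerData serverData) method of the SendAggregatorsToOwnerRequest class, as follows: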

public void doRequest(ServerData serverData) {
    DataInput input = getDataInput();
    AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
    try {
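      // Number of aggregators in this request. CountingOutputStream keeps a counter
      // that is incremented each time an aggregator object is added to the output stream;
      // on send, flush() writes that counter at the very front of the stream.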
      int numAggregators = input.readInt();
      for (int i = 0; i < numAggregators; i++) {
        String aggregatorName = input.readUTF();
        String aggregatorClassName = input.readUTF();
        if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
          LongWritable count = new LongWritable(0);
          // Total number of requests the master sends to this worker.
          count.readFields(input);
          aggregatorData.receivedRequestCountFromMaster(count.get(),
              getSenderTaskId());
        } else {
          Class<Aggregator<Writable>> aggregatorClass =
              AggregatorUtils.getAggregatorClass(aggregatorClassName);
          aggregatorData.registerAggregatorClass(aggregatorName,
              aggregatorClass);
          Writable aggregatorValue =
              aggregatorData.createAggregatorInitialValue(aggregatorName);
          aggregatorValue.readFields(input);
          // Store the received global aggregated value of the previous superstep in allAggregatorData
          aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue);
          // ownerAggregatorData only registers the aggregator itself, not its value
          serverData.getOwnerAggregatorData().registerAggregator(
              aggregatorName, aggregatorClass);
        }
      }
    } catch (IOException e) {
      throw new IllegalStateException("doRequest: " +
          "IOException occurred while processing request", e);
    }
    // One request has been received: decrement the counter and append the received data
    // to the List<byte[]> masterData of AllAggregatorServerData
    aggregatorData.receivedRequestFromMaster(getData());
 }
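The count-prefix framing described in the comments of doRequest can be illustrated with a small stand-alone buffer. This is a simplified sketch, not Giraph's CountingOutputStream: the class CountPrefixedBuffer is hypothetical, and values are plain long numbers instead of Writable objects.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical buffer that writes "count, then entries", mimicking the framing described above. */
final class CountPrefixedBuffer {
    private final ByteArrayOutputStream body = new ByteArrayOutputStream();
    private final DataOutputStream out = new DataOutputStream(body);
    private int counter;

    /** Appends one (name, value) entry and bumps the counter. */
    void add(String name, long value) {
        try {
            out.writeUTF(name);
            out.writeLong(value);
            counter++;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the framed bytes (counter first, then the entries) and resets the buffer. */
    byte[] flush() {
        try {
            out.flush();
            ByteArrayOutputStream framed = new ByteArrayOutputStream();
            DataOutputStream framedOut = new DataOutputStream(framed);
            framedOut.writeInt(counter);
            body.writeTo(framedOut);
            framedOut.flush();
            body.reset();
            counter = 0;
            return framed.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Receiver side: read the count, then exactly that many entries. */
    static Map<String, Long> decode(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int numEntries = in.readInt();
            Map<String, Long> values = new LinkedHashMap<>();
            for (int i = 0; i < numEntries; i++) {
                String name = in.readUTF();
                values.put(name, in.readLong());
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        CountPrefixedBuffer buffer = new CountPrefixedBuffer();
        buffer.add("A1", 10L);
        buffer.add("A2", 20L);
        System.out.println(decode(buffer.flush()));
    }
}
```

Writing the count first lets the receiver loop a known number of times instead of relying on an end-of-stream marker, which is the same idea as the `input.readInt()` followed by the `for` loop in doRequest.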

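Before starting computation, each worker calls the prepareSuperstep() method of the BspServiceWorker class to distribute aggregator values and to receive the aggregator values sent by the other workers. The call graph is as follows:

[Figure: call graph of prepareSuperstep(); image not available]

The prepareSuperstep() method of the BspServiceWorker class is as follows: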

@Override
public void prepareSuperstep() {
   if (getSuperstep() != INPUT_SUPERSTEP) {
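      /*
       * aggregatorHandler is of type WorkerAggregatorHandler; see the class hierarchy
       * of MasterAggregatorHandler above.
       * workerAggregatorRequestProcessor is declared as WorkerAggregatorRequestProcessor
       * (an interface); the actual instance is a NettyWorkerAggregatorRequestProcessor,
       * which is used to send aggregator values between workers.
       */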
      aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
   }
}

The prepareSuperstep(WorkerAggregatorRequestProcessor requestProcessor) method of the WorkerAggregatorHandler class is as follows:

public void prepareSuperstep(WorkerAggregatorRequestProcessor requestProcessor) {
    AllAggregatorServerData allAggregatorData =
        serviceWorker.getServerData().getAllAggregatorData();
    /**
     * Wait until all aggregators the master sends to this worker have arrived.
     * The return value is all the data (aggregators) the master sent to this worker.
     */
    Iterable<byte[]> dataToDistribute =
        allAggregatorData.getDataFromMasterWhenReady(
            serviceWorker.getMasterInfo());
  
    // Send the data (aggregators) received from the master to all other workers
    requestProcessor.distributeAggregators(dataToDistribute);

    // Wait until the aggregators sent to this worker by the other workers have all arrived
    allAggregatorData.fillNextSuperstepMapsWhenReady(
        getOtherWorkerIdsSet(), previousAggregatedValueMap,
        currentAggregatorMap);
    // Only clears the List<byte[]> masterData of AllAggregatorServerData,
    // in preparation for receiving the master's aggregators in the next superstep
    allAggregatorData.reset();
}
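
The following describes how a worker determines that it has received every request sent by the master; the main purpose is to show how threads cooperate in a distributed environment. The AllAggregatorServerData class defines a variable masterBarrier of type TaskIdsPermitBarrier, which is used to decide whether all requests from the master have arrived. TaskIdsPermitBarrier relies mainly on wait() and notifyAll(). When the aggregator name read from a request equals AggregatorUtils.SPECIAL_COUNT_AGGREGATOR, requirePermits(long permits, int taskId) is called to record the sender in arrivedTaskIds and to add the number of requests to wait for to waitingOnPermits.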

  /**
   * Require more permits. This will increase the number of times permits
   * were required. Doesn't wait for permits to become available.
   *
   * @param permits Number of permits to require
   * @param taskId Task id which required permits
   */
  public synchronized void requirePermits(long permits, int taskId) {
    arrivedTaskIds.add(taskId);
    waitingOnPermits += permits;
    notifyAll();
  }

After each request is received, releaseOnePermit() is called to decrement waitingOnPermits by 1.
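The interplay of requirePermits(), releaseOnePermit() and the waiting thread can be sketched as a small monitor. SimplePermitBarrier below is a simplified, hypothetical stand-in for TaskIdsPermitBarrier: the two field names and requirePermits() follow the text above, while the waiting method, its reset logic and the pendingPermits() accessor are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Simplified, hypothetical stand-in for Giraph's TaskIdsPermitBarrier. */
final class SimplePermitBarrier {
    private final Set<Integer> arrivedTaskIds = new HashSet<>();
    private long waitingOnPermits = 0;

    /** Called when the request carrying the total request count arrives. */
    synchronized void requirePermits(long permits, int taskId) {
        arrivedTaskIds.add(taskId);
        waitingOnPermits += permits;
        notifyAll();
    }

    /** Called once for every received request, including the one carrying the count. */
    synchronized void releaseOnePermit() {
        waitingOnPermits--;
        notifyAll();
    }

    /** Blocks until every expected task has announced its count and all requests arrived. */
    synchronized void waitForRequiredPermits(Set<Integer> expectedTaskIds) {
        try {
            while (waitingOnPermits > 0
                || !arrivedTaskIds.containsAll(expectedTaskIds)) {
                wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        // Reset so the barrier can be reused in the next superstep.
        arrivedTaskIds.clear();
        waitingOnPermits = 0;
    }

    /** Accessor added for this sketch only. */
    synchronized long pendingPermits() {
        return waitingOnPermits;
    }

    public static void main(String[] args) throws InterruptedException {
        SimplePermitBarrier barrier = new SimplePermitBarrier();
        int masterTaskId = 0;
        Thread receiver = new Thread(() -> {
            barrier.releaseOnePermit();               // a data request arrives first
            barrier.requirePermits(2, masterTaskId);  // count request: 2 requests in total
            barrier.releaseOnePermit();               // the count request itself
        });
        receiver.start();
        barrier.waitForRequiredPermits(Collections.singleton(masterTaskId));
        receiver.join();
        System.out.println("all requests from the master have arrived");
    }
}
```

Note that requests may arrive before the count is known, in which case the counter temporarily goes negative; checking the set of arrived task ids in addition to the counter keeps the waiting thread from proceeding too early.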

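3. In Vertex.compute(), each worker aggregates its own values. When computation finishes, the worker calls the finishSuperstep(WorkerAggregatorRequestProcessor requestProcessor) method of the WorkerAggregatorHandler class, which sends each local aggregator value, according to the aggregator's aggregatorName, to the worker that owns that aggregator. The owner receives the local aggregated values from all other workers and combines them; when done, it sends the result to the master for use by MasterCompute.compute() in the next superstep. The finishSuperstep method is as follows: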

 /**
   * Send aggregators to their owners and in the end to the master
   *
   * @param requestProcessor Request processor for aggregators
   */
  public void finishSuperstep(
      WorkerAggregatorRequestProcessor requestProcessor) {
    OwnerAggregatorServerData ownerAggregatorData =
        serviceWorker.getServerData().getOwnerAggregatorData();
    // First send partial aggregated values to their owners and determine
    // which aggregators belong to this worker
    for (Map.Entry<String, Aggregator<Writable>> entry :
        currentAggregatorMap.entrySet()) {
        boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
            entry.getValue().getAggregatedValue());
        if (!sent) {
          // If it's my aggregator, add it directly
          ownerAggregatorData.aggregate(entry.getKey(),
              entry.getValue().getAggregatedValue());
        }
    }
    // Flush
    requestProcessor.flush();
    // Wait to receive partial aggregated values from all other workers
    Iterable<Map.Entry<String, Writable>> myAggregators =
        ownerAggregatorData.getMyAggregatorValuesWhenReady(
            getOtherWorkerIdsSet());

    // Send final aggregated values to master
    AggregatedValueOutputStream aggregatorOutput =
        new AggregatedValueOutputStream();
    for (Map.Entry<String, Writable> entry : myAggregators) {
        int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
            entry.getValue());
        if (currentSize > maxBytesPerAggregatorRequest) {
          requestProcessor.sendAggregatedValuesToMaster(
              aggregatorOutput.flush());
        }   
    }
    requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
    // Wait for master to receive aggregated values before proceeding
    serviceWorker.getWorkerClient().waitAllRequests();
    ownerAggregatorData.reset();
  }
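The size-based batching at the end of finishSuperstep (add an entry, flush once the buffer exceeds maxBytesPerAggregatorRequest, then always send what is left) can be sketched in isolation. BatchingSketch is hypothetical: strings stand in for serialized aggregator entries, and their UTF-8 length stands in for the buffer size.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of size-based request batching; not Giraph code. */
final class BatchingSketch {

    /** Splits entries into requests, closing a request once it exceeds maxBytesPerRequest. */
    static List<List<String>> batch(List<String> entries, int maxBytesPerRequest) {
        List<List<String>> requests = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentSize = 0;
        for (String entry : entries) {
            // The entry is added first and the size is checked afterwards,
            // so a request may exceed the limit by up to one entry.
            current.add(entry);
            currentSize += entry.getBytes(StandardCharsets.UTF_8).length;
            if (currentSize > maxBytesPerRequest) {
                requests.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
        }
        // The remainder is always sent, even when it is empty,
        // mirroring the unconditional final send in finishSuperstep.
        requests.add(current);
        return requests;
    }

    public static void main(String[] args) {
        List<String> entries = Arrays.asList("aaaa", "bb", "cccc", "d");
        System.out.println(batch(entries, 5));
    }
}
```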

The call graph is as follows:

[Figure: call graph of finishSuperstep(); image not available]

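4. After the global synchronization barrier, the master calls the prepareSuperstep(masterClient) method of the MasterAggregatorHandler class to collect the aggregator values. The method is as follows: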

  public void prepareSuperstep(MasterClient masterClient) {

    // Collect the aggregated values of the previous superstep, in preparation for master compute
    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
      // For a persistent aggregator, accumulate onto the previous value
      if (aggregator.isPersistent()) {
        aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
      }
      aggregator.setPreviousAggregatedValue(
          aggregator.getCurrentAggregatedValue());
      aggregator.resetCurrentAggregator();
      progressable.progress();
    }
  }
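
The difference between a regular and a persistent aggregator in prepareSuperstep can be seen in a minimal sketch. SumAggregatorWrapper is a hypothetical stand-in for AggregatorWrapper that only supports summing long values; the method names follow the listing above, everything else is an assumption.

```java
/** Hypothetical sum-only stand-in for AggregatorWrapper; not Giraph code. */
final class SumAggregatorWrapper {
    private final boolean persistent;
    private long previousValue = 0; // what compute() sees in the next superstep
    private long currentValue = 0;  // what is being accumulated right now

    SumAggregatorWrapper(boolean persistent) {
        this.persistent = persistent;
    }

    /** Adds a value received during the current superstep. */
    void aggregateCurrent(long value) {
        currentValue += value;
    }

    /** Mirrors the loop body of prepareSuperstep above. */
    void prepareSuperstep() {
        if (persistent) {
            // A persistent aggregator carries the previous total forward.
            aggregateCurrent(previousValue);
        }
        previousValue = currentValue;
        currentValue = 0; // reset for the next superstep
    }

    long getPreviousAggregatedValue() {
        return previousValue;
    }

    public static void main(String[] args) {
        SumAggregatorWrapper regular = new SumAggregatorWrapper(false);
        SumAggregatorWrapper persistent = new SumAggregatorWrapper(true);
        long[] perSuperstep = {3, 4};
        for (long value : perSuperstep) {
            regular.aggregateCurrent(value);
            persistent.aggregateCurrent(value);
            regular.prepareSuperstep();
            persistent.prepareSuperstep();
            System.out.println("regular=" + regular.getPreviousAggregatedValue()
                + " persistent=" + persistent.getPreviousAggregatedValue());
        }
    }
}
```

With the inputs 3 and 4, the regular wrapper exposes only the latest superstep's value, while the persistent one exposes the running total.
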
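Then MasterCompute.compute() is called (it may modify the aggregator values). If, based on the aggregator values, this method calls the haltComputation() method of MasterCompute to stop the master computation, the whole job is to end, and the master notifies all workers to finish the job. If haltComputation() is not called, execution returns to step 1 for the next iteration.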

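Note: a job stops iterating when any one of the following three conditions holds:
1) The maximum number of supersteps is reached
2) There are no active vertices and no messages in flight
3) The MasterCompute computation is halted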
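
The three conditions combine with a logical OR, as the following sketch shows. HaltCheck and its parameters are hypothetical names, and the exact comparison used for the superstep limit is an assumption; Giraph's own check may count supersteps differently.

```java
/** Hypothetical predicate for the three stop conditions listed above; not Giraph code. */
final class HaltCheck {

    static boolean shouldStop(long superstep, long maxSupersteps,
                              long activeVertices, long messagesInFlight,
                              boolean masterHalted) {
        // 1) The maximum number of supersteps is reached (comparison is an assumption).
        boolean limitReached = superstep >= maxSupersteps;
        // 2) No vertex is active and no message is waiting to be delivered.
        boolean nothingLeftToDo = activeVertices == 0 && messagesInFlight == 0;
        // 3) The master computation was halted.
        return limitReached || nothingLeftToDo || masterHalted;
    }

    public static void main(String[] args) {
        System.out.println(shouldStop(3, 10, 5, 2, false));  // still running
        System.out.println(shouldStop(3, 10, 0, 0, false));  // nothing left to do
        System.out.println(shouldStop(3, 10, 5, 2, true));   // master halted
    }
}
```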

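Summary: to keep the master from becoming the system bottleneck when there are many aggregators, Giraph assigns every aggregator to one of the workers. These owner workers compute and distribute the global aggregated values, and the master only needs to exchange a small amount of data with them, which greatly reduces the master's workload.

Addendum: the process described above is illustrated below with figures.

Setup:

1). One master and four workers

2). Two aggregators, denoted A1 and A2.

1. The master sends the aggregators to workers; the worker that receives an aggregator becomes its owner. In the figure below, the master sends A1 to Worker1 and A2 to Worker3, so Worker1 is the owner of A1 and Worker3 is the owner of A2. This step is done in the finishSuperstep(MasterClient masterClient) method of the MasterAggregatorHandler class, using the SendAggregatorsToOwnerRequest protocol. Note: an owner worker may own several aggregators.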

Figure 1. The master distributes the aggregators (image not available)

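2. The workers receive the aggregators sent by the master and forward them to the other workers. Worker1 sends A1 to Worker2, Worker3, and Worker4; Worker3 sends A2 to Worker1, Worker2, and Worker4. This step is done in the prepareSuperstep(WorkerAggregatorRequestProcessor requestProcessor) method of the WorkerAggregatorHandler class, using the SendAggregatorsToWorkerRequest protocol. When this step is complete, every worker holds aggregators A1 and A2 (specifically, the final global aggregated values of the previous superstep).

[Figure: the owners forward A1 and A2 to the other workers; image not available]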

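3. Each worker calls Vertex.compute() to start computing and collects its local aggregated values. For aggregator A1, the local aggregated values on Worker1, Worker2, Worker3, and Worker4 are denoted A11, A12, A13, and A14; for aggregator A2, they are denoted A21, A22, A23, and A24. When computation finishes, each worker sends its local aggregated values to the aggregator's owner, which merges them as they arrive. Thus A11, A12, A13, and A14 are sent to Worker1 and aggregated globally into A1', and A21, A22, A23, and A24 are sent to Worker3 and aggregated globally into A2'.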

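The formulas are as follows:

A1' = A11 ⊕ A12 ⊕ A13 ⊕ A14
A2' = A21 ⊕ A22 ⊕ A23 ⊕ A24

Here ⊕ denotes the aggregate operation of the respective aggregator (for example, addition for a sum aggregator).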
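
As a worked instance of the owner-side combination, the sketch below folds four partial values with two different aggregate operations. It is only an illustration: ShardedAggregationExample and the sample numbers are hypothetical, and a generic BinaryOperator stands in for Giraph's Aggregator interface.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical worked example of combining partial values on the owner; not Giraph code. */
final class ShardedAggregationExample {

    /** Folds the partial values from all workers into one final value. */
    static <T> T aggregate(List<T> partials, T initialValue, BinaryOperator<T> operation) {
        T result = initialValue;
        for (T partial : partials) {
            result = operation.apply(result, partial);
        }
        return result;
    }

    public static void main(String[] args) {
        // Partial values of a sum aggregator on Worker1..Worker4, combined by its owner.
        List<Long> sumPartials = Arrays.asList(1L, 2L, 3L, 4L);
        Long sumFinal = aggregate(sumPartials, 0L, Long::sum);

        // Partial values of a max aggregator on Worker1..Worker4, combined by its owner.
        List<Long> maxPartials = Arrays.asList(10L, 40L, 30L, 20L);
        Long maxFinal = aggregate(maxPartials, Long.MIN_VALUE, Math::max);

        System.out.println("sum aggregator final value = " + sumFinal);
        System.out.println("max aggregator final value = " + maxFinal);
    }
}
```

Only the final values leave the owners for the master, regardless of how many workers contributed partial values.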

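This part uses the SendWorkerAggregatorsRequest protocol. Worker1 and Worker3 then send the new combined values of A1 and A2, namely A1' and A2', to the master for use by MasterCompute.compute() in the next superstep; this uses the SendAggregatorsToMasterRequest protocol. Both parts are done in the finishSuperstep(WorkerAggregatorRequestProcessor requestProcessor) method of the WorkerAggregatorHandler class. The process is shown in the figure below:

[Figure: workers send partial values to the owners, and the owners send A1' and A2' to the master; image not available]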

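4. The master receives A1' from Worker1 and A2' from Worker3; this step is done in the prepareSuperstep(masterClient) method of the MasterAggregatorHandler class. The master then calls MasterCompute.compute(), which may modify the aggregator values, producing for example A1'' and A2''. If, based on the aggregator values, MasterCompute.compute() calls the haltComputation() method of MasterCompute to stop the master computation, the whole job is to end, and the master notifies all workers to finish the job. If haltComputation() is not called, execution returns to step 1 for the next iteration, sending A1'' to Worker1 and A2'' to Worker3.

The end.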
