Home >Database >Mysql Tutorial >MR总结(三)-MapReduce组件自定义

MR总结(三)-MapReduce组件自定义

WBOY
WBOYOriginal
2016-06-07 16:41:231197browse

自定义InputFormat InputFormat主要包括: ? ? ? ? ? InputSplit和RecordReader ? ? InputSplit用于定义Map的数目和确定最合适的执行节点(位置) ? ?RecordReader负责从输入文件里读取数据记录,并把数据提交给Mapper处理。 ? 一个自定义分片实现要继承与抽

自定义InputFormat

InputFormat主要包括:

? ? ? ? ? InputSplit和RecordReader

? ?InputSplit用于定义Map的数目和确定最合适的执行节点(位置)

? ?RecordReader负责从输入文件里读取数据记录,并把数据提交给Mapper处理。

? 一个自定义分片实现要继承与抽象类InputSplit,通过定义输入的长度和位置。分片的位置暗示调度器如何是放置一个分片的执行器(即,选择一个合适的TaskTracker)

? JobTracker处理分片的算法大致是:

  1. 通过TaskTracker节点的心跳获取可用的map槽资源
  2. 从排队等候的分片中找出那些可用的节点是本地的
  3. 向TaskTracker提交分片

? 基于存储机制和执行策略,分片的大小和位置是有不同的意思。例如在HDFS上,一个分片和一个物理数据块是一致的,分片的位置是这个数据块的物理存放位置的一个集合。

? 下面是FileInputFormat工作的机制

  1. 继承InputSplit,计算文件的信息。如文件中数据块的开始位置和块的长度
  2. 获取文件的数据块的一个集合
  3. 创建一个数据分片,这个分片的长度和块大小一样,分片位置为这个快的位置。此外还含有文件位置,块位移,长度等信息。

下面是FileInputFormat创建分片的代码:


/**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<inputsplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<inputsplit> splits = new ArrayList<inputsplit>();
    List<filestatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }

        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else {
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
  }
</filestatus></inputsplit></inputsplit></inputsplit>

方法创建分片过程主要做了下面几件事:1、首先从Job对象中获取输入文件的状态信息FileStatus。2.然后针对每个文件获取块信息。3.根据文件是否可分割按照分片大小进行切割,如果不能则不分割。

?案例:实现计算密集型InputFormat

有一种比较常见的MapReduce程序:计算密集型程序。

计算密集型MR程序即指:对于与每一个输入的键值对需要复杂的计算算法去处理,主要特征是每一个map处理函数需要比获取该处理数据更长的时间,至少一个量级。比如人脸识别程序。

如果使用默认的FileInputFormat去处理该类型应用的话,很大情况下会出现部门机器cpu负载过高,而其他的则比较闲。(可以通过ganglia监控分析)

默认情况下由于FileInputFormat的实现会根据数据的本地性来创建分片数据。然而对于计算密集型的程序数据的本地性可能不适合了。那我们应该如何做出改变呢?我们获取所有可用服务器各自计算能力的情况,根据服务器来分配创建分片

因此我们需要重载分片函数。

下面我们重载SequenceFileInputFormat来演示如何实现上述要求:

ComputeIntensiveSequenceFileInputFormat继承SequenceFileInputFormat函数,重载gitSplits函数:

//重写分片函数
 @Override
 public List<inputsplit> getSplits(JobContext job) throws IOException {
  String[] servers = getActiveServersList(job);
  if (servers == null)
   return null;
  List<inputsplit> splits = new ArrayList<inputsplit>();
  List<filestatus> files = listStatus(job);
  int currentServer = 0;
  for (FileStatus file : files) {
   Path path = file.getPath();
   long length = file.getLen();
   if ((length != 0) && isSplitable(job, path)) {
    long blockSize = file.getBlockSize();
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
     splits.add(new FileSplit(path, length - bytesRemaining,
       splitSize, new String[] { servers[currentServer] }));
     currentServer = getNextServer(currentServer, servers.length);
     bytesRemaining -= splitSize;
    }
   } else if (length != 0) {
    splits.add(new FileSplit(path, 0, length,
      new String[] { servers[currentServer] }));
    currentServer = getNextServer(currentServer, servers.length);
   }
  }
  // Save the number of input files in the job-conf
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  return splits;
 }
 //获取服务器列表
 private String[] getActiveServersList(JobContext context) {
  String[] servers = null;
  try {
   JobClient jc = new JobClient((JobConf) context.getConfiguration());
   ClusterStatus status = jc.getClusterStatus(true);
   Collection<string> atc = status.getActiveTrackerNames();
   servers = new String[atc.size()];
   int s = 0;
   for (String serverInfo : atc) {
    StringTokenizer st = new StringTokenizer(serverInfo, ":");
    String trackerName = st.nextToken();
    StringTokenizer st1 = new StringTokenizer(trackerName, "_");
    st1.nextToken();
    servers[s++] = st1.nextToken();
   }
  } catch (IOException e) {
   e.printStackTrace();
  }
  return servers;
 }
 //选择一个服务器
 private static int getNextServer(int current, int max) {
  current++;
  if (current >= max)
   current = 0;
  return current;
 }

</string></filestatus></inputsplit></inputsplit></inputsplit>

这个类继承于SequenceFileInputFormat,重写了getSplits()函数。计算分片的和FileInputFormat一样。只是原来数据的物理本地性由可用的服务器资源代替。两个主要函数:
getActiveServersList() 查询集群状态,计算一个可用的服务器名字列表
getNextServer() 获取一个服务器

优化:
上面方案是否就已经很完美,没有问题了呢?答案是否定的。
在上面我们替换了数据物理本地性这个特性,那么会导致更多的数据传输的问题,会给网络io带来压力,影响io性能。

因此我们想到可以把两个策略综合起来。首先放置尽可能多的任务为本地,分发剩下的到其他节点。
下面是实现这个方案的程序ComputeIntensiveLocalizedSequenceFileInputFormat:

@Override
 public List<inputsplit> getSplits(JobContext job) throws IOException {
  List<inputsplit> originalSplits = super.getSplits(job);
  String[] servers = getActiveServersList(job);
  if (servers == null)
   return null;
  List<inputsplit> splits = new ArrayList<inputsplit>(
    originalSplits.size());
  int numSplits = originalSplits.size();
  int currentServer = 0;
  for (int i = 0; i 
<p>这里第一步利用父类(FileInputFormat))获取分片来确保数据本地性。对于每一个服务器,首先试着去指定本地的split给它。其他没有本地分片的则随机分配剩下的分片。</p>

<p><strong>总结:</strong></p>
<p>MapReduce的输入格式的重写主要要主要两个组件:InputFormat和Recordreader。本文主要讲述InputFormat的原理及怎么来重写InputFormat,根据业务的特点选择创建分片的策略。</p>
    <p class="copyright">
        原文地址:MR总结(三)-MapReduce组件自定义, 感谢原作者分享。
    </p>
    
    


</inputsplit></inputsplit></inputsplit></inputsplit>
Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn