Heim  >  Artikel  >  Datenbank  >  控制Hive MAP个数详解

控制Hive MAP个数详解

WBOY
WBOYOriginal
2016-06-07 17:33:051668Durchsuche

Hive的MAP数或者说MAPREDUCE的MAP数是由谁来决定的呢?inputsplit size,那么对于每一个inputsplit size是如何计算出来的,这是做

Hive的MAP数或者说MAPREDUCE的MAP数是由谁来决定的呢?inputsplit size,那么对于每一个inputsplit size是如何计算出来的,这是做MAP数调整的关键.
Hadoop给出了Inputformat接口用于描述输入数据的格式,,其中一个关键的方法就是getSplits,对输入的数据进行分片.
Hive对InputFormat进行了封装:

而具体采用的实现是由参数hive.input.format来决定的,主要使用2中类型HiveInputFormat和CombineHiveInputFormat.
对于HiveInputFormat来说:


 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    //扫描每一个分区
    for (Path dir : dirs) {
      PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
    //获取分区的输入格式
      Class inputFormatClass = part.getInputFileFormatClass();
      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
    //按照相应格式的分片算法获取分片
    //注意:这里的Inputformat只是old version API:org.apache.hadoop.mapred而不是org.apache.hadoop.mapreduce,因此不能采用新的API,否则在查询时会报异常:Input format must implement InputFormat.区别就是新的API的计算inputsplit size(Math.max(minSize, Math.min(maxSize, blockSize))和老的(Math.max(minSize, Math.min(goalSize, blockSize)))不一样;
      InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
      for (InputSplit is : iss) {
    //封装结果,返回
        result.add(new HiveInputSplit(is, inputFormatClass.getName()));
      }
    }
    return result.toArray(new HiveInputSplit[result.size()]);
}

 

对于CombineHiveInputFormat来说的计算就比较复杂了:


 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    //加载CombineFileInputFormatShim,这个类继承了org.apache.hadoop.mapred.lib.CombineFileInputFormat
    CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
        .getCombineFileInputFormat();
if (combine == null) {
//若为空则采用HiveInputFormat的方式,下同
      return super.getSplits(job, numSplits);
    }
    Path[] paths = combine.getInputPathsShim(job);
for (Path path : paths) {
//若是外部表,则按照HiveInputFormat方式分片
      if ((tableDesc != null) && tableDesc.isNonNative()) {
        return super.getSplits(job, numSplits);
      }
      Class inputFormatClass = part.getInputFileFormatClass();
      String inputFormatClassName = inputFormatClass.getName();
      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
      if (this.mrwork != null && !this.mrwork.getHadoopSupportsSplittable()) {
        if (inputFormat instanceof TextInputFormat) {
        if ((new CompressionCodecFactory(job)).getCodec(path) != null)
//在未开启hive.hadoop.supports.splittable.combineinputformat(MAPREDUCE-1597)参数情况下,对于TextInputFormat并且为压缩则采用HiveInputFormat分片算法
                    return super.getSplits(job, numSplits);
        }
      }
    //对于连接式同上
      if (inputFormat instanceof SymlinkTextInputFormat) {
        return super.getSplits(job, numSplits);
      }
      CombineFilter f = null;
      boolean done = false;
Path filterPath = path;
//由参数hive.mapper.cannot.span.multiple.partitions控制,默认false;如果没true,则对每一个partition创建一个pool,以下省略为true的处理;对于同一个表的同一个文件格式的split创建一个pool为combine做准备;
      if (!mrwork.isMapperCannotSpanPartns()) {
        opList = HiveFileFormatUtils.doGetWorksFromPath(
                  pathToAliases, aliasToWork, filterPath);
        f = poolMap.get(new CombinePathInputFormat(opList, inputFormatClassName));
      }
      if (!done) {
        if (f == null) {
          f = new CombineFilter(filterPath);
          combine.createPool(job, f);
        } else {
          f.addPath(filterPath);
        }
      }
    }
if (!mrwork.isMapperCannotSpanPartns()) {
//到这里才调用combine的分片算法,继承了org.apache.hadoop.mapred.lib.CombineFileInputFormat extends 新版本CombineFileInputformat
      iss = Arrays.asList(combine.getSplits(job, 1));
}
//对于sample查询特殊处理
    if (mrwork.getNameToSplitSample() != null && !mrwork.getNameToSplitSample().isEmpty()) {
      iss = sampleSplits(iss);
}
//封装结果返回
    for (InputSplitShim is : iss) {
      CombineHiveInputSplit csplit = new CombineHiveInputSplit(job, is);
      result.add(csplit);
    }
    return result.toArray(new CombineHiveInputSplit[result.size()]);
  }

更多详情见请继续阅读下一页的精彩内容

Hive 的详细介绍:请点这里
Hive 的下载地址:请点这里

相关阅读:

基于Hadoop集群的Hive安装

Hive内表和外表的区别

Hadoop + Hive + Map +reduce 集群安装部署

Hive本地独立模式安装

Hive学习之WordCount单词统计

linux

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn