Heim >Datenbank >MySQL-Tutorial >Hadoop : 一个目录下的数据只由一个map处理

Hadoop : 一个目录下的数据只由一个map处理

WBOY
WBOYOriginal
2016-06-07 16:38:271261Durchsuche

有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。 刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 mapreduce job让一个文件只由一个map来处理。 或者是把目录写在文

有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。

刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 “mapreduce job让一个文件只由一个map来处理“。

或者是把目录写在文件里面,作为输入:

/path/to/directory1
/path/to/directory2
/path/to/directory3

代码里面按行读取:

 @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            FileSystem fs = FileSystem.get(context.getConfiguration());
            for (FileStatus status : fs.listStatus(new Path(value.toString()))) {
                // process file
            }
        }

都不能满足需求,还是自己实现一个 OneMapOneDirectoryInputFormat 吧,也很简单:

import java.io.IOException;
import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
/**
 * 一个map处理一个目录的数据
 */
public abstract class OneMapOneDirectoryInputFormat extends CombineFileInputFormat {
    private static final Log LOG = LogFactory.getLog(OneMapOneDirectoryInputFormat.class);
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
    @Override
    public List getSplits(JobContext job) throws IOException {
        // get all the files in input path
        List stats = listStatus(job);
        List splits = new ArrayList();
        if (stats.size() == 0) {
            return splits;
        }
        LOG.info("fileNums=" + stats.size());
        Map> map = new HashMap>();
        for (FileStatus stat : stats) {
            String directory = stat.getPath().getParent().toString();
            if (map.containsKey(directory)) {
                map.get(directory).add(stat);
            } else {
                List fileList = new ArrayList();
                fileList.add(stat);
                map.put(directory, fileList);
            }
        }
        // 设置inputSplit
        long currentLen = 0;
        List pathLst = new ArrayList();
        List offsetLst = new ArrayList();
        List lengthLst = new ArrayList();
        Iterator itr = map.keySet().iterator();
        while (itr.hasNext()) {
            String dir = itr.next();
            List fileList = map.get(dir);
            for (int i = 0; i  path[" + i + "]=" + pathArray[i].toString());
            }
            splits.add(thissplit);
            pathLst.clear();
            offsetLst.clear();
            lengthLst.clear();
            currentLen = 0;
        }
        return splits;
    }
    private long[] getLongArray(List lst) {
        long[] rst = new long[lst.size()];
        for (int i = 0; i 
<p>这个InputFormat的具体使用方法就不说了。其实与“一个Hadoop程序的优化过程 – 根据文件实际大小实现CombineFileInputFormat”中的MultiFileInputFormat比较类似。</p>
    <p class="copyright">
        原文地址:Hadoop : 一个目录下的数据只由一个map处理, 感谢原作者分享。
    </p>
    
    


Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn