Home >Database >Mysql Tutorial >HBase之普通BulkLoad

HBase之普通BulkLoad

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOriginal
2016-06-07 16:05:14 · 974 browse

为了保持MapReduce架构清晰,同时保留Map和Reduce结构。以便后续扩展。PS:写入HFile的时候,qualifier必须有序。 Mapper: import com.google.common.base.Strings;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.io.L

为了保持MapReduce架构清晰,同时保留Map和Reduce结构,以便后续扩展。PS:写入HFile的时候,qualifier必须有序。

Mapper:

import com.google.common.base.Strings;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import yeepay.util.HBaseUtil;

public class LoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {

    protected void map(LongWritable key, Text value, Context context) {

        try {

            String line = value.toString();

            if (Strings.isNullOrEmpty(line)) {
                return;
            }

            String[] arr = line.split("\t", 9);

            if (arr.length != 9) {
                throw new RuntimeException("line.splite() not == 9");
            }

            if (arr.length < 1) {
                return;
            }
            String k1 = arr[0];
            ImmutableBytesWritable keyH = new ImmutableBytesWritable(HBaseUtil.getRowKey(k1));
            context.write(keyH, new Text(line));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }


}

Reducer:

import com.google.common.base.Splitter;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/**
 * Reduce phase of the bulk load: turns one tab-separated record into HBase
 * {@link KeyValue}s, one per column, emitted in sorted qualifier order.
 *
 * <p>Only the first value received for a row key is used; any further values for the
 * same key are ignored.
 */
public class LoadReducer extends Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {

    /** Column qualifiers, in the order the fields appear in an input line. */
    final static String[] fileds = new String[]{
            "ID",
            "A_ACCOUNT_ID",
            "A_TRX_ID",
            "P_ID",
            "P_TRXORDER_ID",
            "P_FRP_ID",
            "O_PRODUCTCAT",
            "O_RECEIVER_ID",
            "O_REQUESTID"
    };

    /** Column family every cell is written to. */
    private static final byte[] FAMILY = Bytes.toBytes("f");

    /**
     * Splits the first value of the group into fields and writes one {@link KeyValue}
     * per field under the given row key.
     *
     * @param rowkey  row key shared by all values of the group
     * @param values  input lines mapped to this row key; only the first one is read
     * @param context MapReduce context the cells are written to
     * @throws java.io.IOException  if writing a cell to the context fails
     * @throws InterruptedException if writing a cell to the context is interrupted
     */
    @Override
    public void reduce(ImmutableBytesWritable rowkey, Iterable<Text> values, Context context) throws java.io.IOException, InterruptedException {
        String vs = values.iterator().next().toString();

        // Cells must be written in qualifier order, so collect the fields in a sorted
        // map keyed by qualifier before emitting anything.
        Map<String, String> map = new TreeMap<String, String>();
        int i = 0;
        // The split limit equals the number of qualifiers, so the index cannot overrun.
        for (String field : Splitter.on("\t").limit(fileds.length).split(vs)) {
            map.put(fileds[i++], field);
        }

        byte[] row = rowkey.copyBytes();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            // Bytes.toBytes encodes as UTF-8 regardless of the JVM's default charset.
            KeyValue kv = new KeyValue(row, FAMILY, Bytes.toBytes(entry.getKey()), 0L, Bytes.toBytes(entry.getValue()));
            // Failures propagate to the framework; the previous version built a
            // RuntimeException without throwing it, which hid every error.
            context.write(rowkey, kv);
        }
    }

}
Job & BulkLoad:
package yeepay.load;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import yeepay.util.HdfsUtil;
import yeepay.util.YeepayConstant;

import java.util.Date;

/**
 * Template for a "text file to HBase" bulk load: runs a MapReduce job that writes
 * HFiles for a table and then loads those HFiles into the table.
 *
 * <p>Subclasses supply the mapper and reducer classes and a job name.
 */
public abstract class AbstractJobBulkLoad {
    /** Configuration shared by the job, the file system access and the HBase clients. */
    public static Configuration conf = HBaseConfiguration.create();

    /**
     * Runs the MapReduce job and, if it succeeds, bulk-loads its output into HBase.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the target table name
     * @throws Exception        declared for callers; failures inside the job setup, the
     *                          job itself or the bulk load surface as
     *                          {@link RuntimeException}
     * @throws RuntimeException if the input path does not exist, the MapReduce job does
     *                          not complete successfully, or any step throws
     */
    public void run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("please set input dir");
            System.exit(-1);
            return;
        }
        String txtPath = args[0];
        String tableName = args[1];
        Job job = new Job(conf, "txt2HBase");
        try {
            // The number of reducers and the row key range each reducer covers are
            // derived from the table's regions.
            HTable htable = new HTable(conf, tableName);
            try {
                HFileOutputFormat.configureIncrementalLoad(job, htable);
            } finally {
                // Release the table even when the configuration step fails.
                htable.close();
            }
            job.setJarByClass(AbstractJobBulkLoad.class);
            FileSystem fs = FileSystem.get(conf);

            System.out.println("input file :" + txtPath);
            Path inputFile = new Path(txtPath);
            if (!fs.exists(inputFile)) {
                System.err.println("inputFile " + txtPath + " not exist.");
                throw new RuntimeException("inputFile " + txtPath + " not exist.");
            }
            FileInputFormat.addInputPath(job, inputFile);

            job.setMapperClass(getMapperClass());
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);

            job.setReducerClass(getReducerClass());

            // A timestamped directory keeps the output of separate runs apart.
            String outputDir = "/output/" + tableName + "/" + new Date().getTime();
            Path output = new Path(outputDir);
            System.out.println(outputDir);
            FileOutputFormat.setOutputPath(job, output);

            // Do not attempt to load HFiles from a job that did not finish successfully.
            if (!job.waitForCompletion(true)) {
                throw new RuntimeException("MapReduce job failed, bulk load skipped: " + outputDir);
            }

            // Run the bulk load.
            HdfsUtil.chmod(conf, output.toString());
            HdfsUtil.chmod(conf, output + "/" + YeepayConstant.COMMON_FAMILY);
            htable = new HTable(conf, tableName);
            try {
                new LoadIncrementalHFiles(conf).doBulkLoad(output, htable);
            } finally {
                // Release the table even when the load fails.
                htable.close();
            }
            System.out.println("HFile data load success!");
            System.out.println(getJobName() + " end!");

        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    /** Returns the mapper class the job should run. */
    protected abstract Class getMapperClass();

    /** Returns the reducer class the job should run. */
    protected abstract Class getReducerClass();


    /** Returns the table name; {@link #run} itself takes the table name from its arguments. */
    protected abstract String getTableName();

    /** Returns the job name printed when the load has finished. */
    protected abstract String getJobName();
}
Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Previous article: 国外论文搜索 | Next article: 删除重复数据