Heim  >  Artikel  >  Datenbank  >  HBase快速导入数据--BulkLoad

HBase快速导入数据--BulkLoad

WBOY
WBOYOriginal
2016-06-07 15:21:331135Durchsuche

Apache HBase是一个分布式的、面向列的开源数据库,它可以让我们随机的、实时的访问大数据。但是怎样有效的将数据导入到HBase呢?

Apache HBase是一个分布式的、面向列的开源数据库,它可以让我们随机的、实时的访问大数据。但是怎样有效的将数据导入到HBase呢?HBase有多种导入数据的方法,最直接的方法就是在MapReduce作业中使用TableOutputFormat作为输出,或者使用标准的客户端API,但是这些都不是非常有效的方法。

Bulkload利用MapReduce作业输出HBase内部数据格式的表数据,然后将生成的StoreFiles直接导入到集群中。与使用HBase API相比,使用Bulkload导入数据占用更少的CPU和网络资源。

Bulkload过程主要包括三部分:

1.从数据源(通常是文本文件或其他的数据库)提取数据并上传到HDFS

这一步不在HBase的考虑范围内,不管数据源是什么,,只要在进行下一步之前将数据上传到HDFS即可。

2.利用一个MapReduce作业准备数据

这一步需要一个MapReduce作业,并且大多数情况下还需要我们自己编写Map函数,而Reduce函数不需要我们考虑,由HBase提供。该作业需要使用rowkey(行键)作为输出Key,KeyValue、Put或者Delete作为输出Value。MapReduce作业需要使用HFileOutputFormat2来生成HBase数据文件。为了有效的导入数据,需要配置HFileOutputFormat2使得每一个输出文件都在一个合适的区域中。为了达到这个目的,MapReduce作业会使用Hadoop的TotalOrderPartitioner类根据表的key值将输出分割开来。HFileOutputFormat2的方法configureIncrementalLoad()会自动的完成上面的工作。

3.告诉RegionServers数据的位置并导入数据

这一步是最简单的,通常需要使用LoadIncrementalHFiles(更为人所熟知是completebulkload工具),将文件在HDFS上的位置传递给它,它就会利用RegionServer将数据导入到相应的区域。

下图简单明确的说明了整个过程

HBase快速导入数据--BulkLoad

图片来自How-to: Use HBase Bulk Loading, and Why

Note:在进行BulkLoad之前,要在HBase中创建与程序中同名且结构相同的空表

Java实现如下:

BulkLoadDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by shaobo on 15-6-9.
 */
public class BulkLoadDriver extends Configured implements Tool {
    private static final String DATA_SEPERATOR = "\\s+";
    private static final String TABLE_NAME = "temperature";//表名
    private static final String COLUMN_FAMILY_1="date";//列组1
    private static final String COLUMN_FAMILY_2="tempPerHour";//列组2

    public static void main(String[] args) {
        try {
            int response = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadDriver(), args);
            if(response == 0) {
                System.out.println("Job is successfully completed...");
            } else {
                System.out.println("Job failed...");
            }
        } catch(Exception exception) {
            exception.printStackTrace();
        }
    }

    public int run(String[] args) throws Exception {
        String outputPath = args[1];
        /**
        * 设置作业参数
        */
        Configuration configuration = getConf();
        configuration.set("data.seperator", DATA_SEPERATOR);
        configuration.set("hbase.table.name", TABLE_NAME);
        configuration.set("COLUMN_FAMILY_1", COLUMN_FAMILY_1);
        configuration.set("COLUMN_FAMILY_2", COLUMN_FAMILY_2);
        Job job = Job.getInstance(configuration, "Bulk Loading HBase Table::" + TABLE_NAME);
        job.setJarByClass(BulkLoadDriver.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);//指定输出键类
        job.setMapOutputValueClass(Put.class);//指定输出值类
        job.setMapperClass(BulkLoadMapper.class);//指定Map函数
        FileInputFormat.addInputPaths(job, args[0]);//输入路径
        FileSystem fs = FileSystem.get(configuration);
        Path output = new Path(outputPath);
        if (fs.exists(output)) {
            fs.delete(output, true);//如果输出路径存在,就将其删除
        }
        FileOutputFormat.setOutputPath(job, output);//输出路径
        Connection connection = ConnectionFactory.createConnection(configuration);
        TableName tableName = TableName.valueOf(TABLE_NAME);
        HFileOutputFormat2.configureIncrementalLoad(job, connection.getTable(tableName), connection.getRegionLocator(tableName));
        job.waitForCompletion(true);
        if (job.isSuccessful()){
            HFileLoader.doBulkLoad(outputPath, TABLE_NAME);//导入数据
            return 0;
        } else {
            return 1;
        }
    }

}

BulkLoadMapper.java

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn