Home >Database >Mysql Tutorial >Mapreduce读取hbase汇总到RDBMS

Mapreduce读取hbase汇总到RDBMS

WBOY
WBOYOriginal
2016-06-07 16:41:161385browse

前言 Hbase对Mapreduce API进行了扩展,方便Mapreduce任务读写HTable数据。 HBase作为源的MapReduce读取示例 package hbase;import java.io.IOException;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import jav

前言

Hbase对Mapreduce API进行了扩展,方便Mapreduce任务读写HTable数据。

HBase作为源的MapReduce读取示例

<code>package hbase;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
public class ExampleHbaseToMysqlMapreduce {
    public static void main(String[] args) throws Exception {
        //hbase配置 
        Configuration config = HBaseConfiguration.create();
        String tableName = "flws";
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("5768014"));
        scan.setStopRow(Bytes.toBytes("5768888"));
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("AH"));
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        //JOB定义
        Job job = new Job(config, "ExampleHbaseMapreduce");
        job.setJarByClass(ExampleHbaseToMysqlMapreduce.class);
        //设置map读取hbase方法
        TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class,
                Text.class,Text.class, job);
        //reduce设置
        job.setReducerClass(MyReducer.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setNumReduceTasks(5);
        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new Exception("error with job!");
        }
    }
    public static class MyMapper extends TableMapper<text text> {
        public void map(ImmutableBytesWritable row, Result value,
                Context context) throws IOException, InterruptedException {
            context.write(
                    new Text(row.get()),
                    new Text(value.getValue(Bytes.toBytes("cf"),
                            Bytes.toBytes("AH"))));
        }
    }
    public static class MyReducer extends
            TableReducer<text text immutablebyteswritable> {
        private Connection conn = null;
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            String driver = "com.mysql.jdbc.Driver";
            String url = "jdbc:mysql://172.16.35.242/judgment?useUnicode=true&characterEncoding=gbk&zeroDateTimeBehavior=convertToNull";
            try {
                Class.forName(driver);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
            try {
                conn = DriverManager.getConnection(url, "root", "root");
            } catch (SQLException e) {
                e.printStackTrace();
            }
            super.setup(context);
        }
        public void reduce(Text key, Iterable<text> values, Context context)
                throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Text text : values) {
                sb.append(text.toString());
            }
            try {
                Statement st = conn.createStatement();
                st.executeUpdate("insert into test_mapreduce (id,ah) values ("
                        + Integer.valueOf(key.toString()) + ",'"
                        + sb.toString() + "')");
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
</text></text></text></code>
Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn