首页  >  文章  >  数据库  >  Mapreduce读取hbase汇总到RDBMS

Mapreduce读取hbase汇总到RDBMS

WBOY
WBOY原创
2016-06-07 16:41:161351浏览

前言 Hbase对Mapreduce API进行了扩展,方便Mapreduce任务读写HTable数据。 HBase作为源的MapReduce读取示例 package hbase;import java.io.IOException;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import jav

前言

Hbase对Mapreduce API进行了扩展,方便Mapreduce任务读写HTable数据。

HBase作为源的MapReduce读取示例

<code>package hbase;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
public class ExampleHbaseToMysqlMapreduce {
    public static void main(String[] args) throws Exception {
        //hbase配置 
        Configuration config = HBaseConfiguration.create();
        String tableName = "flws";
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("5768014"));
        scan.setStopRow(Bytes.toBytes("5768888"));
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("AH"));
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        //JOB定义
        Job job = new Job(config, "ExampleHbaseMapreduce");
        job.setJarByClass(ExampleHbaseToMysqlMapreduce.class);
        //设置map读取hbase方法
        TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class,
                Text.class,Text.class, job);
        //reduce设置
        job.setReducerClass(MyReducer.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setNumReduceTasks(5);
        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new Exception("error with job!");
        }
    }
    public static class MyMapper extends TableMapper<text text> {
        public void map(ImmutableBytesWritable row, Result value,
                Context context) throws IOException, InterruptedException {
            context.write(
                    new Text(row.get()),
                    new Text(value.getValue(Bytes.toBytes("cf"),
                            Bytes.toBytes("AH"))));
        }
    }
    public static class MyReducer extends
            TableReducer<text text immutablebyteswritable> {
        private Connection conn = null;
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            String driver = "com.mysql.jdbc.Driver";
            String url = "jdbc:mysql://172.16.35.242/judgment?useUnicode=true&characterEncoding=gbk&zeroDateTimeBehavior=convertToNull";
            try {
                Class.forName(driver);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
            try {
                conn = DriverManager.getConnection(url, "root", "root");
            } catch (SQLException e) {
                e.printStackTrace();
            }
            super.setup(context);
        }
        public void reduce(Text key, Iterable<text> values, Context context)
                throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Text text : values) {
                sb.append(text.toString());
            }
            try {
                Statement st = conn.createStatement();
                st.executeUpdate("insert into test_mapreduce (id,ah) values ("
                        + Integer.valueOf(key.toString()) + ",'"
                        + sb.toString() + "')");
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
</text></text></text></code>
声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn