Heim  >  Artikel  >  Datenbank  >  Mapreduce读取hbase汇总到RDBMS

Mapreduce读取hbase汇总到RDBMS

WBOY
WBOYOriginal
2016-06-07 16:41:161321Durchsuche

前言 Hbase对Mapreduce API进行了扩展,方便Mapreduce任务读写HTable数据。 HBase作为源的MapReduce读取示例 package hbase;import java.io.IOException;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import jav

前言

Hbase对Mapreduce API进行了扩展,方便Mapreduce任务读写HTable数据。

HBase作为源的MapReduce读取示例

<code>package hbase;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
public class ExampleHbaseToMysqlMapreduce {
    public static void main(String[] args) throws Exception {
        //hbase配置 
        Configuration config = HBaseConfiguration.create();
        String tableName = "flws";
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("5768014"));
        scan.setStopRow(Bytes.toBytes("5768888"));
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("AH"));
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        //JOB定义
        Job job = new Job(config, "ExampleHbaseMapreduce");
        job.setJarByClass(ExampleHbaseToMysqlMapreduce.class);
        //设置map读取hbase方法
        TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class,
                Text.class,Text.class, job);
        //reduce设置
        job.setReducerClass(MyReducer.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setNumReduceTasks(5);
        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new Exception("error with job!");
        }
    }
    public static class MyMapper extends TableMapper<text text> {
        public void map(ImmutableBytesWritable row, Result value,
                Context context) throws IOException, InterruptedException {
            context.write(
                    new Text(row.get()),
                    new Text(value.getValue(Bytes.toBytes("cf"),
                            Bytes.toBytes("AH"))));
        }
    }
    public static class MyReducer extends
            TableReducer<text text immutablebyteswritable> {
        private Connection conn = null;
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            String driver = "com.mysql.jdbc.Driver";
            String url = "jdbc:mysql://172.16.35.242/judgment?useUnicode=true&characterEncoding=gbk&zeroDateTimeBehavior=convertToNull";
            try {
                Class.forName(driver);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
            try {
                conn = DriverManager.getConnection(url, "root", "root");
            } catch (SQLException e) {
                e.printStackTrace();
            }
            super.setup(context);
        }
        public void reduce(Text key, Iterable<text> values, Context context)
                throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Text text : values) {
                sb.append(text.toString());
            }
            try {
                Statement st = conn.createStatement();
                st.executeUpdate("insert into test_mapreduce (id,ah) values ("
                        + Integer.valueOf(key.toString()) + ",'"
                        + sb.toString() + "')");
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
</text></text></text></code>
Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn