Home >Database >Mysql Tutorial >使用HBase EndPoint(coprocessor)进行计算

使用HBase EndPoint(coprocessor)进行计算

WBOY
WBOYOriginal
2016-06-07 16:34:111444browse

如果要统对hbase中的数据,进行某种统计,比如统计某个字段最大值,统计满足某种条件的记录数,统计各种记录特点,并按照记录特点分类(类似于sql的group by)~ 常规的做法就是把hbase中整个表的数据scan出来,或者稍微环保一点,加一个filter,进行一些初步

如果要统对hbase中的数据,进行某种统计,比如统计某个字段最大值,统计满足某种条件的记录数,统计各种记录特点,并按照记录特点分类(类似于sql的group by)~

常规的做法就是把hbase中整个表的数据scan出来,或者稍微环保一点,加一个filter,进行一些初步的过滤(对于rowcounter来说,就加了FirstKeyOnlyFilter),但是这么做来说还是会有很大的副作用,比如占用大量的网络带宽(当标级别到达千万级别,亿级别之后)尤为明显,RPC的量也是不容小觑的。

理想的方式应该是怎样?

拿row counter这个简单例子来说,我要统计总行数,如果每个region 告诉我他又多少行,然后把结果告诉我,我再将他们的结果汇总一下,不就行了么?
现在的问题是hbase没有提供这种接口,来统计每个region的行数,那是否我们可以自己来实现一个呢?
没错,正如本文标题所说,我们可以自己来实现一个Endpoint,然后让hbase加载起来,然后我们远程调用即可。

什么是Endpoint?

先弄清楚什么是hbase coprocessor

hbase有两种coprocessor,一种是Observer(观察者),类似于关系数据库的trigger(触发器),另外一种就是EndPoint,类似于关系数据库的存储过程。

观察者这里就多做介绍了,这里介绍Endpoint。

EndPoint是动态RPC插件的接口,它的实现代码被部署在服务器端(regionServer),从而能够通过HBase RPC调用。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个EndPoint,它们的实现代码会被目标region远程执行,结果会返回到终端。用户可以结合使用这些强大的插件接口,为HBase添加全新的特性。

怎么实现一个EndPoint

1. 定义一个新的protocol接口,必须继承CoprocessorProtocol.
2. 实现终端接口,继承抽象类BaseEndpointCoprocessor,改实现代码需要部署到
3. 在客户端,终端可以被两个新的HBase Client API调用 。单个region:HTableInterface.coprocessorProxy(Class protocol, byte[] row) 。rigons区域:HTableInterface.coprocessorExec(Class protocol, byte[] startKey, byte[] endKey, Batch.Call callable),这里的region是通过一个row来标示的,就是说,改row落到那个region,RPC就发给哪个region,对于start-end的,[start,end)范围内的region都会受到RPC调用。

如图71e2816c-c109-475a-9d64-bc6b74e61443

public interface CounterProtocol extends CoprocessorProtocol {
	public long count(byte[] start, byte[] end) throws IOException;
}
public class CounterEndPoint extends BaseEndpointCoprocessor implements CounterProtocol {
	@Override
	public long count(byte[] start, byte []end) throws IOException {
		// aggregate at each region
		Scan scan = new Scan();
		long numRow = 0;
		InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
				.getScanner(scan);
		try {
			List curVals = new ArrayList();
			boolean hasMore = false;
			do {
				curVals.clear();
				hasMore = scanner.next(curVals);
				if (Bytes.compareTo(curVals.get(0).getRow(), start)= 0) {
					break;
				}
				numRow++;
			} while (hasMore);
		} finally {
			scanner.close();
		}
		return numRow;
	}
}
public class CounterEndPointDemo {
	public static void main(String[] args) throws IOException, Throwable {
		final String startRow = args[0];
		final String endRow = args[1];
		@SuppressWarnings("resource")
		HTableInterface table = new HTable(HBaseConfiguration.create(), "tc");
		Map results;
		// scan: for all regions
		results = table.coprocessorExec(CounterProtocol.class, startRow.getBytes(),
				endRow.getBytes(), new Batch.Call() {
					public Long call(CounterProtocol instance) throws IOException {
						return instance.count(startRow.getBytes(), endRow.getBytes());
					}
				});
		long total = 0;
		for (Map.Entry e : results.entrySet()) {
			System.out.println(e.getValue());
			total += e.getValue();
		}
		System.out.println("total:" + total);
	}
}

整个程序的框架其实又是另外一个mapreduce,只是运行在region server上面,reduce运行在客户端,其中map计算量较大,reduce计算量很小!

另外需要提醒的是:
protocol的返回类型,可以是基本类型。
如果是一个自定义的类型需要实现org.apache.hadoop.io.Writable接口。
关于详细的支持类型,请参考代码hbase源码:org.apache.hadoop.hbase.io.HbaseObjectWritable

怎么部署?

1. 通过hbase-site.xml增加

  hbase.coprocessor.region.classes
  xxxx.CounterEndPoint 
  1. 如果要配置多个,就用逗号(,)分割。
  2. 包含此类的jar必须位于hbase的classpath
  3. 这种coprocessor是作用于所有的表,如果你只想作用于部分表,请使用下面一种方式。

2. 通过shell方式
增加:

hbase(main):005:0> alter 't1', METHOD => 'table_att',
'coprocessor'=>'hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2'
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 1.0730 seconds

coprocessor格式为:
[FilePath]|ClassName|Priority|arguments
arguments: k=v[,k=v]+

  1. 其中FilePath是hdfs路径,例如/tmp/zhenhe/cp/zhenhe-1.0.jar
  2. ClassNameEndPoint实现类的全名
  3. Priority为,整数,框架会根据这个数据决定多个cp的执行顺序
  4. Arguments,传给cp的参数
  5. 如果hbase的classpath包含改类,FilePath可以留空

卸载:

  1. 先describe “tableName‘,查看你要卸载的cp的编号
  2. 然后alter 't1', METHOD => 'table_att_unset', NAME=> 'coprocessor$3',coprocessor$3可变。

应用场景

这是一个最简单的例子,另外还有很多统计场景,可以用在这种方式实现,有如下好处:

  1. 节省网络带宽
  2. 减少RPC调用(scan的调用随着CacheSzie的变小而线性增加),减轻hbase压力
  3. 可以提高统计效率,那我之前写过的一个groupby类型的例子来说,大约可以提高50%以上的统计速度。

其他应用场景?

  1. 一个保存着用户信息的表,可以统计每个用户信息(counter job)
  2. 统计最大值,最小值,平均值,参考:https://issues.apache.org/jira/browse/HBASE-1512
  3. 批量删除记录,批量删除某个时间戳的记录

参考:

1. http://blogs.apache.org/hbase/entry/coprocessor_introduction
2. https://issues.apache.org/jira/browse/HBASE-1512

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Previous article:Linux下C++操作RedisNext article:MySQL行锁深入研究