如果要统对hbase中的数据,进行某种统计,比如统计某个字段最大值,统计满足某种条件的记录数,统计各种记录特点,并按照记录特点分类(类似于sql的group by)~ 常规的做法就是把hbase中整个表的数据scan出来,或者稍微环保一点,加一个filter,进行一些初步
如果要统对hbase中的数据,进行某种统计,比如统计某个字段最大值,统计满足某种条件的记录数,统计各种记录特点,并按照记录特点分类(类似于sql的group by)~
常规的做法就是把hbase中整个表的数据scan出来,或者稍微环保一点,加一个filter,进行一些初步的过滤(对于rowcounter来说,就加了FirstKeyOnlyFilter),但是这么做来说还是会有很大的副作用,比如占用大量的网络带宽(当标级别到达千万级别,亿级别之后)尤为明显,RPC的量也是不容小觑的。
理想的方式应该是怎样?
拿row counter这个简单例子来说,我要统计总行数,如果每个region 告诉我他又多少行,然后把结果告诉我,我再将他们的结果汇总一下,不就行了么?
现在的问题是hbase没有提供这种接口,来统计每个region的行数,那是否我们可以自己来实现一个呢?
没错,正如本文标题所说,我们可以自己来实现一个Endpoint,然后让hbase加载起来,然后我们远程调用即可。
什么是Endpoint?
先弄清楚什么是hbase coprocessor
hbase有两种coprocessor,一种是Observer(观察者),类似于关系数据库的trigger(触发器),另外一种就是EndPoint,类似于关系数据库的存储过程。
观察者这里就多做介绍了,这里介绍Endpoint。
EndPoint是动态RPC插件的接口,它的实现代码被部署在服务器端(regionServer),从而能够通过HBase RPC调用。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个EndPoint,它们的实现代码会被目标region远程执行,结果会返回到终端。用户可以结合使用这些强大的插件接口,为HBase添加全新的特性。
怎么实现一个EndPoint
1. 定义一个新的protocol接口,必须继承CoprocessorProtocol.
2. 实现终端接口,继承抽象类BaseEndpointCoprocessor,改实现代码需要部署到
3. 在客户端,终端可以被两个新的HBase Client API调用 。单个region:HTableInterface.coprocessorProxy(Class
如图
public interface CounterProtocol extends CoprocessorProtocol { public long count(byte[] start, byte[] end) throws IOException; }
public class CounterEndPoint extends BaseEndpointCoprocessor implements CounterProtocol { @Override public long count(byte[] start, byte []end) throws IOException { // aggregate at each region Scan scan = new Scan(); long numRow = 0; InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion() .getScanner(scan); try { List curVals = new ArrayList(); boolean hasMore = false; do { curVals.clear(); hasMore = scanner.next(curVals); if (Bytes.compareTo(curVals.get(0).getRow(), start)= 0) { break; } numRow++; } while (hasMore); } finally { scanner.close(); } return numRow; } }
public class CounterEndPointDemo { public static void main(String[] args) throws IOException, Throwable { final String startRow = args[0]; final String endRow = args[1]; @SuppressWarnings("resource") HTableInterface table = new HTable(HBaseConfiguration.create(), "tc"); Map results; // scan: for all regions results = table.coprocessorExec(CounterProtocol.class, startRow.getBytes(), endRow.getBytes(), new Batch.Call() { public Long call(CounterProtocol instance) throws IOException { return instance.count(startRow.getBytes(), endRow.getBytes()); } }); long total = 0; for (Map.Entry e : results.entrySet()) { System.out.println(e.getValue()); total += e.getValue(); } System.out.println("total:" + total); } }
整个程序的框架其实又是另外一个mapreduce,只是运行在region server上面,reduce运行在客户端,其中map计算量较大,reduce计算量很小!
另外需要提醒的是:
protocol的返回类型,可以是基本类型。
如果是一个自定义的类型需要实现org.apache.hadoop.io.Writable接口。
关于详细的支持类型,请参考代码hbase源码:org.apache.hadoop.hbase.io.HbaseObjectWritable
怎么部署?
1. 通过hbase-site.xml增加
hbase.coprocessor.region.classes xxxx.CounterEndPoint
- 如果要配置多个,就用逗号(,)分割。
- 包含此类的jar必须位于hbase的classpath
- 这种coprocessor是作用于所有的表,如果你只想作用于部分表,请使用下面一种方式。
2. 通过shell方式
增加:
hbase(main):005:0> alter 't1', METHOD => 'table_att', 'coprocessor'=>'hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2' Updating all regions with the new schema... 1/1 regions updated. Done. 0 row(s) in 1.0730 seconds
coprocessor格式为:
[FilePath]|ClassName|Priority|arguments
arguments: k=v[,k=v]+
- 其中FilePath是hdfs路径,例如/tmp/zhenhe/cp/zhenhe-1.0.jar
- ClassNameEndPoint实现类的全名
- Priority为,整数,框架会根据这个数据决定多个cp的执行顺序
- Arguments,传给cp的参数
- 如果hbase的classpath包含改类,FilePath可以留空
卸载:
- 先describe “tableName‘,查看你要卸载的cp的编号
- 然后alter 't1', METHOD => 'table_att_unset', NAME=> 'coprocessor$3',coprocessor$3可变。
应用场景
这是一个最简单的例子,另外还有很多统计场景,可以用在这种方式实现,有如下好处:
- 节省网络带宽
- 减少RPC调用(scan的调用随着CacheSzie的变小而线性增加),减轻hbase压力
- 可以提高统计效率,那我之前写过的一个groupby类型的例子来说,大约可以提高50%以上的统计速度。
其他应用场景?
- 一个保存着用户信息的表,可以统计每个用户信息(counter job)
- 统计最大值,最小值,平均值,参考:https://issues.apache.org/jira/browse/HBASE-1512
- 批量删除记录,批量删除某个时间戳的记录
参考:
1. http://blogs.apache.org/hbase/entry/coprocessor_introduction
2. https://issues.apache.org/jira/browse/HBASE-1512
原文地址:使用HBase EndPoint(coprocessor)进行计算, 感谢原作者分享。

存储过程是MySQL中的预编译SQL语句集合,用于提高性能和简化复杂操作。1.提高性能:首次编译后,后续调用无需重新编译。2.提高安全性:通过权限控制限制数据表访问。3.简化复杂操作:将多条SQL语句组合,简化应用层逻辑。

MySQL查询缓存的工作原理是通过存储SELECT查询的结果,当相同查询再次执行时,直接返回缓存结果。1)查询缓存提高数据库读取性能,通过哈希值查找缓存结果。2)配置简单,在MySQL配置文件中设置query_cache_type和query_cache_size。3)使用SQL_NO_CACHE关键字可以禁用特定查询的缓存。4)在高频更新环境中,查询缓存可能导致性能瓶颈,需通过监控和调整参数优化使用。

MySQL被广泛应用于各种项目中的原因包括:1.高性能与可扩展性,支持多种存储引擎;2.易于使用和维护,配置简单且工具丰富;3.丰富的生态系统,吸引大量社区和第三方工具支持;4.跨平台支持,适用于多种操作系统。

MySQL数据库升级的步骤包括:1.备份数据库,2.停止当前MySQL服务,3.安装新版本MySQL,4.启动新版本MySQL服务,5.恢复数据库。升级过程需注意兼容性问题,并可使用高级工具如PerconaToolkit进行测试和优化。

MySQL备份策略包括逻辑备份、物理备份、增量备份、基于复制的备份和云备份。1.逻辑备份使用mysqldump导出数据库结构和数据,适合小型数据库和版本迁移。2.物理备份通过复制数据文件,速度快且全面,但需数据库一致性。3.增量备份利用二进制日志记录变化,适用于大型数据库。4.基于复制的备份通过从服务器备份,减少对生产系统的影响。5.云备份如AmazonRDS提供自动化解决方案,但成本和控制需考虑。选择策略时应考虑数据库大小、停机容忍度、恢复时间和恢复点目标。

MySQLclusteringenhancesdatabaserobustnessandscalabilitybydistributingdataacrossmultiplenodes.ItusestheNDBenginefordatareplicationandfaulttolerance,ensuringhighavailability.Setupinvolvesconfiguringmanagement,data,andSQLnodes,withcarefulmonitoringandpe

在MySQL中优化数据库模式设计可通过以下步骤提升性能:1.索引优化:在常用查询列上创建索引,平衡查询和插入更新的开销。2.表结构优化:通过规范化或反规范化减少数据冗余,提高访问效率。3.数据类型选择:使用合适的数据类型,如INT替代VARCHAR,减少存储空间。4.分区和分表:对于大数据量,使用分区和分表分散数据,提升查询和维护效率。

tooptimizemysqlperformance,lofterTheSeSteps:1)inasemproperIndexingTospeedUpqueries,2)使用ExplaintplaintoAnalyzeandoptimizequeryPerformance,3)ActiveServerConfigurationStersLikeTlikeTlikeTlikeIkeLikeIkeIkeLikeIkeLikeIkeLikeIkeLikeNodb_buffer_pool_sizizeandmax_connections,4)


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

螳螂BT
Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

SublimeText3 Linux新版
SublimeText3 Linux最新版

VSCode Windows 64位 下载
微软推出的免费、功能强大的一款IDE编辑器

SublimeText3汉化版
中文版,非常好用

MinGW - 适用于 Windows 的极简 GNU
这个项目正在迁移到osdn.net/projects/mingw的过程中,你可以继续在那里关注我们。MinGW:GNU编译器集合(GCC)的本地Windows移植版本,可自由分发的导入库和用于构建本地Windows应用程序的头文件;包括对MSVC运行时的扩展,以支持C99功能。MinGW的所有软件都可以在64位Windows平台上运行。