Home >Database >Mysql Tutorial >基于C#+Thrift操作HBase实践


2016-06-07 16:32:541260browse

在基于HBase数据库的开发中,对应Java语言来说,可以直接使用HBase的原生API来操作HBase表数据,当然你要是不嫌麻烦可以使用Thrift客户端Java API,这里有我曾经使用过的 HBase Thrift客户端Java API实践,可以参考。对于具有其他编程语言背景的开发人员,为

在基于HBase数据库的开发中,对应Java语言来说,可以直接使用HBase的原生API来操作HBase表数据,当然你要是不嫌麻烦可以使用Thrift客户端Java API,这里有我曾经使用过的 HBase Thrift客户端Java API实践,可以参考。对于具有其他编程语言背景的开发人员,为了获取HBase带来的好处,那么就可以选择使用HBase Thrift客户端对应编程语言的API,来实现与HBase的交互。
然后,执行如下命令,生成C#编程语言的HBase Thrift客户端API:

[hadoop@master hbase]$ thrift --gen csharp Hbase.thrift
[hadoop@master hbase]$ ls

这里,我们基于C#语言,使用HBase 的Thrift 客户端API访问HBase表。事实上,如果使用Java来实现对HBase表的操作,最好是使用HBase的原生API,无论从性能还是便利性方面,都会提供更好的体验。使用Thrift API访问,实际也是在HBase API之上进行了一层封装,可能初次使用Thrift API感觉很别扭,有时候还要参考Thrift服务端的实现代码。

    1. 下载Thrift软件包,解压缩后,拷贝thrift-0.9.0/lib/java/src下面的代码到工作区(开发工具中)
    2. 将上面生成的gen-csharp目录中代码拷贝到工作区
    3. 保证HBase集群正常运行,接着启动HBase的Thrift服务,执行如下命令:
bin/hbase thrift -b master -p 9090 start

上面,HBase的Thrift服务端口为9090,下面通过Thrift API访问的时候,需要用到,而不是HBase的服务端口(默认60000)。
首先,我们通过HBase Shell创建一个表:

create 'test_info', 'info'

这里,我们实际上是对HBase Thrift客户端Java API实践中的Java代码进行了翻译,改写成C#语言的相关操作。我们在客户端,进行了一层抽象,更加便于传递各种参数,抽象类为AbstractHBaseThriftService,对应的命名空间为HbaseThrift.HBase.Thrift,该类实现代码如下所示:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Thrift.Transport;
using Thrift.Protocol;
namespace HbaseThrift.HBase.Thrift
    public abstract class AbstractHBaseThriftService
        protected static readonly string CHARSET = "UTF-8";
	    private string host = "localhost";
	    private int port = 9090;
	    private readonly TTransport transport;
	    protected readonly Hbase.Client client;
        public AbstractHBaseThriftService() : this("localhost", 9090)
        public AbstractHBaseThriftService(string host, int port)
            this.host = host;
            this.port = port;
            transport = new TSocket(host, port);
            TProtocol protocol = new TBinaryProtocol(transport, true, true);
            client = new Hbase.Client(protocol);
        public void Open() {
            if (transport != null)
        public void Close()
            if (transport != null)
        public abstract List GetTables();
	    public abstract void Update(string table, string rowKey, bool writeToWal,
			string fieldName, string fieldValue, Dictionary attributes);
        public abstract void Update(string table, string rowKey, bool writeToWal,
			Dictionary fieldNameValues, Dictionary attributes);
	    public abstract void DeleteCell(string table, string rowKey, bool writeToWal,
			    string column, Dictionary attributes);
	    public abstract void DeleteCells(string table, string rowKey, bool writeToWal,
			    List columns, Dictionary attributes);
	     public abstract void DeleteRow(string table, string rowKey,
		            Dictionary attributes);
	    public abstract int ScannerOpen(string table, string startRow, List columns,
	            Dictionary attributes);
	    public abstract int ScannerOpen(string table, string startRow, string stopRow, List columns,
	            Dictionary attributes);
	    public abstract int ScannerOpenWithPrefix(string table, string startAndPrefix,
                List columns, Dictionary attributes);
	    public abstract int ScannerOpenTs(string table, string startRow,
	            List columns, long timestamp, Dictionary attributes);
	    public abstract int ScannerOpenTs(string table, string startRow, string stopRow,
	            List columns, long timestamp, Dictionary attributes);
	    public abstract List ScannerGetList(int id, int nbRows);
	    public abstract List ScannerGet(int id);
	    public abstract List GetRow(string table, string row,
		            Dictionary attributes);
	    public abstract List GetRows(string table,
                 List rows, Dictionary attributes);
	    public abstract List GetRowsWithColumns(string table,
                 List rows, List columns, Dictionary attributes);
	    public abstract void ScannerClose(int id);
	     * Iterate result rows(just for test purpose)
	     * @param result
	    public abstract void IterateResults(TRowResult result);


  • 建立到Thrift服务的连接:Open()
  • 获取到HBase中的所有表名:GetTables()
  • 更新HBase表记录:Update()
  • 删除HBase表中一行的记录的数据(cell):DeleteCell()和DeleCells()
  • 删除HBase表中一行记录:deleteRow()
  • 打开一个Scanner,返回id:ScannerOpen()、ScannerOpenWithPrefix()和ScannerOpenTs();然后用返回的id迭代记录:ScannerGetList()和ScannerGet()
  • 获取一行记录结果:GetRow()、GetRows()和GetRowsWithColumns()
  • 关闭一个Scanner:ScannerClose()
  • 迭代结果,用于调试:IterateResults()


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace HbaseThrift.HBase.Thrift
    class HBaseThriftClient : AbstractHBaseThriftService
        public HBaseThriftClient() : this("localhost", 9090)
        public HBaseThriftClient(string host, int port) : base(host, port)
        public override List GetTables()
            List tables = client.getTableNames();
            List list = new List();
            foreach(byte[] table in tables)
            return list;
        public override void Update(string table, string rowKey, bool writeToWal, string fieldName, string fieldValue, Dictionary attributes)
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary encodedAttributes = EncodeAttributes(attributes);
            List mutations = new List();
            Mutation mutation = new Mutation();
            mutation.IsDelete = false;
            mutation.WriteToWAL = writeToWal;
            mutation.Column = Encode(fieldName);
            mutation.Value = Encode(fieldValue);
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        public override void Update(string table, string rowKey, bool writeToWal, Dictionary fieldNameValues, Dictionary attributes)
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary encodedAttributes = EncodeAttributes(attributes);
            List mutations = new List();
            foreach (KeyValuePair pair in fieldNameValues)
                Mutation mutation = new Mutation();
                mutation.IsDelete = false;
                mutation.WriteToWAL = writeToWal;
                mutation.Column = Encode(pair.Key);
                mutation.Value = Encode(pair.Value);
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        public override void DeleteCell(string table, string rowKey, bool writeToWal, string column, Dictionary attributes)
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary encodedAttributes = EncodeAttributes(attributes);
            List mutations = new List();
            Mutation mutation = new Mutation();
            mutation.IsDelete = true;
            mutation.WriteToWAL = writeToWal;
            mutation.Column = Encode(column);
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        public override void DeleteCells(string table, string rowKey, bool writeToWal, List columns, Dictionary attributes)
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary encodedAttributes = EncodeAttributes(attributes);
            List mutations = new List();
            foreach (string column in columns)
                Mutation mutation = new Mutation();
                mutation.IsDelete = true;
                mutation.WriteToWAL = writeToWal;
                mutation.Column = Encode(column);
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        public override void DeleteRow(string table, string rowKey, Dictionary attributes)
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary encodedAttributes = EncodeAttributes(attributes);
            client.deleteAllRow(tableName, row, encodedAttributes);
        public override int ScannerOpen(string table, string startRow, List columns, Dictionary attributes)
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            List encodedColumns = EncodeStringList(columns);
            Dictionary encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpen(tableName, start, encodedColumns, encodedAttributes);
        public override int ScannerOpen(string table, string startRow, string stopRow, List columns, Dictionary attributes)
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            byte[] stop = Encode(stopRow);
            List encodedColumns = EncodeStringList(columns);
            Dictionary encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithStop(tableName, start, stop, encodedColumns, encodedAttributes);
        public override int ScannerOpenWithPrefix(string table, string startAndPrefix, List columns, Dictionary attributes)
            byte[] tableName = Encode(table);
            byte[] prefix = Encode(startAndPrefix);
            List encodedColumns = EncodeStringList(columns);
            Dictionary encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithPrefix(tableName, prefix, encodedColumns, encodedAttributes);
        public override int ScannerOpenTs(string table, string startRow, List columns, long timestamp, Dictionary attributes)
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            List encodedColumns = EncodeStringList(columns);
            Dictionary encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenTs(tableName, start, encodedColumns, timestamp, encodedAttributes);
        public override int ScannerOpenTs(string table, string startRow, string stopRow, List columns, long timestamp, Dictionary attributes)
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            byte[] stop = Encode(stopRow);
            List encodedColumns = EncodeStringList(columns);
            Dictionary encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithStopTs(tableName, start, stop, encodedColumns, timestamp, encodedAttributes);
        public override List ScannerGetList(int id, int nbRows)
            return client.scannerGetList(id, nbRows);
        public override List ScannerGet(int id)
            return client.scannerGet(id);
        public override List GetRow(string table, string row, Dictionary attributes)
            byte[] tableName = Encode(table);
            byte[] startRow = Encode(row);
            Dictionary encodedAttributes = EncodeAttributes(attributes);
            return client.getRow(tableName, startRow, encodedAttributes);
        public override List GetRows(string table, List rows, Dictionary attributes)
            byte[] tableName = Encode(table);
            List encodedRows = EncodeStringList(rows);
            Dictionary encodedAttributes = EncodeAttributes(attributes);
            return client.getRows(tableName, encodedRows, encodedAttributes);
        public override List GetRowsWithColumns(string table, List rows, List columns, Dictionary attributes)
            byte[] tableName = Encode(table);
            List encodedRows = EncodeStringList(rows);
            List encodedColumns = EncodeStringList(columns);
            Dictionary encodedAttributes = EncodeAttributes(attributes);
            return client.getRowsWithColumns(tableName, encodedRows, encodedColumns, encodedAttributes);
        public override void ScannerClose(int id)
        public override void IterateResults(TRowResult result)
            foreach (KeyValuePair pair in result.Columns)
                Console.WriteLine("\tCol=" + Decode(pair.Key) + ", Value=" + Decode(pair.Value.Value));
        private String Decode(byte[] bs)
            return UTF8Encoding.Default.GetString(bs);
        private byte[] Encode(String str)
            return UTF8Encoding.Default.GetBytes(str);
        private Dictionary EncodeAttributes(Dictionary attributes)
            Dictionary encodedAttributes = new Dictionary();
            foreach (KeyValuePair pair in attributes)
                encodedAttributes.Add(Encode(pair.Key), Encode(pair.Value));
            return encodedAttributes;
        private List EncodeStringList(List strings) 
            List list = new List();
            if (strings != null)
                foreach (String str in strings)
            return list;


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace HbaseThrift.HBase.Thrift
    class Test
        private readonly AbstractHBaseThriftService client;
        public Test(String host, int port)
            client = new HBaseThriftClient(host, port);
        public Test() : this("master", 9090)
        static String RandomlyBirthday()
            Random r = new Random();
            int year = 1900 + r.Next(100);
            int month = 1 + r.Next(12);
            int date = 1 + r.Next(30);
            return year + "-" + month.ToString().PadLeft(2, '0') + "-" + date.ToString().PadLeft(2, '0');
        static String RandomlyGender()
            Random r = new Random();
            int flag = r.Next(2);
            return flag == 0 ? "M" : "F";
        static String RandomlyUserType()
            Random r = new Random();
            int flag = 1 + r.Next(10);
            return flag.ToString();
        public void Close()
        public void CaseForUpdate() {
		    bool writeToWal = false;
            Dictionary attributes = new Dictionary(0);
		    string table = SetTable();
		    // put kv pairs
		    for (int i = 0; i 
  • http://wiki.apache.org/hadoop/Hbase/ThriftApi
  • http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift?view=markup
  • http://www.cnblogs.com/panfeng412/archive/2012/11/11/hbase-thrift-api-common-issues-summary.html
  • https://github.com/simplegeo/hadoop-hbase/blob/master/src/examples/thrift/DemoClient.java
  • http://thrift.apache.org/tutorial/java/
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn