HBase Java编程示例HelloWorld.zip package elementary; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.BufferedMutatorParams; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.ThreadUtil; public class HelloWorld { private static Configuration conf = null; private static Connection conn = null; private static Admin admin = null; public static AtomicInteger count = new AtomicInteger(); /** * 初始化配置 */ static { conf = HBaseConfiguration.create(); //如果沒有配置文件,一定要記得手動宣告 conf.set("hbase.zookeeper.quorum", "10.148.137.143"); conf.set("hbase.zookeeper.property.clientPort", "2181"); } static { try { conn = ConnectionFactory.createConnection(); admin = conn.getAdmin(); } catch (IOException e) { e.printStackTrace(); } } static public class MyThread extends Thread { int _start; String _tablename; Connection conn; //BufferedMutator table; Table table; public MyThread(int start, String tablename) { _start = start; _tablename = tablename; } public void run() { String tablename = _tablename; Thread current = Thread.currentThread(); long thread_id = current.getId(); System.out.printf("thread[%d] run\n", thread_id); try { conn = ConnectionFactory.createConnection(); //BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename)); //params.writeBufferSize(1024 * 4); //table = conn.getBufferedMutator(params); table = conn.getTable(TableName.valueOf(tablename)); for (int j=_start; j for (int i=0; i // zkb_0_0 String zkb = "zkb_" + String.valueOf(_start) + "_" + String.valueOf(i); Put put = new Put(Bytes.toBytes(zkb)); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field1"),Bytes.toBytes(String.valueOf(i+0))); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field2"),Bytes.toBytes(String.valueOf(i+1))); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field3"),Bytes.toBytes(String.valueOf(i+2))); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field4"),Bytes.toBytes(String.valueOf(i+3))); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field5"),Bytes.toBytes(String.valueOf(i+4))); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field6"),Bytes.toBytes(String.valueOf(i+5))); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field7"),Bytes.toBytes(String.valueOf(i+6))); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field8"),Bytes.toBytes(String.valueOf(i+7))); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field9"),Bytes.toBytes(String.valueOf(i+8))); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field10"),Bytes.toBytes(String.valueOf(i+9))); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field11"),Bytes.toBytes(String.valueOf(i+10))); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field12"),Bytes.toBytes(String.valueOf(i+11))); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field13"),Bytes.toBytes(String.valueOf(i+12))); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field14"),Bytes.toBytes(String.valueOf(i+13))); put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field15"),Bytes.toBytes(String.valueOf(i+14))); //table.mutate(put); table.put(put); int m = HelloWorld.count.incrementAndGet(); if (m % 10000 == 0) { Date dt = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss aa"); String now = sdf.format(dt); System.out.printf("[%s] thread[%d] m=%d, j=%d, i=%d\n", now, thread_id, m, j, i); } } } System.out.printf("thread[%d] over\n", thread_id); } catch (Exception e) { e.printStackTrace(); } } } /** * 建立表格 * @param tablename * @param cfs */ public static void createTable(String tablename, String[] cfs){ try { if (admin.tableExists(TableName.valueOf(tablename))) { System.out.println("table already exists!"); } else { HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tablename)); for (int i = 0; i HColumnDescriptor desc = new HColumnDescriptor(cfs[i]); desc.setMaxVersions(3650); tableDesc.addFamily(desc); } byte[][] splitKeys = new byte[][] { Bytes.toBytes("zkb_0_0"), Bytes.toBytes("zkb_10_0"), Bytes.toBytes("zkb_20_0"), Bytes.toBytes("zkb_30_0"), Bytes.toBytes("zkb_40_0"), Bytes.toBytes("zkb_50_0"), Bytes.toBytes("zkb_60_0"), Bytes.toBytes("zkb_70_0"), Bytes.toBytes("zkb_80_0"), Bytes.toBytes("zkb_90_0"), Bytes.toBytes("zkb_100_0") }; admin.createTable(tableDesc, splitKeys); admin.close(); System.out.println("create table " + tablename + " ok."); } } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } /** * 刪除表格 * @param tablename */ public static void deleteTable(String tablename){ try { //Connection conn = ConnectionFactory.createConnection(); //Admin admin = conn.getAdmin(); admin.disableTable(TableName.valueOf(tablename)); admin.deleteTable(TableName.valueOf(tablename)); System.out.println("delete table " + tablename + " ok."); } catch (IOException e) { e.printStackTrace(); } } /** * 刪除一筆資料 * @param tableName * @param rowKey */ public static void delRecord (String tableName, String rowKey){ try { Table table = conn.getTable(TableName.valueOf(tableName)); List list = new ArrayList(); Delete del = new Delete(rowKey.getBytes()); list.add(del); table.delete(list); System.out.println("del recored " + rowKey + " ok."); } catch (IOException e) { e.printStackTrace(); } } /** * 取得一筆資料 * @param tableName * @param rowKey */ public static void getOneRecord (String tableName, String rowKey){ try { Table table = conn.getTable(TableName.valueOf(tableName)); Get get = new Get(rowKey.getBytes()); Result rs = table.get(get); List list = rs.listCells(); for(Cell cell:list){ System.out.print(new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()) + " " ); System.out.print(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()) + ":" ); System.out.print(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()) + " " ); System.out.print(cell.getTimestamp() + " " ); System.out.print(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()) + " " ); System.out.println(""); } } catch (IOException e) { e.printStackTrace(); } } /** * 取得所有資料 * @param tableName */ public static void getAllRecord (String tableName) { try{ //Connection conn = ConnectionFactory.createConnection(); Table table = conn.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); ResultScanner resultscanner = table.getScanner(scan); for(Result rs:resultscanner){ List list = rs.listCells(); for(Cell cell:list){ System.out.print(new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()) + " " ); System.out.print(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()) + ":" ); System.out.print(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()) + " " ); System.out.print(cell.getTimestamp() + " " ); System.out.print(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()) + " " ); System.out.println(""); } } } catch (IOException e){ e.printStackTrace(); } } /** * 取得Family清單 * @param tableName * @return */ public static ArrayList getAllFamilyName(String tableName) { ArrayList familyname_list = new ArrayList(); try{ //Connection conn = ConnectionFactory.createConnection(); Table table = conn.getTable(TableName.valueOf(tableName)); HTableDescriptor htabledescriptor = table.getTableDescriptor(); HColumnDescriptor[] hdlist = htabledescriptor.getColumnFamilies(); for(int i=0;i HColumnDescriptor hd = hdlist[i]; familyname_list.add(hd.getNameAsString()); } } catch (IOException e){ e.printStackTrace(); } return familyname_list; } // java -cp HelloWorld.jar:`ls lib/*.jar|awk '{printf("%s:", $0)}'` elementary.HelloWorld 5 public static void main(String[] args) { System.out.println("HelloWorldX"); if (args.length > 0) System.out.println(args[0]); int start = 0; if (args.length > 1) start = Integer.valueOf(args[1]); if (start start = 0; int num_threads = 16; if (args.length > 2) num_threads = Integer.valueOf(args[2]); try { String tablename = "scores"; String[] familys = {"grade", "course"}; HelloWorld.createTable(tablename, familys); //ExecutorService thread_pool = Executors.newSingleThreadExecutor(); ExecutorService thread_pool = Executors.newFixedThreadPool(num_threads); Thread[] pool = new HelloWorld.MyThread[80]; for (int i=0; i pool[i] = new HelloWorld.MyThread(i, tablename); thread_pool.execute(pool[i]); } thread_pool.shutdown(); System.out.println("over"); } catch (Exception e) { e.printStackTrace(); } } }