首頁  >  文章  >  Java  >  java中zookeeper的使用詳解

java中zookeeper的使用詳解

黄舟
黄舟原創
2017-09-28 09:37:565570瀏覽

ZooKeeper是一個分散式的,開放原始碼的分散式應用程式協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要元件。以下透過本文給大家分享java 中zookeeper簡單使用,需要的朋友參考下吧

#一、zookeeper的基本原理

資料模型,如下:



ZooKeeper資料​​模型的結構與Unix檔案系統很類似,整體上可以看作是一棵樹,每個節點稱做一個ZNode。每個ZNode都可以透過其路徑唯一標識,例如上圖第三層的第一個ZNode,它的路徑是/app1/c1。在每個ZNode上可儲存少量資料(預設是1M, 可以透過配置修改,通常不建議在ZNode上儲存大量的資料),這個特性非常有用。另外,每個ZNode上也儲存了其Acl訊息,這裡需要注意,雖說ZNode的樹形結構跟Unix檔案系統很類似,但其Acl與Unix檔案系統是完全不同的,每個ZNode的Acl的獨立的,子結點不會繼承父結點的。

ZooKeeper特性:

#1、讀取、寫入(更新)模式

在ZooKeeper集群中,讀取可以從任何一個ZooKeeperServer讀,這一點是保證ZooKeeper比較好的讀取性能的關鍵;寫的請求會先Forwarder到Leader,然後由Leader來通過ZooKeeper中的原子廣播協議,將請求廣播給所有的Follower,Leader收到一半以上的寫成功的Ack後,就認為該寫成功了,就會將該寫進行持久化,並告訴客戶端寫成功了。

2、WAL和Snapshot

和大多數分散式系統一樣,ZooKeeper也有WAL(Write-Ahead-Log),對於每一個更新操作,ZooKeeper都會先寫WAL,然後再對記憶體中的資料做更新,然後再向Client通知更新結果。另外,ZooKeeper也會定期將記憶體中的目錄樹進行Snapshot,落地到磁碟上,這個跟HDFS中的FSImage是比較類似的。這麼做的主要目的,一當然是資料的持久化,二是加快重啟之後的恢復速度,如果全部透過ReplayWAL的形式恢復的話,會比較慢。

3、FIFO

對於每一個ZooKeeper客戶端而言,所有的操作都是遵循FIFO順序的,這一特性是由下面兩個基本特性來確保的:一是ZooKeeperClient與Server之間的網路通訊是基於TCP,TCP保證了Client/Server之間傳輸包的順序;二是ZooKeeperServer執行客戶端請求也是嚴格按照FIFO順序的。

4、Linearizability

在ZooKeeper中,所有的更新操作都有嚴格的偏序關係,更新操作都是串行執行的,這一點是確保ZooKeeper功能正確性的關鍵。

二、zookeeper的常用指令

我們可以執行zookeeper-client或執行/opt/cloudera/parcels/CDH-5.0.0 -1.cdh5.0.0.p0.47/lib/zookeeper/bin/zkCli.sh-server localhost,進入zookeeper命令列,如下:



#然後,執行ls /可以看到:




然後,我們可以執行create /qyktest'qyktest'建立一個節點,如下:



然後,我們執行get /qyktest取得節點值,如下:




##然後,我們可以執行set /qyktest'111'修改節點的值,如下:




最後,我們執行delete /qyktest便可刪除此節點。


另外,我們也可以在qyktest此節點下繼續建立子節點。


好了,幾個基本指令就講到這人啦,其它的指令還有很多,大家可以去查閱下資料。

三、zookeeper的javaapi操作

#關於Javaapi操作zookeeper比較簡單,筆者直接貼出程式碼,如下:


packageorg.zookeeper.demo;
importjava.io.IOException;
importjava.util.concurrent.CountDownLatch;
importorg.apache.zookeeper.CreateMode;
importorg.apache.zookeeper.KeeperException;
importorg.apache.zookeeper.WatchedEvent;
importorg.apache.zookeeper.Watcher;
importorg.apache.zookeeper.Watcher.Event.KeeperState;
importorg.apache.zookeeper.ZooDefs.Ids;
importorg.apache.zookeeper.ZooKeeper;
publicclassZookeeperClientimplementsWatcher{
//连接超时时间,10s
privatestaticfinalintSESSION_TIMEOUT= 10000;
//连接的zookeeperserver
privatestaticfinalStringCONNECTION_STRING = "172.31.25.8:2181";
privatestaticfinalStringZK_PATH = "/qyktest";
privateZooKeeperzk = null;
privateCountDownLatchconnectedSemaphore = newCountDownLatch(1);
publicvoidcreateConnection(StringconnectString, intsessionTimeout){
this.releaseConnection();
try{
zk= newZooKeeper(connectString,sessionTimeout, this);
connectedSemaphore.await();
}catch(InterruptedExceptione) {
System.out.println("连接创建失败,发生InterruptedException");
e.printStackTrace();
}catch(IOExceptione) {
System.out.println("连接创建失败,发生IOException");
e.printStackTrace();
}
}
publicvoidreleaseConnection(){
if(this.zk!= null){
try{
this.zk.close();
}catch(InterruptedExceptione) {
e.printStackTrace();
}
}
}
publicbooleancreatePath(Stringpath, String data) {
try{
Stringresult = this.zk.create(path,data.getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
System.out.println("节点创建成功,Path: "+result + ", content: "+data);
}catch(KeeperExceptione) {
System.out.println("节点创建失败,发生KeeperException");
e.printStackTrace();
}catch(InterruptedExceptione) {
System.out.println("节点创建失败,发生InterruptedException");
e.printStackTrace();
}
returntrue;
}
publicStringreadData(Stringpath) {
try{
System.out.println("获取数据成功,path:"+path);
returnnewString(this.zk.getData(path,false,null));
}catch(KeeperExceptione) {
System.out.println("读取数据失败,发生KeeperException,path:"+path);
e.printStackTrace();
return"";
}catch(InterruptedExceptione) {
System.out.println("读取数据失败,发生InterruptedException,path: "+path);
e.printStackTrace();
return"";
}
}
publicbooleanwriteData(Stringpath, String data) {
try{
System.out.println("更新数据成功,path:"+path + ", stat: "+this.zk.setData(path,data.getBytes(), -1));
}catch(KeeperExceptione) {
System.out.println("更新数据失败,发生KeeperException,path:"+path);
e.printStackTrace();
}catch(InterruptedExceptione) {
System.out.println("更新数据失败,发生InterruptedException,path: "+path);
e.printStackTrace();
}
returnfalse;
}
publicvoiddeleteNode(Stringpath) {
try{
this.zk.delete(path,-1);
System.out.println("删除节点成功,path:"+path);
}catch(KeeperExceptione) {
System.out.println("删除节点失败,发生KeeperException,path:"+path);
e.printStackTrace();
}catch(InterruptedExceptione) {
System.out.println("删除节点失败,发生InterruptedException,path: "+path);
e.printStackTrace();
}
}
publicstaticvoidmain(String[]args) {
ZookeeperClientsample = newZookeeperClient();
//获取连接
sample.createConnection(CONNECTION_STRING,SESSION_TIMEOUT);
//读数据
Stringqyk = sample.readData("/qyktest");
System.out.println("qyk:"+qyk);
Stringurl = sample.readData("/qyk/db/url");
System.out.println("url"+url);
Stringdriver = sample.readData("/qyk/db/driver");
System.out.println("driver"+driver);
StringuserName = sample.readData("/qyk/db/userName");
System.out.println("userName"+userName);
Stringpassword = sample.readData("/qyk/db/password");
System.out.println("password"+password);
//创建节点
sample.createPath(ZK_PATH,"我是节点初始内容");
System.out.println("数据内容:"+sample.readData(ZK_PATH) + "\n");
//更新节点
sample.writeData(ZK_PATH,"更新后的数据");
System.out.println("数据内容:"+sample.readData(ZK_PATH) + "\n");
//删除节点
sample.deleteNode(ZK_PATH);
//释放连接
sample.releaseConnection();
}
@Override
publicvoidprocess(WatchedEventevent) {
System.out.println("收到事件通知:"+event.getState() + "\n");
if(KeeperState.SyncConnected== event.getState()) {
connectedSemaphore.countDown();
}
}
}

然後,執行可以看到,控制台輸出如下:



#所以,像一些公用的配置,我們可以存到zookeeper裡面,之後其它的服務就可以使用了

#總結##

以上是java中zookeeper的使用詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn