Zookeeper 分布式協調服務介紹
分布式系統
分布式系統的簡單定義:分布式系統是一個硬件或軟件組件分布在不同的網絡計算機上,彼此之間僅僅通過消息傳遞進行通信和協調的系統。
分布式系統的特征:
分布性:系統中的計算機在空間上隨意分布和隨時變動
分布式系統的簡單定義:分布式系統是一個硬件或軟件組件分布在不同的網絡計算機上,彼此之間僅僅通過消息傳遞進行通信和協調的系統。
分布式系統的特征:
分布性:系統中的計算機在空間上隨意分布和隨時變動
對等性:系統中的計算機是對等的,沒有主從之分
并發性:并發性操作是非常常見的行為
缺乏全局時鐘:系統中的計算機具有明顯的分布性,且缺乏一個全局的時鐘序列控制,所以很難比較兩個事件的先后
故障總是會發生:任何在設計階段考慮到的異常情況,一定會在系統實際運行中發生,并且還會遇到很多在設計時未考慮到的異常故障
隨著分布式架構的出現,越來越多的分布式應用會面臨數據一致性問題。
ZooKeeper是一個典型的分布式數據一致性解決方案,分布式應用程序可以基于它實現諸如數據發布/訂閱、負載均衡、命名服務、分布式協調/通知、集群管理、master選舉、分布式鎖和分布式隊列等功能。
Zookeeper致力于提供一個高性能、高可用,具有嚴格的順序訪問控制能力的分布式協調服務;其主要的設計目標是簡單的數據模型、可以構建集群、順序訪問、高性能。Zookeeper已經成為很多大型分布式項目譬如Hadoop、HBase、Storm、Solr等中的核心組件,用于分布式協調。
Zookeeper可以保證如下分布式一致性特性:
順序一致性:從同一個客戶端發起的事務請求,最終將會嚴格地按照其發起順序被應用到Zookeeper中去
原子性:所有事務請求的處理結果在整個集群中所有的機器上的應用情況是一致的
單一視圖:無論客戶端連接的是哪個Zookeeper服務器,其看到的服務器數據模型都是一致的
可靠性:一旦服務端成功地應用了一個事務,并完成對客戶端的響應,那么該事務所引起的服務端狀態變更將會被一直保留下來,除非有另一個事務又對其進行了變更
實時性:在一定的時間內,客戶端最終一定能夠從服務端上讀取到最新的數據狀態
Zookeeper 基本概念
集群角色
Leader:客戶端提供讀和寫服務
Follower:提供讀服務,所有寫服務都需要轉交給Leader角色,參與選舉
Observer:提供讀服務,不參與選舉過程,一般是為了增強Zookeeper集群的讀請求并發能力
會話 (session)
Zk的客戶端與zk的服務端之間的連接
通過心跳檢測保持客戶端連接的存活
接收來自服務端的watch事件通知
可以設置超時時間
ZNode 是Zookeeper中數據的最小單元,每個ZNode上可以保存數據(byte[]類型),同時可以掛在子節點,因此構成了一個層次化的命名空間,我們稱之為樹
節點是有生命周期的,生命周期由節點類型決定:
持久節點(PERSISTENT):節點創建后就一直存在于Zookeeper服務器上,直到有刪除操作主動將其刪除
持久順序節點(PERSISTENT_SEQUENTIAL):基本特性與持久節點一致,額外的特性在于Zookeeper會記錄其子節點創建的先后順序
臨時節點(EPHEMERAL):聲明周期與客戶端的會話綁定,客戶端會話失效時節點將被自動清除
臨時順序節點(EPHEMERAL_SEQUENTIAL):基本特性與臨時節點一致,但添加了順序的特性
每個節點都有狀態信息,抽象為 Stat 對象,狀態屬性如下:
czxid:節點被創建時的事務ID
mzxid:節點最后一個被更新時的事務ID
ctime:節點創建時間
mtime:節點最后一個被更新時間
version:節點版本號
cversion:子節點版本號
aversion:節點的ACL版本號
ephemeralOwner:創建該臨時節點的會話的sessionID,若為持久節點則為0
dataLength:數據內容長度
numChildren:子節點數量
權限控制ACL (Access Control Lists)
CREATE:創建子節點的權限
READ:獲取節點數據和子節點列表的權限
WRITE:更新節點數據的權限
DELETE:刪除子節點的權限
ADMIN:設置節點ACL的權限
Zookeeper 引入watcher機制來實現發布/訂閱功能,能夠讓多個訂閱者同時監聽某一個節點對象,當這個節點對象狀態發生變化時,會通知所有訂閱者。
Zookeeper的watcher機制主要包括客戶端線程、客戶端WatchManager、Zookeeper服務器三個部分。其工作流程簡單來說:客戶端在向Zookeeper服務器注冊Watcher的同時,會將Watcher對象存儲在客戶端的WatchManager中;當Zookeeper服務器端觸發Watcher事件后,會向客戶端發送通知,客戶端線程從WatchManager中取出對應的Watcher對象來執行回調邏輯
可以設置的兩種 Watcher
NodeCache
監聽數據節點的內容變更
監聽節點的創建,即如果指定的節點不存在,則節點創建后,會觸發這個監聽
PathChildrenCache
監聽指定節點的子節點變化情況
包括新增子節點、子節點數據變更和子節點刪除
客戶端命令
Zookeeper的安裝可參考官方文檔
#?啟動客戶端,默認為?localhost:2181
bin/zkCli.sh
#?啟動客戶端,指定連接的Zookeeper地址
bin/zkCli.sh?-server?ip:port
#?create?創建一個節點,路徑為?/test,內容為?some?test?data
create?/test?"some?test?data"
#?ls?列出指定節點下的所有子節點
ls?/
#?get?獲取指定節點的數據內容和屬性
get?/test
#?set?更新指定節點的數據內容
set?/test?"new?test?data"
#?delete?刪除節點
delete?/test
#?rmr?刪除非空節點
rmr?/test
#?stat?輸出節點的狀態信息
stat?/test
四字命令可以查看Zookeeper服務器的一些信息,可以通過 telnet 和 nc 等方式執行四字命令,以執行 conf 命令為例
#?telnet?方式?執行Zookeeper的?conf?命令
telnet?localhost?2181
conf
#?nc?方式?執行Zookeeper的?conf?命令
echo?conf?|?nc?localhost?2181
四字命令介紹:
conf 命令用于輸出Zookeeper服務器運行時的基本配置信息
cons 命令用于輸出這臺服務器上所有客戶端連接的詳細信息
crst 命令用于重置所有客戶端的連接統計信息
dump 命令用于輸出當前集群的所有會話信息
envi 命令用于輸出Zookeeper所在服務器的運行時信息
ruok 命令用于輸出當前Zookeeper服務器是否正在運行
stat 命令用于獲取Zookeeper服務器的運行狀態信息
srvr 命令與stat命令功能一致,但僅輸出服務器自身的信息
srst 命令用于重置所有服務器的統計信息
wchs 命令用于輸出當前服務器上管理的 watcher 的概要信息
wchc 命令用于輸出當前服務器上管理的 watcher 的詳細信息
wchp 命令與wchc功能非常相似,但輸出信息以節點路徑為單位進行歸組
mntr 命令用于輸出比stat命令更為詳細的服務器統計信息
Curator 客戶端代碼實例
Curator 是 Apache 基金會的頂級項目之一,解決了很多Zookeeper客戶端非常底層的細節開發工作,包括會話超時重連、反復注冊Watcher、NodeExistsException異常等,提供了一套易用性和可讀性更強的Fluent風格的客戶端API框架,除此之外,curator還提供了Zookeeper各種應用場景(Recipe,如共享鎖服務、master選舉、分布式計數器等)的抽象封裝。
Zookeeper 的核心提交者 Patrick Hunt 對 Curator 的高度評價:
Guava is to Java what Curator is to Zookeeper
import?org.apache.curator.RetryPolicy;
import?org.apache.curator.framework.CuratorFramework;
import?org.apache.curator.framework.CuratorFrameworkFactory;
import?org.apache.curator.framework.imps.CuratorFrameworkState;
import?org.apache.curator.retry.ExponentialBackoffRetry;
import?org.apache.zookeeper.CreateMode;
import?org.apache.zookeeper.ZooDefs;
import?org.apache.zookeeper.data.Stat;
public?class?CuratorCrud?{
//?集群模式則是多個ip
private?static?final?String?zkServerIps?=?"master:2181,hadoop2:2181";
public?static?void?main(String[]?args)?throws?Exception?{
//?創建節點
String?nodePath?=?"/testZK";?//?節點路徑
byte[]?data?=?"this?is?a?test?data".getBytes();?//?節點數據
byte[]?newData?=?"new?test?data".getBytes();?//?節點數據
//?設置重連策略ExponentialBackoffRetry,?baseSleepTimeMs:初始sleep的時間,maxRetries:最大重試次數,maxSleepMs:最大重試時間
RetryPolicy?retryPolicy?=?new?ExponentialBackoffRetry(10000,?5);
//(推薦)curator鏈接zookeeper的策略:RetryNTimes?n:重試的次數?sleepMsBetweenRetries:每次重試間隔的時間
//?RetryPolicy?retryPolicy?=?new?RetryNTimes(3,?5000);
//?(不推薦)?curator鏈接zookeeper的策略:RetryOneTime?sleepMsBetweenRetry:每次重試間隔的時間,這個策略只會重試一次
//?RetryPolicy?retryPolicy2?=?new?RetryOneTime(3000);
//?永遠重試,不推薦使用
//?RetryPolicy?retryPolicy3?=?new?RetryForever(retryIntervalMs)
//?curator鏈接zookeeper的策略:RetryUntilElapsed?maxElapsedTimeMs:最大重試時間?sleepMsBetweenRetries:每次重試間隔?重試時間超過maxElapsedTimeMs后,就不再重試
//?RetryPolicy?retryPolicy4?=?new?RetryUntilElapsed(2000,?3000);
//?Curator客戶端
CuratorFramework?client?=?null;
//?實例化Curator客戶端,Curator的編程風格可以讓我們使用方法鏈的形式完成客戶端的實例化
client?=?CuratorFrameworkFactory.builder()??//?使用工廠類來建造客戶端的實例對象
.connectString(zkServerIps)?//?放入zookeeper服務器ip
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)??//?設定會話時間以及重連策略
//?.namespace("testApp")????//?隔離的命名空間
.build();?//?建立連接通道
//?啟動Curator客戶端
client.start();
boolean?isZkCuratorStarted?=?client.getState().equals(CuratorFrameworkState.STARTED);
System.out.println("當前客戶端的狀態:"?+?(isZkCuratorStarted???"連接中..."?:?"已關閉..."));
try?{
//?檢查節點是否存在
Stat?s?=?client.checkExists().forPath(nodePath);
if?(s?==?null)?{
System.out.println("節點不存在,創建節點");
//?創建節點
String?result?=?client.create().creatingParentsIfNeeded()????//?創建父節點,也就是會遞歸創建
.withMode(CreateMode.PERSISTENT)?//?節點類型
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)?//?節點的ACL權限
.forPath(nodePath,?data);
System.out.println(result?+?"節點,創建成功...");
}?else?{
System.out.println("節點已存在,"?+?s);
}
getData(client,?nodePath);??//?輸出節點信息
//?更新指定節點的數據
int?version?=?s?==?null???0?:?s.getVersion();??//?版本不一致時的異常:KeeperErrorCode?=?BadVersion
Stat?resultStat?=?client.setData().withVersion(version)???//?指定數據版本
.forPath(nodePath,?newData);????//?需要修改的節點路徑以及新數據
System.out.println("更新節點數據成功");
getData(client,?nodePath);??//?輸出節點信息
//?刪除節點
client.delete().guaranteed()????//?如果刪除失敗,那么在后端還是會繼續刪除,直到成功
.deletingChildrenIfNeeded()?//?子節點也一并刪除,也就是會遞歸刪除
.withVersion(resultStat.getVersion())
.forPath(nodePath);
System.out.println("刪除節點:"?+?nodePath);
Thread.sleep(1000);
}?finally?{
//?關閉客戶端
if?(!client.getState().equals(CuratorFrameworkState.STOPPED))?{
System.out.println("關閉客戶端.....");
client.close();
}
isZkCuratorStarted?=?client.getState().equals(CuratorFrameworkState.STARTED);
System.out.println("當前客戶端的狀態:"?+?(isZkCuratorStarted???"連接中..."?:?"已關閉..."));
}
}
/**
*?讀取節點的數據
*/
private?static?byte[]?getData(CuratorFramework?client,?String?nodePath)?throws?Exception?{
Stat?stat?=?new?Stat();
byte[]?nodeData?=?client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("節點?"?+?nodePath?+?"?的數據為:"?+?new?String(nodeData));
System.out.println("該節點的數據版本號為:"?+?stat.getVersion()?+?"\n");
return?nodeData;
}
}
//?輸出
當前客戶端的狀態:連接中...
節點不存在,創建節點
/testZK節點,創建成功...
節點?/testZK?的數據為:this?is?a?test?data
該節點的數據版本號為:0
更新節點數據成功
節點?/testZK?的數據為:new?test?data
該節點的數據版本號為:1
刪除節點:/testZK
關閉客戶端.....
當前客戶端的狀態:已關閉...
public?class?CuratorBackGround?{
private?static?final?String?zkServerIps?=?"master:2181,hadoop2:2181";
public?static?void?main(String[]?args)?throws?Exception?{
CountDownLatch?samphore?=?new?CountDownLatch(2);
ExecutorService?tp?=?Executors.newFixedThreadPool(2);???//?線程池
String?nodePath?=?"/testZK";
byte[]?data?=?"this?is?a?test?data".getBytes();
RetryPolicy?retryPolicy?=?new?ExponentialBackoffRetry(10000,?5);
CuratorFramework?client?=?CuratorFrameworkFactory.builder().connectString(zkServerIps)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();
client.start();
//?異步創建節點,傳入?ExecutorService,這樣比較復雜的就會放到線程池中執行
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.inBackground(new?BackgroundCallback()?{
@Override
public?void?processResult(CuratorFramework?curatorFramework,?CuratorEvent?curatorEvent)?throws?Exception?{
System.out.println("event[code:?"?+?curatorEvent.getResultCode()?+?",?type:?"?+?curatorEvent.getType()?+?"]");
System.out.println("當前線程:"?+?Thread.currentThread().getName());
samphore.countDown();
}
},?tp).forPath(nodePath,?data);?//?此處傳入?ExecutorService?tp
//?異步創建節點
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.inBackground(new?BackgroundCallback()?{
@Override
public?void?processResult(CuratorFramework?curatorFramework,?CuratorEvent?curatorEvent)?throws?Exception?{
System.out.println("event[code:?"?+?curatorEvent.getResultCode()?+?",?type:?"?+?curatorEvent.getType()?+?"]");
System.out.println("當前線程:"?+?Thread.currentThread().getName());
samphore.countDown();
}
}).forPath(nodePath,?data);?//?此處沒有傳入?ExecutorService?tp
samphore.await();
tp.shutdown();
}
}
//?輸出
event[code:?-110,?type:?CREATE]
當前線程:main-EventThread
event[code:?0,?type:?CREATE]
當前線程:pool-1-thread-1
public?class?CuratorWatcher?{
private?static?final?String?zkServerIps?=?"master:2181,hadoop2:2181";
public?static?void?main(String[]?args)?throws?Exception?{
final?String?nodePath?=?"/testZK";
RetryPolicy?retryPolicy?=?new?ExponentialBackoffRetry(10000,?5);
CuratorFramework?client?=?CuratorFrameworkFactory.builder().connectString(zkServerIps)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();
try?{
client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(nodePath,?"this?is?a?test?data".getBytes());
final?NodeCache?cacheNode?=?new?NodeCache(client,?nodePath,?false);
cacheNode.start(true);??//?true?表示啟動時立即從Zookeeper上獲取節點
cacheNode.getListenable().addListener(new?NodeCacheListener()?{
@Override
public?void?nodeChanged()?throws?Exception?{
System.out.println("節點數據更新,新的內容是:?"?+?new?String(cacheNode.getCurrentData().getData()));
}
});
for?(int?i?=?0;?i?5;?i++)?{
client.setData().forPath(nodePath,?("new?test?data?"?+?i).getBytes());
Thread.sleep(1000);
}
Thread.sleep(10000);?//?等待100秒,手動在?zkCli?客戶端操作節點,觸發事件
}?finally?{
client.delete().deletingChildrenIfNeeded().forPath(nodePath);
client.close();
System.out.println("客戶端關閉......");
}
}
}
public?class?CuratorPCWatcher?{
private?static?final?String?zkServerIps?=?"master:2181,hadoop2:2181";
public?static?void?main(String[]?args)?throws?Exception?{
final?String?nodePath?=?"/testZK";
RetryPolicy?retryPolicy?=?new?ExponentialBackoffRetry(10000,?5);
CuratorFramework?client?=?CuratorFrameworkFactory.builder().connectString(zkServerIps)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();
client.start();
try?{
//?為子節點添加watcher,PathChildrenCache:?監聽數據節點的增刪改,可以設置觸發的事件
final?PathChildrenCache?childrenCache?=?new?PathChildrenCache(client,?nodePath,?true);
/**
*?StartMode:?初始化方式
*??-?POST_INITIALIZED_EVENT:異步初始化,初始化之后會觸發事件
*??-?NORMAL:異步初始化
*??-?BUILD_INITIAL_CACHE:同步初始化
*/
childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
//?列出子節點數據列表,需要使用BUILD_INITIAL_CACHE同步初始化模式才能獲得,異步是獲取不到的
List
System.out.println("當前節點的子節點詳細數據列表:");
for?(ChildData?childData?:?childDataList)?{
System.out.println("\t*?子節點路徑:"?+?new?String(childData.getPath())?+?",該節點的數據為:"?+?new?String(childData.getData()));
}
//?添加事件-
childrenCache.getListenable().addListener(new?PathChildrenCacheListener()?{
@Override
public?void?childEvent(CuratorFramework?curatorFramework,?PathChildrenCacheEvent?event)?throws?Exception?{
//?通過判斷event?type的方式來實現不同事件的觸發
if?(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED))?{??//?子節點初始化時觸發
System.out.println("子節點初始化成功");
}?else?if?(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED))?{??//?添加子節點時觸發
System.out.print("子節點:"?+?event.getData().getPath()?+?"?添加成功,");
System.out.println("該子節點的數據為:"?+?new?String(event.getData().getData()));
}?else?if?(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED))?{??//?刪除子節點時觸發
System.out.println("子節點:"?+?event.getData().getPath()?+?"?刪除成功");
}?else?if?(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED))?{??//?修改子節點數據時觸發
System.out.print("子節點:"?+?event.getData().getPath()?+?"?數據更新成功,");
System.out.println("子節點:"?+?event.getData().getPath()?+?"?新的數據為:"?+?new?String(event.getData().getData()));
}
}
});
Thread.sleep(100000);?//?sleep?100秒,在?zkCli.sh?操作子節點,注意查看控制臺的輸出
}?finally?{
client.close();
}
}
}
//?輸出
當前節點的子節點詳細數據列表:
*?子節點路徑:/testZK/node1,該節點的數據為:hello?world
子節點:/testZK/node2?添加成功,該子節點的數據為:hello?node2
子節點:/testZK/node2?數據更新成功,子節點:/testZK/node2?新的數據為:hello?zookeeper
子節點:/testZK/node2?刪除成功
Zookeeper的典型應用場景
下一篇文章將使用 Curator 客戶端來實現 Zookeeper 的典型應用場景的示例,這里簡單概括一下Zookeeper的典型應用場景:
數據發布/訂閱,即所謂的配置中心
負載均衡
命名服務
分布式協調/通知
集群管理
master 選舉
分布式鎖
分布式隊列
參考:
《從Paxos到Zookeeper分布式一致性原理與實踐》
大數據 ZooKeeper
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。