Zookeeper 分布式協調服務介紹

      網友投稿 910 2022-05-30

      分布式系統

      分布式系統的簡單定義:分布式系統是一個硬件或軟件組件分布在不同的網絡計算機上,彼此之間僅僅通過消息傳遞進行通信和協調的系統。

      分布式系統的特征:

      分布性:系統中的計算機在空間上隨意分布和隨時變動

      分布式系統的簡單定義:分布式系統是一個硬件或軟件組件分布在不同的網絡計算機上,彼此之間僅僅通過消息傳遞進行通信和協調的系統。

      分布式系統的特征:

      分布性:系統中的計算機在空間上隨意分布和隨時變動

      對等性:系統中的計算機是對等的,沒有主從之分

      并發性:并發性操作是非常常見的行為

      缺乏全局時鐘:系統中的計算機具有明顯的分布性,且缺乏一個全局的時鐘序列控制,所以很難比較兩個事件的先后

      故障總是會發生:任何在設計階段考慮到的異常情況,一定會在系統實際運行中發生,并且還會遇到很多在設計時未考慮到的異常故障

      隨著分布式架構的出現,越來越多的分布式應用會面臨數據一致性問題。

      ZooKeeper是一個典型的分布式數據一致性解決方案,分布式應用程序可以基于它實現諸如數據發布/訂閱、負載均衡、命名服務、分布式協調/通知、集群管理、master選舉、分布式鎖和分布式隊列等功能。

      Zookeeper致力于提供一個高性能、高可用,具有嚴格的順序訪問控制能力的分布式協調服務;其主要的設計目標是簡單的數據模型、可以構建集群、順序訪問、高性能。Zookeeper已經成為很多大型分布式項目譬如Hadoop、HBase、Storm、Solr等中的核心組件,用于分布式協調。

      Zookeeper可以保證如下分布式一致性特性:

      順序一致性:從同一個客戶端發起的事務請求,最終將會嚴格地按照其發起順序被應用到Zookeeper中去

      原子性:所有事務請求的處理結果在整個集群中所有的機器上的應用情況是一致的

      單一視圖:無論客戶端連接的是哪個Zookeeper服務器,其看到的服務器數據模型都是一致的

      可靠性:一旦服務端成功地應用了一個事務,并完成對客戶端的響應,那么該事務所引起的服務端狀態變更將會被一直保留下來,除非有另一個事務又對其進行了變更

      實時性:在一定的時間內,客戶端最終一定能夠從服務端上讀取到最新的數據狀態

      Zookeeper 基本概念

      集群角色

      Leader:客戶端提供讀和寫服務

      Follower:提供讀服務,所有寫服務都需要轉交給Leader角色,參與選舉

      Observer:提供讀服務,不參與選舉過程,一般是為了增強Zookeeper集群的讀請求并發能力

      會話 (session)

      Zk的客戶端與zk的服務端之間的連接

      通過心跳檢測保持客戶端連接的存活

      接收來自服務端的watch事件通知

      可以設置超時時間

      ZNode 是Zookeeper中數據的最小單元,每個ZNode上可以保存數據(byte[]類型),同時可以掛在子節點,因此構成了一個層次化的命名空間,我們稱之為樹

      節點是有生命周期的,生命周期由節點類型決定:

      持久節點(PERSISTENT):節點創建后就一直存在于Zookeeper服務器上,直到有刪除操作主動將其刪除

      持久順序節點(PERSISTENT_SEQUENTIAL):基本特性與持久節點一致,額外的特性在于Zookeeper會記錄其子節點創建的先后順序

      臨時節點(EPHEMERAL):聲明周期與客戶端的會話綁定,客戶端會話失效時節點將被自動清除

      臨時順序節點(EPHEMERAL_SEQUENTIAL):基本特性與臨時節點一致,但添加了順序的特性

      每個節點都有狀態信息,抽象為 Stat 對象,狀態屬性如下:

      czxid:節點被創建時的事務ID

      mzxid:節點最后一個被更新時的事務ID

      ctime:節點創建時間

      mtime:節點最后一個被更新時間

      version:節點版本號

      cversion:子節點版本號

      aversion:節點的ACL版本號

      ephemeralOwner:創建該臨時節點的會話的sessionID,若為持久節點則為0

      dataLength:數據內容長度

      numChildren:子節點數量

      權限控制ACL (Access Control Lists)

      CREATE:創建子節點的權限

      READ:獲取節點數據和子節點列表的權限

      WRITE:更新節點數據的權限

      DELETE:刪除子節點的權限

      ADMIN:設置節點ACL的權限

      Zookeeper 引入watcher機制來實現發布/訂閱功能,能夠讓多個訂閱者同時監聽某一個節點對象,當這個節點對象狀態發生變化時,會通知所有訂閱者。

      Zookeeper的watcher機制主要包括客戶端線程、客戶端WatchManager、Zookeeper服務器三個部分。其工作流程簡單來說:客戶端在向Zookeeper服務器注冊Watcher的同時,會將Watcher對象存儲在客戶端的WatchManager中;當Zookeeper服務器端觸發Watcher事件后,會向客戶端發送通知,客戶端線程從WatchManager中取出對應的Watcher對象來執行回調邏輯

      可以設置的兩種 Watcher

      NodeCache

      監聽數據節點的內容變更

      監聽節點的創建,即如果指定的節點不存在,則節點創建后,會觸發這個監聽

      PathChildrenCache

      監聽指定節點的子節點變化情況

      包括新增子節點、子節點數據變更和子節點刪除

      客戶端命令

      Zookeeper的安裝可參考官方文檔

      #?啟動客戶端,默認為?localhost:2181

      bin/zkCli.sh

      #?啟動客戶端,指定連接的Zookeeper地址

      bin/zkCli.sh?-server?ip:port

      #?create?創建一個節點,路徑為?/test,內容為?some?test?data

      create?/test?"some?test?data"

      #?ls?列出指定節點下的所有子節點

      ls?/

      #?get?獲取指定節點的數據內容和屬性

      get?/test

      #?set?更新指定節點的數據內容

      set?/test?"new?test?data"

      #?delete?刪除節點

      delete?/test

      #?rmr?刪除非空節點

      rmr?/test

      #?stat?輸出節點的狀態信息

      stat?/test

      四字命令可以查看Zookeeper服務器的一些信息,可以通過 telnet 和 nc 等方式執行四字命令,以執行 conf 命令為例

      #?telnet?方式?執行Zookeeper的?conf?命令

      telnet?localhost?2181

      conf

      #?nc?方式?執行Zookeeper的?conf?命令

      echo?conf?|?nc?localhost?2181

      四字命令介紹:

      conf 命令用于輸出Zookeeper服務器運行時的基本配置信息

      cons 命令用于輸出這臺服務器上所有客戶端連接的詳細信息

      crst 命令用于重置所有客戶端的連接統計信息

      dump 命令用于輸出當前集群的所有會話信息

      envi 命令用于輸出Zookeeper所在服務器的運行時信息

      ruok 命令用于輸出當前Zookeeper服務器是否正在運行

      stat 命令用于獲取Zookeeper服務器的運行狀態信息

      srvr 命令與stat命令功能一致,但僅輸出服務器自身的信息

      srst 命令用于重置所有服務器的統計信息

      wchs 命令用于輸出當前服務器上管理的 watcher 的概要信息

      wchc 命令用于輸出當前服務器上管理的 watcher 的詳細信息

      wchp 命令與wchc功能非常相似,但輸出信息以節點路徑為單位進行歸組

      mntr 命令用于輸出比stat命令更為詳細的服務器統計信息

      Curator 客戶端代碼實例

      Curator 是 Apache 基金會的頂級項目之一,解決了很多Zookeeper客戶端非常底層的細節開發工作,包括會話超時重連、反復注冊Watcher、NodeExistsException異常等,提供了一套易用性和可讀性更強的Fluent風格的客戶端API框架,除此之外,curator還提供了Zookeeper各種應用場景(Recipe,如共享鎖服務、master選舉、分布式計數器等)的抽象封裝。

      Zookeeper 的核心提交者 Patrick Hunt 對 Curator 的高度評價:

      Guava is to Java what Curator is to Zookeeper

      import?org.apache.curator.RetryPolicy;

      import?org.apache.curator.framework.CuratorFramework;

      import?org.apache.curator.framework.CuratorFrameworkFactory;

      import?org.apache.curator.framework.imps.CuratorFrameworkState;

      import?org.apache.curator.retry.ExponentialBackoffRetry;

      import?org.apache.zookeeper.CreateMode;

      import?org.apache.zookeeper.ZooDefs;

      import?org.apache.zookeeper.data.Stat;

      public?class?CuratorCrud?{

      //?集群模式則是多個ip

      private?static?final?String?zkServerIps?=?"master:2181,hadoop2:2181";

      public?static?void?main(String[]?args)?throws?Exception?{

      //?創建節點

      String?nodePath?=?"/testZK";?//?節點路徑

      byte[]?data?=?"this?is?a?test?data".getBytes();?//?節點數據

      byte[]?newData?=?"new?test?data".getBytes();?//?節點數據

      //?設置重連策略ExponentialBackoffRetry,?baseSleepTimeMs:初始sleep的時間,maxRetries:最大重試次數,maxSleepMs:最大重試時間

      RetryPolicy?retryPolicy?=?new?ExponentialBackoffRetry(10000,?5);

      //(推薦)curator鏈接zookeeper的策略:RetryNTimes?n:重試的次數?sleepMsBetweenRetries:每次重試間隔的時間

      //?RetryPolicy?retryPolicy?=?new?RetryNTimes(3,?5000);

      //?(不推薦)?curator鏈接zookeeper的策略:RetryOneTime?sleepMsBetweenRetry:每次重試間隔的時間,這個策略只會重試一次

      //?RetryPolicy?retryPolicy2?=?new?RetryOneTime(3000);

      //?永遠重試,不推薦使用

      //?RetryPolicy?retryPolicy3?=?new?RetryForever(retryIntervalMs)

      //?curator鏈接zookeeper的策略:RetryUntilElapsed?maxElapsedTimeMs:最大重試時間?sleepMsBetweenRetries:每次重試間隔?重試時間超過maxElapsedTimeMs后,就不再重試

      //?RetryPolicy?retryPolicy4?=?new?RetryUntilElapsed(2000,?3000);

      //?Curator客戶端

      CuratorFramework?client?=?null;

      //?實例化Curator客戶端,Curator的編程風格可以讓我們使用方法鏈的形式完成客戶端的實例化

      client?=?CuratorFrameworkFactory.builder()??//?使用工廠類來建造客戶端的實例對象

      .connectString(zkServerIps)?//?放入zookeeper服務器ip

      .sessionTimeoutMs(10000).retryPolicy(retryPolicy)??//?設定會話時間以及重連策略

      //?.namespace("testApp")????//?隔離的命名空間

      .build();?//?建立連接通道

      //?啟動Curator客戶端

      client.start();

      boolean?isZkCuratorStarted?=?client.getState().equals(CuratorFrameworkState.STARTED);

      System.out.println("當前客戶端的狀態:"?+?(isZkCuratorStarted???"連接中..."?:?"已關閉..."));

      try?{

      //?檢查節點是否存在

      Stat?s?=?client.checkExists().forPath(nodePath);

      if?(s?==?null)?{

      System.out.println("節點不存在,創建節點");

      //?創建節點

      String?result?=?client.create().creatingParentsIfNeeded()????//?創建父節點,也就是會遞歸創建

      .withMode(CreateMode.PERSISTENT)?//?節點類型

      .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)?//?節點的ACL權限

      .forPath(nodePath,?data);

      System.out.println(result?+?"節點,創建成功...");

      }?else?{

      System.out.println("節點已存在,"?+?s);

      }

      getData(client,?nodePath);??//?輸出節點信息

      //?更新指定節點的數據

      int?version?=?s?==?null???0?:?s.getVersion();??//?版本不一致時的異常:KeeperErrorCode?=?BadVersion

      Stat?resultStat?=?client.setData().withVersion(version)???//?指定數據版本

      .forPath(nodePath,?newData);????//?需要修改的節點路徑以及新數據

      System.out.println("更新節點數據成功");

      getData(client,?nodePath);??//?輸出節點信息

      //?刪除節點

      client.delete().guaranteed()????//?如果刪除失敗,那么在后端還是會繼續刪除,直到成功

      .deletingChildrenIfNeeded()?//?子節點也一并刪除,也就是會遞歸刪除

      .withVersion(resultStat.getVersion())

      .forPath(nodePath);

      System.out.println("刪除節點:"?+?nodePath);

      Thread.sleep(1000);

      }?finally?{

      //?關閉客戶端

      if?(!client.getState().equals(CuratorFrameworkState.STOPPED))?{

      System.out.println("關閉客戶端.....");

      client.close();

      }

      isZkCuratorStarted?=?client.getState().equals(CuratorFrameworkState.STARTED);

      System.out.println("當前客戶端的狀態:"?+?(isZkCuratorStarted???"連接中..."?:?"已關閉..."));

      }

      }

      /**

      *?讀取節點的數據

      */

      private?static?byte[]?getData(CuratorFramework?client,?String?nodePath)?throws?Exception?{

      Stat?stat?=?new?Stat();

      byte[]?nodeData?=?client.getData().storingStatIn(stat).forPath(nodePath);

      System.out.println("節點?"?+?nodePath?+?"?的數據為:"?+?new?String(nodeData));

      System.out.println("該節點的數據版本號為:"?+?stat.getVersion()?+?"\n");

      return?nodeData;

      }

      }

      //?輸出

      當前客戶端的狀態:連接中...

      節點不存在,創建節點

      /testZK節點,創建成功...

      節點?/testZK?的數據為:this?is?a?test?data

      該節點的數據版本號為:0

      更新節點數據成功

      節點?/testZK?的數據為:new?test?data

      該節點的數據版本號為:1

      刪除節點:/testZK

      關閉客戶端.....

      當前客戶端的狀態:已關閉...

      public?class?CuratorBackGround?{

      private?static?final?String?zkServerIps?=?"master:2181,hadoop2:2181";

      Zookeeper 分布式協調服務介紹

      public?static?void?main(String[]?args)?throws?Exception?{

      CountDownLatch?samphore?=?new?CountDownLatch(2);

      ExecutorService?tp?=?Executors.newFixedThreadPool(2);???//?線程池

      String?nodePath?=?"/testZK";

      byte[]?data?=?"this?is?a?test?data".getBytes();

      RetryPolicy?retryPolicy?=?new?ExponentialBackoffRetry(10000,?5);

      CuratorFramework?client?=?CuratorFrameworkFactory.builder().connectString(zkServerIps)

      .sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();

      client.start();

      //?異步創建節點,傳入?ExecutorService,這樣比較復雜的就會放到線程池中執行

      client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)

      .inBackground(new?BackgroundCallback()?{

      @Override

      public?void?processResult(CuratorFramework?curatorFramework,?CuratorEvent?curatorEvent)?throws?Exception?{

      System.out.println("event[code:?"?+?curatorEvent.getResultCode()?+?",?type:?"?+?curatorEvent.getType()?+?"]");

      System.out.println("當前線程:"?+?Thread.currentThread().getName());

      samphore.countDown();

      }

      },?tp).forPath(nodePath,?data);?//?此處傳入?ExecutorService?tp

      //?異步創建節點

      client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)

      .inBackground(new?BackgroundCallback()?{

      @Override

      public?void?processResult(CuratorFramework?curatorFramework,?CuratorEvent?curatorEvent)?throws?Exception?{

      System.out.println("event[code:?"?+?curatorEvent.getResultCode()?+?",?type:?"?+?curatorEvent.getType()?+?"]");

      System.out.println("當前線程:"?+?Thread.currentThread().getName());

      samphore.countDown();

      }

      }).forPath(nodePath,?data);?//?此處沒有傳入?ExecutorService?tp

      samphore.await();

      tp.shutdown();

      }

      }

      //?輸出

      event[code:?-110,?type:?CREATE]

      當前線程:main-EventThread

      event[code:?0,?type:?CREATE]

      當前線程:pool-1-thread-1

      public?class?CuratorWatcher?{

      private?static?final?String?zkServerIps?=?"master:2181,hadoop2:2181";

      public?static?void?main(String[]?args)?throws?Exception?{

      final?String?nodePath?=?"/testZK";

      RetryPolicy?retryPolicy?=?new?ExponentialBackoffRetry(10000,?5);

      CuratorFramework?client?=?CuratorFrameworkFactory.builder().connectString(zkServerIps)

      .sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();

      try?{

      client.start();

      client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(nodePath,?"this?is?a?test?data".getBytes());

      final?NodeCache?cacheNode?=?new?NodeCache(client,?nodePath,?false);

      cacheNode.start(true);??//?true?表示啟動時立即從Zookeeper上獲取節點

      cacheNode.getListenable().addListener(new?NodeCacheListener()?{

      @Override

      public?void?nodeChanged()?throws?Exception?{

      System.out.println("節點數據更新,新的內容是:?"?+?new?String(cacheNode.getCurrentData().getData()));

      }

      });

      for?(int?i?=?0;?i?

      client.setData().forPath(nodePath,?("new?test?data?"?+?i).getBytes());

      Thread.sleep(1000);

      }

      Thread.sleep(10000);?//?等待100秒,手動在?zkCli?客戶端操作節點,觸發事件

      }?finally?{

      client.delete().deletingChildrenIfNeeded().forPath(nodePath);

      client.close();

      System.out.println("客戶端關閉......");

      }

      }

      }

      public?class?CuratorPCWatcher?{

      private?static?final?String?zkServerIps?=?"master:2181,hadoop2:2181";

      public?static?void?main(String[]?args)?throws?Exception?{

      final?String?nodePath?=?"/testZK";

      RetryPolicy?retryPolicy?=?new?ExponentialBackoffRetry(10000,?5);

      CuratorFramework?client?=?CuratorFrameworkFactory.builder().connectString(zkServerIps)

      .sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();

      client.start();

      try?{

      //?為子節點添加watcher,PathChildrenCache:?監聽數據節點的增刪改,可以設置觸發的事件

      final?PathChildrenCache?childrenCache?=?new?PathChildrenCache(client,?nodePath,?true);

      /**

      *?StartMode:?初始化方式

      *??-?POST_INITIALIZED_EVENT:異步初始化,初始化之后會觸發事件

      *??-?NORMAL:異步初始化

      *??-?BUILD_INITIAL_CACHE:同步初始化

      */

      childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

      //?列出子節點數據列表,需要使用BUILD_INITIAL_CACHE同步初始化模式才能獲得,異步是獲取不到的

      List?childDataList?=?childrenCache.getCurrentData();

      System.out.println("當前節點的子節點詳細數據列表:");

      for?(ChildData?childData?:?childDataList)?{

      System.out.println("\t*?子節點路徑:"?+?new?String(childData.getPath())?+?",該節點的數據為:"?+?new?String(childData.getData()));

      }

      //?添加事件-

      childrenCache.getListenable().addListener(new?PathChildrenCacheListener()?{

      @Override

      public?void?childEvent(CuratorFramework?curatorFramework,?PathChildrenCacheEvent?event)?throws?Exception?{

      //?通過判斷event?type的方式來實現不同事件的觸發

      if?(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED))?{??//?子節點初始化時觸發

      System.out.println("子節點初始化成功");

      }?else?if?(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED))?{??//?添加子節點時觸發

      System.out.print("子節點:"?+?event.getData().getPath()?+?"?添加成功,");

      System.out.println("該子節點的數據為:"?+?new?String(event.getData().getData()));

      }?else?if?(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED))?{??//?刪除子節點時觸發

      System.out.println("子節點:"?+?event.getData().getPath()?+?"?刪除成功");

      }?else?if?(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED))?{??//?修改子節點數據時觸發

      System.out.print("子節點:"?+?event.getData().getPath()?+?"?數據更新成功,");

      System.out.println("子節點:"?+?event.getData().getPath()?+?"?新的數據為:"?+?new?String(event.getData().getData()));

      }

      }

      });

      Thread.sleep(100000);?//?sleep?100秒,在?zkCli.sh?操作子節點,注意查看控制臺的輸出

      }?finally?{

      client.close();

      }

      }

      }

      //?輸出

      當前節點的子節點詳細數據列表:

      *?子節點路徑:/testZK/node1,該節點的數據為:hello?world

      子節點:/testZK/node2?添加成功,該子節點的數據為:hello?node2

      子節點:/testZK/node2?數據更新成功,子節點:/testZK/node2?新的數據為:hello?zookeeper

      子節點:/testZK/node2?刪除成功

      Zookeeper的典型應用場景

      下一篇文章將使用 Curator 客戶端來實現 Zookeeper 的典型應用場景的示例,這里簡單概括一下Zookeeper的典型應用場景:

      數據發布/訂閱,即所謂的配置中心

      負載均衡

      命名服務

      分布式協調/通知

      集群管理

      master 選舉

      分布式鎖

      分布式隊列

      參考:

      《從Paxos到Zookeeper分布式一致性原理與實踐》

      大數據 ZooKeeper

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:基于華為云物聯網設計的智能家居控制系統(STM32+ESP8266)【我們都是華為云專家】
      下一篇:不同場景容器內獲取客戶端源IP的方法
      相關文章
      亚洲熟妇无码八AV在线播放| 久久久久久亚洲Av无码精品专口 | 中文字幕不卡亚洲| 亚洲AV无码国产剧情| 中文字幕亚洲精品无码| 亚洲精品国产国语| 亚洲一区二区三区四区视频| 亚洲视频免费观看| 亚洲国产美女在线观看| 亚洲精品美女视频| 亚洲国产精品自在在线观看| 亚洲AV无码日韩AV无码导航| 亚洲Av永久无码精品三区在线| 情人伊人久久综合亚洲| 国产AV无码专区亚洲AVJULIA| 久久国产亚洲精品麻豆| 日本亚洲视频在线| 国产亚洲精品成人a v小说| 亚洲成A人片在线观看中文 | 免费亚洲视频在线观看| 亚洲sm另类一区二区三区| 亚洲av永久无码精品网址| 另类专区另类专区亚洲| 亚洲精品动漫人成3d在线| 亚洲日本韩国在线| 亚洲区小说区激情区图片区| 久久精品国产精品亚洲精品| 亚洲av伊人久久综合密臀性色| 亚洲国产成人久久综合一| 在线观看亚洲人成网站| 亚洲一区在线视频观看| 精品久久亚洲中文无码| 中文字幕在线观看亚洲日韩| 亚洲日本一线产区和二线产区对比| 亚洲欧洲AV无码专区| 国产成人综合久久精品亚洲| 亚洲一区二区视频在线观看| 国产亚洲精aa成人网站| 亚洲国产精品VA在线看黑人| 亚洲综合精品香蕉久久网97| 亚洲国产精品成人精品小说|