利用Zookeeper實現 - 分布式鎖

      網友投稿 928 2025-04-02

      在許多場景中,數據一致性是一個比較重要的話題,在單機環境中,我們可以通過Java提供的并發API來解決;而在分布式環境(會遇到網絡故障、消息重復、消息丟失等各種問題)下要復雜得多,常見的解決方案是分布式事務、分布式鎖等。

      本文主要探討如何利用ZooKeeper來實現分布式鎖。

      關于分布式鎖

      分布式鎖是控制分布式系統之間同步訪問共享資源的一種方式。

      在實現分布式鎖的過程中需要注意的:

      鎖的可重入性(遞歸調用不應該被阻塞、避免死鎖)

      在許多場景中,數據一致性是一個比較重要的話題,在單機環境中,我們可以通過Java提供的并發API來解決;而在分布式環境(會遇到網絡故障、消息重復、消息丟失等各種問題)下要復雜得多,常見的解決方案是分布式事務、分布式鎖等。

      本文主要探討如何利用ZooKeeper來實現分布式鎖。

      關于分布式鎖

      分布式鎖是控制分布式系統之間同步訪問共享資源的一種方式。

      在實現分布式鎖的過程中需要注意的:

      鎖的可重入性(遞歸調用不應該被阻塞、避免死鎖)

      鎖的超時(避免死鎖、死循環等意外情況)

      鎖的阻塞(保證原子性等)

      鎖的特性支持(阻塞鎖、可重入鎖、公平鎖、聯鎖、信號量、讀寫鎖)

      在使用分布式鎖時需要注意:

      分布式鎖的開銷(分布式鎖一般能不用就不用,有些場景可以用樂觀鎖代替)

      加鎖的粒度(控制加鎖的粒度,可以優化系統的性能)

      加鎖的方式

      以下是幾種常見的實現分布式鎖的方案及其優缺點。

      1. 基于數據庫表

      最簡單的方式可能就是直接創建一張鎖表,當我們要鎖住某個方法或資源時,我們就在該表中增加一條記錄,想要釋放鎖的時候就刪除這條記錄。給某字段添加唯一性約束,如果有多個請求同時提交到數據庫的話,數據庫會保證只有一個操作可以成功,那么我們就可以認為操作成功的那個線程獲得了該方法的鎖,可以執行方法體內容。

      會引入數據庫單點、無失效時間、不阻塞、不可重入等問題。

      2. 基于數據庫排他鎖

      如果使用的是MySql的InnoDB引擎,在查詢語句后面增加for update,數據庫會在查詢過程中(須通過唯一索引查詢)給數據庫表增加排他鎖,我們可以認為獲得排它鎖的線程即可獲得分布式鎖,通過 connection.commit() 操作來釋放鎖。

      會引入數據庫單點、不可重入、無法保證一定使用行鎖(部分情況下MySQL自動使用表鎖而不是行鎖)、排他鎖長時間不提交導致占用數據庫連接等問題。

      3. 數據庫實現分布式鎖總結

      優點:

      直接借助數據庫,容易理解。

      缺點:

      會引入更多的問題,使整個方案變得越來越復雜

      操作數據庫需要一定的開銷,有一定的性能問題

      使用數據庫的行級鎖并不一定靠譜,尤其是當我們的鎖表并不大的時候

      相比較于基于數據庫實現分布式鎖的方案來說,基于緩存來實現在性能方面會表現的更好一點。目前有很多成熟的緩存產品,包括Redis、memcached、tair等。

      這里以Redis為例舉出幾種實現方法:

      1. 基于 redis 的 setnx()、expire() 方法做分布式鎖

      setnx 的含義就是 SET if Not Exists,其主要有兩個參數 setnx(key, value)。該方法是原子的,如果 key 不存在,則設置當前 key 成功,返回 1;如果當前 key 已經存在,則設置當前 key 失敗,返回 0。

      expire 設置過期時間,要注意的是 setnx 命令不能設置 key 的超時時間,只能通過 expire() 來對 key 設置。

      2. 基于 redis 的 setnx()、get()、getset()方法做分布式鎖

      getset 這個命令主要有兩個參數 getset(key,newValue),該方法是原子的,對 key 設置 newValue 這個值,并且返回 key 原來的舊值。

      3. 基于 Redlock 做分布式鎖

      4. 基于 redisson 做分布式鎖

      redisson 是 redis 官方的分布式鎖組件,GitHub 地址:https://github.com/redisson/redisson

      基于緩存實現分布式鎖總結

      優點:

      性能好

      缺點:

      實現中需要考慮的因素太多

      通過超時時間來控制鎖的失效時間并不是十分的靠譜

      大致思想為:每個客戶端對某個方法加鎖時,在 Zookeeper 上與該方法對應的指定節點的目錄下,生成一個唯一的臨時有序節點。 判斷是否獲取鎖的方式很簡單,只需要判斷有序節點中序號最小的一個。 當釋放鎖的時候,只需將這個臨時節點刪除即可。同時,其可以避免服務宕機導致的鎖無法釋放,而產生的死鎖問題

      Zookeeper實現分布式鎖總結

      優點:

      有效的解決單點問題,不可重入問題,非阻塞問題以及鎖無法釋放的問題

      實現較為簡單

      缺點:

      性能上不如使用緩存實現的分布式鎖,因為每次在創建鎖和釋放鎖的過程中,都要動態創建、銷毀臨時節點來實現鎖功能

      需要對Zookeeper的原理有所了解

      Zookeeper 如何實現分布式鎖?

      下面講如何實現排他鎖和共享鎖,以及如何解決羊群效應。

      排他鎖,又稱寫鎖或獨占鎖。如果事務T1對數據對象O1加上了排他鎖,那么在整個加鎖期間,只允許事務T1對O1進行讀取或更新操作,其他任務事務都不能對這個數據對象進行任何操作,直到T1釋放了排他鎖。

      排他鎖核心是保證當前有且僅有一個事務獲得鎖,并且鎖釋放之后,所有正在等待獲取鎖的事務都能夠被通知到。

      Zookeeper 的強一致性特性,能夠很好地保證在分布式高并發情況下節點的創建一定能夠保證全局唯一性,即Zookeeper將會保證客戶端無法重復創建一個已經存在的數據節點。可以利用Zookeeper這個特性,實現排他鎖。

      定義鎖:通過Zookeeper上的數據節點來表示一個鎖

      獲取鎖:客戶端通過調用 create 方法創建表示鎖的臨時節點,可以認為創建成功的客戶端獲得了鎖,同時可以讓沒有獲得鎖的節點在該節點上注冊Watcher監聽,以便實時監聽到lock節點的變更情況

      釋放鎖:以下兩種情況都可以讓鎖釋放

      當前獲得鎖的客戶端發生宕機或異常,那么Zookeeper上這個臨時節點就會被刪除

      正常執行完業務邏輯,客戶端主動刪除自己創建的臨時節點

      基于Zookeeper實現排他鎖流程:

      共享鎖,又稱讀鎖。如果事務T1對數據對象O1加上了共享鎖,那么當前事務只能對O1進行讀取操作,其他事務也只能對這個數據對象加共享鎖,直到該數據對象上的所有共享鎖都被釋放。

      共享鎖與排他鎖的區別在于,加了排他鎖之后,數據對象只對當前事務可見,而加了共享鎖之后,數據對象對所有事務都可見。

      定義鎖:通過Zookeeper上的數據節點來表示一個鎖,是一個類似于 /lockpath/[hostname]-請求類型-序號 的臨時順序節點

      獲取鎖:客戶端通過調用 create 方法創建表示鎖的臨時順序節點,如果是讀請求,則創建 /lockpath/[hostname]-R-序號 節點,如果是寫請求則創建 /lockpath/[hostname]-W-序號 節點

      判斷讀寫順序:大概分為4個步驟

      1)創建完節點后,獲取 `/lockpath` 節點下的所有子節點,并對該節點注冊子節點變更的Watcher監聽

      2)確定自己的節點序號在所有子節點中的順序

      3.1)對于讀請求:1. 如果沒有比自己序號更小的子節點,或者比自己序號小的子節點都是讀請求,那么表明自己已經成功獲取到了共享鎖,同時開始執行讀取邏輯 2. 如果有比自己序號小的子節點有寫請求,那么等待 3.

      3.2)對于寫請求,如果自己不是序號最小的節點,那么等待

      4)接收到Watcher通知后,重復步驟1)

      釋放鎖:與排他鎖邏輯一致

      基于Zookeeper實現共享鎖流程:

      在實現共享鎖的 "判斷讀寫順序" 的第1個步驟是:創建完節點后,獲取 /lockpath 節點下的所有子節點,并對該節點注冊子節點變更的Watcher監聽。這樣的話,任何一次客戶端移除共享鎖之后,Zookeeper將會發送子節點變更的Watcher通知給所有機器,系統中將有大量的 "Watcher通知" 和 "子節點列表獲取" 這個操作重復執行,然后所有節點再判斷自己是否是序號最小的節點(寫請求)或者判斷比自己序號小的子節點是否都是讀請求(讀請求),從而繼續等待下一次通知。

      然而,這些重復操作很多都是 "無用的",實際上每個鎖競爭者只需要關注序號比自己小的那個節點是否存在即可

      當集群規模比較大時,這些 "無用的" 操作不僅會對Zookeeper造成巨大的性能影響和網絡沖擊,更為嚴重的是,如果同一時間有多個客戶端釋放了共享鎖,Zookeeper服務器就會在短時間內向其余客戶端發送大量的事件通知--這就是所謂的 "羊群效應"。

      改進后的分布式鎖實現:

      具體實現如下:

      1. 客戶端調用 create 方法創建一個類似于 /lockpath/[hostname]-請求類型-序號 的臨時順序節點

      2. 客戶端調用 getChildren 方法獲取所有已經創建的子節點列表(這里不注冊任何Watcher)

      3. 如果無法獲取任何共享鎖,那么調用 exist 來對比自己小的那個節點注冊Watcher

      讀請求:向比自己序號小的最后一個寫請求節點注冊Watcher監聽

      寫請求:向比自己序號小的最后一個節點注冊Watcher監聽

      4. 等待Watcher監聽,繼續進入步驟2

      Zookeeper羊群效應改進前后Watcher監聽圖

      基于Curator客戶端實現分布式鎖

      Apache Curator是一個Zookeeper的開源客戶端,它提供了Zookeeper各種應用場景(Recipe,如共享鎖服務、master選舉、分布式計數器等)的抽象封裝,接下來將利用Curator提供的類來實現分布式鎖。

      Curator提供的跟分布式鎖相關的類有5個,分別是:

      Shared Reentrant Lock 可重入鎖

      Shared Lock 共享不可重入鎖

      Shared Reentrant Read Write Lock 可重入讀寫鎖

      Shared Semaphore 信號量

      Multi Shared Lock 多鎖

      關于錯誤處理:還是強烈推薦使用ConnectionStateListener處理連接狀態的改變。當連接LOST時你不再擁有鎖。

      Shared Reentrant Lock,全局可重入鎖,所有客戶端都可以請求,同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。它是由類 InterProcessMutex 來實現,它的主要方法:

      //?構造方法

      public?InterProcessMutex(CuratorFramework?client,?String?path)

      public?InterProcessMutex(CuratorFramework?client,?String?path,?LockInternalsDriver?driver)

      //?通過acquire獲得鎖,并提供超時機制:

      public?void?acquire()?throws?Exception

      public?boolean?acquire(long?time,?TimeUnit?unit)?throws?Exception

      //?撤銷鎖

      public?void?makeRevocable(RevocationListener?listener)

      public?void?makeRevocable(final?RevocationListener?listener,?Executor?executor)

      定義一個 FakeLimitedResource 類來模擬一個共享資源,該資源一次只能被一個線程使用,直到使用結束,下一個線程才能使用,否則會拋出異常

      public?class?FakeLimitedResource?{

      private?final?AtomicBoolean?inUse?=?new?AtomicBoolean(false);

      //?模擬只能單線程操作的資源

      public?void?use()?throws?InterruptedException?{

      if?(!inUse.compareAndSet(false,?true))?{

      //?在正確使用鎖的情況下,此異常不可能拋出

      throw?new?IllegalStateException("Needs?to?be?used?by?one?client?at?a?time");

      }

      try?{

      Thread.sleep((long)?(100?*?Math.random()));

      }?finally?{

      inUse.set(false);

      }

      }

      }

      下面的代碼將創建 N 個線程來模擬分布式系統中的節點,系統將通過 InterProcessMutex 來控制對資源的同步使用;每個節點都將發起10次請求,完成 請求鎖--訪問資源--再次請求鎖--釋放鎖--釋放鎖 的過程;客戶端通過 acquire 請求鎖,通過 release 釋放鎖,獲得幾把鎖就要釋放幾把鎖;這個共享資源一次只能被一個線程使用,如果控制同步失敗,將拋異常。

      public?class?SharedReentrantLockTest?{

      private?static?final?String?lockPath?=?"/testZK/sharedreentrantlock";

      private?static?final?Integer?clientNums?=?5;

      final?static?FakeLimitedResource?resource?=?new?FakeLimitedResource();?//?共享的資源

      private?static?CountDownLatch?countDownLatch?=?new?CountDownLatch(clientNums);

      public?static?void?main(String[]?args)?throws?InterruptedException?{

      for?(int?i?=?0;?i?

      String?clientName?=?"client#"?+?i;

      new?Thread(new?Runnable()?{

      @Override

      public?void?run()?{

      CuratorFramework?client?=?ZKUtils.getClient();

      client.start();

      Random?random?=?new?Random();

      try?{

      final?InterProcessMutex?lock?=?new?InterProcessMutex(client,?lockPath);

      //?每個客戶端請求10次共享資源

      for?(int?j?=?0;?j?

      if?(!lock.acquire(10,?TimeUnit.SECONDS))?{

      throw?new?IllegalStateException(j?+?".?"?+?clientName?+?"?不能得到互斥鎖");

      }

      try?{

      System.out.println(j?+?".?"?+?clientName?+?"?已獲取到互斥鎖");

      resource.use();?//?使用資源

      if?(!lock.acquire(10,?TimeUnit.SECONDS))?{

      throw?new?IllegalStateException(j?+?".?"?+?clientName?+?"?不能再次得到互斥鎖");

      }

      System.out.println(j?+?".?"?+?clientName?+?"?已再次獲取到互斥鎖");

      lock.release();?//?申請幾次鎖就要釋放幾次鎖

      }?finally?{

      System.out.println(j?+?".?"?+?clientName?+?"?釋放互斥鎖");

      lock.release();?//?總是在finally中釋放

      }

      Thread.sleep(random.nextInt(100));

      }

      }?catch?(Throwable?e)?{

      System.out.println(e.getMessage());

      }?finally?{

      CloseableUtils.closeQuietly(client);

      System.out.println(clientName?+?"?客戶端關閉!");

      countDownLatch.countDown();

      }

      }

      }).start();

      }

      countDownLatch.await();

      System.out.println("結束!");

      }

      }

      控制臺打印日志,可以看到對資源的同步訪問控制成功,并且鎖是可重入的

      0.?client#3?已獲取到互斥鎖

      0.?client#3?已再次獲取到互斥鎖

      0.?client#3?釋放互斥鎖

      0.?client#1?已獲取到互斥鎖

      0.?client#1?已再次獲取到互斥鎖

      0.?client#1?釋放互斥鎖

      0.?client#2?已獲取到互斥鎖

      0.?client#2?已再次獲取到互斥鎖

      0.?client#2?釋放互斥鎖

      0.?client#0?已獲取到互斥鎖

      0.?client#0?已再次獲取到互斥鎖

      0.?client#0?釋放互斥鎖

      0.?client#4?已獲取到互斥鎖

      0.?client#4?已再次獲取到互斥鎖

      0.?client#4?釋放互斥鎖

      1.?client#1?已獲取到互斥鎖

      1.?client#1?已再次獲取到互斥鎖

      1.?client#1?釋放互斥鎖

      2.?client#1?已獲取到互斥鎖

      2.?client#1?已再次獲取到互斥鎖

      2.?client#1?釋放互斥鎖

      1.?client#4?已獲取到互斥鎖

      1.?client#4?已再次獲取到互斥鎖

      1.?client#4?釋放互斥鎖

      1.?client#3?已獲取到互斥鎖

      1.?client#3?已再次獲取到互斥鎖

      1.?client#3?釋放互斥鎖

      1.?client#2?已獲取到互斥鎖

      1.?client#2?已再次獲取到互斥鎖

      1.?client#2?釋放互斥鎖

      2.?client#4?已獲取到互斥鎖

      2.?client#4?已再次獲取到互斥鎖

      2.?client#4?釋放互斥鎖

      ....

      ....

      client#2?客戶端關閉!

      9.?client#0?已獲取到互斥鎖

      9.?client#0?已再次獲取到互斥鎖

      9.?client#0?釋放互斥鎖

      9.?client#3?已獲取到互斥鎖

      9.?client#3?已再次獲取到互斥鎖

      9.?client#3?釋放互斥鎖

      client#0?客戶端關閉!

      8.?client#4?已獲取到互斥鎖

      8.?client#4?已再次獲取到互斥鎖

      8.?client#4?釋放互斥鎖

      9.?client#4?已獲取到互斥鎖

      9.?client#4?已再次獲取到互斥鎖

      9.?client#4?釋放互斥鎖

      client#3?客戶端關閉!

      client#4?客戶端關閉!

      結束!

      同時在程序運行期間查看Zookeeper節點樹,可以發現每一次請求的鎖實際上對應一個臨時順序節點

      [zk:?localhost:2181(CONNECTED)?42]?ls?/testZK/sharedreentrantlock

      [leases,?_c_208d461b-716d-43ea-ac94-1d2be1206db3-lock-0000001659,?locks,?_c_64b19dba-3efa-46a6-9344-19a52e9e424f-lock-0000001658,?_c_cee02916-d7d5-4186-8867-f921210b8815-lock-0000001657]

      Shared Lock 與 Shared Reentrant Lock 相似,但是不可重入。這個不可重入鎖由類 InterProcessSemaphoreMutex 來實現,使用方法和上面的類類似。

      將上面程序中的 InterProcessMutex 換成不可重入鎖 InterProcessSemaphoreMutex,如果再運行上面的代碼,結果就會發現線程被阻塞在第二個 acquire 上,直到超時,也就是此鎖不是可重入的。

      控制臺輸出日志

      0.?client#2?已獲取到互斥鎖

      0.?client#1?不能得到互斥鎖

      0.?client#4?不能得到互斥鎖

      0.?client#0?不能得到互斥鎖

      0.?client#3?不能得到互斥鎖

      client#1?客戶端關閉!

      client#4?客戶端關閉!

      client#3?客戶端關閉!

      client#0?客戶端關閉!

      0.?client#2?釋放互斥鎖

      0.?client#2?不能再次得到互斥鎖

      client#2?客戶端關閉!

      結束!

      把第二個獲取鎖的代碼注釋,程序才能正常執行

      0.?client#1?已獲取到互斥鎖

      0.?client#1?釋放互斥鎖

      0.?client#2?已獲取到互斥鎖

      0.?client#2?釋放互斥鎖

      0.?client#0?已獲取到互斥鎖

      0.?client#0?釋放互斥鎖

      0.?client#4?已獲取到互斥鎖

      0.?client#4?釋放互斥鎖

      0.?client#3?已獲取到互斥鎖

      0.?client#3?釋放互斥鎖

      1.?client#1?已獲取到互斥鎖

      1.?client#1?釋放互斥鎖

      1.?client#2?已獲取到互斥鎖

      1.?client#2?釋放互斥鎖

      ....

      ....

      9.?client#4?已獲取到互斥鎖

      9.?client#4?釋放互斥鎖

      9.?client#0?已獲取到互斥鎖

      client#2?客戶端關閉!

      9.?client#0?釋放互斥鎖

      9.?client#1?已獲取到互斥鎖

      client#0?客戶端關閉!

      client#4?客戶端關閉!

      9.?client#1?釋放互斥鎖

      9.?client#3?已獲取到互斥鎖

      client#1?客戶端關閉!

      9.?client#3?釋放互斥鎖

      client#3?客戶端關閉!

      結束!

      Shared Reentrant Read Write Lock,可重入讀寫鎖,一個讀寫鎖管理一對相關的鎖,一個負責讀操作,另外一個負責寫操作;讀操作在寫鎖沒被使用時可同時由多個進程使用,而寫鎖在使用時不允許讀(阻塞);此鎖是可重入的;一個擁有寫鎖的線程可重入讀鎖,但是讀鎖卻不能進入寫鎖,這也意味著寫鎖可以降級成讀鎖, 比如 請求寫鎖 --->讀鎖 ---->釋放寫鎖;從讀鎖升級成寫鎖是不行的。

      可重入讀寫鎖主要由兩個類實現:InterProcessReadWriteLock、InterProcessMutex,使用時首先創建一個 InterProcessReadWriteLock 實例,然后再根據你的需求得到讀鎖或者寫鎖,讀寫鎖的類型是 InterProcessMutex。

      public?static?void?main(String[]?args)?throws?InterruptedException?{

      for?(int?i?=?0;?i?

      final?String?clientName?=?"client#"?+?i;

      new?Thread(new?Runnable()?{

      @Override

      public?void?run()?{

      CuratorFramework?client?=?ZKUtils.getClient();

      client.start();

      final?InterProcessReadWriteLock?lock?=?new?InterProcessReadWriteLock(client,?lockPath);

      final?InterProcessMutex?readLock?=?lock.readLock();

      final?InterProcessMutex?writeLock?=?lock.writeLock();

      try?{

      //?注意只能先得到寫鎖再得到讀鎖,不能反過來!!!

      if?(!writeLock.acquire(10,?TimeUnit.SECONDS))?{

      throw?new?IllegalStateException(clientName?+?"?不能得到寫鎖");

      }

      System.out.println(clientName?+?"?已得到寫鎖");

      if?(!readLock.acquire(10,?TimeUnit.SECONDS))?{

      throw?new?IllegalStateException(clientName?+?"?不能得到讀鎖");

      }

      System.out.println(clientName?+?"?已得到讀鎖");

      try?{

      resource.use();?//?使用資源

      }?finally?{

      System.out.println(clientName?+?"?釋放讀寫鎖");

      readLock.release();

      writeLock.release();

      }

      }?catch?(Exception?e)?{

      System.out.println(e.getMessage());

      利用Zookeeper實現 - 分布式鎖

      }?finally?{

      CloseableUtils.closeQuietly(client);

      countDownLatch.countDown();

      }

      }

      }).start();

      }

      countDownLatch.await();

      System.out.println("結束!");

      }

      }

      控制臺打印日志

      client#1?已得到寫鎖

      client#1?已得到讀鎖

      client#1?釋放讀寫鎖

      client#2?已得到寫鎖

      client#2?已得到讀鎖

      client#2?釋放讀寫鎖

      client#0?已得到寫鎖

      client#0?已得到讀鎖

      client#0?釋放讀寫鎖

      client#4?已得到寫鎖

      client#4?已得到讀鎖

      client#4?釋放讀寫鎖

      client#3?已得到寫鎖

      client#3?已得到讀鎖

      client#3?釋放讀寫鎖

      結束!

      Shared Semaphore,一個計數的信號量類似JDK的 Semaphore,JDK中 Semaphore 維護的一組許可(permits),而Cubator中稱之為租約(Lease)。有兩種方式可以決定 semaphore 的最大租約數,第一種方式是由用戶給定的 path 決定,第二種方式使用 SharedCountReader 類。如果不使用 SharedCountReader,沒有內部代碼檢查進程是否假定有10個租約而進程B假定有20個租約。 所以所有的實例必須使用相同的 numberOfLeases 值.

      信號量主要實現類有:

      InterProcessSemaphoreV2?-?信號量實現類

      Lease?-?租約(單個信號)

      SharedCountReader?-?計數器,用于計算最大租約數量

      調用 acquire 會返回一個租約對象,客戶端必須在 finally 中 close 這些租約對象,否則這些租約會丟失掉。但是,如果客戶端session由于某種原因比如crash丟掉,那么這些客戶端持有的租約會自動close,這樣其它客戶端可以繼續使用這些租約。租約還可以通過下面的方式返還:

      public?void?returnLease(Lease?lease)

      public?void?returnAll(Collection?leases)

      注意一次你可以請求多個租約,如果 Semaphore 當前的租約不夠,則請求線程會被阻塞。同時還提供了超時的重載方法。

      public?Lease?acquire()?throws?Exception

      public?Collection?acquire(int?qty)?throws?Exception

      public?Lease?acquire(long?time,?TimeUnit?unit)?throws?Exception

      public?Collection?acquire(int?qty,?long?time,?TimeUnit?unit)?throws?Exception

      一個Demo程序如下

      public?class?SharedSemaphoreTest?{

      private?static?final?int?MAX_LEASE?=?10;

      private?static?final?String?PATH?=?"/testZK/semaphore";

      private?static?final?FakeLimitedResource?resource?=?new?FakeLimitedResource();

      public?static?void?main(String[]?args)?throws?Exception?{

      CuratorFramework?client?=?ZKUtils.getClient();

      client.start();

      InterProcessSemaphoreV2?semaphore?=?new?InterProcessSemaphoreV2(client,?PATH,?MAX_LEASE);

      Collection?leases?=?semaphore.acquire(5);

      System.out.println("獲取租約數量:"?+?leases.size());

      Lease?lease?=?semaphore.acquire();

      System.out.println("獲取單個租約");

      resource.use();?//?使用資源

      //?再次申請獲取5個leases,此時leases數量只剩4個,不夠,將超時

      Collection?leases2?=?semaphore.acquire(5,?10,?TimeUnit.SECONDS);

      System.out.println("獲取租約,如果超時將為null:?"?+?leases2);

      System.out.println("釋放租約");

      semaphore.returnLease(lease);

      //?再次申請獲取5個,這次剛好夠

      leases2?=?semaphore.acquire(5,?10,?TimeUnit.SECONDS);

      System.out.println("獲取租約,如果超時將為null:?"?+?leases2);

      System.out.println("釋放集合中的所有租約");

      semaphore.returnAll(leases);

      semaphore.returnAll(leases2);

      client.close();

      System.out.println("結束!");

      }

      }

      控制臺打印日志

      獲取租約數量:5

      獲取單個租約

      獲取租約,如果超時將為null:?null

      釋放租約

      獲取租約,如果超時將為null:?[org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@3108bc,?org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@370736d9,?org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@5f9d02cb,?org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@63753b6d,?org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@6b09bb57]

      釋放集合中的所有租約

      結束!

      注意:上面所講的4種鎖都是公平鎖(fair)。從ZooKeeper的角度看,每個客戶端都按照請求的順序獲得鎖,相當公平。

      Multi Shared Lock 是一個鎖的容器。當調用 acquire,所有的鎖都會被 acquire,如果請求失敗,所有的鎖都會被 release。同樣調用 release 時所有的鎖都被 release(失敗被忽略)。基本上,它就是組鎖的代表,在它上面的請求釋放操作都會傳遞給它包含的所有的鎖。

      主要涉及兩個類:

      InterProcessMultiLock?-?對所對象實現類

      InterProcessLock?-?分布式鎖接口類

      它的構造函數需要包含的鎖的集合,或者一組 ZooKeeper 的 path,用法和 Shared Lock 相同

      public?InterProcessMultiLock(CuratorFramework?client,?List?paths)

      public?InterProcessMultiLock(List?locks)

      一個Demo程序如下

      public?class?MultiSharedLockTest?{

      private?static?final?String?lockPath1?=?"/testZK/MSLock1";

      private?static?final?String?lockPath2?=?"/testZK/MSLock2";

      private?static?final?FakeLimitedResource?resource?=?new?FakeLimitedResource();

      public?static?void?main(String[]?args)?throws?Exception?{

      CuratorFramework?client?=?ZKUtils.getClient();

      client.start();

      InterProcessLock?lock1?=?new?InterProcessMutex(client,?lockPath1);?//?可重入鎖

      InterProcessLock?lock2?=?new?InterProcessSemaphoreMutex(client,?lockPath2);?//?不可重入鎖

      //?組鎖,多鎖

      InterProcessMultiLock?lock?=?new?InterProcessMultiLock(Arrays.asList(lock1,?lock2));

      if?(!lock.acquire(10,?TimeUnit.SECONDS))?{

      throw?new?IllegalStateException("不能獲取多鎖");

      }

      System.out.println("已獲取多鎖");

      System.out.println("是否有第一個鎖:?"?+?lock1.isAcquiredInThisProcess());

      System.out.println("是否有第二個鎖:?"?+?lock2.isAcquiredInThisProcess());

      try?{

      resource.use();?//?資源操作

      }?finally?{

      System.out.println("釋放多個鎖");

      lock.release();?//?釋放多鎖

      }

      System.out.println("是否有第一個鎖:?"?+?lock1.isAcquiredInThisProcess());

      System.out.println("是否有第二個鎖:?"?+?lock2.isAcquiredInThisProcess());

      client.close();

      System.out.println("結束!");

      }

      }

      代碼-:http://t.cn/EtVc1s4

      1、 《從Paxos到Zookeeper分布式一致性原理與實踐》

      2、 Apache Curator Recipes Docs

      3、 分布式鎖的幾種實現方式~

      4、 技術專題討論第四期:漫談分布式鎖

      5、 分布式鎖看這篇就夠了

      6、 Curator分布式鎖

      分布式 ZooKeeper 緩存

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:excel中如何把兩格內容合并(excel怎么把兩個單元格的內容合并)
      下一篇:手把手教你制作炫酷的PCB板3D效果圖
      相關文章
      亚洲色大成WWW亚洲女子| ASS亚洲熟妇毛茸茸PICS| 亚洲精品无码久久久久APP| 亚洲国产精品乱码在线观看97| 亚洲一区二区在线免费观看| 亚洲va久久久噜噜噜久久男同 | 亚洲H在线播放在线观看H| 亚洲精彩视频在线观看| 亚洲精品自拍视频| 亚洲特级aaaaaa毛片| 亚洲国产高清视频在线观看| 亚洲乱码无限2021芒果| 中文字幕在线观看亚洲视频| 色噜噜亚洲男人的天堂| 亚洲综合色婷婷在线观看| 亚洲日韩国产二区无码| 亚洲成av人无码亚洲成av人| WWW国产亚洲精品久久麻豆| 日本亚洲中午字幕乱码| 亚洲天堂在线视频| 亚洲午夜福利精品无码| 亚洲熟妇av一区二区三区| 亚洲gv白嫩小受在线观看| 亚洲av无码潮喷在线观看| 亚洲一区二区成人| 亚洲国产精品网站久久| 亚洲综合色一区二区三区| 久久久久亚洲AV无码去区首| 亚洲精品一级无码鲁丝片| 中文字幕亚洲无线码| 亚洲av之男人的天堂网站| 91亚洲导航深夜福利| 亚洲一区免费视频| 亚洲国产一区二区三区在线观看| 四虎亚洲国产成人久久精品| 久久久亚洲精品蜜桃臀| 亚洲AV成人一区二区三区AV| 亚洲最新中文字幕| 亚洲人成网站在线播放2019| 全亚洲最新黄色特级网站 | 亚洲女子高潮不断爆白浆|