利用Zookeeper實現 - 數據發布訂閱

      網友投稿 776 2025-04-01

      數據發布/訂閱

      所謂的數據發布/訂閱,意思是發布者將數據發布到ZooKeeper上的一個或一系列節點上,通過watcher機制,客戶端可以監聽(訂閱)這些數據節點,當這些節點發生變化時,ZooKeeper及時地通知客戶端,從而達到動態獲取數據的目的。

      一種常見的場景就是配置中心。隨著應用越來越多,功能越來越復雜,機器也越來越多,對于一些公共的程序配置,譬如各種功能的開關、數據庫的配置、服務器的地址等,如果每個應用每個機器仍然單獨維護,當要修改配置時就得一個一個地修改,這樣顯然非常不方便。

      這些公共的配置信息通常具備以下3個特性:

      數據量通常比較小

      數據發布/訂閱

      所謂的數據發布/訂閱,意思是發布者將數據發布到Zookeeper上的一個或一系列節點上,通過watcher機制,客戶端可以監聽(訂閱)這些數據節點,當這些節點發生變化時,Zookeeper及時地通知客戶端,從而達到動態獲取數據的目的。

      一種常見的場景就是配置中心。隨著應用越來越多,功能越來越復雜,機器也越來越多,對于一些公共的程序配置,譬如各種功能的開關、數據庫的配置、服務器的地址等,如果每個應用每個機器仍然單獨維護,當要修改配置時就得一個一個地修改,這樣顯然非常不方便。

      這些公共的配置信息通常具備以下3個特性:

      數據量通常比較小

      數據內容在運行時發生動態變化

      集群中各機器共享、配置一致

      可以將這些配置抽取出來,交給配置中心統一管理起來。配置中心的架構一般是這樣:

      開源配置中心

      開源的配置中心有很多,各有特點,這里只列出幾個進行簡單地介紹。

      github地址:https://github.com/ctripcorp/apollo

      介紹:Apollo(阿波羅)是攜程框架部門研發的分布式配置中心,能夠集中化管理應用不同環境、不同集群的配置,配置修改后能夠實時推送到應用端,并且具備規范的權限、流程治理等特性,適用于微服務配置管理場景。

      github地址:https://github.com/knightliao/disconf

      介紹:專注于各種「分布式系統配置管理」的「通用組件」和「通用平臺」, 提供統一的「配置管理服務」。主要目標是部署極其簡單、部署動態化、統一管理、一個jar包,到處運行。

      github地址:https://github.com/spring-cloud/spring-cloud-config

      介紹:Spring Cloud Config是一個基于http協議的遠程配置實現方式,通過統一的配置管理服務器進行配置管理,客戶端通過https協議主動的拉取服務的的配置信息,完成配置獲取。

      github地址:https://github.com/alibaba/nacos

      介紹:Nacos是阿里最近才開源的一個更易于構建云原生應用的動態服務發現、配置管理和服務管理平臺。Nacos 致力于幫助您發現、配置和管理微服務。Nacos提供了一組簡單易用的特性集,幫助您快速實現動態服務發現、服務配置、服務元數據及流量管理。

      開源的配置中心當然都很優秀,但是現在我們還是先利用Zookeeper來實現一個屬于自己的配置中心。

      我們的配置中心保存的配置信息十分簡單,就是JDBC連接MySQL需要用的連接信息。這些連接信息將轉化為JSON字符串,保存在Zookeeper上的一個節點中;應用程序(通過線程模擬的)從Zookeeper中讀取這些配置信息,然后查詢數據庫;當修改數據庫連接信息時(切換數據庫),應用程序能及時的拉取新的連接信息,使用新的連接查詢數據庫。

      定義一個 MysqlConfig 類,方便使用 FastJSON 將配置信息在JSON字符串與對象之間做轉換。

      @AllArgsConstructor

      @Data

      public?class?MysqlConfig?{

      private?String?url;

      private?String?driver;

      private?String?username;

      private?String?password;

      }

      最開始,將Zookeeper上節點的配置信息初始化為 test 數據庫的連接信息,然后啟動 N 個線程(模擬應用程序),讀取連接信息并查詢數據,同時設置監聽節點;等待 10 秒鐘之后,將配置切換為 test2 數據庫的連接信息,這時應用程序將受到配置變更的通知,然后獲取信息連接信息,重新查詢數據庫。

      //?工具類

      public?class?ZKUtils?{

      private?static?final?String?zkServerIps?=?"master:2181,hadoop2:2181";

      public?static?synchronized?CuratorFramework?getClient()?{

      CuratorFramework?client?=?CuratorFrameworkFactory.builder().connectString(zkServerIps)

      .sessionTimeoutMs(6000).connectionTimeoutMs(3000)?//.namespace("LeaderLatchTest")

      .retryPolicy(new?ExponentialBackoffRetry(1000,?3)).build();

      return?client;

      }

      }

      //?配置中心示例,模擬數據庫切換

      public?class?ConfigCenterTest?{

      //?test?數據庫的?test1?表

      private?static?final?MysqlConfig?mysqlConfig_1?=?new?MysqlConfig("jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false",?"com.mysql.jdbc.Driver",?"root",?"123456");

      //?test2?數據庫的?test1?表

      private?static?final?MysqlConfig?mysqlConfig_2?=?new?MysqlConfig("jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false",?"com.mysql.jdbc.Driver",?"root",?"123456");

      //?存儲MySQL配置信息的節點路徑

      private?static?final?String?configPath?=?"/testZK/jdbc/mysql";

      private?static?final?Integer?clientNums?=?3;

      private?static?CountDownLatch?countDownLatch?=?new?CountDownLatch(clientNums);

      public?static?void?main(String[]?args)?throws?Exception?{

      //?最開始時設置MySQL配置信息為?mysqlConfig_1

      setMysqlConfig(mysqlConfig_1);

      //?啟動?clientNums?個線程,模擬分布式系統中的節點,

      //?從Zookeeper中獲取MySQL的配置信息,查詢數據

      for?(int?i?=?0;?i?

      String?clientName?=?"client#"?+?i;

      new?Thread(new?Runnable()?{

      @Override

      public?void?run()?{

      CuratorFramework?client?=?ZKUtils.getClient();

      client.start();

      try?{

      Stat?stat?=?new?Stat();

      利用Zookeeper實現 - 數據發布訂閱

      //?如果要監聽多個子節點則應該使用?PathChildrenCache

      final?NodeCache?cacheNode?=?new?NodeCache(client,?configPath,?false);

      cacheNode.start(true);??//?true?表示啟動時立即從Zookeeper上獲取節點

      byte[]?nodeData?=?cacheNode.getCurrentData().getData();

      MysqlConfig?mysqlConfig?=?JSON.parseObject(new?String(nodeData),?MysqlConfig.class);

      queryMysql(clientName,?mysqlConfig);????//?查詢數據

      cacheNode.getListenable().addListener(new?NodeCacheListener()?{

      @Override

      public?void?nodeChanged()?throws?Exception?{

      byte[]?newData?=?cacheNode.getCurrentData().getData();

      MysqlConfig?newMysqlConfig?=?JSON.parseObject(new?String(newData),?MysqlConfig.class);

      queryMysql(clientName,?newMysqlConfig);????//?查詢數據

      }

      });

      Thread.sleep(20?*?1000);

      }?catch?(Exception?e)?{

      e.printStackTrace();

      }?finally?{

      client.close();

      countDownLatch.countDown();

      }

      }

      }).start();

      }

      Thread.sleep(10?*?1000);

      System.out.println("\n---------10秒鐘后將MySQL配置信息修改為?mysqlConfig_2---------\n");

      setMysqlConfig(mysqlConfig_2);

      countDownLatch.await();

      }

      /**

      *?初始化,最開始的時候的MySQL配置為?mysqlConfig_1

      */

      public?static?void?setMysqlConfig(MysqlConfig?config)?throws?Exception?{

      CuratorFramework?client?=?ZKUtils.getClient();

      client.start();

      String?mysqlConfigStr?=?JSON.toJSONString(config);

      Stat?s?=?client.checkExists().forPath(configPath);

      if?(s?!=?null)?{

      Stat?resultStat?=?client.setData().forPath(configPath,?mysqlConfigStr.getBytes());

      System.out.println(String.format("節點?%s?已存在,更新數據為:%s",?configPath,?mysqlConfigStr));

      }?else?{

      client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(configPath,?mysqlConfigStr.getBytes());

      System.out.println(String.format("創建節點:%s,初始化數據為:%s",?configPath,?mysqlConfigStr));

      }

      System.out.println();

      client.close();

      }

      /**

      *?通過配置信息,查詢MySQL數據庫

      */

      public?static?synchronized?void?queryMysql(String?clientName,?MysqlConfig?mysqlConfig)?throws?ClassNotFoundException,?SQLException?{

      System.out.println(clientName?+?"?查詢MySQL數據,使用的MySQL配置信息:"?+?mysqlConfig);

      Class.forName(mysqlConfig.getDriver());

      Connection?connection?=?DriverManager.getConnection(mysqlConfig.getUrl(),?mysqlConfig.getUsername(),?mysqlConfig.getPassword());

      Statement?statement?=?connection.createStatement();

      ResultSet?resultSet?=?statement.executeQuery("select?*?from?test1");

      while?(resultSet.next())?{

      System.out.println(String.format("id=%s,?name=%s,?age=%s",?resultSet.getString(1),?resultSet.getString(2),?resultSet.getString(3)));

      }

      System.out.println();

      resultSet.close();

      statement.close();

      connection.close();

      }

      }

      控制臺打印日志

      client#2?查詢MySQL數據,使用的MySQL配置信息:MysqlConfig(url=jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false,?driver=com.mysql.jdbc.Driver,?username=root,?password=123456)

      id=2,?name=賴鍵鋒,?age=25

      id=3,?name=小旋鋒,?age=22000

      id=4,?name=test,?age=100

      client#1?查詢MySQL數據,使用的MySQL配置信息:MysqlConfig(url=jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false,?driver=com.mysql.jdbc.Driver,?username=root,?password=123456)

      id=2,?name=賴鍵鋒,?age=25

      id=3,?name=小旋鋒,?age=22000

      id=4,?name=test,?age=100

      client#0?查詢MySQL數據,使用的MySQL配置信息:MysqlConfig(url=jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false,?driver=com.mysql.jdbc.Driver,?username=root,?password=123456)

      id=2,?name=賴鍵鋒,?age=25

      id=3,?name=小旋鋒,?age=22000

      id=4,?name=test,?age=100

      ---------10秒鐘后將MySQL配置信息修改為?mysqlConfig_2---------

      節點?/testZK/jdbc/mysql?已存在,更新數據為:{"driver":"com.mysql.jdbc.Driver","password":"123456","url":"jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false","username":"root"}

      client#1?查詢MySQL數據,使用的MySQL配置信息:MysqlConfig(url=jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false,?driver=com.mysql.jdbc.Driver,?username=root,?password=123456)

      id=2,?name=賴鍵鋒,?age=23

      id=3,?name=小旋鋒,?age=22

      id=4,?name=whirly,?age=24

      client#2?查詢MySQL數據,使用的MySQL配置信息:MysqlConfig(url=jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false,?driver=com.mysql.jdbc.Driver,?username=root,?password=123456)

      id=2,?name=賴鍵鋒,?age=23

      id=3,?name=小旋鋒,?age=22

      id=4,?name=whirly,?age=24

      client#0?查詢MySQL數據,使用的MySQL配置信息:MysqlConfig(url=jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false,?driver=com.mysql.jdbc.Driver,?username=root,?password=123456)

      id=2,?name=賴鍵鋒,?age=23

      id=3,?name=小旋鋒,?age=22

      id=4,?name=whirly,?age=24

      上面采用的示例是通過 NodeCache 來監聽單個節點,如果要監聽多個子節點則須使用 PathChildrenCache,使用示例可以參考《Zookeeper 分布式協調服務介紹》

      Zookeeper 分布式協調服務介紹

      利用Zookeeper實現 - Master選舉

      代碼下載:http://t.cn/E5ncvDR

      我的博客:laijianfeng.org

      參考:

      《從Paxos到Zookeeper分布式一致性原理與實踐》

      大數據 ZooKeeper

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:RFID技術的產品防偽上應用
      下一篇:高級的自動篩選效果選取關鍵字后自動從源表中篩選出結果(內容自動篩選)
      相關文章
      亚洲国产精品xo在线观看| 亚洲日韩小电影在线观看| 亚洲av中文无码乱人伦在线r▽| 亚洲精品WWW久久久久久| 亚洲AV性色在线观看| 亚洲色大成网站www久久九| 亚洲天堂一区二区三区四区| 久久久久亚洲AV成人片| 久久精品亚洲一区二区三区浴池 | 久久亚洲精品中文字幕| 亚洲av无码乱码国产精品fc2| 久久精品国产精品亚洲精品| 国产偷v国产偷v亚洲高清| 国产亚洲一区二区三区在线观看| 亚洲成AV人片一区二区| 亚洲Aⅴ无码专区在线观看q| 久久久久亚洲AV无码专区首| 久久亚洲精品AB无码播放| 亚洲国产精品一区二区久久| 久久夜色精品国产噜噜噜亚洲AV| 亚洲美女色在线欧洲美女| 亚洲成人动漫在线观看| 亚洲精品第一国产综合野| 亚洲熟妇无码AV| 色九月亚洲综合网| 国产成人亚洲精品影院| 在线播放亚洲第一字幕| 亚洲国产另类久久久精品| 亚洲天堂中文资源| 亚洲午夜国产精品| 在线aⅴ亚洲中文字幕| 精品亚洲国产成人av| 亚洲?V乱码久久精品蜜桃| 亚洲综合国产精品第一页| 亚洲va无码va在线va天堂| 亚洲系列中文字幕| 亚洲а∨天堂久久精品9966| 婷婷亚洲天堂影院| 国产亚洲人成A在线V网站 | 好看的亚洲黄色经典| 亚洲国产精品自在在线观看|