利用Zookeeper實現 - 數據發布訂閱
數據發布/訂閱
所謂的數據發布/訂閱,意思是發布者將數據發布到ZooKeeper上的一個或一系列節點上,通過watcher機制,客戶端可以監聽(訂閱)這些數據節點,當這些節點發生變化時,ZooKeeper及時地通知客戶端,從而達到動態獲取數據的目的。
一種常見的場景就是配置中心。隨著應用越來越多,功能越來越復雜,機器也越來越多,對于一些公共的程序配置,譬如各種功能的開關、數據庫的配置、服務器的地址等,如果每個應用每個機器仍然單獨維護,當要修改配置時就得一個一個地修改,這樣顯然非常不方便。
這些公共的配置信息通常具備以下3個特性:
數據量通常比較小
數據發布/訂閱
所謂的數據發布/訂閱,意思是發布者將數據發布到Zookeeper上的一個或一系列節點上,通過watcher機制,客戶端可以監聽(訂閱)這些數據節點,當這些節點發生變化時,Zookeeper及時地通知客戶端,從而達到動態獲取數據的目的。
一種常見的場景就是配置中心。隨著應用越來越多,功能越來越復雜,機器也越來越多,對于一些公共的程序配置,譬如各種功能的開關、數據庫的配置、服務器的地址等,如果每個應用每個機器仍然單獨維護,當要修改配置時就得一個一個地修改,這樣顯然非常不方便。
這些公共的配置信息通常具備以下3個特性:
數據量通常比較小
數據內容在運行時發生動態變化
集群中各機器共享、配置一致
可以將這些配置抽取出來,交給配置中心統一管理起來。配置中心的架構一般是這樣:
開源配置中心
開源的配置中心有很多,各有特點,這里只列出幾個進行簡單地介紹。
github地址:https://github.com/ctripcorp/apollo
介紹:Apollo(阿波羅)是攜程框架部門研發的分布式配置中心,能夠集中化管理應用不同環境、不同集群的配置,配置修改后能夠實時推送到應用端,并且具備規范的權限、流程治理等特性,適用于微服務配置管理場景。
github地址:https://github.com/knightliao/disconf
介紹:專注于各種「分布式系統配置管理」的「通用組件」和「通用平臺」, 提供統一的「配置管理服務」。主要目標是部署極其簡單、部署動態化、統一管理、一個jar包,到處運行。
github地址:https://github.com/spring-cloud/spring-cloud-config
介紹:Spring Cloud Config是一個基于http協議的遠程配置實現方式,通過統一的配置管理服務器進行配置管理,客戶端通過https協議主動的拉取服務的的配置信息,完成配置獲取。
github地址:https://github.com/alibaba/nacos
介紹:Nacos是阿里最近才開源的一個更易于構建云原生應用的動態服務發現、配置管理和服務管理平臺。Nacos 致力于幫助您發現、配置和管理微服務。Nacos提供了一組簡單易用的特性集,幫助您快速實現動態服務發現、服務配置、服務元數據及流量管理。
開源的配置中心當然都很優秀,但是現在我們還是先利用Zookeeper來實現一個屬于自己的配置中心。
我們的配置中心保存的配置信息十分簡單,就是JDBC連接MySQL需要用的連接信息。這些連接信息將轉化為JSON字符串,保存在Zookeeper上的一個節點中;應用程序(通過線程模擬的)從Zookeeper中讀取這些配置信息,然后查詢數據庫;當修改數據庫連接信息時(切換數據庫),應用程序能及時的拉取新的連接信息,使用新的連接查詢數據庫。
定義一個 MysqlConfig 類,方便使用 FastJSON 將配置信息在JSON字符串與對象之間做轉換。
@AllArgsConstructor
@Data
public?class?MysqlConfig?{
private?String?url;
private?String?driver;
private?String?username;
private?String?password;
}
最開始,將Zookeeper上節點的配置信息初始化為 test 數據庫的連接信息,然后啟動 N 個線程(模擬應用程序),讀取連接信息并查詢數據,同時設置監聽節點;等待 10 秒鐘之后,將配置切換為 test2 數據庫的連接信息,這時應用程序將受到配置變更的通知,然后獲取信息連接信息,重新查詢數據庫。
//?工具類
public?class?ZKUtils?{
private?static?final?String?zkServerIps?=?"master:2181,hadoop2:2181";
public?static?synchronized?CuratorFramework?getClient()?{
CuratorFramework?client?=?CuratorFrameworkFactory.builder().connectString(zkServerIps)
.sessionTimeoutMs(6000).connectionTimeoutMs(3000)?//.namespace("LeaderLatchTest")
.retryPolicy(new?ExponentialBackoffRetry(1000,?3)).build();
return?client;
}
}
//?配置中心示例,模擬數據庫切換
public?class?ConfigCenterTest?{
//?test?數據庫的?test1?表
private?static?final?MysqlConfig?mysqlConfig_1?=?new?MysqlConfig("jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false",?"com.mysql.jdbc.Driver",?"root",?"123456");
//?test2?數據庫的?test1?表
private?static?final?MysqlConfig?mysqlConfig_2?=?new?MysqlConfig("jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false",?"com.mysql.jdbc.Driver",?"root",?"123456");
//?存儲MySQL配置信息的節點路徑
private?static?final?String?configPath?=?"/testZK/jdbc/mysql";
private?static?final?Integer?clientNums?=?3;
private?static?CountDownLatch?countDownLatch?=?new?CountDownLatch(clientNums);
public?static?void?main(String[]?args)?throws?Exception?{
//?最開始時設置MySQL配置信息為?mysqlConfig_1
setMysqlConfig(mysqlConfig_1);
//?啟動?clientNums?個線程,模擬分布式系統中的節點,
//?從Zookeeper中獲取MySQL的配置信息,查詢數據
for?(int?i?=?0;?i?
String?clientName?=?"client#"?+?i;
new?Thread(new?Runnable()?{
@Override
public?void?run()?{
CuratorFramework?client?=?ZKUtils.getClient();
client.start();
try?{
Stat?stat?=?new?Stat();
//?如果要監聽多個子節點則應該使用?PathChildrenCache
final?NodeCache?cacheNode?=?new?NodeCache(client,?configPath,?false);
cacheNode.start(true);??//?true?表示啟動時立即從Zookeeper上獲取節點
byte[]?nodeData?=?cacheNode.getCurrentData().getData();
MysqlConfig?mysqlConfig?=?JSON.parseObject(new?String(nodeData),?MysqlConfig.class);
queryMysql(clientName,?mysqlConfig);????//?查詢數據
cacheNode.getListenable().addListener(new?NodeCacheListener()?{
@Override
public?void?nodeChanged()?throws?Exception?{
byte[]?newData?=?cacheNode.getCurrentData().getData();
MysqlConfig?newMysqlConfig?=?JSON.parseObject(new?String(newData),?MysqlConfig.class);
queryMysql(clientName,?newMysqlConfig);????//?查詢數據
}
});
Thread.sleep(20?*?1000);
}?catch?(Exception?e)?{
e.printStackTrace();
}?finally?{
client.close();
countDownLatch.countDown();
}
}
}).start();
}
Thread.sleep(10?*?1000);
System.out.println("\n---------10秒鐘后將MySQL配置信息修改為?mysqlConfig_2---------\n");
setMysqlConfig(mysqlConfig_2);
countDownLatch.await();
}
/**
*?初始化,最開始的時候的MySQL配置為?mysqlConfig_1
*/
public?static?void?setMysqlConfig(MysqlConfig?config)?throws?Exception?{
CuratorFramework?client?=?ZKUtils.getClient();
client.start();
String?mysqlConfigStr?=?JSON.toJSONString(config);
Stat?s?=?client.checkExists().forPath(configPath);
if?(s?!=?null)?{
Stat?resultStat?=?client.setData().forPath(configPath,?mysqlConfigStr.getBytes());
System.out.println(String.format("節點?%s?已存在,更新數據為:%s",?configPath,?mysqlConfigStr));
}?else?{
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(configPath,?mysqlConfigStr.getBytes());
System.out.println(String.format("創建節點:%s,初始化數據為:%s",?configPath,?mysqlConfigStr));
}
System.out.println();
client.close();
}
/**
*?通過配置信息,查詢MySQL數據庫
*/
public?static?synchronized?void?queryMysql(String?clientName,?MysqlConfig?mysqlConfig)?throws?ClassNotFoundException,?SQLException?{
System.out.println(clientName?+?"?查詢MySQL數據,使用的MySQL配置信息:"?+?mysqlConfig);
Class.forName(mysqlConfig.getDriver());
Connection?connection?=?DriverManager.getConnection(mysqlConfig.getUrl(),?mysqlConfig.getUsername(),?mysqlConfig.getPassword());
Statement?statement?=?connection.createStatement();
ResultSet?resultSet?=?statement.executeQuery("select?*?from?test1");
while?(resultSet.next())?{
System.out.println(String.format("id=%s,?name=%s,?age=%s",?resultSet.getString(1),?resultSet.getString(2),?resultSet.getString(3)));
}
System.out.println();
resultSet.close();
statement.close();
connection.close();
}
}
控制臺打印日志
client#2?查詢MySQL數據,使用的MySQL配置信息:MysqlConfig(url=jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false,?driver=com.mysql.jdbc.Driver,?username=root,?password=123456)
id=2,?name=賴鍵鋒,?age=25
id=3,?name=小旋鋒,?age=22000
id=4,?name=test,?age=100
client#1?查詢MySQL數據,使用的MySQL配置信息:MysqlConfig(url=jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false,?driver=com.mysql.jdbc.Driver,?username=root,?password=123456)
id=2,?name=賴鍵鋒,?age=25
id=3,?name=小旋鋒,?age=22000
id=4,?name=test,?age=100
client#0?查詢MySQL數據,使用的MySQL配置信息:MysqlConfig(url=jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false,?driver=com.mysql.jdbc.Driver,?username=root,?password=123456)
id=2,?name=賴鍵鋒,?age=25
id=3,?name=小旋鋒,?age=22000
id=4,?name=test,?age=100
---------10秒鐘后將MySQL配置信息修改為?mysqlConfig_2---------
節點?/testZK/jdbc/mysql?已存在,更新數據為:{"driver":"com.mysql.jdbc.Driver","password":"123456","url":"jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false","username":"root"}
client#1?查詢MySQL數據,使用的MySQL配置信息:MysqlConfig(url=jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false,?driver=com.mysql.jdbc.Driver,?username=root,?password=123456)
id=2,?name=賴鍵鋒,?age=23
id=3,?name=小旋鋒,?age=22
id=4,?name=whirly,?age=24
client#2?查詢MySQL數據,使用的MySQL配置信息:MysqlConfig(url=jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false,?driver=com.mysql.jdbc.Driver,?username=root,?password=123456)
id=2,?name=賴鍵鋒,?age=23
id=3,?name=小旋鋒,?age=22
id=4,?name=whirly,?age=24
client#0?查詢MySQL數據,使用的MySQL配置信息:MysqlConfig(url=jdbc:mysql://master:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false,?driver=com.mysql.jdbc.Driver,?username=root,?password=123456)
id=2,?name=賴鍵鋒,?age=23
id=3,?name=小旋鋒,?age=22
id=4,?name=whirly,?age=24
上面采用的示例是通過 NodeCache 來監聽單個節點,如果要監聽多個子節點則須使用 PathChildrenCache,使用示例可以參考《Zookeeper 分布式協調服務介紹》
Zookeeper 分布式協調服務介紹
利用Zookeeper實現 - Master選舉
代碼下載:http://t.cn/E5ncvDR
我的博客:laijianfeng.org
參考:
《從Paxos到Zookeeper分布式一致性原理與實踐》
大數據 ZooKeeper
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。