11Nacos配置中心之客戶端長輪詢
11Nacos配置中心之客戶端長輪詢
createConfigService
NacosConfigService構造
ClientWorker構造
checkConfigInfo()方法:
LongPollingRunnable
checkUpdateDataIds()方法
getServerConfig
總結
11Nacos配置中心之客戶端長輪詢
createConfigService
NacosConfigService構造
ClientWorker構造
checkConfigInfo()方法:
LongPollingRunnable
checkUpdateDataIds()方法
getServerConfig
總結
11Nacos配置中心之客戶端長輪詢
客戶端長輪詢定時任務是在NacosFactory的createConfigService構建ConfigService對象實例的時候啟動的
createConfigService
public static ConfigService createConfigService(String serverAddr) throws NacosException { return ConfigFactory.createConfigService(serverAddr); }
public class ConfigFactory { /** * Create Config * * @param properties init param * @return ConfigService * @throws NacosException Exception */ public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); Constructor constructor = driverImplClass.getConstructor(Properties.class); ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } } /** * Create Config * * @param serverAddr serverList * @return Config * @throws ConfigService Exception */ public static ConfigService createConfigService(String serverAddr) throws NacosException { Properties properties = new Properties(); properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr); return createConfigService(properties); } }
通過Class.forName加載NacosConfigService類
使用反射來完成NacosConfigService類的實例化
NacosConfigService構造
NacosConfigService構造方法:
public NacosConfigService(Properties properties) throws NacosException { String encodeTmp = properties.getProperty("encode"); if (StringUtils.isBlank(encodeTmp)) { this.encode = "UTF-8"; } else { this.encode = encodeTmp.trim(); } this.initNamespace(properties); this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); this.agent.start(); this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); }
它的構造方法中:
初始化HttpAgent,使用了裝飾器模式,實際工作的類是ServerHttpAgent,MetricsHttpAgent內部也調用了ServerHttpAgent的方法,增加監控統計信息
ClientWorker是客戶端的工作類,agent作為參數傳入ClientWorker,用agent做一些遠程調用
ClientWorker構造
ClientWorker的構造函數:
@SuppressWarnings("PMD.ThreadPoolCreationRule") public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // Initialize the timeout parameter init(properties); executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }
構造方法中:
構建定時調度的線程池,第一個線程池executor只擁有一個核心線程,每隔10s執行一次checkConfigInfo()方法,功能就是每10ms檢查一次配置信息
第二個線程池executorService只完成了初始化,后續用于客戶端的定時長輪詢功能。
checkConfigInfo()方法:
public void checkConfigInfo() { int listenerSize = cacheMap.get().size(); int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }
這個方法的主要功能就是檢查配置信息是否發送變化,
獲取監聽個數
分配長輪詢任務數,向上取整
判斷長輪詢任務數是否比當前長輪詢任務數大,如果大的話創建指定就創建線程達到所需的任務數的線程數量,如果不比當前任務數就把求得長輪詢任務數賦值給當前長輪詢任務數
cacheMap用來存儲監聽變更的緩存集合,key是根據dataID/group/tenant拼接的值。Value是對應的存儲在Nacos服務器上的配置文件的內容。
默認情況下每個長輪詢LongPollingRunnable任務處理3000個監聽配置集,超過3000個啟動多個LongPollingRunnable執行。
LongPollingRunnable
LongPollingRunnable是一個線程,我們可以直接找到LongPollingRunnable里面的run方法
class LongPollingRunnable implements Runnable { private int taskId; public LongPollingRunnable(int taskId) { this.taskId = taskId; } @Override public void run() { List
LongPollingRunnable類的run()方法中:
遍歷CacheData,檢查本地配置,根據taskId對cacheMap進行數據分割,通過checkLocalConfig方法檢查本地配置,本地在${user}\naocs\config\目錄下緩存一份服務端的配置信息,checkLocalConfig將內存中的數據和本地磁盤數據比較,不一致說明數據發生了變化,需要觸發事件通知。
執行checkUpdateDataIds方法在服務端建立長輪詢機制,通過長輪詢檢查數據變更。
遍歷變更數據集合changedGroupKeys,調用getServerConfig方法,根據dataId,group,tenant去服務端讀取對應的配置信息并保存到本地文件中。
繼續定時執行當前線程
checkUpdateDataIds()方法
checkUpdateDataIds()方法基于長連接方式監聽服務端配置的變化,最后根據變化數據的key去服務端獲取最新數據。
checkUpdateDataIds中調用checkUpdateConfigStr
/** * */ List
這個方法的作用就是從Server獲取值變化了的DataID列表。返回的對象里只有dataId和group是有效的,保證不返回NULL
checkUpdateConfigStr()方法中通過agent.httpPost調用/v1/cs/configs/listener接口實現長輪詢請求。長輪詢請求是實現層面只是設置了一個比較長的超時時間,默認30s。如果服務端的數據發生變更,客戶端會收到HttpResult。服務端返回的是存在數據變更的dataId, group, tenant。獲得這些信息后,在LongPollingRunnable的run方法中調用getServerConfig方法從Nacos服務器中讀取具體的配置內容。
getServerConfig
從Nacos服務器中讀取具體的配置內容:
public String getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException { if (StringUtils.isBlank(group)) { group = Constants.DEFAULT_GROUP; } HttpResult result = null; try { List
總結
現在我們知道Nacos配置中心的客戶端做了哪些事了,客戶端創建NacosConfigService實例,它的構造方法中創建了ClientWorker對象,ClientWorker中就設定了定時線程每隔10秒執行一次checkConfigInfo()方法來檢查配置信息是否變更,使用的線程是LongPollingRunnable,它的run()方法中的邏輯就是調用checkUpdateDataIds()方法檢查是否數據變更,本質是調用服務端的/v1/cs/configs/listener接口來實現的
任務調度
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。