11Nacos配置中心之客戶端長輪詢

      網友投稿 1325 2022-05-29

      11Nacos配置中心之客戶端長輪詢

      createConfigService

      NacosConfigService構造

      ClientWorker構造

      checkConfigInfo()方法:

      LongPollingRunnable

      checkUpdateDataIds()方法

      getServerConfig

      總結

      11Nacos配置中心之客戶端長輪詢

      createConfigService

      NacosConfigService構造

      ClientWorker構造

      checkConfigInfo()方法:

      LongPollingRunnable

      checkUpdateDataIds()方法

      getServerConfig

      總結

      11Nacos配置中心之客戶端長輪詢

      客戶端長輪詢定時任務是在NacosFactory的createConfigService構建ConfigService對象實例的時候啟動的

      11Nacos配置中心之客戶端長輪詢

      createConfigService

      public static ConfigService createConfigService(String serverAddr) throws NacosException { return ConfigFactory.createConfigService(serverAddr); }

      public class ConfigFactory { /** * Create Config * * @param properties init param * @return ConfigService * @throws NacosException Exception */ public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); Constructor constructor = driverImplClass.getConstructor(Properties.class); ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } } /** * Create Config * * @param serverAddr serverList * @return Config * @throws ConfigService Exception */ public static ConfigService createConfigService(String serverAddr) throws NacosException { Properties properties = new Properties(); properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr); return createConfigService(properties); } }

      通過Class.forName加載NacosConfigService類

      使用反射來完成NacosConfigService類的實例化

      NacosConfigService構造

      NacosConfigService構造方法:

      public NacosConfigService(Properties properties) throws NacosException { String encodeTmp = properties.getProperty("encode"); if (StringUtils.isBlank(encodeTmp)) { this.encode = "UTF-8"; } else { this.encode = encodeTmp.trim(); } this.initNamespace(properties); this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); this.agent.start(); this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); }

      它的構造方法中:

      初始化HttpAgent,使用了裝飾器模式,實際工作的類是ServerHttpAgent,MetricsHttpAgent內部也調用了ServerHttpAgent的方法,增加監控統計信息

      ClientWorker是客戶端的工作類,agent作為參數傳入ClientWorker,用agent做一些遠程調用

      ClientWorker構造

      ClientWorker的構造函數:

      @SuppressWarnings("PMD.ThreadPoolCreationRule") public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // Initialize the timeout parameter init(properties); executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }

      構造方法中:

      構建定時調度的線程池,第一個線程池executor只擁有一個核心線程,每隔10s執行一次checkConfigInfo()方法,功能就是每10ms檢查一次配置信息

      第二個線程池executorService只完成了初始化,后續用于客戶端的定時長輪詢功能。

      checkConfigInfo()方法:

      public void checkConfigInfo() { int listenerSize = cacheMap.get().size(); int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }

      這個方法的主要功能就是檢查配置信息是否發送變化,

      獲取監聽個數

      分配長輪詢任務數,向上取整

      判斷長輪詢任務數是否比當前長輪詢任務數大,如果大的話創建指定就創建線程達到所需的任務數的線程數量,如果不比當前任務數就把求得長輪詢任務數賦值給當前長輪詢任務數

      cacheMap用來存儲監聽變更的緩存集合,key是根據dataID/group/tenant拼接的值。Value是對應的存儲在Nacos服務器上的配置文件的內容。

      默認情況下每個長輪詢LongPollingRunnable任務處理3000個監聽配置集,超過3000個啟動多個LongPollingRunnable執行。

      LongPollingRunnable

      LongPollingRunnable是一個線程,我們可以直接找到LongPollingRunnable里面的run方法

      class LongPollingRunnable implements Runnable { private int taskId; public LongPollingRunnable(int taskId) { this.taskId = taskId; } @Override public void run() { List cacheDatas = new ArrayList(); List inInitializingCacheList = new ArrayList(); try { // check failover config for (CacheData cacheData : cacheMap.get().values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) { cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } } // check server config List changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String content = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(content); LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)); } catch (NacosException ioe) { String message = String.format( "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } } }

      LongPollingRunnable類的run()方法中:

      遍歷CacheData,檢查本地配置,根據taskId對cacheMap進行數據分割,通過checkLocalConfig方法檢查本地配置,本地在${user}\naocs\config\目錄下緩存一份服務端的配置信息,checkLocalConfig將內存中的數據和本地磁盤數據比較,不一致說明數據發生了變化,需要觸發事件通知。

      執行checkUpdateDataIds方法在服務端建立長輪詢機制,通過長輪詢檢查數據變更。

      遍歷變更數據集合changedGroupKeys,調用getServerConfig方法,根據dataId,group,tenant去服務端讀取對應的配置信息并保存到本地文件中。

      繼續定時執行當前線程

      checkUpdateDataIds()方法

      checkUpdateDataIds()方法基于長連接方式監聽服務端配置的變化,最后根據變化數據的key去服務端獲取最新數據。

      checkUpdateDataIds中調用checkUpdateConfigStr

      /** * */ List checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException { List params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); List headers = new ArrayList(2); headers.add("Long-Pulling-Timeout"); headers.add("" + timeout); // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.add("Long-Pulling-Timeout-No-Hangup"); headers.add("true"); } if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } try { HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout); if (HttpURLConnection.HTTP_OK == result.code) { setHealthServer(true); return parseUpdateDataIdResponse(result.content); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code); } } catch (IOException e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }

      這個方法的作用就是從Server獲取值變化了的DataID列表。返回的對象里只有dataId和group是有效的,保證不返回NULL

      checkUpdateConfigStr()方法中通過agent.httpPost調用/v1/cs/configs/listener接口實現長輪詢請求。長輪詢請求是實現層面只是設置了一個比較長的超時時間,默認30s。如果服務端的數據發生變更,客戶端會收到HttpResult。服務端返回的是存在數據變更的dataId, group, tenant。獲得這些信息后,在LongPollingRunnable的run方法中調用getServerConfig方法從Nacos服務器中讀取具體的配置內容。

      getServerConfig

      從Nacos服務器中讀取具體的配置內容:

      public String getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException { if (StringUtils.isBlank(group)) { group = Constants.DEFAULT_GROUP; } HttpResult result = null; try { List params = null; if (StringUtils.isBlank(tenant)) { params = Arrays.asList("dataId", dataId, "group", group); } else { params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant); } result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout); } catch (IOException e) { String message = String.format( "[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, e); throw new NacosException(NacosException.SERVER_ERROR, e); } switch (result.code) { case HttpURLConnection.HTTP_OK: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content); return result.content; case HttpURLConnection.HTTP_NOT_FOUND: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null); return null; case HttpURLConnection.HTTP_CONFLICT: { LOGGER.error( "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, " + "tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(NacosException.CONFLICT, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } case HttpURLConnection.HTTP_FORBIDDEN: { LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(result.code, result.content); } default: { LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId, group, tenant, result.code); throw new NacosException(result.code, "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } } }

      總結

      現在我們知道Nacos配置中心的客戶端做了哪些事了,客戶端創建NacosConfigService實例,它的構造方法中創建了ClientWorker對象,ClientWorker中就設定了定時線程每隔10秒執行一次checkConfigInfo()方法來檢查配置信息是否變更,使用的線程是LongPollingRunnable,它的run()方法中的邏輯就是調用checkUpdateDataIds()方法檢查是否數據變更,本質是調用服務端的/v1/cs/configs/listener接口來實現的

      任務調度

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:華為云企業級Redis揭秘第二期:Redis消息隊列Stream應用探討
      下一篇:點擊劫持攻擊及防御
      相關文章
      亚洲精品国产品国语在线| 亚洲福利一区二区精品秒拍| 久久久久亚洲精品天堂| 国产精品亚洲片在线花蝴蝶| va天堂va亚洲va影视中文字幕| 亚洲欧洲中文日产| 91亚洲一区二区在线观看不卡| 亚洲成av人在线视| 亚洲中文字幕久久精品无码APP| 亚洲精品美女久久久久99小说| 亚洲a无码综合a国产av中文| 亚洲Av永久无码精品一区二区| 亚洲国产精品无码久久久秋霞1 | 亚洲中久无码不卡永久在线观看| 偷自拍亚洲视频在线观看| 日本中文一区二区三区亚洲 | 亚洲va无码手机在线电影| 亚洲精品乱码久久久久66| 久久精品国产亚洲综合色| 亚洲成在人线av| 亚洲国产精品自在在线观看| 久久水蜜桃亚洲av无码精品麻豆| 自怕偷自怕亚洲精品| 亚洲国产成人va在线观看网址| 亚洲中字慕日产2021| 国产亚洲精品bv在线观看| 亚洲精品蜜夜内射| 美国毛片亚洲社区在线观看| 亚洲国产一区二区三区| 亚洲伊人久久综合中文成人网| 亚洲一区二区三区无码中文字幕 | 亚洲欧洲日产国码在线观看| 亚洲喷奶水中文字幕电影| 久久精品国产亚洲αv忘忧草| 亚洲一久久久久久久久| 日韩欧美亚洲国产精品字幕久久久| 国产成人综合亚洲绿色| 国产亚洲精品影视在线产品 | 亚洲乱码国产乱码精华| 国产精品亚洲va在线观看| 国产AV无码专区亚洲AV手机麻豆|