美團動態線程池實踐思路,開源了
大家好,今天我們來聊一個比較實用的話題,動態可監控的線程池實踐,全新開源項目(DynamicTp)地址在文章末尾,歡迎交流學習。
寫在前面
稍微有些Java編程經驗的小伙伴都知道,Java的精髓在juc包,這是大名鼎鼎的Doug Lea老爺
子的杰作,評價一個程序員Java水平怎么樣,一定程度上看他對juc包下的一些技術掌握的怎么樣,這也是面試中的基本上必問的一些技術點之一。
juc包主要包括:
1.原子類(AtomicXXX)
2.鎖類(XXXLock)
3.線程同步類(AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger)
4.任務執行器類(Executor體系類,包括今天的主角ThreadPoolExecutor)
5.并發集合類(ConcurrentXXX、CopyOnWriteXXX)相關集合類
6.阻塞隊列類(BlockingQueue繼承體系類)
7.Future相關類
8.其他一些輔助工具類
多線程編程場景下,這些類都是必備技能,會這些可以幫助我們寫出高質量、高性能、少bug的代碼,同時這些也是Java中比較難啃的一些技術,需要持之以恒,學以致用,在使用中感受他們帶來的奧妙。
上邊簡單羅列了下juc包下功能分類,這篇文章我們主要來介紹動態可監控線程池的,所以具體內容也就不展開講了,以后有時間單獨來聊吧。看這篇文章前,希望讀者最好有一定的線程池ThreadPoolExecutor使用經驗,不然看起來會有點懵。
如果你對ThreadPoolExecutor不是很熟悉,推薦閱讀下面兩篇文章
javadoop: https://www.javadoop.com/post/java-thread-pool
美團技術博客: https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
背景
使用ThreadPoolExecutor過程中你是否有以下痛點呢?
1.代碼中創建了一個ThreadPoolExecutor,但是不知道那幾個核心參數設置多少比較合適
2.憑經驗設置參數值,上線后發現需要調整,改代碼重啟服務,非常麻煩
3.線程池相對開發人員來說是個黑盒,運行情況不能感知到,直到出現問題
如果你有以上痛點,這篇文章要介紹的動態可監控線程池(DynamicTp)或許能幫助到你。
如果看過ThreadPoolExecutor的源碼,大概可以知道其實它有提供一些set方法,可以在運行時動態去修改相應的值,這些方法有:
public void setCorePoolSize(int corePoolSize); public void setMaximumPoolSize(int maximumPoolSize); public void setKeepAliveTime(long time, TimeUnit unit); public void setThreadFactory(ThreadFactory threadFactory); public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
現在大多數的互聯網項目其實都會微服務化部署,有一套自己的服務治理體系,微服務組件中的分布式配置中心扮演的就是動態修改配置,實時生效的角色。那么我們是否可以結合配置中心來做運行時線程池參數的動態調整呢?答案是肯定的,而且配置中心相對都是高可用的,使用它也不用過于擔心配置推送出現問題這類事兒,而且也能減少研發動態線程池組件的難度和工作量。
綜上,我們總結出以下的背景
廣泛性:在Java開發中,想要提高系統性能,線程池已經是一個90%以上的人都會選擇使用的基礎工具
不確定性:項目中可能會創建很多線程池,既有IO密集型的,也有CPU密集型的,但線程池的參數并不好確定;需要有套機制在運行過程中動態去調整參數
無感知性,線程池運行過程中的各項指標一般感知不到;需要有套監控報警機制在事前、事中就能讓開發人員感知到線程池的運行狀況,及時處理
高可用性,配置變更需要及時推送到客戶端;需要有高可用的配置管理推送服務,配置中心是現在大多數互聯網系統都會使用的組件,與之結合可以大幅度減少開發量及接入難度
簡介
我們基于配置中心對線程池ThreadPoolExecutor做一些擴展,實現對運行中線程池參數的動態修改,實時生效;以及實時監控線程池的運行狀態,觸發設置的報警策略時報警,報警信息會推送辦公平臺(釘釘、企微等)。報警維度包括(隊列容量、線程池活性、拒絕觸發等);同時也會定時采集線程池指標數據供監控平臺可視化使用。使我們能時刻感知到線程池的負載,根據情況及時調整,避免出現問題影響線上業務。
| __ \ (_) |__ __| | | | |_ _ _ __ __ _ _ __ ___ _ ___| |_ __ | | | | | | | '_ \ / _` | '_ ` _ | |/ __| | '_ \ | |__| | |_| | | | | (_| | | | | | | | (__| | |_) | |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/ __/ | | | |___/ |_| :: Dynamic Thread Pool ::
特性
參考美團線程池實踐 ,對線程池參數動態化管理,增加監控、報警功能
基于Spring框架,現只支持SpringBoot項目使用,輕量級,引入starter即可食用
基于配置中心實現線程池參數動態調整,實時生效;集成主流配置中心,默認支持Nacos、Apollo,同時也提供SPI接口可自定義擴展實現
內置通知報警功能,提供多種報警維度(配置變更通知、活性報警、容量閾值報警、拒絕策略觸發報警),默認支持企業微信、釘釘報警,同時提供SPI接口可自定義擴展實現
內置線程池指標采集功能,支持通過MicroMeter、JsonLog日志輸出、Endpoint三種方式,可通過SPI接口自定義擴展實現
集成管理常用第三方組件的線程池,已集成SpringBoot內置WebServer(Tomcat、Undertow、Jetty)的線程池管理
架構設計
主要分四大模塊
配置變更監聽模塊:
1.監聽特定配置中心的指定配置文件(默認實現Nacos、Apollo),可通過內部提供的SPI接口擴展其他實現
2.解析配置文件內容,內置實現yml、properties配置文件的解析,可通過內部提供的SPI接口擴展其他實現
3.通知線程池管理模塊實現刷新
線程池管理模塊:
1.服務啟動時從配置中心拉取配置信息,生成線程池實例注冊到內部線程池注冊中心中
2.監聽模塊監聽到配置變更時,將變更信息傳遞給管理模塊,實現線程池參數的刷新
3.代碼中通過getExecutor()方法根據線程池名稱來獲取線程池對象實例
監控模塊:
實現監控指標采集以及輸出,默認提供以下三種方式,也可通過內部提供的SPI接口擴展其他實現
1.默認實現Json log輸出到磁盤
2.MicroMeter采集,引入MicroMeter相關依賴
3.暴雷Endpoint端點,可通過http方式訪問
通知告警模塊:
對接辦公平臺,實現通告告警功能,默認實現釘釘、企微,可通過內部提供的SPI接口擴展其他實現,通知告警類型如下
1.線程池參數變更通知
2.阻塞隊列容量達到設置閾值告警
3.線程池活性達到設置閾值告警
4.觸發拒絕策略告警
使用
maven依賴
apollo應用用接入用此依賴
spring-cloud場景下的nacos應用接入用此依賴
非spring-cloud場景下的nacos應用接入用此依賴
線程池配置
spring: dynamic: tp: enabled: true enabledBanner: true # 是否開啟banner打印,默認true enabledCollect: false # 是否開啟監控指標采集,默認false collectorType: logging # 監控數據采集器類型(JsonLog | MicroMeter),默認logging logPath: /home/logs # 監控日志數據路徑,默認${user.home}/logs monitorInterval: 5 # 監控時間間隔(報警判斷、指標采集),默認5s nacos: # nacos配置,不配置有默認值(規則name-dev.yml這樣) dataId: dynamic-tp-demo-dev.yml group: DEFAULT_GROUP apollo: # apollo配置,不配置默認拿apollo配置第一個namespace namespace: dynamic-tp-demo-dev.yml configType: yml # 配置文件類型 platforms: # 通知報警平臺配置 - platform: wechat urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c # 替換 receivers: test1,test2 # 接受人企微名稱 - platform: ding urlKey: f80dad441fcd655438f4a08dcd6a # 替換 secret: SECb5441fa6f375d5b9d21 # 替換,非sign模式可以沒有此值 receivers: 15810119805 # 釘釘賬號手機號 tomcatTp: # tomcat web server線程池配置 minSpare: 100 max: 400 jettyTp: # jetty web server線程池配置 min: 100 max: 400 undertowTp: # undertow web server線程池配置 ioThreads: 100 workerThreads: 400 executors: # 動態線程池配置 - threadPoolName: dynamic-tp-test-1 corePoolSize: 6 maximumPoolSize: 8 queueCapacity: 200 queueType: VariableLinkedBlockingQueue # 任務隊列,查看源碼QueueTypeEnum枚舉類 rejectedHandlerType: CallerRunsPolicy # 拒絕策略,查看RejectedTypeEnum枚舉類 keepAliveTime: 50 allowCoreThreadTimeOut: false threadNamePrefix: test # 線程名前綴 notifyItems: # 報警項,不配置自動會配置(變更通知、容量報警、活性報警、拒絕報警) - type: capacity # 報警項類型,查看源碼 NotifyTypeEnum枚舉類 enabled: true threshold: 80 # 報警閾值 platforms: [ding,wechat] # 可選配置,不配置默認拿上層platforms配置的所以平臺 interval: 120 # 報警間隔(單位:s) - type: change enabled: true - type: liveness enabled: true threshold: 80 - type: reject enabled: true threshold: 1
代碼方式生成,服務啟動會自動注冊
@Configuration public class DtpConfig { @Bean public DtpExecutor demo1Executor() { return DtpCreator.createDynamicFast("demo1-executor"); } @Bean public ThreadPoolExecutor demo2Executor() { return ThreadPoolBuilder.newBuilder() .threadPoolName("demo2-executor") .corePoolSize(8) .maximumPoolSize(16) .keepAliveTime(50) .allowCoreThreadTimeOut(true) .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false) .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName()) .buildDynamic(); } }
代碼調用,根據線程池名稱獲取
public static void main(String[] args) { DtpExecutor dtpExecutor = DtpRegistry.getExecutor("dynamic-tp-test-1"); dtpExecutor.execute(() -> System.out.println("test")); }
注意事項
配置文件配置的參數會覆蓋通過代碼生成方式配置的參數
阻塞隊列只有VariableLinkedBlockingQueue類型可以修改capacity,該類型功能和LinkedBlockingQueue相似,只是capacity不是final類型,可以修改,
VariableLinkedBlockingQueue參考RabbitMq的實現
啟動看到如下日志輸出證明接入成功
| __ \ (_) |__ __| | | | |_ _ _ __ __ _ _ __ ___ _ ___| |_ __ | | | | | | | '_ \ / _` | '_ ` _ | |/ __| | '_ \ | |__| | |_| | | | | (_| | | | | | | | (__| | |_) | |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/ __/ | | | |___/ |_| :: Dynamic Thread Pool :: DynamicTp register, executor: DtpMainPropWrapper(dtpName=dynamic-tp-test-1, corePoolSize=6, maxPoolSize=8, keepAliveTime=50, queueType=VariableLinkedBlockingQueue, queueCapacity=200, rejectType=RejectedCountableCallerRunsPolicy, allowCoreThreadTimeOut=false)
配置變更會推送通知消息,且會高亮變更的字段
DynamicTp [dynamic-tp-test-2] refresh end, changed keys: [corePoolSize, queueCapacity], corePoolSize: [6 => 4], maxPoolSize: [8 => 8], queueType: [VariableLinkedBlockingQueue => VariableLinkedBlockingQueue], queueCapacity: [200 => 2000], keepAliveTime: [50s => 50s], rejectedType: [CallerRunsPolicy => CallerRunsPolicy], allowsCoreThreadTimeOut: [false => false]
通知報警
觸發報警閾值會推送相應報警消息,且會高亮顯示相關字段,活性告警、容量告警、拒絕告警
配置變更會推送通知消息,且會高亮變更的字段
監控日志
通過主配置文件collectType屬性配置指標采集類型,默認值:logging
micrometer方式:通過引入micrometer相關依賴采集到相應的平臺
(如Prometheus,InfluxDb…)
logging:指標數據以json格式輸出日志到磁盤,地址
l
o
g
P
a
t
h
/
d
y
n
a
m
i
c
t
p
/
{logPath}/ dynamictp/
logPath/dynamictp/{appName}.monitor.log
2022-01-16 15:25:20.599 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":100,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":10,"taskCount":120,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1078,"dtpName":"remoting-call","maximumPoolSize":8} 2022-01-16 15:25:25.603 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":120,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":20,"taskCount":140,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1459,"dtpName":"remoting-call","maximumPoolSize":8} 2022-01-16 15:25:30.609 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":140,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":89,"taskCount":180,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1890,"dtpName":"remoting-call","maximumPoolSize":8} 2022-01-16 15:25:35.613 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":160,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":99,"taskCount":230,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":2780,"dtpName":"remoting-call","maximumPoolSize":8} 2022-01-16 15:25:40.616 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":230,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":300,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":4030,"dtpName":"remoting-call","maximumPoolSize":8}
暴露EndPoint端點(dynamic-tp),可以通過http方式請求
[ { "dtp_name": "remoting-call", "core_pool_size": 8, "maximum_pool_size": 16, "queue_type": "SynchronousQueue", "queue_capacity": 0, "queue_size": 0, "fair": false, "queue_remaining_capacity": 0, "active_count": 2, "task_count": 2760, "completed_task_count": 2760, "largest_pool_size": 16, "pool_size": 8, "wait_task_count": 0, "reject_count": 12462, "reject_handler_name": "CallerRunsPolicy" }, { "max_memory": "220 MB", "total_memory": "140 MB", "free_memory": "44 MB", "usable_memory": "125 MB" } ]
項目地址
gitee地址:https://gitee.com/yanhom/dynamic-tp
github地址:https://github.com/lyh200/dynamic-tp
聯系我
對項目有什么想法或者建議,可以加我微信交流,或者創建issues,一起完善項目
微信:yanhom1314
Java 任務調度 多線程
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。