限流降級神器-哨兵(sentinel)原理分析

      網(wǎng)友投稿 837 2022-05-30

      Sentinel 是阿里中間件團(tuán)隊(duì)開源的,面向分布式服務(wù)架構(gòu)的輕量級高可用流量控制組件,主要以流量為切入點(diǎn),從流量控制、熔斷降級、系統(tǒng)負(fù)載保護(hù)等多個(gè)維度來幫助用戶保護(hù)服務(wù)的穩(wěn)定性。

      大家可能會(huì)問:Sentinel 和之前常用的熔斷降級庫 Netflix Hystrix 有什么異同呢?Sentinel官網(wǎng)有一個(gè)對比的文章,這里摘抄一個(gè)總結(jié)的表格,具體的對比可以點(diǎn)此 鏈接 查看。

      從對比的表格可以看到,Sentinel比Hystrix在功能性上還要強(qiáng)大一些,本文讓我們一起來了解下Sentinel的源碼,揭開Sentinel的神秘面紗。

      項(xiàng)目結(jié)構(gòu)

      將Sentinel的源碼fork到自己的github庫中,接著把源碼clone到本地,然后開始源碼閱讀之旅吧。

      首先我們看一下Sentinel項(xiàng)目的整個(gè)結(jié)構(gòu):

      sentinel-core 核心模塊,限流、降級、系統(tǒng)保護(hù)等都在這里實(shí)現(xiàn)

      sentinel-dashboard 控制臺模塊,可以對連接上的sentinel客戶端實(shí)現(xiàn)可視化的管理

      sentinel-transport 傳輸模塊,提供了基本的監(jiān)控服務(wù)端和客戶端的API接口,以及一些基于不同庫的實(shí)現(xiàn)

      sentinel-extension 擴(kuò)展模塊,主要對DataSource進(jìn)行了部分?jǐn)U展實(shí)現(xiàn)

      sentinel-adapter 適配器模塊,主要實(shí)現(xiàn)了對一些常見框架的適配

      sentinel-demo 樣例模塊,可參考怎么使用sentinel進(jìn)行限流、降級等

      sentinel-benchmark 基準(zhǔn)測試模塊,對核心代碼的精確性提供基準(zhǔn)測試

      運(yùn)行樣例

      基本上每個(gè)框架都會(huì)帶有樣例模塊,有的叫example,有的叫demo,sentinel也不例外。

      那我們從sentinel的demo中找一個(gè)例子運(yùn)行下看看大致的情況吧,上面說過了sentinel主要的核心功能是做限流、降級和系統(tǒng)保護(hù),那我們就從“限流”開始看sentinel的實(shí)現(xiàn)原理吧。

      可以看到sentinel-demo模塊中有很多不同的樣例,我們找到basic模塊下的flow包,這個(gè)包下面就是對應(yīng)的限流的樣例,但是限流也有很多種類型的限流,我們就找根據(jù)qps限流的類看吧,其他的限流方式原理上都大差不差。

      public?class?FlowQpsDemo?{????private?static?final?String?KEY?=?"abc";????private?static?AtomicInteger?pass?=?new?AtomicInteger();????private?static?AtomicInteger?block?=?new?AtomicInteger();????private?static?AtomicInteger?total?=?new?AtomicInteger();????private?static?volatile?boolean?stop?=?false;????private?static?final?int?threadCount?=?32;????private?static?int?seconds?=?30;????public?static?void?main(String[]?args)?throws?Exception?{????????initFlowQpsRule();????????tick();????????//?first?make?the?system?run?on?a?very?low?condition????????simulateTraffic();????????System.out.println("=====?begin?to?do?flow?control");????????System.out.println("only?20?requests?per?second?can?pass");????}????private?static?void?initFlowQpsRule()?{????????List?rules?=?new?ArrayList();????????FlowRule?rule1?=?new?FlowRule();????????rule1.setResource(KEY);????????//?set?limit?qps?to?20????????rule1.setCount(20);????????//?設(shè)置限流類型:根據(jù)qps????????rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);????????rule1.setLimitApp("default");????????rules.add(rule1);????????//?加載限流的規(guī)則????????FlowRuleManager.loadRules(rules);????}????private?static?void?simulateTraffic()?{????????for?(int?i?=?0;?i?

      執(zhí)行上面的代碼后,打印出如下的結(jié)果:

      可以看到,上面的結(jié)果中,pass的數(shù)量和我們的預(yù)期并不相同,我們預(yù)期的是每秒允許pass的請求數(shù)是20個(gè),但是目前有很多pass的請求數(shù)是超過20個(gè)的。

      原因是,我們這里測試的代碼使用了多線程,注意看 threadCount 的值,一共有32個(gè)線程來模擬,而在RunTask的run方法中執(zhí)行資源保護(hù)時(shí),即在 SphU.entry 的內(nèi)部是沒有加鎖的,所以就會(huì)導(dǎo)致在高并發(fā)下,pass的數(shù)量會(huì)高于20。

      可以用下面這個(gè)模型來描述下,有一個(gè)TimeTicker線程在做統(tǒng)計(jì),每1秒鐘做一次。有N個(gè)RunTask線程在模擬請求,被訪問的business code被資源key保護(hù)著,根據(jù)規(guī)則,每秒只允許20個(gè)請求通過。

      由于pass、block、total等計(jì)數(shù)器是全局共享的,而多個(gè)RunTask線程在執(zhí)行SphU.entry申請獲取entry時(shí),內(nèi)部沒有鎖保護(hù),所以會(huì)存在pass的個(gè)數(shù)超過設(shè)定的閾值。

      那為了證明在單線程下限流的正確性與可靠性,那我們的模型就應(yīng)該變成了這樣:

      那接下來我把 threadCount 的值改為1,只有一個(gè)線程來執(zhí)行這個(gè)方法,看下具體的限流結(jié)果,執(zhí)行上面的代碼后打印的結(jié)果如下:

      可以看到pass數(shù)基本上維持在20,但是第一次統(tǒng)計(jì)的pass值還是超過了20。這又是什么原因?qū)е碌哪兀?/p>

      其實(shí)仔細(xì)看下Demo中的代碼可以發(fā)現(xiàn),模擬請求是用的一個(gè)線程,統(tǒng)計(jì)結(jié)果是用的另外一個(gè)線程,統(tǒng)計(jì)線程每1秒鐘統(tǒng)計(jì)一次結(jié)果,這兩個(gè)線程之間是有時(shí)間上的誤差的。從TimeTicker線程打印出來的時(shí)間戳可以看出來,雖然每隔一秒進(jìn)行統(tǒng)計(jì),但是當(dāng)前打印時(shí)的時(shí)間和上一次的時(shí)間還是有誤差的,不完全是1000ms的間隔。

      要真正驗(yàn)證每秒限制20個(gè)請求,保證數(shù)據(jù)的精準(zhǔn)性,需要做基準(zhǔn)測試,這個(gè)不是本篇文章的重點(diǎn),有興趣的同學(xué)可以去了解下jmh,sentinel中的基準(zhǔn)測試也是通過jmh做的。

      深入原理

      通過一個(gè)簡單的示例程序,我們了解了sentinel可以對請求進(jìn)行限流,除了限流外,還有降級和系統(tǒng)保護(hù)等功能。那現(xiàn)在我們就撥開云霧,深入源碼內(nèi)部去一窺sentinel的實(shí)現(xiàn)原理吧。

      首先從入口開始: SphU.entry() 。這個(gè)方法會(huì)去申請一個(gè)entry,如果能夠申請成功,則說明沒有被限流,否則會(huì)拋出BlockException,表面已經(jīng)被限流了。

      從 SphU.entry() 方法往下執(zhí)行會(huì)進(jìn)入到 Sph.entry() ,Sph的默認(rèn)實(shí)現(xiàn)類是 CtSph ,在CtSph中最終會(huì)執(zhí)行到 entry(ResourceWrapperresourceWrapper,intcount,Object...args)throwsBlockException 這個(gè)方法。

      我們來看一下這個(gè)方法的具體實(shí)現(xiàn):

      public?Entry?entry(ResourceWrapper?resourceWrapper,?int?count,?Object...?args)?throws?BlockException?{????Context?context?=?ContextUtil.getContext();????if?(context?instanceof?NullContext)?{????????//?Init?the?entry?only.?No?rule?checking?will?occur.????????return?new?CtEntry(resourceWrapper,?null,?context);????}????if?(context?==?null)?{????????context?=?MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME,?"",?resourceWrapper.getType());????}????//?Global?switch?is?close,?no?rule?checking?will?do.????if?(!Constants.ON)?{????????return?new?CtEntry(resourceWrapper,?null,?context);????}????//?獲取該資源對應(yīng)的SlotChain????ProcessorSlot?chain?=?lookProcessChain(resourceWrapper);????/*?????*?Means?processor?cache?size?exceeds?{@link?Constants.MAX_SLOT_CHAIN_SIZE},?so?no?????*?rule?checking?will?be?done.?????*/????if?(chain?==?null)?{????????return?new?CtEntry(resourceWrapper,?null,?context);????}????Entry?e?=?new?CtEntry(resourceWrapper,?chain,?context);????try?{????????//?執(zhí)行Slot的entry方法????????chain.entry(context,?resourceWrapper,?null,?count,?args);????}?catch?(BlockException?e1)?{????????e.exit(count,?args);????????//?拋出BlockExecption????????throw?e1;????}?catch?(Throwable?e1)?{????????RecordLog.info("Sentinel?unexpected?exception",?e1);????}????return?e;}

      這個(gè)方法可以分為以下幾個(gè)部分:

      1.對參數(shù)和全局配置項(xiàng)做檢測,如果不符合要求就直接返回了一個(gè)CtEntry對象,不會(huì)再進(jìn)行后面的限流檢測,否則進(jìn)入下面的檢測流程。

      2.根據(jù)包裝過的資源對象獲取對應(yīng)的SlotChain

      3.執(zhí)行SlotChain的entry方法

      3.1.如果SlotChain的entry方法拋出了BlockException,則將該異常繼續(xù)向上拋出

      3.2.如果SlotChain的entry方法正常執(zhí)行了,則最后會(huì)將該entry對象返回

      4.如果上層方法捕獲了BlockException,則說明請求被限流了,否則請求能正常執(zhí)行

      其中比較重要的是第2、3兩個(gè)步驟,我們來分解一下這兩個(gè)步驟。

      創(chuàng)建SlotChain

      首先看一下lookProcessChain的方法實(shí)現(xiàn):

      private?ProcessorSlot?lookProcessChain(ResourceWrapper?resourceWrapper)?{????ProcessorSlotChain?chain?=?chainMap.get(resourceWrapper);????if?(chain?==?null)?{????????synchronized?(LOCK)?{????????????chain?=?chainMap.get(resourceWrapper);????????????if?(chain?==?null)?{????????????????//?Entry?size?limit.????????????????if?(chainMap.size()?>=?Constants.MAX_SLOT_CHAIN_SIZE)?{????????????????????return?null;????????????????}????????????????//?具體構(gòu)造chain的方法????????????????chain?=?Env.slotsChainbuilder.build();????????????????Map?newMap?=?new?HashMap(chainMap.size()?+?1);????????????????newMap.putAll(chainMap);????????????????newMap.put(resourceWrapper,?chain);????????????????chainMap?=?newMap;????????????}????????}????}????return?chain;}

      該方法使用了一個(gè)HashMap做了緩存,key是資源對象。這里加了鎖,并且做了 doublecheck 。具體構(gòu)造chain的方法是通過: Env.slotsChainbuilder.build() 這句代碼創(chuàng)建的。那就進(jìn)入這個(gè)方法看看吧。

      public?ProcessorSlotChain?build()?{????ProcessorSlotChain?chain?=?new?DefaultProcessorSlotChain();????chain.addLast(new?NodeSelectorSlot());????chain.addLast(new?ClusterBuilderSlot());????chain.addLast(new?LogSlot());????chain.addLast(new?StatisticSlot());????chain.addLast(new?SystemSlot());????chain.addLast(new?AuthoritySlot());????chain.addLast(new?FlowSlot());????chain.addLast(new?DegradeSlot());????return?chain;}

      Chain是鏈條的意思,從build的方法可看出,ProcessorSlotChain是一個(gè)鏈表,里面添加了很多個(gè)Slot。具體的實(shí)現(xiàn)需要到DefaultProcessorSlotChain中去看。

      public?class?DefaultProcessorSlotChain?extends?ProcessorSlotChain?{????AbstractLinkedProcessorSlot?first?=?new?AbstractLinkedProcessorSlot()?{????????@Override????????public?void?entry(Context?context,?ResourceWrapper?resourceWrapper,?Object?t,?int?count,?Object...?args)????????????throws?Throwable?{????????????super.fireEntry(context,?resourceWrapper,?t,?count,?args);????????}????????@Override????????public?void?exit(Context?context,?ResourceWrapper?resourceWrapper,?int?count,?Object...?args)?{????????????super.fireExit(context,?resourceWrapper,?count,?args);????????}????};????AbstractLinkedProcessorSlot?end?=?first;????@Override????public?void?addFirst(AbstractLinkedProcessorSlot?protocolProcessor)?{????????protocolProcessor.setNext(first.getNext());????????first.setNext(protocolProcessor);????????if?(end?==?first)?{????????????end?=?protocolProcessor;????????}????}????@Override????public?void?addLast(AbstractLinkedProcessorSlot?protocolProcessor)?{????????end.setNext(protocolProcessor);????????end?=?protocolProcessor;????}}

      DefaultProcessorSlotChain中有兩個(gè)AbstractLinkedProcessorSlot類型的變量:first和end,這就是鏈表的頭結(jié)點(diǎn)和尾節(jié)點(diǎn)。

      創(chuàng)建DefaultProcessorSlotChain對象時(shí),首先創(chuàng)建了首節(jié)點(diǎn),然后把首節(jié)點(diǎn)賦值給了尾節(jié)點(diǎn),可以用下圖表示:

      將第一個(gè)節(jié)點(diǎn)添加到鏈表中后,整個(gè)鏈表的結(jié)構(gòu)變成了如下圖這樣:

      將所有的節(jié)點(diǎn)都加入到鏈表中后,整個(gè)鏈表的結(jié)構(gòu)變成了如下圖所示:

      這樣就將所有的Slot對象添加到了鏈表中去了,每一個(gè)Slot都是繼承自AbstractLinkedProcessorSlot。而AbstractLinkedProcessorSlot是一種責(zé)任鏈的設(shè)計(jì),每個(gè)對象中都有一個(gè)next屬性,指向的是另一個(gè)AbstractLinkedProcessorSlot對象。其實(shí)責(zé)任鏈模式在很多框架中都有,比如Netty中是通過pipeline來實(shí)現(xiàn)的。

      知道了SlotChain是如何創(chuàng)建的了,那接下來就要看下是如何執(zhí)行Slot的entry方法的了。

      執(zhí)行SlotChain的entry方法

      lookProcessChain方法獲得的ProcessorSlotChain的實(shí)例是DefaultProcessorSlotChain,那么執(zhí)行chain.entry方法,就會(huì)執(zhí)行DefaultProcessorSlotChain的entry方法,而DefaultProcessorSlotChain的entry方法是這樣的:

      @Overridepublic?void?entry(Context?context,?ResourceWrapper?resourceWrapper,?Object?t,?int?count,?Object...?args)????throws?Throwable?{????first.transformEntry(context,?resourceWrapper,?t,?count,?args);}

      也就是說,DefaultProcessorSlotChain的entry實(shí)際是執(zhí)行的first屬性的transformEntry方法。

      而transformEntry方法會(huì)執(zhí)行當(dāng)前節(jié)點(diǎn)的entry方法,在DefaultProcessorSlotChain中first節(jié)點(diǎn)重寫了entry方法,具體如下:

      @Overridepublic?void?entry(Context?context,?ResourceWrapper?resourceWrapper,?Object?t,?int?count,?Object...?args)????throws?Throwable?{????super.fireEntry(context,?resourceWrapper,?t,?count,?args);}

      first節(jié)點(diǎn)的entry方法,實(shí)際又是執(zhí)行的super的fireEntry方法,那繼續(xù)把目光轉(zhuǎn)移到fireEntry方法,具體如下:

      @Overridepublic?void?fireEntry(Context?context,?ResourceWrapper?resourceWrapper,?Object?obj,?int?count,?Object...?args)????throws?Throwable?{????if?(next?!=?null)?{????????next.transformEntry(context,?resourceWrapper,?obj,?count,?args);????}}

      從這里可以看到,從fireEntry方法中就開始傳遞執(zhí)行entry了,這里會(huì)執(zhí)行當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)transformEntry方法,上面已經(jīng)分析過了,transformEntry方法會(huì)觸發(fā)當(dāng)前節(jié)點(diǎn)的entry,也就是說fireEntry方法實(shí)際是觸發(fā)了下一個(gè)節(jié)點(diǎn)的entry方法。具體的流程如下圖所示:

      從圖中可以看出,從最初的調(diào)用Chain的entry()方法,轉(zhuǎn)變成了調(diào)用SlotChain中Slot的entry()方法。從上面的分析可以知道,SlotChain中的第一個(gè)Slot節(jié)點(diǎn)是NodeSelectorSlot。

      執(zhí)行Slot的entry方法

      現(xiàn)在可以把目光轉(zhuǎn)移到SlotChain中的第一個(gè)節(jié)點(diǎn)NodeSelectorSlot的entry方法中去了,具體的代碼如下:

      @Overridepublic?void?entry(Context?context,?ResourceWrapper?resourceWrapper,?Object?obj,?int?count,?Object...?args)????throws?Throwable?{????DefaultNode?node?=?map.get(context.getName());????if?(node?==?null)?{????????synchronized?(this)?{????????????node?=?map.get(context.getName());????????????if?(node?==?null)?{????????????????node?=?Env.nodeBuilder.buildTreeNode(resourceWrapper,?null);????????????????HashMap?cacheMap?=?new?HashMap(map.size());????????????????cacheMap.putAll(map);????????????????cacheMap.put(context.getName(),?node);????????????????map?=?cacheMap;????????????}????????????//?Build?invocation?tree????????????((DefaultNode)context.getLastNode()).addChild(node);????????}????}????context.setCurNode(node);????//?由此觸發(fā)下一個(gè)節(jié)點(diǎn)的entry方法????fireEntry(context,?resourceWrapper,?node,?count,?args);}

      從代碼中可以看到,NodeSelectorSlot節(jié)點(diǎn)做了一些自己的業(yè)務(wù)邏輯處理,具體的大家可以深入源碼繼續(xù)追蹤,這里大概的介紹下每種Slot的功能職責(zé):

      NodeSelectorSlot?負(fù)責(zé)收集資源的路徑,并將這些資源的調(diào)用路徑,以樹狀結(jié)構(gòu)存儲(chǔ)起來,用于根據(jù)調(diào)用路徑來限流降級;

      ClusterBuilderSlot?則用于存儲(chǔ)資源的統(tǒng)計(jì)信息以及調(diào)用者信息,例如該資源的 RT, QPS, thread count 等等,這些信息將用作為多維度限流,降級的依據(jù);

      StatistcSlot?則用于記錄,統(tǒng)計(jì)不同緯度的 runtime 信息;

      FlowSlot?則用于根據(jù)預(yù)設(shè)的限流規(guī)則,以及前面 slot 統(tǒng)計(jì)的狀態(tài),來進(jìn)行限流;

      AuthorizationSlot?則根據(jù)黑白名單,來做黑白名單控制;

      DegradeSlot?則通過統(tǒng)計(jì)信息,以及預(yù)設(shè)的規(guī)則,來做熔斷降級;

      SystemSlot?則通過系統(tǒng)的狀態(tài),例如 load1 等,來控制總的入口流量;

      執(zhí)行完業(yè)務(wù)邏輯處理后,調(diào)用了fireEntry()方法,由此觸發(fā)了下一個(gè)節(jié)點(diǎn)的entry方法。此時(shí)我們就知道了sentinel的責(zé)任鏈就是這樣傳遞的:每個(gè)Slot節(jié)點(diǎn)執(zhí)行完自己的業(yè)務(wù)后,會(huì)調(diào)用fireEntry來觸發(fā)下一個(gè)節(jié)點(diǎn)的entry方法。

      所以可以將上面的圖完整了,具體如下:

      至此就通過SlotChain完成了對每個(gè)節(jié)點(diǎn)的entry()方法的調(diào)用,每個(gè)節(jié)點(diǎn)會(huì)根據(jù)創(chuàng)建的規(guī)則,進(jìn)行自己的邏輯處理,當(dāng)統(tǒng)計(jì)的結(jié)果達(dá)到設(shè)置的閾值時(shí),就會(huì)觸發(fā)限流、降級等事件,具體是拋出BlockException異常。

      sentinel主要是基于7種不同的Slot形成了一個(gè)鏈表,每個(gè)Slot都各司其職,自己做完分內(nèi)的事之后,會(huì)把請求傳遞給下一個(gè)Slot,直到在某一個(gè)Slot中命中規(guī)則后拋出BlockException而終止。

      前三個(gè)Slot負(fù)責(zé)做統(tǒng)計(jì),后面的Slot負(fù)責(zé)根據(jù)統(tǒng)計(jì)的結(jié)果結(jié)合配置的規(guī)則進(jìn)行具體的控制,是Block該請求還是放行。

      控制的類型也有很多可選項(xiàng):根據(jù)qps、線程數(shù)、冷啟動(dòng)等等。

      然后基于這個(gè)核心的方法,衍生出了很多其他的功能:

      1、dashboard控制臺,可以可視化的對每個(gè)連接過來的sentinel客戶端 (通過發(fā)送heartbeat消息)進(jìn)行控制,dashboard和客戶端之間通過http協(xié)議進(jìn)行通訊。

      2、規(guī)則的持久化,通過實(shí)現(xiàn)DataSource接口,可以通過不同的方式對配置的規(guī)則進(jìn)行持久化,默認(rèn)規(guī)則是在內(nèi)存中的

      3、對主流的框架進(jìn)行適配,包括servlet,dubbo,rRpc等

      Dashboard控制臺

      sentinel-dashboard是一個(gè)單獨(dú)的應(yīng)用,通過spring-boot進(jìn)行啟動(dòng),主要提供一個(gè)輕量級的控制臺,它提供機(jī)器發(fā)現(xiàn)、單機(jī)資源實(shí)時(shí)監(jiān)控、集群資源匯總,以及規(guī)則管理的功能。

      我們只需要對應(yīng)用進(jìn)行簡單的配置,就可以使用這些功能。

      1 啟動(dòng)控制臺

      下載?控制臺?工程

      使用以下命令將代碼打包成一個(gè) fat jar:?mvn cleanpackage

      使用如下命令啟動(dòng)編譯后的控制臺:

      $?java?-Dserver.port=8080?-Dcsp.sentinel.dashboard.server=localhost:8080?-jar?target/sentinel-dashboard.jar

      上述命令中我們指定了一個(gè)JVM參數(shù), -Dserver.port=8080 用于指定 Spring Boot 啟動(dòng)端口為 8080。

      2 客戶端接入控制臺

      控制臺啟動(dòng)后,客戶端需要按照以下步驟接入到控制臺。

      通過 pom.xml 引入 jar 包:

      ????com.alibaba.csp????sentinel-transport-simple-http????x.y.z

      啟動(dòng)時(shí)加入 JVM 參數(shù) -Dcsp.sentinel.dashboard.server=consoleIp:port 指定控制臺地址和端口。若啟動(dòng)多個(gè)應(yīng)用,則需要通過 -Dcsp.sentinel.api.port=xxxx 指定客戶端監(jiān)控 API 的端口(默認(rèn)是 8719)。

      除了修改 JVM 參數(shù),也可以通過配置文件取得同樣的效果。更詳細(xì)的信息可以參考 啟動(dòng)配置項(xiàng)。

      確保客戶端有訪問量,Sentinel 會(huì)在客戶端首次調(diào)用的時(shí)候進(jìn)行初始化,開始向控制臺發(fā)送心跳包。

      sentinel-dashboard是一個(gè)獨(dú)立的web應(yīng)用,可以接受客戶端的連接,然后與客戶端之間進(jìn)行通訊,他們之間使用http協(xié)議進(jìn)行通訊。他們之間的關(guān)系如下圖所示:

      dashboard

      dashboard啟動(dòng)后會(huì)等待客戶端的連接,具體的做法是在 MachineRegistryController 中有一個(gè) receiveHeartBeat 的方法,客戶端發(fā)送心跳消息,就是通過http請求這個(gè)方法。

      dashboard接收到客戶端的心跳消息后,會(huì)把客戶端的傳遞過來的ip、port等信息封裝成一個(gè) MachineInfo對象,然后將該對象通過 MachineDiscovery 接口的 addMachine 方法添加到一個(gè)ConcurrentHashMap中保存起來。

      這里會(huì)有問題,因?yàn)榭蛻舳说男畔⑹潜4嬖赿ashboard的內(nèi)存中的,所以當(dāng)dashboard應(yīng)用重啟后,之前已經(jīng)發(fā)送過來的客戶端信息都會(huì)丟失掉。

      client

      client在啟動(dòng)時(shí),會(huì)通過CommandCenterInitFunc選擇一個(gè),并且只選擇一個(gè)CommandCenter進(jìn)行啟動(dòng)。

      啟動(dòng)之前會(huì)通過spi的方式掃描獲取到所有的CommandHandler的實(shí)現(xiàn)類,然后將所有的CommandHandler注冊到一個(gè)HashMap中去,待后期使用。

      PS:考慮一下,為什么CommandHandler不需要做持久化,而是直接保存在內(nèi)存中。

      注冊完CommandHandler之后,緊接著就啟動(dòng)CommandCenter了,目前CommandCenter有兩個(gè)實(shí)現(xiàn)類:

      SimpleHttpCommandCenter 通過ServerSocket啟動(dòng)一個(gè)服務(wù)端,接受socket連接

      NettyHttpCommandCenter 通過Netty啟動(dòng)一個(gè)服務(wù)端,接受channel連接

      CommandCenter啟動(dòng)后,就等待dashboard發(fā)送消息過來了,當(dāng)接收到消息后,會(huì)把消息通過具體的CommandHandler進(jìn)行處理,然后將處理的結(jié)果返回給dashboard。

      這里需要注意的是,dashboard給client發(fā)送消息是通過異步的httpClient進(jìn)行發(fā)送的,在HttpHelper類中。

      但是詭異的是,既然通過異步發(fā)送了,又通過一個(gè)CountDownLatch來等待消息的返回,然后獲取結(jié)果,那這樣不就失去了異步的意義的嗎?具體的代碼如下:

      private?String?httpGetContent(String?url)?{????final?HttpGet?httpGet?=?new?HttpGet(url);????final?CountDownLatch?latch?=?new?CountDownLatch(1);????final?AtomicReference?reference?=?new?AtomicReference<>();????httpclient.execute(httpGet,?new?FutureCallback()?{????????@Override????????public?void?completed(final?HttpResponse?response)?{????????????try?{????????????????reference.set(getBody(response));????????????}?catch?(Exception?e)?{????????????????logger.info("httpGetContent?"?+?url?+?"?error:",?e);????????????}?finally?{????????????????latch.countDown();????????????}????????}????????@Override????????public?void?failed(final?Exception?ex)?{????????????latch.countDown();????????????logger.info("httpGetContent?"?+?url?+?"?failed:",?ex);????????}????????@Override????????public?void?cancelled()?{????????????latch.countDown();????????}????});????try?{????????latch.await(5,?TimeUnit.SECONDS);????}?catch?(Exception?e)?{????????logger.info("wait?http?client?error:",?e);????}????return?reference.get();}

      主流框架的適配

      sentinel也對一些主流的框架進(jìn)行了適配,使得在使用主流框架時(shí),也可以享受到sentinel的保護(hù)。目前已經(jīng)支持的適配器包括以下這些:

      Web Servlet

      Dubbo

      Spring Boot / Spring Cloud

      gRPC

      Apache RocketMQ

      其實(shí)做適配就是通過那些主流框架的擴(kuò)展點(diǎn),然后在擴(kuò)展點(diǎn)上加入sentinel限流降級的代碼即可。拿Servlet的適配代碼看一下,具體的代碼是:

      public?class?CommonFilter?implements?Filter?{????@Override????public?void?init(FilterConfig?filterConfig)?{????}????@Override????public?void?doFilter(ServletRequest?request,?ServletResponse?response,?FilterChain?chain)????????throws?IOException,?ServletException?{????????HttpServletRequest?sRequest?=?(HttpServletRequest)request;????????Entry?entry?=?null;????????try?{????????????//?根據(jù)請求生成的資源????????????String?target?=?FilterUtil.filterTarget(sRequest);????????????target?=?WebCallbackManager.getUrlCleaner().clean(target);????????????//?“申請”該資源????????????ContextUtil.enter(target);????????????entry?=?SphU.entry(target,?EntryType.IN);????????????//?如果能成功“申請”到資源,則說明未被限流????????????//?則將請求放行????????????chain.doFilter(request,?response);????????}?catch?(BlockException?e)?{????????????//?否則如果捕獲了BlockException異常,說明請求被限流了????????????//?則將請求重定向到一個(gè)默認(rèn)的頁面????????????HttpServletResponse?sResponse?=?(HttpServletResponse)response;????????????WebCallbackManager.getUrlBlockHandler().blocked(sRequest,?sResponse);????????}?catch?(IOException?e2)?{????????????//?省略部分代碼????????}?finally?{????????????if?(entry?!=?null)?{????????????????entry.exit();????????????}????????????ContextUtil.exit();????????}????}????@Override????public?void?destroy()?{????}}

      通過Servlet的Filter進(jìn)行擴(kuò)展,實(shí)現(xiàn)一個(gè)Filter,然后在doFilter方法中對請求進(jìn)行限流控制,如果請求被限流則將請求重定向到一個(gè)默認(rèn)頁面,否則將請求放行給下一個(gè)Filter。

      規(guī)則持久化,動(dòng)態(tài)化

      Sentinel 的理念是開發(fā)者只需要關(guān)注資源的定義,當(dāng)資源定義成功,可以動(dòng)態(tài)增加各種流控降級規(guī)則。

      Sentinel 提供兩種方式修改規(guī)則:

      通過 API 直接修改 (?loadRules)

      通過?DataSource適配不同數(shù)據(jù)源修改

      通過 API 修改比較直觀,可以通過以下三個(gè) API 修改不同的規(guī)則:

      FlowRuleManager.loadRules(List?rules);?//?修改流控規(guī)則DegradeRuleManager.loadRules(List?rules);?//?修改降級規(guī)則SystemRuleManager.loadRules(List?rules);?//?修改系統(tǒng)規(guī)則

      DataSource 擴(kuò)展

      上述 loadRules() 方法只接受內(nèi)存態(tài)的規(guī)則對象,但應(yīng)用重啟后內(nèi)存中的規(guī)則就會(huì)丟失,更多的時(shí)候規(guī)則最好能夠存儲(chǔ)在文件、數(shù)據(jù)庫或者配置中心中。

      DataSource 接口給我們提供了對接任意配置源的能力。相比直接通過 API 修改規(guī)則,實(shí)現(xiàn) DataSource 接口是更加可靠的做法。

      限流降級神器-哨兵(sentinel)原理分析

      官方推薦通過控制臺設(shè)置規(guī)則后將規(guī)則推送到統(tǒng)一的規(guī)則中心,用戶只需要實(shí)現(xiàn) DataSource 接口,來監(jiān)聽規(guī)則中心的規(guī)則變化,以實(shí)時(shí)獲取變更的規(guī)則。

      DataSource 拓展常見的實(shí)現(xiàn)方式有:

      拉模式:客戶端主動(dòng)向某個(gè)規(guī)則管理中心定期輪詢拉取規(guī)則,這個(gè)規(guī)則中心可以是 SQL、文件,甚至是 VCS 等。這樣做的方式是簡單,缺點(diǎn)是無法及時(shí)獲取變更;

      推模式:規(guī)則中心統(tǒng)一推送,客戶端通過注冊-的方式時(shí)刻監(jiān)聽變化,比如使用?Nacos、Zookeeper 等配置中心。這種方式有更好的實(shí)時(shí)性和一致性保證。

      至此,sentinel的基本情況都已經(jīng)分析了,更加詳細(xì)的內(nèi)容,可以繼續(xù)閱讀源碼來研究。

      https://mp.weixin.qq.com/s?__biz=MzAxNjk4ODE4OQ==&mid=2247484425&idx=1&sn=58cddf736260c02096e32234fc11cb4d&chksm=9bed257bac9aac6df9f786d0d3a1629e6c2c5d7f4318accfe0681b47773d169fe1b35e079625&scene=21#wechat_redirect

      任務(wù)調(diào)度 緩存

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:excel表格里的圖片復(fù)制出來的方法(excel表格里的圖片怎么復(fù)制出來)
      下一篇:【技術(shù)補(bǔ)給站】第11期:左手自研,右手開源,技術(shù)揭秘華為云如何領(lǐng)跑容器市場
      相關(guān)文章
      亚洲乱妇熟女爽到高潮的片| 亚洲国产成人无码AV在线影院| 亚洲国产成人精品女人久久久 | 国产人成亚洲第一网站在线播放| 亚洲另类小说图片| 亚洲欧洲日产专区| 亚洲自偷精品视频自拍| 亚洲明星合成图综合区在线| 亚洲理论在线观看| 亚洲免费福利视频| 亚洲综合久久精品无码色欲| 国产午夜亚洲精品国产| 亚洲av纯肉无码精品动漫| 精品国产亚洲第一区二区三区| 综合一区自拍亚洲综合图区| 色九月亚洲综合网| 亚洲国产一区视频| av在线亚洲欧洲日产一区二区| 亚洲综合亚洲综合网成人| 亚洲深深色噜噜狠狠爱网站| 亚洲AV无码一区东京热久久| 无码乱人伦一区二区亚洲| 亚洲日韩乱码中文无码蜜桃| 亚洲婷婷天堂在线综合| 国产色在线|亚洲| 亚洲精品亚洲人成在线| 亚洲av日韩av欧v在线天堂| 亚洲黄片手机免费观看| 亚洲人成网站在线播放vr| 亚洲成av人在线视| 亚洲成aⅴ人片在线影院八| 四虎亚洲精品高清在线观看| 亚洲国产成人综合精品| 亚洲情侣偷拍精品| 亚洲国产精品一区第二页 | 国产精品亚洲二区在线| 久久久久久久亚洲精品| 亚洲产国偷V产偷V自拍色戒| 亚洲欧洲日本精品| 亚洲精品无码专区| 亚洲熟伦熟女新五十路熟妇|