Akka Dispatchers和Routers
Akka Dispatcher是維持Akka Actor動(dòng)作的核心組件,是整個(gè)Akka框架的引擎。它是基于Java的Executor框架來(lái)實(shí)現(xiàn)的。Dispatcher控制和協(xié)調(diào)消息并將其分發(fā)給運(yùn)行在底層線程上的Actor,由它來(lái)負(fù)責(zé)調(diào)度資源的優(yōu)化,并保證任務(wù)以最快的速度執(zhí)行。
Akka的高穩(wěn)定性是建立在“Let It Crash”模型之上的,該模型是基于Supervision和Monitoring實(shí)現(xiàn)的。通過(guò)定義Supervision和監(jiān)管策略,實(shí)現(xiàn)系統(tǒng)異常處理。
Akka為了保證事務(wù)的一致,引入了STM的概念。STM使用的是“樂(lè)觀鎖”,執(zhí)行臨界區(qū)代碼后,會(huì)檢測(cè)是否產(chǎn)生沖突,如果產(chǎn)生沖突,將回滾修改,重新執(zhí)行臨界區(qū)代碼。
Akka中,Dispatcher基于Java Executor框架來(lái)實(shí)現(xiàn),提供了異步執(zhí)行任務(wù)的能力。Executor是基于生產(chǎn)者——消費(fèi)者模型來(lái)構(gòu)建的。這意味著任務(wù)的提交和任務(wù)的執(zhí)行是在不同的線程中隔離執(zhí)行的,即提交任務(wù)的線程與執(zhí)行任務(wù)的線程是不同的。
Executor框架有兩個(gè)重要實(shí)現(xiàn):
ThreadPoolExecutor:該實(shí)現(xiàn)從預(yù)定義的線程池中選取線程來(lái)執(zhí)行任務(wù)。
ForkJoinPool:使用相同的線程池模型,提供了工作竊取的支持。
Dispatcher運(yùn)行在線程之上,負(fù)責(zé)分發(fā)其郵箱里面的Actors和Messages到executor中的線程上運(yùn)行。在Akka中,提供了4種類型的Dispatcher:
Dispatcher
Pinned Dispatcher
Balancing Dispatcher
Calling Thread Dispatcher
對(duì)應(yīng)的也有4種默認(rèn)的郵箱:
Unbounded mailbox
Bounded mailbox
Unbounded priority mailbox
Bounded priority mailbox
為Actor指定派發(fā)器
一般Actor都會(huì)有缺省的派發(fā)器,如果要指定派發(fā)器,要做兩件事:
1)在實(shí)例化Actor時(shí),指定派發(fā)器:
val myActor = context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"),"myActor")
1
2)創(chuàng)建Actor時(shí),使用withDispatcher指定派發(fā)器,如my-dispatcher,然后在applicaction.conf配置文件中配置派發(fā)器
使用Dispatcher派發(fā)器
my-dispatcher{ # Dispatcher是基于事件的派發(fā)器名稱 type = Dispatcher # 使用何種ExecutionService executor = "fork-join-executor" # 配置fork join池 fork-join-executor{ # 容納基于倍數(shù)的并行數(shù)的線程數(shù)下限 parallelism-min = 2 # 并行數(shù)(線程)(CPU核數(shù)*2) parallelism-factor = 2.0 # 容納基于倍數(shù)的并行數(shù)量的線程數(shù)上限 parallelism-max = 10 } # throughput定義了線程切換到另一個(gè)Actor之前處理的消息數(shù)上限 # 設(shè)置為1表示盡可能公平 throughput = 100 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
使用PinnedDispatcher派發(fā)器
my-dispatcher{ # Dispatcher是基于事件的派發(fā)器名稱 type = PinnedDispatcher # 使用何種ExecutionService executor = "thread-pool-executor" # 配置fork join池 thread-pool-executor{ # 容納基于倍數(shù)的并行數(shù)的線程數(shù)下限 parallelism-min = 2 # 并行數(shù)(線程)(CPU核數(shù)*2) parallelism-factor = 2.0 # 容納基于倍數(shù)的并行數(shù)量的線程數(shù)上限 parallelism-max = 10 } # throughput定義了線程切換到另一個(gè)Actor之前處理的消息數(shù)上限 # 設(shè)置為1表示盡可能公平 throughput = 100 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
不同派發(fā)器的介紹
Dispatcher
Dispatcher是Akka中默認(rèn)的派發(fā)器,它是基于事件的分發(fā)器,該派發(fā)器綁定一組Actor到線程池中。該派發(fā)器有如下特點(diǎn):
1)每一個(gè)Actor都有自己的郵箱
2)該派發(fā)器都可以被任意數(shù)量的Actor共享
3)該派發(fā)器可以由ThreadPoolExecutor或ForkJoinPool提供支持
4)該派發(fā)器是非阻塞的。
Balancing Dispatcher
該派發(fā)器是基于事件的分發(fā)器,它會(huì)將任務(wù)比較多的Actor的任務(wù)重新分發(fā)到比較閑的Actor上運(yùn)行。該派發(fā)器有如下特點(diǎn):
1)所有Actor共用一個(gè)郵箱
2)該派發(fā)器只能被同一種類型的Actor共享
3)該派發(fā)器可以由ThreadPoolExecutor或ForkJoinPool提供支持
Pinned Dispatcher
該派發(fā)器為每一個(gè)Actor提供一個(gè)單一的、專用的線程。這種做法在I/O操作或者長(zhǎng)時(shí)間運(yùn)行的計(jì)算中很有用。該派發(fā)器有如下特點(diǎn):
1)每一個(gè)Actor都有自己的郵箱
2)每一個(gè)Actor都有專用的線程,該線程不能和其他Actor共享
3)該派發(fā)器有一個(gè)Executor線程池
4)該派發(fā)器在阻塞上進(jìn)行了優(yōu)化,如:如果程序正在進(jìn)行I/O操作,那么這個(gè)Actor將會(huì)等到任務(wù)執(zhí)行完成。這種阻塞型的操作在性能上要比默認(rèn)的Dispatcher要好。
Calling Thread Dispatcher
該派發(fā)器主要用于測(cè)試,并且在當(dāng)前線程運(yùn)行任務(wù),不會(huì)創(chuàng)建新線程,該派發(fā)器有如下特點(diǎn):
1)每一個(gè)Actor都有自己的郵箱
2)該派發(fā)器都可以被任意數(shù)量的Actor共享
3)該派發(fā)器由調(diào)用線程支持
郵箱
郵箱用于保存接收的消息,在Akka中除使用BalancingDispather分發(fā)器的Actor以外,每個(gè)Actor都擁有自己的郵箱。使用同一個(gè)BalancingDispather的所有Actor共享同一個(gè)郵箱實(shí)例。
郵箱是基于Java concurrent中的隊(duì)列來(lái)實(shí)現(xiàn)的,它有如下特點(diǎn):
1)阻塞隊(duì)列,直到隊(duì)列空間可用,或者隊(duì)列中有可用元素
2)有界隊(duì)列,它的大小是被限制的
缺省的郵箱實(shí)現(xiàn)
UnboundedMailbox
底層是一個(gè)java.util.concurrent.ConcurrentLinkedQueue
是否阻塞:No
是否有界:No
BoundedMailbox
底層是一個(gè)java.util.concurrent.LinkedBlockingQueue
是否阻塞:Yes
是否有界:Yes
UnboundedPriorityMailbox
底層是一個(gè)java.util.concurrent.PriorityBlockingQueue
是否阻塞:Yes
是否有界:No
BoundedPriorityMailbox
底層是一個(gè)java.util.PriorityBlockingQueue
是否阻塞:Yes
是否有界:Yes
還有一些缺省的持久郵箱。
Router
當(dāng)處理到來(lái)的消息流時(shí),我們需要一個(gè)actor來(lái)引導(dǎo)消息路由到目標(biāo)actor,從而提高消息的分配效率。在Akka中這個(gè) actor就是Router。它所管理的一些目標(biāo)actor叫做routees
Akka定義好的一些Router:
akka.routing.RoundRobinRouter:輪轉(zhuǎn)路由器將消息按照輪轉(zhuǎn)順序發(fā)送給routers
akka.routing.RandomRouter:隨機(jī)路由器隨機(jī)選擇一個(gè)router,并將消息發(fā)送給這個(gè)router
akka.routing.SmallestMailboxRouter:最小郵箱路由器會(huì)在routers中選擇郵箱里信息最少的router,然后把消息發(fā)送給它。
akka.routing.BroadcastRouter:廣播路由器將相同的消息發(fā)送給所有的routers
akka.routing.ScatterGatherFirstCompletedRouter:敏捷路由器先將消息廣播到所有routers,返回最先完成任務(wù)的router的結(jié)果給調(diào)用者。
路由器的使用
RoundRobinPool 和 RoundRobinGroupRouter對(duì)routees使用輪詢機(jī)制
RandomPool 和 RandomGroupRouter隨機(jī)選擇routees發(fā)送消息
BalancingPool嘗試從繁忙的routee重新分配任務(wù)到空閑routee,所有的routee共享一個(gè)mailbox
SmallestMailboxPoolRouter創(chuàng)建的所有routees中誰(shuí)郵箱中的消息最少發(fā)給誰(shuí)
BroadcastPool 和 BroadcastGroup廣播的路由器將接收到的消息轉(zhuǎn)發(fā)到它所有的routee。
ScatterGatherFirstCompletedPool 和 ScatterGatherFirstCompletedGroup將消息發(fā)送給所有的routees,然后等待到收到第一個(gè)回復(fù),將結(jié)果發(fā)送回原始發(fā)送者。其他的回復(fù)將被丟棄
TailChoppingPool 和 TailChoppingGroup將首先發(fā)送消息到一個(gè)隨機(jī)挑取的routee,短暫的延遲后發(fā)給第二個(gè)routee(從剩余的routee中隨機(jī)挑選),以此類推。它等待第一個(gè)答復(fù),并將它轉(zhuǎn)回給原始發(fā)送者。其他答復(fù)將被丟棄。此Router的目標(biāo)是通過(guò)查詢到多個(gè)routee來(lái)減少延遲,假設(shè)其他的actor可能比第一個(gè)actor更快響應(yīng)。
ConsistentHashingPool 和 ConsistentHashingGroup對(duì)消息使用一致性哈希(consistent hashing)選擇routee
有三種方式定義哪些數(shù)據(jù)作為一致性哈希鍵
定義路由的hashMapping,將傳入的消息映射到它們一致哈希鍵。這使決策對(duì)發(fā)送者透明。·
這些消息可能會(huì)實(shí)現(xiàn)ConsistentHashable。鍵是消息的一部分,并很方便地與消息定義一起定義?!?/p>
消息可以被包裝在一個(gè)ConsistentHashableEnvelope中,來(lái)定義哪些數(shù)據(jù)可以用來(lái)做一致性哈希。發(fā)送者知道要使用的鍵。
路由器的使用要先創(chuàng)建路由器后使用。 AKKA的路由由router和眾多的routees組成,router和routees都是actor.Router即路由,是負(fù)責(zé)負(fù)載均衡和路由的抽象,有兩種方法來(lái)創(chuàng)建router:
1.Actor Group
2.Actor Pool
當(dāng)處理到來(lái)的消息流時(shí),我們需要一個(gè)actor來(lái)引導(dǎo)消息路由到目標(biāo)actor,從而提高消息的分配效率。在Akka中這個(gè) actor就是Router。它所管理的一些目標(biāo)actor叫做routees
根據(jù)不同的情況需要,Akka提供了幾種路由策略。當(dāng)然也可以創(chuàng)建自己的路由及策略。Akka提供的路由策略如下:
akka.routing.RoundRobinRoutingLogic 輪詢
akka.routing.RandomRoutingLogic 隨機(jī)
akka.routing.SmallestMailboxRoutingLogic 空閑
akka.routing.BroadcastRoutingLogic 廣播
akka.routing.ScatterGatherFirstCompletedRoutingLogic 分散聚集
akka.routing.TailChoppingRoutingLogic 尾部斷續(xù)
akka.routing.ConsistentHashingRoutingLogic 一致性哈希
創(chuàng)建Router Actor
創(chuàng)建router actor 有兩種方式:
Pool(池)——routees都是router 的子actor,如果routees終止,router將把它們移除
Group(群組)——routees都創(chuàng)建在router的外部,router通過(guò)使用actor來(lái)選擇將消息發(fā)送到指定路徑,但不監(jiān)管routees是否終止。Router actor 向 routees 發(fā)送消息,與向普通actor發(fā)送消息一樣通過(guò)其ActorRef。Router actor 不會(huì)改變消息的發(fā)送人,routees 回復(fù)消息時(shí)發(fā)送回原始發(fā)件人,而不是Router actor。
Pool(池)可以通過(guò)配置并使用代碼在配置中獲取的方法來(lái)實(shí)現(xiàn) (例如創(chuàng)建一個(gè)輪詢Router向5個(gè)routees發(fā)送消息)
Group(群組)有時(shí)我們需要單獨(dú)地創(chuàng)建routees,然后提供一個(gè)Router來(lái)供其使用??梢酝ㄟ^(guò)將routees的路徑傳遞給Router的配置,消息將通過(guò)ActorSelection來(lái)發(fā)送到這些路徑。
有兩種方式創(chuàng)建路由器:
Pool(池)
import akka.actor._ import akka.routing.{ActorRefRoutee, FromConfig, RoundRobinGroup, RoundRobinPool, RoundRobinRoutingLogic, Router} object HelloScala { def main(args: Array[String]): Unit = { // 創(chuàng)建router val _system = ActorSystem("testRouter") // 通知代碼來(lái)實(shí)現(xiàn)路由器 val hahaRouter = _system.actorOf(RoundRobinPool(5).props(Props[WorkerRoutee]),"router111") hahaRouter ! RouteeMsg(333) val myRouter = _system.actorOf(Props[WorkerRoutee].withRouter(RoundRobinPool(nrOfInstances = 5))) myRouter ! RouteeMsg(22) val masterRouter = _system.actorOf(Props[MasterRouter],"masterRouter") masterRouter ! RouteeMsg(100) } } class MasterRouter extends Actor{ var masterRouter = { val routees = Vector.fill(3){ val r = context.actorOf(Props[WorkerRoutee]) context watch r ActorRefRoutee(r) } Router(RoundRobinRoutingLogic(),routees) } override def receive: Receive = { case w: RouteeMsg => masterRouter.route(w,sender()) case Terminated(a) => masterRouter = masterRouter.removeRoutee(a) val r = context.actorOf(Props[WorkerRoutee]) context watch r masterRouter = masterRouter.addRoutee(r) } } // 定義routee對(duì)應(yīng)的actor類型 case class RouteeMsg(s: Int) class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}") } } class WorkerRoutee2 extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#@@@@@$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case _ => println(s"${self.path}") } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
Group(群組)
import akka.actor._ import akka.routing.{ RoundRobinGroup} object HelloScala { def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val tActor = _system.actorOf(Props[TestActor],"testActor") tActor ! RouteeMsg(13333) } } class TestActor extends Actor{ val routee1 = context.actorOf(Props[WorkerRoutee],"w1") val routee2 = context.actorOf(Props[WorkerRoutee],"w2") val routee3 = context.actorOf(Props[WorkerRoutee],"w3") val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString) val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter") override def receive = { case RouteeMsg(s) => testRouter ! RouteeMsg(s) case _ => } } // 定義routee對(duì)應(yīng)的actor類型 case class RouteeMsg(s: Int) class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case _ => println(s"${self.path}") } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
特殊消息
Broadcast消息用于向Router所有的routee發(fā)送一條消息,不管該Router通常是如何路由消息的。
PoisonPill消息無(wú)論哪個(gè)actor收到PosionPill消息都會(huì)被停止。但是對(duì)于PoisonPill消息Router不會(huì)將其傳給routees。但仍然能影響到routees,因?yàn)镽outer停止時(shí)它的子actor也會(huì)停止,就可能會(huì)造成消息未處理。因此我們可以將PoisonPill包裝到Broadcast消息中。這樣Router所管理的所有routees將會(huì)處理完消息后再處理PoisonPill并停止。
Kill消息當(dāng)Kill消息被發(fā)送到Router,Router將內(nèi)部處理該消息,并且不會(huì)將它發(fā)送到其routee。Router將拋出ActorKilledException并失敗,然后Router根據(jù)監(jiān)管的策略,被恢復(fù)、重啟或終止。Router的子routee也將被暫停,也受Router監(jiān)管的影響,但是獨(dú)立在Router外部創(chuàng)建的routee將不會(huì)被影響。
import akka.actor._ import akka.routing.{Broadcast, RoundRobinGroup} object HelloScala { def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val tActor = _system.actorOf(Props[TestActor],"testActor") tActor ! PoisonPill } } class TestActor extends Actor{ val routee1 = context.actorOf(Props[WorkerRoutee],"w1") val routee2 = context.actorOf(Props[WorkerRoutee],"w2") val routee3 = context.actorOf(Props[WorkerRoutee],"w3") val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString) val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter") override def receive = { case RouteeMsg(s) => testRouter ! RouteeMsg(s) case RouteeBroadcast => testRouter ! Broadcast // 用于向Router所有的routee發(fā)送一條消息,不管該Router通常是如何路由消息的。 case Broadcast => println("TestActor receive a broadcast message") case Kill => testRouter ! Kill// 當(dāng)Kill消息被發(fā)送到Router,Router將內(nèi)部處理該消息,并且不會(huì)將它發(fā)送到其routee。 case PoisonPill => testRouter ! PoisonPill // 無(wú)論哪個(gè)actor收到PosionPill消息都會(huì)被停止。但是對(duì)于PoisonPill消息Router不會(huì)將其傳給routees。 case _ => } } // 定義routee對(duì)應(yīng)的actor類型 case class RouteeMsg(s: Int) // 定義廣播信息 case object RouteeBroadcast class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case Broadcast => println("WorkerRoutee receive a broadcast message") case Kill => println("WorkerRoutee receive a Kill message") case PoisonPill => println("WorkerRoutee receive a PoisonPill message") case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case Broadcast => println("Cale receive a broadcast message") case _ => println(s"${self.path}") } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
遠(yuǎn)程部署Router
既可以創(chuàng)建本地actor來(lái)作為Router,也可以命令Router在任一遠(yuǎn)程主機(jī)上部署子actor。需要將路由配置放在RemoteRouterConfig下,在遠(yuǎn)程部署的路徑類中要添加akka-remote模塊:
import akka.actor._ import akka.remote.routing.{RemoteRouterConfig} import akka.routing.{Broadcast,RoundRobinPool} object HelloScala { def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val addresses = Seq( Address("akka.tcp","remotesys","otherhost",6666), AddressFromURIString("akka.tcp://othersys@anotherhost:6666") ) // WorkerRoutee 路由部署到遠(yuǎn)程的主機(jī)上 val routerRemote = _system.actorOf(RemoteRouterConfig(RoundRobinPool(5),addresses).props(Props[WorkerRoutee])) } } // 定義routee對(duì)應(yīng)的actor類型 case class RouteeMsg(s: Int) class WorkerRoutee extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case Broadcast => println("WorkerRoutee receive a broadcast message") case Kill => println("WorkerRoutee receive a Kill message") case PoisonPill => println("WorkerRoutee receive a PoisonPill message") case _ => println(s"${self.path}") } } class Cale extends Actor{ override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case Broadcast => println("Cale receive a broadcast message") case _ => println(s"${self.path}") } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
NAT 任務(wù)調(diào)度
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。