elasticsearch入門系列">elasticsearch入門系列
953
2022-05-30
本篇作為Scala快速入門系列的第三十八篇博客,為大家帶來的是關于
Actor并發編程
的內容。
文章目錄
Actor并發編程
1.Actor介紹
Java并發編程的問題
Actor并發編程模型
Java并發編程對比Actor并發編程
2.創建Actor
使用方式
示例
Actor程序運行流程
3.發送消息/接收消息
使用方式
示例
4.持續接收消息
示例
使用loop和react優化接收消息
示例
5.發送和接收自定義消息
示例一
示例二
示例三
Actor并發編程
1.Actor介紹
scala的Actor并發編程模型可以用來開發比Java線程效率更高的并發程序。我們學習scala Actor的目的主要是為后續學習Akka做準備。
Java并發編程的問題
在Java并發編程中,每個對象都有一個邏輯監視器(monitor),可以用來控制對象的多線程訪問。我們添加sychronized關鍵字來標記,需要進行同步加鎖訪問。這樣,通過加鎖的機制來確保同一時間只有一個線程訪問共享數據。但這種方式存在資源爭奪、以及死鎖問題,程序越大問題越麻煩。
線程死鎖
Actor并發編程模型
Actor并發編程模型,是scala提供給程序員的一種與Java并發編程完全不一樣的并發編程模型,是一種基于事件模型的并發機制。Actor并發編程模型是一種不共享數據,依賴消息傳遞的一種并發編程模式,有效避免資源爭奪、死鎖等情況。
Java并發編程對比Actor并發編程
[NOTE]
scala在2.11.x版本中加入了Akka并發編程框架,老版本已經廢棄。Actor的編程模型和Akka很像,我們這里學習Actor的目的是為學習Akka做準備。
2.創建Actor
創建Actor的方式和Java中創建線程很類似,也是通過繼承來創建。
使用方式
定義class或object繼承Actor特質
重寫act方法
調用Actor的start方法執行Actor
[NOTE]
類似于Java線程,這里的每個Actor是并行執行的
示例
創建兩個Actor,一個Actor打印1-10,另一個Actor打印11-20
使用class繼承Actor創建(如果需要在程序中創建多個相同的Actor)
使用object繼承Actor創建(如果在程序中只創建一個Actor)
參考代碼
使用class繼承Actor創建
object _05ActorDemo { class Actor1 extends Actor { override def act(): Unit = (1 to 10).foreach(println(_)) } class Actor2 extends Actor { override def act(): Unit = (11 to 20).foreach(println(_)) } def main(args: Array[String]): Unit = { new Actor1().start() new Actor2().start() } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
使用object繼承Actor創建
object Actor1 extends Actor { override def act(): Unit = for(i <- 1 to 10) { println(i) } } object Actor2 extends Actor { override def act(): Unit = for(i <- 11 to 20) { println(i) } } def main(args: Array[String]): Unit = { Actor1.start() Actor2.start() }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Actor程序運行流程
調用start()方法啟動Actor
自動執行act()方法
向Actor發送消息
act方法執行完成后,程序會調用exit()方法
3.發送消息/接收消息
我們之前介紹Actor的時候,說過Actor是基于事件(消息)的并發編程模型,那么Actor是如何發送消息和接收消息的呢?
使用方式
發送消息
我們可以使用三種方法來發送消息:
例如:
要給actor1發送一個異步字符串消息,使用以下代碼:
接收消息
Actor中使用receive方法來接收消息,需要給receive方法傳入一個偏函數
[NOTE]
receive方法只接收一次消息,接收完后繼續執行act方法
示例
創建兩個Actor(ActorSender、ActorReceiver)
ActorSender發送一個異步字符串消息給ActorReceiver
ActorReceive接收到該消息后,打印出來
參考代碼
object ActorSender extends Actor { override def act(): Unit = { // 發送消息 while(true) { ActorReceiver ! "hello!" TimeUnit.SECONDS.sleep(3) } } } object ActorReceiver extends Actor { override def act(): Unit = { // 持續接收消息 while(true) { receive { case msg:String => println("接收到消息:" + msg) } } } } def main(args: Array[String]): Unit = { ActorReceiver.start() ActorSender.start() }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
4.持續接收消息
通過上一個案例,ActorReceiver調用receive來接收消息,但接收一次后,Actor就退出了。
object ActorSender extends Actor { override def act(): Unit = { // 發送字符串消息給Actor2 val msg = "你好,ActorSender" println(s"ActorSender: 發送消息$msg") ActorReceiver ! msg // 再次發送一條消息,ActorReceiver無法接收到 ActorReceiver ! "你叫什么名字?" } } object ActorReceiver extends Actor { override def act(): Unit = receive { case msg: String => println(s"接收Actor: 接收到$msg") } } object ActorMsgDemo { def main(args: Array[String]): Unit = { ActorSender.start() ActorReceiver.start() } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
上述代碼,ActorReceiver無法接收到ActorSender發送的第二條消息。
我們希望ActorReceiver能夠一直接收消息,怎么實現呢?
我們只需要使用一個while(true)循環,不停地調用receive來接收消息就可以啦。
示例
在上一個案例的基礎上,讓ActorReceiver能夠一直接收消息
object ActorSender extends Actor { override def act(): Unit = { // 發送消息 while(true) { ActorReceiver ! "hello!" TimeUnit.SECONDS.sleep(3) } } } object ActorReceiver extends Actor { override def act(): Unit = { // 持續接收消息 while(true) { receive { case msg:String => println("接收到消息:" + msg) } } } } def main(args: Array[String]): Unit = { ActorReceiver.start() ActorSender.start() }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
使用loop和react優化接收消息
上述代碼,使用while循環來不斷接收消息。
如果當前Actor沒有接收到消息,線程就會處于阻塞狀態
如果有很多的Actor,就有可能會導致很多線程都是處于阻塞狀態
每次有新的消息來時,重新創建線程來處理
頻繁的線程創建、銷毀和切換,會影響運行效率
在scala中,可以使用loop + react來復用線程。比while + receive更高效
示例
使用loop + react重寫上述案例
參考代碼
// 持續接收消息 loop { react { case msg:String => println("接收到消息:" + msg) } }
1
2
3
4
5
6
7
5.發送和接收自定義消息
我們前面發送的消息是字符串類型,Actor中也支持發送自定義消息,常見的如:使用樣例類封裝消息,然后進行發送處理。
示例一
創建一個MsgActor,并向它發送一個同步消息,該消息包含兩個字段(id、message)
MsgActor回復一個消息,該消息包含兩個字段(message、name)
打印回復消息
[!TIP]
使用!?來發送同步消息
在Actor的act方法中,可以使用sender獲取發送者的Actor引用
case class Message(id:Int, msg:String) case class ReplyMessage(msg:String, name:String) object MsgActor extends Actor { override def act(): Unit = { loop { react { case Message(id, msg) => { println(s"接收到消息:${id}/${msg}") sender ! ReplyMessage("不太好", "Tom") } } } } } def main(args: Array[String]): Unit = { MsgActor.start() val replyMessage: Any = MsgActor !? Message(1, "你好") println("回復消息:" + replyMessage.asInstanceOf[ReplyMessage]) }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
示例二
創建一個MsgActor,并向它發送一個異步無返回消息,該消息包含兩個字段(message, company)
[!TIP]
使用!發送異步無返回消息
case class Mesasge(message:String, company:String) object MsgActor extends Actor { override def act(): Unit = { loop { react { case Mesasge(message, company) => println(s"MsgActor接收到消息:${message}/${company}") } } } } def main(args: Array[String]): Unit = { MsgActor.start() MsgActor ! Mesasge("中國聯通", "大爺,快交話費!") }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
示例三
創建一個MsgActor,并向它發送一個異步有返回消息,該消息包含兩個字段(id、message)
MsgActor回復一個消息,該消息包含兩個字段(message、name)
打印回復消息
[!TIP]
使用!!發送異步有返回消息
發送后,返回類型為Future[Any]的對象
Future表示異步返回數據的封裝,雖獲取到Future的返回值,但不一定有值,可能在將來某一時刻才會返回消息
Future的isSet()可檢查是否已經收到返回消息,apply()方法可獲取返回數據
參考代碼
case class Message(id:Int, message:String) case class ReplyMessage(message:String, name:String) object MsgActor extends Actor { override def act(): Unit = { loop { react { case Message(id, message) => println(s"MsgActor接收到消息:${id}/${message}") sender ! ReplyMessage("收到消息!", "JIm") } } } } def main(args: Array[String]): Unit = { MsgActor.start() val future: Future[Any] = MsgActor !! Message(1, "你好!") while(!future.isSet) {} val replyMessage = future.apply().asInstanceOf[ReplyMessage] println(replyMessage) }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
本期的內容分享就到這里了,喜歡的小伙伴們記得點個贊,持續關注喲~下期為大家介紹如何使用Actor來完成
WordCount
的經典案例,敬請期待?(?>?)?
Java Scala
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。