Akka的事務STM
一個事務必須具有以下四個特點,即所謂的ACID特性:

原子性:所有的操作要么全部成功,要么全部失敗。
一致性:在事務完成后,系統保持一致性狀態。
隔離性:在一個事務成功或失敗前,產生的數據對于系統中的其他事務是不見的。
持久性:事務操作的結果要持久化保存。
Akka使用(Software Transactional Memory)軟件事務內存來實現事務。這是一種多線程之間數據共享的同步機制。對于并行計算編程而言,只要將線程中需要 訪問共享內存的關鍵邏輯 部分劃分出來封裝到一個事務中即可。
傳統的保護共享數據的方法就是加同步鎖。在java或Scala中以synchronized同步代碼塊的形式來實現:
var seats = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) val reservedSeat = seats.synchronized {// seats的synchronized構建臨界區 val head = seats.head// 取出第一個元素 seats = seats.tail// 把除取出的第一個元素外的元素重新賦值給synchronized head }
1
2
3
4
5
6
7
所有的線程在synchronized塊上面是依次執行的,保證了在同一時刻只有一個線程訪問共享變量,確保了共享變量的一致性。但是如果有線程只是想去讀取共享變量,而不是要去修改時,遇到synchronized塊也是要等待的,這樣會降低了系統的整體性能,這種鎖,叫做“悲觀鎖”,這種鎖它會假設在任何時候都可能會有線程去修改共享變量。
相應的就會有**“樂觀鎖”**,樂觀鎖認為在訪問和修改共享變量時,都不會產生任何問題。因此在執行代碼時不會有任何鎖。在樂觀鎖的實現中,當線程離開了臨界區時,系統會檢測可能的更新沖突,如果檢測不到更新沖突,那么就直接提交事務,如果檢測到有沖突發生,那么所有的改變都會回滾并嘗試重新執行臨界區代碼。
STM使用的是樂觀鎖。Akka通過將共享變量包裝到STM的引用中,檢測共享數據在事務中是否已經發生改變,從而能防止因多線程訪問共享變量造成的數據不一致的問題。
要使用STM必須在build.sbt中加入:
libraryDependencies += "com.typesafe.akka" %% "akka-agent" % "2.5.23"
1
未例代碼:
import concurrent.stm._ object HelloScala { def main(args: Array[String]): Unit = { // 使用Ref包裹變量 val mySeq = Seq[Int](1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) val seats = Ref(mySeq) // 在atomic塊中使用Ref變量示例 val getSeat = atomic{implicit txt => val head = seats().head// 取出第一個值 // 共享變量 seats() = seats().tail// 重新賦值 head } println(getSeat) } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
上面的seats變量,只能夠中atomic塊中使用,在atomic塊的代碼將被視為一個原子命令被執行。在編譯時,atomic塊需要一個隱式變量如上述的txt來為Ref中手沖突做檢測。上述代碼跟synchronized做的是同樣的事情,但是工作機制完全不同。STM使用的是樂觀鎖,當atomic塊執行完成后,有一個檢查將會執行,這個操作就是去檢查是否有沖突發生。ACID樂觀鎖實現了三個,沒有實現持久化,因STM都是發生在內存中,內存中的事務永遠都不會持久化。面使用synchronized的臨界區只有執行一次,使用的是悲觀鎖。
有時候,我們只想讀取共享變量,而不做任何改變。我們就可以使用Ref.View來讀取共享變量來提高性能:
println(seats.single.get)// 得到seats視圖,調用視圖上的get方法來獲取值 println(seats.single.get.head) println(seats.single.get)
1
2
3
讀取Agent事務中的數據
Akka中的Agent和Actor都是基于STM來處理事務的。akka的Agent提供了一個獨立于位置的異步操作,所有對Agent的操作都是異步的。
import akka.agent.Agent import scala.concurrent.{Future} import scala.concurrent.ExecutionContext.Implicits.global case class Seat(var a: Int) object HelloScala { def main(args: Array[String]): Unit = { val agent = Agent(50)// 創建Agent println(agent())// 使用agent()方式讀取Agent數據 println(agent.get)// 使用get()方式讀取Agent數據 agent send(888)// 使用send修改Agent的值 val future1 :Future[Int] = agent alter(_+100)// 修改Agent數據 println(future1 foreach println) } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
任務調度
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。