Taskflow 有興趣了解一下?

      網(wǎng)友投稿 1139 2025-03-31

      Taskflow 有興趣了解一下?


      最近在工作中會(huì)經(jīng)常使用到Taskflow這個(gè)東西,看起來雖然不是很難,但是遇到各種重寫的時(shí)候看起來還是有點(diǎn)煩的,這時(shí)候就必須來了解一下taskflow這樣一個(gè)東西了。

      聲明:

      請(qǐng)?jiān)徫易约菏止ぷ鞯膱D!

      TaskFlow是OpenStack開源的Python庫(kù),它幫助使任務(wù)執(zhí)行變得簡(jiǎn)單、一致、可伸縮和可靠。它允許創(chuàng)建輕量級(jí)任務(wù)對(duì)象或函數(shù),這些對(duì)象或函數(shù)以聲明的方式組合到Flow中。它包括以一種可以停止、恢復(fù)和安全地恢復(fù)的方式運(yùn)行這些Flow的引擎。使用這個(gè)庫(kù)實(shí)現(xiàn)的項(xiàng)目可以享受額外的狀態(tài)彈性、自然的聲明式構(gòu)造、更容易測(cè)試(因?yàn)槿蝿?wù)只做一件事)、工作流可插拔性、容錯(cuò)和簡(jiǎn)化的崩潰恢復(fù)/容錯(cuò)(以及更多)。

      簡(jiǎn)而言之就是:TaskFlow支持創(chuàng)建不同的?task,并以聲明的方式集成到一個(gè)?flow?中,這些?flow?會(huì)通過?engine?執(zhí)行、停止、繼續(xù)和恢復(fù)。

      Taskflow,先有task,再有flow,那我們先來看看task吧!

      Task很簡(jiǎn)單,就是把你想執(zhí)行的任務(wù)放到一個(gè)類(這個(gè)類要繼承自task類)里面的execute方法里就可以了。

      具體來說就是:

      from taskflow import task

      class TaskA(task.Task):

      def execute(self,str,*args,**kwargs):

      print('Hello,I am {0}'.format(str))

      Task說完了,創(chuàng)建的Task可以是任何不同的Task,他們既可以是順序的,也可以是毫無關(guān)聯(lián)的,還可以是微相關(guān)的。為什么可以這么做呢?

      因?yàn)樵贔low里,一共提供三種Flow的執(zhí)行方式來解決task關(guān)聯(lián)性的問題。

      例如,我現(xiàn)在要做一件事,我需要先在A表中去查到數(shù)據(jù)a,之后再依靠a作為條件去B表中查詢到b,最后再把a(bǔ),b處理成結(jié)果c,那么這時(shí)候就很顯然看到我們有三個(gè)task,并且這三個(gè)task是順序執(zhí)行的:

      這個(gè)時(shí)候我們就需要順序執(zhí)行這三個(gè)task了,我們使用線性流(linear_flow)

      from taskflow.patterns import linear_flow

      linear_flow.Flow('linear').add(

      taskA(),

      taskB(),

      taskC()

      )

      具體實(shí)現(xiàn)代碼就是這樣了。

      那如果我們想做的三個(gè)task A,B,C沒有依賴關(guān)系呢?那我們的三個(gè)任務(wù)是不是就完全可以并行執(zhí)行?

      回答:當(dāng)然是!

      taskflow里面又為我們提供了一種流叫無序流(unordered_flow)

      它可以讓這些task并行執(zhí)行,就像python里面的多線程一樣,誰先搶到資源誰就先執(zhí)行,等到三個(gè)都執(zhí)行完畢了,這個(gè)流就結(jié)束了。

      from taskflow.patterns import unordered_flow

      unordered_flow.Flow('linear').add(

      taskA(),

      taskB(),

      taskC()

      )

      最后還有一種流是圖流(graph_flow),官方是這么解釋的:

      所包含的流/任務(wù)將根據(jù)它們的依賴關(guān)系執(zhí)行,這些依賴關(guān)系將通過使用流/任務(wù)提供的和需要的映射來解決,或者通過遵循手動(dòng)創(chuàng)建的依賴關(guān)系鏈接來解決。

      根據(jù)依賴關(guān)系構(gòu)建有向圖。如果它有邊A -> B,這意味著B依賴于A(并且B的執(zhí)行必須等到A完成執(zhí)行,而恢復(fù)意味著A的恢復(fù)必須等到B完成恢復(fù))。

      https://docs.openstack.org/taskflow/latest/user/patterns.html#module-taskflow.patterns.graph_flow

      這個(gè)意思和之前的順序是不一樣的,就是說,比如我有taskA和taskB,在執(zhí)行A的時(shí)候我里面可以會(huì)用到一些和B掛鉤的事情,執(zhí)行B的時(shí)候也同樣,Task A,B之間存在依賴關(guān)系,那我們這個(gè)時(shí)候就可以使用圖流了:

      實(shí)現(xiàn):

      from taskflow.patterns import graph_flow

      graph_flow.Flow('linear').add(

      taskA(),

      taskB()

      )

      以上就是三種流的執(zhí)行方式,那么如果在flow的執(zhí)行過程中,某一條task執(zhí)行出現(xiàn)錯(cuò)誤了,那會(huì)怎么辦呢?

      這個(gè)時(shí)候就task里面的revert方法就體現(xiàn)出了很厲害的作用了。

      這里的機(jī)制其實(shí)和SQL事務(wù)里面的機(jī)制是類似的,當(dāng)我在flow里面的某條task執(zhí)行有問題的時(shí)候,整個(gè)flow也出現(xiàn)了問題。這個(gè)時(shí)候我就全部回滾,讓這些task都回到一開始的初始狀態(tài)。

      可以看一下官方文檔的例子:

      Conceptual example

      This pseudocode illustrates what how a?flow?would work for those who are familiar with?SQL?transactions.

      START TRANSACTION task1: call nova API to launch a server || ROLLBACK task2: when task1 finished, call cinder API to attach block storage to the server || ROLLBACK ...perform other tasks... COMMIT

      The above?flow?could be used by?Heat?(for example) as part of an orchestration to add a server with block storage attached. It may launch several of these in parallel to prepare a number of identical servers (or do other work depending on the desired request).

      對(duì)了,還有個(gè)東西忘了講了,我們有了task,構(gòu)造了flow,那我們?cè)趺慈?zhí)行呢?這個(gè)時(shí)候就需要用到engine方法了,作為啟動(dòng)flow的引擎是絕對(duì)不能丟的!

      engine里面自帶一個(gè)run方法,run方法需要穿兩個(gè)參數(shù),第一個(gè)參數(shù)是flow,第二個(gè)參數(shù)就是flow里面的task所需要的參數(shù),默認(rèn)是以字典的形式傳入的。

      我們可以來看一個(gè)簡(jiǎn)單的例子:

      import taskflow.engines

      from taskflow.patterns import linear_flow as lf

      from taskflow import task

      class TaskA(task.Task):

      def execute(self,str1,*args,**kwargs):

      print('Hello,I am {0}'.format(str1))

      class TaskB(task.Task):

      def execute(self,str2,*args,**kwargs):

      print('Hello,I am {0} years old'.format(str2))

      if __name__ == "__main__":

      flow = lf.Flow('simple-linear').add(TaskA(), TaskB())

      taskflow.engines.run(flow,store = dict(str1='john', str2="23"))

      最后附上一張官網(wǎng)的工作圖:

      over!

      如果想了解更多taskflow的知識(shí)

      你可以點(diǎn)擊閱讀原文

      還可以掃一掃下面這個(gè)二維碼

      SQL

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請(qǐng)聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:如何從Excel中的日期中刪除所有破折號(hào)/斜杠/連字符?
      下一篇:怎么用2019wps圖片轉(zhuǎn)文字
      相關(guān)文章
      国产综合成人亚洲区| 国产成人精品亚洲日本在线| 在线a亚洲老鸭窝天堂av高清| 亚洲国产精品自在线一区二区| 亚洲乱码精品久久久久..| 区三区激情福利综合中文字幕在线一区亚洲视频1 | 午夜在线a亚洲v天堂网2019| 亚洲国产片在线观看| 亚洲国产视频网站| 亚洲精品欧洲精品| 亚洲天堂一区在线| 亚洲人成人77777网站不卡| 亚洲三级在线视频| 精品亚洲AV无码一区二区| 亚洲第一区二区快射影院| 亚洲中文字幕久久久一区| 亚洲一区二区三区高清在线观看| 精品久久亚洲中文无码| 亚洲欧洲精品成人久久曰| 亚洲av午夜电影在线观看| 精品亚洲福利一区二区| 亚洲国产成人乱码精品女人久久久不卡 | 亚洲av日韩av永久无码电影| 亚洲av无码片vr一区二区三区 | 在线观看亚洲网站| 亚洲精品成人a在线观看| 国产亚洲精品激情都市| 亚洲精品少妇30p| 亚洲午夜精品一区二区| 亚洲理论精品午夜电影| 亚洲欧洲日韩极速播放| 亚洲av永久无码精品秋霞电影秋| 最新亚洲人成无码网www电影| 亚洲AV永久无码精品一区二区国产| 亚洲区不卡顿区在线观看| 亚洲精品亚洲人成人网| 亚洲一卡2卡三卡4卡有限公司| 亚洲综合久久久久久中文字幕| ass亚洲**毛茸茸pics| 亚洲aⅴ无码专区在线观看春色 | 亚洲精品午夜久久久伊人|