RxJS 連接世界

      網友投稿 681 2022-05-30

      本文是一系列介紹 RxJS 文章的第二篇,這一系列的文章將從一個小的例子開始,逐漸深入的講解 RxJS 在各種場景下的應用。對應的,也會有對 RxJS 各種操作符的講解。這篇文章將接著第一篇Hello Rx 中的例子,將更多的異步業務(Http 請求) 接入我們的 Todo App 中。在例子中,會使用更多操作符(RxJS Operator) 來處理我們的業務,后續的文章中會詳細的講解這些操作符的作用和使用場景。

      準備工作

      首先在 GitHub - teambition/learning-rxjs: Learning RxJS step by step clone 項目所需的 seed,并基于 article1 分支 checkout 一個你的 article2 分支。本文中所有涉及到 RxJS 的代碼將全部使用 TypeScript 編寫。

      這篇文章中,我們將使用 RxJS 實現以下幾個功能:

      按回車或點擊 add button 后發送一個請求,在請求返回結果后再清空輸入框,并將返回的結果變成一個 todo item。如果在請求返回結果前又一次按下回車或 add 按鈕,對比此時輸入框的值和上次發送的值是否相同,如果相同則不進行任何操作,如果不同則取消掉上次的請求并發送新的請求。

      在點擊一個 todo item 的時候發送請求,間隔 300 毫秒內的點擊,只會發出一次請求。

      在輸入框中每次輸入字符,都會在 200 毫秒后發送一個請求,搜索是否匹配到已存在的 todo item,如果已存在則高亮這個 todo item。如果在一次搜索的結果返回前輸入了新的字符,則取消掉前一個請求,再發一個搜索請求。

      使用 switchMap 切換 Observable

      為了實現需求,首先需要在原有的邏輯中加入請求的邏輯,我們可以在 lib.ts 中找到 mockHttpPost 方法:

      export?const?mockHttpPost?=?(value:?string):?Observable?=>?{??return?Observable.create((observer:?Observer)?=>?{ ????let?status?=?'pending' ????const?timmer?=?setTimeout(()?=>?{??????const?result?=?{ ????????_id:?++dbIndex,?value, ????????isDone:?false ??????} ??????searchStorage.set(result._id,?result) ??????status?=?'done' ??????observer.next(result) ??????observer.complete() ????},?random(10,?1000))????return?()?=>?{ ??????clearTimeout(timmer)??????if?(status?===?'pending')?{ ????????console.warn('post?canceled') ??????} ????} ??}) }

      這里我并沒有真正的發送一個 http 請求,在真實的業務場景中,將請求轉化成 Observable 的過程應該是這樣的:

      Observable.create(observer?=>?{ ??request(xxxx,?response?=>?{????//?success?callback ????observer.next(parse(response)) ????observer.complete() ??},?err?=>?{????//?error?callback ????observer.error(err) ??})??//?teardown?logic ??return?()?=>?request.abort() })

      在 app.ts 中引入 mockHttpPost:

      ...import?{ ??createTodoItem, ??mockHttpPost }?from?'./lib'...const?item$?=?input$ ??.map(()?=>?$input.value) ??.filter(r?=>?r?!==?'') ??.switchMap(mockHttpPost) ??.map(data?=>?createTodoItem(data.value)) ??.do((ele:?HTMLLIElement)?=>?{ ????$list.appendChild(ele) ????$input.value?=?'' ??}) ??.publishReplay(1) ??.refCount()

      修改 createTodoItem helper,讓它支持傳入 HttpResponse 格式的數據:

      //?lib.tsexport?const?createTodoItem?=?(data:?HttpResponse)?=>?{??const?result?=?document.createElement('LI') ??result.classList.add('list-group-item',?`todo-item-${data._id}`) ??result.setAttribute('data-id',?`${data._id}`)??const?innerHTML?=?` ????${data.value} ???? ?????? ???? ??` ??result.innerHTML?=?innerHTML??return?result }

      這樣 $item 部分的代碼可以簡化成:

      const?item$?=?input$ ??.map(()?=>?$input.value) ??.filter(r?=>?r?!==?'') ??.switchMap(mockHttpPost) ??.map(createTodoItem) ??.do((ele:?HTMLLIElement)?=>?{ ????$list.appendChild(ele) ????$input.value?=?'' ??}) ??.publishReplay(1) ??.refCount()

      此時代碼運行的行為是這樣的:

      直接輸入值并回車,todo item 像以前一樣被創建;

      輸入值,并在 todo item 生成前多次回車,可以看到請求被 cancel 了多次。

      這里的 switchMap 其實是 map and switch,而 switch 操作符的行為是:

      如果 Observable 中流動的數據也是 Observable,switch 會將數據流中最新的一個 Observable 訂閱并將它的值傳遞給下一個操作符,然后取消訂閱之前的 Observable。

      所以這里的 switchMap 實際是:

      const?item$?=?input$ ??.map(()?=>?$input.value) ??.filter(r?=>?r?!==?'') ??.map(mockHttpPost) ??.switch() ??.map(createTodoItem) ...

      的縮寫。同樣的,之前用到的 mergeMap 也是 map and merge。

      如果你有興趣,可以嘗試下面的代碼觀察 switchMap 行為:

      //?你可以在項目目錄下執行:?npm?i?-g?ts-node?&&?ts-node?example/switchMap.ts?觀察運行結果import?{?Observable,?Observer?}?from?'rxjs'const?stream?=?Observable.create((observer:?Observer)?=>?{ ??let?i?=?0 ??const?intervalId?=?setInterval(()?=>?{ ????observer.next(++i) ??},?1000)??return?()?=>?clearInterval(intervalId) }) function?createIntervalObservable(base:?number)?{ ??let?i?=?0 ??return?Observable.create((observer:?Observer)?=>?{????const?intervalId?=?setInterval(()?=>?{ ??????observer.next(`base:?${base},?value:?${++i}`) ????},?200)????return?()?=>?{ ??????clearInterval(intervalId) ??????console.log(`unsubscribe?base:?${base}`) ????} ??}) } stream.switchMap(createIntervalObservable) ??.subscribe(result?=>?console.log(result))

      使用 distinct* 操作符過濾數據

      但這里的邏輯還有一點不足,我們輸入一個值并快速按下多次回車,前幾次的請求被 cancel ,但如果 input 的值不變我們其實不需要 cancel 掉這些請求,只需要忽略后幾次的點擊即可。可以使用 distinct 操作符實現這個需求:

      const?item$?=?input$ ??.map(()?=>?$input.value) ??.filter(r?=>?r?!==?'') ??.distinct() ??.switchMap(mockHttpPost) ??.map(createTodoItem) ??.do((ele:?HTMLLIElement)?=>?{ ????$list.appendChild(ele) ????$input.value?=?'' ??}) ??.publishReplay(1) ??.refCount()

      用 RxJS 連接世界

      此時,如果在請求返回前不停的按下回車,只有在 input value 改變的時候才會 cancel 上一個請求:

      使用 Subject 推送數據

      此時還存在一個小問題,在生成 todo item 后再輸入與上次同樣的值并按下回車,這次的值會被 distinct 操作符過濾掉。為了解決這個問題,我們可以指定 distinct 操作符的第二個參數 flushes 來清除 distinct 操作符的緩存:

      import?{?Observable,?Subject?}?from?'rxjs'const?clearInputSubject$?=?new?Subject()const?item$?=?input$ ??.map(()?=>?$input.value) ??.filter(r?=>?r?!==?'') ??.distinct(null,?clearInputSubject$) ??.switchMap(mockHttpPost) ??.map(createTodoItem) ??.do((ele:?HTMLLIElement)?=>?{ ????$list.appendChild(ele) ????$input.value?=?'' ????clearInputSubject$.next() ??}) ??.publishReplay(1) ??.refCount()

      這里出現的 Subject 既有 Observer 的功能,也有 Observable 的功能,但又有一些區別。上一篇講過了 Observable 是 unioncast 的,也就意味著 Observable 中一個值只會發送給一個訂閱者。而 publish/share 操作符可以將它們變成 muticast 的,但它依然是 lazy 的,也就是要有訂閱者它才會執行。而這里的 Subject 與 Observable 相比,不僅是 muticast 的,而且是非 lazy 的,它可以在任意時刻任意地點推送數據,這些數據可以被任意多的訂閱者共享。

      根據 Subject 的特性可以看出來,item$ 這個 publish 出來的 Observable 可以改寫成一個 Subject,有興趣的讀者可以自行嘗試(訂閱 input 并在 subscribe 中 next 值)。

      使用 debounceTime 過濾重復的操作

      我們已經實現了第一個需求,接下來要完成第二個 在點擊一個 todo item 的時候發送請求,在請求返回結果前的點擊都會被忽略。請求的邏輯和上一個一樣:

      import?{ ??createTodoItem, ??mockToggle, ??mockHttpPost }?from?'./lib'...const?toggle$?=?item$.mergeMap($todoItem?=>?{return?Observable.fromEvent($todoItem,?'click') ????.filter(e?=>?e.target?===?$todoItem) ????.mapTo({ ??????data:?{ ????????_id:?$todoItem.dataset['id'], ????????isDone:?$todoItem.classList.contains('done') ??????},?$todoItem ????}) }) ??.switchMap(result?=>?{????return?mockToggle(result.data._id,?result.data.isDone) ??????.mapTo(result.$todoItem) ??})

      這里短時間的重復點擊,會讓前一個點擊請求取消掉,但這與我們的需求不符,我們需要的是間隔 300 毫秒內的點擊,只會發出一次請求,debounceTime 操作符可以完成這個工作:

      const?toggle$?=?item$.mergeMap($todoItem?=>?{??return?Observable.fromEvent($todoItem,?'click') ????.debounceTime(300) ????.filter(e?=>?e.target?===?$todoItem) ????.mapTo({ ??????data:?{ ????????_id:?$todoItem.dataset['id'], ????????isDone:?$todoItem.classList.contains('done') ??????},?$todoItem ????}) }) ??.switchMap(result?=>?{????return?mockToggle(result.data._id,?result.data.isDone) ??????.mapTo(result.$todoItem) ??})

      debounce and switchMap,最小化使用你的資源

      最后一個需求,需要同時使用 debounceTime 和 switchMap :

      import?{ ??createTodoItem, ??mockToggle, ??mockHttpPost, ??search, ??HttpResponse }?from?'./lib'//?后面的?search$?與?enter?應該時從同一個?Observable?中轉換出來,這里將?input?事件的?Observable?publish?成?muticastconst?type$?=?Observable.fromEvent($input,?'keydown') ??.publish() ??.refCount()const?enter$?=?type$ ??.filter(r?=>?r.keyCode?===?13)const?search$?=?type$.debounceTime(200) ??.filter(evt?=>?evt.keyCode?!==?13) ??.map(result?=>?(result.target).value) ??.switchMap(search) ??.do((result:?HttpResponse?|?null)?=>?{????const?actived?=?document.querySelectorAll('.active') ????Array.prototype.forEach.call(actived,?(item:?HTMLElement)?=>?{ ??????item.classList.remove('active') ????})????if?(result)?{??????const?item?=?document.querySelector(`.todo-item-${result._id}`) ??????item.classList.add('active') ????} ??})const?app$?=?toggle$.merge(remove$,?search$) ??.do(r?=>?{ ????console.log(r) ??})

      試著用不同的速度輸入一系列的字符串,觀察控制臺的響應。在 200 毫秒內的輸入被忽略,在 response 回來之前的輸入會讓前一個 request abort 掉。如果匹配到相同的 todo item 則會高亮它。

      總結

      一個簡陋的 Todo App 就此完成(delete 與 toggle 類似,有興趣可以自行實現),它涵蓋了一些 RxJS 擅長的領域:

      將同步/異步代碼抽象成同樣的形狀,并使用操作符加以組合;

      在需要的時候 cancel ,最大化節約資源。

      但可以明顯看出,在業務逐漸復雜以后,直接的組合 Observable 與 Observable 已經會讓數據流變得難以預測(hard to reason about),特別是在它們互相依賴互相派生的情況更加復雜的場景下。而大家知道,Flux/Redux 非常擅長處理這種場景,后面的文章中也會講到如何運用單向數據流的思想管理 Observable,以及如何使用 Redux Observable 將 RxJS 作為 Redux 的 Epics。

      本文轉載自異步社區。

      https://www.epubit.com/articleDetails?id=NC7E3EF9231900001329B1E008EAF10D4

      Web應用防火墻 WAF 前端開發

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:《智能系統與技術叢書 生成對抗網絡入門指南》—3.4GAN的工程實踐
      下一篇:一文快速掌握華為云IPv6基礎知識及使用指南
      相關文章
      www国产亚洲精品久久久日本| 亚洲国产精品无码久久一区二区 | 亚洲日韩精品无码专区加勒比| 亚洲色偷偷偷网站色偷一区| 亚洲第一精品在线视频| 中国亚洲女人69内射少妇| 亚洲av第一网站久章草| 亚洲午夜电影在线观看高清| 亚洲黄色免费在线观看| 亚洲AV永久无码精品水牛影视| 国产V亚洲V天堂无码| 国产亚洲精品自在久久| 国产亚洲精品精品国产亚洲综合| 亚洲AV无码国产剧情| 亚洲国产AV无码一区二区三区| youjizz亚洲| 亚洲精品免费在线视频| 亚洲最新视频在线观看| 亚洲美女视频网站| 亚洲视频一区在线观看| 久久精品亚洲精品国产色婷| 亚洲酒色1314狠狠做| 亚洲图片在线观看| 久久久久久久亚洲Av无码| 亚洲精品国产电影午夜| 亚洲三级在线免费观看| 亚洲制服丝袜精品久久| 亚洲va乱码一区二区三区| 亚洲人妖女同在线播放| 亚洲中文字幕日本无线码| 亚洲精品美女网站| 亚洲精品中文字幕| 国产精品无码亚洲精品2021| 亚洲精品无码不卡在线播放| 精品国产日韩亚洲一区在线| 国产精品久久亚洲一区二区| 国产精品亚洲二区在线| 国产精品亚洲高清一区二区| 国产亚洲一区二区三区在线观看| 久久久久亚洲AV成人无码网站| 国产精品久久久亚洲|