PHP借用Redis消息隊列實現高并發下發送郵件功能
參考:

我目前的做法是,借用redis的隊列,把要發送的消息,全部放到里面,然后就不管了
有一個后臺發送進程,來處理隊列里面的數據
1.如果需要重發,則把發送失敗的消息放到一個備份的隊列里,每次循環開始前,都把備份隊列里的數據放到發送的隊列里。
2.php進程不建議常駐,因此,可以把一個進程的生命周期設置為1min,再借用cron來實現進程的重啟
對接一個消息隊列,把你要處理的任務放入消息隊列,簡單的可以用redis,復雜點的可以beanstalkd, rabbitmq等
如果堅持用PHP實現,寫CLI腳本去這個消息隊列拿消息,拿到消息之后處理你的耗時任務
亦可使用其它技術實現,python,java,看你們團隊的實際情況和技術棧
PS: PHP有多任務的解決方案,用pthread擴展實現多線程或者pcntl擴展實現多進程,但也不要在web端做這個事情
這個可以借用消息隊列做異步處理, 你web后臺只是做個觸發, 真正的郵件之類的通知,要到異步處理, 這樣不會堵塞你web后臺的操作,消息隊列的話,有很多種方案, 簡單點的就是利用redis自己實現一個,或者網上有類似的。
隊列處理發送消息的動作的時候, 你可以根據你業務的重要, 比如, 我發送一次,不管成功不成功,無所謂,還是必須把消息發送成功, 必須發送成功的話, 你可以把失敗的, 寫到另外隊列,做處理,或者做log記錄之類的, 這個跟公司業務比較接近了。
1.正常的編寫郵件發送代碼
2.把最后的send改一下,變成存入redis隊列的函數
3.編寫一個取出redis隊列內容的函數3,然后按個進行發送
4.在command或者shell模塊編寫一個函數4,進行調用步驟3的函數
5.在crontab進行指定php執行步驟4的函數,進行異步發送郵件
總結:
這個是因為php沒有異步的功能,導致只能依靠linux的crontab進行異步
現在php的擴展swoole已經有了異步task,可以用來異步發送郵件!
那么如何實現異步消息隊列發送郵件呢??
傳統的操作方法是這樣的:
用戶輸入郵件信息
服務器獲取用戶輸入的數據,提交到第三方的郵件服務器
第三方郵件服務器發送郵件,返回處理結果
異步的處理郵件發送:
用戶輸入郵件相關信息
將注冊信息存儲在內存隊列,通知用戶發送成功
服務器端監聽內存隊列,將內存隊列中的郵件數據依次發送 用戶感知不到
兩者的區別在哪?
異步相對于同步來說,頁面非阻塞,減少了用戶等待的時間體驗相對來說比較好
Redis 應用-異步消息隊列與延時隊列
異步消息隊列
說道消息隊列,你肯定會想到 Kafka、Rabbitmq 等消息中間件,這些專業的消息中間件提供了很多功能特性,當然他的部署使用維護都是比較麻煩的。如果你對消息隊列沒那么高要求,想要輕量級的,使用 Redis 就沒錯啦。
Redis 通過 list 數據結構來實現消息隊列。主要使用到如下命令:
lpush 和 rpush 入隊列
lpop 和 rpop 出隊列
blpop 和 brpop 阻塞式出隊列
廢話補不多說上代碼:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
//發送消息
$redis->lPush($list, $value);
//消費消息
while (true) {
try {
$msg = $redis->rPop($list);
if (!$msg) {
sleep(1);
}
//業務處理
} catch (Exception $e) {
echo $e->getMessage();
}
}
上面代碼會有個問題,如果隊列長時間是空的,那 rpop 操作就會不斷的循環執行,這樣會導致 Redis 的 QPS 升高,影響性能。所以我們使用 sleep 來解決,當沒有消息的時候阻塞一段時間。但其實這樣還會帶來另一個問題,就是 sleep 會導致消息的處理延遲增加。這個問題我們可以通過 blpop/brpop 來阻塞讀取隊列。
blpop/brpop 在隊列沒有數據的時候,會立即進入休眠狀態,一旦數據到來,則立刻醒過來。消息的延遲幾乎為零。用 blpop/brpop 替代前面的 lpop/rpop,就完美解決了上面的問題。
還有一個需要注意的點是我們需要是用 try/catch 來進行異常捕獲,如果一直阻塞在那里,Redis 服務器一般會主動斷開掉空鏈接,來減少閑置資源的占用。
延遲隊列
你是否在做電商項目的時候會遇到如下場景:
訂單下單后超過一小時用戶未支付,需要關閉訂單
訂單的評論如果 7 天未評價,系統需要自動產生一條評論
這個時候我們就需要用到延時隊列了,顧名思義就是需要延遲一段時間后執行。Redis 可通過 zset 來實現。我們可以將有序集合的 value 設置為我們的消息任務,把 value 的 score 設置為消息的到期時間,然后輪詢獲取有序集合的中的到期消息進行處理。
實現代碼如下:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->zAdd($delayQueue,$tts, $value);
while(true) {
try{
$msg = $redis->zRangeByScore($delayQueue,0,time(),0,1);
if(!$msg){
continue;
}
//刪除消息
$ok = $redis.zrem($delayQueue,$msg);
if($ok){
//業務處理
}
} catch(\Exception $e) {
}
}
這里又產生了一個問題,同一個任務可能會被多個進程取到之后再使用 zrem 進行爭搶,那些沒搶到的進程都是白取了一次任務,這是浪費。解決辦法:將 zrangebyscore 和 zrem 使用 lua 腳本進行原子化操作,這樣多個進程之間爭搶任務時就不會出現這種浪費了。
Redis可靠隊列
前一篇文章《Redis消息隊列》介紹了一種簡單的FIFO隊列的實現。
FIFO隊列中的消息一經發送出去,便從隊列里刪除。如果由于網絡原因消費者沒有收到消息,或者消費者在處理這條消息的過程中崩潰了,就再也無法還原出這條消息。也就是說,FIFO隊列不能保證消息會傳遞成功。
究其原因,在于FIFO隊列缺乏消息確認機制,即消費者向隊列報告消息已收到或已處理的機制。可靠隊列便是加入了這一機制的消息隊列。
Redis在RPOPLPUSH命令的文檔中提供了一種利用這一命令實現可靠隊列的方式。RPOPLPUSH命令可以在從一個list中獲取消息的同時把這條消息復制到另一個list里,并且這個過程是原子的。
利用RPOPLPUSH實現的可靠隊列由兩個列表組成,一個存儲待處理的消息(pending list),另一個存儲處理中的消息(processing list)。
生產者通過LPUSH將消息發送到待處理列表:
127.0.0.1:6379> LPUSH queue:pending "message"
消費者使用RPOPLPUSH從待處理列表獲取消息,同時將它加入處理中列表:
127.0.0.1:6379> RPOPLPUSH queue:pending queue:processing
"message"
此時這條消息已經從待處理列表中刪除,并且復制到了處理中列表:
127.0.0.1:6379> LRANGE queue:pending 0 -1
(empty list or set)
127.0.0.1:6379> LRANGE queue:processing 0 -1
1) "message"
消費者在收到消息或者處理完消息后,使用LREM命令從處理中列表刪除這條消息,即完成了消息確認:
127.0.0.1:6379> LREM queue:processing 1 "message"
使用LREM而不是RPOP的原因在于,在并發時,不能保證處理中的消息能按加入列表的先后順序被確認;而RPOP會按順序刪除消息。
沒有被確認的消息會一直存儲在處理中列表。如果一個消息在處理中列表呆的時間過長,那么可以認為這個消息的傳遞或處理失敗了。我們可以設定一個超時時間,定時掃描處理中列表,將超時的消息重新放回待處理列表等待重新傳遞。
PHP Redis 數據庫
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。