首頁技術(shù)文章正文

Redis實現(xiàn)消息隊列的三種方式

更新時間:2022-08-17 來源:黑馬程序員 瀏覽量:

消息隊列(Message Queue),字面意思就是存放消息的隊列。最簡單的消息隊列模型包括3個角色:

消息隊列:存儲消息。

生產(chǎn)者:發(fā)送消息到消息隊列,在秒殺任務(wù)中負(fù)責(zé)判斷秒殺時間和庫存,校驗消費者權(quán)限是否是一人一單,發(fā)送優(yōu)惠券id和用戶id到消息隊列中。

消費者:從消息隊列獲取消息并處理消息,接受到訂單消息之后,完成下單。

Redis提供了三種不同的方式來實現(xiàn)消息隊列:

list結(jié)構(gòu):基于List結(jié)構(gòu)模擬消息隊列。

PubSub:基本的點對點消息模型。

Stream:比較完善的消息隊列模型。

消息隊列

基于List結(jié)構(gòu)模擬消息隊列

消息隊列(Message Queue),字面意思就是存放消息的隊列。而Redis的list數(shù)據(jù)結(jié)構(gòu)是一個雙向鏈表,很容易模擬出隊列效果。

隊列是入口和出口不在一邊,因此我們可以利用:LPUSH 結(jié)合 RPOP、或者 RPUSH 結(jié)合 LPOP來實現(xiàn)。

不過要注意的是,當(dāng)隊列中沒有消息時RPOP或LPOP操作會返回null,并不像JVM的阻塞隊列那樣會阻塞并等待消息。因此這里應(yīng)該使用BRPOP或者BLOPO來實現(xiàn)阻塞效果。

List結(jié)構(gòu)

List的消息隊列可利用Redis存儲,不受限于JVM內(nèi)存上限,是基于基于Redis的持久化機制,數(shù)據(jù)安全性有保證,同時也可以滿足消息隊列的有序性。但只支持但消費者,無法避免消息丟失是它最大的問題。

基于PubSub的消息隊列

PubSub(發(fā)布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費者可以訂閱一個或多個channel,生產(chǎn)者向?qū)?yīng)channel發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。對應(yīng)channel發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。

SUBSCRIBE channel [channel] :訂閱一個或多個頻道。

PUBLISH channel msg :向一個頻道發(fā)送消息。

PSUBSCRIBE pattern[pattern] :訂閱與pattern格式匹配的所有頻道。

基于PubSub的消息隊列

基于PubSub的消息隊列采用發(fā)布訂閱模型,支持多生產(chǎn)、多消費,但也有不支持?jǐn)?shù)據(jù)持久化、無法避免消息丟失,消息堆積有上限的缺點,超出時數(shù)據(jù)會丟失。

例如:

基于Stream的消息隊列-XREAD

Stream讀取消息的方式之一:XREAD

Stream讀取消息的方式

例如:

XREAD阻塞方式,讀取最新的消息:

在業(yè)務(wù)開發(fā)中,我們可以循環(huán)的調(diào)用XREAD阻塞方式來查詢最新消息,從而實現(xiàn)持續(xù)監(jiān)聽隊列的效果,偽代碼如下:

注意:當(dāng)我們指定起始ID為$時,代表讀取最新的消息,如果我們處理一條消息的過程中,又有超過1條以上的消息到達(dá)隊列,則下次獲取時也只能獲取到最新的一條,會出現(xiàn)漏讀消息的問題。
STREAM類型消息隊列的XREAD命令,消息可回溯,一個消息可以被多個消費者讀取,同時可能阻塞讀取,有消息漏讀的風(fēng)險。

基于Stream的消息隊列-消費者組

消費者組(Consumer Group):將多個消費者劃分到一個組中,監(jiān)聽同一個隊列。具備下列特點:

1.消息分流:隊列中的消息會分流給組內(nèi)的不同消費者,而不是重復(fù)消費,從而加快消息處理的速度。

2.消息標(biāo)示:消費者組會維護一個標(biāo)示,記錄最后一個被處理的消息,哪怕消費者宕機重啟,還會從標(biāo)示之后讀取消息。確保每一個消息都會被消費。

3.消息確認(rèn):消費者獲取消息后,消息處于pending狀態(tài),并存入一個pending-list。當(dāng)處理完成后需要通過XACK來確認(rèn)消息,標(biāo)記消息為已處理,才會從pending-list移除。

創(chuàng)建消費者組:

XGROUP CREATE  key groupName ID [MKSTREAM]

key:隊列名稱

groupName:消費者組名稱

ID:起始ID標(biāo)示,$代表隊列中最后一個消息,0則代表隊列中第一個消息

MKSTREAM:隊列不存在時自動創(chuàng)建隊列。

其它常見命令:

# 刪除指定的消費者組
XGROUP DESTORY key groupName

# 給指定的消費者組添加消費者
XGROUP CREATECONSUMER key groupname consumername

# 刪除消費者組中的指定消費者
XGROUP DELCONSUMER key groupname consumername

從消費者組讀取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

group:消費組名稱

consumer:消費者名稱,如果消費者不存在,會自動創(chuàng)建一個消費者

count:本次查詢的最大數(shù)量

BLOCK milliseconds:當(dāng)沒有消息時最長等待時間

NOACK:無需手動ACK,獲取到消息后自動確認(rèn)

STREAMS key:指定隊列名稱

ID:獲取消息的起始ID:">":從下一個未消費的消息開始

其它:根據(jù)指定id從pending-list中獲取已消費但未確認(rèn)的消息,例如0,是從pending-list中的第一個消息開始。

消費者監(jiān)聽消息的基本思路:

STREAM類型消息隊列的XREADGROUP命令特點:消息可回溯,一個消息可以只能被一個消費者讀取??梢宰枞x取.沒有消息漏讀的風(fēng)險,有消息確認(rèn)機制,保證消息至少被消費一次。

分享到:
在線咨詢 我要報名
和我們在線交談!