在SpringBoot中使用RedisTemplate重新消費Redis Stream中未ACK的消息

消費組從stream中獲取到消息后,會分配給自己組中其中的一個消費者進行消費魂务,消費者消費完畢曼验,需要給消費組返回ACK,表示這條消息已經消費完畢了粘姜。

當消費者從消費組獲取到消息的時候鬓照,會先把消息添加到自己的pending消息列表,當消費者給消費組返回ACK的時候孤紧,就會把這條消息從pending隊列刪除豺裆。(每個消費者都有自己的pending消息隊列)

消費者可能沒有及時的返回ACK。例如消費者消費完畢后号显,宕機臭猜,沒有及時返回ACK躺酒,此時就會導致這條消息占用2倍的內存(stream中保存一份, 消費者的的pending消息列表中保存一份)

關于Stream的基礎姿勢,可以先看看這篇貼帖子
如何在Springboot中使用Redis5的Stream

開始之前蔑歌,通過Redis客戶端模擬一點數據

1羹应,新打開Redis客戶端(我們稱之為:生產端), 創(chuàng)建streamm丐膝,名稱叫做:my_stream

XADD my_stream * hello world

隨便添加一條消息量愧,目的是為了初始化stream

2,創(chuàng)建一個消費組帅矗,名稱叫做:my_group

XGROUP CREATE my_stream my_group $

3偎肃,再新啟動一個Redis客戶端(我們稱之為:消費端1),使用消費組進行阻塞消費浑此,指定消費者:my_consumer1

XREADGROUP GROUP my_group  my_consumer1  BLOCK 0 STREAMS my_stream >

4累颂,再新啟動一個Redis客戶端(我們稱之為:消費端2),使用消費組進行阻塞消費凛俱,指定消費者:my_consumer2

XREADGROUP GROUP my_group  my_consumer2  BLOCK 0 STREAMS my_stream >

5紊馏,通過生產端,推送3條消息

XADD my_stream * message1 Hello
XADD my_stream * message2 SpringBoot
XADD my_stream * message3 Community

生產端

image.png

消費端1

image.png

消費端2

image.png

可以看到蒲犬,一共Push了3條消息朱监,它們的ID分別是

  • 1605524648266-0 (message1 )
  • 1605524657157-0 (message2)
  • 1605524665215-0 (message3)

現在的狀況是,消費者1原叮,消費了2條消息(message1和message3)赫编,消費者2,消費了1條消息(message2)奋隶。都是消費成功了的擂送,但是它們都還沒有進行ACK

在客戶端唯欣,消費者消費到一條消息后會立即返回嘹吨,需要重新執(zhí)行命令,來回到阻塞狀態(tài)

ACK消息

現在我們打算境氢,把消費者1蟀拷,消費的那條message1進行ACK

XACK my_stream my_group  1605524648266-0

獲取指定消費組中,待確認(ACK)的消息

查看消費組的所有待確認消息統(tǒng)計

127.0.0.1:6379> XPENDING my_stream  my_group
1) (integer) 2       # 消費組中萍聊,所有消費者的pending消息數量
2) "1605524657157-0" # pending消息中的匹厘,最小消息ID
3) "1605524665215-0" # pending消息中的,最大消息ID
4) 1) 1) "my_consumer1"  # 消費者1
      2) "1"             # 有1條待確認消息
   2) 1) "my_consumer2"  # 消費者2
      2) "1"             # 有2條待確認消息

查看消費者1的待確認消息詳情

127.0.0.1:6379> XPENDING my_stream  my_group 0 + 10 my_consumer1
1) 1) "1605524665215-0"  # 待ACK消息ID
   2) "my_consumer1"     # 所屬消費者
   3) (integer) 847437   # 消息自從被消費者獲取后到現在過去的時間(毫秒) - idle time
   4) (integer) 1        # 消息被獲取的次數 - delivery counter

這條命令脐区,表示查詢消費組my_group中消費者my_consumer1的opending隊列,開始ID是0她按,結束ID是最大牛隅,最多檢索10個結果炕柔。

現在的情況就是,一共3條消息媒佣,消費者1消費了2條匕累,ack了1條。消費者2消費了1條默伍,沒有ack欢嘿。消費者1和2,各自的pending隊列中都有一條未ack的消息

如何實現將未被成功消費的消息獲取出來重新進行消費也糊?之前的演示炼蹦,目的都是為了造一些數據,所以是用的客戶端命令狸剃,從這里開始掐隐,所有的演示,都會使用spring-data-redis中的RedisTemplate钞馁。

遍歷消費者的pending列表虑省,讀取到未ACK的消息,直接進行ACK

import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @Test
    public void test() {
        StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();
        
        // 獲取my_group中的pending消息信息僧凰,本質上就是執(zhí)行XPENDING指令
        PendingMessagesSummary pendingMessagesSummary = streamOperations.pending("my_stream", "my_group");
        
        // 所有pending消息的數量
        long totalPendingMessages = pendingMessagesSummary.getTotalPendingMessages();
        
        // 消費組名稱
        String groupName= pendingMessagesSummary.getGroupName();
        
        // pending隊列中的最小ID
        String minMessageId = pendingMessagesSummary.minMessageId();
        
        // pending隊列中的最大ID
        String maxMessageId = pendingMessagesSummary.maxMessageId();
        
        LOGGER.info("消費組:{}探颈,一共有{}條pending消息,最大ID={}训措,最小ID={}", groupName, totalPendingMessages, minMessageId, maxMessageId);
        

        // 每個消費者的pending消息數量
        Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
        
        pendingMessagesPerConsumer.entrySet().forEach(entry -> {
            
            // 消費者
            String consumer = entry.getKey();
            // 消費者的pending消息數量
            long consumerTotalPendingMessages = entry.getValue();
            
            LOGGER.info("消費者:{}伪节,一共有{}條pending消息", consumer, consumerTotalPendingMessages);
            
            if (consumerTotalPendingMessages > 0) {
                // 讀取消費者pending隊列的前10條記錄,從ID=0的記錄開始隙弛,一直到ID最大值
                PendingMessages pendingMessages = streamOperations.pending("my_stream", Consumer.from("my_group", consumer), Range.closed("0", "+"), 10);
                
                // 遍歷所有Opending消息的詳情
                pendingMessages.forEach(message -> {
                    // 消息的ID
                    RecordId recordId =  message.getId();
                    // 消息從消費組中獲取架馋,到此刻的時間
                    Duration elapsedTimeSinceLastDelivery = message.getElapsedTimeSinceLastDelivery();
                    // 消息被獲取的次數
                    long deliveryCount = message.getTotalDeliveryCount();
                    
                    LOGGER.info("openg消息,id={}, elapsedTimeSinceLastDelivery={}, deliveryCount={}", recordId, elapsedTimeSinceLastDelivery, deliveryCount);
                    
                    /**
                     * 演示手動消費的這個判斷非常的針對全闷,目的就是要讀取消費者“my_consumer1”pending消息中叉寂,ID=1605524665215-0的這條消息
                     */
                    if (consumer.equals("my_consumer1") && recordId.toString().equals("1605524665215-0")) {
                        // 通過streamOperations,直接讀取這條pending消息总珠,
                        List<MapRecord<String, String, String>> result = streamOperations.range("my_stream", Range.rightOpen("1605524665215-0", "1605524665215-0"));
                        
                        // 開始和結束都是同一個ID屏鳍,所以結果只有一條
                        MapRecord<String, String, String> record = result.get(0);
                        
                        // 這里執(zhí)行日志輸出,模擬的就是消費邏輯
                        LOGGER.info("消費了pending消息:id={}, value={}", record.getId(), record.getValue());
                        
                        // 如果手動消費成功后局服,往消費組提交消息的ACK
                        Long retVal = streamOperations.acknowledge("my_group", record);
                        LOGGER.info("消息ack钓瞭,一共ack了{}條", retVal);
                    }
                });
            }
        });
    }
}

這種方式就是,遍歷消費組的pending消息情況淫奔,再遍歷每個消費者的pending消息id列表山涡,再根據id,直接去stream讀取這條消息,進行消費Ack鸭丛。

輸出日志

消費組:my_group竞穷,一共有2條pending消息,最大ID=1605524657157-0鳞溉,最小ID=1605524665215-0
消費者:my_consumer1瘾带,一共有1條pending消息
openg消息,id=1605524665215-0, elapsedTimeSinceLastDelivery=PT1H9M4.061S, deliveryCount=1
消費了pending消息:id=1605524665215-0, value={message3=Community}
消息ack熟菲,一共ack了1條
消費者:my_consumer2看政,一共有1條pending消息
openg消息,id=1605524657157-0, elapsedTimeSinceLastDelivery=PT1H9M12.172S, deliveryCount=1

最終的結果就是抄罕,消費者1的唯一一條pending消息被Ack了允蚣,這里有幾個點要注意

  1. 遍歷消費者pending列表時候,最小/大消息id贞绵,可以根據XPENDING指令中的結果來厉萝,我寫0 - +,只是為了偷懶

  2. 遍歷到消費者pending消息的時候榨崩,可以根據elapsedTimeSinceLastDelivery(idle time)和deliveryCount(delivery counter)做一些邏輯判斷谴垫,elapsedTimeSinceLastDelivery越長,表示這條消息被消費了很久母蛛,都沒Ack翩剪,deliveryCount表示重新投遞N次后(下文會講),都沒被消費成功彩郊,可能是消費邏輯有問題前弯,或者是Ack有問題。

再次查看XPENDING信息

127.0.0.1:6379> XPENDING my_stream  my_group 
1) (integer) 1
2) "1605524657157-0"
3) "1605524657157-0"
4) 1) 1) "my_consumer2"
      2) "1"

消費者1秫逝,唯1條待ack的消息看恕出,已經被我們遍歷出來手動消費,手動ack了违帆,所以只剩下消費者2還有1條pending消息浙巫。。

通過XCLAIM改變消息的消費者

如果一個消費者刷后,一直不能消費掉某條消息的畴,或者說這個消費者因為某些消息,永遠也不能上過線了尝胆,那么可以把這個消費者的pending消息丧裁,轉移到其他的消費者pending列表中,重新消費

其實我們這里要做的事情含衔,就是把“消費者2”的唯一1條pending消息“ 1605524657157-0”(message2)煎娇,交給“消費者1”二庵,重新進行消費。

Redis命令的實現

XCLAIM my_stream  my_group my_consumer1 10000 1605524657157-0

1605524657157-0這條消息逊桦,重新給my_group中的my_consumer1進行消費眨猎,前提條件是這條消息的idle time大于了10秒鐘(從獲取消息到現在超過10秒都沒Ack)。

Java客戶端的實現

import java.time.Duration;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @Test
    public void test() {
        List<ByteRecord> retVal = this.stringRedisTemplate.execute(new RedisCallback<List<ByteRecord>>() {
            @Override
            public List<ByteRecord> doInRedis(RedisConnection connection) throws DataAccessException {
                // XCLAIM 指令的實現方法
                return connection.streamCommands().xClaim("my_stream".getBytes(), "my_group", "my_consumer1", Duration.ofSeconds(10), RecordId.of("1605524657157-0"));
            }
        });
        
        for (ByteRecord byteRecord : retVal) {
            LOGGER.info("改了消息的消費者:id={}, value={}", byteRecord.getId(), byteRecord.getValue());
        }
    }
}

日志輸出

改了消息的消費者:id=1605524657157-0, value={[B@10b4f345=[B@63de4fa}

再次查看XPENDING信息

127.0.0.1:6379> XPENDING my_stream  my_group 
1) (integer) 1
2) "1605524657157-0"
3) "1605524657157-0"
4) 1) 1) "my_consumer1"
      2) "1"

可以看到强经,消息 “1605524657157-0”(message2),已經從“消費者2”名下寺渗,轉移到了”消費者1”匿情,接下來要做的事情,就是遍歷“消費者1”的pending列表信殊,消費掉它炬称。

讀取pending消息列表,進行消費

最開始在控制涡拘,演示了通過客戶端玲躯,進行消費者阻塞消費的時候,寫了一條命令

XREADGROUP GROUP my_group  my_consumer1  BLOCK 0 STREAMS my_stream >

其中最后那個>鳄乏,表示ID跷车,是一個特殊字符,如果不是橱野,當ID不是特殊字符>時, XREADGROUP不再是從消息隊列中讀取消息, 而是從消費者的的pending消息列表中讀取歷史消息朽缴。(一般將參數設為0-0,表示讀取所有的pending消息)

Redis命令

127.0.0.1:6379> XREADGROUP GROUP my_group  my_consumer1  BLOCK 0 STREAMS my_stream 0
1) 1) "my_stream"
   2) 1) 1) "1605524657157-0"
         2) 1) "message2"
            2) "SpringBoot"

讀取到了水援,消費者1密强,pending消息中的唯一一條消息記錄

Java實現

import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @SuppressWarnings("unchecked")
    @Test
    public void test() {
        
        StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();
        
        // 從消費者的pending隊列中讀取消息
        List<MapRecord<String, String, String>>  retVal = streamOperations.read(Consumer.from("my_group", "my_consumer1"), StreamOffset.create("my_stream", ReadOffset.from("0")));
        
        // 遍歷消息
        for (MapRecord<String, String, String> record : retVal ) {
            // 消費消息
            LOGGER.info("消息id={}, 消息value={}", record.getId(), record.getValue());
            // 手動ack消息
            streamOperations.acknowledge("my_group", record);
        }
    }
}

這種方式,就是直接從消費者的pending隊列中讀取數據蜗元,手動進行消費或渤,然后Ack

日志

消息id=1605524657157-0, 消息value={message2=SpringBoot}

再次查看XPENDING信息

127.0.0.1:6379> XPENDING my_stream  my_group 
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)

沒了,一條都沒奕扣,全部已經Ack了薪鹦。

死信

死信,就是一直沒法被消費的消息成畦,可以根據這個兩個屬性idle timedelivery counter 進行判斷

idle time 當消息被消費者讀取后距芬,就會開始計時,如果一個pending消息的idle time很長循帐,表示這消息框仔,可能是在Ack時發(fā)生了異常,或者說還沒來得及Ack拄养,消費者就宕機了离斩,導致一直沒有被Ack银舱,當消息發(fā)生了轉移,它會清零跛梗,重新計時寻馏。

delivery counter,它表示轉移的次數核偿,每當一條消息的消費者發(fā)生變更的時候诚欠,它的值都會+1,如果一條pending消息的delivery counter值很大漾岳,表示它在多個消費者之間進行了多次轉移都沒法成功消費轰绵,可以人工的讀取,消費掉尼荆。

最后

redis5的stream左腔,可以說功能還是蠻強大(設計上狠狠借鑒了一把Kakfa)。如果應用規(guī)模并不大捅儒,需要一個MQ服務液样,我想Stream的你可以試試看,比起自己搭建kakfa巧还,RocketMQ之類的鞭莽,來的快當而且更好維護。


首發(fā):https://springboot.io/t/topic/3001

?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末狞悲,一起剝皮案震驚了整個濱河市撮抓,隨后出現的幾起案子,更是在濱河造成了極大的恐慌摇锋,老刑警劉巖丹拯,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異荸恕,居然都是意外死亡乖酬,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進店門融求,熙熙樓的掌柜王于貴愁眉苦臉地迎上來咬像,“玉大人,你說我怎么就攤上這事生宛∠匕海” “怎么了?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵陷舅,是天一觀的道長倒彰。 經常有香客問我,道長莱睁,這世上最難降的妖魔是什么待讳? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任芒澜,我火速辦了婚禮,結果婚禮上创淡,老公的妹妹穿的比我還像新娘痴晦。我一直安慰自己,他們只是感情好琳彩,可當我...
    茶點故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布誊酌。 她就那樣靜靜地躺著,像睡著了一般汁针。 火紅的嫁衣襯著肌膚如雪术辐。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天施无,我揣著相機與錄音,去河邊找鬼必孤。 笑死猾骡,一個胖子當著我的面吹牛,可吹牛的內容都是我干的敷搪。 我是一名探鬼主播兴想,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼赡勘!你這毒婦竟也來了嫂便?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤闸与,失蹤者是張志新(化名)和其女友劉穎毙替,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體践樱,經...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡厂画,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了拷邢。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片袱院。...
    茶點故事閱讀 40,680評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖瞭稼,靈堂內的尸體忽然破棺而出忽洛,到底是詐尸還是另有隱情,我是刑警寧澤环肘,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布欲虚,位于F島的核電站,受9級特大地震影響廷臼,放射性物質發(fā)生泄漏苍在。R本人自食惡果不足惜绝页,卻給世界環(huán)境...
    茶點故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望寂恬。 院中可真熱鬧续誉,春花似錦、人聲如沸初肉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽牙咏。三九已至臼隔,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間妄壶,已是汗流浹背摔握。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留丁寄,地道東北人氨淌。 一個月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像伊磺,于是被迫代替她去往敵國和親盛正。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,691評論 2 361

推薦閱讀更多精彩內容

  • 原文鏈接:Redis實現消息隊列的方案 Redis作為內存中的數據結構存儲屑埋,常用作數據庫豪筝、緩存和消息代理。它支持數...
    這個ID狠溫柔閱讀 101,263評論 2 28
  • Redis5.0最近被作者突然放出來了摘能,增加了很多新的特色功能续崖。而Redis5.0最大的新特性就是多出了一個數據結...
    仗劍身游歷半甲閱讀 798評論 0 0
  • 一、概述 消息隊列徊哑,Message Queue袜刷,常用于解決并發(fā)系統(tǒng)中的資源一致性問題,提升峰值的處理能力莺丑,同時保證...
    陽公子_閱讀 769評論 0 1
  • 黑色的海島上懸著一輪又大又圓的明月梢莽,毫不嫌棄地把溫柔的月色照在這寸草不生的小島上萧豆。一個少年白衣白發(fā),悠閑自如地倚坐...
    小水Vivian閱讀 3,116評論 1 5
  • 漸變的面目拼圖要我怎么拼洪鸭? 我是疲乏了還是投降了样刷? 不是不允許自己墜落, 我沒有滴水不進的保護膜览爵。 就是害怕變得面...
    悶熱當乘涼閱讀 4,252評論 0 13