消費組從stream中獲取到消息后,會分配給自己組中其中的一個消費者進行消費魂务,消費者消費完畢曼验,需要給消費組返回ACK,表示這條消息已經消費完畢了粘姜。
當消費者從消費組獲取到消息的時候鬓照,會先把消息添加到自己的pending消息列表,當消費者給消費組返回ACK的時候孤紧,就會把這條消息從pending隊列刪除豺裆。(每個消費者都有自己的pending消息隊列)
消費者可能沒有及時的返回ACK。例如消費者消費完畢后号显,宕機臭猜,沒有及時返回ACK躺酒,此時就會導致這條消息占用2倍的內存(stream中保存一份, 消費者的的pending消息列表中保存一份)
關于Stream的基礎姿勢,可以先看看這篇貼帖子
如何在Springboot中使用Redis5的Stream
開始之前蔑歌,通過Redis客戶端模擬一點數據
1羹应,新打開Redis客戶端(我們稱之為:生產端), 創(chuàng)建streamm丐膝,名稱叫做:my_stream
XADD my_stream * hello world
隨便添加一條消息量愧,目的是為了初始化stream
2,創(chuàng)建一個消費組帅矗,名稱叫做:my_group
XGROUP CREATE my_stream my_group $
3偎肃,再新啟動一個Redis客戶端(我們稱之為:消費端1),使用消費組進行阻塞消費浑此,指定消費者:my_consumer1
XREADGROUP GROUP my_group my_consumer1 BLOCK 0 STREAMS my_stream >
4累颂,再新啟動一個Redis客戶端(我們稱之為:消費端2),使用消費組進行阻塞消費凛俱,指定消費者:my_consumer2
XREADGROUP GROUP my_group my_consumer2 BLOCK 0 STREAMS my_stream >
5紊馏,通過生產端,推送3條消息
XADD my_stream * message1 Hello
XADD my_stream * message2 SpringBoot
XADD my_stream * message3 Community
生產端
消費端1
消費端2
可以看到蒲犬,一共Push了3條消息朱监,它們的ID分別是
- 1605524648266-0 (message1 )
- 1605524657157-0 (message2)
- 1605524665215-0 (message3)
現在的狀況是,消費者1原叮,消費了2條消息(message1和message3)赫编,消費者2,消費了1條消息(message2)奋隶。都是消費成功了的擂送,但是它們都還沒有進行ACK。
在客戶端唯欣,消費者消費到一條消息后會立即返回嘹吨,需要重新執(zhí)行命令,來回到阻塞狀態(tài)
ACK消息
現在我們打算境氢,把消費者1蟀拷,消費的那條message1
進行ACK
XACK my_stream my_group 1605524648266-0
獲取指定消費組中,待確認(ACK)的消息
查看消費組的所有待確認消息統(tǒng)計
127.0.0.1:6379> XPENDING my_stream my_group
1) (integer) 2 # 消費組中萍聊,所有消費者的pending消息數量
2) "1605524657157-0" # pending消息中的匹厘,最小消息ID
3) "1605524665215-0" # pending消息中的,最大消息ID
4) 1) 1) "my_consumer1" # 消費者1
2) "1" # 有1條待確認消息
2) 1) "my_consumer2" # 消費者2
2) "1" # 有2條待確認消息
查看消費者1的待確認消息詳情
127.0.0.1:6379> XPENDING my_stream my_group 0 + 10 my_consumer1
1) 1) "1605524665215-0" # 待ACK消息ID
2) "my_consumer1" # 所屬消費者
3) (integer) 847437 # 消息自從被消費者獲取后到現在過去的時間(毫秒) - idle time
4) (integer) 1 # 消息被獲取的次數 - delivery counter
這條命令脐区,表示查詢消費組my_group
中消費者my_consumer1
的opending隊列,開始ID是0她按,結束ID是最大牛隅,最多檢索10個結果炕柔。
現在的情況就是,一共3條消息媒佣,消費者1消費了2條匕累,ack了1條。消費者2消費了1條默伍,沒有ack欢嘿。消費者1和2,各自的pending隊列中都有一條未ack的消息
如何實現將未被成功消費的消息獲取出來重新進行消費也糊?之前的演示炼蹦,目的都是為了造一些數據,所以是用的客戶端命令狸剃,從這里開始掐隐,所有的演示,都會使用spring-data-redis
中的RedisTemplate
钞馁。
遍歷消費者的pending列表虑省,讀取到未ACK的消息,直接進行ACK
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import io.springboot.jwt.SpringBootJwtApplication;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Test
public void test() {
StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();
// 獲取my_group中的pending消息信息僧凰,本質上就是執(zhí)行XPENDING指令
PendingMessagesSummary pendingMessagesSummary = streamOperations.pending("my_stream", "my_group");
// 所有pending消息的數量
long totalPendingMessages = pendingMessagesSummary.getTotalPendingMessages();
// 消費組名稱
String groupName= pendingMessagesSummary.getGroupName();
// pending隊列中的最小ID
String minMessageId = pendingMessagesSummary.minMessageId();
// pending隊列中的最大ID
String maxMessageId = pendingMessagesSummary.maxMessageId();
LOGGER.info("消費組:{}探颈,一共有{}條pending消息,最大ID={}训措,最小ID={}", groupName, totalPendingMessages, minMessageId, maxMessageId);
// 每個消費者的pending消息數量
Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
pendingMessagesPerConsumer.entrySet().forEach(entry -> {
// 消費者
String consumer = entry.getKey();
// 消費者的pending消息數量
long consumerTotalPendingMessages = entry.getValue();
LOGGER.info("消費者:{}伪节,一共有{}條pending消息", consumer, consumerTotalPendingMessages);
if (consumerTotalPendingMessages > 0) {
// 讀取消費者pending隊列的前10條記錄,從ID=0的記錄開始隙弛,一直到ID最大值
PendingMessages pendingMessages = streamOperations.pending("my_stream", Consumer.from("my_group", consumer), Range.closed("0", "+"), 10);
// 遍歷所有Opending消息的詳情
pendingMessages.forEach(message -> {
// 消息的ID
RecordId recordId = message.getId();
// 消息從消費組中獲取架馋,到此刻的時間
Duration elapsedTimeSinceLastDelivery = message.getElapsedTimeSinceLastDelivery();
// 消息被獲取的次數
long deliveryCount = message.getTotalDeliveryCount();
LOGGER.info("openg消息,id={}, elapsedTimeSinceLastDelivery={}, deliveryCount={}", recordId, elapsedTimeSinceLastDelivery, deliveryCount);
/**
* 演示手動消費的這個判斷非常的針對全闷,目的就是要讀取消費者“my_consumer1”pending消息中叉寂,ID=1605524665215-0的這條消息
*/
if (consumer.equals("my_consumer1") && recordId.toString().equals("1605524665215-0")) {
// 通過streamOperations,直接讀取這條pending消息总珠,
List<MapRecord<String, String, String>> result = streamOperations.range("my_stream", Range.rightOpen("1605524665215-0", "1605524665215-0"));
// 開始和結束都是同一個ID屏鳍,所以結果只有一條
MapRecord<String, String, String> record = result.get(0);
// 這里執(zhí)行日志輸出,模擬的就是消費邏輯
LOGGER.info("消費了pending消息:id={}, value={}", record.getId(), record.getValue());
// 如果手動消費成功后局服,往消費組提交消息的ACK
Long retVal = streamOperations.acknowledge("my_group", record);
LOGGER.info("消息ack钓瞭,一共ack了{}條", retVal);
}
});
}
});
}
}
這種方式就是,遍歷消費組的pending消息情況淫奔,再遍歷每個消費者的pending消息id列表山涡,再根據id,直接去stream讀取這條消息,進行消費Ack鸭丛。
輸出日志
消費組:my_group竞穷,一共有2條pending消息,最大ID=1605524657157-0鳞溉,最小ID=1605524665215-0
消費者:my_consumer1瘾带,一共有1條pending消息
openg消息,id=1605524665215-0, elapsedTimeSinceLastDelivery=PT1H9M4.061S, deliveryCount=1
消費了pending消息:id=1605524665215-0, value={message3=Community}
消息ack熟菲,一共ack了1條
消費者:my_consumer2看政,一共有1條pending消息
openg消息,id=1605524657157-0, elapsedTimeSinceLastDelivery=PT1H9M12.172S, deliveryCount=1
最終的結果就是抄罕,消費者1的唯一一條pending消息被Ack了允蚣,這里有幾個點要注意
遍歷消費者pending列表時候,最小/大消息id贞绵,可以根據
XPENDING
指令中的結果來厉萝,我寫0 - +
,只是為了偷懶遍歷到消費者pending消息的時候榨崩,可以根據
elapsedTimeSinceLastDelivery
(idle time)和deliveryCount
(delivery counter)做一些邏輯判斷谴垫,elapsedTimeSinceLastDelivery
越長,表示這條消息被消費了很久母蛛,都沒Ack翩剪,deliveryCount
表示重新投遞N次后(下文會講),都沒被消費成功彩郊,可能是消費邏輯有問題前弯,或者是Ack有問題。
再次查看XPENDING信息
127.0.0.1:6379> XPENDING my_stream my_group
1) (integer) 1
2) "1605524657157-0"
3) "1605524657157-0"
4) 1) 1) "my_consumer2"
2) "1"
消費者1秫逝,唯1條待ack的消息看恕出,已經被我們遍歷出來手動消費,手動ack了违帆,所以只剩下消費者2還有1條pending消息浙巫。。
通過XCLAIM改變消息的消費者
如果一個消費者刷后,一直不能消費掉某條消息的畴,或者說這個消費者因為某些消息,永遠也不能上過線了尝胆,那么可以把這個消費者的pending消息丧裁,轉移到其他的消費者pending列表中,重新消費
其實我們這里要做的事情含衔,就是把“消費者2”的唯一1條pending消息“ 1605524657157-0”(message2)煎娇,交給“消費者1”二庵,重新進行消費。
Redis命令的實現
XCLAIM my_stream my_group my_consumer1 10000 1605524657157-0
把1605524657157-0
這條消息逊桦,重新給my_group
中的my_consumer1
進行消費眨猎,前提條件是這條消息的idle time
大于了10秒鐘(從獲取消息到現在超過10秒都沒Ack)。
Java客戶端的實現
import java.time.Duration;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import io.springboot.jwt.SpringBootJwtApplication;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Test
public void test() {
List<ByteRecord> retVal = this.stringRedisTemplate.execute(new RedisCallback<List<ByteRecord>>() {
@Override
public List<ByteRecord> doInRedis(RedisConnection connection) throws DataAccessException {
// XCLAIM 指令的實現方法
return connection.streamCommands().xClaim("my_stream".getBytes(), "my_group", "my_consumer1", Duration.ofSeconds(10), RecordId.of("1605524657157-0"));
}
});
for (ByteRecord byteRecord : retVal) {
LOGGER.info("改了消息的消費者:id={}, value={}", byteRecord.getId(), byteRecord.getValue());
}
}
}
日志輸出
改了消息的消費者:id=1605524657157-0, value={[B@10b4f345=[B@63de4fa}
再次查看XPENDING信息
127.0.0.1:6379> XPENDING my_stream my_group
1) (integer) 1
2) "1605524657157-0"
3) "1605524657157-0"
4) 1) 1) "my_consumer1"
2) "1"
可以看到强经,消息 “1605524657157-0”(message2),已經從“消費者2”名下寺渗,轉移到了”消費者1”匿情,接下來要做的事情,就是遍歷“消費者1”的pending列表信殊,消費掉它炬称。
讀取pending消息列表,進行消費
最開始在控制涡拘,演示了通過客戶端玲躯,進行消費者阻塞消費的時候,寫了一條命令
XREADGROUP GROUP my_group my_consumer1 BLOCK 0 STREAMS my_stream >
其中最后那個>
鳄乏,表示ID跷车,是一個特殊字符,如果不是橱野,當ID不是特殊字符>
時, XREADGROUP
不再是從消息隊列中讀取消息, 而是從消費者的的pending消息列表中讀取歷史消息朽缴。(一般將參數設為0-0,表示讀取所有的pending消息)
Redis命令
127.0.0.1:6379> XREADGROUP GROUP my_group my_consumer1 BLOCK 0 STREAMS my_stream 0
1) 1) "my_stream"
2) 1) 1) "1605524657157-0"
2) 1) "message2"
2) "SpringBoot"
讀取到了水援,消費者1密强,pending消息中的唯一一條消息記錄
Java實現
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import io.springboot.jwt.SpringBootJwtApplication;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
@Autowired
private StringRedisTemplate stringRedisTemplate;
@SuppressWarnings("unchecked")
@Test
public void test() {
StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();
// 從消費者的pending隊列中讀取消息
List<MapRecord<String, String, String>> retVal = streamOperations.read(Consumer.from("my_group", "my_consumer1"), StreamOffset.create("my_stream", ReadOffset.from("0")));
// 遍歷消息
for (MapRecord<String, String, String> record : retVal ) {
// 消費消息
LOGGER.info("消息id={}, 消息value={}", record.getId(), record.getValue());
// 手動ack消息
streamOperations.acknowledge("my_group", record);
}
}
}
這種方式,就是直接從消費者的pending隊列中讀取數據蜗元,手動進行消費或渤,然后Ack
日志
消息id=1605524657157-0, 消息value={message2=SpringBoot}
再次查看XPENDING信息
127.0.0.1:6379> XPENDING my_stream my_group
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
沒了,一條都沒奕扣,全部已經Ack了薪鹦。
死信
死信,就是一直沒法被消費的消息成畦,可以根據這個兩個屬性idle time
和delivery counter
進行判斷
idle time
當消息被消費者讀取后距芬,就會開始計時,如果一個pending消息的idle time
很長循帐,表示這消息框仔,可能是在Ack時發(fā)生了異常,或者說還沒來得及Ack拄养,消費者就宕機了离斩,導致一直沒有被Ack银舱,當消息發(fā)生了轉移,它會清零跛梗,重新計時寻馏。
delivery counter
,它表示轉移的次數核偿,每當一條消息的消費者發(fā)生變更的時候诚欠,它的值都會+1,如果一條pending消息的delivery counter
值很大漾岳,表示它在多個消費者之間進行了多次轉移都沒法成功消費轰绵,可以人工的讀取,消費掉尼荆。
最后
redis5的stream左腔,可以說功能還是蠻強大(設計上狠狠借鑒了一把Kakfa)。如果應用規(guī)模并不大捅儒,需要一個MQ服務液样,我想Stream的你可以試試看,比起自己搭建kakfa巧还,RocketMQ之類的鞭莽,來的快當而且更好維護。