目錄
1. 技術(shù)背景
2. 設(shè)計思想
3. 總結(jié)
常見問題
附錄
1. 技術(shù)背景
1.1. Redis Keyspace Notifications
從Redis 2.8.0+
開始Redis提供了Keyspace Notifications
[^1]特性; 這一特性使得客戶端可以通過發(fā)布/訂閱來接收redis影響數(shù)據(jù)集相關(guān)事件, 例如:
- 新建KEY
- 對KEY執(zhí)行了
LPUSH
操作 - KEY過期
1.1.1 配置
由于該特性會新增CPU消耗, keyspance events notifications
是默認(rèn)關(guān)閉的, 可通過修改redis.conf或CONFIG SET
配置notify-keyspace-events
來開啟,
K Keyspace events, published with __keyspace@__ prefix.
E Keyevent events, published with __keyevent@__ prefix.
g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$ String commands
l List commands
s Set commands
h Hash commands
z Sorted set commands
x Expired events (events generated every time a key expires)
e Evicted events (events generated when a key is evicted for maxmemory)
A Alias for g$lshzxe, so that the "AKE" string means all the events.
配置中至少需要出現(xiàn)K
/E
, 否則將不會接收到任何事件, 如果配置為KEA
則會接收到任何可能的事件豺总。
# specify at least one of K or E, no events will be delivered.
notify-keyspace-events "KEA"
注意: Redis的發(fā)布/訂閱閱后即焚是不支持持久化的, 故如果客戶端斷開重連則在這期間的消息將丟失!
1.1.2 測試
訂閱事件
s1.vm.net:6379> PSUBSCRIBE __keyevent@*__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@*__:expired"
3) (integer) 1
過期一個KEY
SET foo val EX 10
收到通知
1) "pmessage"
2) "__keyevent@*__:expired"
3) "__keyevent@0__:expired"
4) "a"
1.1.3 RedisKeyExpiredEvent
網(wǎng)上實際有很多其他方案, 在spring-data-redis
中已提供了對上面特性的實現(xiàn)只是很少有人介紹到, 我推薦使用以下方案, 則每當(dāng)有KEY失效則以下listener會收到消息:
public @Bean ApplicationListener redisKeyExpiredEventListener() {
return event -> {
System.out.println(String.format("A Received expire event for key=%s with value %s.", new String(event.getSource()), event.getValue()));
}
}
實現(xiàn)原理是在org.springframework.data.redis.listener.KeyExpirationEventMessageListener
中訂閱事件__keyevent@*__:expired
如下:
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implements
ApplicationEventPublisherAware {
private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");
@Override
protected void doRegister(RedisMessageListenerContainer listenerContainer) {
listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
}
...
}
1.2 Distributed Locks
有多種方式去實現(xiàn)分布式鎖, 關(guān)于使用Redis做分布式鎖我推薦大家可以看看附錄[^2]官方的文章, 里面詳細(xì)介紹了官方推薦的正確的實現(xiàn)方式彤钟。
1.2.1 RedisLockRegistry
在Spring Integration
[^3]中從4.0開始就提供了一種基于redis的分布式鎖實現(xiàn)RedisLockRegistry
, 可用過用obtain
方法直接獲取到java.util.concurrent.locks.Lock
也很簡單:
// 1\. 創(chuàng)建對象
public @Bean RedisLockRegistry redisLockRegistry(RedisConnectionFactory connectionFactory) { return new RedisLockRegistry(connectionFactory, "Foo-API");
}
@Autowired
private RedisLockRegistry redisLockRegistry;
// 并發(fā)方法
public void foo() {
java.util.concurrent.locks.Lock lock = null;
try {
lock = redisLockRegistry.obtain(DistributedLockService.createLockKey(trigger));
if (!lock.tryLock()) {
// 未獲取到鎖
return;
}
// 已成功獲取到分布式鎖
} finally {
// Unlock safely
if (lock != null) try { lock.unlock(); } catch (Exception e) { /* NOTHING */ }
}
}
1.2.3 java.util.concurrent.locks.Lock
根據(jù)實際的需求選擇使用tryLock
/lock
來實現(xiàn)我們的具體場景, java中對該對象定義如下:
public interface Lock {
/**
* Acquires the lock.
*
* If the lock is not available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until the
* lock has been acquired.
*/
void lock();
/**
* Acquires the lock unless the current thread is
* {@linkplain Thread#interrupt interrupted}.
*
* Acquires the lock if it is available and returns immediately.
*
* If the lock is not available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
*
*
* The lock is acquired by the current thread; or
* Some other thread {@linkplain Thread#interrupt interrupts} the
* current thread, and interruption of lock acquisition is supported.
*
*
* If the current thread:
*
* has its interrupted status set on entry to this method; or
* is {@linkplain Thread#interrupt interrupted} while acquiring the
* lock, and interruption of lock acquisition is supported,
*
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is
* interrupted while acquiring the lock (and interruption
* of lock acquisition is supported)
*/
void lockInterruptibly() throws InterruptedException;
/**
* Acquires the lock only if it is free at the time of invocation.
*
* Acquires the lock if it is available and returns immediately
* with the value {@code true}.
* If the lock is not available then this method will return
* immediately with the value {@code false}.
*
* A typical usage idiom for this method would be:
* {@code
* Lock lock = ...;
* if (lock.tryLock()) {
* try {
* // manipulate protected state
* } finally {
* lock.unlock();
* }
* } else {
* // perform alternative actions
* }}
*
* This usage ensures that the lock is unlocked if it was acquired, and
* doesn't try to unlock if the lock was not acquired.
*
* @return {@code true} if the lock was acquired and
* {@code false} otherwise
*/
boolean tryLock();
/**
* Acquires the lock if it is free within the given waiting time and the
* current thread has not been {@linkplain Thread#interrupt interrupted}.
*
* If the lock is available this method returns immediately
* with the value {@code true}.
* If the lock is not available then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until one of three things happens:
*
* The lock is acquired by the current thread; or
* Some other thread {@linkplain Thread#interrupt interrupts} the
* current thread, and interruption of lock acquisition is supported; or
* The specified waiting time elapses
*
*
* If the lock is acquired then the value {@code true} is returned.
*
* If the current thread:
*
* has its interrupted status set on entry to this method; or
* is {@linkplain Thread#interrupt interrupted} while acquiring
* the lock, and interruption of lock acquisition is supported,
*
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* <p>If the specified waiting time elapses then the value {@code false}
* is returned.
* If the time is
* less than or equal to zero, the method will not wait at all.
*
* @param time the maximum time to wait for the lock
* @param unit the time unit of the {@code time} argument
* @return {@code true} if the lock was acquired and {@code false}
* if the waiting time elapsed before the lock was acquired
*
* @throws InterruptedException if the current thread is interrupted
* while acquiring the lock (and interruption of lock
* acquisition is supported)
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* Releases the lock.
*/
void unlock();
...
}
2. 設(shè)計思想
2.1 任務(wù)管理
定義任務(wù)管理服務(wù), 用于受理其他服務(wù)程序通過RPC/DB/MQ等任務(wù)創(chuàng)建指令, 該服務(wù)根據(jù)任務(wù)等元數(shù)據(jù)(META DATA)判斷任務(wù)是需要立即執(zhí)行或是延時執(zhí)行虚倒。
-
立即執(zhí)行 - 立即把任務(wù)交接給
任務(wù)執(zhí)行
立即開始執(zhí)行尖飞。 -
延時執(zhí)行 - 將任務(wù)數(shù)據(jù)存入
Redis
并設(shè)置TTL = (執(zhí)行時間 - 當(dāng)前時間)
蒜危。
2.2 執(zhí)行任務(wù)
根據(jù)不同等任務(wù)數(shù)據(jù)調(diào)用不用等任務(wù)具體實方法去執(zhí)行任務(wù), 例如執(zhí)行一條SQL破镰、執(zhí)行一個RPC調(diào)用等, 執(zhí)行成功則任務(wù)調(diào)度完成, 執(zhí)行不成功則根據(jù)任務(wù)元數(shù)據(jù)(META DATA)來控制任務(wù)執(zhí)行情況, 例如可約定以下數(shù)據(jù):
RETRY_INTERVAL = 3000 # 任務(wù)失敗重試間隔
MAX_RETRIES = 3 # 任務(wù)失敗最大重試次數(shù)
當(dāng)任務(wù)執(zhí)行失敗且還滿足可執(zhí)行條件, 則根據(jù)配置RETRY_INTERVAL
將任務(wù)數(shù)據(jù)放入Redis
并設(shè)置TTL = RETRY_INTERVAL
, 則任務(wù)則會在TTL
之后重新被執(zhí)行盼砍。
根據(jù)前面技術(shù)背景中提到當(dāng)Redis
現(xiàn)有當(dāng)特性, 以及前面我們根據(jù)KEY的TTL來控制任務(wù)的執(zhí)行, 則收到KEY過期事件即代表任務(wù)達(dá)到執(zhí)行時間了; 但在分布式環(huán)境中, 多個JVM會同時監(jiān)聽到KEY過期, 為了防止任務(wù)重復(fù)執(zhí)行, 所以在可執(zhí)行任務(wù)前需要再結(jié)合分布式鎖獲取到鎖的JVM方可執(zhí)行任務(wù), 否則直接忽略該事件, 因為其他JVM已經(jīng)執(zhí)行了該任務(wù)碾盟。
3. 總結(jié)
本文描述的方案主要結(jié)合了Redis
兩大特性:
- Keyspace Notifications[^1]
- 基于Redis的分布式鎖
來實現(xiàn)來分布式任務(wù)調(diào)度, 都基于Redis來實現(xiàn), 較大程度發(fā)揮了其自身優(yōu)勢, 相較于quartz
[^4]更加輕量級妆丘。
常見問題
- KEY過期沒有觸發(fā)失效事件
檢查redis中notify-keyspace-events
配置情況, 或者直接通過redis-cli
連接到redis執(zhí)行MONITOR
指令觀察消息情況锄俄。
附錄
[1] Redis Keyspace Notifications
https://redis.io/topics/notifications
[2] Distributed locks with Redis
https://redis.io/topics/distlock
[3] Spring Integration
https://spring.io/projects/spring-integration
[4] Quartz Enterprise Job Scheduler
http://www.quartz-scheduler.org/