基于Redis實現(xiàn)分布式定時任務(wù)

目錄

1. 技術(shù)背景
2. 設(shè)計思想
3. 總結(jié)
常見問題
附錄

1. 技術(shù)背景

1.1. Redis Keyspace Notifications

Redis 2.8.0+開始Redis提供了Keyspace Notifications[^1]特性; 這一特性使得客戶端可以通過發(fā)布/訂閱來接收redis影響數(shù)據(jù)集相關(guān)事件, 例如:

  • 新建KEY
  • 對KEY執(zhí)行了LPUSH操作
  • KEY過期

1.1.1 配置

由于該特性會新增CPU消耗, keyspance events notifications是默認(rèn)關(guān)閉的, 可通過修改redis.conf或CONFIG SET 配置notify-keyspace-events來開啟,

K     Keyspace events, published with __keyspace@__ prefix.
E     Keyevent events, published with __keyevent@__ prefix.
g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$     String commands
l     List commands
s     Set commands
h     Hash commands
z     Sorted set commands
x     Expired events (events generated every time a key expires)
e     Evicted events (events generated when a key is evicted for maxmemory)
A     Alias for g$lshzxe, so that the "AKE" string means all the events.

配置中至少需要出現(xiàn)K/E, 否則將不會接收到任何事件, 如果配置為KEA則會接收到任何可能的事件豺总。

#  specify at least one of K or E, no events will be delivered.
notify-keyspace-events "KEA"

注意: Redis的發(fā)布/訂閱閱后即焚是不支持持久化的, 故如果客戶端斷開重連則在這期間的消息將丟失!

1.1.2 測試

訂閱事件

s1.vm.net:6379> PSUBSCRIBE __keyevent@*__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@*__:expired"
3) (integer) 1

過期一個KEY

SET foo val EX 10

收到通知

1) "pmessage"
2) "__keyevent@*__:expired"
3) "__keyevent@0__:expired"
4) "a"

1.1.3 RedisKeyExpiredEvent

網(wǎng)上實際有很多其他方案, 在spring-data-redis中已提供了對上面特性的實現(xiàn)只是很少有人介紹到, 我推薦使用以下方案, 則每當(dāng)有KEY失效則以下listener會收到消息:

public @Bean ApplicationListener redisKeyExpiredEventListener() {
        return event -> {
            System.out.println(String.format("A Received expire event for key=%s with value %s.", new String(event.getSource()), event.getValue()));
        }
}

實現(xiàn)原理是在org.springframework.data.redis.listener.KeyExpirationEventMessageListener中訂閱事件__keyevent@*__:expired如下:


public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implements
        ApplicationEventPublisherAware {

  private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");

    @Override
    protected void doRegister(RedisMessageListenerContainer listenerContainer) {
        listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
    }
  ... 
}

1.2 Distributed Locks

有多種方式去實現(xiàn)分布式鎖, 關(guān)于使用Redis做分布式鎖我推薦大家可以看看附錄[^2]官方的文章, 里面詳細(xì)介紹了官方推薦的正確的實現(xiàn)方式彤钟。

1.2.1 RedisLockRegistry

Spring Integration[^3]中從4.0開始就提供了一種基于redis的分布式鎖實現(xiàn)RedisLockRegistry, 可用過用obtain方法直接獲取到java.util.concurrent.locks.Lock也很簡單:

// 1\. 創(chuàng)建對象
public @Bean RedisLockRegistry redisLockRegistry(RedisConnectionFactory connectionFactory) {         return new RedisLockRegistry(connectionFactory, "Foo-API"); 
}

@Autowired
private RedisLockRegistry redisLockRegistry;

// 并發(fā)方法
public void foo() {
    java.util.concurrent.locks.Lock lock = null;
    try {
        lock = redisLockRegistry.obtain(DistributedLockService.createLockKey(trigger));
        if (!lock.tryLock()) {
            // 未獲取到鎖
            return;
        }
        // 已成功獲取到分布式鎖
    } finally {
         // Unlock safely
         if (lock != null) try { lock.unlock(); } catch (Exception e) { /* NOTHING */ }
    }
}

1.2.3 java.util.concurrent.locks.Lock

根據(jù)實際的需求選擇使用tryLock/lock來實現(xiàn)我們的具體場景, java中對該對象定義如下:

public interface Lock {
    /**
     * Acquires the lock.
     *
     * If the lock is not available then the current thread becomes
     * disabled for thread scheduling purposes and lies dormant until the
     * lock has been acquired.
     */
    void lock();

    /**
     * Acquires the lock unless the current thread is
     * {@linkplain Thread#interrupt interrupted}.
     *
     * Acquires the lock if it is available and returns immediately.
     *
     * If the lock is not available then the current thread becomes
     * disabled for thread scheduling purposes and lies dormant until
     * one of two things happens:
     *
     * 
     * The lock is acquired by the current thread; or
     * Some other thread {@linkplain Thread#interrupt interrupts} the
     * current thread, and interruption of lock acquisition is supported.
     * 
     *
     * If the current thread:
     * 
     * has its interrupted status set on entry to this method; or
     * is {@linkplain Thread#interrupt interrupted} while acquiring the
     * lock, and interruption of lock acquisition is supported,
     * 
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is
     *         interrupted while acquiring the lock (and interruption
     *         of lock acquisition is supported)
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * Acquires the lock only if it is free at the time of invocation.
     *
     * Acquires the lock if it is available and returns immediately
     * with the value {@code true}.
     * If the lock is not available then this method will return
     * immediately with the value {@code false}.
     *
     * A typical usage idiom for this method would be:
     *   {@code
     * Lock lock = ...;
     * if (lock.tryLock()) {
     *   try {
     *     // manipulate protected state
     *   } finally {
     *     lock.unlock();
     *   }
     * } else {
     *   // perform alternative actions
     * }}
     *
     * This usage ensures that the lock is unlocked if it was acquired, and
     * doesn't try to unlock if the lock was not acquired.
     *
     * @return {@code true} if the lock was acquired and
     *         {@code false} otherwise
     */
    boolean tryLock();

    /**
     * Acquires the lock if it is free within the given waiting time and the
     * current thread has not been {@linkplain Thread#interrupt interrupted}.
     *
     * If the lock is available this method returns immediately
     * with the value {@code true}.
     * If the lock is not available then
     * the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until one of three things happens:
     * 
     * The lock is acquired by the current thread; or
     * Some other thread {@linkplain Thread#interrupt interrupts} the
     * current thread, and interruption of lock acquisition is supported; or
     * The specified waiting time elapses
     * 
     *
     * If the lock is acquired then the value {@code true} is returned.
     *
     * If the current thread:
     * 
     * has its interrupted status set on entry to this method; or
     * is {@linkplain Thread#interrupt interrupted} while acquiring
     * the lock, and interruption of lock acquisition is supported,
     * 
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then the value {@code false}
     * is returned.
     * If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param time the maximum time to wait for the lock
     * @param unit the time unit of the {@code time} argument
     * @return {@code true} if the lock was acquired and {@code false}
     *         if the waiting time elapsed before the lock was acquired
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while acquiring the lock (and interruption of lock
     *         acquisition is supported)
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     * Releases the lock.
     */
    void unlock();

    ...
}

2. 設(shè)計思想

流程圖

2.1 任務(wù)管理

定義任務(wù)管理服務(wù), 用于受理其他服務(wù)程序通過RPC/DB/MQ等任務(wù)創(chuàng)建指令, 該服務(wù)根據(jù)任務(wù)等元數(shù)據(jù)(META DATA)判斷任務(wù)是需要立即執(zhí)行或是延時執(zhí)行虚倒。

  • 立即執(zhí)行 - 立即把任務(wù)交接給任務(wù)執(zhí)行立即開始執(zhí)行尖飞。
  • 延時執(zhí)行 - 將任務(wù)數(shù)據(jù)存入Redis并設(shè)置TTL = (執(zhí)行時間 - 當(dāng)前時間)蒜危。

2.2 執(zhí)行任務(wù)

根據(jù)不同等任務(wù)數(shù)據(jù)調(diào)用不用等任務(wù)具體實方法去執(zhí)行任務(wù), 例如執(zhí)行一條SQL破镰、執(zhí)行一個RPC調(diào)用等, 執(zhí)行成功則任務(wù)調(diào)度完成, 執(zhí)行不成功則根據(jù)任務(wù)元數(shù)據(jù)(META DATA)來控制任務(wù)執(zhí)行情況, 例如可約定以下數(shù)據(jù):

RETRY_INTERVAL = 3000 # 任務(wù)失敗重試間隔
MAX_RETRIES = 3 # 任務(wù)失敗最大重試次數(shù)

當(dāng)任務(wù)執(zhí)行失敗且還滿足可執(zhí)行條件, 則根據(jù)配置RETRY_INTERVAL將任務(wù)數(shù)據(jù)放入Redis并設(shè)置TTL = RETRY_INTERVAL, 則任務(wù)則會在TTL之后重新被執(zhí)行盼砍。

根據(jù)前面技術(shù)背景中提到當(dāng)Redis現(xiàn)有當(dāng)特性, 以及前面我們根據(jù)KEY的TTL來控制任務(wù)的執(zhí)行, 則收到KEY過期事件即代表任務(wù)達(dá)到執(zhí)行時間了; 但在分布式環(huán)境中, 多個JVM會同時監(jiān)聽到KEY過期, 為了防止任務(wù)重復(fù)執(zhí)行, 所以在可執(zhí)行任務(wù)前需要再結(jié)合分布式鎖獲取到鎖的JVM方可執(zhí)行任務(wù), 否則直接忽略該事件, 因為其他JVM已經(jīng)執(zhí)行了該任務(wù)碾盟。

3. 總結(jié)

本文描述的方案主要結(jié)合了Redis兩大特性:

  • Keyspace Notifications[^1]
  • 基于Redis的分布式鎖

來實現(xiàn)來分布式任務(wù)調(diào)度, 都基于Redis來實現(xiàn), 較大程度發(fā)揮了其自身優(yōu)勢, 相較于quartz[^4]更加輕量級妆丘。

常見問題

  • KEY過期沒有觸發(fā)失效事件
    檢查redis中notify-keyspace-events配置情況, 或者直接通過redis-cli連接到redis執(zhí)行MONITOR指令觀察消息情況锄俄。

附錄

[1] Redis Keyspace Notifications
https://redis.io/topics/notifications
[2] Distributed locks with Redis
https://redis.io/topics/distlock
[3] Spring Integration
https://spring.io/projects/spring-integration
[4] Quartz Enterprise Job Scheduler
http://www.quartz-scheduler.org/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市勺拣,隨后出現(xiàn)的幾起案子奶赠,更是在濱河造成了極大的恐慌,老刑警劉巖药有,帶你破解...
    沈念sama閱讀 211,639評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件毅戈,死亡現(xiàn)場離奇詭異,居然都是意外死亡愤惰,警方通過查閱死者的電腦和手機苇经,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來宦言,“玉大人扇单,你說我怎么就攤上這事〉焱” “怎么了蜘澜?”我有些...
    開封第一講書人閱讀 157,221評論 0 348
  • 文/不壞的土叔 我叫張陵施流,是天一觀的道長。 經(jīng)常有香客問我鄙信,道長瞪醋,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,474評論 1 283
  • 正文 為了忘掉前任装诡,我火速辦了婚禮银受,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘鸦采。我一直安慰自己蚓土,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,570評論 6 386
  • 文/花漫 我一把揭開白布赖淤。 她就那樣靜靜地躺著蜀漆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪咱旱。 梳的紋絲不亂的頭發(fā)上确丢,一...
    開封第一講書人閱讀 49,816評論 1 290
  • 那天,我揣著相機與錄音吐限,去河邊找鬼鲜侥。 笑死,一個胖子當(dāng)著我的面吹牛诸典,可吹牛的內(nèi)容都是我干的描函。 我是一名探鬼主播,決...
    沈念sama閱讀 38,957評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼狐粱,長吁一口氣:“原來是場噩夢啊……” “哼舀寓!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起肌蜻,我...
    開封第一講書人閱讀 37,718評論 0 266
  • 序言:老撾萬榮一對情侶失蹤互墓,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后蒋搜,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體篡撵,經(jīng)...
    沈念sama閱讀 44,176評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,511評論 2 327
  • 正文 我和宋清朗相戀三年豆挽,在試婚紗的時候發(fā)現(xiàn)自己被綠了育谬。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,646評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡帮哈,死狀恐怖膛檀,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤宿刮,帶...
    沈念sama閱讀 34,322評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站私蕾,受9級特大地震影響僵缺,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜踩叭,卻給世界環(huán)境...
    茶點故事閱讀 39,934評論 3 313
  • 文/蒙蒙 一磕潮、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧容贝,春花似錦自脯、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至满力,卻和暖如春焕参,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背油额。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評論 1 266
  • 我被黑心中介騙來泰國打工叠纷, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人潦嘶。 一個月前我還...
    沈念sama閱讀 46,358評論 2 360
  • 正文 我出身青樓涩嚣,卻偏偏與公主長得像,于是被迫代替她去往敵國和親掂僵。 傳聞我的和親對象是個殘疾皇子航厚,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,514評論 2 348