一個結(jié)合數(shù)據(jù)庫的任務隊列管理方案(iOS實現(xiàn))

前言

在開發(fā)中如果碰到需要執(zhí)行一些耗時比較長的任務渤早,但是又要保證任務不能丟失神妹,比如執(zhí)行過程中由于某種原因app發(fā)生了crash骑脱,需要在下次app啟動后的適當時機重新執(zhí)行任務的情況痴柔,該如何解決裳涛?

解決方案

  • 假設我們需要在后臺執(zhí)行的任務類型有上傳圖片的任務A,發(fā)送聊天消息的任務B迁筛,視頻編輯處理的任務C煤蚌,所有耗時比較久并且重要性很高,需要保證不能丟失的任務都可以抽象為一個TaskModel。這個TaskModel保留了任務執(zhí)行需要的基本信息铺然,以及用于隊列管理的必要屬性俗孝。方案的主要思路就是維護一個用于管理TaskModel的串行任務隊列TaskQueueService(后面簡稱為TaskQueue)酒甸,這個TaskQueue結(jié)合數(shù)據(jù)庫對外提供管理TaskModel的各類方法魄健,比如AddTask等,在一個任務需要被執(zhí)行時插勤,就會通過TaskQueue的add方法添加到隊列中沽瘦,并且同時將TaskModel持久化到數(shù)據(jù)庫中,保證任務不會丟失农尖。然后TaskQueue根據(jù)自己的某種規(guī)則從數(shù)據(jù)庫中取出TaskModel析恋,對任務進行調(diào)度執(zhí)行。

類結(jié)構(gòu)說明

  • 我們抽象出任務的基類為TaskModel盛卡,需要執(zhí)行的具體任務類型A助隧、B、C為繼承TaskModel的類TaskModelA滑沧、TaskModelB并村、TaskModelC,然后在各自的類中添加業(yè)務需要的基本信息滓技,以及根據(jù)需要重寫基類的方法哩牍。TaskModel結(jié)構(gòu)如下類圖所示:


    TaskModel.png

taskID為自增Id,當task被添加到隊列令漂,寫入數(shù)據(jù)庫中時會自動加1膝昆。
status表示任務執(zhí)行的狀態(tài),是一個枚舉值叠必,枚舉類型分為init荚孵、running、suspend纬朝、finish收叶、fail、remove六種狀態(tài)玄组,在taskmodel剛被寫入數(shù)據(jù)中時是init的狀態(tài)滔驾,在taskModel執(zhí)行失敗后會將狀態(tài)置為fail。
priority表示Task的優(yōu)先級俄讹,默認值為low哆致,在taskQueue中可以根據(jù)task的優(yōu)先級對高優(yōu)先級的任務優(yōu)先調(diào)度執(zhí)行。
customID用于對task做某種標記患膛,方便從數(shù)據(jù)庫查詢摊阀。
className存儲具體任務的類型如TaskModelA。
data內(nèi)存儲TaskModel歸檔后的data數(shù)據(jù)
runCount用于存儲task執(zhí)行的次數(shù),可以用于控制重試次數(shù)

run()方法是子類繼承TaskModel后必須實現(xiàn)的方法胞此,里面寫任務執(zhí)行的邏輯
prepareForAddToQueue()方法是在run之前會執(zhí)行的方法臣咖,如果重寫了該方法,那么在taskQueue執(zhí)行task之前會先調(diào)用prepare方法漱牵,返回為yes才會繼續(xù)執(zhí)行task的run方法夺蛇,prepare方法可以用于對task的一些校驗。
retryNextTime()方法通過調(diào)用taskQueue的retryTask方法將自身的status重置為init狀態(tài)酣胀,那么在taskQueue執(zhí)行接下來的任務時刁赦,就會重新執(zhí)行到該task。
suspend()方法是將自身的status改為suspend掛起狀態(tài)闻镶,那么taskQueue在執(zhí)行接下來的task時甚脉,由于只會取status為init的task執(zhí)行,就不會執(zhí)行到suspend狀態(tài)的task铆农。當需要重新執(zhí)行已掛起的task時牺氨,調(diào)用retry方法就可以重新將該task添加到執(zhí)行隊列中。

  • TaskQueueService是我們維護的任務隊列墩剖,它是一個單例對象猴凹。TaskQueueService的結(jié)構(gòu)如下所示:


    TaskQueueService.png

taskSignal用于在task執(zhí)行改變狀態(tài)時對外發(fā)送信號,在對應的controller中可以通過taskSignal傳出的task做一些ui或者業(yè)務邏輯涛碑。
runningTask表示當前隊列正在執(zhí)行的taskModel
suspendTasks保存了所有被掛起的taskModel

  • runNextTask()方法為TaskQueue最核心的方法精堕,該方法中用異步執(zhí)行通過某種規(guī)則從數(shù)據(jù)庫中取出的最合適的taskModel。

架構(gòu)圖

主要架構(gòu)圖.png
  • 如上圖所示蒲障,假設所有的task都為相同優(yōu)先級歹篓,TaskQueue的runNextTask方法中取從數(shù)據(jù)庫篩選出來status為init狀態(tài)并且根據(jù)taskId排序,拿到最先進入的一個task去執(zhí)行揉阎。在task執(zhí)行完成后庄撮,又會觸發(fā)runNextTask方法,從而繼續(xù)從數(shù)據(jù)庫讀取下一個最合適的taskModel去執(zhí)行任務毙籽。

實現(xiàn)

TaskModel
  • 首先TaskModel是需要存儲在數(shù)據(jù)庫中的洞斯,結(jié)合上一章WCDB的使用,我們將TaskModel繼承自RSModel類坑赡,以支持數(shù)據(jù)庫寫入烙如。TaskModel的.h文件十分簡潔,主要是一些關鍵屬性的暴露和task操作方法的抽象:
#import "RSModel.h"
typedef NS_ENUM(NSInteger ,RSTaskQueueTaskModelStatus) {
    RSTaskQueueTaskModelStatusInit,
    RSTaskQueueTaskModelStatusRunning,
    RSTaskQueueTaskModelStatusSuspend,
    RSTaskQueueTaskModelStatusFinish,
    RSTaskQueueTaskModelStatusFail,
    RSTaskQueueTaskModelStatusRemove,
};
typedef NS_ENUM(NSInteger ,RSTaskQueueTaskModelPriority) {
    RSTaskQueueTaskModelPriorityLow,
    RSTaskQueueTaskModelPriorityMiddle,
    RSTaskQueueTaskModelPriorityHigh,
};
@interface RSTaskQueueTaskModel : RSModel
@property (nonatomic, assign) NSInteger taskId;
@property (nonatomic, assign) RSTaskQueueTaskModelStatus status;
@property (nonatomic, assign) RSTaskQueueTaskModelPriority priority;
@property (nonatomic, strong) NSString *customId;
@property (nonatomic, strong) NSString *customType;
@property (nonatomic, strong) NSString *className;
@property (nonatomic, strong) NSData *data;
@property (nonatomic, assign) NSInteger runCount;
-(void)run;//任務運行主入口毅否,子類需要實現(xiàn)它
-(void)suspend;//執(zhí)行異步操作時調(diào)用亚铁,可以掛起當前任務,防止任務隊列被阻塞
-(void)pop;//任務執(zhí)行成功需要顯式調(diào)用螟加,將任務從隊列中移除
-(void)fail;//任務執(zhí)行失敗需要顯式調(diào)用, 修改任務的狀態(tài)
-(void)retryNextTime;//重新入隊徘溢,等待重試
-(BOOL)prepareForAddToQueue;//任務被插入任務隊列前吞琐,自動調(diào)用,子類可以重載它然爆。返回失敗則任務不用被加入任務隊列
-(BOOL)remove;//從任務隊列中移除任務
@end

TaskModel的.m文件需要定義類文件中綁定到數(shù)據(jù)庫表的字段以及主鍵的設置站粟、默認值的設置以及約束等。

#import "RSTaskQueueTaskModel+WCTTableCoding.h"
#import "RSTaskQueueTaskModel.h"
#import <WCDB/WCDB.h>
#import "RSDBService.h"
#import "RSTaskQueueService.h"
#import <YYModel.h>

@implementation RSTaskQueueTaskModel

WCDB_IMPLEMENTATION(RSTaskQueueTaskModel)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, taskId)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, customId)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, customType)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, className)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, data)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, runCount)
WCDB_SYNTHESIZE_DEFAULT(RSTaskQueueTaskModel, status, RSTaskQueueTaskModelStatusInit)
WCDB_SYNTHESIZE_DEFAULT(RSTaskQueueTaskModel, priority, RSTaskQueueTaskModelPriorityLow)
WCDB_PRIMARY_ASC_AUTO_INCREMENT(RSTaskQueueTaskModel, taskId)
//設置status和priority的默認值曾雕,以及taskId自增
-(instancetype)init {
    self = [super init];
    if (self) {
        static dispatch_once_t token;
        dispatch_once(&token, ^{
            [RSTaskQueueTaskModel createDBTable];
        });
        self.isAutoIncrement = YES;
        self.runCount = 0;
    }
    return self;
}
-(instancetype)initWithCoder:(NSCoder *)aDecoder {
    self = [super initWithCoder:aDecoder];
    if (self) {
        
    }
    return self;
}

+(void)createDBTable {
    if ([[RSDBService db] createTableAndIndexesOfName:NSStringFromClass([RSTaskQueueTaskModel class]) withClass:[RSTaskQueueTaskModel class]]) {
        NSLog(@"creat table RSTaskQueueTaskModel success");
    } else {
        NSLog(@"creat table RSTaskQueueTaskModel fail");
    }
}

-(BOOL)prepareForAddToQueue {
    self.className = NSStringFromClass([self class]);
    self.data = [self yy_modelToJSONData];
//這里使用YYModel將model轉(zhuǎn)換為data存入數(shù)據(jù)庫
    return YES;
}

-(void)run {
    
}
-(void)suspend {
    [[RSTaskQueueService shareInstance] suspendTask:self];
}
-(void)pop {
    [[RSTaskQueueService shareInstance] popTask:self];
}
-(void)retryNextTime {
    [[RSTaskQueueService shareInstance] retryTask:self];
}

-(void)fail {
    [[RSTaskQueueService shareInstance] failTask:self];
}
-(BOOL)remove {
    return [[RSTaskQueueService shareInstance] removeTask:self];
}
//以上的操作方法實現(xiàn)實際上就是調(diào)用TaskQueue的方法奴烙,傳入?yún)?shù)為本身,所以重點還是在TaskQueue對任務的調(diào)度翻默。TaskModel只是一個支持寫入數(shù)據(jù)庫的對Task抽象的Model
@end
TaskQueueService

TaskQueueService的.h基本和類圖中的屬性方法一致缸沃,按照實際需求多寫了幾個從數(shù)據(jù)庫查詢taskModel的接口。

#import <Foundation/Foundation.h>
#import "RSTaskQueueTaskModel.h"
@interface RSTaskQueueService : NSObject
@property (nonatomic, strong) RACSubject *taskSignal;
@property (nonatomic, strong)  RSTaskQueueTaskModel *runingTask;
@property (nonatomic, strong)  NSMutableArray<RSTaskQueueTaskModel *> *suspendTasks;
+(RSTaskQueueService *)shareInstance;

-(BOOL)addTask:(RSTaskQueueTaskModel *)task;
-(BOOL)retryTask:(RSTaskQueueTaskModel *)task;
-(void)runNextTask;
-(void)popTask:(RSTaskQueueTaskModel *)task;
-(BOOL)failTask:(RSTaskQueueTaskModel *)task;
-(BOOL)removeTask:(RSTaskQueueTaskModel *)task;
//-(void)finishTaskWaitForNextTime:(RSTaskQueueTaskModel *)task;
-(BOOL)suspendTask:(RSTaskQueueTaskModel *)task;
-(void)resetAllTasks;

-(void)retryAllFailTasks;

- (BOOL)deleteTaskWithClassName:(NSString *)className customId:(NSString*)customId;

-(NSArray<RSTaskQueueTaskModel *>*)getTasksClassName:(NSString *)className customId:(NSString*)customId;
-(NSArray<RSTaskQueueTaskModel *>*)getTasksClassName:(NSString *)className status:(RSTaskQueueTaskModelStatus)status;
-(NSArray<RSTaskQueueTaskModel *>*)getTasksClassName:(NSString *)className;

-(BOOL)isAllTaskFinish;
@end

在.m中修械,需要實現(xiàn)taskQueue的單例初始化方法,通過調(diào)用[RSTaskQueueService shareInstance]來獲取taskQueue。

+(RSTaskQueueService *)shareInstance {
    static dispatch_once_t once;
    static id sharedInstance;
    dispatch_once(&once, ^{
        sharedInstance = [[RSTaskQueueService alloc] init];
    });
    return sharedInstance;
}

-(instancetype)init {
    self = [super init];
    if (self) {
        self.runingTask = nil;
        self.taskSignal = [RACSubject subject];
        self.suspendTasks = [[NSMutableArray alloc] init];
    }
    return self;
}

在一個任務TaskModel需要被執(zhí)行是检盼,會調(diào)用TaskQueue的addTask方法肯污,如果該任務是符合執(zhí)行規(guī)定的,那么就會將任務寫入TaskModel數(shù)據(jù)庫吨枉,同時對外發(fā)送一個taskSignal信號蹦渣,以同步當前task的狀態(tài),然后調(diào)用runNextTask方法去做任務的調(diào)度執(zhí)行貌亭。

-(BOOL)addTask:(RSTaskQueueTaskModel *)task {
    if ([task prepareForAddToQueue]) {
        @synchronized(self) {
            BOOL result = [[RSDBService db] insertObject:task into:NSStringFromClass([RSTaskQueueTaskModel class])];
            if (result) {
                [self.taskSignal sendNext:task];
                [self runNextTask];
            }
            return result;
        }
    }
    return NO;
}

在runNextTask方法中柬唯,首先是開辟了一個異步線程,然后判斷是否有task正在執(zhí)行圃庭,如果有的話就返回锄奢,沒有的話從數(shù)據(jù)庫中讀取最合適的taskModel來跑。如何判斷是否是最合適的篩選條件剧腻,也就是task調(diào)度的方法了拘央。在這里選取的是status為init并且taskId最小,也就是最先加入的任務來執(zhí)行书在。

-(void)runNextTask {
    [RSUtils dispatch_async_background:^{
        @synchronized(self) {
            if (self.runingTask) {
                //有任務在執(zhí)行中
                return;
            }
            NSArray *tmp = [[RSDBService db] getObjectsOfClass:[RSTaskQueueTaskModel class] fromTable:NSStringFromClass([RSTaskQueueTaskModel class]) where:RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusInit orderBy:RSTaskQueueTaskModel.taskId.order(WCTOrderedAscending)];
            if ([tmp count] > 0) {
                RSLogInfo(@"RSTaskQueueService task count:%ld", tmp.count);
                RSTaskQueueTaskModel *task = [tmp firstObject];
                Class clazz = NSClassFromString(task.className);
                RSTaskQueueTaskModel * object = [[clazz alloc] init];
                [object yy_modelSetWithJSON:task.data];
                [object setTaskId:task.taskId];
                [object setStatus:RSTaskQueueTaskModelStatusRunning];
                object.runCount = object.runCount + 1;
               BOOL dbResult = [[RSDBService db] updateRowsInTable:DBTableName(RSTaskQueueTaskModel) onProperties:{RSTaskQueueTaskModel.status, RSTaskQueueTaskModel.runCount} withObject:object where:RSTaskQueueTaskModel.taskId==object.taskId];
                if (dbResult) {
                    self.runingTask = object;
                    [self.taskSignal sendNext:object];
                    RSLogInfo(@"RSTaskQueueService run taskId:%d", object.taskId);
                    [object run];
                } else {
                    RSLogError(@"RSTaskQueueService update task status fail taskId:%d", object.taskId);
                }
            } else {
                RSLogInfo(@"RSTaskQueueService is empty");
            }
        }
    }];
}

resetAllTasks是將所有未完成的任務重置的方法灰伟,主要用于app初始化的時候調(diào)用,以重置之前未執(zhí)行成功的tasks儒旬。

-(void)resetAllTasks {
    @synchronized(self) {
        self.runingTask = nil;
        [self.suspendTasks removeAllObjects];
        RSTaskQueueTaskModel *task = [RSTaskQueueTaskModel new];
        task.status = RSTaskQueueTaskModelStatusInit;
        [[RSDBService db] updateRowsInTable:NSStringFromClass([RSTaskQueueTaskModel class]) onProperty:RSTaskQueueTaskModel.status withObject:task where:RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusRunning||RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusSuspend];
    }
}

retryAllFailTasks方法將所有執(zhí)行失敗的task重新執(zhí)行栏账,原理還是將status狀態(tài)由fail改為init狀態(tài)寫入數(shù)據(jù)庫,下一次taskQueue開始run的時候就會考慮執(zhí)行到這些task了栈源。

-(void)retryAllFailTasks {
    NSArray *tmp = [[RSDBService db] getObjectsOfClass:[RSTaskQueueTaskModel class] fromTable:NSStringFromClass([RSTaskQueueTaskModel class]) where:RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusFail orderBy:RSTaskQueueTaskModel.taskId.order(WCTOrderedAscending)];
    for (RSTaskQueueTaskModel *task in tmp) {
        Class clazz = NSClassFromString(task.className);
        RSTaskQueueTaskModel * object = [[clazz alloc] init];
        [object yy_modelSetWithJSON:task.data];
        [object setTaskId:task.taskId];
        [object setStatus:task.status];
        [object retryNextTime];
    }
}

其他的方法就不列出了挡爵,代碼都差不多,核心點還是在于結(jié)合數(shù)據(jù)庫修改taskModel的狀態(tài)凉翻,然后taskQueue在執(zhí)行runNextTask的時候就會自動執(zhí)行最合適的task了讨。

結(jié)束

其實整個方案的架構(gòu)并不復雜捻激,通俗易懂,主要是在于實現(xiàn)以及解決應用中的問題前计。方案是可以根據(jù)需求隨時修改的胞谭,沒有最好的方案,只有最合適的方案男杈。當然文中的taskQueueService還有更多優(yōu)化的空間丈屹,有不合理的地方大家可以提出,一起交流伶棒,共同進步旺垒。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市肤无,隨后出現(xiàn)的幾起案子先蒋,更是在濱河造成了極大的恐慌,老刑警劉巖宛渐,帶你破解...
    沈念sama閱讀 221,430評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件竞漾,死亡現(xiàn)場離奇詭異,居然都是意外死亡窥翩,警方通過查閱死者的電腦和手機业岁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,406評論 3 398
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來寇蚊,“玉大人笔时,你說我怎么就攤上這事≌贪叮” “怎么了允耿?”我有些...
    開封第一講書人閱讀 167,834評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長爹梁。 經(jīng)常有香客問我右犹,道長,這世上最難降的妖魔是什么姚垃? 我笑而不...
    開封第一講書人閱讀 59,543評論 1 296
  • 正文 為了忘掉前任念链,我火速辦了婚禮,結(jié)果婚禮上积糯,老公的妹妹穿的比我還像新娘掂墓。我一直安慰自己,他們只是感情好看成,可當我...
    茶點故事閱讀 68,547評論 6 397
  • 文/花漫 我一把揭開白布君编。 她就那樣靜靜地躺著,像睡著了一般川慌。 火紅的嫁衣襯著肌膚如雪吃嘿。 梳的紋絲不亂的頭發(fā)上祠乃,一...
    開封第一講書人閱讀 52,196評論 1 308
  • 那天,我揣著相機與錄音兑燥,去河邊找鬼亮瓷。 笑死,一個胖子當著我的面吹牛降瞳,可吹牛的內(nèi)容都是我干的嘱支。 我是一名探鬼主播,決...
    沈念sama閱讀 40,776評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼挣饥,長吁一口氣:“原來是場噩夢啊……” “哼除师!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起扔枫,我...
    開封第一講書人閱讀 39,671評論 0 276
  • 序言:老撾萬榮一對情侶失蹤汛聚,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后茧吊,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體贞岭,經(jīng)...
    沈念sama閱讀 46,221評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,303評論 3 340
  • 正文 我和宋清朗相戀三年搓侄,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片话速。...
    茶點故事閱讀 40,444評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡讶踪,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出泊交,到底是詐尸還是另有隱情乳讥,我是刑警寧澤,帶...
    沈念sama閱讀 36,134評論 5 350
  • 正文 年R本政府宣布廓俭,位于F島的核電站云石,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏研乒。R本人自食惡果不足惜汹忠,卻給世界環(huán)境...
    茶點故事閱讀 41,810評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望雹熬。 院中可真熱鬧宽菜,春花似錦、人聲如沸竿报。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,285評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽烈菌。三九已至阵幸,卻和暖如春花履,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背挚赊。 一陣腳步聲響...
    開封第一講書人閱讀 33,399評論 1 272
  • 我被黑心中介騙來泰國打工诡壁, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人咬腕。 一個月前我還...
    沈念sama閱讀 48,837評論 3 376
  • 正文 我出身青樓欢峰,卻偏偏與公主長得像,于是被迫代替她去往敵國和親涨共。 傳聞我的和親對象是個殘疾皇子纽帖,可洞房花燭夜當晚...
    茶點故事閱讀 45,455評論 2 359

推薦閱讀更多精彩內(nèi)容