實(shí)現(xiàn)
主要利用redis的brpop阻塞讀和Golang的goroutine并發(fā)控制以及os/exec執(zhí)行程序对雪,實(shí)現(xiàn)隊(duì)列有數(shù)據(jù)就立即執(zhí)行對(duì)應(yīng)程序并把結(jié)果set任務(wù)key坯认。
運(yùn)行參數(shù)
設(shè)置brpop的超時(shí)(-t)和同步調(diào)度時(shí)返回的結(jié)果ttl(-e)
./dispatchdeploy
Usage: -h 192.168.6.151 -p 6388 -t 300 -a /path/testfile.pl -e 1800
-a string
start appname (default "/path/testfile.pl")
-e int
redis expire time sec (default 1800)
-h string
redis ip
-p int
redis port (default 6379)
-t int
redis brpop timeout (default 300)
靜態(tài)數(shù)據(jù)
const (
maxthread = 2 //最大并發(fā)協(xié)程數(shù)
queueName = "qn_kt" //阻塞讀隊(duì)列
result_queueName = "rt_kt" //同步返回結(jié)果的key前綴
token = "##" //執(zhí)行調(diào)度參數(shù)的指定分隔符
sync_flag = "1"
)
關(guān)鍵代碼
//阻塞讀助币,當(dāng)有數(shù)據(jù)分割參數(shù)职烧,使用channel控制并發(fā)協(xié)程數(shù)获三,在execCmd的cmd.wait正常后釋放channel
for {
content, _ := redisdb.brpop(queueName, *timeout)
if content != nil {
args := strings.Split(string(content[1]), token)
if len(args) != 4 {
log.Printf("%v lack of para length %s\n", args, len(args))
} else {
//控制并發(fā)數(shù)
sync_num <- 1
go execCmd(*appname, args, redisdb)
log.Printf("%s %v Go\n", *appname, args)
}
} else {
log.Printf("timeout %d get nil contenet , just go on", *timeout)
}
}
測(cè)試
lpush三個(gè)調(diào)度到隊(duì)列
127.0.0.1:6888> lpush qn_kt "6234##ZYYC0001##20170620140000##0" "5234##ZYYC0001##20170620140000##1" "7234##ZYYC0001##20170620140000##1"
(integer) 3
//控制并發(fā)數(shù)為2塔沃,立即調(diào)度執(zhí)行了兩個(gè)perl程序裆赵,等到返回結(jié)果執(zhí)行第三個(gè)
2017/06/20 16:45:21 Start listen qn_kt
2017/06/20 16:45:25 testfile.pl [6234 ZYYC0001 20170620140000 0] Go
2017/06/20 16:45:25 testfile.pl [5234 ZYYC0001 20170620140000 1] Go
2017/06/20 16:45:30 testfile.pl [6234 ZYYC0001 20170620140000 0] finish
2017/06/20 16:45:30 testfile.pl [7234 ZYYC0001 20170620140000 1] Go
2017/06/20 16:45:30 testfile.pl [5234 ZYYC0001 20170620140000 S] finish
2017/06/20 16:45:35 testfile.pl [7234 ZYYC0001 20170620140000 S] finish
2017/06/20 16:45:51 timeout 20 get nil contenet , just go on
2017/06/20 16:46:12 timeout 20 get nil contenet , just go on
//同步調(diào)度任務(wù)執(zhí)行完成后set對(duì)應(yīng)任務(wù)號(hào)双絮,由接口程序讀取弛矛,1800秒后redis回收
127.0.0.1:6888> get rt_kt_7234_ZYYC0001
"7234##ZYYC0001##20170620140000##1"
127.0.0.1:6888> ttl rt_kt_7234_ZYYC0001
(integer) 1791