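This post analyzes how brpc implements timers in a multithreaded setting, focusing on the design ideas, and also lists how a few other open-source projects do it. The focus is the overall architecture rather than the timer data structure alone.
A direct performance comparison is probably not feasible, so at most the complexity can be analyzed. The implementations differ (min-heap, timewheel, the Linux kernel's approach, fd-based approaches), and the right choice depends on your own workload. The analysis here is based on open-source implementations; I am not familiar with the fd-based approach, so it is skipped for now.
The data structures used to manage many timer nodes include the min-heap, the timewheel, the Linux kernel's implementation, and brpc's multiple buckets plus linked lists. Timer tasks are usually handled by a single timer thread. A task may need to run once every interval for a given number of times, which can be controlled with a loop field.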
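Min-heap
For a min-heap, building the heap is O(n). Removing the root on expiry and adding a timer node each need an O(log n) adjustment. Deleting an arbitrary timer node also re-adjusts the heap and is O(log n) once the node's position in the heap is known. The root is compared with the current time to get the time left until the next expiry, and epoll_wait is called with that difference as its timeout, so it returns either on timeout or when an event arrives. Redis waits in a similar way; phxrpc instead always waits a fixed 4 ms, which introduces a small error, but the design is rather subtle.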
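To make the root-versus-now comparison concrete, here is a minimal sketch of a min-heap timer. The names `TimerEntry`, `LaterFirst` and `MinHeapTimer` are hypothetical and are not taken from redis or phxrpc; the value returned by `NextTimeout` is what an event loop could pass to epoll_wait as its timeout.

```cpp
#include <cstdint>
#include <queue>
#include <vector>

// Hypothetical timer entry: absolute expiry in milliseconds plus an id.
struct TimerEntry {
    int64_t expire_ms;
    int id;
};

// Orders the priority_queue so that the earliest expiry sits at the top.
struct LaterFirst {
    bool operator()(const TimerEntry& a, const TimerEntry& b) const {
        return a.expire_ms > b.expire_ms;
    }
};

class MinHeapTimer {
public:
    // O(log n): push the entry and sift it up.
    void Add(int id, int64_t now_ms, int64_t timeout_ms) {
        heap_.push(TimerEntry{now_ms + timeout_ms, id});
    }

    // Milliseconds until the root expires. -1 means "no timers" (the caller
    // may block indefinitely), 0 means the root has already expired.
    int NextTimeout(int64_t now_ms) const {
        if (heap_.empty()) return -1;
        int64_t diff = heap_.top().expire_ms - now_ms;
        return diff > 0 ? static_cast<int>(diff) : 0;
    }

    // Pops every expired entry; each pop is O(log n).
    std::vector<int> PopExpired(int64_t now_ms) {
        std::vector<int> fired;
        while (!heap_.empty() && heap_.top().expire_ms <= now_ms) {
            fired.push_back(heap_.top().id);
            heap_.pop();
        }
        return fired;
    }

private:
    std::priority_queue<TimerEntry, std::vector<TimerEntry>, LaterFirst> heap_;
};
```

`std::priority_queue` cannot delete an arbitrary node, so a timer that supports cancellation would need a heap that tracks each node's index; that part is left out of this sketch.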
Here is the coroutine plus timer part of phxrpc:
std::vector<TimerObj> timer_heap_; // the timer implementation: a heap stored in a vector
245 bool UThreadEpollScheduler::Run() {
246 //more code...
250 int next_timeout = timer_.GetNextTimeout();
251
252 for (; (run_forever_) || (!runtime_.IsAllDone());) {
253 int nfds = epoll_wait(epoll_fd_, events, max_task_, 4); // always wait at most 4 ms
254 //more code...
285 DealwithTimeout(next_timeout); // handle expired tasks
295 } //more code
296 return true;
297 }
315 void UThreadEpollScheduler::DealwithTimeout(int &next_timeout) {
316 while (true) {
317 next_timeout = timer_.GetNextTimeout();
318 if (0 != next_timeout) {
319 break;
320 }
321
322 UThreadSocket_t * socket = timer_.PopTimeout();
323 socket->waited_events = UThreadEpollREvent_Timeout;
324 runtime_.Resume(socket->uthread_id);
325 }
326 }
// yield the coroutine
533 void UThreadWait(UThreadSocket_t &socket, const int timeout_ms) {
534 socket.uthread_id = socket.scheduler->GetCurrUThread();
535 socket.scheduler->AddTimer(&socket, timeout_ms); // add the timer
536 socket.scheduler->YieldTask(); // switch out
537 socket.scheduler->RemoveTimer(socket.timer_id); // remove the timer after being switched back in
538 }
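Each thread has its own timer manager, so there is no contention, and each thread runs a number of coroutines.
timewheel
A timewheel is a time-wheel timer. Take a wheel with 100 slots: a timer's slot is the current tick count plus its timeout in ticks, modulo 100. With, say, one tick every 200 ms, each tick processes the current slot and checks every node in it for expiry. All nodes in one slot share the same remainder of the expiry tick modulo the slot count, and a slot is usually implemented as a linked list. Nodes with the matching remainder may or may not have expired; nodes with a different remainder certainly do not expire on the current tick.
So the thread wakes up once per tick, walks the current slot and checks each node for expiry. The lists may be uneven in length, and unlinking an expired node is O(1). If the tick has to be made finer to get enough precision, the thread wakes up more often.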
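The slot arithmetic described above can be sketched as follows. This is a minimal single-level wheel under stated assumptions, not code from any particular project: the class name `TimeWheel`, the use of `std::list`, and storing the absolute expiry tick in each node are all illustrative choices.

```cpp
#include <cstddef>
#include <cstdint>
#include <list>
#include <vector>

// Hypothetical single-level time wheel; expiry is stored in absolute ticks.
class TimeWheel {
public:
    explicit TimeWheel(size_t num_slots) : slots_(num_slots), current_tick_(0) {}

    // Places the timer in slot (current_tick + timeout_ticks) % num_slots.
    void Add(int id, uint64_t timeout_ticks) {
        if (timeout_ticks == 0) timeout_ticks = 1;  // fire on the next tick
        uint64_t expire = current_tick_ + timeout_ticks;
        slots_[expire % slots_.size()].push_back(Node{id, expire});
    }

    // Advances one tick and returns the ids that expire on it. Only the
    // current slot is scanned; nodes there with a later expiry tick (same
    // remainder, more rounds to go) are left in place.
    std::vector<int> Tick() {
        ++current_tick_;
        std::vector<int> fired;
        std::list<Node>& slot = slots_[current_tick_ % slots_.size()];
        for (auto it = slot.begin(); it != slot.end();) {
            if (it->expire_tick <= current_tick_) {
                fired.push_back(it->id);
                it = slot.erase(it);  // O(1) unlink from the list
            } else {
                ++it;
            }
        }
        return fired;
    }

private:
    struct Node {
        int id;
        uint64_t expire_tick;
    };
    std::vector<std::list<Node>> slots_;
    uint64_t current_tick_;
};
```

A caller would invoke `Tick()` from a thread that wakes once per tick interval; with a 200 ms tick, `Add(id, 5)` asks for roughly one second.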
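An open-source project I read earlier was implemented this way, and a previous anti-DDoS firewall project used the same design: multithreaded, with one timewheel per thread, each holding tens of millions of sessions waiting to time out. I will not analyze a specific open-source implementation here.
For a similar implementation, see "TimingWheel[时间轮]介绍" (an introduction to the timing wheel).
Timers in the Linux kernel
I have not read the Linux kernel's timer code; what I read instead is the one in skynet. For details see "浅析skynet底层框架中篇" (a brief analysis of skynet's underlying framework, part 2). The structure is as follows: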
46 struct timer {
47 struct link_list near[TIME_NEAR]; // nodes that are about to expire
48 struct link_list t[4][TIME_LEVEL]; // layered by how far away the expiry is
49 struct spinlock lock;
50 uint32_t time; // number of 10 ms ticks elapsed so far
51 uint32_t starttime;
52 uint64_t current;
53 uint64_t current_point;
54 };
55 static void
56 wakeup(struct monitor *m, int busy) {
57 if (m->sleep >= m->count - busy) {
58 // signal sleep worker, "spurious wakeup" is harmless
59 pthread_cond_signal(&m->cond);
60 }
61 }
63 static void *
64 thread_socket(void *p) {
65 struct monitor * m = p;
66 skynet_initthread(THREAD_SOCKET);
67 for (;;) {
68 int r = skynet_socket_poll();
69 //more code ...
75 wakeup(m,0); // a message arrived; wake a worker thread to process it
76 }
77 return NULL;
78 }
128 static void *
129 thread_timer(void *p) {
130 struct monitor * m = p;
131 skynet_initthread(THREAD_TIMER);
132 for (;;) {
133 skynet_updatetime();
134 CHECK_ABORT
135 wakeup(m,m->count-1); // wake a worker thread
136 usleep(2500); // always sleep 2.5 ms
137 //more code ...
141 }
142 //more code ...
149 return NULL;
150 }
152 static void *
153 thread_worker(void *p) {
154 //more code ...
161 while (!m->quit) {
162 q = skynet_context_message_dispatch(sm, q, weight);
163 if (q == NULL) {
164 if (pthread_mutex_lock(&m->mutex) == 0) {
165 ++ m->sleep;
166 // "spurious wakeup" is harmless,
167 // because skynet_context_message_dispatch() can be call at any time.
168 if (!m->quit)
169 pthread_cond_wait(&m->cond, &m->mutex); // block while there is no message
170 -- m->sleep;
171 if (pthread_mutex_unlock(&m->mutex)) {
173 exit(1);
174 }
175 }
176 }
177 }
178 return NULL;
179 }
95 static void
96 timer_add(struct timer *T,void *arg,size_t sz,int time) {
97 struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz);
98 memcpy(node+1,arg,sz);
99
100 SPIN_LOCK(T);
101
102 node->expire=time+T->time;
103 add_node(T,node);
104
105 SPIN_UNLOCK(T);
106 }
With the implementation above, multiple threads contend for one global spinlock, then add the timer task to the timer structure and set its expiry time.
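The quoted snippet does not show add_node, so the following is only a sketch of how a near array plus four levels could map an expiry tick to a list. The bit layout (8 bits for the near array, 6 bits per level) and the names `kNearShift`, `kLevelShift`, `SlotRef` and `PickSlot` are assumptions made for illustration, not values read from the code above.

```cpp
#include <cstdint>

// Assumed layout (not taken from the quoted snippet): the low 8 bits of the
// tick index the near array, and each of the 4 levels takes the next 6 bits.
constexpr uint32_t kNearShift = 8;
constexpr uint32_t kNear = 1u << kNearShift;    // 256 near slots
constexpr uint32_t kNearMask = kNear - 1;
constexpr uint32_t kLevelShift = 6;
constexpr uint32_t kLevel = 1u << kLevelShift;  // 64 slots per level
constexpr uint32_t kLevelMask = kLevel - 1;

struct SlotRef {
    int level;      // -1 means the near array, 0..3 means t[level]
    uint32_t slot;  // index inside that array
};

// Picks the list a node with the given expiry tick would be linked into.
inline SlotRef PickSlot(uint32_t expire, uint32_t current) {
    // Same upper 24 bits: the node expires within the current near window.
    if ((expire | kNearMask) == (current | kNearMask)) {
        return SlotRef{-1, expire & kNearMask};
    }
    uint32_t mask = kNear << kLevelShift;
    int i = 0;
    for (; i < 3; ++i) {
        // Stop at the first level whose window contains both ticks.
        if ((expire | (mask - 1)) == (current | (mask - 1))) break;
        mask <<= kLevelShift;
    }
    uint32_t slot = (expire >> (kNearShift + i * kLevelShift)) & kLevelMask;
    return SlotRef{i, slot};
}
```

Under this layout a far-away timer costs O(1) to insert and is moved toward the near array as time advances, which is the point of layering by distance.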
brpc timer implementation
brpc's timer is built on timer + futex, and brpc provides its own implementation of the futex functionality:
29 class SimuFutex {
30 public:
31 SimuFutex() : counts(0)
32 , ref(0) {
33 pthread_mutex_init(&lock, NULL);
34 pthread_cond_init(&cond, NULL);
35 }
36 ~SimuFutex() {
37 pthread_mutex_destroy(&lock);
38 pthread_cond_destroy(&cond);
39 }
40
41 public:
42 pthread_mutex_t lock;
43 pthread_cond_t cond;
44 int32_t counts; // number of threads currently waiting
45 int32_t ref; // reference count
46 };
48 static pthread_mutex_t s_futex_map_mutex = PTHREAD_MUTEX_INITIALIZER;
49 static pthread_once_t init_futex_map_once = PTHREAD_ONCE_INIT;
50 static std::unordered_map<void*, SimuFutex>* s_futex_map = NULL;
60 int futex_wait_private(void* addr1, int expected, const timespec* timeout) { // wait
61 if (pthread_once(&init_futex_map_once, InitFutexMap) != 0) {
63 exit(1);
64 }
65 std::unique_lock<pthread_mutex_t> mu(s_futex_map_mutex);
66 SimuFutex& simu_futex = (*s_futex_map)[addr1];
67 ++simu_futex.ref;
68 mu.unlock();
69
70 int rc = 0;
71 {
72 std::unique_lock<pthread_mutex_t> mu1(simu_futex.lock);
73 if (static_cast<butil::atomic<int>*>(addr1)->load() == expected) {
74 ++simu_futex.counts;
75 if (timeout) {
76 timespec timeout_abs = butil::timespec_from_now(*timeout);
77 if ((rc = pthread_cond_timedwait(&simu_futex.cond, &simu_futex.lock, &timeout_abs)) != 0) {
78 errno = rc;
79 rc = -1;
80 }
81 } else {
82 if ((rc = pthread_cond_wait(&simu_futex.cond, &simu_futex.lock)) != 0) {
83 errno = rc;
84 rc = -1;
85 }
86 }
87 --simu_futex.counts;
88 } else {
89 errno = EAGAIN;
90 rc = -1;
91 }
92 }
94 std::unique_lock<pthread_mutex_t> mu1(s_futex_map_mutex);
95 if (--simu_futex.ref == 0) {
96 s_futex_map->erase(addr1);
97 }
98 mu1.unlock();
99 return rc;
100 }
102 int futex_wake_private(void* addr1, int nwake) { // wake
103 if (pthread_once(&init_futex_map_once, InitFutexMap) != 0) {
105 exit(1);
106 }
107 std::unique_lock<pthread_mutex_t> mu(s_futex_map_mutex);
108 auto it = s_futex_map->find(addr1);
109 if (it == s_futex_map->end()) {
110 mu.unlock();
111 return 0;
112 }
113 SimuFutex& simu_futex = it->second;
114 ++simu_futex.ref;
115 mu.unlock();
116
117 int nwakedup = 0;
118 int rc = 0;
119 {
120 std::unique_lock<pthread_mutex_t> mu1(simu_futex.lock);
121 nwake = (nwake < simu_futex.counts)? nwake: simu_futex.counts;
122 for (int i = 0; i < nwake; ++i) {
123 if ((rc = pthread_cond_signal(&simu_futex.cond)) != 0) {
124 errno = rc;
125 break;
126 } else {
127 ++nwakedup;
128 }
129 }
130 }
131
132 std::unique_lock<pthread_mutex_t> mu2(s_futex_map_mutex);
133 if (--simu_futex.ref == 0) {
134 s_futex_map->erase(addr1);
135 }
136 mu2.unlock();
137 return nwakedup;
138 }
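A passage quoted from one of the referenced links: “For reference, on my 4.0 SELinux test server with support for syscall auditing enabled, the minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is 2.7 usec, and the average is more like 10 usec. That can be a big drag on RockDB’s single-writer design.” That said, a mutex seems to be implemented in much the same way: it first checks whether the lock can be acquired and returns immediately if so; otherwise the caller waits. Its structure looks like this:
struct mutex {
// Reference counter.
// 1: the lock is available.
// less than or equal to 0: the lock is already held and the caller has to wait.
atomic_t count;
// Spinlock that keeps access to the wait list safe across multiple CPUs.
spinlock_t wait_lock;
// Wait list: if the lock is held, the task is queued here until it is scheduled.
struct list_head wait_list;
};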
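brpc's timer is also handled by a single thread, but each timer is hashed into one of several buckets, which reduces lock contention; the same design idea appears in leveldb's cache. Each bucket has its own lock.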
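The bucket idea can be sketched on its own with standard-library primitives. This is an illustration under assumptions, not brpc code: `SimpleTask` and `ShardedTimerQueue` are hypothetical names, and `std::mutex` plus `std::hash<std::thread::id>` stand in for whatever lock and hash a real implementation uses.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <limits>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical task node; linked into a per-bucket singly linked list.
struct SimpleTask {
    SimpleTask* next;
    int64_t run_time_us;
};

class ShardedTimerQueue {
public:
    explicit ShardedTimerQueue(size_t num_buckets) : buckets_(num_buckets) {}

    // Producers hash their thread id to a bucket, so different threads mostly
    // take different locks. Returns true if the task is now the earliest one
    // in its bucket.
    bool Schedule(SimpleTask* task) {
        size_t idx = std::hash<std::thread::id>()(std::this_thread::get_id())
                     % buckets_.size();
        Bucket& b = buckets_[idx];
        std::lock_guard<std::mutex> guard(b.mutex);
        task->next = b.head;
        b.head = task;
        if (task->run_time_us < b.nearest_run_time_us) {
            b.nearest_run_time_us = task->run_time_us;
            return true;
        }
        return false;
    }

    // The single consumer detaches each bucket's whole list under its lock,
    // appends the tasks to `out`, and returns how many were collected.
    size_t ConsumeAll(std::vector<SimpleTask*>* out) {
        size_t n = 0;
        for (Bucket& b : buckets_) {
            SimpleTask* head = nullptr;
            {
                std::lock_guard<std::mutex> guard(b.mutex);
                head = b.head;
                b.head = nullptr;
                b.nearest_run_time_us = std::numeric_limits<int64_t>::max();
            }
            for (SimpleTask* p = head; p != nullptr; p = p->next, ++n) {
                out->push_back(p);
            }
        }
        return n;
    }

private:
    struct Bucket {
        std::mutex mutex;
        int64_t nearest_run_time_us = std::numeric_limits<int64_t>::max();
        SimpleTask* head = nullptr;
    };
    std::vector<Bucket> buckets_;
};
```

The critical section in `Schedule` is a few pointer writes and one comparison, so producers hold a bucket lock only briefly; the ordering work is deferred to the consumer thread.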
41 struct BAIDU_CACHELINE_ALIGNMENT TimerThread::Task {
42 Task* next; // For linking tasks in a Bucket.
43 int64_t run_time; // run the task at this realtime
44 void (*fn)(void*); // the fn(arg) to run
45 void* arg;
46 // Current TaskId, checked against version in TimerThread::run to test
47 // if this task is unscheduled.
48 TaskId task_id;
49 // initial_version: not run yet
50 // initial_version + 1: running
51 // initial_version + 2: removed (also the version of next Task reused
52 // this struct)
53 butil::atomic<uint32_t> version;
54
55 Task() : version(2/*skip 0*/) {}
56
57 // Run this task and delete this struct.
58 // Returns true if fn(arg) did run.
59 bool run_and_delete();
60
61 // Delete this struct if this task was unscheduled.
62 // Returns true on deletion.
63 bool try_delete();
64 };
66 // Timer tasks are sharded into different Buckets to reduce contentions.
67 class BAIDU_CACHELINE_ALIGNMENT TimerThread::Bucket {
68 public:
69 Bucket()
70 : _nearest_run_time(std::numeric_limits<int64_t>::max())
71 , _task_head(NULL) {
72 }
73
74 ~Bucket() {}
75
76 struct ScheduleResult {
77 TimerThread::TaskId task_id;
78 bool earlier;
79 };
80
81 // Schedule a task into this bucket.
82 // Returns the TaskId and if it has the nearest run time.
83 ScheduleResult schedule(void (*fn)(void*), void* arg,
84 const timespec& abstime);
85
86 // Pull all scheduled tasks.
87 // This function is called in timer thread.
88 Task* consume_tasks();
89
90 private:
91 internal::FastPthreadMutex _mutex;
92 int64_t _nearest_run_time;
93 Task* _task_head;
94 };
180 TimerThread::Bucket::ScheduleResult
181 TimerThread::Bucket::schedule(void (*fn)(void*), void* arg,
182 const timespec& abstime) {
183 butil::ResourceId<Task> slot_id;
184 Task* task = butil::get_resource<Task>(&slot_id);
185 if (task == NULL) {
186 ScheduleResult result = { INVALID_TASK_ID, false };
187 return result;
188 }
189 task->next = NULL;
190 task->fn = fn;
191 task->arg = arg;
192 task->run_time = butil::timespec_to_microseconds(abstime);
193 uint32_t version = task->version.load(butil::memory_order_relaxed);
194 if (version == 0) { // skip 0.
195 task->version.fetch_add(2, butil::memory_order_relaxed);
196 version = 2;
197 }
198 const TaskId id = make_task_id(slot_id, version);
199 task->task_id = id;
200 bool earlier = false;
201 {
202 BAIDU_SCOPED_LOCK(_mutex);
203 task->next = _task_head;
204 _task_head = task;
205 if (task->run_time < _nearest_run_time) {
206 _nearest_run_time = task->run_time;
207 earlier = true;
208 }
209 }
210 ScheduleResult result = { id, earlier };
211 return result;
212 }
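The code above runs once a bucket has been chosen: the task's run time is compared with the bucket's own _nearest_run_time. If it is earlier, earlier is set to true, and the caller then compares it with the _nearest_run_time of the TimerThread and may wake the timer thread: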
214 TimerThread::TaskId TimerThread::schedule(
215 void (*fn)(void*), void* arg, const timespec& abstime) {
216 if (_stop.load(butil::memory_order_relaxed) || !_started) {
217 // Not add tasks when TimerThread is about to stop.
218 return INVALID_TASK_ID;
219 }
220 // Hashing by pthread id is better for cache locality.
221 const Bucket::ScheduleResult result =
222 _buckets[butil::fmix64(pthread_numeric_id()) % _options.num_buckets]
223 .schedule(fn, arg, abstime); // hash into one of the buckets
224 if (result.earlier) { // an earlier timer arrived in that bucket
225 bool earlier = false;
226 const int64_t run_time = butil::timespec_to_microseconds(abstime);
227 {
228 BAIDU_SCOPED_LOCK(_mutex);
229 if (run_time < _nearest_run_time) { // compare with the global value
230 _nearest_run_time = run_time;
231 ++_nsignals;
232 earlier = true; // the timer thread needs to be woken
233 }
234 }
235 if (earlier) {
236 futex_wake_private(&_nsignals, 1);
237 }
238 }
239 return result.task_id;
240 }
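If several worker threads call timerThread->schedule(...) to add timer tasks, and each task is earlier than the _nearest_run_time of the bucket it hashes to, they still contend on the lock at line 228.
The main logic of the timer thread is as follows: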
310 void TimerThread::run() {
319 // min heap of tasks (ordered by run_time)
320 std::vector<Task*> tasks;
321 tasks.reserve(4096);
339 while (!_stop.load(butil::memory_order_relaxed)) {
343 {
344 BAIDU_SCOPED_LOCK(_mutex);
345 _nearest_run_time = std::numeric_limits<int64_t>::max(); // before this statement, _nearest_run_time holds the earliest run time of all tasks
346 }
347
348 // Walk _buckets and collect the timer tasks that have not been unscheduled
349 for (size_t i = 0; i < _options.num_buckets; ++i) {
350 Bucket& bucket = _buckets[i];
351 for (Task* p = bucket.consume_tasks(); p != NULL;
352 p = p->next, ++nscheduled) {
353 if (!p->try_delete()) { // remove the task if it's unscheduled
354 tasks.push_back(p);
355 std::push_heap(tasks.begin(), tasks.end(), task_greater);
356 }
357 }
358 } // maintain a heap
360 bool pull_again = false;
361 while (!tasks.empty()) {
362 Task* task1 = tasks[0]; // the about-to-run task
363 if (task1->try_delete()) { // already unscheduled
364 std::pop_heap(tasks.begin(), tasks.end(), task_greater);
365 tasks.pop_back();
366 continue;
367 }
368 if (butil::gettimeofday_us() < task1->run_time) { // not ready yet.
369 break;
370 }
381 {
382 BAIDU_SCOPED_LOCK(_mutex);
383 if (task1->run_time > _nearest_run_time) {
384 // a task is earlier than task1. We need to check buckets.
385 pull_again = true; // an earlier task arrived while tasks were running, so walk the buckets again
386 break;
387 }
388 }
389 std::pop_heap(tasks.begin(), tasks.end(), task_greater);
390 tasks.pop_back();
391 if (task1->run_and_delete()) { // run the timer task
393 }
394 }
395 if (pull_again) {
397 continue;
398 }
400 // Work out how long to sleep
401 int64_t next_run_time = std::numeric_limits<int64_t>::max();
402 if (tasks.empty()) {
403 next_run_time = std::numeric_limits<int64_t>::max();
404 } else {
405 next_run_time = tasks[0]->run_time;
406 }
411 int expected_nsignals = 0;
412 {
413 BAIDU_SCOPED_LOCK(_mutex);
414 if (next_run_time > _nearest_run_time) { // the global time changed; start over
415 // a task is earlier that what we would wait for.
416 // We need to check buckets.
417 continue;
418 } else {
419 _nearest_run_time = next_run_time;
420 expected_nsignals = _nsignals;
421 }
422 }
423 timespec* ptimeout = NULL;
424 timespec next_timeout = { 0, 0 };
425 const int64_t now = butil::gettimeofday_us();
426 if (next_run_time != std::numeric_limits<int64_t>::max()) {
427 next_timeout = butil::microseconds_to_timespec(next_run_time - now);
428 ptimeout = &next_timeout;
429 }
// sleep
431 futex_wait_private(&_nsignals, expected_nsignals, ptimeout);
433 }
435 }
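While run() is executing, some variables may be modified by other threads. When that happens the loop has to start over, otherwise errors creep in, for example the thread not waking up in time.
When _nsignals is not equal to expected_nsignals, futex_wait_private does not actually call pthread_cond_timedwait. _nsignals only changes when an earlier timer task arrives, which is also the only case in which the timer thread is woken.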
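The compare-then-wait behavior can be shown in isolation. The sketch below is not brpc's code: `SignalWord` is a hypothetical class that uses `std::condition_variable` to mimic the check of the futex word against the expected value before sleeping.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for a futex word plus its wait queue.
class SignalWord {
public:
    // Returns false immediately if the word no longer equals `expected`
    // (a signal arrived after the caller sampled it). Otherwise sleeps until
    // notified or until `timeout` elapses, and returns true.
    bool WaitIfEqual(int expected, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lk(mu_);
        if (word_.load() != expected) {
            return false;  // comparable to the EAGAIN branch of a futex wait
        }
        cv_.wait_for(lk, timeout);
        return true;
    }

    // Bumps the word under the same mutex the waiter checks it with, then
    // wakes one waiter.
    void Signal() {
        {
            std::lock_guard<std::mutex> lk(mu_);
            word_.fetch_add(1);
        }
        cv_.notify_one();
    }

    int Load() const { return word_.load(); }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::atomic<int> word_{0};
};
```

A consumer samples the word with `Load()` while it still holds its own state lock, releases that lock, and then calls `WaitIfEqual` with the sampled value. If a producer signaled in between, the call returns at once instead of sleeping through the new, earlier deadline.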
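Some of the comments and the underlying principles are best read in the source; here I have only pasted the main code and added some of my own understanding. I still need to spend more time thinking through the implementation above.
References
timer_keeping.md
RocksDB source analysis: write optimization, JoinBatchGroup (rocksdb源码分析 写优化之JoinBatchGroup)
A brief analysis of Linux futex (Linux Futex浅析)
In multithreaded programming, are lock-free structures faster than lock-based ones? (多线程编程的时候,使用无锁结构会不会比有锁结构更加快?)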