十九:從庫MTS多線程并行回放(一)(筆記)

一、分發(fā)調(diào)用流程

->ev->apply_event(rli); Log_event::apply_event 這里如果是非MTS進行應用 如果MTS  如果是GTID event 進行WORKER線程的分配 胞谭,如果不是則獲取WORKER線程
      
      -> 是否是進行 MTS recovery if (rli->is_mts_recovery())
         根據(jù) bitmap 設置進行跳過處理 
         
          if (rli->is_mts_recovery())//如果是恢復 這個地方就是前面恢復掃描出來的位置
               {
                 bool skip=
                   bitmap_is_set(&rli->recovery_groups, rli->mts_recovery_index) &&
                   (get_mts_execution_mode(::server_id,
                                           rli->mts_group_status ==
                                           Relay_log_info::MTS_IN_GROUP,
                                           rli->current_mts_submode->get_type() ==
                                           MTS_PARALLEL_TYPE_DB_NAME)
                    == EVENT_EXEC_PARALLEL);
                 if (skip)
                 {
                   DBUG_RETURN(0);
                 }
                 else
                 {
                   DBUG_RETURN(do_apply_event(rli));
                 }
               }
         
         
      -> 如果是單線程直接調(diào)用 do_apply_event
      
      -> 如果是多線程MTS !Lバ怼!!9家ぁ9呈觥!D滤椤Q揽薄!K鳌7矫妗!I恰9Ы稹!
        ->Log_event::get_slave_worker 主要是根據(jù)不同的EVENT進行不同的操作 包含1褂策、判定是否可以并發(fā) 2横腿、判定由哪一個worker進行執(zhí)行
          ->如果是GTID event !U夼唷!匿名GTID Event也可以
             ->is_gtid_event
           ->初始化一個組 Slave_job_group
           ->在GAQ中分配隊列序號
           ->rli->mts_groups_assigned++ 增加
           ->使用GTID-event的位置和mts_groups_assigned將GROUP中的master_log_pos位置 和 total_seqno初始化
           ->將GROUP加到 GAQ并且分配的 序號 gaq->assigned_group_index= gaq->en_queue(&group);
           ->初始化一個Slave_job_item 
           ->加入到rli->curr_group_da.push_back中
           ->進行GTID 模式下 判定是否可以并發(fā)
            ->schedule_next_event
              ->Mts_submode_logical_clock::schedule_next_event 基于COMMIT_ORDER和WRITE SET的都使用這個方法 
                主要判斷是否可以進行并發(fā)并且進行等待
               ->獲取GTID EVENT中的last commit和seq number
               ->如果不能進行并發(fā)則需要等待last commit > LWM SEQ NUMBER(最新一次除沒有提交事物之前的一個事物的seq number)
                 ->wait_for_last_committed_trx 進入等待 他會設置一個min_waited_timestamp 作為
                   其他事物提交時更新LWM SEQ NUMBER的標記邢锯,等待直到last commit<=LWM SEQ NUMBER
                   等待標記為
                   stage_worker_waiting_for_commit_parent 
                   Waiting for dependent transaction to commit
                   
                   同時還會更新 mts_total_wait_overlap
                   my_atomic_add64(&rli->mts_total_wait_overlap, diff_timespec(&ts[1], &ts[0]));
                  

                    獲取 LWM SEQ NUMBER 的源碼注釋: 
                                   
                                  the last time index containg lwm
                                      +------+
                                      | LWM  |
                                      |  |   |
                                      V  V   V
                       GAQ:x  xoooooxxxxxXXXXX...X
                                    ^   ^
                                    |   | LWM+1
                                    |
                                    +- tne new current_lwm
                    
                             <---- logical (commit) time ----
                             
                       here `x' stands for committed, `X' for committed and discarded from
                       the running range of the queue, `o' for not committed.
                         
               
               
               
         ->如果 query event Q锶铩!丹擎!
           ->初始化一個Slave_job_item 
           ->將其加入到rli->curr_group_da.push_back(job_item);中
           ->設置 rli->curr_group_seen_begin= true; 說明找到了query event
           ->進行DATABASE模式的分配 不考慮
           
         ->如果是MAP EVENT
           ->開始獲取WORKER線程到這里已經(jīng)可以并發(fā)執(zhí)行了尾抑,需要進行WORKER線程的獲取
            ret_worker=rli->current_mts_submode->get_least_occupied_worker(rli, &rli->workers,this); 
            Mts_submode_logical_clock::get_least_occupied_worker
            -> 第一次rli->last_assigned_worker為空 這需要新分配
               -> Mts_submode_logical_clock::get_free_worker 進行分配
                  ->循環(huán)每一個worker線程,看是否有正在等待處理的event蒂培,找到一個沒有任何工作的worker線程
                    這里也能出是輪詢每一個worker線程找到空閑的worker線程就可以了再愈。判斷標準就是
                     if (w_i->jobs.len == 0)
                                     
                  -> 如果沒有找到,分配失敗护戳,進行等待等待為
                    stage_slave_waiting_for_workers_to_process_queue
                    Waiting for slave workers to process their queues
                  -> 循環(huán)獲取work線程翎冲,直到成功
                  -> 獲取成功后更新信息
                     等待的時間:rli->mts_total_wait_worker_avail += diff_timespec(&ts[1], &ts[0]);
                     增加一次等待次數(shù):rli->mts_wq_no_underrun_cnt++;
                  ->如果開啟了參數(shù) slave_preserve_commit_order=1 注冊事物
                    rli->get_commit_order_manager()->register_trx(worker);
                  ->ptr_group->worker_id= ret_worker->id;//設置本次事物組的worker_id 就是分配的工作線程
           ->伴隨著Woker線程的分配,如果是開啟了參數(shù)slave_preserve_commit_order需要注冊這個事務
             if (rli->get_commit_order_manager() != NULL && worker != NULL)
               rli->get_commit_order_manager()->register_trx(worker);//注冊事物

        ->如果是DEL event
             步驟同上 只是不需要分配work線程了因為已經(jīng)分配了
        ->如果是XID event
             步驟同上 不過還需要更新group 的checkpoint信息 如下:
               if (!ret_worker->checkpoint_notified) //將GROUP中填寫 checkpoint信息
               {
                 if (!ptr_group)
                   ptr_group= gaq->get_job_group(rli->gaq->assigned_group_index);
                 ptr_group->checkpoint_log_name= 
                   my_strdup(key_memory_log_event, rli->get_group_master_log_name(), MYF(MY_WME));
                 ptr_group->checkpoint_log_pos= rli->get_group_master_log_pos();
                 ptr_group->checkpoint_relay_log_name=
                   my_strdup(key_memory_log_event, rli->get_group_relay_log_name(), MYF(MY_WME));
                 ptr_group->checkpoint_relay_log_pos= rli->get_group_relay_log_pos();
                 ptr_group->shifted= ret_worker->bitmap_shifted; //checkpoint 后 移動的個數(shù) 用于后面提交的時候改變參考Slave_worker::commit_positions    設置參考mts_checkpoint_routine()
                 ret_worker->bitmap_shifted= 0;//重置移動量
                 ret_worker->checkpoint_notified= TRUE;
               }
               ptr_group->checkpoint_seqno= rli->checkpoint_seqno; //獲取seqno 這個值會在chkpt后減去偏移量
               ptr_group->ts= common_header->when.tv_sec + (time_t) exec_time; // Seconds_behind_master related  //checkpoint的時候會將這個值再次傳遞 mts_checkpoint_routine()
               rli->checkpoint_seqno++;//增加seqno     
               到這里 Log_event::get_slave_worker 每個event的處理流程完成媳荒,每次都會回到
               Log_event::apply_event
       ->Log_event::apply_event 返回到 apply_event_and_update_pos      
    ->回到apply_event_and_update_pos 下面邏輯MTS才進行 也就是入隊到woker中去
      開始進入worker 隊列抗悍,GTID和QUERY EVNET會跟隨 MAP EVENT一起進入隊列加入了li->curr_group_da中
      初始化map event的Slave_job_item 
      設置ev屬于在GAP中的位置 ev->mts_group_idx= rli->gaq->assigned_group_index;
      如果是map event的話還會幫助GTID和QUERY event入隊
      然后自己入隊(append_item_to_jobs(job_item, w, rli))
      
      其他event 比如delete event和xid event則自己調(diào)用(append_item_to_jobs(job_item, w, rli))
      入隊
      -> append_item_to_jobs(job_item, w, rli)
        ->如果入隊的event 因為worker線程的隊列已經(jīng)滿了則等待:
          進入狀態(tài)stage_slave_waiting_worker_queue
          Waiting for Slave Worker queue

          wroker隊列的大小為:mts_slave_worker_queue_len_max= 16384;
          每次等待增加一次
          worker->jobs.overfill= TRUE;
          worker->jobs.waited_overfill++;
          rli->mts_wq_overfill_cnt++;
      
      
      
      
      (rli->is_parallel_exec() && rli->mts_events_assigned % 1024 == 1) 
      如果每個event的前面的操作操作120秒 則會出現(xiàn)通知 這個警告經(jīng)常遇到:
      從上面我們看到的等待來講超過120秒的可能有3種
      
      1、由于上一組并發(fā)有大事物沒有提交
         導致不能并發(fā)worker線程的等待時間
      2钳枕、worker線程都在完成工作及在應用上一個事物的event缴渊,沒有新的worker線程以供新分配
      3、worker線程已經(jīng)分配鱼炒,但是由于worker線程的分配隊列為16384衔沼,如果應用比較慢則可能入不了
         分配隊列,一般也是大事物造成的。
      
      
      
sql_print_information("Multi-threaded slave statistics%s: "
                "seconds elapsed = %lu; "
                "events assigned = %llu; "
                "worker queues filled over overrun level = %lu; "
                "waited due a Worker queue full = %lu; "
                "waited due the total size = %lu; "
                "waited at clock conflicts = %llu "
                "waited (count) when Workers occupied = %lu "
                "waited when Workers occupied = %llu",
                rli->get_for_channel_str(),
                static_cast<unsigned long>
                (my_now - rli->mts_last_online_stat),//消耗總時間 單位秒
                rli->mts_events_assigned,//總的event分配的個數(shù)
                rli->mts_wq_overrun_cnt,// worker線程分配隊列大于 90%的次數(shù) 當前硬編碼  14746
                rli->mts_wq_overfill_cnt,    //由于work 分配隊列已滿造成的等待次數(shù) 當前硬編碼 16384
                rli->wq_size_waits_cnt, //大Event的個數(shù) 一般不會存在
                rli->mts_total_wait_overlap,//由于上一組并行有大事物沒有提交導致不能分配worker線程的等待時間 單位納秒
                rli->mts_wq_no_underrun_cnt, //work線程由于沒有空閑的而等待的次數(shù)
                rli->mts_total_wait_worker_avail);//work線程由于沒有空閑的而等待的時間   單位納秒
                                
    ->回到apply_event_and_update_pos 下面 進行pos的更新 這個pos是 event_relay_log_pos ,不會出現(xiàn)在show slave或者其他地方  
      更新內(nèi)部變量讀取到的relay log位置和名字 這個不用于外部訪問
         uint event_relay_log_number; 這兩個是正在執(zhí)行的relay log的位置
         ulonglong event_relay_log_pos;                        

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末指蚁,一起剝皮案震驚了整個濱河市菩佑,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌欣舵,老刑警劉巖擎鸠,帶你破解...
    沈念sama閱讀 219,366評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異缘圈,居然都是意外死亡劣光,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評論 3 395
  • 文/潘曉璐 我一進店門糟把,熙熙樓的掌柜王于貴愁眉苦臉地迎上來绢涡,“玉大人,你說我怎么就攤上這事遣疯⌒劭桑” “怎么了?”我有些...
    開封第一講書人閱讀 165,689評論 0 356
  • 文/不壞的土叔 我叫張陵缠犀,是天一觀的道長数苫。 經(jīng)常有香客問我,道長辨液,這世上最難降的妖魔是什么虐急? 我笑而不...
    開封第一講書人閱讀 58,925評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮滔迈,結果婚禮上止吁,老公的妹妹穿的比我還像新娘。我一直安慰自己燎悍,他們只是感情好敬惦,可當我...
    茶點故事閱讀 67,942評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著谈山,像睡著了一般俄删。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上奏路,一...
    開封第一講書人閱讀 51,727評論 1 305
  • 那天抗蠢,我揣著相機與錄音,去河邊找鬼思劳。 笑死迅矛,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的潜叛。 我是一名探鬼主播秽褒,決...
    沈念sama閱讀 40,447評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼壶硅,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了销斟?” 一聲冷哼從身側響起庐椒,我...
    開封第一講書人閱讀 39,349評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎蚂踊,沒想到半個月后约谈,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,820評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡犁钟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,990評論 3 337
  • 正文 我和宋清朗相戀三年棱诱,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片涝动。...
    茶點故事閱讀 40,127評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡迈勋,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出醋粟,到底是詐尸還是另有隱情靡菇,我是刑警寧澤,帶...
    沈念sama閱讀 35,812評論 5 346
  • 正文 年R本政府宣布米愿,位于F島的核電站厦凤,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏育苟。R本人自食惡果不足惜较鼓,卻給世界環(huán)境...
    茶點故事閱讀 41,471評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望宙搬。 院中可真熱鬧笨腥,春花似錦拓哺、人聲如沸勇垛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽闲孤。三九已至,卻和暖如春烤礁,著一層夾襖步出監(jiān)牢的瞬間讼积,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評論 1 272
  • 我被黑心中介騙來泰國打工脚仔, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留勤众,地道東北人。 一個月前我還...
    沈念sama閱讀 48,388評論 3 373
  • 正文 我出身青樓鲤脏,卻偏偏與公主長得像们颜,于是被迫代替她去往敵國和親吕朵。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,066評論 2 355

推薦閱讀更多精彩內(nèi)容