Binlog Replication Architecture
Once binlog is enabled on the Master, every write operation is recorded in the binlog, and the Slave synchronizes the binlog by sending a dump command.
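Binlog File Structure
A binlog (binary log) file records an event for every write operation. The description below follows the log format of MySQL 5.0.0 and later, i.e. binlog version 4, which introduced the FORMAT_DESCRIPTION_EVENT. Every binlog file begins with the same 4 bytes: [ fe 'bin' ] (0xfe 0x62 0x69 0x6e).
The first event is a FORMAT_DESCRIPTION_EVENT. It describes how the other events are laid out, and the Slave relies on it when parsing the events that follow.
The last event of a rotated file is a ROTATE_EVENT, which records where the next binlog file starts (its name and position).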
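The C++ sketch below makes the file layout concrete. It checks the 4-byte magic number and decodes the 19-byte common header that every v4 event starts with. That header holds the timestamp, type code, server id, event length, next position and flags, all little-endian. The sketch assumes the whole file has already been read into memory. The struct and function names are hypothetical, not MySQL's own.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Binlog v4 common event header: 19 bytes, integers little-endian.
struct EventHeader {
  uint32_t timestamp;
  uint8_t type_code;       // e.g. 15 = FORMAT_DESCRIPTION_EVENT, 4 = ROTATE_EVENT
  uint32_t server_id;
  uint32_t event_length;   // total size of the event, header included
  uint32_t next_position;  // file offset of the next event
  uint16_t flags;
};

constexpr std::array<uint8_t, 4> kBinlogMagic = {0xfe, 'b', 'i', 'n'};
constexpr std::size_t kEventHeaderLen = 19;

static uint16_t le16(const uint8_t* p) { return uint16_t(p[0] | p[1] << 8); }
static uint32_t le32(const uint8_t* p) {
  return uint32_t(p[0]) | uint32_t(p[1]) << 8 | uint32_t(p[2]) << 16 |
         uint32_t(p[3]) << 24;
}

// True if the buffer starts with the 4-byte magic number [ fe 'bin' ].
bool has_binlog_magic(const std::vector<uint8_t>& buf) {
  if (buf.size() < kBinlogMagic.size()) return false;
  for (std::size_t i = 0; i < kBinlogMagic.size(); ++i)
    if (buf[i] != kBinlogMagic[i]) return false;
  return true;
}

// Decode the common header of the event that starts at `offset`
// (the first event, FORMAT_DESCRIPTION_EVENT, starts at offset 4).
std::optional<EventHeader> read_event_header(const std::vector<uint8_t>& buf,
                                             std::size_t offset) {
  if (offset + kEventHeaderLen > buf.size()) return std::nullopt;
  const uint8_t* p = buf.data() + offset;
  return EventHeader{le32(p), p[4], le32(p + 5), le32(p + 9), le32(p + 13),
                     le16(p + 17)};
}
```

A reader would start at offset 4 and follow next_position from event to event. The first header it decodes should carry type code 15 (FORMAT_DESCRIPTION_EVENT).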
Event Types
Row-based replication uses the following events:
- TABLE_MAP_EVENT
- ROWS_EVENT
- DELETE_ROWS_EVENTv2
- UPDATE_ROWS_EVENTv2
- WRITE_ROWS_EVENTv2
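Every insert, update, and delete is preceded by a TABLE_MAP_EVENT that describes the table being modified, and the two consecutive events are linked through table_id.
table_id does not correspond one-to-one with table names. Its only role in the row-based replication protocol is to associate a TABLE_MAP_EVENT with the ROWS_EVENT that follows it.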
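The pairing can be sketched as a cache keyed by table_id. Each TABLE_MAP_EVENT (re)binds an id to a table, and the ROWS_EVENT that follows looks the id up. This is a hypothetical decoder-side structure, not MySQL code, and it tracks only the table name. A real decoder would also keep the column types carried by the TABLE_MAP_EVENT.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>

// Hypothetical decoder state: remembers the latest TABLE_MAP_EVENT per table_id
// so that the following WRITE/UPDATE/DELETE_ROWS_EVENT can be resolved.
class TableMapCache {
 public:
  // Called for every TABLE_MAP_EVENT: (re)binds table_id to schema.table.
  void on_table_map(uint64_t table_id, const std::string& schema,
                    const std::string& table) {
    tables_[table_id] = schema + "." + table;
  }

  // Called for every ROWS_EVENT: returns the table its table_id refers to.
  std::optional<std::string> on_rows_event(uint64_t table_id) const {
    auto it = tables_.find(table_id);
    if (it == tables_.end()) return std::nullopt;  // no preceding TABLE_MAP_EVENT
    return it->second;
  }

 private:
  std::unordered_map<uint64_t, std::string> tables_;
};
```

Because an id can be rebound, a decoder should always trust the most recent TABLE_MAP_EVENT rather than caching table names by id forever.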
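Handshake Protocol
MySQL offers two ways to replicate the binlog, by file name and position or by GTID. They correspond to the COM_BINLOG_DUMP and COM_BINLOG_DUMP_GTID commands.
Before a dump command can be sent, the Master must authenticate the Slave: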
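- When the Slave connects, the Master sends a Handshake packet to start authentication;
- On receiving the Handshake packet, the Slave replies with an authentication packet carrying the username and password as credentials;
- The Master verifies the username and password and replies with an OK_Packet if authentication succeeds, or an ERR_Packet otherwise.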
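Every MySQL protocol packet starts with a 4-byte header: a 3-byte little-endian payload length plus a 1-byte sequence id. The first payload byte of the server's answer distinguishes OK_Packet (0x00) from ERR_Packet (0xff). The sketch below classifies that answer. The enum and function are hypothetical, and the sketch reads only the 2-byte error code of an ERR_Packet. It lumps other replies into Other, for example an authentication method switch request (0xfe).

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

enum class Reply { Ok, Err, Other, Malformed };

// Classify the server's reply to the authentication packet.
// pkt = 4-byte header (payload length, sequence id) + payload.
Reply classify_auth_reply(const std::vector<uint8_t>& pkt, uint16_t* err_code) {
  if (pkt.size() < 5) return Reply::Malformed;
  uint32_t payload_len = pkt[0] | pkt[1] << 8 | pkt[2] << 16;
  if (pkt.size() != 4 + payload_len) return Reply::Malformed;
  switch (pkt[4]) {
    case 0x00:
      return Reply::Ok;
    case 0xff:
      if (payload_len < 3) return Reply::Malformed;
      if (err_code) *err_code = uint16_t(pkt[5] | pkt[6] << 8);  // little-endian
      return Reply::Err;
    default:
      return Reply::Other;  // e.g. auth method switch request (0xfe)
  }
}
```

For a rejected password, the error code would typically be 1045 (ER_ACCESS_DENIED_ERROR).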
The Handshake packet is laid out as follows:
```text
Packet format:

Bytes   Content
-----   -------
1       protocol version (always 10)
n       server version string, \0-terminated
4       thread id
8       first 8 bytes of the plugin provided data (scramble)
1       \0 byte, terminating the first part of a scramble
2       server capabilities (two lower bytes)
1       server character set
2       server status
2       server capabilities (two upper bytes)
1       length of the scramble
10      reserved, always 0
n       rest of the plugin provided data (at least 12 bytes)
1       \0 byte, terminating the second part of a scramble
n       authentication plugin name, \0-terminated
```
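Following the table, a client could decode the Handshake payload as shown below. The sketch assumes the 4-byte packet header has already been stripped. It also assumes the server sends the authentication plugin name after the scramble, which is what the strmake call in send_server_handshake_packet writes. The struct and function names are hypothetical. Per the protocol documentation, the second scramble part is max(13, length - 8) bytes, including its terminating \0.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

struct Handshake {
  uint8_t protocol_version = 0;
  std::string server_version;
  uint32_t thread_id = 0;
  uint32_t capabilities = 0;  // lower and upper 2 bytes combined
  uint8_t charset = 0;
  uint16_t status = 0;
  std::string scramble;       // part 1 (8 bytes) + part 2, without the \0 bytes
  std::string auth_plugin;
};

// Read an n-byte little-endian integer at b[i].
static uint32_t le(const std::vector<uint8_t>& b, std::size_t i, int n) {
  uint32_t v = 0;
  for (int k = 0; k < n; ++k) v |= uint32_t(b[i + k]) << (8 * k);
  return v;
}

// Parse a HandshakeV10 payload (packet header already stripped).
std::optional<Handshake> parse_handshake(const std::vector<uint8_t>& b) {
  std::size_t i = 0;
  // Read a \0-terminated string at i and step past its terminator.
  auto cstr = [&](std::string& out) {
    std::size_t start = i;
    while (i < b.size() && b[i] != 0) ++i;
    if (i == b.size()) return false;
    out.assign(b.begin() + start, b.begin() + i);
    ++i;
    return true;
  };
  Handshake h;
  if (b.empty() || (h.protocol_version = b[i++]) != 10) return std::nullopt;
  if (!cstr(h.server_version)) return std::nullopt;
  if (i + 31 > b.size()) return std::nullopt;          // fixed-size middle section
  h.thread_id = le(b, i, 4);
  h.scramble.assign(b.begin() + i + 4, b.begin() + i + 12);  // scramble part 1
  i += 4 + 8 + 1;                                       // thread id, part 1, \0 filler
  uint32_t cap_lo = le(b, i, 2);
  h.charset = b[i + 2];
  h.status = uint16_t(le(b, i + 3, 2));
  uint32_t cap_hi = le(b, i + 5, 2);
  std::size_t scramble_len = b[i + 7];
  i += 2 + 1 + 2 + 2 + 1 + 10;                          // ... plus 10 reserved bytes
  h.capabilities = cap_lo | cap_hi << 16;
  // Part 2 holds max(13, scramble_len - 8) bytes, the last one being its \0.
  std::size_t part2 = scramble_len > 21 ? scramble_len - 8 : 13;
  if (i + part2 > b.size()) return std::nullopt;
  h.scramble.append(b.begin() + i, b.begin() + i + part2 - 1);
  i += part2;
  if (!cstr(h.auth_plugin)) return std::nullopt;
  return h;
}
```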
```cpp
static bool send_server_handshake_packet(MPVIO_EXT *mpvio,
                                         const char *data, uint data_len)
{
  Protocol_classic *protocol= mpvio->protocol;
  char *buff= (char *) my_alloca(1 + SERVER_VERSION_LENGTH + data_len + 64);
  char scramble_buf[SCRAMBLE_LENGTH];
  char *end= buff;
  DBUG_ENTER("send_server_handshake_packet");

  *end++= protocol_version;                 // 1 byte: protocol version (10)
  protocol->set_client_capabilities(CLIENT_BASIC_FLAGS);

  if (data_len)
  {
    // Keep a copy of the plugin-provided data for later use in the handshake
    mpvio->cached_server_packet.pkt= (char*) memdup_root(mpvio->mem_root,
                                                         data, data_len);
    mpvio->cached_server_packet.pkt_len= data_len;
  }

  if (data_len < SCRAMBLE_LENGTH)
  {
    if (data_len)
    {
      /*
        The first packet *must* have at least 20 bytes of a scramble.
        If a plugin provided less, we pad it to 20 with zeros.
      */
      memcpy(scramble_buf, data, data_len);
      memset(scramble_buf + data_len, 0, SCRAMBLE_LENGTH - data_len);
      data= scramble_buf;
    }
    else
    {
      // No plugin data: generate a random salt
      generate_user_salt(mpvio->scramble, SCRAMBLE_LENGTH + 1);
      data= mpvio->scramble;
    }
    data_len= SCRAMBLE_LENGTH;
  }

  // n bytes: server version string, \0-terminated
  end= my_stpnmov(end, server_version, SERVER_VERSION_LENGTH) + 1;

  // 4 bytes: thread id
  DBUG_ASSERT(sizeof(my_thread_id) == 4);
  int4store((uchar*) end, mpvio->thread_id);
  end+= 4;

  // 8 bytes: first part of the scramble, followed by a \0 byte
  end= (char*) memcpy(end, data, AUTH_PLUGIN_DATA_PART_1_LENGTH);
  end+= AUTH_PLUGIN_DATA_PART_1_LENGTH;
  *end++= 0;

  // 2 bytes: lower half of the capability flags
  int2store(end, static_cast<uint16>(protocol->get_client_capabilities()));
  /* write server characteristics: up to 16 bytes allowed */
  end[2]= (char) default_charset_info->number;                    // character set
  int2store(end + 3, mpvio->server_status[0]);                    // server status
  int2store(end + 5, protocol->get_client_capabilities() >> 16);  // upper capabilities
  end[7]= data_len;                                               // scramble length
  DBUG_EXECUTE_IF("poison_srv_handshake_scramble_len", end[7]= -100;);
  memset(end + 8, 0, 10);                                         // 10 reserved bytes
  end+= 18;

  /* write scramble tail */
  end= (char*) memcpy(end, data + AUTH_PLUGIN_DATA_PART_1_LENGTH,
                      data_len - AUTH_PLUGIN_DATA_PART_1_LENGTH);
  end+= data_len - AUTH_PLUGIN_DATA_PART_1_LENGTH;
  // authentication plugin name, \0-terminated
  end= strmake(end, plugin_name(mpvio->plugin)->str,
                    plugin_name(mpvio->plugin)->length);

  int res= protocol->write((uchar*) buff, (size_t) (end - buff + 1)) ||
           protocol->flush_net();
  DBUG_RETURN(res);
}
```
On receiving the authentication packet, the Master parses out the username and password and verifies them.
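```cpp
static size_t parse_client_handshake_packet(MPVIO_EXT *mpvio,
                                            uchar **buff, size_t pkt_len)
{
  /*
    ... (elided) capability flags, max packet size, character set and filler
    have already been consumed; `end` points at the username and
    `bytes_remaining_in_packet` holds the number of unread bytes ...
  */
  // Username: \0-terminated string
  size_t user_len;
  char *user= get_string(&end, &bytes_remaining_in_packet, &user_len);

  // Password (auth response): length-encoded string
  size_t passwd_len= 0;
  char *passwd= NULL;
  passwd= get_length_encoded_string(&end, &bytes_remaining_in_packet,
                                    &passwd_len);
  if (passwd_len)
    mpvio->auth_info.password_used= PASSWORD_USED_YES;
  /* ... (elided) database, auth plugin name, connection attributes ... */
}
```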
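Both fields use the protocol's basic string encodings. The username is \0-terminated. The password here is a length-encoded string, whose prefix is a length-encoded integer: one byte below 0xfb, or 0xfc/0xfd/0xfe followed by 2/3/8 little-endian bytes. A minimal decoder might look like the sketch below. The function names are hypothetical stand-ins for get_length_encoded_string, not its actual implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Decode a length-encoded integer at b[i] and advance i past it.
std::optional<uint64_t> read_lenenc_int(const std::vector<uint8_t>& b, std::size_t& i) {
  if (i >= b.size()) return std::nullopt;
  const uint8_t first = b[i];
  std::size_t width = 0;
  if (first < 0xfb) { ++i; return first; }  // value stored in the byte itself
  if (first == 0xfc) width = 2;
  else if (first == 0xfd) width = 3;
  else if (first == 0xfe) width = 8;
  else return std::nullopt;                 // 0xfb / 0xff: not a length
  if (i + 1 + width > b.size()) return std::nullopt;
  uint64_t v = 0;
  for (std::size_t k = 0; k < width; ++k) v |= uint64_t(b[i + 1 + k]) << (8 * k);
  i += 1 + width;
  return v;
}

// Decode a length-encoded string: a length-encoded integer followed by that
// many bytes. On failure i is left unchanged.
std::optional<std::string> read_lenenc_string(const std::vector<uint8_t>& b, std::size_t& i) {
  std::size_t pos = i;
  auto len = read_lenenc_int(b, pos);
  if (!len || *len > b.size() - pos) return std::nullopt;
  std::string s(reinterpret_cast<const char*>(b.data() + pos),
                static_cast<std::size_t>(*len));
  i = pos + static_cast<std::size_t>(*len);
  return s;
}
```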
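Parsing the Dump Command
A Slave registers itself with COM_REGISTER_SLAVE before sending COM_BINLOG_DUMP.
For COM_BINLOG_DUMP_GTID, the Master uses the GTID set carried in the command to locate the position from which to start sending.
On receiving either command, the Master checks whether the BINLOG_DUMP_NON_BLOCK bit is set in the flags field. If it is set, the Master sends an EOF_Packet once it has sent all existing binlog events. If it is not set, the Master blocks and keeps waiting for the next event.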
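For reference, here is a Slave-side sketch that serializes COM_BINLOG_DUMP_GTID. It writes the command byte 0x1e followed by the fields in the order com_binlog_dump_gtid() reads them: flags, server_id, name_size, name, pos, data_size and data. The helper and constant names are hypothetical, and the GTID set bytes are passed in already encoded. Matching the server code, the sketch always writes data_size and data.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

constexpr uint8_t kComBinlogDumpGtid = 0x1e;   // COM_BINLOG_DUMP_GTID
constexpr uint16_t kBinlogDumpNonBlock = 0x01;  // BINLOG_DUMP_NON_BLOCK

// Append v as `bytes` little-endian bytes.
static void put_le(std::vector<uint8_t>& out, uint64_t v, int bytes) {
  for (int k = 0; k < bytes; ++k) out.push_back(uint8_t(v >> (8 * k)));
}

// Command byte, then flags(2) server_id(4) name_size(4) name pos(8) data_size(4) data.
std::vector<uint8_t> build_dump_gtid(uint16_t flags, uint32_t server_id,
                                     const std::string& file, uint64_t pos,
                                     const std::vector<uint8_t>& gtid_set) {
  std::vector<uint8_t> p{kComBinlogDumpGtid};
  put_le(p, flags, 2);
  put_le(p, server_id, 4);
  put_le(p, file.size(), 4);
  p.insert(p.end(), file.begin(), file.end());
  put_le(p, pos, 8);
  put_le(p, gtid_set.size(), 4);
  p.insert(p.end(), gtid_set.begin(), gtid_set.end());
  return p;
}

// With BINLOG_DUMP_NON_BLOCK set, the Master ends with an EOF_Packet once it
// runs out of events; without it, the dump connection blocks for new events.
bool sends_eof_at_end(uint16_t flags) { return (flags & kBinlogDumpNonBlock) != 0; }
```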
```cpp
bool com_binlog_dump_gtid(THD *thd, char *packet, size_t packet_length)
{
  DBUG_ENTER("com_binlog_dump_gtid");
  ushort flags= 0;
  uint32 data_size= 0;
  uint64 pos= 0;
  char name[FN_REFLEN + 1];
  uint32 name_size= 0;
  char *gtid_string= NULL;
  const uchar* packet_position= (uchar *) packet;
  size_t packet_bytes_todo= packet_length;
  Sid_map sid_map(NULL/*no sid_lock because this is a completely local object*/);
  Gtid_set slave_gtid_executed(&sid_map);

  thd->status_var.com_other++;
  thd->enable_slow_log= opt_log_slow_admin_statements;
  if (check_global_access(thd, REPL_SLAVE_ACL))
    DBUG_RETURN(false);

  // Parse COM_BINLOG_DUMP_GTID, see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
  READ_INT(flags, 2);
  READ_INT(thd->server_id, 4);
  READ_INT(name_size, 4);
  READ_STRING(name, name_size, sizeof(name));
  READ_INT(pos, 8);
  DBUG_PRINT("info", ("pos=%llu flags=%d server_id=%d", pos, flags, thd->server_id));
  READ_INT(data_size, 4);
  CHECK_PACKET_SIZE(data_size);
  // Decode the encoded GTID set in the packet into the intervals of slave_gtid_executed
  if (slave_gtid_executed.add_gtid_encoding(packet_position, data_size) !=
      RETURN_STATUS_OK)
    DBUG_RETURN(true);
  slave_gtid_executed.to_string(&gtid_string);  // render as text, e.g. "uuid:1-5"
  // Sample debug trace:
  // T@2: | | | info: Slave 1828716545 requested to read at position 4 gtid set '075ca916-e025-11e9-bde7-bd71fea5404f:1'.
  DBUG_PRINT("info", ("Slave %d requested to read %s at position %llu gtid set "
                      "'%s'.", thd->server_id, name, pos, gtid_string));

  kill_zombie_dump_threads(thd);
  query_logger.general_log_print(thd, thd->get_command(),
                                 "Log: '%s' Pos: %llu GTIDs: '%s'",
                                 name, pos, gtid_string);
  my_free(gtid_string);
  mysql_binlog_send(thd, name, (my_off_t) pos, &slave_gtid_executed, flags);

  unregister_slave(thd, true, true/*need_lock_slave_list=true*/);
  /* fake COM_QUIT -- if we get here, the thread needs to terminate */
  DBUG_RETURN(true);

  /* ... (elided) error_malformed_packet: jump target of READ_INT / READ_STRING / CHECK_PACKET_SIZE ... */
}
```
In COM_BINLOG_DUMP_GTID, slave_gtid_executed is the set of GTIDs the Slave has already executed. mysql_binlog_send uses this set to decide where to start sending binlog events.
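The bytes consumed by add_gtid_encoding() are assumed here to follow the little-endian layout written by Gtid_set::encode(). That layout is an 8-byte SID count, then for each SID its 16-byte UUID and an 8-byte interval count, then 8-byte start/end pairs in which end is exclusive. The sketch below produces that layout under this assumption, and the struct and function names are hypothetical. For the set '075ca916-e025-11e9-bde7-bd71fea5404f:1' from the debug trace, it yields one SID with the single interval [1, 2).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// One SID (server UUID) and its GTID ranges, each written as inclusive [first, last].
struct SidRanges {
  std::string uuid;
  std::vector<std::pair<uint64_t, uint64_t>> ranges;
};

static void put_le(std::vector<uint8_t>& out, uint64_t v, int bytes) {
  for (int k = 0; k < bytes; ++k) out.push_back(uint8_t(v >> (8 * k)));
}

// Append the 16 raw bytes of a textual UUID (dashes removed).
static void put_uuid(std::vector<uint8_t>& out, const std::string& uuid) {
  std::string hex;
  for (char c : uuid)
    if (c != '-') hex += c;
  if (hex.size() != 32) throw std::invalid_argument("bad UUID: " + uuid);
  for (std::size_t k = 0; k < 32; k += 2)
    out.push_back(uint8_t(std::stoul(hex.substr(k, 2), nullptr, 16)));
}

// n_sids(8), then per SID: uuid(16), n_intervals(8), then per interval
// start(8), end(8), where end is one past the last GNO.
std::vector<uint8_t> encode_gtid_set(const std::vector<SidRanges>& set) {
  std::vector<uint8_t> out;
  put_le(out, set.size(), 8);
  for (const auto& s : set) {
    put_uuid(out, s.uuid);
    put_le(out, s.ranges.size(), 8);
    for (const auto& [first, last] : s.ranges) {
      put_le(out, first, 8);
      put_le(out, last + 1, 8);  // exclusive end
    }
  }
  return out;
}
```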
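Sending the Log
The sending logic lives in the Binlog_sender class, which runs in the dump thread serving the Slave. Its steps are:
- Validate slave_gtid_executed and locate the first file to send;
- Send a fake ROTATE_EVENT and open the first file;
- Send each file in turn.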
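The file-by-file order can be modeled as shown below, ignoring error handling and the wait for new events at the end of the active file. This is a hypothetical simplification, not Binlog_sender itself. The first file is sent from the requested position, and every later file is sent from offset 4, immediately after its magic number (BIN_LOG_HEADER_SIZE in the server).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

constexpr uint64_t kBinLogHeaderSize = 4;  // size of the [ fe 'bin' ] magic

struct SendStep {
  std::string file;    // file announced by the fake ROTATE_EVENT
  uint64_t start_pos;  // offset from which its events are sent
};

// Hypothetical model: locate first_file in the index, send it from start_pos,
// then send every later file from just after its magic number.
std::vector<SendStep> plan_sends(const std::vector<std::string>& index,
                                 const std::string& first_file,
                                 uint64_t start_pos) {
  std::vector<SendStep> steps;
  std::size_t k = 0;
  while (k < index.size() && index[k] != first_file) ++k;
  for (; k < index.size(); ++k) {
    steps.push_back({index[k], start_pos});  // fake rotate + send this file
    start_pos = kBinLogHeaderSize;
  }
  return steps;
}
```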
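```cpp
void Binlog_sender::run()
{
  /* ... (elided) declarations of file, log_cache, start_pos, log_file ... */
  init();  // validate slave_gtid_executed and locate the first file to send
  while (!has_error() && !m_thd->killed)
  {
    // Tell the Slave which file and position the following events belong to
    if (unlikely(fake_rotate_event(log_file, start_pos)))
      break;
    file= open_binlog_file(&log_cache, log_file, &m_errmsg);  // open the file by name
    if (unlikely(file < 0)) { set_fatal_error(m_errmsg); break; }
    // Send one file; 0 means it was read to the end (log_pos == end_pos),
    // so continue with the next file
    if (send_binlog(&log_cache, start_pos))
      break;
    /* Will go to next file, need to copy log file name */
    set_last_file(log_file);
    int error= mysql_bin_log.find_next_log(&m_linfo, 0);  // locate the next file
    if (unlikely(error)) { set_fatal_error("could not find next log"); break; }
    start_pos= BIN_LOG_HEADER_SIZE;  // later files start right after the 4-byte magic
    /* ... (elided) close the current file; cleanup after the loop ... */
  }
}
```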