0. 大綱
- 引言
- ScyllaDB架構(gòu)思想篇
- ScyllaDB整體流程分析
a) 了解源碼結(jié)構(gòu)
b) 快速弄清流程
c) 確立組件依賴
一. 引言
上節(jié)介紹了ScyllaDB的背景及應(yīng)用場(chǎng)景之后,讓我們來單獨(dú)更多的了解ScyllaDB剑勾。
首先ScyllaDB是個(gè)強(qiáng)p2p數(shù)據(jù)庫(kù)隔披,ElasticSearch以及Kafka是個(gè)簡(jiǎn)化版的p2p數(shù)據(jù)庫(kù),由于它們會(huì)首先選舉出Controller進(jìn)行代理操作。而ScyllaDB不一樣,它采用了基于Ring的操作方式,更加去中心化山憨,客戶端可以連接任意節(jié)點(diǎn)作為集群Coordinator節(jié)點(diǎn)代理接口服務(wù)。當(dāng)然有人會(huì)質(zhì)疑kafka究竟是不是數(shù)據(jù)庫(kù)弥喉,kafka有事務(wù)郁竟,并且官方的目標(biāo)也是構(gòu)造實(shí)時(shí)計(jì)算的數(shù)據(jù)庫(kù)基礎(chǔ)設(shè)施。
p2p數(shù)據(jù)庫(kù)有什么優(yōu)點(diǎn)與缺點(diǎn)呢由境?
由于p2p數(shù)據(jù)庫(kù)是沒有主節(jié)點(diǎn)的棚亩,所以擴(kuò)容是非常方便的,使得運(yùn)維成本大大降低虏杰。只要指定seed節(jié)點(diǎn)讥蟆,分分鐘擴(kuò)容操作即可自動(dòng)完成。seed節(jié)點(diǎn)相當(dāng)于我們的穩(wěn)定代理節(jié)點(diǎn)纺阔,相當(dāng)于整個(gè)集群的地址薄管理處瘸彤,僅作為整個(gè)集群的元數(shù)據(jù)代理入口。
但是另一方面州弟,由于沒有主節(jié)點(diǎn)钧栖,所有的操作必須由選舉的Controller代理進(jìn)行操作(去中心化程度低)低零,或者通過共識(shí)協(xié)議大家商量一起處理(去中心化程度高)婆翔。ScyllaDB屬于后一種情況。這樣就會(huì)導(dǎo)致另一個(gè)問題掏婶,集體成員越多啃奴,進(jìn)行協(xié)商的網(wǎng)絡(luò)效應(yīng)越強(qiáng),使得網(wǎng)絡(luò)產(chǎn)生一定量的延遲雄妥。所以這種共識(shí)機(jī)制往往節(jié)點(diǎn)數(shù)不能過多最蕾,只能維持在上千個(gè)節(jié)點(diǎn)左右依溯。如果要上萬,那么只能進(jìn)行委托進(jìn)行超級(jí)節(jié)點(diǎn)的選舉瘟则。前一種去中心化程度低情況相當(dāng)于超級(jí)節(jié)點(diǎn)為1的選舉黎炉。
p2p特性使得一個(gè)數(shù)據(jù)庫(kù)非常容易維護(hù)。而ScyllaDB不只做了這些醋拧。ScyllaDB強(qiáng)大的特性太多了慷嗜。
ScyllaDB有著強(qiáng)大的服務(wù)接口,reset api, 兼容Cassandra類SQL的CQL查詢丹壕,跨語言的thrift服務(wù)接口庆械,以及兼容DynamoDB的服務(wù)接口。
ScyllaDB有著強(qiáng)大的存儲(chǔ)模型菌赖。內(nèi)存表Memtable缭乘,物理表SSTable,恢復(fù)日志CommitLog,刪除墓碑Tombstones這些都是基本必備琉用。分布式協(xié)調(diào)功能也不可少堕绩,它還有Snitch數(shù)據(jù)網(wǎng)絡(luò)架構(gòu)分布, VNode動(dòng)態(tài)節(jié)點(diǎn)分布, Partitioner基礎(chǔ)分片功能. 然后它還加入了Hinted Hanfdoff, Anti-Entrop, Read Repair, batchlog以及Throttle限流, 使得數(shù)據(jù)一致性及穩(wěn)定性大為增強(qiáng),運(yùn)維自動(dòng)化程度極其高效邑时。甚至似逛尚,它還引入了lwt輕量級(jí)事務(wù)功能,甚至它還允許持久化cache進(jìn)行預(yù)熱刁愿,簡(jiǎn)直是母親版的呵護(hù)绰寞,讓人不禁有十一分的感動(dòng)!
同時(shí)铣口,在此之上ScyllaDB還有強(qiáng)大的數(shù)據(jù)模型滤钱,半結(jié)構(gòu)化支持非常好。有set, list, map脑题,甚至允許自定義數(shù)據(jù)類型件缸,類似于struct(map的固態(tài)模式化版本)。物化視圖功能的引入對(duì)于二級(jí)索引也是非常大的增強(qiáng)叔遂。并且對(duì)于數(shù)據(jù)的時(shí)效性他炊,有ttl時(shí)間周期的支持,非常好用已艰。
當(dāng)然這都是細(xì)節(jié)痊末,在此之上,還有更強(qiáng)大的分區(qū)功能哩掺,寬表功能(基于partition key跟cluster key)凿叠。真的是太強(qiáng)大了,自己慢慢體會(huì)。
對(duì)于集群的運(yùn)行情況盒件,必不可以的就是監(jiān)控功能了蹬碧!ScyllaDB也是非常鍵全的,提供了JMX與prometheus接口炒刁。JMX接口是異常強(qiáng)大恩沽,包羅萬象,甚至可以回調(diào)翔始,實(shí)時(shí)修改參數(shù)飒筑。而基于prometheus之上擔(dān)任了scylla-monitoring,通過Grafana十分友好的展示著數(shù)據(jù)的運(yùn)行情況绽昏。
二. ScyllaDB架構(gòu)思想篇
ScyllDB的架構(gòu)由很多服務(wù)組成协屡。
- 啟動(dòng)時(shí),加載配置信息全谤,得到網(wǎng)絡(luò)拓?fù)浣Y(jié)構(gòu)Snitch肤晓。
接著,加載memtable, sstable, commitlog物理存儲(chǔ)后啟動(dòng)Storage Engine认然。 - 通過gossip p2p協(xié)議加入節(jié)點(diǎn)到集群中补憾。
- 通過物理存儲(chǔ)建立數(shù)據(jù)模型,生成各種系統(tǒng)keyspaces并進(jìn)行commit log的數(shù)據(jù)恢復(fù)操作卷员,在此之上提供query processor查詢引擎盈匾。
- 啟動(dòng)分布式基礎(chǔ)RPC服務(wù), message service, storage proxy, storage service。
- 啟動(dòng)外部接口服務(wù), rest api, thrift, cql, alternator毕骡。
我們這里僅講解cql服務(wù)削饵。
1) 當(dāng)用戶調(diào)用cql服務(wù)的時(shí)候,cql調(diào)用storage service進(jìn)行處理未巫。
storage service則根據(jù)操作類型窿撬,選擇本地操作或者分布式操作。如果是分布式操作則通過storage proxy分發(fā)到其它機(jī)器叙凡。
- 這里的storage proxy是走的message service 協(xié)議劈伴,這里我們后面會(huì)講到, storage proxy是message service協(xié)議里面的一個(gè)handler,還有其它服務(wù)handler。
- 當(dāng)其它機(jī)器接受到message service之后握爷,調(diào)用storage proxy handler進(jìn)行處理跛璧,再進(jìn)一步將控制權(quán)交回了其它機(jī)器的storage service。這里就相當(dāng)了一個(gè)完美的閉環(huán)了新啼。
- 在整個(gè)分布式協(xié)調(diào)過程中追城,所有的操作處理最終由底層的Storage Engine完成。
所以基本上整個(gè)流程是:
cql -> storage service -> storage proxy -> message service(rpc) -> storage proxy -> storage service -> storage engine
現(xiàn)在分布式基礎(chǔ)架構(gòu)這里就介紹完了师抄,然后我們來大致過一下源碼漓柑。我們這一節(jié)主要集中在分布式服務(wù)這一塊,后面的Storage Engine需要花更多的篇章完成叨吮,敬請(qǐng)期待辆布。
三. ScyllaDB整體流程分析
如何去快速了解一下源碼呢?這里我們開始進(jìn)行實(shí)踐茶鉴。
首先我們要了解架構(gòu)思想锋玲,前面一個(gè)章節(jié)已經(jīng)講過了,也許你了解得沒有我多涵叮,這個(gè)不是主要問題惭蹂,后面可以慢慢補(bǔ)充。一般這個(gè)周期主要是系統(tǒng)性看書看文檔割粮,在兩個(gè)星期至一個(gè)月左右盾碗。當(dāng)然如果有老司機(jī)帶隊(duì),就可以完全加速了舀瓢。對(duì)于新人的話廷雅,時(shí)間久點(diǎn)也沒關(guān)系,這里只是給出基本目標(biāo)作為參考京髓。
那我們從哪里開始呢航缀?從功能交互開始是最為方便的。
1. 了解源碼
我們知道ScyllaDB的CQL服務(wù)端口是9042, 在配置文件里面進(jìn)行配置:
- conf/scylla.yaml
# port for the CQL native transport to listen for clients on
# For security reasons, you should not expose this port to the internet. Firewall it if needed.
native_transport_port: 9042
那么我們對(duì)源碼進(jìn)行搜索
larrys-MacBook-Pro:scylla larluo$ find . -name "*.cc" | grep -v '/tests/' | xargs -I {} grep -nH native_transport_port {}
./db/config.cc:500: "Enable or disable the native transport server. Uses the same address as the rpc_address, but the port is different from the rpc_port. See native_transport_port.")
./db/config.cc:501: , native_transport_port(this, "native_transport_port", value_status::Used, 9042,
./db/config.cc:503: , native_transport_port_ssl(this, "native_transport_port_ssl", value_status::Used, 9142,
./db/config.cc:505: "Enabling client encryption and keeping native_transport_port_ssl disabled will use encryption"
./db/config.cc:506: "for native_transport_port. Setting native_transport_port_ssl to a different value"
./db/config.cc:507: "from native_transport_port will use encryption for native_transport_port_ssl while"
./db/config.cc:508: "keeping native_transport_port unencrypted")
./db/config.cc:753: native_transport_port.add_command_line_option(init, "cql-port", "alias for 'native-transport-port'");
./service/storage_service.cc:2294: std::vector<listen_cfg> configs({ { socket_address{ip, cfg.native_transport_port()} }});
./service/storage_service.cc:2318: if (cfg.native_transport_port_ssl.is_set() && cfg.native_transport_port_ssl() != cfg.native_transport_port()) {
./service/storage_service.cc:2319: configs.emplace_back(listen_cfg{{ip, cfg.native_transport_port_ssl()}, std::move(cred)});
由此可見這個(gè)服務(wù)是在 service/storage_service.cc 里面啟動(dòng).
進(jìn)一看法進(jìn)行追查可以看到來自于storage_service::start_native_transport:
future<> storage_service::start_native_transport() {
...
...
return f.then([cserver, configs = std::move(configs), keepalive] {
return parallel_for_each(configs, [cserver, keepalive](const listen_cfg & cfg) {
return cserver->invoke_on_all(&cql_transport::cql_server::listen, cfg.addr, cfg.cred, keepalive).then([cfg] {
slogger.info("Starting listening for CQL clients on {} ({})"
, cfg.addr, cfg.cred ? "encrypted" : "unencrypted"
);
});
});
}
...
...
最終可以發(fā)現(xiàn)堰怨,storage service啟動(dòng)了cql_transport::cql_server::listen進(jìn)行服務(wù)監(jiān)聽芥玉。
現(xiàn)在我們有了目標(biāo),就是storage service备图,并且我們從架構(gòu)上也知道了它的功能灿巧。那么我們接著就要看它是如何啟動(dòng)的。
因?yàn)閱?dòng)過程細(xì)節(jié)比較多揽涮,所以我們主要依據(jù)兩條線索砸烦,查看前30名的文件名確立重要模塊專攻點(diǎn),以及日志追蹤绞吁。
larrys-MacBook-Pro:scylla larluo$ find . -name "*.cc" | grep -vE '/tests/|/api/|/alternator/|/thrift/' | xargs -I {} wc -l {} | sort -nr | head -30
3820 ./types.cc
3789 ./service/storage_proxy.cc
3558 ./service/storage_service.cc
3433 ./sstables/sstables.cc
2905 ./db/schema_tables.cc
2592 ./mutation_partition.cc
2568 ./table.cc
2562 ./repair/row_level.cc
2331 ./utils/logalloc.cc
2319 ./gms/gossiper.cc
2181 ./db/commitlog/commitlog.cc
2109 ./db/system_keyspace.cc
1993 ./database.cc
1818 ./db/view/view.cc
1755 ./mutation_reader.cc
1641 ./transport/server.cc
1621 ./cql3/statements/select_statement.cc
1470 ./repair/repair.cc
1442 ./sstables/mc/writer.cc
1332 ./row_cache.cc
1319 ./schema.cc
1223 ./message/messaging_service.cc
1167 ./main.cc
1079 ./service/migration_manager.cc
998 ./db/hints/manager.cc
981 ./sstables/compaction.cc
979 ./flat_mutation_reader.cc
962 ./cql3/restrictions/statement_restrictions.cc
905 ./sstables/compaction_strategy.cc
867 ./sstables/compaction_manager.cc
這里我們已經(jīng)排除掉了api, alternator(DynamoDB), thrift服務(wù)接口幢痘。
主要可以歸類為四部分:
第一部分是基礎(chǔ)模塊
第一部分是網(wǎng)絡(luò)模塊
第二部分是服務(wù)模塊
第三部分是存儲(chǔ)模塊
基礎(chǔ)模塊有:
3820 ./types.cc
2331 ./utils/logalloc.cc
網(wǎng)絡(luò)模塊有:
2319 ./gms/gossiper.cc
服務(wù)模塊有:
3789 ./service/storage_proxy.cc
3558 ./service/storage_service.cc
1641 ./transport/server.cc
1621 ./cql3/statements/select_statement.cc
1223 ./message/messaging_service.cc
1167 ./main.cc
1079 ./service/migration_manager.cc
962 ./cql3/restrictions/statement_restrictions.cc
存儲(chǔ)模塊有:
3433 ./sstables/sstables.cc
2905 ./db/schema_tables.cc
2592 ./mutation_partition.cc
2568 ./table.cc
2562 ./repair/row_level.cc
2181 ./db/commitlog/commitlog.cc
2109 ./db/system_keyspace.cc
1993 ./database.cc
1818 ./db/view/view.cc
1755 ./mutation_reader.cc
1470 ./repair/repair.cc
1442 ./sstables/mc/writer.cc
1332 ./row_cache.cc
1319 ./schema.cc
998 ./db/hints/manager.cc
981 ./sstables/compaction.cc
979 ./flat_mutation_reader.cc
905 ./sstables/compaction_strategy.cc
867 ./sstables/compaction_manager.cc
基礎(chǔ)模塊屬于源碼細(xì)節(jié),我們暫且不過多關(guān)注家破,需要時(shí)再去了解颜说。
- 網(wǎng)絡(luò)模塊比較底層,我們后面再來分析汰聋,這里代碼量不多是因?yàn)橹饕δ芏际窃诨A(chǔ)庫(kù)中實(shí)現(xiàn)门粪,復(fù)雜度不一定低。
- 服務(wù)模塊跟分布式架構(gòu)比較密切烹困。也是我們這章的關(guān)注點(diǎn)玄妈。
./main.cc為程序啟動(dòng)入口
./service/storage_proxy.cc, ./service/storage_service.cc這兩個(gè)已經(jīng)提到比較多了,屬于核心模塊.
./message/messaging_service.cc 前面提到過,屬于不同節(jié)點(diǎn)的通訊接口拟蜻,被storage_proxy使用绎签。
./transport/server.cc 里面包含了我們的cql_server外部接口實(shí)現(xiàn),用于處理用戶交互協(xié)議
./cql3/statements/select_statement.cc 里面包含cql解析引擎,也是數(shù)據(jù)庫(kù)查詢引擎的核心技術(shù)酝锅。
./cql3/restrictions/statement_restrictions.cc 里面包含了cql解析引擎之后的存儲(chǔ)查詢轉(zhuǎn)換诡必,相當(dāng)于將前面解析的cql轉(zhuǎn)化為執(zhí)行計(jì)劃的過程。 - 存儲(chǔ)模塊是數(shù)據(jù)庫(kù)的核心搔扁。所以占比非常大爸舒,內(nèi)容非常多,我們第一步先不分析它稿蹲,后面進(jìn)行拆解分析扭勉。這里唯一需要了解的地方是./database.cc 代表了系統(tǒng)架構(gòu)中的Storage Engine角色,是整個(gè)存儲(chǔ)模塊的入口點(diǎn)苛聘。
2. 快速弄清流程
接著進(jìn)入日志追蹤階段涂炎,也就是流程梳理階段。
有條件的可以運(yùn)行集群并查看日志焰盗,或者從生產(chǎn)環(huán)境拿日志璧尸。
這里我為了加強(qiáng)功力,采用零依賴手撕源碼熬拒。有條件的可以用日志來輔助分析爷光。
那么現(xiàn)在就從./main.cc開始,首先查看打印消息澎粟。
代碼打印消息是非常要用的蛀序,切記,也是運(yùn)維排錯(cuò)的第一入手點(diǎn)活烙。
int main(int ac, char** av) {
print_starting_message(ac, av, parsed_opts);
...
return app.run(ac, av, [&] () -> future<int> {
...
return seastar::async([cfg, ext, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs, &pctx, &prometheus_server, &return_value, &cf_cache_hitrate_calculator,
&feature_service] {
...
startlog.info("Scylla version {} starting.", scylla_version());
...
supervisor::notify("starting prometheus API server");
...
supervisor::notify("creating tracing");
...
supervisor::notify("creating snitch");
...
supervisor::notify("determining DNS name");
...
supervisor::notify("starting API server");
...
supervisor::notify("initializing storage service");
...
supervisor::notify("starting per-shard database core");
...
supervisor::notify("creating data directories");
...
supervisor::notify("creating commitlog directory");
...
supervisor::notify("creating hints directories");
...
supervisor::notify("verifying directories");
...
supervisor::notify("starting gossip");
...
supervisor::notify("starting storage proxy");
...
supervisor::notify("starting migration manager");
...
supervisor::notify("starting query processor");
...
supervisor::notify("initializing batchlog manager");
...
supervisor::notify("loading system sstables");
...
supervisor::notify("loading non-system sstables");
...
supervisor::notify("starting view update generator");
...
supervisor::notify("discovering staging sstables");
...
supervisor::notify("setting up system keyspace");
...
supervisor::notify("starting commit log");
...
supervisor::notify("replaying commit log");
...
supervisor::notify("replaying commit log - flushing memtables");
...
supervisor::notify("replaying commit log - removing old commitlog segments");
...
supervisor::notify("initializing migration manager RPC verbs");
...
supervisor::notify("initializing storage proxy RPC verbs");
...
supervisor::notify("starting streaming service");
...
supervisor::notify("starting hinted handoff manager");
...
supervisor::notify("starting messaging service");
...
supervisor::notify("starting storage service", true);
...
supervisor::notify("starting batchlog manager");
...
supervisor::notify("starting load broadcaster");
...
supervisor::notify("starting cf cache hit rate calculator");
...
supervisor::notify("starting view update backlog broker");
...
supervisor::notify("allow replaying hints");
...
supervisor::notify("starting native transport");
...
supervisor::notify("serving");
...
startlog.info("Scylla version {} initialization completed.", scylla_version());
...
startlog.info("Scylla version {} shutdown complete.", scylla_version());
}
}
}
這里面內(nèi)容不算多徐裸,我們快速過一遍,我們可以看到:
- 啟動(dòng)prometheus監(jiān)控及tracing啸盏。
supervisor::notify("starting prometheus API server");
(void)prometheus::start(prometheus_server, pctx);
supervisor::notify("creating tracing");
tracing::tracing::create_tracing(tracing_backend_registry, "trace_keyspace_helper").get();
- 加載網(wǎng)絡(luò)信息snitch
supervisor::notify("creating snitch");
i_endpoint_snitch::create_snitch(cfg->endpoint_snitch()).get();
supervisor::notify("determining DNS name");
return gms::inet_address::lookup(api_address, family, preferred).get0();
- 啟動(dòng)api服務(wù)
API服務(wù)是先創(chuàng)建重贺,最后整個(gè)流程完成后開啟。
supervisor::notify("starting API server");
with_scheduling_group(maintenance_scheduling_group, [&] {
return ctx.http_server.listen(socket_address{ip, api_port});
}).get();
startlog.info("Scylla API server listening on {}:{} ...", api_address, api_port);
...
api::set_server_done(ctx).get();
supervisor::notify("serving");
- 構(gòu)建storage service
supervisor::notify("initializing storage service");
(void)init_storage_service(stop_signal.as_sharded_abort_source(), db, gossiper, auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, sscfg);
- 構(gòu)建Storage Engine
supervisor::notify("starting per-shard database core");
db.start(std::ref(*cfg), dbcfg).get();
supervisor::notify("creating data directories");
dirs.touch_and_lock(db.local().get_config().data_file_directories()).get();
supervisor::notify("creating commitlog directory");
dirs.touch_and_lock(db.local().get_config().commitlog_directory()).get();
supervisor::notify("creating hints directories");
dirs.touch_and_lock(db.local().get_config().hints_directory()).get();
supervisor::notify("verifying directories");
- 構(gòu)建message service以及gossip組件
supervisor::notify("starting gossip");
init_ms_fd_gossiper(gossiper
, feature_service
, *cfg
, listen_address
, storage_port
, ssl_storage_port
, tcp_nodelay_inter_dc
, encrypt_what
, trust_store
, cert
, key
, prio
, clauth
, cfg->internode_compression()
, seed_provider
, memory::stats().total_memory()
, scfg
, cluster_name
, phi
, cfg->listen_on_broadcast_address());
- 構(gòu)建storage proxy
supervisor::notify("starting storage proxy");
proxy.start(std::ref(db), spcfg, std::ref(node_backlog)).get();
- 構(gòu)建migration manager
supervisor::notify("starting migration manager");
mm.start().get();
- 構(gòu)建batchlog
supervisor::notify("initializing batchlog manager");
db::get_batchlog_manager().start(std::ref(qp), bm_cfg).get();
- 加載數(shù)據(jù)模型
supervisor::notify("loading system sstables");
distributed_loader::ensure_system_table_directories(db).get();
supervisor::notify("loading non-system sstables");
distributed_loader::init_non_system_keyspaces(db, proxy).get();
supervisor::notify("starting view update generator");
view_update_generator.start(std::ref(db), std::ref(proxy)).get();
supervisor::notify("discovering staging sstables");
supervisor::notify("setting up system keyspace");
db::system_keyspace::setup(db, qp, service::get_storage_service()).get();
supervisor::notify("starting commit log");
- 添加messaging_service rpc處理器migration manager
supervisor::notify("initializing migration manager RPC verbs");
service::get_migration_manager().invoke_on_all([] (auto& mm) {
mm.init_messaging_service();
}).get();
- 添加messaging_service rpc處理器storage proxy
supervisor::notify("starting batchlog manager");
proxy.invoke_on_all([] (service::storage_proxy& p) {
p.init_messaging_service();
}).get();
- 啟動(dòng)streaming service
supervisor::notify("starting streaming service");
streaming::stream_session::init_streaming_service(db, sys_dist_ks, view_update_generator).get();
- 啟動(dòng)hinted handoff manager
(void)local_proxy.start_hints_manager(gms::get_local_gossiper().shared_from_this(), ss.shared_from_this());
- messaging service添加REPAIR_CHECKSUM_RANGE處理器
supervisor::notify("starting messaging service");
// Start handling REPAIR_CHECKSUM_RANGE messages
netw::get_messaging_service().invoke_on_all([&db] (auto& ms) {
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, dht::token_range range, rpc::optional<repair_checksum> hash_version) {
auto hv = hash_version ? *hash_version : repair_checksum::legacy;
return do_with(std::move(keyspace), std::move(cf), std::move(range),
[&db, hv] (auto& keyspace, auto& cf, auto& range) {
return checksum_range(db, keyspace, cf, range, hv);
});
});
}).get();
- 啟動(dòng)storage service
supervisor::notify("starting storage service", true);
ss.init_messaging_service_part().get();
ss.init_server_without_the_messaging_service_part().get();
- 啟動(dòng)batchlog manager
supervisor::notify("starting batchlog manager");
db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
return b.start();
}).get();
- 啟動(dòng)load broadcaster
supervisor::notify("starting load broadcaster");
lb->start_broadcasting();
- 啟動(dòng)cf cache hit rate calculator
supervisor::notify("starting cf cache hit rate calculator");
cf_cache_hitrate_calculator.start(std::ref(db), std::ref(cf_cache_hitrate_calculator)).get();
- 啟動(dòng)view update backlog broker
supervisor::notify("starting view update backlog broker");
view_backlog_broker.start(std::ref(proxy), std::ref(gms::get_gossiper())).get();
view_backlog_broker.invoke_on_all(&service::view_update_backlog_broker::start).get();
- 啟動(dòng)外部服務(wù)接口
supervisor::notify("starting native transport");
with_scheduling_group(dbcfg.statement_scheduling_group, [] {
return service::get_local_storage_service().start_native_transport();
}).get();
if (start_thrift) {
with_scheduling_group(dbcfg.statement_scheduling_group, [] {
return service::get_local_storage_service().start_rpc_server();
}).get();
}
if (cfg->alternator_port() || cfg->alternator_https_port()) {
alternator_server.init(addr, alternator_port, alternator_https_port, creds, cfg->alternator_enforce_authorization()).get();
}
從本章角度來說回懦,主要關(guān)注以下幾個(gè)組件
- storage proxy
首先構(gòu)建storage proxy气笙,然后掛鉤message service
主要用于節(jié)點(diǎn)間分布式協(xié)調(diào) - query processor
構(gòu)建cql解析器 - messaging service
節(jié)點(diǎn)間rpc服務(wù),用于服務(wù)組件添加handler處理器怯晕。 - storage service
分布式服務(wù)的核心模塊,與storage engine分別掌管服務(wù)與存儲(chǔ)功能潜圃。 - native transport
提供CQL外部服務(wù)接口
三. 確立組件依賴
- 用戶首先發(fā)送請(qǐng)求給native_transport:
with_scheduling_group(dbcfg.statement_scheduling_group, [] {
return service::get_local_storage_service().start_native_transport();
}).get();
start_native_transport屬于storage_service 模塊.
- 接著start_native_transport[service/storage_service.cc] 啟動(dòng)cql_transport::cql_server
future<> storage_service::start_native_transport() {
return f.then([cserver, configs = std::move(configs), keepalive] {
return parallel_for_each(configs, [cserver, keepalive](const listen_cfg & cfg) {
return cserver->invoke_on_all(&cql_transport::cql_server::listen, cfg.addr, cfg.cred, keepalive).then([cfg] {
slogger.info("Starting listening for CQL clients on {} ({})"
, cfg.addr, cfg.cred ? "encrypted" : "unencrypted"
);
});
});
});
}
- 接著cql_server[transport/server.cc]調(diào)用query_processor進(jìn)行處理
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit)
{
...
return _server._query_processor.local().process(query, query_state, options).then([this, stream, &query_state, skip_metadata] (auto msg) {
tracing::trace(query_state.get_trace_state(), "Done processing - preparing a result");
return this->make_result(stream, msg, query_state.get_trace_state(), skip_metadata);
}
- 接著query_processor[cql3/query_processor.cc]解析cql后調(diào)用storage proxy進(jìn)行處理
future<shared_ptr<cql_transport::messages::result_message>>
select_statement::execute(service::storage_proxy& proxy,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector&& partition_ranges,
service::query_state& state,
const query_options& options,
gc_clock::time_point now)
{
...
return proxy.query(_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
.then([this, &options, now, cmd] (service::storage_proxy::coordinator_query_result qr) {
return this->process_results(std::move(qr.query_result), cmd, options, now);
});
}
- storage_proxy[service/storage_proxy.cc]掛鉤message service,將消息發(fā)送給本機(jī)storage proxy節(jié)點(diǎn)以及其它節(jié)點(diǎn)message service舟茶。其它節(jié)點(diǎn)storage proxy注冊(cè)message service進(jìn)行本機(jī)storage proxy消息處理谭期。
void storage_proxy::init_messaging_service() {
...
ms.register_counter_mutation([] (const rpc::client_info& cinfo, rpc::opt_time_point t, std::vector<frozen_mutation> fms, db::consistency_level cl, std::optional<tracing::trace_info> trace_info) {
...
}
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
++_proxy->_stats.data_read_attempts.get_ep_stat(ep);
auto opts = want_digest
? query::result_options{query::result_request::result_and_digest, digest_algorithm()}
: query::result_options{query::result_request::only_result, query::digest_algorithm::none};
if (fbu::is_me(ep)) {
tracing::trace(_trace_state, "read_data: querying locally");
return _proxy->query_result_local(_schema, _cmd, _partition_range, opts, _trace_state, timeout);
} else {
auto& ms = netw::get_local_messaging_service();
tracing::trace(_trace_state, "read_data: sending a message to /{}", ep);
return ms.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, opts.digest_algo).then([this, ep](rpc::tuple<query::result, rpc::optional<cache_temperature>> result_hit_rate) {
auto&& [result, hit_rate] = result_hit_rate;
tracing::trace(_trace_state, "read_data: got response from /{}", ep);
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>(rpc::tuple(make_foreign(::make_lw_shared<query::result>(std::move(result))), hit_rate.value_or(cache_temperature::invalid())));
});
}
}
- 所有節(jié)點(diǎn)的storage proxy進(jìn)行storage engine本地?cái)?shù)據(jù)讀取操作
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>
storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_options opts,
tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, uint64_t max_size) {
return db.query(gs, *cmd, opts, prv, trace_state, max_size, timeout).then([trace_state](auto&& f, cache_temperature ht) {
tracing::trace(trace_state, "Querying is done");
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>(rpc::tuple(make_foreign(std::move(f)), ht));
});
}
至此堵第,整個(gè)分布式架構(gòu)源碼及組件依賴已經(jīng)完成。
- storage_service#native transport 啟動(dòng)調(diào)用cql_server
- cql_server啟動(dòng)并調(diào)用query parser 進(jìn)行解析
- cql_server解析后調(diào)用storage proxy
- storage proxy通過message service進(jìn)行節(jié)點(diǎn)的消息分發(fā)
- 每個(gè)節(jié)點(diǎn)的storage proxy調(diào)用database engine存儲(chǔ)引擎進(jìn)行數(shù)據(jù)的處理操作
請(qǐng)關(guān)注下一篇
由于篇幅關(guān)系隧出,下一篇將會(huì)更細(xì)致得講解各個(gè)組件的內(nèi)容踏志,敬請(qǐng)期待。