ScyllaDB源碼分析-01: 輕松入門分布式架構(gòu)源碼

0. 大綱

  1. 引言
  2. ScyllaDB架構(gòu)思想篇
  3. ScyllaDB整體流程分析
    a) 了解源碼結(jié)構(gòu)
    b) 快速弄清流程
    c) 確立組件依賴

一. 引言

上節(jié)介紹了ScyllaDB的背景及應(yīng)用場(chǎng)景之后,讓我們來單獨(dú)更多的了解ScyllaDB剑勾。

首先ScyllaDB是個(gè)強(qiáng)p2p數(shù)據(jù)庫(kù)隔披,ElasticSearch以及Kafka是個(gè)簡(jiǎn)化版的p2p數(shù)據(jù)庫(kù),由于它們會(huì)首先選舉出Controller進(jìn)行代理操作。而ScyllaDB不一樣,它采用了基于Ring的操作方式,更加去中心化山憨,客戶端可以連接任意節(jié)點(diǎn)作為集群Coordinator節(jié)點(diǎn)代理接口服務(wù)。當(dāng)然有人會(huì)質(zhì)疑kafka究竟是不是數(shù)據(jù)庫(kù)弥喉,kafka有事務(wù)郁竟,并且官方的目標(biāo)也是構(gòu)造實(shí)時(shí)計(jì)算的數(shù)據(jù)庫(kù)基礎(chǔ)設(shè)施。

p2p數(shù)據(jù)庫(kù)有什么優(yōu)點(diǎn)與缺點(diǎn)呢由境?
由于p2p數(shù)據(jù)庫(kù)是沒有主節(jié)點(diǎn)的棚亩,所以擴(kuò)容是非常方便的,使得運(yùn)維成本大大降低虏杰。只要指定seed節(jié)點(diǎn)讥蟆,分分鐘擴(kuò)容操作即可自動(dòng)完成。seed節(jié)點(diǎn)相當(dāng)于我們的穩(wěn)定代理節(jié)點(diǎn)纺阔,相當(dāng)于整個(gè)集群的地址薄管理處瘸彤,僅作為整個(gè)集群的元數(shù)據(jù)代理入口。
但是另一方面州弟,由于沒有主節(jié)點(diǎn)钧栖,所有的操作必須由選舉的Controller代理進(jìn)行操作(去中心化程度低)低零,或者通過共識(shí)協(xié)議大家商量一起處理(去中心化程度高)婆翔。ScyllaDB屬于后一種情況。這樣就會(huì)導(dǎo)致另一個(gè)問題掏婶,集體成員越多啃奴,進(jìn)行協(xié)商的網(wǎng)絡(luò)效應(yīng)越強(qiáng),使得網(wǎng)絡(luò)產(chǎn)生一定量的延遲雄妥。所以這種共識(shí)機(jī)制往往節(jié)點(diǎn)數(shù)不能過多最蕾,只能維持在上千個(gè)節(jié)點(diǎn)左右依溯。如果要上萬,那么只能進(jìn)行委托進(jìn)行超級(jí)節(jié)點(diǎn)的選舉瘟则。前一種去中心化程度低情況相當(dāng)于超級(jí)節(jié)點(diǎn)為1的選舉黎炉。

p2p特性使得一個(gè)數(shù)據(jù)庫(kù)非常容易維護(hù)。而ScyllaDB不只做了這些醋拧。ScyllaDB強(qiáng)大的特性太多了慷嗜。
ScyllaDB有著強(qiáng)大的服務(wù)接口,reset api, 兼容Cassandra類SQL的CQL查詢丹壕,跨語言的thrift服務(wù)接口庆械,以及兼容DynamoDB的服務(wù)接口。

ScyllaDB有著強(qiáng)大的存儲(chǔ)模型菌赖。內(nèi)存表Memtable缭乘,物理表SSTable,恢復(fù)日志CommitLog,刪除墓碑Tombstones這些都是基本必備琉用。分布式協(xié)調(diào)功能也不可少堕绩,它還有Snitch數(shù)據(jù)網(wǎng)絡(luò)架構(gòu)分布, VNode動(dòng)態(tài)節(jié)點(diǎn)分布, Partitioner基礎(chǔ)分片功能. 然后它還加入了Hinted Hanfdoff, Anti-Entrop, Read Repair, batchlog以及Throttle限流, 使得數(shù)據(jù)一致性及穩(wěn)定性大為增強(qiáng),運(yùn)維自動(dòng)化程度極其高效邑时。甚至似逛尚,它還引入了lwt輕量級(jí)事務(wù)功能,甚至它還允許持久化cache進(jìn)行預(yù)熱刁愿,簡(jiǎn)直是母親版的呵護(hù)绰寞,讓人不禁有十一分的感動(dòng)!

同時(shí)铣口,在此之上ScyllaDB還有強(qiáng)大的數(shù)據(jù)模型滤钱,半結(jié)構(gòu)化支持非常好。有set, list, map脑题,甚至允許自定義數(shù)據(jù)類型件缸,類似于struct(map的固態(tài)模式化版本)。物化視圖功能的引入對(duì)于二級(jí)索引也是非常大的增強(qiáng)叔遂。并且對(duì)于數(shù)據(jù)的時(shí)效性他炊,有ttl時(shí)間周期的支持,非常好用已艰。
當(dāng)然這都是細(xì)節(jié)痊末,在此之上,還有更強(qiáng)大的分區(qū)功能哩掺,寬表功能(基于partition key跟cluster key)凿叠。真的是太強(qiáng)大了,自己慢慢體會(huì)。

對(duì)于集群的運(yùn)行情況盒件,必不可以的就是監(jiān)控功能了蹬碧!ScyllaDB也是非常鍵全的,提供了JMX與prometheus接口炒刁。JMX接口是異常強(qiáng)大恩沽,包羅萬象,甚至可以回調(diào)翔始,實(shí)時(shí)修改參數(shù)飒筑。而基于prometheus之上擔(dān)任了scylla-monitoring,通過Grafana十分友好的展示著數(shù)據(jù)的運(yùn)行情況绽昏。

二. ScyllaDB架構(gòu)思想篇

ScyllDB的架構(gòu)由很多服務(wù)組成协屡。

  1. 啟動(dòng)時(shí),加載配置信息全谤,得到網(wǎng)絡(luò)拓?fù)浣Y(jié)構(gòu)Snitch肤晓。
    接著,加載memtable, sstable, commitlog物理存儲(chǔ)后啟動(dòng)Storage Engine认然。
  2. 通過gossip p2p協(xié)議加入節(jié)點(diǎn)到集群中补憾。
  3. 通過物理存儲(chǔ)建立數(shù)據(jù)模型,生成各種系統(tǒng)keyspaces并進(jìn)行commit log的數(shù)據(jù)恢復(fù)操作卷员,在此之上提供query processor查詢引擎盈匾。
  4. 啟動(dòng)分布式基礎(chǔ)RPC服務(wù), message service, storage proxy, storage service。
  5. 啟動(dòng)外部接口服務(wù), rest api, thrift, cql, alternator毕骡。

我們這里僅講解cql服務(wù)削饵。
1) 當(dāng)用戶調(diào)用cql服務(wù)的時(shí)候,cql調(diào)用storage service進(jìn)行處理未巫。
storage service則根據(jù)操作類型窿撬,選擇本地操作或者分布式操作。如果是分布式操作則通過storage proxy分發(fā)到其它機(jī)器叙凡。

  1. 這里的storage proxy是走的message service 協(xié)議劈伴,這里我們后面會(huì)講到, storage proxy是message service協(xié)議里面的一個(gè)handler,還有其它服務(wù)handler。
  2. 當(dāng)其它機(jī)器接受到message service之后握爷,調(diào)用storage proxy handler進(jìn)行處理跛璧,再進(jìn)一步將控制權(quán)交回了其它機(jī)器的storage service。這里就相當(dāng)了一個(gè)完美的閉環(huán)了新啼。
  3. 在整個(gè)分布式協(xié)調(diào)過程中追城,所有的操作處理最終由底層的Storage Engine完成。

所以基本上整個(gè)流程是:
cql -> storage service -> storage proxy -> message service(rpc) -> storage proxy -> storage service -> storage engine

現(xiàn)在分布式基礎(chǔ)架構(gòu)這里就介紹完了师抄,然后我們來大致過一下源碼漓柑。我們這一節(jié)主要集中在分布式服務(wù)這一塊,后面的Storage Engine需要花更多的篇章完成叨吮,敬請(qǐng)期待辆布。

三. ScyllaDB整體流程分析

如何去快速了解一下源碼呢?這里我們開始進(jìn)行實(shí)踐茶鉴。

首先我們要了解架構(gòu)思想锋玲,前面一個(gè)章節(jié)已經(jīng)講過了,也許你了解得沒有我多涵叮,這個(gè)不是主要問題惭蹂,后面可以慢慢補(bǔ)充。一般這個(gè)周期主要是系統(tǒng)性看書看文檔割粮,在兩個(gè)星期至一個(gè)月左右盾碗。當(dāng)然如果有老司機(jī)帶隊(duì),就可以完全加速了舀瓢。對(duì)于新人的話廷雅,時(shí)間久點(diǎn)也沒關(guān)系,這里只是給出基本目標(biāo)作為參考京髓。

那我們從哪里開始呢航缀?從功能交互開始是最為方便的。

1. 了解源碼

我們知道ScyllaDB的CQL服務(wù)端口是9042, 在配置文件里面進(jìn)行配置:

  • conf/scylla.yaml
# port for the CQL native transport to listen for clients on
# For security reasons, you should not expose this port to the internet.  Firewall it if needed.
native_transport_port: 9042

那么我們對(duì)源碼進(jìn)行搜索

larrys-MacBook-Pro:scylla larluo$ find . -name "*.cc" | grep -v '/tests/' | xargs -I {} grep -nH native_transport_port {}
./db/config.cc:500:        "Enable or disable the native transport server. Uses the same address as the rpc_address, but the port is different from the rpc_port. See native_transport_port.")
./db/config.cc:501:    , native_transport_port(this, "native_transport_port", value_status::Used, 9042,
./db/config.cc:503:    , native_transport_port_ssl(this, "native_transport_port_ssl", value_status::Used, 9142,
./db/config.cc:505:        "Enabling client encryption and keeping native_transport_port_ssl disabled will use encryption"
./db/config.cc:506:        "for native_transport_port. Setting native_transport_port_ssl to a different value"
./db/config.cc:507:        "from native_transport_port will use encryption for native_transport_port_ssl while"
./db/config.cc:508:        "keeping native_transport_port unencrypted")
./db/config.cc:753:    native_transport_port.add_command_line_option(init, "cql-port", "alias for 'native-transport-port'");
./service/storage_service.cc:2294:                std::vector<listen_cfg> configs({ { socket_address{ip, cfg.native_transport_port()} }});
./service/storage_service.cc:2318:                    if (cfg.native_transport_port_ssl.is_set() && cfg.native_transport_port_ssl() != cfg.native_transport_port()) {
./service/storage_service.cc:2319:                        configs.emplace_back(listen_cfg{{ip, cfg.native_transport_port_ssl()}, std::move(cred)});

由此可見這個(gè)服務(wù)是在 service/storage_service.cc 里面啟動(dòng).
進(jìn)一看法進(jìn)行追查可以看到來自于storage_service::start_native_transport:

future<> storage_service::start_native_transport() {
   ...
   ...
                return f.then([cserver, configs = std::move(configs), keepalive] {
                    return parallel_for_each(configs, [cserver, keepalive](const listen_cfg & cfg) {
                        return cserver->invoke_on_all(&cql_transport::cql_server::listen, cfg.addr, cfg.cred, keepalive).then([cfg] {
                            slogger.info("Starting listening for CQL clients on {} ({})"
                                            , cfg.addr, cfg.cred ? "encrypted" : "unencrypted"
                                            );
                        });
                    });

}
...
...

最終可以發(fā)現(xiàn)堰怨,storage service啟動(dòng)了cql_transport::cql_server::listen進(jìn)行服務(wù)監(jiān)聽芥玉。

現(xiàn)在我們有了目標(biāo),就是storage service备图,并且我們從架構(gòu)上也知道了它的功能灿巧。那么我們接著就要看它是如何啟動(dòng)的。

因?yàn)閱?dòng)過程細(xì)節(jié)比較多揽涮,所以我們主要依據(jù)兩條線索砸烦,查看前30名的文件名確立重要模塊專攻點(diǎn),以及日志追蹤绞吁。

larrys-MacBook-Pro:scylla larluo$ find . -name "*.cc" | grep -vE '/tests/|/api/|/alternator/|/thrift/' | xargs -I {} wc -l {} | sort -nr | head -30
    3820 ./types.cc
    3789 ./service/storage_proxy.cc
    3558 ./service/storage_service.cc
    3433 ./sstables/sstables.cc
    2905 ./db/schema_tables.cc
    2592 ./mutation_partition.cc
    2568 ./table.cc
    2562 ./repair/row_level.cc
    2331 ./utils/logalloc.cc
    2319 ./gms/gossiper.cc
    2181 ./db/commitlog/commitlog.cc
    2109 ./db/system_keyspace.cc
    1993 ./database.cc
    1818 ./db/view/view.cc
    1755 ./mutation_reader.cc
    1641 ./transport/server.cc
    1621 ./cql3/statements/select_statement.cc
    1470 ./repair/repair.cc
    1442 ./sstables/mc/writer.cc
    1332 ./row_cache.cc
    1319 ./schema.cc
    1223 ./message/messaging_service.cc
    1167 ./main.cc
    1079 ./service/migration_manager.cc
     998 ./db/hints/manager.cc
     981 ./sstables/compaction.cc
     979 ./flat_mutation_reader.cc
     962 ./cql3/restrictions/statement_restrictions.cc
     905 ./sstables/compaction_strategy.cc
     867 ./sstables/compaction_manager.cc

這里我們已經(jīng)排除掉了api, alternator(DynamoDB), thrift服務(wù)接口幢痘。

主要可以歸類為四部分:
第一部分是基礎(chǔ)模塊
第一部分是網(wǎng)絡(luò)模塊
第二部分是服務(wù)模塊
第三部分是存儲(chǔ)模塊

基礎(chǔ)模塊有:

    3820 ./types.cc
    2331 ./utils/logalloc.cc

網(wǎng)絡(luò)模塊有:

    2319 ./gms/gossiper.cc

服務(wù)模塊有:

    3789 ./service/storage_proxy.cc
    3558 ./service/storage_service.cc
    1641 ./transport/server.cc
    1621 ./cql3/statements/select_statement.cc
    1223 ./message/messaging_service.cc
    1167 ./main.cc
    1079 ./service/migration_manager.cc
     962 ./cql3/restrictions/statement_restrictions.cc

存儲(chǔ)模塊有:

    3433 ./sstables/sstables.cc
    2905 ./db/schema_tables.cc
    2592 ./mutation_partition.cc
    2568 ./table.cc
    2562 ./repair/row_level.cc
    2181 ./db/commitlog/commitlog.cc
    2109 ./db/system_keyspace.cc
    1993 ./database.cc
    1818 ./db/view/view.cc
    1755 ./mutation_reader.cc
    1470 ./repair/repair.cc
    1442 ./sstables/mc/writer.cc
    1332 ./row_cache.cc
    1319 ./schema.cc
     998 ./db/hints/manager.cc
     981 ./sstables/compaction.cc
     979 ./flat_mutation_reader.cc
     905 ./sstables/compaction_strategy.cc
     867 ./sstables/compaction_manager.cc

基礎(chǔ)模塊屬于源碼細(xì)節(jié),我們暫且不過多關(guān)注家破,需要時(shí)再去了解颜说。

  • 網(wǎng)絡(luò)模塊比較底層,我們后面再來分析汰聋,這里代碼量不多是因?yàn)橹饕δ芏际窃诨A(chǔ)庫(kù)中實(shí)現(xiàn)门粪,復(fù)雜度不一定低。
  • 服務(wù)模塊跟分布式架構(gòu)比較密切烹困。也是我們這章的關(guān)注點(diǎn)玄妈。
    ./main.cc為程序啟動(dòng)入口
    ./service/storage_proxy.cc, ./service/storage_service.cc這兩個(gè)已經(jīng)提到比較多了,屬于核心模塊.
    ./message/messaging_service.cc 前面提到過,屬于不同節(jié)點(diǎn)的通訊接口拟蜻,被storage_proxy使用绎签。
    ./transport/server.cc 里面包含了我們的cql_server外部接口實(shí)現(xiàn),用于處理用戶交互協(xié)議
    ./cql3/statements/select_statement.cc 里面包含cql解析引擎,也是數(shù)據(jù)庫(kù)查詢引擎的核心技術(shù)酝锅。
    ./cql3/restrictions/statement_restrictions.cc 里面包含了cql解析引擎之后的存儲(chǔ)查詢轉(zhuǎn)換诡必,相當(dāng)于將前面解析的cql轉(zhuǎn)化為執(zhí)行計(jì)劃的過程。
  • 存儲(chǔ)模塊是數(shù)據(jù)庫(kù)的核心搔扁。所以占比非常大爸舒,內(nèi)容非常多,我們第一步先不分析它稿蹲,后面進(jìn)行拆解分析扭勉。這里唯一需要了解的地方是./database.cc 代表了系統(tǒng)架構(gòu)中的Storage Engine角色,是整個(gè)存儲(chǔ)模塊的入口點(diǎn)苛聘。

2. 快速弄清流程

接著進(jìn)入日志追蹤階段涂炎,也就是流程梳理階段。
有條件的可以運(yùn)行集群并查看日志焰盗,或者從生產(chǎn)環(huán)境拿日志璧尸。
這里我為了加強(qiáng)功力,采用零依賴手撕源碼熬拒。有條件的可以用日志來輔助分析爷光。
那么現(xiàn)在就從./main.cc開始,首先查看打印消息澎粟。
代碼打印消息是非常要用的蛀序,切記,也是運(yùn)維排錯(cuò)的第一入手點(diǎn)活烙。

int main(int ac, char** av) { 
    print_starting_message(ac, av, parsed_opts);
    ...
    return app.run(ac, av, [&] () -> future<int> {
        ...
        return seastar::async([cfg, ext, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs, &pctx, &prometheus_server, &return_value, &cf_cache_hitrate_calculator,
                               &feature_service] {
            ...
            startlog.info("Scylla version {} starting.", scylla_version());
            ...
            supervisor::notify("starting prometheus API server");
            ...
            supervisor::notify("creating tracing");
            ...
            supervisor::notify("creating snitch");
            ...
            supervisor::notify("determining DNS name");
            ...
            supervisor::notify("starting API server");
            ...
            supervisor::notify("initializing storage service");
            ...
            supervisor::notify("starting per-shard database core");
            ...
            supervisor::notify("creating data directories");
            ...
            supervisor::notify("creating commitlog directory");
            ...
            supervisor::notify("creating hints directories");
            ...
            supervisor::notify("verifying directories");
            ...
            supervisor::notify("starting gossip");
            ...
            supervisor::notify("starting storage proxy");
            ...
            supervisor::notify("starting migration manager");
            ...
            supervisor::notify("starting query processor");
            ...
            supervisor::notify("initializing batchlog manager");
            ...
            supervisor::notify("loading system sstables");
            ...
            supervisor::notify("loading non-system sstables");
            ...
            supervisor::notify("starting view update generator");
            ...
            supervisor::notify("discovering staging sstables");
            ...
            supervisor::notify("setting up system keyspace");
            ...
            supervisor::notify("starting commit log");
                    ...
                    supervisor::notify("replaying commit log");
                    ...
                    supervisor::notify("replaying commit log - flushing memtables");
                    ...
                    supervisor::notify("replaying commit log - removing old commitlog segments");
                    ...
            supervisor::notify("initializing migration manager RPC verbs");
            ...
            supervisor::notify("initializing storage proxy RPC verbs");
            ...
            supervisor::notify("starting streaming service");
            ...
            supervisor::notify("starting hinted handoff manager");
            ...
            supervisor::notify("starting messaging service");
            ...
            supervisor::notify("starting storage service", true);
            ...
            supervisor::notify("starting batchlog manager");
            ...
            supervisor::notify("starting load broadcaster");
            ...
            supervisor::notify("starting cf cache hit rate calculator");
            ...
            supervisor::notify("starting view update backlog broker");
            ...
            supervisor::notify("allow replaying hints");
            ...
            supervisor::notify("starting native transport");
            ...
            supervisor::notify("serving");
            ...
            startlog.info("Scylla version {} initialization completed.", scylla_version());
            ...
          startlog.info("Scylla version {} shutdown complete.", scylla_version());

    }
  }
}

這里面內(nèi)容不算多徐裸,我們快速過一遍,我們可以看到:

  • 啟動(dòng)prometheus監(jiān)控及tracing啸盏。
            supervisor::notify("starting prometheus API server");
                (void)prometheus::start(prometheus_server, pctx);

            supervisor::notify("creating tracing");
            tracing::tracing::create_tracing(tracing_backend_registry, "trace_keyspace_helper").get();

  • 加載網(wǎng)絡(luò)信息snitch
            supervisor::notify("creating snitch");
            i_endpoint_snitch::create_snitch(cfg->endpoint_snitch()).get();
            supervisor::notify("determining DNS name");
                    return gms::inet_address::lookup(api_address, family, preferred).get0();

  • 啟動(dòng)api服務(wù)
    API服務(wù)是先創(chuàng)建重贺,最后整個(gè)流程完成后開啟。
            supervisor::notify("starting API server");
            with_scheduling_group(maintenance_scheduling_group, [&] {
                return ctx.http_server.listen(socket_address{ip, api_port});
            }).get();
            startlog.info("Scylla API server listening on {}:{} ...", api_address, api_port);
            ...
            api::set_server_done(ctx).get();
            supervisor::notify("serving");

  • 構(gòu)建storage service
            supervisor::notify("initializing storage service");         
            (void)init_storage_service(stop_signal.as_sharded_abort_source(), db, gossiper, auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, sscfg);
  • 構(gòu)建Storage Engine
            supervisor::notify("starting per-shard database core");
            db.start(std::ref(*cfg), dbcfg).get();
            supervisor::notify("creating data directories");
            dirs.touch_and_lock(db.local().get_config().data_file_directories()).get();
            supervisor::notify("creating commitlog directory");
            dirs.touch_and_lock(db.local().get_config().commitlog_directory()).get();
            supervisor::notify("creating hints directories");
                dirs.touch_and_lock(db.local().get_config().hints_directory()).get();
            supervisor::notify("verifying directories");

  • 構(gòu)建message service以及gossip組件
            supervisor::notify("starting gossip");
            init_ms_fd_gossiper(gossiper
                    , feature_service
                    , *cfg
                    , listen_address
                    , storage_port
                    , ssl_storage_port
                    , tcp_nodelay_inter_dc
                    , encrypt_what
                    , trust_store
                    , cert
                    , key
                    , prio
                    , clauth
                    , cfg->internode_compression()
                    , seed_provider
                    , memory::stats().total_memory()
                    , scfg
                    , cluster_name
                    , phi
                    , cfg->listen_on_broadcast_address());
  • 構(gòu)建storage proxy
            supervisor::notify("starting storage proxy");
            proxy.start(std::ref(db), spcfg, std::ref(node_backlog)).get();
  • 構(gòu)建migration manager
            supervisor::notify("starting migration manager");
            mm.start().get();
  • 構(gòu)建batchlog
            supervisor::notify("initializing batchlog manager");
            db::get_batchlog_manager().start(std::ref(qp), bm_cfg).get();
  • 加載數(shù)據(jù)模型
            supervisor::notify("loading system sstables");

            distributed_loader::ensure_system_table_directories(db).get();

            supervisor::notify("loading non-system sstables");
            distributed_loader::init_non_system_keyspaces(db, proxy).get();

            supervisor::notify("starting view update generator");
            view_update_generator.start(std::ref(db), std::ref(proxy)).get();
            supervisor::notify("discovering staging sstables");
            supervisor::notify("setting up system keyspace");
            db::system_keyspace::setup(db, qp, service::get_storage_service()).get();
            supervisor::notify("starting commit log");

  • 添加messaging_service rpc處理器migration manager
            supervisor::notify("initializing migration manager RPC verbs");
            service::get_migration_manager().invoke_on_all([] (auto& mm) {
                mm.init_messaging_service();
            }).get();
  • 添加messaging_service rpc處理器storage proxy
            supervisor::notify("starting batchlog manager");
            proxy.invoke_on_all([] (service::storage_proxy& p) {
                p.init_messaging_service();
            }).get();
  • 啟動(dòng)streaming service
            supervisor::notify("starting streaming service");
            streaming::stream_session::init_streaming_service(db, sys_dist_ks, view_update_generator).get();
  • 啟動(dòng)hinted handoff manager
                (void)local_proxy.start_hints_manager(gms::get_local_gossiper().shared_from_this(), ss.shared_from_this());
  • messaging service添加REPAIR_CHECKSUM_RANGE處理器
            supervisor::notify("starting messaging service");
            // Start handling REPAIR_CHECKSUM_RANGE messages
            netw::get_messaging_service().invoke_on_all([&db] (auto& ms) {
                ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, dht::token_range range, rpc::optional<repair_checksum> hash_version) {
                    auto hv = hash_version ? *hash_version : repair_checksum::legacy;
                    return do_with(std::move(keyspace), std::move(cf), std::move(range),
                            [&db, hv] (auto& keyspace, auto& cf, auto& range) {
                        return checksum_range(db, keyspace, cf, range, hv);
                    });
                });
            }).get();

  • 啟動(dòng)storage service
            supervisor::notify("starting storage service", true);
            ss.init_messaging_service_part().get();
            ss.init_server_without_the_messaging_service_part().get();
  • 啟動(dòng)batchlog manager
            supervisor::notify("starting batchlog manager");
            db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
                return b.start();
            }).get();

  • 啟動(dòng)load broadcaster
            supervisor::notify("starting load broadcaster");
            lb->start_broadcasting();
  • 啟動(dòng)cf cache hit rate calculator
            supervisor::notify("starting cf cache hit rate calculator");
            cf_cache_hitrate_calculator.start(std::ref(db), std::ref(cf_cache_hitrate_calculator)).get();
  • 啟動(dòng)view update backlog broker
            supervisor::notify("starting view update backlog broker");

            view_backlog_broker.start(std::ref(proxy), std::ref(gms::get_gossiper())).get();
            view_backlog_broker.invoke_on_all(&service::view_update_backlog_broker::start).get();

  • 啟動(dòng)外部服務(wù)接口
            supervisor::notify("starting native transport");
            with_scheduling_group(dbcfg.statement_scheduling_group, [] {
                return service::get_local_storage_service().start_native_transport();
            }).get();
            if (start_thrift) {
                with_scheduling_group(dbcfg.statement_scheduling_group, [] {
                    return service::get_local_storage_service().start_rpc_server();
                }).get();
            }
            if (cfg->alternator_port() || cfg->alternator_https_port()) {
                alternator_server.init(addr, alternator_port, alternator_https_port, creds, cfg->alternator_enforce_authorization()).get();
            }

從本章角度來說回懦,主要關(guān)注以下幾個(gè)組件

  • storage proxy
    首先構(gòu)建storage proxy气笙,然后掛鉤message service
    主要用于節(jié)點(diǎn)間分布式協(xié)調(diào)
  • query processor
    構(gòu)建cql解析器
  • messaging service
    節(jié)點(diǎn)間rpc服務(wù),用于服務(wù)組件添加handler處理器怯晕。
  • storage service
    分布式服務(wù)的核心模塊,與storage engine分別掌管服務(wù)與存儲(chǔ)功能潜圃。
  • native transport
    提供CQL外部服務(wù)接口

三. 確立組件依賴

  • 用戶首先發(fā)送請(qǐng)求給native_transport:
            with_scheduling_group(dbcfg.statement_scheduling_group, [] {
                return service::get_local_storage_service().start_native_transport();
            }).get();

start_native_transport屬于storage_service 模塊.

  • 接著start_native_transport[service/storage_service.cc] 啟動(dòng)cql_transport::cql_server
future<> storage_service::start_native_transport() {

                return f.then([cserver, configs = std::move(configs), keepalive] {
                    return parallel_for_each(configs, [cserver, keepalive](const listen_cfg & cfg) {
                        return cserver->invoke_on_all(&cql_transport::cql_server::listen, cfg.addr, cfg.cred, keepalive).then([cfg] {
                            slogger.info("Starting listening for CQL clients on {} ({})"
                                            , cfg.addr, cfg.cred ? "encrypted" : "unencrypted"
                                            );
                        });
                    });

                });
}
  • 接著cql_server[transport/server.cc]調(diào)用query_processor進(jìn)行處理
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit)
{
    ...
    return _server._query_processor.local().process(query, query_state, options).then([this, stream, &query_state, skip_metadata] (auto msg) {
         tracing::trace(query_state.get_trace_state(), "Done processing - preparing a result");
         return this->make_result(stream, msg, query_state.get_trace_state(), skip_metadata);
}
  • 接著query_processor[cql3/query_processor.cc]解析cql后調(diào)用storage proxy進(jìn)行處理
future<shared_ptr<cql_transport::messages::result_message>>
select_statement::execute(service::storage_proxy& proxy,
                          lw_shared_ptr<query::read_command> cmd,
                          dht::partition_range_vector&& partition_ranges,
                          service::query_state& state,
                          const query_options& options,
                          gc_clock::time_point now)
{
        ...
        return proxy.query(_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
            .then([this, &options, now, cmd] (service::storage_proxy::coordinator_query_result qr) {
                return this->process_results(std::move(qr.query_result), cmd, options, now);
            });
}
  • storage_proxy[service/storage_proxy.cc]掛鉤message service,將消息發(fā)送給本機(jī)storage proxy節(jié)點(diǎn)以及其它節(jié)點(diǎn)message service舟茶。其它節(jié)點(diǎn)storage proxy注冊(cè)message service進(jìn)行本機(jī)storage proxy消息處理谭期。
void storage_proxy::init_messaging_service() {
    ...
    ms.register_counter_mutation([] (const rpc::client_info& cinfo, rpc::opt_time_point t, std::vector<frozen_mutation> fms, db::consistency_level cl, std::optional<tracing::trace_info> trace_info) {
    ...
}


    future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
        ++_proxy->_stats.data_read_attempts.get_ep_stat(ep);
        auto opts = want_digest
                  ? query::result_options{query::result_request::result_and_digest, digest_algorithm()}
                  : query::result_options{query::result_request::only_result, query::digest_algorithm::none};
        if (fbu::is_me(ep)) {
            tracing::trace(_trace_state, "read_data: querying locally");
            return _proxy->query_result_local(_schema, _cmd, _partition_range, opts, _trace_state, timeout);
        } else {
            auto& ms = netw::get_local_messaging_service();
            tracing::trace(_trace_state, "read_data: sending a message to /{}", ep);
            return ms.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, opts.digest_algo).then([this, ep](rpc::tuple<query::result, rpc::optional<cache_temperature>> result_hit_rate) {
                auto&& [result, hit_rate] = result_hit_rate;
                tracing::trace(_trace_state, "read_data: got response from /{}", ep);
                return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>(rpc::tuple(make_foreign(::make_lw_shared<query::result>(std::move(result))), hit_rate.value_or(cache_temperature::invalid())));
            });
        }
    }

  • 所有節(jié)點(diǎn)的storage proxy進(jìn)行storage engine本地?cái)?shù)據(jù)讀取操作
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>
storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_options opts,
                                  tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, uint64_t max_size) {
            return db.query(gs, *cmd, opts, prv, trace_state, max_size, timeout).then([trace_state](auto&& f, cache_temperature ht) {
                tracing::trace(trace_state, "Querying is done");
                return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>(rpc::tuple(make_foreign(std::move(f)), ht));
            });
}

至此堵第,整個(gè)分布式架構(gòu)源碼及組件依賴已經(jīng)完成。

  1. storage_service#native transport 啟動(dòng)調(diào)用cql_server
  2. cql_server啟動(dòng)并調(diào)用query parser 進(jìn)行解析
  3. cql_server解析后調(diào)用storage proxy
  4. storage proxy通過message service進(jìn)行節(jié)點(diǎn)的消息分發(fā)
  5. 每個(gè)節(jié)點(diǎn)的storage proxy調(diào)用database engine存儲(chǔ)引擎進(jìn)行數(shù)據(jù)的處理操作

請(qǐng)關(guān)注下一篇

由于篇幅關(guān)系隧出,下一篇將會(huì)更細(xì)致得講解各個(gè)組件的內(nèi)容踏志,敬請(qǐng)期待。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末鸳劳,一起剝皮案震驚了整個(gè)濱河市狰贯,隨后出現(xiàn)的幾起案子也搓,更是在濱河造成了極大的恐慌赏廓,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,482評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件傍妒,死亡現(xiàn)場(chǎng)離奇詭異幔摸,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)颤练,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門既忆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人嗦玖,你說我怎么就攤上這事患雇。” “怎么了宇挫?”我有些...
    開封第一講書人閱讀 152,762評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵苛吱,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我器瘪,道長(zhǎng)翠储,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,273評(píng)論 1 279
  • 正文 為了忘掉前任橡疼,我火速辦了婚禮援所,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘欣除。我一直安慰自己住拭,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評(píng)論 5 373
  • 文/花漫 我一把揭開白布历帚。 她就那樣靜靜地躺著滔岳,像睡著了一般。 火紅的嫁衣襯著肌膚如雪抹缕。 梳的紋絲不亂的頭發(fā)上澈蟆,一...
    開封第一講書人閱讀 49,046評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音卓研,去河邊找鬼趴俘。 笑死睹簇,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的寥闪。 我是一名探鬼主播太惠,決...
    沈念sama閱讀 38,351評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼疲憋!你這毒婦竟也來了凿渊?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,988評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤缚柳,失蹤者是張志新(化名)和其女友劉穎埃脏,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體秋忙,經(jīng)...
    沈念sama閱讀 43,476評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡彩掐,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評(píng)論 2 324
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了灰追。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片堵幽。...
    茶點(diǎn)故事閱讀 38,064評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖弹澎,靈堂內(nèi)的尸體忽然破棺而出朴下,到底是詐尸還是另有隱情,我是刑警寧澤苦蒿,帶...
    沈念sama閱讀 33,712評(píng)論 4 323
  • 正文 年R本政府宣布殴胧,位于F島的核電站,受9級(jí)特大地震影響刽肠,放射性物質(zhì)發(fā)生泄漏溃肪。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評(píng)論 3 307
  • 文/蒙蒙 一音五、第九天 我趴在偏房一處隱蔽的房頂上張望惫撰。 院中可真熱鬧,春花似錦躺涝、人聲如沸厨钻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽夯膀。三九已至,卻和暖如春苍蔬,著一層夾襖步出監(jiān)牢的瞬間诱建,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工碟绑, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留俺猿,地道東北人茎匠。 一個(gè)月前我還...
    沈念sama閱讀 45,511評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像押袍,于是被迫代替她去往敵國(guó)和親诵冒。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • feisky云計(jì)算谊惭、虛擬化與Linux技術(shù)筆記posts - 1014, comments - 298, trac...
    不排版閱讀 3,815評(píng)論 0 5
  • Apache Cassandra 是一個(gè)開源的汽馋、分布式、去中心化圈盔、彈性可擴(kuò)展豹芯、高可用性、容錯(cuò)药磺、一致性可調(diào)告组、面向行的...
    梁睿坤閱讀 13,998評(píng)論 2 25
  • FastDFS FastDFS的作者余慶在其 GitHub 上是這樣描述的:“FastDFS is an open...
    周若谷閱讀 1,782評(píng)論 0 5
  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對(duì)...
    cosWriter閱讀 11,089評(píng)論 1 32
  • 國(guó)家電網(wǎng)公司企業(yè)標(biāo)準(zhǔn)(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報(bào)批稿:20170802 前言: 排版 ...
    庭說閱讀 10,869評(píng)論 6 13