tips:
本文所有代碼均在libraries\chainbase\include\chainbase.cpp或者plugins/producer_plugin/producer_plugin.cpp中
一、主要的數據結構
本文主要簡述EOS交易打包&區塊同步流程,而controller_impl類是交易打包&區塊同步的執行核心,所以從controller_impl類出發,可以順藤摸瓜,了解整個流程。
首先,我們來看一下controller_impl類的主要成員變量:
/**
 * Core state of the chain controller (abridged: only the principal data
 * members are listed).
 */
struct controller_impl {
   controller&                    self;
   chainbase::database            db;                ///< stores applied blocks; they can still be rolled back and become irreversible once committed
   chainbase::database            reversible_blocks; ///< a special database to persist blocks that have successfully been applied but are still reversible
   block_log                      blog;
   optional<pending_state>        pending;           ///< the block that has not been produced yet
   block_state_ptr                head;              ///< state of the current head block
   fork_database                  fork_db;           ///< database of forkable blocks; every block that may still fork is kept here
   wasm_interface                 wasmif;            ///< runtime of the wasm virtual machine
   resource_limits_manager        resource_limits;   ///< resource management
   authorization_manager          authorization;     ///< permission management
   controller::config             conf;
   chain_id_type                  chain_id;
   bool                           replaying = false;
   bool                           in_trx_requiring_checks = false; ///< if true, checks that are normally skipped on replay (e.g. auth checks) cannot be skipped
};
下面挑幾個成員重點講一下:
db用于存儲已執行的區塊,這些區塊可以回滾,一旦提交就不可逆
reversible_blocks用于存儲已執行,但是仍然可逆的區塊
pending用于存放當前正在生產或者驗證(收到)的區塊
fork_db用于存放所有區塊(包括自己生產和收到的),這些區塊鏈是可分叉的
由于db和reversible_blocks都是chainbase的數據庫,這里簡單介紹一下:
EOS 中的數據庫索引是依賴chainbase實現的,這個數據庫的實現比較簡單,主要是用內存映射文件(memory mapped file)。它認為LevelDB這種數據庫,性能不行,也不方便做多級索引,對區塊鏈來說,狀態數據庫只是賬本日志的快照,對持久化要求沒那么高,所以可以更激進的利用內存。
chainbase是一個基于boost::multi_index_container實現的適合頻繁順序讀寫的區塊鏈內存數據庫。
它有幾個特性:
1、多表多索引
2、狀態state可以持久化并多進程共享
3、嵌套的寫事務并支持undo處理。
chainbase::database定義在libraries\chainbase\include\chainbase.cpp和chainbase.hpp文件中,由于代碼太多,這里就不貼出來了,感興趣的同學可以自行閱讀。
fork_db的類型是fork_database,用于存放所有區塊
/**
 * @class fork_database
 * @brief manages light-weight state for all potential unconfirmed forks
 *
 * As new blocks are received, they are pushed into the fork database. The fork
 * database tracks the longest chain and the last irreversible block number. All
 * blocks older than the last irreversible block are freed after emitting the
 * irreversible signal.
 */
class fork_database {
   public:
      fork_database( const fc::path& data_dir );
      ~fork_database();

      void close();

      block_state_ptr get_block(const block_id_type& id)const;
      block_state_ptr get_block_in_current_chain_by_num( uint32_t n )const;
      // vector<block_state_ptr> get_blocks_by_number(uint32_t n)const;

      /**
       * Provides a "valid" blockstate upon which other forks may build.
       */
      void set( block_state_ptr s );

      /** this method will attempt to append the block to an existing
       * block_state and will return a pointer to the new block state or
       * throw on error.
       */
      block_state_ptr add( signed_block_ptr b, bool trust = false );
      block_state_ptr add( block_state_ptr next_block );
      void remove( const block_id_type& id );

      void add( const header_confirmation& c );

      const block_state_ptr& head()const;

      /**
       * Given two head blocks, return two branches of the fork graph that
       * end with a common ancestor (same prior block)
       */
      pair< branch_type, branch_type > fetch_branch_from( const block_id_type& first,
                                                          const block_id_type& second )const;

      /**
       * If the block is invalid, it will be removed. If it is valid, then blocks older
       * than the LIB are pruned after emitting irreversible signal.
       */
      void set_validity( const block_state_ptr& h, bool valid );
      void mark_in_current_chain( const block_state_ptr& h, bool in_current_chain );
      void prune( const block_state_ptr& h );

      /**
       * This signal is emitted when a block state becomes irreversible, once irreversible
       * it is removed unless it is the head block.
       */
      signal<void(block_state_ptr)> irreversible;

   private:
      void set_bft_irreversible( block_id_type id );
      unique_ptr<fork_database_impl> my;
};
pending的類型是pending_state,用于存放當前正在生產或者驗證(收到)的區塊
/**
 * Bookkeeping for the block that is currently being produced or validated.
 */
struct pending_state {
   /// Takes ownership of the database session that backs this pending block.
   pending_state( database::session&& session )
   : _db_session( move(session) )
   {}

   /// Session used to interact with the db database.
   database::session          _db_session;

   /// State of the pending block.
   block_state_ptr            _pending_block_state;

   /// Action receipts gathered after push_transaction.
   vector<action_receipt>     _actions;

   /// Validation status of the pending block, ranging from incomplete to irreversible.
   controller::block_status   _block_status = controller::block_status::incomplete;

   /// Forwards to push() on the underlying database session.
   void push() { _db_session.push(); }
};
二、EOS處理交易和出塊的流程
如果對具體代碼實現沒有興趣,流程圖就已經足夠了
交易和區塊處理主要由生產者插件完成,主要代碼在plugins/producer_plugin/producer_plugin.cpp。
nodeos程序根據全局時間(epoch)重復執行schedule_production_loop函數,以達到周期性出塊的目的。schedule_production_loop函數通過start_block函數判斷該節點當前是否具有epoch的出塊權限,如果有,則出塊
/**
 * Starts a new pending block and arms the production timer accordingly.
 *
 * - If start_block() fails, a retry is scheduled after a tenth of the block
 *   interval.
 * - If this node is in producing mode, maybe_produce_block() is scheduled for
 *   the pending block's timestamp when start_block() succeeded, or immediately
 *   for any other non-failed result (the block is already exhausted).
 *
 * Each callback carries a correlation id and holds only a weak_ptr to this
 * object; it does nothing when the wait was aborted, when the object can no
 * longer be locked, or when a newer callback has been scheduled since.
 */
void producer_plugin_impl::schedule_production_loop() {
   chain::controller& chain = app().get_plugin<chain_plugin>().chain();
   _timer.cancel();
   std::weak_ptr<producer_plugin_impl> weak_this = shared_from_this();

   // Try to start a new pending block.
   auto result = start_block();

   if (result == start_block_result::failed) {
      elog("Failed to start a pending block, will try again later");
      _timer.expires_from_now( boost::posix_time::microseconds( config::block_interval_us / 10 ));

      // we failed to start a block, so try again later?
      _timer.async_wait([weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) {
         auto self = weak_this.lock();
         if (self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id) {
            self->schedule_production_loop();
         }
      });
   } else if (_pending_block_mode == pending_block_mode::producing) {
      // we succeeded but block may be exhausted
      if (result == start_block_result::succeeded) {
         // ship this block off no later than its deadline
         static const boost::posix_time::ptime epoch(boost::gregorian::date(1970, 1, 1));
         _timer.expires_at(epoch + boost::posix_time::microseconds(chain.pending_block_time().time_since_epoch().count()));
         fc_dlog(_log, "Scheduling Block Production on Normal Block #${num} for ${time}", ("num", chain.pending_block_state()->block_num)("time",chain.pending_block_time()));
      } else {
         // ship this block off immediately
         _timer.expires_from_now( boost::posix_time::microseconds( 0 ));
         fc_dlog(_log, "Scheduling Block Production on Exhausted Block #${num} immediately", ("num", chain.pending_block_state()->block_num));
      }

      // The production deadline has been reached: package the block.
      _timer.async_wait([&chain,weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) {
         auto self = weak_this.lock();
         if (self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id) {
            // Read the block number BEFORE producing. maybe_produce_block() commits or
            // aborts the pending block and then reschedules the loop, so afterwards the
            // pending block state may no longer exist (null) or may already describe
            // the next block. A missing state is reported as block 0.
            const auto pending_bs = chain.pending_block_state();
            const auto block_num = pending_bs ? pending_bs->block_num : 0;
            auto res = self->maybe_produce_block();
            fc_dlog(_log, "Producing Block #${num} returned: ${res}", ("num", block_num)("res", res) );
         }
      });
   }
}
start_block函數代碼太多,這里只貼出相關代碼
// Abridged excerpt: only the hand-off to the controller is shown here.
// NOTE(review): block_time and blocks_to_confirm are not declared in this excerpt and
// no start_block_result is returned; presumably both live in the elided part of the
// function — confirm against the full producer_plugin.cpp.
producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
// Call controller::start_block.
chain.start_block(block_time, blocks_to_confirm);
}
現在回到schedule_production_loop函數,輪到當前節點出塊時,會設置一個定時器,定時出塊,定時器超時會調用maybe_produce_block函數,停止打包,并提交區塊
/**
 * Attempts to produce the pending block.
 *
 * Whatever the outcome, schedule_production_loop() runs again when this
 * function exits.
 *
 * @return true when produce_block() completed; false when it failed, in which
 *         case the error is logged and dropped and the pending block is aborted.
 */
bool producer_plugin_impl::maybe_produce_block() {
   // Scope guard: re-arms the production loop on every exit path.
   auto rearm_loop = fc::make_scoped_exit([this]{ schedule_production_loop(); });

   try {
      // Produce the block.
      produce_block();
      return true;
   } FC_LOG_AND_DROP();

   // Only reached when produce_block() threw and the exception was dropped above.
   fc_dlog(_log, "Aborting block due to produce_block error");
   app().get_plugin<chain_plugin>().chain().abort_block();
   return false;
}
void producer_plugin_impl::produce_block() {
FC_ASSERT(_pending_block_mode == pending_block_mode::producing, "called produce_block while not actually producing");
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
const auto& pbs = chain.pending_block_state();
const auto& hbs = chain.head_block_state();
FC_ASSERT(pbs, "pending_block_state does not exist but it should, another plugin may have corrupted it");
auto signature_provider_itr = _signature_providers.find( pbs->block_signing_key );
FC_ASSERT(signature_provider_itr != _signature_providers.end(), "Attempting to produce a block for which we don't have the private key");
//idump( (fc::time_point::now() - chain.pending_block_time()) );
// 交易停止打包到區(qū)塊
chain.finalize_block();
// 對區(qū)塊進行簽名
chain.sign_block( [&]( const digest_type& d ) {
auto debug_logger = maybe_make_debug_time_logger();
return signature_provider_itr->second(d);
} );
// 提交區(qū)塊到db數(shù)據(jù)庫
chain.commit_block();
auto hbt = chain.head_block_time();
//idump((fc::time_point::now() - hbt));
block_state_ptr new_bs = chain.head_block_state();
// for newly installed producers we can set their watermarks to the block they became
if (hbs->active_schedule.version != new_bs->active_schedule.version) {
flat_set<account_name> new_producers;
new_producers.reserve(new_bs->active_schedule.producers.size());
for( const auto& p: new_bs->active_schedule.producers) {
if (_producers.count(p.producer_name) > 0)
new_producers.insert(p.producer_name);
}
for( const auto& p: hbs->active_schedule.producers) {
new_producers.erase(p.producer_name);
}
for (const auto& new_producer: new_producers) {
_producer_watermarks[new_producer] = chain.head_block_num();
}
}
_producer_watermarks[new_bs->header.producer] = chain.head_block_num();
ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, confirmed: ${confs}]",
("p",new_bs->header.producer)("id",fc::variant(new_bs->id).as_string().substr(0,16))
("n",new_bs->block_num)("t",new_bs->header.timestamp)
("count",new_bs->block->transactions.size())("lib",chain.last_irreversible_block_num())("confs", new_bs->header.confirmed));
}
現在我們看看commit_block函數,該函數將區塊放到fork_db和db數據庫中
void commit_block( bool add_to_fork_db ) {
if( add_to_fork_db ) {
pending->_pending_block_state->validated = true;
// 將區(qū)塊放入fork_db
auto new_bsp = fork_db.add( pending->_pending_block_state );
emit( self.accepted_block_header, pending->_pending_block_state );
head = fork_db.head();
FC_ASSERT( new_bsp == head, "committed block did not become the new head in fork database" );
}
// ilog((fc::json::to_pretty_string(*pending->_pending_block_state->block)));
emit( self.accepted_block, pending->_pending_block_state );
if( !replaying ) {
reversible_blocks.create<reversible_block_object>( [&]( auto& ubo ) {
ubo.blocknum = pending->_pending_block_state->block_num;
ubo.set_block( pending->_pending_block_state->block );
});
}
// push到db中
pending->push();
pending.reset();
}
fork_db每次新增區塊,都會判斷是否有區塊不可逆,如果有,向db數據庫發信號,db數據庫就會將不可逆的區塊提交到數據庫中
// fork_database.cpp
/**
 * Inserts block state n into the index, refreshes the head and prunes when the
 * oldest tracked block lies below the head's DPoS irreversible block number.
 *
 * Asserts that n was not already present.
 *
 * @return n
 */
block_state_ptr fork_database::add( block_state_ptr n ) {
   auto inserted = my->index.insert(n);
   FC_ASSERT( inserted.second, "duplicate block added?" );

   // The head is whatever sorts first in the by_lib_block_num index.
   my->head = *my->index.get<by_lib_block_num>().begin();

   const auto& by_num = my->index.get<by_block_num>();
   // Held by value: prune() erases entries from the index.
   auto oldest = *by_num.begin();

   // Drop blocks older than the last irreversible block.
   if( oldest->block_num < my->head->dpos_irreversible_blocknum ) {
      prune( oldest );
   }

   return n;
}
/**
 * Prunes block state h from the index, after first pruning every block state
 * with a lower block number.
 *
 * If h is found in the index, the irreversible signal is emitted for it and it
 * is erased. Any block state still carrying the same block number is then
 * removed via remove().
 */
void fork_database::prune( const block_state_ptr& h ) {
// Copy the number up front: in the recursive call below, h refers to an index
// entry that gets erased.
auto num = h->block_num;
auto& by_bn = my->index.get<by_block_num>();
auto bni = by_bn.begin();
// Recursively prune every block numbered below num. begin() is re-read after
// each call because the recursive call erases entries from the index.
while( bni != by_bn.end() && (*bni)->block_num < num ) {
prune( *bni );
bni = by_bn.begin();
}
auto itr = my->index.find( h->id );
if( itr != my->index.end() ) {
// Emit the irreversible signal for this block before dropping it.
irreversible(*itr);
// Remove the now-irreversible block from the index.
my->index.erase(itr);
}
// Remove every remaining block state that carries the same block number.
auto& numidx = my->index.get<by_block_num>();
auto nitr = numidx.lower_bound( num );
while( nitr != numidx.end() && (*nitr)->block_num == num ) {
// Advance first so the loop does not continue from the entry being removed.
// NOTE(review): remove() is not visible here; confirm it cannot erase the entry
// nitr now points at.
auto itr_to_remove = nitr;
++nitr;
auto id = (*itr_to_remove)->id;
remove( id );
}
}
// controller.cpp
void on_irreversible( const block_state_ptr& s ) {
if( !blog.head() )
blog.read_head();
const auto& log_head = blog.head();
FC_ASSERT( log_head );
auto lh_block_num = log_head->block_num();
// 將不可逆區(qū)塊廣播到net_plugin
emit( self.irreversible_block, s );
// 將不可逆區(qū)塊提交到db
db.commit( s->block_num );
if( s->block_num <= lh_block_num ) {
// edump((s->block_num)("double call to on_irr"));
// edump((s->block_num)(s->block->previous)(log_head->id()));
return;
}
FC_ASSERT( s->block_num - 1 == lh_block_num, "unlinkable block", ("s->block_num",s->block_num)("lh_block_num", lh_block_num) );
FC_ASSERT( s->block->previous == log_head->id(), "irreversible doesn't link to block log head" );
blog.append(s->block);
const auto& ubi = reversible_blocks.get_index<reversible_block_index,by_num>();
auto objitr = ubi.begin();
while( objitr != ubi.end() && objitr->blocknum <= s->block_num ) {
reversible_blocks.remove( *objitr );
objitr = ubi.begin();
}
}
三、EOS收到區塊&處理區塊流程圖
producer通過on_incoming_block函數接收區塊,on_incoming_block會調用controller::push_block()函數
void on_incoming_block(const signed_block_ptr& block) {
fc_dlog(_log, "received incoming block ${id}", ("id", block->id()));
FC_ASSERT( block->timestamp < (fc::time_point::now() + fc::seconds(7)), "received a block from the future, ignoring it" );
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
/* de-dupe here... no point in aborting block if we already know the block */
auto id = block->id();
// 區(qū)塊已存在本地瘦陈,丟棄該區(qū)塊
auto existing = chain.fetch_block_by_id( id );
if( existing ) { return; }
// abort the pending block
chain.abort_block();
// exceptions throw out, make sure we restart our loop
auto ensure = fc::make_scoped_exit([this](){
schedule_production_loop();
});
// push the new block
bool except = false;
try {
// 調(diào)用controller::push_block()函數(shù)
chain.push_block(block);
} catch( const fc::exception& e ) {
elog((e.to_detail_string()));
except = true;
}
if( except ) {
app().get_channel<channels::rejected_block>().publish( block );
return;
}
if( chain.head_block_state()->header.timestamp.next().to_time_point() >= fc::time_point::now() )
_production_enabled = true;
if( fc::time_point::now() - block->timestamp < fc::minutes(5) || (block->block_num() % 1000 == 0) ) {
ilog("Received block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, conf: ${confs}, latency: ${latency} ms]",
("p",block->producer)("id",fc::variant(block->id()).as_string().substr(8,16))
("n",block_header::num_from_id(block->id()))("t",block->timestamp)
("count",block->transactions.size())("lib",chain.last_irreversible_block_num())("confs", block->confirmed)("latency", (fc::time_point::now() - block->timestamp).count()/1000 ) );
}
}
我們現在來看controller::push_block()函數
/**
 * Pushes a received block.
 *
 * Asserts that no block is pending, that b is non-null and that s is not
 * incomplete. The block is added to fork_db, the accepted_block_header signal
 * is emitted for the resulting state, and maybe_switch_forks() is invoked.
 * Failures inside the guarded section are logged and rethrown.
 *
 * @param b the signed block to push
 * @param s status of the block
 */
void push_block( const signed_block_ptr& b, controller::block_status s ) {
   // idump((fc::json::to_pretty_string(*b)));
   FC_ASSERT(!pending, "it is not valid to push a block when there is a pending block");

   try {
      FC_ASSERT( b );
      FC_ASSERT( s != controller::block_status::incomplete, "invalid block status for a completed block" );

      // The block is trusted only when its status is irreversible or validated
      // and the configuration does not force all checks.
      const bool status_is_trusted = ( s == controller::block_status::irreversible )
                                  || ( s == controller::block_status::validated );
      const bool trust = status_is_trusted && !conf.force_all_checks;

      // Add the received block to fork_db.
      auto new_header_state = fork_db.add( b, trust );
      emit( self.accepted_block_header, new_header_state );

      maybe_switch_forks( s );
   } FC_LOG_AND_RETHROW( )
}
controller::push_block()函數將區塊加到fork_db后,會判斷接收到的區塊和本地區塊鏈是否在同一條鏈上,如果是,那么就執行驗證交易;如果不是,那么切換到新的分支。
/**
 * Brings the controller's head in line with the head of fork_db.
 *
 * If the fork_db head builds directly on the current head, that block is
 * applied with status s. If the fork_db head is a different block that does not
 * build on the current head, the current branch is popped back to the common
 * ancestor and the new branch is applied block by block; should one of those
 * blocks throw fc::exception, the new branch is popped again, the previous
 * branch is re-applied and the exception is rethrown.
 *
 * @param s block status passed to apply_block when the new head directly
 *          extends the current head
 */
void maybe_switch_forks( controller::block_status s = controller::block_status::complete ) {
auto new_head = fork_db.head();
// No fork: the new fork_db head links directly onto our current head.
if( new_head->header.previous == head->id ) {
try {
// Validate the block and its transactions.
apply_block( new_head->block, s );
fork_db.mark_in_current_chain( new_head, true );
fork_db.set_validity( new_head, true );
head = new_head;
} catch ( const fc::exception& e ) {
fork_db.set_validity( new_head, false ); // Removes new_head from fork_db index, so no need to mark it as not in the current chain.
throw;
}
} else if( new_head->id != head->id ) { // switch branches
ilog("switching forks from ${current_head_id} (block number ${current_head_num}) to ${new_head_id} (block number ${new_head_num})",
("current_head_id", head->id)("current_head_num", head->block_num)("new_head_id", new_head->id)("new_head_num", new_head->block_num) );
// branches.first leads to new_head, branches.second leads to the current head.
auto branches = fork_db.fetch_branch_from( new_head->id, head->id );
for( auto itr = branches.second.begin(); itr != branches.second.end(); ++itr ) {
fork_db.mark_in_current_chain( *itr , false );
// Pop the block of the old branch, undoing the transactions it applied.
pop_block();
}
FC_ASSERT( self.head_block_id() == branches.second.back()->header.previous,
"loss of sync between fork_db and chainbase during fork switch" ); // _should_ never fail
// Apply the new branch in reverse iteration order.
for( auto ritr = branches.first.rbegin(); ritr != branches.first.rend(); ++ritr) {
optional<fc::exception> except;
try {
// Validate and execute the transactions of this block.
apply_block( (*ritr)->block, (*ritr)->validated ? controller::block_status::validated : controller::block_status::complete );
head = *ritr;
fork_db.mark_in_current_chain( *ritr, true );
(*ritr)->validated = true;
}
catch (const fc::exception& e) { except = e; }
if (except) {
elog("exception thrown while switching forks ${e}", ("e",except->to_detail_string()));
// ritr currently points to the block that threw
// if we mark it invalid it will automatically remove all forks built off it.
fork_db.set_validity( *ritr, false );
// pop all blocks from the bad fork
// ritr base is a forward itr to the last block successfully applied
auto applied_itr = ritr.base();
for( auto itr = applied_itr; itr != branches.first.end(); ++itr ) {
fork_db.mark_in_current_chain( *itr , false );
pop_block();
}
FC_ASSERT( self.head_block_id() == branches.second.back()->header.previous,
"loss of sync between fork_db and chainbase during fork switch reversal" ); // _should_ never fail
// re-apply good blocks
// NOTE: this inner ritr shadows the outer loop's ritr; the outer loop is left by the throw below.
for( auto ritr = branches.second.rbegin(); ritr != branches.second.rend(); ++ritr ) {
apply_block( (*ritr)->block, controller::block_status::validated /* we previously validated these blocks*/ );
head = *ritr;
fork_db.mark_in_current_chain( *ritr, true );
}
throw *except;
} // end if exception
} /// end for each block in branch
ilog("successfully switched fork to new head ${new_head_id}", ("new_head_id", new_head->id));
}
} /// maybe_switch_forks
最后我們來看一下pop_block函數
/**
 * Undoes the current head block and its state, restoring the previous block
 * and state. The head block's transactions are put back into
 * unapplied_transactions.
 *
 * Asserts that the previous block is still present in fork_db.
 */
void pop_block() {
   auto prev = fork_db.get_block( head->header.previous );
   FC_ASSERT( prev, "attempt to pop beyond last irreversible block" );

   // Forget the reversible copy of the block being popped, if one is stored.
   const auto* stored = reversible_blocks.find<reversible_block_object,by_num>(head->block_num);
   if( stored != nullptr ) {
      reversible_blocks.remove( *stored );
   }

   // Return the popped block's transactions to the unapplied set.
   for( const auto& trx : head->trxs ) {
      unapplied_transactions[trx->signed_id] = trx;
   }

   head = prev;
   db.undo();
}