bitcoin 代碼中大量使用 boost::signal
, boost::signal 實現了信號與槽的事件通知機制,或者說是一種消息的發布與訂閱機制, signal
類型是一個可調用類型,slot
就是callback 對象,或者說事件的訂閱者,signal 實例是一個可調用對象,調用signal 對象,就相當于發布了相應的事件, signal 的connect
, disconnect
方法分別相當于對事件的訂閱,取消。
#include <boost/signals2.hpp>
#include <iostream>
/// Slot that echoes both values delivered by the signal on one flushed line.
void print_args(float x, float y)
{
    std::cout << "The arguments are " << x;
    std::cout << " and " << y;
    std::cout << std::endl;
}
/// Slot that prints the sum of the two values delivered by the signal.
void print_sum(float x, float y)
{
    const float sum = x + y;
    std::cout << "The sum is " << sum << std::endl;
}
/// Slot that prints the product of the two values delivered by the signal.
void print_product(float x, float y)
{
    const float product = x * y;
    std::cout << "The product is " << product << std::endl;
}
/// Slot that prints the first value minus the second.
void print_difference(float x, float y)
{
    const float difference = x - y;
    std::cout << "The difference is " << difference << std::endl;
}
/// Slot that prints the first value divided by the second (plain float division).
void print_quotient(float x, float y)
{
    const float quotient = x / y;
    std::cout << "The quotient is " << quotient << std::endl;
}
/// Demo entry point: subscribes five free functions to one signal, then publishes once.
int main() {
    // The signature fixes what every slot receives: two floats, no return value.
    boost::signals2::signal<void(float, float)> sig;

    // Each connect() call adds one subscriber (slot) to the signal.
    sig.connect(print_args);
    sig.connect(print_sum);
    sig.connect(print_product);
    sig.connect(print_difference);
    sig.connect(print_quotient);

    // Invoking the signal publishes the event; 5 and 3 are the payload handed to every slot.
    sig(5., 3.);
    return 0;
}
上面這個例子, 有五個函數訂閱sig 事件,sig(5. , 3.) 的調用觸發事件,參數5,3,相當于事件攜帶的消息payload, 傳給了五個事件訂閱者。
bitcoin 中定義了類型CMainSignals
來統一管理各個功能模塊的事件通知,CMainSignals
是一個資源管理類型, 主要工作代理給由unique_ptr
管理內存的成員 m_internals
, 它的類型是MainSignalsInstance
,內部定義十個boost signal 變量, 分別表達十種要通知的事件。
/**
 * Front end through which other modules publish validation events.
 * The signal objects themselves are held by the MainSignalsInstance owned via m_internals.
 */
class CMainSignals {
private:
// Owns the signal set; the publisher methods below forward to it.
std::unique_ptr<MainSignalsInstance> m_internals;
// The free registration helpers connect/disconnect slots directly on m_internals, hence the friendship.
friend void ::RegisterValidationInterface(CValidationInterface*);
friend void ::UnregisterValidationInterface(CValidationInterface*);
friend void ::UnregisterAllValidationInterfaces();
friend void ::CallFunctionInValidationInterfaceQueue(std::function<void ()> func);
// NOTE(review): presumably the handler hooked up by RegisterWithMempoolSignals(); its definition is not shown here — confirm.
void MempoolEntryRemoved(CTransactionRef tx, MemPoolRemovalReason reason);
public:
/** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
void RegisterBackgroundSignalScheduler(CScheduler& scheduler);
/** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */
void UnregisterBackgroundSignalScheduler();
/** Call any remaining callbacks on the calling thread */
void FlushBackgroundCallbacks();
// NOTE(review): presumably the number of callbacks still waiting to run — definition not shown; confirm.
size_t CallbacksPending();
/** Register with mempool to call TransactionRemovedFromMempool callbacks */
void RegisterWithMempoolSignals(CTxMemPool& pool);
/** Unregister with mempool */
void UnregisterWithMempoolSignals(CTxMemPool& pool);
// Event publishers: each one fires the signal of the same name held by m_internals.
void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
void TransactionAddedToMempool(const CTransactionRef &);
// Takes the conflicted-transaction list as a shared_ptr, whereas the signal's slots receive it by const reference.
void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::shared_ptr<const std::vector<CTransactionRef>> &);
void BlockDisconnected(const std::shared_ptr<const CBlock> &);
void SetBestChain(const CBlockLocator &);
void Inventory(const uint256 &);
void Broadcast(int64_t nBestBlockTime, CConnman* connman);
void BlockChecked(const CBlock&, const CValidationState&);
void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr<const CBlock>&);
};
/**
 * Bundle of one boost::signals2 signal per validation event, plus the
 * scheduler client used to queue callbacks.
 */
struct MainSignalsInstance {
// One signal per event; the template argument is the signature each connected slot is called with.
boost::signals2::signal<void (const CBlockIndex *, const CBlockIndex *, bool fInitialDownload)> UpdatedBlockTip;
boost::signals2::signal<void (const CTransactionRef &)> TransactionAddedToMempool;
boost::signals2::signal<void (const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::vector<CTransactionRef>&)> BlockConnected;
boost::signals2::signal<void (const std::shared_ptr<const CBlock> &)> BlockDisconnected;
boost::signals2::signal<void (const CTransactionRef &)> TransactionRemovedFromMempool;
boost::signals2::signal<void (const CBlockLocator &)> SetBestChain;
boost::signals2::signal<void (const uint256 &)> Inventory;
boost::signals2::signal<void (int64_t nBestBlockTime, CConnman* connman)> Broadcast;
boost::signals2::signal<void (const CBlock&, const CValidationState&)> BlockChecked;
boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock;
// We are not allowed to assume the scheduler only runs in one thread,
// but must ensure all callbacks happen in-order, so we end up creating
// our own queue here :(
SingleThreadedSchedulerClient m_schedulerClient;
// The scheduler pointer is handed straight to the queueing client.
explicit MainSignalsInstance(CScheduler *pscheduler) : m_schedulerClient(pscheduler) {}
};
類型CValidationInterface
主要是統一表示某些對MainSignalsInstance
中定義的十個事件感興趣的對象, 即這些事件的訂閱者, 所有對這些事件感興趣代碼繼承CValidationInterface
類型,
提供自己版本的這些虛成員函數的實現,覆蓋base CValidationInterface
中對應的空方法, 表達對相應的事件感興趣, 不感興趣的事件的回調方法繼續是那些繼承自base class 的空方法。
/**
 * Base class for subscribers of the validation signals.
 *
 * Every callback defaults to an empty body, so a subclass overrides only the
 * events it cares about and silently ignores the rest.
 */
class CValidationInterface {
protected:
    // NOTE(review): no destructor is declared, so objects must not be deleted
    // through a CValidationInterface* — confirm that no caller does.

    /** Called for the UpdatedBlockTip event. */
    virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {}
    /** Called for the TransactionAddedToMempool event. */
    virtual void TransactionAddedToMempool(const CTransactionRef &ptxn) {}
    /** Called for the TransactionRemovedFromMempool event. */
    virtual void TransactionRemovedFromMempool(const CTransactionRef &ptx) {}
    /** Called for the BlockConnected event; txnConflicted is passed along with the block and its index. */
    virtual void BlockConnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex *pindex, const std::vector<CTransactionRef> &txnConflicted) {}
    /** Called for the BlockDisconnected event. */
    virtual void BlockDisconnected(const std::shared_ptr<const CBlock> &block) {}
    /** Called for the SetBestChain event. */
    virtual void SetBestChain(const CBlockLocator &locator) {}
    /** Called for the Inventory event with the announced hash. */
    virtual void Inventory(const uint256 &hash) {}
    /** Called for the Broadcast event (the registration helper binds that signal to this method). */
    virtual void ResendWalletTransactions(int64_t nBestBlockTime, CConnman* connman) {}
    /** Called for the BlockChecked event with the block and its validation state. */
    virtual void BlockChecked(const CBlock&, const CValidationState&) {}
    /** Called for the NewPoWValidBlock event. */
    virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& block) {}

    // The callbacks are protected; these helpers need access to bind them to the signals.
    friend void ::RegisterValidationInterface(CValidationInterface*);
    friend void ::UnregisterValidationInterface(CValidationInterface*);
    friend void ::UnregisterAllValidationInterfaces();
};
CValidationInterface 有四個子類, CWallet , CZMQNotificationInterface, submitblock_StateCatcher, PeerLogicValidation 分別對應四個對MainSignalsInstance
中的事件感興趣的訂閱者。
class CWallet final : public CCryptoKeyStore, public CValidationInterface
{
...............
    // Validation-event callbacks overridden by the wallet (bodies live elsewhere).
    void TransactionAddedToMempool(const CTransactionRef& tx) override;
    void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex *pindex, const std::vector<CTransactionRef>& vtxConflicted) override;
    void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock) override;
    void TransactionRemovedFromMempool(const CTransactionRef &ptx) override;
    void ResendWalletTransactions(int64_t nBestBlockTime, CConnman* connman) override;
    void SetBestChain(const CBlockLocator& loc) override;

    /** Bumps the counter stored for `hash` in mapRequestCount; hashes without an entry are ignored. */
    void Inventory(const uint256 &hash) override
    {
        LOCK(cs_wallet);
        const auto entry = mapRequestCount.find(hash);
        if (entry == mapRequestCount.end())
            return;
        ++entry->second;
    }
...............
};
/** Validation-event subscriber for the ZMQ notification module (excerpt; elided members shown as dots). */
class CZMQNotificationInterface final : public CValidationInterface
{
................
// CValidationInterface
// Callbacks overridden in this excerpt; their bodies are defined elsewhere.
void TransactionAddedToMempool(const CTransactionRef& tx) override;
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected, const std::vector<CTransactionRef>& vtxConflicted) override;
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
.................
};
/**
 * Subscriber that waits for the BlockChecked event of one particular block
 * and records the validation state reported for it.
 */
class submitblock_StateCatcher : public CValidationInterface
{
public:
    uint256 hash;            //!< Hash of the block being watched.
    bool found;              //!< Set once a BlockChecked event for `hash` has arrived.
    CValidationState state;  //!< State copied from that event.

    explicit submitblock_StateCatcher(const uint256 &hashIn) : hash(hashIn), found(false), state() {}

protected:
    /** Records the state when the checked block is the watched one; other blocks are ignored. */
    void BlockChecked(const CBlock& block, const CValidationState& stateIn) override {
        const bool is_watched_block = (block.GetHash() == hash);
        if (is_watched_block) {
            found = true;
            state = stateIn;
        }
    }
};
/** Validation-event subscriber for the peer logic; also derives from NetEventsInterface (excerpt). */
class PeerLogicValidation : public CValidationInterface, public NetEventsInterface
{
private:
// Connection manager supplied at construction; the pointer itself cannot be reseated.
CConnman* const connman;
public:
explicit PeerLogicValidation(CConnman* connman, CScheduler &scheduler);
// Callbacks overridden in this excerpt; their bodies are defined elsewhere.
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected, const std::vector<CTransactionRef>& vtxConflicted) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void BlockChecked(const CBlock& block, const CValidationState& state) override;
void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& pblock) override;
........
};
這四個訂閱者通過調用RegisterValidationInterface
, UnregisterValidationInterface
訂閱,取消事件通知,函數接受參數是指向訂閱者的指針。
/**
 * Subscribes every callback of the given interface to the matching global signal.
 * The Broadcast signal is wired to ResendWalletTransactions.
 */
void RegisterValidationInterface(CValidationInterface* pwalletIn) {
    // All ten signals live in the same instance; name it once.
    auto& sigs = *g_signals.m_internals;

    sigs.UpdatedBlockTip.connect(
        boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3));
    sigs.TransactionAddedToMempool.connect(
        boost::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, _1));
    sigs.BlockConnected.connect(
        boost::bind(&CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3));
    sigs.BlockDisconnected.connect(
        boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1));
    sigs.TransactionRemovedFromMempool.connect(
        boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1));
    sigs.SetBestChain.connect(
        boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1));
    sigs.Inventory.connect(
        boost::bind(&CValidationInterface::Inventory, pwalletIn, _1));
    sigs.Broadcast.connect(
        boost::bind(&CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2));
    sigs.BlockChecked.connect(
        boost::bind(&CValidationInterface::BlockChecked, pwalletIn, _1, _2));
    sigs.NewPoWValidBlock.connect(
        boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2));
}
/**
 * Removes the given interface's callbacks from every global signal.
 * Each disconnect is keyed by a bind expression built the same way as at registration.
 */
void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
    // All ten signals live in the same instance; name it once.
    auto& sigs = *g_signals.m_internals;

    sigs.BlockChecked.disconnect(
        boost::bind(&CValidationInterface::BlockChecked, pwalletIn, _1, _2));
    sigs.Broadcast.disconnect(
        boost::bind(&CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2));
    sigs.Inventory.disconnect(
        boost::bind(&CValidationInterface::Inventory, pwalletIn, _1));
    sigs.SetBestChain.disconnect(
        boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1));
    sigs.TransactionAddedToMempool.disconnect(
        boost::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, _1));
    sigs.BlockConnected.disconnect(
        boost::bind(&CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3));
    sigs.BlockDisconnected.disconnect(
        boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1));
    sigs.TransactionRemovedFromMempool.disconnect(
        boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1));
    sigs.UpdatedBlockTip.disconnect(
        boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3));
    sigs.NewPoWValidBlock.disconnect(
        boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2));
}
在程序啟動,初始化階段,啟動調度器線程
// Excerpt: starting the scheduler thread during initialization (dotted lines mark elided code).
bool AppInitMain()
{
.................
// Bind CScheduler::serviceQueue to the scheduler object so it can serve as a thread body.
CScheduler::Function serviceLoop = boost::bind(&CScheduler::serviceQueue, &scheduler);
// Run that loop on a new thread of the thread group, wrapped by TraceThread under the name "scheduler".
threadGroup.create_thread(boost::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));
.................
}
調用RegisterBackgroundSignalScheduler(), 初始化全局MainSignalsInstance 實例; 調用RegisterWithMempoolSignals(), 訂閱全局內存池對象的NotifyEntryRemoved 事件通知, MainSignalsInstance 只對由于超時,大小限制, blockchain 重組,替換等原因發生的離開內存池事件感興趣, 收到后MainSignalsInstance 再作為事件發布者, 轉發給其他訂閱者, 如CWallet。
// Excerpt: wiring the global signals to the scheduler and the mempool (dotted lines mark elided code).
bool AppInitMain()
{
......
// Hand the scheduler to the main signals so callbacks can run in the background.
GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
// Register with the mempool so TransactionRemovedFromMempool callbacks get called.
GetMainSignals().RegisterWithMempoolSignals(mempool);
........
}
初始化全局的連接管理對象connman 后, 初始化全局的peerLogic 對象, 然后調用RegisterValidationInterface(), peerLogic 成為MainSignalsInstance 對象的訂閱者, 如果用戶編譯了zeromq
支持模塊,調用RegisterValidationInterface(), pzmqNotificationInterface 訂閱MainSignalsInstance 。
// Excerpt: creating the connection manager and subscribing peerLogic and, optionally, ZMQ (dotted lines mark elided code).
bool AppInitMain()
{
.............
// The global connection manager must not exist yet.
assert(!g_connman);
// Construct it with two random 64-bit values.
g_connman = std::unique_ptr<CConnman>(new CConnman(GetRand(std::numeric_limits<uint64_t>::max()), GetRand(std::numeric_limits<uint64_t>::max())));
CConnman& connman = *g_connman;
// Create the peer-logic object and subscribe it to the validation signals.
peerLogic.reset(new PeerLogicValidation(&connman, scheduler));
RegisterValidationInterface(peerLogic.get());
.............
#if ENABLE_ZMQ
// Only with ZMQ support compiled in: subscribe the notifier when Create() returned one.
pzmqNotificationInterface = CZMQNotificationInterface::Create();
if (pzmqNotificationInterface) {
RegisterValidationInterface(pzmqNotificationInterface);
}
#endif
...............
}
如果錢包功能開啟, 啟動后打開錢包過程,會把錢包注冊成為MainSignalsInstance的訂閱者。
// Excerpt: opening the wallets during initialization (dotted lines mark elided code).
bool AppInitMain()
{
........
#ifdef ENABLE_WALLET
// Abort initialization when the wallets cannot be opened.
if (!OpenWallets())
return false;
#else
// Built without wallet support: only log that fact.
LogPrintf("No wallet support compiled in!\n");
#endif
........
}
bool OpenWallets()
{
if (gArgs.GetBoolArg("-disablewallet", DEFAULT_DISABLE_WALLET)) {
LogPrintf("Wallet disabled!\n");
return true;
}
for (const std::string& walletFile : gArgs.GetArgs("-wallet")) {
CWallet * const pwallet = CWallet::CreateWalletFromFile(walletFile);
if (!pwallet) {
return false;
}
vpwallets.push_back(pwallet);
}
return true;
}
// Excerpt: the wallet created here is subscribed to the validation signals (dotted lines mark elided code).
CWallet* CWallet::CreateWalletFromFile(const std::string walletFile)
{
......
CWallet *walletInstance = new CWallet(std::move(dbw));
// Subscribe the new wallet so it receives the validation callbacks.
RegisterValidationInterface(walletInstance);
......
}
在submitblock rpc
調用中, 用戶提交hex編碼的原始block, 解析后, 調用ProcessNewBlock()檢查處理,使用類型submitblock_StateCatcher
的對象sc 作為MainSignalsInstance 的訂閱者,
對提交過去的block 的驗證結果,作為事件通知返回給rpc 調用。
// Excerpt: how the submitblock RPC observes the validation result of the block it submits (dotted lines mark elided code).
UniValue submitblock(const JSONRPCRequest& request)
{
...........
// Watch for the BlockChecked event whose block hash matches the submitted block.
submitblock_StateCatcher sc(block.GetHash());
RegisterValidationInterface(&sc);
bool fAccepted = ProcessNewBlock(Params(), blockptr, true, nullptr);
// sc is a local object and was registered by address, so it is unregistered again here.
UnregisterValidationInterface(&sc);
...........
}
CMainSignals
類型上面定義了一堆觸發事件的方法, 別的代碼模塊調用這些方法, 觸發相應的事件,把事件通知發給相關的訂閱者。
/** Queues an UpdatedBlockTip notification on the scheduler client; the signal fires when the queued task runs. */
void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
    auto& queue = m_internals->m_schedulerClient;
    queue.AddToProcessQueue([this, pindexNew, pindexFork, fInitialDownload] {
        auto& signal = m_internals->UpdatedBlockTip;
        signal(pindexNew, pindexFork, fInitialDownload);
    });
}
/** Queues a TransactionAddedToMempool notification; the transaction reference is copied into the task. */
void CMainSignals::TransactionAddedToMempool(const CTransactionRef &ptx) {
    auto& queue = m_internals->m_schedulerClient;
    queue.AddToProcessQueue([this, ptx] {
        auto& signal = m_internals->TransactionAddedToMempool;
        signal(ptx);
    });
}
/**
 * Queues a BlockConnected notification. The shared pointers are copied into
 * the task, and the conflicted list is dereferenced only when the task runs.
 */
void CMainSignals::BlockConnected(const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex, const std::shared_ptr<const std::vector<CTransactionRef>>& pvtxConflicted) {
    auto& queue = m_internals->m_schedulerClient;
    queue.AddToProcessQueue([this, pblock, pindex, pvtxConflicted] {
        auto& signal = m_internals->BlockConnected;
        signal(pblock, pindex, *pvtxConflicted);
    });
}
/** Queues a BlockDisconnected notification; the block pointer is copied into the task. */
void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock> &pblock) {
    auto& queue = m_internals->m_schedulerClient;
    queue.AddToProcessQueue([this, pblock] {
        auto& signal = m_internals->BlockDisconnected;
        signal(pblock);
    });
}
/** Queues a SetBestChain notification; the locator is copied into the task. */
void CMainSignals::SetBestChain(const CBlockLocator &locator) {
    auto& queue = m_internals->m_schedulerClient;
    queue.AddToProcessQueue([this, locator] {
        auto& signal = m_internals->SetBestChain;
        signal(locator);
    });
}
/** Queues an Inventory notification; the hash is copied into the task. */
void CMainSignals::Inventory(const uint256 &hash) {
    auto& queue = m_internals->m_schedulerClient;
    queue.AddToProcessQueue([this, hash] {
        auto& signal = m_internals->Inventory;
        signal(hash);
    });
}
/** Fires the Broadcast signal directly in the caller, without going through the queue. */
void CMainSignals::Broadcast(int64_t nBestBlockTime, CConnman* connman) {
    auto& signal = m_internals->Broadcast;
    signal(nBestBlockTime, connman);
}
/** Fires the BlockChecked signal directly in the caller, without going through the queue. */
void CMainSignals::BlockChecked(const CBlock& block, const CValidationState& state) {
    auto& signal = m_internals->BlockChecked;
    signal(block, state);
}
/** Fires the NewPoWValidBlock signal directly in the caller, without going through the queue. */
void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) {
    auto& signal = m_internals->NewPoWValidBlock;
    signal(pindex, block);
}
CChainState 的ActivateBestChain
方法里, 發布BlockConnected, UpdatedBlockTip 事件:
// Excerpt: where BlockConnected and UpdatedBlockTip are published (dotted lines mark elided code).
bool CChainState::ActivateBestChain(CValidationState &state, const CChainParams& chainparams, std::shared_ptr<const CBlock> pblock) {
..............................
// Publish one BlockConnected event per entry recorded in connectTrace.
for (const PerBlockConnectTrace& trace : connectTrace.GetBlocksConnected()) {
// Every trace entry must carry both the block and its index.
assert(trace.pblock && trace.pindex);
GetMainSignals().BlockConnected(trace.pblock, trace.pindex, trace.conflictedTxs);
}
..............................
// Publish the tip change with the new tip, the fork point and the initial-download flag.
GetMainSignals().UpdatedBlockTip(pindexNewTip, pindexFork, fInitialDownload);
...............................
}
CChainState 的AcceptBlock 方法里,發布NewPoWValidBlock 事件:
// Excerpt: where NewPoWValidBlock is published (dotted lines mark elided code).
bool CChainState::AcceptBlock(const std::shared_ptr<const CBlock>& pblock, CValidationState& state, const CChainParams& chainparams, CBlockIndex** ppindex, bool fRequested, const CDiskBlockPos* dbp, bool* fNewBlock)
{
.......................
// Publish only outside initial block download, and only when the block's parent is the active tip.
if (!IsInitialBlockDownload() && chainActive.Tip() == pindex->pprev)
GetMainSignals().NewPoWValidBlock(pindex, pblock);
......................
}
CChainState 的DisconnectTip 方法里,發布 BlockDisconnected
事件:
// Excerpt: where BlockDisconnected is published (dotted lines mark elided code).
bool CChainState::DisconnectTip(CValidationState& state, const CChainParams& chainparams, DisconnectedBlockTransactions *disconnectpool)
{
...................
// Tell subscribers that pblock has been disconnected.
GetMainSignals().BlockDisconnected(pblock);
..............
}
CChainState 的ConnectTip 方法里,發布 BlockChecked
事件:
// Excerpt: where BlockChecked is published after connecting a block (dotted lines mark elided code).
bool CChainState::ConnectTip(CValidationState& state, const CChainParams& chainparams, CBlockIndex* pindexNew, const std::shared_ptr<const CBlock>& pblock, ConnectTrace& connectTrace, DisconnectedBlockTransactions &disconnectpool)
{
................
// Connect the block against a cache view layered over pcoinsTip.
CCoinsViewCache view(pcoinsTip.get());
bool rv = ConnectBlock(blockConnecting, state, pindexNew, view, chainparams);
// In the lines shown, the event is published right after ConnectBlock, before rv is consulted.
GetMainSignals().BlockChecked(blockConnecting, state);
................
}
PeerLogicValidation 的 SendMessages
方法里, 發布Broadcast事件, 通知錢包重新發送未確認的交易:
// Excerpt: where the Broadcast event is published (dotted lines mark elided code).
bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic<bool>& interruptMsgProc)
{
.............
// Publish only when not reindexing, not importing and not in initial block download.
if (!fReindex && !fImporting && !IsInitialBlockDownload())
{
GetMainSignals().Broadcast(nTimeBestReceived, connman);
}
.............
}
從網絡上收到INV
消息后,通知給錢包,更新內部狀態。
// Excerpt: where the Inventory event is published while handling a message (dotted lines mark elided code).
bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc)
{
.................
// Walk every inventory entry in vInv.
for (CInv &inv : vInv)
{
............
// Publish the entry's hash to the Inventory subscribers.
GetMainSignals().Inventory(inv.hash);
}
................
}
validation.cpp
里面的 ProcessNewBlock
在內部調用 CheckBlock 后,檢查失敗后,發布 BlockChecked 事件, 通告給相關訂閱者。
// Excerpt: where BlockChecked is published for a block that fails its check (dotted lines mark elided code).
bool ProcessNewBlock(const CChainParams& chainparams, const std::shared_ptr<const CBlock> pblock, bool fForceProcessing, bool *fNewBlock)
{
............
bool ret = CheckBlock(*pblock, state, chainparams.GetConsensus());
if (!ret) {
// Report the failing state to subscribers before returning the error.
GetMainSignals().BlockChecked(*pblock, state);
// NOTE(review): the log text names AcceptBlock although the visible check is CheckBlock — presumably code was elided from this excerpt; confirm.
return error("%s: AcceptBlock FAILED (%s)", __func__, state.GetDebugMessage());
}
...........
}
FlushStateToDisk
, 發布SetBestChain
事件, 通知錢包
// Excerpt: where the SetBestChain event is published (dotted lines mark elided code).
bool static FlushStateToDisk(const CChainParams& chainparams, CValidationState &state, FlushStateMode mode, int nManualPruneHeight) {
...............
// Publish on a full flush, or in ALWAYS/PERIODIC mode once more than DATABASE_WRITE_INTERVAL * 1000000
// has elapsed since nLastSetChain (NOTE(review): presumably microseconds — confirm the unit of nNow).
if (fDoFullFlush || ((mode == FLUSH_STATE_ALWAYS || mode == FLUSH_STATE_PERIODIC) && nNow > nLastSetChain + (int64_t)DATABASE_WRITE_INTERVAL * 1000000)) {
// Update best block in wallet (so we can detect restored wallets).
GetMainSignals().SetBestChain(chainActive.GetLocator());
// Remember when the event was last published.
nLastSetChain = nNow;
}
...............
}
validation.cpp
里面的AcceptToMemoryPoolWorker
, 在結束前, 發布TransactionAddedToMempool
事件。
// Excerpt: where TransactionAddedToMempool is published (dotted lines mark elided code).
static bool AcceptToMemoryPoolWorker(const CChainParams& chainparams, CTxMemPool& pool, CValidationState& state, const CTransactionRef& ptx,
bool* pfMissingInputs, int64_t nAcceptTime, std::list<CTransactionRef>* plTxnReplaced,
bool bypass_limits, const CAmount& nAbsurdFee, std::vector<COutPoint>& coins_to_uncache)
{
................
// Last visible statement of the excerpt: announce the transaction to subscribers.
GetMainSignals().TransactionAddedToMempool(ptx);
}
本文由 Copernicus團隊
喻建寫作,轉載無需授權