交易隊(duì)列的輸入
交易隊(duì)列的輸入有兩個(gè)鹦筹,分別是接收到其他節(jié)點(diǎn)的廣播交易和自身節(jié)點(diǎn)提交的交易懂鸵。
分別來看這兩種輸入方式:
-
接收廣播交易
在前面區(qū)塊鏈同步章節(jié)中提到過,接收到交易后會(huì)通過調(diào)用TransactionQueue::enqueue()
來將新交易放入交易隊(duì)列中预明,這個(gè)函數(shù)代碼非常簡(jiǎn)單:void TransactionQueue::enqueue(RLP const& _data, h512 const& _nodeId) { bool queued = false; { Guard l(x_queue); unsigned itemCount = _data.itemCount(); for (unsigned i = 0; i < itemCount; ++i) { if (m_unverified.size() >= c_maxVerificationQueueSize) { LOG(m_logger) << "Transaction verification queue is full. Dropping " << itemCount - i << " transactions"; break; } m_unverified.emplace_back(UnverifiedTransaction(_data[i].data(), _nodeId)); queued = true; } } if (queued) m_queueReady.notify_all(); }
只是將交易放入m_unverified
中役耕,然后通知校驗(yàn)線程來校驗(yàn)采转。
再來看校驗(yàn)線程:
void TransactionQueue::verifierBody()
{
while (!m_aborting)
{
UnverifiedTransaction work;
{
unique_lock<Mutex> l(x_queue);
m_queueReady.wait(l, [&](){ return !m_unverified.empty() || m_aborting; });
if (m_aborting)
return;
work = move(m_unverified.front());
m_unverified.pop_front();
}
try
{
Transaction t(work.transaction, CheckTransaction::Cheap); //Signature will be checked later
ImportResult ir = import(t);
m_onImport(ir, t.sha3(), work.nodeId);
}
catch (...)
{
// should not happen as exceptions are handled in import.
cwarn << "Bad transaction:" << boost::current_exception_diagnostic_information();
}
}
}
這里是一個(gè)簡(jiǎn)單的生產(chǎn)消費(fèi)者隊(duì)列,先將UnverifiedTransaction
取出瞬痘,然后調(diào)用import()
函數(shù)進(jìn)行校驗(yàn)故慈。由于使用了線程,校驗(yàn)過程是異步的框全。
-
自身提交交易
節(jié)點(diǎn)自身提交的交易與上面的交易不同察绷,是同步的,也就是直接會(huì)調(diào)用import()
函數(shù)來進(jìn)行校驗(yàn)竣况。ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, IfDropped _ik) { try { Transaction t = Transaction(_transactionRLP, CheckTransaction::Everything); return import(t, _ik); } catch (Exception const&) { return ImportResult::Malformed; } }
交易的校驗(yàn)
我們來看這個(gè)校驗(yàn)的過程,也就是TransactionQueue::import()
函數(shù)筒严。
ImportResult TransactionQueue::import(Transaction const& _transaction, IfDropped _ik)
{
if (_transaction.hasZeroSignature())
return ImportResult::ZeroSignature;
// Check if we already know this transaction.
h256 h = _transaction.sha3(WithSignature);
ImportResult ret;
{
UpgradableGuard l(m_lock);
auto ir = check_WITH_LOCK(h, _ik);
if (ir != ImportResult::Success)
return ir;
{
_transaction.safeSender(); // Perform EC recovery outside of the write lock
UpgradeGuard ul(l);
ret = manageImport_WITH_LOCK(h, _transaction);
}
}
return ret;
}
可以看到先是調(diào)用TransactionQueue::check_WITH_LOCK()
函數(shù)來做一個(gè)簡(jiǎn)單檢查丹泉。檢查的過程是看這個(gè)交易是否是已經(jīng)校驗(yàn)過的,是否是之前被刪除的鸭蛙。
接著調(diào)用_transaction.safeSender()
摹恨,這個(gè)函數(shù)是通過簽名反推sender
,在交易那一節(jié)已經(jīng)說過娶视。最后調(diào)用manageImport_WITH_LOCK()
函數(shù)來處理交易晒哄。
manageImport_WITH_LOCK()
函數(shù)過程稍稍復(fù)雜睁宰,我們可以一步一步來分析。
第一步:
auto cs = m_currentByAddressAndNonce.find(_transaction.from());
if (cs != m_currentByAddressAndNonce.end())
{
auto t = cs->second.find(_transaction.nonce());
if (t != cs->second.end())
{
if (_transaction.gasPrice() < (*t->second).transaction.gasPrice())
return ImportResult::OverbidGasPrice;
else
{
h256 dropped = (*t->second).transaction.sha3();
remove_WITH_LOCK(dropped);
m_onReplaced(dropped);
}
}
}
這一步是檢查m_currentByAddressAndNonce
中是否有重復(fù)項(xiàng)寝凌,判斷標(biāo)準(zhǔn)是sender
和nonce
柒傻,如果存在重復(fù)的則檢查gasPrice
,如果新的交易gasPrice
更低较木,則校驗(yàn)失敗红符,否則將現(xiàn)有的交易刪除,替換為gasPrice
更高的交易伐债。
第二步是檢查m_future
预侯,檢查過程與第一步類似。
第三步:
// If valid, append to transactions.
insertCurrent_WITH_LOCK(make_pair(_h, _transaction));
LOG(m_loggerDetail) << "Queued vaguely legit-looking transaction " << _h;
while (m_current.size() > m_limit)
{
LOG(m_loggerDetail) << "Dropping out of bounds transaction " << _h;
remove_WITH_LOCK(m_current.rbegin()->transaction.sha3());
}
m_onReady();
這里調(diào)用insertCurrent_WITH_LOCK()
插入隊(duì)列峰锁,然后將超出容量m_limit
的部分刪除萎馅,并調(diào)用m_onReady()
表示目前隊(duì)列有數(shù)據(jù)了,可以來取了虹蒋。
我們?cè)賮砜?code>insertCurrent_WITH_LOCK()函數(shù)是怎么將交易插入隊(duì)列的糜芳。
void TransactionQueue::insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p)
{
if (m_currentByHash.count(_p.first))
{
cwarn << "Transaction hash" << _p.first << "already in current?!";
return;
}
Transaction const& t = _p.second;
// Insert into current
auto inserted = m_currentByAddressAndNonce[t.from()].insert(std::make_pair(t.nonce(), PriorityQueue::iterator()));
PriorityQueue::iterator handle = m_current.emplace(VerifiedTransaction(t));
inserted.first->second = handle;
m_currentByHash[_p.first] = handle;
// Move following transactions from future to current
makeCurrent_WITH_LOCK(t);
m_known.insert(_p.first);
}
先還是檢查是否有重復(fù)項(xiàng),然后把交易插入m_current
里千诬,并記下插入的位置(迭代器)耍目,再分別加入m_currentByAddressAndNonce
和m_currentByHash
中。
注意到末尾還有個(gè)makeCurrent_WITH_LOCK()
的調(diào)用徐绑,這個(gè)是看情況將m_future
里的交易移到m_current
中邪驮。
交易隊(duì)列的輸出
交易隊(duì)列輸出只有一個(gè),那就是區(qū)塊block傲茄。在Block::sync()
中會(huì)調(diào)用TransactionQueue::topTransactions()
來取隊(duì)列的前N項(xiàng)數(shù)據(jù)毅访,再加入block
中。