和Bitcoin類似橄霉,以太坊的轉(zhuǎn)賬流程基本是這樣的:
1.發(fā)起交易:指定目標(biāo)地址和交易金額,以及必需的gas/gasLimit
2.交易簽名:使用賬戶私鑰對交易進(jìn)行簽名
3.提交交易:驗(yàn)簽交易邑蒋,并將交易提交到交易緩沖池
4.廣播交易:通知以太坊虛擬機(jī)吧交易信息廣播給其他節(jié)點(diǎn)
Eth Transaction結(jié)構(gòu)
首先姓蜂,在源碼中搜索到Transaction結(jié)構(gòu)的定義之處:./core/types/transaction.go
//交易結(jié)構(gòu)體
type Transaction struct {
//交易數(shù)據(jù)
data txdata
// caches
hash atomic.Value
size atomic.Value
from atomic.Value
}
type txdata struct {
//發(fā)送者發(fā)起的交易總數(shù)
AccountNonce uint64 `json:"nonce" gencodec:"required"`
//交易的Gas價(jià)格
Price *big.Int `json:"gasPrice" gencodec:"required"`
//交易允許消耗的最大Gas
GasLimit uint64 `json:"gas" gencodec:"required"`
//交易接收者地址
Recipient *common.Address `json:"to" rlp:"nil"` // nil means contract creation
//交易額
Amount *big.Int `json:"value" gencodec:"required"`
//其他數(shù)據(jù)
Payload []byte `json:"input" gencodec:"required"`
// Signature values
// 交易相關(guān)簽名數(shù)據(jù)
V *big.Int `json:"v" gencodec:"required"`
R *big.Int `json:"r" gencodec:"required"`
S *big.Int `json:"s" gencodec:"required"`
// This is only used when marshaling to JSON.
//交易HAsh
Hash *common.Hash `json:"hash" rlp:"-"`
}
Eth Tx轉(zhuǎn)賬邏輯
1.創(chuàng)建交易
首先我們曾在之前的geth基本功能一篇中使用轉(zhuǎn)賬命令eth.sendTransaction()進(jìn)行過轉(zhuǎn)賬操作。
當(dāng)命令行輸入該指令時(shí)医吊,geth內(nèi)部實(shí)際是調(diào)用了PublicTransactionPoolAPI的sendTransaction接口:./internal/ethapi/api.go
// SendTransaction will create a transaction from the given arguments and
// tries to sign it with the key associated with args.To. If the given passwd isn't
// able to decrypt the key it fails.
// 發(fā)起交易
func (s *PrivateAccountAPI) SendTransaction(ctx context.Context, args SendTxArgs, passwd string) (common.Hash, error) {
//交易參數(shù)相關(guān)判斷
if args.Nonce == nil {
// Hold the addresse's mutex around signing to prevent concurrent assignment of
// the same nonce to multiple accounts.
s.nonceLock.LockAddr(args.From)
defer s.nonceLock.UnlockAddr(args.From)
}
//交易簽名
signed, err := s.signTransaction(ctx, args, passwd)
if err != nil {
return common.Hash{}, err
}
//提交交易
return submitTransaction(ctx, s.b, signed)
}
然后钱慢,我們看一下交易是怎么實(shí)現(xiàn)簽名的。
// signTransactions sets defaults and signs the given transaction
// NOTE: the caller needs to ensure that the nonceLock is held, if applicable,
// and release it after the transaction has been submitted to the tx pool
// 交易簽名
func (s *PrivateAccountAPI) signTransaction(ctx context.Context, args SendTxArgs, passwd string) (*types.Transaction, error) {
// Look up the wallet containing the requested signer
//獲取交易發(fā)起方錢包
account := accounts.Account{Address: args.From}
wallet, err := s.am.Find(account)
if err != nil {
return nil, err
}
// Set some sanity defaults and terminate on failure
if err := args.setDefaults(ctx, s.b); err != nil {
return nil, err
}
// Assemble the transaction and sign with the wallet
//組裝交易
tx := args.toTransaction()
var chainID *big.Int
if config := s.b.ChainConfig(); config.IsEIP155(s.b.CurrentBlock().Number()) {
chainID = config.ChainID
}
//對交易進(jìn)行簽名
return wallet.SignTxWithPassphrase(account, passwd, tx, chainID)
}
繼續(xù)循著toTransaction線索去找創(chuàng)建交易的代碼:
func (args *SendTxArgs) toTransaction() *types.Transaction {
var input []byte
//相關(guān)賦值
if args.Data != nil {
input = *args.Data
} else if args.Input != nil {
input = *args.Input
}
//交易接收方地址為空卿堂,創(chuàng)建的交易為合約交易
if args.To == nil {
return types.NewContractCreation(uint64(*args.Nonce), (*big.Int)(args.Value), uint64(*args.Gas), (*big.Int)(args.GasPrice), input)
}
//創(chuàng)建普通的轉(zhuǎn)賬交易
return types.NewTransaction(uint64(*args.Nonce), *args.To, (*big.Int)(args.Value), uint64(*args.Gas), (*big.Int)(args.GasPrice), input)
}
這里終于找到了創(chuàng)建交易的方法NewTransaction:./core/types/transaction.go
//創(chuàng)建普通交易
func NewTransaction(nonce uint64, to common.Address, amount *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte) *Transaction {
return newTransaction(nonce, &to, amount, gasLimit, gasPrice, data)
}
//創(chuàng)建合約交易
func NewContractCreation(nonce uint64, amount *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte) *Transaction {
return newTransaction(nonce, nil, amount, gasLimit, gasPrice, data)
}
//創(chuàng)建普通交易
func newTransaction(nonce uint64, to *common.Address, amount *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte) *Transaction {
if len(data) > 0 {
data = common.CopyBytes(data)
}
d := txdata{
AccountNonce: nonce,
Recipient: to,
Payload: data,
Amount: new(big.Int),
GasLimit: gasLimit,
Price: new(big.Int),
V: new(big.Int),
R: new(big.Int),
S: new(big.Int),
}
if amount != nil {
d.Amount.Set(amount)
}
if gasPrice != nil {
d.Price.Set(gasPrice)
}
return &Transaction{data: d}
}
2.交易簽名
從上面創(chuàng)建交易的代碼細(xì)節(jié)我們已經(jīng)知道對交易進(jìn)行簽名是通過錢包類的一個(gè)方法實(shí)現(xiàn)的wallet.SignTxWithPassphrase束莫。
源碼在./accounts/keystore/keystore_wallet.go
// SignTxWithPassphrase implements accounts.Wallet, attempting to sign the given
// transaction with the given account using passphrase as extra authentication.
// 交易簽名
func (w *keystoreWallet) SignTxWithPassphrase(account accounts.Account, passphrase string, tx *types.Transaction, chainID *big.Int) (*types.Transaction, error) {
// Make sure the requested account is contained within
//判斷賬戶合法性
if account.Address != w.account.Address {
return nil, accounts.ErrUnknownAccount
}
if account.URL != (accounts.URL{}) && account.URL != w.account.URL {
return nil, accounts.ErrUnknownAccount
}
// Account seems valid, request the keystore to sign
//真正的簽名
return w.keystore.SignTxWithPassphrase(account, passphrase, tx, chainID)
}
繼續(xù)深入到簽名函數(shù)里。
// SignTxWithPassphrase signs the transaction if the private key matching the
// given address can be decrypted with the given passphrase.
func (ks *KeyStore) SignTxWithPassphrase(a accounts.Account, passphrase string, tx *types.Transaction, chainID *big.Int) (*types.Transaction, error) {
// 判斷賬戶是否解鎖并獲取私鑰
_, key, err := ks.getDecryptedKey(a, passphrase)
if err != nil {
return nil, err
}
defer zeroKey(key.PrivateKey)
// Depending on the presence of the chain ID, sign with EIP155 or homestead
// EIP155規(guī)范需要chainID參數(shù)草描,即平時(shí)命令行使用的“--networkid”參數(shù)
if chainID != nil {
return types.SignTx(tx, types.NewEIP155Signer(chainID), key.PrivateKey)
}
return types.SignTx(tx, types.HomesteadSigner{}, key.PrivateKey)
}
終于見到交易的簽名函數(shù)本尊了麦箍。
// SignTx signs the transaction using the given signer and private key
func SignTx(tx *Transaction, s Signer, prv *ecdsa.PrivateKey) (*Transaction, error) {
//1.對交易進(jìn)行哈希
h := s.Hash(tx)
//2.生成簽名
sig, err := crypto.Sign(h[:], prv)
if err != nil {
return nil, err
}
//3.將簽名數(shù)據(jù)填充到Tx信息中
return tx.WithSignature(s, sig)
}
找到這里后,就可以繼續(xù)深入crypto.Sign方法看下簽名是怎么根據(jù)交易哈希和私鑰生成的陶珠。
// Sign calculates an ECDSA signature.
//
// This function is susceptible to chosen plaintext attacks that can leak
// information about the private key that is used for signing. Callers must
// be aware that the given hash cannot be chosen by an adversery. Common
// solution is to hash any input before calculating the signature.
//
// The produced signature is in the [R || S || V] format where V is 0 or 1.
//根據(jù)ECDSA算法生成簽名,以字節(jié)數(shù)組的形式返回 按[R || S || V]格式
func Sign(hash []byte, prv *ecdsa.PrivateKey) (sig []byte, err error) {
//哈希值判斷
if len(hash) != 32 {
return nil, fmt.Errorf("hash is required to be exactly 32 bytes (%d)", len(hash))
}
seckey := math.PaddedBigBytes(prv.D, prv.Params().BitSize/8)
defer zeroBytes(seckey)
return secp256k1.Sign(hash, seckey)
}
生成簽名后將簽名填充到交易信息的R享钞,S揍诽,V字段。
// WithSignature returns a new transaction with the given signature.
// This signature needs to be formatted as described in the yellow paper (v+27).
// 生成簽名后將簽名填充到交易信息的R栗竖,S暑脆,V字段。
func (tx *Transaction) WithSignature(signer Signer, sig []byte) (*Transaction, error) {
//獲取簽名信息
r, s, v, err := signer.SignatureValues(tx, sig)
if err != nil {
return nil, err
}
//將原有交易信息進(jìn)行一份拷貝
cpy := &Transaction{data: tx.data}
//簽名賦值
cpy.data.R, cpy.data.S, cpy.data.V = r, s, v
return cpy, nil
}
3.交易提交
交易簽名后就可以提交到交易緩沖池狐肢,這里是通過submitTransaction()函數(shù)實(shí)現(xiàn)的添吗。這里涉及到一個(gè)新的數(shù)據(jù)結(jié)構(gòu)交易緩沖池TxPool,所以先來看下TxPool的結(jié)構(gòu):./core/tx_pool.go
// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
//
// The pool separates processable transactions (which can be applied to the
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
//交易緩沖池配置
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
signer types.Signer
mu sync.RWMutex
currentState *state.StateDB // Current state in the blockchain head
pendingState *state.ManagedState // Pending state tracking virtual nonces
currentMaxGas uint64 // Current gas limit for transaction caps
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk
//當(dāng)前所有可被處理的交易列表
pending map[common.Address]*txList // All currently processable transactions
//當(dāng)前所有不可被處理的交易隊(duì)列
queue map[common.Address]*txList // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
//所有的交易列表 key為交易hash
all *txLookup // All transactions to allow lookups
//將all中的交易按照gas price進(jìn)行排列的數(shù)組份名,gas price相同按noce升序排列
priced *txPricedList // All transactions sorted by price
wg sync.WaitGroup // for shutdown sync
homestead bool
}
這里涉及到兩個(gè)重要的屬性pending和queue碟联,它們的類型都是txList妓美,所以就繼續(xù)看下txList的結(jié)構(gòu)。
// txList is a "list" of transactions belonging to an account, sorted by account
// nonce. The same type can be used both for storing contiguous transactions for
// the executable/pending queue; and for storing gapped transactions for the non-
// executable/future queue, with minor behavioral changes.
type txList struct {
//交易的nonce值是否連續(xù)
strict bool // Whether nonces are strictly continuous or not
//已排序的交易Map
txs *txSortedMap // Heap indexed sorted hash map of the transactions
//最高成本交易價(jià)格
costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance)
//最高花費(fèi)的gas限制
gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit)
}
...
// txSortedMap is a nonce->transaction hash map with a heap based index to allow
// iterating over the contents in a nonce-incrementing way.
type txSortedMap struct {
//包含所有交易的字典鲤孵,key是交易對應(yīng)nonce
items map[uint64]*types.Transaction // Hash map storing the transaction data
//降序排列的Nonce值數(shù)組
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
//已經(jīng)排序的交易緩存
cache types.Transactions // Cache of the transactions already sorted
}
交易緩沖池這里的邏輯大概是這樣的:交易提交后首先是進(jìn)入到txPool的queue隊(duì)列緩存壶栋,然后再選擇一部分交易進(jìn)入peending列表進(jìn)行處理菱鸥。當(dāng)txPool滿了的時(shí)候近上,會根據(jù)priced的排序規(guī)則去除gas price廉價(jià)的交易來保證txPool正常運(yùn)行。
我們可以看一下Eth默認(rèn)的交易緩沖池配置:
// TxPoolConfig are the configuration parameters of the transaction pool.
type TxPoolConfig struct {
NoLocals bool // Whether local transaction handling should be disabled
Journal string // Journal of local transactions to survive node restarts
Rejournal time.Duration // Time interval to regenerate the local transaction journal
PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)
AccountSlots uint64 // Minimum number of executable transaction slots guaranteed per account
GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
}
// contains the default configurations for the transaction
// pool.
// TxPool默認(rèn)配置
var DefaultTxPoolConfig = TxPoolConfig{
Journal: "transactions.rlp",
Rejournal: time.Hour,
//允許進(jìn)入交易池的最低gas price
PriceLimit: 1,
//相同Nonce交易 gas price差值超過該值攀操,則使用新的交易
PriceBump: 10,
//pending列表中每個(gè)賬戶存儲的交易處閾值凯正,超過該數(shù)可能被認(rèn)為垃圾交易
AccountSlots: 16,
//pending列表最大長度
GlobalSlots: 4096,
//queue隊(duì)列中每個(gè)賬戶存儲的交易處閾值毙玻,超過該數(shù)可能被認(rèn)為垃圾交易
AccountQueue: 64,
//queue隊(duì)列最大長度
GlobalQueue: 1024,
Lifetime: 3 * time.Hour,
}
現(xiàn)在了解了txPool結(jié)構(gòu)之后,我們終于可以進(jìn)入正題來看submitTransaction()函數(shù)的實(shí)現(xiàn)了:./internal/ethapi/api.go
// submitTransaction is a helper function that submits tx to txPool and logs a message.
// 提交交易到交易池
func submitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
//b Backend是在eth Service初始化時(shí)創(chuàng)建的廊散,在ethapiBackend(./eth/api_backend.go)
// 通過Backend類真正實(shí)現(xiàn)提交交易
if err := b.SendTx(ctx, tx); err != nil {
return common.Hash{}, err
}
if tx.To() == nil {
signer := types.MakeSigner(b.ChainConfig(), b.CurrentBlock().Number())
from, err := types.Sender(signer, tx)
if err != nil {
return common.Hash{}, err
}
addr := crypto.CreateAddress(from, tx.Nonce())
log.Info("Submitted contract creation", "fullhash", tx.Hash().Hex(), "contract", addr.Hex())
} else {
log.Info("Submitted transaction", "fullhash", tx.Hash().Hex(), "recipient", tx.To())
}
return tx.Hash(), nil
}
按圖索驥桑滩,深入到Bakend.sendTx函數(shù):
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.AddLocal(signedTx)
}
然后繼續(xù)找到txPool的addLocal函數(shù):
// AddLocal enqueues a single transaction into the pool if it is valid, marking
// the sender as a local one in the mean time, ensuring it goes around the local
// pricing constraints.
func (pool *TxPool) AddLocal(tx *types.Transaction) error {
return pool.addTx(tx, !pool.config.NoLocals)
}
...
// addTx enqueues a single transaction into the pool if it is valid.
// 將一筆普通交易添加到TxPool中
func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
pool.mu.Lock()
defer pool.mu.Unlock()
// Try to inject the transaction and update any state
// 將交易加入交易池queue隊(duì)列
replace, err := pool.add(tx, local)
if err != nil {
return err
}
// If we added a new transaction, run promotion checks and return
// 通過promoteExecutables將queue中部分交易加入到pending列表中進(jìn)行處理
if !replace {
from, _ := types.Sender(pool.signer, tx) // already validated
pool.promoteExecutables([]common.Address{from})
}
return nil
}
首先,先去看看將交易加入到equeu隊(duì)列的方法add():
// add validates a transaction and inserts it into the non-executable queue for
// later pending promotion and execution. If the transaction is a replacement for
// an already pending or queued one, it overwrites the previous and returns this
// so outer code doesn't uselessly call promote.
//
// If a newly added transaction is marked as local, its sending account will be
// whitelisted, preventing any associated transaction from being dropped out of
// the pool due to pricing constraints.
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
// If the transaction is already known, discard it
//獲取交易hash并以此判斷交易池中是否已存在該筆交易
hash := tx.Hash()
if pool.all.Get(hash) != nil {
log.Trace("Discarding already known transaction", "hash", hash)
return false, fmt.Errorf("known transaction: %x", hash)
}
// If the transaction fails basic validation, discard it
// 驗(yàn)證交易合法性
if err := pool.validateTx(tx, local); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxCounter.Inc(1)
return false, err
}
// If the transaction pool is full, discard underpriced transactions
// 如果交易池已滿奸汇,按priced數(shù)組中g(shù)as price較低的交易剔除
if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
if !local && pool.priced.Underpriced(tx, pool.locals) {
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
return false, ErrUnderpriced
}
// New transaction is better than our worse ones, make room for it
drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
pool.removeTx(tx.Hash(), false)
}
}
// If the transaction is replacing an already pending one, do directly
// 如果交易已經(jīng)存在于pending列表施符,比較新舊交易gasPrice的差值是否超過PriceBump
// 若超過則使用新交易代替舊交易
from, _ := types.Sender(pool.signer, tx) // already validated
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
pendingDiscardCounter.Inc(1)
return false, ErrReplaceUnderpriced
}
// New transaction is better, replace old one
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
pool.all.Add(tx)
pool.priced.Put(tx)
pool.journalTx(from, tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
// We've directly injected a replacement transaction, notify subsystems
go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})
return old != nil, nil
}
// New transaction isn't replacing a pending one, push into queue
// 將交易添加到equeu隊(duì)列
replace, err := pool.enqueueTx(hash, tx)
if err != nil {
return false, err
}
// Mark local addresses and journal local transactions
// 判斷是否本地交易,保證本地交易優(yōu)先被加入到TxPool
if local {
pool.locals.add(from)
}
pool.journalTx(from, tx)
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replace, nil
}
這里對交易合法性的驗(yàn)證必須滿足8個(gè)條件:
// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
// 交易合法性驗(yàn)證
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// Heuristic limit, reject transactions over 32KB to prevent DOS attacks
// 1.交易數(shù)據(jù)量必須 < 32KB
if tx.Size() > 32*1024 {
return ErrOversizedData
}
// Transactions can't be negative. This may never happen using RLP decoded
// transactions but may occur if you create a transaction using the RPC.
// 2.交易金額必須非負(fù)值
if tx.Value().Sign() < 0 {
return ErrNegativeValue
}
// Ensure the transaction doesn't exceed the current block limit gas.
// 3.交易的gasLimit必須 < 交易池當(dāng)前規(guī)定最大gas
if pool.currentMaxGas < tx.Gas() {
return ErrGasLimit
}
// Make sure the transaction is signed properly
// 4.交易簽名必須有效
from, err := types.Sender(pool.signer, tx)
if err != nil {
return ErrInvalidSender
}
// Drop non-local transactions under our own minimal accepted gas price
// 5.交易的gas price必須大于交易池設(shè)置的gas price
local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
return ErrUnderpriced
}
// Ensure the transaction adheres to nonce ordering
// 6.交易的Nonce值必須大于鏈上該賬戶的Nonce
if pool.currentState.GetNonce(from) > tx.Nonce() {
return ErrNonceTooLow
}
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
// 7.交易賬戶余額必須 > 交易額 + gasPrice * gasLimit
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
}
// 8.交易的gasLimit必須 > 對應(yīng)數(shù)據(jù)量所需要的最低gas水平
intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
if err != nil {
return err
}
if tx.Gas() < intrGas {
return ErrIntrinsicGas
}
return nil
}
接下來繼續(xù)看擂找,交易從queue隊(duì)列到pending列表又是怎么一個(gè)過程:
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction
// Gather all the accounts potentially needing updates
if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue))
for addr := range pool.queue {
accounts = append(accounts, addr)
}
}
// Iterate over all accounts and promote any executable transactions
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce)
// 1.1丟棄交易nonce值 < 賬戶當(dāng)前nonce的交易
for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
hash := tx.Hash()
log.Trace("Removed old queued transaction", "hash", hash)
pool.all.Remove(hash)
pool.priced.Removed()
}
// Drop all transactions that are too costly (low balance or out of gas)
// 1.2.丟棄賬戶余額不足的
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable queued transaction", "hash", hash)
pool.all.Remove(hash)
pool.priced.Removed()
queuedNofundsCounter.Inc(1)
}
// Gather all executable transactions and promote them
// 3.將交易添加到pending列表
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
hash := tx.Hash()
if pool.promoteTx(addr, hash, tx) {
log.Trace("Promoting queued transaction", "hash", hash)
promoted = append(promoted, tx)
}
}
// Drop all transactions over the allowed limit
if !pool.locals.contains(addr) {
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()
queuedRateLimitCounter.Inc(1)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.queue, addr)
}
}
// Notify subsystem for new promoted transactions.
if len(promoted) > 0 {
go pool.txFeed.Send(NewTxsEvent{promoted})
}
// If the pending limit is overflown, start equalizing allowances
pending := uint64(0)
for _, list := range pool.pending {
pending += uint64(list.Len())
}
//2 pending列表達(dá)到最大限量
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New()
for addr, list := range pool.pending {
// Only evict transactions from high rollers
// 統(tǒng)計(jì)高額交易
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, float32(list.Len()))
}
}
// Gradually drop transactions from offenders
// 逐漸驅(qū)逐高額交易
offenders := []common.Address{}
for pending > pool.config.GlobalSlots && !spammers.Empty() {
// Retrieve the next offender if not local address
offender, _ := spammers.Pop()
offenders = append(offenders, offender.(common.Address))
// Equalize balances until all the same or below threshold
// 均衡各賬戶存儲的交易數(shù)直到交易數(shù)相同
/* 均衡交易數(shù)時(shí)采取的策略是:
2.1.在超出交易數(shù)的賬戶里以交易數(shù)最少的為標(biāo)準(zhǔn)戳吝,將其他賬戶的交易數(shù)削減至該標(biāo)準(zhǔn)
eg:10個(gè)賬戶交易數(shù)超過了AccountSlots(16),其中交易數(shù)最少的為18,則將其他9個(gè)賬戶的交易數(shù)削減至18
2.2.經(jīng)過1后贯涎,pengding長度依舊超過GlobalSlots听哭,此時(shí)按照AccountSlots標(biāo)準(zhǔn)將超標(biāo)的賬戶里交易數(shù)削減至AccountSlots
eg:將2.1里的10個(gè)賬戶的交易數(shù)都削減至AccountSlots(16)
**/
// 2.1
if len(offenders) > 1 {
// Calculate the equalization threshold for all current offenders
// 超標(biāo)賬戶的最低交易數(shù)
threshold := pool.pending[offender.(common.Address)].Len()
// Iteratively reduce all offenders until below limit or threshold reached
// 將其他賬戶的交易數(shù)削減至threshold
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
for i := 0; i < len(offenders)-1; i++ {
list := pool.pending[offenders[i]]
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()
// Update the account nonce to the dropped transaction
if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
pool.pendingState.SetNonce(offenders[i], nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pending--
}
}
}
}
// If still above threshold, reduce to limit or min allowance
// 2.2 經(jīng)過1的交易數(shù)均衡后,pengding長度依舊超過GlobalSlots 此時(shí)按照AccountSlots標(biāo)準(zhǔn)將超標(biāo)的賬戶里交易數(shù)削減至AccountSlots
if pending > pool.config.GlobalSlots && len(offenders) > 0 {
for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
for _, addr := range offenders {
list := pool.pending[addr]
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()
// Update the account nonce to the dropped transaction
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
pool.pendingState.SetNonce(addr, nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pending--
}
}
}
pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
}
// If we've queued more transactions than the hard limit, drop oldest ones
queued := uint64(0)
for _, list := range pool.queue {
queued += uint64(list.Len())
}
// 3.eqeue隊(duì)列長度大于queue隊(duì)列最大長度
if queued > pool.config.GlobalQueue {
// Sort all accounts with queued transactions by heartbeat
// 對隊(duì)列里的所有賬戶按最近一次心跳時(shí)間排序
addresses := make(addresssByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
}
sort.Sort(addresses)
// Drop transactions until the total is below the limit or only locals remain
// 按順序刪除相關(guān)賬戶的交易塘雳,直到queue隊(duì)列長度符合條件
for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
addr := addresses[len(addresses)-1]
list := pool.queue[addr.address]
addresses = addresses[:len(addresses)-1]
// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
pool.removeTx(tx.Hash(), true)
}
drop -= size
queuedRateLimitCounter.Inc(int64(size))
continue
}
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash(), true)
drop--
queuedRateLimitCounter.Inc(1)
}
}
}
}
在這里promoteExecutables主要有三個(gè)作用:
1.將queue中選出符合條件的交易加入到pending中陆盘。在這之前需要對交易進(jìn)行一些判斷:
1.1丟棄交易nonce值 < 賬戶當(dāng)前nonce的交易
1.2.丟棄賬戶余額不足的
2.對pending列表進(jìn)行清理,以使其滿足相關(guān)配置條件败明。
2.1在超出交易數(shù)的賬戶里以交易數(shù)最少的為標(biāo)準(zhǔn)隘马,將其他賬戶的交易數(shù)削減至該標(biāo)準(zhǔn) eg:10個(gè)賬戶交易數(shù)超過了AccountSlots(16),其中交易數(shù)最少的為18,則將其他9個(gè)賬戶的交易數(shù)削減至18
2.2經(jīng)過1后妻顶,pengding長度依舊超過GlobalSlots酸员,此時(shí)按照AccountSlots標(biāo)準(zhǔn)將超標(biāo)的賬戶里交易數(shù)削減至AccountSlots eg:將2.1里的10個(gè)賬戶的交易數(shù)都削減至AccountSlots(16)
3.對queue隊(duì)列進(jìn)行清理,以使其滿足相關(guān)配置條件讳嘱。
eqeue隊(duì)列長度大于queue隊(duì)列最大長度,按順序刪除相關(guān)賬戶的交易幔嗦,直到queue隊(duì)列長度符合條件
執(zhí)行和廣播交易
接著pool.txFeed.Send發(fā)送一個(gè)TxPreEvent事件,外部呢會通過SubscribeNewTxsEvent()函數(shù)來訂閱該事件:
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}
在源碼中全局搜索這個(gè)函數(shù)沥潭,在./miner/worker.go中發(fā)現(xiàn)一次SubscribeNewTxsEvent的訂閱邀泉。
我們發(fā)現(xiàn)這里訂閱了TxPreEvent事件后,開啟了一個(gè)goroutine來處理該事件,進(jìn)一步分析update函數(shù)汇恤,可以看到庞钢,如果當(dāng)前節(jié)點(diǎn)不挖礦會調(diào)用commitTransactions函數(shù)提交交易;否則會調(diào)用commitNewWork函數(shù)屁置,但其內(nèi)部依然會調(diào)用commitTransactions函數(shù)提交交易焊夸。
func (self *worker) update() {
defer self.txsSub.Unsubscribe()
defer self.chainHeadSub.Unsubscribe()
defer self.chainSideSub.Unsubscribe()
for {
// A real event arrived, process interesting content
select {
// Handle ChainHeadEvent
case <-self.chainHeadCh:
self.commitNewWork()
// Handle ChainSideEvent
case ev := <-self.chainSideCh:
self.uncleMu.Lock()
self.possibleUncles[ev.Block.Hash()] = ev.Block
self.uncleMu.Unlock()
// Handle NewTxsEvent
case ev := <-self.txsCh:
// Apply transactions to the pending state if we're not mining.
//
// Note all transactions received may not be continuous with transactions
// already included in the current mining block. These transactions will
// be automatically eliminated.
if atomic.LoadInt32(&self.mining) == 0 {
self.currentMu.Lock()
txs := make(map[common.Address]types.Transactions)
for _, tx := range ev.Txs {
acc, _ := types.Sender(self.current.signer, tx)
txs[acc] = append(txs[acc], tx)
}
txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs)
//當(dāng)前節(jié)點(diǎn)不挖礦,提交交易
self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase)
self.updateSnapshot()
self.currentMu.Unlock()
} else {
// If we're mining, but nothing is being processed, wake on new transactions
// 當(dāng)前節(jié)點(diǎn)為礦工節(jié)點(diǎn)蓝角,commitNewWork進(jìn)行挖礦
if self.config.Clique != nil && self.config.Clique.Period == 0 {
self.commitNewWork()
}
}
// System stopped
case <-self.txsSub.Err():
return
case <-self.chainHeadSub.Err():
return
case <-self.chainSideSub.Err():
return
}
}
}
在上面搜索SubscribeNewTxsEvent函數(shù)時(shí)阱穗,另一個(gè)調(diào)用的地方便是./eth/handler.go。這里和上面一樣也是創(chuàng)建了一個(gè)gorountine來處理TxPreEvent事件使鹅。
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
}
...
func (pm *ProtocolManager) txBroadcastLoop() {
for {
select {
case event := <-pm.txsCh:
pm.BroadcastTxs(event.Txs)
// Err() channel will be closed when unsubscribing.
case <-pm.txsSub.Err():
return
}
}
}
...
// BroadcastTxs will propagate a batch of transactions to all peers which are not known to
// already have the given transaction.
func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
var txset = make(map[*peer]types.Transactions)
// Broadcast transactions to a batch of peers not knowing about it
for _, tx := range txs {
peers := pm.peers.PeersWithoutTx(tx.Hash())
for _, peer := range peers {
txset[peer] = append(txset[peer], tx)
}
log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
}
// FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for peer, txs := range txset {
peer.AsyncSendTransactions(txs)
}
}
至此揪阶,一筆交易從發(fā)起到構(gòu)建到簽名驗(yàn)證以及緩存到交易池然后廣播給其他節(jié)點(diǎn)的整個(gè)流程的邏輯就看完了。
更多以太坊源碼解析請移駕全球最大同性交友網(wǎng),覺得有用記得給個(gè)小star哦??????
.
.
.
.
互聯(lián)網(wǎng)顛覆世界患朱,區(qū)塊鏈顛覆互聯(lián)網(wǎng)!
--------------------------------------------------20180922 00:28