environment:
fabric v2.2.0
peer加入通道流程
- 接收加入通道命令
- 進行背書流程
- 調用CSCC系統鏈碼
- Peer本地文件初始化、數據庫初始化
- 開啟gossip服務,leader Peer選舉并連接orderer
- leader peer開啟一個不斷向orderer請求區塊的服務
- 開啟一個服務不斷從gossip.payload獲取區塊寫入賬本
- 對于每個新創建的channel,需要install每一個sys chaincode(DeploySysCCs函數)
peer&orderer deliver block流程圖
peer拉取區塊和peer寫入區塊流程代碼
在文件internal/peer/channel/join.go
函數調用順序為`joinCmd=>join=>executeJoin`,然后接著背書,調用CSCC系統鏈碼。
CSCC鏈碼在文件core/scc/cscc/configure.go,調用函數順序Invoke=>InvokeNoShim=>joinChain
// joinChain joins the peer to the channel described by the given
// configuration block. The block is expected to be the channel's genesis
// block, so creating the channel from it also initialises the peer's view of
// that channel.
//
// It returns a success response with no payload when the channel was created,
// or an error response carrying the failure message otherwise.
func (e *PeerConfiger) joinChain(
	channelID string,
	block *common.Block,
	deployedCCInfoProvider ledger.DeployedChaincodeInfoProvider,
	lr plugindispatcher.LifecycleResources,
	nr plugindispatcher.CollectionAndLifecycleResources,
) pb.Response {
	createErr := e.peer.CreateChannel(channelID, block, deployedCCInfoProvider, lr, nr)
	if createErr == nil {
		return shim.Success(nil)
	}
	return shim.Error(createErr.Error())
}
位于core/peer/peer.go的函數CreateChannel。
// CreateChannel creates the ledger for channel cid from the block cb, wires
// the channel up through createChannel, and finally calls initChannel.
//
// It returns a wrapped error when the ledger cannot be created, and the
// unmodified error from createChannel when the channel setup fails.
func (p *Peer) CreateChannel(
	cid string,
	cb *common.Block,
	deployedCCInfoProvider ledger.DeployedChaincodeInfoProvider,
	legacyLifecycleValidation plugindispatcher.LifecycleResources,
	newLifecycleValidation plugindispatcher.CollectionAndLifecycleResources,
) error {
	// Initialise the local files, databases, etc. backing the ledger.
	channelLedger, ledgerErr := p.LedgerMgr.CreateLedger(cid, cb)
	if ledgerErr != nil {
		return errors.WithMessage(ledgerErr, "cannot create ledger from genesis block")
	}

	// Start the gossip service, elect the leader peer and connect to the
	// orderer's gRPC service.
	setupErr := p.createChannel(
		cid,
		channelLedger,
		deployedCCInfoProvider,
		legacyLifecycleValidation,
		newLifecycleValidation,
	)
	if setupErr != nil {
		return setupErr
	}

	p.initChannel(cid)
	return nil
}
- CreateLedger函數主要是按照mychannel.block創建賬本文件,db數據庫等
- createChannel函數主要是開啟gossip服務,請求orderer拉取block和寫區塊流程
這里主要看兩個流程,對應上面流程圖中peer寫入區塊流程:createChannel=>InitializeChannel=>NewGossipStateProvider=>deliverPayloads
和leader peer拉取區塊流程(設計節點為leader peer):createChannel=>InitializeChannel=>StartDeliverForChannel=>DeliverBlocks
peer寫入區塊服務
直接看位于gossip/state/state.go的函數deliverPayloads
// deliverPayloads is the commit loop of the state provider. It waits until
// the payload buffer signals that the next sequence is ready, then pops
// payloads one by one, decodes each into a block (plus optional private
// data) and commits it.
//
// Payloads that cannot be decoded, or that lack a header or data section,
// are logged and dropped. The loop ends when stopCh fires or when a commit
// fails with a VSCC execution failure; any other commit error is reported
// through Panicf.
func (s *GossipStateProviderImpl) deliverPayloads() {
	for {
		select {
		// Wait for the notification that the next sequence has arrived.
		case <-s.payloads.Ready():
			s.logger.Debugf("[%s] Ready to transfer payloads (blocks) to the ledger, next block number is = [%d]", s.chainID, s.payloads.Next())

			// Drain every payload that is currently available.
			for {
				next := s.payloads.Pop()
				if next == nil {
					break
				}

				block := &common.Block{}
				if decodeErr := pb.Unmarshal(next.Data, block); decodeErr != nil {
					s.logger.Errorf("Error getting block with seqNum = %d due to (%+v)...dropping block", next.SeqNum, errors.WithStack(decodeErr))
					continue
				}
				if block.Data == nil || block.Header == nil {
					s.logger.Errorf("Block with claimed sequence %d has no header (%v) or data (%v)",
						next.SeqNum, block.Header, block.Data)
					continue
				}
				s.logger.Debugf("[%s] Transferring block [%d] with %d transaction(s) to the ledger", s.chainID, next.SeqNum, len(block.Data.Data))

				// Decode the private data that travels with the block, if any.
				var pvtData util.PvtDataCollections
				if next.PrivateData != nil {
					if pvtErr := pvtData.Unmarshal(next.PrivateData); pvtErr != nil {
						s.logger.Errorf("Wasn't able to unmarshal private data for block seqNum = %d due to (%+v)...dropping block", next.SeqNum, errors.WithStack(pvtErr))
						continue
					}
				}

				// Commit the block.
				commitErr := s.commitBlock(block, pvtData)
				if commitErr == nil {
					continue
				}
				if vsccErr, isVSCCErr := commitErr.(*vsccErrors.VSCCExecutionFailureError); isVSCCErr {
					s.logger.Errorf("Failed executing VSCC due to %v. Aborting chain processing", vsccErr)
					return
				}
				s.logger.Panicf("Cannot commit block to the ledger due to %+v", errors.WithStack(commitErr))
			}
		case <-s.stopCh:
			s.logger.Debug("State provider has been stopped, finishing to push new blocks.")
			return
		}
	}
}
這個服務就是不斷的等待payloadbuffer出現消息,如果有就從buffer拿出block寫到本地賬本
位于gossip/state/payloads_buffer.go的就是payloadbuffer接口
peer從orderer拉取區塊服務
// DeliverBlocks pulls blocks from the ordering service and hands every
// response to processMsg. It reconnects after each failure, waiting between
// attempts with an exponential back-off.
//
// The method returns when DoneC is closed, when the ledger height or the seek
// request cannot be obtained, or, if YieldLeadership is set, once the
// accumulated back-off time exceeds MaxRetryDuration.
func (d *Deliverer) DeliverBlocks() {
	failureCounter := 0
	totalDuration := time.Duration(0)

	// Number of failures after which the exponential delay would exceed
	// MaxRetryDelay:
	//
	// InitialRetryDelay * backoffExponentBase^n > MaxRetryDelay
	// backoffExponentBase^n > MaxRetryDelay / InitialRetryDelay
	// n * log(backoffExponentBase) > log(MaxRetryDelay / InitialRetryDelay)
	// n > log(MaxRetryDelay / InitialRetryDelay) / log(backoffExponentBase)
	maxFailures := int(math.Log(float64(d.MaxRetryDelay)/float64(d.InitialRetryDelay)) / math.Log(backoffExponentBase))
	for {
		select {
		case <-d.DoneC:
			return
		default:
		}

		if failureCounter > 0 {
			var sleepDuration time.Duration
			if failureCounter-1 > maxFailures {
				sleepDuration = d.MaxRetryDelay
			} else {
				// Use the same base and initial delay that maxFailures was
				// derived from, so the delay grows from InitialRetryDelay up
				// to (roughly) MaxRetryDelay instead of from a fixed 100ms.
				sleepDuration = time.Duration(math.Pow(backoffExponentBase, float64(failureCounter-1)) * float64(d.InitialRetryDelay))
			}
			totalDuration += sleepDuration
			if totalDuration > d.MaxRetryDuration {
				if d.YieldLeadership {
					d.Logger.Warningf("attempted to retry block delivery for more than %v, giving up", d.MaxRetryDuration)
					return
				}
				d.Logger.Warningf("peer is a static leader, ignoring peer.deliveryclient.reconnectTotalTimeThreshold")
			}
			d.sleeper.Sleep(sleepDuration, d.DoneC)
		}

		// Read the height of the local ledger; it is the input for the
		// seek request built below.
		ledgerHeight, err := d.Ledger.LedgerHeight()
		if err != nil {
			d.Logger.Error("Did not return ledger height, something is critically wrong", err)
			return
		}

		// Build the envelope that requests blocks from the orderer.
		seekInfoEnv, err := d.createSeekInfo(ledgerHeight)
		if err != nil {
			d.Logger.Error("Could not create a signed Deliver SeekInfo message, something is critically wrong", err)
			return
		}

		// Connect to the orderer's deliver service.
		deliverClient, endpoint, cancel, err := d.connect(seekInfoEnv)
		if err != nil {
			d.Logger.Warningf("Could not connect to ordering service: %s", err)
			failureCounter++
			continue
		}

		connLogger := d.Logger.With("orderer-address", endpoint.Address)

		// The reader goroutine forwards responses on recv and closes the
		// channel when the stream fails or DoneC is closed.
		recv := make(chan *orderer.DeliverResponse)
		go func() {
			for {
				// Wait for block data from the orderer's deliver service.
				resp, err := deliverClient.Recv()
				if err != nil {
					connLogger.Warningf("Encountered an error reading from deliver stream: %s", err)
					close(recv)
					return
				}
				select {
				case recv <- resp:
				case <-d.DoneC:
					close(recv)
					return
				}
			}
		}()

	RecvLoop: // Loop until the endpoint is refreshed, or there is an error on the connection
		for {
			select {
			case <-endpoint.Refreshed:
				connLogger.Infof("Ordering endpoints have been refreshed, disconnecting from deliver to reconnect using updated endpoints")
				break RecvLoop
			// Wait for the next response (block data) from the reader goroutine.
			case response, ok := <-recv:
				if !ok {
					connLogger.Warningf("Orderer hung up without sending status")
					failureCounter++
					break RecvLoop
				}
				// Process the response.
				err = d.processMsg(response)
				if err != nil {
					connLogger.Warningf("Got error while attempting to receive blocks: %v", err)
					failureCounter++
					break RecvLoop
				}
				// A successfully processed response resets the back-off.
				failureCounter = 0
			case <-d.DoneC:
				break RecvLoop
			}
		}

		// cancel and wait for our spawned go routine to exit
		cancel()
		<-recv
	}
}
- 通過grpc向orderer deliver服務獲取相應高度的區塊
- 等待orderer deliver服務返回block數據
// processMsg handles a single response received from the orderer's deliver
// stream.
//
// A status response always yields an error: the seek issued by the deliverer
// is not expected to complete, so even a SUCCESS status is treated as a
// failure. A block response is verified, re-marshaled, added to the local
// payload buffer and then gossiped. Any other response type is rejected.
//
// A block response without a block or without a header is rejected with an
// error rather than dereferenced, so a malformed response cannot panic the
// caller.
func (d *Deliverer) processMsg(msg *orderer.DeliverResponse) error {
	switch t := msg.Type.(type) {
	case *orderer.DeliverResponse_Status:
		if t.Status == common.Status_SUCCESS {
			return errors.Errorf("received success for a seek that should never complete")
		}

		return errors.Errorf("received bad status %v from orderer", t.Status)
	case *orderer.DeliverResponse_Block:
		// The header is read before the block is verified, so guard it here.
		if t.Block == nil || t.Block.Header == nil {
			return errors.New("block from orderer has no header")
		}
		blockNum := t.Block.Header.Number
		if err := d.BlockVerifier.VerifyBlock(gossipcommon.ChannelID(d.ChannelID), blockNum, t.Block); err != nil {
			return errors.WithMessage(err, "block from orderer could not be verified")
		}

		marshaledBlock, err := proto.Marshal(t.Block)
		if err != nil {
			return errors.WithMessage(err, "block from orderer could not be re-marshaled")
		}

		// Create payload with a block received
		payload := &gossip.Payload{
			Data:   marshaledBlock,
			SeqNum: blockNum,
		}

		// Use payload to create gossip message
		gossipMsg := &gossip.GossipMessage{
			Nonce:   0,
			Tag:     gossip.GossipMessage_CHAN_AND_ORG,
			Channel: []byte(d.ChannelID),
			Content: &gossip.GossipMessage_DataMsg{
				DataMsg: &gossip.DataMessage{
					Payload: payload,
				},
			},
		}

		d.Logger.Debugf("Adding payload to local buffer, blockNum = [%d]", blockNum)
		// Add payload to local state payloads buffer
		if err := d.Gossip.AddPayload(d.ChannelID, payload); err != nil {
			d.Logger.Warningf("Block [%d] received from ordering service wasn't added to payload buffer: %v", blockNum, err)
			return errors.WithMessage(err, "could not add block as payload")
		}

		// Gossip messages with other nodes, i.e. propagate the block to the
		// other peers.
		d.Logger.Debugf("Gossiping block [%d]", blockNum)
		d.Gossip.Gossip(gossipMsg)
		return nil
	default:
		d.Logger.Warningf("Received unknown: %v", t)
		return errors.Errorf("unknown message type '%T'", msg.Type)
	}
}
收到從orderer deliver返回的block數據,然后處理block數據
- 把block數據push到payloadbuffer
- 同步區塊到組織內的其他peer節點
orderer deliver流程代碼
orderer deliver grpc服務的入口函數Deliver位于orderer/common/server/server.go
// Deliver sends a stream of blocks to a client after ordering.
//
// It builds a deliver.Server around the gRPC stream (policy checker, traced
// receiver and response sender) and passes it to the deliver handler. A panic
// raised while serving the stream is recovered and logged instead of being
// propagated.
func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {
	logger.Debugf("Starting new Deliver handler")
	defer func() {
		if panicVal := recover(); panicVal != nil {
			logger.Criticalf("Deliver client triggered panic: %s\n%s", panicVal, debug.Stack())
		}
		logger.Debugf("Closing Deliver stream")
	}()

	// checkReadPolicy verifies that an envelope satisfies the read policy of
	// the channel it addresses (ChannelReaders, i.e. /Channel/Readers).
	checkReadPolicy := func(env *cb.Envelope, channelID string) error {
		if chain := s.GetChain(channelID); chain != nil {
			// In maintenance mode, we typically require the signature of /Channel/Orderer/Readers.
			// This will block Deliver requests from peers (which normally satisfy /Channel/Readers).
			filter := msgprocessor.NewSigFilter(policies.ChannelReaders, policies.ChannelOrdererReaders, chain)
			return filter.Apply(env)
		}
		return errors.Errorf("channel %s not found", channelID)
	}

	// Receiver wrapped with message tracing.
	tracedReceiver := &deliverMsgTracer{
		Receiver: srv,
		msgTracer: msgTracer{
			debug:    s.debug,
			function: "Deliver",
		},
	}
	sender := &responseSender{
		AtomicBroadcast_DeliverServer: srv,
	}

	handlerServer := &deliver.Server{
		PolicyChecker:  deliver.PolicyCheckerFunc(checkReadPolicy),
		Receiver:       tracedReceiver,
		ResponseSender: sender,
	}
	return s.dh.Handle(srv.Context(), handlerServer)
}
接著看Handle,位于common/deliver/deliver.go
// Handle receives incoming deliver requests.
//
// It reads seek envelopes from srv in a loop, serves each one through
// deliverBlocks and reports the resulting status to the client. The loop ends
// with nil when the client closes the stream (io.EOF), and with the
// corresponding error when receiving, delivering or sending fails. It also
// ends after any non-SUCCESS status has been sent, returning whatever the
// send itself returned.
func (h *Handler) Handle(ctx context.Context, srv *Server) error {
	addr := util.ExtractRemoteAddress(ctx)
	logger.Debugf("Starting new deliver loop for %s", addr)
	h.Metrics.StreamsOpened.Add(1)
	defer h.Metrics.StreamsClosed.Add(1)

	for {
		logger.Debugf("Attempting to read seek info message from %s", addr)
		envelope, recvErr := srv.Recv()
		switch {
		case recvErr == io.EOF:
			logger.Debugf("Received EOF from %s, hangup", addr)
			return nil
		case recvErr != nil:
			logger.Warningf("Error reading from %s: %s", addr, recvErr)
			return recvErr
		}

		// Serve the requested blocks of the addressed channel.
		status, deliverErr := h.deliverBlocks(ctx, srv, envelope)
		if deliverErr != nil {
			return deliverErr
		}

		// Send the status response. The status is checked before the send
		// error, so a failed request ends the stream without a warning.
		sendErr := srv.SendStatusResponse(status)
		if status != cb.Status_SUCCESS {
			return sendErr
		}
		if sendErr != nil {
			logger.Warningf("Error sending to %s: %s", addr, sendErr)
			return sendErr
		}

		logger.Debugf("Waiting for new SeekInfo from %s", addr)
	}
}
// deliverBlocks serves a single seek request carried by envelope.
//
// It parses the envelope, looks up the addressed channel, checks the access
// control policy, and then streams blocks from the position in seekInfo.Start
// until the block number derived from seekInfo.Stop has been sent.
//
// The returned status is what the caller should report to the client. The
// returned error is non-nil only when ctx finishes while waiting for a block
// or when sending a block fails; every other rejection is expressed through
// the status alone.
func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) (status cb.Status, err error) {
addr := util.ExtractRemoteAddress(ctx)
payload, chdr, shdr, err := h.parseEnvelope(ctx, envelope) // parse the message payload and validate the payload header and channel header
if err != nil {
logger.Warningf("error parsing envelope from %s: %s", addr, err)
return cb.Status_BAD_REQUEST, nil
}
// Look up the chain support object for the requested channel in the chain manager.
chain := h.ChainManager.GetChain(chdr.ChannelId)
if chain == nil {
// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
// So we would expect our log to be somewhat flooded with these
logger.Debugf("Rejecting deliver for %s because channel %s not found", addr, chdr.ChannelId)
return cb.Status_NOT_FOUND, nil
}
labels := []string{
"channel", chdr.ChannelId,
"filtered", strconv.FormatBool(isFiltered(srv)),
"data_type", srv.DataType(),
}
h.Metrics.RequestsReceived.With(labels...).Add(1)
// Record the completion metric on every exit path; the "success" label is
// taken from the named result status at the time the function returns.
defer func() {
labels := append(labels, "success", strconv.FormatBool(status == cb.Status_SUCCESS))
h.Metrics.RequestsCompleted.With(labels...).Add(1)
}()
seekInfo := &ab.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
logger.Warningf("[channel: %s] Received a signed deliver request from %s with malformed seekInfo payload: %s", chdr.ChannelId, addr, err)
return cb.Status_BAD_REQUEST, nil
}
erroredChan := chain.Errored()
if seekInfo.ErrorResponse == ab.SeekInfo_BEST_EFFORT {
// In a 'best effort' delivery of blocks, we should ignore consenter errors
// and continue to deliver blocks according to the client's request.
// A nil channel never becomes ready in a select, which disables both
// erroredChan cases below.
erroredChan = nil
}
// Non-blocking check: reject immediately if the consenter already reports an error.
select {
case <-erroredChan:
logger.Warningf("[channel: %s] Rejecting deliver request for %s because of consenter error", chdr.ChannelId, addr)
return cb.Status_SERVICE_UNAVAILABLE, nil
default:
}
// Build the access control object for this session.
accessControl, err := NewSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, h.ExpirationCheckFunc)
if err != nil {
logger.Warningf("[channel: %s] failed to create access control object due to %s", chdr.ChannelId, err)
return cb.Status_BAD_REQUEST, nil
}
// Check that the request satisfies the channel's access policy and that the
// client certificate has not expired.
if err := accessControl.Evaluate(); err != nil {
logger.Warningf("[channel: %s] Client %s is not authorized: %s", chdr.ChannelId, addr, err)
return cb.Status_FORBIDDEN, nil
}
// Validate seekInfo: both the start and the stop position must be present.
if seekInfo.Start == nil || seekInfo.Stop == nil {
logger.Warningf("[channel: %s] Received seekInfo message from %s with missing start or stop %v, %v", chdr.ChannelId, addr, seekInfo.Start, seekInfo.Stop)
return cb.Status_BAD_REQUEST, nil
}
logger.Debugf("[channel: %s] Received seekInfo (%p) %v from %s", chdr.ChannelId, seekInfo, seekInfo, addr)
// number is the block number the iterator is positioned at for seekInfo.Start.
cursor, number := chain.Reader().Iterator(seekInfo.Start)
defer cursor.Close()
// Resolve the stop position to a concrete block number.
var stopNum uint64
switch stop := seekInfo.Stop.Type.(type) {
case *ab.SeekPosition_Oldest:
stopNum = number
case *ab.SeekPosition_Newest:
// when seeking only the newest block (i.e. starting
// and stopping at newest), don't reevaluate the ledger
// height as this can lead to multiple blocks being
// sent when only one is expected
if proto.Equal(seekInfo.Start, seekInfo.Stop) {
stopNum = number
break
}
stopNum = chain.Reader().Height() - 1
case *ab.SeekPosition_Specified:
stopNum = stop.Specified.Number
if stopNum < number {
logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
return cb.Status_BAD_REQUEST, nil
}
}
for {
// With FAIL_IF_NOT_READY the request fails instead of waiting when the
// next block number is beyond the current ledger height.
if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
if number > chain.Reader().Height()-1 {
return cb.Status_NOT_FOUND, nil
}
}
// These locals shadow the named results inside the loop; the return
// statements below still assign the named results explicitly.
var block *cb.Block
var status cb.Status
// cursor.Next runs in its own goroutine so that the select below can also
// react to context cancellation and consenter errors while waiting.
iterCh := make(chan struct{})
go func() {
block, status = cursor.Next() // advance the cursor; per the original note this blocks until the next block has been generated
close(iterCh)
}()
select {
case <-ctx.Done(): // request context finished, e.g. the client (leader peer) disconnected
logger.Debugf("Context canceled, aborting wait for next block")
return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
case <-erroredChan:
// TODO, today, the only user of the errorChan is the orderer consensus implementations. If the peer ever reports
// this error, we will need to update this error message, possibly finding a way to signal what error text to return.
logger.Warningf("Aborting deliver for request because the backing consensus implementation indicates an error")
return cb.Status_SERVICE_UNAVAILABLE, nil
case <-iterCh: // a block (or a failure status) is available
// Iterator has set the block and status vars
}
if status != cb.Status_SUCCESS {
logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
return status, nil
}
// increment block number to support FAIL_IF_NOT_READY deliver behavior
number++
// Re-evaluate the access control policy before each block is sent.
if err := accessControl.Evaluate(); err != nil {
logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
return cb.Status_FORBIDDEN, nil
}
logger.Debugf("[channel: %s] Delivering block [%d] for (%p) for %s", chdr.ChannelId, block.Header.Number, seekInfo, addr)
signedData := &protoutil.SignedData{Data: envelope.Payload, Identity: shdr.Creator, Signature: envelope.Signature}
// Send the block to the client.
if err := srv.SendBlockResponse(block, chdr.ChannelId, chain, signedData); err != nil {
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
return cb.Status_INTERNAL_SERVER_ERROR, err
}
h.Metrics.BlocksSent.With(labels...).Add(1)
// Stop once the block at the resolved stop position has been sent.
if stopNum == block.Header.Number {
break
}
}
logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)
return cb.Status_SUCCESS, nil
}
值得注意的是這里,如果是拉取n+1高度區塊的話(當前是區塊高度n),cursor.Next()會一直等待新的區塊才返回
// Excerpt from deliverBlocks: the iterator is advanced in a separate
// goroutine and iterCh is closed once it has returned.
go func() {
block, status = cursor.Next() // advance the cursor; per the article this blocks until the next block has been generated
close(iterCh)
}()
參考:
菜鳥系列 Fabric 源碼學習 - 區塊同步
技術指南:Fabric中數據同步的實現
菜鳥系列Fabric源碼學習 — 區塊同步
Fabric v2.0 源碼解析——典型的業務流程