Kubeedge-edgecontroller 模塊源碼分析

概述

Edgecontroller 是 Kubernetes apiserver 與 Edgecore 之間的橋梁氯迂,負(fù)責(zé)兩者間的通信與協(xié)議轉(zhuǎn)換扯旷。

其核心為:

  • UpstreamController(上行消息控制器):負(fù)責(zé)從邊緣訂閱消息并同步到 kubernetes api server耸棒。
  • DownstreamController(下行消息控制器):負(fù)責(zé)抓取 kubernetes api server 更改并將更改發(fā)送到邊緣。

結(jié)構(gòu)

EdgeController.png

EdgeController

對(duì)于 EdgeController 而言亡问,它所做的事情非常簡(jiǎn)單汞幢,它主要就是將自己注冊(cè)到 Core 中,并且使用 config 實(shí)例化 <font color="#ff0000">UpstreamController</font>舰罚,<font color="#ff0000">DownstreamController</font> 纽门。然后在 Core 啟動(dòng)該模塊時(shí)啟動(dòng) UpstreamController 與 DownstreamController。

Register

  1. 根據(jù)配置實(shí)例化上行控制器--UpstreamController
  2. 根據(jù)配置實(shí)例化下行控制器--DownstreamController
func Register(ec *v1alpha1.EdgeController) {  
   core.Register(newEdgeController(ec))  
}

func newEdgeController(config *v1alpha1.EdgeController) *EdgeController {  
   ec := &EdgeController{config: *config}  
   if !ec.Enable() {  
      return ec  
   }  
   var err error  
   ec.upstream, err = controller.NewUpstreamController(config, informers.GetInformersManager().GetK8sInformerFactory())  
   ... 
   ec.downstream, err = controller.NewDownstreamController(config, informers.GetInformersManager().GetK8sInformerFactory(), informers.GetInformersManager(), informers.GetInformersManager().GetCRDInformerFactory())  
   ... 
   return ec  
}

Start

  1. 啟動(dòng)上行控制器
  2. 啟動(dòng)下行控制器
func (ec *EdgeController) Start() {  
   if err := ec.upstream.Start(); err != nil {  
      klog.Exitf("start upstream failed with error: %s", err)  
   }  
  
   if err := ec.downstream.Start(); err != nil {  
      klog.Exitf("start downstream failed with error: %s", err)  
   }  
}

UpstreamController

對(duì)于上下控制器而言营罢,它的主要作用就是訂閱邊緣的消息赏陵,然后進(jìn)行協(xié)議轉(zhuǎn)換和處理后,發(fā)送給 k8s apiserver饲漾。

創(chuàng)建上行控制器實(shí)例---NewUpstreamController

NewUpstreamController 方法主要作用就是創(chuàng)建一個(gè) UpstreamController 實(shí)例瘟滨,其主要流程如下:

  1. 獲取 k8s 相關(guān)客戶端
  2. 獲取消息傳輸層實(shí)例
  3. 獲取各類資源Lister---Node,Pod能颁,ConfigMap杂瘸,Secret,Lease
  4. 創(chuàng)建各類資源數(shù)據(jù)的處理通道
func NewUpstreamController(config *v1alpha1.EdgeController, factory k8sinformer.SharedInformerFactory) (*UpstreamController, error) {  
   uc := &UpstreamController{  
      kubeClient:   client.GetKubeClient(),  
      messageLayer: messagelayer.EdgeControllerMessageLayer(),  
      crdClient:    client.GetCRDClient(),  
      config:       *config,  
   }
   //獲取各類資源Lister
   uc.nodeLister = factory.Core().V1().Nodes().Lister()  
   ... 
   //創(chuàng)建各類資源數(shù)據(jù)的處理通道
   uc.nodeStatusChan = make(chan model.Message, config.Buffer.UpdateNodeStatus) 
   uc.podStatusChan = make(chan model.Message, config.Buffer.UpdatePodStatus) 
   ...
   return uc, nil  
}

啟動(dòng)控制器---Start

Start 的主要作用就是啟動(dòng) UpstreamController 實(shí)例伙菊,開始接收來(lái)自消息層的消息败玉,然后分發(fā)給各個(gè)函數(shù)處理敌土,其主要流程如下:

  1. 開協(xié)程進(jìn)行消息分發(fā)處理---dispatchMessage
  2. 根據(jù)配置開協(xié)程處理各類資源消息
    1. updateNodeStatus
    2. updatePodStatus
    3. queryConfigMap
    4. querySecret
    5. processServiceAccountToken
    6. ... --- 太多了,不一一例舉了
func (uc *UpstreamController) Start() error {  
   klog.Info("start upstream controller")  
   //開協(xié)程進(jìn)行消息分發(fā)處理
   go uc.dispatchMessage()  
   //根據(jù)配置開協(xié)程處理各類資源消息
   for i := 0; i < int(uc.config.Load.UpdateNodeStatusWorkers); i++ {  
      go uc.updateNodeStatus()  
   }  
   for i := 0; i < int(uc.config.Load.UpdatePodStatusWorkers); i++ {  
      go uc.updatePodStatus()  
   }  
   ...
   return nil  
}

分發(fā)消息---dispatchMessage

dispatchMessage 主要用于從消息層接收消息运翼,并根據(jù)資源類型的不同返干,通過(guò)不同的通道將消息分發(fā)不同的處理函數(shù)。其主要流程如下:

  1. 開啟循環(huán)
  2. 接收到結(jié)束信號(hào)就退出循環(huán)
  3. 接收到消息就獲取并判斷資源類型
  4. 根據(jù)資源類型和操作將消息通過(guò)不同的通道將消息分發(fā)不同的處理函數(shù)

資源類型

const (  
   ResourceTypePod                 = "pod"  
   ResourceTypeConfigmap           = "configmap"  
   ResourceTypeServiceAccountToken = "serviceaccounttoken"  
   ResourceTypeSecret              = "secret"  
   ResourceTypeNode                = "node"  
   ResourceTypePodlist             = "podlist"  
   ResourceTypePodStatus           = "podstatus"  
   ResourceTypePodPatch            = "podpatch"  
   ResourceTypeNodeStatus          = "nodestatus"  
   ResourceTypeNodePatch           = "nodepatch"  
   ResourceTypeRule                = "rule"  
   ResourceTypeRuleEndpoint        = "ruleendpoint"  
   ResourceTypeRuleStatus          = "rulestatus"  
   ResourceTypeLease               = "lease"  
)

操作類型

const (  
   InsertOperation        = "insert"  
   DeleteOperation        = "delete"  
   QueryOperation         = "query"  
   UpdateOperation        = "update"  
   PatchOperation         = "patch"  
   UploadOperation        = "upload"  
   ResponseOperation      = "response"  
   ResponseErrorOperation = "error"  
)

方法

func (uc *UpstreamController) dispatchMessage() {  
   for {  
      select { 
      // 接收到結(jié)束信號(hào)就退出循環(huán)
      case <-beehiveContext.Done():  
         return  
      default:  
      }  
      //接收到消息
      msg, err := uc.messageLayer.Receive()  
      if err != nil {  
         continue  
      }  
      //獲取資源類型
      resourceType, err := messagelayer.GetResourceType(msg)  
      if err != nil {   
         continue  
      }
      //判斷資源類型  
      //根據(jù)資源類型和操作將消息通過(guò)不同的通道將消息分發(fā)不同的處理函數(shù)
      switch resourceType {  
      case model.ResourceTypeNodeStatus:  
         uc.nodeStatusChan <- msg  
      case model.ResourceTypePodStatus:  
         uc.podStatusChan <- msg  
      case model.ResourceTypeConfigmap:  
         uc.configMapChan <- msg  
      case model.ResourceTypeSecret:  
         uc.secretChan <- msg  
      case model.ResourceTypeServiceAccountToken:  
         uc.serviceAccountTokenChan <- msg  
      case common.ResourceTypePersistentVolume:  
         uc.persistentVolumeChan <- msg  
      case common.ResourceTypePersistentVolumeClaim:  
         uc.persistentVolumeClaimChan <- msg  
      case common.ResourceTypeVolumeAttachment:  
         uc.volumeAttachmentChan <- msg  
      case model.ResourceTypeNode:  
         switch msg.GetOperation() {  
         case model.InsertOperation:  
            uc.createNodeChan <- msg  
         case model.QueryOperation:  
            uc.queryNodeChan <- msg  
         case model.UpdateOperation:  
            uc.updateNodeChan <- msg  
         default:  
         }  
      case model.ResourceTypeNodePatch:  
         uc.patchNodeChan <- msg  
      case model.ResourceTypePodPatch:  
         uc.patchPodChan <- msg  
      case model.ResourceTypePod:  
         if msg.GetOperation() == model.DeleteOperation {  
            uc.podDeleteChan <- msg  
         } 
      case model.ResourceTypeRuleStatus:  
         uc.ruleStatusChan <- msg  
      case model.ResourceTypeLease:  
         switch msg.GetOperation() {  
         case model.InsertOperation, model.UpdateOperation:  
            uc.createLeaseChan <- msg  
         case model.QueryOperation:  
            uc.queryLeaseChan <- msg  
         }  
      default:  
      }  
   }  
}

資源處理函數(shù)

大部分資源處理函數(shù)都只是將消息進(jìn)行轉(zhuǎn)換后血淌,進(jìn)行了簡(jiǎn)單的判斷矩欠,然后就將實(shí)際的資源根據(jù)請(qǐng)求的資源類型和操作去請(qǐng)求 k8s apiserver 了。在這里我就不過(guò)多敘述了悠夯,只舉一個(gè)例子給大家看就懂了癌淮。

常規(guī)資源處理

常規(guī)資源請(qǐng)求方法--kubeClientGet

這個(gè)方法主要用于去請(qǐng)求一些常規(guī)的資源,在控制器中沦补,被多個(gè)地方調(diào)用它的上層封裝函數(shù) queryInner乳蓄。它的主要流程就非常簡(jiǎn)單:

  1. 判斷請(qǐng)求的什么資源
  2. 請(qǐng)求k8s apiserver 獲取資源
  3. 返回資源和錯(cuò)誤信息
func kubeClientGet(uc *UpstreamController, namespace string, name string, queryType string, msg model.Message) (metaV1.Object, error) {  
   var obj metaV1.Object  
   var err error  
   switch queryType {  
   case model.ResourceTypeConfigmap:  
      obj, err = uc.configMapLister.ConfigMaps(namespace).Get(name)  
   case model.ResourceTypeSecret:  
      obj, err = uc.secretLister.Secrets(namespace).Get(name)  
   case common.ResourceTypePersistentVolume:  
      obj, err = uc.kubeClient.CoreV1().PersistentVolumes().Get(context.Background(), name, metaV1.GetOptions{})  
   case common.ResourceTypePersistentVolumeClaim:  
      obj, err = uc.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(context.Background(), name, metaV1.GetOptions{})  
   case common.ResourceTypeVolumeAttachment:  
      obj, err = uc.kubeClient.StorageV1().VolumeAttachments().Get(context.Background(), name, metaV1.GetOptions{})  
   case model.ResourceTypeNode:  
      obj, err = uc.nodeLister.Get(name)  
   case model.ResourceTypeServiceAccountToken:  
      obj, err = uc.getServiceAccountToken(namespace, name, msg)  
   case model.ResourceTypeLease:  
      obj, err = uc.leaseLister.Leases(namespace).Get(name)  
   default:  
      err = stderrors.New("wrong query type")  
   }  
   if err != nil {  
      return nil, err  
   }  
   if err := util.SetMetaType(obj.(runtime.Object)); err != nil {  
      return nil, err  
   }  
   return obj, nil  
}

資源請(qǐng)求方法--queryInner

它的主要作用是解析消息本體,獲取消息中包含的 k8s 資源信息夕膀,然后根據(jù)這些信息去調(diào)用下層的常規(guī)資源請(qǐng)求方法 kubeClientGet 去獲取資源數(shù)據(jù),請(qǐng)求成功就再次通過(guò)消息層將請(qǐng)求的數(shù)據(jù)包裝成私有協(xié)議后發(fā)送出去虚倒。其主要流程如下:

  1. 解析消息中包含的 k8s 資源信息
  2. 判斷是不是請(qǐng)求操作
  3. 是就調(diào)用下層的常規(guī)資源請(qǐng)求方法 kubeClientGet 獲取資源數(shù)據(jù)
  4. 獲取成功就調(diào)用消息層包裝資源數(shù)據(jù)成私有協(xié)議
  5. 調(diào)用消息層發(fā)送包裝后的消息
func queryInner(uc *UpstreamController, msg model.Message, queryType string) {  
   //解析消息中包含的 k8s 資源信息
   namespace, err := messagelayer.GetNamespace(msg)  
   if err != nil {  
      return  
   }  
   name, err := messagelayer.GetResourceName(msg)  
   if err != nil {  
      return  
   }  
   //判斷是不是請(qǐng)求操作
   switch msg.GetOperation() {  
   case model.QueryOperation: 
      // 調(diào)用下層的常規(guī)資源請(qǐng)求方法 kubeClientGet 獲取資源數(shù)據(jù)
      object, err := kubeClientGet(uc, namespace, name, queryType, msg)  
      if errors.IsNotFound(err) {  
         return  
      }  
      if err != nil {  
         return  
      }  
  
      nodeID, err := messagelayer.GetNodeID(msg)  
      if err != nil {  
         return  
      }  
      //調(diào)用消息層包裝資源數(shù)據(jù)成私有協(xié)議
      resource, err := messagelayer.BuildResource(nodeID, namespace, queryType, name)  
      if err != nil {  
         return  
      }  
      //調(diào)用消息層發(fā)送包裝后的消息
      resMsg := model.NewMessage(msg.GetID()).  
         SetResourceVersion(object.GetResourceVersion()).  
         FillBody(object).  
         BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.ResponseOperation)  
      err = uc.messageLayer.Response(*resMsg)  
      if err != nil {  
         return  
      }   
   default:  
   }  
}

例子:查詢ConfigMap --- queryConfigMap

主要流程如下:

  1. 開啟循環(huán)
  2. 從通道接收消息
  3. 如果是結(jié)束通道傳來(lái)的消息就退出函數(shù)
  4. 如果是消息通道傳來(lái)的消息,就接收消息产舞,然后交給資源請(qǐng)求方法(queryInner)處理魂奥。
func (uc *UpstreamController) queryConfigMap() {  
   for {  
      select {  
      case <-beehiveContext.Done():  
         return  
      case msg := <-uc.configMapChan:  
         queryInner(uc, msg, model.ResourceTypeConfigmap)  
      }  
   }  
}

更新節(jié)點(diǎn)狀態(tài) --- updateNodeStatus(已棄用)

主要作用就是更新節(jié)點(diǎn)的狀態(tài),其流程如下:

接收退出信號(hào)和消息信號(hào)就不再過(guò)多贅述了易猫,直接介紹處理流程耻煤。

  1. 獲取 k8s 資源相關(guān)的消息(命名空間,資源名稱等)
  2. 判斷需要對(duì)消息進(jìn)行的操作
  3. 新增操作:
    1. 先從 k8s apiserver 查詢?cè)摴?jié)點(diǎn)資源擦囊,查詢到就返回資源已存在,不用創(chuàng)建
    2. 查詢錯(cuò)誤如果不是資源不存在就返回錯(cuò)誤消息
    3. 該節(jié)點(diǎn)資源不存在就調(diào)用 k8s client 創(chuàng)建節(jié)點(diǎn)資源
    4. 創(chuàng)建失敗就返回失敗消息嘴办,成功就返回成功消息
  4. 更新操作:
    1. 先從 k8s apiserver 查詢?cè)摴?jié)點(diǎn)資源瞬场,不存在就打印錯(cuò)誤消息
    2. 存在就更新資源狀態(tài)里的心跳時(shí)間信息
    3. 為節(jié)點(diǎn)資源設(shè)置Annotations
    4. 更新資源狀態(tài)信息
    5. 調(diào)用 k8s client 更新節(jié)點(diǎn)狀態(tài)信息
    6. 調(diào)用消息層返回更新節(jié)點(diǎn)資源成功的消息
func (uc *UpstreamController) updateNodeStatus() {  
   for {  
      select {  
      case <-beehiveContext.Done():  
         return  
      case msg := <-uc.nodeStatusChan:  
         data, err := msg.GetContentData()  
         if err != nil {  
            continue  
         }  
         namespace, err := messagelayer.GetNamespace(msg)  
         if err != nil {  
            continue  
         }  
         name, err := messagelayer.GetResourceName(msg)  
         if err != nil {  
            continue  
         }  
  
         switch msg.GetOperation() {  
         case model.InsertOperation:  
            _, err := uc.kubeClient.CoreV1().Nodes().Get(context.Background(), name, metaV1.GetOptions{})  
            if err == nil {   
               uc.nodeMsgResponse(name, namespace, common.MessageSuccessfulContent, msg)  
               continue  
            }  
  
            if !errors.IsNotFound(err) {  
               uc.nodeMsgResponse(name, namespace, errLog, msg)  
               continue  
            }  
  
            node := &v1.Node{}  
            err = json.Unmarshal(data, node)  
            if err != nil {  
               uc.nodeMsgResponse(name, namespace, errLog, msg)  
               continue  
            }  
  
            if _, err = uc.createNode(name, node); err != nil {  
               uc.nodeMsgResponse(name, namespace, errLog, msg)  
               continue  
            }  
  
            uc.nodeMsgResponse(name, namespace, common.MessageSuccessfulContent, msg)  
  
         case model.UpdateOperation:  
            nodeStatusRequest := &edgeapi.NodeStatusRequest{}  
            err := json.Unmarshal(data, nodeStatusRequest)  
            if err != nil {  
               continue  
            }  
  
            getNode, err := uc.kubeClient.CoreV1().Nodes().Get(context.Background(), name, metaV1.GetOptions{})  
            if errors.IsNotFound(err) {  
               continue  
            }  
  
            if err != nil {  
               continue  
            }  
            for i := range nodeStatusRequest.Status.Conditions {  
               if time.Since(nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime.Time) > time.Duration(uc.config.NodeUpdateFrequency)*time.Second {  
                  nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime = metaV1.NewTime(time.Now())  
               }  
            }  
  
            if getNode.Annotations == nil {  
               getNode.Annotations = make(map[string]string)  
            }  
            for name, v := range nodeStatusRequest.ExtendResources {  
               if name == constants.NvidiaGPUScalarResourceName {  
                  var gpuStatus []types.NvidiaGPUStatus  
                  for _, er := range v {  
                     gpuStatus = append(gpuStatus, types.NvidiaGPUStatus{ID: er.Name, Healthy: true})  
                  }  
                  if len(gpuStatus) > 0 {  
                     data, _ := json.Marshal(gpuStatus)  
                     getNode.Annotations[constants.NvidiaGPUStatusAnnotationKey] = string(data)  
                  }  
               }  
               data, err := json.Marshal(v)  
               if err != nil {  
                  continue  
               }  
               getNode.Annotations[string(name)] = string(data)  
            }  
  
            if getNode.Status.DaemonEndpoints.KubeletEndpoint.Port != 0 {  
               nodeStatusRequest.Status.DaemonEndpoints.KubeletEndpoint.Port = getNode.Status.DaemonEndpoints.KubeletEndpoint.Port  
            }  
  
            getNode.Status = nodeStatusRequest.Status  
  
            node, err := uc.kubeClient.CoreV1().Nodes().UpdateStatus(context.Background(), getNode, metaV1.UpdateOptions{})  
            if err != nil {  
               continue  
            }  
  
            nodeID, err := messagelayer.GetNodeID(msg)  
            if err != nil {  
               continue  
            }  
  
            resource, err := messagelayer.BuildResource(nodeID, namespace, model.ResourceTypeNode, name)  
            if err != nil {  
               continue  
            }  
  
            resMsg := model.NewMessage(msg.GetID()).  
               SetResourceVersion(node.ResourceVersion).  
               FillBody(common.MessageSuccessfulContent).  
               BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.ResponseOperation)  
            if err = uc.messageLayer.Response(*resMsg); err != nil { 
               continue  
            }  
  
         default:  
            continue  
         }  
      }  
   }  
}

其他的上行消息處理都是比較簡(jiǎn)單的,我在這里就不一一例舉了涧郊,其大概的處理流程如下:

  1. 獲取信息
  2. 判斷操作什么資源
  3. 判斷對(duì)資源需要做什么操作
  4. 進(jìn)行一定的處理
  5. 根據(jù)對(duì)應(yīng)的資源類型和操作類型請(qǐng)求 k8s api server
  6. 如果需要響應(yīng)請(qǐng)求贯被,就調(diào)用消息層構(gòu)建并響應(yīng)消息。

DownstreamController

DownstreamController 主要用于接收 k8s apiserver 的變更事件信息妆艘,然后經(jīng)過(guò)協(xié)議轉(zhuǎn)換彤灶,發(fā)送給邊緣。

創(chuàng)建下行控制器實(shí)例---NewDownstreamController

NewDownstreamController 主要作用是創(chuàng)建一個(gè) DownstreamController 實(shí)例批旺,其主要流程如下:

  1. 獲取一個(gè)本地緩存實(shí)例存儲(chǔ)節(jié)點(diǎn)和 ConfigMap,Secret 間的關(guān)系
  2. 獲取 k8s client
  3. 獲取各類 manager 管理各類資源的變化事件
  4. 獲取消息層用于發(fā)送消息給邊緣
  5. 初始化本地緩存
func NewDownstreamController(config *v1alpha1.EdgeController, k8sInformerFactory k8sinformers.SharedInformerFactory, keInformerFactory informers.KubeEdgeCustomInformer,  
   crdInformerFactory crdinformers.SharedInformerFactory) (*DownstreamController, error) {  
   //獲取一個(gè)本地緩存實(shí)例
   lc := &manager.LocationCache{}  
   //獲取各類 manager 管理各類資源的變化事件
   podInformer := k8sInformerFactory.Core().V1().Pods()  
   podManager, err := manager.NewPodManager(config, podInformer.Informer())  
   if err != nil {    
      return nil, err  
   }  
  
   configMapInformer := k8sInformerFactory.Core().V1().ConfigMaps()  
   configMapManager, err := manager.NewConfigMapManager(config, configMapInformer.Informer())  
   if err != nil {   
      return nil, err  
   }  
  
   secretInformer := k8sInformerFactory.Core().V1().Secrets()  
   secretManager, err := manager.NewSecretManager(config, secretInformer.Informer())  
   if err != nil {   
      return nil, err  
   }  
   nodeInformer := keInformerFactory.EdgeNode()  
   nodesManager, err := manager.NewNodesManager(nodeInformer)  
   if err != nil {    
      return nil, err  
   }  
  
   rulesInformer := crdInformerFactory.Rules().V1().Rules().Informer()  
   rulesManager, err := manager.NewRuleManager(config, rulesInformer)  
   if err != nil {   
      return nil, err  
   }  
  
   ruleEndpointsInformer := crdInformerFactory.Rules().V1().RuleEndpoints().Informer()  
   ruleEndpointsManager, err := manager.NewRuleEndpointManager(config, ruleEndpointsInformer)  
   if err != nil { 
      return nil, err  
   }  
  
   dc := &DownstreamController{  
      //獲取 k8s client
      kubeClient:           client.GetKubeClient(),  
      podManager:           podManager,  
      configmapManager:     configMapManager,  
      secretManager:        secretManager,  
      nodeManager:          nodesManager,  
      //獲取消息層用于發(fā)送消息給邊緣
      messageLayer:         messagelayer.EdgeControllerMessageLayer(),  
      lc:                   lc,  
      podLister:            podInformer.Lister(),  
      rulesManager:         rulesManager,  
      ruleEndpointsManager: ruleEndpointsManager,  
   }  
   //初始化本地緩存
   if err := dc.initLocating(); err != nil {  
      return nil, err  
   }  
  
   return dc, nil  
}

啟動(dòng)控制器---Start

下行控制器的啟動(dòng)和上行控制器的啟動(dòng)一樣非常簡(jiǎn)單幌陕,就是開啟協(xié)程處理各類事件:

func (dc *DownstreamController) Start() error {  
   klog.Info("start downstream controller")  
   // pod  
   go dc.syncPod()  
  
   // configmap  
   go dc.syncConfigMap()  
  
   // secret  
   go dc.syncSecret()  
  
   // nodes  
   go dc.syncEdgeNodes()  
  
   // rule  
   go dc.syncRule()  
  
   // ruleendpoint  
   go dc.syncRuleEndpoint()  
  
   return nil  
}

本地緩存 --- LocationCache

在上游控制器中,我們提到過(guò) LocationCache 的作用主要就是用于本地緩存汽煮,緩存節(jié)點(diǎn)和 ConfigMap,Secret 間的關(guān)系搏熄,這樣做主要是為了能給在 ConfigMap 和 Secret 發(fā)生變更是能夠快速知道有哪些節(jié)點(diǎn)在使用棚唆,然后下發(fā)給對(duì)應(yīng)節(jié)點(diǎn)。其數(shù)據(jù)結(jié)構(gòu)如下:

type LocationCache struct {  
   //EdgeNodes 用于存儲(chǔ)邊緣 nodeName心例,把Map用作了Set
   EdgeNodes sync.Map  
   // configMapNode 用于儲(chǔ)存 node 和 ConfigMap 間的關(guān)系宵凌,
   // key 是 namespace/configMapName, value 是 nodeName   
   configMapNode sync.Map  
   // configMapNode 用于儲(chǔ)存 node 和 secret 間的關(guān)系,
   // key 是 namespace/configMapName, value 是 nodeName 
   secretNode sync.Map  
}

我們接下來(lái)看看它的幾個(gè)關(guān)鍵的處理方法:

初始化 --- initLocating

initLocating 函數(shù)用于下行控制器創(chuàng)建的時(shí)候初始化本地緩存這個(gè)實(shí)例止后,在初始化的時(shí)候瞎惫,其主要流程如下:

  1. 創(chuàng)建一個(gè)用于獲取邊緣節(jié)點(diǎn)的labelSelector
  2. 從 k8s client 查詢節(jié)點(diǎn)數(shù)據(jù)
  3. 更新本地緩存的邊緣節(jié)點(diǎn)信息
  4. 查詢 k8s 集群 所有 pod信息
  5. 判斷 pod 是不是邊緣節(jié)點(diǎn)的 pod
  6. 是就使用 pod 的里的信息更新本地緩存
func (dc *DownstreamController) initLocating() error { 
   //創(chuàng)建一個(gè)用于獲取邊緣節(jié)點(diǎn)的labelSelector
   set := labels.Set{commonconstants.EdgeNodeRoleKey: commonconstants.EdgeNodeRoleValue}  
   selector := labels.SelectorFromSet(set)  
   //從 k8s client 查詢節(jié)點(diǎn)數(shù)據(jù)
   nodes, err := dc.kubeClient.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{LabelSelector: selector.String()})  
   if err != nil {  
      return err  
   } 
   //更新本地緩存的邊緣節(jié)點(diǎn)信息
   for _, node := range nodes.Items {  
      dc.lc.UpdateEdgeNode(node.ObjectMeta.Name)  
   }  
   //查詢 k8s 集群 所有 pod信息
   pods, err := dc.kubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{})  
   if err != nil {  
      return err  
   }  
   for _, p := range pods.Items {  
      //判斷 pod 是不是邊緣節(jié)點(diǎn)的 pod
      if dc.lc.IsEdgeNode(p.Spec.NodeName) {
         //是就使用 pod 的里的信息更新本地緩存 
         dc.lc.AddOrUpdatePod(p)  
      }  
   }  
  
   return nil  
}

緩存邊緣節(jié)點(diǎn)信息 --- UpdateEdgeNode

緩存節(jié)點(diǎn)信息則是直接將節(jié)點(diǎn)名稱存入 map 。

func (lc *LocationCache) UpdateEdgeNode(nodeName string) {  
   lc.EdgeNodes.Store(nodeName, struct{}{})  
}

判斷節(jié)點(diǎn)是否是邊緣節(jié)點(diǎn) --- IsEdgeNode

對(duì)應(yīng)判斷是否是邊緣節(jié)點(diǎn)译株,則直接查詢 EdgeNodes 這個(gè) map瓜喇,如果能夠查出來(lái)就說(shuō)明是邊緣節(jié)點(diǎn),如果不能古戴,就說(shuō)明不是欠橘。

為什么可以這樣判斷?

因?yàn)槲覀兂跏蓟镜鼐彺娴臅r(shí)候就將所有的邊緣節(jié)點(diǎn)查詢出來(lái)加入了 EdgeNodes 這個(gè)map现恼。

func (lc *LocationCache) IsEdgeNode(nodeName string) bool {  
   _, ok := lc.EdgeNodes.Load(nodeName)  
   return ok  
}

根據(jù) Pod 添加或更新緩存信息 --- AddOrUpdatePod

AddOrUpdatePod 方法主要作用是將 Pod 用到的 ConfigMap 和 Secret 信息從 Pod 中拿出來(lái)進(jìn)行緩存肃续,這樣的話,我們就能在 ConfigMap 或 Secret 發(fā)生變更的時(shí)候及時(shí)通知給對(duì)應(yīng)邊緣叉袍。

主要流程如下:

  1. 從 pod 中獲取用到的 ConfigMaps 和 Secrets.
  2. 判斷 configMapNode 中有沒(méi)有對(duì)應(yīng)的 configMapKey 和 nodeName
  3. 沒(méi)有就將缺失的數(shù)據(jù)將入進(jìn)去始锚,有就忽略。
  4. 判斷 secretNode 中有沒(méi)有對(duì)應(yīng)的 secretKey 和 nodeName
  5. 沒(méi)有就將缺失的數(shù)據(jù)將入進(jìn)去喳逛,有就忽略瞧捌。
func (lc *LocationCache) AddOrUpdatePod(pod v1.Pod) {  
   configMaps, secrets := lc.PodConfigMapsAndSecrets(pod)  
   for _, c := range configMaps {  
      configMapKey := fmt.Sprintf("%s/%s", pod.Namespace, c)  
      // update configMapPod  
      value, ok := lc.configMapNode.Load(configMapKey)  
      var newNodes []string  
      if ok {  
         nodes, _ := value.([]string)  
         newNodes = lc.newNodes(nodes, pod.Spec.NodeName)  
      } else {  
         newNodes = []string{pod.Spec.NodeName}  
      }  
      lc.configMapNode.Store(configMapKey, newNodes)  
   }  
  
   for _, s := range secrets {  
      secretKey := fmt.Sprintf("%s/%s", pod.Namespace, s)  
      // update secretPod  
      value, ok := lc.secretNode.Load(secretKey)  
      var newNodes []string  
      if ok {  
         nodes, _ := value.([]string)  
         newNodes = lc.newNodes(nodes, pod.Spec.NodeName)  
      } else {  
         newNodes = []string{pod.Spec.NodeName}  
      }  
      lc.secretNode.Store(secretKey, newNodes)  
   }  
}

從 Pod 中獲取 ConfigMaps & Secrets --- PodConfigMapsAndSecrets

該函數(shù)主要用于從 Pod 中獲取該 Pod 用到的ConfigMaps & Secrets。 其主要流程如下:

  1. 遍歷 pod 的 Volumes 獲取 ConfigMaps & Secrets
  2. 遍歷 pod 的 Containers 的 envs 獲取 ConfigMaps & Secrets
  3. 遍歷 pod 的 ImagePullSecrets 獲取 Secrets
func (lc *LocationCache) PodConfigMapsAndSecrets(pod v1.Pod) (configMaps, secrets []string) {  
   for _, v := range pod.Spec.Volumes {  
      if v.ConfigMap != nil {  
         configMaps = append(configMaps, v.ConfigMap.Name)  
      }  
      if v.Secret != nil {  
         secrets = append(secrets, v.Secret.SecretName)  
      }  
      if v.Projected != nil {  
         for _, source := range v.Projected.Sources {  
            switch {  
            case source.ConfigMap != nil:  
               configMaps = append(configMaps, source.ConfigMap.Name)  
            case source.Secret != nil:  
               secrets = append(secrets, source.Secret.Name)  
            }  
         }  
      }  
   }  
   // used by envs  
   for _, s := range pod.Spec.Containers {  
      for _, ef := range s.EnvFrom {  
         if ef.ConfigMapRef != nil {  
            configMaps = append(configMaps, ef.ConfigMapRef.Name)  
         }  
         if ef.SecretRef != nil {  
            secrets = append(secrets, ef.SecretRef.Name)  
         }  
      }  
      for _, e := range s.Env {  
         if e.ValueFrom == nil {  
            continue  
         }  
  
         if e.ValueFrom.ConfigMapKeyRef != nil {  
            configMaps = append(configMaps, e.ValueFrom.ConfigMapKeyRef.Name)  
         } else if e.ValueFrom.SecretKeyRef != nil {  
            secrets = append(secrets, e.ValueFrom.SecretKeyRef.Name)  
         }  
      }  
   }  
   // used by ImagePullSecrets  
   for _, s := range pod.Spec.ImagePullSecrets {  
      secrets = append(secrets, s.Name)  
   }  
   return  
}

公共事件管理器 --- CommonResourceEventHandler

公共事件管理器润文,主要用來(lái)接收來(lái)自 k8s apiserver informer 的事件消息姐呐,然后將該事件消息通過(guò)通道傳遞出去。

其數(shù)據(jù)結(jié)構(gòu)如下:

type CommonResourceEventHandler struct {  
   events chan watch.Event  
}

它實(shí)現(xiàn)了 cache 包的 ResourceEventHandler 接口用來(lái)接收事件通知典蝌。

type ResourceEventHandler interface {  
   OnAdd(obj interface{})  
   OnUpdate(oldObj, newObj interface{})  
   OnDelete(obj interface{})  
}

func (c *CommonResourceEventHandler) OnAdd(obj interface{}) {  
   c.obj2Event(watch.Added, obj)  
}  
func (c *CommonResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {  
   c.obj2Event(watch.Modified, newObj)  
}  
func (c *CommonResourceEventHandler) OnDelete(obj interface{}) {  
   c.obj2Event(watch.Deleted, obj)  
}

func (c *CommonResourceEventHandler) obj2Event(t watch.EventType, obj interface{}) {  
   eventObj, ok := obj.(runtime.Object)  
   if !ok {  
      klog.Warningf("unknown type: %T, ignore", obj)  
      return  
   } 
   err := util.SetMetaType(eventObj)  
   if err != nil {  
      klog.Warningf("failed to set meta type :%v", err)  
   }  
   c.events <- watch.Event{Type: t, Object: eventObj}  
}

定義了 Manager 接口用來(lái)發(fā)送消息曙砂。用來(lái)傳遞事件消息。

type Manager interface {  
   Events() chan watch.Event  
}

創(chuàng)建事件管理器

func NewCommonResourceEventHandler(events chan watch.Event) *CommonResourceEventHandler {  
   return &CommonResourceEventHandler{events: events}  
}

節(jié)點(diǎn)事件管理

節(jié)點(diǎn)事件管理器實(shí)現(xiàn)了公共事件管理器的接口骏掀。其代碼如下:

type NodesManager struct {  
   events chan watch.Event  
}  
// manager 接口實(shí)現(xiàn)
func (nm *NodesManager) Events() chan watch.Event {  
   return nm.events  
}  
// 創(chuàng)建一個(gè)節(jié)點(diǎn)事件管理器
func NewNodesManager(si cache.SharedIndexInformer) (*NodesManager, error) {  
   events := make(chan watch.Event)  
   //獲取一個(gè)公共事件管理器實(shí)例
   rh := NewCommonResourceEventHandler(events)  
   //注冊(cè)到 Informer
   si.AddEventHandler(rh)  
  
   return &NodesManager{events: events}, nil  
}

我們可以看到它的實(shí)現(xiàn)非常簡(jiǎn)單鸠澈,主要就是利用 informer 和 CommonResourceEventHandler 的能力進(jìn)行事件傳遞。

其他的如:

  1. ConfigMapManager
  2. SecretManager
  3. RuleManager
  4. RuleEndpointManager

都是如上的實(shí)現(xiàn)截驮,只要 PodManager 有異同缰泡。

Pod 事件管理器 --- PodManager

PodManager 與其他事件管理器的不同是因?yàn)樗鼘?duì)事件消息進(jìn)行了合并和緩存后定枷,然后再將消息傳遞下行控制器處理云稚。

它擁有兩個(gè)通道个初,一個(gè)通道用來(lái)接收來(lái)自 api server 事件消息,一個(gè)通道用來(lái)傳遞合并后的事件信息坡锡。

type PodManager struct {  
   // 接收來(lái)自 api server 事件消息 
   realEvents chan watch.Event  
  
   // 用來(lái)傳遞合并后的事件信息
   mergedEvents chan watch.Event  
  
   // pods, key 是 UID, value 是 *v1.Pod  
   pods sync.Map  
}

觀察下面代碼,我們可以發(fā)現(xiàn)妹笆,創(chuàng)建 manager 的時(shí)候块请,創(chuàng)建了兩個(gè)通道:

  • 一個(gè)通道 realEvents 傳遞給了CommonResourceEventHandler 用來(lái)接收來(lái)自 api server 事件消息
  • 一個(gè)通道 mergedEvents 返回給下行控制器用來(lái)傳遞合并后的事件信息。
func (pm *PodManager) Events() chan watch.Event {  
   return pm.mergedEvents  
}  
  
func NewPodManager(config *v1alpha1.EdgeController, si cache.SharedIndexInformer) (*PodManager, error) {  
   realEvents := make(chan watch.Event, config.Buffer.PodEvent)  
   mergedEvents := make(chan watch.Event, config.Buffer.PodEvent)  
   rh := NewCommonResourceEventHandler(realEvents)  
   si.AddEventHandler(rh)  
   pm := &PodManager{realEvents: realEvents, mergedEvents: mergedEvents}  
   go pm.merge()  
   return pm, nil  
}

消息合并

消息合并主要合并的是修改信息拳缠,如果 pod.spec 和 metadata 沒(méi)有改變墩新,那么這個(gè)事件就會(huì)被忽略:

  1. 接收事件信息
  2. 新增:
    1. 將數(shù)據(jù)緩存起來(lái)。
    2. 判斷是否是刪除窟坐,是就將事件類型變?yōu)樾薷?/li>
    3. 通過(guò)通道傳遞數(shù)據(jù)出去
  3. 刪除:
    1. 刪除緩存
    2. 通過(guò)通道傳遞數(shù)據(jù)出去
  4. 修改 :
    1. 取出緩存數(shù)據(jù)
    2. 更新緩存數(shù)據(jù)
    3. 沒(méi)有緩存就通過(guò)通道傳遞數(shù)據(jù)出去
    4. 有就判斷數(shù)據(jù)是否修改海渊,修改了就傳遞,沒(méi)有就不傳遞哲鸳。
func (pm *PodManager) isPodUpdated(old *CachePod, new *v1.Pod) bool {  
   // 忽略一些不關(guān)心的字段  
   old.ObjectMeta.ResourceVersion = new.ObjectMeta.ResourceVersion  
   old.ObjectMeta.Generation = new.ObjectMeta.Generation  
  
   // 深度比較 pod.spec 和 metadata 有沒(méi)有改變
   return !reflect.DeepEqual(old.ObjectMeta, new.ObjectMeta) || !reflect.DeepEqual(old.Spec, new.Spec)  
}

func (pm *PodManager) merge() {  
   for re := range pm.realEvents {  
      pod := re.Object.(*v1.Pod)  
      switch re.Type {  
      case watch.Added:  
         pm.pods.Store(pod.UID, &CachePod{ObjectMeta: pod.ObjectMeta, Spec: pod.Spec})  
         if pod.DeletionTimestamp == nil {  
            pm.mergedEvents <- re  
         } else {  
            re.Type = watch.Modified  
            pm.mergedEvents <- re  
         }  
      case watch.Deleted:  
         pm.pods.Delete(pod.UID)  
         pm.mergedEvents <- re  
      case watch.Modified:  
         value, ok := pm.pods.Load(pod.UID)  
         pm.pods.Store(pod.UID, &CachePod{ObjectMeta: pod.ObjectMeta, Spec: pod.Spec})  
         if ok {  
            cachedPod := value.(*CachePod)  
            if pm.isPodUpdated(cachedPod, pod) {  
               pm.mergedEvents <- re  
            }  
         } else {  
            pm.mergedEvents <- re  
         }  
      default:  
         klog.Warningf("event type: %s unsupported", re.Type)  
      }  
   }  
}

事件處理

事件從事件管理器傳遞出來(lái)后臣疑,它的具體處理是由下行控制器里面的各個(gè)函數(shù)去處理。

Pod

對(duì)于 Pod 事件的處理:

  1. 判斷是否是邊緣節(jié)點(diǎn)上的pod
  2. 是就使用消息層構(gòu)建自定義消息
  3. 對(duì)于新增和修改事件徙菠,更新本地緩存
  4. 通過(guò)消息層發(fā)送自定義消息
func (dc *DownstreamController) syncPod() {  
   for {  
      select {  
      case <-beehiveContext.Done():   
         return  
      case e := <-dc.podManager.Events():  
         pod, ok := e.Object.(*v1.Pod)  
         if !ok {  
            continue  
         }  
         if !dc.lc.IsEdgeNode(pod.Spec.NodeName) {  
            continue  
         }  
         resource, err := messagelayer.BuildResource(pod.Spec.NodeName, pod.Namespace, model.ResourceTypePod, pod.Name)  
         if err != nil {  
            continue  
         }  
         msg := model.NewMessage("").  
            SetResourceVersion(pod.ResourceVersion).  
            FillBody(pod)  
         switch e.Type {  
         case watch.Added:  
            msg.BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.InsertOperation)  
            dc.lc.AddOrUpdatePod(*pod)  
         case watch.Deleted:  
            msg.BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.DeleteOperation)  
         case watch.Modified:  
            msg.BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.UpdateOperation)  
            dc.lc.AddOrUpdatePod(*pod)  
         default:    
            continue  
         }  
         dc.messageLayer.Send(*msg)
         ...  
      }  
   }  
}

Nodes

對(duì)于 node 事件的處理:

  1. 向邊緣傳遞的消息只有刪除事件
  2. 對(duì)于新增事件會(huì)直接忽略
  3. 修改事件會(huì)修改本地緩存

之所以忽略新增事件讯沈,是因?yàn)樾略稣?qǐng)求是由邊緣傳遞到 apiserver 的,所以不需要再將新增事件傳遞回去婿奔。

func (dc *DownstreamController) syncEdgeNodes() {  
   for {  
      select {  
      case <-beehiveContext.Done():  
         return  
      case e := <-dc.nodeManager.Events():  
         node, ok := e.Object.(*v1.Node)  
         if !ok {   
            continue  
         }  
         switch e.Type {  
         case watch.Added:  
            fallthrough  
         case watch.Modified:  
            dc.lc.UpdateEdgeNode(node.ObjectMeta.Name)  
         case watch.Deleted:  
            dc.lc.DeleteNode(node.ObjectMeta.Name)  
            resource, err := messagelayer.BuildResource(node.Name, "namespace", constants.ResourceNode, node.Name)  
            if err != nil {  
               break  
            }  
            msg := model.NewMessage("").  
               BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.DeleteOperation)  
            err = dc.messageLayer.Send(*msg)  
            ... 
         default:   
         }  
      }  
   }  
}

configMap & Secret

對(duì)于 configMap 和 Secret缺狠, 其處理流程如下:

  1. 獲取 configMap/Secret 事件信息
  2. 根據(jù)事件類型修改自定義消息操作類型
  3. 從本地緩存獲取用到該資源的節(jié)點(diǎn)名稱
  4. 如果是刪除,刪除本地緩存信息
  5. 遍歷節(jié)點(diǎn)名稱列表構(gòu)建自定義消息
  6. 通過(guò)消息層發(fā)送自定義消息
func (dc *DownstreamController) syncXXX() {  
   for {  
      select {  
      case <-beehiveContext.Done():  
         return  
      case e := <-dc.XXXManager.Events():  
         XXX, ok := e.Object.(*v1.XXX)  
         if !ok {   
            continue  
         }  
         var operation string  
         switch e.Type {  
         case watch.Added:  
            operation = model.InsertOperation  
         case watch.Modified:  
            operation = model.UpdateOperation  
         case watch.Deleted:  
            operation = model.DeleteOperation  
         default: 
            continue 
         }  
  
         nodes := dc.lc.XXXNodes(XXX.Namespace, XXX.Name)  
         if e.Type == watch.Deleted {  
            dc.lc.DeleteXXX(configMap.Namespace, XXX.Name)  
         }  
         for _, n := range nodes {  
            resource, err := messagelayer.BuildResource(n, XXX.Namespace, model.ResourceTypeXXX, configMap.Name)  
            if err != nil {  
               continue  
            }  
            msg := model.NewMessage("").  
               SetResourceVersion(XXX.ResourceVersion).  
               BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, operation).  
               FillBody(XXX)  
            err = dc.messageLayer.Send(*msg)   
         }  
      }  
   }  
}

Rule & RuleEndpoint

對(duì)于 Rule & RuleEndpoint萍摊, 其處理流程如下:

  1. 獲取 Rule / RuleEndpoint 事件信息
  2. 根據(jù)操作類型構(gòu)建自定義消息
  3. 通過(guò)消息層發(fā)送自定義消息
func (dc *DownstreamController) syncRuleEndpoint() {  
   for {  
      select {  
      case <-beehiveContext.Done():  
         return  
      case e := <-dc.ruleEndpointsManager.Events():  
         ruleEndpoint, ok := e.Object.(*routerv1.RuleEndpoint)  
         if !ok {  
            continue  
         }  
         resource, err := messagelayer.BuildResourceForRouter(model.ResourceTypeRuleEndpoint, ruleEndpoint.Name)  
         if err != nil {  
            continue  
         }  
         msg := model.NewMessage("").  
            SetResourceVersion(ruleEndpoint.ResourceVersion).  
            FillBody(ruleEndpoint)  
         switch e.Type {  
         case watch.Added:  
            msg.BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.InsertOperation)  
         case watch.Deleted:  
            msg.BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.DeleteOperation)  
         case watch.Modified:  
            continue  
         default:  
            continue  
         }  
         dc.messageLayer.Send(*msg)
         ...  
      }  
   }  
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末挤茄,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子冰木,更是在濱河造成了極大的恐慌穷劈,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,548評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件踊沸,死亡現(xiàn)場(chǎng)離奇詭異歇终,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)逼龟,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,497評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門评凝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人审轮,你說(shuō)我怎么就攤上這事肥哎×伤祝” “怎么了疾渣?”我有些...
    開封第一講書人閱讀 167,990評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)崖飘。 經(jīng)常有香客問(wèn)我榴捡,道長(zhǎng),這世上最難降的妖魔是什么朱浴? 我笑而不...
    開封第一講書人閱讀 59,618評(píng)論 1 296
  • 正文 為了忘掉前任吊圾,我火速辦了婚禮达椰,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘项乒。我一直安慰自己啰劲,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,618評(píng)論 6 397
  • 文/花漫 我一把揭開白布檀何。 她就那樣靜靜地躺著蝇裤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪频鉴。 梳的紋絲不亂的頭發(fā)上栓辜,一...
    開封第一講書人閱讀 52,246評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音垛孔,去河邊找鬼藕甩。 笑死,一個(gè)胖子當(dāng)著我的面吹牛周荐,可吹牛的內(nèi)容都是我干的狭莱。 我是一名探鬼主播,決...
    沈念sama閱讀 40,819評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼羡藐,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼贩毕!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起仆嗦,我...
    開封第一講書人閱讀 39,725評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤辉阶,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后瘩扼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體谆甜,經(jīng)...
    沈念sama閱讀 46,268評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,356評(píng)論 3 340
  • 正文 我和宋清朗相戀三年集绰,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了规辱。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,488評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡栽燕,死狀恐怖罕袋,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情碍岔,我是刑警寧澤浴讯,帶...
    沈念sama閱讀 36,181評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站蔼啦,受9級(jí)特大地震影響榆纽,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,862評(píng)論 3 333
  • 文/蒙蒙 一奈籽、第九天 我趴在偏房一處隱蔽的房頂上張望饥侵。 院中可真熱鬧,春花似錦衣屏、人聲如沸躏升。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,331評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)煮甥。三九已至,卻和暖如春藕赞,著一層夾襖步出監(jiān)牢的瞬間成肘,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,445評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工斧蜕, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留双霍,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,897評(píng)論 3 376
  • 正文 我出身青樓批销,卻偏偏與公主長(zhǎng)得像洒闸,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子均芽,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,500評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容