kube-proxy Source Code Analysis: the iptables-Mode Proxier (Part 2)

syncProxyRules: synchronizing configuration and rules

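proxier.syncProxyRules() keeps the iptables rules consistent with the Service and Endpoints state that the proxier watches; it is the core logic of the iptables proxier. The implementation relies on the iptables-save/iptables-restore mechanism: it combines the existing iptables configuration with the synced service and endpoint information to generate the corresponding chains and rules. On every sync it writes the rule data in the standard restore format and then reloads the iptables rules with the iptables-restore command.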
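To make the restore format concrete, here is a minimal sketch of how such a payload could be assembled before it is handed to iptables-restore. It is not the kube-proxy implementation: `buildNATPayload` is a hypothetical helper, and this `writeLine` is a simplified stand-in for the function of the same name used in the excerpts below.

```go
package main

import (
	"bytes"
	"fmt"
	"strings"
)

// writeLine joins the words with single spaces and terminates the line,
// so that each call emits exactly one line of restore data.
func writeLine(buf *bytes.Buffer, words ...string) {
	buf.WriteString(strings.Join(words, " "))
	buf.WriteByte('\n')
}

// buildNATPayload assembles restore-format data for the nat table:
// the table header, one declaration per chain, the rules, then COMMIT.
func buildNATPayload(chains []string, rules [][]string) string {
	var buf bytes.Buffer
	writeLine(&buf, "*nat")
	for _, chain := range chains {
		writeLine(&buf, ":"+chain, "-", "[0:0]")
	}
	for _, rule := range rules {
		writeLine(&buf, rule...)
	}
	writeLine(&buf, "COMMIT")
	return buf.String()
}

func main() {
	payload := buildNATPayload(
		[]string{"KUBE-SERVICES", "KUBE-POSTROUTING"},
		[][]string{
			{"-A", "KUBE-POSTROUTING", "-m", "mark", "--mark", "0x4000/0x4000", "-j", "MASQUERADE"},
		},
	)
	fmt.Print(payload)
}
```

Chains are declared before any rule refers to them, which is why the proxier keeps separate buffers for chain lines and rule lines and concatenates them only at the end.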

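The sync code is fairly long, so the following sections break it into smaller pieces. Below is the skeleton of syncProxyRules, with some parts replaced by explanatory comments; each commented block is analyzed separately later.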

!FILENAME pkg/proxy/iptables/proxier.go:634

func (proxier *Proxier) syncProxyRules() {
    proxier.mu.Lock()
    defer proxier.mu.Unlock()

    start := time.Now()
    defer func() {
        metrics.SyncProxyRulesLatency.Observe(metrics.SinceInMicroseconds(start))
        klog.V(4).Infof("syncProxyRules took %v", time.Since(start))
    }()
    // don't sync rules till we've received services and endpoints
    if !proxier.endpointsSynced || !proxier.servicesSynced {
        klog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
        return
    }

    // Detect and apply changes (services/endpoints)
    //...

    // Create and link the kube chains
    //...

    // Fetch the existing filter/nat table chain data
    //...

    // Build the iptables-save/restore formatted data (table headers, chains)
    //...

    // Write the Kubernetes-specific SNAT masquerade rules
    //...

    // Accumulate NAT chains to keep.
    activeNATChains := map[utiliptables.Chain]bool{} 
    // Accumulate the set of local ports that we will be holding open once this update is complete
    replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}

    endpoints := make([]*endpointsInfo, 0)
    endpointChains := make([]utiliptables.Chain, 0)

    args := make([]string, 64)

    // Compute total number of endpoint chains across all services.
    proxier.endpointChainsNumber = 0
    for svcName := range proxier.serviceMap {
        proxier.endpointChainsNumber += len(proxier.endpointsMap[svcName])
    }

    // Create rules for each service (service portal rules)
    for svcName, svc := range proxier.serviceMap {
        //...
    }

    // Delete chains that are no longer in use
    //...

    // nodeports chain
    //...

    // FORWARD policy
    //...

    // Configure the clusterCIDR rules
    //...

    // Write the closing markers
    //...

    // Assemble the data and load it with iptables-restore
    //...

    // Close stale local ports and update the port map
    for k, v := range proxier.portsMap {
        if replacementPortsMap[k] == nil {
            v.Close()
        }
    }
    proxier.portsMap = replacementPortsMap

    // Update the healthz timestamp.
    if proxier.healthzServer != nil {
        proxier.healthzServer.UpdateTimestamp()
    }

    // Update the health checks.
    if err := proxier.healthChecker.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
        klog.Errorf("Error syncing healthcheck services: %v", err)
    }
    if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
        klog.Errorf("Error syncing healthcheck endpoints: %v", err)
    }

    // Final cleanup: clear conntrack entries for stale UDP services and stale endpoints
    for _, svcIP := range staleServices.UnsortedList() {
        if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
            klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
        }
    }
    proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}

The following sections walk through each block in detail.

Update services and endpoints and return the update results

!FILENAME pkg/proxy/iptables/proxier.go:652

    // Apply the pending service/endpoints changes
    serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
    endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges)

    staleServices := serviceUpdateResult.UDPStaleClusterIP
    // Merge in the stale UDP services reported by the endpoints update
    for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
        if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == v1.ProtocolUDP {
            klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIPString())
            staleServices.Insert(svcInfo.ClusterIPString())
        }
    }

UpdateServiceMap(): applying service updates

!FILENAME pkg/proxy/service.go:212

func UpdateServiceMap(serviceMap ServiceMap, changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
    result.UDPStaleClusterIP = sets.NewString()   // cluster IPs of stale UDP services
    serviceMap.apply(changes, result.UDPStaleClusterIP)  // apply the changes to the map

    result.HCServiceNodePorts = make(map[types.NamespacedName]uint16)
    for svcPortName, info := range serviceMap {
        if info.GetHealthCheckNodePort() != 0 {
            result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.GetHealthCheckNodePort())    // health check node port
        }
    }

    return result
}

serviceMap.apply() applies the pending service change events (merge -> filter -> unmerge)

!FILENAME pkg/proxy/service.go:268

func (serviceMap *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) {
    changes.lock.Lock()
    defer changes.lock.Unlock()
    for _, change := range changes.items {
        serviceMap.merge(change.current)         // merge the added/updated entries into serviceMap         ①
        change.previous.filter(change.current)   // drop entries that still exist, so unmerge skips them   ②
        serviceMap.unmerge(change.previous, UDPStaleClusterIP)  // delete the stale entries                ③
    }
    // clear changes after applying them to ServiceMap.
    changes.items = make(map[types.NamespacedName]*serviceChange)
    return
}

① serviceMap.merge() merges the contents of "other" into the current serviceMap, i.e. it merges in the changed services.

!FILENAME pkg/proxy/service.go:301

func (sm *ServiceMap) merge(other ServiceMap) sets.String {
    existingPorts := sets.NewString()
    for svcPortName, info := range other {
        existingPorts.Insert(svcPortName.String())
        _, exists := (*sm)[svcPortName]
     //...
        (*sm)[svcPortName] = info
    }
    return existingPorts
}

② serviceMap.filter() removes from the receiver every entry whose service port name (the key) also exists in "other". In apply() the receiver is change.previous and "other" is change.current, so only the entries that have really disappeared remain.

!FILENAME pkg/proxy/service.go:319

func (sm *ServiceMap) filter(other ServiceMap) {
    for svcPortName := range *sm {
        if _, ok := other[svcPortName]; ok {
            delete(*sm, svcPortName)
        }
    }
}

③ serviceMap.unmerge() removes from the current map the entries present in "other", i.e. it deletes the stale entries.

!FILENAME pkg/proxy/service.go:330

func (sm *ServiceMap) unmerge(other ServiceMap, UDPStaleClusterIP sets.String) {
    for svcPortName := range other {
        info, exists := (*sm)[svcPortName]
        if exists {
            if info.GetProtocol() == v1.ProtocolUDP {
                UDPStaleClusterIP.Insert(info.ClusterIPString())  // record the cluster IP of the stale UDP service
            }
            delete(*sm, svcPortName)
        } //...
    }
}
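The merge -> filter -> unmerge sequence is easier to follow on a reduced model. The sketch below is an illustration only: `svc`, `serviceMap`, and the single `apply` method are hypothetical simplifications that use plain strings instead of the real `ServicePortName` and `ServicePort` types.

```go
package main

import "fmt"

// svc is a hypothetical, reduced stand-in for one service port entry.
type svc struct {
	clusterIP string
	protocol  string
}

type serviceMap map[string]svc

// apply runs merge -> filter -> unmerge for a single change and returns
// the cluster IPs of UDP services that were removed.
func (sm serviceMap) apply(previous, current serviceMap) []string {
	// merge: add or overwrite everything present in the current state.
	for name, info := range current {
		sm[name] = info
	}
	// filter: drop from previous whatever still exists in current,
	// so that only truly deleted entries are left.
	for name := range previous {
		if _, ok := current[name]; ok {
			delete(previous, name)
		}
	}
	// unmerge: delete the leftovers and remember stale UDP cluster IPs.
	var staleUDP []string
	for name := range previous {
		if info, ok := sm[name]; ok {
			if info.protocol == "UDP" {
				staleUDP = append(staleUDP, info.clusterIP)
			}
			delete(sm, name)
		}
	}
	return staleUDP
}

func main() {
	sm := serviceMap{"dns": {"10.0.0.10", "UDP"}, "web": {"10.0.0.20", "TCP"}}
	previous := serviceMap{"dns": {"10.0.0.10", "UDP"}, "web": {"10.0.0.20", "TCP"}}
	current := serviceMap{"web": {"10.0.0.21", "TCP"}} // dns deleted, web updated

	fmt.Println(sm.apply(previous, current), sm)
}
```

Without the filter step, unmerge would delete the "web" entry that merge had just updated, because "web" appears in both the previous and the current state.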

UpdateEndpointsMap(): applying endpoints updates

!FILENAME pkg/proxy/endpoints.go:163

func UpdateEndpointsMap(endpointsMap EndpointsMap, changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
    result.StaleEndpoints = make([]ServiceEndpoint, 0)
    result.StaleServiceNames = make([]ServicePortName, 0)

    endpointsMap.apply(changes, &result.StaleEndpoints, &result.StaleServiceNames)

    // TODO: If this will appear to be computationally expensive, consider
    // computing this incrementally similarly to endpointsMap.
    result.HCEndpointsLocalIPSize = make(map[types.NamespacedName]int)
    localIPs := GetLocalEndpointIPs(endpointsMap)
    for nsn, ips := range localIPs {
        result.HCEndpointsLocalIPSize[nsn] = len(ips)
    }

    return result
}

EndpointsMap.apply() applies the pending endpoints change events (unmerge -> merge -> detect stale connections)

!FILENAME pkg/proxy/endpoints.go:242

func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
    if changes == nil {
        return
    }
    changes.lock.Lock()
    defer changes.lock.Unlock()
    for _, change := range changes.items {
        endpointsMap.Unmerge(change.previous)                  // delete the previous entries   ①
        endpointsMap.Merge(change.current)                     // add the current entries       ②
        detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames) // find stale connections ③
    }
    changes.items = make(map[types.NamespacedName]*endpointsChange)
}

① EndpointsMap.Unmerge() deletes the entries named in "other"

!FILENAME pkg/proxy/endpoints.go:266

func (em EndpointsMap) Unmerge(other EndpointsMap) {
    for svcPortName := range other {
        delete(em, svcPortName)
    }
}

② EndpointsMap.Merge() copies the values (endpoint lists) of the entries in "other" into the EndpointsMap

!FILENAME pkg/proxy/endpoints.go:259

func (em EndpointsMap) Merge(other EndpointsMap) {
    for svcPortName := range other {
        em[svcPortName] = other[svcPortName]
    }
}

③ detectStaleConnections() finds the endpoints and services whose existing connections have become stale

!FILENAME pkg/proxy/endpoints.go:291

func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
    for svcPortName, epList := range oldEndpointsMap {
        for _, ep := range epList {
            stale := true
            for i := range newEndpointsMap[svcPortName] {
                if newEndpointsMap[svcPortName][i].Equal(ep) {     // still present, so not stale
                    stale = false
                    break
                }
            }
            if stale {  
                klog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.String())
                *staleEndpoints = append(*staleEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName})   // record the stale endpoint
            }
        }
    }

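    for svcPortName, epList := range newEndpointsMap {
        // For UDP services, when the number of backends goes from 0 to non-zero,
        // leftover conntrack entries may blackhole the service's traffic.
        if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
            // record the service name so that those entries can be cleared
            *staleServiceNames = append(*staleServiceNames, svcPortName)
        }
    }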
}
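The two passes of the stale-connection check can be sketched on plain strings. This is a simplified illustration, not the upstream function: `endpointsMap` and `detectStale` are hypothetical names, endpoints are "ip:port" strings, and a set lookup replaces the nested loop with `Equal`.

```go
package main

import "fmt"

// endpointsMap maps a service port name to its endpoints ("ip:port").
type endpointsMap map[string][]string

// detectStale returns the endpoints that disappeared and the services whose
// endpoint count went from zero to non-zero.
func detectStale(oldMap, newMap endpointsMap) (staleEndpoints, staleServices []string) {
	// Pass 1: an old endpoint that is absent from the new list is stale.
	for svcName, oldEps := range oldMap {
		current := make(map[string]bool, len(newMap[svcName]))
		for _, ep := range newMap[svcName] {
			current[ep] = true
		}
		for _, ep := range oldEps {
			if !current[ep] {
				staleEndpoints = append(staleEndpoints, svcName+" -> "+ep)
			}
		}
	}
	// Pass 2: a service that just gained its first endpoints may still have
	// conntrack entries that blackhole traffic, so it is reported as well.
	for svcName, newEps := range newMap {
		if len(newEps) > 0 && len(oldMap[svcName]) == 0 {
			staleServices = append(staleServices, svcName)
		}
	}
	return staleEndpoints, staleServices
}

func main() {
	oldMap := endpointsMap{"web": {"10.1.0.1:80", "10.1.0.2:80"}}
	newMap := endpointsMap{"web": {"10.1.0.2:80"}, "dns": {"10.1.0.9:53"}}

	eps, svcs := detectStale(oldMap, newMap)
	fmt.Println(eps, svcs)
}
```

The caller later keeps only the UDP services from the second list, as shown in the staleServices loop earlier.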

Create and link the kube chains

  • Insert a jump to the custom KUBE-EXTERNAL-SERVICES chain at the head of the INPUT chain in the filter table
    iptables -I "INPUT" -t "filter" -m "conntrack" --ctstate "NEW" -m comment --comment "kubernetes externally-visible service portals" -j "KUBE-EXTERNAL-SERVICES"

  • Insert a jump to the custom KUBE-SERVICES chain at the head of the OUTPUT chain in the filter table
    iptables -I "OUTPUT" -t "filter" -m "conntrack" --ctstate "NEW" -m comment --comment "kubernetes service portals" -j "KUBE-SERVICES"

  • Insert a jump to the custom KUBE-SERVICES chain at the head of the OUTPUT chain in the nat table
    iptables -I "OUTPUT" -t "nat" -m comment --comment "kubernetes service portals" -j "KUBE-SERVICES"

  • Insert a jump to the custom KUBE-SERVICES chain at the head of the PREROUTING chain in the nat table
    iptables -I "PREROUTING" -t "nat" -m comment --comment "kubernetes service portals" -j "KUBE-SERVICES"

  • Insert a jump to the custom KUBE-POSTROUTING chain at the head of the POSTROUTING chain in the nat table
    iptables -I "POSTROUTING" -t "nat" -m comment --comment "kubernetes postrouting rules" -j "KUBE-POSTROUTING"

  • Insert a jump to the custom KUBE-FORWARD chain at the head of the FORWARD chain in the filter table
    iptables -I "FORWARD" -t "filter" -m comment --comment "kubernetes forwarding rules" -j "KUBE-FORWARD"

!FILENAME pkg/proxy/iptables/proxier.go:667

// Iterate over the iptablesJumpChains definitions
for _, chain := range iptablesJumpChains {
        // Underlying command: iptables -t $tableName -N $chainName
        if _, err := proxier.iptables.EnsureChain(chain.table, chain.chain); err != nil {
            klog.Errorf("Failed to ensure that %s chain %s exists: %v", chain.table, kubeServicesChain, err)
            return
        }
        args := append(chain.extraArgs,
            "-m", "comment", "--comment", chain.comment,
            "-j", string(chain.chain),
        )
        // Underlying command: iptables -I $sourceChain -t $tableName -m comment --comment $comment -j $chain
        if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, chain.table, chain.sourceChain, args...); err != nil {
            klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", chain.table, chain.sourceChain, chain.chain, err)
            return
        }
    }
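The six commands listed above follow from a small table of jump definitions. The sketch below reconstructs that table from the listed commands and renders the equivalent command lines; the `jumpChain` struct and `prependCommand` helper are assumptions for illustration (the excerpt does not show the real `iptablesJumpChains` definition), and nothing is executed.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// jumpChain describes one jump from a built-in chain to a kube chain.
type jumpChain struct {
	table       string
	chain       string
	sourceChain string
	comment     string
	extraArgs   []string
}

var conntrackNew = []string{"-m", "conntrack", "--ctstate", "NEW"}

// jumpChains mirrors the six commands listed in the text.
var jumpChains = []jumpChain{
	{"filter", "KUBE-EXTERNAL-SERVICES", "INPUT", "kubernetes externally-visible service portals", conntrackNew},
	{"filter", "KUBE-SERVICES", "OUTPUT", "kubernetes service portals", conntrackNew},
	{"nat", "KUBE-SERVICES", "OUTPUT", "kubernetes service portals", nil},
	{"nat", "KUBE-SERVICES", "PREROUTING", "kubernetes service portals", nil},
	{"nat", "KUBE-POSTROUTING", "POSTROUTING", "kubernetes postrouting rules", nil},
	{"filter", "KUBE-FORWARD", "FORWARD", "kubernetes forwarding rules", nil},
}

// prependCommand renders the command that inserts the jump rule at the
// head of the source chain.
func prependCommand(c jumpChain) string {
	args := append([]string{"iptables", "-I", c.sourceChain, "-t", c.table}, c.extraArgs...)
	args = append(args, "-m", "comment", "--comment", strconv.Quote(c.comment), "-j", c.chain)
	return strings.Join(args, " ")
}

func main() {
	for _, c := range jumpChains {
		fmt.Println(prependCommand(c))
	}
}
```

Keeping the definitions in a table lets a single loop ensure both the chain and its jump rule, which is the shape of the loop in the excerpt.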

Build the base iptables data

  • Fetch the existing filter/nat table chain data
  • Build the iptables-save/restore formatted data (table headers, chains)
  • Create the SNAT masquerade rules

!FILENAME pkg/proxy/iptables/proxier.go:688

    // Fetch the existing filter table chains
    existingFilterChains := make(map[utiliptables.Chain][]byte)
    proxier.existingFilterChainsData.Reset()
    err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.existingFilterChainsData)  // fetched via iptables-save
    if err != nil {
        klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
    } else {
        existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.existingFilterChainsData.Bytes()) // parse the chain lines
    }

    // Same as above, for the existing nat table chains
    existingNATChains := make(map[utiliptables.Chain][]byte)
    proxier.iptablesData.Reset()
    err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
    if err != nil { 
        klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
    } else { 
        existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
    }

    // Reset all buffers used later.
    // This is to avoid memory reallocations and thus improve performance.
    proxier.filterChains.Reset()
    proxier.filterRules.Reset()
    proxier.natChains.Reset()
    proxier.natRules.Reset()

    // Write the table headers
    writeLine(proxier.filterChains, "*filter")
    writeLine(proxier.natChains, "*nat")

Write the chain data

filter: "KUBE-SERVICES" / "KUBE-EXTERNAL-SERVICES" / "KUBE-FORWARD"
nat: "KUBE-SERVICES" / "KUBE-NODEPORTS" / "KUBE-POSTROUTING" / "KUBE-MARK-MASQ"

!FILENAME pkg/proxy/iptables/proxier.go:720

    // Write the chain lines: format the filter and nat chains into their buffers
    for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
        if chain, ok := existingFilterChains[chainName]; ok {
            writeBytesLine(proxier.filterChains, chain)
        } else {
            // chain line in iptables-save/restore format: ":$chainName - [0:0]"
            writeLine(proxier.filterChains, utiliptables.MakeChainLine(chainName))
        }
    }
    for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
        if chain, ok := existingNATChains[chainName]; ok {
            writeBytesLine(proxier.natChains, chain)
        } else {
            writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName))
        }
    }
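The "reuse the saved line, otherwise create a fresh one" pattern keeps packet and byte counters across syncs. A minimal sketch, using plain strings instead of `utiliptables.Chain` and `[]byte`; `makeChainLine` and `chainLine` are hypothetical helpers modeled on the chain-line format quoted in the comment above.

```go
package main

import "fmt"

// makeChainLine returns a chain declaration with zeroed counters.
func makeChainLine(chain string) string {
	return ":" + chain + " - [0:0]"
}

// chainLine reuses the line captured from iptables-save when the chain
// already exists (keeping its counters) and creates a fresh one otherwise.
func chainLine(existing map[string]string, chain string) string {
	if line, ok := existing[chain]; ok {
		return line
	}
	return makeChainLine(chain)
}

func main() {
	// Pretend iptables-save reported KUBE-SERVICES with live counters.
	existing := map[string]string{"KUBE-SERVICES": ":KUBE-SERVICES - [12:720]"}

	for _, chain := range []string{"KUBE-SERVICES", "KUBE-FORWARD"} {
		fmt.Println(chainLine(existing, chain))
	}
}
```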

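Write the masquerade rules. In the POSTROUTING stage, marked packets are MASQUERADEd (SNAT based on the outgoing interface's dynamic IP). The original source IP of the request is lost, and the application in the target Pod sees the node IP or the IP of a CNI device (bridge/vxlan device).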

!FILENAME pkg/proxy/iptables/proxier.go:738

    // Write the Kubernetes-specific SNAT masquerade rules
  // -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE
    writeLine(proxier.natRules, []string{
        "-A", string(kubePostroutingChain),
        "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
        "-m", "mark", "--mark", proxier.masqueradeMark,
        "-j", "MASQUERADE",
    }...)

  //-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
    writeLine(proxier.natRules, []string{
        "-A", string(KubeMarkMasqChain),
        "-j", "MARK", "--set-xmark", proxier.masqueradeMark,
    }...)
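The value 0x4000/0x4000 in the two rules is a single mark bit written as value/mask. The sketch below derives it under the assumption that the masquerade bit is 14; the `masqueradeMark` helper and its formatting are illustrative only and may differ from how the proxier builds `proxier.masqueradeMark`.

```go
package main

import "fmt"

// masqueradeMark renders a single mark bit as "value/mask", the form used
// by "-m mark --mark" and "-j MARK --set-xmark".
func masqueradeMark(bit uint) string {
	value := 1 << bit
	return fmt.Sprintf("%#x/%#x", value, value)
}

func main() {
	// Bit 14 is an assumption chosen to match the 0x4000 seen in the rules.
	fmt.Println(masqueradeMark(14))
}
```

Because value and mask are the same single bit, KUBE-MARK-MASQ sets only that bit and KUBE-POSTROUTING tests only that bit, leaving any other marks on the packet untouched.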

Create rules for each service

First, the full definition of serviceInfo

!FILENAME pkg/proxy/iptables/proxier.go:141

type serviceInfo struct {
    *proxy.BaseServiceInfo
    // The following fields are computed and stored for performance reasons.
    serviceNameString        string
    servicePortChainName     utiliptables.Chain  // KUBE-SVC-XXXX16bitXXXX  service chain
    serviceFirewallChainName utiliptables.Chain  // KUBE-FW-XXXX16bitXXXX   firewall chain
    serviceLBChainName       utiliptables.Chain  // KUBE-XLB-XXXX16bitXXXX  external load balancer chain
}

type BaseServiceInfo struct {
    ClusterIP                net.IP                   // portal IP (VIP)
    Port                     int                      // portal port
    Protocol                 v1.Protocol              // protocol
    NodePort                 int                      // node port
    LoadBalancerStatus       v1.LoadBalancerStatus    // load balancer ingress
    SessionAffinityType      v1.ServiceAffinity       // session affinity
    StickyMaxAgeSeconds      int                      // maximum affinity duration
    ExternalIPs              []string                 // external IPs (ports listened on at the given nodes)
    LoadBalancerSourceRanges []string                 // source ranges allowed through the load balancer
    HealthCheckNodePort      int                      // health check port
    OnlyNodeLocalEndpoints   bool
}
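The chain-name placeholders in the comments (KUBE-SVC-XXXX…, KUBE-FW-XXXX…, KUBE-XLB-XXXX…) stand for a fixed-length suffix derived from the service. The excerpt does not show how the suffix is computed, so the following is only a sketch that assumes a SHA-256 hash of the service port name plus protocol, base32-encoded and truncated to 16 characters; `chainName` is a hypothetical helper.

```go
package main

import (
	"crypto/sha256"
	"encoding/base32"
	"fmt"
)

// chainName builds a chain name from a prefix plus a 16-character hash of
// the service port name and protocol (an assumed scheme for this sketch).
func chainName(prefix, servicePortName, protocol string) string {
	hash := sha256.Sum256([]byte(servicePortName + protocol))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return prefix + encoded[:16]
}

func main() {
	for _, prefix := range []string{"KUBE-SVC-", "KUBE-FW-", "KUBE-XLB-"} {
		fmt.Println(chainName(prefix, "default/web:http", "tcp"))
	}
}
```

A deterministic hash gives stable chain names across syncs, which is what allows the existing chain lines and their counters to be reused; it also keeps the names short regardless of how long the service name is.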

For each service, create the service chain "KUBE-SVC-XXX…" and the external load balancer chain "KUBE-XLB-XXX…"

!FILENAME pkg/proxy/iptables/proxier.go:791

        svcChain := svcInfo.servicePortChainName  //"KUBE-SVC-XXX..."
        if hasEndpoints {
            // Create the per-service chain, retaining counters if possible.
            if chain, ok := existingNATChains[svcChain]; ok {
                writeBytesLine(proxier.natChains, chain)
            } else {
                writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
            }
            activeNATChains[svcChain] = true
        }

        svcXlbChain := svcInfo.serviceLBChainName  // "KUBE-XLB-XXX…" 
        if svcInfo.OnlyNodeLocalEndpoints {
            // Only for services request OnlyLocal traffic
            // create the per-service LB chain, retaining counters if possible.
            if lbChain, ok := existingNATChains[svcXlbChain]; ok {
                writeBytesLine(proxier.natChains, lbChain)
            } else {
                writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain))
            }
            activeNATChains[svcXlbChain] = true
        }

Match clusterIP traffic. ClusterIP is the default service type and is reachable only from inside the cluster.

!FILENAME pkg/proxy/iptables/proxier.go:815

        // Endpoints exist: write the rules
        if hasEndpoints {
            args = append(args[:0],
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
                "-m", protocol, "-p", protocol,
                "-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
                "--dport", strconv.Itoa(svcInfo.Port),
            )
            // proxier configured with masqueradeAll:
     //  -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d $clusterIP \
     //     --dport $port -j KUBE-MARK-MASQ
            if proxier.masqueradeAll {
                writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
            } else if len(proxier.clusterCIDR) > 0 {
                // proxier configured with clusterCIDR:
       // -A KUBE-SERVICES ! -s $clusterCIDR -m comment --comment "..." -m $prot \
       //    -p $prot -d $clusterIP --dport $port -j  KUBE-MARK-MASQ
                writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
            }
      //  -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d $clusterIP \
      //     --dport $port -j KUBE-SVC-XXXX16bitXXXX
            writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
        } else {
            // No endpoints: create a REJECT rule
      // -A KUBE-SERVICES -m comment --comment $svcName -m $prot -p $prot -d $clusterIP \
      // --dport $port -j REJECT
            writeLine(proxier.filterRules,
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                "-m", protocol, "-p", protocol,
                "-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
                "--dport", strconv.Itoa(svcInfo.Port),
                "-j", "REJECT",
            )
        }

Handle ExternalIPs, if the service has any (a listening port is opened on the node that owns the IP)

!FILENAME pkg/proxy/iptables/proxier.go:846

    for _, externalIP := range svcInfo.ExternalIPs {
            // If externalIP is a local IP of this node and the protocol is not SCTP,
            // make sure the local port is held open, opening it if necessary
            if local, err := utilproxy.IsLocalIP(externalIP); err != nil {
                klog.Errorf("can't determine if IP is local, assuming not: %v", err)
            } else if local && (svcInfo.GetProtocol() != v1.ProtocolSCTP) {
                lp := utilproxy.LocalPort{
                    Description: "externalIP for " + svcNameString,
                    IP:          externalIP,
                    Port:        svcInfo.Port,
                    Protocol:    protocol,
                }
                if proxier.portsMap[lp] != nil {
                    klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
                    replacementPortsMap[lp] = proxier.portsMap[lp]
                } else {
                    // Open and listen on the local port
                    socket, err := proxier.portMapper.OpenLocalPort(&lp)
                    if err != nil {
                        msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)

                        proxier.recorder.Eventf(
                            &v1.ObjectReference{
                                Kind:      "Node",
                                Name:      proxier.hostname,
                                UID:       types.UID(proxier.hostname),
                                Namespace: "",
                            }, v1.EventTypeWarning, err.Error(), msg)
                        klog.Error(msg)
                        continue
                    }
                    replacementPortsMap[lp] = socket
                }
            }
      
            // Endpoints exist: write the rules
            if hasEndpoints {
                args = append(args[:0],
                    "-A", string(kubeServicesChain),        
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
                    "-m", protocol, "-p", protocol,
                    "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
                    "--dport", strconv.Itoa(svcInfo.Port),
                )

                // -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
                //    $externalIP --dport $port -j KUBE-MARK-MASQ
                writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)

                // -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
                //    $externalIP --dport $port -m physdev ! --physdev-is-in \
                //    -m addrtype ! --src-type LOCAL -j KUBE-SVC-XXXX16bitXXXXX
                externalTrafficOnlyArgs := append(args,
                    "-m", "physdev", "!", "--physdev-is-in",
                    "-m", "addrtype", "!", "--src-type", "LOCAL")
                writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...)
                dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
        
                // -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
                //    $externalIP --dport $port -m addrtype --dst-type LOCAL \
                //    -j KUBE-SVC-XXXX16bitXXXXX
                writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...)
            } else {
                // No endpoints: reject
        // -A KUBE-EXTERNAL-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
        //    $externalIP --dport $port -j REJECT 
                writeLine(proxier.filterRules,
                    "-A", string(kubeExternalServicesChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                    "-m", protocol, "-p", protocol,
                    "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
                    "--dport", strconv.Itoa(svcInfo.Port),
                    "-j", "REJECT",
                )
            }
        }

Handle load balancer ingress, if the service uses an external load balancer

!FILENAME pkg/proxy/iptables/proxier.go:917

        // Endpoints exist: write the rules
        if hasEndpoints {
            fwChain := svcInfo.serviceFirewallChainName     //"KUBE-FW-XXXX16bitXXXXX"
            for _, ingress := range svcInfo.LoadBalancerStatus.Ingress {
                if ingress.IP != "" {
                    // Create the service's KUBE-FW-XXX chain
                    if chain, ok := existingNATChains[fwChain]; ok {
                        writeBytesLine(proxier.natChains, chain)
                    } else {  // create it if it did not exist before
                        writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain))
                    }
                    activeNATChains[fwChain] = true
                    // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
                    // This currently works for loadbalancers that preserves source ips.
                    // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.

                    args = append(args[:0],
                        "-A", string(kubeServicesChain),
                        "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
                        "-m", protocol, "-p", protocol,
                        "-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)),
                        "--dport", strconv.Itoa(svcInfo.Port),
                    )          
                    // -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
                    //    $ingressIP --dport $port -j KUBE-FW-XXXX16bitXXXXX
                    writeLine(proxier.natRules, append(args, "-j", string(fwChain))...)

                    args = append(args[:0],
                        "-A", string(fwChain),
                        "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
                    )

                    // In the KUBE-FW chain, each source match rule may jump to either the SVC chain or the XLB chain
                    chosenChain := svcXlbChain
                    // If we are proxying globally, we need to masquerade in case we cross nodes.
                    // If we are proxying only locally, we can retain the source IP.
                    if !svcInfo.OnlyNodeLocalEndpoints {
                        // -j KUBE-MARK-MASQ: masquerade so that traffic can cross nodes
                        writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                        chosenChain = svcChain   // jump to the SVC chain instead
                    }

                    if len(svcInfo.LoadBalancerSourceRanges) == 0 {
                        // All sources are allowed: jump directly
                        writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...)
                    } else {
                        // Filter on the configured source ranges: "-s $srcRange"
                        allowFromNode := false
                        for _, src := range svcInfo.LoadBalancerSourceRanges {
                            writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...)
                            _, cidr, _ := net.ParseCIDR(src)
                            if cidr.Contains(proxier.nodeIP) {
                                allowFromNode = true    // the CIDR contains the node IP, so requests from the node are allowed
                            }
                        }
            
                        // Add "-s $ingressIP" to allow requests from the load balancer's backend hosts
                        if allowFromNode {
                            writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-j", string(chosenChain))...)
                        }
                    }
          
                    // Traffic that matched none of the source rules above falls through to "-j KUBE-MARK-DROP"
                    writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
                }
            }
        }

Handle NodePort, if the service uses one (the node port is opened on every node)

!FILENAME pkg/proxy/iptables/proxier.go:989

        if svcInfo.NodePort != 0 {
            // Get the node addresses
            addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
            if err != nil {
                klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err)
                continue
            }

            lps := make([]utilproxy.LocalPort, 0)
            for address := range addresses {
                lp := utilproxy.LocalPort{
                    Description: "nodePort for " + svcNameString,
                    IP:          address,
                    Port:        svcInfo.NodePort,
                    Protocol:    protocol,
                }
                if utilproxy.IsZeroCIDR(address) {
                    // Empty IP address means all
                    lp.IP = ""
                    lps = append(lps, lp)
                    // If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
                    break
                }
                lps = append(lps, lp)     // one local port per address
            }

            // Open the port on each node IP and keep the socket handle
            for _, lp := range lps {
                if proxier.portsMap[lp] != nil {
                    klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
                    replacementPortsMap[lp] = proxier.portsMap[lp]
                } else if svcInfo.GetProtocol() != v1.ProtocolSCTP {
                    // Open and listen on the port
                    socket, err := proxier.portMapper.OpenLocalPort(&lp)
                    if err != nil {
                        klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
                        continue
                    }
                    if lp.Protocol == "udp" {
                        // Clear the UDP conntrack entries
                        err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
                        if err != nil {
                            klog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
                        }
                    }
                    replacementPortsMap[lp] = socket // keep the socket
                }
            }
      
            // Endpoints exist: write the rules
            if hasEndpoints {
        // -A KUBE-NODEPORTS -m comment --comment "..." -m $prot -p $prot --dport $nodePort
                args = append(args[:0],
                    "-A", string(kubeNodePortsChain),
                    "-m", "comment", "--comment", svcNameString,
                    "-m", protocol, "-p", protocol,
                    "--dport", strconv.Itoa(svcInfo.NodePort),
                )
                if !svcInfo.OnlyNodeLocalEndpoints {
                    // Not node-local only: add the SNAT mark, then jump to the service chain
                    // -j KUBE-MARK-MASQ, then -j KUBE-SVC-XXXX16bitXXXX
                    writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                    // Jump to the service chain.
                    writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
                } else {
                    loopback := "127.0.0.0/8"
                    if isIPv6 {
                        loopback = "::1/128"
                    }
                    // Node-local only: masquerade only loopback traffic, then jump to the XLB chain
                    // -s $loopback -j KUBE-MARK-MASQ, then -j KUBE-XLB-XXXX16bitXXXX
                    writeLine(proxier.natRules, append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...)
                    writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...)
                }
            } else {
                // No endpoints: add a REJECT rule
        // -A KUBE-EXTERNAL-SERVICES -m comment --comment "..." -m addrtype \
        //     --dst-type LOCAL -m $prot -p $prot --dport $nodePort -j REJECT
                writeLine(proxier.filterRules,
                    "-A", string(kubeExternalServicesChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                    "-m", "addrtype", "--dst-type", "LOCAL",
                    "-m", protocol, "-p", protocol,
                    "--dport", strconv.Itoa(svcInfo.NodePort),
                    "-j", "REJECT",
                )
            }
        }
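The address loop at the start of the NodePort block stops as soon as it meets a zero CIDR, because listening on all addresses makes the remaining entries redundant. A simplified sketch: `listenIPs` and this `isZeroCIDR` are hypothetical helpers working on a string slice, whereas the excerpt iterates over a set and builds `utilproxy.LocalPort` values.

```go
package main

import "fmt"

// isZeroCIDR reports whether cidr is the IPv4 or IPv6 match-all range.
func isZeroCIDR(cidr string) bool {
	return cidr == "0.0.0.0/0" || cidr == "::/0"
}

// listenIPs returns the IPs to open the node port on. An empty string
// means "all addresses"; once it is added, later entries are skipped.
func listenIPs(addresses []string) []string {
	ips := make([]string, 0, len(addresses))
	for _, addr := range addresses {
		if isZeroCIDR(addr) {
			ips = append(ips, "")
			break
		}
		ips = append(ips, addr)
	}
	return ips
}

func main() {
	fmt.Printf("%q\n", listenIPs([]string{"10.0.0.5", "0.0.0.0/0", "10.0.0.6"}))
	fmt.Printf("%q\n", listenIPs([]string{"10.0.0.5", "10.0.0.6"}))
}
```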

Generate a chain for each endpoint, named from the service name and protocol

!FILENAME pkg/proxy/iptables/proxier.go:1087

  for _, ep := range proxier.endpointsMap[svcName] {
            epInfo, ok := ep.(*endpointsInfo)
            if !ok {
                klog.Errorf("Failed to cast endpointsInfo %q", ep.String())
                continue
            }
            endpoints = append(endpoints, epInfo)
            // Derive the endpoint chain name "KUBE-SEP-XXXX16bitXXXX" from the service name and protocol
            endpointChain = epInfo.endpointChain(svcNameString, protocol)
            endpointChains = append(endpointChains, endpointChain)

            // Create the endpoint chain (reuse the existing chain line if there is one)
            if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok {
                writeBytesLine(proxier.natChains, chain) 
            } else {
                writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
            }
            activeNATChains[endpointChain] = true
        }

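Write the SessionAffinity rules, which keep a client pinned to the same endpoint for a period of time. The example below uses 180 seconds (the code takes the value from svcInfo.StickyMaxAgeSeconds). Affinity is implemented by adding iptables rules with "-m recent --rcheck --seconds 180 --reap".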

!FILENAME pkg/proxy/iptables/proxier.go:1107

        // If SessionAffinityType is "ClientIP", write the session affinity rules
        // -A KUBE-SVC-XXXX16bitXXXX -m comment --comment "..." -m recent \
        //    --name KUBE-SEP-XXXX16bitXXXX --rcheck --seconds 180 --reap \
        //    -j KUBE-SEP-XXXX16bitXXXX
        if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
            for _, endpointChain := range endpointChains {
                args = append(args[:0],
                    "-A", string(svcChain),
                )
                proxier.appendServiceCommentLocked(args, svcNameString)
                args = append(args,
                    "-m", "recent", "--name", string(endpointChain),
                    "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds), "--reap",
                    "-j", string(endpointChain),
                )
                writeLine(proxier.natRules, args...)
            }
        }
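
To make the shape of these rules concrete, here is a minimal sketch that assembles one session-affinity rule as a string. The helper `affinityRule` and the sample chain and service names are hypothetical; they only mirror the argument order of the snippet above and are not kube-proxy's own code.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// affinityRule (hypothetical helper) builds the "-m recent --rcheck" rule that
// sends a returning client back to the endpoint chain it used before.
func affinityRule(svcChain, endpointChain, comment string, maxAgeSeconds int) string {
	args := []string{
		"-A", svcChain,
		"-m", "comment", "--comment", strconv.Quote(comment),
		"-m", "recent", "--name", endpointChain,
		"--rcheck", "--seconds", strconv.Itoa(maxAgeSeconds), "--reap",
		"-j", endpointChain,
	}
	return strings.Join(args, " ")
}

func main() {
	// Sample names are made up for illustration.
	fmt.Println(affinityRule("KUBE-SVC-AAAA", "KUBE-SEP-BBBB", "default/web:http", 180))
}
```

One such rule is written per endpoint chain, so a client that recently hit an endpoint matches that endpoint's rule before the probabilistic rules are reached.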

Write the load-balancing and DNAT rules. The "-m statistic --mode random --probability" iptables match groups the backend pods into a probability-based selection, which is what load-balances access to the service.

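  • For each endpoint of the service, add iptables rules to the service's custom chain "KUBE-SVC-XXXX16bitXXXX" in the nat table. If the service has two or more endpoints, load-balancing rules are added.
  • The chosen pod may be on another node, so the request is DNATed: its destination address is rewritten to the IP of the chosen pod before routing. KUBE-MARK-MASQ marks the packet so that its source address is rewritten (masqueraded).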

-A KUBE-SVC-XXXX16bitXXXX -m comment --comment "..." -m statistic --mode random --probability $prob -j KUBE-SEP-XXXX16bitXXXX

-A KUBE-SEP-XXXX16bitXXXX -m comment --comment "..." -s $epIp -j KUBE-MARK-MASQ

-A KUBE-SEP-XXXX16bitXXXX -m comment --comment "..." -m $prot -p $prot -j DNAT --to-destination X.X.X.X:xxx

!FILENAME pkg/proxy/iptables/proxier.go:1123

    // Write the load-balancing and DNAT rules
        n := len(endpointChains)
        for i, endpointChain := range endpointChains {
            epIP := endpoints[i].IP()
            if epIP == "" {
                // Error parsing this endpoint has been logged. Skip to next endpoint.
                continue
            }
            // Load-balancing rule for the service: the backend pods form a probability-based selection
            // -A KUBE-SVC-XXXX16bitXXXX -m comment --comment "..."
            //    -m statistic --mode random --probability $prob
            //    -j KUBE-SEP-XXXX16bitXXXX
            args = append(args[:0], "-A", string(svcChain))
            proxier.appendServiceCommentLocked(args, svcNameString)
            if i < (n - 1) {   // every rule except the last; only applies with two or more endpoints
                args = append(args,
                    "-m", "statistic",
                    "--mode", "random",
                    "--probability", proxier.probability(n-i))
            }
            // The final (or only if n == 1) rule is a guaranteed match.
            args = append(args, "-j", string(endpointChain))
            writeLine(proxier.natRules, args...)

            // Rules in each endpoint chain
            // -A KUBE-SEP-XXXX16bitXXXX -m comment --comment "..." -s $epIp -j KUBE-MARK-MASQ
            args = append(args[:0], "-A", string(endpointChain))
            proxier.appendServiceCommentLocked(args, svcNameString)
            // Handle traffic that loops back to the originator with SNAT.
            writeLine(proxier.natRules, append(args,
                "-s", utilproxy.ToCIDR(net.ParseIP(epIP)),
                "-j", string(KubeMarkMasqChain))...)

            // If session affinity is set to "ClientIP"
            // -m recent --name KUBE-SEP-XXXX16bitXXXX --set
            if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
                args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
            }
            // DNAT to the final endpoint
            // -A KUBE-SEP-XXXX16bitXXXX -m comment --comment "..."
            //    -m $prot -p $prot -j DNAT --to-destination X.X.X.X:xxx
            args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].Endpoint)
            writeLine(proxier.natRules, args...)
        }

        // Collect the node-local endpoints (used when the service only accepts local traffic)
        localEndpoints := make([]*endpointsInfo, 0)
        localEndpointChains := make([]utiliptables.Chain, 0)
        for i := range endpointChains {
            if endpoints[i].IsLocal {
                // These slices parallel each other; must be kept in sync
                localEndpoints = append(localEndpoints, endpoints[i])
                localEndpointChains = append(localEndpointChains, endpointChains[i])
            }
        }
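
The call proxier.probability(n-i) is what makes the selection uniform: rule i matches with probability 1/(n-i), and the last rule always matches. The sketch below checks that arithmetic. `ruleProbability` is a hypothetical stand-in for proxier.probability, and its "%0.5f" format is an assumption, not taken from the source.

```go
package main

import "fmt"

// ruleProbability is a hypothetical stand-in for proxier.probability: it
// formats 1/n for the "--probability" argument (the format is an assumption).
func ruleProbability(n int) string {
	return fmt.Sprintf("%0.5f", 1.0/float64(n))
}

// overallShare returns the chance that a packet falls through rules 0..i-1
// and then matches rule i, when rule k matches with probability 1/(n-k).
func overallShare(n, i int) float64 {
	reach := 1.0
	for k := 0; k < i; k++ {
		reach *= 1.0 - 1.0/float64(n-k)
	}
	return reach / float64(n-i)
}

func main() {
	n := 3
	for i := 0; i < n; i++ {
		fmt.Printf("rule %d: probability %s, overall share %.4f\n",
			i, ruleProbability(n-i), overallShare(n, i))
	}
}
```

With three endpoints the per-rule probabilities are 1/3, 1/2 and 1, yet each endpoint ends up with the same one-third share of new connections.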

Rules used when clusterCIDR is set (kube-proxy's --cluster-cidr flag specifies the address range used by pods in the cluster; this pod range is not the same as the service cluster IP, or VIP, range configured on the apiserver)

!FILENAME pkg/proxy/iptables/proxier.go:1179

        // Send pod -> external VIP traffic to the service VIP (the service chain)
        // -A KUBE-XLB-XXXX16bitXXXX -m comment --comment "..." -s $clusterCIDR
        //    -j KUBE-SVC-XXXX16bitXXXX
        if len(proxier.clusterCIDR) > 0 {
            args = append(args[:0],
                "-A", string(svcXlbChain),
                "-m", "comment", "--comment",
                `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`,
                "-s", proxier.clusterCIDR,
                "-j", string(svcChain),
            )
            writeLine(proxier.natRules, args...)
        }

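Generate the rules for local endpoint chains and preserve the client source IP. When only pods on the local node are chosen to serve a request, no SNAT rule is applied, so the source IP is preserved. For NodePort or external load balancer (XLB) traffic, setting "externalTrafficPolicy": "Local" restricts forwarding to this service's pods on the local node. If no local pod can serve the request, the request is dropped and the client sees a timeout with no response.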

!FILENAME pkg/proxy/iptables/proxier.go:1190

    numLocalEndpoints := len(localEndpointChains)
        if numLocalEndpoints == 0 {
            // No local endpoints: drop the traffic (blackhole it)
            // -A KUBE-XLB-XXXX16bitXXXX -m comment --comment "..." -j KUBE-MARK-DROP
            args = append(args[:0],
                "-A", string(svcXlbChain),
                "-m", "comment", "--comment",
                fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
                "-j",
                string(KubeMarkDropChain),
            )
            writeLine(proxier.natRules, args...)
        } else {
            // Session affinity for local endpoints
            // -A KUBE-XLB-XXXX16bitXXXX -m comment --comment "..." -m recent \
            //    --name KUBE-SEP-XXXX16bitXXXX \
            //    --rcheck --seconds $StickyMaxAge --reap -j KUBE-SEP-XXXX16bitXXXX
            if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
                for _, endpointChain := range localEndpointChains {
                    writeLine(proxier.natRules,
                        "-A", string(svcXlbChain),
                        "-m", "comment", "--comment", svcNameString,
                        "-m", "recent", "--name", string(endpointChain),
                        "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds), "--reap",
                        "-j", string(endpointChain))
                }
            }

            // Load balancing across local endpoints with "-m statistic --mode random --probability":
            // the backend pods form a probability-based selection
            for i, endpointChain := range localEndpointChains {
                // Balancing rules in the per-service chain.
                args = append(args[:0],
                    "-A", string(svcXlbChain),
                    "-m", "comment", "--comment",
                    fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcNameString),
                )
                if i < (numLocalEndpoints - 1) {
                    // Each rule is a probabilistic match.
                    args = append(args,
                        "-m", "statistic",
                        "--mode", "random",
                        "--probability", proxier.probability(numLocalEndpoints-i))
                }
        
                args = append(args, "-j", string(endpointChain))
                // -A KUBE-XLB-XXXX16bitXXXX -m comment --comment "..." \
                //    -m statistic --mode random \
                //    --probability 0.50000000000  -j KUBE-SEP-XXXX16bitXXXX
                writeLine(proxier.natRules, args...)
            }
        }

Finalizing the rule data

Delete the custom kube chains that are no longer in use ("KUBE-SVC-*", "KUBE-SEP-*", "KUBE-FW-*", "KUBE-XLB-*").

!FILENAME pkg/proxy/iptables/proxier.go:1237

    for chain := range existingNATChains {
        if !activeNATChains[chain] {
            chainString := string(chain)
            if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
                // Ignore chains that aren't ours.
                continue
            }
            // We must (as per iptables) write a chain-line for it, which has
            // the nice effect of flushing the chain.  Then we can remove the
            // chain.
            writeBytesLine(proxier.natChains, existingNATChains[chain])
            writeLine(proxier.natRules, "-X", chainString)
        }
    }
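
The cleanup pass boils down to "existing, not active, and one of ours". A minimal sketch of that filter follows; `staleChains` and the sample chain names are hypothetical, while the prefixes are the ones from the snippet above.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// ownedPrefixes lists the chain prefixes that kube-proxy manages, as in the snippet above.
var ownedPrefixes = []string{"KUBE-SVC-", "KUBE-SEP-", "KUBE-FW-", "KUBE-XLB-"}

// staleChains (hypothetical helper) returns, in sorted order, the owned chains
// that exist but are no longer active. Chains with other prefixes are ignored.
func staleChains(existing []string, active map[string]bool) []string {
	var stale []string
	for _, chain := range existing {
		if active[chain] {
			continue
		}
		for _, prefix := range ownedPrefixes {
			if strings.HasPrefix(chain, prefix) {
				stale = append(stale, chain)
				break
			}
		}
	}
	sort.Strings(stale)
	return stale
}

func main() {
	existing := []string{"KUBE-SVC-OLD", "KUBE-SEP-LIVE", "DOCKER", "KUBE-XLB-OLD"}
	active := map[string]bool{"KUBE-SEP-LIVE": true}
	for _, chain := range staleChains(existing, active) {
		fmt.Println("would delete:", chain)
	}
}
```

In the real code each stale chain gets a chain line (which flushes it) followed by a "-X" rule that removes it.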

Add the service nodeports rules (nat table, "KUBE-SERVICES" chain)

!FILENAME pkg/proxy/iptables/proxier.go:1254

// -A KUBE-SERVICES -m comment --comment "..." -m addrtype --dst-type LOCAL \
//    -j KUBE-NODEPORTS
//
// -A KUBE-SERVICES -m comment --comment "..." -d $NODEIP -j KUBE-NODEPORTS
//
addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
    if err != nil {
        klog.Errorf("Failed to get node ip address matching nodeport cidr")
    } else {
        isIPv6 := proxier.iptables.IsIpv6()
        for address := range addresses {
            // TODO(thockin, m1093782566): If/when we have dual-stack support we will want to distinguish v4 from v6 zero-CIDRs.
            if utilproxy.IsZeroCIDR(address) {
                args = append(args[:0],
                    "-A", string(kubeServicesChain),
                    "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
                    "-m", "addrtype", "--dst-type", "LOCAL",
                    "-j", string(kubeNodePortsChain))
                writeLine(proxier.natRules, args...)
                // Nothing else matters after the zero CIDR.
                break
            }
            // Ignore IP addresses with incorrect version
            if isIPv6 && !utilnet.IsIPv6String(address) || !isIPv6 && utilnet.IsIPv6String(address) {
                klog.Errorf("IP address %s has incorrect IP version", address)
                continue
            }
            // create nodeport rules for each IP one by one
            args = append(args[:0],
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
                "-d", address,
                "-j", string(kubeNodePortsChain))
            writeLine(proxier.natRules, args...)
        }
    }

Add the forward rule (filter table, "KUBE-FORWARD" chain)

!FILENAME pkg/proxy/iptables/proxier.go:1289

// -A KUBE-FORWARD -m comment --comment "..." -m mark --mark $masqueradeMark -j ACCEPT
writeLine(proxier.filterRules,
        "-A", string(kubeForwardChain),
        "-m", "comment", "--comment", `"kubernetes forwarding rules"`,
        "-m", "mark", "--mark", proxier.masqueradeMark,
        "-j", "ACCEPT",
    )

Add the clusterCIDR (source/destination) rules (filter table, "KUBE-FORWARD" chain)

!FILENAME pkg/proxy/iptables/proxier.go:1297

// kube-proxy's --cluster-cidr specifies the address range used by pods in the cluster.
// The pod range is not the same as the service cluster IP range.
//
// -A KUBE-FORWARD -s $clusterCIDR -m comment --comment "..." -m conntrack --ctstate \
//    RELATED,ESTABLISHED -j ACCEPT
// -A KUBE-FORWARD -m comment --comment "..." -d $clusterCIDR -m conntrack --ctstate \
//    RELATED,ESTABLISHED -j ACCEPT
//
if len(proxier.clusterCIDR) != 0 {
        writeLine(proxier.filterRules,
            "-A", string(kubeForwardChain),
            "-s", proxier.clusterCIDR,           // match on source
            "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
            "-m", "conntrack",
            "--ctstate", "RELATED,ESTABLISHED",
            "-j", "ACCEPT",
        )
        writeLine(proxier.filterRules,
            "-A", string(kubeForwardChain),
            "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
            "-d", proxier.clusterCIDR,            // match on destination
            "-m", "conntrack",
            "--ctstate", "RELATED,ESTABLISHED",
            "-j", "ACCEPT",
        )
    }

Write the end markers

    writeLine(proxier.filterRules, "COMMIT")
    writeLine(proxier.natRules, "COMMIT")

Assemble and load the iptables rule data

!FILENAME pkg/proxy/iptables/proxier.go:1326

    // Gather the filter and nat table data built above into iptablesData
  proxier.iptablesData.Reset()
    proxier.iptablesData.Write(proxier.filterChains.Bytes())
    proxier.iptablesData.Write(proxier.filterRules.Bytes())
    proxier.iptablesData.Write(proxier.natChains.Bytes())
    proxier.iptablesData.Write(proxier.natRules.Bytes())

    klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
    // Load the new configuration (iptablesData) with iptables-restore
    err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
    if err != nil {
        klog.Errorf("Failed to execute iptables-restore: %v", err)
        // Revert new local ports.
        klog.V(2).Infof("Closing local ports after iptables-restore failure")
        utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
        return
    }
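
The payload handed to iptables-restore is plain text: for each table, a "*table" header, the chain lines, the rules, and a closing COMMIT. The sketch below shows the concatenation order under that assumption. `buildRestoreData` and the sample rules are hypothetical, and the sketch only builds the bytes without running iptables-restore.

```go
package main

import (
	"bytes"
	"fmt"
)

// buildRestoreData (hypothetical helper) concatenates the four sections in the
// same order as the snippet above: filter chains, filter rules, nat chains, nat rules.
func buildRestoreData(filterChains, filterRules, natChains, natRules []string) []byte {
	var data bytes.Buffer
	for _, section := range [][]string{filterChains, filterRules, natChains, natRules} {
		for _, line := range section {
			data.WriteString(line)
			data.WriteByte('\n')
		}
	}
	return data.Bytes()
}

func main() {
	// Sample rules for illustration only.
	data := buildRestoreData(
		[]string{"*filter", ":KUBE-FORWARD - [0:0]"},
		[]string{"-A KUBE-FORWARD -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT", "COMMIT"},
		[]string{"*nat", ":KUBE-SERVICES - [0:0]", ":KUBE-NODEPORTS - [0:0]"},
		[]string{"-A KUBE-SERVICES -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS", "COMMIT"},
	)
	fmt.Print(string(data))
}
```

Because every table section ends with its own COMMIT, a single iptables-restore call applies each table as one unit, which is why the proxier builds everything in memory first.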

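The low-level iptables runner

That covers essentially the whole execution flow of the proxy. What remains is how the iptables proxier uses the system iptables commands to create, read, update, and delete rules (put simply, the iptables proxier works by driving iptables commands to produce the corresponding rules). Let us look at how kube-proxy wraps these low-level iptables operations.

The iptables executor

The Interface interface defines the operations for running iptables commands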

!FILENAME pkg/util/iptables/iptables.go:45

// Interface and its method definitions
type Interface interface {
    GetVersion() (string, error)
    EnsureChain(table Table, chain Chain) (bool, error)
    FlushChain(table Table, chain Chain) error
    DeleteChain(table Table, chain Chain) error
    EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error)
    DeleteRule(table Table, chain Chain, args ...string) error
    IsIpv6() bool
    SaveInto(table Table, buffer *bytes.Buffer) error
    Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
    RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error
    AddReloadFunc(reloadFunc func())
    Destroy()
}

runner is the type that implements the iptables Interface; it defines the iptables command executor

!FILENAME pkg/util/iptables/iptables.go:135

// Struct definition
type runner struct {
    mu              sync.Mutex              // mutex guarding the runner
    exec            utilexec.Interface      // executes OS commands (os/exec wrapper)
    dbus            utildbus.Interface      // D-Bus API
    protocol        Protocol                // IPv4 or IPv6
    hasCheck        bool                    // whether iptables supports the "-C" (check) flag
    hasListener     bool                    // whether the D-Bus signal listener is running (FirewallD start/restart)
    waitFlag        []string                // "wait" flag for iptables, to wait for the xtables lock
    restoreWaitFlag []string                // "wait" flag for iptables-restore
    lockfilePath    string                  // location of the xtables lock file

    reloadFuncs []func()                    // callbacks to run on reload
    signal      chan *godbus.Signal         // D-Bus signal channel
}

// Methods through which runner implements Interface; the key ones are analyzed in detail below
func (runner *runner) GetVersion() (string, error)
func (runner *runner) EnsureChain(table Table, chain Chain) (bool, error) 
func (runner *runner) FlushChain(table Table, chain Chain) error 
func (runner *runner) DeleteChain(table Table, chain Chain) error
func (runner *runner) EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error) 
func (runner *runner) DeleteRule(table Table, chain Chain, args ...string) error
func (runner *runner) IsIpv6() bool 
func (runner *runner) SaveInto(table Table, buffer *bytes.Buffer) error
func (runner *runner) Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error 
func (runner *runner) RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error
func (runner *runner) AddReloadFunc(reloadFunc func()) 
func (runner *runner) Destroy() 

// Internal methods of runner
func (runner *runner) connectToFirewallD()
func (runner *runner) restoreInternal(args []string, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
func (runner *runner) run(op operation, args []string) ([]byte, error)
func (runner *runner) runContext(ctx context.Context, op operation, args []string) ([]byte, error)
func (runner *runner) checkRule(table Table, chain Chain, args ...string) (bool, error)
func (runner *runner) checkRuleWithoutCheck(table Table, chain Chain, args ...string) (bool, error) 
func (runner *runner) checkRuleUsingCheck(args []string) (bool, error)
func (runner *runner) dbusSignalHandler(bus utildbus.Connection)
func (runner *runner) reload()

The iptables runner is constructed by New() -> newInternal(), which returns a runner{…} instance as an Interface value. This creates the iptables command executor.

!FILENAME pkg/util/iptables/iptables.go:152

func newInternal(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol, lockfilePath string) Interface {
    vstring, err := getIPTablesVersionString(exec, protocol)   // get the iptables version
    if err != nil {
        klog.Warningf("Error checking iptables version, assuming version at least %s: %v", MinCheckVersion, err)
        vstring = MinCheckVersion
    }

    if lockfilePath == "" {
        lockfilePath = LockfilePath16x                      // default lock file location "/run/xtables.lock"
    }

    runner := &runner{
        exec:            exec,                                         // utilexec, a wrapper around os/exec
        dbus:            dbus,                                         // utildbus
        protocol:        protocol,                                     // IPv4 or IPv6
        hasCheck:        getIPTablesHasCheckCommand(vstring),          // whether the "-C" flag is supported
        hasListener:     false,
        waitFlag:        getIPTablesWaitFlag(vstring),                 // wait flag for iptables
        restoreWaitFlag: getIPTablesRestoreWaitFlag(exec, protocol),   // wait flag for iptables-restore
        lockfilePath:    lockfilePath,                                 // location of the xtables lock file
    }
    return runner
}

// New returns the iptables command executor (a runner)
func New(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol) Interface {
    return newInternal(exec, dbus, protocol, "")
}

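Methods of the iptables executor

runner.run() is the most basic internal method of runner and the one the other methods call; it is where the iptables command is executed through os/exec. run() takes two parameters: (1) the iptables operation and (2) the argument list. Together they form a complete iptables command, which is then executed. runContext() supports execution both with and without a context.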

!FILENAME pkg/util/iptables/iptables.go:218

func (runner *runner) run(op operation, args []string) ([]byte, error) {
    return runner.runContext(nil, op, args)
}

func (runner *runner) runContext(ctx context.Context, op operation, args []string) ([]byte, error) {
    iptablesCmd := iptablesCommand(runner.protocol)  // "iptables" or "ip6tables"
    fullArgs := append(runner.waitFlag, string(op))
    fullArgs = append(fullArgs, args...)
    klog.V(5).Infof("running iptables %s %v", string(op), args)
    // Call Command or CommandContext depending on whether a context was passed in
    if ctx == nil {
        return runner.exec.Command(iptablesCmd, fullArgs...).CombinedOutput()
    }
    return runner.exec.CommandContext(ctx, iptablesCmd, fullArgs...).CombinedOutput()
}

// The supported iptables operations (commands)
type operation string

// runner.exec runs commands through os/exec; the code below comes from the utilexec package
func New() Interface {
    return &executor{}
}

// Command is part of the Interface interface.
func (executor *executor) Command(cmd string, args ...string) Cmd {
    return (*cmdWrapper)(osexec.Command(cmd, args...))
}

// CommandContext is part of the Interface interface.
func (executor *executor) CommandContext(ctx context.Context, cmd string, args ...string) Cmd {
    return (*cmdWrapper)(osexec.CommandContext(ctx, cmd, args...))
}
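
The argument order used by runContext (wait flag, then the operation, then the caller's arguments) can be sketched as follows. `fullArgs` is a hypothetical helper, and "-w" and the chain name are sample values. Unlike the original, the sketch copies into a fresh slice, a defensive choice made here so that the shared wait-flag slice is never written to.

```go
package main

import (
	"fmt"
	"strings"
)

// fullArgs (hypothetical helper) mirrors the order used in runContext:
// wait flag, operation, then the caller's arguments.
func fullArgs(waitFlag []string, op string, args []string) []string {
	out := make([]string, 0, len(waitFlag)+1+len(args))
	out = append(out, waitFlag...)
	out = append(out, op)
	return append(out, args...)
}

func main() {
	// Sample values: "-w" as the wait flag, "-N" to create a chain.
	args := fullArgs([]string{"-w"}, "-N", []string{"KUBE-SERVICES", "-t", "nat"})
	fmt.Println("iptables " + strings.Join(args, " "))
}
```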

runner.GetVersion() returns the version of the iptables installed on the system, in the form "X.Y.Z"

!FILENAME pkg/util/iptables/iptables.go:218

func (runner *runner) GetVersion() (string, error) {
    return getIPTablesVersionString(runner.exec, runner.protocol)
}

func getIPTablesVersionString(exec utilexec.Interface, protocol Protocol) (string, error) {
    // Runs "iptables --version" or "ip6tables --version"
    iptablesCmd := iptablesCommand(protocol)   
    bytes, err := exec.Command(iptablesCmd, "--version").CombinedOutput()
    if err != nil {
        return "", err
    }
    // Regular-expression match to find the version string, in the form "X.Y.Z"
    versionMatcher := regexp.MustCompile("v([0-9]+(\\.[0-9]+)+)")
    match := versionMatcher.FindStringSubmatch(string(bytes))
    if match == nil {
        return "", fmt.Errorf("no iptables version found in string: %s", bytes)
    }
    return match[1], nil
}
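
The version match can be tried in isolation. The sketch below reuses the regular expression from the snippet above; `parseVersion` and the sample output string are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"regexp"
)

// versionMatcher is the same pattern as in getIPTablesVersionString.
var versionMatcher = regexp.MustCompile(`v([0-9]+(\.[0-9]+)+)`)

// parseVersion (hypothetical helper) extracts "X.Y.Z" from the text printed
// by "iptables --version".
func parseVersion(output string) (string, error) {
	match := versionMatcher.FindStringSubmatch(output)
	if match == nil {
		return "", fmt.Errorf("no iptables version found in string: %s", output)
	}
	return match[1], nil
}

func main() {
	// The sample output string is an assumption about what the command prints.
	version, err := parseVersion("iptables v1.8.4 (legacy)")
	fmt.Println(version, err)
}
```

The first capture group drops the leading "v", so callers get a bare dotted version string.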

runner.EnsureChain() "-N": checks whether the given chain exists and creates it if it does not; returns true if the chain already existed

!FILENAME pkg/util/iptables/iptables.go:223

func (runner *runner) EnsureChain(table Table, chain Chain) (bool, error) {
    fullArgs := makeFullArgs(table, chain)

    runner.mu.Lock()
    defer runner.mu.Unlock()

    // Runs "iptables -t $tableName -N $chainName"
    out, err := runner.run(opCreateChain, fullArgs)
    if err != nil {
        if ee, ok := err.(utilexec.ExitError); ok {
            if ee.Exited() && ee.ExitStatus() == 1 {
                return true, nil
            }
        }
        return false, fmt.Errorf("error creating chain %q: %v: %s", chain, err, out)
    }
    return false, nil
}
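
EnsureChain does not query first. It tries to create the chain and reads exit status 1 as "already exists". The sketch below models that decision with a fake command result, so it runs without iptables. `exitError` and `ensureChain` are hypothetical stand-ins for utilexec.ExitError and the method above.

```go
package main

import (
	"errors"
	"fmt"
)

// exitError is a hypothetical stand-in for utilexec.ExitError.
type exitError struct{ status int }

func (e exitError) Error() string { return fmt.Sprintf("exit status %d", e.status) }

// ensureChain interprets the result of a "create chain" attempt the way
// EnsureChain does: nil means created, exit status 1 means it already existed.
func ensureChain(create func() error) (existed bool, err error) {
	runErr := create()
	if runErr == nil {
		return false, nil // the chain was created just now
	}
	var ee exitError
	if errors.As(runErr, &ee) && ee.status == 1 {
		return true, nil // treated as "chain already exists"
	}
	return false, fmt.Errorf("error creating chain: %v", runErr)
}

func main() {
	existed, err := ensureChain(func() error { return exitError{status: 1} })
	fmt.Println(existed, err)
}
```

Attempting the creation directly takes one command instead of a check followed by a create.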

runner.FlushChain() "-F": flushes the given chain

!FILENAME pkg/util/iptables/iptables.go:242

func (runner *runner) FlushChain(table Table, chain Chain) error {
    fullArgs := makeFullArgs(table, chain)
  //...
    // Runs "iptables -t $tableName -F $chainName"
    out, err := runner.run(opFlushChain, fullArgs)
  //...
}

runner.DeleteChain() "-X": deletes the given chain

!FILENAME pkg/util/iptables/iptables.go:256

func (runner *runner) DeleteChain(table Table, chain Chain) error {
    fullArgs := makeFullArgs(table, chain)
  //...
    // Runs "iptables -t $tableName -X $chainName"
    out, err := runner.run(opDeleteChain, fullArgs)
  //...
}

runner.EnsureRule() checks whether the rule exists; if it does not, the rule is added to the given chain of the given table at the given position

!FILENAME pkg/util/iptables/iptables.go:271

func (runner *runner) EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error) {
    fullArgs := makeFullArgs(table, chain, args...)

    runner.mu.Lock()
    defer runner.mu.Unlock()
    // Check whether the rule already exists
    exists, err := runner.checkRule(table, chain, args...)
    if err != nil {
        return false, err
    }
    if exists {
        return true, nil
    }
    // RulePosition is "-I" or "-A"
    // Insert at the head of the chain: runs "iptables -I $chainName -t $tableName ..."
    // Append at the end of the chain: runs "iptables -A $chainName -t $tableName ..."
    out, err := runner.run(operation(position), fullArgs)  
    if err != nil {
        return false, fmt.Errorf("error appending rule: %v: %s", err, out)
    }
    return false, nil
}

// checkRule() first tests whether iptables supports the "-C" flag, then calls the matching rule-check method
func (runner *runner) checkRule(table Table, chain Chain, args ...string) (bool, error) {
    if runner.hasCheck {
        return runner.checkRuleUsingCheck(makeFullArgs(table, chain, args...))
    }
    return runner.checkRuleWithoutCheck(table, chain, args...)
}

// The "-C" flag is supported
func (runner *runner) checkRuleUsingCheck(args []string) (bool, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()
  //...
    // Runs "iptables $waitFlag -C $chainName -t $tableName ..."
    out, err := runner.runContext(ctx, opCheckRule, args)
  //...
}

// The "-C" flag is not supported: fallback for compatibility with iptables versions < 1.4.11
func (runner *runner) checkRuleWithoutCheck(table Table, chain Chain, args ...string) (bool, error) {
  // 'iptables-save -t $tableName'
    iptablesSaveCmd := iptablesSaveCommand(runner.protocol)
    klog.V(1).Infof("running %s -t %s", iptablesSaveCmd, string(table))
    out, err := runner.exec.Command(iptablesSaveCmd, "-t", string(table)).CombinedOutput()
    if err != nil {
        return false, fmt.Errorf("error checking rule: %v", err)
    }

    // Strip the quotes
    var argsCopy []string
    for i := range args {
        tmpField := strings.Trim(args[i], "\"")
        tmpField = trimhex(tmpField)
        argsCopy = append(argsCopy, strings.Fields(tmpField)...)
    }
    argset := sets.NewString(argsCopy...)

    for _, line := range strings.Split(string(out), "\n") {
        var fields = strings.Fields(line)

        // Check that the rule belongs to the same chain and has the expected number of fields
        if !strings.HasPrefix(line, fmt.Sprintf("-A %s", string(chain))) || len(fields) != len(argsCopy)+2 {
            continue
        }

        // Strip all quotes
        for i := range fields {
            fields[i] = strings.Trim(fields[i], "\"")
            fields[i] = trimhex(fields[i])
        }

        // Set comparison: the rule exists if the line's fields are a superset of the wanted arguments
        if sets.NewString(fields...).IsSuperset(argset) {
            return true, nil
        }
        klog.V(5).Infof("DBG: fields is not a superset of args: fields=%v  args=%v", fields, args)
    }

    return false, nil
}
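
The fallback check is a set comparison over the whitespace-separated fields of each "-A" line in the iptables-save output. Below is a simplified sketch of that idea. `ruleExists` is hypothetical, it omits the trimhex normalization used by the original, and the sample save output is made up.

```go
package main

import (
	"fmt"
	"strings"
)

// ruleExists (hypothetical, simplified) reports whether some "-A <chain>" line
// in saveOutput has the expected field count and contains every wanted argument.
func ruleExists(saveOutput, chain string, args []string) bool {
	var want []string
	for _, a := range args {
		want = append(want, strings.Fields(strings.Trim(a, "\""))...)
	}
	for _, line := range strings.Split(saveOutput, "\n") {
		fields := strings.Fields(line)
		// "+2" accounts for the leading "-A" and the chain name.
		if !strings.HasPrefix(line, "-A "+chain) || len(fields) != len(want)+2 {
			continue
		}
		have := map[string]bool{}
		for _, f := range fields {
			have[strings.Trim(f, "\"")] = true
		}
		matched := true
		for _, w := range want {
			if !have[w] {
				matched = false
				break
			}
		}
		if matched {
			return true
		}
	}
	return false
}

func main() {
	out := "*filter\n:KUBE-FORWARD - [0:0]\n" +
		"-A KUBE-FORWARD -s 10.244.0.0/16 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT\nCOMMIT\n"
	args := []string{"-s", "10.244.0.0/16", "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT"}
	fmt.Println(ruleExists(out, "KUBE-FORWARD", args))
}
```

Because the comparison is set-based, argument order does not matter, which tolerates iptables-save printing options in a different order than they were given.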

runner.DeleteRule() "-D": deletes the rule from the given chain of the given table

!FILENAME pkg/util/iptables/iptables.go:292

func (runner *runner) DeleteRule(table Table, chain Chain, args ...string) error {
    fullArgs := makeFullArgs(table, chain, args...)
  //...
    // Check whether the rule exists
    exists, err := runner.checkRule(table, chain, args...)
  //...
    // Runs "iptables -D $chainName -t $tableName ..."
    out, err := runner.run(opDeleteRule, fullArgs)
  //...
}

runner.SaveInto() saves the iptables rule set of the given table into buffer

!FILENAME pkg/util/iptables/iptables.go:317

func (runner *runner) SaveInto(table Table, buffer *bytes.Buffer) error {
  //...
    // Runs "iptables-save -t $tableName"
    iptablesSaveCmd := iptablesSaveCommand(runner.protocol)
    args := []string{"-t", string(table)}

    cmd := runner.exec.Command(iptablesSaveCmd, args...)

    cmd.SetStdout(buffer)
    cmd.SetStderr(buffer)
    return cmd.Run()
}

runner.Restore() loads a rule set saved by iptables-save into the given table (the data is fed through standard input)

!FILENAME pkg/util/iptables/iptables.go:340

func (runner *runner) Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error {
    // "iptables-restore -T $tableName"
    args := []string{"-T", string(table)}
    return runner.restoreInternal(args, data, flush, counters) //call and return
}

// restoreInternal() assembles the arguments and runs iptables-restore to load the rule set in data
func (runner *runner) restoreInternal(args []string, data []byte, flush FlushFlag, counters RestoreCountersFlag) error {
    runner.mu.Lock()
    defer runner.mu.Unlock()

    trace := utiltrace.New("iptables restore")
    defer trace.LogIfLong(2 * time.Second)
   
    // Assemble the arguments: "--noflush", "--counters", "--wait"
    if !flush {
        args = append(args, "--noflush")
    }
    if counters {
        args = append(args, "--counters")
    }
    if len(runner.restoreWaitFlag) == 0 {
        locker, err := grabIptablesLocks(runner.lockfilePath)
        if err != nil {
            return err
        }
        trace.Step("Locks grabbed")
        defer func(locker iptablesLocker) {
            if err := locker.Close(); err != nil {
                klog.Errorf("Failed to close iptables locks: %v", err)
            }
        }(locker)
    }


    fullArgs := append(runner.restoreWaitFlag, args...)
    iptablesRestoreCmd := iptablesRestoreCommand(runner.protocol)
    klog.V(4).Infof("running %s %v", iptablesRestoreCmd, fullArgs)
  
  // "iptables-restore -T $tableName --wait --noflush --counters < data"
    cmd := runner.exec.Command(iptablesRestoreCmd, fullArgs...)
    // Feed the rule set in data through standard input
    cmd.SetStdin(bytes.NewBuffer(data))
    // Run the command and collect its output
    b, err := cmd.CombinedOutput()
    if err != nil {
        return fmt.Errorf("%v (%s)", err, b)
    }
    return nil
}
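
The flag assembly in Restore() and restoreInternal() is easy to get backwards: "--noflush" is added when flush is false. A minimal sketch of the mapping follows. `restoreArgs` is hypothetical and "--wait" is a sample value for the wait flag.

```go
package main

import (
	"fmt"
	"strings"
)

// restoreArgs (hypothetical helper) builds the iptables-restore arguments:
// wait flag first, an optional "-T <table>", then "--noflush" / "--counters".
func restoreArgs(waitFlag []string, table string, flush, counters bool) []string {
	args := append([]string{}, waitFlag...)
	if table != "" {
		args = append(args, "-T", table) // Restore() limits the load to one table
	}
	if !flush {
		args = append(args, "--noflush") // keep rules that are not in the payload
	}
	if counters {
		args = append(args, "--counters") // restore packet and byte counters
	}
	return args
}

func main() {
	// RestoreAll-style call: no table restriction, no flush, restore counters.
	fmt.Println("iptables-restore " + strings.Join(restoreArgs([]string{"--wait"}, "", false, true), " "))
}
```

The proxier calls RestoreAll with NoFlushTables, so chains that are not mentioned in the payload stay untouched.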

runner.RestoreAll() works like Restore() above: it calls iptables-restore to load the saved rule set, without restricting it to one table

!FILENAME pkg/util/iptables/iptables.go:347

func (runner *runner) RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error {
    args := make([]string, 0)
    // Same as above, but with no table argument
    return runner.restoreInternal(args, data, flush, counters) 
}

runner.AddReloadFunc() registers a reload callback, which is used to reload the iptables rules

!FILENAME pkg/util/iptables/iptables.go:679

func (runner *runner) AddReloadFunc(reloadFunc func()) {
    runner.mu.Lock()
    defer runner.mu.Unlock()

    // Start the listener if it is not running yet
    if !runner.hasListener {
        runner.connectToFirewallD()  // start listening on D-Bus
    }
    runner.reloadFuncs = append(runner.reloadFuncs, reloadFunc) // register the callback fired by the signal
}

// Listens for and handles signals from the FirewallD process over D-Bus (to reload the iptables rules)
func (runner *runner) connectToFirewallD() {
    bus, err := runner.dbus.SystemBus()
    if err != nil {
        klog.V(1).Infof("Could not connect to D-Bus system bus: %s", err)
        return
    }
    runner.hasListener = true

    // Add match rules for firewalld on the system bus
    rule := fmt.Sprintf("type='signal',sender='%s',path='%s',interface='%s',member='Reloaded'", firewalldName, firewalldPath, firewalldInterface)
    bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule)

    rule = fmt.Sprintf("type='signal',interface='org.freedesktop.DBus',member='NameOwnerChanged',path='/org/freedesktop/DBus',sender='org.freedesktop.DBus',arg0='%s'", firewalldName)
    bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule)

    runner.signal = make(chan *godbus.Signal, 10)
    bus.Signal(runner.signal)

    go runner.dbusSignalHandler(bus)  // goroutine that handles the D-Bus signals
}

// Goroutine that listens for D-Bus signals; when FirewallD changes owner or is reloaded, the rule set is reloaded
func (runner *runner) dbusSignalHandler(bus utildbus.Connection) {
    firewalld := bus.Object(firewalldName, firewalldPath)

    for s := range runner.signal {
        if s == nil {
            // Unregister from D-Bus
            bus.Signal(runner.signal)
            return
        }

        switch s.Name {
        case "org.freedesktop.DBus.NameOwnerChanged":  // signal: the owner of the given name has changed
            name := s.Body[0].(string)
            new_owner := s.Body[2].(string)

            // Only handle the name "org.fedoraproject.FirewallD1", and only when it has a new owner
            if name != firewalldName || len(new_owner) == 0 {
                continue
            }

            firewalld.Call(firewalldInterface+".getDefaultZone", 0)

            runner.reload()  // reload and resync the rules (calls every function in runner.reloadFuncs)
        case firewalldInterface + ".Reloaded":
            runner.reload()
        }
    }
}

runner.Destroy() stops the D-Bus listener

!FILENAME pkg/util/iptables/iptables.go:218

func (runner *runner) Destroy() {
    if runner.signal != nil {
        runner.signal <- nil            // sending nil on the D-Bus signal channel tells the handler to unregister
    }
}
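
Destroy() relies on a small convention: a nil value sent on the signal channel tells the handler goroutine to stop. The sketch below shows the pattern without any D-Bus dependency. The `signal` type, `handle`, and `countReloads` are hypothetical, and only the "Reloaded" case is modeled.

```go
package main

import "fmt"

// signal is a hypothetical stand-in for *godbus.Signal; only Name is modeled.
type signal struct{ Name string }

// handle processes signals until it receives nil, the same sentinel that
// Destroy() sends in the snippet above.
func handle(signals <-chan *signal, reload func()) {
	for s := range signals {
		if s == nil {
			return
		}
		if s.Name == "Reloaded" {
			reload()
		}
	}
}

// countReloads feeds the named signals to a handler goroutine, sends the nil
// sentinel, waits for the goroutine to exit, and reports how often reload ran.
func countReloads(names []string) int {
	signals := make(chan *signal, 10)
	done := make(chan struct{})
	reloads := 0
	go func() {
		defer close(done)
		handle(signals, func() { reloads++ })
	}()
	for _, name := range names {
		signals <- &signal{Name: name}
	}
	signals <- nil // ask the handler to stop
	<-done
	return reloads
}

func main() {
	fmt.Println(countReloads([]string{"Reloaded", "NameOwnerChanged", "Reloaded"}))
}
```

Sending the sentinel through the same channel as the data lets the handler finish the signals that were queued ahead of it before it exits.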

That concludes the code analysis of the iptables Proxier, the third layer of kube-proxy. For the proxier implementations of the other two modes, ipvs and userspace, see the articles on the userspace-mode proxier and the ipvs-mode proxier.

~ END ~

© Copyright belongs to the author. For reprints or content partnerships, please contact the author.