syncProxyRules 同步配置與規則
proxier.syncProxyRules() 實現監聽 svc 或 ep 的更新,並將配置一致性地同步到 iptables 規則,這也是 iptables proxier 最核心的邏輯代碼。其實現利用了 iptables-save/iptables-restore 機制:根據現存的 iptables 配置以及服務與端點的同步信息,生成相對應的 iptables 鏈與規則數據;每次同步時先寫入符合 restore 標準格式的規則數據,再通過 iptables-restore 命令重設 iptables 規則。
這個同步規則處理代碼比較長,我們後面將分解成小塊來講解。下面為 syncProxyRules 的代碼框架(部分代碼已用解析注釋替代),框架內注釋的每塊內容在後面都有單獨的代碼分析說明。
!FILENAME pkg/proxy/iptables/proxier.go:634
// syncProxyRules is shown here as a skeleton: sections replaced by "//..." are
// elided in this excerpt. The visible code holds proxier.mu for the
// whole call, records the elapsed time when it returns, and does nothing until
// both services and endpoints have been synced. After the (elided) rule
// generation it closes local ports that are absent from replacementPortsMap,
// refreshes the healthz timestamp and health checkers, and clears conntrack
// state for stale UDP service IPs and stale endpoints.
//
// NOTE(review): serviceUpdateResult, endpointUpdateResult and staleServices are
// not declared in the visible skeleton; they presumably come from the elided
// "detect and apply changes" section — confirm against the full source.
func (proxier *Proxier) syncProxyRules() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
start := time.Now()
defer func() {
metrics.SyncProxyRulesLatency.Observe(metrics.SinceInMicroseconds(start))
klog.V(4).Infof("syncProxyRules took %v", time.Since(start))
}()
// don't sync rules till we've received services and endpoints
if !proxier.endpointsSynced || !proxier.servicesSynced {
klog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return
}
// Detect and apply pending changes (services/endpoints).
//...
// Create the kube chains and link them in.
//...
// Fetch the chains that currently exist in the filter/nat tables.
//...
// Build the iptables-save/restore formatted data (table headers, chains).
//...
// Write the Kubernetes-specific SNAT masquerade rules.
//...
// Accumulate NAT chains to keep.
activeNATChains := map[utiliptables.Chain]bool{}
// Accumulate the set of local ports that we will be holding open once this update is complete
replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}
endpoints := make([]*endpointsInfo, 0)
endpointChains := make([]utiliptables.Chain, 0)
args := make([]string, 64)
// Compute total number of endpoint chains across all services.
proxier.endpointChainsNumber = 0
for svcName := range proxier.serviceMap {
proxier.endpointChainsNumber += len(proxier.endpointsMap[svcName])
}
// Create the rules for each service (service portal rules).
for svcName, svc := range proxier.serviceMap {
//...
}
// Delete chains that are no longer in use.
//...
// NodePorts chain.
//...
// FORWARD policy.
//...
// Rules for clusterCIDR.
//...
// Write the closing markers.
//...
// Assemble the data and load it with iptables-restore.
//...
// Close local ports that are no longer needed, then adopt the new port map.
for k, v := range proxier.portsMap {
if replacementPortsMap[k] == nil {
v.Close()
}
}
proxier.portsMap = replacementPortsMap
// Update the healthz timestamp.
if proxier.healthzServer != nil {
proxier.healthzServer.UpdateTimestamp()
}
// Update the health checks; failures are logged and do not abort the sync.
if err := proxier.healthChecker.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
klog.Errorf("Error syncing healthcheck services: %v", err)
}
if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
klog.Errorf("Error syncing healthcheck endpoints: %v", err)
}
// Finish the clean-up work: clear UDP conntrack entries for stale service IPs,
// then remove connections to stale endpoints.
for _, svcIP := range staleServices.UnsortedList() {
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
}
}
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}
下面將分解詳述每塊代碼邏輯:
更新 service 和 endpoints ;返回更新結果
!FILENAME pkg/proxy/iptables/proxier.go:652
//更新SVC/EP
serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges)
staleServices := serviceUpdateResult.UDPStaleClusterIP
// 從EndpointsMap更新結果返回中合并UDP協(xié)議廢棄服務信息
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == v1.ProtocolUDP {
klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIPString())
staleServices.Insert(svcInfo.ClusterIPString())
}
}
UpdateServiceMap() SVC 服務的更新實現
!FILENAME pkg/proxy/service.go:212
// UpdateServiceMap applies the pending changes to serviceMap and returns the
// information the caller needs for follow-up work.
//
// The result carries:
//   - UDPStaleClusterIP: the string set handed to serviceMap.apply, which
//     populates it while applying the changes;
//   - HCServiceNodePorts: for every entry remaining in serviceMap whose
//     GetHealthCheckNodePort() is non-zero, that port (as uint16) keyed by the
//     entry's NamespacedName.
func UpdateServiceMap(serviceMap ServiceMap, changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
	staleUDP := sets.NewString()
	result.UDPStaleClusterIP = staleUDP
	serviceMap.apply(changes, staleUDP)

	hcPorts := make(map[types.NamespacedName]uint16)
	for name, svc := range serviceMap {
		port := svc.GetHealthCheckNodePort()
		if port == 0 {
			// No health-check node port for this service port.
			continue
		}
		hcPorts[name.NamespacedName] = uint16(port)
	}
	result.HCServiceNodePorts = hcPorts
	return result
}
**serviceMap.apply()** 應用更新變化事件的服務項(merge->filter->unmerge)
!FILENAME pkg/proxy/service.go:268
// apply folds every pending change in the tracker into serviceMap and then
// empties the tracker. It is a no-op when changes is nil. The tracker's lock is
// held for the duration of the call.
//
// For each change the order matters: merge -> filter -> unmerge. The circled
// numbers refer to the sub-sections that follow this listing.
//
// UDPStaleClusterIP is passed through to unmerge, which records into it.
func (serviceMap *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) {
	if changes == nil {
		return
	}
	changes.lock.Lock()
	defer changes.lock.Unlock()
	for _, change := range changes.items {
		// ① Merge the current (added/updated) entries into serviceMap.
		serviceMap.merge(change.current)
		// ③ Drop from previous every entry that is still present in current, so
		// that the unmerge below does not delete entries that were just merged.
		change.previous.filter(change.current)
		// ② Remove what is left of previous (the obsolete entries) from serviceMap.
		serviceMap.unmerge(change.previous, UDPStaleClusterIP)
	}
	// clear changes after applying them to ServiceMap.
	changes.items = make(map[types.NamespacedName]*serviceChange)
}
① serviceMap.merge() 將 "other" 的內容合併到當前的 serviceMap 內,即「將變化的服務列表進行合併」。
!FILENAME pkg/proxy/service.go:301
// merge copies every entry of other into *sm, overwriting entries that share a
// key, and returns the set of the String() forms of all keys found in other.
//
// NOTE(review): part of the loop body is elided in this excerpt ("//...");
// `exists` is presumably consumed by the elided code — confirm against the full
// source.
func (sm *ServiceMap) merge(other ServiceMap) sets.String {
existingPorts := sets.NewString()
for svcPortName, info := range other {
existingPorts.Insert(svcPortName.String())
_, exists := (*sm)[svcPortName]
//...
(*sm)[svcPortName] = info
}
return existingPorts
}
② serviceMap.unmerge() 從當前 map 移除 "other" 中存在的內容項,即「刪除廢棄的項」。
!FILENAME pkg/proxy/service.go:330
// unmerge deletes from *sm every key that appears in other. Before deleting an
// entry whose protocol is UDP, its cluster IP string is inserted into
// UDPStaleClusterIP.
//
// NOTE(review): the handling of keys that are not present in *sm is elided in
// this excerpt ("//...").
func (sm *ServiceMap) unmerge(other ServiceMap, UDPStaleClusterIP sets.String) {
for svcPortName := range other {
info, exists := (*sm)[svcPortName]
if exists {
if info.GetProtocol() == v1.ProtocolUDP {
UDPStaleClusterIP.Insert(info.ClusterIPString()) // remember the cluster IP of the removed UDP service
}
delete(*sm, svcPortName)
} //...
}
}
③ serviceMap.filter() 基于"other"給定的服務端口名(key值),過濾掉存在于serviceMap的項
!FILENAME pkg/proxy/service.go:319
// filter removes from *sm every entry whose key is also a key of other.
// Entries of *sm whose keys do not occur in other are left untouched; other is
// never modified.
func (sm *ServiceMap) filter(other ServiceMap) {
	current := *sm
	for name := range other {
		// Deleting a key that is absent from current is a no-op.
		delete(current, name)
	}
}
UpdateEndpointsMap() 端點更新的實現
!FILENAME pkg/proxy/endpoints.go:163
// UpdateEndpointsMap applies the pending changes to endpointsMap and returns
// the information the caller needs for follow-up work.
//
// The result carries:
//   - StaleEndpoints / StaleServiceNames: initialised to empty (non-nil)
//     slices and passed by pointer to endpointsMap.apply;
//   - HCEndpointsLocalIPSize: for every entry returned by
//     GetLocalEndpointIPs(endpointsMap), the number of IPs in that entry.
func UpdateEndpointsMap(endpointsMap EndpointsMap, changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
	result.StaleEndpoints = []ServiceEndpoint{}
	result.StaleServiceNames = []ServicePortName{}
	endpointsMap.apply(changes, &result.StaleEndpoints, &result.StaleServiceNames)

	// TODO: If this will appear to be computationally expensive, consider
	// computing this incrementally similarly to endpointsMap.
	sizes := make(map[types.NamespacedName]int)
	for name, ips := range GetLocalEndpointIPs(endpointsMap) {
		sizes[name] = len(ips)
	}
	result.HCEndpointsLocalIPSize = sizes
	return result
}
EndpointsMap.apply() 應用更新變化事件的端點項(merge->unmerge)
!FILENAME pkg/proxy/endpoints.go:242
// apply folds every pending change in the tracker into endpointsMap and then
// empties the tracker. It is a no-op when changes is nil. The tracker's lock is
// held for the duration of the call.
//
// For each change the previous entries are removed first and the current ones
// are added afterwards, so keys present in both end up with the current value.
// Stale endpoints and stale service names found while comparing previous and
// current are appended through the two slice pointers. The circled numbers
// refer to the sub-sections that follow this listing.
func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
if changes == nil {
return
}
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
endpointsMap.Unmerge(change.previous) // remove the previous entries ②
endpointsMap.Merge(change.current) // add/overwrite with the current entries ①
detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames) // collect stale endpoints/services ③
}
changes.items = make(map[types.NamespacedName]*endpointsChange)
}
① EndpointsMap.Merge() 將 "other" 內指定項的值(端點列表)更新至 EndpointsMap
!FILENAME pkg/proxy/endpoints.go:259
// Merge copies every entry of other into em. A key that already exists in em
// is overwritten with the value from other; keys only present in em are kept.
func (em EndpointsMap) Merge(other EndpointsMap) {
	for name, eps := range other {
		em[name] = eps
	}
}
② EndpointsMap.Unmerge() 刪除 "other" 內指定的項
!FILENAME pkg/proxy/endpoints.go:266
// Unmerge deletes from em every key that appears in other. Only the keys of
// other are consulted; its values are ignored and other itself is not modified.
func (em EndpointsMap) Unmerge(other EndpointsMap) {
	for name := range other {
		// Deleting a key that is absent from em is a no-op.
		delete(em, name)
	}
}
③ EndpointsMap.detectStaleConnections() 查找廢棄后端連接信息項
!FILENAME pkg/proxy/endpoints.go:291
// detectStaleConnections compares the endpoints before and after a change and
// appends its findings through the two slice pointers:
//
//   - *staleEndpoints receives one ServiceEndpoint for every endpoint listed in
//     oldEndpointsMap that has no Equal counterpart under the same service port
//     name in newEndpointsMap;
//   - *staleServiceNames receives every service port name that has at least one
//     endpoint in newEndpointsMap but none in oldEndpointsMap.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
	for name, oldEps := range oldEndpointsMap {
		current := newEndpointsMap[name]
	nextOld:
		for _, oldEp := range oldEps {
			for i := range current {
				if current[i].Equal(oldEp) {
					// Still present after the change: not stale.
					continue nextOld
				}
			}
			klog.V(4).Infof("Stale endpoint %v -> %v", name, oldEp.String())
			*staleEndpoints = append(*staleEndpoints, ServiceEndpoint{Endpoint: oldEp.String(), ServicePortName: name})
		}
	}

	for name, newEps := range newEndpointsMap {
		// For UDP services, when the backends go from zero to non-zero there may
		// be a conntrack entry that black-holes traffic to the service.
		if len(newEps) == 0 || len(oldEndpointsMap[name]) != 0 {
			continue
		}
		*staleServiceNames = append(*staleServiceNames, name)
	}
}
創建與聯接 kube 鏈
filter 表中 INPUT 鏈頭部插入規則,跳轉到自定義鏈 KUBE-EXTERNAL-SERVICES
iptables -I "INPUT" -t "filter" -m "conntrack" --ctstate "NEW" -m comment --comment "kubernetes externally-visible service portals" -j "KUBE-EXTERNAL-SERVICES"
filter 表中 OUTPUT 鏈頭部插入規則,跳轉到自定義鏈 KUBE-SERVICES
iptables -I "OUTPUT" -t "filter" -m "conntrack" --ctstate "NEW" -m comment --comment "kubernetes service portals" -j "KUBE-SERVICES"
nat 表中 OUTPUT 鏈頭部插入規則,跳轉到自定義鏈 KUBE-SERVICES
iptables -I "OUTPUT" -t "nat" -m comment --comment "kubernetes service portals" -j "KUBE-SERVICES"
nat 表中 PREROUTING 鏈頭部插入規則,跳轉到自定義鏈 KUBE-SERVICES
iptables -I "PREROUTING" -t "nat" -m comment --comment "kubernetes service portals" -j "KUBE-SERVICES"
nat 表中 POSTROUTING 鏈頭部插入規則,跳轉到自定義鏈 KUBE-POSTROUTING
iptables -I "POSTROUTING" -t "nat" -m comment --comment "kubernetes postrouting rules" -j "KUBE-POSTROUTING"
filter 表中 FORWARD 鏈頭部插入規則,跳轉到自定義鏈 KUBE-FORWARD
iptables -I "FORWARD" -t "filter" -m comment --comment "kubernetes forwarding rules" -j "KUBE-FORWARD"
!FILENAME pkg/proxy/iptables/proxier.go:667
// Walk the iptablesJumpChains table: make sure each kube chain exists and that
// its source chain jumps to it. Any failure is logged and aborts this sync.
for _, chain := range iptablesJumpChains {
	// Underlying command: iptables -t $tableName -N $chainName
	if _, err := proxier.iptables.EnsureChain(chain.table, chain.chain); err != nil {
		// Report the chain that actually failed (previously this always printed
		// kubeServicesChain, whichever chain was being created).
		klog.Errorf("Failed to ensure that %s chain %s exists: %v", chain.table, chain.chain, err)
		return
	}
	args := append(chain.extraArgs,
		"-m", "comment", "--comment", chain.comment,
		"-j", string(chain.chain),
	)
	// Underlying command:
	//   iptables -I $sourceChain -t $tableName [extraArgs] -m comment --comment $comment -j $chain
	// utiliptables.Prepend puts the jump at the head of the source chain.
	if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, chain.table, chain.sourceChain, args...); err != nil {
		klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", chain.table, chain.sourceChain, chain.chain, err)
		return
	}
}
創建 iptables 基礎數據
- 獲取現存的 Filter/Nat 表鏈數據
- 創建 iptables-save/restore 格式數據(表頭、鏈)
- 創建 SNAT 地址偽裝規則
!FILENAME pkg/proxy/iptables/proxier.go:688
//現(xiàn)存在的filter表鏈獲取
existingFilterChains := make(map[utiliptables.Chain][]byte)
proxier.existingFilterChainsData.Reset()
err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.existingFilterChainsData) //通過iptables-save方式來獲取
if err != nil {
klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else {
existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.existingFilterChainsData.Bytes()) //輸出結果
}
//同上仗考,現(xiàn)存在的nat表鏈獲取
existingNATChains := make(map[utiliptables.Chain][]byte)
proxier.iptablesData.Reset()
err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
if err != nil {
klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else {
existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
}
// Reset all buffers used later.
// This is to avoid memory reallocations and thus improve performance.
proxier.filterChains.Reset()
proxier.filterRules.Reset()
proxier.natChains.Reset()
proxier.natRules.Reset()
// 寫表頭
writeLine(proxier.filterChains, "*filter")
writeLine(proxier.natChains, "*nat")
寫鏈數據
filter: "KUBE-SERVICES" / "KUBE-EXTERNAL-SERVICES" / "KUBE-FORWARD"
nat: "KUBE-SERVICES" / "KUBE-NODEPORTS" / "KUBE-POSTROUTING" / "KUBE-MARK-MASQ"
!FILENAME pkg/proxy/iptables/proxier.go:720
// 寫chain鏈數(shù)據(jù),將filter和Nat相關鏈格式化存放buffer
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
if chain, ok := existingFilterChains[chainName]; ok {
writeBytesLine(proxier.filterChains, chain)
} else {
// iptables-save/restore格式的鏈行":$chainName - [0:0]"
writeLine(proxier.filterChains, utiliptables.MakeChainLine(chainName))
}
}
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
if chain, ok := existingNATChains[chainName]; ok {
writeBytesLine(proxier.natChains, chain)
} else {
writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName))
}
}
寫地址偽裝規則:在 POSTROUTING 階段對地址進行 MASQUERADE(基於接口動態 IP 的 SNAT)處理。原始請求的源 IP 將會丟失,被請求 POD 內的應用看到的源地址為 NodeIP 或 CNI 設備 IP(bridge/vxlan 設備)。
!FILENAME pkg/proxy/iptables/proxier.go:738
// 寫kubernets特有的SNAT地址偽裝規(guī)則
// -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE
writeLine(proxier.natRules, []string{
"-A", string(kubePostroutingChain),
"-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
"-m", "mark", "--mark", proxier.masqueradeMark,
"-j", "MASQUERADE",
}...)
//-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
writeLine(proxier.natRules, []string{
"-A", string(KubeMarkMasqChain),
"-j", "MARK", "--set-xmark", proxier.masqueradeMark,
}...)
為每個 service 創建 rules
先了解 serviceInfo 的完整定義說明
!FILENAME pkg/proxy/iptables/proxier.go:141
// serviceInfo extends proxy.BaseServiceInfo with chain names that are computed
// once and cached.
type serviceInfo struct {
*proxy.BaseServiceInfo
// The following fields are computed and stored for performance reasons.
serviceNameString string
servicePortChainName utiliptables.Chain // per-service chain, named like KUBE-SVC-XXXX16BitXXXX
serviceFirewallChainName utiliptables.Chain // firewall chain, named like KUBE-FW-XXXX16BitXXXX
serviceLBChainName utiliptables.Chain // load-balancer chain, named like KUBE-XLB-XXXX16BitXXXX
}
// BaseServiceInfo holds the per-service-port data that the rule generation
// reads. The field notes below are translated from the article's annotations.
type BaseServiceInfo struct {
ClusterIP net.IP // portal IP (VIP)
Port int // portal port
Protocol v1.Protocol // protocol
NodePort int // port opened on the node
LoadBalancerStatus v1.LoadBalancerStatus // load-balancer ingress
SessionAffinityType v1.ServiceAffinity // session affinity
StickyMaxAgeSeconds int // maximum time a session sticks (presumably seconds, per the name)
ExternalIPs []string // external IPs (ports are listened on at the designated nodes)
LoadBalancerSourceRanges []string // source ranges used to filter traffic
HealthCheckNodePort int // port used for health checks
OnlyNodeLocalEndpoints bool
}
為每個服務創建服務鏈 "KUBE-SVC-XXX…" 和外部負載均衡鏈 "KUBE-XLB-XXX…"
!FILENAME pkg/proxy/iptables/proxier.go:791
svcChain := svcInfo.servicePortChainName //"KUBE-SVC-XXX..."
if hasEndpoints {
// Create the per-service chain, retaining counters if possible.
if chain, ok := existingNATChains[svcChain]; ok {
writeBytesLine(proxier.natChains, chain)
} else {
writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
}
activeNATChains[svcChain] = true
}
svcXlbChain := svcInfo.serviceLBChainName // "KUBE-XLB-XXX…"
if svcInfo.OnlyNodeLocalEndpoints {
// Only for services request OnlyLocal traffic
// create the per-service LB chain, retaining counters if possible.
if lbChain, ok := existingNATChains[svcXlbChain]; ok {
writeBytesLine(proxier.natChains, lbChain)
} else {
writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain))
}
activeNATChains[svcXlbChain] = true
}
clusterIP 流量的匹配;clusterIP 為默認方式,該資源僅集群內可訪問。
!FILENAME pkg/proxy/iptables/proxier.go:815
//存在端點叽赊,寫規(guī)則
if hasEndpoints {
args = append(args[:0],
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
"--dport", strconv.Itoa(svcInfo.Port),
)
// proxier配置masqueradeAll
// -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d $clusterIP \
// --dport $port -j KUBE-MARK-MASQ
if proxier.masqueradeAll {
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
} else if len(proxier.clusterCIDR) > 0 {
// proxier配置clusterCIDR情況:
// -A KUBE-SERVICES ! -s $clusterCIDR -m comment --comment "..." -m $prot \
// -p $prot -d $clusterIP --dport $port -j KUBE-MARK-MASQ
writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
}
// -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d $clusterIP \
// --dport $port -j KUBE-SVC-XXXX16bitXXXX
writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
} else {
// 無Endpoints的情況纹腌,則創(chuàng)建REJECT規(guī)則
// -A KUBE-SERVICES -m comment --comment $svcName -m $prot -p $prot -d $clusterIP \
// --dport $port -j REJECT
writeLine(proxier.filterRules,
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
"--dport", strconv.Itoa(svcInfo.Port),
"-j", "REJECT",
)
}
服務是否啟用 ExternalIPs(在指定的 node 上開啟監聽端口)
!FILENAME pkg/proxy/iptables/proxier.go:846
// For every external IP of the service: hold the local port open when the IP is
// local to this node, then write either the NAT rules (endpoints exist) or a
// REJECT rule in the filter table (no endpoints).
for _, externalIP := range svcInfo.ExternalIPs {
// If the external IP is local to this node and the protocol is NOT SCTP,
// make sure the port is held open: reuse the socket from portsMap when it
// is already open, otherwise open it now.
if local, err := utilproxy.IsLocalIP(externalIP); err != nil {
klog.Errorf("can't determine if IP is local, assuming not: %v", err)
} else if local && (svcInfo.GetProtocol() != v1.ProtocolSCTP) {
lp := utilproxy.LocalPort{
Description: "externalIP for " + svcNameString,
IP: externalIP,
Port: svcInfo.Port,
Protocol: protocol,
}
if proxier.portsMap[lp] != nil {
klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
// Open the local port; on failure record a warning event on the node,
// log it, and skip this external IP entirely.
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)
proxier.recorder.Eventf(
&v1.ObjectReference{
Kind: "Node",
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, v1.EventTypeWarning, err.Error(), msg)
klog.Error(msg)
continue
}
replacementPortsMap[lp] = socket
}
}
// Endpoints exist: write the NAT rules.
if hasEndpoints {
args = append(args[:0],
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
"--dport", strconv.Itoa(svcInfo.Port),
)
// -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
// $externalIP --dport $port -j KUBE-MARK-MASQ
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
// -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
// $externalIP --dport $port -m physdev ! --physdev-is-in \
// -m addrtype ! --src-type LOCAL -j KUBE-SVC-XXXX16bitXXXXX
externalTrafficOnlyArgs := append(args,
"-m", "physdev", "!", "--physdev-is-in",
"-m", "addrtype", "!", "--src-type", "LOCAL")
writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...)
dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
// -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
// $externalIP --dport $port -m addrtype --dst-type LOCAL
// -j KUBE-SVC-XXXX16bitXXXXX
writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...)
} else {
// No endpoints: reject the traffic in the filter table.
// -A KUBE-EXTERNAL-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
// $externalIP --dport $port -j REJECT
writeLine(proxier.filterRules,
"-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
"--dport", strconv.Itoa(svcInfo.Port),
"-j", "REJECT",
)
}
}
服務是否啟用了外部負載均衡服務load-balancer ingress
!FILENAME pkg/proxy/iptables/proxier.go:917
//存在端點淤翔,寫規(guī)則
if hasEndpoints {
fwChain := svcInfo.serviceFirewallChainName //"KUBE-FW-XXXX16bitXXXXX"
for _, ingress := range svcInfo.LoadBalancerStatus.Ingress {
if ingress.IP != "" {
// 創(chuàng)建服務KUBE-FW-X鏈
if chain, ok := existingNATChains[fwChain]; ok {
writeBytesLine(proxier.natChains, chain)
} else { //原來不存在則新建
writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain))
}
activeNATChains[fwChain] = true
// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
// This currently works for loadbalancers that preserves source ips.
// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
args = append(args[:0],
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)),
"--dport", strconv.Itoa(svcInfo.Port),
)
// -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
// $ingresIP --dport $port -j KUBE-FW-XXXX16bitXXXXX
writeLine(proxier.natRules, append(args, "-j", string(fwChain))...)
args = append(args[:0],
"-A", string(fwChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
)
// 在KUBE-FW鏈,每個源匹配規(guī)則可能跳轉至一個SVC或XLB鏈
chosenChain := svcXlbChain
// If we are proxying globally, we need to masquerade in case we cross nodes.
// If we are proxying only locally, we can retain the source IP.
if !svcInfo.OnlyNodeLocalEndpoints {
// -j "KUBE-MARK-MASQ" 地址偽裝實現(xiàn)跨主機訪問
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
chosenChain = svcChain // 選擇為SVC鏈
}
if len(svcInfo.LoadBalancerSourceRanges) == 0 {
// 允許所有源衙伶,直接跳轉
writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...)
} else {
// 基于source range配置過濾 "-s $srcRanges"
allowFromNode := false
for _, src := range svcInfo.LoadBalancerSourceRanges {
writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...)
_, cidr, _ := net.ParseCIDR(src)
if cidr.Contains(proxier.nodeIP) {
allowFromNode = true //配置CIDR包含節(jié)點IP,則允許來自節(jié)點請求
}
}
// 添加 "-s $ingresIP" 來允許LB后端主機請求
if allowFromNode {
writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-j", string(chosenChain))...)
}
}
// 條件ingress.IP為空"-j KUBE-MARK-DROP"
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
}
}
}
服務是否啟用了 nodeport(在每個節點上都將開啟一個 nodeport 端口)
!FILENAME pkg/proxy/iptables/proxier.go:989
if svcInfo.NodePort != 0 {
// 獲取node addresses
addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
if err != nil {
klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err)
continue
}
lps := make([]utilproxy.LocalPort, 0)
for address := range addresses {
lp := utilproxy.LocalPort{
Description: "nodePort for " + svcNameString,
IP: address,
Port: svcInfo.NodePort,
Protocol: protocol,
}
if utilproxy.IsZeroCIDR(address) {
// Empty IP address means all
lp.IP = ""
lps = append(lps, lp)
// If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
break
}
lps = append(lps, lp) //IP列表
}
// 為node節(jié)點的ips打開端口并保存持有socket句柄
for _, lp := range lps {
if proxier.portsMap[lp] != nil {
klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else if svcInfo.GetProtocol() != v1.ProtocolSCTP {
// 打開和監(jiān)聽端口
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue
}
if lp.Protocol == "udp" {
//清理udp conntrack記錄
err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
if err != nil {
klog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
}
}
replacementPortsMap[lp] = socket //socket保存
}
}
//存在端點,寫規(guī)則
if hasEndpoints {
// -A KUBE-NODEPORTS -m comment --comment "..." -m $prot -p $prot --dport $nodePort
args = append(args[:0],
"-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", svcNameString,
"-m", protocol, "-p", protocol,
"--dport", strconv.Itoa(svcInfo.NodePort),
)
if !svcInfo.OnlyNodeLocalEndpoints {
//非本地nodeports則需SNAT規(guī)則添加,
// -j KUBE-MARK-MASQ -j KUBE-XLB-XXXX16bitXXXX
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
// Jump to the service chain.
writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
} else {
loopback := "127.0.0.0/8"
if isIPv6 {
loopback = "::1/128"
}
// 本地nodeports則規(guī)則添加,
// -s $loopback -j KUBE-MARK-MASQ -j KUBE-XLB-XXXX16bitXXXX
writeLine(proxier.natRules, append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...)
writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...)
}
} else {
// 無hasEndpoints研侣,添加-j reject規(guī)則
// -A KUBE-EXTERNAL-SERVICES -m comment --comment "..." -m addrtype \
// --dst-type LOCAL -m $prot -p $prot --dport $nodePort -j REJECT
writeLine(proxier.filterRules,
"-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", "addrtype", "--dst-type", "LOCAL",
"-m", protocol, "-p", protocol,
"--dport", strconv.Itoa(svcInfo.NodePort),
"-j", "REJECT",
)
}
}
基於服務名和協議,生成每個端點鏈
!FILENAME pkg/proxy/iptables/proxier.go:1087
for _, ep := range proxier.endpointsMap[svcName] {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
klog.Errorf("Failed to cast endpointsInfo %q", ep.String())
continue
}
endpoints = append(endpoints, epInfo)
//基于服務名和協(xié)議生成端點鏈名稱 "KUBE-SEP-XXXX16bitXXXX"
endpointChain = epInfo.endpointChain(svcNameString, protocol)
endpointChains = append(endpointChains, endpointChain)
// 創(chuàng)建端點鏈
if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok {
writeBytesLine(proxier.natChains, chain)
} else {
writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
}
activeNATChains[endpointChain] = true
}
寫 SessionAffinity 會話保持規則,實現在一段時間內保持 session affinity;保持時長取自 svcInfo.StickyMaxAgeSeconds(下面以 180 秒為例),通過添加 "-m recent --rcheck --seconds 180 --reap" 的 iptables 規則實現會話保持。
!FILENAME pkg/proxy/iptables/proxier.go:1107
// When SessionAffinityType is "ClientIP", write one session-affinity rule per
// endpoint chain into the service chain:
// -A KUBE-SVC-XXXX16bitXXXX [-m comment --comment "..."] -m recent \
// --name KUBE-SEP-XXXX16bitXXXX --rcheck --seconds $StickyMaxAgeSeconds --reap \
// -j KUBE-SEP-XXXX16bitXXXX
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
for _, endpointChain := range endpointChains {
args = append(args[:0],
"-A", string(svcChain),
)
// NOTE(review): nothing is assigned back from this call. If
// appendServiceCommentLocked appends to its own copy of the slice header,
// the comment arguments never reach `args` — confirm against its definition.
proxier.appendServiceCommentLocked(args, svcNameString)
args = append(args,
"-m", "recent", "--name", string(endpointChain),
"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds), "--reap",
"-j", string(endpointChain),
)
writeLine(proxier.natRules, args...)
}
}
寫負載均衡和 DNAT 規則:使用 "-m statistic --mode random --probability" 的 iptables 規則,將後端 POD 組成一個基於概率訪問的組合,實現服務訪問的負載均衡效果。
- 針對服務的每個端點,在 nat 表內該 service 對應的自定義鏈 "KUBE-SVC-XXXX16bitXXXX" 中加入 iptables 規則。如果該服務對應的 endpoints 大於等於 2,則添加負載均衡規則。
- 針對選擇非本地 Node 上的 POD,需進行 DNAT,將請求的目標地址設置成候選 POD 的 IP 後再進行路由。KUBE-MARK-MASQ 將重設(偽裝)源地址。
-A KUBE-SVC-XXXX16bitXXXX -m comment --comment "..." -m statistic --mode random --probability $prob -j KUBE-SEP-XXXX16bitXXXX
-A KUBE-SEP-XXXX16bitXXXX -m comment --comment "..." -s $epIp -j "KUBE-MARK-MASQ"
-A KUBE-SEP-XXXX16bitXXXX -m comment --comment "…" -m $prot -p $prot -j DNAT --to-destination X.X.X.X:xxx
!FILENAME pkg/proxy/iptables/proxier.go:1123
// 寫負載均衡和DNAT規(guī)則
n := len(endpointChains)
for i, endpointChain := range endpointChains {
epIP := endpoints[i].IP()
if epIP == "" {
// Error parsing this endpoint has been logged. Skip to next endpoint.
continue
}
// 每個服務生成的負載均衡規(guī)則呕屎,后端POD組成一個基于概率訪問的組合
// // -A KUBE-SVC-XXXX16bitXXXX -m comment –comment "..."
// -m statistic --mode random --probability $prob
// -j KUBE-SEP-XXXX16bitXXXX
args = append(args[:0], "-A", string(svcChain))
proxier.appendServiceCommentLocked(args, svcNameString)
if i < (n - 1) { // 當端點大于或等于2
args = append(args,
"-m", "statistic",
"--mode", "random",
"--probability", proxier.probability(n-i))
}
// The final (or only if n == 1) rule is a guaranteed match.
args = append(args, "-j", string(endpointChain))
writeLine(proxier.natRules, args...)
// 每個端點鏈規(guī)則
// -A KUBE-SEP-XXXX16bitXXXX -m comment –comment "..." -s $epIp -j "KUBE-MARK-MASQ"
args = append(args[:0], "-A", string(endpointChain))
proxier.appendServiceCommentLocked(args, svcNameString)
// Handle traffic that loops back to the originator with SNAT.
writeLine(proxier.natRules, append(args,
"-s", utilproxy.ToCIDR(net.ParseIP(epIP)),
"-j", string(KubeMarkMasqChain))...)
// 如配置session保持"ClientIP"
// -m recent --name KUBE-SEP-XXXX16bitXXXX --set
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
}
// DNAT至最終的端點服務上
// -A KUBE-SVC-XXXX16bitXXXX -m comment –comment "..."
// -m $prot -p $prot -j DNAT --to-destination X.X.X.X:xxx
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].Endpoint)
writeLine(proxier.natRules, args...)
}
// 服務請求僅本地流量
localEndpoints := make([]*endpointsInfo, 0)
localEndpointChains := make([]utiliptables.Chain, 0)
for i := range endpointChains {
if endpoints[i].IsLocal {
// These slices parallel each other; must be kept in sync
localEndpoints = append(localEndpoints, endpoints[i])
localEndpointChains = append(localEndpointChains, endpointChains[i])
}
}
啟用 clusterCIDR(kube-proxy 中的 --cluster-cidr 指定的是集群中 pod 使用的網段;pod 使用的網段和 apiserver 中指定的 service 的 cluster ip 或 vip 網段不是同一個網段)
!FILENAME pkg/proxy/iptables/proxier.go:1179
// pod -> external VIP流量導向服務VIP(服務鏈)
// -A KUBE-XLB-XXXX16bitXXXX -m comment --comment "..." -s $clusterCIDR
// -j KUBE-SVC-XXXX16bitXXXXX
if len(proxier.clusterCIDR) > 0 {
args = append(args[:0],
"-A", string(svcXlbChain),
"-m", "comment", "--comment",
`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`,
"-s", proxier.clusterCIDR,
"-j", string(svcChain),
)
writeLine(proxier.natRules, args...)
}
生成本地端點鏈規則,保持本地源 IP(當只在本地選擇 POD 服務請求時,不存在 SNAT 規則,可保持源地址 IP 信息)。在 nodePort 或 XLB 時,可定義 "externalTrafficPolicy": "Local",
控制只向屬於這個 service 的本地 POD 轉發請求;如果本地沒有 POD 能服務這個請求,請求將被 DROP 掉,客戶端會發現請求超時沒有響應。
!FILENAME pkg/proxy/iptables/proxier.go:1190
numLocalEndpoints := len(localEndpointChains)
if numLocalEndpoints == 0 {
// 無本地端點,將流量Drop(流量黑洞處理)
// -A KUBE-XLB-XXXX16bitXXXX -m comment --comment "..." -j KUBE-MARK-DROP
args = append(args[:0],
"-A", string(svcXlbChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
"-j",
string(KubeMarkDropChain),
)
writeLine(proxier.natRules, args...)
} else {
// 本地端點會話保持開啟
// -A KUBE-XLB-XXXX16bitXXXX -m comment --comment "..." -m recent \
// --name KUBE-SEP-XXXX16bitXXXX \
// --rcheck --seconds $StickyMaxAge --reap -j KUBE-SEP-XXXX16bitXXXX
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
for _, endpointChain := range localEndpointChains {
writeLine(proxier.natRules,
"-A", string(svcXlbChain),
"-m", "comment", "--comment", svcNameString,
"-m", "recent", "--name", string(endpointChain),
"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds), "--reap",
"-j", string(endpointChain))
}
}
// 本地端點負載均衡處理"-m statistic --mode random --probability"
// 后端POD組成一個基于概率訪問的組合
for i, endpointChain := range localEndpointChains {
// Balancing rules in the per-service chain.
args = append(args[:0],
"-A", string(svcXlbChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcNameString),
)
if i < (numLocalEndpoints - 1) {
// Each rule is a probabilistic match.
args = append(args,
"-m", "statistic",
"--mode", "random",
"--probability", proxier.probability(numLocalEndpoints-i))
}
args = append(args, "-j", string(endpointChain))
// -A KUBE-XLB-XXXX16bitXXXX -m comment --comment "..." -m recent \
// --name KUBE-SEP-XXXX16bitXXXX -m statistic --mode random \
// --probability 0.50000000000 -j KUBE-SEP-XXXX16bitXXXX
writeLine(proxier.natRules, args...)
}
}
配置收尾規則數據
刪除不再使用的服務自定義 kube 鏈 "KUBE-SVC-*" / "KUBE-SEP-*" / "KUBE-FW-*" / "KUBE-XLB-*"。
!FILENAME pkg/proxy/iptables/proxier.go:1237
// Drop every kube-proxy service chain that exists in the NAT table but was
// not marked active during this sync. Chains without one of the per-service
// prefixes are not ours and are left alone.
for chain, chainLine := range existingNATChains {
	if activeNATChains[chain] {
		continue
	}
	name := string(chain)
	ours := false
	for _, prefix := range []string{"KUBE-SVC-", "KUBE-SEP-", "KUBE-FW-", "KUBE-XLB-"} {
		if strings.HasPrefix(name, prefix) {
			ours = true
			break
		}
	}
	if !ours {
		continue
	}
	// iptables requires a chain line before the chain can be removed;
	// writing it also has the effect of flushing the chain. The "-X"
	// rule then deletes it.
	writeBytesLine(proxier.natChains, chainLine)
	writeLine(proxier.natRules, "-X", name)
}
添加服務的nodeports規則(nat表-"KUBE-SERVICES"鏈)
!FILENAME pkg/proxy/iptables/proxier.go:1254
// Append the jump from KUBE-SERVICES to KUBE-NODEPORTS. The rule is written
// either once for all local addresses (zero CIDR) or once per node address:
//
//   -A KUBE-SERVICES -m comment --comment "..." -m addrtype --dst-type LOCAL \
//      -j KUBE-NODEPORTS
//
//   -A KUBE-SERVICES -m comment --comment "..." -d $NODEIP -j KUBE-NODEPORTS
//
addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
if err != nil {
	// Include the underlying error so the failure can be diagnosed.
	klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err)
} else {
	isIPv6 := proxier.iptables.IsIpv6()
	for address := range addresses {
		// TODO(thockin, m1093782566): If/when we have dual-stack support we will want to distinguish v4 from v6 zero-CIDRs.
		if utilproxy.IsZeroCIDR(address) {
			args = append(args[:0],
				"-A", string(kubeServicesChain),
				"-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
				"-m", "addrtype", "--dst-type", "LOCAL",
				"-j", string(kubeNodePortsChain))
			writeLine(proxier.natRules, args...)
			// Nothing else matters after the zero CIDR.
			break
		}
		// Skip addresses whose IP family differs from the family this
		// proxier's iptables handle operates on.
		if isIPv6 != utilnet.IsIPv6String(address) {
			klog.Errorf("IP address %s has incorrect IP version", address)
			continue
		}
		// Create the nodeport jump rule for this specific address.
		args = append(args[:0],
			"-A", string(kubeServicesChain),
			"-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
			"-d", address,
			"-j", string(kubeNodePortsChain))
		writeLine(proxier.natRules, args...)
	}
}
添加forward規則(filter表-"KUBE-FORWARD"鏈)
!FILENAME pkg/proxy/iptables/proxier.go:1289
// Append an ACCEPT rule to the forward chain (filter table buffer) for
// packets whose mark matches proxier.masqueradeMark:
//
//   -A KUBE-FORWARD -m comment --comment "..." -m mark --mark 0xFFFF/0xFFFF -j ACCEPT
writeLine(proxier.filterRules,
"-A", string(kubeForwardChain),
"-m", "comment", "--comment", `"kubernetes forwarding rules"`,
"-m", "mark", "--mark", proxier.masqueradeMark,
"-j", "ACCEPT",
)
添加帶clusterCIDR配置(源/目標)的規則(filter表-"KUBE-FORWARD"鏈)
!FILENAME pkg/proxy/iptables/proxier.go:1297
// kube-proxy's cluster CIDR is the address range used by pods in the cluster;
// per the article it is a different range from the service cluster-IP range.
//
// When a cluster CIDR is configured, accept forwarded traffic that belongs to
// an already tracked connection and has a pod address as source or destination:
//
// -A KUBE-FORWARD -s $clusterCIDR -m comment --comment "..." -m conntrack --ctstate \
// RELATED,ESTABLISHED -j ACCEPT
// -A KUBE-FORWARD -m comment --comment "..." -d $clusterCIDR -m conntrack --ctstate \
// RELATED,ESTABLISHED -j ACCEPT
//
if len(proxier.clusterCIDR) != 0 {
writeLine(proxier.filterRules,
"-A", string(kubeForwardChain),
"-s", proxier.clusterCIDR, // match on source
"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
"-m", "conntrack",
"--ctstate", "RELATED,ESTABLISHED",
"-j", "ACCEPT",
)
writeLine(proxier.filterRules,
"-A", string(kubeForwardChain),
"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
"-d", proxier.clusterCIDR, // match on destination
"-m", "conntrack",
"--ctstate", "RELATED,ESTABLISHED",
"-j", "ACCEPT",
)
}
結尾標志寫入
// Write the closing COMMIT line to both the filter and the NAT rule buffers.
writeLine(proxier.filterRules, "COMMIT")
writeLine(proxier.natRules, "COMMIT")
匯集與加載 iptables 配置規則數據
!FILENAME pkg/proxy/iptables/proxier.go:1326
// Concatenate the filter and NAT chain/rule buffers built above into
// iptablesData, in the order: filter chains, filter rules, NAT chains, NAT rules.
proxier.iptablesData.Reset()
proxier.iptablesData.Write(proxier.filterChains.Bytes())
proxier.iptablesData.Write(proxier.filterRules.Bytes())
proxier.iptablesData.Write(proxier.natChains.Bytes())
proxier.iptablesData.Write(proxier.natRules.Bytes())
klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
// Load the assembled data through RestoreAll (iptables-restore), passing the
// NoFlushTables and RestoreCounters flags.
err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
klog.Errorf("Failed to execute iptables-restore: %v", err)
// Revert new local ports.
klog.V(2).Infof("Closing local ports after iptables-restore failure")
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
return
}
iptables 底層的 runner 實現
前面基本已看完整個proxy的執行流程,最後來看iptables proxier是如何使用系統層iptables命令進行底層的iptables規則CRUD操作(通俗的理解:iptables proxier的實現都是在操作iptables命令生成相應的規則)。下面我們來看一下kube-proxy組件底層iptables操作器的封裝。
iptables 執行器
**Interface** 接口為運行iptables命令而定義
!FILENAME pkg/util/iptables/iptables.go:45
// Interface is the set of iptables operations exposed to callers; the
// runner type implements it.
type Interface interface {
// GetVersion returns the detected iptables version string.
GetVersion() (string, error)
// EnsureChain creates the chain if needed; the bool reports whether it already existed.
EnsureChain(table Table, chain Chain) (bool, error)
// FlushChain empties the given chain ("-F").
FlushChain(table Table, chain Chain) error
// DeleteChain removes the given chain ("-X").
DeleteChain(table Table, chain Chain) error
// EnsureRule adds the rule at the given position unless it is already
// present; the bool reports whether it already existed.
EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error)
// DeleteRule removes the rule from the given chain ("-D").
DeleteRule(table Table, chain Chain, args ...string) error
// IsIpv6 reports whether this instance handles IPv6 — presumably derived
// from the configured protocol; implementation not shown here.
IsIpv6() bool
// SaveInto writes the iptables-save output for the table into buffer.
SaveInto(table Table, buffer *bytes.Buffer) error
// Restore feeds data to iptables-restore, limited to one table ("-T").
Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
// RestoreAll feeds data to iptables-restore without a table restriction.
RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error
// AddReloadFunc registers a callback to run when rules must be reloaded.
AddReloadFunc(reloadFunc func())
// Destroy stops the D-Bus signal listener, if one was started.
Destroy()
}
iptables Interface接口的實現類runner,完成對iptables命令執行器的定義
!FILENAME pkg/util/iptables/iptables.go:135
// runner implements Interface by executing the iptables command-line tools.
type runner struct {
mu sync.Mutex // held while running iptables commands and mutating runner state
exec utilexec.Interface // command executor used to launch the iptables binaries
dbus utildbus.Interface // D-Bus API used to obtain the system bus
protocol Protocol // IP family (IPv4/IPv6); selects which iptables binary is used
hasCheck bool // whether checkRule may use the "-C" check operation
hasListener bool // set once connectToFirewallD has obtained the system bus
waitFlag []string // flags prepended to every iptables call (wait for the xtables lock)
restoreWaitFlag []string // flags prepended to iptables-restore; if empty, the lock file is grabbed manually
lockfilePath string // location of the xtables lock file
reloadFuncs []func() // callbacks registered through AddReloadFunc
signal chan *godbus.Signal // channel on which D-Bus signals are delivered
}
// runner實現的Interface方法列表,後面將詳細分析關鍵方法的實現代碼邏輯
func (runner *runner) GetVersion() (string, error)
func (runner *runner) EnsureChain(table Table, chain Chain) (bool, error)
func (runner *runner) FlushChain(table Table, chain Chain) error
func (runner *runner) DeleteChain(table Table, chain Chain) error
func (runner *runner) EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error)
func (runner *runner) DeleteRule(table Table, chain Chain, args ...string) error
func (runner *runner) IsIpv6() bool
func (runner *runner) SaveInto(table Table, buffer *bytes.Buffer) error
func (runner *runner) Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
func (runner *runner) RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error
func (runner *runner) AddReloadFunc(reloadFunc func())
func (runner *runner) Destroy()
// runner內部方法列表
func (runner *runner) connectToFirewallD()
func (runner *runner) restoreInternal(args []string, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
func (runner *runner) run(op operation, args []string) ([]byte, error)
func (runner *runner) runContext(ctx context.Context, op operation, args []string) ([]byte, error)
func (runner *runner) checkRule(table Table, chain Chain, args ...string) (bool, error)
func (runner *runner) checkRuleWithoutCheck(table Table, chain Chain, args ...string) (bool, error)
func (runner *runner) checkRuleUsingCheck(args []string) (bool, error)
func (runner *runner) dbusSignalHandler(bus utildbus.Connection)
func (runner *runner) reload()
iptables runner對象的構造:New() -> newInternal(),返回runner{…}實例化對象(Interface接口類型),完成創建iptables命令執行器的工作。
!FILENAME pkg/util/iptables/iptables.go:152
// newInternal constructs the iptables runner and returns it as an Interface.
//
// The iptables version is probed first; if that fails a warning is logged and
// MinCheckVersion is assumed. An empty lockfilePath falls back to
// LockfilePath16x. The version string determines whether "-C" is usable and
// which wait flag is passed to iptables; the iptables-restore wait flag is
// probed separately.
func newInternal(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol, lockfilePath string) Interface {
	version, err := getIPTablesVersionString(exec, protocol)
	if err != nil {
		klog.Warningf("Error checking iptables version, assuming version at least %s: %v", MinCheckVersion, err)
		version = MinCheckVersion
	}

	if lockfilePath == "" {
		lockfilePath = LockfilePath16x
	}

	r := new(runner)
	r.exec = exec
	r.dbus = dbus
	r.protocol = protocol
	r.hasCheck = getIPTablesHasCheckCommand(version)
	r.hasListener = false
	r.waitFlag = getIPTablesWaitFlag(version)
	r.restoreWaitFlag = getIPTablesRestoreWaitFlag(exec, protocol)
	r.lockfilePath = lockfilePath
	return r
}
// New returns an iptables command runner (as an Interface) for the given
// protocol. It passes an empty lock file path, so newInternal applies its
// default.
func New(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol) Interface {
return newInternal(exec, dbus, protocol, "")
}
iptables 執行器方法
runner.run() 這個方法是runner最基礎和公共調用的內部方法,也就是iptables命令執行os exec調用的代碼。run()有兩個傳參:1. 指定iptables操作command,2. 參數列表。通過傳參將組成一個完整的iptables命令進行exec調用執行。runContext()此方法內含有帶context上下文和不帶context兩種執行方式。
!FILENAME pkg/util/iptables/iptables.go:218
// run executes the iptables operation op with args and returns the combined
// output. It delegates to runContext with a nil context, which selects the
// non-context execution path there.
func (runner *runner) run(op operation, args []string) ([]byte, error) {
return runner.runContext(nil, op, args)
}
// runContext executes "iptables" or "ip6tables" (chosen by the runner's
// protocol) with the runner's wait flag, the operation op and args, and
// returns the command's combined stdout/stderr.
//
// A nil ctx runs the command without a context; otherwise the command is
// bound to ctx.
func (runner *runner) runContext(ctx context.Context, op operation, args []string) ([]byte, error) {
	iptablesCmd := iptablesCommand(runner.protocol)

	// Assemble the arguments in a fresh slice. Appending directly to
	// runner.waitFlag could write into its backing array whenever it has
	// spare capacity, letting separate invocations clobber each other.
	fullArgs := make([]string, 0, len(runner.waitFlag)+1+len(args))
	fullArgs = append(fullArgs, runner.waitFlag...)
	fullArgs = append(fullArgs, string(op))
	fullArgs = append(fullArgs, args...)

	klog.V(5).Infof("running iptables %s %v", string(op), args)
	if ctx == nil {
		return runner.exec.Command(iptablesCmd, fullArgs...).CombinedOutput()
	}
	return runner.exec.CommandContext(ctx, iptablesCmd, fullArgs...).CombinedOutput()
}
// operation is an iptables command flag understood by the runner; its string
// value is passed to the iptables binary as the operation argument.
type operation string
// New returns an Interface backed by executor. The article presents this as
// the implementation behind runner.exec, which executes commands via os/exec.
func New() Interface {
return &executor{}
}
// Command is part of the Interface interface. It wraps the *exec.Cmd built by
// osexec.Command(cmd, args...) in a cmdWrapper.
func (executor *executor) Command(cmd string, args ...string) Cmd {
return (*cmdWrapper)(osexec.Command(cmd, args...))
}
// CommandContext is part of the Interface interface. It wraps the *exec.Cmd
// built by osexec.CommandContext(ctx, cmd, args...) in a cmdWrapper.
func (executor *executor) CommandContext(ctx context.Context, cmd string, args ...string) Cmd {
return (*cmdWrapper)(osexec.CommandContext(ctx, cmd, args...))
}
runner.GetVersion() 獲取系統安裝的iptables版本信息,格式為 "X.Y.Z"
!FILENAME pkg/util/iptables/iptables.go:218
// GetVersion returns the iptables version string by invoking
// getIPTablesVersionString with the runner's executor and protocol.
func (runner *runner) GetVersion() (string, error) {
return getIPTablesVersionString(runner.exec, runner.protocol)
}
// iptablesVersionMatcher captures the dotted numeric version that follows a
// "v" in the "--version" output. It is compiled once at package scope instead
// of on every call.
var iptablesVersionMatcher = regexp.MustCompile("v([0-9]+(\\.[0-9]+)+)")

// getIPTablesVersionString runs "iptables --version" (or the ip6tables
// equivalent, depending on protocol) and returns the dotted version number
// found in its output, e.g. "X.Y.Z".
//
// It returns the command's error if execution fails, and a descriptive error
// if no version can be found in the output.
func getIPTablesVersionString(exec utilexec.Interface, protocol Protocol) (string, error) {
	iptablesCmd := iptablesCommand(protocol)
	// Named "out" rather than "bytes" so the standard bytes package is not shadowed.
	out, err := exec.Command(iptablesCmd, "--version").CombinedOutput()
	if err != nil {
		return "", err
	}
	match := iptablesVersionMatcher.FindStringSubmatch(string(out))
	if match == nil {
		return "", fmt.Errorf("no iptables version found in string: %s", out)
	}
	return match[1], nil
}
runner.EnsureChain() "-N" 檢測指定的規則鏈是否存在,如果不存在則創建此鏈,存在則返回true
!FILENAME pkg/util/iptables/iptables.go:223
// EnsureChain issues the create-chain operation for chain in table
// ("iptables -t $tableName -N $chainName").
//
// It returns (false, nil) when the command succeeds, (true, nil) when the
// command exits with status 1 (treated as "chain already exists"), and
// (false, err) for any other failure.
func (runner *runner) EnsureChain(table Table, chain Chain) (bool, error) {
	fullArgs := makeFullArgs(table, chain)

	runner.mu.Lock()
	defer runner.mu.Unlock()

	out, err := runner.run(opCreateChain, fullArgs)
	if err == nil {
		return false, nil
	}
	if exitErr, isExit := err.(utilexec.ExitError); isExit && exitErr.Exited() && exitErr.ExitStatus() == 1 {
		return true, nil
	}
	return false, fmt.Errorf("error creating chain %q: %v: %s", chain, err, out)
}
runner.FlushChain() "-F" 清空指定鏈
!FILENAME pkg/util/iptables/iptables.go:242
// FlushChain empties the given chain of the given table. The article elides
// parts of the body ("//..."); only the command invocation is shown.
func (runner *runner) FlushChain(table Table, chain Chain) error {
fullArgs := makeFullArgs(table, chain)
//...
// Run "iptables -t $tableName -F $chainName".
out, err := runner.run(opFlushChain, fullArgs)
//...
}
runner.DeleteChain() "-X" 刪除指定的鏈
!FILENAME pkg/util/iptables/iptables.go:256
// DeleteChain removes the given chain from the given table. The article
// elides parts of the body ("//..."); only the command invocation is shown.
func (runner *runner) DeleteChain(table Table, chain Chain) error {
fullArgs := makeFullArgs(table, chain)
//...
// Run "iptables -t $tableName -X $chainName".
out, err := runner.run(opDeleteChain, fullArgs)
//...
}
runner.EnsureRule() 檢測規則是否存在,不存在則在指定的"表內鏈上"按指定position添加規則
!FILENAME pkg/util/iptables/iptables.go:271
// EnsureRule makes sure the rule described by args exists in chain of table.
//
// It returns (true, nil) if the rule was already present. Otherwise it runs
// the operation given by position (insert "-I" or append "-A") and returns
// (false, nil) on success. Errors from the existence check are returned
// unchanged; a failure to add the rule is wrapped together with the command
// output.
func (runner *runner) EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error) {
	fullArgs := makeFullArgs(table, chain, args...)

	runner.mu.Lock()
	defer runner.mu.Unlock()

	found, err := runner.checkRule(table, chain, args...)
	switch {
	case err != nil:
		return false, err
	case found:
		return true, nil
	}

	if out, runErr := runner.run(operation(position), fullArgs); runErr != nil {
		return false, fmt.Errorf("error appending rule: %v: %s", runErr, out)
	}
	return false, nil
}
// checkRule reports whether the rule exists. When the installed iptables
// lacks the "-C" operation (hasCheck is false) it falls back to
// checkRuleWithoutCheck; otherwise it uses checkRuleUsingCheck.
func (runner *runner) checkRule(table Table, chain Chain, args ...string) (bool, error) {
	if !runner.hasCheck {
		return runner.checkRuleWithoutCheck(table, chain, args...)
	}
	return runner.checkRuleUsingCheck(makeFullArgs(table, chain, args...))
}
// checkRuleUsingCheck checks for a rule with the "-C" operation; used when
// the installed iptables supports it. The command runs under a context with a
// 5-minute timeout. The article elides parts of the body ("//...").
func (runner *runner) checkRuleUsingCheck(args []string) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
//...
// Run "iptables -wait -C $chainName -t $tableName ... ".
out, err := runner.runContext(ctx, opCheckRule, args)
//...
}
// checkRuleWithoutCheck reports whether a rule exists without using "-C";
// per the article this is for compatibility with iptables < 1.4.11.
//
// It dumps the table with "iptables-save -t $tableName", normalizes both the
// requested args and each dumped rule line (surrounding double quotes
// stripped, trimhex applied, split on whitespace), and reports a match when a
// line for this chain has the expected number of fields and its field set
// contains every requested field.
func (runner *runner) checkRuleWithoutCheck(table Table, chain Chain, args ...string) (bool, error) {
	saveCmd := iptablesSaveCommand(runner.protocol)
	klog.V(1).Infof("running %s -t %s", saveCmd, string(table))
	dump, err := runner.exec.Command(saveCmd, "-t", string(table)).CombinedOutput()
	if err != nil {
		return false, fmt.Errorf("error checking rule: %v", err)
	}

	// Normalize the requested arguments into individual fields.
	var wanted []string
	for _, arg := range args {
		unquoted := trimhex(strings.Trim(arg, "\""))
		wanted = append(wanted, strings.Fields(unquoted)...)
	}
	wantedSet := sets.NewString(wanted...)

	rulePrefix := fmt.Sprintf("-A %s", string(chain))
	for _, line := range strings.Split(string(dump), "\n") {
		fields := strings.Fields(line)
		// Only lines for this chain with exactly two extra fields are
		// candidates (presumably the leading "-A" and the chain name).
		if !(strings.HasPrefix(line, rulePrefix) && len(fields) == len(wanted)+2) {
			continue
		}

		// Normalize the dumped fields the same way as the arguments.
		for idx, field := range fields {
			fields[idx] = trimhex(strings.Trim(field, "\""))
		}

		if sets.NewString(fields...).IsSuperset(wantedSet) {
			return true, nil
		}
		klog.V(5).Infof("DBG: fields is not a superset of args: fields=%v args=%v", fields, args)
	}
	return false, nil
}
runner.DeleteRule() "-D" 在指定的"表中鏈上"刪除規則
!FILENAME pkg/util/iptables/iptables.go:292
// DeleteRule removes the rule described by args from chain of table. The
// article elides parts of the body ("//..."); the visible steps are the
// existence check followed by the delete command.
func (runner *runner) DeleteRule(table Table, chain Chain, args ...string) error {
fullArgs := makeFullArgs(table, chain, args...)
//...
// Check whether the rule exists.
exists, err := runner.checkRule(table, chain, args...)
//...
// Run "iptables -D $chainName -t $tableName ...".
out, err := runner.run(opDeleteRule, fullArgs)
//...
}
runner.SaveInto() 保存指定表的iptables規則集(buffer內)
!FILENAME pkg/util/iptables/iptables.go:317
// SaveInto runs "iptables-save -t $tableName" (binary chosen by the runner's
// protocol) and directs both stdout and stderr of the command into buffer.
// It returns the error from running the command. The article elides the
// start of the body ("//...").
func (runner *runner) SaveInto(table Table, buffer *bytes.Buffer) error {
//...
// Run "iptables-save -t $tableName".
iptablesSaveCmd := iptablesSaveCommand(runner.protocol)
args := []string{"-t", string(table)}
cmd := runner.exec.Command(iptablesSaveCmd, args...)
cmd.SetStdout(buffer)
cmd.SetStderr(buffer)
return cmd.Run()
}
runner.Restore() 裝載指定表由iptables-save保存的規則集(從標準輸入接收輸入)
!FILENAME pkg/util/iptables/iptables.go:340
// Restore loads data with iptables-restore, restricted to one table via
// "-T $tableName". The actual work is done by restoreInternal.
func (runner *runner) Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error {
// "iptables-restore -T $tableName"
args := []string{"-T", string(table)}
return runner.restoreInternal(args, data, flush, counters) // delegate and return its result
}
// restoreInternal assembles the iptables-restore arguments and runs the
// command with data supplied on standard input.
//
// args holds caller-specific flags (e.g. "-T table"); "--noflush" is added
// when flush is false and "--counters" when counters is true. On failure the
// returned error contains both the execution error and the command output.
func (runner *runner) restoreInternal(args []string, data []byte, flush FlushFlag, counters RestoreCountersFlag) error {
runner.mu.Lock()
defer runner.mu.Unlock()
trace := utiltrace.New("iptables restore")
defer trace.LogIfLong(2 * time.Second)
// Assemble the optional flags: "--noflush", "--counters".
if !flush {
args = append(args, "--noflush")
}
if counters {
args = append(args, "--counters")
}
// Without a wait flag for iptables-restore, grab the lock file(s) here
// and hold them until this function returns (released by the deferred Close).
if len(runner.restoreWaitFlag) == 0 {
locker, err := grabIptablesLocks(runner.lockfilePath)
if err != nil {
return err
}
trace.Step("Locks grabbed")
defer func(locker iptablesLocker) {
if err := locker.Close(); err != nil {
klog.Errorf("Failed to close iptables locks: %v", err)
}
}(locker)
}
// The wait flag (if any) comes first, followed by the assembled args.
fullArgs := append(runner.restoreWaitFlag, args...)
iptablesRestoreCmd := iptablesRestoreCommand(runner.protocol)
klog.V(4).Infof("running %s %v", iptablesRestoreCmd, fullArgs)
// e.g. "iptables-restore --wait -T $tableName --noflush --counters < data"
cmd := runner.exec.Command(iptablesRestoreCmd, fullArgs...)
// Feed the rule set to the command on standard input.
cmd.SetStdin(bytes.NewBuffer(data))
// Run the command and collect its combined output for error reporting.
b, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("%v (%s)", err, b)
}
return nil
}
**runner.RestoreAll()** 如同上面的Restore(),調用命令iptables-restore裝載所有備份規則集
!FILENAME pkg/util/iptables/iptables.go:347
// RestoreAll loads data with iptables-restore like Restore, but passes no
// table restriction: it hands an empty argument list to restoreInternal.
func (runner *runner) RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error {
args := make([]string, 0)
// Same as Restore, but without the "-T" table argument.
return runner.restoreInternal(args, data, flush, counters)
}
**runner.AddReloadFunc()** 註冊reload回調函數,實現iptables reload重新加載規則
!FILENAME pkg/util/iptables/iptables.go:679
// AddReloadFunc registers reloadFunc in runner.reloadFuncs. If no D-Bus
// listener has been set up yet, it first calls connectToFirewallD to try to
// start one. The runner mutex is held for the whole call.
func (runner *runner) AddReloadFunc(reloadFunc func()) {
runner.mu.Lock()
defer runner.mu.Unlock()
// Start the listener only if one is not already running.
if !runner.hasListener {
runner.connectToFirewallD() // start listening on D-Bus
}
runner.reloadFuncs = append(runner.reloadFuncs, reloadFunc) // register the callback
}
// connectToFirewallD subscribes to FirewallD-related signals on the D-Bus
// system bus and starts the goroutine that handles them.
//
// If the system bus is unavailable, the failure is logged and nothing else
// happens. Otherwise hasListener is set, two match rules are added (the
// FirewallD "Reloaded" signal and "NameOwnerChanged" for the FirewallD bus
// name), a buffered signal channel is registered with the bus, and
// dbusSignalHandler is launched.
func (runner *runner) connectToFirewallD() {
	bus, err := runner.dbus.SystemBus()
	if err != nil {
		klog.V(1).Infof("Could not connect to D-Bus system bus: %s", err)
		return
	}
	runner.hasListener = true

	const addMatch = "org.freedesktop.DBus.AddMatch"
	reloadedRule := fmt.Sprintf("type='signal',sender='%s',path='%s',interface='%s',member='Reloaded'", firewalldName, firewalldPath, firewalldInterface)
	ownerChangedRule := fmt.Sprintf("type='signal',interface='org.freedesktop.DBus',member='NameOwnerChanged',path='/org/freedesktop/DBus',sender='org.freedesktop.DBus',arg0='%s'", firewalldName)
	for _, matchRule := range []string{reloadedRule, ownerChangedRule} {
		bus.BusObject().Call(addMatch, 0, matchRule)
	}

	runner.signal = make(chan *godbus.Signal, 10)
	bus.Signal(runner.signal)

	go runner.dbusSignalHandler(bus)
}
// dbusSignalHandler runs in its own goroutine and consumes D-Bus signals from
// runner.signal until a nil signal is received.
//
// It calls runner.reload() when FirewallD emits "Reloaded", and when
// "NameOwnerChanged" reports a non-empty new owner for the FirewallD bus
// name. Signals with a malformed body are skipped instead of panicking.
func (runner *runner) dbusSignalHandler(bus utildbus.Connection) {
	firewalld := bus.Object(firewalldName, firewalldPath)

	for s := range runner.signal {
		if s == nil {
			// A nil signal is the stop sentinel sent by Destroy. The
			// original comment describes this call as unregistering
			// from D-Bus.
			bus.Signal(runner.signal)
			return
		}

		switch s.Name {
		case "org.freedesktop.DBus.NameOwnerChanged":
			// Body is expected to be (name, old_owner, new_owner), all
			// strings; ignore anything that does not match that shape.
			if len(s.Body) < 3 {
				continue
			}
			name, nameOK := s.Body[0].(string)
			newOwner, ownerOK := s.Body[2].(string)
			if !nameOK || !ownerOK {
				continue
			}

			// Only react when the FirewallD name gains an owner.
			if name != firewalldName || len(newOwner) == 0 {
				continue
			}

			// NOTE(review): the call result is discarded; presumably a
			// round-trip to let FirewallD finish starting — confirm.
			firewalld.Call(firewalldInterface+".getDefaultZone", 0)

			runner.reload()
		case firewalldInterface + ".Reloaded":
			runner.reload()
		}
	}
}
runner.Destroy() D-Bus監聽註銷
!FILENAME pkg/util/iptables/iptables.go:218
// Destroy stops the D-Bus signal handling goroutine, if one was started, by
// sending a nil signal on runner.signal. It is a no-op when the channel was
// never created.
func (runner *runner) Destroy() {
	if runner.signal == nil {
		return
	}
	// dbusSignalHandler treats a nil signal as the request to stop.
	runner.signal <- nil
}
上面為kube-proxy第三層的iptables Proxier代碼分析的所有內容,對於另外兩種模式ipvs、userspace模式的proxier實現代碼分析,可查閱userspace-mode proxier和ipvs-mode proxier文章內容。
~本文 END~