// These are effectively const and do not need the mutex to be held.
syncPeriod    time.Duration
minSyncPeriod time.Duration
// Values are CIDR's to exclude when cleaning up IPVS rules.
excludeCIDRs []*net.IPNet
// Set to true to set sysctls arp_ignore and arp_announce
strictARP bool
iptables  utiliptables.Interface // iptables executor: defines the operations on iptables
ipvs      utilipvs.Interface     // ipvs executor: defines the operations on IPVS
ipset     utilipset.Interface
exec      utilexec.Interface
masqueradeAll  bool
masqueradeMark string
localDetector  proxyutiliptables.LocalTrafficDetector
hostname       string
nodeIP         net.IP
portMapper netutils.PortOpener  // UDP or TCP ports that have already been opened
recorder   events.EventRecorder // event recorder
ipvsScheduler string
// Added as a member to the struct to allow injection for testing.
ipGetter IPGetter
// The following buffers are used to reuse memory and avoid allocations
// that are significantly impacting performance.
iptablesData     *bytes.Buffer
filterChainsData *bytes.Buffer
natChains        *bytes.Buffer
filterChains     *bytes.Buffer
natRules         *bytes.Buffer
filterRules      *bytes.Buffer
// Added as a member to the struct to allow injection for testing.
netlinkHandle NetLinkHandle
// ipsetList is the list of ipsets that ipvs proxier used.
ipsetList map[string]*IPSet
// Values are as a parameter to select the interfaces which nodeport works.
nodePortAddresses []string
// networkInterfacer defines an interface for several net library functions.
// Inject for test purpose.
networkInterfacer     utilproxy.NetworkInterfacer
gracefuldeleteManager *GracefulTerminationManager
}
// Create the kube-proxy command and define its run function
cmd := &cobra.Command{
    Use: "kube-proxy",
    Long: `The Kubernetes network proxy runs on each node. This reflects services as defined in the Kubernetes API on each node and can do simple
TCP, UDP, and SCTP stream forwarding or round robin TCP, UDP, and SCTP forwarding across a set of backends. Service cluster IPs and ports are
currently found through Docker-links-compatible environment variables specifying ports opened by the service proxy. There is an optional
addon that provides cluster DNS for these cluster IPs. The user must create a service with the apiserver API to configure the proxy.`,
    Run: func(cmd *cobra.Command, args []string) {
        verflag.PrintAndExitIfRequested()
        cliflag.PrintFlags(cmd.Flags())
        if err := initForOS(opts.WindowsService); err != nil {
            klog.ErrorS(err, "Failed OS init")
            // ACTION REQUIRED: Exit code changed from 255 to 1
            os.Exit(1)
        }
        if err := opts.Complete(); err != nil {
            klog.ErrorS(err, "Failed complete")
            // ACTION REQUIRED: Exit code changed from 255 to 1
            os.Exit(1)
        }
        if err := opts.Validate(); err != nil {
            klog.ErrorS(err, "Failed validate")
            // ACTION REQUIRED: Exit code changed from 255 to 1
            os.Exit(1)
        }
        // After the Options struct has been initialized, start the proxy server according to the configuration
        if err := opts.Run(); err != nil {
            klog.ErrorS(err, "Error running ProxyServer")
            os.Exit(1)
        }
    },
    Args: func(cmd *cobra.Command, args []string) error {
        for _, arg := range args {
            if len(arg) > 0 {
                return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
            }
        }
        return nil
    },
}

var err error
// Apply defaults to the config object inside opts that records the runtime configuration
opts.config, err = opts.ApplyDefaults(opts.config)
if err != nil {
    klog.ErrorS(err, "Unable to create flag defaults")
    // ACTION REQUIRED: Exit code changed from 255 to 1
    os.Exit(1)
}

// Get the command's FlagSet
fs := cmd.Flags()
// Register kube-proxy's flags
opts.AddFlags(fs)
fs.AddGoFlagSet(goflag.CommandLine) // for --boot-id-file and --machine-id-file

// Enable filename completion for these extensions on the --config flag
_ = cmd.MarkFlagFilename("config", "yaml", "yml", "json")
var iptInterface utiliptables.Interface
var ipvsInterface utilipvs.Interface
var kernelHandler ipvs.KernelHandler
var ipsetInterface utilipset.Interface
if string(config.Mode) == proxyModeIPVS && err != nil {
    klog.ErrorS(err, "Can't use the IPVS proxier")
}
if canUseIPVS {
    ipvsInterface = utilipvs.New(execer)
}
// We omit creation of pretty much everything if we run in cleanup mode
if cleanupAndExit {
    return &ProxyServer{
        execer:         execer,
        IpvsInterface:  ipvsInterface,
        IpsetInterface: ipsetInterface,
    }, nil
}
var ipt [2]utiliptables.Interface
dualStack := true // While we assume that node supports, we do further checks below
if proxyMode != proxyModeUserspace {
    // Create iptables handlers for both families, one is already created
    // Always ordered as IPv4, IPv6
    // Create the handlers in IPv4 -> IPv6 order
    if primaryProtocol == utiliptables.ProtocolIPv4 {
        ipt[0] = iptInterface
        ipt[1] = utiliptables.New(execer, utiliptables.ProtocolIPv6)
    } else {
        ipt[0] = utiliptables.New(execer, utiliptables.ProtocolIPv4)
        ipt[1] = iptInterface
    }
    // Check whether the kernel supports the iptables interface for each family
    for _, perFamilyIpt := range ipt {
        if !perFamilyIpt.Present() {
            klog.V(0).InfoS("kube-proxy running in single-stack mode, this ipFamily is not supported", "ipFamily", perFamilyIpt.Protocol())
            dualStack = false
        }
    }
}

// If the proxy mode is iptables
if proxyMode == proxyModeIPTables {
    klog.V(0).InfoS("Using iptables Proxier")
    if config.IPTables.MasqueradeBit == nil {
        // MasqueradeBit must be specified or defaulted.
        return nil, fmt.Errorf("unable to read IPTables MasqueradeBit from config")
    }

    // Does the node support dual-stack?
    if dualStack {
        klog.V(0).InfoS("kube-proxy running in dual-stack mode", "ipFamily", iptInterface.Protocol())
        klog.V(0).InfoS("Creating dualStackProxier for iptables")
        // Always ordered to match []ipt
        var localDetectors [2]proxyutiliptables.LocalTrafficDetector
        localDetectors, err = getDualStackLocalDetectorTuple(detectLocalMode, config, ipt, nodeInfo)
        if err != nil {
            return nil, fmt.Errorf("unable to create proxier: %v", err)
        }
        // TODO this has side effects that should only happen when Run() is invoked.
        proxier, err = iptables.NewDualStackProxier(
            ipt,
            utilsysctl.New(),
            execer,
            config.IPTables.SyncPeriod.Duration,
            config.IPTables.MinSyncPeriod.Duration,
            config.IPTables.MasqueradeAll,
            int(*config.IPTables.MasqueradeBit),
            localDetectors,
            hostname,
            nodeIPTuple(config.BindAddress),
            recorder,
            healthzServer,
            config.NodePortAddresses,
        )
    } else {
        // Create a single-stack proxier if and only if the node does not support dual-stack (i.e., no iptables support).
        var localDetector proxyutiliptables.LocalTrafficDetector
        localDetector, err = getLocalDetector(detectLocalMode, config, iptInterface, nodeInfo)
        if err != nil {
            return nil, fmt.Errorf("unable to create proxier: %v", err)
        }
        // TODO this has side effects that should only happen when Run() is invoked.
        proxier, err = iptables.NewProxier(
            iptInterface,
            utilsysctl.New(),
            execer,
            config.IPTables.SyncPeriod.Duration,
            config.IPTables.MinSyncPeriod.Duration,
            config.IPTables.MasqueradeAll,
            int(*config.IPTables.MasqueradeBit),
            localDetector,
            hostname,
            nodeIP,
            recorder,
            healthzServer,
            config.NodePortAddresses,
        )
    }
// The proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
// are connected to a Linux bridge (but not an SDN bridge). Until most plugins
// handle this, log when the configuration is missing.
if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
    klog.InfoS("Missing br-netfilter module or unset sysctl br-nf-call-iptables, proxy may not work as intended")
}
// Generate the fwmark used by the SNAT (masquerade) rules
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
klog.V(2).InfoS("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark)
ipFamily := v1.IPv4Protocol
if ipt.IsIPv6() {
    ipFamily = v1.IPv6Protocol
}
ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses)
nodePortAddresses = ipFamilyMap[ipFamily]
// Log the IPs not matching the ipFamily
if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 {
    klog.InfoS("Found node IPs of the wrong family", "ipFamily", ipFamily, "IPs", strings.Join(ips, ","))
}
burstSyncs := 2
klog.V(2).InfoS("Iptables sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
// We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
// We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
// time.Hour is arbitrary.
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
if ipt.HasRandomFully() {
    klog.V(2).InfoS("Iptables supports --random-fully", "ipFamily", ipt.Protocol())
} else {
    klog.V(2).InfoS("Iptables does not support --random-fully", "ipFamily", ipt.Protocol())
}
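To make the masquerade mark concrete: kube-proxy's default --iptables-masquerade-bit is 14, so masqueradeValue is 1<<14 = 0x4000 (16384). Below is a minimal, self-contained sketch of the computation shown above; the bit value 14 is simply the default, not something taken from this snippet.

package main

import "fmt"

func main() {
    // Default --iptables-masquerade-bit is 14.
    masqueradeBit := 14
    masqueradeValue := 1 << uint(masqueradeBit) // 16384 == 0x4000
    // Same formatting as the proxier: zero-padded hex, later used in the
    // "-m mark --mark" matches of the generated rules.
    masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
    fmt.Println(masqueradeMark)
}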
// TODO(thockin): make it possible for healthz and metrics to be on the same port.
var errCh chan error
if s.BindAddressHardFail {
    errCh = make(chan error)
}
// Start up a healthz server if requested
serveHealthz(s.HealthzServer, errCh)
// Start up a metrics server if requested
serveMetrics(s.MetricsBindAddress, s.ProxyMode, s.EnableProfiling, errCh)
// Tune conntrack, if requested
// Conntracker is always nil for windows
if s.Conntracker != nil {
    max, err := getConntrackMax(s.ConntrackConfiguration)
    if err != nil {
        return err
    }
    if max > 0 {
        err := s.Conntracker.SetMax(max)
        if err != nil {
            if err != errReadOnlySysFS {
                return err
            }
            // errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
            // the only remediation we know is to restart the docker daemon.
            // Here we'll send a node event with a specific reason and message; the
            // administrator should decide whether and how to handle this issue,
            // whether to drain the node and restart docker. Occurs in other container runtimes
            // as well.
            // TODO(random-liu): Remove this when the docker bug is fixed.
            const message = "CRI error: /sys is read-only: " +
                "cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
            s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, err.Error(), "StartKubeProxy", message)
        }
    }
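For context, getConntrackMax (called above but not shown) turns the conntrack configuration into a value for nf_conntrack_max. The following is only a simplified sketch of the idea, assuming the usual semantics of the --conntrack-max-per-core and --conntrack-min options; it is not the upstream function verbatim.

// Sketch only: scale the conntrack table with the CPU count, but never go
// below the configured minimum; 0 means "leave nf_conntrack_max untouched".
func conntrackMaxSketch(maxPerCore, min int32, numCPU int) int {
    if maxPerCore == 0 {
        return 0
    }
    scaled := int(maxPerCore) * numCPU
    if scaled < int(min) {
        return int(min)
    }
    return scaled
}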
/* 2. Create the configuration trackers: build the ServiceConfig and EndpointsConfig or
   EndpointSliceConfig structs and register the informer event handlers (callbacks). */

// The ServiceConfig struct tracks changes to Service configuration
serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
// RegisterEventHandler appends the handler to serviceConfig's eventHandlers, a slice whose element type is the ServiceHandler interface.
// The ServiceHandler interface defines the methods each handler uses to process Services: OnServiceAdd, OnServiceUpdate, OnServiceDelete, OnServiceSynced.
// Here the registered handler is the proxier: s.Proxier implements the methods defined by ServiceHandler.
serviceConfig.RegisterEventHandler(s.Proxier)
go serviceConfig.Run(wait.NeverStop)

// If s.Proxier implements EndpointsHandler and EndpointSlices are not in use
if endpointsHandler, ok := s.Proxier.(config.EndpointsHandler); ok && !s.UseEndpointSlices {
    endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
    // Register the event handler
    endpointsConfig.RegisterEventHandler(endpointsHandler)
    go endpointsConfig.Run(wait.NeverStop)
} else {
    endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.ConfigSyncPeriod)
    // RegisterEventHandler appends the handler to endpointSliceConfig's eventHandlers, a slice whose element type is the EndpointSliceHandler interface
    endpointSliceConfig.RegisterEventHandler(s.Proxier)
    go endpointSliceConfig.Run(wait.NeverStop)
}
// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
// functions must configure their shared informer event handlers first.
informerFactory.Start(wait.NeverStop)
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
    // Make an informer that selects for our nodename.
    currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
        informers.WithTweakListOptions(func(options *metav1.ListOptions) {
            options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String()
        }))
    nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod)
    nodeConfig.RegisterEventHandler(s.Proxier)
    go nodeConfig.Run(wait.NeverStop)
    // This has to start after the calls to NewNodeConfig because that must
    // configure the shared informer event handler first.
    currentNodeInformerFactory.Start(wait.NeverStop)
}

// Birth Cry after the birth is successful
s.birthCry()

/* 3. Start a new goroutine that loops forever, syncing the iptables rules in response to Service and Endpoints changes. */
go s.Proxier.SyncLoop()
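SyncLoop itself is thin: for the iptables proxier it essentially defers to the BoundedFrequencyRunner created in NewProxier, which calls syncProxyRules at most burstSyncs times per minSyncPeriod and at least once per maxInterval. A rough sketch of that shape follows; it is simplified and not a verbatim copy of the upstream method.

// Rough shape of the iptables proxier's SyncLoop: record a healthz heartbeat
// once, then let the bounded-frequency runner drive syncProxyRules forever.
func (proxier *Proxier) SyncLoop() {
    if proxier.healthzServer != nil {
        proxier.healthzServer.Updated()
    }
    // Blocks until the stop channel is closed; wait.NeverStop never closes.
    proxier.syncRunner.Loop(wait.NeverStop)
}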
// ServiceConfig tracks a set of service configurations.
type ServiceConfig struct {
    listerSynced  cache.InformerSynced
    eventHandlers []ServiceHandler
}

// EndpointsConfig tracks a set of endpoints configurations.
type EndpointsConfig struct {
    listerSynced  cache.InformerSynced
    eventHandlers []EndpointsHandler
}

// EndpointSliceConfig tracks a set of endpoint slice configurations.
type EndpointSliceConfig struct {
    listerSynced  cache.InformerSynced
    eventHandlers []EndpointSliceHandler
}
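The registration pattern described earlier is small: RegisterEventHandler appends to eventHandlers, and the informer callbacks fan each event out to every registered handler. A condensed sketch of that path for Services, simplified from the pkg/proxy/config pattern:

// RegisterEventHandler registers a handler that will be notified of Service changes.
func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
    c.eventHandlers = append(c.eventHandlers, handler)
}

// handleAddService is wired to the shared informer's AddFunc; it fans the
// event out to every registered ServiceHandler (the proxier, in kube-proxy).
func (c *ServiceConfig) handleAddService(obj interface{}) {
    service, ok := obj.(*v1.Service)
    if !ok {
        utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
        return
    }
    for i := range c.eventHandlers {
        c.eventHandlers[i].OnServiceAdd(service)
    }
}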
// OnServiceUpdate is called whenever modification of an existing
// service object is observed.
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
    if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
        proxier.Sync()
    }
}
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
    if chain, ok := existingFilterChains[chainName]; ok {
        proxier.filterChains.WriteBytes(chain)
    } else {
        proxier.filterChains.Write(utiliptables.MakeChainLine(chainName))
    }
}
// Some new chains show up here, notably kubeNodePortsChain and KubeMarkMasqChain
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
    if chain, ok := existingNATChains[chainName]; ok {
        proxier.natChains.WriteBytes(chain)
    } else {
        // KUBE-NODEPORTS and KUBE-MARK-MASQ were not created before; create them now
        proxier.natChains.Write(utiliptables.MakeChainLine(chainName))
    }
}
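For reference, MakeChainLine only emits the iptables-save style declaration for a chain, so a chain missing from existingNATChains is written out roughly as:

:KUBE-NODEPORTS - [0:0]
:KUBE-MARK-MASQ - [0:0]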
lps := make([]netutils.LocalPort, 0)
for address := range nodeAddresses {
    lp := netutils.LocalPort{
        Description: "nodePort for " + svcNameString,
        IP:          address,
        IPFamily:    localPortIPFamily,
        Port:        svcInfo.NodePort(),
        Protocol:    netutils.Protocol(svcInfo.Protocol()),
    }
    if utilproxy.IsZeroCIDR(address) {
        // Empty IP address means all
        lp.IP = ""
        lps = append(lps, lp)
        // If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
        break
    }
    lps = append(lps, lp)
}
// For ports on node IPs, open the actual port and hold it.
for _, lp := range lps {
    if proxier.portsMap[lp] != nil {
        klog.V(4).InfoS("Port was open before and is still needed", "port", lp)
        replacementPortsMap[lp] = proxier.portsMap[lp]
    } else if svcInfo.Protocol() != v1.ProtocolSCTP {
        socket, err := proxier.portMapper.OpenLocalPort(&lp)
        if err != nil {
            msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())
-A KUBE-SERVICES ... --comment ${svc-port-name} cluster IP ... -d ${cluster_ip}/32 --dport xxx -j KUBE-MARK-MASQ                       # if proxier.masqueradeAll == true
-A KUBE-SERVICES ... --comment ${svc-port-name} cluster IP ... -d ${cluster_ip}/32 --dport xxx ! -s ${cluster_cidr} -j KUBE-MARK-MASQ  # else if len(proxier.clusterCIDR) > 0
-A KUBE-SERVICES ... --comment ${svc-port-name} cluster IP ... -d ${cluster_ip}/32 --dport xxx -j KUBE-SVC-XXX                         # always added when the Service has endpoints
-A KUBE-SERVICES ... --comment ${svc-port-name} has no endpoints ... -d ${cluster_ip}/32 --dport xxx -j REJECT   // when there are no endpoints, packets sent to this IP:Port are rejected outright
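To make these templates concrete, for a hypothetical Service default/web with cluster IP 10.96.0.10, TCP port 80, and at least one endpoint, the rendered rule looks roughly like this (the chain-name hash is invented for illustration):

-A KUBE-SERVICES -d 10.96.0.10/32 -p tcp -m comment --comment "default/web cluster IP" -m tcp --dport 80 -j KUBE-SVC-ABCD1234EFGH5678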
Setting up rules for Services with external IPs
If the external IP is an IP of this node and the Service's protocol is not SCTP, a LocalPort struct is built to record that Service's external IP, port, protocol, and a description. The service port is then opened on this node (think of the socket as a "placeholder": it keeps the operating system from handing that port to other local applications), and the {LocalPort: socket} pair is added to replacementPortsMap.
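The "placeholder" is just an ordinary listening socket that kube-proxy keeps open without ever serving traffic on it. A minimal illustration of the idea, independent of the kube-proxy types (the port used here is made up):

package main

import (
    "fmt"
    "net"
)

func main() {
    // Holding the listener open reserves 0.0.0.0:30080 so the OS will not
    // hand that port to another local application; actual traffic is still
    // handled by the iptables rules, never by this socket.
    ln, err := net.Listen("tcp", ":30080")
    if err != nil {
        fmt.Println("port already in use:", err)
        return
    }
    defer ln.Close()
    fmt.Println("holding", ln.Addr(), "as a placeholder")
}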
-A KUBE-XLB-XXX ... -s ${cluster_cidr} -j KUBE-SVC-XXX   // when clusterCIDR is configured
// If there are no local endpoints (Pods)
-A KUBE-XLB-XXX ... --comment ${svc-port-name} has no local endpoints -j KUBE-MARK-DROP

// If there are local endpoints (Pods)
-A KUBE-XLB-XXX ... -m recent --name KUBE-SEP-XXX --rcheck --seconds xxx -j KUBE-SEP-XXX   // when session affinity is configured