Mrli
别装作很努力,
因为结局不会陪你演戏。
Contacts:
QQ博客园

k8s之kube-proxy源码分析

2022/03/29 环境部署 源码解析
Word count: 13,991 | Reading time: 69min

kubernetes 简单介绍

背景介绍

Kubernetes最初源于谷歌内部的Borg,提供了面向应用的容器集群部署和管理系统,目前为主流的微服务应用编排管理工具。

Kubernetes的目标旨在消除编排物理/虚拟计算、网络和存储基础设施的负担,并使应用程序运营商和开发人员完全将重点放在以容器为中心的原理上进行自助运营。

Kubernetes具备完善的集群管理能力,包括多层次的安全防护和准入机制、多租户应用支撑能力、透明的服务注册和服务发现机制、内建负载均衡器、故障发现和自我修复能力、服务滚动升级和在线扩容、可扩展的资源自动调度机制、多粒度的资源配额管理能力。

Borg是谷歌内部的大规模集群管理系统,负责对谷歌内部很多核心服务的调度和管理。Borg的目的是让用户能够不必操心资源管理的问题,从而更专注于自己的核心业务。Borg甚至能做到跨多个数据中心的资源利用率最大化。

架构设计

k8s

Kubernetes属于主从分布式架构,主要由Master和Node组成,以及包括客户端命令行工具kubectl和其它附加项。

  • Master:作为控制节点,对集群所有工作节点Node进行调度管理;由kube-apiServer、kube-scheduler、kube-controller-manager和etcd组成。
  • Node:作为工作节点,运行业务应用的容器;由kubelet、kube-proxy和docker(目前仍是主流的运行时)组成。

代码整体分析

目录分析

通过git clone下来最新的K8S代码后,目前v1.22.2版本的代码结构通过tree命令可看到如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
.
├── api
├── build
├── CHANGELOG
├── cluster
├── cmd
├── docs
├── hack
├── LICENSES
├── logo
├── pkg
├── plugin
├── staging
├── test
├── third_party
└── vendor

k8s的程序入口都在cmd目录下面,核心逻辑都在pkg目录下面。这么做的好处是把程序入口和逻辑分开,因为程序入口只做了一些最简单的调用,具体逻辑是模块化的类库,即增强了程序的可读性,也便于一人负责一个模块多人互相协作,这一点在大型项目中非常重要。我们可以看到pkg目录下面有很多子目录,每一个子目录都包含了一个重要的组件。

proxy源码目录结构分析

要学习一份项目代码,除了学习其实现的原理外,其代码的组织结构也是非常有借鉴和学习价值的,并且先看明白目录结构更有助于我们检索想要阅读的代码段。

cmd/kube-proxy目录负责kube-proxy的创建,是启动的入口

1
2
3
4
5
6
7
8
9
10
11
12
.
├── app
│   ├── conntrack.go // 全局sysctl的一个接口, 各种sysctl字段的描述和辅助方法可以在这里找到
│   ├── init_others.go // 判断Cli参数中OS, 并根据结果进行设置非 Win OS配置参数
│   ├── init_windows.go // 判断Cli参数中OS, 并根据结果进行设置配置Win OS参数
│   ├── server.go //Options、ProxyServer 结构定义及其创建(NewProxyServerDefault)和运行(Run)的方法。
│   ├── server_others.go // 非Win OS下的 NewProxyServer实现,创建好匹配 ProxyMode 的ProxyServer
│   ├── server_others_test.go
│   ├── server_test.go
│   └── server_windows.go // Win OS下的 NewProxyServer 实现,创建好匹配 ProxyMode 的ProxyServer
├── OWNERS
└── proxy.go //kube-proxy的入口文件,提供main方法, 通过NewProxyCommand()产生Cobra.Command命令

pkg/proxy是kube-proxy核心实现的代码目录,通过tree -L 3展示其3层深度的目录结构如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
.
├── apis
│   ├── config
│   │   ├── doc.go
│   │   ├── fuzzer
│   │   ├── OWNERS
│   │   ├── register.go
│   │   ├── register_test.go
│   │   ├── scheme
│   │   ├── types.go
│   │   ├── v1alpha1
│   │   ├── validation
│   │   └── zz_generated.deepcopy.go
│   └── well_known_labels.go
├── config
│   ├── api_test.go
│   ├── config.go //定义ServiceUpdate,EndpointUpdate结构体以及ServiceConfigHandler,EndpointConfigHandler来处理Service和Endpoint的Update
│   ├── config_test.go
│   ├── doc.go
│   └── OWNERS
├── doc.go
├── endpoints.go // 定义了基础的BaseEndpoints, 实现了 endpoints 的更新与维护
├── endpointslicecache.go // endpoints缓存实现
├── endpointslicecache_test.go
├── endpoints_test.go
├── healthcheck // 健康检查, 负责service listener和endpoint的health check,add/delete请求。
│   ├── common.go
│   ├── doc.go
│   ├── healthcheck_test.go
│   ├── proxier_health.go
│   └── service_health.go
├── iptables //proxy mode为 iptables 的实现
│   ├── OWNERS
│   ├── proxier.go
│   └── proxier_test.go
├── ipvs //proxy mode为 ipvs 的实现
│   ├── graceful_termination.go
│   ├── graceful_termination_test.go
│   ├── ipset.go
│   ├── ipset_test.go
│   ├── netlink.go
│   ├── netlink_linux.go
│   ├── netlink_unsupported.go
│   ├── OWNERS
│   ├── proxier.go
│   ├── proxier_test.go
│   ├── README.md
│   ├── safe_ipset.go
│   └── testing
│   ├── fake.go
│   ├── fake_test.go
│   └── util.go
├── metaproxier
│   └── meta_proxier.go
├── metrics
│   └── metrics.go
├── OWNERS
├── service.go
├── service_test.go
├── topology.go // 实现对 Endpoint 的路由过滤:提供FilterLocalEndpoint、FilterEndpoints两个方法
├── topology_test.go
├── types.go // ServicePort、Endpoint 重要接口定义
├── userspace // proxy mode为 userspace 的实现
│   ├── loadbalancer.go
│   ├── OWNERS
│   ├── port_allocator.go
│   ├── port_allocator_test.go
│   ├── proxier.go
│   ├── proxier_test.go
│   ├── proxysocket.go
│   ├── rlimit.go
│   ├── rlimit_windows.go
│   ├── roundrobin.go
│   └── roundrobin_test.go
├── util
│   ├── endpoints.go
│   ├── endpoints_test.go
│   ├── iptables
│   │   ├── traffic.go
│   │   └── traffic_test.go
│   ├── network.go
│   ├── testing
│   │   └── fake.go
│   ├── utils.go
│   └── utils_test.go
├── winkernel // Win OS下的Kernel实现
│   ├── hns_test.go
│   ├── hnsV1.go
│   ├── hnsV2.go
│   ├── metrics.go
│   ├── OWNERS
│   ├── proxier.go
│   └── proxier_test.go
└── winuserspace // 系统为windows OS时,proxy mode为 userspace 的实现
├── loadbalancer.go
├── proxier.go
├── proxier_test.go
├── proxysocket.go
├── roundrobin.go
├── roundrobin_test.go
└── types.go

kube-proxy分析

kube-proxy是管理service的访问入口,实现Kubenetes Service通信与负载均衡机制,提供了集群内Pod对Service的访问和集群外访问service的方式。当用户创建 service 的时候,endpointController 会根据service 的 selector 找到对应的 pod,然后生成 endpoints 对象保存到 etcd 中。运行在每个节点上的Kube-proxy会通过api-server 获得etcd 中 Service和Endpoints的变化信息,并调用 kube-proxy 配置的代理模式来更新主机上的iptables 转发规则,通过修改iptables规则从而改变报文的流向,让集群中服务解析。从而实现了将业务请求连接到Service后具体的执行结点(endpoints)。

接下来我将会以iptables 代理模式为例,对proxy 的功能实现进行分析。基于iptables模式的kube-proxy的主要职责包括两大块:一是侦听service更新事件,并更新service相关的iptables规则;二是侦听endpoint更新事件,更新endpoint相关的iptables规则。

分析目标

带着问题看代码能收获的内容更多,因此在此,我们在心里保留两个问题:

  1. kube-proxy如何让集群内节点无法ping通clusterIP的?
  2. 为什么集群内节点可以通过ClusterIP:Port形式访问服务;集群外可以通过NodeIP:NodePort形式访问到服务

好了,明确了以上两个问题后就让我们来让代码吧

重要结构体说明

kubernetes/cmd/proxy-server/app/server.go作为cmd/kube-proxy入口的真正执行文件,主要是围绕ProxyServer和两个结构体进行了一系列初始化赋值的操作。因此,针对这两个结构体,我们需要详细了解下有什么内容

ProxyServer

ProxyServer 结构体中定义的属性代表了kube-proxy server 运行时需要的所有变量。kube-proxy server 调用的方法均来该结构体内变量拥有的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type ProxyServer struct {
Client clientset.Interface
EventClient v1core.EventsGetter
IptInterface utiliptables.Interface // 接口中定义了更新iptables 的方法集合,如DeleteChian,DeleteRule, EnsureChain,EnsureRule
IpvsInterface utilipvs.Interface // 定义操作 ipvs 的方法集
IpsetInterface utilipset.Interface //定义了操作 ipset 的方法集
execer exec.Interface // 定义包装os/exec库中Command, Commandcontext, LookPath方法的接口
Proxier proxy.ProxyProvider //Proxier 有五种实现方式,分别对应Linux环境中三种的代理模式和Windows环境下的三种的代理模式
Broadcaster record.EventBroadcaster //接受Event,交于各个处理函数进行处理
Recorder record.EventRecorder // Event 记录者
ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration
Conntracker Conntracker // if nil, ignored
ProxyMode string
NodeRef *v1.ObjectReference
CleanupAndExit bool
CleanupIPVS bool
MetricsBindAddress string //127.0.0.1:10249 http prometheus metrics port;
EnableProfiling bool
OOMScoreAdj *int32 //通过一个数值用来表征进程当发生OOM时系统对该进程的行为
ResourceContainer string
ConfigSyncPeriod time.Duration //Default 15m0s
ServiceEventHandler config.ServiceHandler //
EndpointsEventHandler config.EndpointsHandler //
HealthzServer *healthcheck.HealthzServer // 0.0.0.0:10256 http healthz port;
}

根据源代码的注释提醒,以上数据成员都是必须有值的。

Options

kube各组件中都以Options结构体提供配置参数,kube-scheduler中创建了专门的options目录以及options.go文件,kube-proxy中直接将其放在了cmd/app/server.go中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/** Options中记录了创建和运行proxy服务器所需的一切 */
type Options struct {
ConfigFile string // 记录了proxy服务器配置文件的位置
WriteConfigTo string // 默认配置将被写入的路径。
CleanupAndExit bool // 当CleanupAndExit为真时,proxy服务器将会清理iptables和ipvs规则,然后退出。
WindowsService bool // kube-proxy是否在Windows上作为一个服务运行,其对应的标志只在Windows构建中被注册
config *kubeproxyconfig.KubeProxyConfiguration // proxy服务器的配置对象,方便运行时读取配置
watcher filesystem.FSWatcher // 用于观察ConfigFile的更新变化。
proxyServer proxyRun // 启动proxy服务器的接口
errCh chan error // 发送错误的通道
// 下面的字段是占位符,不能直接映射到 config.KubeProxyConfiguration.
// TODO remove these fields once the deprecated flags are removed.
master string // master用于覆盖kubeconfig对apiserver的URL。
healthzPort int32 // healthz 服务器的端口
metricsPort int32 // metrics 服务器的端口
hostnameOverride string // 如果从命令行标志中设置该值,则将优先于配置文件中的`HostnameOverride`值。
}

Proxier

在每一种代理模式下,都定义了自己的Proxier 结构体,该结构体及方法实现了该模式下的代理规则的更新方法。在Iptables 模式下,kubernetes/pkg/proxy/iptables/proxier.go文件中Proxier 结构体的定义如下所示:

kubernetes/pkg/proxy/iptables/proxier.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
type Proxier struct {
// the ipfamily on which this proxy is operating on.
ipFamily v1.IPFamily
//EndpointChangeTracker中items属性为一个两级map,用来保存所有namespace 下endpoints 的变化信息。
//第一级map以 types.NamespacedName 为键,value 值为该namespace下所有endpoints 更新前(previous)、后(current)的信息:前、后信息分别为一个map ,即第二级map: ServiceMap。
//第二级map的key为ServicePortName 结构,[]Endpoint为值, 标记endpoints 对应的service,value为endpoint信息。
// EndpointChangeTracker 中实现了更新 endpoint 的方法
endpointsChanges *proxy.EndpointChangeTracker
serviceChanges *proxy.ServiceChangeTracker // 同理,ServiceChangeTracker 中使用一个两级map保存所有namespace 下的service的变化信息,并定义了更新service的方法

// 互斥锁,保护下列字段
mu sync.Mutex
serviceMap proxy.ServiceMap // 同serviceChanges 的第二级 map 结构,记录了所有namespace下需要更新iptables规则的service
endpointsMap proxy.EndpointsMap // 同endpointsChanges 的第二级 map 结构,记录了所有namespace 需要更新iptables规则的endpoints
portsMap map[utilproxy.LocalPort]utilproxy.Closeable
endpointsSynced bool // Proxier 初始化时为False
servicesSynced bool // Proxier 初始化时为False
initialized int32
syncRunner *async.BoundedFrequencyRunner // async.BoundedFrequencyRunner 具有QPS功能,控制被托管方法的发生速率

// These are effectively const and do not need the mutex to be held.
syncPeriod time.Duration
minSyncPeriod time.Duration
// Values are CIDR's to exclude when cleaning up IPVS rules.
excludeCIDRs []*net.IPNet
// Set to true to set sysctls arp_ignore and arp_announce
strictARP bool
iptables utiliptables.Interface // iptables 的执行器,定义了 Iptables 的操作方法
ipvs utilipvs.Interface // iptables 的执行器,定义了 IPVS 的操作方法
ipset utilipset.Interface
exec utilexec.Interface
masqueradeAll bool
masqueradeMark string
localDetector proxyutiliptables.LocalTrafficDetector
hostname string
nodeIP net.IP
portMapper netutils.PortOpener // 已打开的UDP或TCP端口
recorder events.EventRecorder // 事件记录者

serviceHealthServer healthcheck.ServiceHealthServer
healthzServer healthcheck.ProxierHealthUpdater

ipvsScheduler string
// Added as a member to the struct to allow injection for testing.
ipGetter IPGetter
// The following buffers are used to reuse memory and avoid allocations
// that are significantly impacting performance.
iptablesData *bytes.Buffer
filterChainsData *bytes.Buffer
natChains *bytes.Buffer
filterChains *bytes.Buffer
natRules *bytes.Buffer
filterRules *bytes.Buffer
// Added as a member to the struct to allow injection for testing.
netlinkHandle NetLinkHandle
// ipsetList is the list of ipsets that ipvs proxier used.
ipsetList map[string]*IPSet
// Values are as a parameter to select the interfaces which nodeport works.
nodePortAddresses []string
// networkInterfacer defines an interface for several net library functions.
// Inject for test purpose.
networkInterfacer utilproxy.NetworkInterfacer
gracefuldeleteManager *GracefulTerminationManager
}

程序启动过程

从CLI命令接收参数输入的入口切入,我们可以看到一个从初始化到服务启动的完整的流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Step1. 命令入口
func NewProxyCommand() *cobra.Command {
// 创建空的Options结构体对象
opts := NewOptions()

// 创建kube-proxy命令,并定义执行函数
cmd := &cobra.Command{
Use: "kube-proxy",
Long: `The Kubernetes network proxy runs on each node. This
reflects services as defined in the Kubernetes API on each node and can do simple
TCP, UDP, and SCTP stream forwarding or round robin TCP, UDP, and SCTP forwarding across a set of backends.
Service cluster IPs and ports are currently found through Docker-links-compatible
environment variables specifying ports opened by the service proxy. There is an optional
addon that provides cluster DNS for these cluster IPs. The user must create a service
with the apiserver API to configure the proxy.`,
Run: func(cmd *cobra.Command, args []string) {
verflag.PrintAndExitIfRequested()
cliflag.PrintFlags(cmd.Flags())

if err := initForOS(opts.WindowsService); err != nil {
klog.ErrorS(err, "Failed OS init")
// ACTION REQUIRED: Exit code changed from 255 to 1
os.Exit(1)
}

if err := opts.Complete(); err != nil {
klog.ErrorS(err, "Failed complete")
// ACTION REQUIRED: Exit code changed from 255 to 1
os.Exit(1)
}

if err := opts.Validate(); err != nil {
klog.ErrorS(err, "Failed validate")
// ACTION REQUIRED: Exit code changed from 255 to 1
os.Exit(1)
}

// Options结构体初始化完成后, 根据配置启动proxy server
if err := opts.Run(); err != nil {
klog.ErrorS(err, "Error running ProxyServer")
os.Exit(1)
}
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
var err error
// 为opts对象中的记录运行时配置的config对象进行默认初始化
opts.config, err = opts.ApplyDefaults(opts.config)
if err != nil {
klog.ErrorS(err, "Unable to create flag defaults")
// ACTION REQUIRED: Exit code changed from 255 to 1
os.Exit(1)
}
// 接收命令FlagSet
fs := cmd.Flags()
// 增加Flag参数
opts.AddFlags(fs)
fs.AddGoFlagSet(goflag.CommandLine) // for --boot-id-file and --machine-id-file
// 对以下文件类型进行命令补全提示
_ = cmd.MarkFlagFilename("config", "yaml", "yml", "json")

return cmd
}

// Step2. ①完善Options中的Config运行时配置对象;②启动具体的ProxyServer.
func (o *Options) Run() error {
defer close(o.errCh)
if len(o.WriteConfigTo) > 0 {
return o.writeConfigFile()
}
// 创建ProxyServer
proxyServer, err := NewProxyServer(o)
if err != nil {
return err
}
// 如果参数为真, 则清理iptables和ipvs规则,然后退出。
if o.CleanupAndExit {
return proxyServer.CleanupAndExit()
}
o.proxyServer = proxyServer
return o.runLoop()
}


// Step3. runLoop阻塞循环将不断侦听proxy server配置文件的更新变化。
func (o *Options) runLoop() error {
// 如果有监听器的话启动监听器
if o.watcher != nil {
o.watcher.Run()
}
// 另起协程运行proxy server
go func() {
err := o.proxyServer.Run()
o.errCh <- err
}()
// while循环等待, 并通过o.errCh阻塞,如果错误通道中读取到异常则退出程序
for {
err := <-o.errCh
if err != nil {
return err
}
}
}

ProxyServer创建过程

cmd/kube-proxy/app/server_others.go中根据Options中已初始化好的参数创建ProxyServer实例,其中会根据getProxyMode(string(config.Mode), canUseIPVS, iptables.LinuxKernelCompatTester{})来确定具体使用哪个proxier实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
// 创建ProxyServer对象
func NewProxyServer(o *Options) (*ProxyServer, error) {
return newProxyServer(o.config, o.CleanupAndExit, o.master)
}

func newProxyServer(
config *proxyconfigapi.KubeProxyConfiguration,
cleanupAndExit bool,
master string) (*ProxyServer, error) {

if config == nil {
return nil, errors.New("config is required")
}

if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
c.Set(config)
} else {
return nil, fmt.Errorf("unable to register configz: %s", err)
}

var iptInterface utiliptables.Interface
var ipvsInterface utilipvs.Interface
var kernelHandler ipvs.KernelHandler
var ipsetInterface utilipset.Interface

// 创建iptables工具,包装了os/exec中的command,LookPath,CommandContext 方法,组装一个系统调用的命令和参数
execer := exec.New()

kernelHandler = ipvs.NewLinuxKernelHandler()
// 得到操作 ipset 的方法集接口
ipsetInterface = utilipset.New(execer)
// 判断是否能够使用IPVS模式, IPVS在一定条件下会被退化成iptables
canUseIPVS, err := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface, config.IPVS.Scheduler)

if string(config.Mode) == proxyModeIPVS && err != nil {
klog.ErrorS(err, "Can't use the IPVS proxier")
}

if canUseIPVS {
ipvsInterface = utilipvs.New(execer)
}

// We omit creation of pretty much everything if we run in cleanup mode
if cleanupAndExit {
return &ProxyServer{
execer: execer,
IpvsInterface: ipvsInterface,
IpsetInterface: ipsetInterface,
}, nil
}

if len(config.ShowHiddenMetricsForVersion) > 0 {
metrics.SetShowHidden()
}
// 从运行时配置中得到hostname
hostname, err := utilnode.GetHostname(config.HostnameOverride)
if err != nil {
return nil, err
}
// 从给定的配置中创建与apiserver相匹配的kube客户端和event客户端。
client, eventClient, err := createClients(config.ClientConnection, master)
if err != nil {
return nil, err
}
// 得到proxy server使用的NodeIP
nodeIP := detectNodeIP(client, hostname, config.BindAddress)
klog.InfoS("Detected node IP", "address", nodeIP.String())

// Create event recorder
//EventBroadcaster会将收到的Event交于各个处理函数进行处理。接收Event的缓冲队列长为1000,不停地取走Event并广播给各个watcher;
//watcher通过recordEvent()方法将Event写入对应的EventSink里,最大重试次数为12次,重试间隔随机生成(见staging/src/k8s.io/client-go/tools/record/event.go);
// EnventSink 将在ProxyServer.Run() 中调用s.Broadcaster.StartRecordingToSink() 传进来;
// NewBroadcaster() 最后会启动一个goroutine 运行Loop 方法(staging/src/k8s.io/apimachinery/pkg/watch/mux.go)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
//EventRecorder通过generateEvent()实际生成各种Event,并将其添加到监视队列。
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "kube-proxy")

// 获得节点Node
nodeRef := &v1.ObjectReference{
Kind: "Node",
Name: hostname,
UID: types.UID(hostname),
Namespace: "",
}

var healthzServer healthcheck.ProxierHealthUpdater
if len(config.HealthzBindAddress) > 0 {
//服务健康检查的 IP 地址和端口(IPv4默认为0.0.0.0:10256,对于所有 IPv6 接口设置为 ::)
healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
}
// 声明proxier接口, 是kube-proxy work的真正实现类
var proxier proxy.Provider
var detectLocalMode proxyconfigapi.LocalMode
// 得到代理模式,有三种: Userspace、IPTables、IPVS; 注:IPVS可能会退化成IPTables, IPTables也可能退化成Userspace
proxyMode := getProxyMode(string(config.Mode), canUseIPVS, iptables.LinuxKernelCompatTester{})
// 选择CIDR模式,目前有两种: NodeCIDR、ClusterCIDR
detectLocalMode, err = getDetectLocalMode(config)
if err != nil {
return nil, fmt.Errorf("cannot determine detect-local-mode: %v", err)
}
// 声明节点信息
var nodeInfo *v1.Node
if detectLocalMode == proxyconfigapi.LocalModeNodeCIDR {
klog.InfoS("Watching for node, awaiting podCIDR allocation", "hostname", hostname)
nodeInfo, err = waitForPodCIDR(client, hostname)
if err != nil {
return nil, err
}
klog.InfoS("NodeInfo", "PodCIDR", nodeInfo.Spec.PodCIDR, "PodCIDRs", nodeInfo.Spec.PodCIDRs)
}

klog.V(2).InfoS("DetectLocalMode", "LocalMode", string(detectLocalMode))
// 根据IP协议确定合适的更新iptables的实现类
primaryProtocol := utiliptables.ProtocolIPv4
if netutils.IsIPv6(nodeIP) {
primaryProtocol = utiliptables.ProtocolIPv6
}
//iptInterface 赋值为runner结构体,该结构体实现了接口utiliptables.Interface中定义的方法,
//各方法中通过runContext()方法调用execer的命令包装方法返回一个被包装的iptables 命令
iptInterface = utiliptables.New(execer, primaryProtocol)

var ipt [2]utiliptables.Interface
dualStack := true // While we assume that node supports, we do further checks below

if proxyMode != proxyModeUserspace {
// Create iptables handlers for both families, one is already created
// Always ordered as IPv4, IPv6
// 以IPv4->IPv6的顺序创建iptables处理程序
if primaryProtocol == utiliptables.ProtocolIPv4 {
ipt[0] = iptInterface
ipt[1] = utiliptables.New(execer, utiliptables.ProtocolIPv6)
} else {
ipt[0] = utiliptables.New(execer, utiliptables.ProtocolIPv4)
ipt[1] = iptInterface
}

// 检查内核是否支持iptable接口
for _, perFamilyIpt := range ipt {
if !perFamilyIpt.Present() {
klog.V(0).InfoS("kube-proxy running in single-stack mode, this ipFamily is not supported", "ipFamily", perFamilyIpt.Protocol())
dualStack = false
}
}
}
// 如果代理模式为IPTables
if proxyMode == proxyModeIPTables {
klog.V(0).InfoS("Using iptables Proxier")
if config.IPTables.MasqueradeBit == nil {
// MasqueradeBit must be specified or defaulted.
return nil, fmt.Errorf("unable to read IPTables MasqueradeBit from config")
}
// 是否支持双栈
if dualStack {
klog.V(0).InfoS("kube-proxy running in dual-stack mode", "ipFamily", iptInterface.Protocol())
klog.V(0).InfoS("Creating dualStackProxier for iptables")
// Always ordered to match []ipt
var localDetectors [2]proxyutiliptables.LocalTrafficDetector
localDetectors, err = getDualStackLocalDetectorTuple(detectLocalMode, config, ipt, nodeInfo)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}

// TODO this has side effects that should only happen when Run() is invoked.
proxier, err = iptables.NewDualStackProxier(
ipt,
utilsysctl.New(),
execer,
config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration,
config.IPTables.MasqueradeAll,
int(*config.IPTables.MasqueradeBit),
localDetectors,
hostname,
nodeIPTuple(config.BindAddress),
recorder,
healthzServer,
config.NodePortAddresses,
)
} else {
// Create a single-stack proxier if and only if the node does not support dual-stack (i.e, no iptables support).
var localDetector proxyutiliptables.LocalTrafficDetector
localDetector, err = getLocalDetector(detectLocalMode, config, iptInterface, nodeInfo)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}

// TODO this has side effects that should only happen when Run() is invoked.
proxier, err = iptables.NewProxier(
iptInterface,
utilsysctl.New(),
execer,
config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration,
config.IPTables.MasqueradeAll,
int(*config.IPTables.MasqueradeBit),
localDetector,
hostname,
nodeIP,
recorder,
healthzServer,
config.NodePortAddresses,
)
}

if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
// 注册指标
proxymetrics.RegisterMetrics()
} else if proxyMode == proxyModeIPVS {
// ... 省略
} else {
// ... 省略
}

useEndpointSlices := true
if proxyMode == proxyModeUserspace {
// userspace mode doesn't support endpointslice.
useEndpointSlices = false
}

return &ProxyServer{
Client: client,
EventClient: eventClient,
IptInterface: iptInterface,
IpvsInterface: ipvsInterface,
IpsetInterface: ipsetInterface,
execer: execer,
Proxier: proxier,
Broadcaster: eventBroadcaster,
Recorder: recorder,
ConntrackConfiguration: config.Conntrack,
Conntracker: &realConntracker{},
ProxyMode: proxyMode,
NodeRef: nodeRef,
MetricsBindAddress: config.MetricsBindAddress,
BindAddressHardFail: config.BindAddressHardFail,
EnableProfiling: config.EnableProfiling,
OOMScoreAdj: config.OOMScoreAdj,
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
HealthzServer: healthzServer,
UseEndpointSlices: useEndpointSlices,
}, nil
}

创建完成后,即可通过err := o.proxyServer.Run()启动proxyServer,接下来我们以iptables模式为例继续分析。

iptables proxy server

代码主要在pkg/proxy/iptables/proxier.go中实现,首先以单栈proxier分析iptables proxier构造特性见NewProxier函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// 创建一个iptables proxier实例, 由于iptables的逻辑, 我们假设一台机器上只有一个Proxier在工作。如果iptables未能更新或获得初始锁,将返回一个错误。一旦代理服务器被创建,它将在后台保持iptables的更新,如果某个iptables调用失败,它将不会终止。
func NewProxier(ipt utiliptables.Interface,
sysctl utilsysctl.Interface,
exec utilexec.Interface,
syncPeriod time.Duration,
minSyncPeriod time.Duration,
masqueradeAll bool,
masqueradeBit int,
localDetector proxyutiliptables.LocalTrafficDetector,
hostname string,
nodeIP net.IP,
recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater,
nodePortAddresses []string,
) (*Proxier, error) {
// kube-proxy要求NODE节点操作系统中有/sys/module/br_netfilter模块,还要设置bridge-nf-call-iptables=1;
//如果不满足要求,kube-proxy在运行过程中设置的某些iptables规则就不会工作。
if err := utilproxy.EnsureSysctl(sysctl, sysctlRouteLocalnet, 1); err != nil {
return nil, err
}

// 当容器连接到Linux网桥(但不是SDN网桥)时,代理需要br_netfilter和bridge-nf-call-iptables=1。 在大多数插件处理这个问题之前,当配置缺失时要记录日志
if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
klog.InfoS("Missing br-netfilter module or unset sysctl br-nf-call-iptables, proxy may not work as intended")
}

// 生成用于SNAT规则的mark标记
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
klog.V(2).InfoS("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark)

serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses)

ipFamily := v1.IPv4Protocol
if ipt.IsIPv6() {
ipFamily = v1.IPv6Protocol
}

ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses)
nodePortAddresses = ipFamilyMap[ipFamily]
// Log the IPs not matching the ipFamily
if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 {
klog.InfoS("Found node IPs of the wrong family", "ipFamily", ipFamily, "IPs", strings.Join(ips, ","))
}

proxier := &Proxier{
portsMap: make(map[netutils.LocalPort]netutils.Closeable),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
syncPeriod: syncPeriod,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &netutils.ListenPortOpener,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
precomputedProbabilities: make([]string, 0, 1001),
iptablesData: bytes.NewBuffer(nil),
existingFilterChainsData: bytes.NewBuffer(nil),
filterChains: bytes.NewBuffer(nil),
filterRules: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
nodePortAddresses: nodePortAddresses,
networkInterfacer: utilproxy.RealNetwork{},
}

burstSyncs := 2
klog.V(2).InfoS("Iptables sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
// We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
// We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
// time.Hour is arbitrary.
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)

// 开启iptables监视的协程, 主要的通过 proxier.syncProxyRules 的方法来维护iptables规则变化
go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
proxier.syncProxyRules, syncPeriod, wait.NeverStop)

if ipt.HasRandomFully() {
klog.V(2).InfoS("Iptables supports --random-fully", "ipFamily", ipt.Protocol())
} else {
klog.V(2).InfoS("Iptables does not support --random-fully", "ipFamily", ipt.Protocol())
}

return proxier, nil
}

ProxyServer.Run()

是proxyServer的启动命令,在cmd/kube-proxy/app/server.go中统一定义的模板方法,并没有被具体的proxier实现。开启了go serviceConfig.Run(wait.NeverStop)go endpointsConfig.Run(wait.NeverStop)orgo endpointSliceConfig.Run(wait.NeverStop)go nodeConfig.Run(wait.NeverStop)go s.Proxier.SyncLoop()多个协程,自身被errCh通道阻塞,等待接收从子协程抛出的err

在Run() 方法中,大致做了如下工作:

  • 准备工作,如设置OOMScoreAdj并设置连接跟踪
  • 注册service 和endpoints 的处理方法,使用list-watch 机制对service,endpoints资源监听。
  • 最后进入一个无限循环,对service与endpoints的变化进行iptables规则的同步。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
func (s *ProxyServer) Run() error {
// To help debugging, immediately log version
klog.InfoS("Version info", "version", version.Get())

/** 1.1 设置oom数值 */
// //在用户空间通过写oomScoreAdj参数到/proc/self/oom_score_adj文件来改变进程的 oom_adj 内核参数;
//oom_adj的值的大小决定了进程被 OOM killer,取值范围[-1000,1000] 选中杀掉的概率,值越低越不容易被杀死.此处默认值是-999。
// TODO(vmarmol): Use container config for this.
var oomAdjuster *oom.OOMAdjuster
if s.OOMScoreAdj != nil {
oomAdjuster = oom.NewOOMAdjuster()
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
klog.V(2).InfoS("Failed to apply OOMScore", "err", err)
}
}
/** 设置oom数值 */

/** 1.2 设置连接跟踪 */
if s.Broadcaster != nil && s.EventClient != nil {
// EventSinkImpl 包装了处理event 的方法create ,update, patchs
//s.Broadcaster 已经在ProxyServer 初始化中作为一个goroutine 在运行。
stopCh := make(chan struct{})
s.Broadcaster.StartRecordingToSink(stopCh)
}

// TODO(thockin): make it possible for healthz and metrics to be on the same port.

var errCh chan error
if s.BindAddressHardFail {
errCh = make(chan error)
}

// Start up a healthz server if requested
serveHealthz(s.HealthzServer, errCh)

// Start up a metrics server if requested
serveMetrics(s.MetricsBindAddress, s.ProxyMode, s.EnableProfiling, errCh)

// Tune conntrack, if requested
// Conntracker is always nil for windows
if s.Conntracker != nil {
max, err := getConntrackMax(s.ConntrackConfiguration)
if err != nil {
return err
}
if max > 0 {
err := s.Conntracker.SetMax(max)
if err != nil {
if err != errReadOnlySysFS {
return err
}
// errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
// the only remediation we know is to restart the docker daemon.
// Here we'll send an node event with specific reason and message, the
// administrator should decide whether and how to handle this issue,
// whether to drain the node and restart docker. Occurs in other container runtimes
// as well.
// TODO(random-liu): Remove this when the docker bug is fixed.
const message = "CRI error: /sys is read-only: " +
"cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, err.Error(), "StartKubeProxy", message)
}
}

if s.ConntrackConfiguration.TCPEstablishedTimeout != nil && s.ConntrackConfiguration.TCPEstablishedTimeout.Duration > 0 {
timeout := int(s.ConntrackConfiguration.TCPEstablishedTimeout.Duration / time.Second)
if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
return err
}
}

if s.ConntrackConfiguration.TCPCloseWaitTimeout != nil && s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration > 0 {
timeout := int(s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration / time.Second)
if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
return err
}
}
}
/** 设置连接跟踪 */
noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
if err != nil {
return err
}

noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
if err != nil {
return err
}

labelSelector := labels.NewSelector()
labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)

// 创建informer,过滤出期望以非默认service proxy运行的对象。
informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labelSelector.String()
}))


/** 2.创建配置记录变量, 创建ServiceConfig、endpointsConfig or EndpointSliceConfig 结构体,注册informer包括回调函数 */
//ServiceConfig结构体跟踪记录Service配置信息的变化
serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
//RegisterEventHandler 是将Service的处理方法追加到serviceConfig的eventHandlers 中,eventHandlers为一个列表,元素类型ServiceHandler接口
// ServiceHandler接口定义了每个hanlder 处理service的api方法:OnServiceAdd,OnServiceUpdate,OnServiceDelete,OnServiceSynced
// 此处 s.ServiceEventHandler 为proxier,s.Proxier实现了 ServiceHandler 接口定义的方法
serviceConfig.RegisterEventHandler(s.Proxier)
go serviceConfig.Run(wait.NeverStop)

// 如果 s.Proxier 的类型为 EndpointsHandler
if endpointsHandler, ok := s.Proxier.(config.EndpointsHandler); ok && !s.UseEndpointSlices {
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
// 注册事件处理handler
endpointsConfig.RegisterEventHandler(endpointsHandler)
go endpointsConfig.Run(wait.NeverStop)
} else {
endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.ConfigSyncPeriod)
//RegisterEventHandler 是将EndpointSliceHandler的处理方法追加到EndpointSliceConfig的eventHandlers 中,eventHandlers为一个列表,元素类型 EndpointSliceHandler 接口
endpointSliceConfig.RegisterEventHandler(s.Proxier)
go endpointSliceConfig.Run(wait.NeverStop)
}

// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
// functions must configure their shared informer event handlers first.
informerFactory.Start(wait.NeverStop)

if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
// Make an informer that selects for our nodename.
currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String()
}))
nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod)
nodeConfig.RegisterEventHandler(s.Proxier)
go nodeConfig.Run(wait.NeverStop)

// This has to start after the calls to NewNodeConfig because that must
// configure the shared informer event handler first.
currentNodeInformerFactory.Start(wait.NeverStop)
}
/** 2.创建配置记录变量 */

// Birth Cry after the birth is successful
s.birthCry()
/** 3.开启新协程进入无限Loop循环进行工作,对service与endpoints的变化进行iptables规则的同步。*/
go s.Proxier.SyncLoop()

return <-errCh
}

注:XxxxxConfig.RegisterEventHandler(s.Proxier)中s.Proxier都是实现了对应其接口的实体proxier,详细的实现可以去找具体的proxier,如iptables/proxier.go有很多On开头的方法

附:Service和Endpoint、EndpointSlice资源类结构体,可以看到其中全都有个eventHandlers的Slice维护了处理Handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ServiceConfig tracks a set of service configurations.
type ServiceConfig struct {
listerSynced cache.InformerSynced
eventHandlers []ServiceHandler
}
// EndpointsConfig tracks a set of endpoints configurations.
type EndpointsConfig struct {
listerSynced cache.InformerSynced
eventHandlers []EndpointsHandler
}
// EndpointSliceConfig tracks a set of endpoints configurations.
type EndpointSliceConfig struct {
listerSynced cache.InformerSynced
eventHandlers []EndpointSliceHandler
}

那么具体的proxier是如何侦听endpoint和service的变化的呢?这个就要见上述各资源的注册过程中配置了什么,见kubernetes/pkg/proxy/config/config.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// NewServiceConfig creates a new ServiceConfig.
func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
result := &ServiceConfig{
listerSynced: serviceInformer.Informer().HasSynced,
}


// serviceInformer.Informer() 返回一个sharedIndexInformer 实例(staing/src/k8s.io/client-go/tools/cache/shared_informer.go),通过其AddEventHandlerWithResyncPeriod() 方法,将ResourceEventHandler实例赋值给processorListener结构体的handler属性,作为其事件发生的处理函数
serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
//结构体cache.ResourceEventHandlerFuncs 是一个ResourceEventHandler接口类型(staing/src/k8s.io/client-go/tools/cache/controller.go),将ServicConfig 结构体的handleAddService 等方法赋予了cache.ResourceEventHandlerFuncs,实现一个ResourceEventHandler实例
cache.ResourceEventHandlerFuncs{
AddFunc: result.handleAddService,
UpdateFunc: result.handleUpdateService,
DeleteFunc: result.handleDeleteService,
},
resyncPeriod,
)
return result
}

// 如果发生了增加Service事件
func (c *ServiceConfig) handleAddService(obj interface{}) {
service, ok := obj.(*v1.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
for i := range c.eventHandlers {
klog.V(4).InfoS("Calling handler.OnServiceAdd")
// 转而调用 注册的事件handler接口,即proxyServer.Proxier 来处理
c.eventHandlers[i].OnServiceAdd(service)
}
}

从(s *ProxyServerproxier)Run的方法中可以看到service处理方法的被调用流程:通过serviceConfig.RegisterEventHandler()方法注册挂载了proxyServer.Proxier实例,然后在serviceConfig中的handleAddService()等方法中以c.eventHandlers[i].OnServiceAdd(service)的形式调用proxier中的OnServiceAdd()等对应的方法。

其中,可以看到proxier的处理函数都以订阅者的模式被注册好了等待事件发生触发,此时的执行权力都转交给了上层,那么serviceInformer.Informer().AddEventHandlerWithResyncPeriod就成了新的执行的入口,因此我们可以看看它是怎么work的,\staging\src\k8s.io\client-go\tools\cache\shared_informer.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
//... 省略

// 将加载好的handler注册到listener监听器中
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

if !s.started {
s.processor.addListener(listener)
return
}

s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// processor为sharedProcessor类型,其有一个 processorListener 的集合,可以向其监听器分发通知对象。 分发操作有两种, 会被同步分发到listener的子集中,(a)在运行时偶尔调用shouldResync时被重新得到的listener(b)每个最初被放入listener。
s.processor.addListener(listener)

看到上面的代码,proxier 中OnServiceAdd() 等方法的调用流程大致就有数了:在上述serviceInformer.Informer()返回之前,还将调用SharedIndexInformer.InformerFor()方法给informerFactory的informers属性赋值f.informers[informerType] = informer,如下所示\staging\src\k8s.io\apiextensions-apiserver\examples\client-go\pkg\client\informers\externalversions\factory.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()

informerType := reflect.TypeOf(obj)
// 判断是否有 informerType 类型的informers
// f.informers为map[reflect.Type]cache.SharedIndexInformer
informer, exists := f.informers[informerType]
if exists {
return informer
}

resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}

informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer

return informer
}

此段代码的意义可理解为:从api server 监听到 informerType类型资源变化的处理者记录(映射)为informer。此处的资源类型即为service, informer 便为sharedIndexInformer。具体的调用时机和最上层方法入口就要去看informerFactory这个东西了,这又是k8s 中另一个比较系统的公共组件了,即它涉及到client-go的SharedInformer的触发规则和实现原理了。

记录资源变化

Proxier 实现了 services 和 endpoints 事件各种最终的观察者,最终的事件触发都会在 proxier 中进行处理。对于通过监听 API Server 变化的信息,通过调用ResourceHandler将变化的信息保存到 endpointsChanges 和 serviceChanges。那么一个ResourceHandler是如何实现的呢?service 和endpoints 的变化如何记录为servriceChanges 和endpointsChanges?回看上边源码中被注册的对象s.ServiceEventHandler,s.EndpointsEventHandler的具体实现便可明白。

service 和endpoints 的处理原则相似,以对servcie 的处理为例,看一下对service 的处理方法,pkg/proxy/iptables/proxier.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 增加Service
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
proxier.OnServiceUpdate(nil, service)
}

// 更新Service
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
proxier.syncRunner.Run() // 通过channel 发送一个信号,调用tryRun()
}
}
// 删除Service时
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
proxier.OnServiceUpdate(service, nil)
}

// ...

// OnServiceUpdate is called whenever modification of an existing
// service object is observed.
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
proxier.Sync()
}
}

可以看到,上述三个情况:增加、删除service 都是给proxier.OnServiceUpdate() 传入参数后,由OnServiceUpdate() 方法处理,而proxier.OnServiceUpdate() 中又是通过proxier.serviceChanges.Update(oldService, service)实现的 ,因此重点看一下serviceChanges的Update() 方法是如何实现的。见pkg/proxy/service.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 根据<previous, current>服务对,更新给定服务的变化图。 如果项目发生了变化,它返回true。
// 否则返回false。 Update可以用来添加/更新/删除ServiceChangeMap的项目。 例如。
// 添加项目
// - 传递<nil, service>作为<previous, current>对。
// 更新项目
// - 传递<oldService, service>作为<previous, current>对。
// 删除项目
// - 传递<service, nil>作为<previous, current>对。
func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
svc := current
if svc == nil {
svc = previous
}
// previous == nil && current == nil is unexpected, we should return false directly.
if svc == nil {
return false
}
namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}

sct.lock.Lock()
defer sct.lock.Unlock()

change, exists := sct.items[namespacedName]
if !exists { // 在serviceChanges 中不存在一个以namespacedName 为key 的资源
change = &serviceChange{} // 初始化一个serviceChange
change.previous = sct.serviceToServiceMap(previous)
sct.items[namespacedName] = change
}
change.current = sct.serviceToServiceMap(current)
// if change.previous equal to change.current, it means no change
if reflect.DeepEqual(change.previous, change.current) { // 从update传递进来的资源没有变化,则从serviceChanges中删除。
delete(sct.items, namespacedName)
}
return len(sct.items) > 0
}

Update 方法就是根据`<previous ,current> 参数对新生成一个change 或者修改一个存在的change。并且把无变化的资源从serviceChanges 中删除。serviceChanges.items 会在将变化信息更新到proxier.serviceMap 后清空。

IPTables相关

Iptables 创建KUBE自定义链和规则

介绍了kube-proxy关于资源监听和记录的实现后,再来看一下kube-proxy是如何将资源的变化反馈到iptables规则中的。在具体proxier的创建过程中,以iptables为例,是创建了iptable监视器的,其会通过不断调用proxier.syncProxyRules来同步更新iptables规则。

1
2
3
4
5
6
7
8
9
10
func NewProxier(ipt utiliptables.Interface,
// ...
){
// ... 省略

// 开启iptables监视的协程, 主要的通过 proxier.syncProxyRules 的方法来维护iptables规则变化
go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
proxier.syncProxyRules, syncPeriod, wait.NeverStop)

}

继续研读syncProxyRule()方法看看其实修改iptables规则的细节流程,我们将能明白在node节点观察到的新链及规则产生的方式及目的,见kubernetes/pkg/proxy/iptables/proxier.go,更新proxier.endpointsMap,proxier.servieMap以及产生和维护Kube自定义链。

  • proxier.serviceMap:把sercvieChanges.current 写入proxier.serviceMap,再把存在于sercvieChanges.previous 但不存在于sercvieChanges.current 的service 从 proxier.serviceMap中删除,并且删除的时候,把使用UDP协议的cluster_ip 记录于UDPStaleClusterIP 。
  • proxier.endpointsMap:把endpointsChanges.previous 从proxier.endpointsMap 删除,再把endpointsChanges.current 加入proxier.endpointsMap。把存在于endpointsChanges.previous 但不存在于endpointsChanges.current 的endpoint 组装为ServiceEndpoint 结构,把该结构记录于staleEndpoints。

具体相关代码流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// syncProxyRules是所有iptables-save/restore调用发生的地方。
// 唯一的其他iptables规则是那些在iptablesInit()中设置的规则。
func (proxier *Proxier) syncProxyRules() {
// ...省略

serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges)

staleServices := serviceUpdateResult.UDPStaleClusterIP

// 利用endpointUpdateResult.StaleServiceNames,再次更新 staleServices
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == v1.ProtocolUDP {
klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIPString())
staleServices.Insert(svcInfo.ClusterIPString())
}
}
...
}

//kubernetes/pkg/proxy/servcie.go
func UpdateServiceMap(serviceMap ServiceMap, changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
result.UDPStaleClusterIP = sets.NewString()
// apply 方法中,继续调用了merge,filter, umerge
// merge:将change.current的servicemap 信息合入proxier.servicemap中。
// filter:将change.previous和change.current共同存在的servicemap从将change.previous删除
// unmerge: 将change.previous 中使用UDP 的servicemap 从 proxier.serviceMap 中删除,并记录删除的服务IP 到UDPStaleClusterIP
//apply中最后重置了proxy.serviceChanges.items
serviceMap.apply(changes, result.UDPStaleClusterIP)
//HCServiceNodePorts 保存proxier.serviceMap 中所有服务的健康检查端口
result.HCServiceNodePorts = make(map[types.NamespacedName]uint16)
for svcPortName, info := range serviceMap {
if info.GetHealthCheckNodePort() != 0 {
result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.GetHealthCheckNodePort())
}
}
return result
}

//kubernetes/pkg/proxy/endpoints.go
func UpdateEndpointsMap(endpointsMap EndpointsMap, changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
result.StaleEndpoints = make([]ServiceEndpoint, 0)
result.StaleServiceNames = make([]ServicePortName, 0)

// 从proixer.endpointsMap 中删除和change.previous 相同的elelment.
// 将change.current 添加至proixer.endpointsMap
// StaleEndpoints 保存了存在于previous 但不存在current的endpoints
// StaleServicenames保存了一种ServicePortName,这样的ServicePortName在change.previous不存在对应的endpoints,在change.current存在endpoints。
// 最后重置了了proxy.endpointsChanges.items
endpointsMap.apply(changes, &result.StaleEndpoints, &result.StaleServiceNames)
// computing this incrementally similarly to endpointsMap.
result.HCEndpointsLocalIPSize = make(map[types.NamespacedName]int)
localIPs := GetLocalEndpointIPs(endpointsMap)
for nsn, ips := range localIPs {
result.HCEndpointsLocalIPSize[nsn] = len(ips)
}
return result
}

准备好更新iptables需要的资源变量后,接下来就是调用iptables 命令建立自定义链了,并在对应的内核链上引用这些自定义链。这些自定义链在k8s 服务中是必须的,不会跟随资源变化而变化,所以在更新规则之前,提前无条件生成这些链,做好准备工作,随后会在这些自定义链上创建相应的规则。

继续看kubernetes/pkg/proxy/iptables/proxier.go,就是kube自定义链的创建过程了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 创建并连接kube链
for _, jump := range iptablesJumpChains {
if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
klog.ErrorS(err, "Failed to ensure chain exists", "table", jump.table, "chain", jump.dstChain)
return
}
args := append(jump.extraArgs,
"-m", "comment", "--comment", jump.comment,
"-j", string(jump.dstChain),
)
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil {
klog.ErrorS(err, "Failed to ensure chain jumps", "table", jump.table, "srcChain", jump.srcChain, "dstChain", jump.dstChain)
return
}
}

// ensure KUBE-MARK-DROP chain exist but do not change any rules
for _, ch := range iptablesEnsureChains {
if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain)
return
}
}

// EnsureChain is part of Interface.
func (runner *runner) EnsureChain(table Table, chain Chain) (bool, error) {
fullArgs := makeFullArgs(table, chain)
runner.mu.Lock()
defer runner.mu.Unlock()

out, err := runner.run(opCreateChain, fullArgs)
if err != nil {
if ee, ok := err.(utilexec.ExitError); ok {
if ee.Exited() && ee.ExitStatus() == 1 {
return true, nil
}
}
return false, fmt.Errorf("error creating chain %q: %v: %s", chain, err, out)
}
return false, nil
}

由于涉及到iptables内容了,以目前的能力再阅读代码已经深入不进了,因此只好借助网上的代码来分析实现了。据网上资料,上边代码完成的iptables命令如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 在filter表中创建KUBE-EXTERNAL-SERVICES链
iptables -w -N KUBE-EXTERNAL-SERVICES -t filter
# 在filter表的INPUT链头增加跳转到 自定义KUBE-EXTERNAL-SERVICES 链的规则
iptables -w -I INPUT -t filter -m conntrack --ctstate NEW -m comment --comment -j KUBE-EXTERNAL-SERVICES kubernetes externally-visible service portals

# 在filter表中创建KUBE-SERVICES链
iptables -w -N KUBE-SERVICES -t filter
# 在filter表的OUTPUT链头增加跳转到 自定义 KUBE-SERVICES 链的规则
iptables -w -I OUTPUT -t filter -m conntrack --ctstate NEW -m comment --comment -j KUBE-SERVICES kubernetes service portals

# 在 nat 表中创建KUBE-SERVICES链
iptables -w -N KUBE-SERVICES -t nat
# 在 nat 表的OUTPUT链头增加跳转到 自定义 KUBE-SERVICES 链的规则
iptables -w -I OUTPUT -t nat -m conntrack --ctstate NEW -m comment --comment -j KUBE-SERVICES kubernetes service portals

iptables -w -N KUBE-SERVICES -t nat
# 在 nat 表的 PREROUTING 链头增加跳转到 自定义 KUBE-SERVICES 链的规则
iptables -w -I PREROUTING -t nat -m conntrack --ctstate NEW -m comment --comment -j KUBE-SERVICES kubernetes service portals

# 在 nat 表中创建KUBE-POSTROUTING 链
iptables -w -N KUBE-POSTROUTING -t nat
# 在 nat 表的 PREROUTING 链头增加跳转到 自定义 KUBE-POSTROUTING 链的规则
iptables -w -I POSTROUTING -t nat -m conntrack --ctstate NEW -m comment --comment -j KUBE-POSTROUTING kubernetes postrouting rules

# 在 filter 表中创建 KUBE-FORWARD 链
iptables -w -N KUBE-FORWARD -t filter
# 在 filter 表的 FORWARD 链头增加跳转到 自定义 KUBE-FORWARD 链的规则
iptables -w -I FORWARD -t filter -m conntrack --ctstate NEW -m comment --comment -j KUBE-FORWARD kubernetes forwarding rules

可见的是,在IPTables做了以下修改:

  • filter表:
    • 创建 KUBE-EXTERNAL-SERVICES 链
    • 创建 KUBE-SERVICES 链
    • 创建 KUBE-FORWARD 链
  • nat 表:创建KUBE-SERVICES链
    • 创建 KUBE-SERVICES 链
    • 创建 KUBE-POSTROUTING 链

并且在nat和filter表上原有的固定链前都增加了跳转到Kube自定义链的转发规则,从而使得所有进入固定链的流包在nat或filter 时,都会导入自定义链中。

可见通过上述的创建,在内核固定链中引用 K8S 的链时,这些新链都是作为内核固定链在nat表或filter表中的第一条规则。这样,所有进入固定链的流包在nat或filter 时,都会导入自定义链中。特别地,PREROUTING 和OUTPUT 的首条NAT规则都先将所有流量导入KUBE-SERVICE 链中,这样就截获了所有的入流量和出流量,进而可以对k8s 相关流量进行重定向处理。

继续沿着syncProxyRules函数看下去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
if chain, ok := existingFilterChains[chainName]; ok {
proxier.filterChains.WriteBytes(chain)
} else {
proxier.filterChains.Write(utiliptables.MakeChainLine(chainName))
}
}

// 这边新出来了三条chain:kubeNodePortsChain、KubeMarkMasqChain
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
if chain, ok := existingNATChains[chainName]; ok {
proxier.natChains.WriteBytes(chain)
} else {
//KUBE-NODEPORTS,KUBE-MARK-MASQ 之前并未被创建,现在创建
proxier.natChains.Write(utiliptables.MakeChainLine(chainName))
}
}

至此,iptables/proxier.go中定义的8个chain我们已经见到了6个了,剩下的两个分别是KUBE-MARK-DROP、KUBE-PROXY-CANARY,这两个对我们走读执行逻辑就没有太多关联了,因此就不继续分析这两个链了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const (
// the services chain
kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
// the external services chain
kubeExternalServicesChain utiliptables.Chain = "KUBE-EXTERNAL-SERVICES"
// the nodeports chain
kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"
// the kubernetes postrouting chain
kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
// KubeMarkMasqChain is the mark-for-masquerade chain
KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
// KubeMarkDropChain is the mark-for-drop chain
KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
// the kubernetes forward chain
kubeForwardChain utiliptables.Chain = "KUBE-FORWARD"
// kube proxy canary chain is used for monitoring rule reload
kubeProxyCanaryChain utiliptables.Chain = "KUBE-PROXY-CANARY"
)

KUBE-MARK-DROP和KUBE-MARK-MASQ本质上就是使用了iptables的MARK命令:

  • 对于KUBE-MARK-MASQ链中所有规则设置了kubernetes独有MARK标记,在KUBE-POSTROUTING链中对NODE节点上匹配kubernetes独有MARK标记的数据包,当报文离开node节点时进行SNAT,MASQUERADE源IP
  • 而对于KUBE-MARK-DROP设置标记的报文则会在KUBE_FIREWALL中全部丢弃

Service类型分析

继续往下看,终于到了重点的地方:proxy如果通过service找到对应的EndPoints

1
2
3
4
5
6
7
8
9
10
11
12
13
func (proxier *Proxier) syncProxyRules() {
// ...

// Build rules for each service.
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
// ...
allEndpoints := proxier.endpointsMap[svcName]
// 对拓扑感知端点进行过滤。该函数仅在适当的功能gates被启用,并且该服务没有冲突的配置(如externalTrafficPolicy=Local)时才会过滤端点。
allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
// svcChain通过utiliptables.Chain("KUBE-SVC-" + portProtoHash(servicePortName, protocol))创建的chain
svcChain := svcInfo.servicePortChainName
// ...

clusterIP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
      /*** 				Capture the clusterIP.			***/
if hasEndpoints {
// 填充转发选项-d和--dport
args = append(args[:0],
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
"--dport", strconv.Itoa(svcInfo.Port()),
)
if proxier.masqueradeAll {
// 生成结果-A KUBE-SERVICES ...--comment ${svc-port-name} cluster IP ... -d ${cluster_ip}/32 -dport xxx -j KUBE-MARK-MASQ
args := prepend(args, "-A", string(svcChain))
args = append(args, "-j", string(KubeMarkMasqChain))
proxier.natRules.Write(args...)
} else if proxier.localDetector.IsImplemented() {
// 生成结果 -A KUBE-SERVICES ... --comment ${svc-port-name} cluster IP ... -d ${cluster_ip}/32 -dport XXX ! -s ${cluster_cidr} -j KUBE-MARK-MASQ
args := prepend(args, "-A", string(svcChain))
args = proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))
proxier.natRules.Write(args...)
}
args = prepend(args, "-A", string(kubeServicesChain))
// 总是将将目的地址是{cluster_ip:port} 的流量导入到KUBE-SVC-XXX, 即-A KUBE-SERVICES ... --comment ${svc-port-name} cluster IP ... -d ${cluster_ip}/32 -dport xxx -j KUBE-SVC-XXX
args = append(args, "-j", string(svcChain))
proxier.natRules.Write(args...)
} else {
// No endpoints.
// 如果服务没有endpoints, 在KUBE-SERVICES链上建立filter 规则((将规则写入proxier.filterRules ,下同),表示放弃访问目的地址{cluster_ip:port}的包。 省略
// 生成结果:-A KUBE-SERVICES ...--comment {svc-port-name} has no endpoints ... -d ${cluster_ip}/32 -dport xxx -j REJECT
}
/*** Capture the clusterIP. ***/

externalIPs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
 /*** 				Capture externalIPs.		***/
for _, externalIP := range svcInfo.ExternalIPStrings() {
// 如果 "外部 "IP恰好是本机的IP,则保持本机端口开放,这样就没有其他进程可以打开它(因为套接字可能会打开,但它永远不会工作)。
if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(netutils.ParseIPSloppy(externalIP)) {
lp := netutils.LocalPort{
Description: "externalIP for " + svcNameString,
IP: externalIP,
IPFamily: localPortIPFamily,
Port: svcInfo.Port(),
Protocol: netutils.Protocol(svcInfo.Protocol()),
}
if proxier.portsMap[lp] != nil {
klog.V(4).InfoS("Port was open before and is still needed", "port", lp)
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())

proxier.recorder.Eventf(
&v1.ObjectReference{
Kind: "Node",
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
klog.ErrorS(err, "Can't open port, skipping it", "port", lp)
continue
}
klog.V(2).InfoS("Opened local port", "port", lp)
replacementPortsMap[lp] = socket
}
}

if hasEndpoints {
args = append(args[:0],
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(netutils.ParseIPSloppy(externalIP)),
"--dport", strconv.Itoa(svcInfo.Port()),
)

destChain := svcXlbChain
if !svcInfo.NodeLocalExternal() {
destChain = svcChain
args := prepend(args, "-A", string(svcChain))
// This masquerades off-cluster traffic to a External IP.
// -A KUBE-SERVICES ... --comment ${svc-port-name} external IP ... -d ${external_ip}/32 -dport xxx -m physdev ! --physdev-is-in -m addrtype ! --src-type LOCAL -j KUBE-SVC-xxx
if proxier.localDetector.IsImplemented() {
proxier.natRules.Write(
proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...)
} else {
proxier.natRules.Write(
append(args, "-j", string(KubeMarkMasqChain))...)
}
}
// Send traffic bound for external IPs to the service chain.
// -A KUBE-SERVICES ... --comment ${svc-port-name} external IP ... -d ${external_ip}/32 -dport xxx -m addrtype --dst-type LOCAL -j KUBE-SVC-xxx
args = prepend(args, "-A", string(kubeServicesChain))
proxier.natRules.Write(
append(args, "-j", string(destChain))...)
} else {
// No endpoints.
// -A KUBE-EXTERNAL-SERVICES ...--comment ${svc-port-name} has no endpoints ... -d ${external_ip}/32 -dport xxx -j REJECT
}
}
/*** Capture externalIPs. ***/

load-balancer

1
// ... Capture load-balancer ingress. 省略

NodePort.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
      /*** 				Capture NodePort.		***/
// 如果我们有2条以上的规则,也许值得为节点端口规则做一个新的每个服务链,但只有2条规则,最终是一种浪费和认知上的负担。
if svcInfo.NodePort() != 0 {
// 保持本地端口的开放,因此没有其他进程可以打开它(因为套接字可能会打开,但它永远不会工作)。
if len(nodeAddresses) == 0 {
continue
}

lps := make([]netutils.LocalPort, 0)
for address := range nodeAddresses {
lp := netutils.LocalPort{
Description: "nodePort for " + svcNameString,
IP: address,
IPFamily: localPortIPFamily,
Port: svcInfo.NodePort(),
Protocol: netutils.Protocol(svcInfo.Protocol()),
}
if utilproxy.IsZeroCIDR(address) {
// Empty IP address means all
lp.IP = ""
lps = append(lps, lp)
// If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
break
}
lps = append(lps, lp)
}

// For ports on node IPs, open the actual port and hold it.
for _, lp := range lps {
if proxier.portsMap[lp] != nil {
klog.V(4).InfoS("Port was open before and is still needed", "port", lp)
replacementPortsMap[lp] = proxier.portsMap[lp]
} else if svcInfo.Protocol() != v1.ProtocolSCTP {
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())

proxier.recorder.Eventf(
&v1.ObjectReference{
Kind: "Node",
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
klog.ErrorS(err, "Can't open port, skipping it", "port", lp)
continue
}
klog.V(2).InfoS("Opened local port", "port", lp)
replacementPortsMap[lp] = socket
}
}
/*** Capture externalIPs. ***/


if hasEndpoints {
args = append(args[:0],
"-m", "comment", "--comment", svcNameString,
"-m", protocol, "-p", protocol,
"--dport", strconv.Itoa(svcInfo.NodePort()),
)
//if hasEndpoints && if !svcInfo.NodeLocalExternal, 在NAT表写入:-A KUBE-NODEPORTS ... --comment ${svc-port-name} --dport {nodeport} -j KUBE-MARK-MASQ、 -A KUBE-NODEPORTS ... --comment ${svc-port-name} --dport ${nodeport} -j KUBE-SVC-xxx
if !svcInfo.NodeLocalExternal() {
// Nodeports need SNAT, unless they're local.
proxier.natRules.Write(
append(prepend(args, "-A", string(svcChain)), "-j", string(KubeMarkMasqChain))...)
// Jump to the service chain.
proxier.natRules.Write(
append(prepend(args, "-A", string(kubeNodePortsChain)), "-j", string(svcChain))...)
} else {
// 否则-A KUBE-NODEPORTS ... --comment ${svc-port-name} --dport ${nodeport} -s 127.0.0.0/8 -j KUBE-SVC-xxx、 -A KUBE-NODEPORTS ... --comment ${svc-port-name} --dport ${nodeport} -j KUBE-XLB-xxx
// Fix localhost martian source error
loopback := "127.0.0.0/8"
if isIPv6 {
loopback = "::1/128"
}
args = prepend(args, "-A", string(kubeNodePortsChain))
proxier.natRules.Write(
append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...)
proxier.natRules.Write(
append(args, "-j", string(svcXlbChain))...)
}
} else {
// No endpoints.
// 增加REJECT规则拒绝该流量, 省略
// -A KUBE-EXTERNAL-SERVICES ... -m addrtype --dst-type LOCAL ... --dport ${nodeport} -j REJECT
}
}
/*** Capture NodePort. ***/

上述代码主要针对四种情况做了规则处理:

  1. 为cluster_ip 设置访问规则
  • 为有endpints 的服务在KUBE-SERVICES 链上建立nat表规则(将规则写入proxier.natRules ,下同):
    • 如果设置了proxier.masqueradeAll , kube-proxy 会对所有目的地址是{cluster_ip:port}的包打标签,进 而做SNAT;或者如果指定了–cluster–cidr , kube-proxy 会对目的地址是{cluster_ip:port} 的集群外部(! -s ${cluster_cidr})流量包打标签,进而做SNAT;(以上规则二选一)
    • 总是将将目的地址是{cluster_ip:port} 的流量导入到KUBE-SVC-XXX
  • 如果服务没有endpoints, 在KUBE-SERVICES链上建立filter 规则,表示放弃访问目的地址{cluster_ip:port}的包。

得到的iptable命令如下:

1
2
3
4
5
-A KUBE-SERVICES ...--comment ${svc-port-name} cluster IP ...  -d ${cluster_ip}/32 -dport xxx  -j KUBE-MARK-MASQ  # if proxier.masqueradeAll =True
-A KUBE-SERVICES ... --comment ${svc-port-name} cluster IP ... -d ${cluster_ip}/32 -dport XXX ! -s ${cluster_cidr} -j KUBE-MARK-MASQ # else if len(proxier.clusterCIDR) > 0
-A KUBE-SERVICES ... --comment ${svc-port-name} cluster IP ... -d ${cluster_ip}/32 -dport xxx -j KUBE-SVC-XXX # 有endpoints 时总是添加此规则

-A KUBE-SERVICES ...--comment {svc-port-name} has no endpoints ... -d ${cluster_ip}/32 -dport xxx -j REJECT // 没有endpoint时,直接将发往此IP:Port的包丢弃
  1. 为externalIP 类型服务建立规则

    如果external IP 是本机IP,并且服务使用的协议不是SCTP, 生成结构体LocalPort 以记录这样的服务的external IP , port ,协议,以及描述信息。 确认在本机上打开服务端口(可以把这个socket理解为“占位符”,以便让操作系统为本机其他应用程序分配端口时让开该端口),并且添加{LocalPort :socket} 到replacementPortsMap。

    • 如果该服务有endpoints ,在KUBE-SERVICES 链添加 nat 表规则
      • 对于到external_ip:port 的包打标签;
      • 对于从集群外发送的目的地址是extenralIP 的包建立规则
      • 对于目的地址和node 地址相同的包建立规则
    • 如果该服务没有endpoints ,在KUBE-EXTERNAL-SERVICES 添加 filter 规则,表示放弃目的地址是{ {external_ip:xxx}的包

    得到的iptable命令如下:

    1
    2
    3
    4
    5
    6
    7
    -A KUBE-SERVICES ... --comment ${svc-port-name} external IP ... -d ${external_ip}/32 -dport xxx -j KUBE-MARK-MASQ

    -A KUBE-SERVICES ... --comment ${svc-port-name} external IP ... -d ${external_ip}/32 -dport xxx -m physdev ! --physdev-is-in -m addrtype ! --src-type LOCAL -j KUBE-SVC-xxx

    -A KUBE-SERVICES ... --comment ${svc-port-name} external IP ... -d ${external_ip}/32 -dport xxx -m addrtype --dst-type LOCAL -j KUBE-SVC-xxx

    -A KUBE-EXTERNAL-SERVICES ...--comment ${svc-port-name} has no endpoints ... -d ${external_ip}/32 -dport xxx -j REJECT
  2. 服务类型为LoadBalancer时,设置外部负载均衡相关规则

  3. 为NodePort 类型服务规则建立:

    得到的iptable命令如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    replacementPortsMap[lp] = proxier.portsMap[lp] ,并且打开端口

    //if hasEndpoints && if !svcInfo.OnlyNodeLocalEndpoints, 在NAT表写入:
    -A KUBE-NODEPORTS ... --comment ${svc-port-name} --dport {nodeport} -j KUBE-MARK-MASQ
    -A KUBE-NODEPORTS ... --comment ${svc-port-name} --dport ${nodeport} -j KUBE-SVC-xxx

    //if hasEndpoints && if svcInfo.OnlyNodeLocalEndpoints,在NAT表写入:
    -A KUBE-NODEPORTS ... --comment ${svc-port-name} --dport ${nodeport} -s 127.0.0.0/8 -j KUBE-SVC-xxx
    -A KUBE-NODEPORTS ... --comment ${svc-port-name} --dport ${nodeport} -j KUBE-XLB-xxx

    // !if hasEndpoints ,在Filter表写入:
    -A KUBE-EXTERNAL-SERVICES ... -m addrtype --dst-type LOCAL ... --dport ${nodeport} -j REJECT

以上讲解了IPTables模式下将流量转发到相应的具体的KUBE-SERVICE-XXX上,而之后还需要解决的是将service的流量发给具体的EndPoint即执行Pod——建立 endpoints 相关的链和规则

  1. 为同一个service 的所有endpoints 在nat 表建立链 KUBE-SEP-XXX : KUBE-SEP-XXX -[0:0],并且记录 activeNATChains[endpointChain] = true,endpointChain为endpointChain := epInfo.endpointChain(svcNameString, protocol)中创建的KUBE-SEP-XXX

  2. 如果服务设置了”clientIP“ 亲和性, 则为该服务的每一个endpoint 设置会话亲和性-A KUBE-SVC-XXX -m recent --name KUBE-SEP-XXX --rcheck --seconds xxx --reap -j KUBE-SEP-XXX //多个endpoints,则有多条类似规则

  3. 在endpointsChain 链上建立NAT规则

    • 对于多个endpoints (n >1) ,利用iptables 的随机和概率转发的功能,循环建立规则。概率计算是通过查表(precomputeProbabilities 字符串数组)或者现场计算(n>= len(precomputeProbabilities) 的方式完成。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      /** 概率是通过1.0/float64(n-i)计算出来的,n 代表endpoints的个数 */
      // 前n-1个endpoints使用此规则
      -A KUBE-SVC-XXX -m static --mode random --probability xxx -j KUBE-SEP-XXX
      // 第n个endpoint 建立此规则
      -A KUBE-SVC-XXX -j KUBE-SEP-XXX

      -A KUBE-SEP-XXX -s ${endpoint_ip}/32 -j KUBE-MARK-MASQ
      -A KUBE-SEP-XXX -m recent --name KUBE-SEP-XXX --set -j DNAT --to-destination xxx // 如果设置了会话亲和性,写入该条规则
      -A KUBE-SEP-XXX -j DNAT --to-destination xxx //如果没有设置会话亲和性,写入该条规则

      在KUBE-SEP-XXX链上通过DNAT规则,真正把流量交到了对应POD的服务上。

    • 如果服务还具有OnlyNodeLocalEndpoints 属性,表示只将流量导入到本机上的后端pod上。挑选出和proxy 在相同机器运行的endpoints,在nat 表建立如下规则

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      -A KUBE-XLB-XXX ... -s ${cluster-ip} -j KUBE-SVC-XXX  // 设置了clusterCIDR

      //如果没有Local POD
      -A KUBE-XLB-XXX ... --comment ${svc-port-name} has no local endpoints -j KUBE-MARK-DROP
      //如果有Local POD
      -A KUBE-XLB-XXX ... -m recent --name KUBE-SEP-XXX -rchenck --seconds xxx -j KUBE-SEP-XXX //设置了亲和性

      //如果有多个pods,设置
      -A KUBE-XLB-XXX ... -m static --mode --probability xxx -j KUBE-SEP-XXX
      -A KUBE-XLB-XXX ... -m static --mode --probability xxx -j KUBE-SEP-XXX

至此,我们看完了func (proxier *Proxier) syncProxyRules()中的核心内容:遍历完成了对serviceMap 中所有服务及对应的endpoints 建立规则。针对Service的不同类型,我们也知道了为什么可以Work。也知道了为什么本地服务可以通过ClusterIP:Port的方式找到服务,外网的请求可以通过NodeIP:NodePort的方式找到服务。至于如何找到相应Pod的再详细说明可以见网上的样例分析:kubernetes入门之kube-proxy实现原理

总结图

最后的最后,放张我自己画的CLI启动ProxyServer Run()的顺序图,和大佬总结的资源更新示意图以及链、规则建立顺序图吧。

Proxy

seq

iptables

Reference:

  1. spf13/cobra
  2. flag–命令行参数解析之StringVar
  3. 【kubernetes/k8s概念】kube-proxy启动参数
  4. k8s源码分析——kube-proxy 源码分析——梳理了整体的执行过程
  5. kubernetes入门之kube-proxy实现原理——分析了外网如何通过NodePort找到相应EndPoint的案例
  6. 理解kubernetes环境的iptables——对链上关系分析的比较好

Author: Mrli

Link: https://nymrli.top/2021/10/26/k8s之kube-proxy源码分析/

Copyright: All articles in this blog are licensed under CC BY-NC-SA 3.0 unless stating additionally.

< PreviousPost
浙江大学期末考试——Go语言
NextPost >
Java Logger
CATALOG
  1. 1. kubernetes 简单介绍
    1. 1.1. 背景介绍
    2. 1.2. 架构设计
    3. 1.3. 代码整体分析
      1. 1.3.1. proxy源码目录结构分析
  2. 2. kube-proxy分析
    1. 2.1. 重要结构体说明
      1. 2.1.1. ProxyServer
      2. 2.1.2. Options
      3. 2.1.3. Proxier
    2. 2.2. 程序启动过程
      1. 2.2.1. ProxyServer创建过程
      2. 2.2.2. iptables proxy server
      3. 2.2.3. ProxyServer.Run()
    3. 2.3. 记录资源变化
    4. 2.4. IPTables相关
      1. 2.4.1. Iptables 创建KUBE自定义链和规则
      2. 2.4.2. Service类型分析
    5. 2.5. 总结图
  3. 3. Reference: