杜军 | 2015-01-22
Kube-proxy是kubernetes 里运行在minion节点上的一个组件, 它起的作用是一个服务代理的角色. 本文的内容将分为以下两部分, 源代码来自kubernetes release-0.8.1, 代码有删节,省略的代码或log输出用...表示:
1 Kube-proxy 简介
2 Kube-proxy代码解读
1 Kube-proxy简介
Kube-proxy网络代理运行在每个minion节点上。网上很多人所说这个proxy是kubernetes里的SDN组件,我本人并不这么认为, 我认为可以把他看成是一个高级的反向代理.它的功能反映了定义在每个节点上Kubernetes API中的Services信息,并且可以做简单的TCP流转发或在一组服务后端做round robin的流转发.服务端点目前通过与docker link 兼容的环境变量指定的端口被发现, 这些端口由服务代理打开.目前,用户必须在代理上选择一个端口以暴露服务.
2 Kube-proxy代码解读
Kube-proxy代码入口定义在cmd/kube-proxy/proxy.go中, 源代码较长,分段解读如下:
func main() {
flag.Parse()
util.InitLogs()
defer util.FlushLogs()
if err := util.ApplyOomScoreAdj(*oomScoreAdj); err != nil {
glog.Info(err)
}
首先
1. 使用flag pkg初始化命令行参数到相应的变量, 如etcd_servers选项
2. 初始化log
3. 应用oomScoreAdj参数到/proc/self/oom_score_adj文件. oom_score_adj 是-1000到1000的数值, 用来表征进程当发生OOM(out of memory)时系统对该进程的行为,值越低越不容易被杀死.默认值是-899.
4. 使用下列两个函数新建两个重要的数据结构 ServiceConfig 和 EndpointsConfig
serviceConfig := config.NewServiceConfig()
endpointsConfig := config.NewEndpointsConfig()
由于proxy和kubernetes Service概念关系很大, 强烈建议读者访问kubernetes Service官方文档了解其基本概念. 先介绍ServiceConfig结构体和相关操作NewServiceConfig(), 源代码如下, 定义在pkg/proxy/config/config.go中
type ServiceConfig struct {
mux *config.Mux
bcaster *config.Broadcaster
store *serviceStore
}
func NewServiceConfig() *ServiceConfig {
updates := make(chan struct{})
store := &serviceStore{updates: updates, services: make(map[string]map[string]api.Service)}
mux := config.NewMux(store)
bcaster := config.NewBroadcaster()
go watchForUpdates(bcaster, store, updates)
return &ServiceConfig{mux, bcaster, store}
}
ServiceConfig结构体跟踪记录Service配置信息的变化,他接受通过channel传递的Sevice上”set”, ”add”, ”remove”的操作, 并使用相应的handler函数处理这些变化. ServiceConfig通过组合的方式将config.Mux, config.Broadcaster和serviceStore类集成起来. 并启动一个goroutine watchForUpdates(bcaster, store, updates), watchForUpdates()函数源代码如下:
func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) {
for _ = range updates {
bcaster.Notify(accessor.MergedState())
}
}
他的作用是当updates channel有可用的信息时, 调用bcaster.Notify()函数以广播的方式将MergedState()处理后的serivces信息通知到各个Listener. 其中MergeState实际调用的是(s *serviceStore) MergedState(); bcaster.Notify()函数源码如下, 它定义在pkg/util/config/config.go中
// 通知所有listener.
func (b *Broadcaster) Notify(instance interface{}) {
b.listenerLock.RLock()
listeners := b.listeners
b.listenerLock.RUnlock()
for _, listener := range listeners {
listener.OnUpdate(instance)
}
}
listener 相关的结构体源代码如下, 可以看出_golang interface_的飘逸灵活用法:
type Listener interface {
// OnUpdate 在每次object 有变化时被调用.
OnUpdate(instance interface{})
}
// ListenerFunc 是一个函数类型,他以下面的方式实现了OnUpdate()函数.
type ListenerFunc func(instance interface{})
func (f ListenerFunc) OnUpdate(instance interface{}) {
f(instance)
}
NewServiceConfig函数最后返回这个新建的ServiceConfig结构体.
EndpointsConfig 结构体类似于ServiceConfig结构体, 相关的操作也与Service极其相似,也是采用组合的方式将config.Mux, config.Broadcaster, endpointsStore组合起来, 并马上启动一个goroutine watchForUpdates(),调用的和之前的watchForUpdates相同.
5. 新建loadBalancerRR和代理proxier
loadBalancer := proxy.NewLoadBalancerRR()
proxier := proxy.NewProxier(loadBalancer, net.IP(bindAddress), iptables.New(exec.New(), protocol))
if proxier == nil {
glog.Fatalf("failed to create proxier, aborting")
}
5.1 先介绍loadBalancer,源代码定义在pkg/proxy/roundrobin.go中,如下:
// LoadBalancerRR is 是一个使用round robin方式的负载均衡器的实现.
type LoadBalancerRR struct {
lock sync.RWMutex
endpointsMap map[string][]string //记录每个service的后端endpoint映射
rrIndex map[string]int //记录每个service下一次round robin轮到的后端index
serviceDtlMap map[string]serviceDetail //保存服务的名字和细节的映射
}
// NewLoadBalancerRR 返回一个新的LoadBalancerRR结构体.
func NewLoadBalancerRR() *LoadBalancerRR {
return &LoadBalancerRR{
endpointsMap: make(map[string][]string),
rrIndex: make(map[string]int),
serviceDtlMap: make(map[string]serviceDetail),
}
}
5.2 proxier源代码定义在pkg/proxy/proxier中, 如下
// Proxier 结构体实现了一个简单的tcp代理
type Proxier struct {
loadBalancer LoadBalancer
mu sync.Mutex // 保护下面的serviceMap数据结构
serviceMap map[string]*serviceInfo
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
}
// NewProxier 新建并返回一个Proxier结构体
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface) *Proxier {
...
hostIP, err := chooseHostInterface()
...
// 清楚旧的iptables rules.
iptablesDeleteOld(iptables)
// 初始化 iptables.
if err := iptablesInit(iptables); err != nil {
glog.Errorf("Failed to initialize iptables: %v", err)
return nil
}
if err := iptablesFlush(iptables); err != nil {
glog.Errorf("Failed to flush iptables: %v", err)
return nil
}
return &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[string]*serviceInfo),
listenIP: listenIP,
iptables: iptables,
hostIP: hostIP,
}
}
如上面的代码注释所说NewProxier函数初始化主机上的iptables信息. proxier组成的元素主要包含刚刚创建的loadBalancer对象和一个iptables的执行器,他负责根据service的变化信息更新iptables设置.
6. 将刚刚新建的proxier和loadBalancer分别向servicesConfig和endpointsConfig绑定
serviceConfig.RegisterHandler(proxier)
endpointsConfig.RegisterHandler(loadBalancer)
他的作用是当service或endpoint的配置信息发生变化时,就调用proxier或loadbalancer的相关函数.
下面先介绍serviceConfig.RegisterHandler(proxier),源代码如下:
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
handler.OnUpdate(instance.([]api.Service))
}))
}
这个函数的功能是将proxier的OnUpdate函数注册添加到上文说的serviceConfig的bcaster的Listener[] slice里,当从channel收到新的可用信息, 实际调用的是下面的OnUpdate()函数即Proxier对象的OnUpdate(). 这个函数很重要,由于代码段比较长,下面分段解释,他定义在pkg/proxy/proxier.go 中.
首先看一下传进参数services的值
func (proxier *Proxier) OnUpdate(services []api.Service) {
glog.V(4).Infof("Received update notice: %+v", services)
从后端的一台minion机器上的kube-proxy.log查看一下传进来的services信息如下:
I0112 09:48:02.876275 23642 proxier.go:443] Received update notice: [... ObjectMeta:{Name:redis-master ...
Spec:{Port:6379 Protocol:TCP Selector:map[name:redis-master] PortalIP:11.1.1.83 ProxyPort:0 CreateExternalLoadBalancer:false
PublicIPs:[] ContainerPort:{Kind:0 IntVal:6379 StrVal:} SessionAffinity:}]
如上展示的是kubernetes官方guest-example的redis-master的service, 他的重要参数是Spec这个字段,他包含service portal信息. 再继续往下进行,下面的代码段将刚收到的services信息和本地已有的通过info, exists := proxier.getServiceInfo(service.Name) 函数获得的info作比较. 如果service有更改, 则在后台重启或使用addServiceOnPort()函数新增代理:
activeServices := util.StringSet{}
for _, service := range services {
activeServices.Insert(service.Name)
info, exists := proxier.getServiceInfo(service.Name)
serviceIP := net.ParseIP(service.Spec.PortalIP)
//service 信息完全不变,不做任何操作
if exists && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) {
continue
}
// service 信息发生改变,如portalPort或portalIP发生变化时,则重启
if exists && (info.portalPort != service.Spec.Port || !info.portalIP.Equal(serviceIP) || !ipsEqual(service.Spec.PublicIPs, info.publicIP)) {
glog.V(4).Infof("Something changed for service %q: stopping it", service.Name)
err := proxier.closePortal(service.Name, info)
...
err = proxier.stopProxy(service.Name, info)
...
}
...
info, err := proxier.addServiceOnPort(service.Name, service.Spec.Protocol, service.Spec.ProxyPort, udpIdleTimeout)
其中核心的addServiceOnPort()函数源代码如下, 他的最主要的作用是为每个service开启一个代理socket, 其中分配一个随机的port号为portNum, 并启动一个goroutine sock.ProxyLoop() 进行代理循环,这个函数定义在pkg/proxy/proxier.go中,它实现了一个round robin代理的逻辑,供读者自行阅读,这里由于篇幅所限不赘述:
func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
// 新建一个代理socket
sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort)
...
_, portStr, err := net.SplitHostPort(sock.Addr().String())
...
portNum, err := strconv.Atoi(portStr)
...
//将sessionAffinityType设置为None, 默认没有sessionAffinity
si := &serviceInfo{
proxyPort: portNum,
protocol: protocol,
socket: sock,
timeout: timeout,
sessionAffinityType: api.AffinityTypeNone,
stickyMaxAgeMinutes: 180,
}
//将新建的serviceInfo保存进proxier的serviceMap数据结构里
proxier.setServiceInfo(service, si)
glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service string, proxier *Proxier) {
defer util.HandleCrash()
sock.ProxyLoop(service, proxier)
}(service, proxier)
return si, nil
}
观察后台kube-proxy log 如下, 49263 就是个随机分配的port:
I0114 14:50:18.388765 21305 proxier.go:427] Proxying for service "redis-master" on TCP port 49623
回到OnUpdate函数里,之后将最新的service信息保存到info数据, 由于proxier.getServiceInfo(service.Name) 返回的是指针, 所以最后实际上保存到了proxier.serviceMap这个数据结构里.
info.portalIP = serviceIP
info.portalPort = service.Spec.Port
info.publicIP = service.Spec.PublicIPs
info.sessionAffinityType = service.Spec.SessionAffinity
info.stickyMaxAgeMinutes = 180
最后调用openPortal函数打开servicePortal
err = proxier.openPortal(service.Name, info)
他实际调用的是如下的openOnePortal函数,他的源码如下,定义在pkg/proxy/proxier.go中
func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name string) error {
// 处理containers之间的流量.
args := proxier.iptablesContainerPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name)
existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesContainerPortalChain, args...)
...
if !existed {
glog.Infof("Opened iptables from-containers portal for service %q on %s %s:%d", name, protocol, portalIP, portalPort)
}
...
}
他的作用是保证根据上文的service信息添加了正确的iptables rule, 并将相应的规则添加到iptables NAT表里, 最后调用的是pkg/util/iptables/iptables.go中的run函数,源代码如下:
func (runner *runner) run(op operation, args []string) ([]byte, error) {
iptablesCmd := runner.iptablesCommand()
fullArgs := append([]string{string(op)}, args...)
glog.V(1).Infof("running iptables %s %v", string(op), args)
return runner.exec.Command(iptablesCmd, fullArgs...).CombinedOutput()
}
从后台的kube-proxy log中可以看到下面的iptables操作
I0112 12:38:03.870741 7284 iptables.go:169] running iptables -N [KUBE-PROXY -t nat]
I0112 12:38:03.877783 7284 iptables.go:169] running iptables -C [PREROUTING -t nat -j KUBE-PROXY]
I0112 12:38:03.885954 7284 iptables.go:169] running iptables -C [OUTPUT -t nat -j KUBE-PROXY]
I0112 12:38:03.893237 7284 iptables.go:169] running iptables -C [KUBE-PROXY -t nat -m comment --comment redis-master -p tcp -m tcp -d 11.1.1.83/32 --dport 6379 -j REDIRECT --to-ports 37292]
I0112 12:38:03.904520 7284 iptables.go:169] running iptables -C [KUBE-PROXY -t nat -m comment --comment redisslave -p tcp -m tcp -d 11.1.1.176/32 --dport 6379 -j REDIRECT --to-ports 37334]
使用命令$ iptables -t nat -L 可以看到结果如下:
Chain KUBE-PROXY (2 references)
target prot opt source destination
REDIRECT tcp -- anywhere 11.1.1.83 /* redis-master */ tcp dpt:6379 redir ports 37292
REDIRECT tcp -- anywhere 11.1.1.176 /* redisslave */ tcp dpt:6379 redir ports 37334
REDIRECT tcp -- anywhere 11.1.1.1 /* redis-master2 */ tcp dpt:6379 redir ports 41074
REDIRECT tcp -- anywhere 11.1.1.169 /* kubernetes */ tcp dpt:https redir ports 54541
REDIRECT tcp -- anywhere 11.1.1.91 /* kubernetes-ro */ tcp dpt:http redir ports 35201
可以看到minion主机上的iptable新增了KUBE-PROXY CHAIN , 并且采用的是REDIRECT target. 这个CHAIN有两个引用, 分别是PREROUTING 和 OUTPUT CHAIN. 最后的结果是容器里的应用如果想通过11.1.1.176:6379 访问redisslave服务时,流量被REDIRECT到了本机的37334端口, 通过这个本地37334端口,再使用刚刚说到的每个service提供的代理socket导向到真正后端. 如下面的终端输出所示.这里的 11.1.1.176相当于一个虚拟的ip. 这个ip范围可在启动 kube-apiserver时指定portal_net参数指定.
vcap@224:~$ nc -zv 11.1.1.176 6379
Connection to 11.1.1.176 6379 port [tcp/*] succeeded!
再使用proxier.loadBalancer.NewService函数
proxier.loadBalancer.NewService(service.Name, info.sessionAffinityType, info.stickyMaxAgeMinutes)
源代码定义在pkg/proxy/roundrobin.go中,源代码如下:
func (lb *LoadBalancerRR) NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error {
if stickyMaxAgeMinutes == 0 {
stickyMaxAgeMinutes = 180 //单位分钟
}
if _, exists := lb.serviceDtlMap[service]; !exists {
lb.serviceDtlMap[service] = *newServiceDetail(service, sessionAffinityType, stickyMaxAgeMinutes)
glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[service])
}
return nil
}
可以看到一个很重要的操作是维护serviceDtlMap这样一个golang map数据结构, key是相应service name. 这里sessionAffinityType有基于clientIPAddress的方式,也有最普通的None模式,也就是常用的round robin方式无sessionAffinity, 如下面的log就是创建了一个名为redis-master的sessionAffinityType为 None的 service.
I0112 12:37:53.824530 7284 roundrobin.go:83] NewService. Service does not exist. So I created it: {name:redis-master sessionAffinityType:None sessionAffinityMap:map[] stickyMaxAgeMinutes:180}
OnUpdate函数最后的活动是对未活动的service, 即要被删除掉的service 关闭portal, 删除指定的iptables rule, 关闭代理套接字. 代码如下:
proxier.mu.Lock()
defer proxier.mu.Unlock()
for name, info := range proxier.serviceMap {
if !activeServices.Has(name) {
...
err := proxier.closePortal(name, info)
...
err = proxier.stopProxyInternal(name, info)
...
}
}
7.回到main函数,下一步指定变化的源 ,一般采用etcd_server ,即调用下面代码的else部分
if clientConfig.Host != "" {
...
} else {
var etcdClient *etcd.Client
// 创建 etcd client
if len(etcdServerList) > 0 {
...
etcdClient = etcd.NewClient(etcdServerList)
} ...
if etcdClient != nil {
glog.Infof("Using etcd servers %v", etcdClient.GetCluster())
config.NewConfigSourceEtcd(etcdClient,
serviceConfig.Channel("etcd"),
endpointsConfig.Channel("etcd"))
}
}
流程为 7.1 先使用给定的命令行参数etcd_servers 创建一个新的etcdClient对象, 使用的pkg是coreos的github.com/coreos/go-etcd/etcd pkg. 查看后端kube-proxy的log如下
I0107 10:30:25.561301 5921 proxy.go:117] Using etcd servers [http://127.0.0.1:4001]
7.2 之后使用config.NewConfigSourceEtcd(etcdClient, serviceConfig.Channel(“etcd”), endpointsConfig.Channel(“etcd”))函数. 注意到这个函数的后两个参数是serviceConfig.Channel(“etcd”)和endpointsConfig.Channel(“etcd”)的返回值. 这两个函数除了make创建出相应的channel之外还做了一些额外的操作值得注意! 下面介绍其中的一个serviceConfig.Channel(“etcd”), endpointsConfig.Channel(“etcd”)雷同.
func (c *ServiceConfig) Channel(source string) chan ServiceUpdate {
ch := c.mux.Channel(source)
serviceCh := make(chan ServiceUpdate)
go func() {
for update := range serviceCh {
ch <- update
}
close(ch)
}()
return serviceCh
}
他调用Mux里的Channel函数创建一个新的channel ch, 并且创建一个传送ServiceUpdate 的channel serviceCh, 一旦serviceCh有可用的信息如update就将他写入ch. 之后再回来看一下ConfigSourceEtcd结构体和相关的NewConfigSourceEtcd函数,它的源代码定义在pkg/proxy/config/etcd.go, 代码如下:
type ConfigSourceEtcd struct {
client *etcd.Client
serviceChannel chan ServiceUpdate
endpointsChannel chan EndpointsUpdate
interval time.Duration
}
func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceEtcd {
config := ConfigSourceEtcd{
client: client,
serviceChannel: serviceChannel,
endpointsChannel: endpointsChannel,
interval: 2 * time.Second,
}
go config.Run()
return config
}
NewConfigSourceEtcd作用是创建一个新的ConfigSourceEtcd结构体, 这个结构体里封装了etcdClient和两个传输信息的Channel: 1 serviceChannel 2 endpointsChannel.并启动一个goroutine config.Run()函数.源代码如下:
// Run函数watch etcd 上与service 和endpoint相关的key的变化,并将新的处理好的serviceUpdate和endpointUpate值输出到指定channel
// 如刚刚提到的1 serviceChannel 2 endpointsChannel.
func (s ConfigSourceEtcd) Run() {
...
for {
// 通过查询etcd上/registry/services/specs下的相关key获得service 和endpoint列表
services, endpoints, err = s.GetServices()
if err != nil {
...
} else {
// 传输到相应channel,如刚刚说到的(c *ServiceConfig) Channel函数里的所谓serviceCh channel
if len(services) > 0 {
serviceUpdate := ServiceUpdate{Op: SET, Services: services}
s.serviceChannel <- serviceUpdate
}
...
}
time.Sleep(30 * time.Second)
}
}
8.启动health监控
if *healthz_port > 0 {
go util.Forever(func() {
err := http.ListenAndServe(bindAddress.String()+":"+strconv.Itoa(*healthz_port), nil)
if err != nil {
glog.Errorf("Starting health server failed: %v", err)
}
}, 5*time.Second)
}
目前什么http的HandleFunc都没有,访问healthz_port会遇到404 not found error.
9.最后调用SyncLoop()
// Just loop forever for now...
proxier.SyncLoop()
其源代码定义在pkg/proxy/proxier.go中,如下:
func (proxier *Proxier) SyncLoop() {
for {
select {
// 每隔syncInterval 默认值是5s进行一次,保证iptables设置及portal正常工作
// 并清理除AffinityTypeNone之外的超时session
case <-time.After(syncInterval):
glog.V(2).Infof("Periodic sync")
if err := iptablesInit(proxier.iptables); err != nil {
glog.Errorf("Failed to ensure iptables: %v", err)
}
proxier.ensurePortals()
proxier.cleanupStaleStickySessions()
}
}
}
总结:
1 每个Kubernetes节点都运行一个kube-proxy这么一个服务代理,它 watch kubernetes 集群里 service 和 endpoints(label是某一特定条件的pods)这两个对象的增加或删除, 并且维护一个service 到 endpoints的映射. 他使用iptables REDIRECT target将对服务的请求流量转接到本地的一个port上, 然后再将流量发到后端, 这样的转发还支持一些策略, 如round robin等,所以我们可以把他看成是一个具有高级功能的反向代理。
2 下面为Kube-proxy内部goroutine及channel 使用图,以service信息传递为例
