kubeproxy源码分析Word格式文档下载.docx
- 文档编号:441840
- 上传时间:2023-04-28
- 格式:DOCX
- 页数:40
- 大小:72.76KB
kubeproxy源码分析Word格式文档下载.docx
《kubeproxy源码分析Word格式文档下载.docx》由会员分享,可在线阅读,更多相关《kubeproxy源码分析Word格式文档下载.docx(40页珍藏版)》请在冰点文库上搜索。
│├──http.go
│├──listener.go
│└──worker.go
├──iptables//proxymode为iptables的实现
│├──proxier.go
│└──proxier_test.go
├──types.go
├──userspace//proxymode为userspace的实现
│├──loadbalancer.go
│├──port_allocator.go
│├──port_allocator_test.go
│├──proxier_test.go
│├──proxysocket.go
│├──rlimit.go
│├──rlimit_windows.go
│├──roundrobin.go
│├──roundrobin_test.go
│└──udp_server.go
└──winuserspace//windowsOS时,proxymode为userspace的实现
├──loadbalancer.go
├──port_allocator.go
├──port_allocator_test.go
├──proxier.go
├──proxier_test.go
├──proxysocket.go
├──roundrobin.go
├──roundrobin_test.go
└──udp_server.go
源码分析
main
kube-proxy的main入口在:
cmd/kube-proxy/proxy.Go:
39
funcmain(){
//创建kube-proxy的默认config对象
config:
=options.NewProxyConfig()
//用kube-proxy命令行的参数替换默认参数
config.AddFlags(pflag.CommandLine)
flag.InitFlags()
logs.InitLogs()
deferlogs.FlushLogs()
verflag.PrintAndExitIfRequested()
//根据config创建ProxyServer
s,err:
=app.NewProxyServerDefault(config)
iferr!
=nil{
fmt.Fprintf(os.Stderr,"
%v\n"
err)
os.Exit
(1)
}
//执行Run方法让kube-proxy开始干活了
iferr=s.Run();
err!
}
main方法中,我们重点关注app.NewProxyServerDefault(config)创建ProxyServer和Run方法。
创建ProxyServer
NewProxyServerDefault负责根据提供的config参数创建一个新的ProxyServer对象,其代码比较长,逻辑相对复杂,下面会挑重点说一下。
cmd/kube-proxy/app/server.go:
131
funcNewProxyServerDefault(config*options.ProxyServerConfig)(*ProxyServer,error){
...
//Createaiptablesutils.
execer:
=exec.New()
ifruntime.GOOS=="
windows"
{
netshInterface=utilnetsh.New(execer)
}else{
dbus=utildbus.New()
iptInterface=utiliptables.New(execer,dbus,protocol)
//设置OOM_SCORE_ADJ
varoomAdjuster*oom.OOMAdjuster
ifconfig.OOMScoreAdj!
oomAdjuster=oom.NewOOMAdjuster()
iferr:
=oomAdjuster.ApplyOOMScoreAdj(0,int(*config.OOMScoreAdj));
glog.V
(2).Info(err)
//CreateaKubeClient
//创建eventBroadcaster和eventrecorder
hostname:
=nodeutil.GetHostname(config.HostnameOverride)
eventBroadcaster:
=record.NewBroadcaster()
recorder:
=eventBroadcaster.NewRecorder(v1.EventSource{Component:
"
kube-proxy"
Host:
hostname})
//定义proxier和endpointsHandler,分别用于处理services和endpoints的updateevent。
varproxierproxy.ProxyProvider
varendpointsHandlerproxyconfig.EndpointsConfigHandler
//从config中获取proxymode
proxyMode:
=getProxyMode(string(config.Mode),client.Core().Nodes(),hostname,iptInterface,iptables.LinuxKernelCompatTester{})
//proxymode为iptables场景
ifproxyMode==proxyModeIPTables{
glog.V(0).Info("
UsingiptablesProxier."
)
ifconfig.IPTablesMasqueradeBit==nil{
//IPTablesMasqueradeBitmustbespecifiedordefaulted.
returnnil,fmt.Errorf("
UnabletoreadIPTablesMasqueradeBitfromconfig"
//调用pkg/proxy/iptables/proxier.go:
222中的iptables.NewProxier来创建proxier,赋值给前面定义的proxier和endpointsHandler,表示由该proxier同时负责service和endpoint的event处理。
proxierIPTables,err:
=iptables.NewProxier(iptInterface,utilsysctl.New(),execer,config.IPTablesSyncPeriod.Duration,config.IPTablesMinSyncPeriod.Duration,config.MasqueradeAll,int(*config.IPTablesMasqueradeBit),config.ClusterCIDR,hostname,getNodeIP(client,hostname))
glog.Fatalf("
Unabletocreateproxier:
%v"
proxier=proxierIPTables
endpointsHandler=proxierIPTables
//Noturningback.RemoveartifactsthatmightstillexistfromtheuserspaceProxier.
Tearingdownuserspacerules."
userspace.CleanupLeftovers(iptInterface)
}
//proxymode为userspace场景
else{
UsinguserspaceProxier."
//Thisisaproxy.LoadBalancerwhichNewProxierneedsbuthasmethodswedon'
tneedfor
//ourconfig.EndpointsConfigHandler.
loadBalancer:
=userspace.NewLoadBalancerRR()
//setEndpointsConfigHandlertoourloadBalancer
endpointsHandler=loadBalancer
varproxierUserspaceproxy.ProxyProvider
//windowsOS场景下,调用pkg/proxy/winuserspace/proxier.go:
146的winuserspace.NewProxier来创建proxier。
proxierUserspace,err=winuserspace.NewProxier(
loadBalancer,
net.ParseIP(config.BindAddress),
netshInterface,
*utilnet.ParsePortRangeOrDie(config.PortRange),
//TODO@piresreplacebelowwithdefaultvalues,ifapplicable
config.IPTablesSyncPeriod.Duration,
config.UDPIdleTimeout.Duration,
)
//linuxOS场景下,调用pkg/proxy/userspace/proxier.go:
143的userspace.NewProxier来创建proxier。
proxierUserspace,err=userspace.NewProxier(
iptInterface,
config.IPTablesMinSyncPeriod.Duration,
proxier=proxierUserspace
//Removeartifactsfromthepure-iptablesProxier,ifnotonWindows.
ifruntime.GOOS!
="
Tearingdownpure-iptablesproxyrules."
iptables.CleanupLeftovers(iptInterface)
//Addiptablesreloadfunction,ifnotonWindows.
iptInterface.AddReloadFunc(proxier.Sync)
//Createconfigs(i.e.WatchesforServicesandEndpoints)
//创建serviceConfig负责service的watchforUpdates
serviceConfig:
=proxyconfig.NewServiceConfig()
//给serviceConfig注册proxier,既添加对应的listener用来处理serviceupdate时逻辑。
serviceConfig.RegisterHandler(proxier)
//创建endpointsConfig负责endpoint的watchforUpdates
endpointsConfig:
=proxyconfig.NewEndpointsConfig()
//给endpointsConfig注册endpointsHandler,既添加对应的listener用来处理endpointupdate时的逻辑。
endpointsConfig.RegisterHandler(endpointsHandler)
//NewSourceAPIcreatesconfigsourcethatwatchesforchangestotheservicesandendpoints.
//NewSourceAPI通过ListWatchapiserver的Service和endpoint,并周期性的维护serviceStore和endpointStore的更新
proxyconfig.NewSourceAPI(
client.Core().RESTClient(),
config.ConfigSyncPeriod,
serviceConfig.Channel("
api"
),//ServiceUpdateChannel
endpointsConfig.Channel("
),//endpointupdatechannel
//把前面创建的对象作为参数,构造出ProxyServer对象。
returnNewProxyServer(client,config,iptInterface,proxier,eventBroadcaster,recorder,conntracker,proxyMode)
NewProxyServerDefault中的核心逻辑我都已经在上述代码中添加了注释,其中有几个地方需要我们再深入跟进去看看:
proxyconfig.NewServiceConfig,proxyconfig.NewEndpointsConfig,serviceConfig.RegisterHandler,endpointsConfig.RegisterHandler,proxyconfig.NewSourceAPI。
proxyconfig.NewServiceConfig
我们对ServiceConfig的代码分析一遍,EndpointsConfig的代码则类似。
pkg/proxy/config/config.go:
192
funcNewServiceConfig()*ServiceConfig{
//创建updateschannel
updates:
=make(chanstruct{},1)
//构建serviceStore对象
store:
=&
serviceStore{updates:
updates,services:
make(map[string]map[types.NamespacedName]api.Service)}
mux:
=config.NewMux(store)
//新建Broadcaster,在后续的serviceConfig.RegisterHandler会注册该Broadcaster的listener。
bcaster:
=config.NewBroadcaster()
//启动协程,马上开始watchupdateschannel
gowatchForUpdates(bcaster,store,updates)
return&
ServiceConfig{mux,bcaster,store}
下面我们再跟进watchForUpdates去看看。
292
funcwatchForUpdates(bcaster*config.Broadcaster,accessorconfig.Accessor,updates<
-chanstruct{}){
fortrue{
<
-updates
bcaster.Notify(accessor.MergedState())
watchForUpdates就是一直在watchupdateschannel,如果有数据,则立刻由该BroadcasterNotify到注册的listeners。
Notify的代码如下,可见,它负责将数据通知给所有的listener,并调用各个listener的OnUpdate方法。
pkg/util/config/config.go:
133
//Notifynotifiesalllisteners.
func(b*Broadcaster)Notify(instanceinterface{}){
b.listenerLock.RLock()
listeners:
=b.listeners
b.listenerLock.RUnlock()
for_,listener:
=rangelisteners{
listener.OnUpdate(instance)
func(fListenerFunc)OnUpdate(instanceinterface{}){
f(instance)
serviceConfig.RegisterHandler
上面分析的proxyconfig.NewServiceConfig负责创建ServiceConfig,开始watchupdateschannel了,当从channel中取到值的时候,Broadcaster就会通知listener进行处理。
serviceConfig.RegisterHandler正是负责给Broadcaster注册listener的,其代码如下。
205
func(c*ServiceConfig)RegisterHandler(handlerServiceConfigHandler){
//给ServiceConfig的Broadcaster注册listener。
c.bcaster.Add(config.ListenerFunc(func(instanceinterface{}){
glog.V(3).Infof("
Callinghandler.OnServiceUpdate()"
handler.OnServiceUpdate(instance.([]api.Service))
}))
上面分析proxyconfig.NewServiceConfig时可知,当从updateschannel中取到值的时候,最终会调用对应的ListenerFunc(instance)进行处理,在这里,也就是调用:
即调用到handler.OnServiceUpdate。
每种proxymode对应的proxier都有对应的handler.OnServiceUpdate接口实现,我们以iptablesmode为例,看看handler.OnServiceUpdate的实现:
pkg/proxy/iptables/proxier.go:
428
func(proxier*Proxier)OnServiceUpdate(allServices[]api.Service){
proxier.syncProxyRules()
proxier.deleteServiceConnections(staleUDPServices.List())
因此,最终关键的逻辑都转向了proxier.syncProxyRules(),从我们上面给出的内部模块交互图
- 配套讲稿:
如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。
- 特殊限制:
部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。
- 关 键 词:
- kubeproxy 源码 分析