Kubernetes Source Code Reading Notes: Kubelet (Part 2)
Published: 2019-06-12


This article starts from NewMainKubelet.

1. NewMainKubelet

cmd/kubelet/kubelet.go

```go
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(......) (*Kubelet, error) {
    ...
    if kubeDeps.PodConfig == nil {
        var err error
        kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
        if err != nil {
            return nil, err
        }
    }
    containerGCPolicy := kubecontainer.ContainerGCPolicy{
        MinAge:             minimumGCAge.Duration,
        MaxPerPodContainer: int(maxPerPodContainerCount),
        MaxContainers:      int(maxContainerCount),
    }
    daemonEndpoints := &v1.NodeDaemonEndpoints{
        KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
    }
    imageGCPolicy := images.ImageGCPolicy{
        MinAge:               kubeCfg.ImageMinimumGCAge.Duration,
        HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
        LowThresholdPercent:  int(kubeCfg.ImageGCLowThresholdPercent),
    }
    ...
    serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
    if kubeDeps.KubeClient != nil {
        serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
        r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
        go r.Run(wait.NeverStop)
    }
    serviceLister := corelisters.NewServiceLister(serviceIndexer)
    nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
    if kubeDeps.KubeClient != nil {
        fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
        nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
        r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
        go r.Run(wait.NeverStop)
    }
    nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
    // TODO: get the real node object of ourself,
    // and use the real node name and UID.
    // TODO: what is namespace for node?
    nodeRef := &v1.ObjectReference{
        Kind:      "Node",
        Name:      string(nodeName),
        UID:       types.UID(nodeName),
        Namespace: "",
    }
    containerRefManager := kubecontainer.NewRefManager()
    oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder)
    ...
    klet := &Kubelet{
        hostname:                                hostname,
        hostnameOverridden:                      len(hostnameOverride) > 0,
        nodeName:                                nodeName,
        kubeClient:                              kubeDeps.KubeClient,
        csiClient:                               kubeDeps.CSIClient,
        heartbeatClient:                         kubeDeps.HeartbeatClient,
        onRepeatedHeartbeatFailure:              kubeDeps.OnHeartbeatFailure,
        rootDirectory:                           rootDirectory,
        resyncInterval:                          kubeCfg.SyncFrequency.Duration,
        sourcesReady:                            config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
        registerNode:                            registerNode,
        registerWithTaints:                      registerWithTaints,
        registerSchedulable:                     registerSchedulable,
        dnsConfigurer:                           dns.NewConfigurer(kubeDeps.Recorder, nodeRef, parsedNodeIP, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
        serviceLister:                           serviceLister,
        nodeInfo:                                nodeInfo,
        masterServiceNamespace:                  masterServiceNamespace,
        streamingConnectionIdleTimeout:          kubeCfg.StreamingConnectionIdleTimeout.Duration,
        recorder:                                kubeDeps.Recorder,
        cadvisor:                                kubeDeps.CAdvisorInterface,
        cloud:                                   kubeDeps.Cloud,
        externalCloudProvider:                   cloudprovider.IsExternal(cloudProvider),
        providerID:                              providerID,
        nodeRef:                                 nodeRef,
        nodeLabels:                              nodeLabels,
        nodeStatusUpdateFrequency:               kubeCfg.NodeStatusUpdateFrequency.Duration,
        nodeStatusReportFrequency:               kubeCfg.NodeStatusReportFrequency.Duration,
        os:                                      kubeDeps.OSInterface,
        oomWatcher:                              oomWatcher,
        cgroupsPerQOS:                           kubeCfg.CgroupsPerQOS,
        cgroupRoot:                              kubeCfg.CgroupRoot,
        mounter:                                 kubeDeps.Mounter,
        maxPods:                                 int(kubeCfg.MaxPods),
        podsPerCore:                             int(kubeCfg.PodsPerCore),
        syncLoopMonitor:                         atomic.Value{},
        daemonEndpoints:                         daemonEndpoints,
        containerManager:                        kubeDeps.ContainerManager,
        containerRuntimeName:                    containerRuntime,
        redirectContainerStreaming:              crOptions.RedirectContainerStreaming,
        nodeIP:                                  parsedNodeIP,
        nodeIPValidator:                         validateNodeIP,
        clock:                                   clock.RealClock{},
        enableControllerAttachDetach:            kubeCfg.EnableControllerAttachDetach,
        iptClient:                               utilipt.New(utilexec.New(), utildbus.New(), protocol),
        makeIPTablesUtilChains:                  kubeCfg.MakeIPTablesUtilChains,
        iptablesMasqueradeBit:                   int(kubeCfg.IPTablesMasqueradeBit),
        iptablesDropBit:                         int(kubeCfg.IPTablesDropBit),
        experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
        keepTerminatedPodVolumes:                keepTerminatedPodVolumes,
        nodeStatusMaxImages:                     nodeStatusMaxImages,
        enablePluginsWatcher:                    utilfeature.DefaultFeatureGate.Enabled(features.KubeletPluginsWatcher),
    }
    if klet.cloud != nil {
        klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
    }
    var secretManager secret.Manager
    var configMapManager configmap.Manager
    switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
    case kubeletconfiginternal.WatchChangeDetectionStrategy:
        secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
        configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
    case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
        secretManager = secret.NewCachingSecretManager(
            kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
        configMapManager = configmap.NewCachingConfigMapManager(
            kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
    case kubeletconfiginternal.GetChangeDetectionStrategy:
        secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
        configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
    default:
        return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
    }
    klet.secretManager = secretManager
    klet.configMapManager = configMapManager
    if klet.experimentalHostUserNamespaceDefaulting {
        klog.Infof("Experimental host user namespace defaulting is enabled.")
    }
    machineInfo, err := klet.cadvisor.MachineInfo()
    if err != nil {
        return nil, err
    }
    klet.machineInfo = machineInfo
    imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
    klet.livenessManager = proberesults.NewManager()
    ...
    // podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
    klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)
    klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
    if remoteRuntimeEndpoint != "" {
        // remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified
        if remoteImageEndpoint == "" {
            remoteImageEndpoint = remoteRuntimeEndpoint
        }
    }
    ...
    switch containerRuntime {
    case kubetypes.DockerContainerRuntime:
        // Create and start the CRI shim running as a grpc server.
        streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
        ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
            &pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
        if err != nil {
            return nil, err
        }
        if crOptions.RedirectContainerStreaming {
            klet.criHandler = ds
        }
        // The unix socket for kubelet <-> dockershim communication.
        klog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
            remoteRuntimeEndpoint,
            remoteImageEndpoint)
        klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
        server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
        if err := server.Start(); err != nil {
            return nil, err
        }
        // Create dockerLegacyService when the logging driver is not supported.
        supported, err := ds.IsCRISupportedLogDriver()
        if err != nil {
            return nil, err
        }
        if !supported {
            klet.dockerLegacyService = ds
            legacyLogProvider = ds
        }
    case kubetypes.RemoteContainerRuntime:
        // No-op.
        break
    default:
        return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
    }
    runtimeService, imageService, err := getRuntimeAndImageServices(remoteRuntimeEndpoint, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout)
    if err != nil {
        return nil, err
    }
    klet.runtimeService = runtimeService
    if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.DynamicKubeClient != nil {
        klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.DynamicKubeClient)
    }
    runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
        kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
        klet.livenessManager,
        seccompProfileRoot,
        containerRefManager,
        machineInfo,
        klet,
        kubeDeps.OSInterface,
        klet,
        httpClient,
        imageBackOff,
        kubeCfg.SerializeImagePulls,
        float32(kubeCfg.RegistryPullQPS),
        int(kubeCfg.RegistryBurst),
        kubeCfg.CPUCFSQuota,
        kubeCfg.CPUCFSQuotaPeriod,
        runtimeService,
        imageService,
        kubeDeps.ContainerManager.InternalContainerLifecycle(),
        legacyLogProvider,
        klet.runtimeClassManager,
    )
    if err != nil {
        return nil, err
    }
    klet.containerRuntime = runtime
    klet.streamingRuntime = runtime
    klet.runner = runtime
    runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
    if err != nil {
        return nil, err
    }
    klet.runtimeCache = runtimeCache
    if cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint) {
        klet.StatsProvider = stats.NewCadvisorStatsProvider(
            klet.cadvisor,
            klet.resourceAnalyzer,
            klet.podManager,
            klet.runtimeCache,
            klet.containerRuntime,
            klet.statusManager)
    } else {
        klet.StatsProvider = stats.NewCRIStatsProvider(
            klet.cadvisor,
            klet.resourceAnalyzer,
            klet.podManager,
            klet.runtimeCache,
            runtimeService,
            imageService,
            stats.NewLogMetricsService())
    }
    klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
    klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
    klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
    if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
        klog.Errorf("Pod CIDR update failed %v", err)
    }
    // setup containerGC
    containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
    if err != nil {
        return nil, err
    }
    klet.containerGC = containerGC
    klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
    // setup imageManager
    imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize image manager: %v", err)
    }
    klet.imageManager = imageManager
    ...
    klet.probeManager = prober.NewManager(
        klet.statusManager,
        klet.livenessManager,
        klet.runner,
        containerRefManager,
        kubeDeps.Recorder)
    tokenManager := token.NewManager(kubeDeps.KubeClient)
    if !utilfeature.DefaultFeatureGate.Enabled(features.MountPropagation) {
        return nil, fmt.Errorf("mount propagation feature gate has been deprecated and will be removed in 1.14")
    }
    klet.volumePluginMgr, err =
        NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
    if err != nil {
        return nil, err
    }
    if klet.enablePluginsWatcher {
        klet.pluginWatcher = pluginwatcher.NewWatcher(
            klet.getPluginsRegistrationDir(), /* sockDir */
            klet.getPluginsDir(),             /* deprecatedSockDir */
        )
    }
    ...
    // setup volumeManager
    klet.volumeManager = volumemanager.NewVolumeManager(
        kubeCfg.EnableControllerAttachDetach,
        nodeName,
        klet.podManager,
        klet.statusManager,
        klet.kubeClient,
        klet.volumePluginMgr,
        klet.containerRuntime,
        kubeDeps.Mounter,
        klet.getPodsDir(),
        kubeDeps.Recorder,
        experimentalCheckNodeCapabilitiesBeforeMount,
        keepTerminatedPodVolumes)
    ...
    // Generating the status funcs should be the last thing we do,
    // since this relies on the rest of the Kubelet having been constructed.
    klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
    return klet, nil
}
```

This function is very long, so only part of it is shown, but it is important. It mainly does the following:

(1) Load the kubelet's various configurations, such as the pod sources, the garbage-collection policies, and the ports to listen on. The podConfig deserves a note: it aggregates the three sources of pod information (files, URLs, and the API Server), as a look into makePodSourceConfig shows. The kubelet can obtain pod specs through any of these three channels.
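The merging idea behind podConfig can be illustrated with a self-contained sketch. This is only the channel-merging pattern, not the real makePodSourceConfig (which wires file, URL, and apiserver sources into a config.PodConfig with per-source bookkeeping); the Pod type here is a local stand-in:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Pod is a local stand-in for the kubelet's pod type, for illustration only.
type Pod struct{ Name, Source string }

// mergeSources fans several pod-update channels (e.g. file, http, apiserver)
// into a single channel, the way PodConfig merges its sources into one stream.
func mergeSources(sources ...<-chan Pod) <-chan Pod {
	out := make(chan Pod)
	var wg sync.WaitGroup
	for _, src := range sources {
		wg.Add(1)
		go func(c <-chan Pod) {
			defer wg.Done()
			for p := range c {
				out <- p
			}
		}(src)
	}
	// Close the merged channel once every source is drained.
	go func() { wg.Wait(); close(out) }()
	return out
}

func main() {
	file := make(chan Pod, 1)
	api := make(chan Pod, 1)
	file <- Pod{Name: "static-nginx", Source: "file"}
	api <- Pod{Name: "web-1", Source: "api"}
	close(file)
	close(api)

	var names []string
	for p := range mergeSources(file, api) {
		names = append(names, p.Source+"/"+p.Name)
	}
	sort.Strings(names) // arrival order is nondeterministic
	fmt.Println(names)
}
```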

(2) Create a Kubelet instance from the configuration assembled above. As the listing shows, the instance has dozens of fields.

(3) Create the various managers and attach them to the Kubelet instance to complete it. Managers look after the resources a pod needs at runtime; for example, secretManager and configMapManager manage the Secrets and ConfigMaps that pods consume. Note the switch that selects between different change-detection (update) strategies for Secrets and ConfigMaps.

(4) Determine the container runtime. As the switch shows, rkt is gone; the options left are docker and "remote". For docker, a dockershim server is created and started. This part is important and is analyzed in detail below.

(5) Continue assembling the Kubelet instance, attaching the runtime manager, garbage collection, the image manager, the volume manager, and other components. Several of these deserve closer study and will come up later.

(6) Add the node status functions to the Kubelet instance, by calling defaultNodeStatusFuncs.

Stepping into this method:

pkg/kubelet/kubelet_node_status.go

```go
// defaultNodeStatusFuncs is a factory that generates the default set of
// setNodeStatus funcs
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
    // if cloud is not nil, we expect the cloud resource sync manager to exist
    var nodeAddressesFunc func() ([]v1.NodeAddress, error)
    if kl.cloud != nil {
        nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
    }
    var validateHostFunc func() error
    if kl.appArmorValidator != nil {
        validateHostFunc = kl.appArmorValidator.ValidateHost
    }
    var setters []func(n *v1.Node) error
    setters = append(setters,
        nodestatus.NodeAddress(...),
        nodestatus.MachineInfo(...),
        nodestatus.VersionInfo(...),
        nodestatus.DaemonEndpoints(kl.daemonEndpoints),
        nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
        nodestatus.GoRuntime(),
    )
    if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
        setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))
    }
    setters = append(setters,
        nodestatus.MemoryPressureCondition(...),
        nodestatus.DiskPressureCondition(...),
        nodestatus.PIDPressureCondition(...),
        nodestatus.ReadyCondition(...),
        nodestatus.VolumesInUse(...),
        kl.recordNodeSchedulableEvent,
    )
    return setters
}
```

As the listing shows, the method collects all the functions that maintain the host node's runtime status into a setters slice and returns it. These functions cover addresses, memory, disk, volumes, and so on; together they keep the host's information and status up to date.
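The setters pattern itself is small enough to demonstrate with a self-contained sketch. Node, setAddress, and setMemoryPressure below are local stand-ins for v1.Node and the nodestatus setters, not the real types:

```go
package main

import "fmt"

// Node is a local stand-in for v1.Node, for illustration only.
type Node struct {
	Address       string
	MemoryHealthy bool
}

// Each setter fills in one slice of node status, mirroring the
// nodestatus.* setters in the real code.
func setAddress(n *Node) error {
	n.Address = "10.0.0.1"
	return nil
}

func setMemoryPressure(n *Node) error {
	n.MemoryHealthy = true
	return nil
}

// defaultNodeStatusFuncs mirrors the factory: it just returns the slice.
func defaultNodeStatusFuncs() []func(*Node) error {
	return []func(*Node) error{setAddress, setMemoryPressure}
}

// setNodeStatus applies every setter in turn, which is what the kubelet
// does on each node status update.
func setNodeStatus(n *Node) error {
	for _, f := range defaultNodeStatusFuncs() {
		if err := f(n); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	n := &Node{}
	if err := setNodeStatus(n); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *n)
}
```

The design keeps each piece of status logic in its own small function, so a feature gate can add or remove a setter without touching the update loop.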

Next, let's look at the dockershim server.

2. DockerServer

As mentioned above, NewMainKubelet switches on the container runtime and performs a series of steps when the runtime is docker.

Concretely, it first creates a DockerService object (via NewDockerService). This object bundles the full set of methods a running CRI shim must provide. From it, a DockerServer instance is created (via NewDockerServer), and server.Start runs it.
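The endpoint handed to NewDockerServer is the unix socket seen in the listing as remoteRuntimeEndpoint (e.g. unix:///var/run/dockershim.sock). As a side note, splitting such an endpoint into protocol and address can be sketched as below; this is a simplified illustration, while the real kubelet helpers in pkg/kubelet/util also handle tcp (and, on Windows, npipe) endpoints and build a dialer:

```go
package main

import (
	"fmt"
	"strings"
)

// parseEndpoint splits a CRI endpoint such as
// "unix:///var/run/dockershim.sock" into its protocol and address.
// Simplified sketch for illustration only.
func parseEndpoint(endpoint string) (protocol, addr string, err error) {
	parts := strings.SplitN(endpoint, "://", 2)
	if len(parts) != 2 {
		return "", "", fmt.Errorf("invalid endpoint %q", endpoint)
	}
	switch parts[0] {
	case "unix", "tcp":
		return parts[0], parts[1], nil
	default:
		return "", "", fmt.Errorf("unsupported protocol %q", parts[0])
	}
}

func main() {
	proto, addr, err := parseEndpoint("unix:///var/run/dockershim.sock")
	if err != nil {
		panic(err)
	}
	fmt.Println(proto, addr)
}
```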

Let's look at the DockerServer struct:

pkg/kubelet/dockershim/remote/docker_server.go

```go
// DockerServer is the grpc server of dockershim.
type DockerServer struct {
    // endpoint is the endpoint to serve on.
    endpoint string
    // service is the docker service which implements runtime and image services.
    service dockershim.CRIService
    // server is the grpc server.
    server *grpc.Server
}
```

As the definition shows, the struct contains the endpoint this gRPC service listens on, the embedded CRIService, and the gRPC server itself.

CRIService is an interface; stepping into it:

pkg/kubelet/dockershim/docker_service.go

```go
// CRIService includes all methods necessary for a CRI server.
type CRIService interface {
    runtimeapi.RuntimeServiceServer
    runtimeapi.ImageServiceServer
    Start() error
}
```

A CRI must provide the two method sets RuntimeServiceServer and ImageServiceServer, for operating on containers and on images respectively.

Stepping into the RuntimeServiceServer interface, you can see that it defines container-level operations such as starting, stopping, removing, inspecting, and exec'ing into containers, as well as pod-level operations on pod sandboxes.

Stepping into the ImageServiceServer interface, you can see five image operations: list, status, pull, remove, and filesystem info.

In other words, as long as we implement every method defined in these two interfaces, we can supply our own CRI. This is how the kubelet is decoupled from the CRI.
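The composition trick CRIService relies on, embedding two interfaces plus an extra method, can be shown with a drastically reduced, self-contained sketch. RuntimeService, ImageService, and myCRI below are toy stand-ins, not the real runtimeapi types (whose methods take gRPC request/response messages):

```go
package main

import "fmt"

// RuntimeService is a toy stand-in for runtimeapi.RuntimeServiceServer.
type RuntimeService interface {
	CreateContainer(name string) (id string, err error)
}

// ImageService is a toy stand-in for runtimeapi.ImageServiceServer.
type ImageService interface {
	PullImage(image string) error
}

// CRIService mirrors the real interface's shape: both embedded method
// sets plus Start.
type CRIService interface {
	RuntimeService
	ImageService
	Start() error
}

// myCRI is a toy runtime. Implementing every method is all it takes to
// satisfy CRIService; no registration with the kubelet's code is needed.
type myCRI struct{ images []string }

func (c *myCRI) CreateContainer(name string) (string, error) { return "id-" + name, nil }

func (c *myCRI) PullImage(image string) error {
	c.images = append(c.images, image)
	return nil
}

func (c *myCRI) Start() error { return nil }

func main() {
	// This assignment compiles only because *myCRI has all the methods.
	var svc CRIService = &myCRI{}
	if err := svc.Start(); err != nil {
		panic(err)
	}
	_ = svc.PullImage("nginx:latest")
	id, _ := svc.CreateContainer("web")
	fmt.Println(id)
}
```

A real custom CRI would implement the runtimeapi gRPC services and serve them on a socket, and the kubelet would be pointed at that socket via its remote runtime endpoint.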

The kubelet calls the methods defined on this gRPC server over gRPC; that is how it interacts with the CRI to perform concrete operations on containers and images.

In fact, because docker is Kubernetes' default CRI, the whole dockershim startup process described above lives in the Kubernetes source tree. Likewise, the RuntimeServiceServer and ImageServiceServer method sets for the docker CRI are already implemented in the Kubernetes source, under pkg/kubelet/dockershim; they are not enumerated here. The rest will be covered in the next article.

Reposted from: https://www.cnblogs.com/00986014w/p/10895532.html
