The Kubelet's PodManager module stores Pods, provides access to them, and maintains the mapping between static Pods and mirror Pods.
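Pods that are not created through the API Server are called static Pods. A static Pod is always bound to one specific node.
If the value of a Pod's "kubernetes.io/config.source" annotation is not "api", the Pod is a static Pod.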
```go
// Code location: pkg/kubelet/types/pod_update.go
func IsStaticPod(pod *v1.Pod) bool {
	source, err := GetPodSource(pod)
	return err == nil && source != ApiserverSource
}
```
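To make the rule concrete, here is a minimal, self-contained sketch. It assumes a hypothetical, stripped-down Pod type that carries only an annotation map; the real code uses *v1.Pod from k8s.io/api. It hard-codes the annotation key and the "api" value described above.

```go
package main

import "fmt"

// Pod is a hypothetical, stripped-down stand-in for v1.Pod; only the
// annotations matter for this check.
type Pod struct {
	Annotations map[string]string
}

const (
	configSourceAnnotationKey = "kubernetes.io/config.source"
	apiserverSource           = "api"
)

// isStaticPod reports whether the source annotation exists and is not "api".
// A missing annotation counts as "not static", just as the error from
// GetPodSource makes IsStaticPod return false.
func isStaticPod(pod *Pod) bool {
	source, ok := pod.Annotations[configSourceAnnotationKey] // reading a nil map is safe
	return ok && source != apiserverSource
}

func main() {
	fromFile := &Pod{Annotations: map[string]string{configSourceAnnotationKey: "file"}}
	fromAPI := &Pod{Annotations: map[string]string{configSourceAnnotationKey: "api"}}
	noSource := &Pod{}
	fmt.Println(isStaticPod(fromFile), isStaticPod(fromAPI), isStaticPod(noSource))
}
```

This prints "true false false". A Pod with no source annotation at all is not treated as static.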
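How do you create a static Pod? Look up the static Pod directory, staticPodPath, in the node's Kubelet configuration. On my machine it is /etc/kubernetes/manifests, so creating the following file in that directory is enough for the kubelet to create the Pod. Likewise, to delete the static Pod, just delete the /etc/kubernetes/manifests/static-web.yaml file created here.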
```shell
cat <<EOF >/etc/kubernetes/manifests/static-web.yaml
apiVersion: v1
kind: Pod
metadata:
  name: static-web
spec:
  containers:
    - name: web
      image: nginx
      ports:
        - name: web
          containerPort: 80
          protocol: TCP
EOF
```
Static Pods cannot be controlled through the API Server. To make them visible through the API Server, the Kubelet creates a mirror Pod for each static Pod. Mirror Pods and static Pods correspond one to one.
How can you tell whether a Pod is a mirror Pod? A mirror Pod carries the "kubernetes.io/config.mirror" annotation:
```go
// Code location: pkg/kubelet/types/pod_update.go
func IsMirrorPod(pod *v1.Pod) bool {
	_, ok := pod.Annotations[ConfigMirrorAnnotationKey]
	return ok
}
```
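Here is a self-contained sketch of the mirror check. It uses a hypothetical Pod type that carries only annotations. Only the presence of the key matters, so even an empty value marks the Pod as a mirror Pod.

```go
package main

import "fmt"

// Pod is a hypothetical stand-in for v1.Pod carrying only annotations.
type Pod struct {
	Annotations map[string]string
}

const configMirrorAnnotationKey = "kubernetes.io/config.mirror"

// isMirrorPod only checks that the key is present; its value is irrelevant.
func isMirrorPod(pod *Pod) bool {
	_, ok := pod.Annotations[configMirrorAnnotationKey]
	return ok
}

func main() {
	mirror := &Pod{Annotations: map[string]string{configMirrorAnnotationKey: ""}}
	regular := &Pod{Annotations: map[string]string{"kubernetes.io/config.source": "api"}}
	fmt.Println(isMirrorPod(mirror), isMirrorPod(regular))
}
```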
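Here is a concrete example: the kube-apiserver static Pod as it appears through the API Server, that is, its mirror Pod. Its annotations automatically include the kubernetes.io/config.mirror key: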
```yaml
apiVersion: v1
kind: Pod
metadata:
  annotations:
    kubeadm.kubernetes.io/kube-apiserver.advertise-address.endpoint: xx
    kubernetes.io/config.hash: xx
    kubernetes.io/config.mirror: xx
    kubernetes.io/config.seen: "2021-01-18T04:27:29.959402482+01:00"
    kubernetes.io/config.source: file
  creationTimestamp: "2021-01-18T03:28:55Z"
  labels:
    component: kube-apiserver
    tier: control-plane
  name: kube-apiserver-xx
  namespace: kube-system
spec:
  containers:
  - command:
    - kube-apiserver
    - --advertise-address=...
```
The value of a Pod's "kubernetes.io/config.source" annotation is the Pod's source:
```go
// Code location: pkg/kubelet/types/pod_update.go
func GetPodSource(pod *v1.Pod) (string, error) {
	if pod.Annotations != nil {
		if source, ok := pod.Annotations[ConfigSourceAnnotationKey]; ok {
			return source, nil
		}
	}
	return "", fmt.Errorf("cannot get source of pod %q", pod.UID)
}
```
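Here is the same lookup in a self-contained form that runs outside the Kubernetes tree. Pod and getPodSource are hypothetical simplifications of v1.Pod and GetPodSource. In Go, indexing a nil map returns the zero value with ok == false. That makes the explicit pod.Annotations != nil check in the original defensive rather than required.

```go
package main

import "fmt"

// Pod is a hypothetical stand-in for v1.Pod: a UID plus annotations.
type Pod struct {
	UID         string
	Annotations map[string]string
}

const configSourceAnnotationKey = "kubernetes.io/config.source"

// getPodSource returns the source annotation, or an error when the
// annotation (or the whole annotation map) is missing.
func getPodSource(pod *Pod) (string, error) {
	if source, ok := pod.Annotations[configSourceAnnotationKey]; ok {
		return source, nil
	}
	return "", fmt.Errorf("cannot get source of pod %q", pod.UID)
}

func main() {
	for _, p := range []*Pod{
		{UID: "a", Annotations: map[string]string{configSourceAnnotationKey: "file"}},
		{UID: "b"}, // no annotations at all
	} {
		source, err := getPodSource(p)
		fmt.Printf("uid=%s source=%q err=%v\n", p.UID, source, err)
	}
}
```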
To get the static Pod that a mirror Pod belongs to, compute the mirror Pod's full name and look it up in the podByFullName map of the basicManager object.
A Pod's full name is simply pod.Name + "_" + pod.Namespace.
```go
// Code location: pkg/kubelet/pod/pod_manager.go
func (pm *basicManager) GetPodByMirrorPod(mirrorPod *v1.Pod) (*v1.Pod, bool) {
	pm.lock.RLock()
	defer pm.lock.RUnlock()
	pod, ok := pm.podByFullName[kubecontainer.GetPodFullName(mirrorPod)]
	return pod, ok
}
```
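Here is a minimal sketch of this lookup. It assumes a toy manager with only the podByFullName index and a hypothetical Pod type. The mirror Pod has its own UID but shares the static Pod's name and namespace, so looking it up by full name returns the static Pod.

```go
package main

import (
	"fmt"
	"sync"
)

// Pod is a hypothetical stand-in for v1.Pod.
type Pod struct {
	Name, Namespace, UID string
}

// podFullName follows the name_namespace convention described above.
func podFullName(p *Pod) string {
	return p.Name + "_" + p.Namespace
}

// manager is a toy version of basicManager with just one index.
type manager struct {
	lock          sync.RWMutex
	podByFullName map[string]*Pod
}

// getPodByMirrorPod finds the static pod sharing the mirror pod's full name.
func (m *manager) getPodByMirrorPod(mirror *Pod) (*Pod, bool) {
	m.lock.RLock()
	defer m.lock.RUnlock()
	pod, ok := m.podByFullName[podFullName(mirror)]
	return pod, ok
}

func main() {
	static := &Pod{Name: "kube-apiserver-node1", Namespace: "kube-system", UID: "static-uid"}
	m := &manager{podByFullName: map[string]*Pod{podFullName(static): static}}
	// The mirror pod has a different UID but the same name and namespace.
	mirror := &Pod{Name: "kube-apiserver-node1", Namespace: "kube-system", UID: "mirror-uid"}
	if pod, ok := m.getPodByMirrorPod(mirror); ok {
		fmt.Println("static pod UID:", pod.UID)
	}
}
```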
When the Kubelet is instantiated, it calls NewBasicMirrorClient and NewBasicPodManager to create the podManager.
```go
// Code location: pkg/kubelet/kubelet.go
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ...) {
	...
	// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
	mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
	klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient, secretManager, configMapManager, checkpointManager)
}
```
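The secretManager and configMapManager are passed in at construction time for a reason. When a Pod uses a Secret or a ConfigMap, the PodManager has to set up the corresponding watches so that changes to those resources are written into the cache.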
```go
// Code location: pkg/kubelet/pod/pod_manager.go
func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager, cpm checkpointmanager.CheckpointManager) Manager {
	pm := &basicManager{}
	pm.secretManager = secretManager
	pm.configMapManager = configMapManager
	pm.checkpointManager = cpm
	pm.MirrorClient = client
	pm.SetPods(nil)
	return pm
}
```
The Kubelet has three Pod sources: file, HTTP and the API Server.
```go
// Code location: pkg/kubelet/kubelet.go
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if len(kubeCfg.StaticPodURLHeader) > 0 {
		for k, v := range kubeCfg.StaticPodURLHeader {
			for i := range v {
				// Add each configured header value
				manifestURLHeader.Add(k, v[i])
			}
		}
	}

	// source of all configuration
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

	// Source 1: the StaticPodPath directory (file source)
	if kubeCfg.StaticPodPath != "" {
		config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
	}

	// Source 2: the static Pod URL (HTTP source)
	if kubeCfg.StaticPodURL != "" {
		config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
	}

	// Restore from the checkpoint path
	// NOTE: This MUST happen before creating the apiserver source
	// below, or the checkpoint would override the source of truth.
	var updatechannel chan<- interface{}
	if bootstrapCheckpointPath != "" {
		klog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
		updatechannel = cfg.Channel(kubetypes.ApiserverSource)
		err := cfg.Restore(bootstrapCheckpointPath, updatechannel)
		if err != nil {
			return nil, err
		}
	}

	// Source 3: the API Server
	if kubeDeps.KubeClient != nil {
		klog.Infof("Watching apiserver")
		if updatechannel == nil {
			updatechannel = cfg.Channel(kubetypes.ApiserverSource)
		}
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
	}
	return cfg, nil
}
```
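Each source above gets its own channel from cfg.Channel(source), and PodConfig merges them. The real PodConfig also keeps per-source state and works out what changed. The sketch below only illustrates the fan-in idea: everything a source writes is stamped with the source name and forwarded to a single output channel. PodUpdate here is a simplified stand-in, and mux, channel and mergeDemo are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// PodUpdate is a hypothetical, simplified kubetypes.PodUpdate: pod names only.
type PodUpdate struct {
	Pods   []string
	Source string // "file", "http" or "api"
}

// mux hands out one input channel per source and forwards everything,
// stamped with its source, to a single output channel.
type mux struct {
	out chan PodUpdate
	wg  sync.WaitGroup
}

func newMux() *mux { return &mux{out: make(chan PodUpdate)} }

// channel plays the role of cfg.Channel(source) in makePodSourceConfig.
func (m *mux) channel(source string) chan<- []string {
	in := make(chan []string)
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		for pods := range in {
			m.out <- PodUpdate{Pods: pods, Source: source}
		}
	}()
	return in
}

// closeWhenDone closes out after every input channel has been closed and drained.
func (m *mux) closeWhenDone() {
	go func() {
		m.wg.Wait()
		close(m.out)
	}()
}

// mergeDemo sends one update through the file and api channels and returns
// "source:pod" strings, sorted because goroutine scheduling order varies.
func mergeDemo() []string {
	m := newMux()
	file := m.channel("file")
	api := m.channel("api")
	m.closeWhenDone()
	go func() { file <- []string{"static-web"}; close(file) }()
	go func() { api <- []string{"nginx-abc"}; close(api) }()
	var got []string
	for u := range m.out {
		got = append(got, u.Source+":"+u.Pods[0])
	}
	sort.Strings(got)
	return got
}

func main() {
	fmt.Println(mergeDemo())
}
```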
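The file source is how the kubelet watches the staticPodPath directory. Whenever a file in that directory is created, deleted or modified, an event is written to a channel. Files whose names start with a dot are ignored.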
```go
// Code location: pkg/kubelet/config/file.go
type sourceFile struct {
	// path of the static Pod manifests
	path string
	// name of this node
	nodeName types.NodeName
	// re-list period
	period time.Duration
	// cache
	store          cache.Store
	fileKeyMapping map[string]string
	// channel that receives Pod updates (whether a Pod is added, updated or deleted)
	updates chan<- interface{}
	// watch events; each one carries a fileName and a podEventType
	watchEvents chan *watchEvent
}
```
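The file source's run method starts a new goroutine. That goroutine reads the directory once immediately, then re-reads it on every tick of s.period and consumes events from s.watchEvents. After launching the goroutine, run calls startWatch: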
```go
// Code location: pkg/kubelet/config/file.go
func (s *sourceFile) run() {
	listTicker := time.NewTicker(s.period)

	go func() {
		// Read path immediately to speed up startup.
		if err := s.listConfig(); err != nil {
			klog.Errorf("Unable to read config path %q: %v", s.path, err)
		}
		for {
			select {
			case <-listTicker.C:
				if err := s.listConfig(); err != nil {
					klog.Errorf("Unable to read config path %q: %v", s.path, err)
				}
			case e := <-s.watchEvents:
				if err := s.consumeWatchEvent(e); err != nil {
					klog.Errorf("Unable to process watch event: %v", err)
				}
			}
		}
	}()

	s.startWatch()
}
```
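The shape of this loop can be reproduced with the standard library alone: one immediate read, then a select over a ticker and an event channel. In the sketch below, runLoop and loopDemo are hypothetical, and list and consume stand in for listConfig and consumeWatchEvent. The stop channel is an addition so the demo can terminate; the kubelet's loop never exits.

```go
package main

import (
	"fmt"
	"time"
)

// watchEvent is a simplified version of the struct used by the file source.
type watchEvent struct {
	fileName  string
	eventType string
}

// runLoop lists once at startup, then re-lists on every tick and consumes
// watch events in between, until stop is closed.
func runLoop(period time.Duration, events <-chan watchEvent, stop <-chan struct{}, list func(), consume func(watchEvent)) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	list() // read immediately to speed up startup
	for {
		select {
		case <-ticker.C:
			list()
		case e := <-events:
			consume(e)
		case <-stop:
			return
		}
	}
}

// loopDemo sends a single event with a long period, so exactly one list
// (the startup one) and one consume happen before stop is closed.
func loopDemo() (lists, consumed int) {
	events := make(chan watchEvent)
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() {
		runLoop(time.Hour, events, stop,
			func() { lists++ },
			func(e watchEvent) {
				consumed++
				fmt.Println("event:", e.eventType, e.fileName)
			})
		close(done)
	}()
	events <- watchEvent{"/etc/kubernetes/manifests/static-web.yaml", "podAdd"}
	close(stop)
	<-done
	return lists, consumed
}

func main() {
	lists, consumed := loopDemo()
	fmt.Println("lists:", lists, "consumed:", consumed)
}
```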
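startWatch works as follows. It starts a goroutine that runs forever, once every retryPeriod:
- If the watch is currently in backoff, return immediately.
- Otherwise call doWatch(). If doWatch returns an error that is not a retryableError, record a backoff so the next attempt is delayed. Retryable errors, such as a missing directory, are simply retried on the next period.

doWatch first checks that the directory exists. If it does not, doWatch sends an empty SET update so the file source is marked as seen, then returns a retryable error. Otherwise it uses fsnotify to watch the static Pod directory for any change, ignoring files whose names start with a dot:
- creating a file sets eventType to podAdd
- writing a file or changing its permissions sets eventType to podModify
- renaming or removing a file sets eventType to podDelete

Finally, the file name and eventType are written to the watchEvents channel.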
```go
// Code location: pkg/kubelet/config/file_linux.go
func (s *sourceFile) startWatch() {
	backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)
	backOffID := "watch"

	go wait.Forever(func() {
		if backOff.IsInBackOffSinceUpdate(backOffID, time.Now()) {
			return
		}

		if err := s.doWatch(); err != nil {
			klog.Errorf("Unable to read config path %q: %v", s.path, err)
			if _, retryable := err.(*retryableError); !retryable {
				backOff.Next(backOffID, time.Now())
			}
		}
	}, retryPeriod)
}

func (s *sourceFile) doWatch() error {
	_, err := os.Stat(s.path)
	if err != nil {
		if !os.IsNotExist(err) {
			return err
		}
		// Emit an update with an empty PodList to allow FileSource to be marked as seen
		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
		return &retryableError{"path does not exist, ignoring"}
	}

	w, err := fsnotify.NewWatcher()
	...
	err = w.Add(s.path)
	...
	for {
		select {
		case event := <-w.Events:
			if err = s.produceWatchEvent(&event); err != nil {
				return fmt.Errorf("error while processing inotify event (%+v): %v", event, err)
			}
		case err = <-w.Errors:
			return fmt.Errorf("error while watching %q: %v", s.path, err)
		}
	}
}

func (s *sourceFile) produceWatchEvent(e *fsnotify.Event) error {
	// Ignore file start with dots
	if strings.HasPrefix(filepath.Base(e.Name), ".") {
		klog.V(4).Infof("Ignored pod manifest: %s, because it starts with dots", e.Name)
		return nil
	}
	var eventType podEventType
	switch {
	case (e.Op & fsnotify.Create) > 0:
		eventType = podAdd
	case (e.Op & fsnotify.Write) > 0:
		eventType = podModify
	case (e.Op & fsnotify.Chmod) > 0:
		eventType = podModify
	case (e.Op & fsnotify.Remove) > 0:
		eventType = podDelete
	case (e.Op & fsnotify.Rename) > 0:
		eventType = podDelete
	default:
		// Ignore rest events
		return nil
	}

	s.watchEvents <- &watchEvent{e.Name, eventType}
	return nil
}
```
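You can try out the event classification in produceWatchEvent without fsnotify. The sketch below assumes a local Op bitmask whose constant names copy fsnotify's (Create, Write, Remove, Rename, Chmod) but whose values are arbitrary. classify is a hypothetical helper that returns the pod event type, or false when the event is ignored.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// Op is a local copy of fsnotify-style operation bits, so this sketch
// needs no third-party module. The numeric values are arbitrary.
type Op uint32

const (
	Create Op = 1 << iota
	Write
	Remove
	Rename
	Chmod
)

type podEventType string

const (
	podAdd    podEventType = "podAdd"
	podModify podEventType = "podModify"
	podDelete podEventType = "podDelete"
)

// classify maps a file event to a pod event; ok is false for ignored events.
func classify(name string, op Op) (podEventType, bool) {
	if strings.HasPrefix(filepath.Base(name), ".") {
		return "", false // hidden files (e.g. editor swap files) are skipped
	}
	switch {
	case op&Create != 0:
		return podAdd, true
	case op&(Write|Chmod) != 0:
		return podModify, true
	case op&(Remove|Rename) != 0:
		return podDelete, true
	default:
		return "", false
	}
}

func main() {
	fmt.Println(classify("/etc/kubernetes/manifests/static-web.yaml", Create))
	fmt.Println(classify("/etc/kubernetes/manifests/.static-web.yaml.swp", Write))
	fmt.Println(classify("/etc/kubernetes/manifests/static-web.yaml", Rename))
}
```

The switch checks Create first, so an event carrying several bits (for example Create|Write) is reported as podAdd. This follows the case order in produceWatchEvent.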
The consumer here only updates a cache. The real consumer of these Pod updates is still the kubelet's syncLoop further downstream.
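The consumer works as follows:
- For add or modify events, parse the file into a Pod object and add it to the sourceFile's Store cache (a client-go cache.Store).
- For delete events, look up the cache key (objKey) recorded for the file name. If no key is recorded, do nothing. If the key is recorded but the Pod is not in the cache, return an error. Otherwise, delete the Pod from the cache and remove the file-name mapping.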
```go
func (s *sourceFile) consumeWatchEvent(e *watchEvent) error {
	switch e.eventType {
	case podAdd, podModify:
		pod, err := s.extractFromFile(e.fileName)
		if err != nil {
			return fmt.Errorf("can't process config file %q: %v", e.fileName, err)
		}
		return s.store.Add(pod)
	case podDelete:
		if objKey, keyExist := s.fileKeyMapping[e.fileName]; keyExist {
			pod, podExist, err := s.store.GetByKey(objKey)
			if err != nil {
				return err
			} else if !podExist {
				return fmt.Errorf("the pod with key %s doesn't exist in cache", objKey)
			} else {
				if err = s.store.Delete(pod); err != nil {
					return fmt.Errorf("failed to remove deleted pod from cache: %v", err)
				}
				delete(s.fileKeyMapping, e.fileName)
			}
		}
	}
	return nil
}
```
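Here is a sketch of the same add/delete bookkeeping, using plain maps instead of a client-go cache.Store. It assumes the add path records the file-name to cache-key mapping, since the excerpt above only shows the mapping being read and deleted. fileSource, consume and newDemoSource are hypothetical names, and extract is a stub standing in for extractFromFile.

```go
package main

import "fmt"

// Pod is a hypothetical minimal pod.
type Pod struct{ Namespace, Name string }

func key(p Pod) string { return p.Namespace + "/" + p.Name }

// fileSource is a toy version of sourceFile: a keyed cache plus a
// file-name -> cache-key mapping.
type fileSource struct {
	store          map[string]Pod
	fileKeyMapping map[string]string
	extract        func(fileName string) (Pod, error) // stands in for extractFromFile
}

func (s *fileSource) consume(fileName, eventType string) error {
	switch eventType {
	case "podAdd", "podModify":
		pod, err := s.extract(fileName)
		if err != nil {
			return fmt.Errorf("can't process config file %q: %v", fileName, err)
		}
		s.store[key(pod)] = pod
		s.fileKeyMapping[fileName] = key(pod) // assumption: mapping recorded on add
	case "podDelete":
		objKey, ok := s.fileKeyMapping[fileName]
		if !ok {
			return nil // unknown file: nothing to delete
		}
		if _, exists := s.store[objKey]; !exists {
			return fmt.Errorf("the pod with key %s doesn't exist in cache", objKey)
		}
		delete(s.store, objKey)
		delete(s.fileKeyMapping, fileName)
	}
	return nil
}

func newDemoSource() *fileSource {
	return &fileSource{
		store:          map[string]Pod{},
		fileKeyMapping: map[string]string{},
		extract: func(string) (Pod, error) {
			return Pod{Namespace: "default", Name: "static-web"}, nil
		},
	}
}

func main() {
	s := newDemoSource()
	f := "/etc/kubernetes/manifests/static-web.yaml"
	_ = s.consume(f, "podAdd")
	fmt.Println("after add:", len(s.store))
	_ = s.consume(f, "podDelete")
	fmt.Println("after delete:", len(s.store))
}
```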
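The HTTP source resembles the file source in that the kubelet polls periodically. The difference is that the HTTP source periodically downloads the file at the configured manifest URL, decodes it as a Pod manifest, and then creates the static Pod.
(The manifest URL is set with the kubelet flags --manifest-url and --manifest-url-header.)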
For example, create the following YAML file and host it on your own web server so that it is reachable at a URL:
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: http-web
  labels:
    role: myrole
spec:
  containers:
    - name: web
      image: nginx
      ports:
        - name: web
          containerPort: 80
          protocol: TCP
```
If the Kubelet is then started with KUBELET_ARGS="--manifest-url=<manifest-url>", this static Pod will be created.
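The workflow is:
- Send an HTTP GET request to download the file, then check that response.StatusCode is 200 and that the returned data is not empty.
- Compare the data with what was read last time. If it has not changed, return immediately.
- At this point the data has changed. Try to decode it as a single Pod. If it parses and is valid, write the Pod to the channel with the source kubetypes.HTTPSource.
- Otherwise, try to decode it as a list of Pods. If that works, collect every item into pods and write them to the channel.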
```go
// Code location: pkg/kubelet/config/http.go
func (s *sourceURL) extractFromURL() error {
	req, err := http.NewRequest("GET", s.url, nil)
	if err != nil {
		return err
	}
	req.Header = s.header
	resp, err := s.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	data, err := utilio.ReadAtMost(resp.Body, maxConfigLength)
	if err != nil {
		return err
	}
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("%v: %v", s.url, resp.Status)
	}
	if len(data) == 0 {
		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return fmt.Errorf("zero-length data received from %v", s.url)
	}
	// Compare with the data read last time; if nothing changed, return immediately.
	if bytes.Equal(data, s.data) {
		return nil
	}
	s.data = data

	// Try to decode the data as a single Pod and validate it.
	parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.applyDefaults)
	if parsed {
		if singlePodErr != nil {
			// It parsed but could not be used.
			return singlePodErr
		}
		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{pod}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return nil
	}

	// Try to decode the data as a list of Pods; if that works, collect them and send them to the channel.
	parsed, podList, multiPodErr := tryDecodePodList(data, s.applyDefaults)
	if parsed {
		if multiPodErr != nil {
			// It parsed but could not be used.
			return multiPodErr
		}
		pods := make([]*v1.Pod, 0)
		for i := range podList.Items {
			pods = append(pods, &podList.Items[i])
		}
		s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
		return nil
	}

	return fmt.Errorf("%v: received '%v', but couldn't parse as "+
		"single (%v) or multiple pods (%v)",
		s.url, string(data), singlePodErr, multiPodErr)
}
```
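Here is a minimal sketch of the download-and-compare part of this workflow, using only the standard library. It fetches a manifest twice from a local httptest server and reports whether the body changed, skipping the decode step entirely. urlSource, fetch and fetchTwice are hypothetical names. maxLen plays the role of maxConfigLength with an arbitrary value, and the extra-byte read is an approximation of ReadAtMost's size limit.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// urlSource is a hypothetical, trimmed-down sourceURL that keeps the last body.
type urlSource struct {
	url    string
	header http.Header
	client *http.Client
	data   []byte
}

// fetch downloads the manifest and reports whether it changed since the last call.
func (s *urlSource) fetch(maxLen int64) (bool, error) {
	req, err := http.NewRequest(http.MethodGet, s.url, nil)
	if err != nil {
		return false, err
	}
	req.Header = s.header
	resp, err := s.client.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return false, fmt.Errorf("%v: %v", s.url, resp.Status)
	}
	// Read one extra byte so an oversized body is detected instead of silently truncated.
	data, err := io.ReadAll(io.LimitReader(resp.Body, maxLen+1))
	if err != nil {
		return false, err
	}
	if int64(len(data)) > maxLen {
		return false, fmt.Errorf("%v: manifest exceeds %d bytes", s.url, maxLen)
	}
	if len(data) == 0 {
		return false, fmt.Errorf("zero-length data received from %v", s.url)
	}
	if bytes.Equal(data, s.data) {
		return false, nil // unchanged: nothing to decode or send
	}
	s.data = data
	return true, nil
}

// fetchTwice serves a fixed manifest from a local test server and fetches it twice.
func fetchTwice() (first, second bool) {
	const manifest = "apiVersion: v1\nkind: Pod\nmetadata:\n  name: http-web\n"
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, _ = io.WriteString(w, manifest)
	}))
	defer srv.Close()
	s := &urlSource{url: srv.URL, header: http.Header{}, client: srv.Client()}
	first, _ = s.fetch(1 << 20)
	second, _ = s.fetch(1 << 20)
	return first, second
}

func main() {
	first, second := fetchTwice()
	fmt.Println("changed on first fetch:", first, "changed on second fetch:", second)
}
```

The second fetch returns identical bytes, so it reports no change. That is the case where extractFromURL returns early without sending anything to the channel.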
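The API Server source is the most common way Pods get created. The user submits the Pod to the API Server, for example with kubectl apply -f, and the scheduler picks a node. The kubelet on that node sees that the Pod is bound to it and starts the Pod. Pods created this way are not static Pods.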
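The main workflow is:
- Watch Pod messages from the API Server, filtered to Pods whose spec.nodeName equals this node's nodeName.
- Write the received Pods, together with their source, to the channel.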
```go
// Code location: pkg/kubelet/config/apiserver.go
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
	newSourceApiserverFromLW(lw, updates)
}

func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
	go r.Run(wait.NeverStop)
}
```
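This toy model shows why the apiserver source always sends Op: kubetypes.SET. It mimics what cache.UndeltaStore does in newSourceApiserverFromLW: whenever an object is added or deleted, the store calls send with every object it currently holds, not just the one that changed. undeltaStore and undeltaDemo are hypothetical names. The real store works on interface{} objects and is fed by a Reflector.

```go
package main

import (
	"fmt"
	"sort"
)

// undeltaStore is a toy version of the idea behind client-go's
// cache.UndeltaStore: every incremental change triggers push() with the
// complete current state rather than the delta.
type undeltaStore struct {
	items map[string]struct{} // pod keys such as "default/nginx-a"
	push  func(all []string)
}

// snapshot returns all keys in sorted order so the output is deterministic.
func (u *undeltaStore) snapshot() []string {
	keys := make([]string, 0, len(u.items))
	for k := range u.items {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func (u *undeltaStore) Add(key string) {
	u.items[key] = struct{}{}
	u.push(u.snapshot())
}

func (u *undeltaStore) Delete(key string) {
	delete(u.items, key)
	u.push(u.snapshot())
}

// undeltaDemo records every full snapshot that would be sent as a SET update.
func undeltaDemo() [][]string {
	var sent [][]string
	s := &undeltaStore{
		items: map[string]struct{}{},
		push:  func(all []string) { sent = append(sent, all) },
	}
	s.Add("default/nginx-a")
	s.Add("default/nginx-b")
	s.Delete("default/nginx-a")
	return sent
}

func main() {
	for i, snap := range undeltaDemo() {
		fmt.Println(i, snap)
	}
}
```

Each snapshot is the complete list of Pods bound to the node. The receiving PodConfig therefore has to work out for itself which Pods were added or removed.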
basicManager is the Kubelet module that stores Pod data. All of its fields are read-only, and they are updated only by calling SetPods, AddPod, UpdatePod or DeletePod.
```go
// Code location: pkg/kubelet/pod/pod_manager.go
type basicManager struct {
	// Protects all internal maps.
	lock sync.RWMutex

	// Regular pods indexed by UID.
	podByUID map[kubetypes.ResolvedPodUID]*v1.Pod
	// Mirror pods indexed by UID.
	mirrorPodByUID map[kubetypes.MirrorPodUID]*v1.Pod

	// Pods indexed by full name for easy access.
	podByFullName       map[string]*v1.Pod
	mirrorPodByFullName map[string]*v1.Pod

	// Mirror pod UID to pod UID map.
	translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID

	// basicManager is keeping secretManager and configMapManager up-to-date.
	secretManager     secret.Manager
	configMapManager  configmap.Manager
	checkpointManager checkpointmanager.CheckpointManager

	// A mirror pod client to create/delete mirror pods.
	MirrorClient
}
```
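The sketch below shows one plausible way the maps above fit together. It assumes plain strings for UIDs and a boolean flag in place of the mirror annotation. Regular and static Pods go into podByUID and podByFullName, while mirror Pods go into the mirror maps. A mirror Pod is then linked through translationByUID to the static Pod with the same full name. This is a simplification, not the actual SetPods implementation. index, setPods and translatePodUID are hypothetical names, and the UIDs are borrowed from the unit test TestGetSetPods.

```go
package main

import "fmt"

// Pod is a hypothetical stand-in for v1.Pod.
type Pod struct {
	UID, Name, Namespace string
	Mirror               bool // stands in for the kubernetes.io/config.mirror annotation
}

func fullName(p *Pod) string { return p.Name + "_" + p.Namespace }

// index holds the same maps as basicManager, with plain string UIDs.
type index struct {
	podByUID            map[string]*Pod
	mirrorPodByUID      map[string]*Pod
	podByFullName       map[string]*Pod
	mirrorPodByFullName map[string]*Pod
	translationByUID    map[string]string // mirror pod UID -> static pod UID
}

// setPods rebuilds every map from scratch, keeping mirror pods apart from regular ones.
func setPods(pods []*Pod) *index {
	ix := &index{
		podByUID:            map[string]*Pod{},
		mirrorPodByUID:      map[string]*Pod{},
		podByFullName:       map[string]*Pod{},
		mirrorPodByFullName: map[string]*Pod{},
		translationByUID:    map[string]string{},
	}
	for _, p := range pods {
		if p.Mirror {
			ix.mirrorPodByUID[p.UID] = p
			ix.mirrorPodByFullName[fullName(p)] = p
		} else {
			ix.podByUID[p.UID] = p
			ix.podByFullName[fullName(p)] = p
		}
	}
	// A mirror pod and its static pod share a full name, which links their UIDs.
	for name, mirror := range ix.mirrorPodByFullName {
		if static, ok := ix.podByFullName[name]; ok {
			ix.translationByUID[mirror.UID] = static.UID
		}
	}
	return ix
}

// translatePodUID maps a mirror pod UID to its static pod UID; other UIDs pass through.
func (ix *index) translatePodUID(uid string) string {
	if static, ok := ix.translationByUID[uid]; ok {
		return static
	}
	return uid
}

func main() {
	ix := setPods([]*Pod{
		{UID: "999999999", Name: "taco", Namespace: "default"},
		{UID: "123456789", Name: "bar", Namespace: "default"},               // static pod
		{UID: "987654321", Name: "bar", Namespace: "default", Mirror: true}, // its mirror pod
	})
	fmt.Println("regular pods:", len(ix.podByUID), "mirror pods:", len(ix.mirrorPodByUID))
	fmt.Println("987654321 ->", ix.translatePodUID("987654321"))
}
```

Translating 987654321 yields 123456789. The unit test checks the same behavior with TranslatePodUID.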
The unit test TestGetSetPods exercises these maps. It passes a regular Pod, a static Pod and that static Pod's mirror Pod to SetPods. It then checks three things: GetPods returns only the two non-mirror Pods, the mirror Pod's UID translates to the static Pod's UID, and the static Pod can be looked up by its full name "bar_default".
```go
// Code location: pkg/kubelet/pod/pod_manager_test.go
func TestGetSetPods(t *testing.T) {
	mirrorPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID:       "987654321",
			Name:      "bar",
			Namespace: "default",
			Annotations: map[string]string{
				kubetypes.ConfigSourceAnnotationKey: "api",
				kubetypes.ConfigMirrorAnnotationKey: "mirror",
			},
		},
	}
	staticPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID:         "123456789",
			Name:        "bar",
			Namespace:   "default",
			Annotations: map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"},
		},
	}

	expectedPods := []*v1.Pod{
		{
			ObjectMeta: metav1.ObjectMeta{
				UID:         "999999999",
				Name:        "taco",
				Namespace:   "default",
				Annotations: map[string]string{kubetypes.ConfigSourceAnnotationKey: "api"},
			},
		},
		staticPod,
	}
	updates := append(expectedPods, mirrorPod)
	podManager, _ := newTestManager()
	podManager.SetPods(updates)

	// Tests that all regular pods are recorded correctly.
	actualPods := podManager.GetPods()
	if len(actualPods) != len(expectedPods) {
		t.Errorf("expected %d pods, got %d pods; expected pods %#v, got pods %#v", len(expectedPods), len(actualPods),
			expectedPods, actualPods)
	}
	for _, expected := range expectedPods {
		found := false
		for _, actual := range actualPods {
			if actual.UID == expected.UID {
				if !reflect.DeepEqual(&expected, &actual) {
					t.Errorf("pod was recorded incorrectly. expect: %#v, got: %#v", expected, actual)
				}
				found = true
				break
			}
		}
		if !found {
			t.Errorf("pod %q was not found in %#v", expected.UID, actualPods)
		}
	}
	// Tests UID translation works as expected. Convert static pod UID for comparison only.
	if uid := podManager.TranslatePodUID(mirrorPod.UID); uid != kubetypes.ResolvedPodUID(staticPod.UID) {
		t.Errorf("unable to translate UID %q to the static POD's UID %q; %#v",
			mirrorPod.UID, staticPod.UID, podManager.mirrorPodByUID)
	}

	// Test the basic Get methods.
	actualPod, ok := podManager.GetPodByFullName("bar_default")
	if !ok || !reflect.DeepEqual(actualPod, staticPod) {
		t.Errorf("unable to get pod by full name; expected: %#v, got: %#v", staticPod, actualPod)
	}
	actualPod, ok = podManager.GetPodByName("default", "bar")
	if !ok || !reflect.DeepEqual(actualPod, staticPod) {
		t.Errorf("unable to get pod by name; expected: %#v, got: %#v", staticPod, actualPod)
	}
}
```
The Pod Manager in turn calls the secretManager and configMapManager. The following three components all call the Pod Manager:
- status Manager
- volume Manager
- runtime Manager
The Kubelet discovers Pod updates from three sources:
- file
- http
- API Server
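Pods not created through the API Server are called static Pods. For example, in a cluster bootstrapped with kubeadm, the API Server and the Scheduler are static Pods started by the Kubelet. The API Server itself is not aware that static Pods exist.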
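To make static Pods observable, the Kubelet creates a mirror Pod through the API Server for each static Pod.
A mirror Pod has the same Pod name and namespace as its static Pod, and its status always reflects the actual status of the static Pod. When the static Pod is deleted, the corresponding mirror Pod is deleted as well.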