Kubeedge:Metamanager源码速读(不定期更新)

Kubeedge源码版本:v1.15.1

在看Metamanager之前,先看一下Metamanager源码的目录结构(位于edge/pkg下)和官方文档:

目录结构如下面的两张图所示。请忽略绿色的文件高亮,这是Jetbrains goland对未提交修改的文件的标记。
在这里插入图片描述
在这里插入图片描述
然后简单地看一下官方文档。

官方文档宣称,Metamanager做的事情主要包括两类,分别是Edged和Edgehub之间的消息中介处理和消息持久化。

此外:

  • Metamanager抽象了一系列客户端接口,允许edged封装资源的变更信息并且发给Metamanager
  • Metamanager开启了一个HTTPServer用来接受k8s API的直连。

Metamanager基于下面的这些操作收发不同种类的message:

Insert
Update
Delete
Query
Response
NodeConnection
MetaSync

其中:

Insert操作(比如,新增一个pod)的主要流程图如下:
在这里插入图片描述

Update操作(比如云端给pod追加标签,或者edged检测到了pod的变更,向云上汇报)的主要流程图如下,根据变更的来源不同,有两种信息流动的流程:
在这里插入图片描述

Delete操作主要流程图如下:
在这里插入图片描述
在delete操作中,云端下达指令,edgehub转发,metamng会先把边缘里的数据删掉,然后把指令下发给edged。

Query操作主要允许edged查询本地的数据库缓存和云上(比如configmap/secret)的etcd。消息源可以拆成3个part(resKey/resType/resId),主要流程如下:
在这里插入图片描述

Response操作:就是上面那些图片里的请求对应的相应。

NodeConnection操作:edgehub会发送向边缘组件广播当前边缘节点的连接状态——告知云边是否连接。metamanager会在内存里维护这个信息,用于特定的操作(比如向云发送query)。

MetaSync操作:定期同步edge上pod的状态。

下面考察Metamanager的源码。从Start函数开始:

func (m *metaManager) Start() {
	if metaserverconfig.Config.Enable {
		imitator.StorageInit() // 初始化资源版本(RV)
		go metaserver.NewMetaServer().Start(beehiveContext.Done())
	}

	m.runMetaManager()
}

StorageInit做的事情主要是初始化RV。从边缘数据库metav2表里拿到最新的资源版本。

// StorageInit must be called before using imitator storage (run metaserver or metamanager)
func StorageInit() {
	m := new(v2.MetaV2)
	// get the most recent record as the init resource version
	_, err := dbm.DBAccess.QueryTable(v2.NewMetaTableName).OrderBy("-" + v2.RV).Limit(1).All(m)
	utilruntime.Must(err)
	DefaultV2Client.SetRevision(m.ResourceVersion)
}

然后,根据指定的配置生成一个metaserver:

func NewMetaServer() *MetaServer {
	ls := MetaServer{
		HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup),
		LongRunningFunc:       genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
		NegotiatedSerializer:  serializer.NewNegotiatedSerializer(),
		Factory:               handlerfactory.NewFactory(),
		Auth:                  buildAuth(),
	}
	return &ls
}

最后Start:

func (ls *MetaServer) Start(stopChan <-chan struct{}) {
	if kefeatures.DefaultFeatureGate.Enabled(kefeatures.RequireAuthorization) {
		ls.startHTTPSServer(stopChan)
	} else {
		ls.startHTTPServer(stopChan)
	}
}

HTTPSServer主要就是增加了openssl x509的那些密钥,用于构建安全的HTTP服务器。

为代码分析方便起见,我们忽略安全性部分,看一下HTTPServer里干了什么事情:

主要是指定了Handler,用于处理云端APIServer直连时收发的信息,最后启动了一个http服务器。

func (ls *MetaServer) startHTTPServer(stopChan <-chan struct{}) {
	h := ls.BuildBasicHandler()
	h = BuildHandlerChain(h, ls)
	s := http.Server{
		Addr:    metaserverconfig.Config.Server,
		Handler: h,
	}

	go func() { // 用于server的退出
		<-stopChan

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // 把退出的消息往下传
		defer cancel()
		if err := s.Shutdown(ctx); err != nil {
			klog.Errorf("Server shutdown failed: %s", err)
		}
	}()

	klog.Infof("[metaserver]start to listen and server at http://%v", s.Addr)
	utilruntime.HandleError(s.ListenAndServe())
	// When the MetaServer stops abnormally, other module services are stopped at the same time.
	beehiveContext.Cancel()
}

具体的handler函数逻辑见下:

func (ls *MetaServer) BuildBasicHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		ctx := req.Context()
		reqInfo, ok := apirequest.RequestInfoFrom(ctx)
		//klog.Infof("[metaserver]get a req(%v)(%v)", reqInfo.Path, reqInfo.Verb)
		//klog.Infof("[metaserver]get a req(\nPath:%v; \nVerb:%v; \nHeader:%+v)", reqInfo.Path, reqInfo.Verb, req.Header)
		if !ok {
			err := fmt.Errorf("invalid request")
			responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
			return
		}

		if reqInfo.IsResourceRequest {
			switch {
			case reqInfo.Verb == "get":
				ls.Factory.Get().ServeHTTP(w, req)
			case reqInfo.Verb == "list", reqInfo.Verb == "watch":
				ls.Factory.List().ServeHTTP(w, req)
			case reqInfo.Verb == "create":
				ls.Factory.Create(reqInfo).ServeHTTP(w, req)
			case reqInfo.Verb == "delete":
				ls.Factory.Delete().ServeHTTP(w, req)
			case reqInfo.Verb == "update":
				ls.Factory.Update(reqInfo).ServeHTTP(w, req)
			case reqInfo.Verb == "patch":
				ls.Factory.Patch(reqInfo).ServeHTTP(w, req)
			default:
				err := fmt.Errorf("unsupported req verb")
				responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
			}
			return
		}

		if passthrough.IsPassThroughPath(reqInfo.Path, reqInfo.Verb) {
			ls.Factory.PassThrough().ServeHTTP(w, req)
			return
		}

		err := fmt.Errorf("request[%s::%s] isn't supported", reqInfo.Path, reqInfo.Verb)
		responsewriters.ErrorNegotiated(errors.NewInternalError(err), ls.NegotiatedSerializer, schema.GroupVersion{}, w, req)
	})
}

最后,在Start函数里执行runMetaManager()

func (m *metaManager) runMetaManager() {
	go func() {
		for {
			select {
			case <-beehiveContext.Done():
				klog.Warning("MetaManager main loop stop")
				return
			default:
			}
			msg, err := beehiveContext.Receive(m.Name())
			if err != nil {
				klog.Errorf("get a message %+v: %v", msg, err)
				continue
			}
			klog.V(2).Infof("get a message %+v", msg)
			m.process(msg)
		}
	}()
}

主要的逻辑就是不断地从beehive框架那里拿一个message然后进行处理。但是处理的过程比较长。重点是这个process函数:

func (m *metaManager) process(message model.Message) {
	operation := message.GetOperation()

	switch operation {
	case model.InsertOperation:
		m.processInsert(message)
	case model.UpdateOperation:
		m.processUpdate(message)
	case model.PatchOperation:
		m.processPatch(message)
	case model.DeleteOperation:
		m.processDelete(message)
	case model.QueryOperation:
		m.processQuery(message)
	case model.ResponseOperation:
		m.processResponse(message)
	case constants.CSIOperationTypeCreateVolume,
		constants.CSIOperationTypeDeleteVolume,
		constants.CSIOperationTypeControllerPublishVolume,
		constants.CSIOperationTypeControllerUnpublishVolume:
		m.processVolume(message)
	default:
		klog.Errorf("metamanager not supported operation: %v", operation)
	}
}

可以看到,process函数会处理Insert、Update、Patch、Delete、Query、Response信息,以及处理和卷相关的信息。

我们以m.processUpdate(message)为例考察process的处理逻辑。主要是看kubeedge的源码,跳过migration的部分:

func (m *metaManager) processUpdate(message model.Message) {
	imitator.DefaultV2Client.Inject(message)

	msgSource := message.GetSource()
	_, resType, _ := parseResource(&message)
	if msgSource == modules.EdgedModuleName && resType == model.ResourceTypeLease {
		// 来自于edged的消息(需要转发到云上)并且type为lease(用于节点心跳)
		// edged里的kubelet会定时地向云端发送心跳信息,但是在边缘设备里需要对消息做更进一步的包裹
		if !connect.IsConnected() { // 云边断连就直接返回
			klog.Warningf("process remote failed, req[%s], err: %v", msgDebugInfo(&message), errNotConnected)
			feedbackError(fmt.Errorf("failed to process remote: %s", errNotConnected), message) // 把错误消息返回给edged
			return
		}
		m.processRemote(message) // 这个函数相当于由metamng代替edged发送消息,并且处理自动回复,当然processRemote的功能不止于此
		return
	}

	// 如果不是“edged的心跳信息”,比如是pod的更新信息,那么咱们自己处理一手
	if err := m.handleMessage(&message); err != nil { // 拿到消息后先经过m.handleMessage函数记录到db中
		feedbackError(err, message)
		return
	}

	// 根据edged的模块名,自行决定转发路径
	switch msgSource {
	case modules.EdgedModuleName:
		// For pod status update message, we need to wait for the response message
		// to ensure that the pod status is correctly reported to the kube-apiserver
		sendToCloud(&message)
		resp := message.NewRespByMessage(&message, OK)
		sendToEdged(resp, message.IsSync())
	case cloudmodules.EdgeControllerModuleName, cloudmodules.DynamicControllerModuleName:
		sendToEdged(&message, message.IsSync())
		resp := message.NewRespByMessage(&message, OK)
		sendToCloud(resp)
	case cloudmodules.DeviceControllerModuleName:
		resp := message.NewRespByMessage(&message, OK)
		sendToCloud(resp)
		message.SetRoute(modules.MetaGroup, modules.DeviceTwinModuleName)
		beehiveContext.Send(modules.DeviceTwinModuleName, message)
	case cloudmodules.PolicyControllerModuleName:
		resp := message.NewRespByMessage(&message, OK)
		sendToCloud(resp)
	default:
		klog.Errorf("unsupport message source, %s", msgSource)
	}
}

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-26 14:18:05       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-26 14:18:05       106 阅读
  3. 在Django里面运行非项目文件

    2024-04-26 14:18:05       87 阅读
  4. Python语言-面向对象

    2024-04-26 14:18:05       96 阅读

热门阅读

  1. 市政行业乙级资质改革对公共交通工程的影响

    2024-04-26 14:18:05       29 阅读
  2. 商业认证项目表

    2024-04-26 14:18:05       35 阅读
  3. Leetcode 5.最长回文子串

    2024-04-26 14:18:05       38 阅读
  4. 自动驾驶---OpenSpace之Hybrid A*规划算法

    2024-04-26 14:18:05       38 阅读
  5. word 第十四课

    2024-04-26 14:18:05       29 阅读
  6. IOS恢复

    IOS恢复

    2024-04-26 14:18:05      37 阅读