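Custom VPCs in kube-ovn

Configuring a custom VPC

Enabling the custom VPC gateway

The custom VPC gateway has to be enabled through two ConfigMaps:

  1. ovn-vpc-nat-config specifies the image used by the custom VPC gateway pod. The gateway of a custom VPC is simply a pod, and EIP, SNAT, DNAT and FIP are implemented by configuring iptables inside that pod.

  2. ovn-vpc-nat-gw-config turns the custom VPC gateway feature on.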

kind: ConfigMap
apiVersion: v1
metadata:
  name: ovn-vpc-nat-config
  namespace: kube-system
data:
  image: 'docker.io/kubeovn/vpc-nat-gateway:v1.12.4'
---
kind: ConfigMap
apiVersion: v1
metadata:
  name: ovn-vpc-nat-gw-config
  namespace: kube-system
data:
  enable-vpc-nat-gw: 'true'

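Configuring the external subnet

The configuration below creates the external subnet:

  1. cidrBlock is the CIDR of the external subnet.

  2. gateway is the gateway of the external subnet.

  3. excludeIps lists the address ranges that must not be allocated, so that allocated EIPs do not conflict with addresses already in use on the external network.

  4. The NetworkAttachmentDefinition resource defines a macvlan network. macvlan presents one physical NIC as multiple virtual NICs.

  5. As a result, every time an EIP is created, a virtual NIC is derived from the physical NIC ens37, configured with the IP address allocated by kube-ovn, and placed into the custom VPC gateway pod.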

apiVersion: kubeovn.io/v1
kind: Subnet
metadata:
  name: ovn-vpc-external-network
spec:
  protocol: IPv4
  provider: ovn-vpc-external-network.kube-system
  cidrBlock: 192.168.100.0/24
  gateway: 192.168.100.1  # IP address of the physical gateway
  excludeIps:
  - 192.168.100.1..192.168.100.220
---
apiVersion: "k8s.cni.cncf.io/v1"
kind: NetworkAttachmentDefinition
metadata:
  name: ovn-vpc-external-network
  namespace: kube-system
spec:
  config: '{
      "cniVersion": "0.3.0",
      "type": "macvlan",
      "master": "ens37",
      "mode": "bridge",
      "ipam": {
        "type": "kube-ovn",
        "server_socket": "/run/openvswitch/kube-ovn-daemon.sock",
        "provider": "ovn-vpc-external-network.kube-system"
      }
    }'

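Creating the custom VPC

Create a custom VPC. For every VPC, kube-ovn creates an OVN logical router, and the staticRoutes defined in the VPC are added to that logical router. The static route defined in the VPC below sends all packets to the next hop 10.0.1.254, an IP inside the VPC's internal subnet that serves as the VPC's gateway IP.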

kind: Vpc
apiVersion: kubeovn.io/v1
metadata:
  name: test-vpc-1
spec:
  namespaces:
  - ns1
  staticRoutes:
    - cidr: 0.0.0.0/0
      nextHopIP: 10.0.1.254
      policy: policyDst

Creating a subnet in the custom VPC

The following creates a subnet and a pod in the custom VPC.

kind: Subnet
apiVersion: kubeovn.io/v1
metadata:
  name: net1
spec:
  vpc: test-vpc-1
  cidrBlock: 10.0.1.0/24
  protocol: IPv4
  namespaces:
    - ns1
---
apiVersion: v1
kind: Pod
metadata:
  annotations:
    ovn.kubernetes.io/logical_switch: net1
  namespace: ns1
  name: vpc1-pod
spec:
  containers:
    - name: vpc1-pod
      image: docker.io/library/nginx:alpine

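Creating the custom VPC gateway

The following creates the gateway of the custom VPC:

  1. vpc states that this gateway belongs to test-vpc-1.

  2. subnet is a subnet under test-vpc-1.

  3. lanIp is an IP address in the subnet from item 2. It serves as the VPC's gateway IP and must match the nextHopIP of the static route in test-vpc-1.

  4. externalSubnets names the external subnet. When the VPC gateway pod is created, an address is allocated from this external subnet and used to connect to the external network.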

kind: VpcNatGateway
apiVersion: kubeovn.io/v1
metadata:
  name: gw1
spec:
  vpc: test-vpc-1
  subnet: net1
  lanIp: 10.0.1.254
  externalSubnets:
    - ovn-vpc-external-network
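
The two constraints on lanIp (it must lie inside the gateway's subnet and equal the static route's next hop) are easy to get wrong, so a small pre-flight check can help. The Go sketch below is a hypothetical helper for validating manifests before applying them; kube-ovn itself does not expose a function called checkLanIP.

```go
package main

import (
	"fmt"
	"net/netip"
)

// checkLanIP verifies that lanIP lies inside subnetCIDR and equals the next
// hop of the VPC's static route. Hypothetical helper for illustration only.
func checkLanIP(subnetCIDR, lanIP, routeNextHop string) error {
	prefix, err := netip.ParsePrefix(subnetCIDR)
	if err != nil {
		return fmt.Errorf("bad subnet cidr: %w", err)
	}
	ip, err := netip.ParseAddr(lanIP)
	if err != nil {
		return fmt.Errorf("bad lanIp: %w", err)
	}
	hop, err := netip.ParseAddr(routeNextHop)
	if err != nil {
		return fmt.Errorf("bad nextHopIP: %w", err)
	}
	if !prefix.Contains(ip) {
		return fmt.Errorf("lanIp %s is outside subnet %s", ip, prefix)
	}
	if ip != hop {
		return fmt.Errorf("lanIp %s does not match nextHopIP %s", ip, hop)
	}
	return nil
}

func main() {
	// Values taken from the Vpc, Subnet and VpcNatGateway manifests above.
	fmt.Println(checkLanIP("10.0.1.0/24", "10.0.1.254", "10.0.1.254"))
}
```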

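Adding EIPs to the custom VPC

The configuration below creates two EIPs on the custom VPC gateway pod and configures SNAT rules for them:

  1. A custom VPC gateway pod can hold multiple EIPs; two are created here.

  2. v4ip is the address assigned to the EIP.

  3. natGwDp names the gateway on which the EIP is added.

  4. In IptablesSnatRule, eip and internalCIDR describe the SNAT rule created inside the custom VPC gateway pod: an iptables rule that rewrites the source IP of packets coming from internalCIDR to the address of the EIP (192.168.100.230 for eip1).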

kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
  name: eip1
spec:
  v4ip: 192.168.100.230
  natGwDp: gw1
---
kind: IptablesSnatRule
apiVersion: kubeovn.io/v1
metadata:
  name: snat01
spec:
  eip: eip1
  internalCIDR: 10.0.1.0/24



---
kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
  name: eip2
spec:
  v4ip: 192.168.100.231
  natGwDp: gw1
---
kind: IptablesSnatRule
apiVersion: kubeovn.io/v1
metadata:
  name: snat02
spec:
  eip: eip2
  internalCIDR: 10.0.1.0/24

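Adding a FIP to the custom VPC

A FIP is a floating IP. It binds an internalIp inside the custom VPC to an EIP, so that the external network can reach the pod directly.

  1. The EIP is created the same way as above; the key part is the FIP rule.

  2. The FIP rule specifies eip and internalIp. iptables rules are added inside the custom VPC gateway pod to bind the EIP and the internalIp one to one.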

kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
  name: eip3
spec:
  v4ip: 192.168.100.232
  natGwDp: gw1
---
kind: IptablesFIPRule
apiVersion: kubeovn.io/v1
metadata:
  name: fip01
spec:
  eip: eip3
  internalIp: 10.0.1.5

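Custom VPC source code analysis

Code that enables the custom VPC gateway

The setup above requires the configuration below before the custom VPC gateway can be used. The code that follows shows what this configuration does.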

kind: ConfigMap
apiVersion: v1
metadata:
  name: ovn-vpc-nat-config
  namespace: kube-system
data:
  image: 'docker.io/kubeovn/vpc-nat-gateway:v1.12.4'
---
kind: ConfigMap
apiVersion: v1
metadata:
  name: ovn-vpc-nat-gw-config
  namespace: kube-system
data:
  enable-vpc-nat-gw: 'true'

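  1. resyncVpcNatGwConfig runs periodically. It checks whether the ovn-vpc-nat-config and ovn-vpc-nat-gw-config ConfigMaps have been created and reads the image of the custom VPC gateway pod from them.

  2. resyncVpcNatImage reads the custom VPC gateway image from the configuration.

  3. Once both ConfigMaps are present, the existing custom VPC gateway resources are queued for an update, because gateways may have been created before the ConfigMaps were.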

// NOTE: runs periodically and checks whether the VPC gateway ConfigMaps have been created
func (c *Controller) resyncVpcNatGwConfig() {
    // NOTE: fetch the ovn-vpc-nat-gw-config ConfigMap
    cm, err := c.configMapsLister.ConfigMaps(c.config.PodNamespace).Get(util.VpcNatGatewayConfig)
    if err != nil && !k8serrors.IsNotFound(err) {
       klog.Errorf("failed to get ovn-vpc-nat-gw-config, %v", err)
       return
    }

    if k8serrors.IsNotFound(err) || cm.Data["enable-vpc-nat-gw"] == "false" {
       if vpcNatEnabled == "false" {
          return
       }
       klog.Info("start to clean up vpc nat gateway")
       if err := c.cleanUpVpcNatGw(); err != nil {
          klog.Errorf("failed to clean up vpc nat gateway, %v", err)
          return
       }
       vpcNatEnabled = "false"
       VpcNatCmVersion = ""
       klog.Info("finish clean up vpc nat gateway")
       return
    }
    if vpcNatEnabled == "true" && VpcNatCmVersion == cm.ResourceVersion {
       return
    }
    gws, err := c.vpcNatGatewayLister.List(labels.Everything())
    if err != nil {
       klog.Errorf("failed to get vpc nat gateway, %v", err)
       return
    }
    // NOTE: fetch the VPC gateway image
    if err = c.resyncVpcNatImage(); err != nil {
       klog.Errorf("failed to resync vpc nat config, err: %v", err)
       return
    }
    vpcNatEnabled = "true"
    VpcNatCmVersion = cm.ResourceVersion
    for _, gw := range gws {
       c.addOrUpdateVpcNatGatewayQueue.Add(gw.Name)
    }
    klog.Info("finish establishing vpc-nat-gateway")
}
// NOTE: this ConfigMap must be created when deploying the VPC gateway; the code reads the image from it
func (c *Controller) resyncVpcNatImage() error {
    cm, err := c.configMapsLister.ConfigMaps(c.config.PodNamespace).Get(util.VpcNatConfig)
    if err != nil {
       err = fmt.Errorf("failed to get ovn-vpc-nat-config, %v", err)
       klog.Error(err)
       return err
    }
    image, exist := cm.Data["image"]
    if !exist {
       err = fmt.Errorf("%s should have image field", util.VpcNatConfig)
       klog.Error(err)
       return err
    }
    vpcNatImage = image
    return nil
}
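
The branching in resyncVpcNatGwConfig can be condensed into a small pure function, which makes the state transitions easier to see. The Go sketch below is a simplified model written for this walkthrough: the action type and decide function are hypothetical, and the real function also lists gateways and reads the image. Note that, as in the quoted code, only the literal string "false" disables the feature.

```go
package main

import "fmt"

type action int

const (
	noop    action = iota // nothing to do
	cleanup               // tear down existing gateways and mark disabled
	enable                // resync the image and re-queue all gateways
)

// decide models the branching of resyncVpcNatGwConfig in simplified form.
// cmFound and enableFlag describe ovn-vpc-nat-gw-config; enabled and
// seenVersion are the controller's remembered state.
func decide(cmFound bool, enableFlag string, enabled bool, seenVersion, cmVersion string) action {
	if !cmFound || enableFlag == "false" {
		if !enabled {
			return noop
		}
		return cleanup
	}
	if enabled && seenVersion == cmVersion {
		return noop
	}
	return enable
}

func main() {
	fmt.Println(decide(true, "true", false, "", "42") == enable)    // first time the ConfigMap is seen
	fmt.Println(decide(true, "true", true, "42", "42") == noop)     // unchanged ConfigMap
	fmt.Println(decide(false, "", true, "42", "") == cleanup)       // ConfigMap deleted
}
```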

Code that creates the custom VPC

kind: Vpc
apiVersion: kubeovn.io/v1
metadata:
  name: test-vpc-1
spec:
  namespaces:
  - ns1
  staticRoutes:
    - cidr: 0.0.0.0/0
      nextHopIP: 10.0.1.254
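      policy: policyDst

  1. A custom VPC can be created with static routes and policy routes.

  2. OVN creates a logical router for the VPC.

  3. The static routes and policy routes are added to that logical router.

  4. nextHopIP matters: it is the gateway's IP. kube-ovn adds a static route to the custom VPC's logical router that steers all traffic to 10.0.1.254, an internal IP address of the gateway.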

func (c *Controller) handleAddOrUpdateVpc(key string) error {
    c.vpcKeyMutex.LockKey(key)
    defer func() { _ = c.vpcKeyMutex.UnlockKey(key) }()
    klog.Infof("handle add/update vpc %s", key)

    // get latest vpc info
    var (
       vpc, cachedVpc *kubeovnv1.Vpc
       err            error
    )

    cachedVpc, err = c.vpcsLister.Get(key)
    if err != nil {
       if k8serrors.IsNotFound(err) {
          return nil
       }
       klog.Error(err)
       return err
    }
    vpc = cachedVpc.DeepCopy()
    // NOTE: apply defaults to the VPC and validate that its fields are well formed
    if err = formatVpc(vpc, c); err != nil {
       klog.Errorf("failed to format vpc %s: %v", key, err)
       return err
    }
    // NOTE: create an OVN logical router for the new VPC; a kube-ovn VPC maps to an OVN logical router
    if err = c.createVpcRouter(key); err != nil {
       return err
    }

    // NOTE: this appears to specify the peer VPC on the other end of a connection; not covered here for now
    var newPeers []string
    for _, peering := range vpc.Spec.VpcPeerings {
       if err = util.CheckCidrs(peering.LocalConnectIP); err != nil {
          klog.Errorf("invalid cidr %s", peering.LocalConnectIP)
          return err
       }

       newPeers = append(newPeers, peering.RemoteVpc)
       if err := c.OVNNbClient.CreatePeerRouterPort(vpc.Name, peering.RemoteVpc, peering.LocalConnectIP); err != nil {
          klog.Errorf("create peer router port for vpc %s, %v", vpc.Name, err)
          return err
       }
    }
    for _, oldPeer := range vpc.Status.VpcPeerings {
       if !util.ContainsString(newPeers, oldPeer) {
          if err = c.OVNNbClient.DeleteLogicalRouterPort(fmt.Sprintf("%s-%s", vpc.Name, oldPeer)); err != nil {
             klog.Errorf("delete peer router port for vpc %s, %v", vpc.Name, err)
             return err
          }
       }
    }

    // handle static route
    var (
       staticExistedRoutes []*ovnnb.LogicalRouterStaticRoute
       staticTargetRoutes  []*kubeovnv1.StaticRoute
       staticRouteMapping  map[string][]*kubeovnv1.StaticRoute
    )
    // NOTE: list the static routes that currently exist on the VPC's router
    staticExistedRoutes, err = c.OVNNbClient.ListLogicalRouterStaticRoutes(vpc.Name, nil, nil, "", nil)
    if err != nil {
       klog.Errorf("failed to get vpc %s static route list, %v", vpc.Name, err)
       return err
    }
    //; kind: Vpc
    //; apiVersion: kubeovn.io/v1
    //; metadata:
    //;   name: test-vpc-1
    //; spec:
    //;   staticRoutes:
    //;     - cidr: 0.0.0.0/0
    //;       nextHopIP: 10.0.1.254
    //;       policy: policyDst
    //;     - cidr: 172.31.0.0/24
    //;       nextHopIP: 10.0.1.253
    //;       policy: policySrc
    //;       routeTable: "rtb1"
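    // NOTE: build a map from route table name to route entries for this VPC
    staticRouteMapping = c.getRouteTablesByVpc(vpc)
    // NOTE: the static routes specified in the VPC spec
    staticTargetRoutes = vpc.Spec.StaticRoutes

    // NOTE: special handling for the default VPC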
    if vpc.Name == c.config.ClusterRouter {
       if _, ok := staticRouteMapping[util.MainRouteTable]; !ok {
          staticRouteMapping[util.MainRouteTable] = nil
       }

       joinSubnet, err := c.subnetsLister.Get(c.config.NodeSwitch)
       if err != nil {
          if !k8serrors.IsNotFound(err) {
             klog.Error("failed to get node switch subnet %s: %v", c.config.NodeSwitch)
             return err
          }
       }
       gatewayV4, gatewayV6 := util.SplitStringIP(joinSubnet.Spec.Gateway)
       if gatewayV4 != "" {
          for tabele := range staticRouteMapping {
             staticTargetRoutes = append(
                staticTargetRoutes,
                &kubeovnv1.StaticRoute{
                   Policy:     kubeovnv1.PolicyDst,
                   CIDR:       "0.0.0.0/0",
                   NextHopIP:  gatewayV4,
                   RouteTable: tabele,
                },
             )
          }
       }
       if gatewayV6 != "" {
          for tabele := range staticRouteMapping {
             staticTargetRoutes = append(
                staticTargetRoutes,
                &kubeovnv1.StaticRoute{
                   Policy:     kubeovnv1.PolicyDst,
                   CIDR:       "::/0",
                   NextHopIP:  gatewayV6,
                   RouteTable: tabele,
                },
             )
          }
       }

       if c.config.EnableEipSnat {
          cm, err := c.configMapsLister.ConfigMaps(c.config.ExternalGatewayConfigNS).Get(util.ExternalGatewayConfig)
          if err == nil {
             nextHop := cm.Data["external-gw-addr"]
             if nextHop == "" {
                externalSubnet, err := c.subnetsLister.Get(c.config.ExternalGatewaySwitch)
                if err != nil {
                   klog.Errorf("failed to get subnet %s, %v", c.config.ExternalGatewaySwitch, err)
                   return err
                }
                nextHop = externalSubnet.Spec.Gateway
                if nextHop == "" {
                   klog.Errorf("no available gateway address")
                   return fmt.Errorf("no available gateway address")
                }
             }
             if strings.Contains(nextHop, "/") {
                nextHop = strings.Split(nextHop, "/")[0]
             }

             lr, err := c.OVNNbClient.GetLogicalRouter(vpc.Name, false)
             if err != nil {
                klog.Errorf("failed to get logical router %s: %v", vpc.Name, err)
                return err
             }

             for _, nat := range lr.Nat {
                info, err := c.OVNNbClient.GetNATByUUID(nat)
                if err != nil {
                   klog.Errorf("failed to get nat ip info for vpc %s, %v", vpc.Name, err)
                   return err
                }
                if info.LogicalIP != "" {
                   for table := range staticRouteMapping {
                      staticTargetRoutes = append(
                         staticTargetRoutes,
                         &kubeovnv1.StaticRoute{
                            Policy:     kubeovnv1.PolicySrc,
                            CIDR:       info.LogicalIP,
                            NextHopIP:  nextHop,
                            RouteTable: table,
                         },
                      )
                   }
                }
             }
          }
       }
    }

    // NOTE: work out which static routes need to be deleted and which need to be added
    routeNeedDel, routeNeedAdd, err := diffStaticRoute(staticExistedRoutes, staticTargetRoutes)
    if err != nil {
       klog.Errorf("failed to diff vpc %s static route, %v", vpc.Name, err)
       return err
    }

    // NOTE: delete stale static routes from the VPC
    for _, item := range routeNeedDel {
       klog.Infof("vpc %s del static route: %+v", vpc.Name, item)
       policy := convertPolicy(item.Policy)
       if err = c.OVNNbClient.DeleteLogicalRouterStaticRoute(vpc.Name, &item.RouteTable, &policy, item.CIDR, item.NextHopIP); err != nil {
          klog.Errorf("del vpc %s static route failed, %v", vpc.Name, err)
          return err
       }
    }
    // NOTE: add the missing static routes to the VPC
    for _, item := range routeNeedAdd {
       if item.BfdID != "" {
          klog.Infof("vpc %s add static ecmp route: %+v", vpc.Name, item)
          if err = c.OVNNbClient.AddLogicalRouterStaticRoute(
             vpc.Name, item.RouteTable, convertPolicy(item.Policy), item.CIDR, &item.BfdID, item.NextHopIP,
          ); err != nil {
             klog.Errorf("failed to add bfd static route to vpc %s , %v", vpc.Name, err)
             return err
          }
       } else {
          klog.Infof("vpc %s add static route: %+v", vpc.Name, item)
          if err = c.OVNNbClient.AddLogicalRouterStaticRoute(
             vpc.Name, item.RouteTable, convertPolicy(item.Policy), item.CIDR, nil, item.NextHopIP,
          ); err != nil {
             klog.Errorf("failed to add normal static route to vpc %s , %v", vpc.Name, err)
             return err
          }
       }
    }

    // handle policy route
    var (
       policyRouteExisted, policyRouteNeedDel, policyRouteNeedAdd []*kubeovnv1.PolicyRoute
       policyRouteLogical                                         []*ovnnb.LogicalRouterPolicy
       externalIDs                                                = map[string]string{"vendor": util.CniTypeName}
    )
    // NOTE: work out which policy routes need to be deleted and which need to be added
    if vpc.Name == c.config.ClusterRouter {
       policyRouteExisted = reversePolicies(vpc.Annotations[util.VpcLastPolicies])
       // diff list
       policyRouteNeedDel, policyRouteNeedAdd = diffPolicyRouteWithExisted(policyRouteExisted, vpc.Spec.PolicyRoutes)
    } else {
       if vpc.Spec.PolicyRoutes == nil {
          // do not clean default vpc policy routes
          if err = c.OVNNbClient.ClearLogicalRouterPolicy(vpc.Name); err != nil {
             klog.Errorf("clean all vpc %s policy route failed, %v", vpc.Name, err)
             return err
          }
       } else {
          policyRouteLogical, err = c.OVNNbClient.ListLogicalRouterPolicies(vpc.Name, -1, nil)
          if err != nil {
             klog.Errorf("failed to get vpc %s policy route list, %v", vpc.Name, err)
             return err
          }
          // diff vpc policy route
          policyRouteNeedDel, policyRouteNeedAdd = diffPolicyRouteWithLogical(policyRouteLogical, vpc.Spec.PolicyRoutes)
       }
    }
    // NOTE: delete stale VPC policy routes
    // delete policies non-exist
    for _, item := range policyRouteNeedDel {
       klog.Infof("delete policy route for router: %s, priority: %d, match %s", vpc.Name, item.Priority, item.Match)
       if err = c.OVNNbClient.DeleteLogicalRouterPolicy(vpc.Name, item.Priority, item.Match); err != nil {
          klog.Errorf("del vpc %s policy route failed, %v", vpc.Name, err)
          return err
       }
    }
    // NOTE: add the missing VPC policy routes
    // add new policies
    for _, item := range policyRouteNeedAdd {
       klog.Infof("add policy route for router: %s, match %s, action %s, nexthop %s, externalID %v", c.config.ClusterRouter, item.Match, string(item.Action), item.NextHopIP, externalIDs)
       if err = c.OVNNbClient.AddLogicalRouterPolicy(vpc.Name, item.Priority, item.Match, string(item.Action), []string{item.NextHopIP}, externalIDs); err != nil {
          klog.Errorf("add policy route to vpc %s failed, %v", vpc.Name, err)
          return err
       }
    }

    vpc.Status.Router = key
    vpc.Status.Standby = true
    vpc.Status.VpcPeerings = newPeers
    // NOTE: open question: is this LB the OVN load balancer?
    if c.config.EnableLb {
       vpcLb, err := c.addLoadBalancer(key)
       if err != nil {
          klog.Error(err)
          return err
       }
       vpc.Status.TCPLoadBalancer = vpcLb.TCPLoadBalancer
       vpc.Status.TCPSessionLoadBalancer = vpcLb.TCPSessLoadBalancer
       vpc.Status.UDPLoadBalancer = vpcLb.UDPLoadBalancer
       vpc.Status.UDPSessionLoadBalancer = vpcLb.UDPSessLoadBalancer
       vpc.Status.SctpLoadBalancer = vpcLb.SctpLoadBalancer
       vpc.Status.SctpSessionLoadBalancer = vpcLb.SctpSessLoadBalancer
    }
    bytes, err := vpc.Status.Bytes()
    if err != nil {
       klog.Error(err)
       return err
    }
    // NOTE: patch the VPC status
    vpc, err = c.config.KubeOvnClient.KubeovnV1().Vpcs().Patch(context.Background(), vpc.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status")
    if err != nil {
       klog.Error(err)
       return err
    }

    if len(vpc.Annotations) != 0 && strings.ToLower(vpc.Annotations[util.VpcLbAnnotation]) == "on" {
       if err = c.createVpcLb(vpc); err != nil {
          return err
       }
    } else if err = c.deleteVpcLb(vpc); err != nil {
       return err
    }

    subnets, err := c.subnetsLister.List(labels.Everything())
    if err != nil {
       klog.Error(err)
       return err
    }

    for _, subnet := range subnets {
       if subnet.Spec.Vpc == key {
          c.addOrUpdateSubnetQueue.Add(subnet.Name)
       }
    }
    if vpc.Name != util.DefaultVpc {
       if cachedVpc.Spec.EnableExternal {
          if !cachedVpc.Status.EnableExternal {
             // connect vpc to default external
             klog.Infof("connect external network with vpc %s", vpc.Name)
             if err := c.handleAddVpcExternalSubnet(key, c.config.ExternalGatewaySwitch); err != nil {
                klog.Errorf("failed to add default external connection for vpc %s, error %v", key, err)
                return err
             }
          }
          if vpc.Spec.EnableBfd {
             klog.Infof("remove normal static ecmp route for vpc %s", vpc.Name)
             // auto remove normal type static route, if using ecmp based bfd
             if err := c.reconcileCustomVpcDelNormalStaticRoute(vpc.Name); err != nil {
                klog.Errorf("failed to reconcile del vpc %q normal static route", vpc.Name)
                return err
             }
          }
          if !vpc.Spec.EnableBfd {
             // auto add normal type static route, if not use ecmp based bfd
             klog.Infof("add normal external static route for enable external vpc %s", vpc.Name)
             if err := c.reconcileCustomVpcAddNormalStaticRoute(vpc.Name); err != nil {
                klog.Errorf("failed to reconcile vpc %q bfd static route", vpc.Name)
                return err
             }
          }
          if cachedVpc.Spec.ExtraExternalSubnets != nil {
             sort.Strings(vpc.Spec.ExtraExternalSubnets)
          }
          // add external subnets only in spec and delete external subnets only in status
          if !reflect.DeepEqual(vpc.Spec.ExtraExternalSubnets, vpc.Status.ExtraExternalSubnets) {
             for _, subnetStatus := range cachedVpc.Status.ExtraExternalSubnets {
                if !slices.Contains(cachedVpc.Spec.ExtraExternalSubnets, subnetStatus) {
                   klog.Infof("delete external subnet %s connection for vpc %s", subnetStatus, vpc.Name)
                   if err := c.handleDelVpcExternalSubnet(vpc.Name, subnetStatus); err != nil {
                      klog.Errorf("failed to delete external subnet %s connection for vpc %s, error %v", subnetStatus, vpc.Name, err)
                      return err
                   }
                }
             }
             for _, subnetSpec := range cachedVpc.Spec.ExtraExternalSubnets {
                if !slices.Contains(cachedVpc.Status.ExtraExternalSubnets, subnetSpec) {
                   klog.Infof("connect external subnet %s with vpc %s", subnetSpec, vpc.Name)
                   if err := c.handleAddVpcExternalSubnet(key, subnetSpec); err != nil {
                      klog.Errorf("failed to add external subnet %s connection for vpc %s, error %v", subnetSpec, key, err)
                      return err
                   }
                }
             }
             if err := c.updateVpcAddExternalStatus(key, true); err != nil {
                klog.Error("failed to update additional external subnets status, %v", err)
                return err
             }
          }
       }

       if !cachedVpc.Spec.EnableBfd && cachedVpc.Status.EnableBfd {
          lrpEipName := fmt.Sprintf("%s-%s", key, c.config.ExternalGatewaySwitch)
          if err := c.OVNNbClient.DeleteBFD(lrpEipName, ""); err != nil {
             klog.Error(err)
             return err
          }
          if err := c.handleDeleteVpcStaticRoute(key); err != nil {
             klog.Errorf("failed to delete bfd route for vpc %s, error %v", key, err)
             return err
          }
       }

       if !cachedVpc.Spec.EnableExternal && cachedVpc.Status.EnableExternal {
          // disconnect vpc to default external
          if err := c.handleDelVpcExternalSubnet(key, c.config.ExternalGatewaySwitch); err != nil {
             klog.Errorf("failed to delete external connection for vpc %s, error %v", key, err)
             return err
          }
       }

       if cachedVpc.Status.ExtraExternalSubnets != nil && !cachedVpc.Spec.EnableExternal {
          // disconnect vpc to extra external subnets
          for _, subnet := range cachedVpc.Status.ExtraExternalSubnets {
             klog.Infof("disconnect external network %s to vpc %s", subnet, vpc.Name)
             if err := c.handleDelVpcExternalSubnet(key, subnet); err != nil {
                klog.Error(err)
                return err
             }
          }
          if err := c.updateVpcAddExternalStatus(key, false); err != nil {
             klog.Error("failed to update additional external subnets status, %v", err)
             return err
          }
       }
    }

    return nil
}
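
The static route handling above follows a reconcile pattern: compare what exists on the logical router with what the spec asks for, then delete the surplus and add the missing entries. The Go sketch below shows that diff on a simplified route type. It is not kube-ovn's diffStaticRoute; the route struct and diffRoutes function are hypothetical and only illustrate the idea.

```go
package main

import "fmt"

// route is a simplified, hypothetical key for one static route.
type route struct {
	Table, Policy, CIDR, NextHop string
}

// diffRoutes returns the routes to delete (present but not wanted) and the
// routes to add (wanted but not present).
func diffRoutes(existing, target []route) (del, add []route) {
	want := make(map[route]bool, len(target))
	for _, r := range target {
		want[r] = true
	}
	have := make(map[route]bool, len(existing))
	for _, r := range existing {
		have[r] = true
		if !want[r] {
			del = append(del, r)
		}
	}
	for _, r := range target {
		if !have[r] {
			add = append(add, r)
			have[r] = true // avoid adding duplicates from the target list twice
		}
	}
	return del, add
}

func main() {
	existing := []route{{"", "policyDst", "0.0.0.0/0", "10.0.1.1"}}
	target := []route{{"", "policyDst", "0.0.0.0/0", "10.0.1.254"}}
	del, add := diffRoutes(existing, target)
	fmt.Println("delete:", del)
	fmt.Println("add:", add)
}
```

Changing only the next hop therefore shows up as one deletion plus one addition, which matches the delete-then-add order of the loops in the quoted handler.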

Creating the custom VPC gateway

kind: VpcNatGateway
apiVersion: kubeovn.io/v1
metadata:
  name: gw1
spec:
  vpc: test-vpc-1
  subnet: net1
  lanIp: 10.0.1.254
  externalSubnets:
    - ovn-vpc-external-network

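  1. The VPC is checked for existence.

  2. The VPC gateway is checked for existence.

  3. The custom VPC's subnet is checked for existence.

  4. lanIp must match the nextHopIP given when the custom VPC was created. A NIC with lanIp as its address is created in the gateway pod and serves as the gateway's address inside the custom VPC.

  5. externalSubnets names the external network to use; an IP address is allocated from that subnet as the gateway's external address.

  6. A StatefulSet is then built from the gateway image configured above and created for the custom VPC; it contains the gateway pod.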

func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error {
    // create nat gw statefulset
    c.vpcNatGwKeyMutex.LockKey(key)
    defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()
    klog.Infof("handle add/update vpc nat gateway %s", key)

    if vpcNatEnabled != "true" {
       return fmt.Errorf("iptables nat gw not enable")
    }
    // NOTE: check that the VPC gateway exists
    gw, err := c.vpcNatGatewayLister.Get(key)
    if err != nil {
       if k8serrors.IsNotFound(err) {
          return nil
       }
       klog.Error(err)
       return err
    }
    // NOTE: check that the VPC referenced by the gateway exists
    if _, err := c.vpcsLister.Get(gw.Spec.Vpc); err != nil {
       err = fmt.Errorf("failed to get vpc '%s', err: %v", gw.Spec.Vpc, err)
       klog.Error(err)
       return err
    }
    // NOTE: check that the subnet inside the VPC exists; the gateway pod gets a NIC whose address is an IP from this subnet and acts as the gateway
    if _, err := c.subnetsLister.Get(gw.Spec.Subnet); err != nil {
       err = fmt.Errorf("failed to get subnet '%s', err: %v", gw.Spec.Subnet, err)
       klog.Error(err)
       return err
    }

    // check or create statefulset
    needToCreate := false
    needToUpdate := false
    // NOTE: check whether the gateway pod's StatefulSet exists; if it does, this is an update, otherwise the gateway pod must be created
    oldSts, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
       Get(context.Background(), util.GenNatGwStsName(gw.Name), metav1.GetOptions{})
    if err != nil {
       if k8serrors.IsNotFound(err) {
          needToCreate = true
       } else {
          klog.Error(err)
          return err
       }
    }
    // NOTE: build the StatefulSet spec for the VPC gateway pod
    newSts := c.genNatGwStatefulSet(gw, oldSts.DeepCopy())
    if !needToCreate && isVpcNatGwChanged(gw) {
       needToUpdate = true
    }

    switch {
    case needToCreate:
       // NOTE: create the StatefulSet for the VPC gateway pod; this pod acts as the VPC's egress gateway
       // if pod create successfully, will add initVpcNatGatewayQueue
       if _, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
          Create(context.Background(), newSts, metav1.CreateOptions{}); err != nil {
          err := fmt.Errorf("failed to create statefulset '%s', err: %v", newSts.Name, err)
          klog.Error(err)
          return err
       }
       if err = c.patchNatGwStatus(key); err != nil {
          klog.Errorf("failed to patch nat gw sts status for nat gw %s, %v", key, err)
          return err
       }
       return nil
    case needToUpdate:
       if _, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
          Update(context.Background(), newSts, metav1.UpdateOptions{}); err != nil {
          err := fmt.Errorf("failed to update statefulset '%s', err: %v", newSts.Name, err)
          klog.Error(err)
          return err
       }
       if err = c.patchNatGwStatus(key); err != nil {
          klog.Errorf("failed to patch nat gw sts status for nat gw %s, %v", key, err)
          return err
       }
    default:
       // check if need to change qos
       if gw.Spec.QoSPolicy != gw.Status.QoSPolicy {
          if gw.Status.QoSPolicy != "" {
             if err = c.execNatGwQoS(gw, gw.Status.QoSPolicy, QoSDel); err != nil {
                klog.Errorf("failed to add qos for nat gw %s, %v", key, err)
                return err
             }
          }
          if gw.Spec.QoSPolicy != "" {
             if err = c.execNatGwQoS(gw, gw.Spec.QoSPolicy, QoSAdd); err != nil {
                klog.Errorf("failed to del qos for nat gw %s, %v", key, err)
                return err
             }
          }
          if err := c.updateCrdNatGwLabels(key, gw.Spec.QoSPolicy); err != nil {
             err := fmt.Errorf("failed to update nat gw %s: %v", gw.Name, err)
             klog.Error(err)
             return err
          }
          // if update qos success, will update nat gw status
          if err = c.patchNatGwQoSStatus(key, gw.Spec.QoSPolicy); err != nil {
             klog.Errorf("failed to patch nat gw qos status for nat gw %s, %v", key, err)
             return err
          }
       }
    }

    return nil
}

  1. Once the custom VPC gateway pod has been created, the following function runs:

func (c *Controller) handleInitVpcNatGw(key string) error {
    if vpcNatEnabled != "true" {
       return fmt.Errorf("iptables nat gw not enable")
    }

    c.vpcNatGwKeyMutex.LockKey(key)
    defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()
    klog.Infof("handle init vpc nat gateway %s", key)

    gw, err := c.vpcNatGatewayLister.Get(key)
    if err != nil {
       if k8serrors.IsNotFound(err) {
          return nil
       }
       klog.Error(err)
       return err
    }
    // subnet for vpc-nat-gw has been checked when create vpc-nat-gw

    oriPod, err := c.getNatGwPod(key)
    if err != nil {
       err := fmt.Errorf("failed to get nat gw %s pod: %v", gw.Name, err)
       klog.Error(err)
       return err
    }
    pod := oriPod.DeepCopy()

    if pod.Status.Phase != corev1.PodRunning {
       time.Sleep(10 * time.Second)
       return fmt.Errorf("failed to init vpc nat gateway, pod is not ready")
    }
    // NOTE: if the pod has already been initialized, return immediately
    if _, hasInit := pod.Annotations[util.VpcNatGatewayInitAnnotation]; hasInit {
       return nil
    }
    // NOTE: not initialized yet, so start the initialization
    natGwCreatedAT = pod.CreationTimestamp.Format("2006-01-02T15:04:05")
    klog.V(3).Infof("nat gw pod '%s' inited at %s", key, natGwCreatedAT)
    // NOTE: run the NAT init rules inside the VPC gateway pod; worth a closer look later
    if err = c.execNatGwRules(pod, natGwInit, []string{fmt.Sprintf("%s,%s", c.config.ServiceClusterIPRange, pod.Annotations[util.GatewayAnnotation])}); err != nil {
       err = fmt.Errorf("failed to init vpc nat gateway, %v", err)
       klog.Error(err)
       return err
    }
    // NOTE: apply QoS inside the VPC gateway pod
    if gw.Spec.QoSPolicy != "" {
       if err = c.execNatGwQoS(gw, gw.Spec.QoSPolicy, QoSAdd); err != nil {
          klog.Errorf("failed to add qos for nat gw %s, %v", key, err)
          return err
       }
    }
    // if update qos success, will update nat gw status
    if gw.Spec.QoSPolicy != gw.Status.QoSPolicy {
       if err = c.patchNatGwQoSStatus(key, gw.Spec.QoSPolicy); err != nil {
          klog.Errorf("failed to patch status for nat gw %s, %v", key, err)
          return err
       }
    }

    if err := c.updateCrdNatGwLabels(gw.Name, gw.Spec.QoSPolicy); err != nil {
       err := fmt.Errorf("failed to update nat gw %s: %v", gw.Name, err)
       klog.Error(err)
       return err
    }

    c.updateVpcFloatingIPQueue.Add(key)
    c.updateVpcDnatQueue.Add(key)
    c.updateVpcSnatQueue.Add(key)
    c.updateVpcSubnetQueue.Add(key)
    c.updateVpcEipQueue.Add(key)
    pod.Annotations[util.VpcNatGatewayInitAnnotation] = "true"
    patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod)
    if err != nil {
       klog.Error(err)
       return err
    }
    if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
       types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
       err := fmt.Errorf("patch pod %s/%s failed %v", pod.Name, pod.Namespace, err)
       klog.Error(err)
       return err
    }
    return nil
}

Creating the EIP and SNAT rule for the custom VPC

kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
  name: eip1
spec:
  v4ip: 192.168.100.230
  natGwDp: gw1
---
kind: IptablesSnatRule
apiVersion: kubeovn.io/v1
metadata:
  name: snat01
spec:
  eip: eip1
  internalCIDR: 10.0.1.0/24

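  1. The code below shows how an EIP is created.

  2. It first checks whether the EIP already exists.

  3. It checks whether the EIP resource specifies an IP; if not, one is allocated at random from the external subnet.

  4. It creates the NIC inside the custom VPC gateway pod and sets the IP address.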

func (c *Controller) handleAddIptablesEip(key string) error {
    if vpcNatEnabled != "true" {
       return fmt.Errorf("iptables nat gw not enable")
    }

    c.vpcNatGwKeyMutex.LockKey(key)
    defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()
    klog.Infof("handle add iptables eip %s", key)
    cachedEip, err := c.iptablesEipsLister.Get(key)
    if err != nil {
       if k8serrors.IsNotFound(err) {
          return nil
       }
       klog.Error(err)
       return err
    }
    // NOTE: the EIP resource is already ready, return immediately
    if cachedEip.Status.Ready && cachedEip.Status.IP != "" {
       // already ok
       return nil
    }
    var v4ip, v6ip, mac, eipV4Cidr, v4Gw string
    // NOTE: defaults to ovn-vpc-external-network when no external subnet is specified
    externalNetwork := util.GetExternalNetwork(cachedEip.Spec.ExternalSubnet)
    // NOTE: e.g. ovn-vpc-external-network.kube-system
    externalProvider := fmt.Sprintf("%s.%s", externalNetwork, attachmentNs)

    portName := ovs.PodNameToPortName(cachedEip.Name, cachedEip.Namespace, externalProvider)
    if cachedEip.Spec.V4ip != "" {
       // NOTE: an IP was specified for the EIP; allocate that address from ovn-vpc-external-network
       if v4ip, v6ip, mac, err = c.acquireStaticEip(cachedEip.Name, cachedEip.Namespace, portName, cachedEip.Spec.V4ip, externalNetwork); err != nil {
          klog.Errorf("failed to acquire static eip, err: %v", err)
          return err
       }
    } else {
       // No IP was specified: allocate a random address from the external subnet.
       if v4ip, v6ip, mac, err = c.acquireEip(cachedEip.Name, cachedEip.Namespace, portName, externalNetwork); err != nil {
          klog.Errorf("failed to allocate eip, err: %v", err)
          return err
       }
    }
    // The EIP combined with the mask of the external subnet.
    if eipV4Cidr, err = c.getEipV4Cidr(v4ip, externalNetwork); err != nil {
       klog.Errorf("failed to get eip cidr, err: %v", err)
       return err
    }
    // The gateway of the external subnet.
    if v4Gw, _, err = c.GetGwBySubnet(externalNetwork); err != nil {
       klog.Errorf("failed to get gw, err: %v", err)
       return err
    }
    // Example resource:
    //   kind: IptablesEIP
    //   apiVersion: kubeovn.io/v1
    //   metadata:
    //     name: eip-static
    //   spec:
    //     natGwDp: gw1
    //     v4ip: 10.0.1.111
    // Arguments: name of the VPC gateway, gateway of the external subnet, EIP with the subnet mask.
    // This call configures the EIP address on the NIC inside the VPC gateway pod.
    if err = c.createEipInPod(cachedEip.Spec.NatGwDp, v4Gw, eipV4Cidr); err != nil {
       klog.Errorf("failed to create eip '%s' in pod, %v", key, err)
       return err
    }

    if cachedEip.Spec.QoSPolicy != "" {
       if err = c.addEipQoS(cachedEip, v4ip); err != nil {
          klog.Errorf("failed to add qos '%s' in pod, %v", key, err)
          return err
       }
    }
    if err = c.createOrUpdateCrdEip(key, v4ip, v6ip, mac, cachedEip.Spec.NatGwDp, cachedEip.Spec.QoSPolicy, externalNetwork); err != nil {
       klog.Errorf("failed to update eip %s, %v", key, err)
       return err
    }
    return nil
}
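The eipV4Cidr value passed to createEipInPod is described in the comments as the EIP together with the mask of the external subnet. The following is a minimal sketch of that combination step only. The helper name eipWithPrefix is hypothetical and this is not the actual getEipV4Cidr implementation, which is not shown here.

```go
package main

import (
	"fmt"
	"net/netip"
)

// eipWithPrefix is a hypothetical helper (not part of kube-ovn). It joins an
// allocated EIP with the prefix length of the external subnet, for example
// 192.168.100.230 and 192.168.100.0/24 give 192.168.100.230/24.
func eipWithPrefix(eip, subnetCIDR string) (string, error) {
	addr, err := netip.ParseAddr(eip)
	if err != nil {
		return "", fmt.Errorf("invalid eip %q: %w", eip, err)
	}
	prefix, err := netip.ParsePrefix(subnetCIDR)
	if err != nil {
		return "", fmt.Errorf("invalid subnet %q: %w", subnetCIDR, err)
	}
	// Reject addresses that do not belong to the external subnet.
	if !prefix.Contains(addr) {
		return "", fmt.Errorf("eip %s is outside subnet %s", addr, prefix)
	}
	// Keep the host address, borrow only the prefix length.
	return netip.PrefixFrom(addr, prefix.Bits()).String(), nil
}

func main() {
	cidr, err := eipWithPrefix("192.168.100.230", "192.168.100.0/24")
	if err != nil {
		panic(err)
	}
	fmt.Println(cidr)
}
```

The values used in main come from the IptablesEIP and Subnet examples in this article.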

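  1. The function below creates the SNAT rule.

  2. It looks up the EIP bound to the SNAT rule, then obtains the CIDR that needs SNAT and the IP address it should be translated to.

  3. It then enters the custom VPC gateway pod and runs iptables to install the rule. This is done by calling a script directly; the script's contents are listed later.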

// Handler that applies an IptablesSnatRule resource.
func (c *Controller) handleAddIptablesSnatRule(key string) error {
    if vpcNatEnabled != "true" {
       return fmt.Errorf("iptables nat gw not enable")
    }

    c.vpcNatGwKeyMutex.LockKey(key)
    defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()
    klog.Infof("handle add iptables snat rule %s", key)

    snat, err := c.iptablesSnatRulesLister.Get(key)
    if err != nil {
       if k8serrors.IsNotFound(err) {
          return nil
       }
       klog.Error(err)
       return err
    }
    // If the SNAT resource is already ready, return immediately.
    if snat.Status.Ready && snat.Status.V4ip != "" {
       // already ok
       return nil
    }
    klog.V(3).Infof("handle add iptables snat %s", key)
    // Name of the EIP whose address becomes the SNAT source address.
    eipName := snat.Spec.EIP
    if eipName == "" {
       return fmt.Errorf("failed to create snat rule, should set eip")
    }
    // Fetch the EIP object.
    eip, err := c.GetEip(eipName)
    if err != nil {
       klog.Errorf("failed to get eip, %v", err)
       return err
    }
    // Create the SNAT rule: get the IP range that needs SNAT.
    v4Cidr, _ := util.SplitStringIP(snat.Spec.InternalCIDR)
    if v4Cidr == "" {
       // only support IPv4 snat
       err = fmt.Errorf("failed to get snat v4 internal cidr, original cidr is %s", snat.Spec.InternalCIDR)
       return err
    }
    // Enter the VPC gateway pod and add the SNAT rule.
    if err = c.createSnatInPod(eip.Spec.NatGwDp, eip.Status.IP, v4Cidr); err != nil {
       klog.Errorf("failed to create snat, %v", err)
       return err
    }
    // Mark the IptablesSnatRule object as ready.
    if err = c.patchSnatStatus(key, eip.Status.IP, eip.Spec.V6ip, eip.Spec.NatGwDp, "", true); err != nil {
       klog.Errorf("failed to update status for snat %s, %v", key, err)
       return err
    }
    // Update the labels of the IptablesSnatRule object.
    if err = c.patchSnatLabel(key, eip); err != nil {
       klog.Errorf("failed to patch label for snat %s, %v", key, err)
       return err
    }
    if err = c.handleAddIptablesSnatFinalizer(key); err != nil {
       klog.Errorf("failed to handle add finalizer for snat, %v", err)
       return err
    }
    // Update the status of the EIP object.
    if err = c.patchEipStatus(eipName, "", "", "", true); err != nil {
       // refresh eip nats
       klog.Errorf("failed to patch snat use eip %s, %v", key, err)
       return err
    }
    return nil
}
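The script that createSnatInPod runs inside the gateway pod is not reproduced in this section. As a rough sketch only, assuming the rule reduces to one source-NAT entry for the internal CIDR, the command could be assembled as below. The function name snatCommand and the use of the built-in POSTROUTING chain are assumptions for illustration; the real script may use a kube-ovn specific chain and extra match options.

```go
package main

import (
	"fmt"
	"net/netip"
)

// snatCommand is a hypothetical helper (not part of kube-ovn). It builds an
// iptables command that rewrites the source address of traffic coming from
// internalCIDR to eip. Only IPv4 is accepted, mirroring the IPv4-only check
// in handleAddIptablesSnatRule.
func snatCommand(internalCIDR, eip string) (string, error) {
	prefix, err := netip.ParsePrefix(internalCIDR)
	if err != nil {
		return "", fmt.Errorf("invalid internal cidr %q: %w", internalCIDR, err)
	}
	addr, err := netip.ParseAddr(eip)
	if err != nil {
		return "", fmt.Errorf("invalid eip %q: %w", eip, err)
	}
	if !prefix.Addr().Is4() || !addr.Is4() {
		return "", fmt.Errorf("only IPv4 snat is supported")
	}
	// POSTROUTING is an assumption; see the note above the code.
	return fmt.Sprintf("iptables -t nat -A POSTROUTING -s %s -j SNAT --to-source %s",
		prefix.Masked(), addr), nil
}

func main() {
	cmd, err := snatCommand("10.0.1.0/24", "192.168.100.230")
	if err != nil {
		panic(err)
	}
	fmt.Println(cmd)
}
```

The sketch only builds the command string; executing it inside the pod is left to the controller.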

Creating a FIP for the custom VPC

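First, what a FIP is for. FIP stands for floating IP: it binds one external IP to one OVN internal IP, one to one, so that the internal IP can be reached directly through the FIP.

As the configuration below shows, a FIP is really just the combination of an EIP and a FIP rule.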

kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
  name: eip3
spec:
  v4ip: 192.168.100.232
  natGwDp: gw1
---
kind: IptablesFIPRule
apiVersion: kubeovn.io/v1
metadata:
  name: fip01
spec:
  eip: eip3
  internalIp: 10.0.1.5

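  1. EIP creation was covered above, so here we look at the FIP rule. It takes two main parameters, eip and internalIp, and its job is to bind an OVN internal IP address to an external IP address one to one.

  2. The binding between internalIp and eip is also implemented by running iptables rules inside the custom VPC gateway pod. The iptables rules are explained below.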

# Map internalIp and eip one to one using iptables rules.
function add_floating_ip() {
    # make sure inited
    check_inited
    for rule in $@
    do
        arr=(${rule//,/ })
        eip=(${arr[0]//\// })
        internalIp=${arr[1]}
        # check if already exist
        iptables-save  | grep "EXCLUSIVE_DNAT" | grep -w "\-d $eip/32" | grep  "destination" && exit 0
        exec_cmd "iptables -t nat -A EXCLUSIVE_DNAT -d $eip -j DNAT --to-destination $internalIp"
        exec_cmd "iptables -t nat -A EXCLUSIVE_SNAT -s $internalIp -j SNAT --to-source $eip"
    done
}

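As you can see, the function mainly adds two iptables rules, shown below.

EXCLUSIVE_DNAT and EXCLUSIVE_SNAT are two custom chains defined by kube-ovn.

You can simply think of EXCLUSIVE_DNAT as the PREROUTING chain of the nat table and EXCLUSIVE_SNAT as the POSTROUTING chain. One DNAT rule and one SNAT rule are added.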

iptables -t nat -A EXCLUSIVE_DNAT -d $eip -j DNAT --to-destination $internalIp

iptables -t nat -A EXCLUSIVE_SNAT -s $internalIp -j SNAT --to-source $eip

We could also implement the same thing ourselves with the built-in chains:

e.g. iptables -t nat -A PREROUTING -d $eip -j DNAT --to-destination $internalIp

iptables -t nat -A POSTROUTING -s $internalIp -j SNAT --to-source $eip
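To make the argument handling in add_floating_ip more concrete, here is a small sketch that mirrors it in Go. It assumes each rule argument has the form "eip/prefix,internalIp", which is inferred from the way the script splits on the comma and strips everything after the slash. The names fipRule, parseFipRule and commands are hypothetical and are not part of kube-ovn.

```go
package main

import (
	"fmt"
	"strings"
)

// fipRule holds the two values the script extracts from one rule argument.
type fipRule struct {
	EIP        string
	InternalIP string
}

// parseFipRule splits "eip/prefix,internalIp" the way the shell function
// does: split on the comma, then drop the prefix length from the EIP.
func parseFipRule(rule string) (fipRule, error) {
	parts := strings.Split(rule, ",")
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return fipRule{}, fmt.Errorf("malformed fip rule %q", rule)
	}
	eip, _, _ := strings.Cut(parts[0], "/")
	if eip == "" {
		return fipRule{}, fmt.Errorf("malformed fip rule %q", rule)
	}
	return fipRule{EIP: eip, InternalIP: parts[1]}, nil
}

// commands returns the DNAT and SNAT commands that the script executes
// for this rule, using the chain names shown in the script.
func (r fipRule) commands() []string {
	return []string{
		fmt.Sprintf("iptables -t nat -A EXCLUSIVE_DNAT -d %s -j DNAT --to-destination %s", r.EIP, r.InternalIP),
		fmt.Sprintf("iptables -t nat -A EXCLUSIVE_SNAT -s %s -j SNAT --to-source %s", r.InternalIP, r.EIP),
	}
}

func main() {
	rule, err := parseFipRule("192.168.100.232/24,10.0.1.5")
	if err != nil {
		panic(err)
	}
	for _, cmd := range rule.commands() {
		fmt.Println(cmd)
	}
}
```

The addresses in main are taken from the eip3 and fip01 examples above.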

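Let's trace how the traffic actually flows:

  1. When a pod in the custom VPC needs to reach the internet, the packet is first forwarded to the custom VPC gateway pod. On entering the gateway pod's network stack it first goes through the PREROUTING chain, where no rule matches. The routing decision then finds that the packet is not destined for the local host, so it goes to the FORWARD chain, where again nothing matches. It then reaches the POSTROUTING chain. Because we added a rule there that matches, SNAT is applied and the source IP is set to the EIP address, so the packet can leave.

  2. When a host on the internet wants to reach the internalIp address inside the OVN network, the address exposed externally is the EIP, so the external user necessarily targets the EIP. When the traffic enters the custom VPC gateway pod it goes through the PREROUTING chain. Because we added a DNAT rule, it matches and the destination IP is rewritten to internalIp. The routing decision then finds that this is not a local address, so the packet goes to the FORWARD chain, where no rule matches, then to the POSTROUTING chain, where nothing matches either, and finally the packet is sent into the OVN network.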
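The two flows above can be illustrated with a toy model. This is only a sketch of the address rewriting: it is not how netfilter is implemented, and it ignores connection tracking, which handles reply packets in a real gateway. The types packet and fip and their methods are made up for this example.

```go
package main

import "fmt"

// packet carries only the two addresses that NAT rewrites.
type packet struct {
	Src string
	Dst string
}

// fip models one floating IP binding between an EIP and an internal IP.
type fip struct {
	EIP        string
	InternalIP string
}

// prerouting applies the DNAT rule: traffic addressed to the EIP is
// redirected to the internal IP. Other packets pass unchanged.
func (f fip) prerouting(p packet) packet {
	if p.Dst == f.EIP {
		p.Dst = f.InternalIP
	}
	return p
}

// postrouting applies the SNAT rule: traffic sent by the internal IP
// leaves with the EIP as its source. Other packets pass unchanged.
func (f fip) postrouting(p packet) packet {
	if p.Src == f.InternalIP {
		p.Src = f.EIP
	}
	return p
}

// forward runs a packet through PREROUTING and then POSTROUTING. The
// FORWARD chain is omitted because no rule matches there in either flow.
func (f fip) forward(p packet) packet {
	return f.postrouting(f.prerouting(p))
}

func main() {
	binding := fip{EIP: "192.168.100.232", InternalIP: "10.0.1.5"}

	// Flow 1: pod to internet, the source is rewritten to the EIP.
	fmt.Printf("%+v\n", binding.forward(packet{Src: "10.0.1.5", Dst: "8.8.8.8"}))

	// Flow 2: internet to EIP, the destination is rewritten to the internal IP.
	fmt.Printf("%+v\n", binding.forward(packet{Src: "8.8.8.8", Dst: "192.168.100.232"}))
}
```

The address 8.8.8.8 stands in for an arbitrary internet host.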
