配置自定义vpc
开启自定义vpc网关配置
对于自定义的vpc网关需要configmap配置文件进行开启
ovn-vpc-nat-config中指定了自定义vpc网关的pod使用的镜像,自定义vpc的网关就是一个pod,在pod中通过配置iptables来实现了eip、snat、dnat、fip
对于ovn-vpc-nat-gw-config指定了启动自定义vpc网关
kind: ConfigMap
apiVersion: v1
metadata:
name: ovn-vpc-nat-config
namespace: kube-system
data:
image: 'docker.io/kubeovn/vpc-nat-gateway:v1.12.4'
---
kind: ConfigMap
apiVersion: v1
metadata:
name: ovn-vpc-nat-gw-config
namespace: kube-system
data:
enable-vpc-nat-gw: 'true'
配置外部子网
下面的配置是创建外部子网
cidrBlock是外部子网的网段
gateway外部子网网关
excludeIps用来排除不可使用的网段,防止分配的eip和外部冲突
NetworkAttachmentDefinition资源用来定义macvlan,macvlan可以将一个物理网卡虚拟成多个虚拟网卡。
因此每创建一个eip便会根据ens37这个物理网卡创建一个虚拟网卡,配置kubeovn分配的ip地址,并将之放入自定义vpc网关的pod中
apiVersion: kubeovn.io/v1
kind: Subnet
metadata:
name: ovn-vpc-external-network
spec:
protocol: IPv4
provider: ovn-vpc-external-network.kube-system
cidrBlock: 192.168.100.0/24
gateway: 192.168.100.1 # IP address of the physical gateway
excludeIps:
- 192.168.100.1..192.168.100.220
---
apiVersion: "k8s.cni.cncf.io/v1"
kind: NetworkAttachmentDefinition
metadata:
name: ovn-vpc-external-network
namespace: kube-system
spec:
config: '{
"cniVersion": "0.3.0",
"type": "macvlan",
"master": "ens37",
"mode": "bridge",
"ipam": {
"type": "kube-ovn",
"server_socket": "/run/openvswitch/kube-ovn-daemon.sock",
"provider": "ovn-vpc-external-network.kube-system"
}
}'
创建自定义vpc
创建一个自定义vpc,每创建一个vpc,kube-ovn会创建一个ovn逻辑路由器,在vpc中定义的staticRoutes会被添加到ovn逻辑路由器上,下面vpc定义的静态路由是将所有的报文的nexthop到10.0.1.254,10.0.1.254是vpc内部子网的一个ip,是vpc的网关ip
kind: Vpc
apiVersion: kubeovn.io/v1
metadata:
name: test-vpc-1
spec:
namespaces:
- ns1
staticRoutes:
- cidr: 0.0.0.0/0
nextHopIP: 10.0.1.254
policy: policyDst
在自定义vpc下创建一个子网
在自定义vpc下创建了一个子网和一个pod
kind: Subnet
apiVersion: kubeovn.io/v1
metadata:
name: net1
spec:
vpc: test-vpc-1
cidrBlock: 10.0.1.0/24
protocol: IPv4
namespaces:
- ns1
---
apiVersion: v1
kind: Pod
metadata:
annotations:
ovn.kubernetes.io/logical_switch: net1
namespace: ns1
name: vpc1-pod
spec:
containers:
- name: vpc1-pod
image: docker.io/library/nginx:alpine
创建自定义vpc的网关
这里创建了自定义vpc的网关
vpc指定了这个网关属于test-vpc-1
subnet是test-vpc-1下的一个子网
lanIp是步骤2子网里的一个ip地址,用来作为vpc的网关ip,这个ip必须和test-vpc-1里面的静态路由一致
externalSubnets指定了外部子网,在创建vpc网关的pod时会从这个外部子网里分配一个eip,用来连接外部网络
kind: VpcNatGateway
apiVersion: kubeovn.io/v1
metadata:
name: gw1
spec:
vpc: test-vpc-1
subnet: net1
lanIp: 10.0.1.254
externalSubnets:
- ovn-vpc-external-network
给自定义vpc添加eip
下面配置给自定义vpc网关的pod创建了两个eip,并且配置了snat规则
一个自定义vpc的pod可以创建多个eip,下面便是创建了两个eip
v4ip表示给这个eip指定的地址
natGwDp表示在哪个网关上添加eip
在IptablesSnatRule中eip、internalCIDR表示在自定义vpc网关pod里面创建的snat规则,表示添加源10.1.1.0/24的源ip修改为192.168.100.230的iptables规则
kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
name: eip1
spec:
v4ip: 192.168.100.230
natGwDp: gw1
---
kind: IptablesSnatRule
apiVersion: kubeovn.io/v1
metadata:
name: snat01
spec:
eip: eip1
internalCIDR: 10.1.1.0/24
---
kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
name: eip2
spec:
v4ip: 192.168.100.231
natGwDp: gw1
---
kind: IptablesSnatRule
apiVersion: kubeovn.io/v1
metadata:
name: snat01
spec:
eip: eip2
internalCIDR: 10.1.1.0/24
给自定义vpc添加fip
fip表示浮动ip,用来将自定义vpc内部的一个internalIp和eip绑定,这样便可以实现外部网络能够直接访问pod。
创建eip和上面的操作一致,关键是fip规则
fip规则中指定了eip和internalIp,会在自定义vpc网关的pod里面添加iptables规则,来将eip和internalIp一一绑定
kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
name: eip3
spec:
v4ip: 192.168.100.232
natGwDp: gw1
---
kind: IptablesFIPRule
apiVersion: kubeovn.io/v1
metadata:
name: fip01
spec:
eip: eip3
internalIp: 10.0.1.5
自定义vpc源码分析
开启自定义vpc网关的代码
在上面的配置中需要添加下面的配置才能够使用自定义vpc的网关,下面代码看一下这块配置的作用
kind: ConfigMap
apiVersion: v1
metadata:
name: ovn-vpc-nat-config
namespace: kube-system
data:
image: 'docker.io/kubeovn/vpc-nat-gateway:v1.12.4'
---
kind: ConfigMap
apiVersion: v1
metadata:
name: ovn-vpc-nat-gw-config
namespace: kube-system
data:
enable-vpc-nat-gw: 'true'
resyncVpcNatGwConfig这个函数是定时执行的,会定时检查是否已经创建了ovn-vpc-nat-config和ovn-vpc-nat-gw-config两个configmap文件,从中获取自定义vpc网关使用pod镜像
resyncVpcNatImage中获取配置中的自定义vpc网关的镜像
当两个配置都被指定后会更新自定义vpc网关的资源,这是因为在配置文件创建之前可能已经创建了自定义vpc网关
// TODO 是定时执行的 判断是否创建vpc网关的configmap
func (c *Controller) resyncVpcNatGwConfig() {
//TODO 從configmap中獲取ovn-vpc-nat-gw-config
cm, err := c.configMapsLister.ConfigMaps(c.config.PodNamespace).Get(util.VpcNatGatewayConfig)
if err != nil && !k8serrors.IsNotFound(err) {
klog.Errorf("failed to get ovn-vpc-nat-gw-config, %v", err)
return
}
if k8serrors.IsNotFound(err) || cm.Data["enable-vpc-nat-gw"] == "false" {
if vpcNatEnabled == "false" {
return
}
klog.Info("start to clean up vpc nat gateway")
if err := c.cleanUpVpcNatGw(); err != nil {
klog.Errorf("failed to clean up vpc nat gateway, %v", err)
return
}
vpcNatEnabled = "false"
VpcNatCmVersion = ""
klog.Info("finish clean up vpc nat gateway")
return
}
if vpcNatEnabled == "true" && VpcNatCmVersion == cm.ResourceVersion {
return
}
gws, err := c.vpcNatGatewayLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get vpc nat gateway, %v", err)
return
}
//TODO 获取vpc网关镜像
if err = c.resyncVpcNatImage(); err != nil {
klog.Errorf("failed to resync vpc nat config, err: %v", err)
return
}
vpcNatEnabled = "true"
VpcNatCmVersion = cm.ResourceVersion
for _, gw := range gws {
c.addOrUpdateVpcNatGatewayQueue.Add(gw.Name)
}
klog.Info("finish establishing vpc-nat-gateway")
}
// TODO 在部署vpc网关的时候需要创建这个configmap,代码里需要从这个cm中获取镜像
func (c *Controller) resyncVpcNatImage() error {
cm, err := c.configMapsLister.ConfigMaps(c.config.PodNamespace).Get(util.VpcNatConfig)
if err != nil {
err = fmt.Errorf("failed to get ovn-vpc-nat-config, %v", err)
klog.Error(err)
return err
}
image, exist := cm.Data["image"]
if !exist {
err = fmt.Errorf("%s should have image field", util.VpcNatConfig)
klog.Error(err)
return err
}
vpcNatImage = image
return nil
}
创建自定义vpc代码
kind: Vpc
apiVersion: kubeovn.io/v1
metadata:
name: test-vpc-1
spec:
namespaces:
- ns1
staticRoutes:
- cidr: 0.0.0.0/0
nextHopIP: 10.0.1.254
policy: policyDst
对于创建自定义vpc可以添加静态路由和路由策略
ovn会创建一个逻辑交换机
给逻辑交换机添加静态路由和路由策略
nextHopIp很重要,是网关的ip,kube-ovn会在自定义vpc的逻辑路由器上添加一条将所有流量都引入到10.0.1.254的静态路由,是网关的一个内部ip地址。
func (c *Controller) handleAddOrUpdateVpc(key string) error {
c.vpcKeyMutex.LockKey(key)
defer func() { _ = c.vpcKeyMutex.UnlockKey(key) }()
klog.Infof("handle add/update vpc %s", key)
// get latest vpc info
var (
vpc, cachedVpc *kubeovnv1.Vpc
err error
)
cachedVpc, err = c.vpcsLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Error(err)
return err
}
vpc = cachedVpc.DeepCopy()
//TODO 对创建的vpc做一些默认值设置,并且检查vpc字段是否符合格式
if err = formatVpc(vpc, c); err != nil {
klog.Errorf("failed to format vpc %s: %v", key, err)
return err
}
//TODO 给新的vpc创建一个ovn的逻辑路由器,kube-ovn的vpc对应于ovn的逻辑路由器
if err = c.createVpcRouter(key); err != nil {
return err
}
//TODO 这个应该是在vpc中指定与之相连的另一端,暂时不考虑
var newPeers []string
for _, peering := range vpc.Spec.VpcPeerings {
if err = util.CheckCidrs(peering.LocalConnectIP); err != nil {
klog.Errorf("invalid cidr %s", peering.LocalConnectIP)
return err
}
newPeers = append(newPeers, peering.RemoteVpc)
if err := c.OVNNbClient.CreatePeerRouterPort(vpc.Name, peering.RemoteVpc, peering.LocalConnectIP); err != nil {
klog.Errorf("create peer router port for vpc %s, %v", vpc.Name, err)
return err
}
}
for _, oldPeer := range vpc.Status.VpcPeerings {
if !util.ContainsString(newPeers, oldPeer) {
if err = c.OVNNbClient.DeleteLogicalRouterPort(fmt.Sprintf("%s-%s", vpc.Name, oldPeer)); err != nil {
klog.Errorf("delete peer router port for vpc %s, %v", vpc.Name, err)
return err
}
}
}
// handle static route
var (
staticExistedRoutes []*ovnnb.LogicalRouterStaticRoute
staticTargetRoutes []*kubeovnv1.StaticRoute
staticRouteMapping map[string][]*kubeovnv1.StaticRoute
)
//TODO 获取vpc下的静态路由条目
staticExistedRoutes, err = c.OVNNbClient.ListLogicalRouterStaticRoutes(vpc.Name, nil, nil, "", nil)
if err != nil {
klog.Errorf("failed to get vpc %s static route list, %v", vpc.Name, err)
return err
}
//; kind: Vpc
//; apiVersion: kubeovn.io/v1
//; metadata:
//; name: test-vpc-1
//; spec:
//; staticRoutes:
//; - cidr: 0.0.0.0/0
//; nextHopIP: 10.0.1.254
//; policy: policyDst
//; - cidr: 172.31.0.0/24
//; nextHopIP: 10.0.1.253
//; policy: policySrc
//; routeTable: "rtb1"
//TODO 构建vpc下路由表名和条目的map
staticRouteMapping = c.getRouteTablesByVpc(vpc)
//TODO 在vpc下指定的静态路由
staticTargetRoutes = vpc.Spec.StaticRoutes
//TODO 对于默认vpc的处理
if vpc.Name == c.config.ClusterRouter {
if _, ok := staticRouteMapping[util.MainRouteTable]; !ok {
staticRouteMapping[util.MainRouteTable] = nil
}
joinSubnet, err := c.subnetsLister.Get(c.config.NodeSwitch)
if err != nil {
if !k8serrors.IsNotFound(err) {
klog.Error("failed to get node switch subnet %s: %v", c.config.NodeSwitch)
return err
}
}
gatewayV4, gatewayV6 := util.SplitStringIP(joinSubnet.Spec.Gateway)
if gatewayV4 != "" {
for tabele := range staticRouteMapping {
staticTargetRoutes = append(
staticTargetRoutes,
&kubeovnv1.StaticRoute{
Policy: kubeovnv1.PolicyDst,
CIDR: "0.0.0.0/0",
NextHopIP: gatewayV4,
RouteTable: tabele,
},
)
}
}
if gatewayV6 != "" {
for tabele := range staticRouteMapping {
staticTargetRoutes = append(
staticTargetRoutes,
&kubeovnv1.StaticRoute{
Policy: kubeovnv1.PolicyDst,
CIDR: "::/0",
NextHopIP: gatewayV6,
RouteTable: tabele,
},
)
}
}
if c.config.EnableEipSnat {
cm, err := c.configMapsLister.ConfigMaps(c.config.ExternalGatewayConfigNS).Get(util.ExternalGatewayConfig)
if err == nil {
nextHop := cm.Data["external-gw-addr"]
if nextHop == "" {
externalSubnet, err := c.subnetsLister.Get(c.config.ExternalGatewaySwitch)
if err != nil {
klog.Errorf("failed to get subnet %s, %v", c.config.ExternalGatewaySwitch, err)
return err
}
nextHop = externalSubnet.Spec.Gateway
if nextHop == "" {
klog.Errorf("no available gateway address")
return fmt.Errorf("no available gateway address")
}
}
if strings.Contains(nextHop, "/") {
nextHop = strings.Split(nextHop, "/")[0]
}
lr, err := c.OVNNbClient.GetLogicalRouter(vpc.Name, false)
if err != nil {
klog.Errorf("failed to get logical router %s: %v", vpc.Name, err)
return err
}
for _, nat := range lr.Nat {
info, err := c.OVNNbClient.GetNATByUUID(nat)
if err != nil {
klog.Errorf("failed to get nat ip info for vpc %s, %v", vpc.Name, err)
return err
}
if info.LogicalIP != "" {
for table := range staticRouteMapping {
staticTargetRoutes = append(
staticTargetRoutes,
&kubeovnv1.StaticRoute{
Policy: kubeovnv1.PolicySrc,
CIDR: info.LogicalIP,
NextHopIP: nextHop,
RouteTable: table,
},
)
}
}
}
}
}
}
//TODO 比较出vpc需要删除和增加的路由
routeNeedDel, routeNeedAdd, err := diffStaticRoute(staticExistedRoutes, staticTargetRoutes)
if err != nil {
klog.Errorf("failed to diff vpc %s static route, %v", vpc.Name, err)
return err
}
//TODO 删除vpc下的路由
for _, item := range routeNeedDel {
klog.Infof("vpc %s del static route: %+v", vpc.Name, item)
policy := convertPolicy(item.Policy)
if err = c.OVNNbClient.DeleteLogicalRouterStaticRoute(vpc.Name, &item.RouteTable, &policy, item.CIDR, item.NextHopIP); err != nil {
klog.Errorf("del vpc %s static route failed, %v", vpc.Name, err)
return err
}
}
//TODO 添加vpc静态路由
for _, item := range routeNeedAdd {
if item.BfdID != "" {
klog.Infof("vpc %s add static ecmp route: %+v", vpc.Name, item)
if err = c.OVNNbClient.AddLogicalRouterStaticRoute(
vpc.Name, item.RouteTable, convertPolicy(item.Policy), item.CIDR, &item.BfdID, item.NextHopIP,
); err != nil {
klog.Errorf("failed to add bfd static route to vpc %s , %v", vpc.Name, err)
return err
}
} else {
klog.Infof("vpc %s add static route: %+v", vpc.Name, item)
if err = c.OVNNbClient.AddLogicalRouterStaticRoute(
vpc.Name, item.RouteTable, convertPolicy(item.Policy), item.CIDR, nil, item.NextHopIP,
); err != nil {
klog.Errorf("failed to add normal static route to vpc %s , %v", vpc.Name, err)
return err
}
}
}
// handle policy route
var (
policyRouteExisted, policyRouteNeedDel, policyRouteNeedAdd []*kubeovnv1.PolicyRoute
policyRouteLogical []*ovnnb.LogicalRouterPolicy
externalIDs = map[string]string{"vendor": util.CniTypeName}
)
//TODO 比较出需要删除和添加的路由策略
if vpc.Name == c.config.ClusterRouter {
policyRouteExisted = reversePolicies(vpc.Annotations[util.VpcLastPolicies])
// diff list
policyRouteNeedDel, policyRouteNeedAdd = diffPolicyRouteWithExisted(policyRouteExisted, vpc.Spec.PolicyRoutes)
} else {
if vpc.Spec.PolicyRoutes == nil {
// do not clean default vpc policy routes
if err = c.OVNNbClient.ClearLogicalRouterPolicy(vpc.Name); err != nil {
klog.Errorf("clean all vpc %s policy route failed, %v", vpc.Name, err)
return err
}
} else {
policyRouteLogical, err = c.OVNNbClient.ListLogicalRouterPolicies(vpc.Name, -1, nil)
if err != nil {
klog.Errorf("failed to get vpc %s policy route list, %v", vpc.Name, err)
return err
}
// diff vpc policy route
policyRouteNeedDel, policyRouteNeedAdd = diffPolicyRouteWithLogical(policyRouteLogical, vpc.Spec.PolicyRoutes)
}
}
//TODO 删除VPC路由策略
// delete policies non-exist
for _, item := range policyRouteNeedDel {
klog.Infof("delete policy route for router: %s, priority: %d, match %s", vpc.Name, item.Priority, item.Match)
if err = c.OVNNbClient.DeleteLogicalRouterPolicy(vpc.Name, item.Priority, item.Match); err != nil {
klog.Errorf("del vpc %s policy route failed, %v", vpc.Name, err)
return err
}
}
//TODO 添加vpc路由策略
// add new policies
for _, item := range policyRouteNeedAdd {
klog.Infof("add policy route for router: %s, match %s, action %s, nexthop %s, externalID %v", c.config.ClusterRouter, item.Match, string(item.Action), item.NextHopIP, externalIDs)
if err = c.OVNNbClient.AddLogicalRouterPolicy(vpc.Name, item.Priority, item.Match, string(item.Action), []string{item.NextHopIP}, externalIDs); err != nil {
klog.Errorf("add policy route to vpc %s failed, %v", vpc.Name, err)
return err
}
}
vpc.Status.Router = key
vpc.Status.Standby = true
vpc.Status.VpcPeerings = newPeers
//TODO 这里的lb是ovn的lb吗?
if c.config.EnableLb {
vpcLb, err := c.addLoadBalancer(key)
if err != nil {
klog.Error(err)
return err
}
vpc.Status.TCPLoadBalancer = vpcLb.TCPLoadBalancer
vpc.Status.TCPSessionLoadBalancer = vpcLb.TCPSessLoadBalancer
vpc.Status.UDPLoadBalancer = vpcLb.UDPLoadBalancer
vpc.Status.UDPSessionLoadBalancer = vpcLb.UDPSessLoadBalancer
vpc.Status.SctpLoadBalancer = vpcLb.SctpLoadBalancer
vpc.Status.SctpSessionLoadBalancer = vpcLb.SctpSessLoadBalancer
}
bytes, err := vpc.Status.Bytes()
if err != nil {
klog.Error(err)
return err
}
//TODO 修改vpc
vpc, err = c.config.KubeOvnClient.KubeovnV1().Vpcs().Patch(context.Background(), vpc.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status")
if err != nil {
klog.Error(err)
return err
}
if len(vpc.Annotations) != 0 && strings.ToLower(vpc.Annotations[util.VpcLbAnnotation]) == "on" {
if err = c.createVpcLb(vpc); err != nil {
return err
}
} else if err = c.deleteVpcLb(vpc); err != nil {
return err
}
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Error(err)
return err
}
for _, subnet := range subnets {
if subnet.Spec.Vpc == key {
c.addOrUpdateSubnetQueue.Add(subnet.Name)
}
}
if vpc.Name != util.DefaultVpc {
if cachedVpc.Spec.EnableExternal {
if !cachedVpc.Status.EnableExternal {
// connect vpc to default external
klog.Infof("connect external network with vpc %s", vpc.Name)
if err := c.handleAddVpcExternalSubnet(key, c.config.ExternalGatewaySwitch); err != nil {
klog.Errorf("failed to add default external connection for vpc %s, error %v", key, err)
return err
}
}
if vpc.Spec.EnableBfd {
klog.Infof("remove normal static ecmp route for vpc %s", vpc.Name)
// auto remove normal type static route, if using ecmp based bfd
if err := c.reconcileCustomVpcDelNormalStaticRoute(vpc.Name); err != nil {
klog.Errorf("failed to reconcile del vpc %q normal static route", vpc.Name)
return err
}
}
if !vpc.Spec.EnableBfd {
// auto add normal type static route, if not use ecmp based bfd
klog.Infof("add normal external static route for enable external vpc %s", vpc.Name)
if err := c.reconcileCustomVpcAddNormalStaticRoute(vpc.Name); err != nil {
klog.Errorf("failed to reconcile vpc %q bfd static route", vpc.Name)
return err
}
}
if cachedVpc.Spec.ExtraExternalSubnets != nil {
sort.Strings(vpc.Spec.ExtraExternalSubnets)
}
// add external subnets only in spec and delete external subnets only in status
if !reflect.DeepEqual(vpc.Spec.ExtraExternalSubnets, vpc.Status.ExtraExternalSubnets) {
for _, subnetStatus := range cachedVpc.Status.ExtraExternalSubnets {
if !slices.Contains(cachedVpc.Spec.ExtraExternalSubnets, subnetStatus) {
klog.Infof("delete external subnet %s connection for vpc %s", subnetStatus, vpc.Name)
if err := c.handleDelVpcExternalSubnet(vpc.Name, subnetStatus); err != nil {
klog.Errorf("failed to delete external subnet %s connection for vpc %s, error %v", subnetStatus, vpc.Name, err)
return err
}
}
}
for _, subnetSpec := range cachedVpc.Spec.ExtraExternalSubnets {
if !slices.Contains(cachedVpc.Status.ExtraExternalSubnets, subnetSpec) {
klog.Infof("connect external subnet %s with vpc %s", subnetSpec, vpc.Name)
if err := c.handleAddVpcExternalSubnet(key, subnetSpec); err != nil {
klog.Errorf("failed to add external subnet %s connection for vpc %s, error %v", subnetSpec, key, err)
return err
}
}
}
if err := c.updateVpcAddExternalStatus(key, true); err != nil {
klog.Error("failed to update additional external subnets status, %v", err)
return err
}
}
}
if !cachedVpc.Spec.EnableBfd && cachedVpc.Status.EnableBfd {
lrpEipName := fmt.Sprintf("%s-%s", key, c.config.ExternalGatewaySwitch)
if err := c.OVNNbClient.DeleteBFD(lrpEipName, ""); err != nil {
klog.Error(err)
return err
}
if err := c.handleDeleteVpcStaticRoute(key); err != nil {
klog.Errorf("failed to delete bfd route for vpc %s, error %v", key, err)
return err
}
}
if !cachedVpc.Spec.EnableExternal && cachedVpc.Status.EnableExternal {
// disconnect vpc to default external
if err := c.handleDelVpcExternalSubnet(key, c.config.ExternalGatewaySwitch); err != nil {
klog.Errorf("failed to delete external connection for vpc %s, error %v", key, err)
return err
}
}
if cachedVpc.Status.ExtraExternalSubnets != nil && !cachedVpc.Spec.EnableExternal {
// disconnect vpc to extra external subnets
for _, subnet := range cachedVpc.Status.ExtraExternalSubnets {
klog.Infof("disconnect external network %s to vpc %s", subnet, vpc.Name)
if err := c.handleDelVpcExternalSubnet(key, subnet); err != nil {
klog.Error(err)
return err
}
}
if err := c.updateVpcAddExternalStatus(key, false); err != nil {
klog.Error("failed to update additional external subnets status, %v", err)
return err
}
}
}
return nil
}
创建自定义vpc网关
kind: VpcNatGateway
apiVersion: kubeovn.io/v1
metadata:
name: gw1
spec:
vpc: test-vpc-1
subnet: net1
lanIp: 10.0.1.254
externalSubnets:
- ovn-vpc-external-network
会检查这个vpc是否存在
会检查这个vpc的网关是否存在
会检查自定义vpc的子网是否存在
这里的lanIp必须和上面创建自定义vpc时指定的nextHopIp一致,会在网关pod上创建一个以lanIp为ip地址的网卡,用于作为这个自定义vpc的网关内部的地址
externalSubnets指定了使用的外部网络,会从这个子网中分配一个ip地址作为外部网关地址
然后会根据上面指定的网关镜像构造一个StatefulSets并且创建自定vpc的StatefulSets对象,里面有网关pod
func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error {
// create nat gw statefulset
c.vpcNatGwKeyMutex.LockKey(key)
defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()
klog.Infof("handle add/update vpc nat gateway %s", key)
if vpcNatEnabled != "true" {
return fmt.Errorf("iptables nat gw not enable")
}
//TODO 检查指定vpc网关是否存在
gw, err := c.vpcNatGatewayLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Error(err)
return err
}
//TODO 检查vpc网关指定的vpc是否存在
if _, err := c.vpcsLister.Get(gw.Spec.Vpc); err != nil {
err = fmt.Errorf("failed to get vpc '%s', err: %v", gw.Spec.Vpc, err)
klog.Error(err)
return err
}
//TODO 检查vpc内子网是否存在,会在vpc网关pod里面创建一个以这个子网中ip为网关的网卡
if _, err := c.subnetsLister.Get(gw.Spec.Subnet); err != nil {
err = fmt.Errorf("failed to get subnet '%s', err: %v", gw.Spec.Subnet, err)
klog.Error(err)
return err
}
// check or create statefulset
needToCreate := false
needToUpdate := false
//TODO 判断vpc网关pod的StatefulSets是否存在,如果存在表示是更新操作,如果不存在表示需要创建网关pod
oldSts, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
Get(context.Background(), util.GenNatGwStsName(gw.Name), metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
needToCreate = true
} else {
klog.Error(err)
return err
}
}
//TODO 构建vpc网关pod StatefulSets的配置文件
newSts := c.genNatGwStatefulSet(gw, oldSts.DeepCopy())
if !needToCreate && isVpcNatGwChanged(gw) {
needToUpdate = true
}
switch {
case needToCreate:
//TODO 创建vpc网关pod的StatefulSets。以这个为pod为vpc的出网网关
// if pod create successfully, will add initVpcNatGatewayQueue
if _, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
Create(context.Background(), newSts, metav1.CreateOptions{}); err != nil {
err := fmt.Errorf("failed to create statefulset '%s', err: %v", newSts.Name, err)
klog.Error(err)
return err
}
if err = c.patchNatGwStatus(key); err != nil {
klog.Errorf("failed to patch nat gw sts status for nat gw %s, %v", key, err)
return err
}
return nil
case needToUpdate:
if _, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
Update(context.Background(), newSts, metav1.UpdateOptions{}); err != nil {
err := fmt.Errorf("failed to update statefulset '%s', err: %v", newSts.Name, err)
klog.Error(err)
return err
}
if err = c.patchNatGwStatus(key); err != nil {
klog.Errorf("failed to patch nat gw sts status for nat gw %s, %v", key, err)
return err
}
default:
// check if need to change qos
if gw.Spec.QoSPolicy != gw.Status.QoSPolicy {
if gw.Status.QoSPolicy != "" {
if err = c.execNatGwQoS(gw, gw.Status.QoSPolicy, QoSDel); err != nil {
klog.Errorf("failed to add qos for nat gw %s, %v", key, err)
return err
}
}
if gw.Spec.QoSPolicy != "" {
if err = c.execNatGwQoS(gw, gw.Spec.QoSPolicy, QoSAdd); err != nil {
klog.Errorf("failed to del qos for nat gw %s, %v", key, err)
return err
}
}
if err := c.updateCrdNatGwLabels(key, gw.Spec.QoSPolicy); err != nil {
err := fmt.Errorf("failed to update nat gw %s: %v", gw.Name, err)
klog.Error(err)
return err
}
// if update qos success, will update nat gw status
if err = c.patchNatGwQoSStatus(key, gw.Spec.QoSPolicy); err != nil {
klog.Errorf("failed to patch nat gw qos status for nat gw %s, %v", key, err)
return err
}
}
}
return nil
}
自定义vpc网关pod创建完成后会执行下面函数
func (c *Controller) handleInitVpcNatGw(key string) error {
if vpcNatEnabled != "true" {
return fmt.Errorf("iptables nat gw not enable")
}
c.vpcNatGwKeyMutex.LockKey(key)
defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()
klog.Infof("handle init vpc nat gateway %s", key)
gw, err := c.vpcNatGatewayLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Error(err)
return err
}
// subnet for vpc-nat-gw has been checked when create vpc-nat-gw
oriPod, err := c.getNatGwPod(key)
if err != nil {
err := fmt.Errorf("failed to get nat gw %s pod: %v", gw.Name, err)
klog.Error(err)
return err
}
pod := oriPod.DeepCopy()
if pod.Status.Phase != corev1.PodRunning {
time.Sleep(10 * time.Second)
return fmt.Errorf("failed to init vpc nat gateway, pod is not ready")
}
//TODO 判断是否已经初始化了,已经初始化后直接返回
if _, hasInit := pod.Annotations[util.VpcNatGatewayInitAnnotation]; hasInit {
return nil
}
//TODO 没有初始化,开始进行初始化
natGwCreatedAT = pod.CreationTimestamp.Format("2006-01-02T15:04:05")
klog.V(3).Infof("nat gw pod '%s' inited at %s", key, natGwCreatedAT)
//TODO 在vpc的网关pod中执行,指定nat规则,后面需要细看
if err = c.execNatGwRules(pod, natGwInit, []string{fmt.Sprintf("%s,%s", c.config.ServiceClusterIPRange, pod.Annotations[util.GatewayAnnotation])}); err != nil {
err = fmt.Errorf("failed to init vpc nat gateway, %v", err)
klog.Error(err)
return err
}
//TODO 在vpc的网关pod中执行,进行qos
if gw.Spec.QoSPolicy != "" {
if err = c.execNatGwQoS(gw, gw.Spec.QoSPolicy, QoSAdd); err != nil {
klog.Errorf("failed to add qos for nat gw %s, %v", key, err)
return err
}
}
// if update qos success, will update nat gw status
if gw.Spec.QoSPolicy != gw.Status.QoSPolicy {
if err = c.patchNatGwQoSStatus(key, gw.Spec.QoSPolicy); err != nil {
klog.Errorf("failed to patch status for nat gw %s, %v", key, err)
return err
}
}
if err := c.updateCrdNatGwLabels(gw.Name, gw.Spec.QoSPolicy); err != nil {
err := fmt.Errorf("failed to update nat gw %s: %v", gw.Name, err)
klog.Error(err)
return err
}
c.updateVpcFloatingIPQueue.Add(key)
c.updateVpcDnatQueue.Add(key)
c.updateVpcSnatQueue.Add(key)
c.updateVpcSubnetQueue.Add(key)
c.updateVpcEipQueue.Add(key)
pod.Annotations[util.VpcNatGatewayInitAnnotation] = "true"
patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod)
if err != nil {
klog.Error(err)
return err
}
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
err := fmt.Errorf("patch pod %s/%s failed %v", pod.Name, pod.Namespace, err)
klog.Error(err)
return err
}
return nil
}
给自定义vpc创建eip和snat规则
kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
name: eip1
spec:
v4ip: 192.168.100.230
natGwDp: gw1
---
kind: IptablesSnatRule
apiVersion: kubeovn.io/v1
metadata:
name: snat01
spec:
eip: eip1
internalCIDR: 10.1.1.0/24
下面代码是创建eip的过程
首先会判断eip是否已经存在
判断在EIP资源中是否指定了ip,如果没有指定则根据外部子网随机生成
在自定义vpc网关的pod中创建网卡,设置ip地址
func (c *Controller) handleAddIptablesEip(key string) error {
if vpcNatEnabled != "true" {
return fmt.Errorf("iptables nat gw not enable")
}
c.vpcNatGwKeyMutex.LockKey(key)
defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()
klog.Infof("handle add iptables eip %s", key)
cachedEip, err := c.iptablesEipsLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Error(err)
return err
}
//TODO eip资源已经ok,直接返回
if cachedEip.Status.Ready && cachedEip.Status.IP != "" {
// already ok
return nil
}
var v4ip, v6ip, mac, eipV4Cidr, v4Gw string
//TODO 如果没有指定,默认是ovn-vpc-external-network
externalNetwork := util.GetExternalNetwork(cachedEip.Spec.ExternalSubnet)
//TODO ovn-vpc-external-network.kube-system
externalProvider := fmt.Sprintf("%s.%s", externalNetwork, attachmentNs)
portName := ovs.PodNameToPortName(cachedEip.Name, cachedEip.Namespace, externalProvider)
if cachedEip.Spec.V4ip != "" {
//TODO 创建eip时指定了ip,从ovn-vpc-external-network中分配ip
if v4ip, v6ip, mac, err = c.acquireStaticEip(cachedEip.Name, cachedEip.Namespace, portName, cachedEip.Spec.V4ip, externalNetwork); err != nil {
klog.Errorf("failed to acquire static eip, err: %v", err)
return err
}
} else {
//TODO 随机生成ip,从ovn-vpc-external-network中分配ip
// Random allocate
if v4ip, v6ip, mac, err = c.acquireEip(cachedEip.Name, cachedEip.Namespace, portName, externalNetwork); err != nil {
klog.Errorf("failed to allocate eip, err: %v", err)
return err
}
}
//TODO ovn-vpc-external-network子网的的eip和掩码
if eipV4Cidr, err = c.getEipV4Cidr(v4ip, externalNetwork); err != nil {
klog.Errorf("failed to get eip cidr, err: %v", err)
return err
}
//TODO ovn-vpc-external-network子网的网关
if v4Gw, _, err = c.GetGwBySubnet(externalNetwork); err != nil {
klog.Errorf("failed to get gw, err: %v", err)
return err
}
//kind: IptablesEIP
//apiVersion: kubeovn.io/v1
// metadata:
// name: eip-static
//spec:
// natGwDp: gw1
// v4ip: 10.0.1.111
//TODO 参数:vpc网关pod名字、ovn-vpc-external-network子网的网关、ovn-vpc-external-network子网的的eip和掩码
//TODO 这个函数给vpc网关pod创建了网卡ip
if err = c.createEipInPod(cachedEip.Spec.NatGwDp, v4Gw, eipV4Cidr); err != nil {
klog.Errorf("failed to create eip '%s' in pod, %v", key, err)
return err
}
if cachedEip.Spec.QoSPolicy != "" {
if err = c.addEipQoS(cachedEip, v4ip); err != nil {
klog.Errorf("failed to add qos '%s' in pod, %v", key, err)
return err
}
}
if err = c.createOrUpdateCrdEip(key, v4ip, v6ip, mac, cachedEip.Spec.NatGwDp, cachedEip.Spec.QoSPolicy, externalNetwork); err != nil {
klog.Errorf("failed to update eip %s, %v", key, err)
return err
}
return nil
}
下面是创建snat规则的函数
获取snat绑定的eip,获取需要进行snat的网段和snat需要设置成的ip地址
进入自定义vpc网关的pod中执行iptables规则,来设置。是直接调用的脚本,后面会列出脚本的内容
// TODO iptables snat规则的配置文件
func (c *Controller) handleAddIptablesSnatRule(key string) error {
if vpcNatEnabled != "true" {
return fmt.Errorf("iptables nat gw not enable")
}
c.vpcNatGwKeyMutex.LockKey(key)
defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()
klog.Infof("handle add iptables snat rule %s", key)
snat, err := c.iptablesSnatRulesLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Error(err)
return err
}
//TODO 判断 snat资源是否已经就绪,如果就绪直接退出
if snat.Status.Ready && snat.Status.V4ip != "" {
// already ok
return nil
}
klog.V(3).Infof("handle add iptables snat %s", key)
//TODO 获得做snat规则的源ip地址
eipName := snat.Spec.EIP
if eipName == "" {
return fmt.Errorf("failed to create snat rule, should set eip")
}
//TODO 获取eip对象
eip, err := c.GetEip(eipName)
if err != nil {
klog.Errorf("failed to get eip, %v", err)
return err
}
// create snat
//TODO 获取需要做snat的ip范围
v4Cidr, _ := util.SplitStringIP(snat.Spec.InternalCIDR)
if v4Cidr == "" {
// only support IPv4 snat
err = fmt.Errorf("failed to get snat v4 internal cidr, original cidr is %s", snat.Spec.InternalCIDR)
return err
}
//TODO 进入vpc网关pod 添加snat规则
if err = c.createSnatInPod(eip.Spec.NatGwDp, eip.Status.IP, v4Cidr); err != nil {
klog.Errorf("failed to create snat, %v", err)
return err
}
//TODO 修改iptbales snat对象状态,为就绪状态
if err = c.patchSnatStatus(key, eip.Status.IP, eip.Spec.V6ip, eip.Spec.NatGwDp, "", true); err != nil {
klog.Errorf("failed to update status for snat %s, %v", key, err)
return err
}
//TODO 修改iptbales snat对象label信息
if err = c.patchSnatLabel(key, eip); err != nil {
klog.Errorf("failed to patch label for snat %s, %v", key, err)
return err
}
if err = c.handleAddIptablesSnatFinalizer(key); err != nil {
klog.Errorf("failed to handle add finalizer for snat, %v", err)
return err
}
//TODO 修改eip对象状态
if err = c.patchEipStatus(eipName, "", "", "", true); err != nil {
// refresh eip nats
klog.Errorf("failed to patch snat use eip %s, %v", key, err)
return err
}
return nil
}
给自定义vpc创建fip
首先说明一下fip的作用,fip全称的floating Ip,用于将一个外网ip和ovn的内部ip一一绑定,这样ovn内部的ip可以直接通过访问fip来进行访问。
从下面的配置中可以看到其实fip就是eip和fiprule的组合而成的一个概念
kind: IptablesEIP
apiVersion: kubeovn.io/v1
metadata:
name: eip3
spec:
v4ip: 192.168.100.232
natGwDp: gw1
---
kind: IptablesFIPRule
apiVersion: kubeovn.io/v1
metadata:
name: fip01
spec:
eip: eip3
internalIp: 10.0.1.5
eip的创建上面已经介绍了,下面主要烤fiprule,主要有两个参数eip和internalIp,这个规则的作用就是将ovn的内部ip地址和一个外部ip地址进行一一绑定
对于internalIp和eip的绑定也是通过在自定义vpc网关的pod中执行iptables规则实现的,下面主要说明iptables规则
# TODO 将internalIp和eip使用iptables规则进行一一对应。
function add_floating_ip() {
# make sure inited
check_inited
for rule in $@
do
arr=(${rule//,/ })
eip=(${arr[0]//\// })
internalIp=${arr[1]}
# check if already exist
iptables-save | grep "EXCLUSIVE_DNAT" | grep -w "\-d $eip/32" | grep "destination" && exit 0
exec_cmd "iptables -t nat -A EXCLUSIVE_DNAT -d $eip -j DNAT --to-destination $internalIp"
exec_cmd "iptables -t nat -A EXCLUSIVE_SNAT -s $internalIp -j SNAT --to-source $eip"
done
}
可以看到主要添加了两条iptables规则,如下所示
EXCLUSIVE_DNAT 和 EXCLUSIVE_SNAT 是kube-ovn自定义的两条链
直接将EXCLUSIVE_DNAT理解为nat表下的PREROUTING链,将EXCLUSIVE_SNAT理解为POSTROUTING链即可,添加了snat和dnat规则。
iptables -t nat -A EXCLUSIVE_DNAT -d $eip -j DNAT --to-destination
iptables -t nat -A EXCLUSIVE_SNAT -s $internalIp -j SNAT --to-source $eip
我们自己也是可以实现的
eg:iptables -t nat -A PREROUTING -d $eip -j DNAT --to-destination $internalIp
iptables -t nat -A POSTROUTING -s $internalIp -j SNAT --to-source $eip
可以分析一下具体的流量走势:
对于自定义vpc下的一个pod需要访问互联网时,首先会将报文转发到自定义vpc网关的pod里面,在流量进入网关pod的协议栈时会首先进入到PREROUTING链,在这个链里面没有匹配到规则,然后会进行路由发现不是进入到本地的因此会进入到FORWARD链同样不会匹配到,再然后进入到POSTROUTING链,因为我们在POSTROUTING链添加了规则能够匹配上,因此会进行snat转换,将源ip设置为eip的地址,因此数据包便能够出去了。
对于互联网想要访问ovn内部网络的internalIp地址时,因为对外服务的ip是eip,因此外部互联网用户想要访问的肯定是eip,当流量进入到自定义vpc网关pod时,会进入到PREROUTING链,因为我们添加了dnat规则,因此能够匹配到然后将目的ip地址设置为internalIp,然后进行路由选择,判定为不是本机ip地址,因此会进入到FORWARD链,没有规则被匹配,再进入到POSTROUTING链没有被匹配,然后会将报文送入到ovn网络中