Skip to content

Commit

Permalink
feat: add Kubernetes-NodePort network plugin (#138)
Browse files Browse the repository at this point in the history
Signed-off-by: ChrisLiu <[email protected]>
  • Loading branch information
chrisliu1995 authored Apr 26, 2024
1 parent 56d9071 commit d547f61
Show file tree
Hide file tree
Showing 2 changed files with 442 additions and 0 deletions.
259 changes: 259 additions & 0 deletions cloudprovider/kubernetes/nodeport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/*
Copyright 2024 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubernetes

import (
"context"
gamekruiseiov1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1"
"github.com/openkruise/kruise-game/cloudprovider"
cperrors "github.com/openkruise/kruise-game/cloudprovider/errors"
"github.com/openkruise/kruise-game/cloudprovider/utils"
"github.com/openkruise/kruise-game/pkg/util"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
"strings"
)

const (
NodePortNetwork = "Kubernetes-NodePort"

PortProtocolsConfigName = "PortProtocols"
)

type NodePortPlugin struct {
}

func (n *NodePortPlugin) Name() string {
return NodePortNetwork
}

func (n *NodePortPlugin) Alias() string {
return ""
}

func (n *NodePortPlugin) Init(client client.Client, options cloudprovider.CloudProviderOptions, ctx context.Context) error {
return nil
}

func (n *NodePortPlugin) OnPodAdded(client client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, cperrors.PluginError) {
return pod, nil
}

func (n *NodePortPlugin) OnPodUpdated(client client.Client, pod *corev1.Pod, ctx context.Context) (*corev1.Pod, cperrors.PluginError) {
networkManager := utils.NewNetworkManager(pod, client)

networkStatus, _ := networkManager.GetNetworkStatus()
networkConfig := networkManager.GetNetworkConfig()
npc, err := parseNodePortConfig(networkConfig)
if err != nil {
return pod, cperrors.NewPluginError(cperrors.ParameterError, err.Error())
}

if networkStatus == nil {
pod, err := networkManager.UpdateNetworkStatus(gamekruiseiov1alpha1.NetworkStatus{
CurrentNetworkState: gamekruiseiov1alpha1.NetworkNotReady,
}, pod)
return pod, cperrors.ToPluginError(err, cperrors.InternalError)
}

// get svc
svc := &corev1.Service{}
err = client.Get(ctx, types.NamespacedName{
Name: pod.GetName(),
Namespace: pod.GetNamespace(),
}, svc)
if err != nil {
if errors.IsNotFound(err) {
return pod, cperrors.ToPluginError(client.Create(ctx, consNodePortSvc(npc, pod, client, ctx)), cperrors.ApiCallError)
}
return pod, cperrors.NewPluginError(cperrors.ApiCallError, err.Error())
}

// update svc
if util.GetHash(npc) != svc.GetAnnotations()[ServiceHashKey] {
networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady
pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod)
if err != nil {
return pod, cperrors.NewPluginError(cperrors.InternalError, err.Error())
}
return pod, cperrors.ToPluginError(client.Update(ctx, consNodePortSvc(npc, pod, client, ctx)), cperrors.ApiCallError)
}

// allow not ready containers
if util.IsAllowNotReadyContainers(networkManager.GetNetworkConfig()) {
toUpDateSvc, err := utils.AllowNotReadyContainers(client, ctx, pod, svc, false)
if err != nil {
return pod, err
}

if toUpDateSvc {
err := client.Update(ctx, svc)
if err != nil {
return pod, cperrors.ToPluginError(err, cperrors.ApiCallError)
}
}
}

// network ready
node := &corev1.Node{}
err = client.Get(ctx, types.NamespacedName{
Name: pod.Spec.NodeName,
}, node)
if err != nil {
return pod, cperrors.NewPluginError(cperrors.ApiCallError, err.Error())
}

if pod.Status.PodIP == "" {
// Pod IP not exist, Network NotReady
networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady
pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod)
return pod, cperrors.ToPluginError(err, cperrors.InternalError)
}

internalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0)
externalAddresses := make([]gamekruiseiov1alpha1.NetworkAddress, 0)
for _, port := range svc.Spec.Ports {
instrIPort := port.TargetPort
if port.NodePort == 0 {
networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkNotReady
pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod)
return pod, cperrors.ToPluginError(err, cperrors.InternalError)
}
instrEPort := intstr.FromInt(int(port.NodePort))
internalAddress := gamekruiseiov1alpha1.NetworkAddress{
IP: pod.Status.PodIP,
Ports: []gamekruiseiov1alpha1.NetworkPort{
{
Name: instrIPort.String(),
Port: &instrIPort,
Protocol: port.Protocol,
},
},
}
externalAddress := gamekruiseiov1alpha1.NetworkAddress{
IP: getAddress(node),
Ports: []gamekruiseiov1alpha1.NetworkPort{
{
Name: instrIPort.String(),
Port: &instrEPort,
Protocol: port.Protocol,
},
},
}
internalAddresses = append(internalAddresses, internalAddress)
externalAddresses = append(externalAddresses, externalAddress)
}
networkStatus.InternalAddresses = internalAddresses
networkStatus.ExternalAddresses = externalAddresses
networkStatus.CurrentNetworkState = gamekruiseiov1alpha1.NetworkReady
pod, err = networkManager.UpdateNetworkStatus(*networkStatus, pod)
return pod, cperrors.ToPluginError(err, cperrors.InternalError)
}

func (n *NodePortPlugin) OnPodDeleted(client client.Client, pod *corev1.Pod, ctx context.Context) cperrors.PluginError {
return nil
}

func init() {
kubernetesProvider.registerPlugin(&NodePortPlugin{})
}

type nodePortConfig struct {
ports []int
protocols []corev1.Protocol
isFixed bool
}

func parseNodePortConfig(conf []gamekruiseiov1alpha1.NetworkConfParams) (*nodePortConfig, error) {
var ports []int
var protocols []corev1.Protocol
isFixed := false

for _, c := range conf {
switch c.Name {
case PortProtocolsConfigName:
ports, protocols = parsePortProtocols(c.Value)

case FixedKey:
var err error
isFixed, err = strconv.ParseBool(c.Value)
if err != nil {
return nil, err
}
}
}
return &nodePortConfig{
ports: ports,
protocols: protocols,
isFixed: isFixed,
}, nil
}

func parsePortProtocols(value string) ([]int, []corev1.Protocol) {
ports := make([]int, 0)
protocols := make([]corev1.Protocol, 0)
for _, pp := range strings.Split(value, ",") {
ppSlice := strings.Split(pp, "/")
port, err := strconv.Atoi(ppSlice[0])
if err != nil {
continue
}
ports = append(ports, port)
if len(ppSlice) != 2 {
protocols = append(protocols, corev1.ProtocolTCP)
} else {
protocols = append(protocols, corev1.Protocol(ppSlice[1]))
}
}
return ports, protocols
}

func consNodePortSvc(npc *nodePortConfig, pod *corev1.Pod, c client.Client, ctx context.Context) *corev1.Service {
svcPorts := make([]corev1.ServicePort, 0)
for i := 0; i < len(npc.ports); i++ {
svcPorts = append(svcPorts, corev1.ServicePort{
Name: strconv.Itoa(npc.ports[i]),
Port: int32(npc.ports[i]),
Protocol: npc.protocols[i],
TargetPort: intstr.FromInt(npc.ports[i]),
})
}

svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: pod.GetName(),
Namespace: pod.GetNamespace(),
Annotations: map[string]string{
ServiceHashKey: util.GetHash(npc),
},
OwnerReferences: consOwnerReference(c, ctx, pod, npc.isFixed),
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
Selector: map[string]string{
SvcSelectorKey: pod.GetName(),
},
Ports: svcPorts,
},
}
return svc
}
Loading

0 comments on commit d547f61

Please sign in to comment.