Skip to content

Commit

Permalink
Merge branch 'main' into fix-e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
luthermonson authored Jan 9, 2024
2 parents 0332dfd + 5103be6 commit e783bbd
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 38 deletions.
33 changes: 33 additions & 0 deletions cloud/linode/annotations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package linode

const (
// annLinodeDefaultProtocol is the annotation used to specify the default protocol
// for Linode load balancers. Options are tcp, http and https. Defaults to tcp.
annLinodeDefaultProtocol = "service.beta.kubernetes.io/linode-loadbalancer-default-protocol"
annLinodePortConfigPrefix = "service.beta.kubernetes.io/linode-loadbalancer-port-"
annLinodeDefaultProxyProtocol = "service.beta.kubernetes.io/linode-loadbalancer-default-proxy-protocol"

annLinodeCheckPath = "service.beta.kubernetes.io/linode-loadbalancer-check-path"
annLinodeCheckBody = "service.beta.kubernetes.io/linode-loadbalancer-check-body"
annLinodeHealthCheckType = "service.beta.kubernetes.io/linode-loadbalancer-check-type"

annLinodeHealthCheckInterval = "service.beta.kubernetes.io/linode-loadbalancer-check-interval"
annLinodeHealthCheckTimeout = "service.beta.kubernetes.io/linode-loadbalancer-check-timeout"
annLinodeHealthCheckAttempts = "service.beta.kubernetes.io/linode-loadbalancer-check-attempts"
annLinodeHealthCheckPassive = "service.beta.kubernetes.io/linode-loadbalancer-check-passive"

// annLinodeThrottle is the annotation specifying the value of the Client Connection
// Throttle, which limits the number of subsequent new connections per second from the
// same client IP. Options are a number between 1-20, or 0 to disable. Defaults to 20.
annLinodeThrottle = "service.beta.kubernetes.io/linode-loadbalancer-throttle"

annLinodeLoadBalancerPreserve = "service.beta.kubernetes.io/linode-loadbalancer-preserve"
annLinodeNodeBalancerID = "service.beta.kubernetes.io/linode-loadbalancer-nodebalancer-id"

annLinodeHostnameOnlyIngress = "service.beta.kubernetes.io/linode-loadbalancer-hostname-only-ingress"
annLinodeLoadBalancerTags = "service.beta.kubernetes.io/linode-loadbalancer-tags"
annLinodeCloudFirewallID = "service.beta.kubernetes.io/linode-loadbalancer-firewall-id"

annLinodeNodePrivateIP = "node.k8s.linode.com/private-ip"
annLinodeHostUUID = "node.k8s.linode.com/host-uuid"
)
33 changes: 33 additions & 0 deletions cloud/linode/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ package linode

import (
"context"
"net/url"
"regexp"
"strings"

"github.com/linode/linodego"
)
Expand All @@ -27,3 +30,33 @@ type Client interface {

// linodego.Client implements Client
var _ Client = (*linodego.Client)(nil)

func newLinodeClient(token, ua, apiURL string) (*linodego.Client, error) {
linodeClient := linodego.NewClient(nil)
linodeClient.SetUserAgent(ua)
linodeClient.SetToken(token)

// Validate apiURL
parsedURL, err := url.Parse(apiURL)
if err != nil {
return nil, err
}

validatedURL := &url.URL{
Host: parsedURL.Host,
Scheme: parsedURL.Scheme,
}

linodeClient.SetBaseURL(validatedURL.String())

version := ""
matches := regexp.MustCompile(`/v\d+`).FindAllString(parsedURL.Path, -1)

if len(matches) > 0 {
version = strings.Trim(matches[len(matches)-1], "/")
}

linodeClient.SetAPIVersion(version)

return &linodeClient, nil
}
22 changes: 16 additions & 6 deletions cloud/linode/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
ProviderName = "linode"
accessTokenEnv = "LINODE_API_TOKEN"
regionEnv = "LINODE_REGION"
urlEnv = "LINODE_URL"
)

// Options is a configuration object for this cloudprovider implementation.
Expand Down Expand Up @@ -52,28 +53,37 @@ func newCloud() (cloudprovider.Interface, error) {
return nil, fmt.Errorf("%s must be set in the environment (use a k8s secret)", regionEnv)
}

linodeClient := linodego.NewClient(nil)
linodeClient.SetToken(apiToken)
url := os.Getenv(urlEnv)
ua := fmt.Sprintf("linode-cloud-controller-manager %s", linodego.DefaultUserAgent)

linodeClient, err := newLinodeClient(apiToken, ua, url)
if err != nil {
return nil, fmt.Errorf("client was not created succesfully: %w", err)
}

if Options.LinodeGoDebug {
linodeClient.SetDebug(true)
}
linodeClient.SetUserAgent(fmt.Sprintf("linode-cloud-controller-manager %s", linodego.DefaultUserAgent))

// Return struct that satisfies cloudprovider.Interface
return &linodeCloud{
client: &linodeClient,
instances: newInstances(&linodeClient),
loadbalancers: newLoadbalancers(&linodeClient, region),
client: linodeClient,
instances: newInstances(linodeClient),
loadbalancers: newLoadbalancers(linodeClient, region),
}, nil
}

func (c *linodeCloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stopCh <-chan struct{}) {
kubeclient := clientBuilder.ClientOrDie("linode-shared-informers")
sharedInformer := informers.NewSharedInformerFactory(kubeclient, 0)
serviceInformer := sharedInformer.Core().V1().Services()
nodeInformer := sharedInformer.Core().V1().Nodes()

serviceController := newServiceController(c.loadbalancers.(*loadbalancers), serviceInformer)
go serviceController.Run(stopCh)

nodeController := newNodeController(kubeclient, c.client, nodeInformer)
go nodeController.Run(stopCh)
}

func (c *linodeCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
Expand Down
2 changes: 1 addition & 1 deletion cloud/linode/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type instances struct {
nodeCache *nodeCache
}

func newInstances(client Client) cloudprovider.InstancesV2 {
func newInstances(client Client) *instances {
var timeout int
if raw, ok := os.LookupEnv("LINODE_INSTANCE_CACHE_TTL"); ok {
timeout, _ = strconv.Atoi(raw)
Expand Down
31 changes: 0 additions & 31 deletions cloud/linode/loadbalancers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,37 +24,6 @@ import (
"github.com/linode/linodego"
)

const (
// annLinodeDefaultProtocol is the annotation used to specify the default protocol
// for Linode load balancers. Options are tcp, http and https. Defaults to tcp.
annLinodeDefaultProtocol = "service.beta.kubernetes.io/linode-loadbalancer-default-protocol"
annLinodePortConfigPrefix = "service.beta.kubernetes.io/linode-loadbalancer-port-"
annLinodeDefaultProxyProtocol = "service.beta.kubernetes.io/linode-loadbalancer-default-proxy-protocol"

annLinodeCheckPath = "service.beta.kubernetes.io/linode-loadbalancer-check-path"
annLinodeCheckBody = "service.beta.kubernetes.io/linode-loadbalancer-check-body"
annLinodeHealthCheckType = "service.beta.kubernetes.io/linode-loadbalancer-check-type"

annLinodeHealthCheckInterval = "service.beta.kubernetes.io/linode-loadbalancer-check-interval"
annLinodeHealthCheckTimeout = "service.beta.kubernetes.io/linode-loadbalancer-check-timeout"
annLinodeHealthCheckAttempts = "service.beta.kubernetes.io/linode-loadbalancer-check-attempts"
annLinodeHealthCheckPassive = "service.beta.kubernetes.io/linode-loadbalancer-check-passive"

// annLinodeThrottle is the annotation specifying the value of the Client Connection
// Throttle, which limits the number of subsequent new connections per second from the
// same client IP. Options are a number between 1-20, or 0 to disable. Defaults to 20.
annLinodeThrottle = "service.beta.kubernetes.io/linode-loadbalancer-throttle"

annLinodeLoadBalancerPreserve = "service.beta.kubernetes.io/linode-loadbalancer-preserve"
annLinodeNodeBalancerID = "service.beta.kubernetes.io/linode-loadbalancer-nodebalancer-id"

annLinodeHostnameOnlyIngress = "service.beta.kubernetes.io/linode-loadbalancer-hostname-only-ingress"
annLinodeLoadBalancerTags = "service.beta.kubernetes.io/linode-loadbalancer-tags"
annLinodeCloudFirewallID = "service.beta.kubernetes.io/linode-loadbalancer-firewall-id"

annLinodeNodePrivateIP = "node.k8s.linode.com/private-ip"
)

var errNoNodesAvailable = errors.New("no nodes available for nodebalancer")

type lbNotFoundError struct {
Expand Down
124 changes: 124 additions & 0 deletions cloud/linode/node_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package linode

import (
"context"
"net/http"
"time"

"github.com/appscode/go/wait"
"github.com/linode/linodego"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)

type nodeController struct {
client Client
instances *instances
kubeclient kubernetes.Interface
informer v1informers.NodeInformer

queue workqueue.DelayingInterface
}

func newNodeController(kubeclient kubernetes.Interface, client Client, informer v1informers.NodeInformer) *nodeController {
return &nodeController{
client: client,
instances: newInstances(client),
kubeclient: kubeclient,
informer: informer,
queue: workqueue.NewDelayingQueue(),
}
}

func (s *nodeController) Run(stopCh <-chan struct{}) {
s.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node, ok := obj.(*v1.Node)
if !ok {
return
}

klog.Infof("NodeController will handle newly created node (%s) metadata", node.Name)
s.queue.Add(node)
},
UpdateFunc: func(_, new interface{}) {
node, ok := new.(*v1.Node)
if !ok {
return
}

klog.Infof("NodeController will handle updated node (%s) metadata", node.Name)
s.queue.Add(node)
},
})

go wait.Until(s.worker, time.Second, stopCh)
s.informer.Informer().Run(stopCh)
}

// worker runs a worker thread that dequeues new or modified nodes and processes
// metadata (host UUID) on each of them.
func (s *nodeController) worker() {
for s.processNext() {
}
}

func (s *nodeController) processNext() bool {
key, quit := s.queue.Get()
if quit {
return false
}
defer s.queue.Done(key)

node, ok := key.(*v1.Node)
if !ok {
klog.Errorf("expected dequeued key to be of type *v1.Node but got %T", node)
return true
}

err := s.handleNodeAdded(context.TODO(), node)
switch deleteErr := err.(type) {
case nil:
break

case *linodego.Error:
if deleteErr.Code >= http.StatusInternalServerError || deleteErr.Code == http.StatusTooManyRequests {
klog.Errorf("failed to add metadata for node (%s); retrying in 1 minute: %s", node.Name, err)
s.queue.AddAfter(node, retryInterval)
}

default:
klog.Errorf("failed to add metadata for node (%s); will not retry: %s", node.Name, err)
}
return true
}

func (s *nodeController) handleNodeAdded(ctx context.Context, node *v1.Node) error {
klog.Infof("NodeController handling node (%s) addition", node.Name)

linode, err := s.instances.lookupLinode(ctx, node)
if err != nil {
klog.Infof("instance lookup error: %s", err.Error())
return err
}

uuid, ok := node.Labels[annLinodeHostUUID]
if ok && uuid == linode.HostUUID {
return nil
}

node.Labels[annLinodeHostUUID] = linode.HostUUID

_, err = s.kubeclient.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{})
if err != nil {
klog.Infof("node update error: %s", err.Error())
return err
}

return nil
}

0 comments on commit e783bbd

Please sign in to comment.