Skip to content

Commit

Permalink
manage routes for instances in multiple vpcs in a single region (#241)
Browse files Browse the repository at this point in the history
* manage routes for instances in multiple vpcs in a single region

* fix readme and helm chart

* address review comments

* add test for multiple vpcs

* remove vpc from cache if it doesn't exist

* address review comments
  • Loading branch information
rahulait authored Nov 14, 2024
1 parent dc407dd commit e3c2045
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 138 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ When running k8s clusters within VPC, node specific podCIDRs need to be allowed
##### Example usage in values.yaml
```yaml
routeController:
vpcName: <name of VPC>
vpcNames: <comma separated names of VPCs managed by CCM>
clusterCIDR: 10.0.0.0/8
configureCloudRoutes: true
```
Expand Down
43 changes: 9 additions & 34 deletions cloud/linode/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"net"
"os"
"strconv"
"sync"
"time"

"github.com/spf13/pflag"
"golang.org/x/exp/slices"
"k8s.io/client-go/informers"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"

"github.com/linode/linode-cloud-controller-manager/cloud/linode/client"
)
Expand All @@ -35,41 +35,14 @@ var Options struct {
KubeconfigFlag *pflag.Flag
LinodeGoDebug bool
EnableRouteController bool
// Deprecated: use VPCNames instead
VPCName string
VPCNames string
LoadBalancerType string
BGPNodeSelector string
LinodeExternalNetwork *net.IPNet
}

// vpcDetails is set when VPCName options flag is set.
// We use it to list instances running within the VPC if set
type vpcDetails struct {
mu sync.RWMutex
id int
name string
}

func (v *vpcDetails) setDetails(client client.Client, name string) error {
v.mu.Lock()
defer v.mu.Unlock()

id, err := getVPCID(client, Options.VPCName)
if err != nil {
return fmt.Errorf("failed finding VPC ID: %w", err)
}
v.id = id
v.name = name
return nil
}

func (v *vpcDetails) getID() int {
v.mu.Lock()
defer v.mu.Unlock()
return v.id
}

var vpcInfo vpcDetails = vpcDetails{id: 0, name: ""}

type linodeCloud struct {
client client.Client
instances cloudprovider.InstancesV2
Expand Down Expand Up @@ -114,11 +87,13 @@ func newCloud() (cloudprovider.Interface, error) {
linodeClient.SetDebug(true)
}

if Options.VPCName != "" && Options.VPCNames != "" {
return nil, fmt.Errorf("cannot have both vpc-name and vpc-names set")
}

if Options.VPCName != "" {
err := vpcInfo.setDetails(linodeClient, Options.VPCName)
if err != nil {
return nil, fmt.Errorf("failed finding VPC ID: %w", err)
}
klog.Warningf("vpc-name flag is deprecated. Use vpc-names instead")
Options.VPCNames = Options.VPCName
}

routes, err := newRoutes(linodeClient)
Expand Down
15 changes: 10 additions & 5 deletions cloud/linode/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,16 @@ func (nc *nodeCache) refreshInstances(ctx context.Context, client client.Client)

// If running within VPC, find instances and store their ips
vpcNodes := map[int][]string{}
vpcID := vpcInfo.getID()
if vpcID != 0 {
resp, err := client.ListVPCIPAddresses(ctx, vpcID, linodego.NewListOptions(0, ""))
vpcNames := strings.Split(Options.VPCNames, ",")
for _, v := range vpcNames {
vpcName := strings.TrimSpace(v)
if vpcName == "" {
continue
}
resp, err := GetVPCIPAddresses(ctx, client, vpcName)
if err != nil {
return err
klog.Errorf("failed updating instances cache for VPC %s. Error: %s", vpcName, err.Error())
continue
}
for _, r := range resp {
if r.Address == nil {
Expand All @@ -97,7 +102,7 @@ func (nc *nodeCache) refreshInstances(ctx context.Context, client client.Client)
for i, instance := range instances {

// if running within VPC, only store instances in cache which are part of VPC
if vpcID != 0 && len(vpcNodes[instance.ID]) == 0 {
if Options.VPCNames != "" && len(vpcNodes[instance.ID]) == 0 {
continue
}
node := linodeInstance{
Expand Down
126 changes: 68 additions & 58 deletions cloud/linode/route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -19,37 +20,43 @@ import (
)

type routeCache struct {
sync.RWMutex
Mu sync.RWMutex
routes map[int][]linodego.VPCIP
lastUpdate time.Time
ttl time.Duration
}

func (rc *routeCache) refreshRoutes(ctx context.Context, client client.Client) error {
rc.Lock()
defer rc.Unlock()
// RefreshCache checks if cache has expired and updates it accordingly
func (rc *routeCache) refreshRoutes(ctx context.Context, client client.Client) {
rc.Mu.Lock()
defer rc.Mu.Unlock()

if time.Since(rc.lastUpdate) < rc.ttl {
return nil
return
}

vpcNodes := map[int][]linodego.VPCIP{}
vpcID := vpcInfo.getID()
resp, err := client.ListVPCIPAddresses(ctx, vpcID, linodego.NewListOptions(0, ""))
if err != nil {
return err
}
for _, r := range resp {
vpcNodes[r.LinodeID] = append(vpcNodes[r.LinodeID], r)
vpcNames := strings.Split(Options.VPCNames, ",")
for _, v := range vpcNames {
vpcName := strings.TrimSpace(v)
if vpcName == "" {
continue
}
resp, err := GetVPCIPAddresses(ctx, client, vpcName)
if err != nil {
klog.Errorf("failed updating cache for VPC %s. Error: %s", vpcName, err.Error())
continue
}
for _, r := range resp {
vpcNodes[r.LinodeID] = append(vpcNodes[r.LinodeID], r)
}
}

rc.routes = vpcNodes
rc.lastUpdate = time.Now()
return nil
}

type routes struct {
vpcid int
client client.Client
instances *instances
routeCache *routeCache
Expand All @@ -64,13 +71,11 @@ func newRoutes(client client.Client) (cloudprovider.Routes, error) {
}
klog.V(3).Infof("TTL for routeCache set to %d seconds", timeout)

vpcid := vpcInfo.getID()
if Options.EnableRouteController && vpcid == 0 {
return nil, fmt.Errorf("cannot enable route controller as vpc [%s] not found", Options.VPCName)
if Options.EnableRouteController && Options.VPCNames == "" {
return nil, fmt.Errorf("cannot enable route controller as vpc-names is empty")
}

return &routes{
vpcid: vpcid,
client: client,
instances: newInstances(client),
routeCache: &routeCache{
Expand All @@ -82,8 +87,8 @@ func newRoutes(client client.Client) (cloudprovider.Routes, error) {

// instanceRoutesByID returns routes for given instance id
func (r *routes) instanceRoutesByID(id int) ([]linodego.VPCIP, error) {
r.routeCache.RLock()
defer r.routeCache.RUnlock()
r.routeCache.Mu.RLock()
defer r.routeCache.Mu.RUnlock()
instanceRoutes, ok := r.routeCache.routes[id]
if !ok {
return nil, fmt.Errorf("no routes found for instance %d", id)
Expand All @@ -94,10 +99,7 @@ func (r *routes) instanceRoutesByID(id int) ([]linodego.VPCIP, error) {
// getInstanceRoutes returns routes for given instance id
// It refreshes routeCache if it has expired
func (r *routes) getInstanceRoutes(ctx context.Context, id int) ([]linodego.VPCIP, error) {
if err := r.routeCache.refreshRoutes(ctx, r.client); err != nil {
return nil, err
}

r.routeCache.refreshRoutes(ctx, r.client)
return r.instanceRoutesByID(id)
}

Expand Down Expand Up @@ -135,22 +137,25 @@ func (r *routes) CreateRoute(ctx context.Context, clusterName string, nameHint s
// check already configured routes
intfRoutes := []string{}
intfVPCIP := linodego.VPCIP{}
for _, ir := range instanceRoutes {
if ir.VPCID != r.vpcid {
continue
}

if ir.Address != nil {
intfVPCIP = ir
continue
}
for _, vpcid := range GetAllVPCIDs() {
for _, ir := range instanceRoutes {
if ir.VPCID != vpcid {
continue
}

if ir.AddressRange != nil && *ir.AddressRange == route.DestinationCIDR {
klog.V(4).Infof("Route already exists for node %s", route.TargetNode)
return nil
}
if ir.Address != nil {
intfVPCIP = ir
continue
}

intfRoutes = append(intfRoutes, *ir.AddressRange)
if ir.AddressRange != nil && *ir.AddressRange == route.DestinationCIDR {
klog.V(4).Infof("Route already exists for node %s", route.TargetNode)
return nil
}

intfRoutes = append(intfRoutes, *ir.AddressRange)
}
}

if intfVPCIP.Address == nil {
Expand Down Expand Up @@ -185,21 +190,24 @@ func (r *routes) DeleteRoute(ctx context.Context, clusterName string, route *clo
// check already configured routes
intfRoutes := []string{}
intfVPCIP := linodego.VPCIP{}
for _, ir := range instanceRoutes {
if ir.VPCID != r.vpcid {
continue
}

if ir.Address != nil {
intfVPCIP = ir
continue
}
for _, vpcid := range GetAllVPCIDs() {
for _, ir := range instanceRoutes {
if ir.VPCID != vpcid {
continue
}

if ir.AddressRange != nil && *ir.AddressRange == route.DestinationCIDR {
continue
}
if ir.Address != nil {
intfVPCIP = ir
continue
}

if ir.AddressRange != nil && *ir.AddressRange == route.DestinationCIDR {
continue
}

intfRoutes = append(intfRoutes, *ir.AddressRange)
intfRoutes = append(intfRoutes, *ir.AddressRange)
}
}

if intfVPCIP.Address == nil {
Expand Down Expand Up @@ -234,17 +242,19 @@ func (r *routes) ListRoutes(ctx context.Context, clusterName string) ([]*cloudpr
}

// check for configured routes
for _, ir := range instanceRoutes {
if ir.Address != nil || ir.VPCID != r.vpcid {
continue
}
for _, vpcid := range GetAllVPCIDs() {
for _, ir := range instanceRoutes {
if ir.Address != nil || ir.VPCID != vpcid {
continue
}

if ir.AddressRange != nil {
route := &cloudprovider.Route{
TargetNode: types.NodeName(instance.Label),
DestinationCIDR: *ir.AddressRange,
if ir.AddressRange != nil {
route := &cloudprovider.Route{
TargetNode: types.NodeName(instance.Label),
DestinationCIDR: *ir.AddressRange,
}
configuredRoutes = append(configuredRoutes, route)
}
configuredRoutes = append(configuredRoutes, route)
}
}
}
Expand Down
Loading

0 comments on commit e3c2045

Please sign in to comment.