Skip to content

Commit

Permalink
Merge pull request #1 from rahulait/route-controller-multiple-vpcs
Browse files Browse the repository at this point in the history
manage routes for instances in multiple vpcs in a single region
  • Loading branch information
rahulait authored Nov 5, 2024
2 parents 1b046af + c04ffe9 commit d9da7e9
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 128 deletions.
43 changes: 9 additions & 34 deletions cloud/linode/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"net"
"os"
"strconv"
"sync"
"time"

"github.com/spf13/pflag"
"golang.org/x/exp/slices"
"k8s.io/client-go/informers"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"

"github.com/linode/linode-cloud-controller-manager/cloud/linode/client"
)
Expand All @@ -35,41 +35,14 @@ var Options struct {
KubeconfigFlag *pflag.Flag
LinodeGoDebug bool
EnableRouteController bool
// deprecated: use VPCNames instead
VPCName string
VPCNames string
LoadBalancerType string
BGPNodeSelector string
LinodeExternalNetwork *net.IPNet
}

// vpcDetails is set when VPCName options flag is set.
// We use it to list instances running within the VPC if set
type vpcDetails struct {
mu sync.RWMutex
id int
name string
}

func (v *vpcDetails) setDetails(client client.Client, name string) error {
v.mu.Lock()
defer v.mu.Unlock()

id, err := getVPCID(client, Options.VPCName)
if err != nil {
return fmt.Errorf("failed finding VPC ID: %w", err)
}
v.id = id
v.name = name
return nil
}

func (v *vpcDetails) getID() int {
v.mu.Lock()
defer v.mu.Unlock()
return v.id
}

var vpcInfo vpcDetails = vpcDetails{id: 0, name: ""}

type linodeCloud struct {
client client.Client
instances cloudprovider.InstancesV2
Expand Down Expand Up @@ -114,11 +87,13 @@ func newCloud() (cloudprovider.Interface, error) {
linodeClient.SetDebug(true)
}

if Options.VPCName != "" && Options.VPCNames != "" {
return nil, fmt.Errorf("cannot have both vpc-name and vpc-names set")
}

if Options.VPCName != "" {
err := vpcInfo.setDetails(linodeClient, Options.VPCName)
if err != nil {
return nil, fmt.Errorf("failed finding VPC ID: %w", err)
}
klog.Warningf("vpc-name flag is deprecated. Use vpc-names instead")
Options.VPCNames = Options.VPCName
}

routes, err := newRoutes(linodeClient)
Expand Down
29 changes: 20 additions & 9 deletions cloud/linode/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,25 +79,36 @@ func (nc *nodeCache) refreshInstances(ctx context.Context, client client.Client)

// If running within VPC, find instances and store their ips
vpcNodes := map[int][]string{}
vpcID := vpcInfo.getID()
if vpcID != 0 {
resp, err := client.ListVPCIPAddresses(ctx, vpcID, linodego.NewListOptions(0, ""))
vpcNames := strings.Split(Options.VPCNames, ",")
for _, v := range vpcNames {
vpcName := strings.TrimSpace(v)
if vpcName == "" {
continue
}
vpcID, err := GetVPCID(client, strings.TrimSpace(vpcName))
if err != nil {
return err
klog.Errorf("failed updating instances cache for VPC %s. Error: %s", vpcName, err.Error())
continue
}
for _, r := range resp {
if r.Address == nil {
continue
if vpcID != 0 {
resp, err := client.ListVPCIPAddresses(ctx, vpcID, linodego.NewListOptions(0, ""))
if err != nil {
return err
}
for _, r := range resp {
if r.Address == nil {
continue
}
vpcNodes[r.LinodeID] = append(vpcNodes[r.LinodeID], *r.Address)
}
vpcNodes[r.LinodeID] = append(vpcNodes[r.LinodeID], *r.Address)
}
}

newNodes := make(map[int]linodeInstance, len(instances))
for i, instance := range instances {

// if running within VPC, only store instances in cache which are part of VPC
if vpcID != 0 && len(vpcNodes[instance.ID]) == 0 {
if Options.VPCNames != "" && len(vpcNodes[instance.ID]) == 0 {
continue
}
node := linodeInstance{
Expand Down
120 changes: 69 additions & 51 deletions cloud/linode/route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -19,28 +20,40 @@ import (
)

type routeCache struct {
sync.RWMutex
Mu sync.RWMutex
routes map[int][]linodego.VPCIP
lastUpdate time.Time
ttl time.Duration
}

// RefreshCache checks if cache has expired and updates it accordingly
func (rc *routeCache) refreshRoutes(ctx context.Context, client client.Client) error {
rc.Lock()
defer rc.Unlock()
rc.Mu.Lock()
defer rc.Mu.Unlock()

if time.Since(rc.lastUpdate) < rc.ttl {
return nil
}

vpcNodes := map[int][]linodego.VPCIP{}
vpcID := vpcInfo.getID()
resp, err := client.ListVPCIPAddresses(ctx, vpcID, linodego.NewListOptions(0, ""))
if err != nil {
return err
}
for _, r := range resp {
vpcNodes[r.LinodeID] = append(vpcNodes[r.LinodeID], r)
vpcNames := strings.Split(Options.VPCNames, ",")
for _, v := range vpcNames {
vpcName := strings.TrimSpace(v)
if vpcName == "" {
continue
}
vpcID, err := GetVPCID(client, strings.TrimSpace(vpcName))
if err != nil {
klog.Errorf("failed updating cache for VPC %s. Error: %s", vpcName, err.Error())
continue
}
resp, err := client.ListVPCIPAddresses(ctx, vpcID, linodego.NewListOptions(0, ""))
if err != nil {
return err
}
for _, r := range resp {
vpcNodes[r.LinodeID] = append(vpcNodes[r.LinodeID], r)
}
}

rc.routes = vpcNodes
Expand All @@ -49,7 +62,6 @@ func (rc *routeCache) refreshRoutes(ctx context.Context, client client.Client) e
}

type routes struct {
vpcid int
client client.Client
instances *instances
routeCache *routeCache
Expand All @@ -64,13 +76,11 @@ func newRoutes(client client.Client) (cloudprovider.Routes, error) {
}
klog.V(3).Infof("TTL for routeCache set to %d seconds", timeout)

vpcid := vpcInfo.getID()
if Options.EnableRouteController && vpcid == 0 {
return nil, fmt.Errorf("cannot enable route controller as vpc [%s] not found", Options.VPCName)
if Options.EnableRouteController && Options.VPCNames == "" {
return nil, fmt.Errorf("cannot enable route controller as vpc-names is empty")
}

return &routes{
vpcid: vpcid,
client: client,
instances: newInstances(client),
routeCache: &routeCache{
Expand All @@ -82,8 +92,8 @@ func newRoutes(client client.Client) (cloudprovider.Routes, error) {

// instanceRoutesByID returns routes for given instance id
func (r *routes) instanceRoutesByID(id int) ([]linodego.VPCIP, error) {
r.routeCache.RLock()
defer r.routeCache.RUnlock()
r.routeCache.Mu.RLock()
defer r.routeCache.Mu.RUnlock()
instanceRoutes, ok := r.routeCache.routes[id]
if !ok {
return nil, fmt.Errorf("no routes found for instance %d", id)
Expand Down Expand Up @@ -135,22 +145,25 @@ func (r *routes) CreateRoute(ctx context.Context, clusterName string, nameHint s
// check already configured routes
intfRoutes := []string{}
intfVPCIP := linodego.VPCIP{}
for _, ir := range instanceRoutes {
if ir.VPCID != r.vpcid {
continue
}

if ir.Address != nil {
intfVPCIP = ir
continue
}
for _, vpcid := range GetAllVPCIDs() {
for _, ir := range instanceRoutes {
if ir.VPCID != vpcid {
continue
}

if ir.AddressRange != nil && *ir.AddressRange == route.DestinationCIDR {
klog.V(4).Infof("Route already exists for node %s", route.TargetNode)
return nil
}
if ir.Address != nil {
intfVPCIP = ir
continue
}

intfRoutes = append(intfRoutes, *ir.AddressRange)
if ir.AddressRange != nil && *ir.AddressRange == route.DestinationCIDR {
klog.V(4).Infof("Route already exists for node %s", route.TargetNode)
return nil
}

intfRoutes = append(intfRoutes, *ir.AddressRange)
}
}

if intfVPCIP.Address == nil {
Expand Down Expand Up @@ -185,21 +198,24 @@ func (r *routes) DeleteRoute(ctx context.Context, clusterName string, route *clo
// check already configured routes
intfRoutes := []string{}
intfVPCIP := linodego.VPCIP{}
for _, ir := range instanceRoutes {
if ir.VPCID != r.vpcid {
continue
}

if ir.Address != nil {
intfVPCIP = ir
continue
}
for _, vpcid := range GetAllVPCIDs() {
for _, ir := range instanceRoutes {
if ir.VPCID != vpcid {
continue
}

if ir.AddressRange != nil && *ir.AddressRange == route.DestinationCIDR {
continue
}
if ir.Address != nil {
intfVPCIP = ir
continue
}

if ir.AddressRange != nil && *ir.AddressRange == route.DestinationCIDR {
continue
}

intfRoutes = append(intfRoutes, *ir.AddressRange)
intfRoutes = append(intfRoutes, *ir.AddressRange)
}
}

if intfVPCIP.Address == nil {
Expand Down Expand Up @@ -234,17 +250,19 @@ func (r *routes) ListRoutes(ctx context.Context, clusterName string) ([]*cloudpr
}

// check for configured routes
for _, ir := range instanceRoutes {
if ir.Address != nil || ir.VPCID != r.vpcid {
continue
}
for _, vpcid := range GetAllVPCIDs() {
for _, ir := range instanceRoutes {
if ir.Address != nil || ir.VPCID != vpcid {
continue
}

if ir.AddressRange != nil {
route := &cloudprovider.Route{
TargetNode: types.NodeName(instance.Label),
DestinationCIDR: *ir.AddressRange,
if ir.AddressRange != nil {
route := &cloudprovider.Route{
TargetNode: types.NodeName(instance.Label),
DestinationCIDR: *ir.AddressRange,
}
configuredRoutes = append(configuredRoutes, route)
}
configuredRoutes = append(configuredRoutes, route)
}
}
}
Expand Down
Loading

0 comments on commit d9da7e9

Please sign in to comment.