Skip to content

Commit

Permalink
Implement CPLB userspace reverse proxy LB
Browse files Browse the repository at this point in the history
IPVS is problematic for many reasons, implement a userspace load
balancer which should get the job done and should be far less
problematic.

Signed-off-by: Juan-Luis de Sousa-Valadas Castaño <[email protected]>
  • Loading branch information
juanluisvaladas committed Nov 20, 2024
1 parent cc6b62b commit 7227791
Show file tree
Hide file tree
Showing 10 changed files with 399 additions and 44 deletions.
7 changes: 7 additions & 0 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/k0sproject/k0s/pkg/component/controller/cplb"
"github.com/k0sproject/k0s/pkg/component/controller/leaderelector"
"github.com/k0sproject/k0s/pkg/component/controller/workerconfig"
"github.com/k0sproject/k0s/pkg/component/iptables"
"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/component/prober"
"github.com/k0sproject/k0s/pkg/component/status"
Expand Down Expand Up @@ -238,6 +239,11 @@ func (c *command) start(ctx context.Context) error {
// Assume a single active controller during startup
numActiveControllers := value.NewLatest[uint](1)

nodeComponents.Add(ctx, &iptables.Component{
IPTablesMode: c.WorkerOptions.IPTablesMode,
BinDir: c.K0sVars.BinDir,
})

if cplbCfg := nodeConfig.Spec.Network.ControlPlaneLoadBalancing; cplbCfg != nil && cplbCfg.Enabled {
if c.SingleNode {
return errors.New("control plane load balancing cannot be used in a single-node cluster")
Expand Down Expand Up @@ -663,6 +669,7 @@ func (c *command) startWorker(ctx context.Context, profile string, nodeConfig *v
wc.TokenArg = bootstrapConfig
wc.WorkerProfile = profile
wc.Labels = append(wc.Labels, fields.OneTermEqualSelector(constant.K0SNodeRoleLabel, "control-plane").String())
wc.DisableIPTables = true
if !c.SingleNode && !c.NoTaints {
key := path.Join(constant.NodeRoleLabelNamespace, "master")
taint := fields.OneTermEqualSelector(key, ":NoSchedule")
Expand Down
11 changes: 6 additions & 5 deletions cmd/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,12 @@ func (c *Command) Start(ctx context.Context) error {
c.WorkerProfile = "default-windows"
}

componentManager.Add(ctx, &iptables.Component{
IPTablesMode: c.WorkerOptions.IPTablesMode,
BinDir: c.K0sVars.BinDir,
})

if !c.DisableIPTables {
componentManager.Add(ctx, &iptables.Component{
IPTablesMode: c.WorkerOptions.IPTablesMode,
BinDir: c.K0sVars.BinDir,
})
}
componentManager.Add(ctx,
&worker.Kubelet{
CRISocket: c.CriSocket,
Expand Down
3 changes: 2 additions & 1 deletion inttest/Makefile.variables
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ smoketests := \
check-cnichange \
check-configchange \
check-containerdimports \
check-cplb \
check-cplb-ipvs \
check-cplb-userspace \
check-ctr \
check-custom-cidrs \
check-customca \
Expand Down
16 changes: 8 additions & 8 deletions inttest/cplb/cplb_test.go → inttest/cplb-ipvs/cplbipvs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/stretchr/testify/suite"
)

type keepalivedSuite struct {
type CPLBIPVSSuite struct {
common.BootlooseSuite
}

Expand All @@ -50,7 +50,7 @@ spec:

// SetupTest prepares the controller and filesystem, getting it into a consistent
// state which we can run tests against.
func (s *keepalivedSuite) TestK0sGetsUp() {
func (s *CPLBIPVSSuite) TestK0sGetsUp() {
lb := s.getLBAddress()
ctx := s.Context()
var joinToken string
Expand Down Expand Up @@ -111,7 +111,7 @@ func (s *keepalivedSuite) TestK0sGetsUp() {
// getLBAddress returns the IP address of the controller 0 and it adds 100 to
// the last octet unless it's bigger or equal to 154, in which case it
// subtracts 100. Theoretically this could result in an invalid IP address.
func (s *keepalivedSuite) getLBAddress() string {
func (s *CPLBIPVSSuite) getLBAddress() string {
ip := s.GetIPAddress(s.ControllerNode(0))
parts := strings.Split(ip, ".")
if len(parts) != 4 {
Expand All @@ -130,7 +130,7 @@ func (s *keepalivedSuite) getLBAddress() string {

// validateRealServers checks that the real servers are present in the
// ipvsadm output.
func (s *keepalivedSuite) validateRealServers(ctx context.Context, node string, vip string) {
func (s *CPLBIPVSSuite) validateRealServers(ctx context.Context, node string, vip string) {
ssh, err := s.SSH(ctx, node)
s.Require().NoError(err)
defer ssh.Disconnect()
Expand All @@ -151,7 +151,7 @@ func (s *keepalivedSuite) validateRealServers(ctx context.Context, node string,

// checkDummy checks that the dummy interface is present on the given node and
// that it has only the virtual IP address.
func (s *keepalivedSuite) checkDummy(ctx context.Context, node string, vip string) {
func (s *CPLBIPVSSuite) checkDummy(ctx context.Context, node string, vip string) {
ssh, err := s.SSH(ctx, node)
s.Require().NoError(err)
defer ssh.Disconnect()
Expand All @@ -167,7 +167,7 @@ func (s *keepalivedSuite) checkDummy(ctx context.Context, node string, vip strin

// hasVIP checks that the dummy interface is present on the given node and
// that it has only the virtual IP address.
func (s *keepalivedSuite) hasVIP(ctx context.Context, node string, vip string) bool {
func (s *CPLBIPVSSuite) hasVIP(ctx context.Context, node string, vip string) bool {
ssh, err := s.SSH(ctx, node)
s.Require().NoError(err)
defer ssh.Disconnect()
Expand All @@ -180,8 +180,8 @@ func (s *keepalivedSuite) hasVIP(ctx context.Context, node string, vip string) b

// TestKeepAlivedSuite runs the keepalived test suite. It verifies that the
// virtual IP is working by joining a node to the cluster using the VIP.
func TestKeepAlivedSuite(t *testing.T) {
suite.Run(t, &keepalivedSuite{
func TestCPLBIPVSSuite(t *testing.T) {
suite.Run(t, &CPLBIPVSSuite{
common.BootlooseSuite{
ControllerCount: 3,
WorkerCount: 1,
Expand Down
223 changes: 223 additions & 0 deletions inttest/cplb-userspace/cplbuserspace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright 2024 k0s authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package keepalived

import (
"context"
"crypto/tls"
"encoding/hex"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"testing"
"time"

"github.com/k0sproject/k0s/inttest/common"

"github.com/stretchr/testify/suite"
)

type CPLBUserSpaceSuite struct {
common.BootlooseSuite
}

const haControllerConfig = `
spec:
network:
controlPlaneLoadBalancing:
enabled: true
type: Keepalived
keepalived:
vrrpInstances:
- virtualIPs: ["%s/16"]
authPass: "123456"
nodeLocalLoadBalancing:
enabled: true
type: EnvoyProxy
`

// SetupTest prepares the controller and filesystem, getting it into a consistent
// state which we can run tests against.
func (s *CPLBUserSpaceSuite) TestK0sGetsUp() {
lb := s.getLBAddress()
ctx := s.Context()
var joinToken string

for idx := range s.BootlooseSuite.ControllerCount {
s.Require().NoError(s.WaitForSSH(s.ControllerNode(idx), 2*time.Minute, 1*time.Second))
s.PutFile(s.ControllerNode(idx), "/tmp/k0s.yaml", fmt.Sprintf(haControllerConfig, lb))

// Note that the token is intentionally empty for the first controller
s.Require().NoError(s.InitController(idx, "--config=/tmp/k0s.yaml", "--disable-components=metrics-server", "--enable-worker", joinToken))
s.Require().NoError(s.WaitJoinAPI(s.ControllerNode(idx)))

// With the primary controller running, create the join token for subsequent controllers.
if idx == 0 {
token, err := s.GetJoinToken("controller")
s.Require().NoError(err)
joinToken = token
}
}

// Final sanity -- ensure all nodes see each other according to etcd
for idx := range s.BootlooseSuite.ControllerCount {
s.Require().Len(s.GetMembers(idx), s.BootlooseSuite.ControllerCount)
}

// Create a worker join token
workerJoinToken, err := s.GetJoinToken("worker")
s.Require().NoError(err)

// Start the workers using the join token
s.Require().NoError(s.RunWorkersWithToken(workerJoinToken))

client, err := s.KubeClient(s.ControllerNode(0))
s.Require().NoError(err)

for idx := range s.BootlooseSuite.ControllerCount {
s.Require().NoError(s.WaitForNodeReady(s.ControllerNode(idx), client))
}
s.Require().NoError(s.WaitForNodeReady(s.WorkerNode(0), client))

// Verify that none of the servers has the dummy interface
for idx := range s.BootlooseSuite.ControllerCount {
s.checkDummy(ctx, s.ControllerNode(idx))
}

// Verify that only one controller has the VIP in eth0
count := 0
for idx := range s.BootlooseSuite.ControllerCount {
if s.hasVIP(ctx, s.ControllerNode(idx), lb) {
count++
}
}
s.Require().Equal(1, count, "Expected exactly one controller to have the VIP")

// Verify that controller+worker nodes are working normally.
s.T().Log("waiting to see CNI pods ready")
s.Require().NoError(common.WaitForKubeRouterReady(s.Context(), client), "kube router did not start")
s.T().Log("waiting to see konnectivity-agent pods ready")
s.Require().NoError(common.WaitForDaemonSet(s.Context(), client, "konnectivity-agent", "kube-system"), "konnectivity-agent did not start")
s.T().Log("waiting to get logs from pods")
s.Require().NoError(common.WaitForPodLogs(s.Context(), client, "kube-system"))

s.T().Log("Testing that the load balancer is actually balancing the load")
// Other stuff may be querying the controller, running the HTTPS request 15 times
// should be more than we need.
signatures := make(map[string]int)
url := url.URL{Scheme: "https", Host: net.JoinHostPort(lb, strconv.Itoa(6443))}
for range 15 {
signature, err := getServerCertSignature(url.String())
s.Require().NoError(err)
signatures[signature] = 1
}

s.Require().Len(signatures, 3, "Expected 3 different signatures, got %d", len(signatures))
}

// getLBAddress returns the IP address of the controller 0 and it adds 100 to
// the last octet unless it's bigger or equal to 154, in which case it
// subtracts 100. Theoretically this could result in an invalid IP address.
func (s *CPLBUserSpaceSuite) getLBAddress() string {
ip := s.GetIPAddress(s.ControllerNode(0))
parts := strings.Split(ip, ".")
if len(parts) != 4 {
s.T().Fatalf("Invalid IP address: %q", ip)
}
lastOctet, err := strconv.Atoi(parts[3])
s.Require().NoErrorf(err, "Failed to convert last octet %q to int", parts[3])
if lastOctet >= 154 {
lastOctet -= 100
} else {
lastOctet += 100
}

return fmt.Sprintf("%s.%d", strings.Join(parts[:3], "."), lastOctet)
}

// checkDummy checks that the dummy interface isn't present in the node.
func (s *CPLBUserSpaceSuite) checkDummy(ctx context.Context, node string) {
ssh, err := s.SSH(ctx, node)
s.Require().NoError(err)
defer ssh.Disconnect()

_, err = ssh.ExecWithOutput(ctx, "ip --oneline addr show dummyvip0")
s.Require().Error(err)
}

// hasVIP checks that the dummy interface is present on the given node and
// that it has only the virtual IP address.
func (s *CPLBUserSpaceSuite) hasVIP(ctx context.Context, node string, vip string) bool {
ssh, err := s.SSH(ctx, node)
s.Require().NoError(err)
defer ssh.Disconnect()

output, err := ssh.ExecWithOutput(ctx, "ip --oneline addr show eth0")
s.Require().NoError(err)

return strings.Contains(output, fmt.Sprintf("inet %s/16", vip))
}

// getServerCertSignature connects to the given HTTPS URL and returns the server certificate signature.
func getServerCertSignature(url string) (string, error) {
// Create a custom HTTP client with a custom TLS configuration
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true, // Skip verification for demonstration purposes
},
},
}

// Make a request to the URL
resp, err := client.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()

// Get the TLS connection state
connState := resp.TLS
if connState == nil {
return "", errors.New("no TLS connection state")
}

// Get the server certificate
if len(connState.PeerCertificates) == 0 {
return "", errors.New("no server certificate found")
}
cert := connState.PeerCertificates[0]

// Get the certificate signature
signature := cert.Signature

// Return the signature as a hex string
return hex.EncodeToString(signature), nil
}

// TestKeepAlivedSuite runs the keepalived test suite. It verifies that the
// virtual IP is working by joining a node to the cluster using the VIP.
func TestCPLBUserSpaceSuite(t *testing.T) {
suite.Run(t, &CPLBUserSpaceSuite{
common.BootlooseSuite{
ControllerCount: 3,
WorkerCount: 1,
},
})
}
2 changes: 1 addition & 1 deletion inttest/customports/customports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (s *customPortsSuite) TestControllerJoinsWithCustomPort() {
s.AssertSomeKubeSystemPods(kc)

s.T().Log("waiting to see CNI pods ready")
s.Require().NoError(common.WaitForKubeRouterReady(s.Context(), kc), "calico did not start")
s.Require().NoError(common.WaitForKubeRouterReady(s.Context(), kc), "kube-router did not start")
s.T().Log("waiting to see konnectivity-agent pods ready")
s.Require().NoError(common.WaitForDaemonSet(s.Context(), kc, "konnectivity-agent", "kube-system"), "konnectivity-agent did not start")

Expand Down
14 changes: 14 additions & 0 deletions pkg/apis/k0s/v1beta1/cplb.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ type KeepalivedSpec struct {
// Configuration options related to the virtual servers. This is an array
// which allows to configure multiple load balancers.
VirtualServers VirtualServers `json:"virtualServers,omitempty"`
// UserspaceProxyPort is the port where the userspace proxy will bind
// to. This port is only exposed on the localhost interface and is only
// used internally. Defaults to 6444.
// +kubebuilder:default=6444
// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Maximum=65535
// +optional
UserSpaceProxyPort int `json:"userSpaceProxyBindPort,omitempty"`
}

// VRRPInstances is a list of VRRPInstance
Expand Down Expand Up @@ -293,6 +301,12 @@ func (k *KeepalivedSpec) Validate(externalAddress string) (errs []error) {

errs = append(errs, k.validateVRRPInstances(nil)...)
errs = append(errs, k.validateVirtualServers()...)
if k.UserSpaceProxyPort == 0 {
k.UserSpaceProxyPort = 6444
} else if k.UserSpaceProxyPort < 1 || k.UserSpaceProxyPort > 65535 {
errs = append(errs, errors.New("UserSpaceProxyPort must be in the range of 1-65535"))
}

// CPLB reconciler relies in watching kubernetes.default.svc endpoints
if externalAddress != "" && len(k.VirtualServers) > 0 {
errs = append(errs, errors.New(".spec.api.externalAddress and virtual servers cannot be used together"))
Expand Down
Loading

0 comments on commit 7227791

Please sign in to comment.