From e64a48451809372a923572c8feeac0edb587e125 Mon Sep 17 00:00:00 2001 From: Jonny Date: Wed, 17 May 2023 17:01:10 +0100 Subject: [PATCH 1/4] Capacity tracking support --- go.mod | 1 - pkg/lvm/controllerserver.go | 35 +++++++- pkg/lvm/lvm.go | 170 +++++++++++++++++++++++++++--------- 3 files changed, 160 insertions(+), 46 deletions(-) diff --git a/go.mod b/go.mod index efed3a21..af0ac6f0 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,6 @@ require ( golang.org/x/time v0.3.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230119192704-9d59e20e5cd1 // indirect - google.golang.org/protobuf v1.28.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/pkg/lvm/controllerserver.go b/pkg/lvm/controllerserver.go index ccc9fee7..a3beb785 100644 --- a/pkg/lvm/controllerserver.go +++ b/pkg/lvm/controllerserver.go @@ -17,6 +17,8 @@ limitations under the License. package lvm import ( + "fmt" + "google.golang.org/protobuf/types/known/wrapperspb" "strconv" "github.com/container-storage-interface/spec/lib/go/csi" @@ -55,6 +57,7 @@ func newControllerServer(ephemeral bool, nodeID string, devicesPattern string, v } // creates the clientset kubeClient, err := kubernetes.NewForConfig(config) + if err != nil { return nil, err } @@ -62,6 +65,7 @@ func newControllerServer(ephemeral bool, nodeID string, devicesPattern string, v caps: getControllerServiceCapabilities( []csi.ControllerServiceCapability_RPC_Type{ csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, + csi.ControllerServiceCapability_RPC_GET_CAPACITY, // TODO // csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, // csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS, @@ -212,6 +216,33 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return &csi.DeleteVolumeResponse{}, nil } +func (cs *controllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) { + nodeName := req.GetAccessibleTopology().GetSegments()[topologyKeyNode] + lvmType := req.GetParameters()["type"] + totalBytes, err := createGetCapacityPod(ctx, volumeAction{ + action: "getcapacity", + name: fmt.Sprintf("%s-%s", lvmType, nodeName), + nodeName: nodeName, + lvmType: lvmType, + devicesPattern: cs.devicesPattern, + provisionerImage: cs.provisionerImage, + pullPolicy: cs.pullPolicy, + kubeClient: cs.kubeClient, + namespace: cs.namespace, + vgName: cs.vgName, + hostWritePath: cs.hostWritePath, + }) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to query devices capacity: %v", err) + } + + return &csi.GetCapacityResponse{ + AvailableCapacity: totalBytes, + MaximumVolumeSize: wrapperspb.Int64(totalBytes), + MinimumVolumeSize: wrapperspb.Int64(0), + }, nil +} + func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) { return &csi.ControllerGetCapabilitiesResponse{ Capabilities: cs.caps, @@ -287,10 +318,6 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req * return nil, status.Error(codes.Unimplemented, "") } -func (cs *controllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) { - return nil, status.Error(codes.Unimplemented, "") -} - func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) { return nil, status.Error(codes.Unimplemented, "") } diff --git a/pkg/lvm/lvm.go b/pkg/lvm/lvm.go index 07072d00..6dbe092c 100644 --- a/pkg/lvm/lvm.go +++ b/pkg/lvm/lvm.go @@ -19,6 +19,7 @@ package lvm import ( "context" "fmt" + "k8s.io/client-go/rest" "os" "os/exec" "path/filepath" @@ -81,17 +82,19 @@ type volumeAction struct { } const ( - linearType = "linear" - stripedType = "striped" - mirrorType = "mirror" - actionTypeCreate = "create" - actionTypeDelete = "delete" - pullIfNotPresent = "ifnotpresent" - fsTypeRegexpString = `TYPE="(\w+)"` + linearType = "linear" + stripedType = "striped" + mirrorType = "mirror" + actionTypeCreate = "create" + actionTypeDelete = "delete" + pullIfNotPresent = "ifnotpresent" + fsTypeRegexpString = `TYPE="(\w+)"` + pvCapacityRegexpString = `B (\d+)B` ) var ( - fsTypeRegexp = regexp.MustCompile(fsTypeRegexpString) + fsTypeRegexp = regexp.MustCompile(fsTypeRegexpString) + pvCapacityRegexp = regexp.MustCompile(pvCapacityRegexpString) ) // NewLvmDriver creates the driver @@ -253,6 +256,82 @@ func umountLV(targetPath string) { } } +func createGetCapacityPod(ctx context.Context, va volumeAction) (int64, error) { + pvCapacityRegexp, err := regexp.Compile(pvCapacityRegexpString) + if err != nil { + klog.Errorf("unable to compile regexp for querying physical volume capacity regexp:%s err:%v", pvCapacityRegexpString, err) + return 0, err + } + + // Wrap command in a shell or the device pattern is wrapped in quotes causing pvdisplay to error + provisionerPod, deleteFunc, err := createPod( + ctx, + "sh", + []string{"-c", fmt.Sprintf("pvdisplay %s %s %s", va.devicesPattern, "--units=B", "-C")}, + va, + ) + if err != nil { + return 0, err + } + defer deleteFunc() + + completed := false + retrySeconds := 60 + podLogRequest := &rest.Request{} + + for i := 0; i < retrySeconds; i++ { + pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{}) + if pod.Status.Phase == v1.PodFailed { + klog.Infof("get capacity pod terminated with failure node:%s", va.nodeName) + return 0, status.Error(codes.ResourceExhausted, "get capacity failed") + } + if err != nil { + klog.Errorf("error reading get capacity pod:%v node:%s", err, va.nodeName) + } else if pod.Status.Phase == v1.PodSucceeded { + klog.Infof("get capacity pod terminated successfully, getting logs node:%s", va.nodeName) + podLogRequest = va.kubeClient.CoreV1().Pods(va.namespace).GetLogs(provisionerPod.Name, &v1.PodLogOptions{}) + completed = true + break + } + klog.Infof("get capacity pod status:%s node:%s", pod.Status.Phase, va.nodeName) + time.Sleep(1 * time.Second) + } + if !completed { + return 0, fmt.Errorf("get capacity process timeout after %v seconds node:%s", retrySeconds, va.nodeName) + } + + resp := podLogRequest.Do(ctx) + if resp.Error() != nil { + return 0, fmt.Errorf("failed to get logs from pv capacity pod: %v node:%s", err, va.nodeName) + } + logs, err := resp.Raw() + if err != nil { + return 0, fmt.Errorf("failed to read logs from pv capacity pod: %v node:%s", err, va.nodeName) + } + + matches := pvCapacityRegexp.FindStringSubmatch(string(logs)) + totalBytes := int64(0) + for i := 1; i < len(matches); i++ { + i, err := strconv.ParseInt(matches[i], 10, 64) + if err != nil { + klog.Errorf("unable to parse returned free space output:%s err:%v node:%s", matches[i], err, va.nodeName) + return 0, err + } + totalBytes += i + } + + lvmTypeBytes := int64(0) + switch va.lvmType { + case linearType, stripedType: + lvmTypeBytes = totalBytes + case mirrorType: + lvmTypeBytes = totalBytes / 2 + } + + klog.Infof("pvdisplay output for remaining pv capacity: %d bytes node:%s", lvmTypeBytes, va.nodeName) + return lvmTypeBytes, nil +} + func createProvisionerPod(ctx context.Context, va volumeAction) (err error) { if va.name == "" || va.nodeName == "" { return fmt.Errorf("invalid empty name or path or node") @@ -270,6 +349,42 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) { } args = append(args, "--lvname", va.name, "--vgname", va.vgName) + provisionerPod, deleteFunc, err := createPod(ctx, "/csi-lvmplugin-provisioner", args, va) + if err != nil { + return err + } + defer deleteFunc() + + completed := false + retrySeconds := 60 + for i := 0; i < retrySeconds; i++ { + pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{}) + if pod.Status.Phase == v1.PodFailed { + // pod terminated in time, but with failure + // return ResourceExhausted so the requesting pod can be rescheduled to anonther node + // see https://github.com/kubernetes-csi/external-provisioner/pull/405 + klog.Info("provisioner pod terminated with failure") + return status.Error(codes.ResourceExhausted, "volume creation failed") + } + if err != nil { + klog.Errorf("error reading provisioner pod:%v", err) + } else if pod.Status.Phase == v1.PodSucceeded { + klog.Info("provisioner pod terminated successfully") + completed = true + break + } + klog.Infof("provisioner pod status:%s", pod.Status.Phase) + time.Sleep(1 * time.Second) + } + if !completed { + return fmt.Errorf("create process timeout after %v seconds", retrySeconds) + } + + klog.Infof("Volume %v has been %vd on %v", va.name, va.action, va.nodeName) + return nil +} + +func createPod(ctx context.Context, cmd string, args []string, va volumeAction) (*v1.Pod, func(), error) { klog.Infof("start provisionerPod with args:%s", args) hostPathType := v1.HostPathDirectoryOrCreate privileged := true @@ -290,7 +405,7 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) { { Name: "csi-lvmplugin-" + string(va.action), Image: va.provisionerImage, - Command: []string{"/csi-lvmplugin-provisioner"}, + Command: []string{cmd}, Args: args, VolumeMounts: []v1.VolumeMount{ { @@ -377,6 +492,7 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) { }, }, }, + { Name: "lvmlock", VolumeSource: v1.VolumeSource{ @@ -392,45 +508,17 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) { // If it already exists due to some previous errors, the pod will be cleaned up later automatically // https://github.com/rancher/local-path-provisioner/issues/27 - _, err = va.kubeClient.CoreV1().Pods(va.namespace).Create(ctx, provisionerPod, metav1.CreateOptions{}) + _, err := va.kubeClient.CoreV1().Pods(va.namespace).Create(ctx, provisionerPod, metav1.CreateOptions{}) if err != nil && !k8serror.IsAlreadyExists(err) { - return err + return nil, nil, err } - defer func() { + return provisionerPod, func() { e := va.kubeClient.CoreV1().Pods(va.namespace).Delete(ctx, provisionerPod.Name, metav1.DeleteOptions{}) if e != nil { klog.Errorf("unable to delete the provisioner pod: %v", e) } - }() - - completed := false - retrySeconds := 60 - for i := 0; i < retrySeconds; i++ { - pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{}) - if pod.Status.Phase == v1.PodFailed { - // pod terminated in time, but with failure - // return ResourceExhausted so the requesting pod can be rescheduled to anonther node - // see https://github.com/kubernetes-csi/external-provisioner/pull/405 - klog.Info("provisioner pod terminated with failure") - return status.Error(codes.ResourceExhausted, "volume creation failed") - } - if err != nil { - klog.Errorf("error reading provisioner pod:%v", err) - } else if pod.Status.Phase == v1.PodSucceeded { - klog.Info("provisioner pod terminated successfully") - completed = true - break - } - klog.Infof("provisioner pod status:%s", pod.Status.Phase) - time.Sleep(1 * time.Second) - } - if !completed { - return fmt.Errorf("create process timeout after %v seconds", retrySeconds) - } - - klog.Infof("Volume %v has been %vd on %v", va.name, va.action, va.nodeName) - return nil + }, nil } // VgExists checks if the given volume group exists From c569d3625ff810b527bc84b908d0183309e0e875 Mon Sep 17 00:00:00 2001 From: Jonny Date: Thu, 13 Jul 2023 13:29:51 +0100 Subject: [PATCH 2/4] GetCapacity to use pvs, json reports --- go.mod | 1 + pkg/lvm/lvm.go | 75 ++++++++++++++++++++++++++++++-------------------- 2 files changed, 46 insertions(+), 30 deletions(-) diff --git a/go.mod b/go.mod index af0ac6f0..aa9e6731 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( golang.org/x/net v0.5.0 golang.org/x/sys v0.4.0 google.golang.org/grpc v1.53.0 + google.golang.org/protobuf v1.28.1 k8s.io/api v0.26.0 k8s.io/apimachinery v0.26.0 k8s.io/client-go v0.26.0 diff --git a/pkg/lvm/lvm.go b/pkg/lvm/lvm.go index 6dbe092c..7ffc4ce3 100644 --- a/pkg/lvm/lvm.go +++ b/pkg/lvm/lvm.go @@ -19,6 +19,7 @@ package lvm import ( "context" "fmt" + "k8s.io/apimachinery/pkg/util/json" "k8s.io/client-go/rest" "os" "os/exec" @@ -81,20 +82,41 @@ type volumeAction struct { hostWritePath string } +type pvReport struct { + Report []struct { + PV []struct { + PVName string `json:"pv_name"` + PVFree string `json:"pv_free"` + } `json:"pv"` + } `json:"report"` +} + +func (p *pvReport) totalFree() (int64, error) { + totalFree := int64(0) + for _, report := range p.Report { + for _, pv := range report.PV { + free, err := strconv.ParseInt(pv.PVFree, 10, 0) + if err != nil { + return 0, fmt.Errorf("failed to parse free space for device %s with error: %w", pv.PVName, err) + } + totalFree += free + } + } + return totalFree, nil +} + const ( - linearType = "linear" - stripedType = "striped" - mirrorType = "mirror" - actionTypeCreate = "create" - actionTypeDelete = "delete" - pullIfNotPresent = "ifnotpresent" - fsTypeRegexpString = `TYPE="(\w+)"` - pvCapacityRegexpString = `B (\d+)B` + linearType = "linear" + stripedType = "striped" + mirrorType = "mirror" + actionTypeCreate = "create" + actionTypeDelete = "delete" + pullIfNotPresent = "ifnotpresent" + fsTypeRegexpString = `TYPE="(\w+)"` ) var ( - fsTypeRegexp = regexp.MustCompile(fsTypeRegexpString) - pvCapacityRegexp = regexp.MustCompile(pvCapacityRegexpString) + fsTypeRegexp = regexp.MustCompile(fsTypeRegexpString) ) // NewLvmDriver creates the driver @@ -257,17 +279,11 @@ func umountLV(targetPath string) { } func createGetCapacityPod(ctx context.Context, va volumeAction) (int64, error) { - pvCapacityRegexp, err := regexp.Compile(pvCapacityRegexpString) - if err != nil { - klog.Errorf("unable to compile regexp for querying physical volume capacity regexp:%s err:%v", pvCapacityRegexpString, err) - return 0, err - } - - // Wrap command in a shell or the device pattern is wrapped in quotes causing pvdisplay to error + // Wrap command in a shell or the device pattern is wrapped in quotes causing pvs to error provisionerPod, deleteFunc, err := createPod( ctx, "sh", - []string{"-c", fmt.Sprintf("pvdisplay %s %s %s", va.devicesPattern, "--units=B", "-C")}, + []string{"-c", fmt.Sprintf("pvs %s %s %s %s", va.devicesPattern, "--units=B", "--reportformat=json", "--nosuffix")}, va, ) if err != nil { @@ -302,22 +318,21 @@ func createGetCapacityPod(ctx context.Context, va volumeAction) (int64, error) { resp := podLogRequest.Do(ctx) if resp.Error() != nil { - return 0, fmt.Errorf("failed to get logs from pv capacity pod: %v node:%s", err, va.nodeName) + return 0, fmt.Errorf("failed to get logs from pv capacity pod: %w node:%s", err, va.nodeName) } logs, err := resp.Raw() if err != nil { - return 0, fmt.Errorf("failed to read logs from pv capacity pod: %v node:%s", err, va.nodeName) + return 0, fmt.Errorf("failed to read logs from pv capacity pod: %w node:%s", err, va.nodeName) } - matches := pvCapacityRegexp.FindStringSubmatch(string(logs)) - totalBytes := int64(0) - for i := 1; i < len(matches); i++ { - i, err := strconv.ParseInt(matches[i], 10, 64) - if err != nil { - klog.Errorf("unable to parse returned free space output:%s err:%v node:%s", matches[i], err, va.nodeName) - return 0, err - } - totalBytes += i + pvReport := pvReport{} + err = json.Unmarshal(logs, &pvReport) + if err != nil { + return 0, fmt.Errorf("failed to format pvs output: %w node:%s", err, va.nodeName) + } + totalBytes, err := pvReport.totalFree() + if err != nil { + return 0, fmt.Errorf("%w node:%s", err, va.nodeName) } lvmTypeBytes := int64(0) @@ -328,7 +343,7 @@ func createGetCapacityPod(ctx context.Context, va volumeAction) (int64, error) { lvmTypeBytes = totalBytes / 2 } - klog.Infof("pvdisplay output for remaining pv capacity: %d bytes node:%s", lvmTypeBytes, va.nodeName) + klog.Infof("pvs output for remaining pv capacity: %d bytes node:%s", lvmTypeBytes, va.nodeName) return lvmTypeBytes, nil } From e0765c42fc73e290bd6154edef6a4b85f03cc8bd Mon Sep 17 00:00:00 2001 From: Jonny Date: Tue, 13 Aug 2024 10:46:17 +0100 Subject: [PATCH 3/4] Instantiate drives and ignore pvs errors --- pkg/lvm/lvm.go | 21 +++++++++++++++++---- tests/bats/test.bats | 5 +++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/pkg/lvm/lvm.go b/pkg/lvm/lvm.go index 7ffc4ce3..275af58e 100644 --- a/pkg/lvm/lvm.go +++ b/pkg/lvm/lvm.go @@ -279,11 +279,26 @@ func umountLV(targetPath string) { } func createGetCapacityPod(ctx context.Context, va volumeAction) (int64, error) { + // Prior to running the pvs command, run pvcreate with the device pattern as physical volumes aren't initialised + // if the node is new. This would result in 0 capacity causing PVCs to never be scheduled onto a new node. + pvCreateVA := va + pvCreateVA.action = "pvcreate" + _, deleteFunc, err := createPod( + ctx, + "sh", + []string{"-c", fmt.Sprintf("pvcreate %s", va.devicesPattern)}, + pvCreateVA, + ) + if err != nil { + return 0, err + } + defer deleteFunc() + // Wrap command in a shell or the device pattern is wrapped in quotes causing pvs to error provisionerPod, deleteFunc, err := createPod( ctx, "sh", - []string{"-c", fmt.Sprintf("pvs %s %s %s %s", va.devicesPattern, "--units=B", "--reportformat=json", "--nosuffix")}, + []string{"-c", fmt.Sprintf("pvs %s %s %s %s %s", va.devicesPattern, "--units=B", "--reportformat=json", "--nosuffix", "2>/dev/null")}, va, ) if err != nil { @@ -318,7 +333,7 @@ func createGetCapacityPod(ctx context.Context, va volumeAction) (int64, error) { resp := podLogRequest.Do(ctx) if resp.Error() != nil { - return 0, fmt.Errorf("failed to get logs from pv capacity pod: %w node:%s", err, va.nodeName) + return 0, fmt.Errorf("failed to get logs from pv capacity pod: %w node:%s", resp.Error(), va.nodeName) } logs, err := resp.Raw() if err != nil { @@ -629,8 +644,6 @@ func CreateLVS(vg string, name string, size uint64, lvmType string) (string, err return "", fmt.Errorf("lvmType is incorrect: %s", lvmType) } - // TODO: check available capacity, fail if request doesn't fit - args := []string{"-v", "--yes", "-n", name, "-W", "y", "-L", fmt.Sprintf("%db", size)} pvs, err := pvCount(vg) diff --git a/tests/bats/test.bats b/tests/bats/test.bats index 01eb564a..e55b4405 100644 --- a/tests/bats/test.bats +++ b/tests/bats/test.bats @@ -124,6 +124,11 @@ [ "$status" -eq 0 ] } +@test "check capacity tracking" { + run kubectl wait --for=jsonpath='{.status.phase}'=Running -f files/pod.inline.vol.xfs.yaml --timeout=10s + [ "$status" -eq 0 ] +} + @test "delete csi-lvm-controller" { echo "⏳ Wait 10s for all PVCs to be cleaned up..." >&3 sleep 10 From ea1c7bb78366164b0f42f8adbd80edd0f6adaf63 Mon Sep 17 00:00:00 2001 From: Jonny Date: Fri, 16 Aug 2024 13:43:02 +0100 Subject: [PATCH 4/4] Always return 0 for pvs commands incase all device patterns dont exist --- pkg/lvm/lvm.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/lvm/lvm.go b/pkg/lvm/lvm.go index 275af58e..0433cd55 100644 --- a/pkg/lvm/lvm.go +++ b/pkg/lvm/lvm.go @@ -19,8 +19,6 @@ package lvm import ( "context" "fmt" - "k8s.io/apimachinery/pkg/util/json" - "k8s.io/client-go/rest" "os" "os/exec" "path/filepath" @@ -29,6 +27,9 @@ import ( "strings" "time" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/client-go/rest" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -298,7 +299,7 @@ func createGetCapacityPod(ctx context.Context, va volumeAction) (int64, error) { provisionerPod, deleteFunc, err := createPod( ctx, "sh", - []string{"-c", fmt.Sprintf("pvs %s %s %s %s %s", va.devicesPattern, "--units=B", "--reportformat=json", "--nosuffix", "2>/dev/null")}, + []string{"-c", fmt.Sprintf("pvs %s %s %s %s %s", va.devicesPattern, "--units=B", "--reportformat=json", "--nosuffix", "2>/dev/null || true")}, va, ) if err != nil {