From e64a48451809372a923572c8feeac0edb587e125 Mon Sep 17 00:00:00 2001
From: Jonny
Date: Wed, 17 May 2023 17:01:10 +0100
Subject: [PATCH] Capacity tracking support

---
 go.mod                      |   1 -
 pkg/lvm/controllerserver.go |  35 +++++++-
 pkg/lvm/lvm.go              | 170 +++++++++++++++++++++++++++---------
 3 files changed, 160 insertions(+), 46 deletions(-)

diff --git a/go.mod b/go.mod
index efed3a21..af0ac6f0 100644
--- a/go.mod
+++ b/go.mod
@@ -43,7 +43,6 @@ require (
 	golang.org/x/time v0.3.0 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
 	google.golang.org/genproto v0.0.0-20230119192704-9d59e20e5cd1 // indirect
-	google.golang.org/protobuf v1.28.1 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
diff --git a/pkg/lvm/controllerserver.go b/pkg/lvm/controllerserver.go
index ccc9fee7..a3beb785 100644
--- a/pkg/lvm/controllerserver.go
+++ b/pkg/lvm/controllerserver.go
@@ -17,6 +17,8 @@ limitations under the License.
 package lvm
 
 import (
+	"fmt"
+	"google.golang.org/protobuf/types/known/wrapperspb"
 	"strconv"
 
 	"github.com/container-storage-interface/spec/lib/go/csi"
@@ -55,6 +57,7 @@ func newControllerServer(ephemeral bool, nodeID string, devicesPattern string, v
 	}
 	// creates the clientset
 	kubeClient, err := kubernetes.NewForConfig(config)
+
 	if err != nil {
 		return nil, err
 	}
@@ -62,6 +65,7 @@ func newControllerServer(ephemeral bool, nodeID string, devicesPattern string, v
 		caps: getControllerServiceCapabilities(
 			[]csi.ControllerServiceCapability_RPC_Type{
 				csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
+				csi.ControllerServiceCapability_RPC_GET_CAPACITY,
 				// TODO
 				// csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
 				// csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
@@ -212,6 +216,33 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 	return &csi.DeleteVolumeResponse{}, nil
 }
 
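+// GetCapacity reports the free LVM capacity of the node selected by the
+// request's accessible-topology segment. It schedules a short-lived pod on
+// that node to run pvdisplay (see createGetCapacityPod) and derives the
+// usable bytes from the requested lvm type.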
+func (cs *controllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
+	nodeName := req.GetAccessibleTopology().GetSegments()[topologyKeyNode]
+	lvmType := req.GetParameters()["type"]
+	totalBytes, err := createGetCapacityPod(ctx, volumeAction{
+		action:           "getcapacity",
+		name:             fmt.Sprintf("%s-%s", lvmType, nodeName),
+		nodeName:         nodeName,
+		lvmType:          lvmType,
+		devicesPattern:   cs.devicesPattern,
+		provisionerImage: cs.provisionerImage,
+		pullPolicy:       cs.pullPolicy,
+		kubeClient:       cs.kubeClient,
+		namespace:        cs.namespace,
+		vgName:           cs.vgName,
+		hostWritePath:    cs.hostWritePath,
+	})
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "failed to query devices capacity: %v", err)
+	}
+
+	return &csi.GetCapacityResponse{
+		AvailableCapacity: totalBytes,
+		MaximumVolumeSize: wrapperspb.Int64(totalBytes),
+		MinimumVolumeSize: wrapperspb.Int64(0),
+	}, nil
+}
+
 func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
 	return &csi.ControllerGetCapabilitiesResponse{
 		Capabilities: cs.caps,
@@ -287,10 +318,6 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
 	return nil, status.Error(codes.Unimplemented, "")
 }
 
-func (cs *controllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
-	return nil, status.Error(codes.Unimplemented, "")
-}
-
 func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
 	return nil, status.Error(codes.Unimplemented, "")
 }
diff --git a/pkg/lvm/lvm.go b/pkg/lvm/lvm.go
index 07072d00..6dbe092c 100644
--- a/pkg/lvm/lvm.go
+++ b/pkg/lvm/lvm.go
@@ -19,6 +19,7 @@ package lvm
 import (
 	"context"
 	"fmt"
+	"k8s.io/client-go/rest"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -81,17 +82,19 @@ type volumeAction struct {
 }
 
 const (
-	linearType         = "linear"
-	stripedType        = "striped"
-	mirrorType         = "mirror"
-	actionTypeCreate   = "create"
-	actionTypeDelete   = "delete"
-	pullIfNotPresent   = "ifnotpresent"
-	fsTypeRegexpString = `TYPE="(\w+)"`
+	linearType             = "linear"
+	stripedType            = "striped"
+	mirrorType             = "mirror"
+	actionTypeCreate       = "create"
+	actionTypeDelete       = "delete"
+	pullIfNotPresent       = "ifnotpresent"
+	fsTypeRegexpString     = `TYPE="(\w+)"`
+	// pvCapacityRegexpString captures the PFree column (free bytes) of
+	// `pvdisplay -C --units=B` output, which directly follows the PSize
+	// column, e.g. "... 4294967296B 2147483648B" yields 2147483648.
+	pvCapacityRegexpString = `B (\d+)B`
 )
 
 var (
-	fsTypeRegexp = regexp.MustCompile(fsTypeRegexpString)
+	fsTypeRegexp     = regexp.MustCompile(fsTypeRegexpString)
+	pvCapacityRegexp = regexp.MustCompile(pvCapacityRegexpString)
 )
 
 // NewLvmDriver creates the driver
@@ -253,6 +256,82 @@ func umountLV(targetPath string) {
 	}
 }
 
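+// createGetCapacityPod runs `pvdisplay -C --units=B` against the configured
+// devices pattern in a pod on the target node, parses the free bytes of each
+// matching physical volume from the pod logs and returns their sum, halved
+// for mirrored volumes since a mirror consumes twice its size.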
2 + } + + klog.Infof("pvdisplay output for remaining pv capacity: %d bytes node:%s", lvmTypeBytes, va.nodeName) + return lvmTypeBytes, nil +} + func createProvisionerPod(ctx context.Context, va volumeAction) (err error) { if va.name == "" || va.nodeName == "" { return fmt.Errorf("invalid empty name or path or node") @@ -270,6 +349,42 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) { } args = append(args, "--lvname", va.name, "--vgname", va.vgName) + provisionerPod, deleteFunc, err := createPod(ctx, "/csi-lvmplugin-provisioner", args, va) + if err != nil { + return err + } + defer deleteFunc() + + completed := false + retrySeconds := 60 + for i := 0; i < retrySeconds; i++ { + pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{}) + if pod.Status.Phase == v1.PodFailed { + // pod terminated in time, but with failure + // return ResourceExhausted so the requesting pod can be rescheduled to anonther node + // see https://github.com/kubernetes-csi/external-provisioner/pull/405 + klog.Info("provisioner pod terminated with failure") + return status.Error(codes.ResourceExhausted, "volume creation failed") + } + if err != nil { + klog.Errorf("error reading provisioner pod:%v", err) + } else if pod.Status.Phase == v1.PodSucceeded { + klog.Info("provisioner pod terminated successfully") + completed = true + break + } + klog.Infof("provisioner pod status:%s", pod.Status.Phase) + time.Sleep(1 * time.Second) + } + if !completed { + return fmt.Errorf("create process timeout after %v seconds", retrySeconds) + } + + klog.Infof("Volume %v has been %vd on %v", va.name, va.action, va.nodeName) + return nil +} + +func createPod(ctx context.Context, cmd string, args []string, va volumeAction) (*v1.Pod, func(), error) { klog.Infof("start provisionerPod with args:%s", args) hostPathType := v1.HostPathDirectoryOrCreate privileged := true @@ -290,7 +405,7 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) { { Name: "csi-lvmplugin-" + string(va.action), Image: va.provisionerImage, - Command: []string{"/csi-lvmplugin-provisioner"}, + Command: []string{cmd}, Args: args, VolumeMounts: []v1.VolumeMount{ { @@ -377,6 +492,7 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) { }, }, }, + { Name: "lvmlock", VolumeSource: v1.VolumeSource{ @@ -392,45 +508,17 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) { // If it already exists due to some previous errors, the pod will be cleaned up later automatically // https://github.com/rancher/local-path-provisioner/issues/27 - _, err = va.kubeClient.CoreV1().Pods(va.namespace).Create(ctx, provisionerPod, metav1.CreateOptions{}) + _, err := va.kubeClient.CoreV1().Pods(va.namespace).Create(ctx, provisionerPod, metav1.CreateOptions{}) if err != nil && !k8serror.IsAlreadyExists(err) { - return err + return nil, nil, err } - defer func() { + return provisionerPod, func() { e := va.kubeClient.CoreV1().Pods(va.namespace).Delete(ctx, provisionerPod.Name, metav1.DeleteOptions{}) if e != nil { klog.Errorf("unable to delete the provisioner pod: %v", e) } - }() - - completed := false - retrySeconds := 60 - for i := 0; i < retrySeconds; i++ { - pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{}) - if pod.Status.Phase == v1.PodFailed { - // pod terminated in time, but with failure - // return ResourceExhausted so the requesting pod can be rescheduled to anonther node - // see 
+func createPod(ctx context.Context, cmd string, args []string, va volumeAction) (*v1.Pod, func(), error) {
 	klog.Infof("start provisionerPod with args:%s", args)
 	hostPathType := v1.HostPathDirectoryOrCreate
 	privileged := true
@@ -290,7 +405,7 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
 				{
 					Name:    "csi-lvmplugin-" + string(va.action),
 					Image:   va.provisionerImage,
-					Command: []string{"/csi-lvmplugin-provisioner"},
+					Command: []string{cmd},
 					Args:    args,
 					VolumeMounts: []v1.VolumeMount{
 						{
@@ -377,6 +492,7 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
 					},
 				},
 			},
+
 			{
 				Name: "lvmlock",
 				VolumeSource: v1.VolumeSource{
@@ -392,45 +508,17 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
 
 	// If it already exists due to some previous errors, the pod will be cleaned up later automatically
 	// https://github.com/rancher/local-path-provisioner/issues/27
-	_, err = va.kubeClient.CoreV1().Pods(va.namespace).Create(ctx, provisionerPod, metav1.CreateOptions{})
+	_, err := va.kubeClient.CoreV1().Pods(va.namespace).Create(ctx, provisionerPod, metav1.CreateOptions{})
 	if err != nil && !k8serror.IsAlreadyExists(err) {
-		return err
+		return nil, nil, err
 	}
 
-	defer func() {
+	return provisionerPod, func() {
 		e := va.kubeClient.CoreV1().Pods(va.namespace).Delete(ctx, provisionerPod.Name, metav1.DeleteOptions{})
 		if e != nil {
 			klog.Errorf("unable to delete the provisioner pod: %v", e)
 		}
-	}()
-
-	completed := false
-	retrySeconds := 60
-	for i := 0; i < retrySeconds; i++ {
-		pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{})
-		if pod.Status.Phase == v1.PodFailed {
-			// pod terminated in time, but with failure
-			// return ResourceExhausted so the requesting pod can be rescheduled to anonther node
-			// see https://github.com/kubernetes-csi/external-provisioner/pull/405
-			klog.Info("provisioner pod terminated with failure")
-			return status.Error(codes.ResourceExhausted, "volume creation failed")
-		}
-		if err != nil {
-			klog.Errorf("error reading provisioner pod:%v", err)
-		} else if pod.Status.Phase == v1.PodSucceeded {
-			klog.Info("provisioner pod terminated successfully")
-			completed = true
-			break
-		}
-		klog.Infof("provisioner pod status:%s", pod.Status.Phase)
-		time.Sleep(1 * time.Second)
-	}
-	if !completed {
-		return fmt.Errorf("create process timeout after %v seconds", retrySeconds)
-	}
-
-	klog.Infof("Volume %v has been %vd on %v", va.name, va.action, va.nodeName)
-	return nil
+	}, nil
 }
 
 // VgExists checks if the given volume group exists