diff --git a/go.mod b/go.mod
index efed3a21..aa9e6731 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,7 @@ require (
 	golang.org/x/net v0.5.0
 	golang.org/x/sys v0.4.0
 	google.golang.org/grpc v1.53.0
+	google.golang.org/protobuf v1.28.1
 	k8s.io/api v0.26.0
 	k8s.io/apimachinery v0.26.0
 	k8s.io/client-go v0.26.0
@@ -43,7 +44,6 @@ require (
 	golang.org/x/time v0.3.0 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
 	google.golang.org/genproto v0.0.0-20230119192704-9d59e20e5cd1 // indirect
-	google.golang.org/protobuf v1.28.1 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
diff --git a/pkg/lvm/controllerserver.go b/pkg/lvm/controllerserver.go
index ccc9fee7..a3beb785 100644
--- a/pkg/lvm/controllerserver.go
+++ b/pkg/lvm/controllerserver.go
@@ -17,6 +17,8 @@ limitations under the License.
 package lvm
 
 import (
+	"fmt"
+	"google.golang.org/protobuf/types/known/wrapperspb"
 	"strconv"
 
 	"github.com/container-storage-interface/spec/lib/go/csi"
@@ -55,6 +57,7 @@ func newControllerServer(ephemeral bool, nodeID string, devicesPattern string, v
 	}
 	// creates the clientset
 	kubeClient, err := kubernetes.NewForConfig(config)
+
 	if err != nil {
 		return nil, err
 	}
@@ -62,6 +65,7 @@ func newControllerServer(ephemeral bool, nodeID string, devicesPattern string, v
 		caps: getControllerServiceCapabilities(
 			[]csi.ControllerServiceCapability_RPC_Type{
 				csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
+				csi.ControllerServiceCapability_RPC_GET_CAPACITY,
 				// TODO
 				// csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
 				// csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
@@ -212,6 +216,33 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 	return &csi.DeleteVolumeResponse{}, nil
 }
 
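+// GetCapacity implements the CSI GetCapacity call. The external-provisioner
+// uses it for storage capacity tracking: it reports the free LVM capacity of
+// the node referenced by the request's accessible topology.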
+func (cs *controllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
+	nodeName := req.GetAccessibleTopology().GetSegments()[topologyKeyNode]
+	lvmType := req.GetParameters()["type"]
+	totalBytes, err := createGetCapacityPod(ctx, volumeAction{
+		action:           "getcapacity",
+		name:             fmt.Sprintf("%s-%s", lvmType, nodeName),
+		nodeName:         nodeName,
+		lvmType:          lvmType,
+		devicesPattern:   cs.devicesPattern,
+		provisionerImage: cs.provisionerImage,
+		pullPolicy:       cs.pullPolicy,
+		kubeClient:       cs.kubeClient,
+		namespace:        cs.namespace,
+		vgName:           cs.vgName,
+		hostWritePath:    cs.hostWritePath,
+	})
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "failed to query devices capacity: %v", err)
+	}
+
+	return &csi.GetCapacityResponse{
+		AvailableCapacity: totalBytes,
+		MaximumVolumeSize: wrapperspb.Int64(totalBytes),
+		MinimumVolumeSize: wrapperspb.Int64(0),
+	}, nil
+}
+
 func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
 	return &csi.ControllerGetCapabilitiesResponse{
 		Capabilities: cs.caps,
@@ -287,10 +318,6 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
 	return nil, status.Error(codes.Unimplemented, "")
 }
 
-func (cs *controllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
-	return nil, status.Error(codes.Unimplemented, "")
-}
-
 func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
 	return nil, status.Error(codes.Unimplemented, "")
 }
diff --git a/pkg/lvm/lvm.go b/pkg/lvm/lvm.go
index 07072d00..0433cd55 100644
--- a/pkg/lvm/lvm.go
+++ b/pkg/lvm/lvm.go
@@ -27,6 +27,9 @@ import (
 	"strings"
 	"time"
 
+	"k8s.io/apimachinery/pkg/util/json"
+	"k8s.io/client-go/rest"
+
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
@@ -80,6 +83,29 @@ type volumeAction struct {
 	hostWritePath string
 }
 
+type pvReport struct {
+	Report []struct {
+		PV []struct {
+			PVName string `json:"pv_name"`
+			PVFree string `json:"pv_free"`
+		} `json:"pv"`
+	} `json:"report"`
+}
+
+func (p *pvReport) totalFree() (int64, error) {
+	totalFree := int64(0)
+	for _, report := range p.Report {
+		for _, pv := range report.PV {
+			free, err := strconv.ParseInt(pv.PVFree, 10, 64)
+			if err != nil {
+				return 0, fmt.Errorf("failed to parse free space for device %s with error: %w", pv.PVName, err)
+			}
+			totalFree += free
+		}
+	}
+	return totalFree, nil
+}
+
 const (
 	linearType  = "linear"
 	stripedType = "striped"
@@ -253,6 +279,90 @@ func umountLV(targetPath string) {
 	}
 }
 
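+// createGetCapacityPod runs pvcreate and pvs in short-lived pods on the target
+// node, parses the JSON report emitted by pvs and returns the summed free
+// bytes of all physical volumes matching the device pattern.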
+func createGetCapacityPod(ctx context.Context, va volumeAction) (int64, error) {
+	// Run pvcreate with the device pattern before running pvs: on a new node the physical volumes
+	// aren't initialised yet, so pvs would report 0 capacity and PVCs would never be scheduled onto that node.
+	pvCreateVA := va
+	pvCreateVA.action = "pvcreate"
+	_, deleteFunc, err := createPod(
+		ctx,
+		"sh",
+		[]string{"-c", fmt.Sprintf("pvcreate %s", va.devicesPattern)},
+		pvCreateVA,
+	)
+	if err != nil {
+		return 0, err
+	}
+	defer deleteFunc()
+
+	// Wrap the command in a shell, otherwise the device pattern is wrapped in quotes and pvs errors out.
+	provisionerPod, deleteFunc, err := createPod(
+		ctx,
+		"sh",
+		[]string{"-c", fmt.Sprintf("pvs %s %s %s %s %s", va.devicesPattern, "--units=B", "--reportformat=json", "--nosuffix", "2>/dev/null || true")},
+		va,
+	)
+	if err != nil {
+		return 0, err
+	}
+	defer deleteFunc()
+
+	completed := false
+	retrySeconds := 60
+	podLogRequest := &rest.Request{}
+
+	for i := 0; i < retrySeconds; i++ {
+		pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{})
+		if pod.Status.Phase == v1.PodFailed {
+			klog.Infof("get capacity pod terminated with failure node:%s", va.nodeName)
+			return 0, status.Error(codes.ResourceExhausted, "get capacity failed")
+		}
+		if err != nil {
+			klog.Errorf("error reading get capacity pod:%v node:%s", err, va.nodeName)
+		} else if pod.Status.Phase == v1.PodSucceeded {
+			klog.Infof("get capacity pod terminated successfully, getting logs node:%s", va.nodeName)
+			podLogRequest = va.kubeClient.CoreV1().Pods(va.namespace).GetLogs(provisionerPod.Name, &v1.PodLogOptions{})
+			completed = true
+			break
+		}
+		klog.Infof("get capacity pod status:%s node:%s", pod.Status.Phase, va.nodeName)
+		time.Sleep(1 * time.Second)
+	}
+	if !completed {
+		return 0, fmt.Errorf("get capacity process timeout after %v seconds node:%s", retrySeconds, va.nodeName)
+	}
+
+	resp := podLogRequest.Do(ctx)
+	if resp.Error() != nil {
+		return 0, fmt.Errorf("failed to get logs from pv capacity pod: %w node:%s", resp.Error(), va.nodeName)
+	}
+	logs, err := resp.Raw()
+	if err != nil {
+		return 0, fmt.Errorf("failed to read logs from pv capacity pod: %w node:%s", err, va.nodeName)
+	}
+
+	pvReport := pvReport{}
+	err = json.Unmarshal(logs, &pvReport)
+	if err != nil {
+		return 0, fmt.Errorf("failed to parse pvs output: %w node:%s", err, va.nodeName)
+	}
+	totalBytes, err := pvReport.totalFree()
+	if err != nil {
+		return 0, fmt.Errorf("%w node:%s", err, va.nodeName)
+	}
+
+	lvmTypeBytes := int64(0)
+	switch va.lvmType {
+	case linearType, stripedType:
+		lvmTypeBytes = totalBytes
+	case mirrorType:
+		lvmTypeBytes = totalBytes / 2
+	}
+
+	klog.Infof("pvs output for remaining pv capacity: %d bytes node:%s", lvmTypeBytes, va.nodeName)
+	return lvmTypeBytes, nil
+}
+
 func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
 	if va.name == "" || va.nodeName == "" {
 		return fmt.Errorf("invalid empty name or path or node")
@@ -270,6 +380,42 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
 	}
 	args = append(args, "--lvname", va.name, "--vgname", va.vgName)
 
+	provisionerPod, deleteFunc, err := createPod(ctx, "/csi-lvmplugin-provisioner", args, va)
+	if err != nil {
+		return err
+	}
+	defer deleteFunc()
+
+	completed := false
+	retrySeconds := 60
+	for i := 0; i < retrySeconds; i++ {
+		pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{})
+		if pod.Status.Phase == v1.PodFailed {
+			// pod terminated in time, but with failure
+			// return ResourceExhausted so the requesting pod can be rescheduled to another node
+			// see https://github.com/kubernetes-csi/external-provisioner/pull/405
+			klog.Info("provisioner pod terminated with failure")
+			return status.Error(codes.ResourceExhausted, "volume creation failed")
+		}
+		if err != nil {
+			klog.Errorf("error reading provisioner pod:%v", err)
+		} else if pod.Status.Phase == v1.PodSucceeded {
+			klog.Info("provisioner pod terminated successfully")
+			completed = true
+			break
+		}
+		klog.Infof("provisioner pod status:%s", pod.Status.Phase)
+		time.Sleep(1 * time.Second)
+	}
+	if !completed {
+		return fmt.Errorf("create process timeout after %v seconds", retrySeconds)
+	}
+
+	klog.Infof("Volume %v has been %vd on %v", va.name, va.action, va.nodeName)
+	return nil
+}
+
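+// createPod creates a privileged provisioner pod on the target node that runs
+// the given command and returns the pod together with a cleanup func the
+// caller must invoke (typically deferred) to delete the pod again.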
+func createPod(ctx context.Context, cmd string, args []string, va volumeAction) (*v1.Pod, func(), error) {
 	klog.Infof("start provisionerPod with args:%s", args)
 	hostPathType := v1.HostPathDirectoryOrCreate
 	privileged := true
@@ -290,7 +436,7 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
 			{
 				Name:    "csi-lvmplugin-" + string(va.action),
 				Image:   va.provisionerImage,
-				Command: []string{"/csi-lvmplugin-provisioner"},
+				Command: []string{cmd},
 				Args:    args,
 				VolumeMounts: []v1.VolumeMount{
 					{
@@ -377,6 +523,7 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
 					},
 				},
 			},
+
 			{
 				Name: "lvmlock",
 				VolumeSource: v1.VolumeSource{
@@ -392,45 +539,17 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
 
 	// If it already exists due to some previous errors, the pod will be cleaned up later automatically
 	// https://github.com/rancher/local-path-provisioner/issues/27
-	_, err = va.kubeClient.CoreV1().Pods(va.namespace).Create(ctx, provisionerPod, metav1.CreateOptions{})
+	_, err := va.kubeClient.CoreV1().Pods(va.namespace).Create(ctx, provisionerPod, metav1.CreateOptions{})
 	if err != nil && !k8serror.IsAlreadyExists(err) {
-		return err
+		return nil, nil, err
 	}
 
-	defer func() {
+	return provisionerPod, func() {
 		e := va.kubeClient.CoreV1().Pods(va.namespace).Delete(ctx, provisionerPod.Name, metav1.DeleteOptions{})
 		if e != nil {
 			klog.Errorf("unable to delete the provisioner pod: %v", e)
 		}
-	}()
-
-	completed := false
-	retrySeconds := 60
-	for i := 0; i < retrySeconds; i++ {
-		pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{})
-		if pod.Status.Phase == v1.PodFailed {
-			// pod terminated in time, but with failure
-			// return ResourceExhausted so the requesting pod can be rescheduled to anonther node
-			// see https://github.com/kubernetes-csi/external-provisioner/pull/405
-			klog.Info("provisioner pod terminated with failure")
-			return status.Error(codes.ResourceExhausted, "volume creation failed")
-		}
-		if err != nil {
-			klog.Errorf("error reading provisioner pod:%v", err)
-		} else if pod.Status.Phase == v1.PodSucceeded {
-			klog.Info("provisioner pod terminated successfully")
-			completed = true
-			break
-		}
-		klog.Infof("provisioner pod status:%s", pod.Status.Phase)
-		time.Sleep(1 * time.Second)
-	}
-	if !completed {
-		return fmt.Errorf("create process timeout after %v seconds", retrySeconds)
-	}
-
-	klog.Infof("Volume %v has been %vd on %v", va.name, va.action, va.nodeName)
-	return nil
+	}, nil
 }
 
 // VgExists checks if the given volume group exists
@@ -526,8 +645,6 @@ func CreateLVS(vg string, name string, size uint64, lvmType string) (string, err
 		return "", fmt.Errorf("lvmType is incorrect: %s", lvmType)
 	}
 
-	// TODO: check available capacity, fail if request doesn't fit
-
 	args := []string{"-v", "--yes", "-n", name, "-W", "y", "-L", fmt.Sprintf("%db", size)}
 
 	pvs, err := pvCount(vg)
diff --git a/tests/bats/test.bats b/tests/bats/test.bats
index 01eb564a..e55b4405 100644
--- a/tests/bats/test.bats
+++ b/tests/bats/test.bats
@@ -124,6 +124,11 @@
 	[ "$status" -eq 0 ]
 }
 
+@test "check capacity tracking" {
+	run kubectl wait --for=jsonpath='{.status.phase}'=Running -f files/pod.inline.vol.xfs.yaml --timeout=10s
+	[ "$status" -eq 0 ]
+}
+
 @test "delete csi-lvm-controller" {
 	echo "⏳ Wait 10s for all PVCs to be cleaned up..." >&3
 	sleep 10