Capacity tracking support
jleeh committed Jan 19, 2024
1 parent c5a5b28 commit e64a484
Showing 3 changed files with 160 additions and 46 deletions.
1 change: 0 additions & 1 deletion go.mod
@@ -43,7 +43,6 @@ require (
    golang.org/x/time v0.3.0 // indirect
    google.golang.org/appengine v1.6.7 // indirect
    google.golang.org/genproto v0.0.0-20230119192704-9d59e20e5cd1 // indirect
-   google.golang.org/protobuf v1.28.1 // indirect
    gopkg.in/inf.v0 v0.9.1 // indirect
    gopkg.in/yaml.v2 v2.4.0 // indirect
    gopkg.in/yaml.v3 v3.0.1 // indirect
35 changes: 31 additions & 4 deletions pkg/lvm/controllerserver.go
@@ -17,6 +17,8 @@ limitations under the License.
package lvm

import (
+   "fmt"
+   "google.golang.org/protobuf/types/known/wrapperspb"
    "strconv"

    "github.com/container-storage-interface/spec/lib/go/csi"
@@ -55,13 +57,15 @@ func newControllerServer(ephemeral bool, nodeID string, devicesPattern string, v
    }
    // creates the clientset
    kubeClient, err := kubernetes.NewForConfig(config)
+
    if err != nil {
        return nil, err
    }
    return &controllerServer{
        caps: getControllerServiceCapabilities(
            []csi.ControllerServiceCapability_RPC_Type{
                csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
+               csi.ControllerServiceCapability_RPC_GET_CAPACITY,
                // TODO
                // csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
                // csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
@@ -212,6 +216,33 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
    return &csi.DeleteVolumeResponse{}, nil
}

+func (cs *controllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
+   nodeName := req.GetAccessibleTopology().GetSegments()[topologyKeyNode]
+   lvmType := req.GetParameters()["type"]
+   totalBytes, err := createGetCapacityPod(ctx, volumeAction{
+       action:           "getcapacity",
+       name:             fmt.Sprintf("%s-%s", lvmType, nodeName),
+       nodeName:         nodeName,
+       lvmType:          lvmType,
+       devicesPattern:   cs.devicesPattern,
+       provisionerImage: cs.provisionerImage,
+       pullPolicy:       cs.pullPolicy,
+       kubeClient:       cs.kubeClient,
+       namespace:        cs.namespace,
+       vgName:           cs.vgName,
+       hostWritePath:    cs.hostWritePath,
+   })
+   if err != nil {
+       return nil, status.Errorf(codes.Internal, "failed to query devices capacity: %v", err)
+   }
+
+   return &csi.GetCapacityResponse{
+       AvailableCapacity: totalBytes,
+       MaximumVolumeSize: wrapperspb.Int64(totalBytes),
+       MinimumVolumeSize: wrapperspb.Int64(0),
+   }, nil
+}
+
func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
    return &csi.ControllerGetCapabilitiesResponse{
        Capabilities: cs.caps,
@@ -287,10 +318,6 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
    return nil, status.Error(codes.Unimplemented, "")
}

-func (cs *controllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
-   return nil, status.Error(codes.Unimplemented, "")
-}
-
func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
    return nil, status.Error(codes.Unimplemented, "")
}
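Advertising GET_CAPACITY is what lets a capacity-aware sidecar such as external-provisioner (run with its --enable-capacity flag) call GetCapacity once per node topology segment and publish the results as CSIStorageCapacity objects for the scheduler. The sketch below shows the shape of the new RPC from the caller's side; it is illustrative only: the topology key "topology.csi/node" and the node name are hypothetical stand-ins for the driver's topologyKeyNode constant, and the response is a hand-built literal rather than the output of a live controllerServer.

package main

import (
    "fmt"

    "github.com/container-storage-interface/spec/lib/go/csi"
    "google.golang.org/protobuf/types/known/wrapperspb"
)

func main() {
    // One GetCapacity call per (storage class parameters, node segment) pair,
    // as a capacity-tracking sidecar would issue it.
    req := &csi.GetCapacityRequest{
        Parameters: map[string]string{"type": "striped"},
        AccessibleTopology: &csi.Topology{
            // Hypothetical key/value; the driver looks up topologyKeyNode here.
            Segments: map[string]string{"topology.csi/node": "worker-1"},
        },
    }

    // Hand-built response shaped like the one the new GetCapacity returns.
    resp := &csi.GetCapacityResponse{
        AvailableCapacity: 10 << 30, // e.g. 10 GiB of free extents
        MaximumVolumeSize: wrapperspb.Int64(10 << 30),
        MinimumVolumeSize: wrapperspb.Int64(0),
    }

    fmt.Printf("node %s can fit %q volumes up to %d bytes\n",
        req.GetAccessibleTopology().GetSegments()["topology.csi/node"],
        req.GetParameters()["type"],
        resp.GetMaximumVolumeSize().GetValue())
}

Returning a MinimumVolumeSize of 0 and a MaximumVolumeSize equal to the free capacity matches the handler above: any volume that fits in the remaining volume-group space can be provisioned.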
170 changes: 129 additions & 41 deletions pkg/lvm/lvm.go
@@ -19,6 +19,7 @@ package lvm
import (
    "context"
    "fmt"
+   "k8s.io/client-go/rest"
    "os"
    "os/exec"
    "path/filepath"
@@ -81,17 +82,19 @@ type volumeAction struct {
}

const (
-   linearType         = "linear"
-   stripedType        = "striped"
-   mirrorType         = "mirror"
-   actionTypeCreate   = "create"
-   actionTypeDelete   = "delete"
-   pullIfNotPresent   = "ifnotpresent"
-   fsTypeRegexpString = `TYPE="(\w+)"`
+   linearType             = "linear"
+   stripedType            = "striped"
+   mirrorType             = "mirror"
+   actionTypeCreate       = "create"
+   actionTypeDelete       = "delete"
+   pullIfNotPresent       = "ifnotpresent"
+   fsTypeRegexpString     = `TYPE="(\w+)"`
+   pvCapacityRegexpString = `B (\d+)B`
)

var (
-   fsTypeRegexp = regexp.MustCompile(fsTypeRegexpString)
+   fsTypeRegexp     = regexp.MustCompile(fsTypeRegexpString)
+   pvCapacityRegexp = regexp.MustCompile(pvCapacityRegexpString)
)

// NewLvmDriver creates the driver
@@ -253,6 +256,82 @@ func umountLV(targetPath string) {
    }
}

+func createGetCapacityPod(ctx context.Context, va volumeAction) (int64, error) {
+   pvCapacityRegexp, err := regexp.Compile(pvCapacityRegexpString)
+   if err != nil {
+       klog.Errorf("unable to compile regexp for querying physical volume capacity regexp:%s err:%v", pvCapacityRegexpString, err)
+       return 0, err
+   }
+
+   // Wrap the command in a shell, otherwise the device pattern is wrapped in quotes and pvdisplay errors
+   provisionerPod, deleteFunc, err := createPod(
+       ctx,
+       "sh",
+       []string{"-c", fmt.Sprintf("pvdisplay %s %s %s", va.devicesPattern, "--units=B", "-C")},
+       va,
+   )
+   if err != nil {
+       return 0, err
+   }
+   defer deleteFunc()
+
+   completed := false
+   retrySeconds := 60
+   podLogRequest := &rest.Request{}
+
+   for i := 0; i < retrySeconds; i++ {
+       pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{})
+       if pod.Status.Phase == v1.PodFailed {
+           klog.Infof("get capacity pod terminated with failure node:%s", va.nodeName)
+           return 0, status.Error(codes.ResourceExhausted, "get capacity failed")
+       }
+       if err != nil {
+           klog.Errorf("error reading get capacity pod:%v node:%s", err, va.nodeName)
+       } else if pod.Status.Phase == v1.PodSucceeded {
+           klog.Infof("get capacity pod terminated successfully, getting logs node:%s", va.nodeName)
+           podLogRequest = va.kubeClient.CoreV1().Pods(va.namespace).GetLogs(provisionerPod.Name, &v1.PodLogOptions{})
+           completed = true
+           break
+       }
+       klog.Infof("get capacity pod status:%s node:%s", pod.Status.Phase, va.nodeName)
+       time.Sleep(1 * time.Second)
+   }
+   if !completed {
+       return 0, fmt.Errorf("get capacity process timeout after %v seconds node:%s", retrySeconds, va.nodeName)
+   }
+
+   resp := podLogRequest.Do(ctx)
+   if resp.Error() != nil {
return 0, fmt.Errorf("failed to get logs from pv capacity pod: %v node:%s", err, va.nodeName)
+   }
+   logs, err := resp.Raw()
+   if err != nil {
+       return 0, fmt.Errorf("failed to read logs from pv capacity pod: %v node:%s", err, va.nodeName)
+   }
+
+   matches := pvCapacityRegexp.FindAllStringSubmatch(string(logs), -1)
+   totalBytes := int64(0)
+   for _, match := range matches {
+       free, err := strconv.ParseInt(match[1], 10, 64)
+       if err != nil {
+           klog.Errorf("unable to parse returned free space output:%s err:%v node:%s", match[1], err, va.nodeName)
+           return 0, err
+       }
+       totalBytes += free
+   }
+
+   lvmTypeBytes := int64(0)
+   switch va.lvmType {
+   case linearType, stripedType:
+       lvmTypeBytes = totalBytes
+   case mirrorType:
+       lvmTypeBytes = totalBytes / 2
+   }
+
+   klog.Infof("pvdisplay output for remaining pv capacity: %d bytes node:%s", lvmTypeBytes, va.nodeName)
+   return lvmTypeBytes, nil
+}
+
func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
    if va.name == "" || va.nodeName == "" {
        return fmt.Errorf("invalid empty name or path or node")
@@ -270,6 +349,42 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
    }
    args = append(args, "--lvname", va.name, "--vgname", va.vgName)

+   provisionerPod, deleteFunc, err := createPod(ctx, "/csi-lvmplugin-provisioner", args, va)
+   if err != nil {
+       return err
+   }
+   defer deleteFunc()
+
+   completed := false
+   retrySeconds := 60
+   for i := 0; i < retrySeconds; i++ {
+       pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{})
+       if pod.Status.Phase == v1.PodFailed {
+           // pod terminated in time, but with failure
+           // return ResourceExhausted so the requesting pod can be rescheduled to another node
+           // see https://github.com/kubernetes-csi/external-provisioner/pull/405
+           klog.Info("provisioner pod terminated with failure")
+           return status.Error(codes.ResourceExhausted, "volume creation failed")
+       }
+       if err != nil {
+           klog.Errorf("error reading provisioner pod:%v", err)
+       } else if pod.Status.Phase == v1.PodSucceeded {
+           klog.Info("provisioner pod terminated successfully")
+           completed = true
+           break
+       }
+       klog.Infof("provisioner pod status:%s", pod.Status.Phase)
+       time.Sleep(1 * time.Second)
+   }
+   if !completed {
+       return fmt.Errorf("create process timeout after %v seconds", retrySeconds)
+   }
+
+   klog.Infof("Volume %v has been %vd on %v", va.name, va.action, va.nodeName)
+   return nil
+}
+
+func createPod(ctx context.Context, cmd string, args []string, va volumeAction) (*v1.Pod, func(), error) {
    klog.Infof("start provisionerPod with args:%s", args)
    hostPathType := v1.HostPathDirectoryOrCreate
    privileged := true
Expand All @@ -290,7 +405,7 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
            {
                Name:  "csi-lvmplugin-" + string(va.action),
                Image: va.provisionerImage,
-               Command: []string{"/csi-lvmplugin-provisioner"},
+               Command: []string{cmd},
                Args: args,
                VolumeMounts: []v1.VolumeMount{
                    {
@@ -377,6 +492,7 @@
                    },
                },
            },
+
            {
                Name: "lvmlock",
                VolumeSource: v1.VolumeSource{
@@ -392,45 +508,17 @@

    // If it already exists due to some previous errors, the pod will be cleaned up later automatically
    // https://github.com/rancher/local-path-provisioner/issues/27
-   _, err = va.kubeClient.CoreV1().Pods(va.namespace).Create(ctx, provisionerPod, metav1.CreateOptions{})
+   _, err := va.kubeClient.CoreV1().Pods(va.namespace).Create(ctx, provisionerPod, metav1.CreateOptions{})
    if err != nil && !k8serror.IsAlreadyExists(err) {
-       return err
+       return nil, nil, err
    }

-   defer func() {
+   return provisionerPod, func() {
        e := va.kubeClient.CoreV1().Pods(va.namespace).Delete(ctx, provisionerPod.Name, metav1.DeleteOptions{})
        if e != nil {
            klog.Errorf("unable to delete the provisioner pod: %v", e)
        }
-   }()
-
-   completed := false
-   retrySeconds := 60
-   for i := 0; i < retrySeconds; i++ {
-       pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{})
-       if pod.Status.Phase == v1.PodFailed {
-           // pod terminated in time, but with failure
-           // return ResourceExhausted so the requesting pod can be rescheduled to anonther node
-           // see https://github.com/kubernetes-csi/external-provisioner/pull/405
-           klog.Info("provisioner pod terminated with failure")
-           return status.Error(codes.ResourceExhausted, "volume creation failed")
-       }
-       if err != nil {
-           klog.Errorf("error reading provisioner pod:%v", err)
-       } else if pod.Status.Phase == v1.PodSucceeded {
-           klog.Info("provisioner pod terminated successfully")
-           completed = true
-           break
-       }
-       klog.Infof("provisioner pod status:%s", pod.Status.Phase)
-       time.Sleep(1 * time.Second)
-   }
-   if !completed {
-       return fmt.Errorf("create process timeout after %v seconds", retrySeconds)
-   }
-
-   klog.Infof("Volume %v has been %vd on %v", va.name, va.action, va.nodeName)
-   return nil
+   }, nil
}

// VgExists checks if the given volume group exists
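To make the log parsing concrete, here is a hedged, self-contained sketch of what createGetCapacityPod extracts from the provisioner pod's output. The pvdisplay table below is invented for illustration (device names, volume group, and column spacing are assumptions; real pvdisplay alignment may differ), while the pattern and the summing mirror the code above. The driver then reports the total as-is for linear and striped volumes and halves it for mirror volumes, since mirroring writes every extent twice.

package main

import (
    "fmt"
    "regexp"
    "strconv"
)

func main() {
    // Illustrative `pvdisplay <pattern> --units=B -C` output: the last two
    // columns are PSize and PFree in bytes (both suffixed with "B").
    logs := `  PV         VG      Fmt  Attr PSize        PFree
  /dev/sdb   csi-lvm lvm2 a--  53687091200B 53687091200B
  /dev/sdc   csi-lvm lvm2 a--  53687091200B 21474836480B`

    // Same pattern as pvCapacityRegexpString: the trailing "B " of PSize
    // followed by the digits of PFree, one capture per matching PV line.
    re := regexp.MustCompile(`B (\d+)B`)

    total := int64(0)
    for _, m := range re.FindAllStringSubmatch(logs, -1) {
        free, err := strconv.ParseInt(m[1], 10, 64)
        if err != nil {
            panic(err)
        }
        total += free
    }
    fmt.Printf("free capacity across matching PVs: %d bytes\n", total) // 75161927680
}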
