Capacity tracking support #89

Open · wants to merge 4 commits into master
2 changes: 1 addition & 1 deletion go.mod
@@ -10,6 +10,7 @@ require (
golang.org/x/net v0.5.0
golang.org/x/sys v0.4.0
google.golang.org/grpc v1.53.0
google.golang.org/protobuf v1.28.1
k8s.io/api v0.26.0
k8s.io/apimachinery v0.26.0
k8s.io/client-go v0.26.0
@@ -43,7 +44,6 @@ require (
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230119192704-9d59e20e5cd1 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
35 changes: 31 additions & 4 deletions pkg/lvm/controllerserver.go
@@ -17,6 +17,8 @@ limitations under the License.
package lvm

import (
"fmt"
"strconv"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/protobuf/types/known/wrapperspb"
@@ -55,13 +57,15 @@ func newControllerServer(ephemeral bool, nodeID string, devicesPattern string, v
}
// creates the clientset
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return &controllerServer{
caps: getControllerServiceCapabilities(
[]csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_GET_CAPACITY,
// TODO
// csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
// csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
@@ -212,6 +216,33 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return &csi.DeleteVolumeResponse{}, nil
}

func (cs *controllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
nodeName := req.GetAccessibleTopology().GetSegments()[topologyKeyNode]
lvmType := req.GetParameters()["type"]
totalBytes, err := createGetCapacityPod(ctx, volumeAction{
action: "getcapacity",
name: fmt.Sprintf("%s-%s", lvmType, nodeName),
nodeName: nodeName,
lvmType: lvmType,
devicesPattern: cs.devicesPattern,
provisionerImage: cs.provisionerImage,
pullPolicy: cs.pullPolicy,
kubeClient: cs.kubeClient,
namespace: cs.namespace,
vgName: cs.vgName,
hostWritePath: cs.hostWritePath,
})
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to query devices capacity: %v", err)
}

return &csi.GetCapacityResponse{
AvailableCapacity: totalBytes,
MaximumVolumeSize: wrapperspb.Int64(totalBytes),
MinimumVolumeSize: wrapperspb.Int64(0),
}, nil
}
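
Note: this handler expects the external-provisioner, running with capacity tracking enabled (its --enable-capacity flag), to supply the node topology segment and the storage-class parameters. A sketch of such a request (values illustrative; topologyKeyNode is the driver's existing topology key):

req := &csi.GetCapacityRequest{
	Parameters: map[string]string{"type": "linear"},
	AccessibleTopology: &csi.Topology{
		Segments: map[string]string{topologyKeyNode: "worker-1"},
	},
}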

func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
return &csi.ControllerGetCapabilitiesResponse{
Capabilities: cs.caps,
@@ -287,10 +318,6 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
return nil, status.Error(codes.Unimplemented, "")
}

func (cs *controllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
187 changes: 152 additions & 35 deletions pkg/lvm/lvm.go
@@ -27,6 +27,9 @@ import (
"strings"
"time"

"k8s.io/apimachinery/pkg/util/json"
"k8s.io/client-go/rest"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

@@ -80,6 +83,29 @@ type volumeAction struct {
hostWritePath string
}

type pvReport struct {
Report []struct {
PV []struct {
PVName string `json:"pv_name"`
PVFree string `json:"pv_free"`
} `json:"pv"`
} `json:"report"`
}

func (p *pvReport) totalFree() (int64, error) {
totalFree := int64(0)
for _, report := range p.Report {
for _, pv := range report.PV {
free, err := strconv.ParseInt(pv.PVFree, 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse free space for device %s with error: %w", pv.PVName, err)
}
totalFree += free
}
}
return totalFree, nil
}
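
For reference, the capacity pod invokes pvs with --units=B, --nosuffix and --reportformat=json, so pv_free arrives as a plain byte count. The JSON below (illustrative values) shows the shape pvReport is built to parse:

{"report":[{"pv":[{"pv_name":"/dev/vdb","vg_name":"csi-lvm","pv_fmt":"lvm2","pv_attr":"a--","pv_size":"107374182400","pv_free":"103079215104"}]}]}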

const (
linearType = "linear"
stripedType = "striped"
@@ -253,6 +279,90 @@ func umountLV(targetPath string) {
}
}

func createGetCapacityPod(ctx context.Context, va volumeAction) (int64, error) {
// Before running pvs, run pvcreate with the device pattern: on a fresh node the physical volumes
// are not yet initialised, so pvs would report 0 capacity and PVCs would never be scheduled there.
pvCreateVA := va
pvCreateVA.action = "pvcreate"
_, deleteFunc, err := createPod(
ctx,
"sh",
[]string{"-c", fmt.Sprintf("pvcreate %s", va.devicesPattern)},
pvCreateVA,
)
if err != nil {
return 0, err
}
defer deleteFunc()

// Wrap the command in a shell; otherwise the device pattern is passed as a single quoted argument and pvs errors out.
provisionerPod, deleteFunc, err := createPod(
ctx,
"sh",
[]string{"-c", fmt.Sprintf("pvs %s %s %s %s %s", va.devicesPattern, "--units=B", "--reportformat=json", "--nosuffix", "2>/dev/null || true")},
va,
)
if err != nil {
return 0, err
}
defer deleteFunc()

completed := false
retrySeconds := 60
var podLogRequest *rest.Request

for i := 0; i < retrySeconds; i++ {
pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{})
if err != nil {
klog.Errorf("error reading get capacity pod:%v node:%s", err, va.nodeName)
} else if pod.Status.Phase == v1.PodFailed {
klog.Infof("get capacity pod terminated with failure node:%s", va.nodeName)
return 0, status.Error(codes.ResourceExhausted, "get capacity failed")
} else if pod.Status.Phase == v1.PodSucceeded {
klog.Infof("get capacity pod terminated successfully, getting logs node:%s", va.nodeName)
podLogRequest = va.kubeClient.CoreV1().Pods(va.namespace).GetLogs(provisionerPod.Name, &v1.PodLogOptions{})
completed = true
break
}
klog.Infof("get capacity pod status:%s node:%s", pod.Status.Phase, va.nodeName)
time.Sleep(1 * time.Second)
}
if !completed {
return 0, fmt.Errorf("get capacity process timeout after %v seconds node:%s", retrySeconds, va.nodeName)
}

resp := podLogRequest.Do(ctx)
if resp.Error() != nil {
return 0, fmt.Errorf("failed to get logs from pv capacity pod: %w node:%s", resp.Error(), va.nodeName)
}
logs, err := resp.Raw()
if err != nil {
return 0, fmt.Errorf("failed to read logs from pv capacity pod: %w node:%s", err, va.nodeName)
}

pvReport := pvReport{}
err = json.Unmarshal(logs, &pvReport)
if err != nil {
return 0, fmt.Errorf("failed to format pvs output: %w node:%s", err, va.nodeName)
}
totalBytes, err := pvReport.totalFree()
if err != nil {
return 0, fmt.Errorf("%w node:%s", err, va.nodeName)
}

lvmTypeBytes := int64(0)
switch va.lvmType {
case linearType, stripedType:
lvmTypeBytes = totalBytes
case mirrorType:
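// a mirrored LV stores two copies of every extent, so only half of the free space is usable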
lvmTypeBytes = totalBytes / 2
}

klog.Infof("pvs output for remaining pv capacity: %d bytes node:%s", lvmTypeBytes, va.nodeName)
return lvmTypeBytes, nil
}
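
A minimal unit-test sketch for the parsing path; it is not part of this PR and assumes the same package, the json import above, and the standard testing package:

func TestPVReportTotalFree(t *testing.T) {
	logs := []byte(`{"report":[{"pv":[{"pv_name":"/dev/vdb","pv_free":"1024"},{"pv_name":"/dev/vdc","pv_free":"2048"}]}]}`)
	var r pvReport
	if err := json.Unmarshal(logs, &r); err != nil {
		t.Fatalf("unmarshal: %v", err)
	}
	got, err := r.totalFree()
	if err != nil {
		t.Fatalf("totalFree: %v", err)
	}
	if got != 3072 {
		t.Fatalf("got %d bytes free, want 3072", got)
	}
}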

func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
if va.name == "" || va.nodeName == "" {
return fmt.Errorf("invalid empty name or path or node")
@@ -270,6 +380,42 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
}
args = append(args, "--lvname", va.name, "--vgname", va.vgName)

provisionerPod, deleteFunc, err := createPod(ctx, "/csi-lvmplugin-provisioner", args, va)
if err != nil {
return err
}
defer deleteFunc()

completed := false
retrySeconds := 60
for i := 0; i < retrySeconds; i++ {
pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{})
if pod.Status.Phase == v1.PodFailed {
// pod terminated in time, but with failure
// return ResourceExhausted so the requesting pod can be rescheduled to another node
// see https://github.com/kubernetes-csi/external-provisioner/pull/405
klog.Info("provisioner pod terminated with failure")
return status.Error(codes.ResourceExhausted, "volume creation failed")
}
if err != nil {
klog.Errorf("error reading provisioner pod:%v", err)
} else if pod.Status.Phase == v1.PodSucceeded {
klog.Info("provisioner pod terminated successfully")
completed = true
break
}
klog.Infof("provisioner pod status:%s", pod.Status.Phase)
time.Sleep(1 * time.Second)
}
if !completed {
return fmt.Errorf("create process timeout after %v seconds", retrySeconds)
}

klog.Infof("Volume %v has been %vd on %v", va.name, va.action, va.nodeName)
return nil
}

func createPod(ctx context.Context, cmd string, args []string, va volumeAction) (*v1.Pod, func(), error) {
klog.Infof("start provisionerPod with args:%s", args)
hostPathType := v1.HostPathDirectoryOrCreate
privileged := true
@@ -290,7 +436,7 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
{
Name: "csi-lvmplugin-" + string(va.action),
Image: va.provisionerImage,
Command: []string{"/csi-lvmplugin-provisioner"},
Command: []string{cmd},
Args: args,
VolumeMounts: []v1.VolumeMount{
{
@@ -377,6 +523,7 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
},
},
},

{
Name: "lvmlock",
VolumeSource: v1.VolumeSource{
@@ -392,45 +539,17 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {

// If it already exists due to some previous errors, the pod will be cleaned up later automatically
// https://github.com/rancher/local-path-provisioner/issues/27
_, err = va.kubeClient.CoreV1().Pods(va.namespace).Create(ctx, provisionerPod, metav1.CreateOptions{})
_, err := va.kubeClient.CoreV1().Pods(va.namespace).Create(ctx, provisionerPod, metav1.CreateOptions{})
if err != nil && !k8serror.IsAlreadyExists(err) {
return err
return nil, nil, err
}

defer func() {
return provisionerPod, func() {
e := va.kubeClient.CoreV1().Pods(va.namespace).Delete(ctx, provisionerPod.Name, metav1.DeleteOptions{})
if e != nil {
klog.Errorf("unable to delete the provisioner pod: %v", e)
}
}()

completed := false
retrySeconds := 60
for i := 0; i < retrySeconds; i++ {
pod, err := va.kubeClient.CoreV1().Pods(va.namespace).Get(ctx, provisionerPod.Name, metav1.GetOptions{})
if pod.Status.Phase == v1.PodFailed {
// pod terminated in time, but with failure
// return ResourceExhausted so the requesting pod can be rescheduled to anonther node
// see https://github.com/kubernetes-csi/external-provisioner/pull/405
klog.Info("provisioner pod terminated with failure")
return status.Error(codes.ResourceExhausted, "volume creation failed")
}
if err != nil {
klog.Errorf("error reading provisioner pod:%v", err)
} else if pod.Status.Phase == v1.PodSucceeded {
klog.Info("provisioner pod terminated successfully")
completed = true
break
}
klog.Infof("provisioner pod status:%s", pod.Status.Phase)
time.Sleep(1 * time.Second)
}
if !completed {
return fmt.Errorf("create process timeout after %v seconds", retrySeconds)
}

klog.Infof("Volume %v has been %vd on %v", va.name, va.action, va.nodeName)
return nil
}, nil
}
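
The refactor separates creating the helper pod from waiting on it: createPod returns the pod plus a cleanup closure, and each caller owns its own polling loop. A sketch of the intended call pattern (hypothetical caller, not taken from this diff):

pod, cleanup, err := createPod(ctx, "sh", []string{"-c", "vgs"}, va)
if err != nil {
	return err
}
defer cleanup()
// poll pod.Status.Phase for v1.PodSucceeded or v1.PodFailed,
// as createProvisionerPod and createGetCapacityPod do above
_ = pod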

// VgExists checks if the given volume group exists
@@ -526,8 +645,6 @@ func CreateLVS(vg string, name string, size uint64, lvmType string) (string, err
return "", fmt.Errorf("lvmType is incorrect: %s", lvmType)
}

// TODO: check available capacity, fail if request doesn't fit

args := []string{"-v", "--yes", "-n", name, "-W", "y", "-L", fmt.Sprintf("%db", size)}

pvs, err := pvCount(vg)
5 changes: 5 additions & 0 deletions tests/bats/test.bats
@@ -124,6 +124,11 @@
[ "$status" -eq 0 ]
}

@test "check capacity tracking" {
run kubectl wait --for=jsonpath='{.status.phase}'=Running -f files/pod.inline.vol.xfs.yaml --timeout=10s
[ "$status" -eq 0 ]
}
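
If the external-provisioner is deployed with capacity tracking enabled (its --enable-capacity flag), the published per-node objects could be asserted on directly as well. A possible follow-up check, sketched here but not part of this PR:

@test "csistoragecapacities are published" {
  run kubectl get csistoragecapacities -A
  [ "$status" -eq 0 ]
}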

@test "delete csi-lvm-controller" {
echo "⏳ Wait 10s for all PVCs to be cleaned up..." >&3
sleep 10