diff --git a/src/csi/backend/backend.go b/src/csi/backend/backend.go index 314f0b58..5bc074c2 100644 --- a/src/csi/backend/backend.go +++ b/src/csi/backend/backend.go @@ -8,12 +8,15 @@ import ( "regexp" "sync" "utils" + "utils/k8sutils" "utils/log" ) const ( + // TopologyRequirement constant for topology filter function TopologyRequirement = "topologyRequirement" - SupportedTopologies = "supportedTopologies" + // supported topology key in CSI plugin configuration + supportedTopologiesKey = "supportedTopologies" ) var ( @@ -39,6 +42,7 @@ var ( } ) +// AccessibleTopology represents selected node topology type AccessibleTopology struct { RequisiteTopologies []map[string]string PreferredTopologies []map[string]string @@ -122,26 +126,10 @@ func newBackend(backendName string, config map[string]interface{}) (*Backend, er return nil, errors.New("parameters must be configured for backend") } - supportedTopologies := make([]map[string]string, 0) - if topologies, exist := config[SupportedTopologies]; exist { - topologyArray, ok := topologies.([]interface{}) - if !ok { - return nil, errors.New("invalid supported topologies configuration") - } - for _, topologyArrElem := range topologyArray { - topologyMap, ok := topologyArrElem.(map[string]interface{}) - if !ok { - return nil, errors.New("invalid supported topologies configuration") - } - tempMap := make(map[string]string, 0) - for topologyKey, value := range topologyMap { - if topologyValue, ok := value.(string); ok { - tempMap[topologyKey] = topologyValue - } - } - - supportedTopologies = append(supportedTopologies, tempMap) - } + // Get supported topologies for backend + supportedTopologies, err := getSupportedTopologies(config) + if err != nil { + return nil, err } plugin := plugin.GetPlugin(storage) @@ -174,6 +162,50 @@ func newBackend(backendName string, config map[string]interface{}) (*Backend, er }, nil } +func getSupportedTopologies(config map[string]interface{}) ([]map[string]string, error) { + supportedTopologies := make([]map[string]string, 0) + + topologies, exist := config[supportedTopologiesKey] + if !exist { + return supportedTopologies, nil + } + + // populate configured topologies + topologyArray, ok := topologies.([]interface{}) + if !ok { + return nil, errors.New("invalid supported topologies configuration") + } + for _, topologyArrElem := range topologyArray { + topologyMap, ok := topologyArrElem.(map[string]interface{}) + if !ok { + return nil, errors.New("invalid supported topologies configuration") + } + tempMap := make(map[string]string, 0) + for topologyKey, value := range topologyMap { + if topologyValue, ok := value.(string); ok { + tempMap[topologyKey] = topologyValue + } + } + supportedTopologies = append(supportedTopologies, tempMap) + } + + return supportedTopologies, nil +} + +// addProtocolTopology add up protocol specific topological support +func addProtocolTopology(backend *Backend, driverName string) { + proto, protocolAvailable := backend.Parameters["protocol"] + if protocol, isString := proto.(string); protocolAvailable && isString { + backend.SupportedTopologies = append(backend.SupportedTopologies, map[string]string{ + k8sutils.TopologyPrefix + "/protocol." + protocol: driverName, + }) + return + } + + log.Warningf("supported topology for protocol may not work as protocol is miss configured " + + "in backend configuration") +} + func analyzeBackend(config map[string]interface{}) (*Backend, error) { backendName, exist := config["name"].(string) if !exist { @@ -245,7 +277,7 @@ func updateReplicaBackends() { } } -func RegisterBackend(backendConfigs []map[string]interface{}, keepLogin bool) error { +func RegisterBackend(backendConfigs []map[string]interface{}, keepLogin bool, driverName string) error { for _, i := range backendConfigs { backend, err := analyzeBackend(i) if err != nil { @@ -259,6 +291,18 @@ func RegisterBackend(backendConfigs []map[string]interface{}, keepLogin bool) er return err } + // Note: Protocol is considered as special topological parameter. The protocol topology + // is populated internally by plugin using protocol name. + // If configured protocol for backend is "iscsi", CSI plugin internally add + // topology.kubernetes.io/protocol.iscsi = csi.huawei.com in supportedTopologies. + // + // Now users can opt to provision volumes based on protocol by + // 1. Labeling kubernetes nodes with protocol specific label (ie topology.kubernetes.io/protocol.iscsi = csi.huawei.com) + // 2. Configure topology support in plugin + // 3. Configure protocol topology in allowedTopologies fo Storage class + // addProtocolTopology is called after backend plugin init as init takes care of protocol validation + addProtocolTopology(backend, driverName) + csiBackends[backend.Name] = backend } @@ -585,20 +629,23 @@ func filterPoolsOnTopology(candidatePools []*StoragePool, requisiteTopologies [] } for _, pool := range candidatePools { + // mutex lock acquired in pool selection backend, exist := csiBackends[pool.Parent] if !exist { continue } - if len(backend.SupportedTopologies) > 0 { - for _, topology := range requisiteTopologies { - if isTopologySupportedByBackend(backend, topology) { - filteredPools = append(filteredPools, pool) - break - } - } - } else { + // when backend is not configured with supported topology + if len(backend.SupportedTopologies) == 0 { filteredPools = append(filteredPools, pool) + continue + } + + for _, topology := range requisiteTopologies { + if isTopologySupportedByBackend(backend, topology) { + filteredPools = append(filteredPools, pool) + break + } } } @@ -650,3 +697,15 @@ func sortPoolsByPreferredTopologies(candidatePools []*StoragePool, preferredTopo }) return append(orderedPools, remainingPools...) } + +func (pool *StoragePool) GetSupportedTopologies() []map[string]string { + mutex.Lock() + defer mutex.Unlock() + backend, exist := csiBackends[pool.Parent] + if !exist { + log.Warningf("Backend [%v] does not exist in CSI backend pool", pool.Parent) + return make([]map[string]string, 0) + } + + return backend.SupportedTopologies +} diff --git a/src/csi/driver/controller.go b/src/csi/driver/controller.go index 8c38ae17..ccdcf4e7 100644 --- a/src/csi/driver/controller.go +++ b/src/csi/driver/controller.go @@ -35,53 +35,14 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) parameters["backend"], parameters["cloneFrom"] = utils.SplitVolumeId(cloneFrom) } - contentSource := req.GetVolumeContentSource() - if contentSource != nil { - if contentSnapshot := contentSource.GetSnapshot(); contentSnapshot != nil { - sourceSnapshotId := contentSnapshot.GetSnapshotId() - sourceBackendName, snapshotParentId, sourceSnapshotName := utils.SplitSnapshotId(sourceSnapshotId) - parameters["sourceSnapshotName"] = sourceSnapshotName - parameters["snapshotParentId"] = snapshotParentId - parameters["backend"] = sourceBackendName - log.Infof("Start to create volume from snapshot %s", sourceSnapshotName) - } else if contentVolume := contentSource.GetVolume(); contentVolume != nil { - sourceVolumeId := contentVolume.GetVolumeId() - sourceBackendName, sourceVolumeName := utils.SplitVolumeId(sourceVolumeId) - parameters["sourceVolumeName"] = sourceVolumeName - parameters["backend"] = sourceBackendName - log.Infof("Start to create volume from volume %s", sourceVolumeName) - } else { - log.Errorf("The source %s is not snapshot either volume", contentSource) - return nil, status.Error(codes.InvalidArgument, "no source ID provided is invalid") - } + // process volume content source + err := d.processVolumeContentSource(req, parameters) + if err != nil { + return nil, err } - // Get topology requirements - accessibleTopology := req.GetAccessibilityRequirements() - if accessibleTopology != nil { - var requisiteTopologies = make([]map[string]string, 0) - for _, requisite := range accessibleTopology.GetRequisite() { - requirement := make(map[string]string) - for k, v := range requisite.GetSegments() { - requirement[k] = v - } - requisiteTopologies = append(requisiteTopologies, requirement) - } - - var preferredTopologies = make([]map[string]string, 0) - for _, preferred := range accessibleTopology.GetPreferred() { - preference := make(map[string]string) - for k, v := range preferred.GetSegments() { - preference[k] = v - } - preferredTopologies = append(preferredTopologies, preference) - } - - parameters[backend.TopologyRequirement] = backend.AccessibleTopology{ - RequisiteTopologies: requisiteTopologies, - PreferredTopologies: preferredTopologies, - } - } + // process accessibility requirements + d.processAccessibilityRequirements(req, parameters) localPool, remotePool, err := backend.SelectStoragePool(size, parameters) if err != nil { @@ -104,30 +65,105 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) log.Infof("Volume %s is created", name) + volume, err := d.getCreatedVolume(req, volName, localPool) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return &csi.CreateVolumeResponse{ + Volume: volume, + }, nil +} + +func (d *Driver) getCreatedVolume(req *csi.CreateVolumeRequest, volName string, pool *backend.StoragePool) (*csi.Volume, error) { + contentSource := req.GetVolumeContentSource() + size := req.GetCapacityRange().GetRequiredBytes() + + accessibleTopologies := make([]*csi.Topology, 0) + supportedTopology := pool.GetSupportedTopologies() + if len(supportedTopology) > 0 { + for _, segment := range supportedTopology { + accessibleTopologies = append(accessibleTopologies, &csi.Topology{Segments: segment}) + } + } + if contentSource != nil { attributes := map[string]string{ - "backend": localPool.Parent, + "backend": pool.Parent, "name": volName, } - return &csi.CreateVolumeResponse{ - Volume: &csi.Volume{ - VolumeId: localPool.Parent + "." + volName, - CapacityBytes: size, - VolumeContext: attributes, - ContentSource: req.VolumeContentSource, - }, + return &csi.Volume{ + VolumeId: pool.Parent + "." + volName, + CapacityBytes: size, + VolumeContext: attributes, + ContentSource: contentSource, + AccessibleTopology: accessibleTopologies, }, nil } - return &csi.CreateVolumeResponse{ - Volume: &csi.Volume{ - VolumeId: localPool.Parent + "." + volName, - CapacityBytes: size, - }, + return &csi.Volume{ + VolumeId: pool.Parent + "." + volName, + CapacityBytes: size, + AccessibleTopology: accessibleTopologies, }, nil } +func (d *Driver) processVolumeContentSource(req *csi.CreateVolumeRequest, parameters map[string]interface{}) error { + contentSource := req.GetVolumeContentSource() + if contentSource != nil { + if contentSnapshot := contentSource.GetSnapshot(); contentSnapshot != nil { + sourceSnapshotId := contentSnapshot.GetSnapshotId() + sourceBackendName, snapshotParentId, sourceSnapshotName := utils.SplitSnapshotId(sourceSnapshotId) + parameters["sourceSnapshotName"] = sourceSnapshotName + parameters["snapshotParentId"] = snapshotParentId + parameters["backend"] = sourceBackendName + log.Infof("Start to create volume from snapshot %s", sourceSnapshotName) + } else if contentVolume := contentSource.GetVolume(); contentVolume != nil { + sourceVolumeId := contentVolume.GetVolumeId() + sourceBackendName, sourceVolumeName := utils.SplitVolumeId(sourceVolumeId) + parameters["sourceVolumeName"] = sourceVolumeName + parameters["backend"] = sourceBackendName + log.Infof("Start to create volume from volume %s", sourceVolumeName) + } else { + log.Errorf("The source %s is not snapshot either volume", contentSource) + return status.Error(codes.InvalidArgument, "no source ID provided is invalid") + } + } + + return nil +} + +func (d *Driver) processAccessibilityRequirements(req *csi.CreateVolumeRequest, parameters map[string]interface{}) { + accessibleTopology := req.GetAccessibilityRequirements() + if accessibleTopology == nil { + return + } + + var requisiteTopologies = make([]map[string]string, 0) + for _, requisite := range accessibleTopology.GetRequisite() { + requirement := make(map[string]string) + for k, v := range requisite.GetSegments() { + requirement[k] = v + } + requisiteTopologies = append(requisiteTopologies, requirement) + } + + var preferredTopologies = make([]map[string]string, 0) + for _, preferred := range accessibleTopology.GetPreferred() { + preference := make(map[string]string) + for k, v := range preferred.GetSegments() { + preference[k] = v + } + preferredTopologies = append(preferredTopologies, preference) + } + + parameters[backend.TopologyRequirement] = backend.AccessibleTopology{ + RequisiteTopologies: requisiteTopologies, + PreferredTopologies: preferredTopologies, + } +} + func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { volumeId := req.GetVolumeId() diff --git a/src/csi/main.go b/src/csi/main.go index 5f781d14..66b2860d 100644 --- a/src/csi/main.go +++ b/src/csi/main.go @@ -62,7 +62,7 @@ var ( "absolute path to the kubeconfig file") nodeName = flag.String("nodename", os.Getenv(nodeNameEnv), - "absolute path to the kubeconfig file") + "node name in kubernetes cluster") config CSIConfig secret CSISecret @@ -182,14 +182,14 @@ func main() { }() if *controller || *controllerFlagFile != "" { - err := backend.RegisterBackend(config.Backends, true) + err := backend.RegisterBackend(config.Backends, true, *driverName) if err != nil { log.Fatalf("Register backends error: %v", err) } go updateBackendCapabilities() } else { - err := backend.RegisterBackend(config.Backends, false) + err := backend.RegisterBackend(config.Backends, false, *driverName) if err != nil { log.Fatalf("Register backends error: %v", err) } diff --git a/src/utils/k8sutils/k8s_utils.go b/src/utils/k8sutils/k8s_utils.go new file mode 100644 index 00000000..8a38ec75 --- /dev/null +++ b/src/utils/k8sutils/k8s_utils.go @@ -0,0 +1,94 @@ +/* + Copyright (c) Huawei Technologies Co., Ltd. 2021-2021. All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +// Package k8sutils provides Kubernetes utilities +package k8sutils + +import ( + "fmt" + "regexp" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +const ( + // TopologyPrefix supported by CSI plugin + TopologyPrefix = "topology.kubernetes.io" + topologyRegx = TopologyPrefix + "/.*" +) + +// Interface is a kubernetes utility interface required by CSI plugin to interact with Kubernetes +type Interface interface { + // GetNodeTopology returns configured kubernetes node's topological labels + GetNodeTopology(nodeName string) (map[string]string, error) +} + +type kubeClient struct { + clientSet *kubernetes.Clientset +} + +// NewK8SUtils returns an object of Kubernetes utility interface +func NewK8SUtils(kubeConfig string) (Interface, error) { + var clientset *kubernetes.Clientset + + if kubeConfig != "" { + config, err := clientcmd.BuildConfigFromFlags("", kubeConfig) + if err != nil { + return nil, err + } + + clientset, err = kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + } else { + config, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + + clientset, err = kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + } + + return &kubeClient{ + clientSet: clientset, + }, nil +} + +func (k *kubeClient) GetNodeTopology(nodeName string) (map[string]string, error) { + k8sNode, err := k.getNode(nodeName) + if err != nil { + return nil, fmt.Errorf("failed to get node topology with error: %v", err) + } + + topology := make(map[string]string) + for key, value := range k8sNode.Labels { + if match, err := regexp.MatchString(topologyRegx, key); err == nil && match { + topology[key] = value + } + } + + return topology, nil +} + +func (k *kubeClient) getNode(nodeName string) (*corev1.Node, error) { + return k.clientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) +} diff --git a/src/utils/k8sutils/k8sutils.go b/src/utils/k8sutils/k8sutils.go deleted file mode 100644 index dff1deac..00000000 --- a/src/utils/k8sutils/k8sutils.go +++ /dev/null @@ -1,75 +0,0 @@ -package k8sutils - -import ( - "errors" - "fmt" - "regexp" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" -) - -const ( - topologyRegx = "topology.kubernetes.io/*" -) - -type Interface interface { - GetNodeTopology(nodeName string) (map[string]string, error) -} - -type KubeClient struct { - clientSet *kubernetes.Clientset -} - -func NewK8SUtils(kubeConfig string) (Interface, error) { - var clientset *kubernetes.Clientset - - if kubeConfig != "" { - config, err := clientcmd.BuildConfigFromFlags("", kubeConfig) - if err != nil { - return nil, err - } - - clientset, err = kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - } else { - config, err := rest.InClusterConfig() - if err != nil { - return nil, err - } - - clientset, err = kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - } - - return &KubeClient{ - clientSet: clientset, - }, nil -} - -func (k *KubeClient) GetNodeTopology(nodeName string) (map[string]string, error) { - k8sNode, err := k.getNode(nodeName) - if err != nil { - return nil, errors.New(fmt.Sprintf("failed to get node topology with error: %v", err)) - } - - topology := make(map[string]string) - for key, value := range k8sNode.Labels { - if match, err := regexp.MatchString(topologyRegx, key); err == nil && match { - topology[key] = value - } - } - - return topology, nil -} - -func (k *KubeClient) getNode(nodeName string) (*corev1.Node, error) { - return k.clientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) -}