Skip to content

Commit

Permalink
added code changes for protocol topological awarness
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit Roushan committed Sep 7, 2021
1 parent c78fe52 commit e116c0c
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 133 deletions.
106 changes: 76 additions & 30 deletions src/csi/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (
"regexp"
"sync"
"utils"
"utils/k8sutils"
"utils/log"
)

const (
// TopologyRequirement constant for topology filter function
TopologyRequirement = "topologyRequirement"
SupportedTopologies = "supportedTopologies"
// supported topology key in CSI plugin configuration
supportedTopologiesKey = "supportedTopologies"
)

var (
Expand All @@ -39,6 +42,7 @@ var (
}
)

// AccessibleTopology represents selected node topology
type AccessibleTopology struct {
RequisiteTopologies []map[string]string
PreferredTopologies []map[string]string
Expand Down Expand Up @@ -122,26 +126,10 @@ func newBackend(backendName string, config map[string]interface{}) (*Backend, er
return nil, errors.New("parameters must be configured for backend")
}

supportedTopologies := make([]map[string]string, 0)
if topologies, exist := config[SupportedTopologies]; exist {
topologyArray, ok := topologies.([]interface{})
if !ok {
return nil, errors.New("invalid supported topologies configuration")
}
for _, topologyArrElem := range topologyArray {
topologyMap, ok := topologyArrElem.(map[string]interface{})
if !ok {
return nil, errors.New("invalid supported topologies configuration")
}
tempMap := make(map[string]string, 0)
for topologyKey, value := range topologyMap {
if topologyValue, ok := value.(string); ok {
tempMap[topologyKey] = topologyValue
}
}

supportedTopologies = append(supportedTopologies, tempMap)
}
// Get supported topologies for backend
supportedTopologies, err := getSupportedTopologies(config)
if err != nil {
return nil, err
}

plugin := plugin.GetPlugin(storage)
Expand Down Expand Up @@ -174,6 +162,50 @@ func newBackend(backendName string, config map[string]interface{}) (*Backend, er
}, nil
}

func getSupportedTopologies(config map[string]interface{}) ([]map[string]string, error) {
supportedTopologies := make([]map[string]string, 0)

topologies, exist := config[supportedTopologiesKey]
if !exist {
return supportedTopologies, nil
}

// populate configured topologies
topologyArray, ok := topologies.([]interface{})
if !ok {
return nil, errors.New("invalid supported topologies configuration")
}
for _, topologyArrElem := range topologyArray {
topologyMap, ok := topologyArrElem.(map[string]interface{})
if !ok {
return nil, errors.New("invalid supported topologies configuration")
}
tempMap := make(map[string]string, 0)
for topologyKey, value := range topologyMap {
if topologyValue, ok := value.(string); ok {
tempMap[topologyKey] = topologyValue
}
}
supportedTopologies = append(supportedTopologies, tempMap)
}

return supportedTopologies, nil
}

// addProtocolTopology add up protocol specific topological support
func addProtocolTopology(backend *Backend, driverName string) {
proto, protocolAvailable := backend.Parameters["protocol"]
if protocol, isString := proto.(string); protocolAvailable && isString {
backend.SupportedTopologies = append(backend.SupportedTopologies, map[string]string{
k8sutils.TopologyPrefix + "/protocol." + protocol: driverName,
})
return
}

log.Warningf("supported topology for protocol may not work as protocol is miss configured " +
"in backend configuration")
}

func analyzeBackend(config map[string]interface{}) (*Backend, error) {
backendName, exist := config["name"].(string)
if !exist {
Expand Down Expand Up @@ -245,7 +277,7 @@ func updateReplicaBackends() {
}
}

func RegisterBackend(backendConfigs []map[string]interface{}, keepLogin bool) error {
func RegisterBackend(backendConfigs []map[string]interface{}, keepLogin bool, driverName string) error {
for _, i := range backendConfigs {
backend, err := analyzeBackend(i)
if err != nil {
Expand All @@ -259,6 +291,18 @@ func RegisterBackend(backendConfigs []map[string]interface{}, keepLogin bool) er
return err
}

// Note: Protocol is considered as special topological parameter. The protocol topology
// is populated internally by plugin using protocol name.
// If configured protocol for backend is "iscsi", CSI plugin internally add
// topology.kubernetes.io/protocol.iscsi = csi.huawei.com in supportedTopologies.
//
// Now users can opt to provision volumes based on protocol by
// 1. Labeling kubernetes nodes with protocol specific label (ie topology.kubernetes.io/protocol.iscsi = csi.huawei.com)
// 2. Configure topology support in plugin
// 3. Configure protocol topology in allowedTopologies fo Storage class
// addProtocolTopology is called after backend plugin init as init takes care of protocol validation
addProtocolTopology(backend, driverName)

csiBackends[backend.Name] = backend
}

Expand Down Expand Up @@ -590,15 +634,17 @@ func filterPoolsOnTopology(candidatePools []*StoragePool, requisiteTopologies []
continue
}

if len(backend.SupportedTopologies) > 0 {
for _, topology := range requisiteTopologies {
if isTopologySupportedByBackend(backend, topology) {
filteredPools = append(filteredPools, pool)
break
}
}
} else {
// when backend is not configured with supported topology
if len(backend.SupportedTopologies) == 0 {
filteredPools = append(filteredPools, pool)
continue
}

for _, topology := range requisiteTopologies {
if isTopologySupportedByBackend(backend, topology) {
filteredPools = append(filteredPools, pool)
break
}
}
}

Expand Down
58 changes: 32 additions & 26 deletions src/csi/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,32 +56,8 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}
}

// Get topology requirements
accessibleTopology := req.GetAccessibilityRequirements()
if accessibleTopology != nil {
var requisiteTopologies = make([]map[string]string, 0)
for _, requisite := range accessibleTopology.GetRequisite() {
requirement := make(map[string]string)
for k, v := range requisite.GetSegments() {
requirement[k] = v
}
requisiteTopologies = append(requisiteTopologies, requirement)
}

var preferredTopologies = make([]map[string]string, 0)
for _, preferred := range accessibleTopology.GetPreferred() {
preference := make(map[string]string)
for k, v := range preferred.GetSegments() {
preference[k] = v
}
preferredTopologies = append(preferredTopologies, preference)
}

parameters[backend.TopologyRequirement] = backend.AccessibleTopology{
RequisiteTopologies: requisiteTopologies,
PreferredTopologies: preferredTopologies,
}
}
// process accessibility requirements
d.processAccessibilityRequirements(req, parameters)

localPool, remotePool, err := backend.SelectStoragePool(size, parameters)
if err != nil {
Expand Down Expand Up @@ -128,6 +104,36 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}, nil
}

func (d *Driver) processAccessibilityRequirements(req *csi.CreateVolumeRequest, parameters map[string]interface{}) {
accessibleTopology := req.GetAccessibilityRequirements()
if accessibleTopology == nil {
return
}

var requisiteTopologies = make([]map[string]string, 0)
for _, requisite := range accessibleTopology.GetRequisite() {
requirement := make(map[string]string)
for k, v := range requisite.GetSegments() {
requirement[k] = v
}
requisiteTopologies = append(requisiteTopologies, requirement)
}

var preferredTopologies = make([]map[string]string, 0)
for _, preferred := range accessibleTopology.GetPreferred() {
preference := make(map[string]string)
for k, v := range preferred.GetSegments() {
preference[k] = v
}
preferredTopologies = append(preferredTopologies, preference)
}

parameters[backend.TopologyRequirement] = backend.AccessibleTopology{
RequisiteTopologies: requisiteTopologies,
PreferredTopologies: preferredTopologies,
}
}

func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
volumeId := req.GetVolumeId()

Expand Down
4 changes: 2 additions & 2 deletions src/csi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ func main() {
}()

if *controller || *controllerFlagFile != "" {
err := backend.RegisterBackend(config.Backends, true)
err := backend.RegisterBackend(config.Backends, true, *driverName)
if err != nil {
log.Fatalf("Register backends error: %v", err)
}

go updateBackendCapabilities()
} else {
err := backend.RegisterBackend(config.Backends, false)
err := backend.RegisterBackend(config.Backends, false, *driverName)
if err != nil {
log.Fatalf("Register backends error: %v", err)
}
Expand Down
94 changes: 94 additions & 0 deletions src/utils/k8sutils/k8s_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
Copyright (c) Huawei Technologies Co., Ltd. 2021-2021. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package k8sutils provides Kubernetes utilities
package k8sutils

import (
"fmt"
"regexp"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

const (
// TopologyPrefix supported by CSI plugin
TopologyPrefix = "topology.kubernetes.io"
topologyRegx = TopologyPrefix + "/.*"
)

// Interface is a kubernetes utility interface required by CSI plugin to interact with Kubernetes
type Interface interface {
// GetNodeTopology returns configured kubernetes node's topological labels
GetNodeTopology(nodeName string) (map[string]string, error)
}

type kubeClient struct {
clientSet *kubernetes.Clientset
}

// NewK8SUtils returns an object of Kubernetes utility interface
func NewK8SUtils(kubeConfig string) (Interface, error) {
var clientset *kubernetes.Clientset

if kubeConfig != "" {
config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
if err != nil {
return nil, err
}

clientset, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
} else {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}

clientset, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
}

return &kubeClient{
clientSet: clientset,
}, nil
}

func (k *kubeClient) GetNodeTopology(nodeName string) (map[string]string, error) {
k8sNode, err := k.getNode(nodeName)
if err != nil {
return nil, fmt.Errorf("failed to get node topology with error: %v", err)
}

topology := make(map[string]string)
for key, value := range k8sNode.Labels {
if match, err := regexp.MatchString(topologyRegx, key); err == nil && match {
topology[key] = value
}
}

return topology, nil
}

func (k *kubeClient) getNode(nodeName string) (*corev1.Node, error) {
return k.clientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
}
Loading

0 comments on commit e116c0c

Please sign in to comment.