Skip to content

Commit

Permalink
added code changes for protocol topological awarness
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit Roushan committed Sep 7, 2021
1 parent c78fe52 commit b616bf6
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 166 deletions.
119 changes: 89 additions & 30 deletions src/csi/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (
"regexp"
"sync"
"utils"
"utils/k8sutils"
"utils/log"
)

const (
// TopologyRequirement constant for topology filter function
TopologyRequirement = "topologyRequirement"
SupportedTopologies = "supportedTopologies"
// supported topology key in CSI plugin configuration
supportedTopologiesKey = "supportedTopologies"
)

var (
Expand All @@ -39,6 +42,7 @@ var (
}
)

// AccessibleTopology represents selected node topology
type AccessibleTopology struct {
RequisiteTopologies []map[string]string
PreferredTopologies []map[string]string
Expand Down Expand Up @@ -122,26 +126,10 @@ func newBackend(backendName string, config map[string]interface{}) (*Backend, er
return nil, errors.New("parameters must be configured for backend")
}

supportedTopologies := make([]map[string]string, 0)
if topologies, exist := config[SupportedTopologies]; exist {
topologyArray, ok := topologies.([]interface{})
if !ok {
return nil, errors.New("invalid supported topologies configuration")
}
for _, topologyArrElem := range topologyArray {
topologyMap, ok := topologyArrElem.(map[string]interface{})
if !ok {
return nil, errors.New("invalid supported topologies configuration")
}
tempMap := make(map[string]string, 0)
for topologyKey, value := range topologyMap {
if topologyValue, ok := value.(string); ok {
tempMap[topologyKey] = topologyValue
}
}

supportedTopologies = append(supportedTopologies, tempMap)
}
// Get supported topologies for backend
supportedTopologies, err := getSupportedTopologies(config)
if err != nil {
return nil, err
}

plugin := plugin.GetPlugin(storage)
Expand Down Expand Up @@ -174,6 +162,50 @@ func newBackend(backendName string, config map[string]interface{}) (*Backend, er
}, nil
}

func getSupportedTopologies(config map[string]interface{}) ([]map[string]string, error) {
supportedTopologies := make([]map[string]string, 0)

topologies, exist := config[supportedTopologiesKey]
if !exist {
return supportedTopologies, nil
}

// populate configured topologies
topologyArray, ok := topologies.([]interface{})
if !ok {
return nil, errors.New("invalid supported topologies configuration")
}
for _, topologyArrElem := range topologyArray {
topologyMap, ok := topologyArrElem.(map[string]interface{})
if !ok {
return nil, errors.New("invalid supported topologies configuration")
}
tempMap := make(map[string]string, 0)
for topologyKey, value := range topologyMap {
if topologyValue, ok := value.(string); ok {
tempMap[topologyKey] = topologyValue
}
}
supportedTopologies = append(supportedTopologies, tempMap)
}

return supportedTopologies, nil
}

// addProtocolTopology add up protocol specific topological support
func addProtocolTopology(backend *Backend, driverName string) {
proto, protocolAvailable := backend.Parameters["protocol"]
if protocol, isString := proto.(string); protocolAvailable && isString {
backend.SupportedTopologies = append(backend.SupportedTopologies, map[string]string{
k8sutils.TopologyPrefix + "/protocol." + protocol: driverName,
})
return
}

log.Warningf("supported topology for protocol may not work as protocol is miss configured " +
"in backend configuration")
}

func analyzeBackend(config map[string]interface{}) (*Backend, error) {
backendName, exist := config["name"].(string)
if !exist {
Expand Down Expand Up @@ -245,7 +277,7 @@ func updateReplicaBackends() {
}
}

func RegisterBackend(backendConfigs []map[string]interface{}, keepLogin bool) error {
func RegisterBackend(backendConfigs []map[string]interface{}, keepLogin bool, driverName string) error {
for _, i := range backendConfigs {
backend, err := analyzeBackend(i)
if err != nil {
Expand All @@ -259,6 +291,18 @@ func RegisterBackend(backendConfigs []map[string]interface{}, keepLogin bool) er
return err
}

// Note: Protocol is considered as special topological parameter. The protocol topology
// is populated internally by plugin using protocol name.
// If configured protocol for backend is "iscsi", CSI plugin internally add
// topology.kubernetes.io/protocol.iscsi = csi.huawei.com in supportedTopologies.
//
// Now users can opt to provision volumes based on protocol by
// 1. Labeling kubernetes nodes with protocol specific label (ie topology.kubernetes.io/protocol.iscsi = csi.huawei.com)
// 2. Configure topology support in plugin
// 3. Configure protocol topology in allowedTopologies fo Storage class
// addProtocolTopology is called after backend plugin init as init takes care of protocol validation
addProtocolTopology(backend, driverName)

csiBackends[backend.Name] = backend
}

Expand Down Expand Up @@ -585,20 +629,23 @@ func filterPoolsOnTopology(candidatePools []*StoragePool, requisiteTopologies []
}

for _, pool := range candidatePools {
// mutex lock acquired in pool selection
backend, exist := csiBackends[pool.Parent]
if !exist {
continue
}

if len(backend.SupportedTopologies) > 0 {
for _, topology := range requisiteTopologies {
if isTopologySupportedByBackend(backend, topology) {
filteredPools = append(filteredPools, pool)
break
}
}
} else {
// when backend is not configured with supported topology
if len(backend.SupportedTopologies) == 0 {
filteredPools = append(filteredPools, pool)
continue
}

for _, topology := range requisiteTopologies {
if isTopologySupportedByBackend(backend, topology) {
filteredPools = append(filteredPools, pool)
break
}
}
}

Expand Down Expand Up @@ -650,3 +697,15 @@ func sortPoolsByPreferredTopologies(candidatePools []*StoragePool, preferredTopo
})
return append(orderedPools, remainingPools...)
}

func (pool *StoragePool) GetSupportedTopologies() []map[string]string {
mutex.Lock()
defer mutex.Unlock()
backend, exist := csiBackends[pool.Parent]
if !exist {
log.Warningf("Backend [%v] does not exist in CSI backend pool", pool.Parent)
return make([]map[string]string, 0)
}

return backend.SupportedTopologies
}
152 changes: 94 additions & 58 deletions src/csi/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,53 +35,14 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
parameters["backend"], parameters["cloneFrom"] = utils.SplitVolumeId(cloneFrom)
}

contentSource := req.GetVolumeContentSource()
if contentSource != nil {
if contentSnapshot := contentSource.GetSnapshot(); contentSnapshot != nil {
sourceSnapshotId := contentSnapshot.GetSnapshotId()
sourceBackendName, snapshotParentId, sourceSnapshotName := utils.SplitSnapshotId(sourceSnapshotId)
parameters["sourceSnapshotName"] = sourceSnapshotName
parameters["snapshotParentId"] = snapshotParentId
parameters["backend"] = sourceBackendName
log.Infof("Start to create volume from snapshot %s", sourceSnapshotName)
} else if contentVolume := contentSource.GetVolume(); contentVolume != nil {
sourceVolumeId := contentVolume.GetVolumeId()
sourceBackendName, sourceVolumeName := utils.SplitVolumeId(sourceVolumeId)
parameters["sourceVolumeName"] = sourceVolumeName
parameters["backend"] = sourceBackendName
log.Infof("Start to create volume from volume %s", sourceVolumeName)
} else {
log.Errorf("The source %s is not snapshot either volume", contentSource)
return nil, status.Error(codes.InvalidArgument, "no source ID provided is invalid")
}
// process volume content source
err := d.processVolumeContentSource(req, parameters)
if err != nil {
return nil, err
}

// Get topology requirements
accessibleTopology := req.GetAccessibilityRequirements()
if accessibleTopology != nil {
var requisiteTopologies = make([]map[string]string, 0)
for _, requisite := range accessibleTopology.GetRequisite() {
requirement := make(map[string]string)
for k, v := range requisite.GetSegments() {
requirement[k] = v
}
requisiteTopologies = append(requisiteTopologies, requirement)
}

var preferredTopologies = make([]map[string]string, 0)
for _, preferred := range accessibleTopology.GetPreferred() {
preference := make(map[string]string)
for k, v := range preferred.GetSegments() {
preference[k] = v
}
preferredTopologies = append(preferredTopologies, preference)
}

parameters[backend.TopologyRequirement] = backend.AccessibleTopology{
RequisiteTopologies: requisiteTopologies,
PreferredTopologies: preferredTopologies,
}
}
// process accessibility requirements
d.processAccessibilityRequirements(req, parameters)

localPool, remotePool, err := backend.SelectStoragePool(size, parameters)
if err != nil {
Expand All @@ -104,30 +65,105 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)

log.Infof("Volume %s is created", name)

volume, err := d.getCreatedVolume(req, volName, localPool)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

return &csi.CreateVolumeResponse{
Volume: volume,
}, nil
}

func (d *Driver) getCreatedVolume(req *csi.CreateVolumeRequest, volName string, pool *backend.StoragePool) (*csi.Volume, error) {
contentSource := req.GetVolumeContentSource()
size := req.GetCapacityRange().GetRequiredBytes()

accessibleTopologies := make([]*csi.Topology, 0)
supportedTopology := pool.GetSupportedTopologies()
if len(supportedTopology) > 0 {
for _, segment := range supportedTopology {
accessibleTopologies = append(accessibleTopologies, &csi.Topology{Segments: segment})
}
}

if contentSource != nil {
attributes := map[string]string{
"backend": localPool.Parent,
"backend": pool.Parent,
"name": volName,
}

return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: localPool.Parent + "." + volName,
CapacityBytes: size,
VolumeContext: attributes,
ContentSource: req.VolumeContentSource,
},
return &csi.Volume{
VolumeId: pool.Parent + "." + volName,
CapacityBytes: size,
VolumeContext: attributes,
ContentSource: contentSource,
AccessibleTopology: accessibleTopologies,
}, nil
}

return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: localPool.Parent + "." + volName,
CapacityBytes: size,
},
return &csi.Volume{
VolumeId: pool.Parent + "." + volName,
CapacityBytes: size,
AccessibleTopology: accessibleTopologies,
}, nil
}

func (d *Driver) processVolumeContentSource(req *csi.CreateVolumeRequest, parameters map[string]interface{}) error {
contentSource := req.GetVolumeContentSource()
if contentSource != nil {
if contentSnapshot := contentSource.GetSnapshot(); contentSnapshot != nil {
sourceSnapshotId := contentSnapshot.GetSnapshotId()
sourceBackendName, snapshotParentId, sourceSnapshotName := utils.SplitSnapshotId(sourceSnapshotId)
parameters["sourceSnapshotName"] = sourceSnapshotName
parameters["snapshotParentId"] = snapshotParentId
parameters["backend"] = sourceBackendName
log.Infof("Start to create volume from snapshot %s", sourceSnapshotName)
} else if contentVolume := contentSource.GetVolume(); contentVolume != nil {
sourceVolumeId := contentVolume.GetVolumeId()
sourceBackendName, sourceVolumeName := utils.SplitVolumeId(sourceVolumeId)
parameters["sourceVolumeName"] = sourceVolumeName
parameters["backend"] = sourceBackendName
log.Infof("Start to create volume from volume %s", sourceVolumeName)
} else {
log.Errorf("The source %s is not snapshot either volume", contentSource)
return status.Error(codes.InvalidArgument, "no source ID provided is invalid")
}
}

return nil
}

func (d *Driver) processAccessibilityRequirements(req *csi.CreateVolumeRequest, parameters map[string]interface{}) {
accessibleTopology := req.GetAccessibilityRequirements()
if accessibleTopology == nil {
return
}

var requisiteTopologies = make([]map[string]string, 0)
for _, requisite := range accessibleTopology.GetRequisite() {
requirement := make(map[string]string)
for k, v := range requisite.GetSegments() {
requirement[k] = v
}
requisiteTopologies = append(requisiteTopologies, requirement)
}

var preferredTopologies = make([]map[string]string, 0)
for _, preferred := range accessibleTopology.GetPreferred() {
preference := make(map[string]string)
for k, v := range preferred.GetSegments() {
preference[k] = v
}
preferredTopologies = append(preferredTopologies, preference)
}

parameters[backend.TopologyRequirement] = backend.AccessibleTopology{
RequisiteTopologies: requisiteTopologies,
PreferredTopologies: preferredTopologies,
}
}

func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
volumeId := req.GetVolumeId()

Expand Down
6 changes: 3 additions & 3 deletions src/csi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
"absolute path to the kubeconfig file")
nodeName = flag.String("nodename",
os.Getenv(nodeNameEnv),
"absolute path to the kubeconfig file")
"node name in kubernetes cluster")

config CSIConfig
secret CSISecret
Expand Down Expand Up @@ -182,14 +182,14 @@ func main() {
}()

if *controller || *controllerFlagFile != "" {
err := backend.RegisterBackend(config.Backends, true)
err := backend.RegisterBackend(config.Backends, true, *driverName)
if err != nil {
log.Fatalf("Register backends error: %v", err)
}

go updateBackendCapabilities()
} else {
err := backend.RegisterBackend(config.Backends, false)
err := backend.RegisterBackend(config.Backends, false, *driverName)
if err != nil {
log.Fatalf("Register backends error: %v", err)
}
Expand Down
Loading

0 comments on commit b616bf6

Please sign in to comment.