Skip to content

Commit

Permalink
add topology aware provisioning for controller service
Browse files Browse the repository at this point in the history
  • Loading branch information
AmitRoushan committed Aug 24, 2021
1 parent 1b765fe commit 2027c17
Showing 1 changed file with 181 additions and 25 deletions.
206 changes: 181 additions & 25 deletions src/csi/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@ import (
"csi/backend/plugin"
"errors"
"fmt"
"math/rand"
"regexp"
"sync"
"utils"
"utils/log"
)

const (
TopologyRequirement = "topologyRequirement"
SupportedTopologies = "supportedTopologies"
)

var (
mutex sync.Mutex
csiBackends = make(map[string]*Backend)
Expand All @@ -22,6 +28,7 @@ var (
{"qos", filterByQos},
{"hyperMetro", filterByMetro},
{"replication", filterByReplication},
{"topologyRequirement", filterByTopology},
}

secondaryFilterFuncs = [][]interface{}{
Expand All @@ -32,6 +39,13 @@ var (
}
)

type FilterFunction func(interface{}, []*StoragePool) []*StoragePool

type AccessibleTopology struct {
RequisiteTopologies []map[string]string
PreferredTopologies []map[string]string
}

type StoragePool struct {
Name string
Storage string
Expand All @@ -41,12 +55,13 @@ type StoragePool struct {
}

type Backend struct {
Name string
Storage string
Available bool
Plugin plugin.Plugin
Pools []*StoragePool
Parameters map[string]interface{}
Name string
Storage string
Available bool
Plugin plugin.Plugin
Pools []*StoragePool
Parameters map[string]interface{}
SupportedTopologies []map[string]string

MetroDomain string
MetrovStorePairID string
Expand Down Expand Up @@ -109,6 +124,11 @@ func newBackend(backendName string, config map[string]interface{}) (*Backend, er
return nil, errors.New("parameters must be configured for backend")
}

supportedTopologies, exists := config[SupportedTopologies].([]map[string]string)
if !exists {
return nil, errors.New("invalid supportedTopologies configured for backend")
}

plugin := plugin.GetPlugin(storage)
if plugin == nil {
return nil, fmt.Errorf("Cannot get plugin for storage %s", storage)
Expand All @@ -126,15 +146,16 @@ func newBackend(backendName string, config map[string]interface{}) (*Backend, er
}

return &Backend{
Name: backendName,
Storage: storage,
Available: false,
Plugin: plugin,
Parameters: parameters,
MetroDomain: metroDomain,
MetrovStorePairID: metrovStorePairID,
ReplicaBackendName: replicaBackend,
MetroBackendName: metroBackend,
Name: backendName,
Storage: storage,
Available: false,
SupportedTopologies: supportedTopologies,
Plugin: plugin,
Parameters: parameters,
MetroDomain: metroDomain,
MetrovStorePairID: metrovStorePairID,
ReplicaBackendName: replicaBackend,
MetroBackendName: metroBackend,
}, nil
}

Expand Down Expand Up @@ -269,7 +290,7 @@ func selectOnePool(requestSize int64,
}

for _, i := range filterFuncs {
key, filter := i[0].(string), i[1].(func(string, []*StoragePool) []*StoragePool)
key, filter := i[0].(string), i[1].(FilterFunction)
value, _ := parameters[key].(string)
filterPools = filter(value, filterPools)
if len(filterPools) == 0 {
Expand Down Expand Up @@ -354,9 +375,14 @@ func SelectStoragePool(requestSize int64, parameters map[string]interface{}) (*S
return localPool, remotePool, nil
}

func filterByBackendName(backendName string, candidatePools []*StoragePool) []*StoragePool {
func filterByBackendName(iBackendName interface{}, candidatePools []*StoragePool) []*StoragePool {
var filterPools []*StoragePool

backendName, ok := iBackendName.(string)
if !ok {
return candidatePools
}

for _, pool := range candidatePools {
if backendName == "" || backendName == pool.Parent {
filterPools = append(filterPools, pool)
Expand All @@ -366,9 +392,14 @@ func filterByBackendName(backendName string, candidatePools []*StoragePool) []*S
return filterPools
}

func filterByStoragePool(poolName string, candidatePools []*StoragePool) []*StoragePool {
func filterByStoragePool(iPoolName interface{}, candidatePools []*StoragePool) []*StoragePool {
var filterPools []*StoragePool

poolName, ok := iPoolName.(string)
if !ok {
return candidatePools
}

for _, pool := range candidatePools {
if poolName == "" || poolName == pool.Name {
filterPools = append(filterPools, pool)
Expand All @@ -378,9 +409,14 @@ func filterByStoragePool(poolName string, candidatePools []*StoragePool) []*Stor
return filterPools
}

func filterByVolumeType(volumeType string, candidatePools []*StoragePool) []*StoragePool {
func filterByVolumeType(iVolumeType interface{}, candidatePools []*StoragePool) []*StoragePool {
var filterPools []*StoragePool

volumeType, ok := iVolumeType.(string)
if !ok {
return candidatePools
}

for _, pool := range candidatePools {
if volumeType == "lun" || volumeType == "" {
if pool.Storage == "oceanstor-san" || pool.Storage == "fusionstorage-san" {
Expand All @@ -396,9 +432,14 @@ func filterByVolumeType(volumeType string, candidatePools []*StoragePool) []*Sto
return filterPools
}

func filterByAllocType(allocType string, candidatePools []*StoragePool) []*StoragePool {
func filterByAllocType(iAllocType interface{}, candidatePools []*StoragePool) []*StoragePool {
var filterPools []*StoragePool

allocType, ok := iAllocType.(string)
if !ok {
return candidatePools
}

for _, pool := range candidatePools {
valid := false

Expand All @@ -420,9 +461,14 @@ func filterByAllocType(allocType string, candidatePools []*StoragePool) []*Stora
return filterPools
}

func filterByQos(qos string, candidatePools []*StoragePool) []*StoragePool {
func filterByQos(iqos interface{}, candidatePools []*StoragePool) []*StoragePool {
var filterPools []*StoragePool

qos, ok := iqos.(string)
if !ok {
return candidatePools
}

for _, pool := range candidatePools {
if qos != "" {
supportQoS, exist := pool.Capabilities["SupportQoS"].(bool)
Expand All @@ -437,8 +483,9 @@ func filterByQos(qos string, candidatePools []*StoragePool) []*StoragePool {
return filterPools
}

func filterByMetro(hyperMetro string, candidatePools []*StoragePool) []*StoragePool {
if len(hyperMetro) == 0 || !utils.StrToBool(hyperMetro) {
func filterByMetro(iHyperMetro interface{}, candidatePools []*StoragePool) []*StoragePool {
hyperMetro, ok := iHyperMetro.(string)
if !ok || len(hyperMetro) == 0 || !utils.StrToBool(hyperMetro) {
return candidatePools
}

Expand All @@ -458,8 +505,9 @@ func filterByMetro(hyperMetro string, candidatePools []*StoragePool) []*StorageP
return filterPools
}

func filterByReplication(replication string, candidatePools []*StoragePool) []*StoragePool {
if len(replication) == 0 || !utils.StrToBool(replication) {
func filterByReplication(iReplication interface{}, candidatePools []*StoragePool) []*StoragePool {
replication, ok := iReplication.(string)
if !ok || len(replication) == 0 || !utils.StrToBool(replication) {
return candidatePools
}

Expand All @@ -478,3 +526,111 @@ func filterByReplication(replication string, candidatePools []*StoragePool) []*S

return filterPools
}

// filterByTopology returns a subset of the provided pools that can support any of the topology requirement.
func filterByTopology(iTopologyRequirement interface{}, candidatePools []*StoragePool) []*StoragePool {
topologyRequirement, ok := iTopologyRequirement.(AccessibleTopology)
if !ok || len(topologyRequirement.RequisiteTopologies) == 0 {
return candidatePools
}

filterPools := filterPoolsOnTopology(candidatePools, topologyRequirement.RequisiteTopologies)
if len(filterPools) == 0 {
log.Infoln("no backend pools support any requisite topologies")
}
return sortPoolsByPreferredTopologies(filterPools, topologyRequirement.PreferredTopologies)
}

// isTopologySupportedByBackend returns whether the specific backend can create volumes accessible by the given topology
func isTopologySupportedByBackend(backend *Backend, topology map[string]string) bool {
requisiteFound := false
for _, supported := range backend.SupportedTopologies {
eachFound := true
for k, v := range topology {
if sup, ok := supported[k]; ok && sup != v {
eachFound = false
break
}
}
if eachFound {
requisiteFound = true
}
}
return requisiteFound
}

// filterPoolsOnTopology returns a subset of the provided pools that can support any of the requisiteTopologies.
func filterPoolsOnTopology(candidatePools []*StoragePool, requisiteTopologies []map[string]string) []*StoragePool {
filteredPools := make([]*StoragePool, 0)

if len(requisiteTopologies) == 0 {
return candidatePools
}

for _, pool := range candidatePools {
backend, exist := csiBackends[pool.Parent]
if !exist {
continue
}

if len(backend.SupportedTopologies) > 0 {
for _, topology := range requisiteTopologies {
if isTopologySupportedByBackend(backend, topology) {
filteredPools = append(filteredPools, pool)
break
}
}
} else {
filteredPools = append(filteredPools, pool)
}
}

return filteredPools
}

// sortPoolsByPreferredTopologies returns a list of pools ordered by the pools supportedTopologies field against
// the provided list of preferredTopologies. If 2 or more pools can support a given preferredTopology, they are shuffled
// randomly within that segment of the list, in order to prevent hotspots.
func sortPoolsByPreferredTopologies(candidatePools []*StoragePool, preferredTopologies []map[string]string) []*StoragePool {
remainingPools := make([]*StoragePool, len(candidatePools))
copy(remainingPools, candidatePools)
orderedPools := make([]*StoragePool, 0)

for _, preferred := range preferredTopologies {

newRemainingPools := make([]*StoragePool, 0)
poolBucket := make([]*StoragePool, 0)

for _, pool := range remainingPools {
backend, exist := csiBackends[pool.Parent]
if !exist {
continue
}
// If it supports topology, pop it and add to bucket. Otherwise, add it to newRemaining pools to be
// addressed in future loop iterations.
if isTopologySupportedByBackend(backend, preferred) {
poolBucket = append(poolBucket, pool)
} else {
newRemainingPools = append(newRemainingPools, pool)
}
}

// make new list of remaining pools
remainingPools = make([]*StoragePool, len(newRemainingPools))
copy(remainingPools, newRemainingPools)

// shuffle bucket
rand.Shuffle(len(poolBucket), func(i, j int) {
poolBucket[i], poolBucket[j] = poolBucket[j], poolBucket[i]
})

// add all in bucket to final list
orderedPools = append(orderedPools, poolBucket...)
}

// shuffle and add leftover pools the did not match any preference
rand.Shuffle(len(remainingPools), func(i, j int) {
remainingPools[i], remainingPools[j] = remainingPools[j], remainingPools[i]
})
return append(orderedPools, remainingPools...)
}

0 comments on commit 2027c17

Please sign in to comment.