Skip to content

Commit

Permalink
enable topology aware scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit Roushan committed Aug 26, 2021
1 parent 51eae88 commit dbbeaf4
Show file tree
Hide file tree
Showing 16 changed files with 395 additions and 36 deletions.
232 changes: 202 additions & 30 deletions src/csi/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@ import (
"csi/backend/plugin"
"errors"
"fmt"
"math/rand"
"regexp"
"sync"
"utils"
"utils/log"
)

const (
TopologyRequirement = "topologyRequirement"
SupportedTopologies = "supportedTopologies"
)

var (
mutex sync.Mutex
csiBackends = make(map[string]*Backend)
Expand All @@ -22,6 +28,7 @@ var (
{"qos", filterByQos},
{"hyperMetro", filterByMetro},
{"replication", filterByReplication},
{TopologyRequirement, filterByTopology},
}

secondaryFilterFuncs = [][]interface{}{
Expand All @@ -32,6 +39,11 @@ var (
}
)

type AccessibleTopology struct {
RequisiteTopologies []map[string]string
PreferredTopologies []map[string]string
}

type StoragePool struct {
Name string
Storage string
Expand All @@ -41,12 +53,13 @@ type StoragePool struct {
}

type Backend struct {
Name string
Storage string
Available bool
Plugin plugin.Plugin
Pools []*StoragePool
Parameters map[string]interface{}
Name string
Storage string
Available bool
Plugin plugin.Plugin
Pools []*StoragePool
Parameters map[string]interface{}
SupportedTopologies []map[string]string

MetroDomain string
MetrovStorePairID string
Expand Down Expand Up @@ -109,6 +122,28 @@ func newBackend(backendName string, config map[string]interface{}) (*Backend, er
return nil, errors.New("parameters must be configured for backend")
}

supportedTopologies := make([]map[string]string, 0)
if topologies, exist := config[SupportedTopologies]; exist {
topologyArray, ok := topologies.([]interface{})
if !ok {
return nil, errors.New("invalid supported topologies configuration")
}
for _, topologyArrElem := range topologyArray {
topologyMap, ok := topologyArrElem.(map[string]interface{})
if !ok {
return nil, errors.New("invalid supported topologies configuration")
}
tempMap := make(map[string]string, 0)
for topologyKey, value := range topologyMap {
if topologyValue, ok := value.(string); ok {
tempMap[topologyKey] = topologyValue
}
}

supportedTopologies = append(supportedTopologies, tempMap)
}
}

plugin := plugin.GetPlugin(storage)
if plugin == nil {
return nil, fmt.Errorf("Cannot get plugin for storage %s", storage)
Expand All @@ -126,15 +161,16 @@ func newBackend(backendName string, config map[string]interface{}) (*Backend, er
}

return &Backend{
Name: backendName,
Storage: storage,
Available: false,
Plugin: plugin,
Parameters: parameters,
MetroDomain: metroDomain,
MetrovStorePairID: metrovStorePairID,
ReplicaBackendName: replicaBackend,
MetroBackendName: metroBackend,
Name: backendName,
Storage: storage,
Available: false,
SupportedTopologies: supportedTopologies,
Plugin: plugin,
Parameters: parameters,
MetroDomain: metroDomain,
MetrovStorePairID: metrovStorePairID,
ReplicaBackendName: replicaBackend,
MetroBackendName: metroBackend,
}, nil
}

Expand Down Expand Up @@ -269,12 +305,14 @@ func selectOnePool(requestSize int64,
}

for _, i := range filterFuncs {
key, filter := i[0].(string), i[1].(func(string, []*StoragePool) []*StoragePool)
value, _ := parameters[key].(string)
filterPools = filter(value, filterPools)
if len(filterPools) == 0 {
return nil, fmt.Errorf("failed to select pool, the last filter field: %s, parameters %v",
key, parameters)
key, filter := i[0].(string), i[1].(func(interface{}, []*StoragePool) []*StoragePool)
value, exists := parameters[key]
if exists {
filterPools = filter(value, filterPools)
if len(filterPools) == 0 {
return nil, fmt.Errorf("failed to select pool, the last filter field: %s, parameters %v",
key, parameters)
}
}
}

Expand Down Expand Up @@ -354,9 +392,14 @@ func SelectStoragePool(requestSize int64, parameters map[string]interface{}) (*S
return localPool, remotePool, nil
}

func filterByBackendName(backendName string, candidatePools []*StoragePool) []*StoragePool {
func filterByBackendName(iBackendName interface{}, candidatePools []*StoragePool) []*StoragePool {
var filterPools []*StoragePool

backendName, ok := iBackendName.(string)
if !ok {
return candidatePools
}

for _, pool := range candidatePools {
if backendName == "" || backendName == pool.Parent {
filterPools = append(filterPools, pool)
Expand All @@ -366,9 +409,14 @@ func filterByBackendName(backendName string, candidatePools []*StoragePool) []*S
return filterPools
}

func filterByStoragePool(poolName string, candidatePools []*StoragePool) []*StoragePool {
func filterByStoragePool(iPoolName interface{}, candidatePools []*StoragePool) []*StoragePool {
var filterPools []*StoragePool

poolName, ok := iPoolName.(string)
if !ok {
return candidatePools
}

for _, pool := range candidatePools {
if poolName == "" || poolName == pool.Name {
filterPools = append(filterPools, pool)
Expand All @@ -378,9 +426,14 @@ func filterByStoragePool(poolName string, candidatePools []*StoragePool) []*Stor
return filterPools
}

func filterByVolumeType(volumeType string, candidatePools []*StoragePool) []*StoragePool {
func filterByVolumeType(iVolumeType interface{}, candidatePools []*StoragePool) []*StoragePool {
var filterPools []*StoragePool

volumeType, ok := iVolumeType.(string)
if !ok {
return candidatePools
}

for _, pool := range candidatePools {
if volumeType == "lun" || volumeType == "" {
if pool.Storage == "oceanstor-san" || pool.Storage == "fusionstorage-san" {
Expand All @@ -396,9 +449,14 @@ func filterByVolumeType(volumeType string, candidatePools []*StoragePool) []*Sto
return filterPools
}

func filterByAllocType(allocType string, candidatePools []*StoragePool) []*StoragePool {
func filterByAllocType(iAllocType interface{}, candidatePools []*StoragePool) []*StoragePool {
var filterPools []*StoragePool

allocType, ok := iAllocType.(string)
if !ok {
return candidatePools
}

for _, pool := range candidatePools {
valid := false

Expand All @@ -420,9 +478,14 @@ func filterByAllocType(allocType string, candidatePools []*StoragePool) []*Stora
return filterPools
}

func filterByQos(qos string, candidatePools []*StoragePool) []*StoragePool {
func filterByQos(iqos interface{}, candidatePools []*StoragePool) []*StoragePool {
var filterPools []*StoragePool

qos, ok := iqos.(string)
if !ok {
return candidatePools
}

for _, pool := range candidatePools {
if qos != "" {
supportQoS, exist := pool.Capabilities["SupportQoS"].(bool)
Expand All @@ -437,8 +500,9 @@ func filterByQos(qos string, candidatePools []*StoragePool) []*StoragePool {
return filterPools
}

func filterByMetro(hyperMetro string, candidatePools []*StoragePool) []*StoragePool {
if len(hyperMetro) == 0 || !utils.StrToBool(hyperMetro) {
func filterByMetro(iHyperMetro interface{}, candidatePools []*StoragePool) []*StoragePool {
hyperMetro, ok := iHyperMetro.(string)
if !ok || len(hyperMetro) == 0 || !utils.StrToBool(hyperMetro) {
return candidatePools
}

Expand All @@ -458,8 +522,9 @@ func filterByMetro(hyperMetro string, candidatePools []*StoragePool) []*StorageP
return filterPools
}

func filterByReplication(replication string, candidatePools []*StoragePool) []*StoragePool {
if len(replication) == 0 || !utils.StrToBool(replication) {
func filterByReplication(iReplication interface{}, candidatePools []*StoragePool) []*StoragePool {
replication, ok := iReplication.(string)
if !ok || len(replication) == 0 || !utils.StrToBool(replication) {
return candidatePools
}

Expand All @@ -478,3 +543,110 @@ func filterByReplication(replication string, candidatePools []*StoragePool) []*S

return filterPools
}

// filterByTopology returns a subset of the provided pools that can support any of the topology requirement.
func filterByTopology(iTopologyRequirement interface{}, candidatePools []*StoragePool) []*StoragePool {
topologyRequirement, ok := iTopologyRequirement.(AccessibleTopology)
if !ok || len(topologyRequirement.RequisiteTopologies) == 0 {
return candidatePools
}

filterPools := filterPoolsOnTopology(candidatePools, topologyRequirement.RequisiteTopologies)
if len(filterPools) == 0 {
log.Infoln("no backend pools support any requisite topologies")
}
return sortPoolsByPreferredTopologies(filterPools, topologyRequirement.PreferredTopologies)
}

// isTopologySupportedByBackend returns whether the specific backend can create volumes accessible by the given topology
func isTopologySupportedByBackend(backend *Backend, topology map[string]string) bool {
requisiteFound := false
for _, supported := range backend.SupportedTopologies {
eachFound := true
for k, v := range topology {
if sup, ok := supported[k]; !ok || (sup != v) {
eachFound = false
break
}
}
if eachFound {
requisiteFound = true
}
}
return requisiteFound
}

// filterPoolsOnTopology returns a subset of the provided pools that can support any of the requisiteTopologies.
func filterPoolsOnTopology(candidatePools []*StoragePool, requisiteTopologies []map[string]string) []*StoragePool {
filteredPools := make([]*StoragePool, 0)

if len(requisiteTopologies) == 0 {
return candidatePools
}

for _, pool := range candidatePools {
backend, exist := csiBackends[pool.Parent]
if !exist {
continue
}

if len(backend.SupportedTopologies) > 0 {
for _, topology := range requisiteTopologies {
if isTopologySupportedByBackend(backend, topology) {
filteredPools = append(filteredPools, pool)
break
}
}
} else {
filteredPools = append(filteredPools, pool)
}
}

return filteredPools
}

// sortPoolsByPreferredTopologies returns a list of pools ordered by the pools supportedTopologies field against
// the provided list of preferredTopologies. If 2 or more pools can support a given preferredTopology, they are shuffled
// randomly within that segment of the list, in order to prevent hotspots.
func sortPoolsByPreferredTopologies(candidatePools []*StoragePool, preferredTopologies []map[string]string) []*StoragePool {
remainingPools := make([]*StoragePool, len(candidatePools))
copy(remainingPools, candidatePools)
orderedPools := make([]*StoragePool, 0)

for _, preferred := range preferredTopologies {
newRemainingPools := make([]*StoragePool, 0)
poolBucket := make([]*StoragePool, 0)

for _, pool := range remainingPools {
backend, exist := csiBackends[pool.Parent]
if !exist {
continue
}
// If it supports topology, pop it and add to bucket. Otherwise, add it to newRemaining pools to be
// addressed in future loop iterations.
if isTopologySupportedByBackend(backend, preferred) {
poolBucket = append(poolBucket, pool)
} else {
newRemainingPools = append(newRemainingPools, pool)
}
}

// make new list of remaining pools
remainingPools = make([]*StoragePool, len(newRemainingPools))
copy(remainingPools, newRemainingPools)

// shuffle bucket
rand.Shuffle(len(poolBucket), func(i, j int) {
poolBucket[i], poolBucket[j] = poolBucket[j], poolBucket[i]
})

// add all in bucket to final list
orderedPools = append(orderedPools, poolBucket...)
}

// shuffle and add leftover pools the did not match any preference
rand.Shuffle(len(remainingPools), func(i, j int) {
remainingPools[i], remainingPools[j] = remainingPools[j], remainingPools[i]
})
return append(orderedPools, remainingPools...)
}
27 changes: 27 additions & 0 deletions src/csi/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,33 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}
}

// Get topology requirements
accessibleTopology := req.GetAccessibilityRequirements()
if accessibleTopology != nil {
var requisiteTopologies = make([]map[string]string, 0)
for _, requisite := range accessibleTopology.GetRequisite() {
requirement := make(map[string]string)
for k, v := range requisite.GetSegments() {
requirement[k] = v
}
requisiteTopologies = append(requisiteTopologies, requirement)
}

var preferredTopologies = make([]map[string]string, 0)
for _, preferred := range accessibleTopology.GetPreferred() {
preference := make(map[string]string)
for k, v := range preferred.GetSegments() {
preference[k] = v
}
preferredTopologies = append(preferredTopologies, preference)
}

parameters[backend.TopologyRequirement] = backend.AccessibleTopology{
RequisiteTopologies: requisiteTopologies,
PreferredTopologies: preferredTopologies,
}
}

localPool, remotePool, err := backend.SelectStoragePool(size, parameters)
if err != nil {
log.Errorf("Cannot select pool for volume creation: %v", err)
Expand Down
Loading

0 comments on commit dbbeaf4

Please sign in to comment.