Skip to content

Commit

Permalink
[DEVOPS-635] Refactor waiting for resources to be ready
Browse files Browse the repository at this point in the history
  • Loading branch information
Julio Chana committed Dec 26, 2017
1 parent da955e4 commit 11d12b2
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 85 deletions.
119 changes: 34 additions & 85 deletions pkg/failover/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,23 +340,7 @@ func (r *RedisFailoverKubeClient) CreateBootstrapPod(rf *RedisFailover) error {
return err
}

t := r.clock.NewTicker(loopInterval)
for range t.C {
logger.Debug("Waiting for pod to be ready")
ready := false
pod, _ = r.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
for _, condition := range pod.Status.Conditions {

if condition.Type == "Ready" && condition.Status == v1.ConditionTrue {
ready = true
break
}
}
if ready {
t.Stop()
break
}
}
r.waitForPod(rf.Metadata.Name, rf.Metadata.Namespace, logger)

return nil
}
Expand Down Expand Up @@ -401,19 +385,7 @@ func (r *RedisFailoverKubeClient) CreateSentinelService(rf *RedisFailover) error
return err
}

t := r.clock.NewTicker(loopInterval)
for range t.C {
logger.Debug("Waiting for service to find bootstrap pod")
endpoints, _ := r.Client.CoreV1().Endpoints(namespace).Get(name, metav1.GetOptions{})
addresses := 0
for _, subset := range endpoints.Subsets {
addresses += len(subset.Addresses)
}
if addresses > 0 {
t.Stop()
break
}
}
r.waitForService(name, namespace, logger)

return nil
}
Expand Down Expand Up @@ -543,15 +515,7 @@ func (r *RedisFailoverKubeClient) CreateSentinelDeployment(rf *RedisFailover) er
return err
}

t := r.clock.NewTicker(loopInterval)
for range t.C {
logger.Debug("Waiting for Sentinel deployment to be fully operative")
deployment, _ := r.Client.AppsV1beta1().Deployments(namespace).Get(name, metav1.GetOptions{})
if deployment.Status.ReadyReplicas == spec.Sentinel.Replicas {
t.Stop()
break
}
}
r.waitForDeployment(name, namespace, spec.Sentinel.Replicas, logger)

logger.Debug("Creating Sentinel PodDisruptionBudget...")
if err := r.createPodDisruptionBudget(rf, sentinelName, sentinelRoleName); err != nil {
Expand Down Expand Up @@ -729,15 +693,7 @@ func (r *RedisFailoverKubeClient) CreateRedisStatefulset(rf *RedisFailover) erro
return err
}

t := r.clock.NewTicker(loopInterval)
for range t.C {
logger.Debug("Waiting for Redis statefulset to be fully operative")
statefulset, _ := r.Client.AppsV1beta1().StatefulSets(namespace).Get(name, metav1.GetOptions{})
if statefulset.Status.ReadyReplicas == spec.Redis.Replicas {
t.Stop()
break
}
}
r.waitForStatefulset(name, namespace, spec.Redis.Replicas, logger)

logger.Debug("Creating Redis PodDisruptionBudget...")
if err := r.createPodDisruptionBudget(rf, redisName, redisRoleName); err != nil {
Expand Down Expand Up @@ -809,7 +765,9 @@ func (r *RedisFailoverKubeClient) createPodDisruptionBudget(rf *RedisFailover, n

// UpdateSentinelDeployment updates the spec of the existing sentinel deployment
func (r *RedisFailoverKubeClient) UpdateSentinelDeployment(rf *RedisFailover) error {
logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace)
name := r.GetSentinelName(rf)
namespace := rf.Metadata.Namespace
logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace)

quorum := rf.GetQuorum()
replicas := rf.Spec.Sentinel.Replicas
Expand All @@ -834,26 +792,20 @@ func (r *RedisFailoverKubeClient) UpdateSentinelDeployment(rf *RedisFailover) er
oldSD.Spec.Template.Spec.Containers[0].Image = getRedisImage(rf)
oldSD.Spec.Template.Spec.Containers[0].Resources = getSentinelResources(rf.Spec)

if _, err := r.Client.AppsV1beta1().Deployments(rf.Metadata.Namespace).Update(oldSD); err != nil {
if _, err := r.Client.AppsV1beta1().Deployments(namespace).Update(oldSD); err != nil {
return err
}

t := r.clock.NewTicker(loopInterval)
for range t.C {
logger.Debug("Waiting for Sentinel deployment to be updated")
deployment, _ := r.GetSentinelDeployment(rf)
if deployment.Status.ReadyReplicas == replicas && deployment.Status.UpdatedReplicas == replicas {
t.Stop()
break
}
}
r.waitForDeployment(name, namespace, replicas, logger)

return nil
}

// UpdateRedisStatefulset updates the spec of the existing redis statefulset
func (r *RedisFailoverKubeClient) UpdateRedisStatefulset(rf *RedisFailover) error {
logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace)
name := r.GetRedisName(rf)
namespace := rf.Metadata.Namespace
logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace)

replicas := rf.Spec.Redis.Replicas

Expand All @@ -866,19 +818,11 @@ func (r *RedisFailoverKubeClient) UpdateRedisStatefulset(rf *RedisFailover) erro
oldSS.Spec.Template.Spec.Containers[0].Resources = getRedisResources(rf.Spec)
oldSS.Spec.Template.Spec.Containers[0].Image = getRedisImage(rf)

if _, err := r.Client.AppsV1beta1().StatefulSets(rf.Metadata.Namespace).Update(oldSS); err != nil {
if _, err := r.Client.AppsV1beta1().StatefulSets(namespace).Update(oldSS); err != nil {
return err
}

t := r.clock.NewTicker(loopInterval)
for range t.C {
logger.Debug("Waiting for Redis statefulset to be updated")
statefulset, _ := r.GetRedisStatefulset(rf)
if statefulset.Status.ReadyReplicas == replicas && statefulset.Status.UpdatedReplicas == replicas {
t.Stop()
break
}
}
r.waitForStatefulset(name, namespace, replicas, logger)

return nil
}
Expand All @@ -888,20 +832,14 @@ func (r *RedisFailoverKubeClient) DeleteBootstrapPod(rf *RedisFailover) error {
name := r.GetBootstrapName(rf)
namespace := rf.Metadata.Namespace

logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace)
logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace)
err := r.Client.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{})
if err != nil {
return err
}
t := r.clock.NewTicker(loopInterval)
for range t.C {
logger.Debug("Waiting for pod to terminate")
pod, _ := r.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
if len(pod.Name) == 0 {
t.Stop()
break
}
}

r.waitForPodDeletion(name, namespace, logger)

return nil
}

Expand All @@ -913,13 +851,15 @@ func (r *RedisFailoverKubeClient) DeleteRedisStatefulset(rf *RedisFailover) erro
if err := r.Client.AppsV1beta1().StatefulSets(namespace).Delete(name, &metav1.DeleteOptions{PropagationPolicy: &propagation}); err != nil {
return err
}
logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace)
logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace)
logger.Debug("Deleting Redis PodDisruptionBudget...")
if err := r.deletePodDisruptionBudget(rf, redisName); err != nil {
return err
}
logger.Debug("Redis PodDisruptionBudget deleted!")
// TODO: Wait for statefulset to really delete

r.waitForStatefulsetDeletion(name, namespace, logger)

return nil
}

Expand All @@ -931,36 +871,45 @@ func (r *RedisFailoverKubeClient) DeleteSentinelDeployment(rf *RedisFailover) er
if err := r.Client.AppsV1beta1().Deployments(namespace).Delete(name, &metav1.DeleteOptions{PropagationPolicy: &propagation}); err != nil {
return err
}
logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace)
logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace)
logger.Debug("Deleting Sentinel PodDisruptionBudget...")
if err := r.deletePodDisruptionBudget(rf, sentinelName); err != nil {
return err
}
logger.Debug("Sentinel PodDisruptionBudget deleted!")
// TODO: Wait for deployment to really delete

r.waitForDeploymentDeletion(name, namespace, logger)

return nil
}

// DeleteSentinelService deletes a sentinel service
func (r *RedisFailoverKubeClient) DeleteSentinelService(rf *RedisFailover) error {
name := r.GetSentinelName(rf)
namespace := rf.Metadata.Namespace
logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace)
propagation := metav1.DeletePropagationForeground
if err := r.Client.CoreV1().Services(namespace).Delete(name, &metav1.DeleteOptions{PropagationPolicy: &propagation}); err != nil {
return err
}
// TODO: Wait for service to really delete

r.waitForServiceDeletion(name, namespace, logger)

return nil
}

// DeleteRedisService deletes redis service
func (r *RedisFailoverKubeClient) DeleteRedisService(rf *RedisFailover) error {
name := r.GetRedisName(rf)
namespace := rf.Metadata.Namespace
logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace)
propagation := metav1.DeletePropagationForeground
if err := r.Client.CoreV1().Services(namespace).Delete(name, &metav1.DeleteOptions{PropagationPolicy: &propagation}); err != nil {
return err
}

r.waitForServiceDeletion(name, namespace, logger)

return nil
}

Expand Down
142 changes: 142 additions & 0 deletions pkg/failover/waiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package failover

import (
"github.com/spotahome/redis-operator/pkg/log"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/api/v1"
)

func (r *RedisFailoverKubeClient) waitForPod(name string, namespace string, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for pod to be ready")
pod, _ := r.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
for _, condition := range pod.Status.Conditions {
if condition.Type == "Ready" && condition.Status == v1.ConditionTrue {
return
}
}
}
}

func (r *RedisFailoverKubeClient) waitForService(name string, namespace string, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for service to find bootstrap pod")
endpoints, _ := r.Client.CoreV1().Endpoints(namespace).Get(name, metav1.GetOptions{})
addresses := 0
for _, subset := range endpoints.Subsets {
addresses += len(subset.Addresses)
}
if addresses > 0 {
return
}
}
}

func (r *RedisFailoverKubeClient) waitForDeployment(name string, namespace string, replicas int32, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for Sentinel deployment to be fully operative")
deployment, _ := r.Client.AppsV1beta1().Deployments(namespace).Get(name, metav1.GetOptions{})
if deployment.Status.ReadyReplicas == replicas {
return
}
}
}

func (r *RedisFailoverKubeClient) waitForStatefulset(name string, namespace string, replicas int32, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for Redis statefulset to be fully operative")
statefulset, _ := r.Client.AppsV1beta1().StatefulSets(namespace).Get(name, metav1.GetOptions{})
if statefulset.Status.ReadyReplicas == replicas {
return
}
}
}

func (r *RedisFailoverKubeClient) waitForPodDeletion(name string, namespace string, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for pod to terminate")
podList, _ := r.Client.CoreV1().Pods(namespace).List(metav1.ListOptions{})
found := false
for _, pod := range podList.Items {
if pod.Name == name {
found = true
}
}
if !found {
return
}
}
}

func (r *RedisFailoverKubeClient) waitForStatefulsetDeletion(name string, namespace string, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for statefulset to terminate")
statefulsetList, _ := r.Client.AppsV1beta1().StatefulSets(namespace).List(metav1.ListOptions{})
found := false
for _, statefulset := range statefulsetList.Items {
if statefulset.Name == name {
found = true
}
}
if !found {
return
}
}
}

func (r *RedisFailoverKubeClient) waitForDeploymentDeletion(name string, namespace string, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for deployment to terminate")
deploymentList, _ := r.Client.Apps().Deployments(namespace).List(metav1.ListOptions{})
found := false
for _, deployment := range deploymentList.Items {
if deployment.Name == name {
found = true
}
}
if !found {
return
}
}
}

func (r *RedisFailoverKubeClient) waitForServiceDeletion(name string, namespace string, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for service to disappear")
serviceList, _ := r.Client.Core().Services(namespace).List(metav1.ListOptions{})
found := false
for _, service := range serviceList.Items {
if service.Name == name {
found = true
}
}
if !found {
return
}
}
}

0 comments on commit 11d12b2

Please sign in to comment.