diff --git a/pkg/failover/client.go b/pkg/failover/client.go index 86dd0c26f..b60173b4f 100644 --- a/pkg/failover/client.go +++ b/pkg/failover/client.go @@ -340,23 +340,7 @@ func (r *RedisFailoverKubeClient) CreateBootstrapPod(rf *RedisFailover) error { return err } - t := r.clock.NewTicker(loopInterval) - for range t.C { - logger.Debug("Waiting for pod to be ready") - ready := false - pod, _ = r.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) - for _, condition := range pod.Status.Conditions { - - if condition.Type == "Ready" && condition.Status == v1.ConditionTrue { - ready = true - break - } - } - if ready { - t.Stop() - break - } - } + r.waitForPod(rf.Metadata.Name, rf.Metadata.Namespace, logger) return nil } @@ -401,19 +385,7 @@ func (r *RedisFailoverKubeClient) CreateSentinelService(rf *RedisFailover) error return err } - t := r.clock.NewTicker(loopInterval) - for range t.C { - logger.Debug("Waiting for service to find bootstrap pod") - endpoints, _ := r.Client.CoreV1().Endpoints(namespace).Get(name, metav1.GetOptions{}) - addresses := 0 - for _, subset := range endpoints.Subsets { - addresses += len(subset.Addresses) - } - if addresses > 0 { - t.Stop() - break - } - } + r.waitForService(name, namespace, logger) return nil } @@ -543,15 +515,7 @@ func (r *RedisFailoverKubeClient) CreateSentinelDeployment(rf *RedisFailover) er return err } - t := r.clock.NewTicker(loopInterval) - for range t.C { - logger.Debug("Waiting for Sentinel deployment to be fully operative") - deployment, _ := r.Client.AppsV1beta1().Deployments(namespace).Get(name, metav1.GetOptions{}) - if deployment.Status.ReadyReplicas == spec.Sentinel.Replicas { - t.Stop() - break - } - } + r.waitForDeployment(name, namespace, spec.Sentinel.Replicas, logger) logger.Debug("Creating Sentinel PodDisruptionBudget...") if err := r.createPodDisruptionBudget(rf, sentinelName, sentinelRoleName); err != nil { @@ -729,15 +693,7 @@ func (r *RedisFailoverKubeClient) CreateRedisStatefulset(rf *RedisFailover) erro return err } - t := r.clock.NewTicker(loopInterval) - for range t.C { - logger.Debug("Waiting for Redis statefulset to be fully operative") - statefulset, _ := r.Client.AppsV1beta1().StatefulSets(namespace).Get(name, metav1.GetOptions{}) - if statefulset.Status.ReadyReplicas == spec.Redis.Replicas { - t.Stop() - break - } - } + r.waitForStatefulset(name, namespace, spec.Redis.Replicas, logger) logger.Debug("Creating Redis PodDisruptionBudget...") if err := r.createPodDisruptionBudget(rf, redisName, redisRoleName); err != nil { @@ -809,7 +765,9 @@ func (r *RedisFailoverKubeClient) createPodDisruptionBudget(rf *RedisFailover, n // UpdateSentinelDeployment updates the spec of the existing sentinel deployment func (r *RedisFailoverKubeClient) UpdateSentinelDeployment(rf *RedisFailover) error { - logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace) + name := r.GetSentinelName(rf) + namespace := rf.Metadata.Namespace + logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace) quorum := rf.GetQuorum() replicas := rf.Spec.Sentinel.Replicas @@ -834,26 +792,20 @@ func (r *RedisFailoverKubeClient) UpdateSentinelDeployment(rf *RedisFailover) er oldSD.Spec.Template.Spec.Containers[0].Image = getRedisImage(rf) oldSD.Spec.Template.Spec.Containers[0].Resources = getSentinelResources(rf.Spec) - if _, err := r.Client.AppsV1beta1().Deployments(rf.Metadata.Namespace).Update(oldSD); err != nil { + if _, err := r.Client.AppsV1beta1().Deployments(namespace).Update(oldSD); err != nil { return err } - t := r.clock.NewTicker(loopInterval) - for range t.C { - logger.Debug("Waiting for Sentinel deployment to be updated") - deployment, _ := r.GetSentinelDeployment(rf) - if deployment.Status.ReadyReplicas == replicas && deployment.Status.UpdatedReplicas == replicas { - t.Stop() - break - } - } + r.waitForDeployment(name, namespace, replicas, logger) return nil } // UpdateRedisStatefulset updates the spec of the existing redis statefulset func (r *RedisFailoverKubeClient) UpdateRedisStatefulset(rf *RedisFailover) error { - logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace) + name := r.GetRedisName(rf) + namespace := rf.Metadata.Namespace + logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace) replicas := rf.Spec.Redis.Replicas @@ -866,19 +818,11 @@ func (r *RedisFailoverKubeClient) UpdateRedisStatefulset(rf *RedisFailover) erro oldSS.Spec.Template.Spec.Containers[0].Resources = getRedisResources(rf.Spec) oldSS.Spec.Template.Spec.Containers[0].Image = getRedisImage(rf) - if _, err := r.Client.AppsV1beta1().StatefulSets(rf.Metadata.Namespace).Update(oldSS); err != nil { + if _, err := r.Client.AppsV1beta1().StatefulSets(namespace).Update(oldSS); err != nil { return err } - t := r.clock.NewTicker(loopInterval) - for range t.C { - logger.Debug("Waiting for Redis statefulset to be updated") - statefulset, _ := r.GetRedisStatefulset(rf) - if statefulset.Status.ReadyReplicas == replicas && statefulset.Status.UpdatedReplicas == replicas { - t.Stop() - break - } - } + r.waitForStatefulset(name, namespace, replicas, logger) return nil } @@ -888,20 +832,14 @@ func (r *RedisFailoverKubeClient) DeleteBootstrapPod(rf *RedisFailover) error { name := r.GetBootstrapName(rf) namespace := rf.Metadata.Namespace - logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace) + logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace) err := r.Client.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{}) if err != nil { return err } - t := r.clock.NewTicker(loopInterval) - for range t.C { - logger.Debug("Waiting for pod to terminate") - pod, _ := r.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) - if len(pod.Name) == 0 { - t.Stop() - break - } - } + + r.waitForPodDeletion(name, namespace, logger) + return nil } @@ -913,13 +851,15 @@ func (r *RedisFailoverKubeClient) DeleteRedisStatefulset(rf *RedisFailover) erro if err := r.Client.AppsV1beta1().StatefulSets(namespace).Delete(name, &metav1.DeleteOptions{PropagationPolicy: &propagation}); err != nil { return err } - logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace) + logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace) logger.Debug("Deleting Redis PodDisruptionBudget...") if err := r.deletePodDisruptionBudget(rf, redisName); err != nil { return err } logger.Debug("Redis PodDisruptionBudget deleted!") - // TODO: Wait for statefulset to really delete + + r.waitForStatefulsetDeletion(name, namespace, logger) + return nil } @@ -931,13 +871,15 @@ func (r *RedisFailoverKubeClient) DeleteSentinelDeployment(rf *RedisFailover) er if err := r.Client.AppsV1beta1().Deployments(namespace).Delete(name, &metav1.DeleteOptions{PropagationPolicy: &propagation}); err != nil { return err } - logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace) + logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace) logger.Debug("Deleting Sentinel PodDisruptionBudget...") if err := r.deletePodDisruptionBudget(rf, sentinelName); err != nil { return err } logger.Debug("Sentinel PodDisruptionBudget deleted!") - // TODO: Wait for deployment to really delete + + r.waitForDeploymentDeletion(name, namespace, logger) + return nil } @@ -945,11 +887,14 @@ func (r *RedisFailoverKubeClient) DeleteSentinelDeployment(rf *RedisFailover) er func (r *RedisFailoverKubeClient) DeleteSentinelService(rf *RedisFailover) error { name := r.GetSentinelName(rf) namespace := rf.Metadata.Namespace + logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace) propagation := metav1.DeletePropagationForeground if err := r.Client.CoreV1().Services(namespace).Delete(name, &metav1.DeleteOptions{PropagationPolicy: &propagation}); err != nil { return err } - // TODO: Wait for service to really delete + + r.waitForServiceDeletion(name, namespace, logger) + return nil } @@ -957,10 +902,14 @@ func (r *RedisFailoverKubeClient) DeleteSentinelService(rf *RedisFailover) error func (r *RedisFailoverKubeClient) DeleteRedisService(rf *RedisFailover) error { name := r.GetRedisName(rf) namespace := rf.Metadata.Namespace + logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace) propagation := metav1.DeletePropagationForeground if err := r.Client.CoreV1().Services(namespace).Delete(name, &metav1.DeleteOptions{PropagationPolicy: &propagation}); err != nil { return err } + + r.waitForServiceDeletion(name, namespace, logger) + return nil } diff --git a/pkg/failover/waiter.go b/pkg/failover/waiter.go new file mode 100644 index 000000000..8e81d1768 --- /dev/null +++ b/pkg/failover/waiter.go @@ -0,0 +1,142 @@ +package failover + +import ( + "github.com/spotahome/redis-operator/pkg/log" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/pkg/api/v1" +) + +func (r *RedisFailoverKubeClient) waitForPod(name string, namespace string, logger log.Logger) { + t := r.clock.NewTicker(loopInterval) + defer t.Stop() + + for range t.C { + logger.Debug("Waiting for pod to be ready") + pod, _ := r.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) + for _, condition := range pod.Status.Conditions { + if condition.Type == "Ready" && condition.Status == v1.ConditionTrue { + return + } + } + } +} + +func (r *RedisFailoverKubeClient) waitForService(name string, namespace string, logger log.Logger) { + t := r.clock.NewTicker(loopInterval) + defer t.Stop() + + for range t.C { + logger.Debug("Waiting for service to find bootstrap pod") + endpoints, _ := r.Client.CoreV1().Endpoints(namespace).Get(name, metav1.GetOptions{}) + addresses := 0 + for _, subset := range endpoints.Subsets { + addresses += len(subset.Addresses) + } + if addresses > 0 { + return + } + } +} + +func (r *RedisFailoverKubeClient) waitForDeployment(name string, namespace string, replicas int32, logger log.Logger) { + t := r.clock.NewTicker(loopInterval) + defer t.Stop() + + for range t.C { + logger.Debug("Waiting for Sentinel deployment to be fully operative") + deployment, _ := r.Client.AppsV1beta1().Deployments(namespace).Get(name, metav1.GetOptions{}) + if deployment.Status.ReadyReplicas == replicas { + return + } + } +} + +func (r *RedisFailoverKubeClient) waitForStatefulset(name string, namespace string, replicas int32, logger log.Logger) { + t := r.clock.NewTicker(loopInterval) + defer t.Stop() + + for range t.C { + logger.Debug("Waiting for Redis statefulset to be fully operative") + statefulset, _ := r.Client.AppsV1beta1().StatefulSets(namespace).Get(name, metav1.GetOptions{}) + if statefulset.Status.ReadyReplicas == replicas { + return + } + } +} + +func (r *RedisFailoverKubeClient) waitForPodDeletion(name string, namespace string, logger log.Logger) { + t := r.clock.NewTicker(loopInterval) + defer t.Stop() + + for range t.C { + logger.Debug("Waiting for pod to terminate") + podList, _ := r.Client.CoreV1().Pods(namespace).List(metav1.ListOptions{}) + found := false + for _, pod := range podList.Items { + if pod.Name == name { + found = true + } + } + if !found { + return + } + } +} + +func (r *RedisFailoverKubeClient) waitForStatefulsetDeletion(name string, namespace string, logger log.Logger) { + t := r.clock.NewTicker(loopInterval) + defer t.Stop() + + for range t.C { + logger.Debug("Waiting for statefulset to terminate") + statefulsetList, _ := r.Client.AppsV1beta1().StatefulSets(namespace).List(metav1.ListOptions{}) + found := false + for _, statefulset := range statefulsetList.Items { + if statefulset.Name == name { + found = true + } + } + if !found { + return + } + } +} + +func (r *RedisFailoverKubeClient) waitForDeploymentDeletion(name string, namespace string, logger log.Logger) { + t := r.clock.NewTicker(loopInterval) + defer t.Stop() + + for range t.C { + logger.Debug("Waiting for deployment to terminate") + deploymentList, _ := r.Client.Apps().Deployments(namespace).List(metav1.ListOptions{}) + found := false + for _, deployment := range deploymentList.Items { + if deployment.Name == name { + found = true + } + } + if !found { + return + } + } +} + +func (r *RedisFailoverKubeClient) waitForServiceDeletion(name string, namespace string, logger log.Logger) { + t := r.clock.NewTicker(loopInterval) + defer t.Stop() + + for range t.C { + logger.Debug("Waiting for service to disappear") + serviceList, _ := r.Client.Core().Services(namespace).List(metav1.ListOptions{}) + found := false + for _, service := range serviceList.Items { + if service.Name == name { + found = true + } + } + if !found { + return + } + } +}