diff --git a/pkg/controllers/rediscluster/rediscluster_controller.go b/pkg/controllers/rediscluster/rediscluster_controller.go
index 956938bac..18ad9ca2d 100644
--- a/pkg/controllers/rediscluster/rediscluster_controller.go
+++ b/pkg/controllers/rediscluster/rediscluster_controller.go
@@ -73,37 +73,40 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 	}
 
 	// Check if the cluster is downscaled
-	if leaderCount := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, instance, "leader"); leaderReplicas < leaderCount {
+	if leaderCount := r.GetStatefulSetReplicas(ctx, instance.Namespace, instance.Name+"-leader"); leaderReplicas < leaderCount {
 		if !(r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") && r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower")) {
 			return intctrlutil.Reconciled()
 		}
-
-		logger.Info("Redis cluster is downscaling...", "Current.LeaderReplicas", leaderCount, "Desired.LeaderReplicas", leaderReplicas)
-		for shardIdx := leaderCount - 1; shardIdx >= leaderReplicas; shardIdx-- {
-			logger.Info("Remove the shard", "Shard.Index", shardIdx)
-			// Imp if the last index of leader sts is not leader make it then
-			// check whether the redis is leader or not ?
-			// if not true then make it leader pod
-			if !(k8sutils.VerifyLeaderPod(ctx, r.K8sClient, instance)) {
-				// lastLeaderPod is slaving right now Make it the master Pod
-				// We have to bring a manual failover here to make it a leaderPod
-				// clusterFailover should also include the clusterReplicate since we have to map the followers to new leader
-				logger.Info("Cluster Failover is initiated", "Shard.Index", shardIdx)
-				if err = k8sutils.ClusterFailover(ctx, r.K8sClient, instance); err != nil {
-					logger.Error(err, "Failed to initiate cluster failover")
-					return intctrlutil.RequeueWithError(ctx, err, "")
+		if masterCount := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, instance, "leader"); masterCount == leaderCount {
+			logger.Info("Redis cluster is downscaling...", "Current.LeaderReplicas", leaderCount, "Desired.LeaderReplicas", leaderReplicas)
+			for shardIdx := leaderCount - 1; shardIdx >= leaderReplicas; shardIdx-- {
+				logger.Info("Remove the shard", "Shard.Index", shardIdx)
+				// Important: if the pod at the last index of the leader sts is not a leader, promote it:
+				// check whether this Redis pod is a leader or not;
+				// if not, make it the leader pod via a manual failover
+				if !(k8sutils.VerifyLeaderPod(ctx, r.K8sClient, instance, shardIdx)) {
+					// lastLeaderPod is a replica right now; make it the master pod.
+					// We have to trigger a manual failover here to make it a leader pod.
+					// clusterFailover should also include clusterReplicate, since we have to map the followers to the new leader.
+					logger.Info("Cluster Failover is initiated", "Shard.Index", shardIdx)
+					if err = k8sutils.ClusterFailover(ctx, r.K8sClient, instance, shardIdx); err != nil {
+						logger.Error(err, "Failed to initiate cluster failover")
+						return intctrlutil.RequeueWithError(ctx, err, "")
+					}
 				}
+				// Step 1 Remove the Follower Node
+				k8sutils.RemoveRedisFollowerNodesFromCluster(ctx, r.K8sClient, instance, shardIdx)
+				// Step 2 Reshard the Cluster
+				k8sutils.ReshardRedisCluster(ctx, r.K8sClient, instance, shardIdx, true)
 			}
-			// Step 1 Remove the Follower Node
-			k8sutils.RemoveRedisFollowerNodesFromCluster(ctx, r.K8sClient, instance)
-			// Step 2 Reshard the Cluster
-			k8sutils.ReshardRedisCluster(ctx, r.K8sClient, instance, true)
+			logger.Info("Redis cluster is downscaled... Rebalancing the cluster")
+			// Step 3 Rebalance the cluster
+			k8sutils.RebalanceRedisCluster(ctx, r.K8sClient, instance)
+			logger.Info("Redis cluster is downscaled... Rebalancing the cluster is done")
+			return intctrlutil.RequeueAfter(ctx, time.Second*10, "")
+		} else {
+			logger.Info("masterCount is not equal to leader statefulset replicas, skip downscale", "masterCount", masterCount, "leaderReplicas", leaderReplicas)
 		}
-		logger.Info("Redis cluster is downscaled... Rebalancing the cluster")
-		// Step 3 Rebalance the cluster
-		k8sutils.RebalanceRedisCluster(ctx, r.K8sClient, instance)
-		logger.Info("Redis cluster is downscaled... Rebalancing the cluster is done")
-		return intctrlutil.RequeueAfter(ctx, time.Second*10, "")
 	}
 
 	// Mark the cluster status as initializing if there are no leader or follower nodes
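
The rewritten guard sizes the downscale loop from the leader StatefulSet's spec.replicas and proceeds only when the live master count agrees with it, so a degraded cluster (a master missing or still joining) is never resharded. A minimal standalone sketch of that decision; shouldDownscale and its parameters are hypothetical names, not part of the codebase:

package main

import "fmt"

// shouldDownscale mirrors the guard above: downscale only when the desired
// replica count is below the StatefulSet's spec replicas AND the number of
// live cluster masters matches that spec.
func shouldDownscale(desiredReplicas, stsReplicas, liveMasters int32) bool {
	if desiredReplicas >= stsReplicas {
		return false // not a downscale
	}
	return liveMasters == stsReplicas
}

func main() {
	fmt.Println(shouldDownscale(3, 6, 6)) // true: shards 5, 4, 3 can be removed
	fmt.Println(shouldDownscale(3, 6, 5)) // false: a master is missing, skip
}
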
Rebalancing the cluster") + // Step 3 Rebalance the cluster + k8sutils.RebalanceRedisCluster(ctx, r.K8sClient, instance) + logger.Info("Redis cluster is downscaled... Rebalancing the cluster is done") + return intctrlutil.RequeueAfter(ctx, time.Second*10, "") + } else { + logger.Info("masterCount is not equal to leader statefulset replicas,skip downscale", "masterCount", masterCount, "leaderReplicas", leaderReplicas) } - logger.Info("Redis cluster is downscaled... Rebalancing the cluster") - // Step 3 Rebalance the cluster - k8sutils.RebalanceRedisCluster(ctx, r.K8sClient, instance) - logger.Info("Redis cluster is downscaled... Rebalancing the cluster is done") - return intctrlutil.RequeueAfter(ctx, time.Second*10, "") } // Mark the cluster status as initializing if there are no leader or follower nodes diff --git a/pkg/k8sutils/cluster-scaling.go b/pkg/k8sutils/cluster-scaling.go index 3e7cd7097..8d0c9b3ce 100644 --- a/pkg/k8sutils/cluster-scaling.go +++ b/pkg/k8sutils/cluster-scaling.go @@ -15,12 +15,11 @@ import ( // ReshardRedisCluster transfer the slots from the last node to the first node. // // NOTE: when all slot been transferred, the node become slave of the first master node. -func ReshardRedisCluster(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster, remove bool) { +func ReshardRedisCluster(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster, shardIdx int32, remove bool) { redisClient := configureRedisClient(ctx, client, cr, cr.ObjectMeta.Name+"-leader-0") defer redisClient.Close() var cmd []string - currentRedisCount := CheckRedisNodeCount(ctx, client, cr, "leader") // Transfer Pod details transferPOD := RedisDetails{ @@ -29,7 +28,7 @@ func ReshardRedisCluster(ctx context.Context, client kubernetes.Interface, cr *r } // Remove POD details removePOD := RedisDetails{ - PodName: cr.Name + "-leader-" + strconv.Itoa(int(currentRedisCount)-1), + PodName: cr.Name + "-leader-" + strconv.Itoa(int(shardIdx)), Namespace: cr.Namespace, } cmd = []string{"redis-cli", "--cluster", "reshard"} @@ -274,18 +273,17 @@ func getAttachedFollowerNodeIDs(ctx context.Context, redisClient *redis.Client, } // Remove redis follower node would remove all follower nodes of last leader node using redis-cli -func RemoveRedisFollowerNodesFromCluster(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster) { +func RemoveRedisFollowerNodesFromCluster(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster, shardIdx int32) { var cmd []string redisClient := configureRedisClient(ctx, client, cr, cr.ObjectMeta.Name+"-leader-0") defer redisClient.Close() - currentRedisCount := CheckRedisNodeCount(ctx, client, cr, "leader") existingPod := RedisDetails{ PodName: cr.ObjectMeta.Name + "-leader-0", Namespace: cr.Namespace, } lastLeaderPod := RedisDetails{ - PodName: cr.ObjectMeta.Name + "-leader-" + strconv.Itoa(int(currentRedisCount)-1), + PodName: cr.ObjectMeta.Name + "-leader-" + strconv.Itoa(int(shardIdx)), Namespace: cr.Namespace, } @@ -365,8 +363,8 @@ func RemoveRedisNodeFromCluster(ctx context.Context, client kubernetes.Interface } // verifyLeaderPod return true if the pod is leader/master -func VerifyLeaderPod(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster) bool { - podName := cr.Name + "-leader-" + strconv.Itoa(int(CheckRedisNodeCount(ctx, client, cr, "leader"))-1) +func VerifyLeaderPod(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster, leadIndex 
diff --git a/pkg/k8sutils/statefulset.go b/pkg/k8sutils/statefulset.go
index b593b8b95..2f010107f 100644
--- a/pkg/k8sutils/statefulset.go
+++ b/pkg/k8sutils/statefulset.go
@@ -26,6 +26,7 @@ import (
 
 type StatefulSet interface {
 	IsStatefulSetReady(ctx context.Context, namespace, name string) bool
+	GetStatefulSetReplicas(ctx context.Context, namespace, name string) int32
 }
 
 type StatefulSetService struct {
@@ -76,6 +77,17 @@ func (s *StatefulSetService) IsStatefulSetReady(ctx context.Context, namespace,
 	return true
 }
 
+func (s *StatefulSetService) GetStatefulSetReplicas(ctx context.Context, namespace, name string) int32 {
+	sts, err := s.kubeClient.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
+	if err != nil {
+		return 0
+	}
+	if sts.Spec.Replicas == nil {
+		return 0
+	}
+	return *sts.Spec.Replicas
+}
+
 const (
 	redisExporterContainer = "redis-exporter"
 )
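
GetStatefulSetReplicas reads spec.replicas and reports 0 both when the Get fails and when the replicas pointer is unset. A sketch exercising that behavior against client-go's fake clientset; getReplicas is a local stand-in carrying the method's body, since StatefulSetService's kubeClient field is unexported:

package main

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// getReplicas mirrors GetStatefulSetReplicas: spec.replicas, or 0 when the
// StatefulSet is missing or replicas is unset.
func getReplicas(ctx context.Context, c kubernetes.Interface, namespace, name string) int32 {
	sts, err := c.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return 0
	}
	if sts.Spec.Replicas == nil {
		return 0
	}
	return *sts.Spec.Replicas
}

func main() {
	replicas := int32(6)
	client := fake.NewSimpleClientset(&appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{Name: "redis-cluster-leader", Namespace: "default"},
		Spec:       appsv1.StatefulSetSpec{Replicas: &replicas},
	})
	fmt.Println(getReplicas(context.TODO(), client, "default", "redis-cluster-leader")) // 6
	fmt.Println(getReplicas(context.TODO(), client, "default", "missing"))              // 0
}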