Skip to content

Commit

Permalink
support multiple statefulsets in 1 hashring
Browse files Browse the repository at this point in the history
  • Loading branch information
christopherzli committed Jan 24, 2024
1 parent 4f27cf4 commit f6e7a2b
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 44 deletions.
95 changes: 51 additions & 44 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func (c *controller) sync(ctx context.Context) {
return
}

statefulsets := make(map[string]*appsv1.StatefulSet)
statefulsets := make(map[string][]*appsv1.StatefulSet)

for _, obj := range c.ssetInf.GetStore().List() {
sts, ok := obj.(*appsv1.StatefulSet)
Expand Down Expand Up @@ -587,7 +587,12 @@ func (c *controller) sync(ctx context.Context) {
}

c.replicas[hashring] = *sts.Spec.Replicas
statefulsets[hashring] = sts.DeepCopy()
if _, ok := statefulsets[hashring]; !ok {
// If not, initialize a new slice
statefulsets[hashring] = []*appsv1.StatefulSet{}
}
// Append the new value to the slice associated with the key
statefulsets[hashring] = append(statefulsets[hashring], sts.DeepCopy())

time.Sleep(c.options.scaleTimeout) // Give some time for all replicas before they receive hundreds req/s
}
Expand Down Expand Up @@ -635,63 +640,65 @@ func (c controller) waitForPod(ctx context.Context, name string) error {
})
}

func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string]*appsv1.StatefulSet) {
func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string][]*appsv1.StatefulSet) {
for i, h := range hashrings {
sts, exists := statefulsets[h.Hashring]
stsList, exists := statefulsets[h.Hashring]
if !exists {
continue
}

var endpoints []receive.Endpoint

for i := 0; i < int(*sts.Spec.Replicas); i++ {
podName := fmt.Sprintf("%s-%d", sts.Name, i)
pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, podName, metav1.GetOptions{})
if c.options.allowDynamicScaling {
if kerrors.IsNotFound(err) {
continue
for _, sts := range stsList {
for i := 0; i < int(*sts.Spec.Replicas); i++ {
podName := fmt.Sprintf("%s-%d", sts.Name, i)
pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, podName, metav1.GetOptions{})
if c.options.allowDynamicScaling {
if kerrors.IsNotFound(err) {
continue
}
// Do not add a replica to the hashring if pod is not Ready.
if !podutils.IsPodReady(pod) {
level.Warn(c.logger).Log("msg", "failed adding pod to hashring, pod not ready", "pod", podName, "err", err)
continue
}

if pod.ObjectMeta.DeletionTimestamp != nil && (pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodPending) {
// Pod is terminating, do not add it to the hashring.
continue
}
}
// Do not add a replica to the hashring if pod is not Ready.
if !podutils.IsPodReady(pod) {
level.Warn(c.logger).Log("msg", "failed adding pod to hashring, pod not ready", "pod", podName, "err", err)
continue
// If cluster domain is empty string we don't want dot after svc.
clusterDomain := ""
if c.options.clusterDomain != "" {
clusterDomain = fmt.Sprintf(".%s", c.options.clusterDomain)
}

if pod.ObjectMeta.DeletionTimestamp != nil && (pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodPending) {
// Pod is terminating, do not add it to the hashring.
continue
endpoint := receive.Endpoint{
Address: fmt.Sprintf("%s-%d.%s.%s.svc%s:%d",
sts.Name,
i,
sts.Spec.ServiceName,
c.options.namespace,
clusterDomain,
c.options.port,
),
}
}
// If cluster domain is empty string we don't want dot after svc.
clusterDomain := ""
if c.options.clusterDomain != "" {
clusterDomain = fmt.Sprintf(".%s", c.options.clusterDomain)
}
endpoint := receive.Endpoint{
Address: fmt.Sprintf("%s-%d.%s.%s.svc%s:%d",
sts.Name,
i,
sts.Spec.ServiceName,
c.options.namespace,
clusterDomain,
c.options.port,
),
}

if c.options.useAzAwareHashRing {
//If pod annotation value is not found or key not specified,
//endpoint will the Statefulset name as AZ name
endpoint.AZ = sts.Name
if c.options.podAzAnnotationKey != "" && err == nil {
annotationValue, ok := pod.Annotations[c.options.podAzAnnotationKey]
if ok {
endpoint.AZ = annotationValue
if c.options.useAzAwareHashRing {
//If pod annotation value is not found or key not specified,
//endpoint will the Statefulset name as AZ name
endpoint.AZ = sts.Name
if c.options.podAzAnnotationKey != "" && err == nil {
annotationValue, ok := pod.Annotations[c.options.podAzAnnotationKey]
if ok {
endpoint.AZ = annotationValue
}
}
}
}

endpoints = append(endpoints, endpoint)
endpoints = append(endpoints, endpoint)

}
}

hashrings[i].Endpoints = endpoints
Expand Down
111 changes: 111 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,54 @@ func TestController(t *testing.T) {
},
}},
},
{
name: "OneHashringLabelKeyManyStatefulSets",
hashrings: []receive.HashringConfig{{
Hashring: "hashring0",
Tenants: []string{"foo", "bar"},
}},
statefulsets: []*appsv1.StatefulSet{
{
ObjectMeta: metav1.ObjectMeta{
Name: "hashring0",
Labels: map[string]string{
"a": "b",
hashringLabelKey: "hashring0",
},
},
Spec: appsv1.StatefulSetSpec{
Replicas: intPointer(3),
ServiceName: "h0",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "hashring1",
Labels: map[string]string{
"a": "b",
hashringLabelKey: "hashring0",
},
},
Spec: appsv1.StatefulSetSpec{
Replicas: intPointer(2),
ServiceName: "h0",
},
},
},
clusterDomain: "cluster.local",
expected: []receive.HashringConfig{{
Hashring: "hashring0",
Tenants: []string{"foo", "bar"},
Endpoints: []receive.Endpoint{
{Address: "hashring0-0.h0.namespace.svc.cluster.local:10901"},
{Address: "hashring0-1.h0.namespace.svc.cluster.local:10901"},
{Address: "hashring0-2.h0.namespace.svc.cluster.local:10901"},
{Address: "hashring1-0.h0.namespace.svc.cluster.local:10901"},
{Address: "hashring1-1.h0.namespace.svc.cluster.local:10901"},
},
},
},
},
{
clusterDomain: "",
name: "OneHashringOneStatefulSetNoClusterDomain",
Expand Down Expand Up @@ -603,6 +651,69 @@ func TestControllerWithAzAware(t *testing.T) {
},
}},
},
{
name: "OneHashringLabelKeyManyStatefulSets",
hashrings: []receive.HashringConfig{{
Hashring: "hashring0",
Tenants: []string{"foo", "bar"},
}},
statefulsets: []*appsv1.StatefulSet{
{
ObjectMeta: metav1.ObjectMeta{
Name: "hashring0",
Labels: map[string]string{
"a": "b",
hashringLabelKey: "hashring0",
},
},
Spec: appsv1.StatefulSetSpec{
Replicas: intPointer(3),
ServiceName: "h0",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "hashring1",
Labels: map[string]string{
"a": "b",
hashringLabelKey: "hashring0",
},
},
Spec: appsv1.StatefulSetSpec{
Replicas: intPointer(2),
ServiceName: "h0",
},
},
},
clusterDomain: "cluster.local",
expected: []receive.HashringConfig{{
Hashring: "hashring0",
Tenants: []string{"foo", "bar"},
Endpoints: []receive.Endpoint{
{
Address: "hashring0-0.h0.namespace.svc.cluster.local:10901",
AZ: "hashring0",
},
{
Address: "hashring0-1.h0.namespace.svc.cluster.local:10901",
AZ: "hashring0",
},
{
Address: "hashring0-2.h0.namespace.svc.cluster.local:10901",
AZ: "hashring0",
},
{
Address: "hashring1-0.h0.namespace.svc.cluster.local:10901",
AZ: "hashring1",
},
{
Address: "hashring1-1.h0.namespace.svc.cluster.local:10901",
AZ: "hashring1",
},
},
},
},
},
{
clusterDomain: "",
name: "OneHashringOneStatefulSetNoClusterDomain",
Expand Down

0 comments on commit f6e7a2b

Please sign in to comment.