From f48bb5b3cbcdc8a8302301484fa3b631fcae2f65 Mon Sep 17 00:00:00 2001 From: Christopher Li Date: Mon, 19 Aug 2024 15:02:47 -0700 Subject: [PATCH] add pantheon migration state metrics (#4) * support az aware hashring and multiple sts in one hashring (#129) * support az aware hashring * Update receive-controller.json * support multiple statefulsets in 1 hashring * add more logs * style * fix lint issue * debug * return when encountering error * remove whitespace * Fix k8s permissions (#133) * Fix k8s permissions * fix ci * fix ci * sync * add pantheon migration state * Revert "Fix k8s permissions (#133)" This reverts commit e545b838a002f0a38e8fc58346d8fd4a19a485d4. --------- Co-authored-by: Alec Rajeev <13004609+alecrajeev@users.noreply.github.com> Signed-off-by: Yi Jin --- .gitignore | 1 + main.go | 55 +++++++++++++++++++- main_test.go | 144 +++++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 194 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index b892ee13..b246026a 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ vendor jsonnetfile.lock.json tmp .buildxcache +.idea/ diff --git a/main.go b/main.go index bae08800..74d0bf5c 100644 --- a/main.go +++ b/main.go @@ -41,7 +41,8 @@ import ( type label = string const ( - defaultPort = 10901 + defaultPort = 10901 + defaultReplicaFactor = 3 resyncPeriod = 5 * time.Minute defaultScaleTimeout = 5 * time.Second @@ -76,6 +77,7 @@ type CmdConfig struct { ScaleTimeout time.Duration useAzAwareHashRing bool podAzAnnotationKey string + migrationState string } func parseFlags() CmdConfig { @@ -98,6 +100,7 @@ func parseFlags() CmdConfig { flag.DurationVar(&config.ScaleTimeout, "scale-timeout", defaultScaleTimeout, "A timeout to wait for receivers to really start after they report healthy") flag.BoolVar(&config.useAzAwareHashRing, "use-az-aware-hashring", false, "A boolean to use az aware hashring to comply with Thanos v0.32+") flag.StringVar(&config.podAzAnnotationKey, 
"pod-az-annotation-key", "", "pod annotation key for AZ Info, If not specified or key not found, will use sts name as AZ key") + flag.StringVar(&config.migrationState, "migration-state", "no-state", "[Databricks Internal] internal pantheon migration state info") flag.Parse() return config @@ -160,7 +163,9 @@ func main() { scaleTimeout: config.ScaleTimeout, useAzAwareHashRing: config.useAzAwareHashRing, podAzAnnotationKey: config.podAzAnnotationKey, + migrationState: config.migrationState, } + c := newController(klient, logger, opt) c.registerMetrics(reg) done := make(chan struct{}) @@ -346,6 +351,7 @@ type options struct { scaleTimeout time.Duration useAzAwareHashRing bool podAzAnnotationKey string + migrationState string } type controller struct { @@ -368,6 +374,7 @@ type controller struct { configmapLastSuccessfulChangeTime prometheus.Gauge hashringNodes *prometheus.GaugeVec hashringTenants *prometheus.GaugeVec + pantheonMigrationState *prometheus.GaugeVec } func newController(klient kubernetes.Interface, logger log.Logger, o *options) *controller { @@ -432,6 +439,13 @@ func newController(klient kubernetes.Interface, logger log.Logger, o *options) * }, []string{"name"}, ), + pantheonMigrationState: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "thanos_receive_controller_pantheon_migration_state", + Help: "pantheon migration state", + }, + []string{"migration_state"}, + ), } } @@ -446,6 +460,7 @@ func (c *controller) registerMetrics(reg *prometheus.Registry) { c.configmapChangeErrors.WithLabelValues(create).Add(0) c.configmapChangeErrors.WithLabelValues(update).Add(0) c.configmapChangeErrors.WithLabelValues(other).Add(0) + c.pantheonMigrationState.WithLabelValues(c.options.migrationState).Add(1) reg.MustRegister( c.reconcileAttempts, c.reconcileErrors, @@ -455,6 +470,7 @@ func (c *controller) registerMetrics(reg *prometheus.Registry) { c.configmapLastSuccessfulChangeTime, c.hashringNodes, c.hashringTenants, + c.pantheonMigrationState, ) } } @@ -533,6 
+549,36 @@ func (c *controller) worker(ctx context.Context) { } } +func (c *controller) isProvisioned(statefulsets map[string][]*appsv1.StatefulSet) bool { + _, ok, err := c.cmapInf.GetStore().GetByKey(fmt.Sprintf("%s/%s", c.options.namespace, c.options.configMapGeneratedName)) + if ok && err == nil { + level.Info(c.logger).Log("msg", "generated ConfigMap already exists, skipping provision check") + // if the generated configmap is already present, we don't need to do anything + return true + } + + if len(statefulsets) == 0 { + return false + } + + for group, stsList := range statefulsets { + level.Info(c.logger).Log("msg", "checking statefulsets group", "group", group) + // at least 3 statefulsets need to be ready during provision per replication group + if len(stsList) < defaultReplicaFactor { + for _, sts := range stsList { + level.Info(c.logger).Log("msg", "not enough statefulsets found during provision < 3", + "sts", sts.Name, + "replicas", sts.Spec.Replicas, + "ready", sts.Status.ReadyReplicas) + } + + return false + } + } + + return true +} + func (c *controller) sync(ctx context.Context) { c.reconcileAttempts.Inc() configMap, ok, err := c.cmapInf.GetStore().GetByKey(fmt.Sprintf("%s/%s", c.options.namespace, c.options.configMapName)) @@ -613,6 +659,11 @@ func (c *controller) sync(ctx context.Context) { time.Sleep(c.options.scaleTimeout) // Give some time for all replicas before they receive hundreds req/s } + if !c.isProvisioned(statefulsets) { + level.Error(c.logger).Log("msg", "not enough statefulsets found during provision") + return + } + c.populate(ctx, hashrings, statefulsets) level.Info(c.logger).Log("msg", "hashring populated", "hashring", fmt.Sprintf("%+v", hashrings)) @@ -632,7 +683,7 @@ func (c *controller) sync(ctx context.Context) { } } -func (c controller) waitForPod(ctx context.Context, name string) error { +func (c *controller) waitForPod(ctx context.Context, name string) error { //nolint:staticcheck return wait.PollImmediate(time.Second, time.Minute, func() (bool,
error) { pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, name, metav1.GetOptions{}) diff --git a/main_test.go b/main_test.go index cd61c83a..91fb5af7 100644 --- a/main_test.go +++ b/main_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/receive" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -32,6 +33,7 @@ func TestController(t *testing.T) { statefulsets []*appsv1.StatefulSet clusterDomain string expected []receive.HashringConfig + notProvision bool }{ { name: "Empty", @@ -286,6 +288,7 @@ func TestController(t *testing.T) { statefulsets := tt.statefulsets expected := tt.expected clusterDomain := tt.clusterDomain + provisioned := !tt.notProvision t.Run(name, func(t *testing.T) { opts := &options{ @@ -303,7 +306,7 @@ func TestController(t *testing.T) { cleanUp := setupController(ctx, t, klient, opts) defer cleanUp() - _ = createInitialResources(ctx, t, klient, opts, hashrings, statefulsets) + _ = createInitialResources(ctx, t, klient, opts, hashrings, statefulsets, provisioned) // Reconciliation is async, so we need to wait a bit. 
<-time.After(reconciliationDelay) @@ -345,6 +348,7 @@ func TestControllerConfigmapUpdate(t *testing.T) { hashrings []receive.HashringConfig labels map[string]string shouldBeUpdated bool + provisioned bool }{ { name: "DifferentHashring", @@ -366,6 +370,7 @@ func TestControllerConfigmapUpdate(t *testing.T) { hashrings := tt.hashrings labels := tt.labels shouldBeUpdated := tt.shouldBeUpdated + provisioned := tt.provisioned t.Run(name, func(t *testing.T) { opts := &options{ @@ -397,7 +402,7 @@ func TestControllerConfigmapUpdate(t *testing.T) { ServiceName: "h0", }, }, - }) + }, provisioned) buf, err := json.Marshal(hashrings) if err != nil { @@ -452,7 +457,9 @@ func TestControllerWithAzAware(t *testing.T) { hashrings []receive.HashringConfig statefulsets []*appsv1.StatefulSet clusterDomain string + notProvision bool expected []receive.HashringConfig + expectErr bool }{ { name: "Empty", @@ -758,12 +765,118 @@ func TestControllerWithAzAware(t *testing.T) { }, }}, }, + { + name: "OneHashringManyStatefulSetsNotProvisionedError", + hashrings: []receive.HashringConfig{{ + Hashring: "hashring0", + Tenants: []string{"foo", "bar"}, + }}, + statefulsets: []*appsv1.StatefulSet{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "hashring0", + Labels: map[string]string{ + "a": "b", + hashringLabelKey: "hashring0", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: intPointer(3), + ServiceName: "h0", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "hashring1", + Labels: map[string]string{ + "a": "b", + hashringLabelKey: "hashring0", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: intPointer(2), + ServiceName: "h0", + }, + }, + }, + clusterDomain: "cluster.local", + notProvision: true, + expectErr: true, + }, + { + name: "OneHashringManyStatefulSetsNotProvisioned", + notProvision: true, + hashrings: []receive.HashringConfig{{ + Hashring: "group", + }}, + statefulsets: []*appsv1.StatefulSet{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "rep0", + Labels: map[string]string{ + 
"a": "b", + hashringLabelKey: "group", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: intPointer(1), + ServiceName: "h0", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "rep1", + Labels: map[string]string{ + "a": "b", + hashringLabelKey: "group", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: intPointer(1), + ServiceName: "h1", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "rep2", + Labels: map[string]string{ + "a": "b", + hashringLabelKey: "group", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: intPointer(1), + ServiceName: "h2", + }, + }, + }, + clusterDomain: "cluster.local", + expected: []receive.HashringConfig{{ + Hashring: "group", + Endpoints: []receive.Endpoint{ + { + Address: "rep0-0.h0.namespace.svc.cluster.local:10901", + AZ: "rep0", + }, + { + Address: "rep1-0.h1.namespace.svc.cluster.local:10901", + AZ: "rep1", + }, + { + Address: "rep2-0.h2.namespace.svc.cluster.local:10901", + AZ: "rep2", + }, + }, + }}, + }, } { name := tt.name hashrings := tt.hashrings statefulsets := tt.statefulsets expected := tt.expected + expectErr := tt.expectErr clusterDomain := tt.clusterDomain + provisioned := !tt.notProvision t.Run(name, func(t *testing.T) { opts := &options{ @@ -777,16 +890,21 @@ func TestControllerWithAzAware(t *testing.T) { port: port, scheme: "http", useAzAwareHashRing: true, + migrationState: "hello", } klient := fake.NewSimpleClientset() cleanUp := setupController(ctx, t, klient, opts) defer cleanUp() - _ = createInitialResources(ctx, t, klient, opts, hashrings, statefulsets) + _ = createInitialResources(ctx, t, klient, opts, hashrings, statefulsets, provisioned) // Reconciliation is async, so we need to wait a bit. 
<-time.After(reconciliationDelay) cm, err := klient.CoreV1().ConfigMaps(opts.namespace).Get(ctx, opts.configMapGeneratedName, metav1.GetOptions{}) + if expectErr { + require.Error(t, err, "expected error to get generated config map") + return + } if err != nil { t.Fatalf("got unexpected error getting ConfigMap: %v", err) } @@ -833,6 +951,7 @@ func TestControllerConfigmapUpdateWithAzAware(t *testing.T) { hashrings []receive.HashringConfig labels map[string]string shouldBeUpdated bool + provisioned bool }{ { name: "DifferentHashring", @@ -854,6 +973,7 @@ func TestControllerConfigmapUpdateWithAzAware(t *testing.T) { hashrings := tt.hashrings labels := tt.labels shouldBeUpdated := tt.shouldBeUpdated + provisioned := tt.provisioned t.Run(name, func(t *testing.T) { opts := &options{ @@ -886,7 +1006,7 @@ func TestControllerConfigmapUpdateWithAzAware(t *testing.T) { ServiceName: "h0", }, }, - }) + }, provisioned) buf, err := json.Marshal(hashrings) if err != nil { @@ -956,6 +1076,7 @@ func createInitialResources( opts *options, hashrings []receive.HashringConfig, statefulsets []*appsv1.StatefulSet, + provisioned bool, ) *corev1.ConfigMap { t.Helper() @@ -977,6 +1098,21 @@ func createInitialResources( t.Fatalf("got unexpected error creating ConfigMap: %v", err) } + if provisioned { + genCm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: opts.configMapGeneratedName, + Namespace: opts.namespace, + }, + Data: map[string]string{ + opts.fileName: "empty", + }, + } + if _, err := klient.CoreV1().ConfigMaps(opts.namespace).Create(ctx, genCm, metav1.CreateOptions{}); err != nil { + t.Fatalf("got unexpected error creating GeneratedConfigMap: %v", err) + } + } + for _, sts := range statefulsets { if _, err := klient.AppsV1().StatefulSets(opts.namespace).Create(ctx, sts, metav1.CreateOptions{}); err != nil { t.Fatalf("got unexpected error creating StatefulSet: %v", err)