From c5aa8f099388aa51beb686f593c400e4d274a269 Mon Sep 17 00:00:00 2001
From: Ishan Tyagi <42602577+ishan16696@users.noreply.github.com>
Date: Wed, 22 Nov 2023 20:54:09 +0530
Subject: [PATCH] Make embedded etcd lean during restoration to avoid any restoration failure (#668)

* Make embedded etcd lean during restoration to avoid any failure due to the etcd database space being exceeded.
* Added unit tests.
* Some code and tests improvements.
* Address review comments.
* Address review comments.
* Address review comments 2.
---
 pkg/snapshot/restorer/restorer.go      | 196 +++++++++++++++++++++----
 pkg/snapshot/restorer/restorer_test.go | 195 ++++++++++++++++++++++++
 2 files changed, 361 insertions(+), 30 deletions(-)

diff --git a/pkg/snapshot/restorer/restorer.go b/pkg/snapshot/restorer/restorer.go
index 8fa78ab4a..daedf2371 100644
--- a/pkg/snapshot/restorer/restorer.go
+++ b/pkg/snapshot/restorer/restorer.go
@@ -57,7 +57,11 @@ import (
 )
 
 const (
-	etcdDialTimeout = time.Second * 30
+	etcdConnectionTimeout                         = 30 * time.Second
+	etcdCompactTimeout                            = 2 * time.Minute
+	etcdDefragTimeout                             = 5 * time.Minute
+	periodicallyMakeEtcdLeanDeltaSnapshotInterval = 10
+	thresholdPercentageForDBSizeAlarm     float64 = 80.0 / 100.0
 )
 
 // Restorer is a struct for etcd data directory restorer
@@ -113,33 +117,26 @@ func (r *Restorer) Restore(ro brtypes.RestoreOptions, m member.Control) (*embed.
 
 	defer func() {
 		if err := os.RemoveAll(ro.Config.TempSnapshotsDir); err != nil {
-			r.logger.Errorf("Failed to remove restoration temp directory %s: %v", ro.Config.TempSnapshotsDir, err)
+			r.logger.Errorf("failed to remove restoration temp directory %s: %v", ro.Config.TempSnapshotsDir, err)
 		}
 	}()
 
-	r.logger.Infof("Starting embedded etcd server...")
+	r.logger.Infof("Starting an embedded etcd server...")
 	e, err := miscellaneous.StartEmbeddedEtcd(r.logger, &ro)
 	if err != nil {
 		return e, err
 	}
 
+	embeddedEtcdEndpoints := []string{e.Clients[0].Addr().String()}
+
 	clientFactory := etcdutil.NewClientFactory(ro.NewClientFactory, brtypes.EtcdConnectionConfig{
 		MaxCallSendMsgSize: ro.Config.MaxCallSendMsgSize,
-		Endpoints:          []string{e.Clients[0].Addr().String()},
+		Endpoints:          embeddedEtcdEndpoints,
 		InsecureTransport:  true,
 	})
-	clientKV, err := clientFactory.NewKV()
-	if err != nil {
-		return e, err
-	}
-	defer func() {
-		if err := clientKV.Close(); err != nil {
-			r.logger.Errorf("Failed to close etcd KV client: %v", err)
-		}
-	}()
 
 	r.logger.Infof("Applying delta snapshots...")
-	if err := r.applyDeltaSnapshots(clientKV, ro); err != nil {
+	if err := r.applyDeltaSnapshots(clientFactory, embeddedEtcdEndpoints, ro); err != nil {
 		return e, err
 	}
 
@@ -148,7 +145,11 @@ func (r *Restorer) Restore(ro brtypes.RestoreOptions, m member.Control) (*embed.
 		if err != nil {
 			return e, err
 		}
-		defer clientCluster.Close()
+		defer func() {
+			if err := clientCluster.Close(); err != nil {
+				r.logger.Errorf("failed to close etcd cluster client: %v", err)
+			}
+		}()
 		m.UpdateMemberPeerURL(context.TODO(), clientCluster)
 	}
 	return e, nil
@@ -411,7 +412,28 @@ func makeWALAndSnap(logger *zap.Logger, walDir, snapDir string, cl *membership.R
 }
 
 // applyDeltaSnapshots fetches the events from delta snapshots in parallel and applies them to the embedded etcd sequentially.
-func (r *Restorer) applyDeltaSnapshots(clientKV client.KVCloser, ro brtypes.RestoreOptions) error {
+func (r *Restorer) applyDeltaSnapshots(clientFactory client.Factory, endPoints []string, ro brtypes.RestoreOptions) error {
+
+	clientKV, err := clientFactory.NewKV()
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if err := clientKV.Close(); err != nil {
+			r.logger.Errorf("failed to close etcd KV client: %v", err)
+		}
+	}()
+
+	clientMaintenance, err := clientFactory.NewMaintenance()
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if err := clientMaintenance.Close(); err != nil {
+			r.logger.Errorf("failed to close etcd maintenance client: %v", err)
+		}
+	}()
+
 	snapList := ro.DeltaSnapList
 
 	numMaxFetchers := ro.Config.MaxFetchers
@@ -420,6 +442,9 @@ func (r *Restorer) applyDeltaSnapshots(clientKV client.KVCloser, ro brtypes.Rest
 	if err := r.applyFirstDeltaSnapshot(clientKV, firstDeltaSnap); err != nil {
 		return err
 	}
+
+	embeddedEtcdQuotaBytes := float64(ro.Config.EmbeddedEtcdQuotaBytes)
+
 	if err := verifySnapshotRevision(clientKV, snapList[0]); err != nil {
 		return err
 	}
@@ -430,23 +455,29 @@ func (r *Restorer) applyDeltaSnapshots(clientKV client.KVCloser, ro brtypes.Rest
 	}
 
 	var (
-		remainingSnaps  = snapList[1:]
-		numSnaps        = len(remainingSnaps)
-		numFetchers     = int(math.Min(float64(numMaxFetchers), float64(numSnaps)))
-		snapLocationsCh = make(chan string, numSnaps)
-		errCh           = make(chan error, numFetchers+1)
-		fetcherInfoCh   = make(chan brtypes.FetcherInfo, numSnaps)
-		applierInfoCh   = make(chan brtypes.ApplierInfo, numSnaps)
-		stopCh          = make(chan bool)
-		wg              sync.WaitGroup
+		remainingSnaps      = snapList[1:]
+		numSnaps            = len(remainingSnaps)
+		numFetchers         = int(math.Min(float64(numMaxFetchers), float64(numSnaps)))
+		snapLocationsCh     = make(chan string, numSnaps)
+		errCh               = make(chan error, numFetchers+1)
+		fetcherInfoCh       = make(chan brtypes.FetcherInfo, numSnaps)
+		applierInfoCh       = make(chan brtypes.ApplierInfo, numSnaps)
+		wg                  sync.WaitGroup
+		stopCh              = make(chan bool)
+		stopHandleAlarmCh   = make(chan bool)
+		dbSizeAlarmCh       = make(chan string)
+		dbSizeAlarmDisarmCh = make(chan bool)
 	)
 
-	go r.applySnaps(clientKV, remainingSnaps, applierInfoCh, errCh, stopCh, &wg)
+	go r.applySnaps(clientKV, clientMaintenance, remainingSnaps, dbSizeAlarmCh, dbSizeAlarmDisarmCh, applierInfoCh, errCh, stopCh, &wg, endPoints, embeddedEtcdQuotaBytes)
 
 	for f := 0; f < numFetchers; f++ {
 		go r.fetchSnaps(f, fetcherInfoCh, applierInfoCh, snapLocationsCh, errCh, stopCh, &wg, ro.Config.TempSnapshotsDir)
 	}
 
+	go r.HandleAlarm(stopHandleAlarmCh, dbSizeAlarmCh, dbSizeAlarmDisarmCh, clientMaintenance)
+	defer close(stopHandleAlarmCh)
+
 	for i, remainingSnap := range remainingSnaps {
 		fetcherInfo := brtypes.FetcherInfo{
 			Snapshot: *remainingSnap,
@@ -456,7 +487,7 @@ func (r *Restorer) applyDeltaSnapshots(clientKV client.KVCloser, ro brtypes.Rest
 	}
 	close(fetcherInfoCh)
 
-	err := <-errCh
+	err = <-errCh
 
 	if cleanupErr := r.cleanup(snapLocationsCh, stopCh, &wg); cleanupErr != nil {
 		r.logger.Errorf("Cleanup of temporary snapshots failed: %v", cleanupErr)
@@ -540,13 +571,22 @@ func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan brtypes.Fet
 }
 
 // applySnaps applies delta snapshot events to the embedded etcd sequentially, in the right order of snapshots, regardless of the order in which they were fetched.
-func (r *Restorer) applySnaps(clientKV client.KVCloser, remainingSnaps brtypes.SnapList, applierInfoCh <-chan brtypes.ApplierInfo, errCh chan<- error, stopCh <-chan bool, wg *sync.WaitGroup) {
+func (r *Restorer) applySnaps(clientKV client.KVCloser, clientMaintenance client.MaintenanceCloser, remainingSnaps brtypes.SnapList, dbSizeAlarmCh chan string, dbSizeAlarmDisarmCh <-chan bool, applierInfoCh <-chan brtypes.ApplierInfo, errCh chan<- error, stopCh <-chan bool, wg *sync.WaitGroup, endPoints []string, embeddedEtcdQuotaBytes float64) {
 	defer wg.Done()
 	wg.Add(1)
 
+	// To reduce or stop the growth of the embedded etcd database during restoration,
+	// it's important to track the number of delta snapshots applied to the embedded etcd,
+	// so that an alarm can be raised when required.
+	// It is initialized to 1 because backup-restore has already applied the first delta snapshot.
+	numberOfDeltaSnapApplied := 1
+
+	// A flag to track whether the previous attempt to make the embedded etcd lean failed or succeeded.
+	// If it failed, backup-restore should retry after applying the next delta snapshot.
+	prevAttemptToMakeEtcdLeanFailed := false
+
 	pathList := make([]string, len(remainingSnaps))
 	nextSnapIndexToApply := 0
-
 	for {
 		select {
 		case _, more := <-stopCh:
@@ -603,6 +643,23 @@ func (r *Restorer) applySnaps(clientKV client.KVCloser, remainingSnaps brtypes.S
 					errCh <- nil // restore finished
 					return
 				}
+
+				numberOfDeltaSnapApplied++
+
+				if numberOfDeltaSnapApplied%periodicallyMakeEtcdLeanDeltaSnapshotInterval == 0 || prevAttemptToMakeEtcdLeanFailed {
+					r.logger.Info("making the embedded etcd lean and checking for a db size alarm")
+					if err := r.MakeEtcdLeanAndCheckAlarm(int64(remainingSnaps[currSnapIndex].LastRevision), endPoints, embeddedEtcdQuotaBytes, dbSizeAlarmCh, dbSizeAlarmDisarmCh, clientKV, clientMaintenance); err != nil {
+						r.logger.Errorf("unable to make embedded etcd lean: %v", err)
+						r.logger.Warn("etcd mvcc: database space might exceed its quota limit")
+						r.logger.Info("backup-restore will try again in the next attempt...")
+						// Set the flag to true so that backup-restore
+						// doesn't wait for the next periodic interval
+						// and retries after applying the next delta snapshot.
+						prevAttemptToMakeEtcdLeanFailed = true
+					} else {
+						prevAttemptToMakeEtcdLeanFailed = false
+					}
+				}
 			}
 		}
 	}
@@ -645,7 +702,7 @@ func (r *Restorer) applyFirstDeltaSnapshot(clientKV client.KVCloser, snap *brtyp
 	// the latest revision from full snapshot may overlap with first few revision on first delta snapshot
 	// Hence, we have to additionally take care of that.
 	// Refer: https://github.com/coreos/etcd/issues/9037
-	ctx, cancel := context.WithTimeout(context.TODO(), etcdDialTimeout)
+	ctx, cancel := context.WithTimeout(context.TODO(), etcdConnectionTimeout)
 	defer cancel()
 	resp, err := clientKV.Get(ctx, "", clientv3.WithLastRev()...)
 	if err != nil {
@@ -887,3 +944,82 @@ func ErrorArrayToError(errs []error) error {
 
 	return fmt.Errorf("%s", strings.TrimSpace(errString))
 }
+
+// HandleAlarm handles the database size alarm raised by backup-restore during restoration.
+func (r *Restorer) HandleAlarm(stopHandleAlarmCh chan bool, dbSizeAlarmCh <-chan string, dbSizeAlarmDisarmCh chan bool, clientMaintenance client.MaintenanceCloser) {
+	r.logger.Info("Starting to handle an alarm...")
+	for {
+		select {
+		case <-stopHandleAlarmCh:
+			r.logger.Info("Closing handleAlarm...")
+			return
+		case endPoint := <-dbSizeAlarmCh:
+			r.logger.Info("Received a dbSize alarm")
+			r.logger.Infof("Calling defrag on endpoint: [%v]", endPoint)
+			if err := func() error {
+				ctx, cancel := context.WithTimeout(context.Background(), etcdDefragTimeout)
+				defer cancel()
+				if _, err := clientMaintenance.Defragment(ctx, endPoint); err != nil {
+					return err
+				}
+				return nil
+			}(); err != nil {
+				r.logger.Errorf("unable to disarm the alarm as the defrag call failed: %v", err)
+				// failed to disarm the alarm
+				dbSizeAlarmDisarmCh <- false
+			} else {
+				// successfully disarmed the alarm
+				dbSizeAlarmDisarmCh <- true
+			}
+		}
+	}
+}
+
+// MakeEtcdLeanAndCheckAlarm compacts the embedded etcd up to the given revision and raises a db size alarm if the embedded etcd db size crosses the threshold.
+func (r *Restorer) MakeEtcdLeanAndCheckAlarm(revision int64, endPoints []string, embeddedEtcdQuotaBytes float64, dbSizeAlarmCh chan string, dbSizeAlarmDisarmCh <-chan bool, clientKV client.KVCloser, clientMaintenance client.MaintenanceCloser) error {
+
+	ctx, cancel := context.WithTimeout(context.Background(), etcdCompactTimeout)
+	defer cancel()
+	if _, err := clientKV.Compact(ctx, revision, clientv3.WithCompactPhysical()); err != nil {
+		return fmt.Errorf("Compact API call failed: %v", err)
+	}
+	r.logger.Infof("Successfully compacted embedded etcd up to revision: %v", revision)
+
+	ctx, cancel = context.WithTimeout(context.Background(), etcdConnectionTimeout)
+	defer cancel()
+
+	// check the database size of the embedded etcd server.
+	status, err := clientMaintenance.Status(ctx, endPoints[0])
+	if err != nil {
+		return fmt.Errorf("unable to check embedded etcd status: %v", err)
+	}
+
+	if float64(status.DbSizeInUse) > thresholdPercentageForDBSizeAlarm*embeddedEtcdQuotaBytes ||
+		float64(status.DbSize) > thresholdPercentageForDBSizeAlarm*embeddedEtcdQuotaBytes {
+		r.logger.Info("Embedded etcd database size has crossed the threshold limit")
+		r.logger.Info("Raising a dbSize alarm...")
+
+		for _, endPoint := range endPoints {
+			// send the endpoint to the alarm channel to raise a db size alarm
+			dbSizeAlarmCh <- endPoint
+
+			if <-dbSizeAlarmDisarmCh {
+				r.logger.Info("Successfully disarmed the embedded etcd dbSize alarm")
+				ctx, cancel := context.WithTimeout(context.Background(), etcdConnectionTimeout)
+				defer cancel()
+				if afterDefragStatus, err := clientMaintenance.Status(ctx, endPoint); err != nil {
+					r.logger.Warnf("failed to get status of embedded etcd with error: %v", err)
+				} else {
+					dbSizeBeforeDefrag := status.DbSize
+					dbSizeAfterDefrag := afterDefragStatus.DbSize
+					r.logger.Infof("Probable DB size change for embedded etcd: %dB -> %dB after the defragmentation call", dbSizeBeforeDefrag, dbSizeAfterDefrag)
+				}
+			} else {
+				return fmt.Errorf("failed to disarm the embedded etcd dbSize alarm")
+			}
+		}
+	} else {
+		r.logger.Infof("Embedded etcd dbSize: %dB didn't cross the threshold limit: %fB", status.DbSize, thresholdPercentageForDBSizeAlarm*embeddedEtcdQuotaBytes)
+	}
+	return nil
+}
diff --git a/pkg/snapshot/restorer/restorer_test.go b/pkg/snapshot/restorer/restorer_test.go
index cbe8cb105..d4a406084 100644
--- a/pkg/snapshot/restorer/restorer_test.go
+++ b/pkg/snapshot/restorer/restorer_test.go
@@ -27,10 +27,13 @@ import (
 
 	"github.com/gardener/etcd-backup-restore/pkg/compressor"
 	"github.com/gardener/etcd-backup-restore/pkg/miscellaneous"
+	mockfactory "github.com/gardener/etcd-backup-restore/pkg/mock/etcdutil/client"
 	"github.com/gardener/etcd-backup-restore/pkg/snapstore"
 	brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
 	"github.com/gardener/etcd-backup-restore/test/utils"
+	"github.com/golang/mock/gomock"
 	"github.com/sirupsen/logrus"
+	"go.etcd.io/etcd/clientv3"
 	"go.etcd.io/etcd/pkg/types"
 
 	. "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer"
"github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" @@ -630,6 +633,198 @@ var _ = Describe("Running Restorer", func() { }) }) }) + + Describe("Handle Alarm and Make etcd lean", func() { + var ( + ctrl *gomock.Controller + factory *mockfactory.MockFactory + cm *mockfactory.MockMaintenanceCloser + ckv *mockfactory.MockKVCloser + + dummyRevisionNo = int64(1111) + dummyEtcdEndpoints = []string{"http://127.0.0.1:9999", "http://127.0.0.1:9900"} + dummyEmbeddedEtcdQuotaBytes = float64(100) // 100B + ) + BeforeEach(func() { + ctrl = gomock.NewController(GinkgoT()) + factory = mockfactory.NewMockFactory(ctrl) + cm = mockfactory.NewMockMaintenanceCloser(ctrl) + ckv = mockfactory.NewMockKVCloser(ctrl) + restorer, err = NewRestorer(store, logger) + Expect(err).ShouldNot(HaveOccurred()) + }) + + Context("Etcd database size within the threshold limit", func() { + var ( + dummyDBSize = int64(50) + dummyDBSizeInUse = int64(25) + ) + BeforeEach(func() { + factory.EXPECT().NewMaintenance().Return(cm, nil).AnyTimes() + factory.EXPECT().NewKV().Return(ckv, nil).AnyTimes() + }) + + Context("unable to compact etcd", func() { + It("should return error", func() { + var ( + dbSizeAlarmCh = make(chan string) + dbSizeDisAlarmCh = make(chan bool) + ) + + ckv.EXPECT().Compact(gomock.Any(), gomock.Any(), gomock.Any()).Return( + nil, fmt.Errorf("dummy compact etcd error"), + ).AnyTimes() + + clientMaintenance, err := factory.NewMaintenance() + Expect(err).ShouldNot(HaveOccurred()) + + clientKV, err := factory.NewKV() + Expect(err).ShouldNot(HaveOccurred()) + + err = restorer.MakeEtcdLeanAndCheckAlarm(dummyRevisionNo, dummyEtcdEndpoints, dummyEmbeddedEtcdQuotaBytes, dbSizeAlarmCh, dbSizeDisAlarmCh, clientKV, clientMaintenance) + Expect(err).Should(HaveOccurred()) + }) + }) + + Context("able to compact etcd but unable to check etcd status", func() { + It("should return error", func() { + var ( + dbSizeAlarmCh = make(chan string) + dbSizeDisAlarmCh = make(chan bool) + ) + + ckv.EXPECT().Compact(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + + cm.EXPECT().Status(gomock.Any(), gomock.Any()).Return( + nil, fmt.Errorf("dummy etcd status error"), + ).AnyTimes() + + clientMaintenance, err := factory.NewMaintenance() + Expect(err).ShouldNot(HaveOccurred()) + + clientKV, err := factory.NewKV() + Expect(err).ShouldNot(HaveOccurred()) + + err = restorer.MakeEtcdLeanAndCheckAlarm(dummyRevisionNo, dummyEtcdEndpoints, dummyEmbeddedEtcdQuotaBytes, dbSizeAlarmCh, dbSizeDisAlarmCh, clientKV, clientMaintenance) + Expect(err).Should(HaveOccurred()) + }) + }) + + Context("able to compact etcd and check the etcd status", func() { + It("shouldn't return error", func() { + var ( + dbSizeAlarmCh = make(chan string) + dbSizeDisAlarmCh = make(chan bool) + ) + + ckv.EXPECT().Compact(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + + cm.EXPECT().Status(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ string) (*clientv3.StatusResponse, error) { + response := new(clientv3.StatusResponse) + // setting the db size + response.DbSize = dummyDBSize + response.DbSizeInUse = dummyDBSizeInUse + return response, nil + }).AnyTimes() + + clientMaintenance, err := factory.NewMaintenance() + Expect(err).ShouldNot(HaveOccurred()) + + clientKV, err := factory.NewKV() + Expect(err).ShouldNot(HaveOccurred()) + + err = restorer.MakeEtcdLeanAndCheckAlarm(dummyRevisionNo, dummyEtcdEndpoints, dummyEmbeddedEtcdQuotaBytes, dbSizeAlarmCh, dbSizeDisAlarmCh, clientKV, clientMaintenance) + 
+					Expect(err).ShouldNot(HaveOccurred())
+				})
+			})
+		})
+
+		Context("Etcd database size crosses the threshold limit", func() {
+			var (
+				dummyDBSize            = int64(90)
+				dummyDBSizeInUse       = int64(50)
+				dummyDBSizeAfterDefrag = int64(50)
+			)
+			BeforeEach(func() {
+				factory.EXPECT().NewMaintenance().Return(cm, nil).AnyTimes()
+				factory.EXPECT().NewKV().Return(ckv, nil).AnyTimes()
+			})
+
+			Context("compact but unable to defragment the given endpoint", func() {
+				It("should return error", func() {
+					var (
+						stopHandleAlarmCh = make(chan bool)
+						dbSizeAlarmCh     = make(chan string)
+						dbSizeDisAlarmCh  = make(chan bool)
+					)
+
+					ckv.EXPECT().Compact(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
+					cm.EXPECT().Status(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ string) (*clientv3.StatusResponse, error) {
+						response := new(clientv3.StatusResponse)
+						// setting the db size
+						response.DbSize = dummyDBSize
+						response.DbSizeInUse = dummyDBSizeInUse
+						return response, nil
+					}).AnyTimes()
+
+					cm.EXPECT().Defragment(gomock.Any(), gomock.Any()).Return(
+						nil, fmt.Errorf("dummy defrag error"),
+					).AnyTimes()
+
+					clientMaintenance, err := factory.NewMaintenance()
+					Expect(err).ShouldNot(HaveOccurred())
+
+					clientKV, err := factory.NewKV()
+					Expect(err).ShouldNot(HaveOccurred())
+
+					go restorer.HandleAlarm(stopHandleAlarmCh, dbSizeAlarmCh, dbSizeDisAlarmCh, clientMaintenance)
+					defer close(stopHandleAlarmCh)
+					err = restorer.MakeEtcdLeanAndCheckAlarm(dummyRevisionNo, dummyEtcdEndpoints, dummyEmbeddedEtcdQuotaBytes, dbSizeAlarmCh, dbSizeDisAlarmCh, clientKV, clientMaintenance)
+					Expect(err).Should(HaveOccurred())
+				})
+			})
+
+			Context("compact and defragment the given etcd endpoint", func() {
+				It("shouldn't return any error", func() {
+					var (
+						stopHandleAlarmCh = make(chan bool)
+						dbSizeAlarmCh     = make(chan string)
+						dbSizeDisAlarmCh  = make(chan bool)
+					)
+
+					ckv.EXPECT().Compact(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
+					cm.EXPECT().Status(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ string) (*clientv3.StatusResponse, error) {
+						response := new(clientv3.StatusResponse)
+						// setting the db size before defrag
+						response.DbSize = dummyDBSize
+						response.DbSizeInUse = dummyDBSizeInUse
+						return response, nil
+					}).MaxTimes(1)
+
+					cm.EXPECT().Status(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ string) (*clientv3.StatusResponse, error) {
+						response := new(clientv3.StatusResponse)
+						// setting the db size after defrag
+						response.DbSize = dummyDBSizeAfterDefrag
+						response.DbSizeInUse = dummyDBSizeAfterDefrag
+						return response, nil
+					}).AnyTimes()
+
+					cm.EXPECT().Defragment(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
+
+					clientMaintenance, err := factory.NewMaintenance()
+					Expect(err).ShouldNot(HaveOccurred())
+
+					clientKV, err := factory.NewKV()
+					Expect(err).ShouldNot(HaveOccurred())
+
+					go restorer.HandleAlarm(stopHandleAlarmCh, dbSizeAlarmCh, dbSizeDisAlarmCh, clientMaintenance)
+					defer close(stopHandleAlarmCh)
+					err = restorer.MakeEtcdLeanAndCheckAlarm(dummyRevisionNo, dummyEtcdEndpoints, dummyEmbeddedEtcdQuotaBytes, dbSizeAlarmCh, dbSizeDisAlarmCh, clientKV, clientMaintenance)
+					Expect(err).ShouldNot(HaveOccurred())
+				})
+			})
+		})
+	})
 })
 
 var _ = Describe("Running Restorer when both v1 and v2 directory structures are present", func() {