Commit c5aa8f0
Make embedded etcd lean during restoration to avoid any restoration failure (#668)

* Make embedded etcd lean during restoration to avoid any failure due to the etcd database space being exceeded.

* Added unit tests.

* Some code and test improvements.

* Address review comments.

* Address review comments.

* Address review comments 2.
ishan16696 authored Nov 22, 2023
1 parent de13fc2 commit c5aa8f0
Showing 2 changed files with 361 additions and 30 deletions.
196 changes: 166 additions & 30 deletions pkg/snapshot/restorer/restorer.go
@@ -57,7 +57,11 @@ import (
)

const (
-    etcdDialTimeout = time.Second * 30
+    etcdConnectionTimeout                                 = 30 * time.Second
+    etcdCompactTimeout                                    = 2 * time.Minute
+    etcdDefragTimeout                                     = 5 * time.Minute
+    periodicallyMakeEtcdLeanDeltaSnapshotInterval         = 10
+    thresholdPercentageForDBSizeAlarm             float64 = 80.0 / 100.0
)
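The new constants drive the lean-making logic: backup-restore now attempts a compaction after every 10th applied delta snapshot, and the DB size alarm fires once the database crosses 80% of the configured quota. A minimal, runnable sketch of that threshold arithmetic (the 8 GiB quota and 7 GiB DB size are assumed values, not from this commit):

    package main

    import "fmt"

    const thresholdPercentageForDBSizeAlarm float64 = 80.0 / 100.0

    func main() {
        embeddedEtcdQuotaBytes := float64(8 * 1024 * 1024 * 1024) // hypothetical quota: 8 GiB
        dbSize := float64(7 * 1024 * 1024 * 1024)                 // hypothetical current DB size: 7 GiB

        if dbSize > thresholdPercentageForDBSizeAlarm*embeddedEtcdQuotaBytes {
            fmt.Println("DB size crossed 80% of the quota; a DB size alarm would be raised")
        }
    }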

// Restorer is a struct for etcd data directory restorer
@@ -113,33 +117,26 @@ func (r *Restorer) Restore(ro brtypes.RestoreOptions, m member.Control) (*embed.

defer func() {
if err := os.RemoveAll(ro.Config.TempSnapshotsDir); err != nil {
r.logger.Errorf("Failed to remove restoration temp directory %s: %v", ro.Config.TempSnapshotsDir, err)
r.logger.Errorf("failed to remove restoration temp directory %s: %v", ro.Config.TempSnapshotsDir, err)
}
}()

r.logger.Infof("Starting embedded etcd server...")
r.logger.Infof("Starting an embedded etcd server...")
e, err := miscellaneous.StartEmbeddedEtcd(r.logger, &ro)
if err != nil {
return e, err
}

+    embeddedEtcdEndpoints := []string{e.Clients[0].Addr().String()}

clientFactory := etcdutil.NewClientFactory(ro.NewClientFactory, brtypes.EtcdConnectionConfig{
MaxCallSendMsgSize: ro.Config.MaxCallSendMsgSize,
-        Endpoints:          []string{e.Clients[0].Addr().String()},
+        Endpoints:          embeddedEtcdEndpoints,
InsecureTransport: true,
})
-    clientKV, err := clientFactory.NewKV()
-    if err != nil {
-        return e, err
-    }
-    defer func() {
-        if err := clientKV.Close(); err != nil {
-            r.logger.Errorf("Failed to close etcd KV client: %v", err)
-        }
-    }()

r.logger.Infof("Applying delta snapshots...")
-    if err := r.applyDeltaSnapshots(clientKV, ro); err != nil {
+    if err := r.applyDeltaSnapshots(clientFactory, embeddedEtcdEndpoints, ro); err != nil {
return e, err
}

@@ -148,7 +145,11 @@ func (r *Restorer) Restore(ro brtypes.RestoreOptions, m member.Control) (*embed.
if err != nil {
return e, err
}
-        defer clientCluster.Close()
+        defer func() {
+            if err := clientCluster.Close(); err != nil {
+                r.logger.Errorf("failed to close etcd cluster client: %v", err)
+            }
+        }()
m.UpdateMemberPeerURL(context.TODO(), clientCluster)
}
return e, nil
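The rewritten defers follow a common Go idiom: wrap Close in a closure (or a small helper) so its error is logged instead of silently discarded. A generic sketch of the pattern; closeWithLog is a hypothetical helper, not part of this commit:

    package main

    import (
        "io"
        "log"
        "strings"
    )

    // closeWithLog closes c and logs any error instead of dropping it.
    func closeWithLog(c io.Closer, name string) {
        if err := c.Close(); err != nil {
            log.Printf("failed to close %s: %v", name, err)
        }
    }

    func main() {
        r := io.NopCloser(strings.NewReader("demo")) // stands in for an etcd client
        defer closeWithLog(r, "demo reader")
    }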
@@ -411,7 +412,28 @@ func makeWALAndSnap(logger *zap.Logger, walDir, snapDir string, cl *membership.R
}

// applyDeltaSnapshots fetches the events from delta snapshots in parallel and applies them to the embedded etcd sequentially.
-func (r *Restorer) applyDeltaSnapshots(clientKV client.KVCloser, ro brtypes.RestoreOptions) error {
+func (r *Restorer) applyDeltaSnapshots(clientFactory client.Factory, endPoints []string, ro brtypes.RestoreOptions) error {
+    clientKV, err := clientFactory.NewKV()
+    if err != nil {
+        return err
+    }
+    defer func() {
+        if err := clientKV.Close(); err != nil {
+            r.logger.Errorf("failed to close etcd KV client: %v", err)
+        }
+    }()
+
+    clientMaintenance, err := clientFactory.NewMaintenance()
+    if err != nil {
+        return err
+    }
+    defer func() {
+        if err := clientMaintenance.Close(); err != nil {
+            r.logger.Errorf("failed to close etcd maintenance client: %v", err)
+        }
+    }()
+
snapList := ro.DeltaSnapList
numMaxFetchers := ro.Config.MaxFetchers
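applyDeltaSnapshots now owns both clients it needs: a KV client for writes and compaction, and a Maintenance client for Status and Defragment. Expressed with plain clientv3 instead of the project's client.Factory (the endpoint, timeout, and import path are assumptions), the equivalent setup is roughly:

    package main

    import (
        "log"
        "time"

        clientv3 "go.etcd.io/etcd/client/v3"
    )

    func main() {
        cli, err := clientv3.New(clientv3.Config{
            Endpoints:   []string{"http://127.0.0.1:2379"}, // hypothetical embedded etcd endpoint
            DialTimeout: 30 * time.Second,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer cli.Close()

        clientKV := clientv3.NewKV(cli)                   // Put/Delete/Txn/Compact
        clientMaintenance := clientv3.NewMaintenance(cli) // Status/Defragment/Alarm
        _, _ = clientKV, clientMaintenance
    }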

@@ -420,6 +442,9 @@ func (r *Restorer) applyDeltaSnapshots(clientKV client.KVCloser, ro brtypes.Rest
if err := r.applyFirstDeltaSnapshot(clientKV, firstDeltaSnap); err != nil {
return err
}

+    embeddedEtcdQuotaBytes := float64(ro.Config.EmbeddedEtcdQuotaBytes)

if err := verifySnapshotRevision(clientKV, snapList[0]); err != nil {
return err
}
@@ -430,23 +455,29 @@ func (r *Restorer) applyDeltaSnapshots(clientKV client.KVCloser, ro brtypes.Rest
}

var (
-        remainingSnaps  = snapList[1:]
-        numSnaps        = len(remainingSnaps)
-        numFetchers     = int(math.Min(float64(numMaxFetchers), float64(numSnaps)))
-        snapLocationsCh = make(chan string, numSnaps)
-        errCh           = make(chan error, numFetchers+1)
-        fetcherInfoCh   = make(chan brtypes.FetcherInfo, numSnaps)
-        applierInfoCh   = make(chan brtypes.ApplierInfo, numSnaps)
-        stopCh          = make(chan bool)
-        wg              sync.WaitGroup
+        remainingSnaps      = snapList[1:]
+        numSnaps            = len(remainingSnaps)
+        numFetchers         = int(math.Min(float64(numMaxFetchers), float64(numSnaps)))
+        snapLocationsCh     = make(chan string, numSnaps)
+        errCh               = make(chan error, numFetchers+1)
+        fetcherInfoCh       = make(chan brtypes.FetcherInfo, numSnaps)
+        applierInfoCh       = make(chan brtypes.ApplierInfo, numSnaps)
+        wg                  sync.WaitGroup
+        stopCh              = make(chan bool)
+        stopHandleAlarmCh   = make(chan bool)
+        dbSizeAlarmCh       = make(chan string)
+        dbSizeAlarmDisarmCh = make(chan bool)
)

-    go r.applySnaps(clientKV, remainingSnaps, applierInfoCh, errCh, stopCh, &wg)
+    go r.applySnaps(clientKV, clientMaintenance, remainingSnaps, dbSizeAlarmCh, dbSizeAlarmDisarmCh, applierInfoCh, errCh, stopCh, &wg, endPoints, embeddedEtcdQuotaBytes)

for f := 0; f < numFetchers; f++ {
go r.fetchSnaps(f, fetcherInfoCh, applierInfoCh, snapLocationsCh, errCh, stopCh, &wg, ro.Config.TempSnapshotsDir)
}

+    go r.HandleAlarm(stopHandleAlarmCh, dbSizeAlarmCh, dbSizeAlarmDisarmCh, clientMaintenance)
+    defer close(stopHandleAlarmCh)

for i, remainingSnap := range remainingSnaps {
fetcherInfo := brtypes.FetcherInfo{
Snapshot: *remainingSnap,
@@ -456,7 +487,7 @@ func (r *Restorer) applyDeltaSnapshots(clientKV client.KVCloser, ro brtypes.Rest
}
close(fetcherInfoCh)

-    err := <-errCh
+    err = <-errCh

if cleanupErr := r.cleanup(snapLocationsCh, stopCh, &wg); cleanupErr != nil {
r.logger.Errorf("Cleanup of temporary snapshots failed: %v", cleanupErr)
@@ -540,13 +571,22 @@ func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan brtypes.Fet
}

// applySnaps applies delta snapshot events to the embedded etcd sequentially, in the right order of snapshots, regardless of the order in which they were fetched.
-func (r *Restorer) applySnaps(clientKV client.KVCloser, remainingSnaps brtypes.SnapList, applierInfoCh <-chan brtypes.ApplierInfo, errCh chan<- error, stopCh <-chan bool, wg *sync.WaitGroup) {
+func (r *Restorer) applySnaps(clientKV client.KVCloser, clientMaintenance client.MaintenanceCloser, remainingSnaps brtypes.SnapList, dbSizeAlarmCh chan string, dbSizeAlarmDisarmCh <-chan bool, applierInfoCh <-chan brtypes.ApplierInfo, errCh chan<- error, stopCh <-chan bool, wg *sync.WaitGroup, endPoints []string, embeddedEtcdQuotaBytes float64) {
defer wg.Done()
wg.Add(1)

+    // To reduce or stop the growing size of the embedded etcd database during restoration,
+    // it is important to track the number of delta snapshots applied to the embedded etcd,
+    // so that an alarm can be raised if required.
+    // It is initialized to "1" because backup-restore has already applied the first delta snapshot.
+    numberOfDeltaSnapApplied := 1
+
+    // A flag to track whether a previous attempt to make the embedded etcd lean failed or succeeded.
+    // If it failed, backup-restore should retry after applying the next delta snapshot.
+    prevAttemptToMakeEtcdLeanFailed := false

pathList := make([]string, len(remainingSnaps))
nextSnapIndexToApply := 0

for {
select {
case _, more := <-stopCh:
@@ -603,6 +643,23 @@
errCh <- nil // restore finished
return
}

+        numberOfDeltaSnapApplied++

+        if numberOfDeltaSnapApplied%periodicallyMakeEtcdLeanDeltaSnapshotInterval == 0 || prevAttemptToMakeEtcdLeanFailed {
+            r.logger.Info("Making the embedded etcd lean and checking for a DB size alarm")
+            if err := r.MakeEtcdLeanAndCheckAlarm(int64(remainingSnaps[currSnapIndex].LastRevision), endPoints, embeddedEtcdQuotaBytes, dbSizeAlarmCh, dbSizeAlarmDisarmCh, clientKV, clientMaintenance); err != nil {
+                r.logger.Errorf("unable to make embedded etcd lean: %v", err)
+                r.logger.Warn("etcd mvcc: database space might exceed its quota limit")
+                r.logger.Info("backup-restore will try again on the next attempt...")
+                // Set the flag to true so that backup-restore does not wait for the next periodic call
+                // and instead retries after applying the next delta snapshot.
+                prevAttemptToMakeEtcdLeanFailed = true
+            } else {
+                prevAttemptToMakeEtcdLeanFailed = false
+            }
+        }
}
}
}
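The prevAttemptToMakeEtcdLeanFailed flag means a failed compact/defrag attempt is retried right after the very next delta snapshot instead of waiting out another full interval of 10. The control flow, reduced to a runnable skeleton (makeEtcdLean is a hypothetical stand-in for r.MakeEtcdLeanAndCheckAlarm, rigged to fail once so the retry shows up):

    package main

    import (
        "errors"
        "fmt"
    )

    const leanInterval = 10 // mirrors periodicallyMakeEtcdLeanDeltaSnapshotInterval

    func makeEtcdLean(n int) error { // hypothetical stand-in
        if n == 20 {
            return errors.New("compaction timed out")
        }
        return nil
    }

    func main() {
        applied := 1 // the first delta snapshot is applied before the loop starts
        retryNext := false
        for i := 0; i < 25; i++ {
            // ... apply delta snapshot i here ...
            applied++
            if applied%leanInterval == 0 || retryNext {
                if err := makeEtcdLean(applied); err != nil {
                    retryNext = true // retry right after the next snapshot
                    fmt.Printf("lean attempt at snapshot %d failed; will retry\n", applied)
                } else {
                    retryNext = false
                    fmt.Printf("etcd made lean at snapshot %d\n", applied)
                }
            }
        }
    }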
@@ -645,7 +702,7 @@ func (r *Restorer) applyFirstDeltaSnapshot(clientKV client.KVCloser, snap *brtyp
// the latest revision from full snapshot may overlap with first few revision on first delta snapshot
// Hence, we have to additionally take care of that.
// Refer: https://github.com/coreos/etcd/issues/9037
-    ctx, cancel := context.WithTimeout(context.TODO(), etcdDialTimeout)
+    ctx, cancel := context.WithTimeout(context.TODO(), etcdConnectionTimeout)
defer cancel()
resp, err := clientKV.Get(ctx, "", clientv3.WithLastRev()...)
if err != nil {
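clientv3.WithLastRev() expands to a prefix query over the whole keyspace, sorted by mod revision in descending order with a limit of one, so the single returned key carries the highest revision already present after the full snapshot was restored; delta events at or below that revision can then be skipped. A small sketch of that lookup:

    package restore

    import (
        "context"

        clientv3 "go.etcd.io/etcd/client/v3"
    )

    // latestRevision returns the highest mod revision currently in etcd,
    // mirroring the Get(ctx, "", clientv3.WithLastRev()...) call above.
    func latestRevision(ctx context.Context, kv clientv3.KV) (int64, error) {
        resp, err := kv.Get(ctx, "", clientv3.WithLastRev()...)
        if err != nil {
            return 0, err
        }
        if len(resp.Kvs) == 0 {
            return 0, nil // empty keyspace
        }
        return resp.Kvs[0].ModRevision, nil
    }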
@@ -887,3 +944,82 @@ func ErrorArrayToError(errs []error) error {

return fmt.Errorf("%s", strings.TrimSpace(errString))
}

+// HandleAlarm handles the DB size alarm raised by backup-restore.
+func (r *Restorer) HandleAlarm(stopHandleAlarmCh chan bool, dbSizeAlarmCh <-chan string, dbSizeAlarmDisarmCh chan bool, clientMaintenance client.MaintenanceCloser) {
+    r.logger.Info("Starting to handle an alarm...")
+    for {
+        select {
+        case <-stopHandleAlarmCh:
+            r.logger.Info("Closing handleAlarm...")
+            return
+        case endPoint := <-dbSizeAlarmCh:
+            r.logger.Info("Received a DB size alarm")
+            r.logger.Infof("Calling defrag on endpoint: [%v]", endPoint)
+            if err := func() error {
+                ctx, cancel := context.WithTimeout(context.Background(), etcdDefragTimeout)
+                defer cancel()
+                if _, err := clientMaintenance.Defragment(ctx, endPoint); err != nil {
+                    return err
+                }
+                return nil
+            }(); err != nil {
+                r.logger.Errorf("unable to disarm the alarm as the defrag call failed: %v", err)
+                // failed to disarm
+                dbSizeAlarmDisarmCh <- false
+            } else {
+                // successfully disarmed
+                dbSizeAlarmDisarmCh <- true
+            }
+        }
+    }
+}
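HandleAlarm runs as a dedicated goroutine and pairs each alarm with an acknowledgment: an endpoint arrives on dbSizeAlarmCh, a defragmentation is attempted, and true or false goes back on dbSizeAlarmDisarmCh. The handshake in miniature (defragment is a hypothetical stand-in for the Maintenance client's Defragment call):

    package main

    import "fmt"

    func defragment(endpoint string) error { // hypothetical stand-in
        fmt.Println("defragmenting", endpoint)
        return nil
    }

    func main() {
        dbSizeAlarmCh := make(chan string)
        dbSizeAlarmDisarmCh := make(chan bool)

        // Alarm handler: defrag on every alarm and report the outcome.
        go func() {
            for endpoint := range dbSizeAlarmCh {
                dbSizeAlarmDisarmCh <- defragment(endpoint) == nil // true means disarmed
            }
        }()

        // Alarm raiser: block until the handler reports back.
        dbSizeAlarmCh <- "http://127.0.0.1:2379" // hypothetical endpoint
        if !<-dbSizeAlarmDisarmCh {
            fmt.Println("failed to disarm the embedded etcd DB size alarm")
        }
    }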

+// MakeEtcdLeanAndCheckAlarm compacts etcd at the given revision and raises a DB size alarm if the embedded etcd DB size crosses the threshold.
+func (r *Restorer) MakeEtcdLeanAndCheckAlarm(revision int64, endPoints []string, embeddedEtcdQuotaBytes float64, dbSizeAlarmCh chan string, dbSizeAlarmDisarmCh <-chan bool, clientKV client.KVCloser, clientMaintenance client.MaintenanceCloser) error {
+    ctx, cancel := context.WithTimeout(context.Background(), etcdCompactTimeout)
+    defer cancel()
+    if _, err := clientKV.Compact(ctx, revision, clientv3.WithCompactPhysical()); err != nil {
+        return fmt.Errorf("compact API call failed: %v", err)
+    }
+    r.logger.Infof("Successfully compacted embedded etcd up to revision: %v", revision)
+
+    ctx, cancel = context.WithTimeout(context.Background(), etcdConnectionTimeout)
+    defer cancel()
+
+    // Check the database size of the embedded etcd server.
+    status, err := clientMaintenance.Status(ctx, endPoints[0])
+    if err != nil {
+        return fmt.Errorf("unable to check embedded etcd status: %v", err)
+    }
+
+    if float64(status.DbSizeInUse) > thresholdPercentageForDBSizeAlarm*embeddedEtcdQuotaBytes ||
+        float64(status.DbSize) > thresholdPercentageForDBSizeAlarm*embeddedEtcdQuotaBytes {
+        r.logger.Info("Embedded etcd database size crossed the threshold limit")
+        r.logger.Info("Raising a DB size alarm...")
+
+        for _, endPoint := range endPoints {
+            // Send the endpoint to the alarm channel to raise a DB size alarm.
+            dbSizeAlarmCh <- endPoint
+
+            if <-dbSizeAlarmDisarmCh {
+                r.logger.Info("Successfully disarmed the embedded etcd DB size alarm")
+                ctx, cancel := context.WithTimeout(context.Background(), etcdConnectionTimeout)
+                defer cancel()
+                if afterDefragStatus, err := clientMaintenance.Status(ctx, endPoint); err != nil {
+                    r.logger.Warnf("failed to get status of embedded etcd: %v", err)
+                } else {
+                    dbSizeBeforeDefrag := status.DbSize
+                    dbSizeAfterDefrag := afterDefragStatus.DbSize
+                    r.logger.Infof("Probable DB size change for embedded etcd: %dB -> %dB after the defragmentation call", dbSizeBeforeDefrag, dbSizeAfterDefrag)
+                }
+            } else {
+                return fmt.Errorf("failed to disarm the embedded etcd DB size alarm")
+            }
+        }
+    } else {
+        r.logger.Infof("Embedded etcd DB size: %dB didn't cross the threshold limit: %fB", status.DbSize, thresholdPercentageForDBSizeAlarm*embeddedEtcdQuotaBytes)
+    }
+    return nil
+}
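The lean-making sequence itself is two etcd API calls: a physical compaction, then a Maintenance Status call whose DbSize/DbSizeInUse are compared against the quota threshold, with defragmentation as the escape hatch. Expressed directly against clientv3 (client construction omitted; the 80% factor mirrors the code above, the import path and names are assumptions):

    package restore

    import (
        "context"
        "fmt"
        "time"

        clientv3 "go.etcd.io/etcd/client/v3"
    )

    func makeLeanAndCheck(cli *clientv3.Client, endpoint string, revision int64, quotaBytes float64) error {
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
        defer cancel()
        // A physical compaction blocks until obsolete revisions are actually removed.
        if _, err := cli.Compact(ctx, revision, clientv3.WithCompactPhysical()); err != nil {
            return fmt.Errorf("compact API call failed: %v", err)
        }

        sctx, scancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer scancel()
        status, err := cli.Status(sctx, endpoint)
        if err != nil {
            return err
        }
        if float64(status.DbSizeInUse) > 0.8*quotaBytes || float64(status.DbSize) > 0.8*quotaBytes {
            // Over the threshold: defragment to reclaim the free space inside the DB file.
            if _, err := cli.Defragment(sctx, endpoint); err != nil {
                return fmt.Errorf("defrag call failed: %v", err)
            }
        }
        return nil
    }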
