Enhancement: Implement Chunk Deletion for Multi-part Uploaded Files on GCP and OpenStack Cloud Providers (#673)

* Added generic functionality to garbage-collect chunks when GC is triggered, instead of waiting for the chunks to be deleted only when the corresponding snapshot is garbage collected

* Deleting chunks from GCS bucket immediately after the composite object is uploaded

* Deleting chunks from OpenStack bucket immediately after the composite object is uploaded

* Add logs for the count of chunks being deleted

* Updated comments and code for readability

* Fixed a bug in tests

* Move the immediate chunk deletion logic to a new function with a context timeout

* Timeout is the same as the chunk upload timeout
* Collected individual chunk deletion errors into an errList
* Affected code: GCS & Swift snapstore chunk deletion (a rough sketch follows below)
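
A rough, non-authoritative sketch of this interim approach (which was removed again later in this PR, see the bullets below): deleting the chunk objects immediately after the composite object is uploaded, under a context timeout and with per-chunk errors collected. The names objectDeleter, deleteUploadedChunks, and chunkDeletionTimeout are hypothetical stand-ins, not the actual snapstore API.

// Hypothetical sketch only; names and signatures are illustrative, not the real snapstore code.
package snapstore

import (
	"context"
	"fmt"
	"time"
)

// objectDeleter stands in for the provider client (GCS / Swift object API).
type objectDeleter interface {
	Delete(ctx context.Context, objectName string) error
}

// chunkDeletionTimeout is assumed to be the same as the chunk upload timeout.
const chunkDeletionTimeout = 5 * time.Minute

// deleteUploadedChunks deletes the individual chunk objects after the composite
// object has been uploaded, collecting per-chunk errors into an error list.
func deleteUploadedChunks(client objectDeleter, chunkNames []string) error {
	ctx, cancel := context.WithTimeout(context.Background(), chunkDeletionTimeout)
	defer cancel()

	var errList []error
	for _, name := range chunkNames {
		if err := client.Delete(ctx, name); err != nil {
			errList = append(errList, fmt.Errorf("failed to delete chunk %s: %w", name, err))
		}
	}
	if len(errList) > 0 {
		return fmt.Errorf("deleted %d of %d chunks: %v", len(chunkNames)-len(errList), len(chunkNames), errList)
	}
	return nil
}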

* Add logs after successfully performing chunk deletion

* Fix snapstore tests, update comments

* Parallelize chunk deletion (see the sketch after this list)
* For better performance
* To prevent blocking the snapshotter while chunk deletion for an older snapshot is underway
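
Again purely illustrative, building on the hypothetical objectDeleter and chunkDeletionTimeout from the previous sketch (and additionally needing the sync import): the parallel variant could run one deletion goroutine per chunk, wait on a sync.WaitGroup, and guard the shared error slice with a mutex.

// Hypothetical sketch only; the real GCS/Swift snapstore code may differ.
func deleteUploadedChunksParallel(client objectDeleter, chunkNames []string) error {
	ctx, cancel := context.WithTimeout(context.Background(), chunkDeletionTimeout)
	defer cancel()

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		errList []error
	)
	for _, name := range chunkNames {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			if err := client.Delete(ctx, name); err != nil {
				// The mutex prevents a data race on the shared error slice.
				mu.Lock()
				errList = append(errList, fmt.Errorf("failed to delete chunk %s: %w", name, err))
				mu.Unlock()
			}
		}(name)
	}
	wg.Wait()

	if len(errList) > 0 {
		return fmt.Errorf("failed to delete %d of %d chunks: %v", len(errList), len(chunkNames), errList)
	}
	return nil
}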

* Add comment

* Add mutex to prevent DATA RACE in ObjectMap of mock GCS & Swift clients while deleting chunks in parallel

* Minor fixes

* Add mutex to prevent parallel read & write for GCS Mock client

* Improved error collection in chunk deletion

* Remove immediate chunk deletion, address edge case in Garbage Collection

* Address review comments

* Minor fixes
anveshreddy18 authored Nov 22, 2023
1 parent 5a75019 commit de13fc2
Showing 4 changed files with 54 additions and 23 deletions.
37 changes: 25 additions & 12 deletions pkg/snapshot/snapshotter/garbagecollector.go
@@ -24,7 +24,6 @@ import (
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

// RunGarbageCollector basically considers the older backups as garbage and deletes them
@@ -59,6 +58,11 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) {
continue
}

// chunksDeleted stores the number of chunks deleted in the current iteration of GC
var chunksDeleted int
chunksDeleted, snapList = ssr.GarbageCollectChunks(snapList)
ssr.logger.Infof("GC: Total number of garbage collected chunks: %d", chunksDeleted)

snapStreamIndexList := getSnapStreamIndexList(snapList)

switch ssr.config.GarbageCollectionPolicy {
@@ -142,7 +146,6 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) {
}
metrics.GCSnapshotCounter.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindFull, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Inc()
total++
garbageCollectChunks(ssr.store, snapList, snapStreamIndexList[snapStreamIndex-1]+1, snapStreamIndexList[snapStreamIndex])
}
}

@@ -167,7 +170,6 @@ func (ssr *Snapshotter) RunGarbageCollector(stopCh <-chan struct{}) {
}
metrics.GCSnapshotCounter.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindFull, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Inc()
total++
garbageCollectChunks(ssr.store, snapList, snapStreamIndexList[snapStreamIndex]+1, snapStreamIndexList[snapStreamIndex+1])
}
}
}
@@ -193,24 +195,35 @@ func getSnapStreamIndexList(snapList brtypes.SnapList) []int {
return snapStreamIndexList
}

// garbageCollectChunks deletes the chunks in the store from snaplist starting at index low (inclusive) till high (exclusive).
func garbageCollectChunks(store brtypes.SnapStore, snapList brtypes.SnapList, low, high int) {
for index := low; index < high; index++ {
snap := snapList[index]
// Only delete chunk snapshots of kind Full
if snap.Kind != brtypes.SnapshotKindFull || !snap.IsChunk {
// GarbageCollectChunks removes obsolete chunks based on the latest recorded snapshot.
// It eliminates chunks associated with snapshots that have already been uploaded.
// Additionally, it avoids deleting chunks linked to snapshots currently being uploaded to prevent the garbage collector from removing chunks before the composite is formed.
func (ssr *Snapshotter) GarbageCollectChunks(snapList brtypes.SnapList) (int, brtypes.SnapList) {
var nonChunkSnapList brtypes.SnapList
chunksDeleted := 0
for _, snap := range snapList {
// If not a chunk, add it to the list and continue
if !snap.IsChunk {
nonChunkSnapList = append(nonChunkSnapList, snap)
continue
}
// Skip the chunk deletion if its corresponding full/delta snapshot is not uploaded yet
if ssr.prevSnapshot.LastRevision == 0 || snap.StartRevision > ssr.prevSnapshot.LastRevision {
continue
}
// delete the chunk object
snapPath := path.Join(snap.SnapDir, snap.SnapName)
logrus.Infof("GC: Deleting chunk for old full snapshot: %s", snapPath)
if err := store.Delete(*snap); err != nil {
logrus.Warnf("GC: Failed to delete snapshot %s: %v", snapPath, err)
ssr.logger.Infof("GC: Deleting chunk for old snapshot: %s", snapPath)
if err := ssr.store.Delete(*snap); err != nil {
ssr.logger.Warnf("GC: Failed to delete chunk %s: %v", snapPath, err)
metrics.SnapshotterOperationFailure.With(prometheus.Labels{metrics.LabelError: err.Error()}).Inc()
metrics.GCSnapshotCounter.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindChunk, metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Inc()
continue
}
chunksDeleted++
metrics.GCSnapshotCounter.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindChunk, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Inc()
}
return chunksDeleted, nonChunkSnapList
}

/*
7 changes: 7 additions & 0 deletions pkg/snapstore/gcs_snapstore_test.go
@@ -50,6 +50,8 @@ func (m *mockBucketHandle) Object(name string) stiface.ObjectHandle {
}

func (m *mockBucketHandle) Objects(context.Context, *storage.Query) stiface.ObjectIterator {
m.client.objectMutex.Lock()
defer m.client.objectMutex.Unlock()
var keys []string
for key := range m.client.objects {
keys = append(keys, key)
@@ -65,6 +67,8 @@ type mockObjectHandle struct {
}

func (m *mockObjectHandle) NewReader(ctx context.Context) (stiface.Reader, error) {
m.client.objectMutex.Lock()
defer m.client.objectMutex.Unlock()
if value, ok := m.client.objects[m.object]; ok {
return &mockObjectReader{reader: io.NopCloser(bytes.NewReader(*value))}, nil
}
@@ -84,6 +88,8 @@ func (m *mockObjectHandle) ComposerFrom(objects ...stiface.ObjectHandle) stiface
}

func (m *mockObjectHandle) Delete(context.Context) error {
m.client.objectMutex.Lock()
defer m.client.objectMutex.Unlock()
if _, ok := m.client.objects[m.object]; ok {
delete(m.client.objects, m.object)
return nil
@@ -117,6 +123,7 @@ type mockComposer struct {

func (m *mockComposer) Run(ctx context.Context) (*storage.ObjectAttrs, error) {
dstWriter := m.dst.NewWriter(ctx)
defer dstWriter.Close()
for _, obj := range m.objectHandles {
r, err := obj.NewReader(ctx)
if err != nil {
24 changes: 14 additions & 10 deletions pkg/snapstore/snapstore_test.go
@@ -144,12 +144,12 @@ var _ = Describe("Save, List, Fetch, Delete from mock snapstore", func() {
objectMap[path.Join(prefixV1, snap2.SnapDir, snap2.SnapName)] = &expectedVal2

logrus.Infof("Running mock tests for %s when only v1 is present", key)
// List snap1 and snap3
// List snap1 and snap2
snapList, err := snapStore.List()
Expect(err).ShouldNot(HaveOccurred())
Expect(snapList.Len()).To(Equal(2))
Expect(snapList[0].SnapName).To(Equal(snap2.SnapName))
// Fetch snap3
// Fetch snap2
rc, err := snapStore.Fetch(*snapList[1])
Expect(err).ShouldNot(HaveOccurred())
defer rc.Close()
@@ -164,12 +164,13 @@ var _ = Describe("Save, List, Fetch, Delete from mock snapstore", func() {
snapList, err = snapStore.List()
Expect(err).ShouldNot(HaveOccurred())
Expect(snapList.Len()).To(Equal(prevLen - 1))
// Save snapshot
// reset the objectMap
resetObjectMap()
dummyData := make([]byte, 6*1024*1024)
// Save a new snapshot 'snap3'
err = snapStore.Save(snap3, io.NopCloser(bytes.NewReader(dummyData)))
Expect(err).ShouldNot(HaveOccurred())
Expect(len(objectMap)).Should(BeNumerically(">", 0))
Expect(len(objectMap)).Should(BeNumerically(">=", 1))
}
})
})
@@ -217,19 +218,20 @@ var _ = Describe("Save, List, Fetch, Delete from mock snapstore", func() {
err = snapStore.Delete(*snapList[2])
Expect(err).ShouldNot(HaveOccurred())
Expect(len(objectMap)).To(Equal(prevLen - 1))

// Save snapshot
// reset the objectMap
resetObjectMap()
// Save a new snapshot 'snap1'
dummyData := make([]byte, 6*1024*1024)
err = snapStore.Save(snap1, io.NopCloser(bytes.NewReader(dummyData)))
Expect(err).ShouldNot(HaveOccurred())
Expect(len(objectMap)).Should(BeNumerically(">", 0))
Expect(len(objectMap)).Should(BeNumerically(">=", 1))

// Save another new snapshot 'snap4'
prevLen = len(objectMap)
dummyData = make([]byte, 6*1024*1024)
err = snapStore.Save(snap4, io.NopCloser(bytes.NewReader(dummyData)))
Expect(err).ShouldNot(HaveOccurred())
Expect(len(objectMap)).Should(BeNumerically(">", prevLen))
Expect(len(objectMap)).Should(BeNumerically(">=", prevLen+1))
}
})
})
@@ -248,6 +250,7 @@ var _ = Describe("Save, List, Fetch, Delete from mock snapstore", func() {
Expect(err).ShouldNot(HaveOccurred())
Expect(snapList.Len()).To(Equal(2))
Expect(snapList[0].SnapName).To(Equal(snap4.SnapName))
Expect(snapList[1].SnapName).To(Equal(snap5.SnapName))
// Fetch snap5
rc, err := snapStore.Fetch(*snapList[1])
Expect(err).ShouldNot(HaveOccurred())
@@ -263,12 +266,13 @@ var _ = Describe("Save, List, Fetch, Delete from mock snapstore", func() {
snapList, err = snapStore.List()
Expect(err).ShouldNot(HaveOccurred())
Expect(snapList.Len()).To(Equal(prevLen - 1))
// Save snapshot
// Reset the objectMap
resetObjectMap()
// Save a new snapshot 'snap4'
dummyData := make([]byte, 6*1024*1024)
err = snapStore.Save(snap4, io.NopCloser(bytes.NewReader(dummyData)))
Expect(err).ShouldNot(HaveOccurred())
Expect(len(objectMap)).Should(BeNumerically(">", 0))
Expect(len(objectMap)).Should(BeNumerically(">=", 1))
}
})
})
9 changes: 8 additions & 1 deletion pkg/snapstore/swift_snapstore_test.go
@@ -94,6 +94,9 @@ func handleCreateTextObject(w http.ResponseWriter, r *http.Request) {
// handleDownloadObject creates an HTTP handler at `/testContainer/testObject` on the test handler mux that
// responds with a `Download` response.
func handleDownloadObject(w http.ResponseWriter, r *http.Request) {
objectMapMutex.Lock()
defer objectMapMutex.Unlock()

prefix := parseObjectNamefromURL(r.URL)
var contents []byte
for key, val := range objectMap {
@@ -109,8 +112,10 @@ func handleDownloadObject(w http.ResponseWriter, r *http.Request) {
// handleListObjectNames creates an HTTP handler at `/testContainer` on the test handler mux that
// responds with a `List` response when only object names are requested.
func handleListObjectNames(w http.ResponseWriter, r *http.Request) {
marker := r.URL.Query().Get("marker")
objectMapMutex.Lock()
defer objectMapMutex.Unlock()

marker := r.URL.Query().Get("marker")
// To store the keys in slice in sorted order
var keys, contents []string
for k := range objectMap {
@@ -132,6 +137,8 @@ func handleListObjectNames(w http.ResponseWriter, r *http.Request) {
// handleDeleteObject creates an HTTP handler at `/testContainer/testObject` on the test handler mux that
// responds with a `Delete` response.
func handleDeleteObject(w http.ResponseWriter, r *http.Request) {
objectMapMutex.Lock()
defer objectMapMutex.Unlock()
key := parseObjectNamefromURL(r.URL)
if _, ok := objectMap[key]; ok {
delete(objectMap, key)
