From 14365f18c379680471a83e850295d7d9d1eb9ddb Mon Sep 17 00:00:00 2001 From: Andrei Aaron Date: Sun, 29 Dec 2024 09:37:21 +0000 Subject: [PATCH] feat: redis cache driver implementation of GetAllBlobs Signed-off-by: Andrei Aaron --- pkg/storage/cache/cacheinterface.go | 1 + pkg/storage/cache/redis.go | 47 ++++++++++++++++++++++++++--- pkg/storage/cache/redis_test.go | 46 ++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 4 deletions(-) diff --git a/pkg/storage/cache/cacheinterface.go b/pkg/storage/cache/cacheinterface.go index 58044f6ed..8d3ebe679 100644 --- a/pkg/storage/cache/cacheinterface.go +++ b/pkg/storage/cache/cacheinterface.go @@ -11,6 +11,7 @@ type Cache interface { // Retrieves the blob matching provided digest. GetBlob(digest godigest.Digest) (string, error) + // Retrieves all blobs matching provided digest. GetAllBlobs(digest godigest.Digest) ([]string, error) // Uploads blob to cachedb. diff --git a/pkg/storage/cache/redis.go b/pkg/storage/cache/redis.go index c181a0abb..4352ca0b4 100644 --- a/pkg/storage/cache/redis.go +++ b/pkg/storage/cache/redis.go @@ -38,7 +38,7 @@ func NewRedisCache(parameters interface{}, log zlog.Logger) (*RedisDriver, error connOpts, err := redis.ParseURL(properParameters.URL) if err != nil { - log.Error().Err(err).Str("directory", properParameters.URL).Msg("failed to connect to redis") + log.Error().Err(err).Str("directory", properParameters.URL).Msg("failed to parse redis URL") } cacheDB := redis.NewClient(connOpts) @@ -49,6 +49,7 @@ func NewRedisCache(parameters interface{}, log zlog.Logger) (*RedisDriver, error return nil, err } + // Need to check if we need locking or not driver := &RedisDriver{ db: cacheDB, log: log, @@ -104,9 +105,9 @@ func (d *RedisDriver) PutBlob(digest godigest.Digest, path string) error { // add the key value pair [digest, path] to blobs:origin if not // exist already. the path becomes the canonical blob we do this in // a transaction to make sure that if something is in the set, then - // it is guaranteed to always have a path note that there is a - // race, but the worst case is that a different origin path that is - // still valid is used. + // it is guaranteed to always have a path + // note that there is a race, but the worst case is that a different + // origin path that is still valid is used. if err := txrp.HSet(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String(), path).Err(); err != nil { d.log.Error().Err(err).Str("hset", join(constants.BlobsCache, constants.OriginalBucket)). @@ -150,6 +151,44 @@ func (d *RedisDriver) GetBlob(digest godigest.Digest) (string, error) { return path, nil } +func (d *RedisDriver) GetAllBlobs(digest godigest.Digest) ([]string, error) { + blobPaths := []string{} + + ctx := context.TODO() + + originalPath, err := d.db.HGet(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result() + if err != nil { + if goerrors.Is(err, redis.Nil) { + return nil, zerr.ErrCacheMiss + } + + d.log.Error().Err(err).Str("hget", join(constants.BlobsCache, constants.OriginalBucket)). + Str("digest", digest.String()).Msg("unable to get record") + + return nil, err + } + + blobPaths = append(blobPaths, originalPath) + + // see if we are in the set + duplicateBlobPaths, err := d.db.SMembers(ctx, join(constants.BlobsCache, constants.DuplicatesBucket, + digest.String())).Result() + if err != nil { + d.log.Error().Err(err).Str("smembers", join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())). + Str("digest", digest.String()).Msg("unable to get record") + + return nil, err + } + + for _, item := range duplicateBlobPaths { + if item != originalPath { + blobPaths = append(blobPaths, item) + } + } + + return blobPaths, nil +} + func (d *RedisDriver) HasBlob(digest godigest.Digest, blob string) bool { ctx := context.TODO() // see if we are in the set diff --git a/pkg/storage/cache/redis_test.go b/pkg/storage/cache/redis_test.go index 6b90f1599..e4b5f78a2 100644 --- a/pkg/storage/cache/redis_test.go +++ b/pkg/storage/cache/redis_test.go @@ -124,4 +124,50 @@ func TestRedisCache(t *testing.T) { So(err, ShouldNotBeNil) So(val, ShouldBeEmpty) }) + + Convey("Test cache.GetAllBlos()", t, func() { + dir := t.TempDir() + + log := log.NewLogger("debug", "") + So(log, ShouldNotBeNil) + + cacheDriver, err := storage.Create("redis", + cache.RedisDriverParameters{dir, "redis://" + miniRedis.Addr(), true}, log) + So(cacheDriver, ShouldNotBeNil) + So(err, ShouldBeNil) + + name := cacheDriver.Name() + So(name, ShouldEqual, "redis") + + err = cacheDriver.PutBlob("digest", path.Join(dir, "first")) + So(err, ShouldBeNil) + + err = cacheDriver.PutBlob("digest", path.Join(dir, "second")) + So(err, ShouldBeNil) + + err = cacheDriver.PutBlob("digest", path.Join(dir, "third")) + So(err, ShouldBeNil) + + blobs, err := cacheDriver.GetAllBlobs("digest") + So(err, ShouldBeNil) + + So(blobs, ShouldResemble, []string{"first", "second", "third"}) + + err = cacheDriver.DeleteBlob("digest", path.Join(dir, "first")) + So(err, ShouldBeNil) + + blobs, err = cacheDriver.GetAllBlobs("digest") + So(err, ShouldBeNil) + So(len(blobs), ShouldEqual, 2) + So(blobs, ShouldContain, "second") + So(blobs, ShouldContain, "third") + + err = cacheDriver.DeleteBlob("digest", path.Join(dir, "third")) + So(err, ShouldBeNil) + + blobs, err = cacheDriver.GetAllBlobs("digest") + So(err, ShouldBeNil) + + So(blobs, ShouldResemble, []string{"second"}) + }) }