Skip to content

Commit

Permalink
feat: redis cache driver implementation of GetAllBlobs
Browse files Browse the repository at this point in the history
Signed-off-by: Andrei Aaron <aaaron@luxoft.com>
  • Loading branch information
andaaron committed Jan 8, 2025
1 parent ea3de19 commit 14365f1
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/storage/cache/cacheinterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Cache interface {
// Retrieves the blob matching provided digest.
GetBlob(digest godigest.Digest) (string, error)

// Retrieves all blobs matching provided digest.
GetAllBlobs(digest godigest.Digest) ([]string, error)

// Uploads blob to cachedb.
Expand Down
47 changes: 43 additions & 4 deletions pkg/storage/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewRedisCache(parameters interface{}, log zlog.Logger) (*RedisDriver, error

connOpts, err := redis.ParseURL(properParameters.URL)
if err != nil {
log.Error().Err(err).Str("directory", properParameters.URL).Msg("failed to connect to redis")
log.Error().Err(err).Str("directory", properParameters.URL).Msg("failed to parse redis URL")
}

cacheDB := redis.NewClient(connOpts)
Expand All @@ -49,6 +49,7 @@ func NewRedisCache(parameters interface{}, log zlog.Logger) (*RedisDriver, error
return nil, err
}

// Need to check if we need locking or not
driver := &RedisDriver{
db: cacheDB,
log: log,
Expand Down Expand Up @@ -104,9 +105,9 @@ func (d *RedisDriver) PutBlob(digest godigest.Digest, path string) error {
// add the key value pair [digest, path] to blobs:origin if not
// exist already. the path becomes the canonical blob we do this in
// a transaction to make sure that if something is in the set, then
// it is guaranteed to always have a path note that there is a
// race, but the worst case is that a different origin path that is
// still valid is used.
// it is guaranteed to always have a path
// note that there is a race, but the worst case is that a different
// origin path that is still valid is used.
if err := txrp.HSet(ctx, join(constants.BlobsCache, constants.OriginalBucket),
digest.String(), path).Err(); err != nil {
d.log.Error().Err(err).Str("hset", join(constants.BlobsCache, constants.OriginalBucket)).
Expand Down Expand Up @@ -150,6 +151,44 @@ func (d *RedisDriver) GetBlob(digest godigest.Digest) (string, error) {
return path, nil
}

// GetAllBlobs returns every filesystem path recorded for the given digest:
// the canonical (origin) path first, followed by any deduplicated copies.
// It returns zerr.ErrCacheMiss when the digest has no canonical entry.
func (d *RedisDriver) GetAllBlobs(digest godigest.Digest) ([]string, error) {
	ctx := context.TODO()

	originBucket := join(constants.BlobsCache, constants.OriginalBucket)

	canonicalPath, err := d.db.HGet(ctx, originBucket, digest.String()).Result()
	if err != nil {
		if goerrors.Is(err, redis.Nil) {
			// no origin record means the digest is not cached at all
			return nil, zerr.ErrCacheMiss
		}

		d.log.Error().Err(err).Str("hget", originBucket).
			Str("digest", digest.String()).Msg("unable to get record")

		return nil, err
	}

	blobPaths := []string{canonicalPath}

	// see if we are in the set
	dupBucket := join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())

	duplicates, err := d.db.SMembers(ctx, dupBucket).Result()
	if err != nil {
		d.log.Error().Err(err).Str("smembers", dupBucket).
			Str("digest", digest.String()).Msg("unable to get record")

		return nil, err
	}

	// the duplicates set may also contain the canonical path; skip it so it
	// is not reported twice
	for _, dup := range duplicates {
		if dup != canonicalPath {
			blobPaths = append(blobPaths, dup)
		}
	}

	return blobPaths, nil
}

func (d *RedisDriver) HasBlob(digest godigest.Digest, blob string) bool {
ctx := context.TODO()
// see if we are in the set
Expand Down
46 changes: 46 additions & 0 deletions pkg/storage/cache/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,50 @@ func TestRedisCache(t *testing.T) {
So(err, ShouldNotBeNil)
So(val, ShouldBeEmpty)
})

// Exercises GetAllBlobs: the canonical path comes back first, followed by
// deduplicated paths, and entries disappear as blobs are deleted.
// Fix: the Convey description previously read "GetAllBlos" (typo).
Convey("Test cache.GetAllBlobs()", t, func() {
	dir := t.TempDir()

	log := log.NewLogger("debug", "")
	So(log, ShouldNotBeNil)

	cacheDriver, err := storage.Create("redis",
		cache.RedisDriverParameters{dir, "redis://" + miniRedis.Addr(), true}, log)
	So(cacheDriver, ShouldNotBeNil)
	So(err, ShouldBeNil)

	name := cacheDriver.Name()
	So(name, ShouldEqual, "redis")

	// the first put becomes the canonical blob; later puts are duplicates
	err = cacheDriver.PutBlob("digest", path.Join(dir, "first"))
	So(err, ShouldBeNil)

	err = cacheDriver.PutBlob("digest", path.Join(dir, "second"))
	So(err, ShouldBeNil)

	err = cacheDriver.PutBlob("digest", path.Join(dir, "third"))
	So(err, ShouldBeNil)

	blobs, err := cacheDriver.GetAllBlobs("digest")
	So(err, ShouldBeNil)

	So(blobs, ShouldResemble, []string{"first", "second", "third"})

	err = cacheDriver.DeleteBlob("digest", path.Join(dir, "first"))
	So(err, ShouldBeNil)

	// after deleting the canonical blob a duplicate is promoted; redis set
	// order is not guaranteed, so assert membership rather than order
	blobs, err = cacheDriver.GetAllBlobs("digest")
	So(err, ShouldBeNil)
	So(len(blobs), ShouldEqual, 2)
	So(blobs, ShouldContain, "second")
	So(blobs, ShouldContain, "third")

	err = cacheDriver.DeleteBlob("digest", path.Join(dir, "third"))
	So(err, ShouldBeNil)

	blobs, err = cacheDriver.GetAllBlobs("digest")
	So(err, ShouldBeNil)

	So(blobs, ShouldResemble, []string{"second"})
})
}

0 comments on commit 14365f1

Please sign in to comment.