Skip to content

Commit

Permalink
feat: add redis cache support
Browse files Browse the repository at this point in the history
Currently, we have dynamoDB as the remote shared cache but ideal only
for the cloud use case.
For on-prem use case, add support for redis.

Signed-off-by: Ramkumar Chinchani <rchincha@cisco.com>
  • Loading branch information
rchincha committed Apr 8, 2024
1 parent a00259c commit 5d8cf64
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 145 deletions.
9 changes: 0 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ require (
golang.org/x/oauth2 v0.18.0
modernc.org/sqlite v1.29.5
oras.land/oras-go/v2 v2.5.0
zotregistry.io/zot v1.4.3
)

require (
Expand Down Expand Up @@ -95,7 +94,6 @@ require (
github.com/alecthomas/chroma v0.10.0 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092 // indirect
github.com/apex/log v1.9.0 // indirect
github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect
github.com/aquasecurity/table v1.8.0 // indirect
github.com/aquasecurity/tml v0.6.1 // indirect
Expand Down Expand Up @@ -185,11 +183,8 @@ require (
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/nozzle/throttler v0.0.0-20180817012639-2ea982251481 // indirect
github.com/oleiade/reflections v1.0.1 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
github.com/opencontainers/selinux v1.11.0 // indirect
github.com/opencontainers/umoci v0.4.8-0.20210922062158-e60a0cc726e6 // indirect
github.com/openvex/go-vex v0.2.5 // indirect
github.com/oras-project/artifacts-spec v1.0.0-rc.2 // indirect
github.com/owenrumney/go-sarif/v2 v2.3.0 // indirect
github.com/package-url/packageurl-go v0.1.2 // indirect
github.com/pborman/uuid v1.2.1 // indirect
Expand All @@ -198,24 +193,20 @@ require (
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rootless-containers/proto/go-proto v0.0.0-20210921234734-69430b6543fb // indirect
github.com/rubenv/sql-migrate v1.5.2 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/samber/lo v1.39.0 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sigstore/cosign v1.13.1 // indirect
github.com/sigstore/timestamp-authority v1.2.1 // indirect
github.com/skeema/knownhosts v1.2.1 // indirect
github.com/smarty/assertions v1.15.0 // indirect
github.com/sosodev/duration v1.2.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spdx/tools-golang v0.5.4-0.20231108154018-0c0f394b5e1a // indirect
github.com/tetratelabs/wazero v1.7.0 // indirect
github.com/urfave/cli v1.22.14 // indirect
github.com/urfave/cli/v2 v2.27.1 // indirect
github.com/vbatts/go-mtree v0.5.0 // indirect
github.com/vbauerster/mpb/v8 v8.7.2 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
Expand Down
98 changes: 0 additions & 98 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/storage/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func Create(dbtype string, parameters interface{}, log zlog.Logger) (cache.Cache
}
case "redis":
{
return cache.NewRedisCache(parameters, log), nil
return cache.NewRedisCache(parameters, log)
}
default:
{
Expand Down
103 changes: 75 additions & 28 deletions pkg/storage/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
godigest "github.com/opencontainers/go-digest"
"github.com/redis/go-redis/v9"

"zotregistry.io/zot/errors"
zlog "zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/storage/constants"
zerr "zotregistry.dev/zot/errors"
zlog "zotregistry.dev/zot/pkg/log"
"zotregistry.dev/zot/pkg/storage/constants"
)

type RedisDriver struct {
Expand All @@ -23,33 +23,39 @@ type RedisDriver struct {

type RedisDriverParameters struct {
RootDir string
Url string // https://github.com/redis/redis-specifications/blob/master/uri/redis.txt
URL string // https://github.com/redis/redis-specifications/blob/master/uri/redis.txt
UseRelPaths bool
}

func NewRedisCache(parameters interface{}, log zlog.Logger) Cache {
func NewRedisCache(parameters interface{}, log zlog.Logger) (*RedisDriver, error) {
properParameters, ok := parameters.(RedisDriverParameters)
if !ok {
panic("Failed type assertion")
log.Error().Err(zerr.ErrTypeAssertionFailed).Msgf("failed to cast type, expected type '%T' but got '%T'",
BoltDBDriverParameters{}, parameters)

return nil, zerr.ErrTypeAssertionFailed
}

connOpts, err := redis.ParseURL(properParameters.Url)
connOpts, err := redis.ParseURL(properParameters.URL)
if err != nil {
log.Error().Err(err).Str("directory", properParameters.Url).Msg("unable to connect to redis")
log.Error().Err(err).Str("directory", properParameters.URL).Msg("unable to connect to redis")

Check warning on line 41 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L41

Added line #L41 was not covered by tests
}
cacheDB := redis.NewClient(connOpts)

if _, err := cacheDB.Ping(context.Background()).Result(); err != nil {
log.Error().Err(err).Msg("unable to ping redis cache")

Check warning on line 46 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L46

Added line #L46 was not covered by tests
return nil

return nil, err

Check warning on line 48 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L48

Added line #L48 was not covered by tests
}

return &RedisDriver{
driver := &RedisDriver{
db: cacheDB,
log: log,
rootDir: properParameters.RootDir,
useRelPaths: properParameters.UseRelPaths,
}

return driver, nil
}

func join(xs ...string) string {
Expand All @@ -66,9 +72,11 @@ func (d *RedisDriver) Name() string {

func (d *RedisDriver) PutBlob(digest godigest.Digest, path string) error {
ctx := context.TODO()

if path == "" {
d.log.Error().Err(errors.ErrEmptyValue).Str("digest", digest.String()).Msg("empty path provided")
return errors.ErrEmptyValue
d.log.Error().Err(zerr.ErrEmptyValue).Str("digest", digest.String()).Msg("empty path provided")

return zerr.ErrEmptyValue
}

// use only relative (to rootDir) paths on blobs
Expand All @@ -79,29 +87,42 @@ func (d *RedisDriver) PutBlob(digest godigest.Digest, path string) error {
d.log.Error().Err(err).Str("path", path).Msg("unable to get relative path")
}
}

if len(path) == 0 {
return errors.ErrEmptyValue
return zerr.ErrEmptyValue
}

// see if the blob digest exists.
exists, err := d.db.HExists(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
if err != nil {
return err

Check warning on line 98 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L98

Added line #L98 was not covered by tests
}
if _, err := d.db.TxPipelined(ctx, func(tx redis.Pipeliner) error {

if _, err := d.db.TxPipelined(ctx, func(txrp redis.Pipeliner) error {
if !exists {
// add the key value pair [digest, path] to blobs:origin if not exist already. the path becomes the canonical blob
// we do this in a transaction to make sure that if something is in the set, then it is guaranteed to always have a path
// note that there is a race, but the worst case is that a different origin path that is still valid is used.
if err := tx.HSet(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String(), path).Err(); err != nil {
d.log.Error().Err(err).Str("hset", join(constants.BlobsCache, constants.OriginalBucket)).Str("value", path).Msg("unable to put record")
// add the key value pair [digest, path] to blobs:origin if not
// exist already. the path becomes the canonical blob we do this in
// a transaction to make sure that if something is in the set, then
// it is guaranteed to always have a path note that there is a
// race, but the worst case is that a different origin path that is
// still valid is used.
if err := txrp.HSet(ctx, join(constants.BlobsCache, constants.OriginalBucket),
digest.String(), path).Err(); err != nil {
d.log.Error().Err(err).Str("hset", join(constants.BlobsCache, constants.OriginalBucket)).
Str("value", path).Msg("unable to put record")

Check warning on line 112 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L111-L112

Added lines #L111 - L112 were not covered by tests

return err

Check warning on line 114 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L114

Added line #L114 was not covered by tests
}
}
// add path to the set of paths which the digest represents
if err := d.db.SAdd(ctx, join(constants.BlobsCache, constants.DuplicatesBucket, digest.String()), path).Err(); err != nil {
d.log.Error().Err(err).Str("sadd", join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).Str("value", path).Msg("unable to put record")
if err := d.db.SAdd(ctx, join(constants.BlobsCache, constants.DuplicatesBucket,
digest.String()), path).Err(); err != nil {
d.log.Error().Err(err).Str("sadd", join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).
Str("value", path).Msg("unable to put record")

Check warning on line 121 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L120-L121

Added lines #L120 - L121 were not covered by tests

return err

Check warning on line 123 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L123

Added line #L123 was not covered by tests
}

return nil
}); err != nil {
return err

Check warning on line 128 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L127-L128

Added lines #L127 - L128 were not covered by tests
Expand All @@ -112,37 +133,52 @@ func (d *RedisDriver) PutBlob(digest godigest.Digest, path string) error {

func (d *RedisDriver) GetBlob(digest godigest.Digest) (string, error) {
ctx := context.TODO()

path, err := d.db.HGet(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
if err != nil {
if goerrors.Is(err, redis.Nil) {
return "", errors.ErrCacheMiss
return "", zerr.ErrCacheMiss
}
d.log.Error().Err(err).Str("hget", join(constants.BlobsCache, constants.OriginalBucket)).Str("digest", digest.String()).Msg("unable to get record")

d.log.Error().Err(err).Str("hget", join(constants.BlobsCache, constants.OriginalBucket)).
Str("digest", digest.String()).Msg("unable to get record")

Check warning on line 144 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L143-L144

Added lines #L143 - L144 were not covered by tests

return "", err

Check warning on line 146 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L146

Added line #L146 was not covered by tests
}

return path, nil
}

func (d *RedisDriver) HasBlob(digest godigest.Digest, blob string) bool {
ctx := context.TODO()
// see if we are in the set
exists, err := d.db.SIsMember(ctx, join(constants.BlobsCache, constants.DuplicatesBucket, digest.String()), blob).Result()
exists, err := d.db.SIsMember(ctx, join(constants.BlobsCache, constants.DuplicatesBucket,
digest.String()), blob).Result()
if err != nil {
d.log.Error().Err(err).Str("sismember", join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).Str("digest", digest.String()).Msg("unable to get record")
d.log.Error().Err(err).Str("sismember", join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).
Str("digest", digest.String()).Msg("unable to get record")

Check warning on line 159 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L158-L159

Added lines #L158 - L159 were not covered by tests

return false

Check warning on line 161 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L161

Added line #L161 was not covered by tests
}

if !exists {
return false
}

// see if the path entry exists. is this actually needed? i guess it doesn't really hurt (it is fast)
exists, err = d.db.HExists(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
d.log.Error().Err(err).Str("hexists", join(constants.BlobsCache, constants.OriginalBucket)).Str("digest", digest.String()).Msg("unable to get record")

d.log.Error().Err(err).Str("hexists", join(constants.BlobsCache, constants.OriginalBucket)).
Str("digest", digest.String()).Msg("unable to get record")

if err != nil {
return false

Check warning on line 175 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L175

Added line #L175 was not covered by tests
}

if !exists {
return false

Check warning on line 179 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L179

Added line #L179 was not covered by tests
}

return true
}

Expand All @@ -164,16 +200,20 @@ func (d *RedisDriver) DeleteBlob(digest godigest.Digest, path string) error {
_, err = d.db.SRem(ctx, pathSet, path).Result()
if err != nil {
d.log.Error().Err(err).Str("srem", pathSet).Str("value", path).Msg("unable to delete record")

Check warning on line 202 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L202

Added line #L202 was not covered by tests

return err

Check warning on line 204 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L204

Added line #L204 was not covered by tests
}

currentPath, err := d.GetBlob(digest)
if err != nil {
return err
}

if currentPath != path {
// nothing we need to do, return nil yay
return nil
}

// we need to set a new path
newPath, err := d.db.SRandMember(ctx, pathSet).Result()
if err != nil {
Expand All @@ -182,13 +222,20 @@ func (d *RedisDriver) DeleteBlob(digest godigest.Digest, path string) error {
if err != nil {
return err

Check warning on line 223 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L223

Added line #L223 was not covered by tests
}

return nil
}

d.log.Error().Err(err).Str("srandmember", pathSet).Msg("unable to get new path")

Check warning on line 229 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L229

Added line #L229 was not covered by tests

return err

Check warning on line 231 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L231

Added line #L231 was not covered by tests
}
if _, err := d.db.HSet(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String(), newPath).Result(); err != nil {
d.log.Error().Err(err).Str("hset", join(constants.BlobsCache, constants.OriginalBucket)).Str("value", newPath).Msg("unable to put record")

if _, err := d.db.HSet(ctx, join(constants.BlobsCache, constants.OriginalBucket),
digest.String(), newPath).Result(); err != nil {
d.log.Error().Err(err).Str("hset", join(constants.BlobsCache, constants.OriginalBucket)).Str("value", newPath).
Msg("unable to put record")

Check warning on line 237 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L236-L237

Added lines #L236 - L237 were not covered by tests

return err

Check warning on line 239 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L239

Added line #L239 was not covered by tests
}

Expand Down
21 changes: 12 additions & 9 deletions pkg/storage/cache/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,26 @@ import (
"github.com/alicebob/miniredis/v2"
. "github.com/smartystreets/goconvey/convey"

"zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/storage/cache"
"zotregistry.dev/zot/errors"
"zotregistry.dev/zot/pkg/log"
"zotregistry.dev/zot/pkg/storage"
"zotregistry.dev/zot/pkg/storage/cache"
)

func TestRedisCache(t *testing.T) {
mr := miniredis.RunT(t)
Convey("Make a new cache", t, func() {
miniRedis := miniredis.RunT(t)

Convey("Make a new cache", t, func() {
dir := t.TempDir()

log := log.NewLogger("debug", "")
So(log, ShouldNotBeNil)

So(func() { _, _ = storage.Create("redis", "failTypeAssertion", log) }, ShouldPanic)
_, err := storage.Create("redis", "failTypeAssertion", log)
So(err, ShouldNotBeNil)

cacheDriver, _ := storage.Create("redis", cache.RedisDriverParameters{dir, "redis://" + mr.Addr(), true}, log)
cacheDriver, _ := storage.Create("redis",
cache.RedisDriverParameters{dir, "redis://" + miniRedis.Addr(), true}, log)
So(cacheDriver, ShouldNotBeNil)

name := cacheDriver.Name()
Expand Down Expand Up @@ -61,7 +63,8 @@ func TestRedisCache(t *testing.T) {
So(err, ShouldNotBeNil)
So(err, ShouldEqual, errors.ErrEmptyValue)

cacheDriver, _ = storage.Create("redis", cache.RedisDriverParameters{t.TempDir(), "redis://" + mr.Addr() + "/5", false}, log)
cacheDriver, _ = storage.Create("redis",
cache.RedisDriverParameters{t.TempDir(), "redis://" + miniRedis.Addr() + "/5", false}, log)
So(cacheDriver, ShouldNotBeNil)

err = cacheDriver.PutBlob("key1", "originalBlobPath")
Expand Down
15 changes: 15 additions & 0 deletions test/blackbox/helpers_redis.bash
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
ROOT_DIR=$(git rev-parse --show-toplevel)
OS=$(go env GOOS)
ARCH=$(go env GOARCH)
ZOT_MINIMAL_PATH=${ROOT_DIR}/bin/zot-${OS}-${ARCH}-minimal
ZB_PATH=${ROOT_DIR}/bin/zb-${OS}-${ARCH}
TEST_DATA_DIR=${BATS_FILE_TMPDIR}/test/data

function redis_start() {
docker run -d --name redis_server -p 6379:6379 redis
}

function redis_stop() {
docker stop redis_server
docker rm -f redis_server
}
Loading

0 comments on commit 5d8cf64

Please sign in to comment.