Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add redis cache support #2378

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ require (
)

require (
github.com/alicebob/miniredis/v2 v2.31.1
github.com/aquasecurity/trivy v0.50.1
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.31.1
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6
Expand All @@ -57,6 +58,7 @@ require (
github.com/notaryproject/notation-go v1.1.0
github.com/opencontainers/distribution-spec/specs-go v0.0.0-20240201174943-0f98d91a0afe
github.com/project-zot/mockoidc v0.0.0-20230307111146-f607b4b5fb97
github.com/redis/go-redis/v9 v9.3.0
github.com/sigstore/cosign/v2 v2.2.3
github.com/swaggo/http-swagger v1.3.4
github.com/zitadel/oidc v1.13.5
Expand Down Expand Up @@ -90,6 +92,7 @@ require (
github.com/Masterminds/squirrel v1.5.4 // indirect
github.com/Microsoft/hcsshim v0.12.0-rc.3 // indirect
github.com/alecthomas/chroma v0.10.0 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092 // indirect
github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect
github.com/aquasecurity/table v1.8.0 // indirect
Expand Down Expand Up @@ -208,6 +211,7 @@ require (
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
go.opentelemetry.io/otel/metric v1.23.1 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ github.com/CycloneDX/cyclonedx-go v0.8.0 h1:FyWVj6x6hoJrui5uRQdYZcSievw3Z32Z88uY
github.com/CycloneDX/cyclonedx-go v0.8.0/go.mod h1:K2bA+324+Og0X84fA8HhN2X066K7Bxz4rpMQ4ZhjtSk=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DmitriyVTitov/size v1.5.0/go.mod h1:le6rNI4CoLQV1b9gzp1+3d7hMAD/uu2QcJ+aYbNgiU0=
github.com/GoogleCloudPlatform/docker-credential-gcr v2.0.5+incompatible h1:juIaKLLVhqzP55d8x4cSVgwyQv76Z55/fRv/UBr2KkQ=
github.com/GoogleCloudPlatform/docker-credential-gcr v2.0.5+incompatible/go.mod h1:BB1eHdMLYEFuFdBlRMb0N7YGVdM5s6Pt0njxgvfbGGs=
github.com/Intevation/gval v1.3.0 h1:+Ze5sft5MmGbZrHj06NVUbcxCb67l9RaPTLMNr37mjw=
Expand Down Expand Up @@ -482,6 +483,10 @@ github.com/briandowns/spinner v1.23.0 h1:alDF2guRWqa/FOZZYWjlMIx2L6H0wyewPxo/CH4
github.com/briandowns/spinner v1.23.0/go.mod h1:rPG4gmXeN3wQV/TsAY4w8lPdIM6RX3yqeBQJSrbXjuE=
github.com/bshuster-repo/logrus-logstash-hook v1.0.0 h1:e+C0SB5R1pu//O4MQ3f9cFuPGoOVeF2fE4Og9otCc70=
github.com/bshuster-repo/logrus-logstash-hook v1.0.0/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd h1:rFt+Y/IK1aEZkEHchZRSq9OQbsSzIT/OrI8YFFmRIng=
github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8=
github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b h1:otBG+dV+YK+Soembjv71DPz3uX/V/6MMlSyD9JBQ6kQ=
Expand Down Expand Up @@ -1301,6 +1306,8 @@ github.com/protocolbuffers/txtpbfmt v0.0.0-20231025115547-084445ff1adf h1:014O62
github.com/protocolbuffers/txtpbfmt v0.0.0-20231025115547-084445ff1adf/go.mod h1:jgxiZysxFPM+iWKwQwPR+y+Jvo54ARd4EisXxKYpB5c=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0=
github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
Expand Down Expand Up @@ -1763,6 +1770,7 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func Create(dbtype string, parameters interface{}, log zlog.Logger) (cache.Cache
{
return cache.NewDynamoDBCache(parameters, log)
}
case "redis":
{
return cache.NewRedisCache(parameters, log)
}
default:
{
return nil, zerr.ErrBadConfig
Expand Down
243 changes: 243 additions & 0 deletions pkg/storage/cache/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package cache

import (
"context"
goerrors "errors"
"path/filepath"
"strings"

godigest "github.com/opencontainers/go-digest"
"github.com/redis/go-redis/v9"

zerr "zotregistry.dev/zot/errors"
zlog "zotregistry.dev/zot/pkg/log"
"zotregistry.dev/zot/pkg/storage/constants"
)

type RedisDriver struct {
rootDir string
db redis.UniversalClient
log zlog.Logger
useRelPaths bool // whether or not to use relative paths, should be true for filesystem and false for s3
}

type RedisDriverParameters struct {
RootDir string
URL string // https://github.com/redis/redis-specifications/blob/master/uri/redis.txt
UseRelPaths bool
}

func NewRedisCache(parameters interface{}, log zlog.Logger) (*RedisDriver, error) {
properParameters, ok := parameters.(RedisDriverParameters)
if !ok {
log.Error().Err(zerr.ErrTypeAssertionFailed).Msgf("failed to cast type, expected type '%T' but got '%T'",
BoltDBDriverParameters{}, parameters)

return nil, zerr.ErrTypeAssertionFailed
}

connOpts, err := redis.ParseURL(properParameters.URL)
if err != nil {
log.Error().Err(err).Str("directory", properParameters.URL).Msg("failed to connect to redis")

Check warning on line 41 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L41

Added line #L41 was not covered by tests
}
cacheDB := redis.NewClient(connOpts)

if _, err := cacheDB.Ping(context.Background()).Result(); err != nil {
log.Error().Err(err).Msg("failed to ping redis cache")

Check warning on line 46 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L46

Added line #L46 was not covered by tests

return nil, err

Check warning on line 48 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L48

Added line #L48 was not covered by tests
}

driver := &RedisDriver{
db: cacheDB,
log: log,
rootDir: properParameters.RootDir,
useRelPaths: properParameters.UseRelPaths,
}

return driver, nil
}

func join(xs ...string) string {
return "zot:" + strings.Join(xs, ":")
}

func (d *RedisDriver) UsesRelativePaths() bool {
return d.useRelPaths

Check warning on line 66 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L65-L66

Added lines #L65 - L66 were not covered by tests
}

func (d *RedisDriver) Name() string {
return "redis"
}

func (d *RedisDriver) PutBlob(digest godigest.Digest, path string) error {
ctx := context.TODO()

if path == "" {
d.log.Error().Err(zerr.ErrEmptyValue).Str("digest", digest.String()).Msg("failed to provide non-empty path")

return zerr.ErrEmptyValue
}

// use only relative (to rootDir) paths on blobs
var err error
if d.useRelPaths {
path, err = filepath.Rel(d.rootDir, path)
if err != nil {
d.log.Error().Err(err).Str("path", path).Msg("failed to get relative path")
}
}

if len(path) == 0 {
return zerr.ErrEmptyValue
}

// see if the blob digest exists.
exists, err := d.db.HExists(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
if err != nil {
return err

Check warning on line 98 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L98

Added line #L98 was not covered by tests
}

if _, err := d.db.TxPipelined(ctx, func(txrp redis.Pipeliner) error {
if !exists {
// add the key value pair [digest, path] to blobs:origin if not
// exist already. the path becomes the canonical blob we do this in
// a transaction to make sure that if something is in the set, then
// it is guaranteed to always have a path note that there is a
// race, but the worst case is that a different origin path that is
// still valid is used.
if err := txrp.HSet(ctx, join(constants.BlobsCache, constants.OriginalBucket),
digest.String(), path).Err(); err != nil {
d.log.Error().Err(err).Str("hset", join(constants.BlobsCache, constants.OriginalBucket)).
Str("value", path).Msg("unable to put record")

Check warning on line 112 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L111-L112

Added lines #L111 - L112 were not covered by tests

return err

Check warning on line 114 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L114

Added line #L114 was not covered by tests
}
}
// add path to the set of paths which the digest represents
if err := d.db.SAdd(ctx, join(constants.BlobsCache, constants.DuplicatesBucket,
digest.String()), path).Err(); err != nil {
d.log.Error().Err(err).Str("sadd", join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).
Str("value", path).Msg("unable to put record")

Check warning on line 121 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L120-L121

Added lines #L120 - L121 were not covered by tests

return err

Check warning on line 123 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L123

Added line #L123 was not covered by tests
}

return nil
}); err != nil {
return err

Check warning on line 128 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L127-L128

Added lines #L127 - L128 were not covered by tests
}

return nil
}

func (d *RedisDriver) GetBlob(digest godigest.Digest) (string, error) {
ctx := context.TODO()

path, err := d.db.HGet(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
if err != nil {
if goerrors.Is(err, redis.Nil) {
return "", zerr.ErrCacheMiss
}

d.log.Error().Err(err).Str("hget", join(constants.BlobsCache, constants.OriginalBucket)).
Str("digest", digest.String()).Msg("unable to get record")

Check warning on line 144 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L143-L144

Added lines #L143 - L144 were not covered by tests

return "", err

Check warning on line 146 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L146

Added line #L146 was not covered by tests
}

return path, nil
}

func (d *RedisDriver) HasBlob(digest godigest.Digest, blob string) bool {
ctx := context.TODO()
// see if we are in the set
exists, err := d.db.SIsMember(ctx, join(constants.BlobsCache, constants.DuplicatesBucket,
digest.String()), blob).Result()
if err != nil {
d.log.Error().Err(err).Str("sismember", join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).
Str("digest", digest.String()).Msg("unable to get record")

Check warning on line 159 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L158-L159

Added lines #L158 - L159 were not covered by tests

return false

Check warning on line 161 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L161

Added line #L161 was not covered by tests
}

if !exists {
return false
}

// see if the path entry exists. is this actually needed? i guess it doesn't really hurt (it is fast)
exists, err = d.db.HExists(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()

d.log.Error().Err(err).Str("hexists", join(constants.BlobsCache, constants.OriginalBucket)).
Str("digest", digest.String()).Msg("unable to get record")

if err != nil {
return false

Check warning on line 175 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L175

Added line #L175 was not covered by tests
}

if !exists {
return false

Check warning on line 179 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L179

Added line #L179 was not covered by tests
}

return true
}

func (d *RedisDriver) DeleteBlob(digest godigest.Digest, path string) error {
ctx := context.TODO()

// use only relative (to rootDir) paths on blobs
var err error
if d.useRelPaths {
path, err = filepath.Rel(d.rootDir, path)
if err != nil {
d.log.Error().Err(err).Str("path", path).Msg("failed to get relative path")
}
}

pathSet := join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())

// delete path from the set of paths which the digest represents
_, err = d.db.SRem(ctx, pathSet, path).Result()
if err != nil {
d.log.Error().Err(err).Str("srem", pathSet).Str("value", path).Msg("failed to delete record")

Check warning on line 202 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L202

Added line #L202 was not covered by tests

return err

Check warning on line 204 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L204

Added line #L204 was not covered by tests
}

currentPath, err := d.GetBlob(digest)
if err != nil {
return err
}

if currentPath != path {
// nothing we need to do, return nil yay
return nil
}

// we need to set a new path
newPath, err := d.db.SRandMember(ctx, pathSet).Result()
if err != nil {
if goerrors.Is(err, redis.Nil) {
_, err := d.db.HDel(ctx, join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
if err != nil {
return err

Check warning on line 223 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L223

Added line #L223 was not covered by tests
}

return nil
}

d.log.Error().Err(err).Str("srandmember", pathSet).Msg("failed to get new path")

Check warning on line 229 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L229

Added line #L229 was not covered by tests

return err

Check warning on line 231 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L231

Added line #L231 was not covered by tests
}

if _, err := d.db.HSet(ctx, join(constants.BlobsCache, constants.OriginalBucket),
digest.String(), newPath).Result(); err != nil {
d.log.Error().Err(err).Str("hset", join(constants.BlobsCache, constants.OriginalBucket)).Str("value", newPath).
Msg("unable to put record")

Check warning on line 237 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L236-L237

Added lines #L236 - L237 were not covered by tests

return err

Check warning on line 239 in pkg/storage/cache/redis.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/cache/redis.go#L239

Added line #L239 was not covered by tests
}

return nil
}
Loading
Loading