Skip to content

Commit

Permalink
Create a fctory for hiding implementations of cache from non-cache code
Browse files Browse the repository at this point in the history
  • Loading branch information
liamfallon committed Jan 15, 2025
1 parent beab57c commit 8cad732
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 60 deletions.
28 changes: 16 additions & 12 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,18 +228,22 @@ func (c completedConfig) New() (*PorchServer, error) {
userInfoProvider := &porch.ApiserverUserInfoProvider{}

watcherMgr := engine.NewWatcherManager()

cache, err := cache.CreateCacheImpl(
cacheImpl, err := cache.CreateCacheImpl(
context.TODO(),
repoimpltypes.RepoImplOptions{
LocalDirectory: c.ExtraConfig.CacheDirectory,
RepoSyncFrequency: c.ExtraConfig.RepoSyncFrequency,
UseUserDefinedCaBundle: c.ExtraConfig.UseUserDefinedCaBundle,
CredentialResolver: credentialResolver,
UserInfoProvider: userInfoProvider,
MetadataStore: metadataStore,
RepoPRNotifier: watcherMgr,
cachetypes.CacheOptions{
RepoImplOptions: repoimpltypes.RepoImplOptions{
LocalDirectory: c.ExtraConfig.CacheDirectory,
UseUserDefinedCaBundle: c.ExtraConfig.UseUserDefinedCaBundle,
CredentialResolver: credentialResolver,
UserInfoProvider: userInfoProvider,
},
RepoSyncFrequency: c.ExtraConfig.RepoSyncFrequency,
MetadataStore: metadataStore,
RepoPRChangeNotifier: watcherMgr,
})
if err != nil {
return nil, fmt.Errorf("failed to creeate repository cache: %w", err)
}

runnerOptionsResolver := func(namespace string) fnruntime.RunnerOptions {
runnerOptions := fnruntime.RunnerOptions{}
Expand All @@ -249,7 +253,7 @@ func (c completedConfig) New() (*PorchServer, error) {
}

cad, err := engine.NewCaDEngine(
engine.WithCache(cache),
engine.WithCache(cacheImpl),
// The order of registering the function runtimes matters here. When
// evaluating a function, the runtimes will be tried in the same
// order as they are registered.
Expand All @@ -274,7 +278,7 @@ func (c completedConfig) New() (*PorchServer, error) {
s := &PorchServer{
GenericAPIServer: genericServer,
coreClient: coreClient,
cache: cache,
cache: cacheImpl,
// Set background job periodic frequency the same as repo sync frequency.
PeriodicRepoSyncFrequency: c.ExtraConfig.RepoSyncFrequency,
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ package cache
import (
"context"

memorycache "github.com/nephio-project/porch/pkg/cache/memory"
memorycache "github.com/nephio-project/porch/pkg/cache/memorycache"
cachetypes "github.com/nephio-project/porch/pkg/cache/types"
repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

var tracer = otel.Tracer("cache")

func CreateCacheImpl(ctx context.Context, options repoimpltypes.RepoImplOptions) (cachetypes.Cache, error) {
func CreateCacheImpl(ctx context.Context, options cachetypes.CacheOptions) (cachetypes.Cache, error) {
ctx, span := tracer.Start(ctx, "Repository::RepositoryFactory", trace.WithAttributes())
defer span.End()

Expand Down
18 changes: 7 additions & 11 deletions pkg/cache/memory/cache.go → pkg/cache/memorycache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ package memorycache

import (
"context"
"errors"
"sync"

configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1"
cachetypes "github.com/nephio-project/porch/pkg/cache/types"
"github.com/nephio-project/porch/pkg/repoimpl"
repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types"
"github.com/nephio-project/porch/pkg/repository"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
Expand All @@ -42,7 +42,7 @@ var tracer = otel.Tracer("memorycache")
type Cache struct {
mutex sync.Mutex
repositories map[string]*cachedRepository
options repoimpltypes.RepoImplOptions
options cachetypes.CacheOptions
}

var _ cachetypes.Cache = &Cache{}
Expand All @@ -68,7 +68,7 @@ func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Re
}
}

repoImpl, err := repoimpl.CreateRepositoryImpl(ctx, repositorySpec, c.options)
repoImpl, err := repoimpl.CreateRepositoryImpl(ctx, repositorySpec, c.options.RepoImplOptions)
if err != nil {
return nil, err
}
Expand All @@ -79,6 +79,10 @@ func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Re
return cachedRepo, nil
}

func (c *Cache) UpdateRepository(ctx context.Context, repositorySpec *configapi.Repository) error {
return errors.New("update on memory cached repositories is not supported")
}

func (c *Cache) CloseRepository(ctx context.Context, repositorySpec *configapi.Repository, allRepos []configapi.Repository) error {
_, span := tracer.Start(ctx, "Cache::CloseRepository", trace.WithAttributes())
defer span.End()
Expand Down Expand Up @@ -128,11 +132,3 @@ func (c *Cache) GetRepositories(ctx context.Context) []configapi.Repository {
}
return repoSlice
}

func (c *Cache) GetRepository(ctx context.Context, repositoryName string) *configapi.Repository {
if repo := c.repositories[repositoryName]; repo != nil {
return repo.repoSpec
} else {
return nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/nephio-project/porch/api/porchconfig/v1alpha1"

fakecache "github.com/nephio-project/porch/pkg/cache/fake"
cachetypes "github.com/nephio-project/porch/pkg/cache/types"
"github.com/nephio-project/porch/pkg/meta"
fakemeta "github.com/nephio-project/porch/pkg/meta/fake"
"github.com/nephio-project/porch/pkg/repoimpl/git"
Expand Down Expand Up @@ -224,13 +225,15 @@ func openRepositoryFromArchive(t *testing.T, ctx context.Context, testPath, name
_, address := git.ServeGitRepository(t, tarfile, tempdir)
metadataStore := createMetadataStoreFromArchive(t, fmt.Sprintf("%s-metadata.yaml", name), name)

cache, _ := new(MemoryCacheFactory).NewCache(ctx, repoimpltypes.RepoImplOptions{
LocalDirectory: t.TempDir(),
RepoSyncFrequency: 60 * time.Second,
UseUserDefinedCaBundle: true,
MetadataStore: metadataStore,
RepoPRNotifier: &fakecache.ObjectNotifier{},
CredentialResolver: &fakecache.CredentialResolver{},
cache, _ := new(MemoryCacheFactory).NewCache(ctx, cachetypes.CacheOptions{
RepoImplOptions: repoimpltypes.RepoImplOptions{
LocalDirectory: t.TempDir(),
UseUserDefinedCaBundle: true,
CredentialResolver: &fakecache.CredentialResolver{},
},
RepoSyncFrequency: 60 * time.Second,
MetadataStore: metadataStore,
RepoPRChangeNotifier: &fakecache.ObjectNotifier{},
})
apiRepo := &v1alpha1.Repository{
TypeMeta: metav1.TypeMeta{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ import (
"context"

cachetypes "github.com/nephio-project/porch/pkg/cache/types"
repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types"
)

var _ cachetypes.CacheFactory = &MemoryCacheFactory{}

type MemoryCacheFactory struct {
}

func (f *MemoryCacheFactory) NewCache(_ context.Context, options repoimpltypes.RepoImplOptions) (cachetypes.Cache, error) {
func (f *MemoryCacheFactory) NewCache(_ context.Context, options cachetypes.CacheOptions) (cachetypes.Cache, error) {
return &Cache{
repositories: make(map[string]*cachedRepository),
options: options,
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/nephio-project/porch/api/porch/v1alpha1"
configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1"
repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types"
cachetypes "github.com/nephio-project/porch/pkg/cache/types"
"github.com/nephio-project/porch/pkg/repository"
"go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -57,10 +57,10 @@ type cachedRepository struct {
// This is returned back by the cache to the background goroutine when it calls periodicall to resync repositories.
refreshRevisionsError error

options repoimpltypes.RepoImplOptions
options cachetypes.CacheOptions
}

func newRepository(id string, repoSpec *configapi.Repository, repo repository.Repository, options repoimpltypes.RepoImplOptions) *cachedRepository {
func newRepository(id string, repoSpec *configapi.Repository, repo repository.Repository, options cachetypes.CacheOptions) *cachedRepository {
ctx, cancel := context.WithCancel(context.Background())
r := &cachedRepository{
id: id,
Expand Down Expand Up @@ -372,7 +372,7 @@ func (r *cachedRepository) Close() error {
klog.Warningf("repo %s: error deleting packagerev for %s: %v", r.id, nn.Name, err)
}
klog.Infof("repo %s: successfully deleted packagerev %s/%s", r.id, nn.Namespace, nn.Name)
sent += r.options.RepoPRNotifier.NotifyPackageRevisionChange(watch.Deleted, pr)
sent += r.options.RepoPRChangeNotifier.NotifyPackageRevisionChange(watch.Deleted, pr)
}
klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions))
return r.repo.Close()
Expand Down Expand Up @@ -505,10 +505,10 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
for kname, newPackage := range newPackageRevisionNames {
oldPackage := oldPackageRevisionNames[kname]
if oldPackage == nil {
addSent += r.options.RepoPRNotifier.NotifyPackageRevisionChange(watch.Added, newPackage)
addSent += r.options.RepoPRChangeNotifier.NotifyPackageRevisionChange(watch.Added, newPackage)
} else {
if oldPackage.ResourceVersion() != newPackage.ResourceVersion() {
modSent += r.options.RepoPRNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage)
modSent += r.options.RepoPRChangeNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage)
}
}
}
Expand Down Expand Up @@ -541,7 +541,7 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
}
klog.Infof("repo %s: deleting PackageRev %s/%s because PackageRevision was removed from SoT",
r.id, nn.Namespace, nn.Name)
delSent += r.options.RepoPRNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage)
delSent += r.options.RepoPRChangeNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage)
}
}
klog.Infof("repo %s: addSent %d, modSent %d, delSent for %d old and %d new repo packages", r.id, addSent, modSent, len(oldPackageRevisionNames), len(newPackageRevisionNames))
Expand Down
File renamed without changes.
20 changes: 18 additions & 2 deletions pkg/cache/types/cachetypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,35 @@ package cachetypes

import (
"context"
"time"

configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1"
"github.com/nephio-project/porch/pkg/meta"
repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types"
"github.com/nephio-project/porch/pkg/repository"
"k8s.io/apimachinery/pkg/watch"
)

type CacheOptions struct {
RepoImplOptions repoimpltypes.RepoImplOptions
RepoSyncFrequency time.Duration
MetadataStore meta.MetadataStore
RepoPRChangeNotifier RepoPRChangeNotifier
Driver string
DataSource string
}

type Cache interface {
OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (repository.Repository, error)
CloseRepository(ctx context.Context, repositorySpec *configapi.Repository, allRepos []configapi.Repository) error
GetRepositories(ctx context.Context) []configapi.Repository
GetRepository(ctx context.Context, repositoryName string) *configapi.Repository
UpdateRepository(ctx context.Context, repositorySpec *configapi.Repository) error
}

type CacheFactory interface {
NewCache(ctx context.Context, options repoimpltypes.RepoImplOptions) (Cache, error)
NewCache(ctx context.Context, options CacheOptions) (Cache, error)
}

type RepoPRChangeNotifier interface {
NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision) int
}
10 changes: 0 additions & 10 deletions pkg/repoimpl/types/repoimpltypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ package repoimpltypes

import (
"context"
"time"

configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1"
"github.com/nephio-project/porch/pkg/meta"
"github.com/nephio-project/porch/pkg/repository"
"k8s.io/apimachinery/pkg/watch"
)

type RepoImplFactory interface {
Expand All @@ -30,14 +27,7 @@ type RepoImplFactory interface {

type RepoImplOptions struct {
LocalDirectory string
RepoSyncFrequency time.Duration
UseUserDefinedCaBundle bool
CredentialResolver repository.CredentialResolver
UserInfoProvider repository.UserInfoProvider
MetadataStore meta.MetadataStore
RepoPRNotifier RepoPRNotifier
}

type RepoPRNotifier interface {
NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision) int
}
53 changes: 47 additions & 6 deletions pkg/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,62 @@ type PackageResources struct {
}

type PackageRevisionKey struct {
Repository, Package, Revision string
WorkspaceName v1alpha1.WorkspaceName
Namespace, Repository, Package, Revision string
WorkspaceName v1alpha1.WorkspaceName
}

func (n PackageRevisionKey) String() string {
return fmt.Sprintf("Repository: %q, Package: %q, Revision: %q, WorkspaceName: %q",
n.Repository, n.Package, n.Revision, string(n.WorkspaceName))
return fmt.Sprintf("%s.%s.%s.v%s.%s", n.Namespace, n.Repository, n.Package, n.Revision, string(n.WorkspaceName))
}

func (n PackageRevisionKey) NonNSString() string {
return fmt.Sprintf("%s.%s.v%s.%s", n.Repository, n.Package, n.Revision, string(n.WorkspaceName))
}

func (n PackageRevisionKey) PackageKey() PackageKey {
return PackageKey{
Namespace: n.Namespace,
Repository: n.Repository,
Package: n.Package,
}
}

func (n PackageRevisionKey) RepositoryKey() RepositoryKey {
return RepositoryKey{
Namespace: n.Namespace,
Repository: n.Repository,
}
}

type PackageKey struct {
Repository, Package string
Namespace, Repository, Package string
}

func (n PackageKey) String() string {
return fmt.Sprintf("Repository: %q, Package: %q", n.Repository, n.Package)
return fmt.Sprintf("%s.%s.%s", n.Namespace, n.Repository, n.Package)
}

func (n PackageKey) NonNSString() string {
return fmt.Sprintf("%s.%s", n.Repository, n.Package)
}

func (n PackageKey) RepositoryKey() RepositoryKey {
return RepositoryKey{
Namespace: n.Namespace,
Repository: n.Repository,
}
}

type RepositoryKey struct {
Namespace, Repository string
}

func (n RepositoryKey) String() string {
return fmt.Sprintf("%s.%s", n.Namespace, n.Repository)
}

func (n RepositoryKey) NonNSString() string {
return n.Repository
}

// PackageRevision is an abstract package version.
Expand Down

0 comments on commit 8cad732

Please sign in to comment.