diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 0d53f29c..7cb15142 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -228,18 +228,22 @@ func (c completedConfig) New() (*PorchServer, error) { userInfoProvider := &porch.ApiserverUserInfoProvider{} watcherMgr := engine.NewWatcherManager() - - cache, err := cache.CreateCacheImpl( + cacheImpl, err := cache.CreateCacheImpl( context.TODO(), - repoimpltypes.RepoImplOptions{ - LocalDirectory: c.ExtraConfig.CacheDirectory, - RepoSyncFrequency: c.ExtraConfig.RepoSyncFrequency, - UseUserDefinedCaBundle: c.ExtraConfig.UseUserDefinedCaBundle, - CredentialResolver: credentialResolver, - UserInfoProvider: userInfoProvider, - MetadataStore: metadataStore, - RepoPRNotifier: watcherMgr, + cachetypes.CacheOptions{ + RepoImplOptions: repoimpltypes.RepoImplOptions{ + LocalDirectory: c.ExtraConfig.CacheDirectory, + UseUserDefinedCaBundle: c.ExtraConfig.UseUserDefinedCaBundle, + CredentialResolver: credentialResolver, + UserInfoProvider: userInfoProvider, + }, + RepoSyncFrequency: c.ExtraConfig.RepoSyncFrequency, + MetadataStore: metadataStore, + RepoPRChangeNotifier: watcherMgr, }) + if err != nil { + return nil, fmt.Errorf("failed to creeate repository cache: %w", err) + } runnerOptionsResolver := func(namespace string) fnruntime.RunnerOptions { runnerOptions := fnruntime.RunnerOptions{} @@ -249,7 +253,7 @@ func (c completedConfig) New() (*PorchServer, error) { } cad, err := engine.NewCaDEngine( - engine.WithCache(cache), + engine.WithCache(cacheImpl), // The order of registering the function runtimes matters here. When // evaluating a function, the runtimes will be tried in the same // order as they are registered. @@ -274,7 +278,7 @@ func (c completedConfig) New() (*PorchServer, error) { s := &PorchServer{ GenericAPIServer: genericServer, coreClient: coreClient, - cache: cache, + cache: cacheImpl, // Set background job periodic frequency the same as repo sync frequency. PeriodicRepoSyncFrequency: c.ExtraConfig.RepoSyncFrequency, } diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 2b730f64..854bcdd2 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -17,16 +17,15 @@ package cache import ( "context" - memorycache "github.com/nephio-project/porch/pkg/cache/memory" + memorycache "github.com/nephio-project/porch/pkg/cache/memorycache" cachetypes "github.com/nephio-project/porch/pkg/cache/types" - repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" ) var tracer = otel.Tracer("cache") -func CreateCacheImpl(ctx context.Context, options repoimpltypes.RepoImplOptions) (cachetypes.Cache, error) { +func CreateCacheImpl(ctx context.Context, options cachetypes.CacheOptions) (cachetypes.Cache, error) { ctx, span := tracer.Start(ctx, "Repository::RepositoryFactory", trace.WithAttributes()) defer span.End() diff --git a/pkg/cache/memory/cache.go b/pkg/cache/memorycache/cache.go similarity index 91% rename from pkg/cache/memory/cache.go rename to pkg/cache/memorycache/cache.go index f5c5e00c..c8edb943 100644 --- a/pkg/cache/memory/cache.go +++ b/pkg/cache/memorycache/cache.go @@ -16,12 +16,12 @@ package memorycache import ( "context" + "errors" "sync" configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1" cachetypes "github.com/nephio-project/porch/pkg/cache/types" "github.com/nephio-project/porch/pkg/repoimpl" - repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types" "github.com/nephio-project/porch/pkg/repository" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" @@ -42,7 +42,7 @@ var tracer = otel.Tracer("memorycache") type Cache struct { mutex sync.Mutex repositories map[string]*cachedRepository - options repoimpltypes.RepoImplOptions + options cachetypes.CacheOptions } var _ cachetypes.Cache = &Cache{} @@ -68,7 +68,7 @@ func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Re } } - repoImpl, err := repoimpl.CreateRepositoryImpl(ctx, repositorySpec, c.options) + repoImpl, err := repoimpl.CreateRepositoryImpl(ctx, repositorySpec, c.options.RepoImplOptions) if err != nil { return nil, err } @@ -79,6 +79,10 @@ func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Re return cachedRepo, nil } +func (c *Cache) UpdateRepository(ctx context.Context, repositorySpec *configapi.Repository) error { + return errors.New("update on memory cached repositories is not supported") +} + func (c *Cache) CloseRepository(ctx context.Context, repositorySpec *configapi.Repository, allRepos []configapi.Repository) error { _, span := tracer.Start(ctx, "Cache::CloseRepository", trace.WithAttributes()) defer span.End() @@ -128,11 +132,3 @@ func (c *Cache) GetRepositories(ctx context.Context) []configapi.Repository { } return repoSlice } - -func (c *Cache) GetRepository(ctx context.Context, repositoryName string) *configapi.Repository { - if repo := c.repositories[repositoryName]; repo != nil { - return repo.repoSpec - } else { - return nil - } -} diff --git a/pkg/cache/memory/cache_test.go b/pkg/cache/memorycache/cache_test.go similarity index 95% rename from pkg/cache/memory/cache_test.go rename to pkg/cache/memorycache/cache_test.go index 23dbaab6..5b7f77bf 100644 --- a/pkg/cache/memory/cache_test.go +++ b/pkg/cache/memorycache/cache_test.go @@ -27,6 +27,7 @@ import ( "github.com/nephio-project/porch/api/porchconfig/v1alpha1" fakecache "github.com/nephio-project/porch/pkg/cache/fake" + cachetypes "github.com/nephio-project/porch/pkg/cache/types" "github.com/nephio-project/porch/pkg/meta" fakemeta "github.com/nephio-project/porch/pkg/meta/fake" "github.com/nephio-project/porch/pkg/repoimpl/git" @@ -224,13 +225,15 @@ func openRepositoryFromArchive(t *testing.T, ctx context.Context, testPath, name _, address := git.ServeGitRepository(t, tarfile, tempdir) metadataStore := createMetadataStoreFromArchive(t, fmt.Sprintf("%s-metadata.yaml", name), name) - cache, _ := new(MemoryCacheFactory).NewCache(ctx, repoimpltypes.RepoImplOptions{ - LocalDirectory: t.TempDir(), - RepoSyncFrequency: 60 * time.Second, - UseUserDefinedCaBundle: true, - MetadataStore: metadataStore, - RepoPRNotifier: &fakecache.ObjectNotifier{}, - CredentialResolver: &fakecache.CredentialResolver{}, + cache, _ := new(MemoryCacheFactory).NewCache(ctx, cachetypes.CacheOptions{ + RepoImplOptions: repoimpltypes.RepoImplOptions{ + LocalDirectory: t.TempDir(), + UseUserDefinedCaBundle: true, + CredentialResolver: &fakecache.CredentialResolver{}, + }, + RepoSyncFrequency: 60 * time.Second, + MetadataStore: metadataStore, + RepoPRChangeNotifier: &fakecache.ObjectNotifier{}, }) apiRepo := &v1alpha1.Repository{ TypeMeta: metav1.TypeMeta{ diff --git a/pkg/cache/memory/memorycachefactory.go b/pkg/cache/memorycache/memorycachefactory.go similarity index 88% rename from pkg/cache/memory/memorycachefactory.go rename to pkg/cache/memorycache/memorycachefactory.go index 72a5ac8d..7e45813d 100644 --- a/pkg/cache/memory/memorycachefactory.go +++ b/pkg/cache/memorycache/memorycachefactory.go @@ -18,7 +18,6 @@ import ( "context" cachetypes "github.com/nephio-project/porch/pkg/cache/types" - repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types" ) var _ cachetypes.CacheFactory = &MemoryCacheFactory{} @@ -26,7 +25,7 @@ var _ cachetypes.CacheFactory = &MemoryCacheFactory{} type MemoryCacheFactory struct { } -func (f *MemoryCacheFactory) NewCache(_ context.Context, options repoimpltypes.RepoImplOptions) (cachetypes.Cache, error) { +func (f *MemoryCacheFactory) NewCache(_ context.Context, options cachetypes.CacheOptions) (cachetypes.Cache, error) { return &Cache{ repositories: make(map[string]*cachedRepository), options: options, diff --git a/pkg/cache/memory/package.go b/pkg/cache/memorycache/package.go similarity index 100% rename from pkg/cache/memory/package.go rename to pkg/cache/memorycache/package.go diff --git a/pkg/cache/memory/packagerevision.go b/pkg/cache/memorycache/packagerevision.go similarity index 100% rename from pkg/cache/memory/packagerevision.go rename to pkg/cache/memorycache/packagerevision.go diff --git a/pkg/cache/memory/repository.go b/pkg/cache/memorycache/repository.go similarity index 97% rename from pkg/cache/memory/repository.go rename to pkg/cache/memorycache/repository.go index ee7f91da..969b8a90 100644 --- a/pkg/cache/memory/repository.go +++ b/pkg/cache/memorycache/repository.go @@ -22,7 +22,7 @@ import ( "github.com/nephio-project/porch/api/porch/v1alpha1" configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1" - repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types" + cachetypes "github.com/nephio-project/porch/pkg/cache/types" "github.com/nephio-project/porch/pkg/repository" "go.opentelemetry.io/otel/trace" "k8s.io/apimachinery/pkg/api/errors" @@ -57,10 +57,10 @@ type cachedRepository struct { // This is returned back by the cache to the background goroutine when it calls periodicall to resync repositories. refreshRevisionsError error - options repoimpltypes.RepoImplOptions + options cachetypes.CacheOptions } -func newRepository(id string, repoSpec *configapi.Repository, repo repository.Repository, options repoimpltypes.RepoImplOptions) *cachedRepository { +func newRepository(id string, repoSpec *configapi.Repository, repo repository.Repository, options cachetypes.CacheOptions) *cachedRepository { ctx, cancel := context.WithCancel(context.Background()) r := &cachedRepository{ id: id, @@ -372,7 +372,7 @@ func (r *cachedRepository) Close() error { klog.Warningf("repo %s: error deleting packagerev for %s: %v", r.id, nn.Name, err) } klog.Infof("repo %s: successfully deleted packagerev %s/%s", r.id, nn.Namespace, nn.Name) - sent += r.options.RepoPRNotifier.NotifyPackageRevisionChange(watch.Deleted, pr) + sent += r.options.RepoPRChangeNotifier.NotifyPackageRevisionChange(watch.Deleted, pr) } klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions)) return r.repo.Close() @@ -505,10 +505,10 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re for kname, newPackage := range newPackageRevisionNames { oldPackage := oldPackageRevisionNames[kname] if oldPackage == nil { - addSent += r.options.RepoPRNotifier.NotifyPackageRevisionChange(watch.Added, newPackage) + addSent += r.options.RepoPRChangeNotifier.NotifyPackageRevisionChange(watch.Added, newPackage) } else { if oldPackage.ResourceVersion() != newPackage.ResourceVersion() { - modSent += r.options.RepoPRNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage) + modSent += r.options.RepoPRChangeNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage) } } } @@ -541,7 +541,7 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re } klog.Infof("repo %s: deleting PackageRev %s/%s because PackageRevision was removed from SoT", r.id, nn.Namespace, nn.Name) - delSent += r.options.RepoPRNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage) + delSent += r.options.RepoPRChangeNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage) } } klog.Infof("repo %s: addSent %d, modSent %d, delSent for %d old and %d new repo packages", r.id, addSent, modSent, len(oldPackageRevisionNames), len(newPackageRevisionNames)) diff --git a/pkg/cache/memory/util.go b/pkg/cache/memorycache/util.go similarity index 100% rename from pkg/cache/memory/util.go rename to pkg/cache/memorycache/util.go diff --git a/pkg/cache/types/cachetypes.go b/pkg/cache/types/cachetypes.go index 678e0c15..e824cca9 100644 --- a/pkg/cache/types/cachetypes.go +++ b/pkg/cache/types/cachetypes.go @@ -16,19 +16,35 @@ package cachetypes import ( "context" + "time" configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1" + "github.com/nephio-project/porch/pkg/meta" repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types" "github.com/nephio-project/porch/pkg/repository" + "k8s.io/apimachinery/pkg/watch" ) +type CacheOptions struct { + RepoImplOptions repoimpltypes.RepoImplOptions + RepoSyncFrequency time.Duration + MetadataStore meta.MetadataStore + RepoPRChangeNotifier RepoPRChangeNotifier + Driver string + DataSource string +} + type Cache interface { OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (repository.Repository, error) CloseRepository(ctx context.Context, repositorySpec *configapi.Repository, allRepos []configapi.Repository) error GetRepositories(ctx context.Context) []configapi.Repository - GetRepository(ctx context.Context, repositoryName string) *configapi.Repository + UpdateRepository(ctx context.Context, repositorySpec *configapi.Repository) error } type CacheFactory interface { - NewCache(ctx context.Context, options repoimpltypes.RepoImplOptions) (Cache, error) + NewCache(ctx context.Context, options CacheOptions) (Cache, error) +} + +type RepoPRChangeNotifier interface { + NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision) int } diff --git a/pkg/repoimpl/types/repoimpltypes.go b/pkg/repoimpl/types/repoimpltypes.go index 8af947f2..b8fd920c 100644 --- a/pkg/repoimpl/types/repoimpltypes.go +++ b/pkg/repoimpl/types/repoimpltypes.go @@ -16,12 +16,9 @@ package repoimpltypes import ( "context" - "time" configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1" - "github.com/nephio-project/porch/pkg/meta" "github.com/nephio-project/porch/pkg/repository" - "k8s.io/apimachinery/pkg/watch" ) type RepoImplFactory interface { @@ -30,14 +27,7 @@ type RepoImplFactory interface { type RepoImplOptions struct { LocalDirectory string - RepoSyncFrequency time.Duration UseUserDefinedCaBundle bool CredentialResolver repository.CredentialResolver UserInfoProvider repository.UserInfoProvider - MetadataStore meta.MetadataStore - RepoPRNotifier RepoPRNotifier -} - -type RepoPRNotifier interface { - NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision) int } diff --git a/pkg/repository/repository.go b/pkg/repository/repository.go index dfb8cf3b..6417003f 100644 --- a/pkg/repository/repository.go +++ b/pkg/repository/repository.go @@ -31,21 +31,62 @@ type PackageResources struct { } type PackageRevisionKey struct { - Repository, Package, Revision string - WorkspaceName v1alpha1.WorkspaceName + Namespace, Repository, Package, Revision string + WorkspaceName v1alpha1.WorkspaceName } func (n PackageRevisionKey) String() string { - return fmt.Sprintf("Repository: %q, Package: %q, Revision: %q, WorkspaceName: %q", - n.Repository, n.Package, n.Revision, string(n.WorkspaceName)) + return fmt.Sprintf("%s.%s.%s.v%s.%s", n.Namespace, n.Repository, n.Package, n.Revision, string(n.WorkspaceName)) +} + +func (n PackageRevisionKey) NonNSString() string { + return fmt.Sprintf("%s.%s.v%s.%s", n.Repository, n.Package, n.Revision, string(n.WorkspaceName)) +} + +func (n PackageRevisionKey) PackageKey() PackageKey { + return PackageKey{ + Namespace: n.Namespace, + Repository: n.Repository, + Package: n.Package, + } +} + +func (n PackageRevisionKey) RepositoryKey() RepositoryKey { + return RepositoryKey{ + Namespace: n.Namespace, + Repository: n.Repository, + } } type PackageKey struct { - Repository, Package string + Namespace, Repository, Package string } func (n PackageKey) String() string { - return fmt.Sprintf("Repository: %q, Package: %q", n.Repository, n.Package) + return fmt.Sprintf("%s.%s.%s", n.Namespace, n.Repository, n.Package) +} + +func (n PackageKey) NonNSString() string { + return fmt.Sprintf("%s.%s", n.Repository, n.Package) +} + +func (n PackageKey) RepositoryKey() RepositoryKey { + return RepositoryKey{ + Namespace: n.Namespace, + Repository: n.Repository, + } +} + +type RepositoryKey struct { + Namespace, Repository string +} + +func (n RepositoryKey) String() string { + return fmt.Sprintf("%s.%s", n.Namespace, n.Repository) +} + +func (n RepositoryKey) NonNSString() string { + return n.Repository } // PackageRevision is an abstract package version.