Skip to content

Commit

Permalink
Updated to introduce cache factory
Browse files Browse the repository at this point in the history
  • Loading branch information
liamfallon committed Jan 15, 2025
1 parent b9d483c commit beab57c
Show file tree
Hide file tree
Showing 20 changed files with 264 additions and 109 deletions.
2 changes: 1 addition & 1 deletion api/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 15 additions & 13 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
internalapi "github.com/nephio-project/porch/internal/api/porchinternal/v1alpha1"
"github.com/nephio-project/porch/internal/kpt/fnruntime"
"github.com/nephio-project/porch/pkg/cache"
memorycache "github.com/nephio-project/porch/pkg/cache/memory"
cachetypes "github.com/nephio-project/porch/pkg/cache/types"
"github.com/nephio-project/porch/pkg/engine"
"github.com/nephio-project/porch/pkg/meta"
"github.com/nephio-project/porch/pkg/registry/porch"
Expand Down Expand Up @@ -95,7 +95,7 @@ type Config struct {
type PorchServer struct {
GenericAPIServer *genericapiserver.GenericAPIServer
coreClient client.WithWatch
cache cache.Cache
cache cachetypes.Cache
PeriodicRepoSyncFrequency time.Duration
}

Expand Down Expand Up @@ -229,15 +229,17 @@ func (c completedConfig) New() (*PorchServer, error) {

watcherMgr := engine.NewWatcherManager()

memoryCache := memorycache.NewCache(repoimpltypes.RepoImplOptions{
LocalDirectory: c.ExtraConfig.CacheDirectory,
RepoSyncFrequency: c.ExtraConfig.RepoSyncFrequency,
UseUserDefinedCaBundle: c.ExtraConfig.UseUserDefinedCaBundle,
CredentialResolver: credentialResolver,
UserInfoProvider: userInfoProvider,
MetadataStore: metadataStore,
ObjectNotifier: watcherMgr,
})
cache, err := cache.CreateCacheImpl(

Check failure on line 232 in pkg/apiserver/apiserver.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)
context.TODO(),
repoimpltypes.RepoImplOptions{
LocalDirectory: c.ExtraConfig.CacheDirectory,
RepoSyncFrequency: c.ExtraConfig.RepoSyncFrequency,
UseUserDefinedCaBundle: c.ExtraConfig.UseUserDefinedCaBundle,
CredentialResolver: credentialResolver,
UserInfoProvider: userInfoProvider,
MetadataStore: metadataStore,
RepoPRNotifier: watcherMgr,
})

runnerOptionsResolver := func(namespace string) fnruntime.RunnerOptions {
runnerOptions := fnruntime.RunnerOptions{}
Expand All @@ -247,7 +249,7 @@ func (c completedConfig) New() (*PorchServer, error) {
}

cad, err := engine.NewCaDEngine(
engine.WithCache(memoryCache),
engine.WithCache(cache),
// The order of registering the function runtimes matters here. When
// evaluating a function, the runtimes will be tried in the same
// order as they are registered.
Expand All @@ -272,7 +274,7 @@ func (c completedConfig) New() (*PorchServer, error) {
s := &PorchServer{
GenericAPIServer: genericServer,
coreClient: coreClient,
cache: memoryCache,
cache: cache,
// Set background job periodic frequency the same as repo sync frequency.
PeriodicRepoSyncFrequency: c.ExtraConfig.RepoSyncFrequency,
}
Expand Down
18 changes: 13 additions & 5 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@ package cache
import (
"context"

configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1"
"github.com/nephio-project/porch/pkg/repository"
memorycache "github.com/nephio-project/porch/pkg/cache/memory"
cachetypes "github.com/nephio-project/porch/pkg/cache/types"
repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

type Cache interface {
OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (repository.Repository, error)
CloseRepository(ctx context.Context, repositorySpec *configapi.Repository, allRepos []configapi.Repository) error
var tracer = otel.Tracer("cache")

func CreateCacheImpl(ctx context.Context, options repoimpltypes.RepoImplOptions) (cachetypes.Cache, error) {
ctx, span := tracer.Start(ctx, "Repository::RepositoryFactory", trace.WithAttributes())
defer span.End()

var cacheFactory = new(memorycache.MemoryCacheFactory)
return cacheFactory.NewCache(ctx, options)
}
35 changes: 24 additions & 11 deletions pkg/cache/memory/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package memory
package memorycache

import (
"context"
"sync"

configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1"
"github.com/nephio-project/porch/pkg/cache"
cachetypes "github.com/nephio-project/porch/pkg/cache/types"
"github.com/nephio-project/porch/pkg/repoimpl"
repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types"
"github.com/nephio-project/porch/pkg/repository"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

var tracer = otel.Tracer("memorycache")

// Cache allows us to keep state for repositories, rather than querying them every time.
//
// Cache Structure:
Expand All @@ -42,14 +45,7 @@ type Cache struct {
options repoimpltypes.RepoImplOptions
}

var _ cache.Cache = &Cache{}

func NewCache(options repoimpltypes.RepoImplOptions) *Cache {
return &Cache{
repositories: make(map[string]*cachedRepository),
options: options,
}
}
var _ cachetypes.Cache = &Cache{}

func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (repository.Repository, error) {
ctx, span := tracer.Start(ctx, "Cache::OpenRepository", trace.WithAttributes())
Expand All @@ -72,7 +68,7 @@ func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Re
}
}

repoImpl, err := repoimpl.RepositoryFactory(ctx, repositorySpec, c.options)
repoImpl, err := repoimpl.CreateRepositoryImpl(ctx, repositorySpec, c.options)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -123,3 +119,20 @@ func (c *Cache) CloseRepository(ctx context.Context, repositorySpec *configapi.R
return nil
}
}

func (c *Cache) GetRepositories(ctx context.Context) []configapi.Repository {
repoSlice := []configapi.Repository{}

for _, repo := range c.repositories {
repoSlice = append(repoSlice, *repo.repoSpec)
}
return repoSlice
}

func (c *Cache) GetRepository(ctx context.Context, repositoryName string) *configapi.Repository {
if repo := c.repositories[repositoryName]; repo != nil {
return repo.repoSpec
} else {
return nil
}
}
8 changes: 4 additions & 4 deletions pkg/cache/memory/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package memory
package memorycache

import (
"context"
Expand Down Expand Up @@ -224,12 +224,12 @@ func openRepositoryFromArchive(t *testing.T, ctx context.Context, testPath, name
_, address := git.ServeGitRepository(t, tarfile, tempdir)
metadataStore := createMetadataStoreFromArchive(t, fmt.Sprintf("%s-metadata.yaml", name), name)

cache := NewCache(repoimpltypes.RepoImplOptions{
cache, _ := new(MemoryCacheFactory).NewCache(ctx, repoimpltypes.RepoImplOptions{
LocalDirectory: t.TempDir(),
RepoSyncFrequency: 60 * time.Second,
UseUserDefinedCaBundle: true,
MetadataStore: metadataStore,
ObjectNotifier: &fakecache.ObjectNotifier{},
RepoPRNotifier: &fakecache.ObjectNotifier{},
CredentialResolver: &fakecache.CredentialResolver{},
})
apiRepo := &v1alpha1.Repository{
Expand Down Expand Up @@ -259,7 +259,7 @@ func openRepositoryFromArchive(t *testing.T, ctx context.Context, testPath, name
if err != nil {
t.Errorf("CloseRepository(%q) failed: %v", address, err)
}
if len(cache.repositories) != 0 {
if len(cache.GetRepositories(ctx)) != 0 {
t.Errorf("CloseRepository hasn't deleted repository from cache")
}
})
Expand Down
34 changes: 34 additions & 0 deletions pkg/cache/memory/memorycachefactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2025 The kpt and Nephio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memorycache

import (
"context"

cachetypes "github.com/nephio-project/porch/pkg/cache/types"
repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types"
)

var _ cachetypes.CacheFactory = &MemoryCacheFactory{}

type MemoryCacheFactory struct {
}

func (f *MemoryCacheFactory) NewCache(_ context.Context, options repoimpltypes.RepoImplOptions) (cachetypes.Cache, error) {
return &Cache{
repositories: make(map[string]*cachedRepository),
options: options,
}, nil
}
2 changes: 1 addition & 1 deletion pkg/cache/memory/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package memory
package memorycache

import (
"github.com/nephio-project/porch/pkg/repository"
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/memory/packagerevision.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package memory
package memorycache

import (
"context"
Expand Down
13 changes: 5 additions & 8 deletions pkg/cache/memory/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package memory
package memorycache

import (
"context"
Expand All @@ -24,7 +24,6 @@ import (
configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1"
repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types"
"github.com/nephio-project/porch/pkg/repository"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -33,8 +32,6 @@ import (
"k8s.io/klog/v2"
)

var tracer = otel.Tracer("cache")

// We take advantage of the cache having a global view of all the packages
// in a repository and compute the latest package revision in the cache
// rather than add another level of caching in the repositories themselves.
Expand Down Expand Up @@ -375,7 +372,7 @@ func (r *cachedRepository) Close() error {
klog.Warningf("repo %s: error deleting packagerev for %s: %v", r.id, nn.Name, err)
}
klog.Infof("repo %s: successfully deleted packagerev %s/%s", r.id, nn.Namespace, nn.Name)
sent += r.options.ObjectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr)
sent += r.options.RepoPRNotifier.NotifyPackageRevisionChange(watch.Deleted, pr)
}
klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions))
return r.repo.Close()
Expand Down Expand Up @@ -508,10 +505,10 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
for kname, newPackage := range newPackageRevisionNames {
oldPackage := oldPackageRevisionNames[kname]
if oldPackage == nil {
addSent += r.options.ObjectNotifier.NotifyPackageRevisionChange(watch.Added, newPackage)
addSent += r.options.RepoPRNotifier.NotifyPackageRevisionChange(watch.Added, newPackage)
} else {
if oldPackage.ResourceVersion() != newPackage.ResourceVersion() {
modSent += r.options.ObjectNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage)
modSent += r.options.RepoPRNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage)
}
}
}
Expand Down Expand Up @@ -544,7 +541,7 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
}
klog.Infof("repo %s: deleting PackageRev %s/%s because PackageRevision was removed from SoT",
r.id, nn.Namespace, nn.Name)
delSent += r.options.ObjectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage)
delSent += r.options.RepoPRNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage)
}
}
klog.Infof("repo %s: addSent %d, modSent %d, delSent for %d old and %d new repo packages", r.id, addSent, modSent, len(oldPackageRevisionNames), len(newPackageRevisionNames))
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/memory/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package memory
package memorycache

import (
"sort"
Expand Down
34 changes: 34 additions & 0 deletions pkg/cache/types/cachetypes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2025 The kpt and Nephio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cachetypes

import (
"context"

configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1"
repoimpltypes "github.com/nephio-project/porch/pkg/repoimpl/types"
"github.com/nephio-project/porch/pkg/repository"
)

type Cache interface {
OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (repository.Repository, error)
CloseRepository(ctx context.Context, repositorySpec *configapi.Repository, allRepos []configapi.Repository) error
GetRepositories(ctx context.Context) []configapi.Repository
GetRepository(ctx context.Context, repositoryName string) *configapi.Repository
}

type CacheFactory interface {
NewCache(ctx context.Context, options repoimpltypes.RepoImplOptions) (Cache, error)
}
4 changes: 2 additions & 2 deletions pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

api "github.com/nephio-project/porch/api/porch/v1alpha1"
configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1"
cache "github.com/nephio-project/porch/pkg/cache"
cachetypes "github.com/nephio-project/porch/pkg/cache/types"
"github.com/nephio-project/porch/pkg/meta"
"github.com/nephio-project/porch/pkg/repository"
"github.com/nephio-project/porch/pkg/task"
Expand Down Expand Up @@ -73,7 +73,7 @@ func NewCaDEngine(opts ...EngineOption) (CaDEngine, error) {
}

type cadEngine struct {
cache cache.Cache
cache cachetypes.Cache

userInfoProvider repository.UserInfoProvider
metadataStore meta.MetadataStore
Expand Down
4 changes: 2 additions & 2 deletions pkg/engine/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"fmt"

"github.com/nephio-project/porch/internal/kpt/fnruntime"
"github.com/nephio-project/porch/pkg/cache"
cachetypes "github.com/nephio-project/porch/pkg/cache/types"
"github.com/nephio-project/porch/pkg/kpt"
"github.com/nephio-project/porch/pkg/kpt/fn"
"github.com/nephio-project/porch/pkg/meta"
Expand All @@ -38,7 +38,7 @@ func (f EngineOptionFunc) apply(engine *cadEngine) error {
return f(engine)
}

func WithCache(cache cache.Cache) EngineOption {
func WithCache(cache cachetypes.Cache) EngineOption {
return EngineOptionFunc(func(engine *cadEngine) error {
engine.cache = cache
return nil
Expand Down
Loading

0 comments on commit beab57c

Please sign in to comment.