Skip to content

Commit

Permalink
Merge pull request nephio-project#128 from Nordix/modularize-cache
Browse files Browse the repository at this point in the history
Modularize cache
  • Loading branch information
nephio-prow[bot] authored Oct 30, 2024
2 parents 5dc3c1d + 4e1752f commit 122f1da
Show file tree
Hide file tree
Showing 12 changed files with 264 additions and 215 deletions.
9 changes: 5 additions & 4 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
internalapi "github.com/nephio-project/porch/internal/api/porchinternal/v1alpha1"
"github.com/nephio-project/porch/internal/kpt/fnruntime"
"github.com/nephio-project/porch/pkg/cache"
memorycache "github.com/nephio-project/porch/pkg/cache/memory"
"github.com/nephio-project/porch/pkg/engine"
"github.com/nephio-project/porch/pkg/meta"
"github.com/nephio-project/porch/pkg/registry/porch"
Expand Down Expand Up @@ -92,7 +93,7 @@ type Config struct {
type PorchServer struct {
GenericAPIServer *genericapiserver.GenericAPIServer
coreClient client.WithWatch
cache *cache.Cache
cache cache.Cache
}

type completedConfig struct {
Expand Down Expand Up @@ -225,7 +226,7 @@ func (c completedConfig) New() (*PorchServer, error) {

watcherMgr := engine.NewWatcherManager()

cache := cache.NewCache(c.ExtraConfig.CacheDirectory, c.ExtraConfig.RepoSyncFrequency, c.ExtraConfig.UseGitCaBundle, cache.CacheOptions{
memoryCache := memorycache.NewCache(c.ExtraConfig.CacheDirectory, c.ExtraConfig.RepoSyncFrequency, c.ExtraConfig.UseGitCaBundle, memorycache.CacheOptions{
CredentialResolver: credentialResolver,
UserInfoProvider: userInfoProvider,
MetadataStore: metadataStore,
Expand All @@ -246,7 +247,7 @@ func (c completedConfig) New() (*PorchServer, error) {
}

cad, err := engine.NewCaDEngine(
engine.WithCache(cache),
engine.WithCache(memoryCache),
// The order of registering the function runtimes matters here. When
// evaluating a function, the runtimes will be tried in the same
// order as they are registered.
Expand All @@ -271,7 +272,7 @@ func (c completedConfig) New() (*PorchServer, error) {
s := &PorchServer{
GenericAPIServer: genericServer,
coreClient: coreClient,
cache: cache,
cache: memoryCache,
}

// Install the groups.
Expand Down
211 changes: 15 additions & 196 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
@@ -1,214 +1,33 @@
// Copyright 2022 The kpt and Nephio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
"context"
"errors"
"fmt"
"path/filepath"
"sync"
"time"

kptoci "github.com/GoogleContainerTools/kpt/pkg/oci"
configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1"
"github.com/nephio-project/porch/pkg/git"
"github.com/nephio-project/porch/pkg/meta"
"github.com/nephio-project/porch/pkg/oci"
"github.com/nephio-project/porch/pkg/repository"
"go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/watch"
)

// Cache allows us to keep state for repositories, rather than querying them every time.
//
// Cache Structure:
// <cacheDir>/git/
// * Caches bare git repositories in directories named based on the repository address.
// <cacheDir>/oci/
// * Caches oci images with further hierarchy underneath
// * We Cache image layers in <cacheDir>/oci/layers/ (this might be obsolete with the flattened Cache)
// * We Cache flattened tar files in <cacheDir>/oci/ (so we don't need to pull to read resources)
// * We poll the repositories (every minute) and Cache the discovered images in memory.
type Cache struct {
mutex sync.Mutex
repositories map[string]*cachedRepository
cacheDir string
credentialResolver repository.CredentialResolver
userInfoProvider repository.UserInfoProvider
metadataStore meta.MetadataStore
repoSyncFrequency time.Duration
objectNotifier objectNotifier
useGitCaBundle bool
}

type objectNotifier interface {
NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) int
type Cache interface {
OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (CachedRepository, error)
CloseRepository(repositorySpec *configapi.Repository, allRepos []configapi.Repository) error
}

type CacheOptions struct {
CredentialResolver repository.CredentialResolver
UserInfoProvider repository.UserInfoProvider
MetadataStore meta.MetadataStore
ObjectNotifier objectNotifier
type CachedRepository interface {
repository.Repository
// TODO: Remove this once https://github.com/nephio-project/porch/pull/119 is merged
repository.FunctionRepository
RefreshCache(ctx context.Context) error
}

func NewCache(cacheDir string, repoSyncFrequency time.Duration, useGitCaBundle bool, opts CacheOptions) *Cache {
return &Cache{
repositories: make(map[string]*cachedRepository),
cacheDir: cacheDir,
credentialResolver: opts.CredentialResolver,
userInfoProvider: opts.UserInfoProvider,
metadataStore: opts.MetadataStore,
objectNotifier: opts.ObjectNotifier,
repoSyncFrequency: repoSyncFrequency,
useGitCaBundle: useGitCaBundle,
}
type CachedPackageRevision interface {
repository.PackageRevision
}

func getCacheKey(repositorySpec *configapi.Repository) (string, error) {
switch repositoryType := repositorySpec.Spec.Type; repositoryType {
case configapi.RepositoryTypeOCI:
ociSpec := repositorySpec.Spec.Oci
if ociSpec == nil {
return "", fmt.Errorf("oci not configured")
}
return "oci://" + ociSpec.Registry, nil

case configapi.RepositoryTypeGit:
gitSpec := repositorySpec.Spec.Git
if gitSpec == nil {
return "", errors.New("git property is required")
}
if gitSpec.Repo == "" {
return "", errors.New("git.repo property is required")
}
return fmt.Sprintf("git://%s/%s@%s/%s", gitSpec.Repo, gitSpec.Directory, repositorySpec.Namespace, repositorySpec.Name), nil

default:
return "", fmt.Errorf("repository type %q not supported", repositoryType)
}
type CachedPackageDraft interface {
repository.PackageDraft
}

func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (*cachedRepository, error) {
ctx, span := tracer.Start(ctx, "Cache::OpenRepository", trace.WithAttributes())
defer span.End()

key, err := getCacheKey(repositorySpec)
if err != nil {
return nil, err
}
c.mutex.Lock()
defer c.mutex.Unlock()
cachedRepo := c.repositories[key]

switch repositoryType := repositorySpec.Spec.Type; repositoryType {
case configapi.RepositoryTypeOCI:
ociSpec := repositorySpec.Spec.Oci
if cachedRepo == nil {
cacheDir := filepath.Join(c.cacheDir, "oci")
storage, err := kptoci.NewStorage(cacheDir)
if err != nil {
return nil, err
}

r, err := oci.OpenRepository(repositorySpec.Name, repositorySpec.Namespace, repositorySpec.Spec.Content, ociSpec, repositorySpec.Spec.Deployment, storage)
if err != nil {
return nil, err
}
cachedRepo = newRepository(key, repositorySpec, r, c.objectNotifier, c.metadataStore, c.repoSyncFrequency)
c.repositories[key] = cachedRepo
}
return cachedRepo, nil

case configapi.RepositoryTypeGit:
gitSpec := repositorySpec.Spec.Git
if !isPackageContent(repositorySpec.Spec.Content) {
return nil, fmt.Errorf("git repository supports Package content only; got %q", string(repositorySpec.Spec.Content))
}
if cachedRepo == nil {
var mbs git.MainBranchStrategy
if gitSpec.CreateBranch {
mbs = git.CreateIfMissing
} else {
mbs = git.ErrorIfMissing
}

r, err := git.OpenRepository(ctx, repositorySpec.Name, repositorySpec.Namespace, gitSpec, repositorySpec.Spec.Deployment, filepath.Join(c.cacheDir, "git"), git.GitRepositoryOptions{
CredentialResolver: c.credentialResolver,
UserInfoProvider: c.userInfoProvider,
MainBranchStrategy: mbs,
UseGitCaBundle: c.useGitCaBundle,
})
if err != nil {
return nil, err
}

cachedRepo = newRepository(key, repositorySpec, r, c.objectNotifier, c.metadataStore, c.repoSyncFrequency)
c.repositories[key] = cachedRepo
} else {
// If there is an error from the background refresh goroutine, return it.
if err := cachedRepo.getRefreshError(); err != nil {
return nil, err
}
}
return cachedRepo, nil

default:
return nil, fmt.Errorf("type %q not supported", repositoryType)
}
}

func isPackageContent(content configapi.RepositoryContent) bool {
return content == configapi.RepositoryContentPackage
}

func (c *Cache) CloseRepository(repositorySpec *configapi.Repository, allRepos []configapi.Repository) error {
key, err := getCacheKey(repositorySpec)
if err != nil {
return err
}

// check if repositorySpec shares the underlying cached repo with another repository
for _, r := range allRepos {
if r.Name == repositorySpec.Name && r.Namespace == repositorySpec.Namespace {
continue
}
otherKey, err := getCacheKey(&r)
if err != nil {
return err
}
if otherKey == key {
// do not close cached repo if it is shared
return nil
}
}

var repository *cachedRepository
{
c.mutex.Lock()
if r, ok := c.repositories[key]; ok {
delete(c.repositories, key)
repository = r
}
c.mutex.Unlock()
}

if repository != nil {
return repository.Close()
} else {
return nil
}
// Remove?
type CachedPackage interface {
repository.Package
}
Loading

0 comments on commit 122f1da

Please sign in to comment.