Skip to content

Commit

Permalink
Initial cut at porch core refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
liamfallon committed Nov 14, 2024
1 parent 31ee5d5 commit de6212e
Show file tree
Hide file tree
Showing 97 changed files with 1,190 additions and 1,850 deletions.
20 changes: 1 addition & 19 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,6 @@ import (
)

type Cache interface {
OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (CachedRepository, error)
OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (repository.Repository, error)
CloseRepository(repositorySpec *configapi.Repository, allRepos []configapi.Repository) error
}

type CachedRepository interface {
repository.Repository
RefreshCache(ctx context.Context) error
}

type CachedPackageRevision interface {
repository.PackageRevision
}

type CachedPackageDraft interface {
repository.PackageDraft
}

// Remove?
type CachedPackage interface {
repository.Package
}
3 changes: 1 addition & 2 deletions pkg/cache/fake/objectnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
package fake

import (
"github.com/nephio-project/porch/pkg/meta"
"github.com/nephio-project/porch/pkg/repository"
"k8s.io/apimachinery/pkg/watch"
)

type ObjectNotifier struct{}

func (o *ObjectNotifier) NotifyPackageRevisionChange(watch.EventType, repository.PackageRevision, meta.PackageRevisionMeta) int {
func (o *ObjectNotifier) NotifyPackageRevisionChange(watch.EventType, repository.PackageRevision) int {
return 0
}
4 changes: 2 additions & 2 deletions pkg/cache/memory/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type Cache struct {
var _ cache.Cache = &Cache{}

type objectNotifier interface {
NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta) int
NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision) int
}

type CacheOptions struct {
Expand Down Expand Up @@ -105,7 +105,7 @@ func getCacheKey(repositorySpec *configapi.Repository) (string, error) {
}
}

func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (cache.CachedRepository, error) {
func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (repository.Repository, error) {
ctx, span := tracer.Start(ctx, "Cache::OpenRepository", trace.WithAttributes())
defer span.End()

Expand Down
3 changes: 1 addition & 2 deletions pkg/cache/memory/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/google/go-cmp/cmp"
api "github.com/nephio-project/porch/api/porch/v1alpha1"
"github.com/nephio-project/porch/api/porchconfig/v1alpha1"
"github.com/nephio-project/porch/pkg/cache"

fakecache "github.com/nephio-project/porch/pkg/cache/fake"
"github.com/nephio-project/porch/pkg/git"
Expand Down Expand Up @@ -216,7 +215,7 @@ func TestDeletePublishedMain(t *testing.T) {

}

func openRepositoryFromArchive(t *testing.T, ctx context.Context, testPath, name string) cache.CachedRepository {
func openRepositoryFromArchive(t *testing.T, ctx context.Context, testPath, name string) repository.Repository {
t.Helper()

tempdir := t.TempDir()
Expand Down
2 changes: 0 additions & 2 deletions pkg/cache/memory/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/nephio-project/porch/api/porch/v1alpha1"
"github.com/nephio-project/porch/pkg/cache"
"github.com/nephio-project/porch/pkg/repository"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -29,7 +28,6 @@ type cachedDraft struct {
}

var _ repository.PackageDraft = &cachedDraft{}
var _ cache.CachedPackageDraft = &cachedDraft{}

func (cd *cachedDraft) Close(ctx context.Context, version string) (repository.PackageRevision, error) {
ctx, span := tracer.Start(ctx, "cachedDraft::Close", trace.WithAttributes())
Expand Down
2 changes: 0 additions & 2 deletions pkg/cache/memory/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package memory

import (
"github.com/nephio-project/porch/pkg/cache"
"github.com/nephio-project/porch/pkg/repository"
)

Expand All @@ -26,7 +25,6 @@ import (
// between Git and OCI.

var _ repository.Package = &cachedPackage{}
var _ cache.CachedPackage = &cachedPackage{}

type cachedPackage struct {
repository.Package
Expand Down
53 changes: 43 additions & 10 deletions pkg/cache/memory/packagerevision.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ package memory
import (
"context"

"github.com/nephio-project/porch/api/porch/v1alpha1"
"github.com/nephio-project/porch/pkg/cache"
api "github.com/nephio-project/porch/api/porch/v1alpha1"
"github.com/nephio-project/porch/pkg/repository"
)

Expand All @@ -29,26 +28,60 @@ import (
// between Git and OCI.

var _ repository.PackageRevision = &cachedPackageRevision{}
var _ cache.CachedPackageRevision = &cachedPackageRevision{}

type cachedPackageRevision struct {
repository.PackageRevision
isLatestRevision bool
}

func (c *cachedPackageRevision) GetPackageRevision(ctx context.Context) (*v1alpha1.PackageRevision, error) {
rev, err := c.PackageRevision.GetPackageRevision(ctx)
func (c *cachedPackageRevision) GetPackageRevision(ctx context.Context) (*api.PackageRevision, error) {
apiPR, err := c.PackageRevision.GetPackageRevision(ctx)
if err != nil {
return nil, err
}

apiPR.Annotations = c.GetMeta().Annotations
apiPR.Finalizers = c.GetMeta().Finalizers
apiPR.OwnerReferences = c.GetMeta().OwnerReferences
apiPR.DeletionTimestamp = c.GetMeta().DeletionTimestamp
apiPR.Labels = c.GetMeta().Labels

if c.isLatestRevision {
// copy the labels in case the cached object is being read by another go routine
labels := make(map[string]string, len(rev.Labels))
for k, v := range rev.Labels {
labels := make(map[string]string, len(apiPR.Labels))
for k, v := range apiPR.Labels {
labels[k] = v
}
labels[api.LatestPackageRevisionKey] = api.LatestPackageRevisionValue
apiPR.Labels = labels
}

return apiPR, nil
}

func (c *cachedPackageRevision) GetCachedPackageRevision(ctx context.Context) (*api.PackageRevision, error) {
repoPkgRev, err := c.GetPackageRevision(ctx)
if err != nil {
return nil, err
}
var isLatest bool
if val, found := repoPkgRev.Labels[api.LatestPackageRevisionKey]; found && val == api.LatestPackageRevisionValue {
isLatest = true
}
repoPkgRev.Labels = c.GetMeta().Labels
if isLatest {
// copy the labels in case the cached object is being read by another go routine
labels := make(map[string]string, len(repoPkgRev.Labels))
for k, v := range repoPkgRev.Labels {
labels[k] = v
}
labels[v1alpha1.LatestPackageRevisionKey] = v1alpha1.LatestPackageRevisionValue
rev.Labels = labels
labels[api.LatestPackageRevisionKey] = api.LatestPackageRevisionValue
repoPkgRev.Labels = labels
}
return rev, nil
repoPkgRev.Annotations = c.GetMeta().Annotations
repoPkgRev.Finalizers = c.GetMeta().Finalizers
repoPkgRev.OwnerReferences = c.GetMeta().OwnerReferences
repoPkgRev.DeletionTimestamp = c.GetMeta().DeletionTimestamp

return repoPkgRev, nil
}
38 changes: 9 additions & 29 deletions pkg/cache/memory/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ import (

"github.com/nephio-project/porch/api/porch/v1alpha1"
configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1"
"github.com/nephio-project/porch/pkg/cache"
"github.com/nephio-project/porch/pkg/git"
"github.com/nephio-project/porch/pkg/meta"
"github.com/nephio-project/porch/pkg/repository"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
Expand All @@ -43,7 +42,6 @@ var tracer = otel.Tracer("cache")
// between Git and OCI.

var _ repository.Repository = &cachedRepository{}
var _ cache.CachedRepository = &cachedRepository{}

type cachedRepository struct {
id string
Expand Down Expand Up @@ -85,7 +83,7 @@ func newRepository(id string, repoSpec *configapi.Repository, repo repository.Re
return r
}

func (r *cachedRepository) RefreshCache(ctx context.Context) error {
func (r *cachedRepository) Refresh(ctx context.Context) error {

_, _, err := r.refreshAllCachedPackages(ctx)

Expand Down Expand Up @@ -261,7 +259,7 @@ func (r *cachedRepository) createMainPackageRevision(ctx context.Context, update

// Create the package if it doesn't exist
_, err := r.metadataStore.Get(ctx, pkgRevMetaNN)
if apierrors.IsNotFound(err) {
if errors.IsNotFound(err) {
pkgRevMeta := meta.PackageRevisionMeta{
Name: updatedMain.KubeObjectName(),
Namespace: updatedMain.KubeObjectNamespace(),
Expand Down Expand Up @@ -346,19 +344,15 @@ func (r *cachedRepository) Close() error {
// the repository, so we have to just delete the PackageRevision regardless of any
// finalizers.
klog.Infof("repo %s: deleting packagerev %s/%s because repository is closed", r.id, nn.Namespace, nn.Name)
pkgRevMeta, err := r.metadataStore.Delete(context.TODO(), nn, true)
_, err := r.metadataStore.Delete(context.TODO(), nn, true)
if err != nil {
// There isn't much use in returning an error here, so we just log it
// and create a PackageRevisionMeta with just name and namespace. This
// makes sure that the Delete event is sent.
klog.Warningf("repo %s: error deleting packagerev for %s: %v", r.id, nn.Name, err)
pkgRevMeta = meta.PackageRevisionMeta{
Name: nn.Name,
Namespace: nn.Namespace,
}
}
klog.Infof("repo %s: successfully deleted packagerev %s/%s", r.id, nn.Namespace, nn.Name)
sent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta)
sent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr)
}
klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions))
return r.repo.Close()
Expand Down Expand Up @@ -474,7 +468,7 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
Name: prm.Name,
Namespace: prm.Namespace,
}, true); err != nil {
if !apierrors.IsNotFound(err) {
if !errors.IsNotFound(err) {
// This will be retried the next time the sync runs.
klog.Warningf("repo %s: unable to delete PackageRev CR for %s/%s: %v",
r.id, prm.Name, prm.Namespace, err)
Expand All @@ -488,15 +482,11 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
modSent := 0
for kname, newPackage := range newPackageRevisionNames {
oldPackage := oldPackageRevisionNames[kname]
metaPackage, found := existingPkgRevCRsMap[newPackage.KubeObjectName()]
if !found {
klog.Warningf("no PackageRev CR found for PackageRevision %s", newPackage.KubeObjectName())
}
if oldPackage == nil {
addSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Added, newPackage, metaPackage)
addSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Added, newPackage)
} else {
if oldPackage.ResourceVersion() != newPackage.ResourceVersion() {
modSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage, metaPackage)
modSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage)
}
}
}
Expand Down Expand Up @@ -529,17 +519,7 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
}
klog.Infof("repo %s: deleting PackageRev %s/%s because PackageRevision was removed from SoT",
r.id, nn.Namespace, nn.Name)
metaPackage, err := r.metadataStore.Delete(ctx, nn, true)
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Warningf("repo %s: error deleting PkgRevMeta %s: %v", r.id, nn, err)
}
metaPackage = meta.PackageRevisionMeta{
Name: nn.Name,
Namespace: nn.Namespace,
}
}
delSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage, metaPackage)
delSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage)
}
}
klog.Infof("repo %s: addSent %d, modSent %d, delSent for %d old and %d new repo packages", r.id, addSent, modSent, len(oldPackageRevisionNames), len(newPackageRevisionNames))
Expand Down
1 change: 1 addition & 0 deletions pkg/cache/memory/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func identifyLatestRevisions(result map[repository.PackageRevisionKey]*cachedPac
latest[currentKey.Package] = current
}
}

// Mark the winners as latest
for _, v := range latest {
v.isLatestRevision = true
Expand Down
17 changes: 0 additions & 17 deletions pkg/engine/doc.go

This file was deleted.

Loading

0 comments on commit de6212e

Please sign in to comment.