diff --git a/pkg/cache/memory/draft.go b/pkg/cache/memory/draft.go index 1a9066a1..70bcaa23 100644 --- a/pkg/cache/memory/draft.go +++ b/pkg/cache/memory/draft.go @@ -16,6 +16,7 @@ package memory import ( "context" + "fmt" "github.com/nephio-project/porch/api/porch/v1alpha1" "github.com/nephio-project/porch/pkg/cache" @@ -38,8 +39,9 @@ func (cd *cachedDraft) Close(ctx context.Context, version string) (repository.Pa if err != nil { return nil, err } + if v != cd.cache.lastVersion { - _, _, err = cd.cache.refreshAllCachedPackages(ctx) + err = cd.cache.reconcileCache(ctx, "draft-version") if err != nil { return nil, err } @@ -64,9 +66,20 @@ func (cd *cachedDraft) Close(ctx context.Context, version string) (repository.Pa return nil, err } - if closed, err := cd.PackageDraft.Close(ctx, nextVersion); err != nil { + closed, err := cd.PackageDraft.Close(ctx, nextVersion) + if err != nil { return nil, err - } else { - return cd.cache.update(ctx, closed) } + + err = cd.cache.reconcileCache(ctx, "close-draft") + if err != nil { + return nil, err + } + + cpr := cd.cache.getPackageRevision(closed.Key()) + if cpr == nil { + return nil, fmt.Errorf("closed draft not found") + } + + return cpr, nil } diff --git a/pkg/cache/memory/repository.go b/pkg/cache/memory/repository.go index 432daeb4..1d8d610f 100644 --- a/pkg/cache/memory/repository.go +++ b/pkg/cache/memory/repository.go @@ -56,9 +56,26 @@ type cachedRepository struct { lastVersion string - mutex sync.Mutex + // We use separate mutexes for cache map changes and for the overall + // reconcile process. We want update, delete, and reconcile + // to all block on the reconcileMutex, which could be held for a long time + // during reconcile. For much of that time (during reconcile) we do NOT + // want to block reads. There are a few protected areas where we touch map + // entries where we need to block reads, so those will also grab the general + // mutex. + // + // Any code that needs to hold both locks MUST get the reconcileMutex first, + // or we could end up with deadlocks + mutex sync.RWMutex + reconcileMutex sync.Mutex + cachedPackageRevisions map[repository.PackageRevisionKey]*cachedPackageRevision - cachedPackages map[repository.PackageKey]*cachedPackage + + // not ideal but this is another cache, used by the underlying storage to avoid + // reloading. Would be best to combine these somehow, but not doing that now. + // Eventual CRD-based redesign should make this entire repo cache obsolete + packageRevisionCache repository.PackageRevisionCache + // Error encountered on repository refresh by the refresh goroutine. // This is returned back by the cache to the background goroutine when it calls periodicall to resync repositories. refreshRevisionsError error @@ -86,9 +103,13 @@ func newRepository(id string, repoSpec *configapi.Repository, repo repository.Re return r } +func (r *cachedRepository) nn() string { + return r.repoSpec.Namespace + "/" + r.repoSpec.Name +} + func (r *cachedRepository) RefreshCache(ctx context.Context) error { - _, _, err := r.refreshAllCachedPackages(ctx) + err := r.reconcileCache(ctx, "repo-refresh-cache") return err } @@ -98,7 +119,7 @@ func (r *cachedRepository) Version(ctx context.Context) (string, error) { } func (r *cachedRepository) ListPackageRevisions(ctx context.Context, filter repository.ListPackageRevisionFilter) ([]repository.PackageRevision, error) { - packages, err := r.getPackageRevisions(ctx, filter, false) + packages, err := r.getPackageRevisions(ctx, filter) if err != nil { return nil, err } @@ -106,21 +127,22 @@ func (r *cachedRepository) ListPackageRevisions(ctx context.Context, filter repo return packages, nil } -func (r *cachedRepository) getRefreshError() error { - r.mutex.Lock() - defer r.mutex.Unlock() +func (r *cachedRepository) getPackageRevision(key repository.PackageRevisionKey) *cachedPackageRevision { + r.mutex.RLock() + defer r.mutex.RUnlock() - // TODO: This should also check r.refreshPkgsError when - // the package resource is fully supported. + cpr, _ := r.cachedPackageRevisions[key] + return cpr +} +func (r *cachedRepository) getRefreshError() error { + r.mutex.RLock() + defer r.mutex.RUnlock() return r.refreshRevisionsError } -func (r *cachedRepository) getPackageRevisions(ctx context.Context, filter repository.ListPackageRevisionFilter, forceRefresh bool) ([]repository.PackageRevision, error) { - r.mutex.Lock() - defer r.mutex.Unlock() - - _, packageRevisions, err := r.getCachedPackages(ctx, forceRefresh) +func (r *cachedRepository) getPackageRevisions(ctx context.Context, filter repository.ListPackageRevisionFilter) ([]repository.PackageRevision, error) { + packageRevisions, err := r.getCachedPackageRevisions(ctx) if err != nil { return nil, err } @@ -128,44 +150,42 @@ func (r *cachedRepository) getPackageRevisions(ctx context.Context, filter repos return toPackageRevisionSlice(packageRevisions, filter), nil } -func (r *cachedRepository) getPackages(ctx context.Context, filter repository.ListPackageFilter, forceRefresh bool) ([]repository.Package, error) { - r.mutex.Lock() - defer r.mutex.Unlock() - - packages, _, err := r.getCachedPackages(ctx, forceRefresh) +// getCachedPackageRevisions returns the cache contents, blocking until +// the cache is loaded +// caller must NOT hold the lock +// returned *map* is a copy and can be operated on without locks +// map entries are NOT copies and should not be modified +func (r *cachedRepository) getCachedPackageRevisions(ctx context.Context) (map[repository.PackageRevisionKey]*cachedPackageRevision, error) { + err := r.blockUntilLoaded(ctx) if err != nil { return nil, err } - return toPackageSlice(packages, filter), nil + r.mutex.RLock() + defer r.mutex.RUnlock() + + packageRevisions := make(map[repository.PackageRevisionKey]*cachedPackageRevision, len(r.cachedPackageRevisions)) + for k, v := range r.cachedPackageRevisions { + packageRevisions[k] = v + } + + return packageRevisions, r.refreshRevisionsError } -// getCachedPackages returns cachedPackages; fetching it if not cached or if forceRefresh. -// mutex must be held. -func (r *cachedRepository) getCachedPackages(ctx context.Context, forceRefresh bool) (map[repository.PackageKey]*cachedPackage, map[repository.PackageRevisionKey]*cachedPackageRevision, error) { - // must hold mutex - packages := r.cachedPackages - packageRevisions := r.cachedPackageRevisions - err := r.refreshRevisionsError - - if forceRefresh { - packages = nil - packageRevisions = nil - - if gitRepo, isGitRepo := r.repo.(git.GitRepository); isGitRepo { - // TODO: Figure out a way to do this without the cache layer - // needing to know what type of repo we are working with. - if err := gitRepo.UpdateDeletionProposedCache(); err != nil { - return nil, nil, err +// blocks waiting until the cache is loaded +func (r *cachedRepository) blockUntilLoaded(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return fmt.Errorf("repo %s: stopped waiting for load because context is done: %v", r.nn(), ctx.Err()) + default: + r.mutex.RLock() + if r.cachedPackageRevisions != nil { + r.mutex.RUnlock() + return nil } } } - - if packages == nil { - packages, packageRevisions, err = r.refreshAllCachedPackages(ctx) - } - - return packages, packageRevisions, err } func (r *cachedRepository) CreatePackageRevision(ctx context.Context, obj *v1alpha1.PackageRevision) (repository.PackageDraft, error) { @@ -194,52 +214,6 @@ func (r *cachedRepository) UpdatePackageRevision(ctx context.Context, old reposi }, nil } -func (r *cachedRepository) update(ctx context.Context, updated repository.PackageRevision) (*cachedPackageRevision, error) { - r.mutex.Lock() - defer r.mutex.Unlock() - - // TODO: Technically we only need this package, not all packages - if _, _, err := r.getCachedPackages(ctx, false); err != nil { - klog.Warningf("failed to get cached packages: %v", err) - // TODO: Invalidate all watches? We're dropping an add/update event - return nil, err - } - - k := updated.Key() - // previous := r.cachedPackageRevisions[k] - - if v1alpha1.LifecycleIsPublished(updated.Lifecycle()) { - oldKey := repository.PackageRevisionKey{ - Repository: k.Repository, - Package: k.Package, - WorkspaceName: k.WorkspaceName, - } - if _, ok := r.cachedPackageRevisions[oldKey]; ok { - delete(r.cachedPackageRevisions, oldKey) - } - } - - cached := &cachedPackageRevision{PackageRevision: updated} - r.cachedPackageRevisions[k] = cached - - // Recompute latest package revisions. - identifyLatestRevisions(r.cachedPackageRevisions) - - // Create the main package revision - if v1alpha1.LifecycleIsPublished(updated.Lifecycle()) { - updatedMain := updated.ToMainPackageRevision() - r.createMainPackageRevision(ctx, updatedMain) - } else { - version, err := r.repo.Version(ctx) - if err != nil { - return nil, err - } - r.lastVersion = version - } - - return cached, nil -} - func (r *cachedRepository) createMainPackageRevision(ctx context.Context, updatedMain repository.PackageRevision) error { //Search and delete any old main pkgRev of an older workspace in the cache @@ -291,52 +265,35 @@ func (r *cachedRepository) DeletePackageRevision(ctx context.Context, old reposi return err } - r.mutex.Lock() - if r.cachedPackages != nil { - k := old.Key() - // previous := r.cachedPackages[k] - delete(r.cachedPackageRevisions, k) - - // Recompute latest package revisions. - // TODO: Only for affected object / key? - identifyLatestRevisions(r.cachedPackageRevisions) + // reconciliation is faster now, so force it immediately + if err := r.reconcileCache(ctx, "delete"); err != nil { + klog.Warningf("error reconciling cache after deleting %v in %s: %v", unwrapped.Key(), r.nn(), err) } - r.mutex.Unlock() - return nil } func (r *cachedRepository) ListPackages(ctx context.Context, filter repository.ListPackageFilter) ([]repository.Package, error) { - packages, err := r.getPackages(ctx, filter, false) - if err != nil { - return nil, err - } - - return packages, nil + return nil, fmt.Errorf("not implemented") } func (r *cachedRepository) CreatePackage(ctx context.Context, obj *v1alpha1.PorchPackage) (repository.Package, error) { - klog.Infoln("cachedRepository::CreatePackage") - return r.repo.CreatePackage(ctx, obj) + return nil, fmt.Errorf("not implemented") } func (r *cachedRepository) DeletePackage(ctx context.Context, old repository.Package) error { - // Unwrap - unwrapped := old.(*cachedPackage).Package - if err := r.repo.DeletePackage(ctx, unwrapped); err != nil { - return err - } - - // TODO: Do something more efficient than a full cache flush - r.flush() - - return nil + return fmt.Errorf("not implemented") } func (r *cachedRepository) Close() error { r.cancel() + r.reconcileMutex.Lock() + defer r.reconcileMutex.Unlock() + + r.mutex.Lock() + defer r.mutex.Unlock() + // Make sure that watch events are sent for packagerevisions that are // removed as part of closing the repository. sent := 0 @@ -348,12 +305,15 @@ func (r *cachedRepository) Close() error { // There isn't really any correct way to handle finalizers here. We are removing // the repository, so we have to just delete the PackageRevision regardless of any // finalizers. - klog.Infof("repo %s: deleting packagerev %s/%s because repository is closed", r.id, nn.Namespace, nn.Name) + klog.Infof("repo %s: deleting packagerev %s/%s because repository is closed", r.nn(), nn.Namespace, nn.Name) pkgRevMeta, err := r.metadataStore.Delete(context.TODO(), nn, true) if err != nil { // There isn't much use in returning an error here, so we just log it // and create a PackageRevisionMeta with just name and namespace. This // makes sure that the Delete event is sent. + if !apierrors.IsNotFound(err) { + klog.Warningf("Error deleting PackageRev CR %s/%s: %s", nn.Namespace, nn.Name, err) + } klog.Warningf("repo %s: error deleting packagerev for %s: %v", r.id, nn.Name, err) pkgRevMeta = meta.PackageRevisionMeta{ Name: nn.Name, @@ -363,7 +323,7 @@ func (r *cachedRepository) Close() error { klog.Infof("repo %s: successfully deleted packagerev %s/%s", r.id, nn.Namespace, nn.Name) sent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta) } - klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions)) + klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.nn(), sent, len(r.cachedPackageRevisions)) return r.repo.Close() } @@ -373,7 +333,7 @@ func (r *cachedRepository) pollForever(ctx context.Context, repoSyncFrequency ti for { select { case <-ctx.Done(): - klog.V(2).Infof("repo %s: exiting repository poller, because context is done: %v", r.id, ctx.Err()) + klog.V(2).Infof("repo %s: exiting repository poller, because context is done: %v", r.nn(), ctx.Err()) return default: r.pollOnce(ctx) @@ -383,74 +343,76 @@ func (r *cachedRepository) pollForever(ctx context.Context, repoSyncFrequency ti } func (r *cachedRepository) pollOnce(ctx context.Context) { - start := time.Now() - klog.Infof("repo %s: poll started", r.id) - defer func() { klog.Infof("repo %s: poll finished in %f secs", r.id, time.Since(start).Seconds()) }() ctx, span := tracer.Start(ctx, "Repository::pollOnce", trace.WithAttributes()) defer span.End() - if _, err := r.getPackageRevisions(ctx, repository.ListPackageRevisionFilter{}, true); err != nil { - klog.Warningf("error polling repo packages %s: %v", r.id, err) + if err := r.reconcileCache(ctx, "poll"); err != nil { + klog.Warningf("error polling repo packages %s: %v", r.nn(), err) } - // TODO: Uncomment when package resources are fully supported - //if _, err := r.getPackages(ctx, repository.ListPackageRevisionFilter{}, true); err != nil { - // klog.Warningf("error polling repo packages %s: %v", r.id, err) - //} -} - -func (r *cachedRepository) flush() { - r.mutex.Lock() - defer r.mutex.Unlock() - - r.cachedPackageRevisions = nil - r.cachedPackages = nil } -// refreshAllCachedPackages updates the cached map for this repository with all the newPackages, -// it also triggers notifications for all package changes. -// mutex must be held. -func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[repository.PackageKey]*cachedPackage, map[repository.PackageRevisionKey]*cachedPackageRevision, error) { - // TODO: Avoid simultaneous fetches? - // TODO: Push-down partial refresh? - +// reconcileCache updates the cached map for this repository +// it also triggers notifications for all package changes +// caller must NOT hold any locks +func (r *cachedRepository) reconcileCache(ctx context.Context, reason string) error { start := time.Now() - defer func() { klog.Infof("repo %s: refresh finished in %f secs", r.id, time.Since(start).Seconds()) }() + defer func() { + klog.Infof("repo %s: reconcile for %s finished in %f secs", r.nn(), reason, time.Since(start).Seconds()) + }() curVer, err := r.Version(ctx) if err != nil { - return nil, nil, err + return err } if curVer == r.lastVersion { - return r.cachedPackages, r.cachedPackageRevisions, nil + return nil + } + + // get the reconcile lock first, to block any repo-level mutations + r.reconcileMutex.Lock() + defer r.reconcileMutex.Unlock() + + if gitRepo, isGitRepo := r.repo.(git.GitRepository); isGitRepo { + // TODO: Figure out a way to do this without the cache layer + // needing to know what type of repo we are working with. + if err := gitRepo.UpdateDeletionProposedCache(); err != nil { + return err + } } - // Look up all existing PackageRevCRs so we an compare those to the - // actual Packagerevisions found in git/oci, and add/prune PackageRevCRs + // Look up all existing PackageRevCRs so we can compare those to the + // actual PackageRevisions found in git/oci, and add/prune PackageRevCRs // as necessary. existingPkgRevCRs, err := r.metadataStore.List(ctx, r.repoSpec) if err != nil { - return nil, nil, err + return err } + // Create a map so we can quickly check if a specific PackageRevisionMeta exists. - existingPkgRevCRsMap := make(map[string]meta.PackageRevisionMeta) + pkgRevCRsMap := make(map[string]meta.PackageRevisionMeta) for i := range existingPkgRevCRs { pr := existingPkgRevCRs[i] - existingPkgRevCRsMap[pr.Name] = pr + pkgRevCRsMap[pr.Name] = pr } - // TODO: Can we avoid holding the lock for the ListPackageRevisions / identifyLatestRevisions section? - newPackageRevisions, err := r.repo.ListPackageRevisions(ctx, repository.ListPackageRevisionFilter{}) + ctxWithCache := repository.ContextWithPackageRevisionCache(ctx, r.packageRevisionCache) + newPackageRevisions, err := r.repo.ListPackageRevisions(ctxWithCache, repository.ListPackageRevisionFilter{}) if err != nil { - return nil, nil, fmt.Errorf("error listing packages: %w", err) + return fmt.Errorf("error listing packages: %w", err) } // Build mapping from kubeObjectName to PackageRevisions for new PackageRevisions. + // and also recreate packageRevisionCache + prc := make(repository.PackageRevisionCache, len(newPackageRevisions)) newPackageRevisionNames := make(map[string]*cachedPackageRevision, len(newPackageRevisions)) for _, newPackage := range newPackageRevisions { + cid := newPackage.CachedIdentifier() + prc[cid.Key] = repository.PackageRevisionCacheEntry{Version: cid.Version, PackageRevision: newPackage} + kname := newPackage.KubeObjectName() if newPackageRevisionNames[kname] != nil { - klog.Warningf("repo %s: found duplicate packages with name %v", r.repo, kname) + klog.Warningf("repo %s: found duplicate packages with name %v", r.nn(), kname) } pkgRev := &cachedPackageRevision{ @@ -461,92 +423,60 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re } // Build mapping from kubeObjectName to PackageRevisions for existing PackageRevisions + // Grab the RLock while we create this map + r.mutex.RLock() oldPackageRevisionNames := make(map[string]*cachedPackageRevision, len(r.cachedPackageRevisions)) for _, oldPackage := range r.cachedPackageRevisions { oldPackageRevisionNames[oldPackage.KubeObjectName()] = oldPackage } + r.mutex.RUnlock() + + addMeta := 0 + delMeta := 0 + // We go through all PackageRev CRs that represents PackageRevisions // in the current repo and make sure they all have a corresponding // PackageRevision. The ones that doesn't is removed. for _, prm := range existingPkgRevCRs { if _, found := newPackageRevisionNames[prm.Name]; !found { - klog.Infof("repo %s: deleting PackageRev %s/%s because parent PackageRevision was not found", - r.id, prm.Namespace, prm.Name) + delMeta += 1 if _, err := r.metadataStore.Delete(ctx, types.NamespacedName{ Name: prm.Name, Namespace: prm.Namespace, }, true); err != nil { if !apierrors.IsNotFound(err) { // This will be retried the next time the sync runs. - klog.Warningf("repo %s: unable to delete PackageRev CR for %s/%s: %v", - r.id, prm.Name, prm.Namespace, err) + klog.Warningf("repo %s: unable to delete PackageRev CR for %s/%s: %w", + r.nn(), prm.Name, prm.Namespace, err) } } } } - // Send notification for packages that changed before the creation of PkgRev to avoid race conditions because of ownerReferences. - addSent := 0 - modSent := 0 - for kname, newPackage := range newPackageRevisionNames { - oldPackage := oldPackageRevisionNames[kname] - metaPackage, found := existingPkgRevCRsMap[newPackage.KubeObjectName()] - if !found { - klog.Warningf("no PackageRev CR found for PackageRevision %s", newPackage.KubeObjectName()) - } - if oldPackage == nil { - addSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Added, newPackage, metaPackage) - } else { - if oldPackage.ResourceVersion() != newPackage.ResourceVersion() { - modSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage, metaPackage) - } - } - } - // We go through all the PackageRevisions and make sure they have // a corresponding PackageRev CR. for pkgRevName, pkgRev := range newPackageRevisionNames { - if _, found := existingPkgRevCRsMap[pkgRevName]; !found { + if _, found := pkgRevCRsMap[pkgRevName]; !found { pkgRevMeta := meta.PackageRevisionMeta{ Name: pkgRevName, Namespace: r.repoSpec.Namespace, } - if _, err := r.metadataStore.Create(ctx, pkgRevMeta, r.repoSpec.Name, pkgRev.UID()); err != nil { + addMeta += 1 + if created, err := r.metadataStore.Create(ctx, pkgRevMeta, r.repoSpec.Name, pkgRev.UID()); err != nil { // TODO: We should try to find a way to make these errors available through // either the repository CR or the PackageRevision CR. This will be // retried on the next sync. - klog.Warningf("unable to create PackageRev CR for %s/%s: %v", + klog.Warningf("unable to create PackageRev CR for %s/%s: %w", r.repoSpec.Namespace, pkgRevName, err) + } else { + // add to the cache for notifications later + pkgRevCRsMap[pkgRevName] = created } } } - delSent := 0 - // Send notifications for packages that was deleted in the SoT - for kname, oldPackage := range oldPackageRevisionNames { - if newPackageRevisionNames[kname] == nil { - nn := types.NamespacedName{ - Name: oldPackage.KubeObjectName(), - Namespace: oldPackage.KubeObjectNamespace(), - } - klog.Infof("repo %s: deleting PackageRev %s/%s because PackageRevision was removed from SoT", - r.id, nn.Namespace, nn.Name) - metaPackage, err := r.metadataStore.Delete(ctx, nn, true) - if err != nil { - if !apierrors.IsNotFound(err) { - klog.Warningf("repo %s: error deleting PkgRevMeta %s: %v", r.id, nn, err) - } - metaPackage = meta.PackageRevisionMeta{ - Name: nn.Name, - Namespace: nn.Namespace, - } - } - delSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage, metaPackage) - } - } - klog.Infof("repo %s: addSent %d, modSent %d, delSent for %d old and %d new repo packages", r.id, addSent, modSent, len(oldPackageRevisionNames), len(newPackageRevisionNames)) - + // fix up the isLatestRevision in the new maps newPackageRevisionMap := make(map[repository.PackageRevisionKey]*cachedPackageRevision, len(newPackageRevisions)) for _, newPackage := range newPackageRevisions { k := newPackage.Key() @@ -559,21 +489,46 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re identifyLatestRevisions(newPackageRevisionMap) - newPackageMap := make(map[repository.PackageKey]*cachedPackage) + // hold the RW lock while swap in the new packages + // we do this now, *before* sending notifications, so that + // anyone responding to the notification will get the new values + r.mutex.Lock() + r.cachedPackageRevisions = newPackageRevisionMap + r.packageRevisionCache = prc + r.lastVersion = curVer + r.mutex.Unlock() - for _, newPackageRevision := range newPackageRevisionMap { - if !newPackageRevision.isLatestRevision { - continue + // Send notification for packages that changed. + addSent := 0 + modSent := 0 + for kname, newPackage := range newPackageRevisionNames { + oldPackage := oldPackageRevisionNames[kname] + metaPackage, found := pkgRevCRsMap[newPackage.KubeObjectName()] + if !found { + // should never happen + klog.Warningf("no PackageRev CR found for PackageRevision %s", newPackage.KubeObjectName()) + } + if oldPackage == nil { + addSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Added, newPackage, metaPackage) + } else { + if oldPackage.ResourceVersion() != newPackage.ResourceVersion() { + modSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage, metaPackage) + } } - // TODO: Build package? - // newPackage := &cachedPackage{ - // } - // newPackageMap[newPackage.Key()] = newPackage } - r.cachedPackageRevisions = newPackageRevisionMap - r.cachedPackages = newPackageMap - r.lastVersion = curVer - - return newPackageMap, newPackageRevisionMap, nil + delSent := 0 + // Send notifications for packages that were deleted in the SoT + for kname, oldPackage := range oldPackageRevisionNames { + if newPackageRevisionNames[kname] == nil { + metaPackage := meta.PackageRevisionMeta{ + Name: oldPackage.KubeObjectName(), + Namespace: oldPackage.KubeObjectNamespace(), + } + delSent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage, metaPackage) + } + } + klog.Infof("repo %s: addMeta %d, delMeta %d, addSent %d, modSent %d, delSent %d for %d in-cache and %d in-storage package revisions", + r.nn(), addMeta, delMeta, addSent, modSent, delSent, len(oldPackageRevisionNames), len(newPackageRevisionNames)) + return nil } diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 671df1f1..076d63c4 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -295,7 +295,7 @@ func (cad *cadEngine) CreatePackageRevision(ctx context.Context, repositoryObj * return nil, fmt.Errorf("error listing package revisions: %w", err) } - if err := ensureUniqueWorkspaceName(obj, revs); err != nil { + if err := ensureUniqueWorkspaceName(repositoryObj, obj, revs); err != nil { return nil, err } @@ -338,9 +338,20 @@ func (cad *cadEngine) CreatePackageRevision(ctx context.Context, repositoryObj * } // The workspaceName must be unique, because it used to generate the package revision's metadata.name. -func ensureUniqueWorkspaceName(obj *api.PackageRevision, existingRevs []repository.PackageRevision) error { +func ensureUniqueWorkspaceName(repositoryObj *configapi.Repository, obj *api.PackageRevision, existingRevs []repository.PackageRevision) error { + // HACK + // It's ok for the "main" revision to have the same workspace name + // So ignore main revisions in this calculation + mainRev := "" + if repositoryObj.Spec.Git != nil { + mainRev = repositoryObj.Spec.Git.Branch + } + for _, r := range existingRevs { k := r.Key() + if mainRev != "" && k.Revision == mainRev { + continue + } if k.WorkspaceName == obj.Spec.WorkspaceName { return fmt.Errorf("package revision workspaceNames must be unique; package revision with name %s in repo %s with "+ "workspaceName %s already exists", obj.Spec.PackageName, obj.Spec.RepositoryName, obj.Spec.WorkspaceName) diff --git a/pkg/engine/fake/packagerevision.go b/pkg/engine/fake/packagerevision.go index 713c0b99..568d9685 100644 --- a/pkg/engine/fake/packagerevision.go +++ b/pkg/engine/fake/packagerevision.go @@ -35,6 +35,10 @@ type PackageRevision struct { Kptfile kptfile.KptFile } +func (pr *PackageRevision) CachedIdentifier() repository.CachedIdentifier { + return repository.CachedIdentifier{Key: pr.Key().String(), Version: pr.Key().Revision} +} + func (pr *PackageRevision) KubeObjectName() string { return pr.Name } diff --git a/pkg/engine/watchermanager.go b/pkg/engine/watchermanager.go index 5a2d74eb..af7c193f 100644 --- a/pkg/engine/watchermanager.go +++ b/pkg/engine/watchermanager.go @@ -65,6 +65,19 @@ func (r *watcherManager) WatchPackageRevisions(ctx context.Context, filter repos r.mutex.Lock() defer r.mutex.Unlock() + // reap any dead watchers + for i, watcher := range r.watchers { + if watcher == nil { + continue + } + + if err := watcher.isDoneFunction(); err != nil { + klog.Infof("stopping watcher in reaper: %v", err) + r.watchers[i] = nil + continue + } + } + w := &watcher{ isDoneFunction: ctx.Err, callback: callback, diff --git a/pkg/git/git.go b/pkg/git/git.go index 4794acba..1651691b 100644 --- a/pkg/git/git.go +++ b/pkg/git/git.go @@ -293,6 +293,15 @@ func (r *gitRepository) listPackageRevisions(ctx context.Context, filter reposit mainBranch := r.branch.RefInLocal() // Looking for the registered branch + // if a cache is available, use it + cache := repository.PackageRevisionCacheFromContext(ctx) + draftCache := 0 + tagCache := 0 + mainCache := 0 + draftLoaded := 0 + tagLoaded := 0 + mainLoaded := 0 + for { ref, err := refs.Next() if err == io.EOF { @@ -305,9 +314,27 @@ func (r *gitRepository) listPackageRevisions(ctx context.Context, filter reposit continue case isProposedBranchNameInLocal(ref.Name()), isDraftBranchNameInLocal(ref.Name()): - draft, err := r.loadDraft(ctx, ref) - if err != nil { - return nil, fmt.Errorf("failed to load package draft %q: %w", name.String(), err) + var draft *gitPackageRevision + if entry, ok := cache[ref.Name().String()]; ok { + if entry.Version == ref.Hash().String() { + dd, good := entry.PackageRevision.(*gitPackageRevision) + if !good { + klog.Warningf("Found current cached branch %s version %s, but it is not a gitPackageRevision", ref.Name(), entry.Version) + } else { + draft = dd + draftCache += 1 + } + } + } + + if draft == nil { + draft, err = r.loadDraft(ctx, ref) + if err != nil { + return nil, fmt.Errorf("failed to load package draft %q: %w", name.String(), err) + } + if draft != nil { + draftLoaded += 1 + } } if draft != nil { drafts = append(drafts, draft) @@ -315,24 +342,61 @@ func (r *gitRepository) listPackageRevisions(ctx context.Context, filter reposit klog.Warningf("no package draft found for ref %v", ref) } case isTagInLocalRepo(ref.Name()): - tagged, err := r.loadTaggedPackages(ctx, ref) - if err != nil { - // this tag is not associated with any package (e.g. could be a release tag) - continue + var tagged *gitPackageRevision + if entry, ok := cache[ref.Name().String()]; ok { + if entry.Version == ref.Hash().String() { + dd, good := entry.PackageRevision.(*gitPackageRevision) + if !good { + klog.Warningf("Found current cached branch %s version %s, but it is not a gitPackageRevision", ref.Name(), entry.Version) + } else { + tagged = dd + tagCache += 1 + } + } } - for _, p := range tagged { - if filter.Matches(p) { - result = append(result, p) + if tagged == nil { + tagged, err = r.loadTaggedPackage(ctx, ref) + if err != nil { + // this tag is not associated with any package (e.g. could be a release tag) + continue } + if tagged != nil { + tagLoaded += 1 + } + } + if tagged != nil && filter.Matches(tagged) { + result = append(result, tagged) } } } if main != nil { - // TODO: ignore packages that are unchanged in main branch, compared to a tagged version? - mainpkgs, err := r.discoverFinalizedPackages(ctx, main) - if err != nil { - return nil, err + // Look for any package whose cached identifier starts with main.Name() + // There will be one for each pacakge found in main, but they all will have the same + // hash. If that matches main.Hash() there is no change in main and so we can just + // copy all the packages rather than rediscovering. + var mainpkgs []*gitPackageRevision + for k, v := range cache { + if strings.Index(k, main.Name().String()) == 0 { + if v.Version != main.Hash().String() { + continue + } + gpr, ok := v.PackageRevision.(*gitPackageRevision) + if !ok { + klog.Warningf("Found current cached main package %s version %s, but it is not a gitPackageRevision", k, v.Version) + } else { + mainpkgs = append(mainpkgs, gpr) + mainCache += 1 + } + } + } + if len(mainpkgs) == 0 { + mp, err := r.discoverFinalizedPackages(ctx, main) + if err != nil { + return nil, err + } + mainpkgs = mp + mainLoaded = len(mainpkgs) } for _, p := range mainpkgs { if filter.Matches(p) { @@ -347,6 +411,8 @@ func (r *gitRepository) listPackageRevisions(ctx context.Context, filter reposit } } + klog.Infof("repo %s/%s: %d draftCache, %d draftLoaded, %d tagCache, %d tagLoaded, %d mainCache, %d mainLoaded", r.namespace, r.name, + draftCache, draftLoaded, tagCache, tagLoaded, mainCache, mainLoaded) return result, nil } @@ -768,8 +834,8 @@ func parseDraftName(draft *plumbing.Reference) (name string, workspaceName v1alp return name, workspaceName, nil } -func (r *gitRepository) loadTaggedPackages(ctx context.Context, tag *plumbing.Reference) ([]*gitPackageRevision, error) { - ctx, span := tracer.Start(ctx, "gitRepository::loadTaggedPackages", trace.WithAttributes()) +func (r *gitRepository) loadTaggedPackage(ctx context.Context, tag *plumbing.Reference) (*gitPackageRevision, error) { + ctx, span := tracer.Start(ctx, "gitRepository::loadTaggedPackage", trace.WithAttributes()) defer span.End() name, ok := getTagNameInLocalRepo(tag.Name()) @@ -818,10 +884,7 @@ func (r *gitRepository) loadTaggedPackages(ctx context.Context, tag *plumbing.Re return nil, err } - return []*gitPackageRevision{ - packageRevision, - }, nil - + return packageRevision, nil } func (r *gitRepository) dumpAllRefs() { @@ -1089,8 +1152,6 @@ func (r *gitRepository) pushAndCleanup(ctx context.Context, ph *pushRefSpecBuild return err } - klog.Infof("pushing refs: %v", specs) - if err := r.doGitWithAuth(ctx, func(auth transport.AuthMethod) error { return r.repo.Push(&git.PushOptions{ RemoteName: OriginName, @@ -1703,9 +1764,6 @@ func (r *gitRepository) discoverPackagesInTree(commit *object.Commit, opt Discov return nil, err } - if opt.FilterPrefix == "" { - klog.Infof("discovered %d packages @%v", len(t.packages), commit.Hash) - } return t, nil } diff --git a/pkg/git/package.go b/pkg/git/package.go index c39962d6..f0747eca 100644 --- a/pkg/git/package.go +++ b/pkg/git/package.go @@ -84,6 +84,18 @@ func (p *gitPackageRevision) UID() types.UID { return p.uid() } +func (p *gitPackageRevision) CachedIdentifier() repository.CachedIdentifier { + if p.ref != nil { + k := p.ref.Name().String() + if p.revision == string(p.repo.branch) { + k += ":" + p.path + } + return repository.CachedIdentifier{Key: k, Version: p.ref.Hash().String()} + } + + return repository.CachedIdentifier{} +} + func (p *gitPackageRevision) ResourceVersion() string { return p.commit.String() } diff --git a/pkg/oci/oci.go b/pkg/oci/oci.go index e43157fe..2cffebc2 100644 --- a/pkg/oci/oci.go +++ b/pkg/oci/oci.go @@ -249,6 +249,10 @@ func (p *ociPackageRevision) ToMainPackageRevision() repository.PackageRevision panic("unimplemented") } +func (p *ociPackageRevision) CachedIdentifier() repository.CachedIdentifier { + return repository.CachedIdentifier{Key: p.packageName + ":" + string(p.workspaceName), Version: p.resourceVersion} +} + type ociPackageRevision struct { digestName oci.ImageDigestName packageName string diff --git a/pkg/repository/repository.go b/pkg/repository/repository.go index 96e21c8e..3d2bd5ea 100644 --- a/pkg/repository/repository.go +++ b/pkg/repository/repository.go @@ -47,6 +47,37 @@ func (n PackageKey) String() string { return fmt.Sprintf("Repository: %q, Package: %q", n.Repository, n.Package) } +// CachedIdentier is a used by a cache and underlying storage +// implementation to avoid unnecessary reloads +type CachedIdentifier struct { + // Key uniquely identifies the resource in the underlying storage + Key string + + // Version uniquely identifies the version of the resource in the underlying storage + Version string +} + +type PackageRevisionCacheEntry struct { + Version string + PackageRevision PackageRevision +} + +type PackageRevisionCache map[string]PackageRevisionCacheEntry + +type packageCacheKey struct{} + +func ContextWithPackageRevisionCache(ctx context.Context, cache PackageRevisionCache) context.Context { + return context.WithValue(ctx, packageCacheKey{}, cache) +} + +func PackageRevisionCacheFromContext(ctx context.Context) PackageRevisionCache { + cache, ok := ctx.Value(packageCacheKey{}).(PackageRevisionCache) + if !ok { + cache = make(PackageRevisionCache) + } + return cache +} + // PackageRevision is an abstract package version. // We have a single object for both Revision and Resources, because conceptually they are one object. // The best way we've found (so far) to represent them in k8s is as two resources, but they map to the same object. @@ -66,6 +97,9 @@ type PackageRevision interface { // Key returns the "primary key" of the package. Key() PackageRevisionKey + // CachedIdentier returns a unique identifer for this package revision and version + CachedIdentifier() CachedIdentifier + // Lifecycle returns the current lifecycle state of the package. Lifecycle() v1alpha1.PackageRevisionLifecycle diff --git a/test/e2e/suite_utils.go b/test/e2e/suite_utils.go index d1c5c53c..d123f789 100644 --- a/test/e2e/suite_utils.go +++ b/test/e2e/suite_utils.go @@ -307,7 +307,8 @@ func (t *TestSuite) MustNotExist(ctx context.Context, obj client.Object) { // provided name and namespace is ready, i.e. the Ready condition is true. // It also queries for Functions and PackageRevisions, to ensure these are also // ready - this is an artifact of the way we've implemented the aggregated apiserver, -// where the first fetch can sometimes be synchronous. +// where the first fetch will block on the cache loading. Wait up to two minutes for the +// package revisions and functions. func (t *TestSuite) WaitUntilRepositoryReady(ctx context.Context, name, namespace string) { t.Helper() nn := types.NamespacedName{ @@ -338,7 +339,7 @@ func (t *TestSuite) WaitUntilRepositoryReady(ctx context.Context, name, namespac } // While we're using an aggregated apiserver, make sure we can query the generated objects - if err := wait.PollUntilContextTimeout(ctx, time.Second, 10*time.Second, true, func(ctx context.Context) (bool, error) { + if err := wait.PollImmediateWithContext(ctx, time.Second, 120*time.Second, func(ctx context.Context) (bool, error) { var revisions porchapi.PackageRevisionList if err := t.Client.List(ctx, &revisions, client.InNamespace(nn.Namespace)); err != nil { innerErr = err