Skip to content

Commit

Permalink
Merge pull request nephio-project#146 from Nordix/cache-refresh-mutex
Browse files Browse the repository at this point in the history
Add mutex to refreshAllCachedPackages and refactor other mutex locations in the cache.
  • Loading branch information
nephio-prow[bot] authored Dec 2, 2024
2 parents 2384654 + 34e5419 commit aeb1c01
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
1 change: 1 addition & 0 deletions pkg/cache/memory/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (cd *cachedDraft) Close(ctx context.Context, version string) (repository.Pa
if err != nil {
return nil, err
}

if v != cd.cache.lastVersion {
_, _, err = cd.cache.refreshAllCachedPackages(ctx)
if err != nil {
Expand Down
20 changes: 11 additions & 9 deletions pkg/cache/memory/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,33 +114,33 @@ func (r *cachedRepository) getRefreshError() error {
}

func (r *cachedRepository) getPackageRevisions(ctx context.Context, filter repository.ListPackageRevisionFilter, forceRefresh bool) ([]repository.PackageRevision, error) {
r.mutex.Lock()
defer r.mutex.Unlock()

_, packageRevisions, err := r.getCachedPackages(ctx, forceRefresh)
if err != nil {
return nil, err
}

r.mutex.Lock()
defer r.mutex.Unlock()
return toPackageRevisionSlice(packageRevisions, filter), nil
}

func (r *cachedRepository) getPackages(ctx context.Context, filter repository.ListPackageFilter, forceRefresh bool) ([]repository.Package, error) {
r.mutex.Lock()
defer r.mutex.Unlock()

packages, _, err := r.getCachedPackages(ctx, forceRefresh)
if err != nil {
return nil, err
}

r.mutex.Lock()
defer r.mutex.Unlock()
return toPackageSlice(packages, filter), nil
}

// getCachedPackages returns cachedPackages; fetching it if not cached or if forceRefresh.
// mutex must be held.
func (r *cachedRepository) getCachedPackages(ctx context.Context, forceRefresh bool) (map[repository.PackageKey]*cachedPackage, map[repository.PackageRevisionKey]*cachedPackageRevision, error) {
// must hold mutex

r.mutex.Lock()
packages := r.cachedPackages
packageRevisions := r.cachedPackageRevisions
err := r.refreshRevisionsError
Expand All @@ -157,6 +157,7 @@ func (r *cachedRepository) getCachedPackages(ctx context.Context, forceRefresh b
}
}
}
r.mutex.Unlock()

if packages == nil {
packages, packageRevisions, err = r.refreshAllCachedPackages(ctx)
Expand Down Expand Up @@ -192,15 +193,15 @@ func (r *cachedRepository) UpdatePackageRevision(ctx context.Context, old reposi
}

func (r *cachedRepository) update(ctx context.Context, updated repository.PackageRevision) (*cachedPackageRevision, error) {
r.mutex.Lock()
defer r.mutex.Unlock()

// TODO: Technically we only need this package, not all packages
if _, _, err := r.getCachedPackages(ctx, false); err != nil {
klog.Warningf("failed to get cached packages: %v", err)
// TODO: Invalidate all watches? We're dropping an add/update event
return nil, err
}
r.mutex.Lock()
defer r.mutex.Unlock()

k := updated.Key()
// previous := r.cachedPackageRevisions[k]
Expand Down Expand Up @@ -236,7 +237,6 @@ func (r *cachedRepository) update(ctx context.Context, updated repository.Packag
}

func (r *cachedRepository) createMainPackageRevision(ctx context.Context, updatedMain repository.PackageRevision) error {

//Search and delete any old main pkgRev of an older workspace in the cache
for pkgRevKey := range r.cachedPackageRevisions {
if (pkgRevKey.Repository == updatedMain.Key().Repository) && (pkgRevKey.Package == updatedMain.Key().Package) && (pkgRevKey.Revision == updatedMain.Key().Revision) {
Expand Down Expand Up @@ -403,6 +403,8 @@ func (r *cachedRepository) flush() {
func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[repository.PackageKey]*cachedPackage, map[repository.PackageRevisionKey]*cachedPackageRevision, error) {
// TODO: Avoid simultaneous fetches?
// TODO: Push-down partial refresh?
r.mutex.Lock()
defer r.mutex.Unlock()

start := time.Now()
defer func() { klog.Infof("repo %s: refresh finished in %f secs", r.id, time.Since(start).Seconds()) }()
Expand Down

0 comments on commit aeb1c01

Please sign in to comment.