From 51ad0ff6221ea2ec605719328a6e55d12207ede8 Mon Sep 17 00:00:00 2001 From: James McDermott Date: Mon, 30 Sep 2024 18:01:34 +0100 Subject: [PATCH] Issue #657 - session conflict handling - added mutex-based handling to PackageRevision.Update operation - the meat of the operation is in packagecommon.go, so it covers some other operations as well: - propose - packageRevisionsApproval.Update (approve, propose-delete) - added tests for operations mentioned above https://github.com/nephio-project/nephio/issues/657 --- pkg/registry/porch/packagecommon.go | 13 ++ pkg/registry/porch/packagerevision.go | 33 +++-- test/e2e/e2e_test.go | 173 ++++++++++++++++++++++++-- 3 files changed, 189 insertions(+), 30 deletions(-) diff --git a/pkg/registry/porch/packagecommon.go b/pkg/registry/porch/packagecommon.go index 32b26b95..25875ff5 100644 --- a/pkg/registry/porch/packagecommon.go +++ b/pkg/registry/porch/packagecommon.go @@ -210,6 +210,19 @@ func (r *packageCommon) updatePackageRevision(ctx context.Context, name string, return nil, false, apierrors.NewBadRequest("namespace must be specified") } + packageMutexKey := fmt.Sprintf("%s/%s", ns, name) + packageMutex := getMutexForPackage(packageMutexKey) + + lockAcquired := packageMutex.TryLock() + if !lockAcquired { + return nil, false, + apierrors.NewConflict( + api.Resource("packagerevisions"), + name, + fmt.Errorf(GenericConflictErrorMsg, "package revision", packageMutexKey)) + } + defer packageMutex.Unlock() + // isCreate tracks whether this is an update that creates an object (this happens in server-side apply) isCreate := false diff --git a/pkg/registry/porch/packagerevision.go b/pkg/registry/porch/packagerevision.go index a1a47ea0..ab103b4d 100644 --- a/pkg/registry/porch/packagerevision.go +++ b/pkg/registry/porch/packagerevision.go @@ -39,8 +39,8 @@ var mutexMapMutex sync.Mutex var pkgRevOperationMutexes = map[string]*sync.Mutex{} const ( - CreateConflictErrorMsg = "another request is already in progress to create %s with details %s" - DeleteConflictErrorMsg = "another request is already in progress to delete %s \"%s\"" + CreateConflictErrorMsg = "another request is already in progress to create %s with details %s" + GenericConflictErrorMsg = "another request is already in progress on %s \"%s\"" ) type packageRevisions struct { @@ -179,13 +179,7 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj newApiPkgRev.Spec.PackageName, newApiPkgRev.Spec.WorkspaceName) - mutexMapMutex.Lock() - packageMutex, alreadyPresent := pkgRevOperationMutexes[packageMutexKey] - if !alreadyPresent { - packageMutex = &sync.Mutex{} - pkgRevOperationMutexes[packageMutexKey] = packageMutex - } - mutexMapMutex.Unlock() + packageMutex := getMutexForPackage(packageMutexKey) lockAcquired := packageMutex.TryLock() if !lockAcquired { @@ -264,13 +258,7 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida } packageMutexKey := fmt.Sprintf("%s/%s", ns, name) - mutexMapMutex.Lock() - packageMutex, alreadyPresent := pkgRevOperationMutexes[packageMutexKey] - if !alreadyPresent { - packageMutex = &sync.Mutex{} - pkgRevOperationMutexes[packageMutexKey] = packageMutex - } - mutexMapMutex.Unlock() + packageMutex := getMutexForPackage(packageMutexKey) lockAcquired := packageMutex.TryLock() if !lockAcquired { @@ -278,7 +266,7 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida apierrors.NewConflict( api.Resource("packagerevisions"), name, - fmt.Errorf(DeleteConflictErrorMsg, "package revision", packageMutexKey)) + fmt.Errorf(GenericConflictErrorMsg, "package revision", packageMutexKey)) } defer packageMutex.Unlock() @@ -289,3 +277,14 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida // TODO: Should we do an async delete? return apiPkgRev, true, nil } + +func getMutexForPackage(packageMutexKey string) *sync.Mutex { + mutexMapMutex.Lock() + defer mutexMapMutex.Unlock() + packageMutex, alreadyPresent := pkgRevOperationMutexes[packageMutexKey] + if !alreadyPresent { + packageMutex = &sync.Mutex{} + pkgRevOperationMutexes[packageMutexKey] = packageMutex + } + return packageMutex +} diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 118cd730..c7643fe4 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -295,17 +295,12 @@ func (t *PorchSuite) TestConcurrentClones(ctx context.Context) { } results := RunInParallel(cloneFunction, cloneFunction) - expectedResultCount := 2 - actualResultCount := len(results) - assert.Equal(t, expectedResultCount, actualResultCount, "expected %d results but was %d", expectedResultCount, actualResultCount) - assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") }) - assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen") - + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) } func (t *PorchSuite) TestInitEmptyPackage(ctx context.Context) { @@ -386,7 +381,7 @@ func (t *PorchSuite) TestConcurrentInits(ctx context.Context) { conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") }) - assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen") + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) } func (t *PorchSuite) TestInitTaskPackage(ctx context.Context) { @@ -714,16 +709,12 @@ func (t *PorchSuite) TestConcurrentEdits(ctx context.Context) { editOperation, editOperation) - expectedResultCount := 2 - actualResultCount := len(results) - assert.Equal(t, expectedResultCount, actualResultCount, "expected %d results but was %d", expectedResultCount, actualResultCount) - assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") }) - assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen") + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) // Check its task list var pkgRev porchapi.PackageRevision @@ -1000,6 +991,68 @@ func (t *PorchSuite) TestProposeApprove(ctx context.Context) { } } +func (t *PorchSuite) TestConcurrentProposeApprove(ctx context.Context) { + const ( + repository = "lifecycle" + packageName = "test-package" + workspace = "workspace" + ) + + // Register the repository + t.RegisterMainGitRepositoryF(ctx, repository) + + // Create a new package (via init) + pr := t.CreatePackageSkeleton(repository, packageName, workspace) + pr.Spec.Tasks = []porchapi.Task{ + { + Type: porchapi.TaskTypeInit, + Init: &porchapi.PackageInitTaskSpec{}, + }, + } + t.CreateF(ctx, pr) + + var pkg porchapi.PackageRevision + t.GetF(ctx, client.ObjectKey{ + Namespace: t.Namespace, + Name: pr.Name, + }, &pkg) + + // Propose the package revision to be finalized + pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecycleProposed + proposeFunction := func() any { + return t.Client.Update(ctx, &pkg) + } + proposeResults := RunInParallel(proposeFunction, proposeFunction) + + assert.Contains(t, proposeResults, nil, "expected one 'propose' request to succeed, but did not happen - results: %v", proposeResults) + + conflictFailurePresent := slices.ContainsFunc(proposeResults, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one 'propose' request to fail with a conflict, but did not happen - results: %v", proposeResults) + + var proposed porchapi.PackageRevision + t.GetF(ctx, client.ObjectKey{ + Namespace: t.Namespace, + Name: pr.Name, + }, &proposed) + + // Approve the package + proposed.Spec.Lifecycle = porchapi.PackageRevisionLifecyclePublished + approveFunction := func() any { + _, err := t.Clientset.PorchV1alpha1().PackageRevisions(proposed.Namespace).UpdateApproval(ctx, proposed.Name, &proposed, metav1.UpdateOptions{}) + return err + } + approveResults := RunInParallel(approveFunction, approveFunction) + + assert.Contains(t, approveResults, nil, "expected one 'approve' request to succeed, but did not happen - results: %v", approveResults) + + conflictFailurePresent = slices.ContainsFunc(approveResults, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one 'approve' request to fail with a conflict, but did not happen - results: %v", approveResults) +} + func (t *PorchSuite) TestDeleteDraft(ctx context.Context) { const ( repository = "delete-draft" @@ -1067,7 +1120,7 @@ func (t *PorchSuite) TestConcurrentDeletes(ctx context.Context) { conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") }) - assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen") + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) t.MustNotExist(ctx, &draft) } @@ -1158,6 +1211,44 @@ func (t *PorchSuite) TestDeleteFinal(ctx context.Context) { t.mustNotExist(ctx, &pkg) } +func (t *PorchSuite) TestConcurrentProposeDeletes(ctx context.Context) { + const ( + repository = "delete-final" + packageName = "test-delete-final" + workspace = "workspace" + ) + + // Register the repository and create a draft package + t.RegisterMainGitRepositoryF(ctx, repository) + created := t.CreatePackageDraftF(ctx, repository, packageName, workspace) + // Check the package exists + var pkg porchapi.PackageRevision + t.MustExist(ctx, client.ObjectKey{Namespace: t.Namespace, Name: created.Name}, &pkg) + + // Propose and approve the package revision to be finalized + pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecycleProposed + t.UpdateF(ctx, &pkg) + pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecyclePublished + t.UpdateApprovalF(ctx, &pkg, metav1.UpdateOptions{}) + + t.MustExist(ctx, client.ObjectKey{Namespace: t.Namespace, Name: created.Name}, &pkg) + + // Propose deletion with two clients at once + pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecycleDeletionProposed + proposeDeleteFunction := func() any { + _, err := t.Clientset.PorchV1alpha1().PackageRevisions(pkg.Namespace).UpdateApproval(ctx, pkg.Name, &pkg, metav1.UpdateOptions{}) + return err + } + proposeDeleteResults := RunInParallel(proposeDeleteFunction, proposeDeleteFunction) + + assert.Contains(t, proposeDeleteResults, nil, "expected one 'propose-delete' request to succeed, but did not happen - results: %v", proposeDeleteResults) + + conflictFailurePresent := slices.ContainsFunc(proposeDeleteResults, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one 'propose-delete' request to fail with a conflict, but did not happen") +} + func (t *PorchSuite) TestProposeDeleteAndUndo(ctx context.Context) { const ( repository = "test-propose-delete-and-undo" @@ -1518,7 +1609,63 @@ func (t *PorchSuite) TestPackageUpdate(ctx context.Context) { if _, found := revisionResources.Spec.Resources["resourcequota.yaml"]; !found { t.Errorf("Updated package should contain 'resourcequota.yaml` file") } +} + +func (t *PorchSuite) TestConcurrentPackageUpdates(ctx context.Context) { + const ( + gitRepository = "package-update" + packageName = "testns" + workspace = "test-workspace" + ) + + t.RegisterGitRepositoryF(ctx, testBlueprintsRepo, "test-blueprints", "") + + var list porchapi.PackageRevisionList + t.ListE(ctx, &list, client.InNamespace(t.Namespace)) + + basensV1 := MustFindPackageRevision(t.T, &list, repository.PackageRevisionKey{Repository: "test-blueprints", Package: "basens", Revision: "v1"}) + basensV2 := MustFindPackageRevision(t.T, &list, repository.PackageRevisionKey{Repository: "test-blueprints", Package: "basens", Revision: "v2"}) + + // Register the repository as 'downstream' + t.RegisterMainGitRepositoryF(ctx, gitRepository) + + // Create PackageRevision from upstream repo + pr := t.CreatePackageSkeleton(gitRepository, packageName, workspace) + pr.Spec.Tasks = []porchapi.Task{ + { + Type: porchapi.TaskTypeClone, + Clone: &porchapi.PackageCloneTaskSpec{ + Upstream: porchapi.UpstreamPackage{ + UpstreamRef: &porchapi.PackageRevisionRef{ + Name: basensV1.Name, + }, + }, + }, + }, + } + t.CreateF(ctx, pr) + upstream := pr.Spec.Tasks[0].Clone.Upstream.DeepCopy() + upstream.UpstreamRef.Name = basensV2.Name + pr.Spec.Tasks = append(pr.Spec.Tasks, porchapi.Task{ + Type: porchapi.TaskTypeUpdate, + Update: &porchapi.PackageUpdateTaskSpec{ + Upstream: *upstream, + }, + }) + + // Two clients at the same time try to update the downstream package + cloneFunction := func() any { + return t.Client.Update(ctx, pr, &client.UpdateOptions{}) + } + results := RunInParallel(cloneFunction, cloneFunction) + + assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results) + + conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }) + assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results) } func (t *PorchSuite) TestRegisterRepository(ctx context.Context) {