Skip to content

Commit

Permalink
Issue #657 - session conflict handling
Browse files Browse the repository at this point in the history
- added mutex-based handling to PackageRevision.Update operation
  - the meat of the operation is in packagecommon.go, so it
    covers some other operations as well:
    - propose
    - packageRevisionsApproval.Update (approve, propose-delete)
- added tests for operations mentioned above

nephio-project/nephio#657
  • Loading branch information
JamesMcDermott committed Sep 30, 2024
1 parent 2bf829c commit 51ad0ff
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 30 deletions.
13 changes: 13 additions & 0 deletions pkg/registry/porch/packagecommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,19 @@ func (r *packageCommon) updatePackageRevision(ctx context.Context, name string,
return nil, false, apierrors.NewBadRequest("namespace must be specified")
}

packageMutexKey := fmt.Sprintf("%s/%s", ns, name)
packageMutex := getMutexForPackage(packageMutexKey)

lockAcquired := packageMutex.TryLock()
if !lockAcquired {
return nil, false,
apierrors.NewConflict(
api.Resource("packagerevisions"),
name,
fmt.Errorf(GenericConflictErrorMsg, "package revision", packageMutexKey))
}
defer packageMutex.Unlock()

// isCreate tracks whether this is an update that creates an object (this happens in server-side apply)
isCreate := false

Expand Down
33 changes: 16 additions & 17 deletions pkg/registry/porch/packagerevision.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ var mutexMapMutex sync.Mutex
var pkgRevOperationMutexes = map[string]*sync.Mutex{}

const (
CreateConflictErrorMsg = "another request is already in progress to create %s with details %s"
DeleteConflictErrorMsg = "another request is already in progress to delete %s \"%s\""
CreateConflictErrorMsg = "another request is already in progress to create %s with details %s"
GenericConflictErrorMsg = "another request is already in progress on %s \"%s\""
)

type packageRevisions struct {
Expand Down Expand Up @@ -179,13 +179,7 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj
newApiPkgRev.Spec.PackageName,
newApiPkgRev.Spec.WorkspaceName)

mutexMapMutex.Lock()
packageMutex, alreadyPresent := pkgRevOperationMutexes[packageMutexKey]
if !alreadyPresent {
packageMutex = &sync.Mutex{}
pkgRevOperationMutexes[packageMutexKey] = packageMutex
}
mutexMapMutex.Unlock()
packageMutex := getMutexForPackage(packageMutexKey)

lockAcquired := packageMutex.TryLock()
if !lockAcquired {
Expand Down Expand Up @@ -264,21 +258,15 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida
}

packageMutexKey := fmt.Sprintf("%s/%s", ns, name)
mutexMapMutex.Lock()
packageMutex, alreadyPresent := pkgRevOperationMutexes[packageMutexKey]
if !alreadyPresent {
packageMutex = &sync.Mutex{}
pkgRevOperationMutexes[packageMutexKey] = packageMutex
}
mutexMapMutex.Unlock()
packageMutex := getMutexForPackage(packageMutexKey)

lockAcquired := packageMutex.TryLock()
if !lockAcquired {
return nil, false,
apierrors.NewConflict(
api.Resource("packagerevisions"),
name,
fmt.Errorf(DeleteConflictErrorMsg, "package revision", packageMutexKey))
fmt.Errorf(GenericConflictErrorMsg, "package revision", packageMutexKey))
}
defer packageMutex.Unlock()

Expand All @@ -289,3 +277,14 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida
// TODO: Should we do an async delete?
return apiPkgRev, true, nil
}

func getMutexForPackage(packageMutexKey string) *sync.Mutex {
mutexMapMutex.Lock()
defer mutexMapMutex.Unlock()
packageMutex, alreadyPresent := pkgRevOperationMutexes[packageMutexKey]
if !alreadyPresent {
packageMutex = &sync.Mutex{}
pkgRevOperationMutexes[packageMutexKey] = packageMutex
}
return packageMutex
}
173 changes: 160 additions & 13 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,17 +295,12 @@ func (t *PorchSuite) TestConcurrentClones(ctx context.Context) {
}
results := RunInParallel(cloneFunction, cloneFunction)

expectedResultCount := 2
actualResultCount := len(results)
assert.Equal(t, expectedResultCount, actualResultCount, "expected %d results but was %d", expectedResultCount, actualResultCount)

assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results)

conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool {
return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress")
})
assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen")

assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results)
}

func (t *PorchSuite) TestInitEmptyPackage(ctx context.Context) {
Expand Down Expand Up @@ -386,7 +381,7 @@ func (t *PorchSuite) TestConcurrentInits(ctx context.Context) {
conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool {
return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress")
})
assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen")
assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results)
}

func (t *PorchSuite) TestInitTaskPackage(ctx context.Context) {
Expand Down Expand Up @@ -714,16 +709,12 @@ func (t *PorchSuite) TestConcurrentEdits(ctx context.Context) {
editOperation,
editOperation)

expectedResultCount := 2
actualResultCount := len(results)
assert.Equal(t, expectedResultCount, actualResultCount, "expected %d results but was %d", expectedResultCount, actualResultCount)

assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results)

conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool {
return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress")
})
assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen")
assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results)

// Check its task list
var pkgRev porchapi.PackageRevision
Expand Down Expand Up @@ -1000,6 +991,68 @@ func (t *PorchSuite) TestProposeApprove(ctx context.Context) {
}
}

func (t *PorchSuite) TestConcurrentProposeApprove(ctx context.Context) {
const (
repository = "lifecycle"
packageName = "test-package"
workspace = "workspace"
)

// Register the repository
t.RegisterMainGitRepositoryF(ctx, repository)

// Create a new package (via init)
pr := t.CreatePackageSkeleton(repository, packageName, workspace)
pr.Spec.Tasks = []porchapi.Task{
{
Type: porchapi.TaskTypeInit,
Init: &porchapi.PackageInitTaskSpec{},
},
}
t.CreateF(ctx, pr)

var pkg porchapi.PackageRevision
t.GetF(ctx, client.ObjectKey{
Namespace: t.Namespace,
Name: pr.Name,
}, &pkg)

// Propose the package revision to be finalized
pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecycleProposed
proposeFunction := func() any {
return t.Client.Update(ctx, &pkg)
}
proposeResults := RunInParallel(proposeFunction, proposeFunction)

assert.Contains(t, proposeResults, nil, "expected one 'propose' request to succeed, but did not happen - results: %v", proposeResults)

conflictFailurePresent := slices.ContainsFunc(proposeResults, func(eachResult any) bool {
return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress")
})
assert.True(t, conflictFailurePresent, "expected one 'propose' request to fail with a conflict, but did not happen - results: %v", proposeResults)

var proposed porchapi.PackageRevision
t.GetF(ctx, client.ObjectKey{
Namespace: t.Namespace,
Name: pr.Name,
}, &proposed)

// Approve the package
proposed.Spec.Lifecycle = porchapi.PackageRevisionLifecyclePublished
approveFunction := func() any {
_, err := t.Clientset.PorchV1alpha1().PackageRevisions(proposed.Namespace).UpdateApproval(ctx, proposed.Name, &proposed, metav1.UpdateOptions{})
return err
}
approveResults := RunInParallel(approveFunction, approveFunction)

assert.Contains(t, approveResults, nil, "expected one 'approve' request to succeed, but did not happen - results: %v", approveResults)

conflictFailurePresent = slices.ContainsFunc(approveResults, func(eachResult any) bool {
return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress")
})
assert.True(t, conflictFailurePresent, "expected one 'approve' request to fail with a conflict, but did not happen - results: %v", approveResults)
}

func (t *PorchSuite) TestDeleteDraft(ctx context.Context) {
const (
repository = "delete-draft"
Expand Down Expand Up @@ -1067,7 +1120,7 @@ func (t *PorchSuite) TestConcurrentDeletes(ctx context.Context) {
conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool {
return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress")
})
assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen")
assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results)
t.MustNotExist(ctx, &draft)
}

Expand Down Expand Up @@ -1158,6 +1211,44 @@ func (t *PorchSuite) TestDeleteFinal(ctx context.Context) {
t.mustNotExist(ctx, &pkg)
}

func (t *PorchSuite) TestConcurrentProposeDeletes(ctx context.Context) {
const (
repository = "delete-final"
packageName = "test-delete-final"
workspace = "workspace"
)

// Register the repository and create a draft package
t.RegisterMainGitRepositoryF(ctx, repository)
created := t.CreatePackageDraftF(ctx, repository, packageName, workspace)
// Check the package exists
var pkg porchapi.PackageRevision
t.MustExist(ctx, client.ObjectKey{Namespace: t.Namespace, Name: created.Name}, &pkg)

// Propose and approve the package revision to be finalized
pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecycleProposed
t.UpdateF(ctx, &pkg)
pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecyclePublished
t.UpdateApprovalF(ctx, &pkg, metav1.UpdateOptions{})

t.MustExist(ctx, client.ObjectKey{Namespace: t.Namespace, Name: created.Name}, &pkg)

// Propose deletion with two clients at once
pkg.Spec.Lifecycle = porchapi.PackageRevisionLifecycleDeletionProposed
proposeDeleteFunction := func() any {
_, err := t.Clientset.PorchV1alpha1().PackageRevisions(pkg.Namespace).UpdateApproval(ctx, pkg.Name, &pkg, metav1.UpdateOptions{})
return err
}
proposeDeleteResults := RunInParallel(proposeDeleteFunction, proposeDeleteFunction)

assert.Contains(t, proposeDeleteResults, nil, "expected one 'propose-delete' request to succeed, but did not happen - results: %v", proposeDeleteResults)

conflictFailurePresent := slices.ContainsFunc(proposeDeleteResults, func(eachResult any) bool {
return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress")
})
assert.True(t, conflictFailurePresent, "expected one 'propose-delete' request to fail with a conflict, but did not happen")
}

func (t *PorchSuite) TestProposeDeleteAndUndo(ctx context.Context) {
const (
repository = "test-propose-delete-and-undo"
Expand Down Expand Up @@ -1518,7 +1609,63 @@ func (t *PorchSuite) TestPackageUpdate(ctx context.Context) {
if _, found := revisionResources.Spec.Resources["resourcequota.yaml"]; !found {
t.Errorf("Updated package should contain 'resourcequota.yaml` file")
}
}

func (t *PorchSuite) TestConcurrentPackageUpdates(ctx context.Context) {
const (
gitRepository = "package-update"
packageName = "testns"
workspace = "test-workspace"
)

t.RegisterGitRepositoryF(ctx, testBlueprintsRepo, "test-blueprints", "")

var list porchapi.PackageRevisionList
t.ListE(ctx, &list, client.InNamespace(t.Namespace))

basensV1 := MustFindPackageRevision(t.T, &list, repository.PackageRevisionKey{Repository: "test-blueprints", Package: "basens", Revision: "v1"})
basensV2 := MustFindPackageRevision(t.T, &list, repository.PackageRevisionKey{Repository: "test-blueprints", Package: "basens", Revision: "v2"})

// Register the repository as 'downstream'
t.RegisterMainGitRepositoryF(ctx, gitRepository)

// Create PackageRevision from upstream repo
pr := t.CreatePackageSkeleton(gitRepository, packageName, workspace)
pr.Spec.Tasks = []porchapi.Task{
{
Type: porchapi.TaskTypeClone,
Clone: &porchapi.PackageCloneTaskSpec{
Upstream: porchapi.UpstreamPackage{
UpstreamRef: &porchapi.PackageRevisionRef{
Name: basensV1.Name,
},
},
},
},
}
t.CreateF(ctx, pr)

upstream := pr.Spec.Tasks[0].Clone.Upstream.DeepCopy()
upstream.UpstreamRef.Name = basensV2.Name
pr.Spec.Tasks = append(pr.Spec.Tasks, porchapi.Task{
Type: porchapi.TaskTypeUpdate,
Update: &porchapi.PackageUpdateTaskSpec{
Upstream: *upstream,
},
})

// Two clients at the same time try to update the downstream package
cloneFunction := func() any {
return t.Client.Update(ctx, pr, &client.UpdateOptions{})
}
results := RunInParallel(cloneFunction, cloneFunction)

assert.Contains(t, results, nil, "expected one request to succeed, but did not happen - results: %v", results)

conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool {
return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress")
})
assert.True(t, conflictFailurePresent, "expected one request to fail with a conflict, but did not happen - results: %v", results)
}

func (t *PorchSuite) TestRegisterRepository(ctx context.Context) {
Expand Down

0 comments on commit 51ad0ff

Please sign in to comment.