Skip to content

Commit

Permalink
Refactor task package to make mutations more visible
Browse files Browse the repository at this point in the history
  • Loading branch information
liamfallon committed Dec 3, 2024
1 parent 90b2326 commit e339d39
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 166 deletions.
2 changes: 2 additions & 0 deletions pkg/task/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"k8s.io/klog/v2"
)

var _ mutation = &clonePackageMutation{}

type clonePackageMutation struct {
task *api.Task

Expand Down
2 changes: 2 additions & 0 deletions pkg/task/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"sigs.k8s.io/kustomize/kyaml/yaml"
)

var _ mutation = &evalFunctionMutation{}

type evalFunctionMutation struct {
runtime fn.FunctionRuntime
runnerOptions fnruntime.RunnerOptions
Expand Down
165 changes: 1 addition & 164 deletions pkg/task/generictaskhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1"
"github.com/nephio-project/porch/internal/kpt/builtins"
"github.com/nephio-project/porch/internal/kpt/fnruntime"
"github.com/nephio-project/porch/pkg/kpt"
kptfile "github.com/nephio-project/porch/pkg/kpt/api/kptfile/v1"
"github.com/nephio-project/porch/pkg/kpt/fn"
"github.com/nephio-project/porch/pkg/repository"
Expand Down Expand Up @@ -205,7 +204,7 @@ func (th *genericTaskHandler) DoPRResourceMutations(ctx context.Context, pr2Upda
runnerOptions := th.runnerOptionsResolver(oldRes.GetNamespace())

mutations := []mutation{
&mutationReplaceResources{
&replaceResourcesMutation{
newResources: newRes,
oldResources: oldRes,
},
Expand Down Expand Up @@ -485,168 +484,6 @@ func applyResourceMutations(ctx context.Context, draft repository.PackageRevisio
return applied, renderStatus, nil
}

type updatePackageMutation struct {
cloneTask *api.Task
updateTask *api.Task
repoOpener repository.RepositoryOpener
referenceResolver repository.ReferenceResolver
namespace string
pkgName string
}

func (m *updatePackageMutation) apply(ctx context.Context, resources repository.PackageResources) (repository.PackageResources, *api.TaskResult, error) {
ctx, span := tracer.Start(ctx, "updatePackageMutation::Apply", trace.WithAttributes())
defer span.End()

currUpstreamPkgRef, err := m.currUpstream()
if err != nil {
return repository.PackageResources{}, nil, err
}

targetUpstream := m.updateTask.Update.Upstream
if targetUpstream.Type == api.RepositoryTypeGit || targetUpstream.Type == api.RepositoryTypeOCI {
return repository.PackageResources{}, nil, fmt.Errorf("update is not supported for non-porch upstream packages")
}

originalResources, err := (&repository.PackageFetcher{
RepoOpener: m.repoOpener,
ReferenceResolver: m.referenceResolver,
}).FetchResources(ctx, currUpstreamPkgRef, m.namespace)
if err != nil {
return repository.PackageResources{}, nil, fmt.Errorf("error fetching the resources for package %s with ref %+v",
m.pkgName, *currUpstreamPkgRef)
}

upstreamRevision, err := (&repository.PackageFetcher{
RepoOpener: m.repoOpener,
ReferenceResolver: m.referenceResolver,
}).FetchRevision(ctx, targetUpstream.UpstreamRef, m.namespace)
if err != nil {
return repository.PackageResources{}, nil, fmt.Errorf("error fetching revision for target upstream %s", targetUpstream.UpstreamRef.Name)
}
upstreamResources, err := upstreamRevision.GetResources(ctx)
if err != nil {
return repository.PackageResources{}, nil, fmt.Errorf("error fetching resources for target upstream %s", targetUpstream.UpstreamRef.Name)
}

klog.Infof("performing pkg upgrade operation for pkg %s resource counts local[%d] original[%d] upstream[%d]",
m.pkgName, len(resources.Contents), len(originalResources.Spec.Resources), len(upstreamResources.Spec.Resources))

// May be have packageUpdater part of the Porch core to make it easy for testing ?
updatedResources, err := (&repository.DefaultPackageUpdater{}).Update(ctx,
resources,
repository.PackageResources{
Contents: originalResources.Spec.Resources,
},
repository.PackageResources{
Contents: upstreamResources.Spec.Resources,
})
if err != nil {
return repository.PackageResources{}, nil, fmt.Errorf("error updating the package to revision %s", targetUpstream.UpstreamRef.Name)
}

newUpstream, newUpstreamLock, err := upstreamRevision.GetLock()
if err != nil {
return repository.PackageResources{}, nil, fmt.Errorf("error fetching the resources for package revisions %s", targetUpstream.UpstreamRef.Name)
}
if err := kpt.UpdateKptfileUpstream("", updatedResources.Contents, newUpstream, newUpstreamLock); err != nil {
return repository.PackageResources{}, nil, fmt.Errorf("failed to apply upstream lock to package %q: %w", m.pkgName, err)
}

// ensure merge-key comment is added to newly added resources.
result, err := ensureMergeKey(ctx, updatedResources)
if err != nil {
klog.Infof("failed to add merge key comments: %v", err)
}
return result, &api.TaskResult{Task: m.updateTask}, nil
}

// Currently assumption is that downstream packages will be forked from a porch package.
// As per current implementation, upstream package ref is stored in a new update task but this may
// change so the logic of figuring out current upstream will live in this function.
func (m *updatePackageMutation) currUpstream() (*api.PackageRevisionRef, error) {
if m.cloneTask == nil || m.cloneTask.Clone == nil {
return nil, fmt.Errorf("package %s does not have original upstream info", m.pkgName)
}
upstream := m.cloneTask.Clone.Upstream
if upstream.Type == api.RepositoryTypeGit || upstream.Type == api.RepositoryTypeOCI {
return nil, fmt.Errorf("upstream package must be porch native package. Found it to be %s", upstream.Type)
}
return upstream.UpstreamRef, nil
}

func findCloneTask(pr *api.PackageRevision) *api.Task {
if len(pr.Spec.Tasks) == 0 {
return nil
}
firstTask := pr.Spec.Tasks[0]
if firstTask.Type == api.TaskTypeClone {
return &firstTask
}
return nil
}

type mutationReplaceResources struct {
newResources *api.PackageRevisionResources
oldResources *api.PackageRevisionResources
}

func (m *mutationReplaceResources) apply(ctx context.Context, resources repository.PackageResources) (repository.PackageResources, *api.TaskResult, error) {
_, span := tracer.Start(ctx, "mutationReplaceResources::Apply", trace.WithAttributes())
defer span.End()

patch := &api.PackagePatchTaskSpec{}

old := resources.Contents
new, err := healConfig(old, m.newResources.Spec.Resources)
if err != nil {
return repository.PackageResources{}, nil, fmt.Errorf("failed to heal resources: %w", err)
}

for k, newV := range new {
oldV, ok := old[k]
// New config or changed config
if !ok {
patchSpec := api.PatchSpec{
File: k,
PatchType: api.PatchTypeCreateFile,
Contents: newV,
}
patch.Patches = append(patch.Patches, patchSpec)
} else if newV != oldV {
patchSpec, err := GeneratePatch(k, oldV, newV)
if err != nil {
return repository.PackageResources{}, nil, fmt.Errorf("error generating patch: %w", err)
}
if patchSpec.Contents == "" {
continue
}
patch.Patches = append(patch.Patches, patchSpec)
}
}
for k := range old {
// Deleted config
if _, ok := new[k]; !ok {
patchSpec := api.PatchSpec{
File: k,
PatchType: api.PatchTypeDeleteFile,
}
patch.Patches = append(patch.Patches, patchSpec)
}
}
// If patch is empty, don't create a Task.
var taskResult *api.TaskResult
if len(patch.Patches) > 0 {
taskResult = &api.TaskResult{
Task: &api.Task{
Type: api.TaskTypePatch,
Patch: patch,
},
}
}
return repository.PackageResources{Contents: new}, taskResult, nil
}

func healConfig(old, new map[string]string) (map[string]string, error) {
// Copy comments from old config to new
oldResources, err := (&packageReader{
Expand Down
4 changes: 2 additions & 2 deletions pkg/task/replace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestReplaceResources(t *testing.T) {
input := repository.ReadPackage(t, filepath.Join("testdata", "replace"))
nocomment := removeComments(t, input)

replace := &mutationReplaceResources{
replace := &replaceResourcesMutation{
newResources: &v1alpha1.PackageRevisionResources{
Spec: v1alpha1.PackageRevisionResourcesSpec{
Resources: nocomment.Contents,
Expand All @@ -48,7 +48,7 @@ func TestReplaceResources(t *testing.T) {

output, _, err := replace.apply(ctx, input)
if err != nil {
t.Fatalf("mutationReplaceResources.Apply failed: %v", err)
t.Fatalf("replaceResourcesMutation.Apply failed: %v", err)
}

if !cmp.Equal(input, output) {
Expand Down
87 changes: 87 additions & 0 deletions pkg/task/replaceresources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2024 The kpt and Nephio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package task

import (
"context"
"fmt"

api "github.com/nephio-project/porch/api/porch/v1alpha1"
"github.com/nephio-project/porch/pkg/repository"
"go.opentelemetry.io/otel/trace"
)

var _ mutation = &replaceResourcesMutation{}

type replaceResourcesMutation struct {
newResources *api.PackageRevisionResources
oldResources *api.PackageRevisionResources
}

func (m *replaceResourcesMutation) apply(ctx context.Context, resources repository.PackageResources) (repository.PackageResources, *api.TaskResult, error) {
_, span := tracer.Start(ctx, "mutationReplaceResources::Apply", trace.WithAttributes())
defer span.End()

patch := &api.PackagePatchTaskSpec{}

old := resources.Contents
new, err := healConfig(old, m.newResources.Spec.Resources)
if err != nil {
return repository.PackageResources{}, nil, fmt.Errorf("failed to heal resources: %w", err)
}

for k, newV := range new {
oldV, ok := old[k]
// New config or changed config
if !ok {
patchSpec := api.PatchSpec{
File: k,
PatchType: api.PatchTypeCreateFile,
Contents: newV,
}
patch.Patches = append(patch.Patches, patchSpec)
} else if newV != oldV {
patchSpec, err := GeneratePatch(k, oldV, newV)
if err != nil {
return repository.PackageResources{}, nil, fmt.Errorf("error generating patch: %w", err)
}
if patchSpec.Contents == "" {
continue
}
patch.Patches = append(patch.Patches, patchSpec)
}
}
for k := range old {
// Deleted config
if _, ok := new[k]; !ok {
patchSpec := api.PatchSpec{
File: k,
PatchType: api.PatchTypeDeleteFile,
}
patch.Patches = append(patch.Patches, patchSpec)
}
}
// If patch is empty, don't create a Task.
var taskResult *api.TaskResult
if len(patch.Patches) > 0 {
taskResult = &api.TaskResult{
Task: &api.Task{
Type: api.TaskTypePatch,
Patch: patch,
},
}
}
return repository.PackageResources{Contents: new}, taskResult, nil
}
Loading

0 comments on commit e339d39

Please sign in to comment.