Skip to content

Commit

Permalink
Issue #828 - add OTel traces to fill holes in tracing
Browse files Browse the repository at this point in the history
- this required wiring Contexts into some flows to
  propagate the tracing links/trees
  - hence profusion of small edits
- added "[START]" markers to trace names of entrypoint/
  beginning-of-flow trace spans
  - for ease of identification in Jaeger

nephio-project/nephio#828
  • Loading branch information
JamesMcDermott committed Jan 14, 2025
1 parent 9b4e226 commit df98229
Show file tree
Hide file tree
Showing 32 changed files with 174 additions and 99 deletions.
6 changes: 3 additions & 3 deletions pkg/cache/memory/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestPublishedLatest(t *testing.T) {

bucket := revisions[0]
// Expect draft package
if got, want := bucket.Lifecycle(), api.PackageRevisionLifecycleDraft; got != want {
if got, want := bucket.Lifecycle(ctx), api.PackageRevisionLifecycleDraft; got != want {
t.Fatalf("Bucket package lifecycle: got %s, want %s", got, want)
}

Expand Down Expand Up @@ -147,7 +147,7 @@ func TestDeletePublishedMain(t *testing.T) {

bucket := revisions[0]
// Expect draft package
if got, want := bucket.Lifecycle(), api.PackageRevisionLifecycleDraft; got != want {
if got, want := bucket.Lifecycle(ctx), api.PackageRevisionLifecycleDraft; got != want {
t.Fatalf("Bucket package lifecycle: got %s, want %s", got, want)
}

Expand Down Expand Up @@ -184,7 +184,7 @@ func TestDeletePublishedMain(t *testing.T) {

approvedBucket := publishedRevisions[0]

if got, want := approvedBucket.Lifecycle(), api.PackageRevisionLifecyclePublished; got != want {
if got, want := approvedBucket.Lifecycle(ctx), api.PackageRevisionLifecyclePublished; got != want {
t.Fatalf("Approved Bucket package lifecycle: got %s, want %s", got, want)
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/cache/memory/packagerevision.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

api "github.com/nephio-project/porch/api/porch/v1alpha1"
"github.com/nephio-project/porch/pkg/repository"
"go.opentelemetry.io/otel/trace"
)

// We take advantage of the cache having a global view of all the packages
Expand All @@ -35,6 +36,9 @@ type cachedPackageRevision struct {
}

func (c *cachedPackageRevision) GetPackageRevision(ctx context.Context) (*api.PackageRevision, error) {
ctx, span := tracer.Start(ctx, "cachedPackageRevision::GetPackageRevision", trace.WithAttributes())
defer span.End()

apiPR, err := c.PackageRevision.GetPackageRevision(ctx)
if err != nil {
return nil, err
Expand Down
24 changes: 15 additions & 9 deletions pkg/cache/memory/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func (r *cachedRepository) Refresh(ctx context.Context) error {
}

func (r *cachedRepository) Version(ctx context.Context) (string, error) {
ctx, span := tracer.Start(ctx, "cachedRepository::Version", trace.WithAttributes())
defer span.End()

return r.repo.Version(ctx)
}

Expand Down Expand Up @@ -122,7 +125,7 @@ func (r *cachedRepository) getPackageRevisions(ctx context.Context, filter repos
}
r.mutex.Lock()
defer r.mutex.Unlock()
return toPackageRevisionSlice(packageRevisions, filter), nil
return toPackageRevisionSlice(ctx, packageRevisions, filter), nil
}

func (r *cachedRepository) getPackages(ctx context.Context, filter repository.ListPackageFilter, forceRefresh bool) ([]repository.Package, error) {
Expand Down Expand Up @@ -196,12 +199,12 @@ func (r *cachedRepository) ClosePackageRevisionDraft(ctx context.Context, prd re

var publishedRevisions []string
for _, rev := range revisions {
if v1alpha1.LifecycleIsPublished(rev.Lifecycle()) {
if v1alpha1.LifecycleIsPublished(rev.Lifecycle(ctx)) {
publishedRevisions = append(publishedRevisions, rev.Key().Revision)
}
}

nextVersion, err := repository.NextRevisionNumber(publishedRevisions)
nextVersion, err := repository.NextRevisionNumber(ctx, publishedRevisions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -234,7 +237,7 @@ func (r *cachedRepository) update(ctx context.Context, updated repository.Packag
k := updated.Key()
// previous := r.cachedPackageRevisions[k]

if v1alpha1.LifecycleIsPublished(updated.Lifecycle()) {
if v1alpha1.LifecycleIsPublished(updated.Lifecycle(ctx)) {
oldKey := repository.PackageRevisionKey{
Repository: k.Repository,
Package: k.Package,
Expand All @@ -247,10 +250,10 @@ func (r *cachedRepository) update(ctx context.Context, updated repository.Packag
r.cachedPackageRevisions[k] = cached

// Recompute latest package revisions.
identifyLatestRevisions(r.cachedPackageRevisions)
identifyLatestRevisions(ctx, r.cachedPackageRevisions)

// Create the main package revision
if v1alpha1.LifecycleIsPublished(updated.Lifecycle()) {
if v1alpha1.LifecycleIsPublished(updated.Lifecycle(ctx)) {
updatedMain := updated.ToMainPackageRevision()
err := r.createMainPackageRevision(ctx, updatedMain)
if err != nil {
Expand Down Expand Up @@ -325,7 +328,7 @@ func (r *cachedRepository) DeletePackageRevision(ctx context.Context, old reposi

// Recompute latest package revisions.
// TODO: Only for affected object / key?
identifyLatestRevisions(r.cachedPackageRevisions)
identifyLatestRevisions(ctx, r.cachedPackageRevisions)
}

r.mutex.Unlock()
Expand Down Expand Up @@ -408,7 +411,7 @@ func (r *cachedRepository) pollOnce(ctx context.Context) {
start := time.Now()
klog.Infof("repo %s: poll started", r.id)
defer func() { klog.Infof("repo %s: poll finished in %f secs", r.id, time.Since(start).Seconds()) }()
ctx, span := tracer.Start(ctx, "Repository::pollOnce", trace.WithAttributes())
ctx, span := tracer.Start(ctx, "[START]::Repository::pollOnce", trace.WithAttributes())
defer span.End()

if _, err := r.getPackageRevisions(ctx, repository.ListPackageRevisionFilter{}, true); err != nil {
Expand All @@ -432,6 +435,9 @@ func (r *cachedRepository) flush() {
// it also triggers notifications for all package changes.
// mutex must be held.
func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[repository.PackageKey]*cachedPackage, map[repository.PackageRevisionKey]*cachedPackageRevision, error) {
ctx, span := tracer.Start(ctx, "cachedRepository::refreshAllCachedPackages", trace.WithAttributes())
defer span.End()

// TODO: Avoid simultaneous fetches?
// TODO: Push-down partial refresh?
r.mutex.Lock()
Expand Down Expand Up @@ -567,7 +573,7 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
newPackageRevisionMap[k] = pkgRev
}

identifyLatestRevisions(newPackageRevisionMap)
identifyLatestRevisions(ctx, newPackageRevisionMap)

newPackageMap := make(map[repository.PackageKey]*cachedPackage)

Expand Down
12 changes: 7 additions & 5 deletions pkg/cache/memory/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package memory

import (
"context"
"sort"
"strings"

Expand All @@ -24,7 +25,7 @@ import (
"k8s.io/klog/v2"
)

func identifyLatestRevisions(result map[repository.PackageRevisionKey]*cachedPackageRevision) {
func identifyLatestRevisions(ctx context.Context, result map[repository.PackageRevisionKey]*cachedPackageRevision) {
// Compute the latest among the different revisions of the same package.
// The map is keyed by the package name; Values are the latest revision found so far.

Expand All @@ -35,7 +36,7 @@ func identifyLatestRevisions(result map[repository.PackageRevisionKey]*cachedPac

// Check if the current package revision is more recent than the one seen so far.
// Only consider Published packages
if !v1alpha1.LifecycleIsPublished(current.Lifecycle()) {
if !v1alpha1.LifecycleIsPublished(current.Lifecycle(ctx)) {
continue
}

Expand Down Expand Up @@ -64,10 +65,11 @@ func identifyLatestRevisions(result map[repository.PackageRevisionKey]*cachedPac
}
}

func toPackageRevisionSlice(cached map[repository.PackageRevisionKey]*cachedPackageRevision, filter repository.ListPackageRevisionFilter) []repository.PackageRevision {
func toPackageRevisionSlice(
ctx context.Context, cached map[repository.PackageRevisionKey]*cachedPackageRevision, filter repository.ListPackageRevisionFilter) []repository.PackageRevision {
result := make([]repository.PackageRevision, 0, len(cached))
for _, p := range cached {
if filter.Matches(p) {
if filter.Matches(ctx, p) {
result = append(result, p)
}
}
Expand All @@ -89,7 +91,7 @@ func toPackageRevisionSlice(cached map[repository.PackageRevisionKey]*cachedPack
default:
// Equal. Compare next element
}
switch res := strings.Compare(string(result[i].Lifecycle()), string(result[j].Lifecycle())); {
switch res := strings.Compare(string(result[i].Lifecycle(ctx)), string(result[j].Lifecycle(ctx))); {
case res < 0:
return true
case res > 0:
Expand Down
46 changes: 28 additions & 18 deletions pkg/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type GitRepositoryOptions struct {
}

func OpenRepository(ctx context.Context, name, namespace string, spec *configapi.GitRepository, deployment bool, root string, opts GitRepositoryOptions) (GitRepository, error) {
ctx, span := tracer.Start(ctx, "OpenRepository", trace.WithAttributes())
ctx, span := tracer.Start(ctx, "git.go::OpenRepository", trace.WithAttributes())
defer span.End()

replace := strings.NewReplacer("/", "-", ":", "-")
Expand Down Expand Up @@ -263,6 +263,7 @@ func (r *gitRepository) DeletePackage(ctx context.Context, obj repository.Packag
func (r *gitRepository) ListPackageRevisions(ctx context.Context, filter repository.ListPackageRevisionFilter) ([]repository.PackageRevision, error) {
ctx, span := tracer.Start(ctx, "gitRepository::ListPackageRevisions", trace.WithAttributes())
defer span.End()

r.mutex.Lock()
defer r.mutex.Unlock()

Expand Down Expand Up @@ -324,7 +325,7 @@ func (r *gitRepository) listPackageRevisions(ctx context.Context, filter reposit
continue
}
for _, p := range tagged {
if filter.Matches(p) {
if filter.Matches(ctx, p) {
result = append(result, p)
}
}
Expand All @@ -338,14 +339,14 @@ func (r *gitRepository) listPackageRevisions(ctx context.Context, filter reposit
return nil, err
}
for _, p := range mainpkgs {
if filter.Matches(p) {
if filter.Matches(ctx, p) {
result = append(result, p)
}
}
}

for _, p := range drafts {
if filter.Matches(p) {
if filter.Matches(ctx, p) {
result = append(result, p)
}
}
Expand Down Expand Up @@ -425,7 +426,7 @@ func (r *gitRepository) UpdatePackageRevision(ctx context.Context, old repositor

// Fetch lifecycle directly from the repository rather than from the gitPackageRevision. This makes
// sure we don't end up requesting the same lock twice.
lifecycle := r.getLifecycle(ctx, oldGitPackage)
lifecycle := r.getLifecycle(oldGitPackage)

return &gitPackageRevisionDraft{
parent: r,
Expand Down Expand Up @@ -628,7 +629,7 @@ func (r *gitRepository) loadPackageRevision(ctx context.Context, version, path s
} else {
revision = version[last+1:]
}
workspace, err = getPkgWorkspace(ctx, commit, krmPackage, ref)
workspace, err = getPkgWorkspace(commit, krmPackage, ref)
if err != nil {
return nil, kptfilev1.GitLock{}, err
}
Expand Down Expand Up @@ -667,7 +668,7 @@ func (r *gitRepository) discoverFinalizedPackages(ctx context.Context, ref *plum

var result []*gitPackageRevision
for _, krmPackage := range krmPackages.packages {
workspace, err := getPkgWorkspace(ctx, commit, krmPackage, ref)
workspace, err := getPkgWorkspace(commit, krmPackage, ref)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -814,7 +815,7 @@ func (r *gitRepository) loadTaggedPackages(ctx context.Context, tag *plumbing.Re
return nil, nil
}

workspaceName, err := getPkgWorkspace(ctx, commit, krmPackage, tag)
workspaceName, err := getPkgWorkspace(commit, krmPackage, tag)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1083,13 +1084,19 @@ func (r *gitRepository) createPackageDeleteCommit(ctx context.Context, branch pl
}

func (r *gitRepository) PushAndCleanup(ctx context.Context, ph *pushRefSpecBuilder) error {
ctx, span := tracer.Start(ctx, "gitRepository::PushAndCleanup", trace.WithAttributes())
defer span.End()

r.mutex.Lock()
defer r.mutex.Unlock()

return r.pushAndCleanup(ctx, ph)
}

func (r *gitRepository) pushAndCleanup(ctx context.Context, ph *pushRefSpecBuilder) error {
ctx, span := tracer.Start(ctx, "gitRepository::pushAndCleanup", trace.WithAttributes())
defer span.End()

specs, require, err := ph.BuildRefSpecs()
if err != nil {
return err
Expand Down Expand Up @@ -1229,7 +1236,7 @@ func (r *gitRepository) GetResources(hash plumbing.Hash) (map[string]string, err

// findLatestPackageCommit returns the latest commit from the history that pertains
// to the package given by the packagePath. If no commit is found, it will return nil.
func (r *gitRepository) findLatestPackageCommit(ctx context.Context, startCommit *object.Commit, packagePath string) (*object.Commit, error) {
func (r *gitRepository) findLatestPackageCommit(startCommit *object.Commit, packagePath string) (*object.Commit, error) {
var commit *object.Commit
err := r.packageHistoryIterator(startCommit, packagePath, func(c *object.Commit) error {
commit = c
Expand Down Expand Up @@ -1330,15 +1337,15 @@ func (r *gitRepository) storeTree(tree *object.Tree) (plumbing.Hash, error) {
}

func (r *gitRepository) GetLifecycle(ctx context.Context, pkgRev *gitPackageRevision) v1alpha1.PackageRevisionLifecycle {
ctx, span := tracer.Start(ctx, "GitRepository::GetLifecycle", trace.WithAttributes())
ctx, span := tracer.Start(ctx, "gitRepository::GetLifecycle", trace.WithAttributes())

Check failure on line 1340 in pkg/git/git.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to ctx (ineffassign)
defer span.End()
r.mutex.Lock()
defer r.mutex.Unlock()

return r.getLifecycle(ctx, pkgRev)
return r.getLifecycle(pkgRev)
}

func (r *gitRepository) getLifecycle(ctx context.Context, pkgRev *gitPackageRevision) v1alpha1.PackageRevisionLifecycle {
func (r *gitRepository) getLifecycle(pkgRev *gitPackageRevision) v1alpha1.PackageRevisionLifecycle {
switch ref := pkgRev.ref; {
case ref == nil:
return r.checkPublishedLifecycle(pkgRev)
Expand Down Expand Up @@ -1368,12 +1375,13 @@ func (r *gitRepository) checkPublishedLifecycle(pkgRev *gitPackageRevision) v1al
}

func (r *gitRepository) UpdateLifecycle(ctx context.Context, pkgRev *gitPackageRevision, newLifecycle v1alpha1.PackageRevisionLifecycle) error {
ctx, span := tracer.Start(ctx, "GitRepository::UpdateLifecycle", trace.WithAttributes())
ctx, span := tracer.Start(ctx, "gitRepository::UpdateLifecycle", trace.WithAttributes())
defer span.End()

r.mutex.Lock()
defer r.mutex.Unlock()

old := r.getLifecycle(ctx, pkgRev)
old := r.getLifecycle(pkgRev)
if !v1alpha1.LifecycleIsPublished(old) {
return fmt.Errorf("cannot update lifecycle for draft package revision")
}
Expand Down Expand Up @@ -1409,7 +1417,7 @@ func (r *gitRepository) UpdateLifecycle(ctx context.Context, pkgRev *gitPackageR
}

func (r *gitRepository) UpdateDraftResources(ctx context.Context, draft *gitPackageRevisionDraft, new *v1alpha1.PackageRevisionResources, change *v1alpha1.Task) error {
ctx, span := tracer.Start(ctx, "gitPackageDraft::UpdateResources", trace.WithAttributes())
ctx, span := tracer.Start(ctx, "gitRepository::UpdateResources", trace.WithAttributes())
defer span.End()
r.mutex.Lock()
defer r.mutex.Unlock()
Expand Down Expand Up @@ -1464,7 +1472,7 @@ func (r *gitRepository) UpdateDraftResources(ctx context.Context, draft *gitPack
}

func (r *gitRepository) ClosePackageRevisionDraft(ctx context.Context, prd repository.PackageRevisionDraft, version string) (repository.PackageRevision, error) {
ctx, span := tracer.Start(ctx, "GitRepository::UpdateLifecycle", trace.WithAttributes())
ctx, span := tracer.Start(ctx, "gitRepository::ClosePackageRevisionDraft", trace.WithAttributes())
defer span.End()

r.mutex.Lock()
Expand Down Expand Up @@ -1590,6 +1598,8 @@ func (r *gitRepository) doGitWithAuth(ctx context.Context, op func(transport.Aut
}

func (r *gitRepository) commitPackageToMain(ctx context.Context, d *gitPackageRevisionDraft) (commitHash, newPackageTreeHash plumbing.Hash, base *plumbing.Reference, err error) {
ctx, span := tracer.Start(ctx, "gitRepository::commitPackageToMain", trace.WithAttributes())
defer span.End()
branch := r.branch
localRef := branch.RefInLocal()

Expand Down Expand Up @@ -1713,10 +1723,10 @@ func reverseSlice(s []v1alpha1.Task) {
}
}

func getPkgWorkspace(ctx context.Context, commit *object.Commit, p *packageListEntry, ref *plumbing.Reference) (v1alpha1.WorkspaceName, error) {
func getPkgWorkspace(commit *object.Commit, p *packageListEntry, ref *plumbing.Reference) (v1alpha1.WorkspaceName, error) {
if ref == nil || (!isTagInLocalRepo(ref.Name()) && !isDraftBranchNameInLocal(ref.Name()) && !isProposedBranchNameInLocal(ref.Name())) {
// packages on the main branch may have unrelated commits, we need to find the latest commit relevant to this package
c, err := p.parent.parent.findLatestPackageCommit(ctx, p.parent.commit, p.path)
c, err := p.parent.parent.findLatestPackageCommit(p.parent.commit, p.path)
if err != nil {
return "", err
}
Expand Down
Loading

0 comments on commit df98229

Please sign in to comment.