From 2f628688ae13b7f1d39fe07de7edf05aee91a364 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Wed, 8 Nov 2023 10:52:12 +0100 Subject: [PATCH] Rework accmeth model to live without backing repository (#566) * method to blobaccess * fix localblob refs to cv * work * disconnect for local acc meth implementations * rework final update on close * cleanup blob interface + prepare commit * fix close error in transfer --- Makefile | 1 + cmds/demoplugin/uploaders/writer.go | 6 +- pkg/blobaccess/bpi/utils.go | 18 +- pkg/blobaccess/cached.go | 35 ++++ pkg/blobaccess/digest.go | 23 +++ pkg/blobaccess/standard.go | 4 + pkg/common/accessio/cache.go | 5 +- pkg/common/accessio/digestreader.go | 113 +++--------- pkg/common/accessio/digestwriter.go | 48 ++---- pkg/common/accessio/wrapper.go | 3 +- pkg/contexts/ocm/accessmethods/npm/method.go | 3 +- .../ocm/accessmethods/npm/method_test.go | 4 +- .../ocm/accessmethods/ociartifact/method.go | 162 ++++++++++-------- .../ocm/accessmethods/ociblob/method.go | 6 +- .../ocm/accessmethods/plugin/method.go | 5 +- .../handlers/generic/ocirepo/blobhandler.go | 2 +- .../handlers/generic/ocirepo/upload_test.go | 31 ++++ .../ocm/blobhandler/handlers/oci/ctx.go | 15 +- .../handlers/oci/ocirepo/blobhandler.go | 12 +- .../ocm/blobhandler/handlers/ocm/ctx.go | 4 +- pkg/contexts/ocm/cpi/accspeccpi/method.go | 7 +- pkg/contexts/ocm/cpi/accspeccpi/methodview.go | 17 +- pkg/contexts/ocm/cpi/interface.go | 1 - pkg/contexts/ocm/cpi/storagectx.go | 10 +- .../ocm/cpi/support/compversaccess.go | 18 +- pkg/contexts/ocm/cpi/support/container.go | 9 +- pkg/contexts/ocm/cpi/view.go | 114 ++++++------ pkg/contexts/ocm/cpi/view_rsc.go | 7 +- .../ocm/download/handlers/plugin/handler.go | 8 +- .../artifactblob/genericblob/resource.go | 9 +- pkg/contexts/ocm/interface.go | 8 + pkg/contexts/ocm/internal/accesstypes.go | 12 +- pkg/contexts/ocm/internal/blobhandler.go | 2 +- pkg/contexts/ocm/internal/modopts.go | 4 +- pkg/contexts/ocm/plugin/utils.go | 3 +- .../comparch/accessmethod_localfs.go | 66 ++++--- .../comparch/accessmethod_test.go | 26 ++- .../repositories/comparch/componentarchive.go | 25 +-- .../ocm/repositories/comparch/repository.go | 20 ++- .../repositories/composition/close_test.go | 153 +++++++++++++++++ .../repositories/composition/repository.go | 2 + .../repositories/composition/version_test.go | 6 +- .../genericocireg/accessmethod_localblob.go | 21 ++- .../accessmethod_localoclblob.go | 11 +- .../genericocireg/componentversion.go | 25 ++- .../repositories/genericocireg/repo_test.go | 1 - .../ocm/repositories/genericocireg/view.go | 23 --- .../virtual/accessmethod_localblob.go | 54 ++---- .../repositories/virtual/componentversion.go | 18 +- .../repositories/virtual/example/example.go | 2 +- pkg/contexts/ocm/signing/handle.go | 34 +++- pkg/contexts/ocm/signing/signing_test.go | 2 +- pkg/contexts/ocm/testhelper/refmgmt.go | 16 ++ pkg/contexts/ocm/transfer/transfer.go | 14 +- pkg/env/builder/ocm_ctf.go | 2 +- pkg/finalizer/finalizer.go | 11 ++ pkg/iotools/digestreader.go | 104 +++++++++++ pkg/iotools/digestwriter.go | 52 ++++++ pkg/iotools/utils.go | 14 ++ pkg/refmgmt/refcloser.go | 28 +-- pkg/refmgmt/refmgmt.go | 32 +++- pkg/refmgmt/resource/resource.go | 19 ++ 62 files changed, 1008 insertions(+), 502 deletions(-) create mode 100644 pkg/blobaccess/cached.go create mode 100644 pkg/blobaccess/digest.go create mode 100644 pkg/contexts/ocm/repositories/composition/close_test.go delete mode 100644 pkg/contexts/ocm/repositories/genericocireg/view.go create mode 100644 pkg/contexts/ocm/testhelper/refmgmt.go create mode 100644 pkg/iotools/digestreader.go create mode 100644 pkg/iotools/digestwriter.go diff --git a/Makefile b/Makefile index 832e694a41..c946d97875 100644 --- a/Makefile +++ b/Makefile @@ -35,6 +35,7 @@ COMPONENTS ?= ocmcli helminstaller demoplugin ecrplugin helmdemo subchartsdemo build: ${SOURCES} mkdir -p bin go build ./pkg/... + go build ./examples/... CGO_ENABLED=0 go build -ldflags $(BUILD_FLAGS) -o bin/ocm ./cmds/ocm CGO_ENABLED=0 go build -ldflags $(BUILD_FLAGS) -o bin/helminstaller ./cmds/helminstaller CGO_ENABLED=0 go build -ldflags $(BUILD_FLAGS) -o bin/demo ./cmds/demoplugin diff --git a/cmds/demoplugin/uploaders/writer.go b/cmds/demoplugin/uploaders/writer.go index 929e0dcc34..ef4f306ac4 100644 --- a/cmds/demoplugin/uploaders/writer.go +++ b/cmds/demoplugin/uploaders/writer.go @@ -11,13 +11,13 @@ import ( "github.com/open-component-model/ocm/cmds/demoplugin/accessmethods" "github.com/open-component-model/ocm/pkg/common" - "github.com/open-component-model/ocm/pkg/common/accessio" "github.com/open-component-model/ocm/pkg/contexts/ocm/plugin/ppi" "github.com/open-component-model/ocm/pkg/errors" + "github.com/open-component-model/ocm/pkg/iotools" "github.com/open-component-model/ocm/pkg/runtime" ) -type writer = accessio.DigestWriter +type writer = iotools.DigestWriter type Writer struct { *writer @@ -32,7 +32,7 @@ type Writer struct { func NewWriter(file *os.File, path string, media string, rename bool, name, version string) *Writer { return &Writer{ - writer: accessio.NewDefaultDigestWriter(file), + writer: iotools.NewDefaultDigestWriter(file), file: file, path: path, rename: rename, diff --git a/pkg/blobaccess/bpi/utils.go b/pkg/blobaccess/bpi/utils.go index 3d11af580f..2589c17791 100644 --- a/pkg/blobaccess/bpi/utils.go +++ b/pkg/blobaccess/bpi/utils.go @@ -179,6 +179,14 @@ func BaseAccessForDataAccessAndMeta(mime string, acc DataAccess, dig digest.Dige //////////////////////////////////////////////////////////////////////////////// +// StaticBlobAccess is a BlobAccess which does not +// require finalization, therefore it can be used +// as BlobAccessProvider, also. +type StaticBlobAccess interface { + BlobAccess + BlobAccessProvider +} + type staticBlobAccess struct { blobAccess } @@ -187,6 +195,10 @@ func (s *staticBlobAccess) Dup() (BlobAccess, error) { return s, nil } +func (s *staticBlobAccess) BlobAccess() (BlobAccess, error) { + return s, nil +} + func (s *staticBlobAccess) Close() error { return nil } @@ -194,14 +206,14 @@ func (s *staticBlobAccess) Close() error { // ForStaticDataAccess is used for a data access using no closer. // They don't require a finalization and can be used // as long as they exist. Therefore, no ref counting -// is required. -func ForStaticDataAccess(mime string, acc DataAccess) BlobAccess { +// is required and they can be used as BlobAccessProvider, also. +func ForStaticDataAccess(mime string, acc DataAccess) StaticBlobAccess { return &staticBlobAccess{ blobAccess: blobAccess{mimeType: mime, _dataAccess: acc, digest: BLOB_UNKNOWN_DIGEST, size: BLOB_UNKNOWN_SIZE}, } } -func ForStaticDataAccessAndMeta(mime string, acc DataAccess, dig digest.Digest, size int64) BlobAccess { +func ForStaticDataAccessAndMeta(mime string, acc DataAccess, dig digest.Digest, size int64) StaticBlobAccess { return &staticBlobAccess{ blobAccess: blobAccess{mimeType: mime, _dataAccess: acc, digest: dig, size: size}, } diff --git a/pkg/blobaccess/cached.go b/pkg/blobaccess/cached.go new file mode 100644 index 0000000000..7290a051c3 --- /dev/null +++ b/pkg/blobaccess/cached.go @@ -0,0 +1,35 @@ +// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and Open Component Model contributors. +// +// SPDX-License-Identifier: Apache-2.0 + +package blobaccess + +import ( + "io" + + "github.com/mandelsoft/vfs/pkg/vfs" + + "github.com/open-component-model/ocm/pkg/utils" +) + +func ForCachedBlobAccess(blob BlobAccess, fss ...vfs.FileSystem) (BlobAccess, error) { + fs := utils.FileSystem(fss...) + + r, err := blob.Reader() + if err != nil { + return nil, err + } + defer r.Close() + + file, err := vfs.TempFile(fs, "", "cachedBlob*") + if err != nil { + return nil, err + } + _, err = io.Copy(file, r) + if err != nil { + return nil, err + } + file.Close() + + return ForTemporaryFilePath(blob.MimeType(), file.Name(), fs), nil +} diff --git a/pkg/blobaccess/digest.go b/pkg/blobaccess/digest.go new file mode 100644 index 0000000000..acefe02a70 --- /dev/null +++ b/pkg/blobaccess/digest.go @@ -0,0 +1,23 @@ +// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and Open Component Model contributors. +// +// SPDX-License-Identifier: Apache-2.0 + +package blobaccess + +import ( + "github.com/opencontainers/go-digest" +) + +func Digest(access DataAccess) (digest.Digest, error) { + reader, err := access.Reader() + if err != nil { + return "", err + } + defer reader.Close() + + dig, err := digest.FromReader(reader) + if err != nil { + return "", err + } + return dig, nil +} diff --git a/pkg/blobaccess/standard.go b/pkg/blobaccess/standard.go index c8ec70c2ba..857d6e5606 100644 --- a/pkg/blobaccess/standard.go +++ b/pkg/blobaccess/standard.go @@ -220,6 +220,10 @@ type annotatedBlobAccessView[T DataAccess] struct { annotation T } +func (a *annotatedBlobAccessView[T]) Close() error { + return a._blobAccess.Close() +} + func (a *annotatedBlobAccessView[T]) Dup() (BlobAccess, error) { b, err := a._blobAccess.Dup() if err != nil { diff --git a/pkg/common/accessio/cache.go b/pkg/common/accessio/cache.go index 6d5ab2f46d..c71c147c8b 100644 --- a/pkg/common/accessio/cache.go +++ b/pkg/common/accessio/cache.go @@ -21,6 +21,7 @@ import ( "github.com/open-component-model/ocm/pkg/blobaccess" "github.com/open-component-model/ocm/pkg/common" "github.com/open-component-model/ocm/pkg/errors" + "github.com/open-component-model/ocm/pkg/iotools" "github.com/open-component-model/ocm/pkg/refmgmt" ) @@ -209,7 +210,7 @@ func (c *blobCache) AddBlob(blob blobaccess.BlobAccess) (int64, digest.Digest, e } defer c.Unref() - var digester *DigestReader + var digester *iotools.DigestReader if blob.DigestKnown() { c.lock.RLock() @@ -231,7 +232,7 @@ func (c *blobCache) AddBlob(blob blobaccess.BlobAccess) (int64, digest.Digest, e reader := io.Reader(br) if !blob.DigestKnown() { - digester = NewDefaultDigestReader(reader) + digester = iotools.NewDefaultDigestReader(reader) reader = digester } diff --git a/pkg/common/accessio/digestreader.go b/pkg/common/accessio/digestreader.go index db8b437e54..05b94e4687 100644 --- a/pkg/common/accessio/digestreader.go +++ b/pkg/common/accessio/digestreader.go @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2022 SAP SE or an SAP affiliate company and Open Component Model contributors. +// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and Open Component Model contributors. // // SPDX-License-Identifier: Apache-2.0 @@ -6,114 +6,43 @@ package accessio import ( "crypto" - "hash" "io" "github.com/opencontainers/go-digest" "github.com/open-component-model/ocm/pkg/blobaccess" - "github.com/open-component-model/ocm/pkg/errors" + "github.com/open-component-model/ocm/pkg/iotools" ) -// wow. digest does support a map with supported digesters. Unfortunately this one does not -// contain all the crypto hashes AND this map is private NAD there is no function to add entries, -// so that it cannot be extended from outside the package. I love GO. -// Therefore, we have to fake it a little to support digests with other crypto hashes. +// Deprecated: use iotools.DigestReader. +type DigestReader = iotools.DigestReader -type DigestReader struct { - reader io.Reader - alg digest.Algorithm - hash hash.Hash - count int64 +// Deprecated: use iotools.NewDefaultDigestReader. +func NewDefaultDigestReader(r io.Reader) *iotools.DigestReader { + return iotools.NewDigestReaderWith(digest.Canonical, r) } -func (r *DigestReader) Size() int64 { - return r.count +// Deprecated: use iotools.NewDigestReaderWith. +func NewDigestReaderWith(algorithm digest.Algorithm, r io.Reader) *iotools.DigestReader { + return iotools.NewDigestReaderWith(algorithm, r) } -func (r *DigestReader) Digest() digest.Digest { - return digest.NewDigest(r.alg, r.hash) +// Deprecated: use iotools.NewDigestReaderWithHash. +func NewDigestReaderWithHash(hash crypto.Hash, r io.Reader) *iotools.DigestReader { + return iotools.NewDigestReaderWithHash(hash, r) } -func (r *DigestReader) Read(buf []byte) (int, error) { - c, err := r.reader.Read(buf) - if c > 0 { - r.count += int64(c) - r.hash.Write(buf[:c]) - } - return c, err -} - -func NewDefaultDigestReader(r io.Reader) *DigestReader { - return NewDigestReaderWith(digest.Canonical, r) -} - -func NewDigestReaderWith(algorithm digest.Algorithm, r io.Reader) *DigestReader { - digester := algorithm.Digester() - return &DigestReader{ - reader: r, - hash: digester.Hash(), - alg: algorithm, - count: 0, - } +// Deprecated: use iotools.VerifyingReader. +func VerifyingReader(r io.ReadCloser, digest digest.Digest) io.ReadCloser { + return iotools.VerifyingReader(r, digest) } -func NewDigestReaderWithHash(hash crypto.Hash, r io.Reader) *DigestReader { - return &DigestReader{ - reader: r, - hash: hash.New(), - alg: digest.Algorithm(hash.String()), // fake a non-supported digest algorithm - count: 0, - } +// Deprecated: use iotools.VerifyingReaderWithHash. +func VerifyingReaderWithHash(r io.ReadCloser, hash crypto.Hash, digest string) io.ReadCloser { + return iotools.VerifyingReaderWithHash(r, hash, digest) } +// Deprecated: use blobaccess.Digest. func Digest(access blobaccess.DataAccess) (digest.Digest, error) { - reader, err := access.Reader() - if err != nil { - return "", err - } - defer reader.Close() - - dig, err := digest.FromReader(reader) - if err != nil { - return "", err - } - return dig, nil -} - -type verifiedReader struct { - closer io.Closer - *DigestReader - hash string - digest string -} - -func (v *verifiedReader) Close() error { - err := v.closer.Close() - if err != nil { - return err - } - dig := v.DigestReader.Digest() - if dig.Hex() != v.digest { - return errors.Newf("%s digest mismatch: expected %s, found %s", v.hash, v.digest, dig.Hex()) - } - return nil -} - -func VerifyingReader(r io.ReadCloser, digest digest.Digest) io.ReadCloser { - return &verifiedReader{ - closer: r, - DigestReader: NewDigestReaderWith(digest.Algorithm(), r), - hash: digest.Algorithm().String(), - digest: digest.Hex(), - } -} - -func VerifyingReaderWithHash(r io.ReadCloser, hash crypto.Hash, digest string) io.ReadCloser { - return &verifiedReader{ - closer: r, - DigestReader: NewDigestReaderWithHash(hash, r), - hash: hash.String(), - digest: digest, - } + return blobaccess.Digest(access) } diff --git a/pkg/common/accessio/digestwriter.go b/pkg/common/accessio/digestwriter.go index 0759cd147f..aebd4ca9ab 100644 --- a/pkg/common/accessio/digestwriter.go +++ b/pkg/common/accessio/digestwriter.go @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2022 SAP SE or an SAP affiliate company and Open Component Model contributors. +// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and Open Component Model contributors. // // SPDX-License-Identifier: Apache-2.0 @@ -8,45 +8,19 @@ import ( "io" "github.com/opencontainers/go-digest" -) - -type writer io.WriteCloser -type DigestWriter struct { - writer - digester digest.Digester - count int64 -} - -func (r *DigestWriter) Size() int64 { - return r.count -} - -func (r *DigestWriter) Digest() digest.Digest { - return r.digester.Digest() -} + "github.com/open-component-model/ocm/pkg/iotools" +) -func (r *DigestWriter) Write(buf []byte) (int, error) { - c, err := r.writer.Write(buf) - if c > 0 { - r.count += int64(c) - r.digester.Hash().Write(buf[:c]) - } - return c, err -} +// Deprecated: use iotools.DigestWriter. +type DigestWriter = iotools.DigestWriter -func NewDefaultDigestWriter(w io.WriteCloser) *DigestWriter { - return &DigestWriter{ - writer: w, - digester: digest.Canonical.Digester(), - count: 0, - } +// Deprecated: use iotools.NewDefaultDigestWriter. +func NewDefaultDigestWriter(w io.WriteCloser) *iotools.DigestWriter { + return iotools.NewDefaultDigestWriter(w) } -func NewDigestWriterWith(algorithm digest.Algorithm, w io.WriteCloser) *DigestWriter { - return &DigestWriter{ - writer: w, - digester: algorithm.Digester(), - count: 0, - } +// Deprecated: use iotools.NewDigestWriterWith. +func NewDigestWriterWith(algorithm digest.Algorithm, w io.WriteCloser) *iotools.DigestWriter { + return iotools.NewDigestWriterWith(algorithm, w) } diff --git a/pkg/common/accessio/wrapper.go b/pkg/common/accessio/wrapper.go index 67e1473511..23caa30b69 100644 --- a/pkg/common/accessio/wrapper.go +++ b/pkg/common/accessio/wrapper.go @@ -11,6 +11,7 @@ import ( "github.com/open-component-model/ocm/pkg/blobaccess" "github.com/open-component-model/ocm/pkg/errors" + "github.com/open-component-model/ocm/pkg/iotools" ) type Writer interface { @@ -34,7 +35,7 @@ func NewReaderWriter(r io.ReadCloser) DataWriter { func (d *readerWriter) WriteTo(w Writer) (size int64, dig digest.Digest, err error) { defer errors.PropagateError(&err, d.reader.Close) - dr := NewDefaultDigestReader(d.reader) + dr := iotools.NewDefaultDigestReader(d.reader) _, err = io.Copy(w, dr) if err != nil { return BLOB_UNKNOWN_SIZE, BLOB_UNKNOWN_DIGEST, err diff --git a/pkg/contexts/ocm/accessmethods/npm/method.go b/pkg/contexts/ocm/accessmethods/npm/method.go index 3f0b836d46..d4f01f2250 100644 --- a/pkg/contexts/ocm/accessmethods/npm/method.go +++ b/pkg/contexts/ocm/accessmethods/npm/method.go @@ -23,6 +23,7 @@ import ( "github.com/open-component-model/ocm/pkg/contexts/datacontext/attrs/vfsattr" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi/accspeccpi" "github.com/open-component-model/ocm/pkg/errors" + "github.com/open-component-model/ocm/pkg/iotools" "github.com/open-component-model/ocm/pkg/mime" "github.com/open-component-model/ocm/pkg/runtime" ) @@ -138,7 +139,7 @@ func newMethod(c accspeccpi.ComponentVersionAccess, a *AccessSpec) (accspeccpi.A if err != nil { return nil, err } - return accessio.VerifyingReaderWithHash(r, crypto.SHA1, meta.Dist.Shasum), nil + return iotools.VerifyingReaderWithHash(r, crypto.SHA1, meta.Dist.Shasum), nil } } acc := blobaccess.DataAccessForReaderFunction(f, meta.Dist.Tarball) diff --git a/pkg/contexts/ocm/accessmethods/npm/method_test.go b/pkg/contexts/ocm/accessmethods/npm/method_test.go index 6ffa536b17..d15e985cb9 100644 --- a/pkg/contexts/ocm/accessmethods/npm/method_test.go +++ b/pkg/contexts/ocm/accessmethods/npm/method_test.go @@ -13,10 +13,10 @@ import ( . "github.com/open-component-model/ocm/pkg/env/builder" . "github.com/open-component-model/ocm/pkg/testutils" - "github.com/open-component-model/ocm/pkg/common/accessio" "github.com/open-component-model/ocm/pkg/contexts/ocm" "github.com/open-component-model/ocm/pkg/contexts/ocm/accessmethods/npm" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" + "github.com/open-component-model/ocm/pkg/iotools" "github.com/open-component-model/ocm/pkg/mime" ) @@ -46,7 +46,7 @@ var _ = Describe("Method", func() { r := Must(m.Reader()) defer r.Close() - dr := accessio.NewDigestReaderWithHash(crypto.SHA1, r) + dr := iotools.NewDigestReaderWithHash(crypto.SHA1, r) for { var buf [8096]byte _, err := dr.Read(buf[:]) diff --git a/pkg/contexts/ocm/accessmethods/ociartifact/method.go b/pkg/contexts/ocm/accessmethods/ociartifact/method.go index 4d3547c5d4..d669c4a034 100644 --- a/pkg/contexts/ocm/accessmethods/ociartifact/method.go +++ b/pkg/contexts/ocm/accessmethods/ociartifact/method.go @@ -24,6 +24,7 @@ import ( "github.com/open-component-model/ocm/pkg/contexts/oci/repositories/ocireg" ocmcpi "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi/accspeccpi" + "github.com/open-component-model/ocm/pkg/errors" "github.com/open-component-model/ocm/pkg/logging" "github.com/open-component-model/ocm/pkg/runtime" "github.com/open-component-model/ocm/pkg/utils" @@ -144,15 +145,19 @@ type accessMethod struct { lock sync.Mutex ctx accspeccpi.Context spec accspeccpi.AccessSpec - relto oci.Repository reference string finalizer Finalizer err error - repo oci.Repository - art oci.ArtifactAccess - ref *oci.RefSpec - blob artifactset.ArtifactBlob + + id credentials.ConsumerIdentity + ref *oci.RefSpec + mime string + digest digest.Digest + art oci.ArtifactAccess + + repo oci.Repository + blob artifactset.ArtifactBlob } var ( @@ -163,12 +168,12 @@ var ( ) func NewMethod(ctx accspeccpi.ContextProvider, a accspeccpi.AccessSpec, ref string, repo ...oci.Repository) (accspeccpi.AccessMethod, error) { - return accspeccpi.AccessMethodForImplementation(&accessMethod{ + m := &accessMethod{ spec: a, reference: ref, - relto: utils.Optional(repo...), ctx: ctx.OCMContext(), - }, nil) + } + return accspeccpi.AccessMethodForImplementation(m, m.eval(utils.Optional(repo...))) } func (_ *accessMethod) IsLocal() bool { @@ -187,20 +192,47 @@ func (m *accessMethod) AccessSpec() accspeccpi.AccessSpec { return m.spec } +func (m *accessMethod) Cache() { + m.lock.Lock() + ref := m.ref + m.lock.Unlock() + if ref == nil { + return + } + logger := Logger(WrapContextProvider(m.ctx)) + logger.Info("cache artifact blob", "ref", m.reference) + + _, m.err = m.getBlob() + + m.finalizer.Finalize() +} + func (m *accessMethod) Close() error { m.lock.Lock() defer m.lock.Unlock() + + list := errors.ErrorList{} + + if m.blob != nil { + list.Add(m.blob.Close()) + } m.blob = nil m.art = nil - m.relto = nil - return m.finalizer.Finalize() + m.ref = nil + list.Add(m.finalizer.Finalize()) + return list.Result() } -func (m *accessMethod) eval() (oci.Repository, *oci.RefSpec, error) { - if m.relto == nil { - ref, err := oci.ParseRef(m.reference) +func (m *accessMethod) eval(relto oci.Repository) error { + var ( + err error + ref oci.RefSpec + ) + + if relto == nil { + ref, err = oci.ParseRef(m.reference) if err != nil { - return nil, nil, err + return err } ocictx := m.ctx.OCIContext() spec := ocictx.GetAlias(ref.Host) @@ -208,60 +240,67 @@ func (m *accessMethod) eval() (oci.Repository, *oci.RefSpec, error) { spec = ocireg.NewRepositorySpec(ref.Host) } repo, err := ocictx.RepositoryForSpec(spec) + if err != nil { + return err + } m.finalizer.Close(repo, "repository for accessing %s", m.reference) m.repo = repo - return repo, &ref, err + } else { + repo, err := relto.Dup() + if err != nil { + return err + } + m.finalizer.Close(repo) + art, err := oci.ParseArt(m.reference) + if err != nil { + return err + } + ref = oci.RefSpec{ + UniformRepositorySpec: *repo.GetSpecification().UniformRepositorySpec(), + ArtSpec: art, + } + m.repo = repo } - art, err := oci.ParseArt(m.reference) - if err != nil { - return nil, nil, err - } - ref := oci.RefSpec{ - UniformRepositorySpec: *m.relto.GetSpecification().UniformRepositorySpec(), - ArtSpec: art, - } - m.repo = m.relto - return m.repo, &ref, err + m.ref = &ref + m.id = credentials.GetProvidedConsumerId(m.repo, credentials.StringUsageContext(ref.Repository)) + return nil } func (m *accessMethod) GetArtifact() (oci.ArtifactAccess, *oci.RefSpec, error) { m.lock.Lock() defer m.lock.Unlock() - art, ref, err := m.getArtifact() + err := m.getArtifact() if err != nil { return nil, nil, m.err } - a, err := art.Dup() - if err != nil { - return nil, nil, err - } - return a, ref, err -} - -func (m *accessMethod) getArtifact() (oci.ArtifactAccess, *oci.RefSpec, error) { - if m.art == nil && m.err == nil { - repo, ref, err := m.eval() + art := m.art + if art != nil { + art, err = art.Dup() if err != nil { return nil, nil, err } - art, err := repo.LookupArtifact(ref.Repository, ref.Version()) + } + return art, m.ref, err +} + +func (m *accessMethod) getArtifact() error { + if m.art == nil && m.err == nil && m.ref != nil { + art, err := m.repo.LookupArtifact(m.ref.Repository, m.ref.Version()) m.finalizer.Close(art, "artifact for accessing %s", m.reference) - m.art, m.ref, m.err = art, ref, err + m.art, m.err = art, err + m.mime = artdesc.ToContentMediaType(m.art.GetDescriptor().MimeType()) + artifactset.SynthesizedBlobFormat + m.digest = art.Digest() } - return m.art, m.ref, m.err + return m.err } func (m *accessMethod) GetConsumerId(uctx ...credentials.UsageContext) credentials.ConsumerIdentity { m.lock.Lock() defer m.lock.Unlock() - repo, ref, err := m.eval() - if err != nil { - return nil - } - return credentials.GetProvidedConsumerId(repo, credentials.StringUsageContext(ref.Repository)) + return m.id } func (m *accessMethod) GetIdentityMatcher() string { @@ -277,16 +316,8 @@ func (m *accessMethod) GetDigest() (digest.Digest, error) { m.lock.Lock() defer m.lock.Unlock() - art, _, err := m.getArtifact() - if err == nil { - m.art = art - blob, err := art.Blob() - if err == nil { - return blob.Digest(), nil - } - m.finalizer.Close(blob) - } - return "", err + err := m.getArtifact() + return m.digest, err } func (m *accessMethod) Get() ([]byte, error) { @@ -306,40 +337,37 @@ func (m *accessMethod) Reader() (io.ReadCloser, error) { if err != nil { return nil, err } - // return accessio.AddCloser(r, b, "synthesized artifact"), nil return r, nil } func (m *accessMethod) MimeType() string { - m.lock.Lock() - defer m.lock.Unlock() - - art, _, err := m.getArtifact() - if err != nil { - return "" + if m.mime == "" { + m.lock.Lock() + defer m.lock.Unlock() + m.getArtifact() } - return artdesc.ToContentMediaType(art.GetDescriptor().MimeType()) + artifactset.SynthesizedBlobFormat + return m.mime } func (m *accessMethod) getBlob() (artifactset.ArtifactBlob, error) { m.lock.Lock() defer m.lock.Unlock() - if m.blob != nil { - return m.blob, nil + if m.blob != nil || m.err != nil { + return m.blob, m.err } - art, ref, err := m.getArtifact() + err := m.getArtifact() if err != nil { return nil, err } logger := Logger(WrapContextProvider(m.ctx)) logger.Info("synthesize artifact blob", "ref", m.reference) - m.blob, err = artifactset.SynthesizeArtifactBlobForArtifact(art, ref.Version()) + m.blob, err = artifactset.SynthesizeArtifactBlobForArtifact(m.art, m.ref.Version()) logger.Info("synthesize artifact blob done", "ref", m.reference, "error", logging.ErrorMessage(err)) if err != nil { + m.err = err return nil, err } - m.finalizer.Close(m.blob) return m.blob, nil } diff --git a/pkg/contexts/ocm/accessmethods/ociblob/method.go b/pkg/contexts/ocm/accessmethods/ociblob/method.go index a5cff5949a..97f8e1222b 100644 --- a/pkg/contexts/ocm/accessmethods/ociblob/method.go +++ b/pkg/contexts/ocm/accessmethods/ociblob/method.go @@ -112,13 +112,15 @@ func (m *accessMethod) AccessSpec() accspeccpi.AccessSpec { } func (m *accessMethod) Close() error { + var err error m.lock.Lock() defer m.lock.Unlock() + if m.blob != nil { - m.blob.Close() + err = m.blob.Close() m.blob = nil } - return nil + return err } func (m *accessMethod) Get() ([]byte, error) { diff --git a/pkg/contexts/ocm/accessmethods/plugin/method.go b/pkg/contexts/ocm/accessmethods/plugin/method.go index cf12070983..be5afdf70b 100644 --- a/pkg/contexts/ocm/accessmethods/plugin/method.go +++ b/pkg/contexts/ocm/accessmethods/plugin/method.go @@ -101,13 +101,14 @@ func (m *accessMethod) AccessSpec() cpi.AccessSpec { } func (m *accessMethod) Close() error { + var err error m.lock.Lock() defer m.lock.Unlock() if m.blob != nil { - m.blob.Close() + err = m.blob.Close() m.blob = nil } - return nil + return err } func (m *accessMethod) Get() ([]byte, error) { diff --git a/pkg/contexts/ocm/blobhandler/handlers/generic/ocirepo/blobhandler.go b/pkg/contexts/ocm/blobhandler/handlers/generic/ocirepo/blobhandler.go index 05cc41c529..ca073b5987 100644 --- a/pkg/contexts/ocm/blobhandler/handlers/generic/ocirepo/blobhandler.go +++ b/pkg/contexts/ocm/blobhandler/handlers/generic/ocirepo/blobhandler.go @@ -86,7 +86,7 @@ func (b *artifactHandler) StoreBlob(blob cpi.BlobAccess, artType, hint string, g var tag string if hint == "" { - name = path.Join(prefix, ctx.TargetComponentVersion().GetName()) + name = path.Join(prefix, ctx.TargetComponentName()) } else { i := strings.LastIndex(hint, ":") if i > 0 { diff --git a/pkg/contexts/ocm/blobhandler/handlers/generic/ocirepo/upload_test.go b/pkg/contexts/ocm/blobhandler/handlers/generic/ocirepo/upload_test.go index b48e9350b7..730953e857 100644 --- a/pkg/contexts/ocm/blobhandler/handlers/generic/ocirepo/upload_test.go +++ b/pkg/contexts/ocm/blobhandler/handlers/generic/ocirepo/upload_test.go @@ -81,6 +81,37 @@ var _ = Describe("upload", func() { env.Cleanup() }) + It("validated original oci manifest", func() { + ctx := env.OCMContext() + + ocirepo := Must(ctfoci.Open(ctx, accessobj.ACC_READONLY, OCIPATH, 0700, env)) + defer Close(ocirepo, "ocoirepo") + + ns := Must(ocirepo.LookupNamespace(OCINAMESPACE)) + defer Close(ns, "namespace") + + art := Must(ns.GetArtifact(OCIVERSION)) + defer Close(art, "artifact") + + Expect(art.Digest().Encoded()).To(Equal(D_OCIMANIFEST1)) + }) + + It("validated original digest", func() { + ctx := env.OCMContext() + + ctf := Must(ctfocm.Open(ctx, accessobj.ACC_READONLY, CTF, 0700, env)) + defer Close(ctf, "ctf") + + cv := Must(ctf.LookupComponentVersion(COMP, VERS)) + defer Close(cv, "component version") + + ra := Must(cv.GetResourceByIndex(0)) + acc := Must(ra.Access()) + Expect(acc.GetKind()).To(Equal(localblob.Type)) + + Expect(ra.Meta().Digest).To(Equal(DS_OCIMANIFEST1)) + }) + It("transfers oci artifact", func() { ctx := env.OCMContext() diff --git a/pkg/contexts/ocm/blobhandler/handlers/oci/ctx.go b/pkg/contexts/ocm/blobhandler/handlers/oci/ctx.go index fd37e14040..1db19aa926 100644 --- a/pkg/contexts/ocm/blobhandler/handlers/oci/ctx.go +++ b/pkg/contexts/ocm/blobhandler/handlers/oci/ctx.go @@ -27,11 +27,11 @@ type StorageContext struct { var _ ocmcpi.StorageContext = (*StorageContext)(nil) -func New(vers ocmcpi.ComponentVersionAccess, impltyp string, ocirepo oci.Repository, namespace oci.NamespaceAccess, manifest oci.ManifestAccess) *StorageContext { +func New(compname string, repo ocmcpi.Repository, impltyp string, ocirepo oci.Repository, namespace oci.NamespaceAccess, manifest oci.ManifestAccess) *StorageContext { return &StorageContext{ DefaultStorageContext: *ocmcpi.NewDefaultStorageContext( - vers.Repository(), - vers, + repo, + compname, ocmcpi.ImplementationRepositoryType{ ContextType: cpi.CONTEXT_TYPE, RepositoryType: impltyp, @@ -47,13 +47,16 @@ func (s *StorageContext) TargetComponentRepository() ocmcpi.Repository { return s.ComponentRepository } -func (s *StorageContext) TargetComponentVersion() ocmcpi.ComponentVersionAccess { - return s.ComponentVersion +func (s *StorageContext) TargetComponentName() string { + return s.ComponentName } func (s *StorageContext) AssureLayer(blob cpi.BlobAccess) error { + return AssureLayer(s.Manifest.GetDescriptor(), blob) +} + +func AssureLayer(desc *artdesc.Manifest, blob cpi.BlobAccess) error { d := artdesc.DefaultBlobDescriptor(blob) - desc := s.Manifest.GetDescriptor() found := -1 for i, l := range desc.Layers { diff --git a/pkg/contexts/ocm/blobhandler/handlers/oci/ocirepo/blobhandler.go b/pkg/contexts/ocm/blobhandler/handlers/oci/ocirepo/blobhandler.go index 30503a2324..f8d97b33d7 100644 --- a/pkg/contexts/ocm/blobhandler/handlers/oci/ocirepo/blobhandler.go +++ b/pkg/contexts/ocm/blobhandler/handlers/oci/ocirepo/blobhandler.go @@ -43,9 +43,11 @@ func init() { cpi.RegisterBlobHandler(NewArtifactHandler(OCIRegBaseFunction), cpi.ForRepo(oci.CONTEXT_TYPE, ocireg.ShortType), cpi.ForMimeType(mime)) } - cpi.RegisterBlobHandler(NewBlobHandler(OCIRegBaseFunction), cpi.ForRepo(oci.CONTEXT_TYPE, ocireg.Type)) - cpi.RegisterBlobHandler(NewBlobHandler(OCIRegBaseFunction), cpi.ForRepo(oci.CONTEXT_TYPE, ocireg.LegacyType)) - cpi.RegisterBlobHandler(NewBlobHandler(OCIRegBaseFunction), cpi.ForRepo(oci.CONTEXT_TYPE, ocireg.ShortType)) + /* + cpi.RegisterBlobHandler(NewBlobHandler(OCIRegBaseFunction), cpi.ForRepo(oci.CONTEXT_TYPE, ocireg.Type)) + cpi.RegisterBlobHandler(NewBlobHandler(OCIRegBaseFunction), cpi.ForRepo(oci.CONTEXT_TYPE, ocireg.LegacyType)) + cpi.RegisterBlobHandler(NewBlobHandler(OCIRegBaseFunction), cpi.ForRepo(oci.CONTEXT_TYPE, ocireg.ShortType)) + */ } //////////////////////////////////////////////////////////////////////////////// @@ -161,7 +163,9 @@ func (b *artifactHandler) StoreBlob(blob cpi.BlobAccess, artType, hint string, g if err != nil { return nil, errors.Wrapf(err, "cannot access source artifact") } - defer art.Close() + if art != nil { + defer art.Close() + } } } else { log.Debug("oci artifact handler", values...) diff --git a/pkg/contexts/ocm/blobhandler/handlers/ocm/ctx.go b/pkg/contexts/ocm/blobhandler/handlers/ocm/ctx.go index e40d1528f2..8b9e211f5d 100644 --- a/pkg/contexts/ocm/blobhandler/handlers/ocm/ctx.go +++ b/pkg/contexts/ocm/blobhandler/handlers/ocm/ctx.go @@ -27,9 +27,9 @@ type DefaultStorageContext struct { Payload interface{} } -func New(repo cpi.Repository, vers cpi.ComponentVersionAccess, access BlobSink, impltyp string, payload ...interface{}) StorageContext { +func New(repo cpi.Repository, compname string, access BlobSink, impltyp string, payload ...interface{}) StorageContext { return &DefaultStorageContext{ - DefaultStorageContext: *cpi.NewDefaultStorageContext(repo, vers, cpi.ImplementationRepositoryType{cpi.CONTEXT_TYPE, impltyp}), + DefaultStorageContext: *cpi.NewDefaultStorageContext(repo, compname, cpi.ImplementationRepositoryType{cpi.CONTEXT_TYPE, impltyp}), Sink: access, Payload: utils.Optional(payload...), } diff --git a/pkg/contexts/ocm/cpi/accspeccpi/method.go b/pkg/contexts/ocm/cpi/accspeccpi/method.go index 11dd8b1750..9ed12fd139 100644 --- a/pkg/contexts/ocm/cpi/accspeccpi/method.go +++ b/pkg/contexts/ocm/cpi/accspeccpi/method.go @@ -117,12 +117,15 @@ func (m *DefaultAccessMethodImpl) Reader() (io.ReadCloser, error) { } func (m *DefaultAccessMethodImpl) Close() error { + var err error m.lock.Lock() defer m.lock.Unlock() + if m.blob != nil { - return m.blob.Close() + err = m.blob.Close() + m.blob = nil } - return nil + return err } func (m *DefaultAccessMethodImpl) MimeType() string { diff --git a/pkg/contexts/ocm/cpi/accspeccpi/methodview.go b/pkg/contexts/ocm/cpi/accspeccpi/methodview.go index f72f0c36a0..072773696b 100644 --- a/pkg/contexts/ocm/cpi/accspeccpi/methodview.go +++ b/pkg/contexts/ocm/cpi/accspeccpi/methodview.go @@ -7,6 +7,7 @@ package accspeccpi import ( "io" + "github.com/modern-go/reflect2" "github.com/opencontainers/go-digest" "github.com/open-component-model/ocm/pkg/blobaccess" @@ -19,9 +20,9 @@ type DigestSource interface { GetDigest() (digest.Digest, error) } -// AccessMethodView can be used map wrap an access method -// into a managed method with multiple views. The original method -// object is closed once the last view is closed. +// AccessMethodView provides access +// to the implementation object behind an +// access method. type AccessMethodView interface { utils.Unwrappable AccessMethod @@ -32,6 +33,9 @@ type AccessMethodView interface { // closed when the last view is closed. func AccessMethodForImplementation(acc AccessMethodImpl, err error) (AccessMethod, error) { if err != nil { + if !reflect2.IsNil(acc) { + acc.Close() + } return nil, err } return refmgmt.WithView[AccessMethodImpl, AccessMethod](acc, accessMethodViewCreator), err @@ -43,8 +47,7 @@ func BlobAccessForAccessSpec(spec AccessSpec, cv ComponentVersionAccess) (blobac if err != nil { return nil, err } - defer m.Close() - return BlobAccessForAccessMethod(m) + return m.AsBlobAccess(), nil } func accessMethodViewCreator(impl AccessMethodImpl, view *refmgmt.View[AccessMethod]) AccessMethod { @@ -65,6 +68,10 @@ func (a *accessMethodView) Unwrap() interface{} { return a.methodimpl } +func (a *accessMethodView) AsBlobAccess() blobaccess.BlobAccess { + return blobaccess.ForDataAccess("", -1, a.MimeType(), a) +} + func (a *accessMethodView) IsLocal() bool { return a.methodimpl.IsLocal() } diff --git a/pkg/contexts/ocm/cpi/interface.go b/pkg/contexts/ocm/cpi/interface.go index 313a5447c7..c17344ad3a 100644 --- a/pkg/contexts/ocm/cpi/interface.go +++ b/pkg/contexts/ocm/cpi/interface.go @@ -54,7 +54,6 @@ type ( AccessSpecDecoder = internal.AccessSpecDecoder GenericAccessSpec = internal.GenericAccessSpec AccessMethod = internal.AccessMethod - AccessMethodSupport = internal.AccessMethodSupport AccessProvider = internal.AccessProvider AccessTypeProvider = internal.AccessTypeProvider AccessTypeScheme = internal.AccessTypeScheme diff --git a/pkg/contexts/ocm/cpi/storagectx.go b/pkg/contexts/ocm/cpi/storagectx.go index 48eea45738..e03c46f0d0 100644 --- a/pkg/contexts/ocm/cpi/storagectx.go +++ b/pkg/contexts/ocm/cpi/storagectx.go @@ -6,16 +6,16 @@ package cpi type DefaultStorageContext struct { ComponentRepository Repository - ComponentVersion ComponentVersionAccess + ComponentName string ImplementationRepositoryType ImplementationRepositoryType } var _ StorageContext = (*DefaultStorageContext)(nil) -func NewDefaultStorageContext(repo Repository, vers ComponentVersionAccess, reptype ImplementationRepositoryType) *DefaultStorageContext { +func NewDefaultStorageContext(repo Repository, compname string, reptype ImplementationRepositoryType) *DefaultStorageContext { return &DefaultStorageContext{ ComponentRepository: repo, - ComponentVersion: vers, + ComponentName: compname, ImplementationRepositoryType: reptype, } } @@ -24,8 +24,8 @@ func (c *DefaultStorageContext) GetContext() Context { return c.ComponentRepository.GetContext() } -func (c *DefaultStorageContext) TargetComponentVersion() ComponentVersionAccess { - return c.ComponentVersion +func (c *DefaultStorageContext) TargetComponentName() string { + return c.ComponentName } func (c *DefaultStorageContext) TargetComponentRepository() Repository { diff --git a/pkg/contexts/ocm/cpi/support/compversaccess.go b/pkg/contexts/ocm/cpi/support/compversaccess.go index 4f88d512d3..962d6180e5 100644 --- a/pkg/contexts/ocm/cpi/support/compversaccess.go +++ b/pkg/contexts/ocm/cpi/support/compversaccess.go @@ -8,6 +8,7 @@ import ( "github.com/open-component-model/ocm/pkg/contexts/ocm/compdesc" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" "github.com/open-component-model/ocm/pkg/errors" + "github.com/open-component-model/ocm/pkg/refmgmt" ) type _ComponentVersionAccessImplBase = cpi.ComponentVersionAccessImplBase @@ -66,6 +67,7 @@ func (a *componentVersionAccessImpl) EnablePersistence() bool { return false } a.persistent = true + a.GetStorageContext() return true } @@ -97,24 +99,24 @@ func (a *componentVersionAccessImpl) IsReadOnly() bool { //////////////////////////////////////////////////////////////////////////////// // with access to actual view -func (a *componentVersionAccessImpl) AccessMethod(cv cpi.ComponentVersionAccess, acc cpi.AccessSpec) (cpi.AccessMethod, error) { - return a.base.AccessMethod(acc) +func (a *componentVersionAccessImpl) AccessMethod(acc cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) (cpi.AccessMethod, error) { + return a.base.AccessMethod(acc, cv) } -func (a *componentVersionAccessImpl) GetInexpensiveContentVersionIdentity(cv cpi.ComponentVersionAccess, acc cpi.AccessSpec) string { - return a.base.GetInexpensiveContentVersionIdentity(acc) +func (a *componentVersionAccessImpl) GetInexpensiveContentVersionIdentity(acc cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) string { + return a.base.GetInexpensiveContentVersionIdentity(acc, cv) } func (a *componentVersionAccessImpl) GetDescriptor() *compdesc.ComponentDescriptor { return a.base.GetDescriptor() } -func (a *componentVersionAccessImpl) GetStorageContext(cv cpi.ComponentVersionAccess) cpi.StorageContext { - return a.base.GetStorageContext(cv) +func (a *componentVersionAccessImpl) GetStorageContext() cpi.StorageContext { + return a.base.GetStorageContext() } -func (a *componentVersionAccessImpl) AddBlobFor(storagectx cpi.StorageContext, blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) { - return a.base.AddBlobFor(storagectx, blob, refName, global) +func (a *componentVersionAccessImpl) AddBlobFor(blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) { + return a.base.AddBlobFor(blob, refName, global) } func (a *componentVersionAccessImpl) ShouldUpdate(final bool) bool { diff --git a/pkg/contexts/ocm/cpi/support/container.go b/pkg/contexts/ocm/cpi/support/container.go index d03a39151d..df9a1bc895 100644 --- a/pkg/contexts/ocm/cpi/support/container.go +++ b/pkg/contexts/ocm/cpi/support/container.go @@ -9,6 +9,7 @@ import ( "github.com/open-component-model/ocm/pkg/contexts/ocm/compdesc" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" + "github.com/open-component-model/ocm/pkg/refmgmt" ) // BlobContainer is the interface for an element capable to store blobs. @@ -19,7 +20,7 @@ type BlobContainer interface { // that is used to feed blob handlers for specific blob storage methods. // If no handler accepts the blob, the AddBlobFor method will // be used to store the blob - GetStorageContext(cv cpi.ComponentVersionAccess) cpi.StorageContext + GetStorageContext() cpi.StorageContext // AddBlobFor stores a local blob together with the component and // potentially provides a global reference according to the OCI distribution spec @@ -27,7 +28,7 @@ type BlobContainer interface { // The resulting access information (global and local) is provided as // an access method specification usable in a component descriptor. // This is the direct technical storage, without caring about any handler. - AddBlobFor(storagectx cpi.StorageContext, blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) + AddBlobFor(blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) } // ComponentVersionContainer is the interface of an element hosting a component version. @@ -44,8 +45,8 @@ type ComponentVersionContainer interface { GetDescriptor() *compdesc.ComponentDescriptor BlobContainer - AccessMethod(a cpi.AccessSpec) (cpi.AccessMethod, error) - GetInexpensiveContentVersionIdentity(a cpi.AccessSpec) string + AccessMethod(a cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) (cpi.AccessMethod, error) + GetInexpensiveContentVersionIdentity(a cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) string io.Closer } diff --git a/pkg/contexts/ocm/cpi/view.go b/pkg/contexts/ocm/cpi/view.go index 892f892794..490d294e3d 100644 --- a/pkg/contexts/ocm/cpi/view.go +++ b/pkg/contexts/ocm/cpi/view.go @@ -313,7 +313,10 @@ func (c *componentAccessView) addVersion(acc ComponentVersionAccess, overrides . eff ComponentVersionAccess ) - if !c.impl.IsOwned(acc) { + opts := NewBlobUploadOptions() + + forcestore := c.impl.IsOwned(acc) + if !forcestore { // transfer all local blobs into a new owned version. sel = func(spec AccessSpec) bool { return spec.IsLocal(ctx) } @@ -333,14 +336,16 @@ func (c *componentAccessView) addVersion(acc ComponentVersionAccess, overrides . *d = *acc.GetDescriptor().Copy() } else { // transfer composition blobs into local blobs + opts.UseNoDefaultIfNotSet = true + opts.BlobHandlerProvider = nil sel = compose.Is d = acc.GetDescriptor() eff = acc } - err = setupLocalBobs(ctx, "resource", acc, eff, nil, impl, d.Resources, sel) + err = setupLocalBobs(ctx, "resource", acc, nil, impl, d.Resources, sel, forcestore, opts) if err == nil { - err = setupLocalBobs(ctx, "source", acc, eff, nil, impl, d.Sources, sel) + err = setupLocalBobs(ctx, "source", acc, nil, impl, d.Sources, sel, forcestore, opts) } if err != nil { return err @@ -349,7 +354,7 @@ func (c *componentAccessView) addVersion(acc ComponentVersionAccess, overrides . return c.impl.AddVersion(eff) } -func setupLocalBobs(ctx Context, kind string, src, tgt ComponentVersionAccess, accprov func(AccessSpec) (AccessMethod, error), tgtimpl ComponentVersionAccessImpl, it compdesc.ArtifactAccessor, sel func(AccessSpec) bool) (ferr error) { +func setupLocalBobs(ctx Context, kind string, src ComponentVersionAccess, accprov func(AccessSpec) (AccessMethod, error), tgtimpl ComponentVersionAccessImpl, it compdesc.ArtifactAccessor, sel func(AccessSpec) bool, forcestore bool, opts *BlobUploadOptions) (ferr error) { var finalize finalizer.Finalizer defer finalize.FinalizeWithErrorPropagation(&ferr) @@ -366,7 +371,13 @@ func setupLocalBobs(ctx Context, kind string, src, tgt ComponentVersionAccess, a return errors.Wrapf(err, "%s %d", kind, i) } nested.Close(blob) - effspec, err := addBlob(tgtimpl, tgt, a.GetType(), ReferenceHint(spec, src), blob, GlobalAccess(spec, ctx)) + + var effspec AccessSpec + if forcestore { + effspec, err = tgtimpl.AddBlobFor(blob, ReferenceHint(spec, src), GlobalAccess(spec, ctx)) + } else { + effspec, err = addBlob(tgtimpl, a.GetType(), ReferenceHint(spec, src), blob, GlobalAccess(spec, ctx)) + } if err != nil { return errors.Wrapf(err, "cannot store %s %d", kind, i) } @@ -391,11 +402,7 @@ func blobAccessForLocalAccessSpec(spec AccessSpec, cv ComponentVersionAccess, ac if err != nil { return nil, err } - - if err != nil { - return nil, err - } - return accspeccpi.BlobAccessForAccessMethod(m) + return m.AsBlobAccess(), nil } func (c *componentAccessView) NewVersion(version string, overrides ...bool) (acc ComponentVersionAccess, err error) { @@ -438,22 +445,21 @@ type ComponentVersionAccessImpl interface { GetDescriptor() *compdesc.ComponentDescriptor - AccessMethod(ComponentVersionAccess, AccessSpec) (AccessMethod, error) - - GetInexpensiveContentVersionIdentity(ComponentVersionAccess, AccessSpec) string + AccessMethod(AccessSpec, refmgmt.ExtendedAllocatable) (AccessMethod, error) + GetInexpensiveContentVersionIdentity(AccessSpec, refmgmt.ExtendedAllocatable) string // GetStorageContext creates a storage context for blobs // that is used to feed blob handlers for specific blob storage methods. // If no handler accepts the blob, the AddBlobFor method will // be used to store the blob - GetStorageContext(cv ComponentVersionAccess) StorageContext + GetStorageContext() StorageContext // AddBlobFor stores a local blob together with the component and // potentially provides a global reference. // The resulting access information (global and local) is provided as // an access method specification usable in a component descriptor. // This is the direct technical storage, without caring about any handler. - AddBlobFor(storagectx StorageContext, blob BlobAccess, refName string, global AccessSpec) (AccessSpec, error) + AddBlobFor(blob BlobAccess, refName string, global AccessSpec) (AccessSpec, error) IsReadOnly() bool @@ -600,6 +606,7 @@ func (b *ComponentVersionAccessImplBase) GetBlobCache() BlobCache { type componentVersionAccessView struct { _ComponentVersionAccessView impl ComponentVersionAccessImpl + err error } var ( @@ -615,10 +622,12 @@ func GetComponentVersionAccessImplementation(n ComponentVersionAccess) (Componen } func artifactAccessViewCreator(i ComponentVersionAccessImpl, v resource.CloserView, d resource.ViewManager[ComponentVersionAccess]) ComponentVersionAccess { - return &componentVersionAccessView{ + cv := &componentVersionAccessView{ _ComponentVersionAccessView: resource.NewView[ComponentVersionAccess](v, d), impl: i, } + v.Allocatable().BeforeCleanup(refmgmt.CleanupHandlerFunc(cv.finish)) + return cv } func NewComponentVersionAccess(impl ComponentVersionAccessImpl) ComponentVersionAccess { @@ -630,24 +639,19 @@ func (c *componentVersionAccessView) Unwrap() interface{} { } func (c *componentVersionAccessView) Close() error { - err := c.Execute(func() error { - // executed under local lock, if refcount is one, I'm the last user. - if c.impl.RefCount() == 1 { - // prepare artifact access for final close in - // direct access mode. - if !compositionmodeattr.Get(c.GetContext()) { - err := c.update(true) - if err != nil { - return err - } - } + list := errors.ErrListf("closing %s", common.VersionedElementKey(c)) + err := c._ComponentVersionAccessView.Close() + return list.Add(c.err, err).Result() +} + +func (c *componentVersionAccessView) finish() { + if !c.IsClosed() { + // prepare artifact access for final close in + // direct access mode. + if !compositionmodeattr.Get(c.GetContext()) { + c.err = c.update(true) } - return nil - }) - if err != nil { - return err } - return c._ComponentVersionAccessView.Close() } func (c *componentVersionAccessView) Repository() Repository { @@ -709,7 +713,7 @@ func (c *componentVersionAccessView) accessMethod(spec AccessSpec) (meth AccessM case !spec.IsLocal(c.GetContext()): meth, err = spec.AccessMethod(c) default: - meth, err = c.impl.AccessMethod(c, spec) + meth, err = c.impl.AccessMethod(spec, c.Allocatable()) if err == nil { if blob := c.getLocalBlob(spec); blob != nil { meth, err = newFakeMethod(meth, blob) @@ -743,7 +747,7 @@ func (c *componentVersionAccessView) getInexpensiveContentVersionIdentity(spec A // fall back to original version return spec.GetInexpensiveContentVersionIdentity(c) default: - return c.impl.GetInexpensiveContentVersionIdentity(c, spec) + return c.impl.GetInexpensiveContentVersionIdentity(spec, c.Allocatable()) } } @@ -768,9 +772,9 @@ func (c *componentVersionAccessView) update(final bool) error { return err } // TODO: exceute for separately lockable view - err = setupLocalBobs(ctx, "resource", c, c, c.accessMethod, impl, d.Resources, compose.Is) + err = setupLocalBobs(ctx, "resource", c, c.accessMethod, impl, d.Resources, compose.Is, true, nil) if err == nil { - err = setupLocalBobs(ctx, "source", c, c, c.accessMethod, impl, d.Sources, compose.Is) + err = setupLocalBobs(ctx, "source", c, c.accessMethod, impl, d.Sources, compose.Is, true, nil) } if err != nil { return err @@ -792,35 +796,20 @@ func (c *componentVersionAccessView) AddBlob(blob cpi.BlobAccess, artType, refNa } blob, err := blob.Dup() if err != nil { - return nil, errors.Wrapf(err, "inavlid blob access") + return nil, errors.Wrapf(err, "invalid blob access") } defer blob.Close() err = utils.ValidateObject(blob) if err != nil { - return nil, errors.Wrapf(err, "inavlid blob access") - } - - eff := NewBlobUploadOptions(opts...) - if !eff.UseNoDefaultIfNotSet && eff.BlobHandlerProvider == nil { - eff.BlobHandlerProvider = internal.DefaultBlobHandlerProvider(c.GetContext()) + return nil, errors.Wrapf(err, "invalid blob access") } - var acc AccessSpec - if c.impl.UseDirectAccess() { - acc, err = addBlob(c.impl, c, artType, refName, blob, global) - } else { - // use local composition access to be added to the repository with AddVersion. - acc = compose.New(refName, blob.MimeType(), global) - } - if err == nil { - return c.cacheLocalBlob(acc, blob) - } - return acc, err + return addBlob(c.impl, artType, refName, blob, global) } -func addBlob(impl ComponentVersionAccessImpl, cv ComponentVersionAccess, artType, refName string, blob BlobAccess, global AccessSpec) (AccessSpec, error) { - storagectx := impl.GetStorageContext(cv) - ctx := cv.GetContext() +func addBlob(impl ComponentVersionAccessImpl, artType, refName string, blob BlobAccess, global AccessSpec) (AccessSpec, error) { + storagectx := impl.GetStorageContext() + ctx := impl.GetContext() h := ctx.BlobHandlers().LookupHandler(storagectx.GetImplementationRepositoryType(), artType, blob.MimeType()) if h != nil { acc, err := h.StoreBlob(blob, artType, refName, nil, storagectx) @@ -834,7 +823,12 @@ func addBlob(impl ComponentVersionAccessImpl, cv ComponentVersionAccess, artType global = acc } } - return impl.AddBlobFor(storagectx, blob, refName, global) + if impl.UseDirectAccess() { + return impl.AddBlobFor(blob, refName, global) + } + // use local composition access to be added to the repository with AddVersion. + acc := compose.New(refName, blob.MimeType(), global) + return cacheLocalBlob(impl, acc, blob) } func (c *componentVersionAccessView) getLocalBlob(acc AccessSpec) BlobAccess { @@ -845,7 +839,7 @@ func (c *componentVersionAccessView) getLocalBlob(acc AccessSpec) BlobAccess { return c.impl.GetBlobCache().GetBlobFor(string(key)) } -func (c *componentVersionAccessView) cacheLocalBlob(acc AccessSpec, blob BlobAccess) (AccessSpec, error) { +func cacheLocalBlob(impl ComponentVersionAccessImpl, acc AccessSpec, blob BlobAccess) (AccessSpec, error) { key, err := json.Marshal(acc) if err != nil { return nil, errors.Wrapf(err, "cannot marshal access spec") @@ -863,7 +857,7 @@ func (c *componentVersionAccessView) cacheLocalBlob(acc AccessSpec, blob BlobAcc // The access spec is independent of the actual repo, so it does // not have access to those credentials. Therefore, we have to // keep the original blob for further usage, also. - err = c.impl.GetBlobCache().AddBlobFor(string(key), blob) + err = impl.GetBlobCache().AddBlobFor(string(key), blob) if err != nil { return nil, err } diff --git a/pkg/contexts/ocm/cpi/view_rsc.go b/pkg/contexts/ocm/cpi/view_rsc.go index 6c480c9975..4b8868dafb 100644 --- a/pkg/contexts/ocm/cpi/view_rsc.go +++ b/pkg/contexts/ocm/cpi/view_rsc.go @@ -80,13 +80,12 @@ func (r *ComponentVersionBasedAccessProvider) AccessMethod() (AccessMethod, erro return acc.AccessMethod(r.vers) } -func (r *ComponentVersionBasedAccessProvider) BlobAccess() (blob BlobAccess, rerr error) { +func (r *ComponentVersionBasedAccessProvider) BlobAccess() (BlobAccess, error) { m, err := r.AccessMethod() if err != nil { return nil, err } - defer errors.PropagateError(&err, m.Close) - return accspeccpi.BlobAccessForAccessMethod(m) + return m.AsBlobAccess(), nil } //////////////////////////////////////////////////////////////////////////////// @@ -171,7 +170,7 @@ func (b *accessAccessProvider) Access() (cpi.AccessSpec, error) { } func (b *accessAccessProvider) AccessMethod() (cpi.AccessMethod, error) { - return nil, errors.ErrNotFound(descriptor.KIND_ACCESSMETHOD) + return b.spec.AccessMethod(&DummyComponentVersionAccess{b.ctx}) } func (b *accessAccessProvider) BlobAccess() (blobaccess.BlobAccess, error) { diff --git a/pkg/contexts/ocm/download/handlers/plugin/handler.go b/pkg/contexts/ocm/download/handlers/plugin/handler.go index e1a4e5b512..8d5790020d 100644 --- a/pkg/contexts/ocm/download/handlers/plugin/handler.go +++ b/pkg/contexts/ocm/download/handlers/plugin/handler.go @@ -14,6 +14,7 @@ import ( "github.com/open-component-model/ocm/pkg/contexts/ocm/plugin" "github.com/open-component-model/ocm/pkg/contexts/ocm/plugin/descriptor" "github.com/open-component-model/ocm/pkg/errors" + "github.com/open-component-model/ocm/pkg/finalizer" ) // pluginHandler delegates download format of artifacts to a plugin based handler. @@ -36,14 +37,17 @@ func New(p plugin.Plugin, name string, config []byte) (download.Handler, error) }, nil } -func (b *pluginHandler) Download(_ common.Printer, racc cpi.ResourceAccess, path string, _ vfs.FileSystem) (bool, string, error) { +func (b *pluginHandler) Download(_ common.Printer, racc cpi.ResourceAccess, path string, _ vfs.FileSystem) (resp bool, eff string, rerr error) { m, err := racc.AccessMethod() if err != nil { return true, "", err } + var finalize finalizer.Finalizer + defer finalize.FinalizeWithErrorPropagation(&rerr) + finalize.Close(m, "method for download") r := accessio.NewOndemandReader(m) - defer errors.PropagateError(&err, r.Close) + finalize.Close(r, "reader for downlowd download") return b.plugin.Download(b.name, r, racc.Meta().Type, m.MimeType(), path, b.config) } diff --git a/pkg/contexts/ocm/elements/artifactblob/genericblob/resource.go b/pkg/contexts/ocm/elements/artifactblob/genericblob/resource.go index 0cc7465de0..ad32222f9c 100644 --- a/pkg/contexts/ocm/elements/artifactblob/genericblob/resource.go +++ b/pkg/contexts/ocm/elements/artifactblob/genericblob/resource.go @@ -6,24 +6,23 @@ package genericblob import ( "github.com/open-component-model/ocm/pkg/blobaccess" - "github.com/open-component-model/ocm/pkg/contexts/ocm" "github.com/open-component-model/ocm/pkg/contexts/ocm/compdesc" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" "github.com/open-component-model/ocm/pkg/generics" "github.com/open-component-model/ocm/pkg/optionutils" ) -func Access[M any, P compdesc.ArtifactMetaPointer[M]](ctx ocm.Context, meta P, blob blobaccess.BlobAccess, opts ...Option) cpi.ArtifactAccess[M] { +func Access[M any, P compdesc.ArtifactMetaPointer[M]](ctx cpi.Context, meta P, blob blobaccess.BlobAccessProvider, opts ...Option) cpi.ArtifactAccess[M] { eff := optionutils.EvalOptions(opts...) - accprov := cpi.NewAccessProviderForBlobAccessProvider(ctx, blobaccess.ProviderForBlobAccess(blob), eff.Hint, eff.Global) + accprov := cpi.NewAccessProviderForBlobAccessProvider(ctx, blob, eff.Hint, eff.Global) // strange type cast is required by Go compiler, meta has the correct type. return cpi.NewArtifactAccessForProvider(generics.As[*M](meta), accprov) } -func ResourceAccess(ctx ocm.Context, media string, meta *ocm.ResourceMeta, blob blobaccess.BlobAccess, opts ...Option) cpi.ResourceAccess { +func ResourceAccess(ctx cpi.Context, media string, meta *cpi.ResourceMeta, blob blobaccess.BlobAccessProvider, opts ...Option) cpi.ResourceAccess { return Access(ctx, meta, blob, opts...) } -func SourceAccess(ctx ocm.Context, media string, meta *ocm.SourceMeta, blob blobaccess.BlobAccess, opts ...Option) cpi.SourceAccess { +func SourceAccess(ctx cpi.Context, media string, meta *cpi.SourceMeta, blob blobaccess.BlobAccessProvider, opts ...Option) cpi.SourceAccess { return Access(ctx, meta, blob, opts...) } diff --git a/pkg/contexts/ocm/interface.go b/pkg/contexts/ocm/interface.go index 7307777ab8..4ad64ec641 100644 --- a/pkg/contexts/ocm/interface.go +++ b/pkg/contexts/ocm/interface.go @@ -91,6 +91,14 @@ func DefaultContext() internal.Context { return internal.DefaultContext } +// NoComponentVersion provides a dummy component version +// providing access to the context. +// It can be used to instantiate external access methods +// (not based on any component version). +func NoComponentVersion(ctx ContextProvider) ComponentVersionAccess { + return &cpi.DummyComponentVersionAccess{ctx.OCMContext()} +} + func DefaultBlobHandlers() BlobHandlerRegistry { return internal.DefaultBlobHandlerRegistry } diff --git a/pkg/contexts/ocm/internal/accesstypes.go b/pkg/contexts/ocm/internal/accesstypes.go index e76ac5c257..3c8b91373e 100644 --- a/pkg/contexts/ocm/internal/accesstypes.go +++ b/pkg/contexts/ocm/internal/accesstypes.go @@ -22,11 +22,6 @@ import ( type AccessType flagsetscheme.VersionTypedObjectType[AccessSpec] -type AccessMethodSupport interface { - GetContext() Context - LocalSupportForAccessSpec(spec AccessSpec) bool -} - // AccessSpec is the interface access method specifications // must fulfill. The main task is to map the specification // to a concrete implementation of the access method for a dedicated @@ -99,6 +94,13 @@ type AccessMethodImpl interface { type AccessMethod interface { refmgmt.Dup[AccessMethod] AccessMethodImpl + + // AsBlobAccess maps a method object into a + // basic blob access interface. + // It does not provide a separate reference, + // closing the blob access with close the + // access method. + AsBlobAccess() BlobAccess } type AccessTypeScheme flagsetscheme.TypeScheme[AccessSpec, AccessType] diff --git a/pkg/contexts/ocm/internal/blobhandler.go b/pkg/contexts/ocm/internal/blobhandler.go index 6498640d55..d655f6743a 100644 --- a/pkg/contexts/ocm/internal/blobhandler.go +++ b/pkg/contexts/ocm/internal/blobhandler.go @@ -32,7 +32,7 @@ func (t ImplementationRepositoryType) IsInitial() bool { // It depends on the Context type of the used base repository. type StorageContext interface { GetContext() Context - TargetComponentVersion() ComponentVersionAccess + TargetComponentName() string TargetComponentRepository() Repository GetImplementationRepositoryType() ImplementationRepositoryType } diff --git a/pkg/contexts/ocm/internal/modopts.go b/pkg/contexts/ocm/internal/modopts.go index 34eb201bd7..4c691a9360 100644 --- a/pkg/contexts/ocm/internal/modopts.go +++ b/pkg/contexts/ocm/internal/modopts.go @@ -18,8 +18,8 @@ type BlobOptionImpl interface { } type BlobUploadOptions struct { - UseNoDefaultIfNotSet bool - BlobHandlerProvider BlobHandlerProvider + UseNoDefaultIfNotSet bool `json:"noDefaultUpload,omitempty"` + BlobHandlerProvider BlobHandlerProvider `json:"-"` } var _ BlobUploadOption = (*BlobUploadOptions)(nil) diff --git a/pkg/contexts/ocm/plugin/utils.go b/pkg/contexts/ocm/plugin/utils.go index 8f3bba7cd6..0a3745410d 100644 --- a/pkg/contexts/ocm/plugin/utils.go +++ b/pkg/contexts/ocm/plugin/utils.go @@ -10,6 +10,7 @@ import ( "github.com/opencontainers/go-digest" "github.com/open-component-model/ocm/pkg/common/accessio" + "github.com/open-component-model/ocm/pkg/iotools" ) type AccessDataWriter struct { @@ -23,7 +24,7 @@ func NewAccessDataWriter(p Plugin, creds, accspec json.RawMessage) *AccessDataWr } func (d *AccessDataWriter) WriteTo(w accessio.Writer) (int64, digest.Digest, error) { - dw := accessio.NewDefaultDigestWriter(accessio.NopWriteCloser(w)) + dw := iotools.NewDefaultDigestWriter(accessio.NopWriteCloser(w)) err := d.plugin.Get(dw, d.creds, d.accspec) if err != nil { return accessio.BLOB_UNKNOWN_SIZE, accessio.BLOB_UNKNOWN_DIGEST, err diff --git a/pkg/contexts/ocm/repositories/comparch/accessmethod_localfs.go b/pkg/contexts/ocm/repositories/comparch/accessmethod_localfs.go index 6c0d2b0a35..f980c949fc 100644 --- a/pkg/contexts/ocm/repositories/comparch/accessmethod_localfs.go +++ b/pkg/contexts/ocm/repositories/comparch/accessmethod_localfs.go @@ -10,9 +10,11 @@ import ( "github.com/open-component-model/ocm/pkg/blobaccess" "github.com/open-component-model/ocm/pkg/common/accessio" + "github.com/open-component-model/ocm/pkg/contexts/datacontext/attrs/vfsattr" "github.com/open-component-model/ocm/pkg/contexts/ocm/accessmethods/localblob" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi/accspeccpi" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi/support" + "github.com/open-component-model/ocm/pkg/refmgmt" ) //////////////////////////////////////////////////////////////////////////////// @@ -22,17 +24,36 @@ type localFilesystemBlobAccessMethod struct { closed bool spec *localblob.AccessSpec base support.ComponentVersionContainer - blobAccess blobaccess.DataAccess + err error + blobAccess blobaccess.BlobAccess } var _ accspeccpi.AccessMethodImpl = (*localFilesystemBlobAccessMethod)(nil) -func newLocalFilesystemBlobAccessMethod(a *localblob.AccessSpec, base support.ComponentVersionContainer) accspeccpi.AccessMethod { - m, _ := accspeccpi.AccessMethodForImplementation(&localFilesystemBlobAccessMethod{ +func newLocalFilesystemBlobAccessMethod(a *localblob.AccessSpec, base support.ComponentVersionContainer, ref refmgmt.ExtendedAllocatable) (accspeccpi.AccessMethod, error) { + m := &localFilesystemBlobAccessMethod{ spec: a, base: base, - }, nil) - return m + } + ref.BeforeCleanup(refmgmt.CleanupHandlerFunc(m.Cache)) + return accspeccpi.AccessMethodForImplementation(m, nil) +} + +func (m *localFilesystemBlobAccessMethod) Cache() { + m.Lock() + defer m.Unlock() + + if m.closed { + return + } + + blob, err := m.getBlob() + if err == nil { + blob, err = blobaccess.ForCachedBlobAccess(blob, vfsattr.Get(m.base.GetContext())) + } + m.blobAccess.Close() + m.blobAccess = blob + m.err = err } func (_ *localFilesystemBlobAccessMethod) IsLocal() bool { @@ -55,14 +76,23 @@ func (m *localFilesystemBlobAccessMethod) Reader() (io.ReadCloser, error) { return nil, accessio.ErrClosed } + blob, err := m.getBlob() + if err != nil { + return nil, err + } + + return blob.Reader() +} + +func (m *localFilesystemBlobAccessMethod) getBlob() (blobaccess.BlobAccess, error) { if m.blobAccess == nil { - var err error - m.blobAccess, err = m.base.GetBlobData(m.spec.LocalReference) + data, err := m.base.GetBlobData(m.spec.LocalReference) if err != nil { - return blobaccess.BlobReader(m.blobAccess, err) + return nil, err } + m.blobAccess = blobaccess.ForDataAccess(blobaccess.BLOB_UNKNOWN_DIGEST, blobaccess.BLOB_UNKNOWN_SIZE, m.MimeType(), data) } - return blobaccess.BlobReader(m.blobAccess, nil) + return m.blobAccess, m.err } func (m *localFilesystemBlobAccessMethod) Get() ([]byte, error) { @@ -73,14 +103,11 @@ func (m *localFilesystemBlobAccessMethod) Get() ([]byte, error) { return nil, accessio.ErrClosed } - if m.blobAccess == nil { - var err error - m.blobAccess, err = m.base.GetBlobData(m.spec.LocalReference) - if err != nil { - return blobaccess.BlobData(m.blobAccess, err) - } + blob, err := m.getBlob() + if err != nil { + return nil, err } - return blobaccess.BlobData(m.blobAccess, nil) + return blob.Get() } func (m *localFilesystemBlobAccessMethod) MimeType() string { @@ -95,13 +122,10 @@ func (m *localFilesystemBlobAccessMethod) Close() error { return accessio.ErrClosed } + m.closed = true if m.blobAccess != nil { err := m.blobAccess.Close() - m.blobAccess = nil - m.closed = true - if err != nil { - return err - } + return err } return nil } diff --git a/pkg/contexts/ocm/repositories/comparch/accessmethod_test.go b/pkg/contexts/ocm/repositories/comparch/accessmethod_test.go index a203a71fd6..6245483950 100644 --- a/pkg/contexts/ocm/repositories/comparch/accessmethod_test.go +++ b/pkg/contexts/ocm/repositories/comparch/accessmethod_test.go @@ -11,6 +11,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + . "github.com/open-component-model/ocm/pkg/testutils" "github.com/open-component-model/ocm/pkg/common/accessobj" "github.com/open-component-model/ocm/pkg/contexts/ocm" @@ -47,23 +48,19 @@ var _ = Describe("access method", func() { Context("component archive", func() { It("instantiate local blob access method for component archive", func() { - data, err := os.ReadFile("testdata/descriptor/component-descriptor.yaml") - Expect(err).To(Succeed()) - cd, err := compdesc.Decode(data) - Expect(err).To(Succeed()) + data := Must(os.ReadFile("testdata/descriptor/component-descriptor.yaml")) + cd := Must(compdesc.Decode(data)) - ca, err := comparch.New(DefaultContext, accessobj.ACC_CREATE, nil, nil, nil, 0600) - Expect(err).To(Succeed()) + ca := Must(comparch.New(DefaultContext, accessobj.ACC_CREATE, nil, nil, nil, 0600)) + defer Close(ca, "component archive") ca.GetDescriptor().Name = "acme.org/dummy" ca.GetDescriptor().Version = "v1" - res, err := cd.GetResourceByIdentity(metav1.NewIdentity("local")) - Expect(err).To(Succeed()) + res := Must(cd.GetResourceByIdentity(metav1.NewIdentity("local"))) Expect(res).To(Not(BeNil())) - spec, err := DefaultContext.AccessSpecForSpec(res.Access) - Expect(err).To(Succeed()) + spec := Must(DefaultContext.AccessSpecForSpec(res.Access)) Expect(spec).To(Not(BeNil())) Expect(spec.GetType()).To(Equal(localfsblob.Type)) @@ -71,17 +68,14 @@ var _ = Describe("access method", func() { Expect(spec.GetVersion()).To(Equal("v1")) Expect(reflect.TypeOf(spec)).To(Equal(reflect.TypeOf(&localblob.AccessSpec{}))) - data, err = json.Marshal(spec) - Expect(err).To(Succeed()) + data = Must(json.Marshal(spec)) Expect(string(data)).To(Equal(legacy)) - m, err := spec.AccessMethod(ca) - Expect(err).To(Succeed()) + m := Must(spec.AccessMethod(ca)) + defer Close(m, "caccess method") Expect(m).To(Not(BeNil())) Expect(reflect.TypeOf(accspeccpi.GetAccessMethodImplementation(m)).String()).To(Equal("*comparch.localFilesystemBlobAccessMethod")) Expect(m.GetKind()).To(Equal("localBlob")) - - Expect(ca.Close()).To(Succeed()) }) }) }) diff --git a/pkg/contexts/ocm/repositories/comparch/componentarchive.go b/pkg/contexts/ocm/repositories/comparch/componentarchive.go index 4f39afc5d6..b05d267be9 100644 --- a/pkg/contexts/ocm/repositories/comparch/componentarchive.go +++ b/pkg/contexts/ocm/repositories/comparch/componentarchive.go @@ -9,7 +9,6 @@ import ( "github.com/open-component-model/ocm/pkg/blobaccess" "github.com/open-component-model/ocm/pkg/common" - "github.com/open-component-model/ocm/pkg/common/accessio" "github.com/open-component-model/ocm/pkg/common/accessobj" ocicpi "github.com/open-component-model/ocm/pkg/contexts/oci/cpi" "github.com/open-component-model/ocm/pkg/contexts/ocm/accessmethods/localblob" @@ -19,6 +18,7 @@ import ( "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi/support" "github.com/open-component-model/ocm/pkg/errors" + "github.com/open-component-model/ocm/pkg/refmgmt" ) //////////////////////////////////////////////////////////////////////////////// @@ -117,8 +117,8 @@ func (c *componentArchiveContainer) GetParentViewManager() cpi.ComponentAccessVi } func (c *componentArchiveContainer) Close() error { - c.Update() - return c.base.Close() + var list errors.ErrorList + return list.Add(c.Update(), c.base.Close()).Result() } func (c *componentArchiveContainer) GetContext() cpi.Context { @@ -148,8 +148,8 @@ func (c *componentArchiveContainer) GetBlobData(name string) (cpi.DataAccess, er return c.base.GetBlobDataByName(name) } -func (c *componentArchiveContainer) GetStorageContext(cv cpi.ComponentVersionAccess) cpi.StorageContext { - return ocmhdlr.New(c.Repository(), cv, &BlobSink{c.base}, Type) +func (c *componentArchiveContainer) GetStorageContext() cpi.StorageContext { + return ocmhdlr.New(c.Repository(), c.impl.GetName(), &BlobSink{c.base}, Type) } type BlobSink struct { @@ -164,7 +164,7 @@ func (s *BlobSink) AddBlob(blob blobaccess.BlobAccess) (string, error) { return blob.Digest().String(), nil } -func (c *componentArchiveContainer) AddBlobFor(storagectx cpi.StorageContext, blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) { +func (c *componentArchiveContainer) AddBlobFor(blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) { if blob == nil { return nil, errors.New("a resource has to be defined") } @@ -175,24 +175,29 @@ func (c *componentArchiveContainer) AddBlobFor(storagectx cpi.StorageContext, bl return localblob.New(common.DigestToFileName(blob.Digest()), refName, blob.MimeType(), global), nil } -func (c *componentArchiveContainer) AccessMethod(a cpi.AccessSpec) (cpi.AccessMethod, error) { +func (c *componentArchiveContainer) AccessMethod(a cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) (cpi.AccessMethod, error) { if a.GetKind() == localblob.Type || a.GetKind() == localfsblob.Type { accessSpec, err := c.GetContext().AccessSpecForSpec(a) if err != nil { return nil, err } - return newLocalFilesystemBlobAccessMethod(accessSpec.(*localblob.AccessSpec), c), nil + return newLocalFilesystemBlobAccessMethod(accessSpec.(*localblob.AccessSpec), c, cv) } return nil, errors.ErrNotSupported(errors.KIND_ACCESSMETHOD, a.GetType(), "component archive") } -func (c *componentArchiveContainer) GetInexpensiveContentVersionIdentity(a cpi.AccessSpec) string { +func (c *componentArchiveContainer) GetInexpensiveContentVersionIdentity(a cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) string { if a.GetKind() == localblob.Type || a.GetKind() == localfsblob.Type { accessSpec, err := c.GetContext().AccessSpecForSpec(a) if err != nil { return "" } - digest, _ := accessio.Digest(newLocalFilesystemBlobAccessMethod(accessSpec.(*localblob.AccessSpec), c)) + m, err := newLocalFilesystemBlobAccessMethod(accessSpec.(*localblob.AccessSpec), c, cv) + if err != nil { + return "" + } + defer m.Close() + digest, _ := blobaccess.Digest(m) return digest.String() } return "" diff --git a/pkg/contexts/ocm/repositories/comparch/repository.go b/pkg/contexts/ocm/repositories/comparch/repository.go index 384c7c8221..604e3bdeb2 100644 --- a/pkg/contexts/ocm/repositories/comparch/repository.go +++ b/pkg/contexts/ocm/repositories/comparch/repository.go @@ -9,6 +9,7 @@ import ( "strings" "sync" + "github.com/open-component-model/ocm/pkg/blobaccess" "github.com/open-component-model/ocm/pkg/common" "github.com/open-component-model/ocm/pkg/common/accessio" "github.com/open-component-model/ocm/pkg/contexts/datacontext/attrs/vfsattr" @@ -292,11 +293,11 @@ func (c *ComponentVersionContainer) GetBlobData(name string) (cpi.DataAccess, er return c.comp.repo.arch.container.GetBlobData(name) } -func (c *ComponentVersionContainer) GetStorageContext(cv cpi.ComponentVersionAccess) cpi.StorageContext { - return ocmhdlr.New(c.Repository(), cv, &BlobSink{c.comp.repo.arch.container.base}, Type) +func (c *ComponentVersionContainer) GetStorageContext() cpi.StorageContext { + return ocmhdlr.New(c.Repository(), c.comp.GetName(), &BlobSink{c.comp.repo.arch.container.base}, Type) } -func (c *ComponentVersionContainer) AddBlobFor(storagectx cpi.StorageContext, blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) { +func (c *ComponentVersionContainer) AddBlobFor(blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) { if blob == nil { return nil, errors.New("a resource has to be defined") } @@ -307,26 +308,29 @@ func (c *ComponentVersionContainer) AddBlobFor(storagectx cpi.StorageContext, bl return localblob.New(common.DigestToFileName(blob.Digest()), refName, blob.MimeType(), global), nil } -func (c *ComponentVersionContainer) AccessMethod(a cpi.AccessSpec) (cpi.AccessMethod, error) { +func (c *ComponentVersionContainer) AccessMethod(a cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) (cpi.AccessMethod, error) { if a.GetKind() == localblob.Type || a.GetKind() == localfsblob.Type { accessSpec, err := c.GetContext().AccessSpecForSpec(a) if err != nil { return nil, err } - return newLocalFilesystemBlobAccessMethod(accessSpec.(*localblob.AccessSpec), c), nil + return newLocalFilesystemBlobAccessMethod(accessSpec.(*localblob.AccessSpec), c, cv) } return nil, errors.ErrNotSupported(errors.KIND_ACCESSMETHOD, a.GetType(), "component archive") } -func (c *ComponentVersionContainer) GetInexpensiveContentVersionIdentity(a cpi.AccessSpec) string { +func (c *ComponentVersionContainer) GetInexpensiveContentVersionIdentity(a cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) string { if a.GetKind() == localblob.Type || a.GetKind() == localfsblob.Type { accessSpec, err := c.GetContext().AccessSpecForSpec(a) if err != nil { return "" } - m := newLocalFilesystemBlobAccessMethod(accessSpec.(*localblob.AccessSpec), c) + m, err := newLocalFilesystemBlobAccessMethod(accessSpec.(*localblob.AccessSpec), c, cv) + if err != nil { + return "" + } defer m.Close() - digest, _ := accessio.Digest(m) + digest, _ := blobaccess.Digest(m) return digest.String() } return "" diff --git a/pkg/contexts/ocm/repositories/composition/close_test.go b/pkg/contexts/ocm/repositories/composition/close_test.go new file mode 100644 index 0000000000..6be999d23e --- /dev/null +++ b/pkg/contexts/ocm/repositories/composition/close_test.go @@ -0,0 +1,153 @@ +// SPDX-FileCopyrightText: 2022 SAP SE or an SAP affiliate company and Open Component Model contributors. +// +// SPDX-License-Identifier: Apache-2.0 + +package composition_test + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + . "github.com/open-component-model/ocm/pkg/contexts/oci/testhelper" + . "github.com/open-component-model/ocm/pkg/testutils" + + "github.com/open-component-model/ocm/pkg/common/accessio" + "github.com/open-component-model/ocm/pkg/common/accessobj" + "github.com/open-component-model/ocm/pkg/contexts/ocm/accessmethods/relativeociref" + v1 "github.com/open-component-model/ocm/pkg/contexts/ocm/compdesc/meta/v1" + "github.com/open-component-model/ocm/pkg/contexts/ocm/repositories/comparch" + "github.com/open-component-model/ocm/pkg/contexts/ocm/repositories/composition" + "github.com/open-component-model/ocm/pkg/contexts/ocm/repositories/ctf" + "github.com/open-component-model/ocm/pkg/env/builder" + "github.com/open-component-model/ocm/pkg/finalizer" + "github.com/open-component-model/ocm/pkg/mime" + "github.com/open-component-model/ocm/pkg/refmgmt" +) + +const OCIPATH = "/tmp/oci" +const OCIHOST = "alias" + +const RES = "ref" + +var _ = Describe("cached access method blob", func() { + var env *builder.Builder + + BeforeEach(func() { + env = builder.NewBuilder() + }) + + AfterEach(func() { + env.Cleanup() + }) + + Context("ocireg", func() { + + BeforeEach(func() { + env.OCICommonTransport(OCIPATH, accessio.FormatDirectory, func() { + OCIManifest1(env) + }) + FakeOCIRepo(env, OCIPATH, OCIHOST) + + env.OCMCommonTransport(OCIPATH, accessio.FormatDirectory, func() { + env.ComponentVersion(COMPONENT, VERSION, func() { + env.Resource(RES, VERSION, "testtyp", v1.LocalRelation, func() { + env.Access(relativeociref.New(OCINAMESPACE + ":" + OCIVERSION)) + }) + }) + }) + }) + + It("caches blobs on close", func() { + var finalize finalizer.Finalizer + defer Defer(finalize.Finalize) + + srcfinalize := finalize.Nested() + + ctfrepo := Must(ctf.Open(env, accessobj.ACC_READONLY, OCIPATH, 0o700, env)) + + refmgmt.AsLazy(ctfrepo) + + srcfinalize.Close(ctfrepo, "ctf") + srccv := Must(ctfrepo.LookupComponentVersion(COMPONENT, VERSION)) + srcfinalize.Close(srccv, "src cv") + + res := Must(srccv.GetResource(v1.NewIdentity(RES))) + srcblob := Must(res.BlobAccess()) + finalize.Close(srcblob, "source blob") + Expect(srcblob.MimeType()).To(Equal("application/vnd.oci.image.manifest.v1+tar+gzip")) + + // now close the original environment + // the blob access must be cached now and decoupled from the providing + // repository. + MustBeSuccessful(srcfinalize.Finalize()) + + Expect(srcblob.MimeType()).To(Equal("application/vnd.oci.image.manifest.v1+tar+gzip")) + }) + + It("caches blobs on close", func() { + var finalize finalizer.Finalizer + defer Defer(finalize.Finalize) + + srcfinalize := finalize.Nested() + + ctfrepo := Must(ctf.Open(env, accessobj.ACC_READONLY, OCIPATH, 0o700, env)) + srcfinalize.Close(ctfrepo, "ctf") + srccv := Must(ctfrepo.LookupComponentVersion(COMPONENT, VERSION)) + srcfinalize.Close(srccv, "src cv") + + res := Must(srccv.GetResource(v1.NewIdentity(RES))) + + // copy to composition repo + repo := composition.NewRepository(env) + finalize.Close(repo, "composition repository") + MustBeSuccessful(repo.AddComponentVersion(srccv)) + + // now close thenoriginal environment + // the blob access must be cached now and decoupled from the providing + // repository. + MustBeSuccessful(srcfinalize.Finalize()) + + cv := Must(repo.LookupComponentVersion(COMPONENT, VERSION)) + finalize.Close(cv, "composition cv") + + res = Must(cv.GetResource(v1.NewIdentity(RES))) + + m := Must(res.AccessMethod()) + finalize.Close(m, "copied method") + + Expect(m.MimeType()).To(Equal("application/vnd.oci.image.manifest.v1+tar+gzip")) + }) + }) + + Context("comparch", func() { + BeforeEach(func() { + env.ComponentArchive(OCIPATH, accessio.FormatTar, COMPONENT, VERSION, func() { + env.Resource(RES, VERSION, "testtyp", v1.LocalRelation, func() { + env.BlobStringData(mime.MIME_TEXT, "testdata") + }) + }) + }) + + It("caches blobs on close", func() { + var finalize finalizer.Finalizer + defer Defer(finalize.Finalize) + + srcfinalize := finalize.Nested() + + ctfrepo := Must(comparch.Open(env, accessobj.ACC_READONLY, OCIPATH, 0o700, env)) + srcfinalize.Close(ctfrepo, "ctf") + srccv := ctfrepo + + res := Must(srccv.GetResource(v1.NewIdentity(RES))) + srcblob := Must(res.BlobAccess()) + finalize.Close(srcblob, "source blob") + Expect(srcblob.MimeType()).To(Equal(mime.MIME_TEXT)) + + // now close the original environment + // the blob access must be cached now and decoupled from the providing + // repository. + MustBeSuccessful(srcfinalize.Finalize()) + + Expect(srcblob.MimeType()).To(Equal(mime.MIME_TEXT)) + }) + }) +}) diff --git a/pkg/contexts/ocm/repositories/composition/repository.go b/pkg/contexts/ocm/repositories/composition/repository.go index c11c8d5fd1..02093212ce 100644 --- a/pkg/contexts/ocm/repositories/composition/repository.go +++ b/pkg/contexts/ocm/repositories/composition/repository.go @@ -48,6 +48,8 @@ type Access struct { blobs map[string]blobaccess.BlobAccess } +var _ virtual.Access = (*Access)(nil) + func NewAccess() *Access { return &Access{ index: virtual.NewIndex[common.NameVersion](), diff --git a/pkg/contexts/ocm/repositories/composition/version_test.go b/pkg/contexts/ocm/repositories/composition/version_test.go index d3c797a993..30fe2aff0a 100644 --- a/pkg/contexts/ocm/repositories/composition/version_test.go +++ b/pkg/contexts/ocm/repositories/composition/version_test.go @@ -18,6 +18,7 @@ import ( ocmutils "github.com/open-component-model/ocm/pkg/contexts/ocm/utils" "github.com/open-component-model/ocm/pkg/finalizer" "github.com/open-component-model/ocm/pkg/mime" + "github.com/open-component-model/ocm/pkg/refmgmt" ) var _ = Describe("version", func() { @@ -32,7 +33,7 @@ var _ = Describe("version", func() { // compose new version cv := me.NewComponentVersion(ctx, COMPONENT, VERSION) cv.GetDescriptor().Provider.Name = "acme.org" - nested.Close(cv, "composed version") + finalize.Close(cv, "composed version") // wrap a non-closer access into a ref counting access to check cleanup blob := bpi.NewBlobAccessForBase(blobaccess.ForString(mime.MIME_TEXT, "testdata")) @@ -49,16 +50,19 @@ var _ = Describe("version", func() { // check result cv = Must(c.LookupVersion(VERSION)) + Expect(refmgmt.ReferenceCount(cv)).To(Equal(1)) nested.Close(cv, "query") rs := Must(cv.GetResourcesByName("test")) Expect(len(rs)).To(Equal(1)) data := Must(ocmutils.GetResourceData(rs[0])) Expect(string(data)).To(Equal("testdata")) + Expect(refmgmt.ReferenceCount(cv)).To(Equal(1)) // add this version again repo2 := me.NewRepository(ctx) finalize.Close(repo2, "target repo2") MustBeSuccessful(repo2.AddComponentVersion(cv)) + Expect(refmgmt.ReferenceCount(cv)).To(Equal(1)) MustBeSuccessful(nested.Finalize()) // check result diff --git a/pkg/contexts/ocm/repositories/genericocireg/accessmethod_localblob.go b/pkg/contexts/ocm/repositories/genericocireg/accessmethod_localblob.go index f3ddc03cc3..22cb85f0ad 100644 --- a/pkg/contexts/ocm/repositories/genericocireg/accessmethod_localblob.go +++ b/pkg/contexts/ocm/repositories/genericocireg/accessmethod_localblob.go @@ -16,10 +16,12 @@ import ( "github.com/open-component-model/ocm/pkg/contexts/ocm/accessmethods/localblob" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi/accspeccpi" "github.com/open-component-model/ocm/pkg/errors" + "github.com/open-component-model/ocm/pkg/refmgmt" ) type localBlobAccessMethod struct { lock sync.Mutex + err error data blobaccess.DataAccess spec *localblob.AccessSpec namespace oci.NamespaceAccess @@ -28,17 +30,24 @@ type localBlobAccessMethod struct { var _ accspeccpi.AccessMethodImpl = (*localBlobAccessMethod)(nil) -func newLocalBlobAccessMethod(a *localblob.AccessSpec, ns oci.NamespaceAccess, art oci.ArtifactAccess) accspeccpi.AccessMethod { - m, _ := accspeccpi.AccessMethodForImplementation(newLocalBlobAccessMethodImpl(a, ns, art), nil) - return m +func newLocalBlobAccessMethod(a *localblob.AccessSpec, ns oci.NamespaceAccess, art oci.ArtifactAccess, ref refmgmt.ExtendedAllocatable) (accspeccpi.AccessMethod, error) { + return accspeccpi.AccessMethodForImplementation(newLocalBlobAccessMethodImpl(a, ns, art, ref)) } -func newLocalBlobAccessMethodImpl(a *localblob.AccessSpec, ns oci.NamespaceAccess, art oci.ArtifactAccess) *localBlobAccessMethod { - return &localBlobAccessMethod{ +func newLocalBlobAccessMethodImpl(a *localblob.AccessSpec, ns oci.NamespaceAccess, art oci.ArtifactAccess, ref refmgmt.ExtendedAllocatable) (*localBlobAccessMethod, error) { + m := &localBlobAccessMethod{ spec: a, namespace: ns, artifact: art, } + ref.BeforeCleanup(refmgmt.CleanupHandlerFunc(m.cache)) + return m, nil +} + +func (m *localBlobAccessMethod) cache() { + if m.artifact != nil { + _, m.err = m.getBlob() + } } func (_ *localBlobAccessMethod) IsLocal() bool { @@ -57,6 +66,8 @@ func (m *localBlobAccessMethod) Close() error { m.lock.Lock() defer m.lock.Unlock() + m.artifact = nil + m.namespace = nil if m.data != nil { tmp := m.data m.data = nil diff --git a/pkg/contexts/ocm/repositories/genericocireg/accessmethod_localoclblob.go b/pkg/contexts/ocm/repositories/genericocireg/accessmethod_localoclblob.go index 4747681fd4..84ef006905 100644 --- a/pkg/contexts/ocm/repositories/genericocireg/accessmethod_localoclblob.go +++ b/pkg/contexts/ocm/repositories/genericocireg/accessmethod_localoclblob.go @@ -10,6 +10,7 @@ import ( "github.com/open-component-model/ocm/pkg/contexts/oci" "github.com/open-component-model/ocm/pkg/contexts/ocm/accessmethods/localblob" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi/accspeccpi" + "github.com/open-component-model/ocm/pkg/refmgmt" ) type localOCIBlobAccessMethod struct { @@ -18,11 +19,11 @@ type localOCIBlobAccessMethod struct { var _ accspeccpi.AccessMethodImpl = (*localOCIBlobAccessMethod)(nil) -func newLocalOCIBlobAccessMethod(a *localblob.AccessSpec, ns oci.NamespaceAccess, art oci.ArtifactAccess) accspeccpi.AccessMethod { - m, _ := accspeccpi.AccessMethodForImplementation(&localOCIBlobAccessMethod{ - localBlobAccessMethod: newLocalBlobAccessMethodImpl(a, ns, art), - }, nil) - return m +func newLocalOCIBlobAccessMethod(a *localblob.AccessSpec, ns oci.NamespaceAccess, art oci.ArtifactAccess, ref refmgmt.ExtendedAllocatable) (accspeccpi.AccessMethod, error) { + m, err := newLocalBlobAccessMethodImpl(a, ns, art, ref) + return accspeccpi.AccessMethodForImplementation(&localOCIBlobAccessMethod{ + localBlobAccessMethod: m, + }, err) } func (m *localOCIBlobAccessMethod) MimeType() string { diff --git a/pkg/contexts/ocm/repositories/genericocireg/componentversion.go b/pkg/contexts/ocm/repositories/genericocireg/componentversion.go index 85d35066fd..9ea2eb38fa 100644 --- a/pkg/contexts/ocm/repositories/genericocireg/componentversion.go +++ b/pkg/contexts/ocm/repositories/genericocireg/componentversion.go @@ -26,9 +26,11 @@ import ( ocihdlr "github.com/open-component-model/ocm/pkg/contexts/ocm/blobhandler/handlers/oci" "github.com/open-component-model/ocm/pkg/contexts/ocm/compdesc" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" + "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi/accspeccpi" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi/support" "github.com/open-component-model/ocm/pkg/errors" "github.com/open-component-model/ocm/pkg/generics" + "github.com/open-component-model/ocm/pkg/refmgmt" ) // newComponentVersionAccess creates an component access for the artifact access, if this fails the artifact acess is closed. @@ -121,7 +123,7 @@ func (c *ComponentVersionContainer) IsClosed() bool { return c.manifest == nil } -func (c *ComponentVersionContainer) AccessMethod(a cpi.AccessSpec) (cpi.AccessMethod, error) { +func (c *ComponentVersionContainer) AccessMethod(a cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) (cpi.AccessMethod, error) { accessSpec, err := c.comp.GetContext().AccessSpecForSpec(a) if err != nil { return nil, err @@ -129,17 +131,22 @@ func (c *ComponentVersionContainer) AccessMethod(a cpi.AccessSpec) (cpi.AccessMe switch a.GetKind() { case localblob.Type: - return newLocalBlobAccessMethod(accessSpec.(*localblob.AccessSpec), c.comp.namespace, c.access), nil + return newLocalBlobAccessMethod(accessSpec.(*localblob.AccessSpec), c.comp.namespace, c.access, cv) case localociblob.Type: - return newLocalOCIBlobAccessMethod(accessSpec.(*localblob.AccessSpec), c.comp.namespace, c.access), nil + return newLocalOCIBlobAccessMethod(accessSpec.(*localblob.AccessSpec), c.comp.namespace, c.access, cv) case relativeociref.Type: - return ociartifact.NewMethod(c.GetContext(), a, accessSpec.(*relativeociref.AccessSpec).Reference, view(c.comp.repo.ocirepo)) + m, err := ociartifact.NewMethod(c.GetContext(), a, accessSpec.(*relativeociref.AccessSpec).Reference, c.comp.repo.ocirepo) + if err == nil { + impl := accspeccpi.GetAccessMethodImplementation(m).(ociartifact.AccessMethodImpl) + cv.BeforeCleanup(refmgmt.CleanupHandlerFunc(impl.Cache)) + } + return m, err } return nil, errors.ErrNotSupported(errors.KIND_ACCESSMETHOD, a.GetType(), "oci registry") } -func (c *ComponentVersionContainer) GetInexpensiveContentVersionIdentity(a cpi.AccessSpec) string { +func (c *ComponentVersionContainer) GetInexpensiveContentVersionIdentity(a cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) string { accessSpec, err := c.comp.GetContext().AccessSpecForSpec(a) if err != nil { return "" @@ -261,11 +268,11 @@ func (c *ComponentVersionContainer) GetBlobData(name string) (cpi.DataAccess, er return c.manifest.GetBlob(digest.Digest(name)) } -func (c *ComponentVersionContainer) GetStorageContext(cv cpi.ComponentVersionAccess) cpi.StorageContext { - return ocihdlr.New(cv, c.comp.repo.ocirepo.GetSpecification().GetKind(), c.comp.repo.ocirepo, c.comp.namespace, c.manifest) +func (c *ComponentVersionContainer) GetStorageContext() cpi.StorageContext { + return ocihdlr.New(c.comp.GetName(), c.Repository(), c.comp.repo.ocirepo.GetSpecification().GetKind(), c.comp.repo.ocirepo, c.comp.namespace, c.manifest) } -func (c *ComponentVersionContainer) AddBlobFor(storagectx cpi.StorageContext, blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) { +func (c *ComponentVersionContainer) AddBlobFor(blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) { if blob == nil { return nil, errors.New("a resource has to be defined") } @@ -274,7 +281,7 @@ func (c *ComponentVersionContainer) AddBlobFor(storagectx cpi.StorageContext, bl if err != nil { return nil, err } - err = storagectx.(*ocihdlr.StorageContext).AssureLayer(blob) + err = ocihdlr.AssureLayer(c.manifest.GetDescriptor(), blob) if err != nil { return nil, err } diff --git a/pkg/contexts/ocm/repositories/genericocireg/repo_test.go b/pkg/contexts/ocm/repositories/genericocireg/repo_test.go index 4495024441..27224863c5 100644 --- a/pkg/contexts/ocm/repositories/genericocireg/repo_test.go +++ b/pkg/contexts/ocm/repositories/genericocireg/repo_test.go @@ -72,7 +72,6 @@ var _ = Describe("component repository mapping", func() { ocispec, err = ctf.NewRepositorySpec(accessobj.ACC_CREATE, "test", accessio.PathFileSystem(tempfs), accessobj.FormatDirectory) Expect(err).To(Succeed()) spec = genericocireg.NewRepositorySpec(ocispec, nil) - }) AfterEach(func() { diff --git a/pkg/contexts/ocm/repositories/genericocireg/view.go b/pkg/contexts/ocm/repositories/genericocireg/view.go deleted file mode 100644 index 09f40a6f41..0000000000 --- a/pkg/contexts/ocm/repositories/genericocireg/view.go +++ /dev/null @@ -1,23 +0,0 @@ -// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and Open Component Model contributors. -// -// SPDX-License-Identifier: Apache-2.0 - -package genericocireg - -import ( - "github.com/open-component-model/ocm/pkg/contexts/oci" -) - -// TODO: add view concept to OCI context - -type nonClosing struct { - oci.Repository -} - -func (n *nonClosing) Close() error { - return nil -} - -func view(repo oci.Repository) oci.Repository { - return &nonClosing{repo} -} diff --git a/pkg/contexts/ocm/repositories/virtual/accessmethod_localblob.go b/pkg/contexts/ocm/repositories/virtual/accessmethod_localblob.go index 43200b5338..a3061410ab 100644 --- a/pkg/contexts/ocm/repositories/virtual/accessmethod_localblob.go +++ b/pkg/contexts/ocm/repositories/virtual/accessmethod_localblob.go @@ -11,22 +11,22 @@ import ( "github.com/open-component-model/ocm/pkg/blobaccess" "github.com/open-component-model/ocm/pkg/contexts/ocm/accessmethods/localblob" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi/accspeccpi" + "github.com/open-component-model/ocm/pkg/errors" ) type localBlobAccessMethod struct { - lock sync.Mutex - data blobaccess.DataAccess - spec *localblob.AccessSpec - access VersionAccess + lock sync.Mutex + data blobaccess.DataAccess + spec *localblob.AccessSpec } var _ accspeccpi.AccessMethodImpl = (*localBlobAccessMethod)(nil) -func newLocalBlobAccessMethod(a *localblob.AccessSpec, acc VersionAccess) *localBlobAccessMethod { +func newLocalBlobAccessMethod(a *localblob.AccessSpec, data blobaccess.DataAccess) (*localBlobAccessMethod, error) { return &localBlobAccessMethod{ - spec: a, - access: acc, - } + spec: a, + data: data, + }, nil } func (_ *localBlobAccessMethod) IsLocal() bool { @@ -45,43 +45,21 @@ func (m *localBlobAccessMethod) Close() error { m.lock.Lock() defer m.lock.Unlock() - if m.data != nil { - tmp := m.data - m.data = nil - return tmp.Close() - } - return nil -} - -func (m *localBlobAccessMethod) getBlob() (blobaccess.DataAccess, error) { - m.lock.Lock() - defer m.lock.Unlock() - - if m.data != nil { - return m.data, nil - } - data, err := m.access.GetBlob(m.spec.LocalReference) - if err != nil { - return nil, err + if m.data == nil { + return blobaccess.ErrClosed } - m.data = data - return m.data, err + list := errors.ErrorList{} + list.Add(m.data.Close()) + m.data = nil + return list.Result() } func (m *localBlobAccessMethod) Reader() (io.ReadCloser, error) { - blob, err := m.getBlob() - if err != nil { - return nil, err - } - return blob.Reader() + return m.data.Reader() } func (m *localBlobAccessMethod) Get() (data []byte, ferr error) { - b, err := m.getBlob() - if ferr != nil { - return nil, err - } - return blobaccess.BlobData(b) + return blobaccess.BlobData(m.data) } func (m *localBlobAccessMethod) MimeType() string { diff --git a/pkg/contexts/ocm/repositories/virtual/componentversion.go b/pkg/contexts/ocm/repositories/virtual/componentversion.go index a21af5097c..0e755e0a7f 100644 --- a/pkg/contexts/ocm/repositories/virtual/componentversion.go +++ b/pkg/contexts/ocm/repositories/virtual/componentversion.go @@ -15,6 +15,7 @@ import ( "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi/accspeccpi" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi/support" "github.com/open-component-model/ocm/pkg/errors" + "github.com/open-component-model/ocm/pkg/refmgmt" ) // newComponentVersionAccess creates a component access for the artifact access, if this fails the artifact acess is closed. @@ -98,7 +99,7 @@ func (c *ComponentVersionContainer) IsClosed() bool { return c.access == nil } -func (c *ComponentVersionContainer) AccessMethod(a cpi.AccessSpec) (cpi.AccessMethod, error) { +func (c *ComponentVersionContainer) AccessMethod(a cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) (cpi.AccessMethod, error) { accessSpec, err := c.comp.GetContext().AccessSpecForSpec(a) if err != nil { return nil, err @@ -108,13 +109,18 @@ func (c *ComponentVersionContainer) AccessMethod(a cpi.AccessSpec) (cpi.AccessMe case localfsblob.Type: fallthrough case localblob.Type: - return accspeccpi.AccessMethodForImplementation(newLocalBlobAccessMethod(accessSpec.(*localblob.AccessSpec), c.access), nil) + blob, err := c.access.GetBlob(accessSpec.(*localblob.AccessSpec).LocalReference) + if err != nil { + return nil, err + } + + return accspeccpi.AccessMethodForImplementation(newLocalBlobAccessMethod(accessSpec.(*localblob.AccessSpec), blob)) } return nil, errors.ErrNotSupported(errors.KIND_ACCESSMETHOD, a.GetType(), "virtual registry") } -func (c *ComponentVersionContainer) GetInexpensiveContentVersionIdentity(a cpi.AccessSpec) string { +func (c *ComponentVersionContainer) GetInexpensiveContentVersionIdentity(a cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) string { accessSpec, err := c.comp.GetContext().AccessSpecForSpec(a) if err != nil { return "" @@ -142,11 +148,11 @@ func (c *ComponentVersionContainer) GetBlobData(name string) (cpi.DataAccess, er return c.access.GetBlob(name) } -func (c *ComponentVersionContainer) GetStorageContext(cv cpi.ComponentVersionAccess) cpi.StorageContext { - return ocmhdlr.New(c.Repository(), cv, c.access, Type, c.access) +func (c *ComponentVersionContainer) GetStorageContext() cpi.StorageContext { + return ocmhdlr.New(c.Repository(), c.comp.GetName(), c.access, Type, c.access) } -func (c *ComponentVersionContainer) AddBlobFor(storagectx cpi.StorageContext, blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) { +func (c *ComponentVersionContainer) AddBlobFor(blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) { if c.IsReadOnly() { return nil, accessio.ErrReadOnly } diff --git a/pkg/contexts/ocm/repositories/virtual/example/example.go b/pkg/contexts/ocm/repositories/virtual/example/example.go index dd34d21579..b18e0173d5 100644 --- a/pkg/contexts/ocm/repositories/virtual/example/example.go +++ b/pkg/contexts/ocm/repositories/virtual/example/example.go @@ -230,7 +230,7 @@ func (v *VersionAccess) GetInexpensiveContentVersionIdentity(a cpi.AccessSpec) s return "" } defer blob.Close() - dig, err := accessio.Digest(blob) + dig, err := blobaccess.Digest(blob) if err != nil { return "" } diff --git a/pkg/contexts/ocm/signing/handle.go b/pkg/contexts/ocm/signing/handle.go index c8d50c0508..54394ebee2 100644 --- a/pkg/contexts/ocm/signing/handle.go +++ b/pkg/contexts/ocm/signing/handle.go @@ -11,12 +11,12 @@ import ( "github.com/mandelsoft/logging" "github.com/open-component-model/ocm/pkg/common" - "github.com/open-component-model/ocm/pkg/common/accessio" "github.com/open-component-model/ocm/pkg/contexts/ocm" "github.com/open-component-model/ocm/pkg/contexts/ocm/accessmethods/none" "github.com/open-component-model/ocm/pkg/contexts/ocm/compdesc" metav1 "github.com/open-component-model/ocm/pkg/contexts/ocm/compdesc/meta/v1" "github.com/open-component-model/ocm/pkg/errors" + "github.com/open-component-model/ocm/pkg/finalizer" "github.com/open-component-model/ocm/pkg/signing" "github.com/open-component-model/ocm/pkg/utils" ) @@ -411,10 +411,15 @@ func doVerify(cd *compdesc.ComponentDescriptor, state WalkingState, signatureNam return spec, nil } -func calculateReferenceDigests(state WalkingState, opts *Options, legacy bool) error { +func calculateReferenceDigests(state WalkingState, opts *Options, legacy bool) (rerr error) { + var finalize finalizer.Finalizer + defer finalize.FinalizeWithErrorPropagation(&rerr) + ctx := state.Context cd := ctx.Descriptor for i, reference := range cd.References { + loop := finalize.Nested() + rnv := ocm.ComponentRefKey(&reference) nctx := state.GetContext(rnv, state.Context.CtxKey) @@ -426,11 +431,11 @@ func calculateReferenceDigests(state WalkingState, opts *Options, legacy bool) e if err != nil { return errors.Wrapf(err, refMsg(reference, "failed resolving component reference")) } + loop.Close(nested) + if nctx == nil || opts.Recursively || opts.Verify { - closer := accessio.OnceCloser(nested) - defer closer.Close() digestOpts := opts.Nested() - nctx, err = apply(state, nested, digestOpts, true) + nctx, err = apply(state, nested, digestOpts, false) if err != nil { return errors.Wrapf(err, refMsg(reference, "failed applying to component reference")) } @@ -465,14 +470,23 @@ func calculateReferenceDigests(state WalkingState, opts *Options, legacy bool) e ctx.Refs[nctx.Key] = nctx.Digest state.Logger.Debug("reference digest", "index", i, "reference", common.NewNameVersion(reference.ComponentName, reference.Version), "hashalgo", nctx.Digest.HashAlgorithm, "normalgo", nctx.Digest.NormalisationAlgorithm, "digest", nctx.Digest.Value) opts.Printer.Printf(" reference %d: %s:%s: digest %s\n", i, reference.ComponentName, reference.Version, nctx.Digest) + + if err := loop.Finalize(); err != nil { + return err + } } return nil } -func calculateResourceDigests(state WalkingState, cv ocm.ComponentVersionAccess, cd *compdesc.ComponentDescriptor, opts *Options, legacy bool, preset *metav1.NestedComponentDigests) error { +func calculateResourceDigests(state WalkingState, cv ocm.ComponentVersionAccess, cd *compdesc.ComponentDescriptor, opts *Options, legacy bool, preset *metav1.NestedComponentDigests) (rerr error) { + var finalize finalizer.Finalizer + defer finalize.FinalizeWithErrorPropagation(&rerr) + octx := cv.GetContext() blobdigesters := octx.BlobDigesters() for i, res := range cv.GetResources() { + loop := finalize.Nested() + meta := res.Meta() preset := preset.Lookup(meta.Name, meta.Version, meta.ExtraIdentity) raw := &cd.Resources[i] @@ -493,6 +507,9 @@ func calculateResourceDigests(state WalkingState, cv ocm.ComponentVersionAccess, } // special digest notation indicates to not digest the content if cd.Resources[i].Digest.IsExcluded() { + if err := loop.Finalize(); err != nil { + return err + } continue } @@ -500,6 +517,7 @@ func calculateResourceDigests(state WalkingState, cv ocm.ComponentVersionAccess, if err != nil { return errors.Wrapf(err, resMsg(raw, acc.Describe(octx), "failed creating access for resource")) } + loop.Close(meth, "method for resource "+res.Meta().Name) var rdigest *metav1.DigestSpec if raw.Digest != nil && @@ -540,6 +558,10 @@ func calculateResourceDigests(state WalkingState, cv ocm.ComponentVersionAccess, rid := res.Meta().GetIdentity(cv.GetDescriptor().Resources) state.Logger.Debug("resource digest", "index", i, "id", rid, "hashalgo", digest[0].HashAlgorithm, "normalgo", digest[0].NormalisationAlgorithm, "digest", digest[0].Value) opts.Printer.Printf(" resource %d: %s: digest %s\n", i, rid, &digest[0]) + + if err := loop.Finalize(); err != nil { + return err + } } return nil } diff --git a/pkg/contexts/ocm/signing/signing_test.go b/pkg/contexts/ocm/signing/signing_test.go index c1ae867fd9..d3856f0b19 100644 --- a/pkg/contexts/ocm/signing/signing_test.go +++ b/pkg/contexts/ocm/signing/signing_test.go @@ -279,7 +279,7 @@ applying to version "github.com/mandelsoft/test:v1"[github.com/mandelsoft/test:v cv.GetDescriptor().Resources[0].Digest.Value = "010ff2fb242a5dee4220f2cb0e6a519891fb67f2f828a6cab4ef8894633b1f50" // some wrong value _, err = Apply(nil, nil, cv, opts) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal("github.com/mandelsoft/test:v1: calculated resource digest ([{HashAlgorithm:SHA-256 NormalisationAlgorithm:genericBlobDigest/v1 Value:" + D_TESTDATA + "}]) mismatches existing digest (SHA-256:010ff2fb242a5dee4220f2cb0e6a519891fb67f2f828a6cab4ef8894633b1f50[genericBlobDigest/v1]) for testdata:v1 (Local blob sha256:810ff2fb242a5dee4220f2cb0e6a519891fb67f2f828a6cab4ef8894633b1f50[])")) + Expect(err.Error()).To(StringEqualWithContext("github.com/mandelsoft/test:v1: calculated resource digest ([{HashAlgorithm:SHA-256 NormalisationAlgorithm:genericBlobDigest/v1 Value:" + D_TESTDATA + "}]) mismatches existing digest (SHA-256:010ff2fb242a5dee4220f2cb0e6a519891fb67f2f828a6cab4ef8894633b1f50[genericBlobDigest/v1]) for testdata:v1 (Local blob sha256:810ff2fb242a5dee4220f2cb0e6a519891fb67f2f828a6cab4ef8894633b1f50[])")) // Reset to original to avoid write back in readonly mode cv.GetDescriptor().Resources[0].Digest.Value = D_TESTDATA diff --git a/pkg/contexts/ocm/testhelper/refmgmt.go b/pkg/contexts/ocm/testhelper/refmgmt.go new file mode 100644 index 0000000000..d086af3735 --- /dev/null +++ b/pkg/contexts/ocm/testhelper/refmgmt.go @@ -0,0 +1,16 @@ +// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and Open Component Model contributors. +// +// SPDX-License-Identifier: Apache-2.0 + +package testhelper + +import ( + "github.com/mandelsoft/logging" + + ocmlog "github.com/open-component-model/ocm/pkg/logging" + "github.com/open-component-model/ocm/pkg/refmgmt" +) + +func EnableRefMgmtLog() { + ocmlog.Context().AddRule(logging.NewConditionRule(logging.TraceLevel, refmgmt.ALLOC_REALM)) +} diff --git a/pkg/contexts/ocm/transfer/transfer.go b/pkg/contexts/ocm/transfer/transfer.go index 551d188fa4..645f8457e8 100644 --- a/pkg/contexts/ocm/transfer/transfer.go +++ b/pkg/contexts/ocm/transfer/transfer.go @@ -10,7 +10,6 @@ import ( "github.com/mandelsoft/logging" "github.com/open-component-model/ocm/pkg/common" - "github.com/open-component-model/ocm/pkg/common/accessio" "github.com/open-component-model/ocm/pkg/contexts/ocm" "github.com/open-component-model/ocm/pkg/contexts/ocm/accessmethods/none" "github.com/open-component-model/ocm/pkg/contexts/ocm/compdesc" @@ -34,7 +33,7 @@ func TransferVersion(printer common.Printer, closure TransportClosure, src ocmcp return transferVersion(common.AssurePrinter(printer), Logger(src), state, src, tgt, handler) } -func transferVersion(printer common.Printer, log logging.Logger, state WalkingState, src ocmcpi.ComponentVersionAccess, tgt ocmcpi.Repository, handler TransferHandler) error { +func transferVersion(printer common.Printer, log logging.Logger, state WalkingState, src ocmcpi.ComponentVersionAccess, tgt ocmcpi.Repository, handler TransferHandler) (rerr error) { nv := common.VersionedElementKey(src) log = log.WithValues("history", state.History.String(), "version", nv) if ok, err := state.Add(ocm.KIND_COMPONENTVERSION, nv); !ok { @@ -50,17 +49,20 @@ func transferVersion(printer common.Printer, log logging.Logger, state WalkingSt } } + var finalize finalizer.Finalizer + defer finalize.FinalizeWithErrorPropagation(&rerr) + d := src.GetDescriptor() comp, err := tgt.LookupComponent(src.GetName()) if err != nil { return errors.Wrapf(err, "%s: lookup target component", state.History) } - defer comp.Close() + finalize.Close(comp, "closing target component") var ok bool t, err := comp.LookupVersion(src.GetVersion()) - defer accessio.Close(t) + finalize.Close(t, "existing target version") // references have always to be handled, because of potentially different // transport modes, which could affect the desired access methods in @@ -80,7 +82,7 @@ func transferVersion(printer common.Printer, log logging.Logger, state WalkingSt if err != nil { if errors.IsErrNotFound(err) { t, err = comp.NewVersion(src.GetVersion()) - defer accessio.Close(t) + finalize.Close(t, "new target version") } } else { if eq := d.Equivalent(t.GetDescriptor()); eq.IsHashEqual() { @@ -155,7 +157,7 @@ func transferVersion(printer common.Printer, log logging.Logger, state WalkingSt } if cv != nil { list.Add(transferVersion(subp, log.WithValues("ref", r.Name), state, cv, tgt, shdlr)) - cv.Close() + list.Addf(nil, cv.Close(), "closing reference %s", r.Name) } } diff --git a/pkg/env/builder/ocm_ctf.go b/pkg/env/builder/ocm_ctf.go index 1e56f1d2d2..25e9da4322 100644 --- a/pkg/env/builder/ocm_ctf.go +++ b/pkg/env/builder/ocm_ctf.go @@ -13,7 +13,7 @@ import ( const T_OCM_CTF = "ocm common transport format" func (b *Builder) OCMCommonTransport(path string, fmt accessio.FileFormat, f ...func()) { - r, err := ctf.Open(b.OCMContext(), accessobj.ACC_WRITABLE|accessobj.ACC_CREATE, path, 0o777, accessio.PathFileSystem(b.FileSystem())) + r, err := ctf.Open(b.OCMContext(), accessobj.ACC_WRITABLE|accessobj.ACC_CREATE, path, 0o777, fmt, accessio.PathFileSystem(b.FileSystem())) b.failOn(err) b.configure(&ocmRepository{Repository: r, kind: T_OCM_CTF}, f) } diff --git a/pkg/finalizer/finalizer.go b/pkg/finalizer/finalizer.go index b1ccc3e577..154516d493 100644 --- a/pkg/finalizer/finalizer.go +++ b/pkg/finalizer/finalizer.go @@ -5,6 +5,7 @@ package finalizer import ( + "fmt" "io" "strings" "sync" @@ -154,6 +155,16 @@ func (f *Finalizer) Close(c io.Closer, msg ...string) *Finalizer { return f } +// Closef will finalize the given object by calling +// its Close function when the finalizer is finalized +// and annotates an error with the given formatted message. +func (f *Finalizer) Closef(c io.Closer, msg string, args ...interface{}) *Finalizer { + if c != nil { + f.With(c.Close, fmt.Sprintf(msg, args...)) + } + return f +} + // ClosingWith can be used add a close request to // finalizer in a chained call. // Unfortunately it is not possible in Go diff --git a/pkg/iotools/digestreader.go b/pkg/iotools/digestreader.go new file mode 100644 index 0000000000..da23c237b9 --- /dev/null +++ b/pkg/iotools/digestreader.go @@ -0,0 +1,104 @@ +// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and Open Component Model contributors. +// +// SPDX-License-Identifier: Apache-2.0 + +package iotools + +import ( + "crypto" + "hash" + "io" + + "github.com/opencontainers/go-digest" + + "github.com/open-component-model/ocm/pkg/errors" +) + +// wow. digest does support a map with supported digesters. Unfortunately this one does not +// contain all the crypto hashes AND this map is private AND there is no function to add entries, +// so that it cannot be extended from outside the package. +// Therefore, we have to fake it a little to support digests with other crypto hashes. + +type DigestReader struct { + reader io.Reader + alg digest.Algorithm + hash hash.Hash + count int64 +} + +func (r *DigestReader) Size() int64 { + return r.count +} + +func (r *DigestReader) Digest() digest.Digest { + return digest.NewDigest(r.alg, r.hash) +} + +func (r *DigestReader) Read(buf []byte) (int, error) { + c, err := r.reader.Read(buf) + if c > 0 { + r.count += int64(c) + r.hash.Write(buf[:c]) + } + return c, err +} + +func NewDefaultDigestReader(r io.Reader) *DigestReader { + return NewDigestReaderWith(digest.Canonical, r) +} + +func NewDigestReaderWith(algorithm digest.Algorithm, r io.Reader) *DigestReader { + digester := algorithm.Digester() + return &DigestReader{ + reader: r, + hash: digester.Hash(), + alg: algorithm, + count: 0, + } +} + +func NewDigestReaderWithHash(hash crypto.Hash, r io.Reader) *DigestReader { + return &DigestReader{ + reader: r, + hash: hash.New(), + alg: digest.Algorithm(hash.String()), // fake a non-supported digest algorithm + count: 0, + } +} + +type verifiedReader struct { + closer io.Closer + *DigestReader + hash string + digest string +} + +func (v *verifiedReader) Close() error { + err := v.closer.Close() + if err != nil { + return err + } + dig := v.DigestReader.Digest() + if dig.Hex() != v.digest { + return errors.Newf("%s digest mismatch: expected %s, found %s", v.hash, v.digest, dig.Hex()) + } + return nil +} + +func VerifyingReader(r io.ReadCloser, digest digest.Digest) io.ReadCloser { + return &verifiedReader{ + closer: r, + DigestReader: NewDigestReaderWith(digest.Algorithm(), r), + hash: digest.Algorithm().String(), + digest: digest.Hex(), + } +} + +func VerifyingReaderWithHash(r io.ReadCloser, hash crypto.Hash, digest string) io.ReadCloser { + return &verifiedReader{ + closer: r, + DigestReader: NewDigestReaderWithHash(hash, r), + hash: hash.String(), + digest: digest, + } +} diff --git a/pkg/iotools/digestwriter.go b/pkg/iotools/digestwriter.go new file mode 100644 index 0000000000..bfb51f82a9 --- /dev/null +++ b/pkg/iotools/digestwriter.go @@ -0,0 +1,52 @@ +// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and Open Component Model contributors. +// +// SPDX-License-Identifier: Apache-2.0 + +package iotools + +import ( + "io" + + "github.com/opencontainers/go-digest" +) + +type writer io.WriteCloser + +type DigestWriter struct { + writer + digester digest.Digester + count int64 +} + +func (r *DigestWriter) Size() int64 { + return r.count +} + +func (r *DigestWriter) Digest() digest.Digest { + return r.digester.Digest() +} + +func (r *DigestWriter) Write(buf []byte) (int, error) { + c, err := r.writer.Write(buf) + if c > 0 { + r.count += int64(c) + r.digester.Hash().Write(buf[:c]) + } + return c, err +} + +func NewDefaultDigestWriter(w io.WriteCloser) *DigestWriter { + return &DigestWriter{ + writer: w, + digester: digest.Canonical.Digester(), + count: 0, + } +} + +func NewDigestWriterWith(algorithm digest.Algorithm, w io.WriteCloser) *DigestWriter { + return &DigestWriter{ + writer: w, + digester: algorithm.Digester(), + count: 0, + } +} diff --git a/pkg/iotools/utils.go b/pkg/iotools/utils.go index 1c4cbd39d7..563fd7c996 100644 --- a/pkg/iotools/utils.go +++ b/pkg/iotools/utils.go @@ -4,8 +4,22 @@ package iotools +import ( + "io" +) + type NopCloser struct{} func (NopCloser) Close() error { return nil } + +type NopWriter struct { + NopCloser +} + +var _ io.Writer = NopWriter{} + +func (n2 NopWriter) Write(p []byte) (n int, err error) { + return len(p), nil +} diff --git a/pkg/refmgmt/refcloser.go b/pkg/refmgmt/refcloser.go index 5b9b679971..904c5ad40b 100644 --- a/pkg/refmgmt/refcloser.go +++ b/pkg/refmgmt/refcloser.go @@ -7,6 +7,7 @@ package refmgmt import ( "io" "sync" + "sync/atomic" "github.com/open-component-model/ocm/pkg/errors" ) @@ -16,7 +17,7 @@ var ErrClosed = errors.ErrClosed() // ReferencableCloser manages closable views to a basic closer. // If the last view is closed, the basic closer is finally closed. type ReferencableCloser interface { - Allocatable + ExtendedAllocatable RefCount() int UnrefLast() error @@ -136,13 +137,14 @@ type CloserView interface { Closer() io.Closer Execute(f func() error) error + Allocatable() ExtendedAllocatable } type view struct { lock sync.Mutex ref ReferencableCloser main bool - closed bool + closed atomic.Bool } var _ CloserView = (*view)(nil) @@ -155,10 +157,14 @@ func (v *view) RefCount() int { return v.ref.RefCount() } +func (v *view) Allocatable() ExtendedAllocatable { + return v.ref +} + func (v *view) Execute(f func() error) error { v.lock.Lock() defer v.lock.Unlock() - if v.closed { + if v.closed.Load() { return ErrClosed } return f() @@ -170,11 +176,12 @@ func (v *view) Execute(f func() error) error { func (v *view) Release() error { v.lock.Lock() defer v.lock.Unlock() - if v.closed { + if v.closed.Load() { return ErrClosed } - v.closed = true - return v.ref.Unref() + err := v.ref.Unref() + v.closed.Store(true) + return err } // Finalize will try to finalize the @@ -185,14 +192,14 @@ func (v *view) Finalize() error { v.lock.Lock() defer v.lock.Unlock() - if v.closed { + if v.closed.Load() { return ErrClosed } if err := v.ref.UnrefLast(); err != nil { return errors.Wrapf(err, "unable to unref last") } - v.closed = true + v.closed.Store(true) return nil } @@ -205,10 +212,7 @@ func (v *view) Close() error { } func (v *view) IsClosed() bool { - v.lock.Lock() - defer v.lock.Unlock() - - return v.closed + return v.closed.Load() } func (v *view) View() (CloserView, error) { diff --git a/pkg/refmgmt/refmgmt.go b/pkg/refmgmt/refmgmt.go index ea8154db25..5326965b4d 100644 --- a/pkg/refmgmt/refmgmt.go +++ b/pkg/refmgmt/refmgmt.go @@ -21,9 +21,25 @@ type Allocatable interface { Unref() error } +type CleanupHandler interface { + Cleanup() +} + +type CleanupHandlerFunc func() + +func (f CleanupHandlerFunc) Cleanup() { + f() +} + +type ExtendedAllocatable interface { + BeforeCleanup(f CleanupHandler) + Ref() error + Unref() error +} + type RefMgmt interface { - Allocatable UnrefLast() error + ExtendedAllocatable IsClosed() bool RefCount() int @@ -34,6 +50,7 @@ type refMgmt struct { lock sync.Mutex refcount int closed bool + before []CleanupHandler cleanup func() error name string } @@ -82,10 +99,12 @@ func (c *refMgmt) Unref() error { c.refcount-- allocLog.Trace("unref", "name", c.name, "refcnt", c.refcount) if c.refcount <= 0 { + for _, f := range c.before { + f.Cleanup() + } if c.cleanup != nil { err = c.cleanup() } - c.closed = true } @@ -102,6 +121,12 @@ func (c *refMgmt) RefCount() int { return c.refcount } +func (c *refMgmt) BeforeCleanup(f CleanupHandler) { + c.lock.Lock() + defer c.lock.Unlock() + c.before = append(c.before, f) +} + func (c *refMgmt) UnrefLast() error { c.lock.Lock() defer c.lock.Unlock() @@ -118,6 +143,9 @@ func (c *refMgmt) UnrefLast() error { c.refcount-- allocLog.Trace("unref last", "name", c.name, "refcnt", c.refcount) if c.refcount <= 0 { + for _, f := range c.before { + f.Cleanup() + } if c.cleanup != nil { err = c.cleanup() } diff --git a/pkg/refmgmt/resource/resource.go b/pkg/refmgmt/resource/resource.go index 2406653793..9b4f53c6fd 100644 --- a/pkg/refmgmt/resource/resource.go +++ b/pkg/refmgmt/resource/resource.go @@ -15,6 +15,7 @@ type CloserView interface { Close() error IsClosed() bool Execute(func() error) error + Allocatable() refmgmt.ExtendedAllocatable refmgmt.LazyMode refmgmt.RefCountProvider } @@ -47,6 +48,7 @@ type ResourceViewInt[T resourceViewInterface[T]] interface { resourceViewInterface[T] Execute(func() error) error + Allocatable() refmgmt.ExtendedAllocatable } type Dup[T any] interface { @@ -59,6 +61,7 @@ type Dup[T any] interface { // can be used to gain new views to a managed resource. type ViewManager[T any] interface { RefCount() int + Allocatable() refmgmt.ExtendedAllocatable View(main ...bool) (T, error) IsClosed() bool } @@ -99,6 +102,10 @@ func (i *viewManager[T, I]) RefCount() int { return i.refs.RefCount() } +func (i *viewManager[T, I]) Allocatable() refmgmt.ExtendedAllocatable { + return i.refs +} + func (i *viewManager[T, I]) View(main ...bool) (T, error) { var _nil T @@ -152,6 +159,10 @@ func (n *noneRefCloser[T]) RefCount() int { return n.mgr.RefCount() } +func (n *noneRefCloser[T]) Allocatable() refmgmt.ExtendedAllocatable { + return n.mgr.Allocatable() +} + //////////////////////////////////////////////////////////////////////////////// type resourceView[T any] struct { @@ -191,6 +202,10 @@ func (v *resourceView[T]) Execute(f func() error) error { return v.view.Execute(f) } +func (v *resourceView[T]) Allocatable() refmgmt.ExtendedAllocatable { + return v.view.Allocatable() +} + func (v *resourceView[T]) Dup() (t T, err error) { err = v.Execute(func() error { t, err = v.mgr.View() @@ -237,6 +252,10 @@ func (b *ResourceImplBase[T]) RefCount() int { return b.refs.RefCount() } +func (b *ResourceImplBase[T]) Allocatable() refmgmt.ExtendedAllocatable { + return b.refs.Allocatable() +} + func (b *ResourceImplBase[T]) View(main ...bool) (T, error) { return b.refs.View(main...) }