Commit

Merge branch 'main' into main

hilmarf authored Dec 20, 2024
2 parents bacea6e + 2cb3187 commit bcce5b7
Showing 20 changed files with 983 additions and 228 deletions.
11 changes: 11 additions & 0 deletions api/credentials/identity/hostpath/identity.go
@@ -157,3 +157,14 @@ func PathPrefix(id cpi.ConsumerIdentity) string {
}
return strings.TrimPrefix(id[ID_PATHPREFIX], "/")
}

func HostPort(id cpi.ConsumerIdentity) string {
if id == nil {
return ""
}
host := id[ID_HOSTNAME]
if port, ok := id[ID_PORT]; ok {
return host + ":" + port
}
return host
}
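
The new HostPort helper combines the hostname and an optional port from a consumer identity into a single "host:port" string. A minimal, hedged usage sketch follows; the identity values and import paths are assumptions for illustration, not part of the commit, and cpi.ConsumerIdentity is assumed to be a plain string map, as the indexing above suggests:

package example

import (
	"fmt"

	"ocm.software/ocm/api/credentials/cpi"
	"ocm.software/ocm/api/credentials/identity/hostpath"
)

func Example() {
	// Hypothetical consumer identity using the hostpath identity keys.
	id := cpi.ConsumerIdentity{
		hostpath.ID_HOSTNAME: "ghcr.io",
		hostpath.ID_PORT:     "443",
	}
	fmt.Println(hostpath.HostPort(id)) // ghcr.io:443

	// Without a port entry, only the hostname is returned.
	delete(id, hostpath.ID_PORT)
	fmt.Println(hostpath.HostPort(id)) // ghcr.io
}
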
16 changes: 13 additions & 3 deletions api/oci/extensions/repositories/ocireg/repository.go
@@ -11,6 +11,7 @@ import (
"github.com/containerd/errdefs"
"github.com/mandelsoft/goutils/errors"
"github.com/mandelsoft/logging"
"github.com/moby/locker"
"oras.land/oras-go/v2/registry/remote/auth"
"oras.land/oras-go/v2/registry/remote/retry"

@@ -69,6 +70,7 @@ func NewRepository(ctx cpi.Context, spec *RepositorySpec, info *RepositoryInfo)
spec: spec,
info: info,
}
i.logger.Debug("created repository")
return cpi.NewRepository(i), nil
}

@@ -164,14 +166,22 @@ func (r *RepositoryImpl) getResolver(comp string) (oras.Resolver, error) {
}

authClient := &auth.Client{
Client: client,
Cache: auth.NewCache(),
Credential: auth.StaticCredential(r.info.HostPort(), authCreds),
Client: client,
Cache: auth.NewCache(),
Credential: auth.CredentialFunc(func(ctx context.Context, hostport string) (auth.Credential, error) {
if strings.Contains(hostport, r.info.HostPort()) {
return authCreds, nil
}
logger.Warn("no credentials for host", "host", hostport)
return auth.EmptyCredential, nil
}),
}

return oras.New(oras.ClientOptions{
Client: authClient,
PlainHTTP: r.info.Scheme == "http",
Logger: logger,
Lock: locker.New(),
}), nil
}

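The getResolver change replaces the static credential binding with a credential callback: the resolved OCM credentials are only returned for the repository's own host, and any other host falls back to anonymous access with a warning. A hedged, self-contained sketch of the same pattern against the oras-go v2 auth package; the host matching and wiring here are illustrative, not the exact OCM code:

package example

import (
	"context"
	"net/http"

	"oras.land/oras-go/v2/registry/remote/auth"
	"oras.land/oras-go/v2/registry/remote/retry"
)

// newAuthClient returns a client that only hands out creds for knownHost.
func newAuthClient(knownHost string, creds auth.Credential) *auth.Client {
	return &auth.Client{
		Client: &http.Client{Transport: retry.NewTransport(http.DefaultTransport)},
		Cache:  auth.NewCache(),
		// The callback is invoked per registry host; unknown hosts get the
		// empty (anonymous) credential. The commit additionally logs a
		// warning in that branch.
		Credential: func(_ context.Context, hostport string) (auth.Credential, error) {
			if hostport == knownHost {
				return creds, nil
			}
			return auth.EmptyCredential, nil
		},
	}
}
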
18 changes: 18 additions & 0 deletions api/ocm/cpi/repocpi/bridge_r.go
@@ -34,6 +34,24 @@ type RepositoryImpl interface {
io.Closer
}

// Chunked is an optional interface that
// may be implemented to accept a blob limit for mapping
// local blobs to an external storage system.
type Chunked interface {
	// SetBlobLimit sets the blob limit if possible.
	// It returns true if this was successful.
SetBlobLimit(s int64) bool
}

// SetBlobLimit tries to set a blob limit for a repository
// implementation. It returns true if this was possible.
func SetBlobLimit(i RepositoryImpl, s int64) bool {
if c, ok := i.(Chunked); ok {
return c.SetBlobLimit(s)
}
return false
}

type _repositoryBridgeBase = resource.ResourceImplBase[cpi.Repository]

type repositoryBridge struct {
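The Chunked interface lets a repository implementation opt into a blob size limit without widening the RepositoryImpl contract; the SetBlobLimit helper performs the type assertion and reports whether the limit was accepted. A hedged sketch of an implementing type and a caller (myRepositoryImpl and its fields are hypothetical; only the repocpi.Chunked and repocpi.SetBlobLimit names come from this commit):

package example

import "ocm.software/ocm/api/ocm/cpi/repocpi"

// myRepositoryImpl is a hypothetical repository implementation; the embedded
// interface stands in for a full RepositoryImpl implementation.
type myRepositoryImpl struct {
	repocpi.RepositoryImpl
	blobLimit int64
}

var _ repocpi.Chunked = (*myRepositoryImpl)(nil)

// SetBlobLimit satisfies the optional Chunked interface.
func (r *myRepositoryImpl) SetBlobLimit(s int64) bool {
	r.blobLimit = s
	return true
}

func configure(impl repocpi.RepositoryImpl) {
	// For implementations that do not support chunking, SetBlobLimit
	// simply reports false and the limit is ignored.
	if ok := repocpi.SetBlobLimit(impl, 500*1024*1024); !ok {
		// fall back to unlimited local blobs
	}
}
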
@@ -1,10 +1,14 @@
package genericocireg

import (
"bytes"
"io"
"os"
"strings"
"sync"

"github.com/mandelsoft/goutils/errors"
"github.com/mandelsoft/goutils/finalizer"
"github.com/opencontainers/go-digest"

"ocm.software/ocm/api/oci"
@@ -88,9 +92,19 @@ func (m *localBlobAccessMethod) getBlob() (blobaccess.DataAccess, error) {
return nil, errors.ErrNotImplemented("artifact blob synthesis")
}
}
_, data, err := m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference))
if err != nil {
return nil, err
refs := strings.Split(m.spec.LocalReference, ",")

var (
data blobaccess.DataAccess
err error
)
if len(refs) < 2 {
_, data, err = m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference))
if err != nil {
return nil, err
}
} else {
data = &composedBlock{m, refs}
}
m.data = data
return m.data, err
@@ -111,3 +125,119 @@ func (m *localBlobAccessMethod) Get() ([]byte, error) {
func (m *localBlobAccessMethod) MimeType() string {
return m.spec.MediaType
}

////////////////////////////////////////////////////////////////////////////////

type composedBlock struct {
m *localBlobAccessMethod
refs []string
}

var _ blobaccess.DataAccess = (*composedBlock)(nil)

func (c *composedBlock) Get() ([]byte, error) {
buf := bytes.NewBuffer(nil)
for _, ref := range c.refs {
var finalize finalizer.Finalizer

_, data, err := c.m.namespace.GetBlobData(digest.Digest(ref))
if err != nil {
return nil, err
}
finalize.Close(data)
r, err := data.Reader()
if err != nil {
return nil, err
}
finalize.Close(r)
_, err = io.Copy(buf, r)
if err != nil {
return nil, err
}
err = finalize.Finalize()
if err != nil {
return nil, err
}
}
return buf.Bytes(), nil
}

func (c *composedBlock) Reader() (io.ReadCloser, error) {
return &composedReader{
m: c.m,
refs: c.refs,
}, nil
}

func (c *composedBlock) Close() error {
return nil
}

type composedReader struct {
lock sync.Mutex
m *localBlobAccessMethod
refs []string
reader io.ReadCloser
data blobaccess.DataAccess
}

func (c *composedReader) Read(p []byte) (n int, err error) {
c.lock.Lock()
defer c.lock.Unlock()

for {
if c.reader != nil {
n, err := c.reader.Read(p)

if err == io.EOF {
c.reader.Close()
c.data.Close()
c.refs = c.refs[1:]
c.reader = nil
c.data = nil
				// the layer is finished; clear the EOF so a partial (>0) read is returned before the next layer is started
err = nil
}
// return partial read (even a zero read if layer is not yet finished) or error
if c.reader != nil || err != nil || n > 0 {
return n, err
}
// otherwise, we can use the given buffer for the next layer

			// Now check for a succeeding layer. This means finishing
			// with the current reader and continuing with the next one.
}

// If no more layers are available, report EOF.
if len(c.refs) == 0 {
return 0, io.EOF
}

ref := strings.TrimSpace(c.refs[0])
_, c.data, err = c.m.namespace.GetBlobData(digest.Digest(ref))
if err != nil {
return 0, err
}
c.reader, err = c.data.Reader()
if err != nil {
return 0, err
}
}
}

func (c *composedReader) Close() error {
c.lock.Lock()
defer c.lock.Unlock()

if c.reader == nil && c.refs == nil {
return os.ErrClosed
}
if c.reader != nil {
c.reader.Close()
c.data.Close()
c.reader = nil
c.refs = nil
}
return nil
}
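
Together with the getBlob change above, this means a chunked local blob is stored under a comma-separated list of blob digests in LocalReference, and composedBlock/composedReader stitch those chunks back into one stream, opening each chunk only when the previous one has been fully consumed. A hedged, stripped-down sketch of that lazy concatenation pattern (the fetch type and all names are hypothetical, not OCM API):

package example

import "io"

// fetch stands in for namespace.GetBlobData: it opens a single chunk by reference.
type fetch func(ref string) (io.ReadCloser, error)

// lazyConcat mirrors the composedReader idea: chunks are opened one after
// another, and only when the preceding chunk has been read to EOF.
type lazyConcat struct {
	open fetch
	refs []string
	cur  io.ReadCloser
}

func (l *lazyConcat) Read(p []byte) (int, error) {
	for {
		if l.cur != nil {
			n, err := l.cur.Read(p)
			if err == io.EOF {
				// current chunk finished: close it and suppress EOF so the
				// stream continues with the next chunk
				l.cur.Close()
				l.cur = nil
				l.refs = l.refs[1:]
				err = nil
			}
			if n > 0 || err != nil || l.cur != nil {
				return n, err
			}
		}
		if len(l.refs) == 0 {
			return 0, io.EOF
		}
		var err error
		if l.cur, err = l.open(l.refs[0]); err != nil {
			return 0, err
		}
	}
}

Unlike io.MultiReader, which expects all readers to be constructed up front, this keeps at most one underlying blob open at a time, which matters when a large artifact has been split into many chunks.
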
53 changes: 53 additions & 0 deletions api/ocm/extensions/repositories/genericocireg/bloblimits.go
@@ -0,0 +1,53 @@
package genericocireg

import (
"sync"

configctx "ocm.software/ocm/api/config"
"ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config"
)

var (
defaultBlobLimits config.BlobLimits
lock sync.Mutex
)

const (
KB = int64(1000)
MB = 1000 * KB
GB = 1000 * MB
)

func init() {
defaultBlobLimits = config.BlobLimits{}

	// Add limits for known OCI repositories here, or provide
	// init functions in specialized packages that call
	// AddDefaultBlobLimit.
AddDefaultBlobLimit("ghcr.io", 10*GB) // https://github.com/orgs/community/discussions/77429
}

// AddDefaultBlobLimit can be used to set default blob limits
// for known repositories.
// Those limits will be overwritten by blob limits
// given by a configuration object and the repository
// specification.
func AddDefaultBlobLimit(name string, limit int64) {
lock.Lock()
defer lock.Unlock()

defaultBlobLimits[name] = limit
}

func ConfigureBlobLimits(ctx configctx.ContextProvider, target config.Configurable) {
if target != nil {
lock.Lock()
defer lock.Unlock()

target.ConfigureBlobLimits(defaultBlobLimits)

if ctx != nil {
ctx.ConfigContext().ApplyTo(0, target)
}
}
}
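
As the init comment suggests, specialized packages can register additional defaults before any configuration is applied; limits from configuration objects and repository specifications still take precedence. A hedged sketch of such a registration (the registry host and limit are illustrative; the GB constant is the one declared above):

package example

import "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg"

func init() {
	// Hypothetical registry with a 5 GB blob limit; the real value depends
	// on the target registry's documentation.
	genericocireg.AddDefaultBlobLimit("registry.example.com", 5*genericocireg.GB)
}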