Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support layer deltas #902

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 139 additions & 5 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"reflect"
"sort"
"strings"
"sync"
"time"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/containers/image/v5/types"
"github.com/containers/ocicrypt"
encconfig "github.com/containers/ocicrypt/config"
"github.com/containers/tar-diff/pkg/tar-patch"
digest "github.com/opencontainers/go-digest"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
Expand Down Expand Up @@ -790,6 +792,11 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
srcInfosUpdated = true
}

deltaLayers, err := types.ImageDeltaLayers(ic.src, ctx)
if err != nil {
return err
}

type copyLayerData struct {
destInfo types.BlobInfo
diffID digest.Digest
Expand All @@ -809,7 +816,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
}

data := make([]copyLayerData, numLayers)
copyLayerHelper := func(index int, srcLayer types.BlobInfo, toEncrypt bool, pool *mpb.Progress) {
copyLayerHelper := func(index int, srcLayer types.BlobInfo, toEncrypt bool, pool *mpb.Progress, deltaLayers []types.BlobInfo) {
defer copySemaphore.Release(1)
defer copyGroup.Done()
cld := copyLayerData{}
Expand All @@ -824,7 +831,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name())
}
} else {
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, toEncrypt, pool)
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, index, srcLayer, toEncrypt, pool, deltaLayers)
}
data[index] = cld
}
Expand Down Expand Up @@ -861,7 +868,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
return errors.Wrapf(err, "Can't acquire semaphore")
}
copyGroup.Add(1)
go copyLayerHelper(i, srcLayer, encLayerBitmap[i], progressPool)
go copyLayerHelper(i, srcLayer, encLayerBitmap[i], progressPool, deltaLayers)
}

// A call to copyGroup.Wait() is done at this point by the defer above.
Expand Down Expand Up @@ -1043,9 +1050,83 @@ type diffIDResult struct {
err error
}

// Get all the deltas that apply to this layer
func (ic *imageCopier) getMatchingDeltaLayers(ctx context.Context, srcIndex int, deltaLayers []types.BlobInfo) (digest.Digest, []*types.BlobInfo) {
if deltaLayers == nil {
return "", nil
}
config, _ := ic.src.OCIConfig(ctx)
if config == nil || config.RootFS.DiffIDs == nil || len(config.RootFS.DiffIDs) <= srcIndex {
return "", nil
}

layerDiffId := config.RootFS.DiffIDs[srcIndex]

var matchingLayers []*types.BlobInfo
for i := range deltaLayers {
deltaLayer := &deltaLayers[i]
to := deltaLayer.Annotations["io.github.containers.delta.to"]
if to == layerDiffId.String() {
matchingLayers = append(matchingLayers, deltaLayer)
}
}

return layerDiffId, matchingLayers
}

// Looks at which of the matching delta froms have locally available data and picks the best one
func (ic *imageCopier) resolveDeltaLayer(ctx context.Context, matchingDeltas []*types.BlobInfo) (io.ReadCloser, tar_patch.DataSource, types.BlobInfo, error) {
// Sort smallest deltas so we favour the smallest useable one
sort.Slice(matchingDeltas, func(i, j int) bool {
return matchingDeltas[i].Size < matchingDeltas[j].Size
})

for i := range matchingDeltas {
matchingDelta := matchingDeltas[i]
from := matchingDelta.Annotations["io.github.containers.delta.from"]
fromDigest, err := digest.Parse(from)
if err != nil {
continue // Silently ignore if server specified a werid format
}

dataSource, err := types.ImageDestinationGetLayerDeltaData(ic.c.dest, ctx, fromDigest)
if err != nil {
return nil, nil, types.BlobInfo{}, err // Internal error
}
if dataSource == nil {
continue // from layer doesn't exist
}

logrus.Debugf("Using delta %v for DiffID %v", matchingDelta.Digest, fromDigest)

deltaStream, _, err := ic.c.rawSource.GetBlob(ctx, *matchingDelta, ic.c.blobInfoCache)
if err != nil {
return nil, nil, types.BlobInfo{}, errors.Wrapf(err, "Error reading delta blob %s", matchingDelta.Digest)
}
return deltaStream, dataSource, *matchingDelta, nil
}
return nil, nil, types.BlobInfo{}, nil
}

func (ic *imageCopier) canUseDeltas(srcInfo types.BlobInfo) (bool, string) {
// Deltas rewrite the manifest to refer to the uncompressed digest, so we must be able to substiture blobs
if !ic.canSubstituteBlobs {
return false, ""
}

switch srcInfo.MediaType {
case manifest.DockerV2Schema2LayerMediaType, manifest.DockerV2SchemaLayerMediaTypeUncompressed:
return true, manifest.DockerV2SchemaLayerMediaTypeUncompressed
case imgspecv1.MediaTypeImageLayer, imgspecv1.MediaTypeImageLayerGzip, imgspecv1.MediaTypeImageLayerZstd:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interaction for encrypting path with decryption routine

Encryption mediatypes https://github.com/containers/ocicrypt/blob/master/spec/spec.go

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

canUseDeltas() currently returns false (due to the above media checks) for encrypted manifests, because for now we don't support encrypted deltas. However, once we do these checks need to be widened, yes.

return true, imgspecv1.MediaTypeImageLayer
}

return false, ""
}

// copyLayer copies a layer with srcInfo (with known Digest and Annotations and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) {
func (ic *imageCopier) copyLayer(ctx context.Context, srcIndex int, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress, deltaLayers []types.BlobInfo) (types.BlobInfo, digest.Digest, error) {
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
// Diffs are needed if we are encrypting an image or trying to decrypt an image
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" || toEncrypt || (isOciEncrypted(srcInfo.MediaType) && ic.ociDecryptConfig != nil)
Expand All @@ -1064,6 +1145,52 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
}

// First look for a delta that matches this layer and substitute the result of that
if ok, deltaResultMediaType := ic.canUseDeltas(srcInfo); ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interaction for encrypting path with toEncrypt flag

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean here, it seem alright to me. Applying deltas naturally create the uncompressed layers, so we get the diffid etc. That said, I'm not well versed in how crypto works here, so I might be missing something.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... so the toEncrypt flag tells the copy process to encrypt the blob when passed into copyBlobFromStream. My concern was if any deltas are created by this process, that they would need to support encryption. If they are not, then it should probably be fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy never generates deltas, it just applies them to regenerate the tar. Creating deltas is currently done by a separate patch to skopeo.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of encrypted images, skopeo generate-delta will have to take encryption key as an additional argument to encrypt the deltas. skopeo copy already supports taking decryption key as an argument to decrypt the image layers, so that can be re-used to decrypt deltas too.

// Get deltas going TO this layer
deltaDiffID, matchingDeltas := ic.getMatchingDeltaLayers(ctx, srcIndex, deltaLayers)
// Get best possible FROM delta
deltaStream, deltaDataSource, matchingDelta, err := ic.resolveDeltaLayer(ctx, matchingDeltas)
if err != nil {
return types.BlobInfo{}, "", err
}
if deltaStream != nil {
bar := ic.c.createProgressBar(pool, matchingDelta, "delta", "done")

wrappedDeltaStream := bar.ProxyReader(deltaStream)

// Convert deltaStream to uncompressed tar layer stream
pr, pw := io.Pipe()
go func() {
if err := tar_patch.Apply(wrappedDeltaStream, deltaDataSource, pw); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the deltas always uncompressed/unencrypted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deltas are inherently compressed. I.e. they are not wrapped in a compression layer but instead compressed on the parts inside that need it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That said, they are always unencrypted, so layering wise we might want to wrap some encryption around them.

// We will notice this error when failing to verify the digest, so leave it be
logrus.Infof("Failed to apply layer delta: %v", err)
}
deltaDataSource.Close()
deltaStream.Close()
wrappedDeltaStream.Close()
pw.Close()
}()
defer pr.Close()

// Copy uncompressed tar layer to destination, verifying the diffID
blobInfo, err := ic.c.copyBlobFromStream(ctx, pr, types.BlobInfo{Digest: deltaDiffID, Size: -1, MediaType: deltaResultMediaType, Annotations: srcInfo.Annotations}, nil, ic.canModifyManifest, false, toEncrypt, nil)
if err != nil {
return types.BlobInfo{}, "", err
}

bar.SetTotal(matchingDelta.Size, true)

// We verified this when streaming the applied delta above
diffID := deltaDiffID

// Record the fact that this blob is uncompressed
ic.c.blobInfoCache.RecordDigestUncompressedPair(diffID, diffID)

return blobInfo, diffID, err
}
}

// Fallback: copy the layer, computing the diffID if we need to do so
srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(ctx, srcInfo, ic.c.blobInfoCache)
if err != nil {
Expand Down Expand Up @@ -1213,7 +1340,9 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest)
}
isCompressed := decompressor != nil
destStream = bar.ProxyReader(destStream)
if bar != nil {
destStream = bar.ProxyReader(destStream)
}

// === Send a copy of the original, uncompressed, stream, to a separate path if necessary.
var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so.
Expand All @@ -1232,6 +1361,11 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
logrus.Debugf("Using original blob without modification for encrypted blob")
compressionOperation = types.PreserveOriginal
inputInfo = srcInfo
} else if canModifyBlob && manifest.IsNoCompressType(srcInfo.MediaType) {
// This is a blob we should not repack, such as a delta
logrus.Debugf("Using original blob without modification for no-compress type")
compressionOperation = types.PreserveOriginal
inputInfo = srcInfo
} else if canModifyBlob && c.dest.DesiredLayerCompression() == types.Compress && !isCompressed {
logrus.Debugf("Compressing blob on the fly")
compressionOperation = types.Compress
Expand Down
10 changes: 10 additions & 0 deletions copy/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/types"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -77,6 +78,15 @@ func (f fakeImageSource) SupportsEncryption(ctx context.Context) bool {
func (f fakeImageSource) Size() (int64, error) {
panic("Unexpected call to a mock function")
}
func (f fakeImageSource) GetDeltaManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) {
panic("Unexpected call to a mock function")
}
func (f fakeImageSource) GetDeltaIndex(ctx context.Context) (types.ImageReference, error) {
panic("Unexpected call to a mock function")
}
func (f fakeImageSource) DeltaLayers(ctx context.Context) ([]types.BlobInfo, error) {
panic("Unexpected call to a mock function")
}

func TestDetermineManifestConversion(t *testing.T) {
supportS1S2OCI := []string{
Expand Down
38 changes: 38 additions & 0 deletions docker/docker_image_src.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,44 @@ func (s *dockerImageSource) fetchManifest(ctx context.Context, tagOrDigest strin
return manblob, simplifyContentType(res.Header.Get("Content-Type")), nil
}

func (s *dockerImageSource) GetDeltaManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) {
// Get the real manifest digest
srcManifestDigest, err := s.manifestDigest(ctx, instanceDigest)
if err != nil {
return nil, "", err
}

// Load the delta manifest index
ib, _, err := s.fetchManifest(ctx, "_deltaindex")
// Don't return error if the manifest doesn't exist, only for internal errors
// Deltas are an optional optimization anyway
if err == nil {
index, err := manifest.OCI1IndexFromManifest(ib)
if err != nil {
return nil, "", err
}

// Look up the delta manifest in the index by the real manifest digest
for _, manifest := range index.Manifests {
if manifest.Annotations["io.github.containers.delta.target"] == srcManifestDigest.String() {
return s.fetchManifest(ctx, manifest.Digest.String())
}
}
}

// No delta
return nil, "", nil
}

func (s *dockerImageSource) GetDeltaIndex(ctx context.Context) (types.ImageReference, error) {
deltaRef, err := reference.WithTag(s.logicalRef.ref, "_deltaindex")
if err != nil {
return nil, err
}

return newReference(deltaRef)
}

// ensureManifestIsLoaded sets s.cachedManifest and s.cachedManifestMIMEType
//
// ImageSource implementations are not required or expected to do any caching,
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/containers/libtrust v0.0.0-20190913040956-14b96171aa3b
github.com/containers/ocicrypt v1.0.2
github.com/containers/storage v1.20.2
github.com/containers/tar-diff v0.1.2
github.com/docker/distribution v2.7.1+incompatible
github.com/docker/docker v1.4.2-0.20191219165747-a9416c67da9f
github.com/docker/docker-credential-helpers v0.6.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ github.com/containers/storage v1.20.1 h1:2XE4eRIqSa6YjhAZjNwIkIKE6+Miy+5WV8l1KzY
github.com/containers/storage v1.20.1/go.mod h1:RoKzO8KSDogCT6c06rEbanZTcKYxshorB33JikEGc3A=
github.com/containers/storage v1.20.2 h1:tw/uKRPDnmVrluIzer3dawTFG/bTJLP8IEUyHFhltYk=
github.com/containers/storage v1.20.2/go.mod h1:oOB9Ie8OVPojvoaKWEGSEtHbXUAs+tSyr7RO7ZGteMc=
github.com/containers/tar-diff v0.1.2 h1:6E04zGCdCsSJ8SoApFLiYkAxqkFllcKOaOATBUGydL4=
github.com/containers/tar-diff v0.1.2/go.mod h1:9/tnBUlqmoW1bz83CQAW9wC+EQCH+h1Wn8uq3VxLbMc=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
14 changes: 14 additions & 0 deletions image/unparsed.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ func (i *UnparsedImage) Manifest(ctx context.Context) ([]byte, string, error) {
return i.cachedManifest, i.cachedManifestMIMEType, nil
}

func (i *UnparsedImage) DeltaLayers(ctx context.Context) ([]types.BlobInfo, error) {
// Note that GetDeltaManifest can return nil with a nil error. This is ok if no deltas exist
mb, mt, err := types.ImageSourceGetDeltaManifest(i.src, ctx, i.instanceDigest)
if mb == nil {
return nil, err
}

m, err := manifestInstanceFromBlob(ctx, nil, i.src, mb, mt)
if err != nil {
return nil, err
}
return m.LayerInfos(), nil
}

// expectedManifestDigest returns a the expected value of the manifest digest, and an indicator whether it is known.
// The bool return value seems redundant with digest != ""; it is used explicitly
// to refuse (unexpected) situations when the digest exists but is "".
Expand Down
11 changes: 10 additions & 1 deletion manifest/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ type OCI1 struct {
imgspecv1.Manifest
}

const (
// MediaTypeDescriptor specifies the media type for a content descriptor.
MediaTypeTarDiff = "application/vnd.tar-diff"
)

// SupportedOCI1MediaType checks if the specified string is a supported OCI1
// media type.
//
Expand All @@ -41,7 +46,7 @@ type OCI1 struct {
// useful for validation anyway.
func SupportedOCI1MediaType(m string) error {
switch m {
case imgspecv1.MediaTypeDescriptor, imgspecv1.MediaTypeImageConfig, imgspecv1.MediaTypeImageLayer, imgspecv1.MediaTypeImageLayerGzip, imgspecv1.MediaTypeImageLayerNonDistributable, imgspecv1.MediaTypeImageLayerNonDistributableGzip, imgspecv1.MediaTypeImageLayerNonDistributableZstd, imgspecv1.MediaTypeImageLayerZstd, imgspecv1.MediaTypeImageManifest, imgspecv1.MediaTypeLayoutHeader, ociencspec.MediaTypeLayerEnc, ociencspec.MediaTypeLayerGzipEnc:
case imgspecv1.MediaTypeDescriptor, imgspecv1.MediaTypeImageConfig, imgspecv1.MediaTypeImageLayer, imgspecv1.MediaTypeImageLayerGzip, imgspecv1.MediaTypeImageLayerNonDistributable, imgspecv1.MediaTypeImageLayerNonDistributableGzip, imgspecv1.MediaTypeImageLayerNonDistributableZstd, imgspecv1.MediaTypeImageLayerZstd, imgspecv1.MediaTypeImageManifest, imgspecv1.MediaTypeLayoutHeader, ociencspec.MediaTypeLayerEnc, ociencspec.MediaTypeLayerGzipEnc, MediaTypeTarDiff:
return nil
default:
return fmt.Errorf("unsupported OCIv1 media type: %q", m)
Expand Down Expand Up @@ -211,3 +216,7 @@ func getDecryptedMediaType(mediatype string) (string, error) {

return strings.TrimSuffix(mediatype, "+encrypted"), nil
}

func IsNoCompressType(mediatype string) bool {
return mediatype == MediaTypeTarDiff
}
15 changes: 15 additions & 0 deletions openshift/openshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,17 @@ func (s *openshiftImageSource) GetManifest(ctx context.Context, instanceDigest *
return s.docker.GetManifest(ctx, instanceDigest)
}

func (s *openshiftImageSource) GetDeltaManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) {
if err := s.ensureImageIsResolved(ctx); err != nil {
return nil, "", err
}
return types.ImageSourceGetDeltaManifest(s.docker, ctx, instanceDigest)
}

func (s *openshiftImageSource) GetDeltaIndex(ctx context.Context) (types.ImageReference, error) {
return types.ImageSourceGetDeltaIndex(s.docker, ctx)
}

// HasThreadSafeGetBlob indicates whether GetBlob can be executed concurrently.
func (s *openshiftImageSource) HasThreadSafeGetBlob() bool {
return false
Expand Down Expand Up @@ -511,6 +522,10 @@ func (d *openshiftImageDestination) Commit(ctx context.Context, unparsedToplevel
return d.docker.Commit(ctx, unparsedToplevel)
}

func (d *openshiftImageDestination) GetLayerDeltaData(ctx context.Context, diffID digest.Digest) (types.DeltaDataSource, error) {
return types.ImageDestinationGetLayerDeltaData(d.docker, ctx, diffID)
}

// These structs are subsets of github.com/openshift/origin/pkg/image/api/v1 and its dependencies.
type imageStream struct {
Status imageStreamStatus `json:"status,omitempty"`
Expand Down
Loading