Skip to content

Commit

Permalink
compression: support generating compressor metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
  • Loading branch information
giuseppe committed Jul 2, 2021
1 parent 48f39f6 commit b167b4b
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 10 deletions.
18 changes: 14 additions & 4 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,6 +1425,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
originalLayerReader = destStream
}

compressionMetadata := map[string]string{}
// === Deal with layer compression/decompression if necessary
// WARNING: If you are adding new reasons to change the blob, update also the OptimizeDestinationImageAlreadyExists
// short-circuit conditions
Expand Down Expand Up @@ -1453,7 +1454,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
// If this fails while writing data, it will do pipeWriter.CloseWithError(); if it fails otherwise,
// e.g. because we have exited and due to pipeReader.Close() above further writing to the pipe has failed,
// we don’t care.
go c.compressGoroutine(pipeWriter, destStream, *uploadCompressionFormat) // Closes pipeWriter
go c.compressGoroutine(pipeWriter, destStream, compressionMetadata, *uploadCompressionFormat) // Closes pipeWriter
destStream = pipeReader
inputInfo.Digest = ""
inputInfo.Size = -1
Expand All @@ -1473,7 +1474,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
pipeReader, pipeWriter := io.Pipe()
defer pipeReader.Close()

go c.compressGoroutine(pipeWriter, s, *uploadCompressionFormat) // Closes pipeWriter
go c.compressGoroutine(pipeWriter, s, compressionMetadata, *uploadCompressionFormat) // Closes pipeWriter

destStream = pipeReader
inputInfo.Digest = ""
Expand Down Expand Up @@ -1640,17 +1641,26 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
c.blobInfoCache.RecordDigestCompressorName(srcInfo.Digest, srcCompressorName)
}
}

// Copy all the metadata generated by the compressor into the annotations.
if uploadedInfo.Annotations == nil {
uploadedInfo.Annotations = map[string]string{}
}
for k, v := range compressionMetadata {
uploadedInfo.Annotations[k] = v
}

return uploadedInfo, nil
}

// compressGoroutine reads all input from src and writes its compressed equivalent to dest.
func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, compressionFormat compression.Algorithm) {
func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, metadata map[string]string, compressionFormat compression.Algorithm) {
err := errors.New("Internal error: unexpected panic in compressGoroutine")
defer func() { // Note that this is not the same as {defer dest.CloseWithError(err)}; we need err to be evaluated lazily.
_ = dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close(), always returns nil
}()

compressor, err := compression.CompressStream(dest, compressionFormat, c.compressionLevel)
compressor, err := compression.CompressStreamWithMetadata(dest, metadata, compressionFormat, c.compressionLevel)
if err != nil {
return
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/compression/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,26 +69,33 @@ func XzDecompressor(r io.Reader) (io.ReadCloser, error) {
}

// gzipCompressor is a CompressorFunc for the gzip compression algorithm.
func gzipCompressor(r io.Writer, level *int) (io.WriteCloser, error) {
func gzipCompressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) {
if level != nil {
return pgzip.NewWriterLevel(r, *level)
}
return pgzip.NewWriter(r), nil
}

// bzip2Compressor is a CompressorFunc for the bzip2 compression algorithm.
func bzip2Compressor(r io.Writer, level *int) (io.WriteCloser, error) {
func bzip2Compressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) {
return nil, fmt.Errorf("bzip2 compression not supported")
}

// xzCompressor is a CompressorFunc for the xz compression algorithm.
func xzCompressor(r io.Writer, level *int) (io.WriteCloser, error) {
func xzCompressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) {
return xz.NewWriter(r)
}

// CompressStream returns the compressor by its name
func CompressStream(dest io.Writer, algo Algorithm, level *int) (io.WriteCloser, error) {
return internal.AlgorithmCompressor(algo)(dest, level)
m := map[string]string{}
return internal.AlgorithmCompressor(algo)(dest, m, level)
}

// CompressStreamWithMetadata returns the compressor by its name. If the compression
// generates any metadata, it is written to the provided metadata map.
func CompressStreamWithMetadata(dest io.Writer, metadata map[string]string, algo Algorithm, level *int) (io.WriteCloser, error) {
return internal.AlgorithmCompressor(algo)(dest, metadata, level)
}

// DetectCompressionFormat returns an Algorithm and DecompressorFunc if the input is recognized as a compressed format, an invalid
Expand Down
2 changes: 1 addition & 1 deletion pkg/compression/internal/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "io"

// CompressorFunc writes the compressed stream to the given writer using the specified compression level.
// The caller must call Close() on the stream (even if the input stream does not need closing!).
type CompressorFunc func(io.Writer, *int) (io.WriteCloser, error)
type CompressorFunc func(io.Writer, map[string]string, *int) (io.WriteCloser, error)

// DecompressorFunc returns the decompressed stream, given a compressed stream.
// The caller must call Close() on the decompressed stream (even if the compressed input stream does not need closing!).
Expand Down
2 changes: 1 addition & 1 deletion pkg/compression/zstd.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func zstdWriterWithLevel(dest io.Writer, level int) (*zstd.Encoder, error) {
}

// zstdCompressor is a CompressorFunc for the zstd compression algorithm.
func zstdCompressor(r io.Writer, level *int) (io.WriteCloser, error) {
func zstdCompressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) {
if level == nil {
return zstdWriter(r)
}
Expand Down

0 comments on commit b167b4b

Please sign in to comment.