Skip to content

Commit

Permalink
Switch to using blake3 hashes and skipping empty blocks
Browse files Browse the repository at this point in the history
Blake3 hashes are significantly faster than blake2
hashes used previously. Also added faster empty block
detection so those don't need to get hashed at all.

Signed-off-by: Alexander Wels <awels@redhat.com>
  • Loading branch information
awels committed Oct 8, 2024
1 parent a6cdbca commit 7fd9dd1
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 43 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ require (
github.com/onsi/ginkgo/v2 v2.14.0
github.com/onsi/gomega v1.30.0
github.com/spf13/pflag v1.0.5
github.com/zeebo/blake3 v0.2.4
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.21.0
sigs.k8s.io/controller-runtime v0.17.3
)

Expand All @@ -21,6 +21,7 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
10 changes: 8 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.0.12 h1:p9dKCg8i4gmOxtv35DvrYoWqYzQrvEVdjQ762Y0OqZE=
github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand All @@ -50,6 +52,12 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/zeebo/assert v1.1.0 h1:hU1L1vLTHsnO8x8c9KAR5GmM5QscxHg5RNU5z5qbUWY=
github.com/zeebo/assert v1.1.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/blake3 v0.2.4 h1:KYQPkhpRtcqh0ssGYcKLG1JYvddkEA8QwCM/yBqhaZI=
github.com/zeebo/blake3 v0.2.4/go.mod h1:7eeQ6d2iXWRGF6npfaxl2CU+xy2Fjo2gxeyZGCRUjcE=
github.com/zeebo/pcg v1.0.1 h1:lyqfGeWiv4ahac6ttHs+I5hwtH/+1mrhlCtVNQM2kHo=
github.com/zeebo/pcg v1.0.1/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
Expand All @@ -59,8 +67,6 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
Expand Down
32 changes: 20 additions & 12 deletions pkg/blockrsync/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blockrsync

import (
"bytes"
"encoding/binary"
"fmt"
"io"
Expand Down Expand Up @@ -42,19 +43,26 @@ func (b *BlockrsyncClient) ConnectToTarget() error {
}
b.log.Info("Opened file", "file", b.sourceFile)
defer f.Close()

b.log.V(3).Info("Connecting to target", "address", b.connectionProvider.TargetAddress())
conn, err := b.connectionProvider.Connect()
if err != nil {
return err
}
defer conn.Close()
b.log.Info("Connected to target, reading file to hash")
size, err := b.hasher.HashFile(b.sourceFile)
if err != nil {
return err
}
b.sourceSize = size
b.log.V(5).Info("Hashed file", "filename", b.sourceFile, "size", size)
conn, err := b.connectionProvider.Connect()
if err != nil {
reader := snappy.NewReader(conn)
if match, err := b.hasher.CompareHashHash(conn); err != nil {
return err
} else if match {
b.log.Info("No differences found, exiting")
return nil
}
defer conn.Close()
reader := snappy.NewReader(conn)
var diff []int64
if blockSize, sourceHashes, err := b.hasher.DeserializeHashes(reader); err != nil {
return err
Expand Down Expand Up @@ -141,12 +149,7 @@ func (b *BlockrsyncClient) writeBlocksToServer(writer io.Writer, offsets []int64
}

func isEmptyBlock(buf []byte) bool {
for _, b := range buf {
if b != 0 {
return false
}
}
return true
return bytes.Equal(buf, emptyBlock)
}

func int64SortFunc(i, j int64) int {
Expand All @@ -160,6 +163,7 @@ func int64SortFunc(i, j int64) int {

type ConnectionProvider interface {
Connect() (io.ReadWriteCloser, error)
TargetAddress() string
}

type NetworkConnectionProvider struct {
Expand All @@ -177,9 +181,13 @@ func (n *NetworkConnectionProvider) Connect() (io.ReadWriteCloser, error) {
if retryCount > 30 {
return nil, fmt.Errorf("unable to connect to target after %d retries", retryCount)
}
time.Sleep(time.Second)
time.Sleep(time.Second * 10)
retryCount++
}
}
return conn, nil
}

func (n *NetworkConnectionProvider) TargetAddress() string {
return n.targetAddress
}
79 changes: 63 additions & 16 deletions pkg/blockrsync/hasher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ import (
"time"

"github.com/go-logr/logr"
"golang.org/x/crypto/blake2b"
"github.com/zeebo/blake3"
)

const (
DefaultBlockSize = int64(64 * 1024)
defaultConcurrency = 25
blake3HashLength = 32
)

var (
emptyBlock []byte
zeroHash []byte
)

type Hasher interface {
Expand All @@ -30,6 +36,7 @@ type Hasher interface {
SerializeHashes(io.Writer) error
DeserializeHashes(io.Reader) (int64, map[int64][]byte, error)
BlockSize() int64
CompareHashHash(io.ReadWriter) (bool, error)
}

type OffsetHash struct {
Expand All @@ -44,9 +51,13 @@ type FileHasher struct {
blockSize int64
fileSize int64
log logr.Logger
// Hash of hashes
hashHash []byte
}

func NewFileHasher(blockSize int64, log logr.Logger) Hasher {
emptyBlock = make([]byte, blockSize)
zeroHash = computeZeroHash()
return &FileHasher{
blockSize: blockSize,
queue: make(chan int64, defaultConcurrency),
Expand All @@ -56,10 +67,17 @@ func NewFileHasher(blockSize int64, log logr.Logger) Hasher {
}
}

func computeZeroHash() []byte {
h := blake3.New()
h.Write(emptyBlock)
return h.Sum(nil)
}

func (f *FileHasher) HashFile(fileName string) (int64, error) {
f.log.V(3).Info("Hashing file", "file", fileName)
t := time.Now()
defer func() {
f.hashHash = f.calculateHashHash()
f.log.V(3).Info("Hashing took", "milliseconds", time.Since(t).Milliseconds())
}()
done := make(chan struct{})
Expand All @@ -79,10 +97,7 @@ func (f *FileHasher) HashFile(fileName string) (int64, error) {

for i := 0; i < count; i++ {
wg.Add(1)
h, err := blake2b.New512(nil)
if err != nil {
return 0, err
}
h := blake3.New()
go func(h hash.Hash) {
defer wg.Done()
osFile, err := os.Open(fileName)
Expand Down Expand Up @@ -110,6 +125,14 @@ func (f *FileHasher) HashFile(fileName string) (int64, error) {
}
}

func (f *FileHasher) calculateHashHash() []byte {
h := blake3.New()
for _, v := range f.hashes {
h.Write(v)
}
return h.Sum(nil)
}

func (f *FileHasher) getFileSize(fileName string) (int64, error) {
file, err := os.Open(fileName)
if err != nil {
Expand Down Expand Up @@ -153,17 +176,23 @@ func (f *FileHasher) calculateHash(offset int64, rs io.ReadSeeker, h hash.Hash)
f.log.V(5).Info("Failed to read")
return err
}
n, err = h.Write(buf[:n])
if err != nil {
f.log.V(5).Info("Failed to write to hash")
return err
}
if n != len(buf) {
f.log.V(5).Info("Finished reading file")
var hash []byte
if bytes.Equal(buf, emptyBlock) {
hash = zeroHash
} else {
n, err = h.Write(buf[:n])
if err != nil {
f.log.V(5).Info("Failed to write to hash")
return err
}
if n != len(buf) {
f.log.V(5).Info("Finished reading file")
}
hash = h.Sum(nil)
}
offsetHash := OffsetHash{
Offset: offset,
Hash: h.Sum(nil),
Hash: hash,
}
f.res <- offsetHash
return nil
Expand Down Expand Up @@ -210,6 +239,7 @@ func (f *FileHasher) SerializeHashes(w io.Writer) error {
if err := binary.Write(w, binary.LittleEndian, int64(f.blockSize)); err != nil {
return err
}

length := len(f.hashes)
f.log.V(5).Info("Number of blocks", "size", length)
if err := binary.Write(w, binary.LittleEndian, int64(length)); err != nil {
Expand All @@ -225,8 +255,8 @@ func (f *FileHasher) SerializeHashes(w io.Writer) error {
if err := binary.Write(w, binary.LittleEndian, k); err != nil {
return err
}
if len(f.hashes[k]) != 64 {
return errors.New("invalid hash length")
if len(f.hashes[k]) != blake3HashLength {
return fmt.Errorf("invalid hash length %d", len(f.hashes[k]))
}
if n, err := w.Write(f.hashes[k]); err != nil {
return err
Expand All @@ -248,6 +278,7 @@ func (f *FileHasher) DeserializeHashes(r io.Reader) (int64, map[int64][]byte, er
if err := binary.Read(r, binary.LittleEndian, &blockSize); err != nil {
return 0, nil, err
}
f.log.V(5).Info("Block size", "size", blockSize)
var length int64
if err := binary.Read(r, binary.LittleEndian, &length); err != nil {
return 0, nil, err
Expand All @@ -263,7 +294,7 @@ func (f *FileHasher) DeserializeHashes(r io.Reader) (int64, map[int64][]byte, er
if offset < 0 || offset > length*blockSize {
return 0, nil, fmt.Errorf("invalid offset %d", offset)
}
hash := make([]byte, 64)
hash := make([]byte, blake3HashLength)
if n, err := io.ReadFull(r, hash); err != nil {
return 0, nil, err
} else {
Expand All @@ -275,6 +306,22 @@ func (f *FileHasher) DeserializeHashes(r io.Reader) (int64, map[int64][]byte, er
return blockSize, hashes, nil
}

func (f *FileHasher) CompareHashHash(rw io.ReadWriter) (bool, error) {
f.log.V(5).Info("Comparing hash of hashes", "hash", base64.StdEncoding.EncodeToString(f.hashHash))
if n, err := rw.Write(f.hashHash); err != nil {
return false, err
} else {
f.log.V(5).Info("Wrote hash of hashes", "bytes", n)
}
hashHash := make([]byte, blake3HashLength)
if n, err := io.ReadFull(rw, hashHash); err != nil {
return false, err
} else {
f.log.V(5).Info("Read hash of hashes", "bytes", n, "hash", base64.StdEncoding.EncodeToString(hashHash))
}
return bytes.Equal(hashHash, f.hashHash), nil
}

func (f *FileHasher) BlockSize() int64 {
return f.blockSize
}
4 changes: 2 additions & 2 deletions pkg/blockrsync/hasher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ var _ = Describe("hasher tests", func() {
err = hasher.SerializeHashes(w)
Expect(err).ToNot(HaveOccurred())
hashes := hasher.GetHashes()
// 16 for the blocksize and length, 72 for each hash
Expect(b.Len()).To(Equal(72*len(hashes) + 16))
// 16 for the blocksize and length, 40 for each hash (32 bytes for the hash, 8 for the offset)
Expect(b.Len()).To(Equal(40*len(hashes) + 16))
r := io.Reader(&b)
blockSize, h, err := hasher.DeserializeHashes(r)
Expect(err).ToNot(HaveOccurred())
Expand Down
26 changes: 16 additions & 10 deletions pkg/blockrsync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ func (b *BlockrsyncServer) StartServer() error {
defer conn.Close()
writer := snappy.NewBufferedWriter(conn)
<-readyChan
if match, err := b.hasher.CompareHashHash(conn); err != nil {
return err
} else if match {
b.log.Info("No differences found, exiting")
return nil
}

if err := b.writeHashes(writer); err != nil {
return err
Expand Down Expand Up @@ -99,6 +105,7 @@ func (b *BlockrsyncServer) writeBlocksToFile(f *os.File, reader io.Reader) error
_, err = handleReadError(err, nocallback)
return err
}
b.targetFileSize = max(b.targetFileSize, sourceSize)
if err := b.truncateFileIfNeeded(f, sourceSize, b.targetFileSize); err != nil {
_, err = handleReadError(err, nocallback)
return err
Expand Down Expand Up @@ -131,16 +138,15 @@ func (b *BlockrsyncServer) truncateFileIfNeeded(f *os.File, sourceSize, targetSi
if err != nil {
return err
}
if targetSize > sourceSize {
b.log.V(5).Info("Source size", "size", sourceSize)
if info.Mode()&(os.ModeDevice|os.ModeCharDevice) == 0 {
// Not a block device, truncate the file if it is larger than the source file
// Truncate the target file if it is larger than the source file
b.log.V(5).Info("Source is smaller than target, truncating file")
if err := f.Truncate(sourceSize); err != nil {
return err
}
} else {
b.log.V(5).Info("Source size", "size", sourceSize)
if info.Mode()&(os.ModeDevice|os.ModeCharDevice) == 0 {
// Not a block device, set the file size to the received size
b.log.V(3).Info("Setting target file size", "size", targetSize)
if err := f.Truncate(sourceSize); err != nil {
return err
}
} else {
if targetSize > sourceSize {
// empty out existing blocks
PunchHole(f, sourceSize, targetSize-sourceSize)
}
Expand Down

0 comments on commit 7fd9dd1

Please sign in to comment.