From dd86fbe6e4e4fb5f2efcbb4c8cea74949634c1cc Mon Sep 17 00:00:00 2001 From: "yinhang.sun" Date: Wed, 25 Dec 2024 11:56:34 +0800 Subject: [PATCH 1/4] opt bytes.Index and humanize.IBytes --- internal/reader/sync_standalone_reader.go | 84 ++++++++++++----------- 1 file changed, 43 insertions(+), 41 deletions(-) diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index 0b3517d3..cba2b17b 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -1,7 +1,6 @@ package reader import ( - "RedisShake/internal/client/proto" "bufio" "bytes" "context" @@ -14,6 +13,8 @@ import ( "strings" "time" + "RedisShake/internal/client/proto" + "RedisShake/internal/client" "RedisShake/internal/config" "RedisShake/internal/entry" @@ -38,6 +39,8 @@ type SyncReaderOptions struct { Sentinel client.SentinelOptions `mapstructure:"sentinel"` } +const RDB_EOF_MARKER_LEN = 40 + type State string const ( @@ -65,18 +68,14 @@ type syncStandaloneReader struct { Status State `json:"status"` // rdb info - RdbFileSizeBytes int64 `json:"rdb_file_size_bytes"` // bytes of the rdb file - RdbFileSizeHuman string `json:"rdb_file_size_human"` - RdbReceivedBytes int64 `json:"rdb_received_bytes"` // bytes of RDB received from master - RdbReceivedHuman string `json:"rdb_received_human"` - RdbSentBytes int64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan - RdbSentHuman string `json:"rdb_sent_human"` + RdbFileSizeBytes uint64 `json:"rdb_file_size_bytes"` // bytes of the rdb file + RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master + RdbSentBytes int64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan // aof info - AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master - AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan - AofReceivedBytes int64 `json:"aof_received_bytes"` // bytes of AOF received from master - AofReceivedHuman string `json:"aof_received_human"` + AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master + AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan + AofReceivedBytes int64 `json:"aof_received_bytes"` // bytes of AOF received from master } // version info @@ -319,7 +318,7 @@ func (r *syncStandaloneReader) receiveRDB() string { } timeStart = time.Now() log.Debugf("[%s] start receiving RDB. path=[%s]", r.stat.Name, rdbFilePath) - rdbFileHandle, err := os.OpenFile(rdbFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) + rdbFileHandle, err := os.OpenFile(rdbFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o666) if err != nil { log.Panicf(err.Error()) } @@ -345,39 +344,46 @@ func (r *syncStandaloneReader) receiveRDBWithDiskless(marker string, wt io.Write buf := make([]byte, bufSize) marker = strings.Split(marker, ":")[1] - if len(marker) != 40 { + if len(marker) != RDB_EOF_MARKER_LEN { log.Panicf("[%s] invalid len of EOF marker. value=[%s]", r.stat.Name, marker) } log.Infof("meet EOF begin marker: %s", marker) bMarker := []byte(marker) - goon := true - for goon { - n, err := r.client.Read(buf[:bufSize]) + var lastBytes []byte + for { + if lastBytes != nil { // add previous tail + copy(buf, lastBytes) + } + + nread, err := r.client.Read(buf[len(lastBytes):]) if err != nil { log.Panicf(err.Error()) } - buffer := buf[:n] - if bytes.Contains(buffer, bMarker) { + + bufLen := len(lastBytes) + nread + nwrite := 0 + if bufLen >= RDB_EOF_MARKER_LEN && bytes.Equal(buf[bufLen-RDB_EOF_MARKER_LEN:bufLen], bMarker) { log.Infof("meet EOF end marker.") - // replace it - fi := bytes.Index(buffer, bMarker) - if len(buffer[fi+40:]) > 0 { - log.Warnf("data after end marker will be discarded: %s", string(buffer[fi+40:])) + // Write all buf without EOF marker and break + if nwrite, err = wt.Write(buf[:bufLen-RDB_EOF_MARKER_LEN]); err != nil { + log.Panicf(err.Error()) } - buffer = buffer[:fi] - - goon = false + break } - _, err = wt.Write(buffer) - if err != nil { - log.Panicf(err.Error()) + if bufLen >= RDB_EOF_MARKER_LEN { + // left RDB_EOF_MARKER_LEN bytes to next round + if nwrite, err = wt.Write(buf[:bufLen-RDB_EOF_MARKER_LEN]); err != nil { + log.Panicf(err.Error()) + } + lastBytes = buf[bufLen-RDB_EOF_MARKER_LEN : bufLen] // save last RDB_EOF_MARKER_LEN bytes into lastBytes for next round + } else { + // save all bytes into lastBytes for next round if less than RDB_EOF_MARKER_LEN + lastBytes = buf[:bufLen] } - r.stat.RdbFileSizeBytes += int64(n) - r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(r.stat.RdbFileSizeBytes)) - r.stat.RdbReceivedBytes += int64(n) - r.stat.RdbReceivedHuman = humanize.IBytes(uint64(r.stat.RdbReceivedBytes)) + r.stat.RdbFileSizeBytes += uint64(nwrite) + r.stat.RdbReceivedBytes += uint64(nwrite) } } @@ -387,8 +393,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr log.Panicf(err.Error()) } log.Debugf("[%s] rdb file size: [%v]", r.stat.Name, humanize.IBytes(uint64(length))) - r.stat.RdbFileSizeBytes = length - r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(length)) + r.stat.RdbFileSizeBytes = uint64(length) remainder := length const bufSize int64 = 32 * 1024 * 1024 // 32MB @@ -408,8 +413,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr log.Panicf(err.Error()) } - r.stat.RdbReceivedBytes += int64(n) - r.stat.RdbReceivedHuman = humanize.IBytes(uint64(r.stat.RdbReceivedBytes)) + r.stat.RdbReceivedBytes += uint64(n) } } @@ -428,7 +432,6 @@ func (r *syncStandaloneReader) receiveAOF() { log.Panicf(err.Error()) } r.stat.AofReceivedBytes += int64(n) - r.stat.AofReceivedHuman = humanize.IBytes(uint64(r.stat.AofReceivedBytes)) aofWriter.Write(buf[:n]) r.stat.AofReceivedOffset += int64(n) } @@ -441,7 +444,6 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) { r.stat.Status = kSyncRdb updateFunc := func(offset int64) { r.stat.RdbSentBytes = offset - r.stat.RdbSentHuman = humanize.IBytes(uint64(offset)) } rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, rdbFilePath, r.ch) r.DbId = rdbLoader.ParseRDB(r.ctx) @@ -532,16 +534,16 @@ func (r *syncStandaloneReader) Status() interface{} { func (r *syncStandaloneReader) StatusString() string { if r.stat.Status == kSyncRdb { - return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, r.stat.RdbSentHuman, r.stat.RdbFileSizeHuman) + return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(uint64(r.stat.RdbSentBytes)), humanize.IBytes(r.stat.RdbFileSizeBytes)) } if r.stat.Status == kSyncAof { return fmt.Sprintf("%s, diff=[%v]", r.stat.Status, -r.stat.AofSentOffset+r.stat.AofReceivedOffset) } if r.stat.Status == kReceiveRdb { if r.isDiskless { - return fmt.Sprintf("%s diskless, size=[%s]", r.stat.Status, r.stat.RdbReceivedHuman) + return fmt.Sprintf("%s diskless, size=[%s]", r.stat.Status, humanize.IBytes(r.stat.RdbReceivedBytes)) } - return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, r.stat.RdbReceivedHuman, r.stat.RdbFileSizeHuman) + return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(r.stat.RdbReceivedBytes), humanize.IBytes(r.stat.RdbFileSizeBytes)) } return string(r.stat.Status) } From 064fd9ee5452f8ec1ab311e8a7071518dc767c69 Mon Sep 17 00:00:00 2001 From: "yinhang.sun" Date: Thu, 26 Dec 2024 11:10:16 +0800 Subject: [PATCH 2/4] fix humanize missing in sync_standalone_reader and add testcase to cover it --- go.mod | 4 +- go.sum | 2 - internal/reader/sync_standalone_reader.go | 76 +++++++++++++----- .../reader/sync_standalone_reader_test.go | 78 +++++++++++++++++++ 4 files changed, 136 insertions(+), 24 deletions(-) create mode 100644 internal/reader/sync_standalone_reader_test.go diff --git a/go.mod b/go.mod index b9ca5c74..b783ef9e 100644 --- a/go.mod +++ b/go.mod @@ -9,11 +9,12 @@ require ( github.com/mcuadros/go-defaults v1.2.0 github.com/rs/zerolog v1.28.0 github.com/spf13/viper v1.18.1 + github.com/stretchr/testify v1.8.4 github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64 ) require ( - github.com/a8m/envsubst v1.4.2 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/magiconair/properties v1.8.7 // indirect @@ -21,6 +22,7 @@ require ( github.com/mattn/go-isatty v0.0.17 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect diff --git a/go.sum b/go.sum index 92ef3dc5..ecce9d75 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -github.com/a8m/envsubst v1.4.2 h1:4yWIHXOLEJHQEFd4UjrWDrYeYlV7ncFWJOCBRLOZHQg= -github.com/a8m/envsubst v1.4.2/go.mod h1:MVUTQNGQ3tsjOOtKCNd+fl8RzhsXcDvvAEzkhGtlsbY= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index cba2b17b..c6a37c06 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "encoding/json" "fmt" "io" "os" @@ -51,6 +52,56 @@ const ( kSyncAof State = "syncing aof" ) +type syncStandaloneReaderStat struct { + Name string `json:"name"` + Address string `json:"address"` + Dir string `json:"dir"` + + // status + Status State `json:"status"` + + // rdb info + RdbFileSizeBytes uint64 `json:"rdb_file_size_bytes"` // bytes of the rdb file + RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master + RdbSentBytes uint64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan + + // aof info + AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master + AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan + AofReceivedBytes uint64 `json:"aof_received_bytes"` // bytes of AOF received from master +} + +func (s syncStandaloneReaderStat) MarshalJSON() ([]byte, error) { + rdbFileSizeHuman, rdbReceivedHuman, rdbSentHuman, aofReceivedHuman := "", "", "", "" + if s.RdbFileSizeBytes != 0 { + rdbFileSizeHuman = humanize.IBytes(s.RdbFileSizeBytes) + } + if s.RdbReceivedBytes != 0 { + rdbReceivedHuman = humanize.IBytes(s.RdbReceivedBytes) + } + if s.RdbSentBytes != 0 { + rdbSentHuman = humanize.IBytes(s.RdbSentBytes) + } + if s.AofReceivedBytes != 0 { + aofReceivedHuman = humanize.IBytes(s.AofReceivedBytes) + } + + type aliasStat syncStandaloneReaderStat // alias to avoid infinite recursion + return json.Marshal(struct { + aliasStat + RdbFileSizeHuman string `json:"rdb_file_size_human"` + RdbReceivedHuman string `json:"rdb_received_human"` + RdbSentHuman string `json:"rdb_sent_human"` + AofReceivedHuman string `json:"aof_received_human"` + }{ + aliasStat: aliasStat(s), + RdbFileSizeHuman: rdbFileSizeHuman, + RdbReceivedHuman: rdbReceivedHuman, + RdbSentHuman: rdbSentHuman, + AofReceivedHuman: aofReceivedHuman, + }) +} + type syncStandaloneReader struct { ctx context.Context opts *SyncReaderOptions @@ -59,24 +110,7 @@ type syncStandaloneReader struct { ch chan *entry.Entry DbId int - stat struct { - Name string `json:"name"` - Address string `json:"address"` - Dir string `json:"dir"` - - // status - Status State `json:"status"` - - // rdb info - RdbFileSizeBytes uint64 `json:"rdb_file_size_bytes"` // bytes of the rdb file - RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master - RdbSentBytes int64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan - - // aof info - AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master - AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan - AofReceivedBytes int64 `json:"aof_received_bytes"` // bytes of AOF received from master - } + stat syncStandaloneReaderStat // version info isDiskless bool @@ -431,7 +465,7 @@ func (r *syncStandaloneReader) receiveAOF() { if err != nil { log.Panicf(err.Error()) } - r.stat.AofReceivedBytes += int64(n) + r.stat.AofReceivedBytes += uint64(n) aofWriter.Write(buf[:n]) r.stat.AofReceivedOffset += int64(n) } @@ -443,7 +477,7 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) { log.Debugf("[%s] start sending RDB to target", r.stat.Name) r.stat.Status = kSyncRdb updateFunc := func(offset int64) { - r.stat.RdbSentBytes = offset + r.stat.RdbSentBytes = uint64(offset) } rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, rdbFilePath, r.ch) r.DbId = rdbLoader.ParseRDB(r.ctx) @@ -534,7 +568,7 @@ func (r *syncStandaloneReader) Status() interface{} { func (r *syncStandaloneReader) StatusString() string { if r.stat.Status == kSyncRdb { - return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(uint64(r.stat.RdbSentBytes)), humanize.IBytes(r.stat.RdbFileSizeBytes)) + return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(r.stat.RdbSentBytes), humanize.IBytes(r.stat.RdbFileSizeBytes)) } if r.stat.Status == kSyncAof { return fmt.Sprintf("%s, diff=[%v]", r.stat.Status, -r.stat.AofSentOffset+r.stat.AofReceivedOffset) diff --git a/internal/reader/sync_standalone_reader_test.go b/internal/reader/sync_standalone_reader_test.go new file mode 100644 index 00000000..457d9cbb --- /dev/null +++ b/internal/reader/sync_standalone_reader_test.go @@ -0,0 +1,78 @@ +package reader + +import ( + "context" + "encoding/json" + "testing" + + "RedisShake/internal/log" + "github.com/stretchr/testify/require" +) + +func Test_syncStandaloneReader_Status(t *testing.T) { + type fields struct { + ctx context.Context + opts *SyncReaderOptions + } + tests := []struct { + name string + fields fields + want interface{} + }{ + { + name: "syncStandaloneReader_Status_Marshal", + fields: fields{ + ctx: context.Background(), + opts: &SyncReaderOptions{ + Cluster: false, + Address: "127.0.0.1:6379", + Username: "username", + Password: "password", + Tls: false, + SyncRdb: false, + SyncAof: false, + PreferReplica: false, + TryDiskless: false, + }, + }, + want: map[string]interface{}{ + "name": "", + "address": "", + "dir": "", + "status": "", + "rdb_file_size_bytes": 0, + "rdb_file_size_human": "", + "rdb_received_bytes": 0, + "rdb_received_human": "", + "rdb_sent_bytes": 0, + "rdb_sent_human": "", + "aof_received_offset": 0, + "aof_sent_offset": 0, + "aof_received_bytes": 0, + "aof_received_human": "", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &syncStandaloneReader{ + ctx: tt.fields.ctx, + opts: tt.fields.opts, + } + + want, err := json.Marshal(tt.want) + if err != nil { + log.Warnf("marshal status failed, err=[%v]", err) + return + } + + got, err := json.Marshal(r.Status()) + if err != nil { + log.Warnf("marshal status failed, err=[%v]", err) + return + } + + require.JSONEq(t, string(want), string(got)) + }) + } +} From 5b04f0419b949bb7dcbd54d05411eddde04091a3 Mon Sep 17 00:00:00 2001 From: "yinhang.sun" Date: Thu, 26 Dec 2024 11:11:23 +0800 Subject: [PATCH 3/4] fix typo --- internal/reader/sync_standalone_reader_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/reader/sync_standalone_reader_test.go b/internal/reader/sync_standalone_reader_test.go index 457d9cbb..3485f092 100644 --- a/internal/reader/sync_standalone_reader_test.go +++ b/internal/reader/sync_standalone_reader_test.go @@ -62,7 +62,7 @@ func Test_syncStandaloneReader_Status(t *testing.T) { want, err := json.Marshal(tt.want) if err != nil { - log.Warnf("marshal status failed, err=[%v]", err) + log.Warnf("marshal want failed, err=[%v]", err) return } From 467a111f644e8feba92fe296b8a42f7954a62989 Mon Sep 17 00:00:00 2001 From: "yinhang.sun" Date: Fri, 27 Dec 2024 10:44:11 +0800 Subject: [PATCH 4/4] simplify syncStandaloneReaderStat and remove if clause around copy --- internal/reader/sync_standalone_reader.go | 35 ++++++++--------------- 1 file changed, 12 insertions(+), 23 deletions(-) diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index c6a37c06..c8d16a9e 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -62,44 +62,35 @@ type syncStandaloneReaderStat struct { // rdb info RdbFileSizeBytes uint64 `json:"rdb_file_size_bytes"` // bytes of the rdb file - RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master - RdbSentBytes uint64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan + RdbFileSizeHuman string `json:"rdb_file_size_human"` + RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master + RdbReceivedHuman string `json:"rdb_received_human"` + RdbSentBytes uint64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan + RdbSentHuman string `json:"rdb_sent_human"` // aof info AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan AofReceivedBytes uint64 `json:"aof_received_bytes"` // bytes of AOF received from master + AofReceivedHuman string `json:"aof_received_human"` } func (s syncStandaloneReaderStat) MarshalJSON() ([]byte, error) { - rdbFileSizeHuman, rdbReceivedHuman, rdbSentHuman, aofReceivedHuman := "", "", "", "" if s.RdbFileSizeBytes != 0 { - rdbFileSizeHuman = humanize.IBytes(s.RdbFileSizeBytes) + s.RdbFileSizeHuman = humanize.IBytes(s.RdbFileSizeBytes) } if s.RdbReceivedBytes != 0 { - rdbReceivedHuman = humanize.IBytes(s.RdbReceivedBytes) + s.RdbReceivedHuman = humanize.IBytes(s.RdbReceivedBytes) } if s.RdbSentBytes != 0 { - rdbSentHuman = humanize.IBytes(s.RdbSentBytes) + s.RdbSentHuman = humanize.IBytes(s.RdbSentBytes) } if s.AofReceivedBytes != 0 { - aofReceivedHuman = humanize.IBytes(s.AofReceivedBytes) + s.AofReceivedHuman = humanize.IBytes(s.AofReceivedBytes) } type aliasStat syncStandaloneReaderStat // alias to avoid infinite recursion - return json.Marshal(struct { - aliasStat - RdbFileSizeHuman string `json:"rdb_file_size_human"` - RdbReceivedHuman string `json:"rdb_received_human"` - RdbSentHuman string `json:"rdb_sent_human"` - AofReceivedHuman string `json:"aof_received_human"` - }{ - aliasStat: aliasStat(s), - RdbFileSizeHuman: rdbFileSizeHuman, - RdbReceivedHuman: rdbReceivedHuman, - RdbSentHuman: rdbSentHuman, - AofReceivedHuman: aofReceivedHuman, - }) + return json.Marshal(aliasStat(s)) } type syncStandaloneReader struct { @@ -385,9 +376,7 @@ func (r *syncStandaloneReader) receiveRDBWithDiskless(marker string, wt io.Write bMarker := []byte(marker) var lastBytes []byte for { - if lastBytes != nil { // add previous tail - copy(buf, lastBytes) - } + copy(buf, lastBytes) // copy previous tail bytes to head of buf nread, err := r.client.Read(buf[len(lastBytes):]) if err != nil {