diff --git a/go.mod b/go.mod index 205b5b06..473f97c4 100644 --- a/go.mod +++ b/go.mod @@ -9,11 +9,13 @@ require ( github.com/mcuadros/go-defaults v1.2.0 github.com/rs/zerolog v1.28.0 github.com/spf13/viper v1.18.1 + github.com/stretchr/testify v1.8.4 github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64 gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) require ( + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/magiconair/properties v1.8.7 // indirect @@ -21,6 +23,7 @@ require ( github.com/mattn/go-isatty v0.0.17 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index 0b3517d3..c8d16a9e 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -1,10 +1,10 @@ package reader import ( - "RedisShake/internal/client/proto" "bufio" "bytes" "context" + "encoding/json" "fmt" "io" "os" @@ -14,6 +14,8 @@ import ( "strings" "time" + "RedisShake/internal/client/proto" + "RedisShake/internal/client" "RedisShake/internal/config" "RedisShake/internal/entry" @@ -38,6 +40,8 @@ type SyncReaderOptions struct { Sentinel client.SentinelOptions `mapstructure:"sentinel"` } +const RDB_EOF_MARKER_LEN = 40 + type State string const ( @@ -48,6 +52,47 @@ const ( kSyncAof State = "syncing aof" ) +type syncStandaloneReaderStat struct { + Name string `json:"name"` + Address string `json:"address"` + Dir string `json:"dir"` + + // status + Status State `json:"status"` + + // rdb info + RdbFileSizeBytes uint64 `json:"rdb_file_size_bytes"` // bytes of the rdb file + RdbFileSizeHuman string `json:"rdb_file_size_human"` + RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master + RdbReceivedHuman string `json:"rdb_received_human"` + RdbSentBytes uint64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan + RdbSentHuman string `json:"rdb_sent_human"` + + // aof info + AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master + AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan + AofReceivedBytes uint64 `json:"aof_received_bytes"` // bytes of AOF received from master + AofReceivedHuman string `json:"aof_received_human"` +} + +func (s syncStandaloneReaderStat) MarshalJSON() ([]byte, error) { + if s.RdbFileSizeBytes != 0 { + s.RdbFileSizeHuman = humanize.IBytes(s.RdbFileSizeBytes) + } + if s.RdbReceivedBytes != 0 { + s.RdbReceivedHuman = humanize.IBytes(s.RdbReceivedBytes) + } + if s.RdbSentBytes != 0 { + s.RdbSentHuman = humanize.IBytes(s.RdbSentBytes) + } + if s.AofReceivedBytes != 0 { + s.AofReceivedHuman = humanize.IBytes(s.AofReceivedBytes) + } + + type aliasStat syncStandaloneReaderStat // alias to avoid infinite recursion + return json.Marshal(aliasStat(s)) +} + type syncStandaloneReader struct { ctx context.Context opts *SyncReaderOptions @@ -56,28 +101,7 @@ type syncStandaloneReader struct { ch chan *entry.Entry DbId int - stat struct { - Name string `json:"name"` - Address string `json:"address"` - Dir string `json:"dir"` - - // status - Status State `json:"status"` - - // rdb info - RdbFileSizeBytes int64 `json:"rdb_file_size_bytes"` // bytes of the rdb file - RdbFileSizeHuman string `json:"rdb_file_size_human"` - RdbReceivedBytes int64 `json:"rdb_received_bytes"` // bytes of RDB received from master - RdbReceivedHuman string `json:"rdb_received_human"` - RdbSentBytes int64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan - RdbSentHuman string `json:"rdb_sent_human"` - - // aof info - AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master - AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan - AofReceivedBytes int64 `json:"aof_received_bytes"` // bytes of AOF received from master - AofReceivedHuman string `json:"aof_received_human"` - } + stat syncStandaloneReaderStat // version info isDiskless bool @@ -319,7 +343,7 @@ func (r *syncStandaloneReader) receiveRDB() string { } timeStart = time.Now() log.Debugf("[%s] start receiving RDB. path=[%s]", r.stat.Name, rdbFilePath) - rdbFileHandle, err := os.OpenFile(rdbFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) + rdbFileHandle, err := os.OpenFile(rdbFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o666) if err != nil { log.Panicf(err.Error()) } @@ -345,39 +369,44 @@ func (r *syncStandaloneReader) receiveRDBWithDiskless(marker string, wt io.Write buf := make([]byte, bufSize) marker = strings.Split(marker, ":")[1] - if len(marker) != 40 { + if len(marker) != RDB_EOF_MARKER_LEN { log.Panicf("[%s] invalid len of EOF marker. value=[%s]", r.stat.Name, marker) } log.Infof("meet EOF begin marker: %s", marker) bMarker := []byte(marker) - goon := true - for goon { - n, err := r.client.Read(buf[:bufSize]) + var lastBytes []byte + for { + copy(buf, lastBytes) // copy previous tail bytes to head of buf + + nread, err := r.client.Read(buf[len(lastBytes):]) if err != nil { log.Panicf(err.Error()) } - buffer := buf[:n] - if bytes.Contains(buffer, bMarker) { + + bufLen := len(lastBytes) + nread + nwrite := 0 + if bufLen >= RDB_EOF_MARKER_LEN && bytes.Equal(buf[bufLen-RDB_EOF_MARKER_LEN:bufLen], bMarker) { log.Infof("meet EOF end marker.") - // replace it - fi := bytes.Index(buffer, bMarker) - if len(buffer[fi+40:]) > 0 { - log.Warnf("data after end marker will be discarded: %s", string(buffer[fi+40:])) + // Write all buf without EOF marker and break + if nwrite, err = wt.Write(buf[:bufLen-RDB_EOF_MARKER_LEN]); err != nil { + log.Panicf(err.Error()) } - buffer = buffer[:fi] - - goon = false + break } - _, err = wt.Write(buffer) - if err != nil { - log.Panicf(err.Error()) + if bufLen >= RDB_EOF_MARKER_LEN { + // left RDB_EOF_MARKER_LEN bytes to next round + if nwrite, err = wt.Write(buf[:bufLen-RDB_EOF_MARKER_LEN]); err != nil { + log.Panicf(err.Error()) + } + lastBytes = buf[bufLen-RDB_EOF_MARKER_LEN : bufLen] // save last RDB_EOF_MARKER_LEN bytes into lastBytes for next round + } else { + // save all bytes into lastBytes for next round if less than RDB_EOF_MARKER_LEN + lastBytes = buf[:bufLen] } - r.stat.RdbFileSizeBytes += int64(n) - r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(r.stat.RdbFileSizeBytes)) - r.stat.RdbReceivedBytes += int64(n) - r.stat.RdbReceivedHuman = humanize.IBytes(uint64(r.stat.RdbReceivedBytes)) + r.stat.RdbFileSizeBytes += uint64(nwrite) + r.stat.RdbReceivedBytes += uint64(nwrite) } } @@ -387,8 +416,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr log.Panicf(err.Error()) } log.Debugf("[%s] rdb file size: [%v]", r.stat.Name, humanize.IBytes(uint64(length))) - r.stat.RdbFileSizeBytes = length - r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(length)) + r.stat.RdbFileSizeBytes = uint64(length) remainder := length const bufSize int64 = 32 * 1024 * 1024 // 32MB @@ -408,8 +436,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr log.Panicf(err.Error()) } - r.stat.RdbReceivedBytes += int64(n) - r.stat.RdbReceivedHuman = humanize.IBytes(uint64(r.stat.RdbReceivedBytes)) + r.stat.RdbReceivedBytes += uint64(n) } } @@ -427,8 +454,7 @@ func (r *syncStandaloneReader) receiveAOF() { if err != nil { log.Panicf(err.Error()) } - r.stat.AofReceivedBytes += int64(n) - r.stat.AofReceivedHuman = humanize.IBytes(uint64(r.stat.AofReceivedBytes)) + r.stat.AofReceivedBytes += uint64(n) aofWriter.Write(buf[:n]) r.stat.AofReceivedOffset += int64(n) } @@ -440,8 +466,7 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) { log.Debugf("[%s] start sending RDB to target", r.stat.Name) r.stat.Status = kSyncRdb updateFunc := func(offset int64) { - r.stat.RdbSentBytes = offset - r.stat.RdbSentHuman = humanize.IBytes(uint64(offset)) + r.stat.RdbSentBytes = uint64(offset) } rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, rdbFilePath, r.ch) r.DbId = rdbLoader.ParseRDB(r.ctx) @@ -532,16 +557,16 @@ func (r *syncStandaloneReader) Status() interface{} { func (r *syncStandaloneReader) StatusString() string { if r.stat.Status == kSyncRdb { - return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, r.stat.RdbSentHuman, r.stat.RdbFileSizeHuman) + return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(r.stat.RdbSentBytes), humanize.IBytes(r.stat.RdbFileSizeBytes)) } if r.stat.Status == kSyncAof { return fmt.Sprintf("%s, diff=[%v]", r.stat.Status, -r.stat.AofSentOffset+r.stat.AofReceivedOffset) } if r.stat.Status == kReceiveRdb { if r.isDiskless { - return fmt.Sprintf("%s diskless, size=[%s]", r.stat.Status, r.stat.RdbReceivedHuman) + return fmt.Sprintf("%s diskless, size=[%s]", r.stat.Status, humanize.IBytes(r.stat.RdbReceivedBytes)) } - return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, r.stat.RdbReceivedHuman, r.stat.RdbFileSizeHuman) + return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(r.stat.RdbReceivedBytes), humanize.IBytes(r.stat.RdbFileSizeBytes)) } return string(r.stat.Status) } diff --git a/internal/reader/sync_standalone_reader_test.go b/internal/reader/sync_standalone_reader_test.go new file mode 100644 index 00000000..3485f092 --- /dev/null +++ b/internal/reader/sync_standalone_reader_test.go @@ -0,0 +1,78 @@ +package reader + +import ( + "context" + "encoding/json" + "testing" + + "RedisShake/internal/log" + "github.com/stretchr/testify/require" +) + +func Test_syncStandaloneReader_Status(t *testing.T) { + type fields struct { + ctx context.Context + opts *SyncReaderOptions + } + tests := []struct { + name string + fields fields + want interface{} + }{ + { + name: "syncStandaloneReader_Status_Marshal", + fields: fields{ + ctx: context.Background(), + opts: &SyncReaderOptions{ + Cluster: false, + Address: "127.0.0.1:6379", + Username: "username", + Password: "password", + Tls: false, + SyncRdb: false, + SyncAof: false, + PreferReplica: false, + TryDiskless: false, + }, + }, + want: map[string]interface{}{ + "name": "", + "address": "", + "dir": "", + "status": "", + "rdb_file_size_bytes": 0, + "rdb_file_size_human": "", + "rdb_received_bytes": 0, + "rdb_received_human": "", + "rdb_sent_bytes": 0, + "rdb_sent_human": "", + "aof_received_offset": 0, + "aof_sent_offset": 0, + "aof_received_bytes": 0, + "aof_received_human": "", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &syncStandaloneReader{ + ctx: tt.fields.ctx, + opts: tt.fields.opts, + } + + want, err := json.Marshal(tt.want) + if err != nil { + log.Warnf("marshal want failed, err=[%v]", err) + return + } + + got, err := json.Marshal(r.Status()) + if err != nil { + log.Warnf("marshal status failed, err=[%v]", err) + return + } + + require.JSONEq(t, string(want), string(got)) + }) + } +}