Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize: bytes.Index and humanize.IBytes in SyncStandaloneReader #909

Open
wants to merge 5 commits into
base: v4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@ require (
github.com/mcuadros/go-defaults v1.2.0
github.com/rs/zerolog v1.28.0
github.com/spf13/viper v1.18.1
github.com/stretchr/testify v1.8.4
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)

require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
Expand Down
135 changes: 80 additions & 55 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package reader

import (
"RedisShake/internal/client/proto"
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
Expand All @@ -14,6 +14,8 @@ import (
"strings"
"time"

"RedisShake/internal/client/proto"

"RedisShake/internal/client"
"RedisShake/internal/config"
"RedisShake/internal/entry"
Expand All @@ -38,6 +40,8 @@ type SyncReaderOptions struct {
Sentinel client.SentinelOptions `mapstructure:"sentinel"`
}

const RDB_EOF_MARKER_LEN = 40

type State string

const (
Expand All @@ -48,6 +52,47 @@ const (
kSyncAof State = "syncing aof"
)

type syncStandaloneReaderStat struct {
Name string `json:"name"`
Address string `json:"address"`
Dir string `json:"dir"`

// status
Status State `json:"status"`

// rdb info
RdbFileSizeBytes uint64 `json:"rdb_file_size_bytes"` // bytes of the rdb file
RdbFileSizeHuman string `json:"rdb_file_size_human"`
RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master
RdbReceivedHuman string `json:"rdb_received_human"`
RdbSentBytes uint64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan
RdbSentHuman string `json:"rdb_sent_human"`

// aof info
AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master
AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan
AofReceivedBytes uint64 `json:"aof_received_bytes"` // bytes of AOF received from master
AofReceivedHuman string `json:"aof_received_human"`
}

func (s syncStandaloneReaderStat) MarshalJSON() ([]byte, error) {
suxb201 marked this conversation as resolved.
Show resolved Hide resolved
if s.RdbFileSizeBytes != 0 {
s.RdbFileSizeHuman = humanize.IBytes(s.RdbFileSizeBytes)
}
if s.RdbReceivedBytes != 0 {
s.RdbReceivedHuman = humanize.IBytes(s.RdbReceivedBytes)
}
if s.RdbSentBytes != 0 {
s.RdbSentHuman = humanize.IBytes(s.RdbSentBytes)
}
if s.AofReceivedBytes != 0 {
s.AofReceivedHuman = humanize.IBytes(s.AofReceivedBytes)
}

type aliasStat syncStandaloneReaderStat // alias to avoid infinite recursion
return json.Marshal(aliasStat(s))
}

type syncStandaloneReader struct {
ctx context.Context
opts *SyncReaderOptions
Expand All @@ -56,28 +101,7 @@ type syncStandaloneReader struct {
ch chan *entry.Entry
DbId int

stat struct {
Name string `json:"name"`
Address string `json:"address"`
Dir string `json:"dir"`

// status
Status State `json:"status"`

// rdb info
RdbFileSizeBytes int64 `json:"rdb_file_size_bytes"` // bytes of the rdb file
RdbFileSizeHuman string `json:"rdb_file_size_human"`
RdbReceivedBytes int64 `json:"rdb_received_bytes"` // bytes of RDB received from master
RdbReceivedHuman string `json:"rdb_received_human"`
RdbSentBytes int64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan
RdbSentHuman string `json:"rdb_sent_human"`

// aof info
AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master
AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan
AofReceivedBytes int64 `json:"aof_received_bytes"` // bytes of AOF received from master
AofReceivedHuman string `json:"aof_received_human"`
}
stat syncStandaloneReaderStat

// version info
isDiskless bool
Expand Down Expand Up @@ -319,7 +343,7 @@ func (r *syncStandaloneReader) receiveRDB() string {
}
timeStart = time.Now()
log.Debugf("[%s] start receiving RDB. path=[%s]", r.stat.Name, rdbFilePath)
rdbFileHandle, err := os.OpenFile(rdbFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
rdbFileHandle, err := os.OpenFile(rdbFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o666)
if err != nil {
log.Panicf(err.Error())
}
Expand All @@ -345,39 +369,44 @@ func (r *syncStandaloneReader) receiveRDBWithDiskless(marker string, wt io.Write
buf := make([]byte, bufSize)

marker = strings.Split(marker, ":")[1]
if len(marker) != 40 {
if len(marker) != RDB_EOF_MARKER_LEN {
log.Panicf("[%s] invalid len of EOF marker. value=[%s]", r.stat.Name, marker)
}
log.Infof("meet EOF begin marker: %s", marker)
bMarker := []byte(marker)
goon := true
for goon {
n, err := r.client.Read(buf[:bufSize])
var lastBytes []byte
for {
copy(buf, lastBytes) // copy previous tail bytes to head of buf

nread, err := r.client.Read(buf[len(lastBytes):])
if err != nil {
log.Panicf(err.Error())
}
buffer := buf[:n]
if bytes.Contains(buffer, bMarker) {
suxb201 marked this conversation as resolved.
Show resolved Hide resolved

bufLen := len(lastBytes) + nread
nwrite := 0
if bufLen >= RDB_EOF_MARKER_LEN && bytes.Equal(buf[bufLen-RDB_EOF_MARKER_LEN:bufLen], bMarker) {
log.Infof("meet EOF end marker.")
// replace it
fi := bytes.Index(buffer, bMarker)
if len(buffer[fi+40:]) > 0 {
suxb201 marked this conversation as resolved.
Show resolved Hide resolved
log.Warnf("data after end marker will be discarded: %s", string(buffer[fi+40:]))
// Write all buf without EOF marker and break
if nwrite, err = wt.Write(buf[:bufLen-RDB_EOF_MARKER_LEN]); err != nil {
log.Panicf(err.Error())
}
buffer = buffer[:fi]

goon = false
break
}

_, err = wt.Write(buffer)
if err != nil {
log.Panicf(err.Error())
if bufLen >= RDB_EOF_MARKER_LEN {
// left RDB_EOF_MARKER_LEN bytes to next round
if nwrite, err = wt.Write(buf[:bufLen-RDB_EOF_MARKER_LEN]); err != nil {
log.Panicf(err.Error())
}
lastBytes = buf[bufLen-RDB_EOF_MARKER_LEN : bufLen] // save last RDB_EOF_MARKER_LEN bytes into lastBytes for next round
} else {
// save all bytes into lastBytes for next round if less than RDB_EOF_MARKER_LEN
lastBytes = buf[:bufLen]
}

r.stat.RdbFileSizeBytes += int64(n)
r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(r.stat.RdbFileSizeBytes))
r.stat.RdbReceivedBytes += int64(n)
r.stat.RdbReceivedHuman = humanize.IBytes(uint64(r.stat.RdbReceivedBytes))
r.stat.RdbFileSizeBytes += uint64(nwrite)
r.stat.RdbReceivedBytes += uint64(nwrite)
}
}

Expand All @@ -387,8 +416,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr
log.Panicf(err.Error())
}
log.Debugf("[%s] rdb file size: [%v]", r.stat.Name, humanize.IBytes(uint64(length)))
r.stat.RdbFileSizeBytes = length
r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(length))
r.stat.RdbFileSizeBytes = uint64(length)

remainder := length
const bufSize int64 = 32 * 1024 * 1024 // 32MB
Expand All @@ -408,8 +436,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr
log.Panicf(err.Error())
}

r.stat.RdbReceivedBytes += int64(n)
r.stat.RdbReceivedHuman = humanize.IBytes(uint64(r.stat.RdbReceivedBytes))
r.stat.RdbReceivedBytes += uint64(n)
}
}

Expand All @@ -427,8 +454,7 @@ func (r *syncStandaloneReader) receiveAOF() {
if err != nil {
log.Panicf(err.Error())
}
r.stat.AofReceivedBytes += int64(n)
r.stat.AofReceivedHuman = humanize.IBytes(uint64(r.stat.AofReceivedBytes))
r.stat.AofReceivedBytes += uint64(n)
aofWriter.Write(buf[:n])
r.stat.AofReceivedOffset += int64(n)
}
Expand All @@ -440,8 +466,7 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) {
log.Debugf("[%s] start sending RDB to target", r.stat.Name)
r.stat.Status = kSyncRdb
updateFunc := func(offset int64) {
r.stat.RdbSentBytes = offset
r.stat.RdbSentHuman = humanize.IBytes(uint64(offset))
r.stat.RdbSentBytes = uint64(offset)
}
rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, rdbFilePath, r.ch)
r.DbId = rdbLoader.ParseRDB(r.ctx)
Expand Down Expand Up @@ -532,16 +557,16 @@ func (r *syncStandaloneReader) Status() interface{} {

func (r *syncStandaloneReader) StatusString() string {
if r.stat.Status == kSyncRdb {
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, r.stat.RdbSentHuman, r.stat.RdbFileSizeHuman)
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(r.stat.RdbSentBytes), humanize.IBytes(r.stat.RdbFileSizeBytes))
}
if r.stat.Status == kSyncAof {
return fmt.Sprintf("%s, diff=[%v]", r.stat.Status, -r.stat.AofSentOffset+r.stat.AofReceivedOffset)
}
if r.stat.Status == kReceiveRdb {
if r.isDiskless {
return fmt.Sprintf("%s diskless, size=[%s]", r.stat.Status, r.stat.RdbReceivedHuman)
return fmt.Sprintf("%s diskless, size=[%s]", r.stat.Status, humanize.IBytes(r.stat.RdbReceivedBytes))
}
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, r.stat.RdbReceivedHuman, r.stat.RdbFileSizeHuman)
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(r.stat.RdbReceivedBytes), humanize.IBytes(r.stat.RdbFileSizeBytes))
}
return string(r.stat.Status)
}
Expand Down
78 changes: 78 additions & 0 deletions internal/reader/sync_standalone_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package reader

import (
"context"
"encoding/json"
"testing"

"RedisShake/internal/log"
"github.com/stretchr/testify/require"
)

func Test_syncStandaloneReader_Status(t *testing.T) {
type fields struct {
ctx context.Context
opts *SyncReaderOptions
}
tests := []struct {
name string
fields fields
want interface{}
}{
{
name: "syncStandaloneReader_Status_Marshal",
fields: fields{
ctx: context.Background(),
opts: &SyncReaderOptions{
Cluster: false,
Address: "127.0.0.1:6379",
Username: "username",
Password: "password",
Tls: false,
SyncRdb: false,
SyncAof: false,
PreferReplica: false,
TryDiskless: false,
},
},
want: map[string]interface{}{
"name": "",
"address": "",
"dir": "",
"status": "",
"rdb_file_size_bytes": 0,
"rdb_file_size_human": "",
"rdb_received_bytes": 0,
"rdb_received_human": "",
"rdb_sent_bytes": 0,
"rdb_sent_human": "",
"aof_received_offset": 0,
"aof_sent_offset": 0,
"aof_received_bytes": 0,
"aof_received_human": "",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &syncStandaloneReader{
ctx: tt.fields.ctx,
opts: tt.fields.opts,
}

want, err := json.Marshal(tt.want)
if err != nil {
log.Warnf("marshal want failed, err=[%v]", err)
return
}

got, err := json.Marshal(r.Status())
if err != nil {
log.Warnf("marshal status failed, err=[%v]", err)
return
}

require.JSONEq(t, string(want), string(got))
})
}
}
Loading