Skip to content

Commit

Permalink
make net read/write timeout values configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Olga Shestopalova <oshestopalova@hubspot.com>
  • Loading branch information
Olga Shestopalova committed Oct 10, 2023
1 parent 597651a commit c6aba00
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 11 deletions.
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ Flags:
--vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s)
--vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1)
--vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence
--vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300)
--vreplication_net_write_timeout int Session value of net_read_timeout for vreplication, in seconds (default 600)
--vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s)
--vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s)
--vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table
Expand Down
2 changes: 2 additions & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ Flags:
--vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s)
--vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1)
--vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence
--vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300)
--vreplication_net_write_timeout int Session value of net_read_timeout for vreplication, in seconds (default 600)
--vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s)
--vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s)
--vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table
Expand Down
9 changes: 7 additions & 2 deletions go/vt/vttablet/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package vttablet

import (
"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/servenv"
)

Expand All @@ -27,7 +26,11 @@ const (
VReplicationExperimentalFlagAllowNoBlobBinlogRowImage = int64(2)
)

var VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage
var (
VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage
VReplicationNetReadTimeout = 300
VReplicationNetWriteTimeout = 600
)

func init() {
servenv.OnParseFor("vttablet", registerFlags)
Expand All @@ -36,4 +39,6 @@ func init() {
func registerFlags(fs *pflag.FlagSet) {
fs.Int64Var(&VReplicationExperimentalFlags, "vreplication_experimental_flags", VReplicationExperimentalFlags,
"(Bitmask) of experimental features in vreplication to enable")
fs.IntVar(&VReplicationNetReadTimeout, "vreplication_net_read_timeout", VReplicationNetReadTimeout, "Session value of net_read_timeout for vreplication, in seconds")
fs.IntVar(&VReplicationNetWriteTimeout, "vreplication_net_write_timeout", VReplicationNetWriteTimeout, "Session value of net_write_timeout for vreplication, in seconds")
}
7 changes: 4 additions & 3 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"runtime/debug"
"strings"
"testing"
"vitess.io/vitess/go/vt/vttablet"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -60,8 +61,6 @@ const (
getAutoIncrementStep = "select @@session.auto_increment_increment"
setSessionTZ = "set @@session.time_zone = '+00:00'"
setNames = "set names 'binary'"
setNetReadTimeout = "set @@session.net_read_timeout = 300"
setNetWriteTimeout = "set @@session.net_write_timeout = 600"
getBinlogRowImage = "select @@binlog_row_image"
insertStreamsCreatedLog = "insert into _vt.vreplication_log(vrepl_id, type, state, message) values(1, 'Stream Created', '', '%s'"
getVReplicationRecord = "select * from _vt.vreplication where id = 1"
Expand All @@ -86,7 +85,9 @@ var (
},
},
}
position = fmt.Sprintf("%s/%s", gtidFlavor, gtidPosition)
position = fmt.Sprintf("%s/%s", gtidFlavor, gtidPosition)
setNetReadTimeout = fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout)
setNetWriteTimeout = fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout)
)

// TestCreateVReplicationWorkflow tests the query generated
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"sync/atomic"
"time"
"vitess.io/vitess/go/vt/vttablet"

"google.golang.org/protobuf/encoding/prototext"

Expand Down Expand Up @@ -227,10 +228,10 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
if _, err := dbClient.ExecuteFetch("set names 'binary'", 10000); err != nil {
return err
}
if _, err := dbClient.ExecuteFetch("set @@session.net_read_timeout = 300", 10000); err != nil {
if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 10000); err != nil {
return err
}
if _, err := dbClient.ExecuteFetch("set @@session.net_write_timeout = 600", 10000); err != nil {
if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 10000); err != nil {
return err
}
// We must apply AUTO_INCREMENT values precisely as we got them. This include the 0 value, which is not recommended in AUTO_INCREMENT, and yet is valid.
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"sync"
"time"
"vitess.io/vitess/go/vt/vttablet"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"
Expand Down Expand Up @@ -137,10 +138,10 @@ func (rs *rowStreamer) Stream() error {
if _, err := rs.conn.ExecuteFetch("set names 'binary'", 1, false); err != nil {
return err
}
if _, err := conn.ExecuteFetch("set @@session.net_read_timeout = 300", 1, false); err != nil {
if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 1, false); err != nil {
return err
}
if _, err := conn.ExecuteFetch("set @@session.net_write_timeout = 600", 1, false); err != nil {
if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 1, false); err != nil {
return err
}
}
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"strings"
"sync/atomic"
"vitess.io/vitess/go/vt/vttablet"

"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -108,10 +109,10 @@ func (ts *tableStreamer) Stream() error {
if _, err := conn.ExecuteFetch("set names 'binary'", 1, false); err != nil {
return err
}
if _, err := conn.ExecuteFetch("set @@session.net_read_timeout = 300", 1, false); err != nil {
if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 1, false); err != nil {
return err
}
if _, err := conn.ExecuteFetch("set @@session.net_write_timeout = 600", 1, false); err != nil {
if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 1, false); err != nil {
return err
}

Expand Down

0 comments on commit c6aba00

Please sign in to comment.