diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 4ed61f95080..f452b2844da 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -405,6 +405,8 @@ Flags: --vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s) --vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1) --vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence + --vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300) + --vreplication_net_write_timeout int Session value of net_read_timeout for vreplication, in seconds (default 600) --vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s) --vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s) --vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 3b3f5142c97..61b315f923a 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -411,6 +411,8 @@ Flags: --vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s) --vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1) --vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence + --vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300) + --vreplication_net_write_timeout int Session value of net_read_timeout for vreplication, in seconds (default 600) --vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s) --vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s) --vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table diff --git a/go/vt/vttablet/flags.go b/go/vt/vttablet/flags.go index 460a5427358..a5d7d61747e 100644 --- a/go/vt/vttablet/flags.go +++ b/go/vt/vttablet/flags.go @@ -18,7 +18,6 @@ package vttablet import ( "github.com/spf13/pflag" - "vitess.io/vitess/go/vt/servenv" ) @@ -27,7 +26,11 @@ const ( VReplicationExperimentalFlagAllowNoBlobBinlogRowImage = int64(2) ) -var VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage +var ( + VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage + VReplicationNetReadTimeout = 300 + VReplicationNetWriteTimeout = 600 +) func init() { servenv.OnParseFor("vttablet", registerFlags) @@ -36,4 +39,6 @@ func init() { func registerFlags(fs *pflag.FlagSet) { fs.Int64Var(&VReplicationExperimentalFlags, "vreplication_experimental_flags", VReplicationExperimentalFlags, "(Bitmask) of experimental features in vreplication to enable") + fs.IntVar(&VReplicationNetReadTimeout, "vreplication_net_read_timeout", VReplicationNetReadTimeout, "Session value of net_read_timeout for vreplication, in seconds") + fs.IntVar(&VReplicationNetWriteTimeout, "vreplication_net_write_timeout", VReplicationNetWriteTimeout, "Session value of net_write_timeout for vreplication, in seconds") } diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 069f5977fe9..f4df9e2a31c 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -24,6 +24,7 @@ import ( "runtime/debug" "strings" "testing" + "vitess.io/vitess/go/vt/vttablet" "github.com/stretchr/testify/require" @@ -60,8 +61,6 @@ const ( getAutoIncrementStep = "select @@session.auto_increment_increment" setSessionTZ = "set @@session.time_zone = '+00:00'" setNames = "set names 'binary'" - setNetReadTimeout = "set @@session.net_read_timeout = 300" - setNetWriteTimeout = "set @@session.net_write_timeout = 600" getBinlogRowImage = "select @@binlog_row_image" insertStreamsCreatedLog = "insert into _vt.vreplication_log(vrepl_id, type, state, message) values(1, 'Stream Created', '', '%s'" getVReplicationRecord = "select * from _vt.vreplication where id = 1" @@ -86,7 +85,9 @@ var ( }, }, } - position = fmt.Sprintf("%s/%s", gtidFlavor, gtidPosition) + position = fmt.Sprintf("%s/%s", gtidFlavor, gtidPosition) + setNetReadTimeout = fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout) + setNetWriteTimeout = fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout) ) // TestCreateVReplicationWorkflow tests the query generated diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index a431a35cd60..700b25e202f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -23,6 +23,7 @@ import ( "strings" "sync/atomic" "time" + "vitess.io/vitess/go/vt/vttablet" "google.golang.org/protobuf/encoding/prototext" @@ -227,10 +228,10 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { if _, err := dbClient.ExecuteFetch("set names 'binary'", 10000); err != nil { return err } - if _, err := dbClient.ExecuteFetch("set @@session.net_read_timeout = 300", 10000); err != nil { + if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 10000); err != nil { return err } - if _, err := dbClient.ExecuteFetch("set @@session.net_write_timeout = 600", 10000); err != nil { + if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 10000); err != nil { return err } // We must apply AUTO_INCREMENT values precisely as we got them. This include the 0 value, which is not recommended in AUTO_INCREMENT, and yet is valid. diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index b1a25929145..566c273d3cb 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -21,6 +21,7 @@ import ( "fmt" "sync" "time" + "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/replication" @@ -137,10 +138,10 @@ func (rs *rowStreamer) Stream() error { if _, err := rs.conn.ExecuteFetch("set names 'binary'", 1, false); err != nil { return err } - if _, err := conn.ExecuteFetch("set @@session.net_read_timeout = 300", 1, false); err != nil { + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 1, false); err != nil { return err } - if _, err := conn.ExecuteFetch("set @@session.net_write_timeout = 600", 1, false); err != nil { + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 1, false); err != nil { return err } } diff --git a/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go b/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go index d46519dbd3d..9c01ac12c93 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go @@ -22,6 +22,7 @@ import ( "fmt" "strings" "sync/atomic" + "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" @@ -108,10 +109,10 @@ func (ts *tableStreamer) Stream() error { if _, err := conn.ExecuteFetch("set names 'binary'", 1, false); err != nil { return err } - if _, err := conn.ExecuteFetch("set @@session.net_read_timeout = 300", 1, false); err != nil { + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 1, false); err != nil { return err } - if _, err := conn.ExecuteFetch("set @@session.net_write_timeout = 600", 1, false); err != nil { + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 1, false); err != nil { return err }