diff --git a/client/client.go b/client/client.go index c0b60ff7..b71da2a8 100644 --- a/client/client.go +++ b/client/client.go @@ -105,6 +105,8 @@ type ovsdbClient struct { handlerShutdown *sync.WaitGroup + trafficSeen chan struct{} + logger *logr.Logger } @@ -304,6 +306,10 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { } go o.handleDisconnectNotification() + if o.options.inactivityTimeout > 0 { + o.handlerShutdown.Add(1) + go o.handleInactivityProbes() + } for _, db := range o.databases { o.handlerShutdown.Add(1) eventStopChan := make(chan struct{}) @@ -418,6 +424,9 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) (string, erro // Should only be called when the mutex is held func (o *ovsdbClient) createRPC2Client(conn net.Conn) { o.stopCh = make(chan struct{}) + if o.options.inactivityTimeout > 0 { + o.trafficSeen = make(chan struct{}) + } o.rpcClient = rpc2.NewClientWithCodec(jsonrpc.NewJSONCodec(conn)) o.rpcClient.SetBlocking(true) o.rpcClient.Handle("echo", func(_ *rpc2.Client, args []interface{}, reply *[]interface{}) error { @@ -445,7 +454,7 @@ func (o *ovsdbClient) isEndpointLeader(ctx context.Context) (bool, string, error Table: "Database", Columns: []string{"name", "model", "leader", "sid"}, } - results, err := o.transact(ctx, serverDB, op) + results, err := o.transact(ctx, serverDB, true, op) if err != nil { return false, "", fmt.Errorf("could not check if server was leader: %w", err) } @@ -793,10 +802,10 @@ func (o *ovsdbClient) Transact(ctx context.Context, operation ...ovsdb.Operation } } defer o.rpcMutex.RUnlock() - return o.transact(ctx, o.primaryDBName, operation...) + return o.transact(ctx, o.primaryDBName, false, operation...) } -func (o *ovsdbClient) transact(ctx context.Context, dbName string, operation ...ovsdb.Operation) ([]ovsdb.OperationResult, error) { +func (o *ovsdbClient) transact(ctx context.Context, dbName string, skipChWrite bool, operation ...ovsdb.Operation) ([]ovsdb.OperationResult, error) { var reply []ovsdb.OperationResult db := o.databases[dbName] db.modelMutex.RLock() @@ -824,6 +833,10 @@ func (o *ovsdbClient) transact(ctx context.Context, dbName string, operation ... } return nil, err } + + if !skipChWrite && o.trafficSeen != nil { + o.trafficSeen <- struct{}{} + } return reply, nil } @@ -1174,10 +1187,82 @@ func (o *ovsdbClient) handleClientErrors(stopCh <-chan struct{}) { } } +func (o *ovsdbClient) sendEcho(args []interface{}, reply *[]interface{}) *rpc2.Call { + o.rpcMutex.RLock() + defer o.rpcMutex.RUnlock() + if o.rpcClient == nil { + return nil + } + return o.rpcClient.Go("echo", args, reply, make(chan *rpc2.Call, 1)) +} + +func (o *ovsdbClient) handleInactivityProbes() { + defer o.handlerShutdown.Done() + echoReplied := make(chan string) + var lastEcho string + stopCh := o.stopCh + trafficSeen := o.trafficSeen + for { + select { + case <-stopCh: + return + case <-trafficSeen: + // We got some traffic from the server, restart our timer + case ts := <-echoReplied: + // Got a response from the server, check it against lastEcho; if same clear lastEcho; if not same Disconnect() + if ts != lastEcho { + o.Disconnect() + return + } + lastEcho = "" + case <-time.After(o.options.inactivityTimeout): + // If there's a lastEcho already, then we didn't get a server reply, disconnect + if lastEcho != "" { + o.Disconnect() + return + } + // Otherwise send an echo + thisEcho := fmt.Sprintf("%d", time.Now().UnixMicro()) + args := []interface{}{"libovsdb echo", thisEcho} + var reply []interface{} + // Can't use o.Echo() because it blocks; we need the Call object direct from o.rpcClient.Go() + call := o.sendEcho(args, &reply) + if call == nil { + o.Disconnect() + return + } + lastEcho = thisEcho + go func() { + // Wait for the echo reply + select { + case <-stopCh: + return + case <-call.Done: + if call.Error != nil { + // RPC timeout; disconnect + o.logger.V(3).Error(call.Error, "server echo reply error") + o.Disconnect() + } else if !reflect.DeepEqual(args, reply) { + o.logger.V(3).Info("warning: incorrect server echo reply", + "expected", args, "reply", reply) + o.Disconnect() + } else { + // Otherwise stuff thisEcho into the echoReplied channel + echoReplied <- thisEcho + } + } + }() + } + } +} + func (o *ovsdbClient) handleDisconnectNotification() { <-o.rpcClient.DisconnectNotify() // close the stopCh, which will stop the cache event processor close(o.stopCh) + if o.trafficSeen != nil { + close(o.trafficSeen) + } o.metrics.numDisconnects.Inc() // wait for client related handlers to shutdown o.handlerShutdown.Wait() diff --git a/client/client_test.go b/client/client_test.go index 48308e06..c8a8148e 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -959,6 +959,96 @@ func setLeader(t *testing.T, cli Client, row *serverdb.Database, isLeader bool) assert.NoErrorf(t, err, "%+v", opErr) } +func TestClientInactiveCheck(t *testing.T) { + var defSchema ovsdb.DatabaseSchema + err := json.Unmarshal([]byte(schema), &defSchema) + require.NoError(t, err) + + serverDBModel, err := serverdb.FullDatabaseModel() + require.NoError(t, err) + // Create server + server, sock := newOVSDBServer(t, defDB, defSchema) + + // Create client to test inactivity check. + endpoint := fmt.Sprintf("unix:%s", sock) + ovs, err := newOVSDBClient(serverDBModel, + WithInactivityCheck(2*time.Second, 1*time.Second, &backoff.ZeroBackOff{}), + WithEndpoint(endpoint)) + require.NoError(t, err) + err = ovs.Connect(context.Background()) + require.NoError(t, err) + t.Cleanup(ovs.Close) + + // Make server to do echo off and then on for two times. + // Ensure this is detected by client's inactivity probe + // each time and then reconnects to the server when it + // is started responding to echo requests. + + // 1st test for client with making server not to respond for echo requests. + notified := make(chan struct{}) + ready := make(chan struct{}) + disconnectNotify := ovs.rpcClient.DisconnectNotify() + go func() { + ready <- struct{}{} + <-disconnectNotify + notified <- struct{}{} + }() + <-ready + server.DoEcho(false) + select { + case <-notified: + // got notification + case <-time.After(5 * time.Second): + assert.Fail(t, "client doesn't detect the echo failure") + } + + // 2nd test for client with making server to respond for echo requests. + server.DoEcho(true) +loop: + for timeout := time.After(5 * time.Second); ; { + select { + case <-timeout: + assert.Fail(t, "reconnect is not successful") + default: + if ovs.Connected() { + break loop + } + } + } + + // 3rd test for client with making server not to respond for echo requests. + notified = make(chan struct{}) + ready = make(chan struct{}) + disconnectNotify = ovs.rpcClient.DisconnectNotify() + go func() { + ready <- struct{}{} + <-disconnectNotify + notified <- struct{}{} + }() + <-ready + server.DoEcho(false) + select { + case <-notified: + // got notification + case <-time.After(5 * time.Second): + assert.Fail(t, "client doesn't detect the echo failure") + } + + // 4th test for client with making server to respond for echo requests. + server.DoEcho(true) +loop1: + for timeout := time.After(5 * time.Second); ; { + select { + case <-timeout: + assert.Fail(t, "reconnect is not successful") + default: + if ovs.Connected() { + break loop1 + } + } + } +} + func TestClientReconnectLeaderOnly(t *testing.T) { rand.Seed(time.Now().UnixNano()) diff --git a/client/options.go b/client/options.go index 5ebf292a..81ccffe2 100644 --- a/client/options.go +++ b/client/options.go @@ -28,6 +28,7 @@ type options struct { shouldRegisterMetrics bool // in case metrics are changed after-the-fact metricNamespace string // prometheus metric namespace metricSubsystem string // prometheus metric subsystem + inactivityTimeout time.Duration } type Option func(o *options) error @@ -111,6 +112,22 @@ func WithReconnect(timeout time.Duration, backoff backoff.BackOff) Option { } } +// WithInactivityCheck tells the client to send Echo request to ovsdb server periodically +// upon inactivityTimeout. When Echo request fails, then it attempts to reconnect +// with server. The inactivity check is performed as long as the connection is established. +// The reconnectTimeout argument is used to construct the context on each call to Connect, +// while reconnectBackoff dictates the backoff algorithm to use. +func WithInactivityCheck(inactivityTimeout, reconnectTimeout time.Duration, + reconnectBackoff backoff.BackOff) Option { + return func(o *options) error { + o.reconnect = true + o.timeout = reconnectTimeout + o.backoff = reconnectBackoff + o.inactivityTimeout = inactivityTimeout + return nil + } +} + // WithLogger allows setting a specific log sink. Otherwise, the default // go log package is used. func WithLogger(l *logr.Logger) Option { diff --git a/modelgen/table_test.go b/modelgen/table_test.go index b507a47d..6d4e2711 100644 --- a/modelgen/table_test.go +++ b/modelgen/table_test.go @@ -719,7 +719,7 @@ func TestFieldType(t *testing.T) { out string }{ {"t1", "c1", &singleValueSetSchema, "*string"}, - {"t1", "c2", &multipleValueSetSchema, "[2]string"}, + {"t1", "c2", &multipleValueSetSchema, "[]string"}, } for _, tt := range tests { diff --git a/server/server.go b/server/server.go index a5b8ac79..0100d766 100644 --- a/server/server.go +++ b/server/server.go @@ -26,6 +26,7 @@ type OvsdbServer struct { done chan struct{} db database.Database ready bool + doEcho bool readyMutex sync.RWMutex models map[string]model.DatabaseModel modelsMutex sync.RWMutex @@ -35,12 +36,16 @@ type OvsdbServer struct { txnMutex sync.Mutex } +func init() { + stdr.SetVerbosity(5) +} + // NewOvsdbServer returns a new OvsdbServer func NewOvsdbServer(db database.Database, models ...model.DatabaseModel) (*OvsdbServer, error) { l := stdr.NewWithOptions(log.New(os.Stderr, "", log.LstdFlags), stdr.Options{LogCaller: stdr.All}).WithName("server") - stdr.SetVerbosity(5) o := &OvsdbServer{ done: make(chan struct{}, 1), + doEcho: true, db: db, models: make(map[string]model.DatabaseModel), modelsMutex: sync.RWMutex{}, @@ -83,6 +88,12 @@ func (o *OvsdbServer) OnDisConnect(f func(*rpc2.Client)) { o.srv.OnDisconnect(f) } +func (o *OvsdbServer) DoEcho(ok bool) { + o.readyMutex.Lock() + o.doEcho = ok + o.readyMutex.Unlock() +} + // Serve starts the OVSDB server on the given path and protocol func (o *OvsdbServer) Serve(protocol string, path string) error { var err error @@ -382,6 +393,11 @@ func (o *OvsdbServer) Unlock(client *rpc2.Client, args []interface{}, reply *[]i // Echo tests the liveness of the connection func (o *OvsdbServer) Echo(client *rpc2.Client, args []interface{}, reply *[]interface{}) error { + o.readyMutex.Lock() + defer o.readyMutex.Unlock() + if !o.doEcho { + return fmt.Errorf("no echo reply") + } echoReply := make([]interface{}, len(args)) copy(echoReply, args) *reply = echoReply diff --git a/test/ovs/ovs_integration_test.go b/test/ovs/ovs_integration_test.go index be401682..daafc926 100644 --- a/test/ovs/ovs_integration_test.go +++ b/test/ovs/ovs_integration_test.go @@ -24,9 +24,10 @@ import ( // OVSIntegrationSuite runs tests against a real Open vSwitch instance type OVSIntegrationSuite struct { suite.Suite - pool *dockertest.Pool - resource *dockertest.Resource - client client.Client + pool *dockertest.Pool + resource *dockertest.Resource + clientWithoutInactvityCheck client.Client + clientWithInactivityCheck client.Client } func (suite *OVSIntegrationSuite) SetupSuite() { @@ -68,8 +69,11 @@ func (suite *OVSIntegrationSuite) SetupSuite() { } func (suite *OVSIntegrationSuite) SetupTest() { - if suite.client != nil { - suite.client.Close() + if suite.clientWithoutInactvityCheck != nil { + suite.clientWithoutInactvityCheck.Close() + } + if suite.clientWithInactivityCheck != nil { + suite.clientWithInactivityCheck.Close() } var err error err = suite.pool.Retry(func() error { @@ -89,15 +93,32 @@ func (suite *OVSIntegrationSuite) SetupTest() { suite.T().Log(err) return err } - suite.client = ovs + suite.clientWithoutInactvityCheck = ovs + + ovs2, err := client.NewOVSDBClient( + defDB, + client.WithEndpoint(endpoint), + client.WithInactivityCheck(2*time.Second, 1*time.Second, &backoff.ZeroBackOff{}), + client.WithLeaderOnly(true), + ) + if err != nil { + return err + } + err = ovs2.Connect(ctx) + if err != nil { + suite.T().Log(err) + return err + } + suite.clientWithInactivityCheck = ovs2 + return nil }) require.NoError(suite.T(), err) // give ovsdb-server some time to start up - _, err = suite.client.Monitor(context.TODO(), - suite.client.NewMonitor( + _, err = suite.clientWithoutInactvityCheck.Monitor(context.TODO(), + suite.clientWithoutInactvityCheck.NewMonitor( client.WithTable(&ovsType{}), client.WithTable(&bridgeType{}), ), @@ -106,9 +127,13 @@ func (suite *OVSIntegrationSuite) SetupTest() { } func (suite *OVSIntegrationSuite) TearDownSuite() { - if suite.client != nil { - suite.client.Close() - suite.client = nil + if suite.clientWithoutInactvityCheck != nil { + suite.clientWithoutInactvityCheck.Close() + suite.clientWithoutInactvityCheck = nil + } + if suite.clientWithInactivityCheck != nil { + suite.clientWithInactivityCheck.Close() + suite.clientWithInactivityCheck = nil } err := suite.pool.Purge(suite.resource) require.NoError(suite.T(), err) @@ -183,13 +208,13 @@ var defDB, _ = model.NewClientDBModel("Open_vSwitch", map[string]model.Model{ }) func (suite *OVSIntegrationSuite) TestConnectReconnect() { - assert.True(suite.T(), suite.client.Connected()) - err := suite.client.Echo(context.TODO()) + assert.True(suite.T(), suite.clientWithoutInactvityCheck.Connected()) + err := suite.clientWithoutInactvityCheck.Echo(context.TODO()) require.NoError(suite.T(), err) bridgeName := "br-discoreco" brChan := make(chan *bridgeType) - suite.client.Cache().AddEventHandler(&cache.EventHandlerFuncs{ + suite.clientWithoutInactvityCheck.Cache().AddEventHandler(&cache.EventHandlerFuncs{ AddFunc: func(table string, model model.Model) { br, ok := model.(*bridgeType) if !ok { @@ -208,10 +233,10 @@ func (suite *OVSIntegrationSuite) TestConnectReconnect() { // make another connect call, this should return without error as we're already connected ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - err = suite.client.Connect(ctx) + err = suite.clientWithoutInactvityCheck.Connect(ctx) require.NoError(suite.T(), err) - disconnectNotification := suite.client.DisconnectNotify() + disconnectNotification := suite.clientWithoutInactvityCheck.DisconnectNotify() notified := make(chan struct{}) ready := make(chan struct{}) @@ -222,7 +247,7 @@ func (suite *OVSIntegrationSuite) TestConnectReconnect() { }() <-ready - suite.client.Disconnect() + suite.clientWithoutInactvityCheck.Disconnect() select { case <-notified: @@ -231,15 +256,15 @@ func (suite *OVSIntegrationSuite) TestConnectReconnect() { suite.T().Fatal("expected a disconnect notification but didn't receive one") } - assert.Equal(suite.T(), false, suite.client.Connected()) + assert.Equal(suite.T(), false, suite.clientWithoutInactvityCheck.Connected()) - err = suite.client.Echo(context.TODO()) + err = suite.clientWithoutInactvityCheck.Echo(context.TODO()) require.EqualError(suite.T(), err, client.ErrNotConnected.Error()) ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - err = suite.client.Connect(ctx) + err = suite.clientWithoutInactvityCheck.Connect(ctx) require.NoError(suite.T(), err) br := &bridgeType{ @@ -247,14 +272,14 @@ func (suite *OVSIntegrationSuite) TestConnectReconnect() { } // assert cache has been purged - err = suite.client.Get(ctx, br) + err = suite.clientWithoutInactvityCheck.Get(ctx, br) require.Error(suite.T(), err, client.ErrNotFound) - err = suite.client.Echo(context.TODO()) + err = suite.clientWithoutInactvityCheck.Echo(context.TODO()) assert.NoError(suite.T(), err) - _, err = suite.client.Monitor(context.TODO(), - suite.client.NewMonitor( + _, err = suite.clientWithoutInactvityCheck.Monitor(context.TODO(), + suite.clientWithoutInactvityCheck.NewMonitor( client.WithTable(&ovsType{}), client.WithTable(&bridgeType{}), ), @@ -262,24 +287,90 @@ func (suite *OVSIntegrationSuite) TestConnectReconnect() { require.NoError(suite.T(), err) // assert cache has been re-populated - require.NoError(suite.T(), suite.client.Get(ctx, br)) + require.NoError(suite.T(), suite.clientWithoutInactvityCheck.Get(ctx, br)) + +} + +func (suite *OVSIntegrationSuite) TestWithInactivityCheck() { + assert.Equal(suite.T(), true, suite.clientWithInactivityCheck.Connected()) + err := suite.clientWithInactivityCheck.Echo(context.TODO()) + require.NoError(suite.T(), err) + + // Disconnect client + suite.clientWithInactivityCheck.Disconnect() + + // Ensure Disconnect doesn't have any impact to the connection. + require.Eventually(suite.T(), func() bool { + return suite.clientWithInactivityCheck.Connected() + }, 5*time.Second, 1*time.Second) + // Try to reconfigure client which already have an established connection. + err = suite.clientWithInactivityCheck.SetOption( + client.WithReconnect(2*time.Second, &backoff.ZeroBackOff{}), + ) + require.Error(suite.T(), err) + + // Ensure Connect doesn't purge the cache. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + err = suite.clientWithInactivityCheck.Connect(ctx) + require.NoError(suite.T(), err) + err = suite.clientWithoutInactvityCheck.Echo(context.TODO()) + require.NoError(suite.T(), err) + require.True(suite.T(), suite.clientWithoutInactvityCheck.Cache().Table("Bridge").Len() != 0) + + // set up a disconnect notification + disconnectNotification := suite.clientWithoutInactvityCheck.DisconnectNotify() + notified := make(chan struct{}) + ready := make(chan struct{}) + + go func() { + ready <- struct{}{} + <-disconnectNotification + notified <- struct{}{} + }() + + <-ready + // close the connection + suite.clientWithoutInactvityCheck.Close() + select { + case <-notified: + // got notification + case <-time.After(5 * time.Second): + suite.T().Fatal("expected a disconnect notification but didn't receive one") + } + + assert.Equal(suite.T(), false, suite.clientWithoutInactvityCheck.Connected()) + + err = suite.clientWithoutInactvityCheck.Echo(context.TODO()) + require.EqualError(suite.T(), err, client.ErrNotConnected.Error()) + + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + err = suite.clientWithoutInactvityCheck.Connect(ctx) + require.NoError(suite.T(), err) + + err = suite.clientWithoutInactvityCheck.Echo(context.TODO()) + assert.NoError(suite.T(), err) + + _, err = suite.clientWithoutInactvityCheck.MonitorAll(context.TODO()) + require.NoError(suite.T(), err) } func (suite *OVSIntegrationSuite) TestWithReconnect() { - assert.Equal(suite.T(), true, suite.client.Connected()) - err := suite.client.Echo(context.TODO()) + assert.Equal(suite.T(), true, suite.clientWithoutInactvityCheck.Connected()) + err := suite.clientWithoutInactvityCheck.Echo(context.TODO()) require.NoError(suite.T(), err) // Disconnect client - suite.client.Disconnect() + suite.clientWithoutInactvityCheck.Disconnect() require.Eventually(suite.T(), func() bool { - return !suite.client.Connected() + return !suite.clientWithoutInactvityCheck.Connected() }, 5*time.Second, 1*time.Second) // Reconfigure - err = suite.client.SetOption( + err = suite.clientWithoutInactvityCheck.SetOption( client.WithReconnect(2*time.Second, &backoff.ZeroBackOff{}), ) require.NoError(suite.T(), err) @@ -287,30 +378,30 @@ func (suite *OVSIntegrationSuite) TestWithReconnect() { // Connect (again) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - err = suite.client.Connect(ctx) + err = suite.clientWithoutInactvityCheck.Connect(ctx) require.NoError(suite.T(), err) // make another connect call, this should return without error as we're already connected ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - err = suite.client.Connect(ctx) + err = suite.clientWithoutInactvityCheck.Connect(ctx) require.NoError(suite.T(), err) // check the connection is working - err = suite.client.Echo(context.TODO()) + err = suite.clientWithoutInactvityCheck.Echo(context.TODO()) require.NoError(suite.T(), err) // check the cache is purged - require.True(suite.T(), suite.client.Cache().Table("Bridge").Len() == 0) + require.True(suite.T(), suite.clientWithoutInactvityCheck.Cache().Table("Bridge").Len() == 0) // set up the monitor again - _, err = suite.client.MonitorAll(context.TODO()) + _, err = suite.clientWithoutInactvityCheck.MonitorAll(context.TODO()) require.NoError(suite.T(), err) // add a bridge and verify our handler gets called bridgeName := "recon-b4" brChan := make(chan *bridgeType) - suite.client.Cache().AddEventHandler(&cache.EventHandlerFuncs{ + suite.clientWithoutInactvityCheck.Cache().AddEventHandler(&cache.EventHandlerFuncs{ AddFunc: func(table string, model model.Model) { br, ok := model.(*bridgeType) if !ok { @@ -333,14 +424,14 @@ func (suite *OVSIntegrationSuite) TestWithReconnect() { // check that we are automatically reconnected require.Eventually(suite.T(), func() bool { - return suite.client.Connected() + return suite.clientWithoutInactvityCheck.Connected() }, 20*time.Second, 1*time.Second) - err = suite.client.Echo(context.TODO()) + err = suite.clientWithoutInactvityCheck.Echo(context.TODO()) require.NoError(suite.T(), err) // check our original bridge is in the cache - err = suite.client.Get(ctx, br) + err = suite.clientWithoutInactvityCheck.Get(ctx, br) require.NoError(suite.T(), err) // create a new bridge to ensure the monitor and cache handler is still working @@ -361,7 +452,7 @@ LOOP: } // set up a disconnect notification - disconnectNotification := suite.client.DisconnectNotify() + disconnectNotification := suite.clientWithoutInactvityCheck.DisconnectNotify() notified := make(chan struct{}) ready := make(chan struct{}) @@ -373,7 +464,7 @@ LOOP: <-ready // close the connection - suite.client.Close() + suite.clientWithoutInactvityCheck.Close() select { case <-notified: @@ -382,20 +473,20 @@ LOOP: suite.T().Fatal("expected a disconnect notification but didn't receive one") } - assert.Equal(suite.T(), false, suite.client.Connected()) + assert.Equal(suite.T(), false, suite.clientWithoutInactvityCheck.Connected()) - err = suite.client.Echo(context.TODO()) + err = suite.clientWithoutInactvityCheck.Echo(context.TODO()) require.EqualError(suite.T(), err, client.ErrNotConnected.Error()) ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - err = suite.client.Connect(ctx) + err = suite.clientWithoutInactvityCheck.Connect(ctx) require.NoError(suite.T(), err) - err = suite.client.Echo(context.TODO()) + err = suite.clientWithoutInactvityCheck.Echo(context.TODO()) assert.NoError(suite.T(), err) - _, err = suite.client.MonitorAll(context.TODO()) + _, err = suite.clientWithoutInactvityCheck.MonitorAll(context.TODO()) require.NoError(suite.T(), err) } @@ -405,7 +496,7 @@ func (suite *OVSIntegrationSuite) TestInsertTransactIntegration() { require.NoError(suite.T(), err) require.Eventually(suite.T(), func() bool { br := &bridgeType{UUID: uuid} - err := suite.client.Get(context.Background(), br) + err := suite.clientWithoutInactvityCheck.Get(context.Background(), br) return err == nil }, 2*time.Second, 500*time.Millisecond) } @@ -416,7 +507,7 @@ func (suite *OVSIntegrationSuite) TestMultipleOpsTransactIntegration() { require.NoError(suite.T(), err) require.Eventually(suite.T(), func() bool { br := &bridgeType{UUID: uuid} - err := suite.client.Get(context.Background(), br) + err := suite.clientWithoutInactvityCheck.Get(context.Background(), br) return err == nil }, 2*time.Second, 500*time.Millisecond) @@ -424,7 +515,7 @@ func (suite *OVSIntegrationSuite) TestMultipleOpsTransactIntegration() { ovsRow := bridgeType{} br := &bridgeType{UUID: uuid} - op1, err := suite.client.Where(br). + op1, err := suite.clientWithoutInactvityCheck.Where(br). Mutate(&ovsRow, model.Mutation{ Field: &ovsRow.ExternalIds, Mutator: ovsdb.MutateOperationInsert, @@ -450,7 +541,7 @@ func (suite *OVSIntegrationSuite) TestMultipleOpsTransactIntegration() { Value: map[string]string{"podman": "made-for-each-other"}, }, } - op2, err := suite.client.Where(br).Mutate(&ovsRow, op2Mutations...) + op2, err := suite.clientWithoutInactvityCheck.Where(br).Mutate(&ovsRow, op2Mutations...) require.NoError(suite.T(), err) operations = append(operations, op2...) @@ -458,14 +549,14 @@ func (suite *OVSIntegrationSuite) TestMultipleOpsTransactIntegration() { op3 := ovsdb.Operation{Op: ovsdb.OperationComment, Comment: &op3Comment} operations = append(operations, op3) - reply, err := suite.client.Transact(context.TODO(), operations...) + reply, err := suite.clientWithoutInactvityCheck.Transact(context.TODO(), operations...) require.NoError(suite.T(), err) _, err = ovsdb.CheckOperationResults(reply, operations) require.NoError(suite.T(), err) require.Eventually(suite.T(), func() bool { - err := suite.client.Get(context.Background(), br) + err := suite.clientWithoutInactvityCheck.Get(context.Background(), br) return err == nil }, 2*time.Second, 500*time.Millisecond) @@ -486,15 +577,15 @@ func (suite *OVSIntegrationSuite) TestInsertAndDeleteTransactIntegration() { require.Eventually(suite.T(), func() bool { br := &bridgeType{UUID: bridgeUUID} - err := suite.client.Get(context.Background(), br) + err := suite.clientWithoutInactvityCheck.Get(context.Background(), br) return err == nil }, 2*time.Second, 500*time.Millisecond) - deleteOp, err := suite.client.Where(&bridgeType{Name: bridgeName}).Delete() + deleteOp, err := suite.clientWithoutInactvityCheck.Where(&bridgeType{Name: bridgeName}).Delete() require.NoError(suite.T(), err) ovsRow := ovsType{} - delMutateOp, err := suite.client.WhereCache(func(*ovsType) bool { return true }). + delMutateOp, err := suite.clientWithoutInactvityCheck.WhereCache(func(*ovsType) bool { return true }). Mutate(&ovsRow, model.Mutation{ Field: &ovsRow.Bridges, Mutator: ovsdb.MutateOperationDelete, @@ -504,7 +595,7 @@ func (suite *OVSIntegrationSuite) TestInsertAndDeleteTransactIntegration() { require.NoError(suite.T(), err) delOperations := append(deleteOp, delMutateOp...) - delReply, err := suite.client.Transact(context.TODO(), delOperations...) + delReply, err := suite.clientWithoutInactvityCheck.Transact(context.TODO(), delOperations...) require.NoError(suite.T(), err) delOperationErrs, err := ovsdb.CheckOperationResults(delReply, delOperations) @@ -517,7 +608,7 @@ func (suite *OVSIntegrationSuite) TestInsertAndDeleteTransactIntegration() { require.Eventually(suite.T(), func() bool { br := &bridgeType{UUID: bridgeUUID} - err := suite.client.Get(context.Background(), br) + err := suite.clientWithoutInactvityCheck.Get(context.Background(), br) return err != nil }, 2*time.Second, 500*time.Millisecond) } @@ -528,7 +619,7 @@ func (suite *OVSIntegrationSuite) TestTableSchemaValidationIntegration() { Table: "InvalidTable", Row: ovsdb.Row(map[string]interface{}{"name": "docker-ovs"}), } - _, err := suite.client.Transact(context.TODO(), operation) + _, err := suite.clientWithoutInactvityCheck.Transact(context.TODO(), operation) assert.Error(suite.T(), err) } @@ -539,7 +630,7 @@ func (suite *OVSIntegrationSuite) TestColumnSchemaInRowValidationIntegration() { Row: ovsdb.Row(map[string]interface{}{"name": "docker-ovs", "invalid_column": "invalid_column"}), } - _, err := suite.client.Transact(context.TODO(), operation) + _, err := suite.clientWithoutInactvityCheck.Transact(context.TODO(), operation) assert.Error(suite.T(), err) } @@ -553,7 +644,7 @@ func (suite *OVSIntegrationSuite) TestColumnSchemaInMultipleRowsValidationIntegr Table: "Bridge", Rows: rows, } - _, err := suite.client.Transact(context.TODO(), operation) + _, err := suite.clientWithoutInactvityCheck.Transact(context.TODO(), operation) assert.Error(suite.T(), err) } @@ -563,14 +654,14 @@ func (suite *OVSIntegrationSuite) TestColumnSchemaValidationIntegration() { Table: "Bridge", Columns: []string{"name", "invalidColumn"}, } - _, err := suite.client.Transact(context.TODO(), operation) + _, err := suite.clientWithoutInactvityCheck.Transact(context.TODO(), operation) assert.Error(suite.T(), err) } func (suite *OVSIntegrationSuite) TestMonitorCancelIntegration() { - monitorID, err := suite.client.Monitor( + monitorID, err := suite.clientWithoutInactvityCheck.Monitor( context.TODO(), - suite.client.NewMonitor( + suite.clientWithoutInactvityCheck.NewMonitor( client.WithTable(&queueType{}), ), ) @@ -580,18 +671,18 @@ func (suite *OVSIntegrationSuite) TestMonitorCancelIntegration() { require.NoError(suite.T(), err) require.Eventually(suite.T(), func() bool { q := &queueType{UUID: uuid} - err = suite.client.Get(context.Background(), q) + err = suite.clientWithoutInactvityCheck.Get(context.Background(), q) return err == nil }, 2*time.Second, 500*time.Millisecond) - err = suite.client.MonitorCancel(context.TODO(), monitorID) + err = suite.clientWithoutInactvityCheck.MonitorCancel(context.TODO(), monitorID) assert.NoError(suite.T(), err) uuid, err = suite.createQueue("test2", 1) require.NoError(suite.T(), err) assert.Never(suite.T(), func() bool { q := &queueType{UUID: uuid} - err = suite.client.Get(context.Background(), q) + err = suite.clientWithoutInactvityCheck.Get(context.Background(), q) return err == nil }, 2*time.Second, 500*time.Millisecond) } @@ -614,9 +705,9 @@ func (suite *OVSIntegrationSuite) TestMonitorConditionIntegration() { }, } - _, err := suite.client.Monitor( + _, err := suite.clientWithoutInactvityCheck.Monitor( context.TODO(), - suite.client.NewMonitor( + suite.clientWithoutInactvityCheck.NewMonitor( client.WithConditionalTable(&queue, conditions), ), ) @@ -626,7 +717,7 @@ func (suite *OVSIntegrationSuite) TestMonitorConditionIntegration() { require.NoError(suite.T(), err) require.Eventually(suite.T(), func() bool { q := &queueType{UUID: uuid} - err = suite.client.Get(context.Background(), q) + err = suite.clientWithoutInactvityCheck.Get(context.Background(), q) return err == nil }, 2*time.Second, 500*time.Millisecond) @@ -634,7 +725,7 @@ func (suite *OVSIntegrationSuite) TestMonitorConditionIntegration() { require.NoError(suite.T(), err) assert.Never(suite.T(), func() bool { q := &queueType{UUID: uuid} - err = suite.client.Get(context.Background(), q) + err = suite.clientWithoutInactvityCheck.Get(context.Background(), q) return err == nil }, 2*time.Second, 500*time.Millisecond) @@ -642,7 +733,7 @@ func (suite *OVSIntegrationSuite) TestMonitorConditionIntegration() { require.NoError(suite.T(), err) require.Eventually(suite.T(), func() bool { q := &queueType{UUID: uuid} - err = suite.client.Get(context.Background(), q) + err = suite.clientWithoutInactvityCheck.Get(context.Background(), q) return err == nil }, 2*time.Second, 500*time.Millisecond) } @@ -653,7 +744,7 @@ func (suite *OVSIntegrationSuite) TestInsertDuplicateTransactIntegration() { require.Eventually(suite.T(), func() bool { br := &bridgeType{UUID: uuid} - err := suite.client.Get(context.Background(), br) + err := suite.clientWithoutInactvityCheck.Get(context.Background(), br) return err == nil }, 2*time.Second, 500*time.Millisecond) @@ -668,32 +759,32 @@ func (suite *OVSIntegrationSuite) TestUpdate() { require.Eventually(suite.T(), func() bool { br := &bridgeType{UUID: uuid} - err := suite.client.Get(context.Background(), br) + err := suite.clientWithoutInactvityCheck.Get(context.Background(), br) return err == nil }, 2*time.Second, 500*time.Millisecond) bridgeRow := &bridgeType{UUID: uuid} - err = suite.client.Get(context.Background(), bridgeRow) + err = suite.clientWithoutInactvityCheck.Get(context.Background(), bridgeRow) require.NoError(suite.T(), err) // try to modify immutable field bridgeRow.Name = "br-update2" - _, err = suite.client.Where(bridgeRow).Update(bridgeRow, &bridgeRow.Name) + _, err = suite.clientWithoutInactvityCheck.Where(bridgeRow).Update(bridgeRow, &bridgeRow.Name) require.Error(suite.T(), err) bridgeRow.Name = "br-update" // update many fields bridgeRow.ExternalIds["baz"] = "foobar" bridgeRow.OtherConfig = map[string]string{"foo": "bar"} - ops, err := suite.client.Where(bridgeRow).Update(bridgeRow) + ops, err := suite.clientWithoutInactvityCheck.Where(bridgeRow).Update(bridgeRow) require.NoError(suite.T(), err) - reply, err := suite.client.Transact(context.Background(), ops...) + reply, err := suite.clientWithoutInactvityCheck.Transact(context.Background(), ops...) require.NoError(suite.T(), err) opErrs, err := ovsdb.CheckOperationResults(reply, ops) require.NoErrorf(suite.T(), err, "%+v", opErrs) require.Eventually(suite.T(), func() bool { br := &bridgeType{UUID: uuid} - err = suite.client.Get(context.Background(), br) + err = suite.clientWithoutInactvityCheck.Get(context.Background(), br) if err != nil { return false } @@ -702,16 +793,16 @@ func (suite *OVSIntegrationSuite) TestUpdate() { newExternalIds := map[string]string{"foo": "bar"} bridgeRow.ExternalIds = newExternalIds - ops, err = suite.client.Where(bridgeRow).Update(bridgeRow, &bridgeRow.ExternalIds) + ops, err = suite.clientWithoutInactvityCheck.Where(bridgeRow).Update(bridgeRow, &bridgeRow.ExternalIds) require.NoError(suite.T(), err) - reply, err = suite.client.Transact(context.Background(), ops...) + reply, err = suite.clientWithoutInactvityCheck.Transact(context.Background(), ops...) require.NoError(suite.T(), err) opErr, err := ovsdb.CheckOperationResults(reply, ops) require.NoErrorf(suite.T(), err, "%Populate2+v", opErr) assert.Eventually(suite.T(), func() bool { br := &bridgeType{UUID: uuid} - err = suite.client.Get(context.Background(), br) + err = suite.clientWithoutInactvityCheck.Get(context.Background(), br) if err != nil { return false } @@ -732,12 +823,12 @@ func (suite *OVSIntegrationSuite) createBridge(bridgeName string) (string, error BridgeFailMode: &BridgeFailModeSecure, } - insertOp, err := suite.client.Create(&br) + insertOp, err := suite.clientWithoutInactvityCheck.Create(&br) require.NoError(suite.T(), err) // Inserting a Bridge row in Bridge table requires mutating the open_vswitch table. ovsRow := ovsType{} - mutateOp, err := suite.client.WhereCache(func(*ovsType) bool { return true }). + mutateOp, err := suite.clientWithoutInactvityCheck.WhereCache(func(*ovsType) bool { return true }). Mutate(&ovsRow, model.Mutation{ Field: &ovsRow.Bridges, Mutator: ovsdb.MutateOperationInsert, @@ -746,7 +837,7 @@ func (suite *OVSIntegrationSuite) createBridge(bridgeName string) (string, error require.NoError(suite.T(), err) operations := append(insertOp, mutateOp...) - reply, err := suite.client.Transact(context.TODO(), operations...) + reply, err := suite.clientWithoutInactvityCheck.Transact(context.TODO(), operations...) require.NoError(suite.T(), err) _, err = ovsdb.CheckOperationResults(reply, operations) @@ -762,17 +853,17 @@ func (suite *OVSIntegrationSuite) TestCreateIPFIX() { UUID: namedUUID, Targets: []string{"127.0.0.1:6650"}, } - insertOp, err := suite.client.Create(&ipfix) + insertOp, err := suite.clientWithoutInactvityCheck.Create(&ipfix) require.NoError(suite.T(), err) bridge := bridgeType{ UUID: uuid, IPFIX: &namedUUID, } - updateOps, err := suite.client.Where(&bridge).Update(&bridge, &bridge.IPFIX) + updateOps, err := suite.clientWithoutInactvityCheck.Where(&bridge).Update(&bridge, &bridge.IPFIX) require.NoError(suite.T(), err) operations := append(insertOp, updateOps...) - reply, err := suite.client.Transact(context.TODO(), operations...) + reply, err := suite.clientWithoutInactvityCheck.Transact(context.TODO(), operations...) require.NoError(suite.T(), err) opErrs, err := ovsdb.CheckOperationResults(reply, operations) if err != nil { @@ -783,9 +874,9 @@ func (suite *OVSIntegrationSuite) TestCreateIPFIX() { // Delete the IPFIX row by removing it's strong reference bridge.IPFIX = nil - updateOps, err = suite.client.Where(&bridge).Update(&bridge, &bridge.IPFIX) + updateOps, err = suite.clientWithoutInactvityCheck.Where(&bridge).Update(&bridge, &bridge.IPFIX) require.NoError(suite.T(), err) - reply, err = suite.client.Transact(context.TODO(), updateOps...) + reply, err = suite.clientWithoutInactvityCheck.Transact(context.TODO(), updateOps...) require.NoError(suite.T(), err) opErrs, err = ovsdb.CheckOperationResults(reply, updateOps) if err != nil { @@ -797,7 +888,7 @@ func (suite *OVSIntegrationSuite) TestCreateIPFIX() { //Assert the IPFIX table is empty ipfixes := []ipfixType{} - err = suite.client.List(context.Background(), &ipfixes) + err = suite.clientWithoutInactvityCheck.List(context.Background(), &ipfixes) require.NoError(suite.T(), err) require.Empty(suite.T(), ipfixes) @@ -819,10 +910,10 @@ func (suite *OVSIntegrationSuite) TestWait() { }, } timeout := 0 - ops, err := suite.client.WhereAny(bridgeRow, conditions...).Wait( + ops, err := suite.clientWithoutInactvityCheck.WhereAny(bridgeRow, conditions...).Wait( ovsdb.WaitConditionNotEqual, &timeout, bridgeRow, &bridgeRow.Name) require.NoError(suite.T(), err) - reply, err := suite.client.Transact(context.Background(), ops...) + reply, err := suite.clientWithoutInactvityCheck.Transact(context.Background(), ops...) require.NoError(suite.T(), err) opErrs, err := ovsdb.CheckOperationResults(reply, ops) require.NoErrorf(suite.T(), err, "%+v", opErrs) @@ -844,20 +935,20 @@ func (suite *OVSIntegrationSuite) TestWait() { }, } timeout = 2 * 1000 // 2 seconds (in milliseconds) - ops, err = suite.client.WhereAny(bridgeRow, conditions...).Wait( + ops, err = suite.clientWithoutInactvityCheck.WhereAny(bridgeRow, conditions...).Wait( ovsdb.WaitConditionEqual, &timeout, bridgeRow, &bridgeRow.BridgeFailMode) require.NoError(suite.T(), err) - reply, err = suite.client.Transact(context.Background(), ops...) + reply, err = suite.clientWithoutInactvityCheck.Transact(context.Background(), ops...) require.NoError(suite.T(), err) opErrs, err = ovsdb.CheckOperationResults(reply, ops) require.NoErrorf(suite.T(), err, "%+v", opErrs) // Use wait to get a txn error due to until condition that is not happening timeout = 222 // milliseconds - ops, err = suite.client.WhereAny(bridgeRow, conditions...).Wait( + ops, err = suite.clientWithoutInactvityCheck.WhereAny(bridgeRow, conditions...).Wait( ovsdb.WaitConditionNotEqual, &timeout, bridgeRow, &bridgeRow.BridgeFailMode) require.NoError(suite.T(), err) - reply, err = suite.client.Transact(context.Background(), ops...) + reply, err = suite.clientWithoutInactvityCheck.Transact(context.Background(), ops...) require.NoError(suite.T(), err) _, err = ovsdb.CheckOperationResults(reply, ops) assert.Error(suite.T(), err) @@ -868,9 +959,9 @@ func (suite *OVSIntegrationSuite) createQueue(queueName string, dscp int) (strin DSCP: &dscp, } - insertOp, err := suite.client.Create(&q) + insertOp, err := suite.clientWithoutInactvityCheck.Create(&q) require.NoError(suite.T(), err) - reply, err := suite.client.Transact(context.TODO(), insertOp...) + reply, err := suite.clientWithoutInactvityCheck.Transact(context.TODO(), insertOp...) require.NoError(suite.T(), err) _, err = ovsdb.CheckOperationResults(reply, insertOp) @@ -885,18 +976,18 @@ func (suite *OVSIntegrationSuite) TestOpsWaitForReconnect() { } // Shutdown client - suite.client.Disconnect() + suite.clientWithoutInactvityCheck.Disconnect() require.Eventually(suite.T(), func() bool { - return !suite.client.Connected() + return !suite.clientWithoutInactvityCheck.Connected() }, 5*time.Second, 1*time.Second) - err := suite.client.SetOption( + err := suite.clientWithoutInactvityCheck.SetOption( client.WithReconnect(2*time.Second, &backoff.ZeroBackOff{}), ) require.NoError(suite.T(), err) var insertOp []ovsdb.Operation - insertOp, err = suite.client.Create(&ipfix) + insertOp, err = suite.clientWithoutInactvityCheck.Create(&ipfix) require.NoError(suite.T(), err) wg := sync.WaitGroup{} @@ -904,7 +995,7 @@ func (suite *OVSIntegrationSuite) TestOpsWaitForReconnect() { // delay reconnecting for 5 seconds go func() { time.Sleep(5 * time.Second) - err := suite.client.Connect(context.Background()) + err := suite.clientWithoutInactvityCheck.Connect(context.Background()) require.NoError(suite.T(), err) wg.Done() }() @@ -912,7 +1003,7 @@ func (suite *OVSIntegrationSuite) TestOpsWaitForReconnect() { // execute the transaction, should not fail and execute after reconnection ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - reply, err := suite.client.Transact(ctx, insertOp...) + reply, err := suite.clientWithoutInactvityCheck.Transact(ctx, insertOp...) require.NoError(suite.T(), err) _, err = ovsdb.CheckOperationResults(reply, insertOp) @@ -935,21 +1026,21 @@ func (suite *OVSIntegrationSuite) TestUnsetOptional() { } // verify the bridge has BridgeFailMode set - err = suite.client.Get(ctx, &br) + err = suite.clientWithoutInactvityCheck.Get(ctx, &br) require.NoError(suite.T(), err) require.NotNil(suite.T(), br.BridgeFailMode) // modify bridge to unset BridgeFailMode br.BridgeFailMode = nil - ops, err := suite.client.Where(&br).Update(&br, &br.BridgeFailMode) + ops, err := suite.clientWithoutInactvityCheck.Where(&br).Update(&br, &br.BridgeFailMode) require.NoError(suite.T(), err) - r, err := suite.client.Transact(ctx, ops...) + r, err := suite.clientWithoutInactvityCheck.Transact(ctx, ops...) require.NoError(suite.T(), err) _, err = ovsdb.CheckOperationResults(r, ops) require.NoError(suite.T(), err) // verify the bridge has BridgeFailMode unset - err = suite.client.Get(ctx, &br) + err = suite.clientWithoutInactvityCheck.Get(ctx, &br) require.NoError(suite.T(), err) require.Nil(suite.T(), br.BridgeFailMode) } @@ -967,21 +1058,21 @@ func (suite *OVSIntegrationSuite) TestUpdateOptional() { } // verify the bridge has BridgeFailMode set - err = suite.client.Get(ctx, &br) + err = suite.clientWithoutInactvityCheck.Get(ctx, &br) require.NoError(suite.T(), err) require.Equal(suite.T(), &BridgeFailModeSecure, br.BridgeFailMode) // modify bridge to update BridgeFailMode br.BridgeFailMode = &BridgeFailModeStandalone - ops, err := suite.client.Where(&br).Update(&br, &br.BridgeFailMode) + ops, err := suite.clientWithoutInactvityCheck.Where(&br).Update(&br, &br.BridgeFailMode) require.NoError(suite.T(), err) - r, err := suite.client.Transact(ctx, ops...) + r, err := suite.clientWithoutInactvityCheck.Transact(ctx, ops...) require.NoError(suite.T(), err) _, err = ovsdb.CheckOperationResults(r, ops) require.NoError(suite.T(), err) // verify the bridge has BridgeFailMode updated - err = suite.client.Get(ctx, &br) + err = suite.clientWithoutInactvityCheck.Get(ctx, &br) require.NoError(suite.T(), err) require.Equal(suite.T(), &BridgeFailModeStandalone, br.BridgeFailMode) } @@ -1061,12 +1152,12 @@ func (suite *OVSIntegrationSuite) TestMultipleOpsSameRow() { Ports: []string{port10UUID, port1UUID}, ExternalIds: map[string]string{"key1": "value1"}, } - op, err := suite.client.Create(&br) + op, err := suite.clientWithoutInactvityCheck.Create(&br) require.NoError(suite.T(), err) ops = append(ops, op...) ovs := ovsType{} - op, err = suite.client.WhereCache(func(*ovsType) bool { return true }).Mutate(&ovs, model.Mutation{ + op, err = suite.clientWithoutInactvityCheck.WhereCache(func(*ovsType) bool { return true }).Mutate(&ovs, model.Mutation{ Field: &ovs.Bridges, Mutator: ovsdb.MutateOperationInsert, Value: []string{bridgeUUID}, @@ -1074,7 +1165,7 @@ func (suite *OVSIntegrationSuite) TestMultipleOpsSameRow() { require.NoError(suite.T(), err) ops = append(ops, op...) - results, err := suite.client.Transact(ctx, ops...) + results, err := suite.clientWithoutInactvityCheck.Transact(ctx, ops...) require.NoError(suite.T(), err) _, err = ovsdb.CheckOperationResults(results, ops) @@ -1090,11 +1181,11 @@ func (suite *OVSIntegrationSuite) TestMultipleOpsSameRow() { // Do several ops with the bridge in the same transaction br.Ports = []string{port10UUID} br.ExternalIds = map[string]string{"key1": "value1", "key10": "value10"} - op, err = suite.client.Where(&br).Update(&br, &br.Ports, &br.ExternalIds) + op, err = suite.clientWithoutInactvityCheck.Where(&br).Update(&br, &br.Ports, &br.ExternalIds) require.NoError(suite.T(), err) ops = append(ops, op...) - op, err = suite.client.Where(&br).Mutate(&br, + op, err = suite.clientWithoutInactvityCheck.Where(&br).Mutate(&br, model.Mutation{ Field: &br.ExternalIds, Mutator: ovsdb.MutateOperationInsert, @@ -1109,7 +1200,7 @@ func (suite *OVSIntegrationSuite) TestMultipleOpsSameRow() { require.NoError(suite.T(), err) ops = append(ops, op...) - op, err = suite.client.Where(&br).Mutate(&br, + op, err = suite.clientWithoutInactvityCheck.Where(&br).Mutate(&br, model.Mutation{ Field: &br.ExternalIds, Mutator: ovsdb.MutateOperationDelete, @@ -1125,16 +1216,16 @@ func (suite *OVSIntegrationSuite) TestMultipleOpsSameRow() { ops = append(ops, op...) datapathID = "datapathID_updated" - op, err = suite.client.Where(&br).Update(&br, &br.DatapathID) + op, err = suite.clientWithoutInactvityCheck.Where(&br).Update(&br, &br.DatapathID) require.NoError(suite.T(), err) ops = append(ops, op...) br.DatapathID = nil - op, err = suite.client.Where(&br).Update(&br, &br.DatapathID) + op, err = suite.clientWithoutInactvityCheck.Where(&br).Update(&br, &br.DatapathID) require.NoError(suite.T(), err) ops = append(ops, op...) - results, err = suite.client.Transact(ctx, ops...) + results, err = suite.clientWithoutInactvityCheck.Transact(ctx, ops...) require.NoError(suite.T(), err) errors, err := ovsdb.CheckOperationResults(results, ops) @@ -1145,7 +1236,7 @@ func (suite *OVSIntegrationSuite) TestMultipleOpsSameRow() { br = bridgeType{ UUID: bridgeUUID, } - err = suite.client.Get(ctx, &br) + err = suite.clientWithoutInactvityCheck.Get(ctx, &br) require.NoError(suite.T(), err) require.Equal(suite.T(), []string{port1UUID}, br.Ports)