Skip to content

Commit

Permalink
Set deadline for tcp connection
Browse files Browse the repository at this point in the history
This sets up tcp connection deadline for write and read requests
to fail after timeout instead of blocking for a long time in case
of accidental disconnection of the ovsdb leader.

Signed-off-by: Periyasamy Palanisamy <pepalani@redhat.com>
  • Loading branch information
pperiyasamy committed Nov 8, 2023
1 parent cf04722 commit d569373
Showing 1 changed file with 27 additions and 8 deletions.
35 changes: 27 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type ovsdbClient struct {
metrics metrics
connected bool
rpcClient *rpc2.Client
conn net.Conn
rpcMutex sync.RWMutex
// endpoints contains all possible endpoints; the first element is
// the active endpoint if connected=true
Expand Down Expand Up @@ -427,6 +428,7 @@ func (o *ovsdbClient) createRPC2Client(conn net.Conn) {
if o.options.inactivityTimeout > 0 {
o.trafficSeen = make(chan struct{})
}
o.conn = conn
o.rpcClient = rpc2.NewClientWithCodec(jsonrpc.NewJSONCodec(conn))
o.rpcClient.SetBlocking(true)
o.rpcClient.Handle("echo", func(_ *rpc2.Client, args []interface{}, reply *[]interface{}) error {
Expand Down Expand Up @@ -748,7 +750,7 @@ func (o *ovsdbClient) update3(params []json.RawMessage, reply *[]interface{}) er
func (o *ovsdbClient) getSchema(ctx context.Context, dbName string) (ovsdb.DatabaseSchema, error) {
args := ovsdb.NewGetSchemaArgs(dbName)
var reply ovsdb.DatabaseSchema
err := o.rpcClient.CallWithContext(ctx, "get_schema", args, &reply)
err := o.CallWithContext(ctx, "get_schema", args, &reply)
if err != nil {
if err == rpc2.ErrShutdown {
return ovsdb.DatabaseSchema{}, ErrNotConnected
Expand All @@ -763,7 +765,7 @@ func (o *ovsdbClient) getSchema(ctx context.Context, dbName string) (ovsdb.Datab
// Should only be called when mutex is held
func (o *ovsdbClient) listDbs(ctx context.Context) ([]string, error) {
var dbs []string
err := o.rpcClient.CallWithContext(ctx, "list_dbs", nil, &dbs)
err := o.CallWithContext(ctx, "list_dbs", nil, &dbs)
if err != nil {
if err == rpc2.ErrShutdown {
return nil, ErrNotConnected
Expand Down Expand Up @@ -836,7 +838,7 @@ func (o *ovsdbClient) transact(ctx context.Context, dbName string, skipChWrite b
if dbgLogger.Enabled() {
dbgLogger.Info("transacting operations", "operations", fmt.Sprintf("%+v", operation))
}
err := o.rpcClient.CallWithContext(ctx, "transact", args, &reply)
err := o.CallWithContext(ctx, "transact", args, &reply)
if err != nil {
if err == rpc2.ErrShutdown {
return nil, ErrNotConnected
Expand Down Expand Up @@ -869,7 +871,7 @@ func (o *ovsdbClient) MonitorCancel(ctx context.Context, cookie MonitorCookie) e
if o.rpcClient == nil {
return ErrNotConnected
}
err := o.rpcClient.CallWithContext(ctx, "monitor_cancel", args, &reply)
err := o.CallWithContext(ctx, "monitor_cancel", args, &reply)
if err != nil {
if err == rpc2.ErrShutdown {
return ErrNotConnected
Expand Down Expand Up @@ -981,15 +983,15 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne
switch monitor.Method {
case ovsdb.MonitorRPC:
var reply ovsdb.TableUpdates
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply)
err = o.CallWithContext(ctx, monitor.Method, args, &reply)
tableUpdates = reply
case ovsdb.ConditionalMonitorRPC:
var reply ovsdb.TableUpdates2
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply)
err = o.CallWithContext(ctx, monitor.Method, args, &reply)
tableUpdates = reply
case ovsdb.ConditionalMonitorSinceRPC:
var reply ovsdb.MonitorCondSinceReply
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply)
err = o.CallWithContext(ctx, monitor.Method, args, &reply)
if err == nil && reply.Found {
monitor.LastTransactionID = reply.LastTransactionID
lastTransactionFound = true
Expand Down Expand Up @@ -1080,7 +1082,7 @@ func (o *ovsdbClient) Echo(ctx context.Context) error {
if o.rpcClient == nil {
return ErrNotConnected
}
err := o.rpcClient.CallWithContext(ctx, "echo", args, &reply)
err := o.CallWithContext(ctx, "echo", args, &reply)
if err != nil {
if err == rpc2.ErrShutdown {
return ErrNotConnected
Expand Down Expand Up @@ -1439,3 +1441,20 @@ func (o *ovsdbClient) WhereAll(m model.Model, conditions ...model.Condition) Con
func (o *ovsdbClient) WhereCache(predicate interface{}) ConditionalAPI {
return o.primaryDB().api.WhereCache(predicate)
}

// CallWithContext invokes the named function, waits for it to complete, and
// returns its error status, or an error from Context timeout.
func (o *ovsdbClient) CallWithContext(ctx context.Context, method string, args interface{}, reply interface{}) error {
// Set up read/write deadline for tcp connection before making
// a rpc request to the server.
if reflect.TypeOf(o.conn).String() == "*net.TCPConn" {
tcpConn := o.conn.(*net.TCPConn)
if o.options.timeout > 0 {
err := tcpConn.SetDeadline(time.Now().Add(o.options.timeout * 3))
if err != nil {
return err
}
}
}
return o.rpcClient.CallWithContext(ctx, method, args, reply)
}

0 comments on commit d569373

Please sign in to comment.