Skip to content

Commit

Permalink
Model-based Monitor API
Browse files Browse the repository at this point in the history
Specifying a custom Monitor is harder now we have models as it requires
the user to know the names of the Table and Fields they are interested
in. We already have this data in the model.

This commit changes the API to make it easier.

For example:

```
err = ovs.Monitor("play_with_ovs",
	ovs.NewTableMonitor(&OpenvSwitch{}),
	ovs.NewTableMonitor(&Bridge{}),
)
```

You can optionally provide pointers to the fields in a model to monitor
only those columns.

Signed-off-by: Dave Tucker <dave@dtucker.co.uk>
  • Loading branch information
dave-tucker committed Jun 14, 2021
1 parent e6caf40 commit 533bdbc
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 23 deletions.
3 changes: 3 additions & 0 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"sync"
"testing"

"encoding/json"
Expand Down Expand Up @@ -49,6 +50,7 @@ func TestRowCache_Row(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
r := &RowCache{
cache: tt.fields.cache,
mutex: sync.RWMutex{},
}
got := r.Row(tt.args.uuid)
assert.Equal(t, tt.want, got)
Expand All @@ -75,6 +77,7 @@ func TestRowCache_Rows(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
r := &RowCache{
cache: tt.fields.cache,
mutex: sync.RWMutex{},
}
got := r.Rows()
assert.ElementsMatch(t, tt.want, got)
Expand Down
3 changes: 3 additions & 0 deletions client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ func (a api) Mutate(model model.Model, mutationObjs ...model.Mutation) ([]ovsdb.
}

tableName := a.cache.DBModel().FindTable(reflect.ValueOf(model).Type())
if tableName == "" {
return nil, fmt.Errorf("table not found for object")
}
table := a.cache.Mapper().Schema.Table(tableName)
if table == nil {
return nil, fmt.Errorf("schema error: table not found in Database Model for type %s", reflect.TypeOf(model))
Expand Down
68 changes: 53 additions & 15 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ import (
"github.com/cenkalti/rpc2"
"github.com/cenkalti/rpc2/jsonrpc"
"github.com/ovn-org/libovsdb/cache"
"github.com/ovn-org/libovsdb/mapper"
"github.com/ovn-org/libovsdb/model"
"github.com/ovn-org/libovsdb/ovsdb"
)

// OvsdbClient is an OVSDB client
type OvsdbClient struct {
rpcClient *rpc2.Client
dbModel *model.DBModel
Schema ovsdb.DatabaseSchema
handlers []ovsdb.NotificationHandler
handlersMutex *sync.Mutex
Expand All @@ -29,9 +31,10 @@ type OvsdbClient struct {
api API
}

func newOvsdbClient() *OvsdbClient {
func newOvsdbClient(dbModel *model.DBModel) *OvsdbClient {
// Cache initialization is delayed because we first need to obtain the schema
ovs := &OvsdbClient{
dbModel: dbModel,
handlersMutex: &sync.Mutex{},
stopCh: make(chan struct{}),
}
Expand Down Expand Up @@ -85,7 +88,7 @@ func Connect(ctx context.Context, database *model.DBModel, opts ...Option) (*Ovs
}

func newRPC2Client(conn net.Conn, database *model.DBModel) (*OvsdbClient, error) {
ovs := newOvsdbClient()
ovs := newOvsdbClient(database)
ovs.rpcClient = rpc2.NewClientWithCodec(jsonrpc.NewJSONCodec(conn))
ovs.rpcClient.SetBlocking(true)
ovs.rpcClient.Handle("echo", func(_ *rpc2.Client, args []interface{}, reply *[]interface{}) error {
Expand Down Expand Up @@ -254,18 +257,11 @@ func (ovs OvsdbClient) Transact(operation ...ovsdb.Operation) ([]ovsdb.Operation

// MonitorAll is a convenience method to monitor every table/column
func (ovs OvsdbClient) MonitorAll(jsonContext interface{}) error {
requests := make(map[string]ovsdb.MonitorRequest)
for table, tableSchema := range ovs.Schema.Tables {
var columns []string
for column := range tableSchema.Columns {
columns = append(columns, column)
}
requests[table] = ovsdb.MonitorRequest{
Columns: columns,
Select: ovsdb.NewDefaultMonitorSelect(),
}
var options []TableMonitor
for name := range ovs.dbModel.Types() {
options = append(options, TableMonitor{Table: name})
}
return ovs.Monitor(jsonContext, requests)
return ovs.Monitor(jsonContext, options...)
}

// MonitorCancel will request cancel a previously issued monitor request
Expand All @@ -285,13 +281,55 @@ func (ovs OvsdbClient) MonitorCancel(jsonContext interface{}) error {
return nil
}

// TableMonitor is a table to be monitored
type TableMonitor struct {
// Table is the table to be monitored
Table string
// Fields are the fields in the model to monitor
// If none are supplied, all fields will be used
Fields []interface{}
}

func (ovs *OvsdbClient) NewTableMonitor(m model.Model, fields ...interface{}) TableMonitor {
tableName := ovs.dbModel.FindTable(reflect.TypeOf(m))
if tableName == "" {
panic(fmt.Sprintf("Object of type %s is not part of the DBModel", reflect.TypeOf(m)))
}
return TableMonitor{
Table: tableName,
Fields: fields,
}
}

// Monitor will provide updates for a given table/column
// and populate the cache with them. Subsequent updates will be processed
// by the Update Notifications
// RFC 7047 : monitor
func (ovs OvsdbClient) Monitor(jsonContext interface{}, requests map[string]ovsdb.MonitorRequest) error {
func (ovs OvsdbClient) Monitor(jsonContext interface{}, options ...TableMonitor) error {
var reply ovsdb.TableUpdates

mapper := mapper.NewMapper(&ovs.Schema)
typeMap := ovs.dbModel.Types()
requests := make(map[string]ovsdb.MonitorRequest)
if len(options) == 0 {
return fmt.Errorf("no monitor options provided")
}
for _, o := range options {
var fields []interface{}
if len(o.Fields) > 0 {
fields = o.Fields
} else {
fields = nil
}
m, ok := typeMap[o.Table]
if !ok {
return fmt.Errorf("type for table %s does not exist in dbModel", o.Table)
}
request, err := mapper.NewMonitorRequest(o.Table, m, fields)
if err != nil {
return err
}
requests[o.Table] = *request
}
args := ovsdb.NewMonitorArgs(ovs.Schema.Name, jsonContext, requests)
err := ovs.rpcClient.Call("monitor", args, &reply)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion client/ovs_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,10 @@ func TestMonitorCancelIntegration(t *testing.T) {
Select: ovsdb.NewDefaultMonitorSelect(),
}

err = ovs.Monitor(monitorID, requests)
err = ovs.Monitor(monitorID,
ovs.NewTableMonitor(&ovsType{}),
ovs.NewTableMonitor(&bridgeType{}),
)
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 4 additions & 2 deletions example/play_with_ovs/play_with_ovs.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@ func main() {
}
},
})

err = ovs.MonitorAll("")
err = ovs.Monitor("play_with_ovs",
ovs.NewTableMonitor(&OpenvSwitch{}),
ovs.NewTableMonitor(&Bridge{}),
)
if err != nil {
log.Fatal(err)
}
Expand Down
10 changes: 5 additions & 5 deletions mapper/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ func (i *Info) SetField(column string, value interface{}) error {
return nil
}

// ColumnByPtr returns the column name that corresponds to the field by the field's pminter
// ColumnByPtr returns the column name that corresponds to the field by the field's pointer
func (i *Info) ColumnByPtr(fieldPtr interface{}) (string, error) {
fieldPtrVal := reflect.ValueOf(fieldPtr)
if fieldPtrVal.Kind() != reflect.Ptr {
return "", ovsdb.NewErrWrongType("ColumnByPminter", "pminter to a field in the struct", fieldPtr)
return "", ovsdb.NewErrWrongType("ColumnByPointer", "pointer to a field in the struct", fieldPtr)
}
offset := fieldPtrVal.Pointer() - reflect.ValueOf(i.obj).Pointer()
objType := reflect.TypeOf(i.obj).Elem()
Expand All @@ -64,7 +64,7 @@ func (i *Info) ColumnByPtr(fieldPtr interface{}) (string, error) {
return column, nil
}
}
return "", fmt.Errorf("field pminter does not correspond to orm struct")
return "", fmt.Errorf("field pointer does not correspond to orm struct")
}

// getValidIndexes inspects the object and returns the a list of indexes (set of columns) for witch
Expand Down Expand Up @@ -104,11 +104,11 @@ OUTER:
func NewInfo(table *ovsdb.TableSchema, obj interface{}) (*Info, error) {
objPtrVal := reflect.ValueOf(obj)
if objPtrVal.Type().Kind() != reflect.Ptr {
return nil, ovsdb.NewErrWrongType("NewMapperInfo", "pminter to a struct", obj)
return nil, ovsdb.NewErrWrongType("NewMapperInfo", "pointer to a struct", obj)
}
objVal := reflect.Indirect(objPtrVal)
if objVal.Kind() != reflect.Struct {
return nil, ovsdb.NewErrWrongType("NewMapperInfo", "pminter to a struct", obj)
return nil, ovsdb.NewErrWrongType("NewMapperInfo", "pointer to a struct", obj)
}
objType := objVal.Type()

Expand Down
27 changes: 27 additions & 0 deletions mapper/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,30 @@ func (m Mapper) equalIndexes(table *ovsdb.TableSchema, one, other interface{}, i
}
return false, nil
}

// NewMonitorRequest returns a monitor request for the provided tableName
// If fields is provided, the request will be constrained to the provided columns
// If no fields are provided, all columns will be used
func (m *Mapper) NewMonitorRequest(tableName string, data interface{}, fields []interface{}) (*ovsdb.MonitorRequest, error) {
var columns []string
schema := m.Schema.Tables[tableName]
info, err := NewInfo(&schema, data)
if err != nil {
return nil, err
}
if len(fields) > 0 {
for _, f := range fields {
column, err := info.ColumnByPtr(f)
if err != nil {
return nil, err
}
columns = append(columns, column)
}
} else {
columns = append(columns, "_uuid")
for c := range info.table.Columns {
columns = append(columns, c)
}
}
return &ovsdb.MonitorRequest{Columns: columns, Select: ovsdb.NewDefaultMonitorSelect()}, nil
}
66 changes: 66 additions & 0 deletions mapper/mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ovn-org/libovsdb/ovsdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
Expand Down Expand Up @@ -1027,3 +1028,68 @@ func testOvsMap(t *testing.T, set interface{}) *ovsdb.OvsMap {
assert.Nil(t, err)
return oMap
}

func TestNewMonitorRequest(t *testing.T) {
var testSchema = []byte(`{
"cksum": "223619766 22548",
"name": "TestSchema",
"tables": {
"TestTable": {
"indexes": [["name"],["composed_1","composed_2"]],
"columns": {
"name": {
"type": "string"
},
"composed_1": {
"type": {
"key": "string"
}
},
"composed_2": {
"type": {
"key": "string"
}
},
"int1": {
"type": {
"key": "integer"
}
},
"int2": {
"type": {
"key": "integer"
}
},
"config": {
"type": {
"key": "string",
"max": "unlimited",
"min": 0,
"value": "string"
}
}
}
}
}
}`)
type testType struct {
ID string `ovsdb:"_uuid"`
MyName string `ovsdb:"name"`
Config map[string]string `ovsdb:"config"`
Comp1 string `ovsdb:"composed_1"`
Comp2 string `ovsdb:"composed_2"`
Int1 int `ovsdb:"int1"`
Int2 int `ovsdb:"int2"`
}
var schema ovsdb.DatabaseSchema
err := json.Unmarshal(testSchema, &schema)
require.NoError(t, err)
mapper := NewMapper(&schema)
testTable := &testType{}
mr, err := mapper.NewMonitorRequest("TestTable", testTable, nil)
require.NoError(t, err)
assert.ElementsMatch(t, mr.Columns, []string{"_uuid", "name", "config", "composed_1", "composed_2", "int1", "int2"})
mr2, err := mapper.NewMonitorRequest("TestTable", testTable, []interface{}{&testTable.ID, &testTable.MyName})
require.NoError(t, err)
assert.ElementsMatch(t, mr2.Columns, []string{"_uuid", "name"})
}

0 comments on commit 533bdbc

Please sign in to comment.