Skip to content

Commit

Permalink
Add opentelemetry support
Browse files Browse the repository at this point in the history
1. Added context.Context to most API methods for span propagation
2. Spans are added to the parent (from context) when an API is called
3. Simple cache metrics can be registered with cache.RegisterMetrics()

Signed-off-by: Dave Tucker <dave@dtucker.co.uk>
  • Loading branch information
dave-tucker committed Jul 9, 2021
1 parent 586143b commit 0850976
Show file tree
Hide file tree
Showing 17 changed files with 626 additions and 107 deletions.
52 changes: 50 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ jobs:
- 2.13.0

runs-on: ubuntu-latest

steps:
- name: Set up Go 1.16
uses: actions/setup-go@v1
Expand All @@ -99,4 +99,52 @@ jobs:
- name: Integration Test
run: make integration-test
env:
OVS_IMAGE_TAG: ${{ matrix.ovs_version }}
OVS_IMAGE_TAG: ${{ matrix.ovs_version }}

images:
needs: build
name: Build Image
runs-on: ubuntu-latest
strategy:
matrix:
cmd:
- modelgen
- print_schema
- stress
steps:
- name: Check Out Repo
uses: actions/checkout@v2

- name: Set up Docker Buildx
id: buildx
uses: docker/setup-buildx-action@v1

- name: Cache Docker layers
uses: actions/cache@v2
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-
- name: Login to Docker Hub
if: ${{ contains(github.ref, 'refs/head/main') }}
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_HUB_USERNAME }}
password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}

- name: Build and push
id: docker_build
uses: docker/build-push-action@v2
with:
context: ovs
target: ${{ matrix.image.cmd }}
builder: ${{ steps.buildx.outputs.name }}
push: ${{ contains(github.ref, 'refs/head/main') }}
tags: libovsdb/${{ matrix.image.cmd }}:latest
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache

- name: Image digest
run: echo ${{ steps.docker_build.outputs.digest }}
12 changes: 12 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM golang:1.16-alpine as base
COPY . /src
WORKDIR /src

FROM base as stress
RUN go install ./cmd/stress

FROM base as print_schema
RUN go install ./cmd/print_schema

FROM base as modelgen
RUN go install ./cmd/modelgen
46 changes: 41 additions & 5 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"bytes"
"context"
"crypto/sha256"
"encoding/gob"
"encoding/hex"
Expand All @@ -15,8 +16,13 @@ import (
"github.com/ovn-org/libovsdb/mapper"
"github.com/ovn-org/libovsdb/model"
"github.com/ovn-org/libovsdb/ovsdb"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
)

var tracer = otel.Tracer("libovsdb.ovn.org/cache")

const (
updateEvent = "update"
addEvent = "add"
Expand Down Expand Up @@ -346,14 +352,16 @@ func NewTableCache(schema *ovsdb.DatabaseSchema, dbModel *model.DBModel, data Da
}
}
}
return &TableCache{

tc := &TableCache{
cache: cache,
schema: schema,
eventProcessor: eventProcessor,
mapper: mapper.NewMapper(schema),
dbModel: dbModel,
mutex: sync.RWMutex{},
}, nil
}
return tc, nil
}

// Mapper returns the mapper
Expand Down Expand Up @@ -389,11 +397,11 @@ func (t *TableCache) Tables() []string {

// Update implements the update method of the NotificationHandler interface
// this populates the cache with new updates
func (t *TableCache) Update(context interface{}, tableUpdates ovsdb.TableUpdates) {
func (t *TableCache) Update(monitorID interface{}, tableUpdates ovsdb.TableUpdates) {
if len(tableUpdates) == 0 {
return
}
t.Populate(tableUpdates)
t.Populate(context.TODO(), tableUpdates)
}

// Locked implements the locked method of the NotificationHandler interface
Expand All @@ -412,8 +420,36 @@ func (t *TableCache) Echo([]interface{}) {
func (t *TableCache) Disconnected() {
}

// RegisterMetrics registers a metric for all known tables
func (t *TableCache) RegisterMetrics() error {
for _, tt := range t.Tables() {
if err := t.registerMetric(tt); err != nil {
return err
}
}
return nil
}

func (t *TableCache) registerMetric(table string) error {
meter := global.Meter("libovsdb.ovn.org/cache")
if _, ok := t.cache[table]; !ok {
return fmt.Errorf("table not found")
}
_, err := meter.NewInt64ValueObserver(
fmt.Sprintf("libovsdb.cache.%s.size", strings.ToLower(table)),
func(ctx context.Context, result metric.Int64ObserverResult) {
value := t.Table(table).Len()
result.Observe(int64(value))
},
metric.WithDescription(fmt.Sprintf("the size of the %s table in the cache", strings.ToLower(table))),
)
return err
}

// Populate adds data to the cache and places an event on the channel
func (t *TableCache) Populate(tableUpdates ovsdb.TableUpdates) {
func (t *TableCache) Populate(ctx context.Context, tableUpdates ovsdb.TableUpdates) {
_, span := tracer.Start(ctx, "cache_populate")
defer span.End()
t.mutex.Lock()
defer t.mutex.Unlock()

Expand Down
7 changes: 4 additions & 3 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"context"
"testing"

"encoding/json"
Expand Down Expand Up @@ -721,7 +722,7 @@ func TestTableCache_populate(t *testing.T) {
},
},
}
tc.Populate(updates)
tc.Populate(context.Background(), updates)

got := tc.Table("Open_vSwitch").Row("test")
assert.Equal(t, testRowModel, got)
Expand All @@ -733,7 +734,7 @@ func TestTableCache_populate(t *testing.T) {
Old: &testRow,
New: &updatedRow,
}
tc.Populate(updates)
tc.Populate(context.Background(), updates)

got = tc.cache["Open_vSwitch"].cache["test"]
assert.Equal(t, updatedRowModel, got)
Expand All @@ -744,7 +745,7 @@ func TestTableCache_populate(t *testing.T) {
New: nil,
}

tc.Populate(updates)
tc.Populate(context.Background(), updates)

_, ok := tc.cache["Open_vSwitch"].cache["test"]
assert.False(t, ok)
Expand Down
64 changes: 44 additions & 20 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"github.com/ovn-org/libovsdb/mapper"
"github.com/ovn-org/libovsdb/model"
"github.com/ovn-org/libovsdb/ovsdb"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// Constants defined for libovsdb
Expand All @@ -32,6 +35,9 @@ const (
// ErrNotConnected is an error returned when the client is not connected
var ErrNotConnected = errors.New("not connected")

// tracer is the tracer for opentelemetry
var tracer = otel.Tracer("libovsdb.ovn.org/client")

// Client represents an OVSDB Client Connection
// It provides all the necessary functionality to Connect to a server,
// perform transactions, and build your own replica of the database with
Expand All @@ -46,12 +52,12 @@ type Client interface {
SetOption(Option) error
Connected() bool
DisconnectNotify() chan struct{}
Echo() error
Transact(...ovsdb.Operation) ([]ovsdb.OperationResult, error)
Monitor(...TableMonitor) (string, error)
MonitorAll() (string, error)
MonitorCancel(id string) error
NewTableMonitor(m model.Model, fields ...interface{}) TableMonitor
Echo(context.Context) error
Transact(context.Context, ...ovsdb.Operation) ([]ovsdb.OperationResult, error)
Monitor(context.Context, ...TableMonitor) (string, error)
MonitorAll(context.Context) (string, error)
MonitorCancel(context.Context, string) error
NewTableMonitor(model.Model, ...interface{}) TableMonitor
API
}

Expand Down Expand Up @@ -149,7 +155,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
return err
}

dbs, err := o.listDbs()
dbs, err := o.listDbs(ctx)
if err != nil {
o.rpcClient.Close()
return err
Expand All @@ -167,7 +173,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
return fmt.Errorf("target database not found")
}

schema, err := o.getSchema(o.dbModel.Name())
schema, err := o.getSchema(ctx, o.dbModel.Name())
errors := o.dbModel.Validate(schema)
if len(errors) > 0 {
var combined []string
Expand Down Expand Up @@ -205,7 +211,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
o.monitorsMutex.Lock()
defer o.monitorsMutex.Unlock()
for id, request := range o.monitors {
err = o.monitor(id, reconnect, request...)
err = o.monitor(ctx, id, reconnect, request...)
if err != nil {
o.rpcClient.Close()
return err
Expand Down Expand Up @@ -312,7 +318,9 @@ func (o *ovsdbClient) update(args []json.RawMessage, reply *[]interface{}) error
// getSchema returns the schema in use for the provided database name
// RFC 7047 : get_schema
// Should only be called when mutex is held
func (o *ovsdbClient) getSchema(dbName string) (*ovsdb.DatabaseSchema, error) {
func (o *ovsdbClient) getSchema(ctx context.Context, dbName string) (*ovsdb.DatabaseSchema, error) {
_, span := tracer.Start(ctx, "get_schema")
defer span.End()
args := ovsdb.NewGetSchemaArgs(dbName)
var reply ovsdb.DatabaseSchema
err := o.rpcClient.Call("get_schema", args, &reply)
Expand All @@ -328,7 +336,9 @@ func (o *ovsdbClient) getSchema(dbName string) (*ovsdb.DatabaseSchema, error) {
// listDbs returns the list of databases on the server
// RFC 7047 : list_dbs
// Should only be called when mutex is held
func (o *ovsdbClient) listDbs() ([]string, error) {
func (o *ovsdbClient) listDbs(ctx context.Context) ([]string, error) {
_, span := tracer.Start(ctx, "list_dbs")
defer span.End()
var dbs []string
err := o.rpcClient.Call("list_dbs", nil, &dbs)
if err != nil {
Expand All @@ -342,13 +352,17 @@ func (o *ovsdbClient) listDbs() ([]string, error) {

// Transact performs the provided Operations on the database
// RFC 7047 : transact
func (o *ovsdbClient) Transact(operation ...ovsdb.Operation) ([]ovsdb.OperationResult, error) {
func (o *ovsdbClient) Transact(ctx context.Context, operation ...ovsdb.Operation) ([]ovsdb.OperationResult, error) {
_, span := tracer.Start(ctx, "transact")
defer span.End()
span.AddEvent("validating operations")
var reply []ovsdb.OperationResult
if ok := o.Schema().ValidateOperations(operation...); !ok {
return nil, fmt.Errorf("validation failed for the operation")
}
args := ovsdb.NewTransactArgs(o.schema.Name, operation...)

span.AddEvent("sending request to server")
o.rpcMutex.Lock()
if o.rpcClient == nil {
o.rpcMutex.Unlock()
Expand All @@ -366,17 +380,21 @@ func (o *ovsdbClient) Transact(operation ...ovsdb.Operation) ([]ovsdb.OperationR
}

// MonitorAll is a convenience method to monitor every table/column
func (o *ovsdbClient) MonitorAll() (string, error) {
func (o *ovsdbClient) MonitorAll(ctx context.Context) (string, error) {
ctx, span := tracer.Start(ctx, "monitor_all")
defer span.End()
var options []TableMonitor
for name := range o.dbModel.Types() {
options = append(options, TableMonitor{Table: name})
}
return o.Monitor(options...)
return o.Monitor(ctx, options...)
}

// MonitorCancel will request cancel a previously issued monitor request
// RFC 7047 : monitor_cancel
func (o *ovsdbClient) MonitorCancel(id string) error {
func (o *ovsdbClient) MonitorCancel(ctx context.Context, id string) error {
_, span := tracer.Start(ctx, "monitor_cancel")
defer span.End()
var reply ovsdb.OperationResult
args := ovsdb.NewMonitorCancelArgs(id)
o.rpcMutex.Lock()
Expand Down Expand Up @@ -428,12 +446,16 @@ func (o *ovsdbClient) NewTableMonitor(m model.Model, fields ...interface{}) Tabl
// and populate the cache with them. Subsequent updates will be processed
// by the Update Notifications
// RFC 7047 : monitor
func (o *ovsdbClient) Monitor(options ...TableMonitor) (string, error) {
func (o *ovsdbClient) Monitor(ctx context.Context, options ...TableMonitor) (string, error) {
ctx, span := tracer.Start(ctx, "monitor")
defer span.End()
id := uuid.NewString()
return id, o.monitor(id, false, options...)
return id, o.monitor(ctx, id, false, options...)
}

func (o *ovsdbClient) monitor(id string, reconnect bool, options ...TableMonitor) error {
func (o *ovsdbClient) monitor(ctx context.Context, id string, reconnect bool, options ...TableMonitor) error {
ctx, span := tracer.Start(ctx, "monitor.internal", trace.WithAttributes(attribute.String("id", id)))
defer span.End()
if len(options) == 0 {
return fmt.Errorf("no monitor options provided")
}
Expand Down Expand Up @@ -475,12 +497,14 @@ func (o *ovsdbClient) monitor(id string, reconnect bool, options ...TableMonitor
defer o.monitorsMutex.Unlock()
o.monitors[id] = options
}
o.cache.Populate(reply)
o.cache.Populate(ctx, reply)
return nil
}

// Echo tests the liveness of the OVSDB connetion
func (o *ovsdbClient) Echo() error {
func (o *ovsdbClient) Echo(ctx context.Context) error {
_, span := tracer.Start(ctx, "echo")
defer span.End()
args := ovsdb.NewEchoArgs()
var reply []interface{}
o.rpcMutex.RLock()
Expand Down
Loading

0 comments on commit 0850976

Please sign in to comment.