Primary and secondary client indexes and client-side transaction validation #314

Open
wants to merge 18 commits into
base: main
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Expand Up @@ -32,7 +32,7 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
with:
version: v1.40.1
version: v1.46.2
skip-go-installation: true
skip-pkg-cache: true
skip-build-cache: true
14 changes: 13 additions & 1 deletion README.md
Expand Up @@ -89,7 +89,7 @@ Additional indexes can be specified for a client instance to track. Just as sche
where each set consists of the columns that compose the index. Unlike schema indexes, a key within a column can be addressed if the column
type is a map.

Client indexes are leveraged through `Where`, and `WhereAll`. Since client indexes value uniqueness is not enforced as it happens with schema indexes,
Client indexes are leveraged through `Where` and `WhereAll`. Since, unlike with schema indexes, the uniqueness of client index values is not guaranteed,
conditions based on them can match multiple rows.

Index-based operations generally provide better performance than operations based on explicit conditions.
Expand All @@ -116,6 +116,18 @@ can now be improved with:
// quick indexed result
ovn.Where(lb).List(ctx, &results)

Client indexes can be one of two types:

* **Primary**: Primary client indexes are verified to be unique within a specific client instance by an optional transaction validation that runs
before the transaction is sent to the server. The `ValidateTransactions` client option needs to be enabled. When it is, a transaction fails if it includes
a primary client index value that already exists for a different row in the client cache or in the transaction itself. Be aware that this validation
comes at a performance cost. Client indexes are primary unless otherwise specified, but the `ValidateTransactions` option is not enabled by default.
* **Secondary**: Secondary client indexes don't have any additional validation.

**Note**: `ValidateTransactions` does not account for strongly referenced rows that should be garbage collected, so it may report a duplicate index when it shouldn't.
The workarounds are to not use validation, to explicitly remove the strongly referenced rows in the transaction, or to create the similar row with the same index
in a separate transaction from the one that removes the strongly referenced row.
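
The primary index check described above can be pictured with a small, self-contained sketch. It is not the library's implementation: the `row` type, the `validatePrimaryIndex` function and the choice of `Name` as the indexed column are all hypothetical, and the sketch only covers inserts and unchanged rows, not renames or deletions.

```go
package main

import "fmt"

// row is a hypothetical stand-in for a model; only the indexed column matters.
type row struct {
	UUID string
	Name string // assumed to be covered by a primary client index
}

// validatePrimaryIndex returns an error if a row in the transaction reuses a
// Name already held by a different row, either in the cache or earlier in
// the same transaction.
func validatePrimaryIndex(cache map[string]row, txn []row) error {
	owner := make(map[string]string, len(cache)) // index value -> owning UUID
	for uuid, r := range cache {
		owner[r.Name] = uuid
	}
	for _, r := range txn {
		if existing, ok := owner[r.Name]; ok && existing != r.UUID {
			return fmt.Errorf("index %q of row %s is already used by row %s", r.Name, r.UUID, existing)
		}
		owner[r.Name] = r.UUID
	}
	return nil
}

func main() {
	cache := map[string]row{"u1": {UUID: "u1", Name: "lb0"}}
	// A different row with the same index value is rejected.
	fmt.Println(validatePrimaryIndex(cache, []row{{UUID: "u2", Name: "lb0"}}))
	// The same row keeping its own index value is accepted.
	fmt.Println(validatePrimaryIndex(cache, []row{{UUID: "u1", Name: "lb0"}}))
}
```

Building the lookup map on every call is one way to see where the performance cost mentioned above could come from; a real cache would reuse its existing index structures.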

## Documentation

This package is divided into several sub-packages. Documentation for each sub-package is available at [pkg.go.dev][doc]:
125 changes: 97 additions & 28 deletions cache/cache.go
Expand Up @@ -56,14 +56,14 @@ type ErrIndexExists struct {
Value interface{}
Index string
New string
Existing string
Existing []string
}

func (e *ErrIndexExists) Error() string {
return fmt.Sprintf("cannot insert %s in the %s table. item %s has identical indexes. index: %s, value: %v", e.New, e.Table, e.Existing, e.Index, e.Value)
}

func NewIndexExistsError(table string, value interface{}, index string, new, existing string) *ErrIndexExists {
func NewIndexExistsError(table string, value interface{}, index string, new string, existing []string) *ErrIndexExists {
return &ErrIndexExists{
table, value, index, new, existing,
}
Expand All @@ -83,9 +83,14 @@ type indexType uint

const (
schemaIndexType indexType = iota
clientIndexType
primaryClientIndexType
secondaryClientIndexType
)

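// clientIndexTypeToCacheIndexType maps a client index type from the model
// package to the cache's indexType. It relies on the client index constants
// following schemaIndexType in the same order, hence the offset of one.
func clientIndexTypeToCacheIndexType(modelIndexType model.IndexType) indexType {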
return indexType(modelIndexType + 1)
}

// indexSpec contains details about an index
type indexSpec struct {
index index
Expand All @@ -94,13 +99,17 @@ type indexSpec struct {
}

func (s indexSpec) isClientIndex() bool {
return s.indexType == clientIndexType
return s.indexType >= primaryClientIndexType
}

func (s indexSpec) isSchemaIndex() bool {
return s.indexType == schemaIndexType
}

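// isPrimaryIndex reports whether the index is expected to be unique, which
// holds for schema indexes and primary client indexes.
func (s indexSpec) isPrimaryIndex() bool {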
return s.indexType < secondaryClientIndexType
}
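
The ordering of the constants is what makes the two range comparisons work. A minimal standalone sketch of the same scheme follows; `modelIndexType` and its constants are assumptions standing in for `model.IndexType` (the diff does not show its values, so primary = 0 and secondary = 1 are guesses), and the free functions replace the `indexSpec` methods.

```go
package main

import "fmt"

// modelIndexType stands in for model.IndexType. The constant names and
// values (primary = 0, secondary = 1) are assumptions for this sketch.
type modelIndexType uint

const (
	modelPrimary modelIndexType = iota
	modelSecondary
)

type indexType uint

const (
	schemaIndexType indexType = iota
	primaryClientIndexType
	secondaryClientIndexType
)

// toCacheIndexType shifts the model value by one to skip schemaIndexType.
func toCacheIndexType(t modelIndexType) indexType { return indexType(t + 1) }

// isClientIndex is true for both client index kinds.
func isClientIndex(t indexType) bool { return t >= primaryClientIndexType }

// isPrimaryIndex is true for schema indexes and primary client indexes.
func isPrimaryIndex(t indexType) bool { return t < secondaryClientIndexType }

func main() {
	types := []indexType{
		schemaIndexType,
		toCacheIndexType(modelPrimary),
		toCacheIndexType(modelSecondary),
	}
	for _, t := range types {
		fmt.Println(t, isClientIndex(t), isPrimaryIndex(t))
	}
}
```

The trade-off is that the comparisons silently depend on declaration order, so inserting a new constant in the middle of either block would change their meaning.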

// newIndexFromColumns builds an index from a list of columns
func newIndexFromColumns(columns ...string) index {
sort.Strings(columns)
Expand Down Expand Up @@ -166,6 +175,13 @@ func (r *RowCache) Row(uuid string) model.Model {
return r.rowByUUID(uuid)
}

// HasRow reports whether a row with the given UUID exists in the cache
func (r *RowCache) HasRow(uuid string) bool {
r.mutex.RLock()
defer r.mutex.RUnlock()
_, found := r.cache[uuid]
return found
}

func (r *RowCache) rowsByModel(m model.Model, useClientIndexes bool) map[string]model.Model {
r.mutex.RLock()
defer r.mutex.RUnlock()
Expand Down Expand Up @@ -257,7 +273,7 @@ func (r *RowCache) Create(uuid string, m model.Model, checkIndexes bool) error {

vals := r.indexes[index]
if existing, ok := vals[val]; ok && !existing.empty() && checkIndexes && indexSpec.isSchemaIndex() {
return NewIndexExistsError(r.name, val, string(index), uuid, existing.getAny())
return NewIndexExistsError(r.name, val, string(index), uuid, existing.list())
}

uuidset := newUUIDSet(uuid)
Expand Down Expand Up @@ -285,7 +301,7 @@ func (r *RowCache) Update(uuid string, m model.Model, checkIndexes bool) (model.
if _, ok := r.cache[uuid]; !ok {
return nil, NewErrCacheInconsistent(fmt.Sprintf("cannot update row %s as it does not exist in the cache", uuid))
}
oldRow := model.Clone(r.cache[uuid])
oldRow := r.cache[uuid]
oldInfo, err := r.dbModel.NewModelInfo(oldRow)
if err != nil {
return nil, err
Expand Down Expand Up @@ -322,7 +338,7 @@ func (r *RowCache) Update(uuid string, m model.Model, checkIndexes bool) (model.
newVal,
string(index),
uuid,
existing.getAny(),
existing.list(),
))
}

Expand Down Expand Up @@ -352,6 +368,8 @@ func (r *RowCache) Update(uuid string, m model.Model, checkIndexes bool) (model.
return oldRow, nil
}

// IndexExists checks whether any primary index of the provided model (schema
// index or primary client index) already exists in the cache.
func (r *RowCache) IndexExists(row model.Model) error {
info, err := r.dbModel.NewModelInfo(row)
if err != nil {
Expand All @@ -363,10 +381,8 @@ func (r *RowCache) IndexExists(row model.Model) error {
}
uuid := field.(string)
for _, indexSpec := range r.indexSpecs {
if !indexSpec.isSchemaIndex() {
// Given the ordered indexSpecs, we can break here if we reach the
// first non schema index
break
if !indexSpec.isPrimaryIndex() {
continue
}
index := indexSpec.index
val, err := valueFromIndex(info, indexSpec.columns)
Expand All @@ -380,7 +396,7 @@ func (r *RowCache) IndexExists(row model.Model) error {
val,
string(index),
uuid,
existing.getAny(),
existing.list(),
)
}
}
Expand Down Expand Up @@ -616,8 +632,10 @@ func (r *RowCache) uuidsByConditionsAsIndexes(conditions []ovsdb.Condition, nati
return matching, err
}

// RowsByCondition searches models in the cache that match all conditions
func (r *RowCache) RowsByCondition(conditions []ovsdb.Condition) (map[string]model.Model, error) {
type cloner func(model.Model) model.Model

// rowsByCondition searches models in the cache that match all conditions
func (r *RowCache) rowsByCondition(conditions []ovsdb.Condition, cloner cloner) (map[string]model.Model, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()
results := make(map[string]model.Model)
Expand All @@ -626,7 +644,7 @@ func (r *RowCache) RowsByCondition(conditions []ovsdb.Condition) (map[string]mod
// no conditions matches all rows
if len(conditions) == 0 {
for uuid := range r.cache {
results[uuid] = r.rowByUUID(uuid)
results[uuid] = cloner(r.cache[uuid])
}
return results, nil
}
Expand Down Expand Up @@ -715,12 +733,24 @@ func (r *RowCache) RowsByCondition(conditions []ovsdb.Condition) (map[string]mod
}

for uuid := range matching {
results[uuid] = r.rowByUUID(uuid)
results[uuid] = cloner(r.cache[uuid])
}

return results, nil
}

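// RowsByCondition searches models in the cache that match all conditions and
// returns deep copies of them
func (r *RowCache) RowsByCondition(conditions []ovsdb.Condition) (map[string]model.Model, error) {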
return r.rowsByCondition(conditions, func(m model.Model) model.Model {
return model.Clone(m)
})
}

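// RowsByConditionShallow searches models in the cache that match all
// conditions and returns them without copying, so callers must not modify
// the returned models
func (r *RowCache) RowsByConditionShallow(conditions []ovsdb.Condition) (map[string]model.Model, error) {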
return r.rowsByCondition(conditions, func(m model.Model) model.Model {
return m
})
}
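
The difference between the two exported variants comes down to the function passed as `cloner`. A self-contained sketch of the same pattern, with a hypothetical `item` type and `lookup` helper in place of `model.Model` and `rowsByCondition`:

```go
package main

import "fmt"

// item is a hypothetical stand-in for a cached model.
type item struct{ Tags []string }

// cloner decides how a cached item is handed out to the caller.
type cloner func(*item) *item

// deepClone returns an independent copy, including the slice contents.
func deepClone(i *item) *item {
	return &item{Tags: append([]string(nil), i.Tags...)}
}

// shallow returns the cached pointer itself.
func shallow(i *item) *item { return i }

// lookup returns every stored item, passed through the given cloner.
func lookup(store map[string]*item, c cloner) map[string]*item {
	out := make(map[string]*item, len(store))
	for k, v := range store {
		out[k] = c(v)
	}
	return out
}

func main() {
	store := map[string]*item{"a": {Tags: []string{"x"}}}

	lookup(store, deepClone)["a"].Tags[0] = "changed"
	fmt.Println(store["a"].Tags[0]) // the stored item is untouched

	lookup(store, shallow)["a"].Tags[0] = "changed"
	fmt.Println(store["a"].Tags[0]) // the stored item was modified
}
```

The shallow variant avoids the copy, at the price that a caller who mutates a result also mutates the cache, which is presumably why it is kept as a separate, explicitly named method.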

// Len returns the length of the cache
func (r *RowCache) Len() int {
r.mutex.RLock()
Expand Down Expand Up @@ -912,7 +942,12 @@ func (t *TableCache) Disconnected() {
func (t *TableCache) Populate(tableUpdates ovsdb.TableUpdates) error {
t.mutex.Lock()
defer t.mutex.Unlock()
return t.populate(tableUpdates)
}

// populate adds data to the cache and places an event on the channel. It must
// be called with the table mutex held.
func (t *TableCache) populate(tableUpdates ovsdb.TableUpdates) error {
for table := range t.dbModel.Types() {
updates, ok := tableUpdates[table]
if !ok {
Expand Down Expand Up @@ -1024,6 +1059,20 @@ func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) error {
}
t.eventProcessor.AddEvent(updateEvent, table, existing, modified)
}
case row.Old != nil && row.New != nil:
// internally, we handle Mutates with Modify, but Updates with
// Old/New as calculating Modify from Old/New has a cost
updates1 := ovsdb.TableUpdates{
table: ovsdb.TableUpdate{
uuid: &ovsdb.RowUpdate{
Old: row.Old,
New: row.New,
},
},
}
if err := t.populate(updates1); err != nil {
return err
}
case row.Delete != nil:
fallthrough
default:
Expand Down Expand Up @@ -1108,7 +1157,8 @@ func newRowCache(name string, dbModel model.DatabaseModel, dataType reflect.Type
if _, ok := indexes[index]; ok {
continue
}
spec := indexSpec{index: index, columns: columnKeys, indexType: clientIndexType}
indexType := clientIndexTypeToCacheIndexType(clientIndex.Type)
spec := indexSpec{index: index, columns: columnKeys, indexType: indexType}
r.indexSpecs = append(r.indexSpecs, spec)
indexes[index] = spec
}
Expand Down Expand Up @@ -1137,19 +1187,28 @@ type event struct {
// eventProcessor handles the queueing and processing of cache events
type eventProcessor struct {
events chan event
// handlersMutex locks the handlers array when we add a handler or dispatch events
// we don't need a RWMutex in this case as we only have one thread reading and the write
// volume is very low (i.e only when AddEventHandler is called)
handlersMutex sync.Mutex
handlers []EventHandler
logger *logr.Logger
// eventMutex locks the events channel and handlers array when we add a
// handler or dispatch events
eventMutex sync.RWMutex
handlers []EventHandler
logger *logr.Logger
capacity int
}

func newEventProcessor(capacity int, logger *logr.Logger) *eventProcessor {
return &eventProcessor{
events: make(chan event, capacity),
handlers: []EventHandler{},
logger: logger,
capacity: capacity,
}
}

// initEventsChannel is a lazy initializer for the events channel, as the
// channel has a performance cost and not every client needs it. eventMutex
// must be held for writing before calling this method.
func (e *eventProcessor) initEventsChannel() {
if e.events == nil {
e.events = make(chan event, e.capacity)
}
}

Expand All @@ -1158,13 +1217,20 @@ func newEventProcessor(capacity int, logger *logr.Logger) *eventProcessor {
// to be processed by the client. Long Running handler functions adversely affect
// other handlers and MAY cause loss of data if the channel buffer is full
func (e *eventProcessor) AddEventHandler(handler EventHandler) {
e.handlersMutex.Lock()
defer e.handlersMutex.Unlock()
e.eventMutex.Lock()
defer e.eventMutex.Unlock()
e.initEventsChannel()
e.handlers = append(e.handlers, handler)
}

// AddEvent writes an event to the channel
func (e *eventProcessor) AddEvent(eventType string, table string, old model.Model, new model.Model) {
e.eventMutex.RLock()
hasEventChannel := e.events != nil
e.eventMutex.RUnlock()
if !hasEventChannel {
return
}
// We don't need to check for error here since there
// is only a single writer. RPC is run in blocking mode
event := event{
Expand All @@ -1187,12 +1253,15 @@ func (e *eventProcessor) AddEvent(eventType string, table string, old model.Mode
// Otherwise it will wait for events to arrive on the event channel
// Once received, it will dispatch the event to each registered handler
func (e *eventProcessor) Run(stopCh <-chan struct{}) {
e.eventMutex.Lock()
e.initEventsChannel()
e.eventMutex.Unlock()
for {
select {
case <-stopCh:
return
case event := <-e.events:
e.handlersMutex.Lock()
e.eventMutex.RLock()
for _, handler := range e.handlers {
switch event.eventType {
case addEvent:
Expand All @@ -1203,7 +1272,7 @@ func (e *eventProcessor) Run(stopCh <-chan struct{}) {
handler.OnDelete(event.table, event.old)
}
}
e.handlersMutex.Unlock()
e.eventMutex.RUnlock()
}
}
}
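
The lazy channel setup can be illustrated in isolation. The sketch below is a simplified stand-in, not the code under review: `processor`, `enable` and `add` are hypothetical names, events are plain strings, and the non-blocking send that drops events on a full buffer is an assumption about the parts of `AddEvent` elided from this hunk.

```go
package main

import (
	"fmt"
	"sync"
)

// processor is a hypothetical, reduced version of an event processor whose
// channel is only allocated once somebody asks for events.
type processor struct {
	mu       sync.RWMutex
	events   chan string
	capacity int
}

// enable allocates the channel on first use; later calls are no-ops.
func (p *processor) enable() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.events == nil {
		p.events = make(chan string, p.capacity)
	}
}

// add queues an event and reports whether it was accepted. Events are
// skipped while the channel does not exist, and dropped when it is full.
func (p *processor) add(ev string) bool {
	p.mu.RLock()
	ch := p.events
	p.mu.RUnlock()
	if ch == nil {
		return false
	}
	select {
	case ch <- ev:
		return true
	default:
		return false
	}
}

func main() {
	p := &processor{capacity: 1}
	fmt.Println(p.add("early")) // no channel yet
	p.enable()
	fmt.Println(p.add("first"))  // fits in the buffer
	fmt.Println(p.add("second")) // buffer is full
}
```

Because the channel is never reset to nil once created, reading the pointer under the read lock and sending after releasing it is safe in this sketch.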
1 change: 1 addition & 0 deletions cache/cache_test.go
Expand Up @@ -1666,6 +1666,7 @@ func TestTableCachePopulate2BrokenIndexes(t *testing.T) {
func TestEventProcessor_AddEvent(t *testing.T) {
logger := logr.Discard()
ep := newEventProcessor(16, &logger)
ep.events = make(chan event, ep.capacity)
var events []event
for i := 0; i < 17; i++ {
events = append(events, event{