diff --git a/cache/cache.go b/cache/cache.go index 45a9c158..1a8dc365 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -703,6 +703,39 @@ func (t *TableCache) Purge(dbModel model.DatabaseModel) { } } +// PurgeTable drops all data in the given table's cache and reinitializes it using the +// provided database model +func (t *TableCache) PurgeTable(dbModel model.DatabaseModel, name string) error { + return t.PurgeTableRows(dbModel, name, nil) +} + +// PurgeTableRows drops all rows in the given table's cache that match the given conditions +func (t *TableCache) PurgeTableRows(dbModel model.DatabaseModel, name string, conditions []ovsdb.Condition) error { + t.mutex.Lock() + defer t.mutex.Unlock() + t.dbModel = dbModel + tableTypes := t.dbModel.Types() + dataType, ok := tableTypes[name] + if !ok { + return fmt.Errorf("table %s not found", name) + } + if len(conditions) == 0 { + t.cache[name] = newRowCache(name, t.dbModel, dataType) + return nil + } + + r := t.cache[name] + rows, err := r.RowsByCondition(conditions) + if err != nil { + return err + } + for uuid := range rows { + r.Delete(uuid) + } + + return nil +} + // AddEventHandler registers the supplied EventHandler to receive cache events func (t *TableCache) AddEventHandler(handler EventHandler) { t.eventProcessor.AddEventHandler(handler) diff --git a/client/client.go b/client/client.go index b2cc81dc..e8191622 100644 --- a/client/client.go +++ b/client/client.go @@ -199,6 +199,58 @@ func (o *ovsdbClient) Connect(ctx context.Context) error { return nil } +func (db *database) purge() { + // If a table has any !Since monitors or has no conditions, purge it + // If a table only has Since monitors with conditions, purge only rows that match the conditions + type purge struct { + conditions []ovsdb.Condition + purgeAll bool + } + + purges := make(map[string]*purge) + for _, monitor := range db.monitors { + for _, tm := range monitor.Tables { + p, ok := purges[tm.Table] + if !ok { + p = &purge{} + purges[tm.Table] = p + } + if monitor.Method == ovsdb.ConditionalMonitorSinceRPC { + model, err := db.model.NewModel(tm.Table) + if err != nil { + p.purgeAll = true + continue + } + info, err := db.model.NewModelInfo(model) + if err != nil { + p.purgeAll = true + continue + } + ovsdbCond, err := db.model.Mapper.NewCondition(info, tm.Condition.Field, tm.Condition.Function, tm.Condition.Value) + if err != nil { + p.purgeAll = true + continue + } + p.conditions = append(p.conditions, *ovsdbCond) + } else { + p.purgeAll = true + } + } + } + if len(purges) == 0 { + db.cache.Purge(db.model) + return + } + + for name, p := range purges { + if p.purgeAll { + db.cache.PurgeTable(db.model, name) + } else { + db.cache.PurgeTableRows(db.model, name, p.conditions) + } + } +} + func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { o.rpcMutex.Lock() defer o.rpcMutex.Unlock() @@ -243,6 +295,8 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { for dbName, db := range o.databases { db.monitorsMutex.Lock() defer db.monitorsMutex.Unlock() + + db.purge() for id, request := range db.monitors { err := o.monitor(ctx, MonitorCookie{DatabaseName: dbName, ID: id}, true, request) if err != nil { @@ -349,8 +403,6 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) error { return err } db.api = newAPI(db.cache, o.logger) - } else { - db.cache.Purge(db.model) } db.cacheMutex.Unlock() } @@ -832,12 +884,7 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne var args []interface{} if monitor.Method == ovsdb.ConditionalMonitorSinceRPC { - // FIXME: We should pass the monitor.LastTransactionID here - // But that would require delaying clearing the cache until - // after the monitors have been re-established - the logic - // would also need to be different for monitor and monitor_cond - // as we must always clear the cache in that instance - args = ovsdb.NewMonitorCondSinceArgs(dbName, cookie, requests, emptyUUID) + args = ovsdb.NewMonitorCondSinceArgs(dbName, cookie, requests, monitor.LastTransactionID) } else { args = ovsdb.NewMonitorArgs(dbName, cookie, requests) }