From ab5e07aff66a74f85cca806c8deeb9002a0285cd Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 27 Nov 2024 17:49:59 +0000 Subject: [PATCH] Store AST directly in provider mappings (#6114) (#6158) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Store AST directly in dynamic provider mappings * Store AST directly in context provider mappings (cherry picked from commit fe0f6b00790ffac072c0cb2a23492bf66d309444) Co-authored-by: Mikołaj Świątek --- internal/pkg/agent/transpiler/ast.go | 3 ++ internal/pkg/composable/benchmark_test.go | 10 ++-- internal/pkg/composable/controller.go | 60 ++++++----------------- 3 files changed, 26 insertions(+), 47 deletions(-) diff --git a/internal/pkg/agent/transpiler/ast.go b/internal/pkg/agent/transpiler/ast.go index c8441e87efb..1fae370ce40 100644 --- a/internal/pkg/agent/transpiler/ast.go +++ b/internal/pkg/agent/transpiler/ast.go @@ -825,6 +825,9 @@ func (a *AST) HashStr() string { // Equal check if two AST are equals by using the computed hash. func (a *AST) Equal(other *AST) bool { + if a.root == nil || other.root == nil { + return a.root == other.root + } return bytes.Equal(a.Hash(), other.Hash()) } diff --git a/internal/pkg/composable/benchmark_test.go b/internal/pkg/composable/benchmark_test.go index 87b59442934..fec6e797a0f 100644 --- a/internal/pkg/composable/benchmark_test.go +++ b/internal/pkg/composable/benchmark_test.go @@ -5,12 +5,13 @@ package composable import ( - "maps" "os" "path/filepath" "strings" "testing" + "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" + "gopkg.in/yaml.v3" "k8s.io/apimachinery/pkg/util/uuid" @@ -54,7 +55,8 @@ func BenchmarkGenerateVars100Pods(b *testing.B) { mappings: make(map[string]dynamicProviderMapping), } for i := 0; i < podCount; i++ { - podData := maps.Clone(providerMapping) + podData, err := transpiler.NewAST(providerMapping) + require.NoError(b, err) podUID := uuid.NewUUID() podMapping := dynamicProviderMapping{ mapping: podData, @@ -63,8 +65,10 @@ func BenchmarkGenerateVars100Pods(b *testing.B) { } c.dynamicProviders[providerName] = providerState } else { + providerAst, err := transpiler.NewAST(providerData[providerName]) + require.NoError(b, err) providerState := &contextProviderState{ - mapping: providerData[providerName], + mapping: providerAst, } c.contextProviders[providerName] = providerState } diff --git a/internal/pkg/composable/controller.go b/internal/pkg/composable/controller.go index a5005bce970..1e09719dc13 100644 --- a/internal/pkg/composable/controller.go +++ b/internal/pkg/composable/controller.go @@ -79,10 +79,12 @@ func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error) if err != nil { return nil, errors.New(err, fmt.Sprintf("failed to build provider '%s'", name), errors.TypeConfig, errors.M("provider", name)) } + emptyMapping, _ := transpiler.NewAST(nil) contextProviders[name] = &contextProviderState{ // Safe for Context to be nil here because it will be filled in // by (*controller).Run before the provider is started. provider: provider, + mapping: emptyMapping, } } @@ -275,20 +277,17 @@ func (c *controller) Close() { func (c *controller) generateVars(fetchContextProviders mapstr.M) []*transpiler.Vars { // build the vars list of mappings vars := make([]*transpiler.Vars, 1) - mapping := map[string]interface{}{} + mapping, _ := transpiler.NewAST(map[string]any{}) for name, state := range c.contextProviders { - mapping[name] = state.Current() + _ = mapping.Insert(state.Current(), name) } - // this is ensured not to error, by how the mappings states are verified - mappingAst, _ := transpiler.NewAST(mapping) - vars[0] = transpiler.NewVarsFromAst("", mappingAst, fetchContextProviders) + vars[0] = transpiler.NewVarsFromAst("", mapping, fetchContextProviders) // add to the vars list for each dynamic providers mappings for name, state := range c.dynamicProviders { for _, mappings := range state.Mappings() { - local := mappingAst.ShallowClone() - dynamicAst, _ := transpiler.NewAST(mappings.mapping) - _ = local.Insert(dynamicAst, name) + local := mapping.ShallowClone() + _ = local.Insert(mappings.mapping, name) id := fmt.Sprintf("%s-%s", name, mappings.id) v := transpiler.NewVarsWithProcessorsFromAst(id, local, name, mappings.processors, fetchContextProviders) vars = append(vars, v) @@ -302,7 +301,7 @@ type contextProviderState struct { provider corecomp.ContextProvider lock sync.RWMutex - mapping map[string]interface{} + mapping *transpiler.AST signal chan bool } @@ -324,12 +323,7 @@ func (c *contextProviderState) Signal() { // Set sets the current mapping. func (c *contextProviderState) Set(mapping map[string]interface{}) error { var err error - mapping, err = cloneMap(mapping) - if err != nil { - return err - } - // ensure creating vars will not error - _, err = transpiler.NewVars("", mapping, nil) + ast, err := transpiler.NewAST(mapping) if err != nil { return err } @@ -337,17 +331,17 @@ func (c *contextProviderState) Set(mapping map[string]interface{}) error { c.lock.Lock() defer c.lock.Unlock() - if reflect.DeepEqual(c.mapping, mapping) { + if c.mapping != nil && c.mapping.Equal(ast) { // same mapping; no need to update and signal return nil } - c.mapping = mapping + c.mapping = ast c.Signal() return nil } // Current returns the current mapping. -func (c *contextProviderState) Current() map[string]interface{} { +func (c *contextProviderState) Current() *transpiler.AST { c.lock.RLock() defer c.lock.RUnlock() return c.mapping @@ -356,7 +350,7 @@ func (c *contextProviderState) Current() map[string]interface{} { type dynamicProviderMapping struct { id string priority int - mapping map[string]interface{} + mapping *transpiler.AST processors transpiler.Processors } @@ -376,31 +370,25 @@ type dynamicProviderState struct { // to ensure that matching of variables occurs on the lower priority mappings first. func (c *dynamicProviderState) AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error { var err error - mapping, err = cloneMap(mapping) - if err != nil { - return err - } processors, err = cloneMapArray(processors) if err != nil { return err } - // ensure creating vars will not error - _, err = transpiler.NewVars("", mapping, nil) + ast, err := transpiler.NewAST(mapping) if err != nil { return err } - c.lock.Lock() defer c.lock.Unlock() curr, ok := c.mappings[id] - if ok && reflect.DeepEqual(curr.mapping, mapping) && reflect.DeepEqual(curr.processors, processors) { + if ok && curr.mapping.Equal(ast) && reflect.DeepEqual(curr.processors, processors) { // same mapping; no need to update and signal return nil } c.mappings[id] = dynamicProviderMapping{ id: id, priority: priority, - mapping: mapping, + mapping: ast, processors: processors, } @@ -458,22 +446,6 @@ func (c *dynamicProviderState) Mappings() []dynamicProviderMapping { return mappings } -func cloneMap(source map[string]interface{}) (map[string]interface{}, error) { - if source == nil { - return nil, nil - } - bytes, err := json.Marshal(source) - if err != nil { - return nil, fmt.Errorf("failed to clone: %w", err) - } - var dest map[string]interface{} - err = json.Unmarshal(bytes, &dest) - if err != nil { - return nil, fmt.Errorf("failed to clone: %w", err) - } - return dest, nil -} - func cloneMapArray(source []map[string]interface{}) ([]map[string]interface{}, error) { if source == nil { return nil, nil