diff --git a/pkg/component/runtime/manager_test.go b/pkg/component/runtime/manager_test.go index 010c9f0478f..bb83de6fd04 100644 --- a/pkg/component/runtime/manager_test.go +++ b/pkg/component/runtime/manager_test.go @@ -1308,23 +1308,13 @@ func TestManager_FakeInput_GoodUnitToBad(t *testing.T) { ai, _ := info.NewAgentInfo(ctx, true) m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) - errCh := make(chan error) + runResultChan := make(chan error, 1) go func() { - err := m.Run(ctx) - if errors.Is(err, context.Canceled) { - err = nil - } - errCh <- err + runResultChan <- m.Run(ctx) }() - waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) - defer waitCancel() - if err := waitForReady(waitCtx, m); err != nil { - require.NoError(t, err) - } - binaryPath := testBinary(t, "component") - comp := component.Component{ + healthyComp := component.Component{ ID: "fake-default", InputSpec: &component.InputRuntimeSpec{ InputType: "fake", @@ -1355,105 +1345,103 @@ func TestManager_FakeInput_GoodUnitToBad(t *testing.T) { }, }, } + // unhealthyComp is a copy of healthyComp with an error inserted in the + // second unit + unhealthyComp := healthyComp + unhealthyComp.Units = make([]component.Unit, len(healthyComp.Units)) + copy(unhealthyComp.Units, healthyComp.Units) + unhealthyComp.Units[1] = component.Unit{ + ID: "good-input", + Type: client.UnitTypeInput, + Err: errors.New("hard-error for config"), + } + goodUnitKey := ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "good-input"} - subCtx, subCancel := context.WithCancel(context.Background()) - defer subCancel() - subErrCh := make(chan error) - go func() { - unitGood := true - - sub := m.Subscribe(subCtx, "fake-default") - for { - select { - case <-subCtx.Done(): - return - case state := <-sub.Ch(): - t.Logf("component state changed: %+v", state) - if state.State == client.UnitStateFailed { - subErrCh <- fmt.Errorf("component failed: %s", state.Message) - return - } - - unit, ok := state.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "good-input"}] - if !ok { - subErrCh <- errors.New("unit missing: good-input") - return - } - if unitGood { - switch unit.State { - case client.UnitStateFailed: - subErrCh <- fmt.Errorf("unit failed: %s", unit.Message) - return - case client.UnitStateHealthy: - // good unit it; now make it bad - t.Logf("marking good-input as having a hard-error for config") - updatedComp := comp - updatedComp.Units = make([]component.Unit, len(comp.Units)) - copy(updatedComp.Units, comp.Units) - updatedComp.Units[1] = component.Unit{ - ID: "good-input", - Type: client.UnitTypeInput, - Err: errors.New("hard-error for config"), - } - unitGood = false - err := m.Update(component.Model{Components: []component.Component{updatedComp}}) - if err != nil { - subErrCh <- err - } - case client.UnitStateStarting: - // acceptable - default: - // unknown state that should not have occurred - subErrCh <- fmt.Errorf("unit reported unexpected state: %v", unit.State) - } - } else { - switch unit.State { - case client.UnitStateFailed: - // went to failed; stop whole component - err := m.Update(component.Model{Components: []component.Component{}}) - if err != nil { - subErrCh <- err - } - case client.UnitStateStopped: - // unit was stopped - subErrCh <- nil - default: - subErrCh <- fmt.Errorf( - "good-input unit should be either failed or stopped, but it's '%s'", - unit.State) - } - } + // Wait for Manager to start up + timedWaitForReady(t, m, 1*time.Second) - } - } - }() + sub := m.Subscribe(ctx, "fake-default") - defer drainErrChan(errCh) - defer drainErrChan(subErrCh) + endTimer := time.NewTimer(30 * time.Second) + defer endTimer.Stop() - err = m.Update(component.Model{Components: []component.Component{comp}}) + err = m.Update(component.Model{Components: []component.Component{healthyComp}}) require.NoError(t, err) - endTimer := time.NewTimer(30 * time.Second) - defer endTimer.Stop() + // nextState tracks the stage of the test. We expect the sequence + // Starting -> Healthy -> Failed -> Stopped. + nextState := client.UnitStateHealthy + LOOP: for { + var state ComponentState select { case <-endTimer.C: - t.Fatalf("timed out after 30 seconds") - case err := <-errCh: - require.NoError(t, err) - case err := <-subErrCh: - require.NoError(t, err) - break LOOP + require.Fail(t, "timed out waiting for component state update") + case state = <-sub.Ch(): + t.Logf("component state changed: %+v", state) + } + + require.NotEqual(t, client.UnitStateFailed, state.State, "component should not fail") + unit, ok := state.Units[goodUnitKey] + require.True(t, ok, "unit good-input must be present") + + if nextState == client.UnitStateHealthy { + // Waiting for unit to become healthy, if it's still starting skip + // to the next update + if unit.State == client.UnitStateStarting { + continue LOOP + } + if unit.State == client.UnitStateHealthy { + // good unit is healthy; now make it bad + t.Logf("marking good-input as having a hard-error for config") + err := m.Update(component.Model{Components: []component.Component{unhealthyComp}}) + require.NoError(t, err, "Component model update should succeed") + + // We next expect to transition to Failed + nextState = client.UnitStateFailed + } else { + // Unit should only be starting or healthy in this stage, + // anything else is an error. + require.FailNowf(t, "Incorrect state", "Expected STARTING or HEALTHY, got %v", unit.State) + } + } else if nextState == client.UnitStateFailed { + // Waiting for unit to fail, if it's still healthy skip to the next + // update + if unit.State == client.UnitStateHealthy { + continue LOOP + } + if unit.State == client.UnitStateFailed { + // Reached the expected state, now send an empty component model + // to stop everything. + err := m.Update(component.Model{Components: []component.Component{}}) + require.NoError(t, err, "Component model update should succeed") + nextState = client.UnitStateStopped + } else { + // Unit should only be healthy or failed in this stage, anything + // else is an error. + require.FailNow(t, "Incorrect state", "Expected HEALTHY or FAILED, got %v", unit.State) + } + } else if nextState == client.UnitStateStopped { + // Waiting for component to stop, if it's still Failed skip to + // the next update + if unit.State == client.UnitStateFailed { + continue LOOP + } + if unit.State == client.UnitStateStopped { + // Success, we've finished the whole sequence + break LOOP + } else { + // Unit should only be failed or stopped in this stage, anything + // else is an error. + require.FailNowf(t, "Incorrect state", "Expected FAILED or STOPPED, got %v", unit.State) + } } } - subCancel() cancel() - - err = <-errCh - require.NoError(t, err) + err = <-runResultChan + require.Equal(t, context.Canceled, err, "Run should return with context canceled, got %v", err.Error()) } func TestManager_FakeInput_NoDeadlock(t *testing.T) { @@ -3671,3 +3659,12 @@ func waitForReady(ctx context.Context, m *Manager) error { } return nil } + +func timedWaitForReady(t *testing.T, m *Manager, timeout time.Duration) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + err := waitForReady(ctx, m) + if err != nil { + require.FailNow(t, "timed out waiting for Manager to start") + } +}