Skip to content

Commit

Permalink
Fix TestManager_FakeInput_GoodUnitToBad (#3696) (#3698)
Browse files Browse the repository at this point in the history
(cherry picked from commit a621050)

Co-authored-by: Fae Charlton <fae.charlton@elastic.co>
  • Loading branch information
mergify[bot] and faec authored Nov 3, 2023
1 parent 2ff5f63 commit 6fb31d5
Showing 1 changed file with 95 additions and 98 deletions.
193 changes: 95 additions & 98 deletions pkg/component/runtime/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1308,23 +1308,13 @@ func TestManager_FakeInput_GoodUnitToBad(t *testing.T) {
ai, _ := info.NewAgentInfo(ctx, true)
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
runResultChan := make(chan error, 1)
go func() {
err := m.Run(ctx)
if errors.Is(err, context.Canceled) {
err = nil
}
errCh <- err
runResultChan <- m.Run(ctx)
}()

waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second)
defer waitCancel()
if err := waitForReady(waitCtx, m); err != nil {
require.NoError(t, err)
}

binaryPath := testBinary(t, "component")
comp := component.Component{
healthyComp := component.Component{
ID: "fake-default",
InputSpec: &component.InputRuntimeSpec{
InputType: "fake",
Expand Down Expand Up @@ -1355,105 +1345,103 @@ func TestManager_FakeInput_GoodUnitToBad(t *testing.T) {
},
},
}
// unhealthyComp is a copy of healthyComp with an error inserted in the
// second unit
unhealthyComp := healthyComp
unhealthyComp.Units = make([]component.Unit, len(healthyComp.Units))
copy(unhealthyComp.Units, healthyComp.Units)
unhealthyComp.Units[1] = component.Unit{
ID: "good-input",
Type: client.UnitTypeInput,
Err: errors.New("hard-error for config"),
}
goodUnitKey := ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "good-input"}

subCtx, subCancel := context.WithCancel(context.Background())
defer subCancel()
subErrCh := make(chan error)
go func() {
unitGood := true

sub := m.Subscribe(subCtx, "fake-default")
for {
select {
case <-subCtx.Done():
return
case state := <-sub.Ch():
t.Logf("component state changed: %+v", state)
if state.State == client.UnitStateFailed {
subErrCh <- fmt.Errorf("component failed: %s", state.Message)
return
}

unit, ok := state.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "good-input"}]
if !ok {
subErrCh <- errors.New("unit missing: good-input")
return
}
if unitGood {
switch unit.State {
case client.UnitStateFailed:
subErrCh <- fmt.Errorf("unit failed: %s", unit.Message)
return
case client.UnitStateHealthy:
// good unit it; now make it bad
t.Logf("marking good-input as having a hard-error for config")
updatedComp := comp
updatedComp.Units = make([]component.Unit, len(comp.Units))
copy(updatedComp.Units, comp.Units)
updatedComp.Units[1] = component.Unit{
ID: "good-input",
Type: client.UnitTypeInput,
Err: errors.New("hard-error for config"),
}
unitGood = false
err := m.Update(component.Model{Components: []component.Component{updatedComp}})
if err != nil {
subErrCh <- err
}
case client.UnitStateStarting:
// acceptable
default:
// unknown state that should not have occurred
subErrCh <- fmt.Errorf("unit reported unexpected state: %v", unit.State)
}
} else {
switch unit.State {
case client.UnitStateFailed:
// went to failed; stop whole component
err := m.Update(component.Model{Components: []component.Component{}})
if err != nil {
subErrCh <- err
}
case client.UnitStateStopped:
// unit was stopped
subErrCh <- nil
default:
subErrCh <- fmt.Errorf(
"good-input unit should be either failed or stopped, but it's '%s'",
unit.State)
}
}
// Wait for Manager to start up
timedWaitForReady(t, m, 1*time.Second)

}
}
}()
sub := m.Subscribe(ctx, "fake-default")

defer drainErrChan(errCh)
defer drainErrChan(subErrCh)
endTimer := time.NewTimer(30 * time.Second)
defer endTimer.Stop()

err = m.Update(component.Model{Components: []component.Component{comp}})
err = m.Update(component.Model{Components: []component.Component{healthyComp}})
require.NoError(t, err)

endTimer := time.NewTimer(30 * time.Second)
defer endTimer.Stop()
// nextState tracks the stage of the test. We expect the sequence
// Starting -> Healthy -> Failed -> Stopped.
nextState := client.UnitStateHealthy

LOOP:
for {
var state ComponentState
select {
case <-endTimer.C:
t.Fatalf("timed out after 30 seconds")
case err := <-errCh:
require.NoError(t, err)
case err := <-subErrCh:
require.NoError(t, err)
break LOOP
require.Fail(t, "timed out waiting for component state update")
case state = <-sub.Ch():
t.Logf("component state changed: %+v", state)
}

require.NotEqual(t, client.UnitStateFailed, state.State, "component should not fail")
unit, ok := state.Units[goodUnitKey]
require.True(t, ok, "unit good-input must be present")

if nextState == client.UnitStateHealthy {
// Waiting for unit to become healthy, if it's still starting skip
// to the next update
if unit.State == client.UnitStateStarting {
continue LOOP
}
if unit.State == client.UnitStateHealthy {
// good unit is healthy; now make it bad
t.Logf("marking good-input as having a hard-error for config")
err := m.Update(component.Model{Components: []component.Component{unhealthyComp}})
require.NoError(t, err, "Component model update should succeed")

// We next expect to transition to Failed
nextState = client.UnitStateFailed
} else {
// Unit should only be starting or healthy in this stage,
// anything else is an error.
require.FailNowf(t, "Incorrect state", "Expected STARTING or HEALTHY, got %v", unit.State)
}
} else if nextState == client.UnitStateFailed {
// Waiting for unit to fail, if it's still healthy skip to the next
// update
if unit.State == client.UnitStateHealthy {
continue LOOP
}
if unit.State == client.UnitStateFailed {
// Reached the expected state, now send an empty component model
// to stop everything.
err := m.Update(component.Model{Components: []component.Component{}})
require.NoError(t, err, "Component model update should succeed")
nextState = client.UnitStateStopped
} else {
// Unit should only be healthy or failed in this stage, anything
// else is an error.
require.FailNow(t, "Incorrect state", "Expected HEALTHY or FAILED, got %v", unit.State)
}
} else if nextState == client.UnitStateStopped {
// Waiting for component to stop, if it's still Failed skip to
// the next update
if unit.State == client.UnitStateFailed {
continue LOOP
}
if unit.State == client.UnitStateStopped {
// Success, we've finished the whole sequence
break LOOP
} else {
// Unit should only be failed or stopped in this stage, anything
// else is an error.
require.FailNowf(t, "Incorrect state", "Expected FAILED or STOPPED, got %v", unit.State)
}
}
}

subCancel()
cancel()

err = <-errCh
require.NoError(t, err)
err = <-runResultChan
require.Equal(t, context.Canceled, err, "Run should return with context canceled, got %v", err.Error())
}

func TestManager_FakeInput_NoDeadlock(t *testing.T) {
Expand Down Expand Up @@ -3671,3 +3659,12 @@ func waitForReady(ctx context.Context, m *Manager) error {
}
return nil
}

func timedWaitForReady(t *testing.T, m *Manager, timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
err := waitForReady(ctx, m)
if err != nil {
require.FailNow(t, "timed out waiting for Manager to start")
}
}

0 comments on commit 6fb31d5

Please sign in to comment.