Skip to content

Commit

Permalink
fix: unintended behaviour on instance deletion
Browse files: browse the repository at this point in the history
  • Loading branch information
lucas-jacques committed Oct 23, 2024
1 parent feec059 commit 59aa6b5
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 44 deletions.
6 changes: 3 additions & 3 deletions internal/agent/instance/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ func (m *Manager) Destroy(ctx context.Context, force bool) error {
<-m.waitCh
}

err := m.destroyImpl(ctx, core.OriginUser, "requested by user")
err := m.destroyImpl(ctx, core.OriginUser, force, "requested by user")
if err != nil {
return err
}

return nil
}

func (m *Manager) destroyImpl(ctx context.Context, origin core.Origin, reason string) error {
err := m.state.PushInstanceDestroyEvent(ctx, origin, reason)
func (m *Manager) destroyImpl(ctx context.Context, origin core.Origin, force bool, reason string) error {
err := m.state.PushInstanceDestroyEvent(ctx, origin, force, reason)
if err != nil {
return err
}
Expand Down
42 changes: 27 additions & 15 deletions internal/agent/instance/eventer/eventer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,33 @@ type Eventer struct {
toReport *deque.Deque[core.InstanceEvent]
mutex sync.RWMutex

nc *nats.Conn
store *store.Store
notify chan struct{}
stopCh chan struct{}
nc *nats.Conn
store *store.Store
notify chan struct{}
stopCh chan struct{}
stoppedCh chan struct{}
stopOnce sync.Once
}

func NewEventer(unreportedEvents []core.InstanceEvent, machineId, instanceId string, nc *nats.Conn, store *store.Store) *Eventer {
toReport := deque.New[core.InstanceEvent](len(unreportedEvents))

e := &Eventer{
subject: "events." + machineId + "." + instanceId,
toReport: toReport,
nc: nc,
store: store,
notify: make(chan struct{}, 1),
stopCh: make(chan struct{}),
subject: "events." + machineId + "." + instanceId,
toReport: toReport,
nc: nc,
store: store,
notify: make(chan struct{}, 1),
stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
}

for _, event := range unreportedEvents {
e.toReport.PushBack(event)
}

if len(unreportedEvents) > 0 {
e.triggerNotify()
}

go e.start()
Expand All @@ -58,17 +69,18 @@ func (e *Eventer) Report(event core.InstanceEvent) {
}

func (e *Eventer) Stop() {
e.stopCh <- struct{}{}
}

func (e *Eventer) Start() {
go e.start()
e.stopOnce.Do(func() {
close(e.stopCh)
})
<-e.stoppedCh
}

func (e *Eventer) start() {
slog.Debug("starting eventer", "instance", e.subject)
for {
select {
case <-e.stopCh:
close(e.stoppedCh)
return
case <-e.notify:
e.reportEvents()
Expand Down
2 changes: 1 addition & 1 deletion internal/agent/instance/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (m *Manager) prepare() error {
reason = "instance preparation failed after maximum retries"
}

destroyErr := m.destroyImpl(ctx, core.OriginRavel, reason)
destroyErr := m.destroyImpl(ctx, core.OriginRavel, false, reason)
if destroyErr != nil {
slog.Error("failed to destroy instance after preparation failed", "error", destroyErr)
return err
Expand Down
35 changes: 16 additions & 19 deletions internal/agent/instance/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,23 +224,21 @@ func (s *instanceState) PushInstanceExitedEvent(ctx context.Context, payload cor
return nil
}

func (s *instanceState) PushInstanceDestroyEvent(ctx context.Context, origin core.Origin, reason string) (err error) {
func (s *instanceState) PushInstanceDestroyEvent(ctx context.Context, origin core.Origin, force bool, reason string) (err error) {
s.updateLock.Lock()
defer s.updateLock.Unlock()

event := core.InstanceEvent{
Id: eventId(),
Type: core.InstanceDestroy,
Origin: core.OriginRavel,
Payload: core.InstanceEventPayload{
event := s.newInstanceEvent(
core.InstanceDestroy,
origin,
core.MachineStatusDestroying,
core.InstanceEventPayload{
Destroy: &core.InstanceDestroyEventPayload{
Reason: reason,
Force: force,
},
},
InstanceId: s.instance.Id,
Status: core.MachineStatusDestroying,
Timestamp: time.Now(),
}
)

s.instance.State.DesiredStatus = core.MachineStatusDestroyed

Expand All @@ -255,21 +253,20 @@ func (s *instanceState) PushInstanceDestroyedEvent(ctx context.Context) (err err
s.updateLock.Lock()
defer s.updateLock.Unlock()

event := core.InstanceEvent{
Id: eventId(),
Type: core.InstanceDestroyed,
Origin: core.OriginRavel,
Payload: core.InstanceEventPayload{
event := s.newInstanceEvent(
core.InstanceDestroyed,
core.OriginRavel,
core.MachineStatusDestroyed,
core.InstanceEventPayload{
Destroyed: &core.InstanceDestroyedEventPayload{},
},
InstanceId: s.instance.Id,
Status: core.MachineStatusDestroyed,
Timestamp: time.Now(),
}
)

if err = s.persistAndReportChange(event); err != nil {
return
}

s.eventer.Stop()

return nil
}
2 changes: 1 addition & 1 deletion internal/agent/instance/state/instance_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type InstanceState interface {
PushInstanceStartFailedEvent(ctx context.Context, errMsg string) error
PushInstanceExitedEvent(ctx context.Context, payload core.InstanceExitedEventPayload) error
PushInstanceStopEvent(ctx context.Context) error
PushInstanceDestroyEvent(ctx context.Context, origin core.Origin, reason string) error
PushInstanceDestroyEvent(ctx context.Context, origin core.Origin, force bool, reason string) error
PushInstanceDestroyedEvent(ctx context.Context) error
}

Expand Down
6 changes: 6 additions & 0 deletions internal/agent/reservations/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reservations

import (
"context"
"log/slog"
"net"
"sync"
"time"
Expand Down Expand Up @@ -130,9 +131,12 @@ func (rs *ReservationService) GetReservation(id string) (structs.Reservation, er
func (rs *ReservationService) DeleteReservation(id string) error {
rs.lock.Lock()
defer rs.lock.Unlock()
slog.Info("deleting reservation", "id", id)
defer slog.Info("deleted reservation", "id", id)

reservation, ok := rs.reservations[id]
if !ok {
slog.Warn("reservation not found", "id", id)
return nil
}

Expand All @@ -144,6 +148,8 @@ func (rs *ReservationService) DeleteReservation(id string) error {
return err
}

slog.Info("deleting reservation from store", "id", id)

if err := rs.store.DeleteReservation(id); err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions internal/agent/store/reservations.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (s *Store) DeleteReservation(id string) error {
return err
}

if err = tx.Commit(); err != nil {
return err
}

return nil
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/agent/instance_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ func (a *Agent) CreateInstance(ctx context.Context, opt core.CreateInstancePaylo

config := opt.Config
i := core.Instance{
Id: id.GeneratePrefixed("instance"),
Namespace: opt.Namespace,
MachineId: opt.MachineId,
FleetId: opt.FleetId,
Id: id.GeneratePrefixed("instance"),
Namespace: opt.Namespace,
MachineId: opt.MachineId,
ReservationId: reservation.Id,

FleetId: opt.FleetId,
State: core.InstanceState{
DesiredStatus: desiredStatus,
Status: core.MachineStatusCreated,
Expand Down
11 changes: 10 additions & 1 deletion pkg/agent/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,20 @@ func (s *Agent) DestroyInstance(ctx context.Context, id string, force bool) erro
return err
}

err = s.reservations.DeleteReservation(instance.Instance().MachineId)
err = s.reservations.DeleteReservation(instance.Instance().ReservationId)
if err != nil {
return err
}

err = s.store.DestroyInstanceBucket(id)
if err != nil {
return err
}

s.lock.Lock()
delete(s.instances, id)
s.lock.Unlock()

return nil
}

Expand Down

0 comments on commit 59aa6b5

Please sign in to comment.