diff --git a/internal/agent/instance/destroy.go b/internal/agent/instance/destroy.go index 0e35e4f..3d1e8ce 100644 --- a/internal/agent/instance/destroy.go +++ b/internal/agent/instance/destroy.go @@ -26,7 +26,7 @@ func (m *Manager) Destroy(ctx context.Context, force bool) error { <-m.waitCh } - err := m.destroyImpl(ctx, core.OriginUser, "requested by user") + err := m.destroyImpl(ctx, core.OriginUser, force, "requested by user") if err != nil { return err } @@ -34,8 +34,8 @@ func (m *Manager) Destroy(ctx context.Context, force bool) error { return nil } -func (m *Manager) destroyImpl(ctx context.Context, origin core.Origin, reason string) error { - err := m.state.PushInstanceDestroyEvent(ctx, origin, reason) +func (m *Manager) destroyImpl(ctx context.Context, origin core.Origin, force bool, reason string) error { + err := m.state.PushInstanceDestroyEvent(ctx, origin, force, reason) if err != nil { return err } diff --git a/internal/agent/instance/eventer/eventer.go b/internal/agent/instance/eventer/eventer.go index 07043e0..12c18a9 100644 --- a/internal/agent/instance/eventer/eventer.go +++ b/internal/agent/instance/eventer/eventer.go @@ -18,22 +18,33 @@ type Eventer struct { toReport *deque.Deque[core.InstanceEvent] mutex sync.RWMutex - nc *nats.Conn - store *store.Store - notify chan struct{} - stopCh chan struct{} + nc *nats.Conn + store *store.Store + notify chan struct{} + stopCh chan struct{} + stoppedCh chan struct{} + stopOnce sync.Once } func NewEventer(unreportedEvents []core.InstanceEvent, machineId, instanceId string, nc *nats.Conn, store *store.Store) *Eventer { toReport := deque.New[core.InstanceEvent](len(unreportedEvents)) e := &Eventer{ - subject: "events." + machineId + "." + instanceId, - toReport: toReport, - nc: nc, - store: store, - notify: make(chan struct{}, 1), - stopCh: make(chan struct{}), + subject: "events." + machineId + "." + instanceId, + toReport: toReport, + nc: nc, + store: store, + notify: make(chan struct{}, 1), + stopCh: make(chan struct{}), + stoppedCh: make(chan struct{}), + } + + for _, event := range unreportedEvents { + e.toReport.PushBack(event) + } + + if len(unreportedEvents) > 0 { + e.triggerNotify() } go e.start() @@ -58,17 +69,18 @@ func (e *Eventer) Report(event core.InstanceEvent) { } func (e *Eventer) Stop() { - e.stopCh <- struct{}{} -} - -func (e *Eventer) Start() { - go e.start() + e.stopOnce.Do(func() { + close(e.stopCh) + }) + <-e.stoppedCh } func (e *Eventer) start() { + slog.Debug("starting eventer", "instance", e.subject) for { select { case <-e.stopCh: + close(e.stoppedCh) return case <-e.notify: e.reportEvents() diff --git a/internal/agent/instance/prepare.go b/internal/agent/instance/prepare.go index 7d688c0..3394583 100644 --- a/internal/agent/instance/prepare.go +++ b/internal/agent/instance/prepare.go @@ -76,7 +76,7 @@ func (m *Manager) prepare() error { reason = "instance preparation failed after maximum retries" } - destroyErr := m.destroyImpl(ctx, core.OriginRavel, reason) + destroyErr := m.destroyImpl(ctx, core.OriginRavel, false, reason) if destroyErr != nil { slog.Error("failed to destroy instance after preparation failed", "error", destroyErr) return err diff --git a/internal/agent/instance/state/events.go b/internal/agent/instance/state/events.go index d424fd7..9365022 100644 --- a/internal/agent/instance/state/events.go +++ b/internal/agent/instance/state/events.go @@ -224,23 +224,21 @@ func (s *instanceState) PushInstanceExitedEvent(ctx context.Context, payload cor return nil } -func (s *instanceState) PushInstanceDestroyEvent(ctx context.Context, origin core.Origin, reason string) (err error) { +func (s *instanceState) PushInstanceDestroyEvent(ctx context.Context, origin core.Origin, force bool, reason string) (err error) { s.updateLock.Lock() defer s.updateLock.Unlock() - event := core.InstanceEvent{ - Id: eventId(), - Type: core.InstanceDestroy, - Origin: core.OriginRavel, - Payload: core.InstanceEventPayload{ + event := s.newInstanceEvent( + core.InstanceDestroy, + origin, + core.MachineStatusDestroying, + core.InstanceEventPayload{ Destroy: &core.InstanceDestroyEventPayload{ Reason: reason, + Force: force, }, }, - InstanceId: s.instance.Id, - Status: core.MachineStatusDestroying, - Timestamp: time.Now(), - } + ) s.instance.State.DesiredStatus = core.MachineStatusDestroyed @@ -255,21 +253,20 @@ func (s *instanceState) PushInstanceDestroyedEvent(ctx context.Context) (err err s.updateLock.Lock() defer s.updateLock.Unlock() - event := core.InstanceEvent{ - Id: eventId(), - Type: core.InstanceDestroyed, - Origin: core.OriginRavel, - Payload: core.InstanceEventPayload{ + event := s.newInstanceEvent( + core.InstanceDestroyed, + core.OriginRavel, + core.MachineStatusDestroyed, + core.InstanceEventPayload{ Destroyed: &core.InstanceDestroyedEventPayload{}, }, - InstanceId: s.instance.Id, - Status: core.MachineStatusDestroyed, - Timestamp: time.Now(), - } + ) if err = s.persistAndReportChange(event); err != nil { return } + s.eventer.Stop() + return nil } diff --git a/internal/agent/instance/state/instance_state.go b/internal/agent/instance/state/instance_state.go index 3762559..3b1c3fa 100644 --- a/internal/agent/instance/state/instance_state.go +++ b/internal/agent/instance/state/instance_state.go @@ -50,7 +50,7 @@ type InstanceState interface { PushInstanceStartFailedEvent(ctx context.Context, errMsg string) error PushInstanceExitedEvent(ctx context.Context, payload core.InstanceExitedEventPayload) error PushInstanceStopEvent(ctx context.Context) error - PushInstanceDestroyEvent(ctx context.Context, origin core.Origin, reason string) error + PushInstanceDestroyEvent(ctx context.Context, origin core.Origin, force bool, reason string) error PushInstanceDestroyedEvent(ctx context.Context) error } diff --git a/internal/agent/reservations/service.go b/internal/agent/reservations/service.go index 3a7edc8..fc4126c 100644 --- a/internal/agent/reservations/service.go +++ b/internal/agent/reservations/service.go @@ -2,6 +2,7 @@ package reservations import ( "context" + "log/slog" "net" "sync" "time" @@ -130,9 +131,12 @@ func (rs *ReservationService) GetReservation(id string) (structs.Reservation, er func (rs *ReservationService) DeleteReservation(id string) error { rs.lock.Lock() defer rs.lock.Unlock() + slog.Info("deleting reservation", "id", id) + defer slog.Info("deleted reservation", "id", id) reservation, ok := rs.reservations[id] if !ok { + slog.Warn("reservation not found", "id", id) return nil } @@ -144,6 +148,8 @@ func (rs *ReservationService) DeleteReservation(id string) error { return err } + slog.Info("deleting reservation from store", "id", id) + if err := rs.store.DeleteReservation(id); err != nil { return err } diff --git a/internal/agent/store/reservations.go b/internal/agent/store/reservations.go index a771e8f..621de77 100644 --- a/internal/agent/store/reservations.go +++ b/internal/agent/store/reservations.go @@ -47,6 +47,10 @@ func (s *Store) DeleteReservation(id string) error { return err } + if err = tx.Commit(); err != nil { + return err + } + return nil } diff --git a/pkg/agent/instance_create.go b/pkg/agent/instance_create.go index 6f34c59..9aa4a27 100644 --- a/pkg/agent/instance_create.go +++ b/pkg/agent/instance_create.go @@ -29,10 +29,12 @@ func (a *Agent) CreateInstance(ctx context.Context, opt core.CreateInstancePaylo config := opt.Config i := core.Instance{ - Id: id.GeneratePrefixed("instance"), - Namespace: opt.Namespace, - MachineId: opt.MachineId, - FleetId: opt.FleetId, + Id: id.GeneratePrefixed("instance"), + Namespace: opt.Namespace, + MachineId: opt.MachineId, + ReservationId: reservation.Id, + + FleetId: opt.FleetId, State: core.InstanceState{ DesiredStatus: desiredStatus, Status: core.MachineStatusCreated, diff --git a/pkg/agent/instances.go b/pkg/agent/instances.go index 8f19f17..ea009e1 100644 --- a/pkg/agent/instances.go +++ b/pkg/agent/instances.go @@ -33,11 +33,20 @@ func (s *Agent) DestroyInstance(ctx context.Context, id string, force bool) erro return err } - err = s.reservations.DeleteReservation(instance.Instance().MachineId) + err = s.reservations.DeleteReservation(instance.Instance().ReservationId) if err != nil { return err } + err = s.store.DestroyInstanceBucket(id) + if err != nil { + return err + } + + s.lock.Lock() + delete(s.instances, id) + s.lock.Unlock() + return nil }