diff --git a/queen.go b/queen.go index e385481..9aaa05f 100644 --- a/queen.go +++ b/queen.go @@ -199,13 +199,11 @@ func (q *Queen) Run(ctx context.Context) error { defer logger.Debugln("Queen.Run completing") go q.consumeAntsLogs(ctx) + go q.normalizeRequests(ctx) crawlTime := time.NewTicker(CRAWL_INTERVAL) defer crawlTime.Stop() - normalizationTime := time.NewTicker(NORMALIZATION_INTERVAL) - defer normalizationTime.Stop() - q.routine(ctx) for { @@ -216,11 +214,6 @@ func (q *Queen) Run(ctx context.Context) error { return ctx.Err() case <-crawlTime.C: q.routine(ctx) - case <-normalizationTime.C: - go q.normalizeRequests(ctx) - default: - // busy-loop guard - time.Sleep(100 * time.Millisecond) } } } @@ -296,16 +289,25 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) { } func (q *Queen) normalizeRequests(ctx context.Context) { - nctx, ncancel := context.WithCancel(ctx) - defer ncancel() - logger.Info("Starting continuous normalization...") - err := db.NormalizeRequests(nctx, q.dbc.Handler, q.dbc) - if err != nil { - logger.Errorf("Error during normalization: %w", err) - } else { - logger.Info("Normalization completed for current batch.") + normalizationTime := time.NewTicker(NORMALIZATION_INTERVAL) + defer normalizationTime.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-normalizationTime.C: + // fall through + } + + err := db.NormalizeRequests(ctx, q.dbc.Handler, q.dbc) + if err != nil { + logger.Errorf("Error during normalization: %s", err) + } else { + logger.Info("Normalization completed for current batch.") + } } }