Skip to content

Commit

Permalink
fix: duplicate normalized requests (#12)
Browse files Browse the repository at this point in the history
* fix: duplicate normalized requests

* fix: duplicate normalized requests

* remove duplicate ctx.Done()

---------

Co-authored-by: kasteph <ayo@kasteph.com>
  • Loading branch information
dennis-tra and kasteph authored Oct 30, 2024
1 parent 58e9e7b commit d424936
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions queen.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,11 @@ func (q *Queen) Run(ctx context.Context) error {
defer logger.Debugln("Queen.Run completing")

go q.consumeAntsLogs(ctx)
go q.normalizeRequests(ctx)

crawlTime := time.NewTicker(CRAWL_INTERVAL)
defer crawlTime.Stop()

normalizationTime := time.NewTicker(NORMALIZATION_INTERVAL)
defer normalizationTime.Stop()

q.routine(ctx)

for {
Expand All @@ -216,11 +214,6 @@ func (q *Queen) Run(ctx context.Context) error {
return ctx.Err()
case <-crawlTime.C:
q.routine(ctx)
case <-normalizationTime.C:
go q.normalizeRequests(ctx)
default:
// busy-loop guard
time.Sleep(100 * time.Millisecond)
}
}
}
Expand Down Expand Up @@ -296,16 +289,25 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) {
}

func (q *Queen) normalizeRequests(ctx context.Context) {
nctx, ncancel := context.WithCancel(ctx)
defer ncancel()

logger.Info("Starting continuous normalization...")

err := db.NormalizeRequests(nctx, q.dbc.Handler, q.dbc)
if err != nil {
logger.Errorf("Error during normalization: %w", err)
} else {
logger.Info("Normalization completed for current batch.")
normalizationTime := time.NewTicker(NORMALIZATION_INTERVAL)
defer normalizationTime.Stop()

for {
select {
case <-ctx.Done():
return
case <-normalizationTime.C:
// fall through
}

err := db.NormalizeRequests(ctx, q.dbc.Handler, q.dbc)
if err != nil {
logger.Errorf("Error during normalization: %s", err)
} else {
logger.Info("Normalization completed for current batch.")
}
}
}

Expand Down

0 comments on commit d424936

Please sign in to comment.