Skip to content

Commit

Permalink
fix: queue graceful shutdown (#49)
Browse files Browse the repository at this point in the history
* fix: queue graceful shutdown

* chore: improve test queue
  • Loading branch information
rafaelcr authored Dec 2, 2022
1 parent c68d85c commit 20427b0
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
21 changes: 11 additions & 10 deletions src/token-processor/queue/job-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export class JobQueue {
private readonly db: PgStore;
/** IDs of jobs currently being processed by the queue. */
private jobIds: Set<number>;
private isRunning = false;

constructor(args: { db: PgStore }) {
this.db = args.db;
Expand All @@ -51,18 +52,19 @@ export class JobQueue {
*/
start() {
console.log(`JobQueue starting queue...`);
this.isRunning = true;
this.queue.start();
void this.runQueueLoop();
}

/**
* Shuts down the queue by clearing it and waiting for its current work to be complete.
* Shuts down the queue and waits for its current work to be complete.
*/
async close() {
console.log(`JobQueue closing, waiting on ${this.queue.pending} jobs to finish...`);
this.queue.clear();
this.queue.pause();
this.isRunning = false;
await this.queue.onIdle();
this.queue.pause();
}

/**
Expand All @@ -72,12 +74,11 @@ export class JobQueue {
* @param job - A row from the `jobs` DB table that needs processing
*/
protected async add(job: DbJob): Promise<void> {
if (this.queue.size + this.queue.pending >= ENV.JOB_QUEUE_SIZE_LIMIT) {
// To avoid memory errors, we won't add this job to the queue. It will be processed later when
// the empty queue gets replenished with pending jobs.
return;
}
if (this.jobIds.has(job.id)) {
if (
!this.isRunning ||
this.jobIds.has(job.id) ||
this.queue.size + this.queue.pending >= ENV.JOB_QUEUE_SIZE_LIMIT
) {
return;
}
this.jobIds.add(job.id);
Expand Down Expand Up @@ -124,7 +125,7 @@ export class JobQueue {
* processing, repeating this cycle until the jobs table is completely processed.
*/
private async runQueueLoop() {
while (!this.queue.isPaused) {
while (this.isRunning) {
try {
const loadedJobs = await this.addJobBatch();
if (loadedJobs === 0) {
Expand Down
4 changes: 4 additions & 0 deletions tests/job-queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import { JobQueue } from '../src/token-processor/queue/job-queue';
import { cycleMigrations } from './helpers';

class TestJobQueue extends JobQueue {
constructor(args: { db: PgStore }) {
super(args);
this['isRunning'] = true; // Simulate a running queue.
}
async testAdd(job: DbJob): Promise<void> {
return this.add(job);
}
Expand Down

0 comments on commit 20427b0

Please sign in to comment.