feat: add configurable delay for retrying jobs (#257)
* feat: retry after

* fix: migration

* fix: retry after rate limit
rafaelcr authored Aug 30, 2024
1 parent 46d8fc8 commit 20d753a
Showing 8 changed files with 85 additions and 43 deletions.
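At a glance: the jobs table gains a retry_after timestamptz column; a new JOB_QUEUE_RETRY_AFTER_MS setting (default 5,000 ms) spaces out retries; increaseJobRetryCount stamps each failed job with the next time it may run; getPendingJobBatch skips jobs whose delay has not yet elapsed; and when a metadata host answers 429, its Retry-After header extends the delay. The rate-limited-host bookkeeping also moves from ProcessTokenJob into the base Job class.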
12 changes: 12 additions & 0 deletions migrations/1725039927416_job-retry-after.ts
@@ -0,0 +1,12 @@
+/* eslint-disable @typescript-eslint/naming-convention */
+import { MigrationBuilder, ColumnDefinitions } from 'node-pg-migrate';
+
+export const shorthands: ColumnDefinitions | undefined = undefined;
+
+export function up(pgm: MigrationBuilder): void {
+  pgm.addColumn('jobs', {
+    retry_after: {
+      type: 'timestamptz',
+    },
+  });
+}
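The commit ships only the up step; node-pg-migrate can usually infer the reverse of addColumn, but an explicit down would be a one-liner in the same file, reusing its MigrationBuilder import. A sketch, not part of the commit:

export function down(pgm: MigrationBuilder): void {
  pgm.dropColumn('jobs', 'retry_after');
}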
4 changes: 3 additions & 1 deletion src/env.ts
@@ -84,6 +84,8 @@ const schema = Type.Object({
   JOB_QUEUE_SIZE_LIMIT: Type.Number({ default: 200 }),
   /** Maximum time a job will run before marking it as failed. */
   JOB_QUEUE_TIMEOUT_MS: Type.Number({ default: 60_000 }),
+  /** Minimum time we will wait to retry a job after it's been executed. */
+  JOB_QUEUE_RETRY_AFTER_MS: Type.Number({ default: 5_000 }),
 
   /**
    * The max number of immediate attempts that will be made to retrieve metadata from external URIs
@@ -118,7 +120,7 @@ const schema = Type.Object({
    * next request that is sent to it (seconds). This value will be overridden by the `Retry-After`
    * header returned by the domain, if any.
    */
-  METADATA_RATE_LIMITED_HOST_RETRY_AFTER: Type.Number({ default: 3600 }), // 1 hour
+  METADATA_RATE_LIMITED_HOST_RETRY_AFTER: Type.Number({ default: 60 }), // 1 minute
   /**
    * Maximum number of HTTP redirections to follow when fetching metadata. Defaults to 5.
    */
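Operators could wire both knobs up via the environment. A hypothetical .env fragment with illustrative values (the defaults shown in the diff apply when unset):

# Minimum delay in ms before a failed job becomes eligible for retry
JOB_QUEUE_RETRY_AFTER_MS=5000
# Seconds to back off a 429-ing host when it sends no Retry-After header
METADATA_RATE_LIMITED_HOST_RETRY_AFTER=60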
24 changes: 18 additions & 6 deletions src/pg/pg-store.ts
@@ -207,7 +207,16 @@ export class PgStore extends BasePgStore {
     await this.sql`
       UPDATE jobs
       SET status = ${args.status},
-          invalid_reason = ${args.invalidReason ? args.invalidReason : this.sql`NULL`},
+          invalid_reason = ${
+            args.status == DbJobStatus.invalid && args.invalidReason
+              ? args.invalidReason
+              : this.sql`NULL`
+          },
+          ${
+            args.status != DbJobStatus.pending
+              ? this.sql`retry_count = 0, retry_after = NULL,`
+              : this.sql``
+          }
           updated_at = NOW()
       WHERE id = ${args.id}
     `;
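Because the conditional fragment ends with a trailing comma, the pieces join cleanly in front of updated_at. For a job leaving the pending state, the template renders to roughly the following (the 'done' status value and job id are hypothetical):

UPDATE jobs
SET status = 'done',
    invalid_reason = NULL,
    retry_count = 0, retry_after = NULL,
    updated_at = NOW()
WHERE id = 123;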
@@ -216,30 +225,33 @@
   async retryAllFailedJobs(): Promise<void> {
     await this.sql`
       UPDATE jobs
-      SET status = ${DbJobStatus.pending}, retry_count = 0, updated_at = NOW()
+      SET status = ${DbJobStatus.pending}, retry_count = 0, updated_at = NOW(), retry_after = NULL
       WHERE status IN (${DbJobStatus.failed}, ${DbJobStatus.invalid})
     `;
   }
 
-  async increaseJobRetryCount(args: { id: number }): Promise<number> {
+  async increaseJobRetryCount(args: { id: number; retry_after: number }): Promise<number> {
+    const retryAfter = args.retry_after.toString();
     const result = await this.sql<{ retry_count: number }[]>`
       UPDATE jobs
-      SET retry_count = retry_count + 1, updated_at = NOW()
+      SET retry_count = retry_count + 1,
+          updated_at = NOW(),
+          retry_after = NOW() + INTERVAL '${this.sql(retryAfter)} ms'
       WHERE id = ${args.id}
       RETURNING retry_count
     `;
     return result[0].retry_count;
   }
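With the default JOB_QUEUE_RETRY_AFTER_MS of 5,000, the update above renders to roughly this statement (job id illustrative):

UPDATE jobs
SET retry_count = retry_count + 1,
    updated_at = NOW(),
    retry_after = NOW() + INTERVAL '5000 ms'
WHERE id = 123
RETURNING retry_count;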

   /**
-   * Retrieves a number of queued jobs so they can be processed immediately.
+   * Retrieves a number of pending jobs so they can be processed immediately.
    * @param limit - number of jobs to retrieve
    * @returns `DbJob[]`
    */
   async getPendingJobBatch(args: { limit: number }): Promise<DbJob[]> {
     return this.sql<DbJob[]>`
       SELECT ${this.sql(JOBS_COLUMNS)} FROM jobs
-      WHERE status = 'pending'
+      WHERE status = 'pending' AND (retry_after IS NULL OR retry_after < NOW())
       ORDER BY COALESCE(updated_at, created_at) ASC
       LIMIT ${args.limit}
     `;
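postgres.js expands ${this.sql(JOBS_COLUMNS)} into a quoted column list, so a poll with a limit of 1 runs roughly (column list abbreviated):

SELECT "id", …, "retry_count", "created_at", "updated_at", "retry_after" FROM jobs
WHERE status = 'pending' AND (retry_after IS NULL OR retry_after < NOW())
ORDER BY COALESCE(updated_at, created_at) ASC
LIMIT 1;

A job whose retry_after is still in the future simply stays in the table; no worker-side sleeping is involved, and it becomes eligible again on a later poll once NOW() passes the timestamp.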
2 changes: 2 additions & 0 deletions src/pg/types.ts
@@ -106,6 +106,7 @@ export type DbJob = {
   retry_count: number;
   created_at: string;
   updated_at?: string;
+  retry_after?: string;
 };
 
 export type DbUpdateNotification = {
@@ -285,6 +286,7 @@ export const JOBS_COLUMNS = [
   'retry_count',
   'created_at',
   'updated_at',
+  'retry_after',
 ];
 
 export const METADATA_COLUMNS = [
20 changes: 18 additions & 2 deletions src/token-processor/queue/job/job.ts
@@ -2,7 +2,7 @@ import { logger, resolveOrTimeout, stopwatch } from '@hirosystems/api-toolkit';
 import { ENV } from '../../../env';
 import { PgStore } from '../../../pg/pg-store';
 import { DbJob, DbJobInvalidReason, DbJobStatus } from '../../../pg/types';
-import { getUserErrorInvalidReason, UserError } from '../../util/errors';
+import { getUserErrorInvalidReason, TooManyRequestsHttpError, UserError } from '../../util/errors';
 import { RetryableJobError } from '../errors';
 import { getJobQueueProcessingMode, JobQueueProcessingMode } from '../helpers';

@@ -52,7 +52,16 @@
       }
     } catch (error) {
       if (error instanceof RetryableJobError) {
-        const retries = await this.db.increaseJobRetryCount({ id: this.job.id });
+        let retry_after = ENV.JOB_QUEUE_RETRY_AFTER_MS;
+        // If we got rate limited, save this host so we can skip further calls even from jobs for
+        // other tokens.
+        if (error.cause instanceof TooManyRequestsHttpError) {
+          await this.saveRateLimitedHost(error.cause);
+          if (error.cause.retryAfter) {
+            retry_after = error.cause.retryAfter * 1_000;
+          }
+        }
+        const retries = await this.db.increaseJobRetryCount({ id: this.job.id, retry_after });
         if (
           getJobQueueProcessingMode() === JobQueueProcessingMode.strict ||
           retries <= ENV.JOB_QUEUE_MAX_RETRIES
Expand Down Expand Up @@ -95,4 +104,11 @@ export abstract class Job {
       return false;
     }
   }
+
+  private async saveRateLimitedHost(error: TooManyRequestsHttpError) {
+    const hostname = error.url.hostname;
+    const retryAfter = error.retryAfter ?? ENV.METADATA_RATE_LIMITED_HOST_RETRY_AFTER;
+    logger.info(`Job saving rate limited host ${hostname}, retry after ${retryAfter}s`);
+    await this.db.insertRateLimitedHost({ values: { hostname, retry_after: retryAfter } });
+  }
 }
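The retry path above relies on only two fields of TooManyRequestsHttpError. A minimal sketch of that shape, inferred from its usage in this diff rather than copied from util/errors:

// Sketch only: field names inferred from how job.ts reads the error above.
export class TooManyRequestsHttpError extends Error {
  constructor(
    public url: URL, // the rate-limited metadata URL; its hostname is persisted
    public retryAfter?: number // seconds, parsed from the Retry-After header when present
  ) {
    super(`429 Too Many Requests from ${url.hostname}`);
  }
}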
36 changes: 10 additions & 26 deletions src/token-processor/queue/job/process-token-job.ts
@@ -54,25 +54,16 @@
       contractPrincipal: contract.principal,
     });
     logger.info(`ProcessTokenJob processing ${this.description()}`);
-    try {
-      switch (token.type) {
-        case DbTokenType.ft:
-          await this.handleFt(client, token, contract);
-          break;
-        case DbTokenType.nft:
-          await this.handleNft(client, token, contract);
-          break;
-        case DbTokenType.sft:
-          await this.handleSft(client, token, contract);
-          break;
-      }
-    } catch (error) {
-      // If we got rate limited, save this host so we can skip further calls even from jobs for
-      // other tokens.
-      if (error instanceof RetryableJobError && error.cause instanceof TooManyRequestsHttpError) {
-        await this.saveRateLimitedHost(error.cause);
-      }
-      throw error;
+    switch (token.type) {
+      case DbTokenType.ft:
+        await this.handleFt(client, token, contract);
+        break;
+      case DbTokenType.nft:
+        await this.handleNft(client, token, contract);
+        break;
+      case DbTokenType.sft:
+        await this.handleSft(client, token, contract);
+        break;
     }
   }

@@ -191,13 +182,6 @@
     await this.db.updateProcessedTokenWithMetadata({ id: token.id, values: tokenValues });
   }
 
-  private async saveRateLimitedHost(error: TooManyRequestsHttpError) {
-    const hostname = error.url.hostname;
-    const retryAfter = error.retryAfter ?? ENV.METADATA_RATE_LIMITED_HOST_RETRY_AFTER;
-    logger.info(`ProcessTokenJob saving rate limited host ${hostname}, retry after ${retryAfter}s`);
-    await this.db.insertRateLimitedHost({ values: { hostname, retry_after: retryAfter } });
-  }
-
   private async getTokenUri(
     client: StacksNodeRpcClient,
     tokenNumber?: bigint
28 changes: 21 additions & 7 deletions tests/token-queue/job.test.ts
@@ -1,4 +1,4 @@
-import { cycleMigrations } from '@hirosystems/api-toolkit';
+import { cycleMigrations, timeout } from '@hirosystems/api-toolkit';
 import { ENV } from '../../src/env';
 import { MIGRATIONS_DIR, PgStore } from '../../src/pg/pg-store';
 import { DbJob, DbSipNumber, DbSmartContractInsert } from '../../src/pg/types';
@@ -64,14 +64,14 @@ describe('Job', () => {
     const job = new TestRetryableJob({ db, job: dbJob });
 
     await expect(job.work()).resolves.not.toThrow();
-    const jobs1 = await db.getPendingJobBatch({ limit: 1 });
-    expect(jobs1[0].retry_count).toBe(1);
-    expect(jobs1[0].status).toBe('pending');
+    const jobs1 = await db.getJob({ id: 1 });
+    expect(jobs1?.retry_count).toBe(1);
+    expect(jobs1?.status).toBe('pending');
 
     await expect(job.work()).resolves.not.toThrow();
-    const jobs2 = await db.getPendingJobBatch({ limit: 1 });
-    expect(jobs2[0].retry_count).toBe(2);
-    expect(jobs2[0].status).toBe('pending');
+    const jobs2 = await db.getJob({ id: 1 });
+    expect(jobs2?.retry_count).toBe(2);
+    expect(jobs2?.status).toBe('pending');
   });
 
   test('user error marks job invalid', async () => {
@@ -98,6 +98,7 @@
   test('strict mode ignores retry_count limit', async () => {
     ENV.JOB_QUEUE_STRICT_MODE = true;
     ENV.JOB_QUEUE_MAX_RETRIES = 0;
+    ENV.JOB_QUEUE_RETRY_AFTER_MS = 0;
     const job = new TestRetryableJob({ db, job: dbJob });
 
     await expect(job.work()).resolves.not.toThrow();
@@ -106,6 +107,19 @@
     expect(jobs1[0].status).toBe('pending');
   });
 
+  test('pending job batches consider retry_after', async () => {
+    ENV.JOB_QUEUE_RETRY_AFTER_MS = 200;
+    const job = new TestRetryableJob({ db, job: dbJob });
+
+    await expect(job.work()).resolves.not.toThrow();
+    const jobs1 = await db.getPendingJobBatch({ limit: 1 });
+    expect(jobs1).toHaveLength(0);
+
+    await timeout(300);
+    const jobs2 = await db.getPendingJobBatch({ limit: 1 });
+    expect(jobs2).toHaveLength(1);
+  });
+
   test('db errors are not re-thrown', async () => {
     await db.close();
     const job = new TestDbJob({ db, job: dbJob });
2 changes: 1 addition & 1 deletion tests/token-queue/process-token-job.test.ts
@@ -851,7 +851,7 @@ describe('ProcessTokenJob', () => {
       })
       .reply(429, { error: 'nope' }, { headers: { 'retry-after': '999' } });
     try {
-      await new ProcessTokenJob({ db, job: tokenJob }).handler();
+      await new ProcessTokenJob({ db, job: tokenJob }).work();
     } catch (error) {
       expect(error).toBeInstanceOf(RetryableJobError);
       const err = error as RetryableJobError;
