Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DeliveryDelay and RetryDelay to create Queue and Add Worker Consumer commands. #5102

Merged
merged 17 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/warm-seahorses-trade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"wrangler": minor
---

feature: add support for queue delivery controls on `wrangler queues create`
81 changes: 77 additions & 4 deletions packages/wrangler/src/__tests__/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ describe("wrangler", () => {
producers_total_count: 0,
consumers: [],
consumers_total_count: 0,
settings: {
delivery_delay: 0,
},
},
{
queue_id: "def19fa3787741579c9088eb850474af",
Expand All @@ -106,6 +109,9 @@ describe("wrangler", () => {
producers_total_count: 0,
consumers: [],
consumers_total_count: 0,
settings: {
delivery_delay: 0,
},
},
];
const expectedPage = 1;
Expand Down Expand Up @@ -135,6 +141,9 @@ describe("wrangler", () => {
producers_total_count: 0,
consumers: [],
consumers_total_count: 0,
settings: {
delivery_delay: 0,
},
},
];
const expectedPage = 2;
Expand All @@ -153,18 +162,26 @@ describe("wrangler", () => {
});

describe("create", () => {
function mockCreateRequest(expectedQueueName: string) {
function mockCreateRequest(
expectedQueueName: string,
queueSettings: { delivery_delay?: number } | undefined = undefined
) {
const requests = { count: 0 };

msw.use(
rest.post(
"*/accounts/:accountId/workers/queues",
async (request, response, context) => {
requests.count += 1;

const body = (await request.json()) as {
queue_name: string;
settings: {
delivery_delay: number;
};
};
expect(body.queue_name).toEqual(expectedQueueName);
expect(body.settings).toEqual(queueSettings);
return response.once(
context.json({
success: true,
Expand Down Expand Up @@ -199,7 +216,10 @@ describe("wrangler", () => {
-c, --config Path to .toml configuration file [string]
-e, --env Environment to use for operations and .env files [string]
-h, --help Show help [boolean]
-v, --version Show version number [boolean]"
-v, --version Show version number [boolean]

Options:
--delivery-delay-secs How long a published message should be delayed for, in seconds. Must be a positive integer [number]"
`);
});

Expand Down Expand Up @@ -251,6 +271,30 @@ describe("wrangler", () => {
"
`);
});

it("should send queue settings with delivery delay", async () => {
const requests = mockCreateRequest("testQueue", { delivery_delay: 10 });
await runWrangler("queues create testQueue --delivery-delay-secs=10");
expect(std.out).toMatchInlineSnapshot(`
"Creating queue testQueue.
Created queue testQueue."
`);
expect(requests.count).toEqual(1);
});

it("should show an error when two delivery delays are set", async () => {
const requests = mockCreateRequest("testQueue", { delivery_delay: 0 });

await expect(
runWrangler(
"queues create testQueue --delivery-delay-secs=5 --delivery-delay-secs=10"
)
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Cannot specify --delivery-delay-secs multiple times"`
);

expect(requests.count).toEqual(0);
});
});

describe("delete", () => {
Expand Down Expand Up @@ -385,7 +429,8 @@ describe("wrangler", () => {
--batch-timeout Maximum number of seconds to wait to fill a batch with messages [number]
--message-retries Maximum number of retries for each message [number]
--dead-letter-queue Queue to send messages that failed to be consumed [string]
--max-concurrency The maximum number of concurrent consumer Worker invocations. Must be a positive integer [number]"
--max-concurrency The maximum number of concurrent consumer Worker invocations. Must be a positive integer [number]
--retry-delay-secs The number of seconds to wait before retrying a message [number]"
`);
});

Expand All @@ -398,6 +443,7 @@ describe("wrangler", () => {
max_retries: undefined,
max_wait_time_ms: undefined,
max_concurrency: undefined,
retry_delay: undefined,
},
dead_letter_queue: undefined,
};
Expand All @@ -418,20 +464,47 @@ describe("wrangler", () => {
max_retries: 3,
max_wait_time_ms: 10 * 1000,
max_concurrency: 3,
retry_delay: 10,
},
dead_letter_queue: "myDLQ",
};
mockPostRequest("testQueue", expectedBody);

await runWrangler(
"queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ"
"queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay-secs=10"
);
expect(std.out).toMatchInlineSnapshot(`
"Adding consumer to queue testQueue.
Added consumer to queue testQueue."
`);
});

it("should show an error when two retry delays are set", async () => {
const expectedBody: PostConsumerBody = {
script_name: "testScript",
environment_name: "myEnv",
settings: {
batch_size: 20,
max_retries: 3,
max_wait_time_ms: 10 * 1000,
max_concurrency: 3,
retry_delay: 0,
},
dead_letter_queue: "myDLQ",
};
const requests = mockPostRequest("testQueue", expectedBody);

await expect(
runWrangler(
"queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay-secs=5 --retry-delay-secs=10"
)
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Cannot specify --retry-delay-secs multiple times"`
);

expect(requests.count).toEqual(0);
});

it("should show link to dash when not enabled", async () => {
const queueName = "testQueue";
msw.use(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { readConfig } from "../../../../../config";
import { CommandLineArgsError } from "../../../../../index";
import { logger } from "../../../../../logger";
import { postTypedConsumer } from "../../../../client";
import type {
Expand Down Expand Up @@ -44,6 +45,12 @@
) {
const config = readConfig(args.config, args);

if (Array.isArray(args.retryDelaySecs)) {
throw new CommandLineArgsError(

Check warning on line 49 in packages/wrangler/src/queues/cli/commands/consumer/http-pull/add.ts

View check run for this annotation

Codecov / codecov/patch

packages/wrangler/src/queues/cli/commands/consumer/http-pull/add.ts#L49

Added line #L49 was not covered by tests
`Cannot specify --retry-delay-secs multiple times`
);
}

const postTypedConsumerBody: PostTypedConsumerBody = {
type: "http_pull",
settings: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { readConfig } from "../../../../../config";
import { CommandLineArgsError } from "../../../../../index";
import { logger } from "../../../../../logger";
import { postConsumer } from "../../../../client";
import type {
Expand Down Expand Up @@ -42,6 +43,10 @@ export function options(yargs: CommonYargsArgv) {
describe:
"The maximum number of concurrent consumer Worker invocations. Must be a positive integer",
},
"retry-delay-secs": {
type: "number",
describe: "The number of seconds to wait before retrying a message",
},
});
}

Expand All @@ -50,6 +55,12 @@ export async function handler(
) {
const config = readConfig(args.config, args);

if (Array.isArray(args.retryDelaySecs)) {
throw new CommandLineArgsError(
`Cannot specify --retry-delay-secs multiple times`
);
}

const body: PostConsumerBody = {
script_name: args.scriptName,
// TODO(soon) is this still the correct usage of the environment?
Expand All @@ -61,6 +72,7 @@ export async function handler(
? 1000 * args.batchTimeout
: undefined,
max_concurrency: args.maxConcurrency,
retry_delay: args.retryDelaySecs,
},
dead_letter_queue: args.deadLetterQueue,
};
Expand Down
55 changes: 46 additions & 9 deletions packages/wrangler/src/queues/cli/commands/create.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,62 @@
import { readConfig } from "../../../config";
import { CommandLineArgsError } from "../../../index";
import { logger } from "../../../logger";
import { createQueue } from "../../client";
import { handleFetchError } from "../../utils";
import type {
CommonYargsArgv,
StrictYargsOptionsToInterface,
} from "../../../yargs-types";
import type { CreateQueueBody } from "../../client";

export function options(yargs: CommonYargsArgv) {
return yargs.positional("name", {
type: "string",
demandOption: true,
description: "The name of the queue",
});
return yargs
.positional("name", {
type: "string",
demandOption: true,
description: "The name of the queue",
})
.options({
"delivery-delay-secs": {
type: "number",
describe:
"How long a published message should be delayed for, in seconds. Must be a positive integer",
},
});
}

function createBody(
args: StrictYargsOptionsToInterface<typeof options>
): CreateQueueBody {
const body: CreateQueueBody = {
queue_name: args.name,
};

if (Array.isArray(args.deliveryDelaySecs)) {
throw new CommandLineArgsError(
"Cannot specify --delivery-delay-secs multiple times"
);
}

if (args.deliveryDelaySecs != undefined) {
body.settings = {
delivery_delay: args.deliveryDelaySecs,
};
}

return body;
}

export async function handler(
args: StrictYargsOptionsToInterface<typeof options>
) {
const config = readConfig(args.config, args);

logger.log(`Creating queue ${args.name}.`);
await createQueue(config, { queue_name: args.name });
logger.log(`Created queue ${args.name}.`);
const body = createBody(args);
try {
logger.log(`Creating queue ${args.name}.`);
await createQueue(config, body);
logger.log(`Created queue ${args.name}.`);
} catch (e) {
handleFetchError(e as { code?: number });
}
}
6 changes: 6 additions & 0 deletions packages/wrangler/src/queues/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ export async function createQueue(

export interface CreateQueueBody {
queue_name: string;
settings?: QueueSettings;
}

export interface QueueSettings {
delivery_delay: number;
}

export interface ScriptReference {
Expand All @@ -42,6 +47,7 @@ export interface QueueResponse {
producers_total_count: number;
consumers: Consumer[];
consumers_total_count: number;
settings?: QueueSettings;
}

export async function deleteQueue(
Expand Down
2 changes: 2 additions & 0 deletions packages/wrangler/src/queues/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const INVALID_CONSUMER_SETTINGS_ERROR = 100127;
export const INVALID_QUEUE_SETTINGS_ERROR = 100128;
19 changes: 18 additions & 1 deletion packages/wrangler/src/queues/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { UserError } from "../errors";
import { logger } from "../logger";
import { type ParseError } from "../parse";
import { getAccountId } from "../user";
import {
INVALID_CONSUMER_SETTINGS_ERROR,
INVALID_QUEUE_SETTINGS_ERROR,
} from "./constants";

const isFetchError = (err: unknown): err is ParseError => err instanceof Error;

Expand All @@ -14,5 +19,17 @@
);
}
}
throw err;
return err;
};

export function handleFetchError(e: { code?: number }): void {
if (e.code === INVALID_CONSUMER_SETTINGS_ERROR) {
throw new UserError(`The specified consumer settings are invalid.`);

Check warning on line 27 in packages/wrangler/src/queues/utils.ts

View check run for this annotation

Codecov / codecov/patch

packages/wrangler/src/queues/utils.ts#L27

Added line #L27 was not covered by tests
}

if (e.code === INVALID_QUEUE_SETTINGS_ERROR) {
throw new UserError(`The specified queue settings are invalid.`);

Check warning on line 31 in packages/wrangler/src/queues/utils.ts

View check run for this annotation

Codecov / codecov/patch

packages/wrangler/src/queues/utils.ts#L31

Added line #L31 was not covered by tests
}

throw e;
}
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading