Skip to content

Commit

Permalink
Add DeliveryDelay and RetryDelay to create Queue and Add Worker Consu…
Browse files Browse the repository at this point in the history
…mer commands. (#5102)

* Added builders for consumer and queue POST bodies

* Unit tests for queues create with delivery delay

* Added error handling for invalid queue settings

* Helper Text

* Unit tests for consumer add with retry settings

* Updated tests check for no-parameter and parameter

* PR suggestions

* Changed yargs.fail error handling

* Removed delivery delay from create consumer and queue

* Prettier run

* Removed stale comment

* Update packages/wrangler/src/queues/cli/commands/create.ts

Co-authored-by: MrBBot <me@mrbbot.dev>

* Update packages/wrangler/src/queues/cli/commands/consumer/add.ts

Co-authored-by: MrBBot <me@mrbbot.dev>

* Update packages/wrangler/src/__tests__/queues.test.ts

Co-authored-by: MrBBot <me@mrbbot.dev>

* Update .changeset/warm-seahorses-trade.md

Co-authored-by: MrBBot <me@mrbbot.dev>

* Tests passing

* Updated commands to support HTTP Pull consumers

---------

Co-authored-by: MrBBot <me@mrbbot.dev>
  • Loading branch information
pmiguel and mrbbot authored Mar 26, 2024
1 parent 3be826f commit ba52208
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 14 deletions.
5 changes: 5 additions & 0 deletions .changeset/warm-seahorses-trade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"wrangler": minor
---

feature: add support for queue delivery controls on `wrangler queues create`
81 changes: 77 additions & 4 deletions packages/wrangler/src/__tests__/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ describe("wrangler", () => {
producers_total_count: 0,
consumers: [],
consumers_total_count: 0,
settings: {
delivery_delay: 0,
},
},
{
queue_id: "def19fa3787741579c9088eb850474af",
Expand All @@ -106,6 +109,9 @@ describe("wrangler", () => {
producers_total_count: 0,
consumers: [],
consumers_total_count: 0,
settings: {
delivery_delay: 0,
},
},
];
const expectedPage = 1;
Expand Down Expand Up @@ -135,6 +141,9 @@ describe("wrangler", () => {
producers_total_count: 0,
consumers: [],
consumers_total_count: 0,
settings: {
delivery_delay: 0,
},
},
];
const expectedPage = 2;
Expand All @@ -153,18 +162,26 @@ describe("wrangler", () => {
});

describe("create", () => {
function mockCreateRequest(expectedQueueName: string) {
function mockCreateRequest(
expectedQueueName: string,
queueSettings: { delivery_delay?: number } | undefined = undefined
) {
const requests = { count: 0 };

msw.use(
rest.post(
"*/accounts/:accountId/workers/queues",
async (request, response, context) => {
requests.count += 1;

const body = (await request.json()) as {
queue_name: string;
settings: {
delivery_delay: number;
};
};
expect(body.queue_name).toEqual(expectedQueueName);
expect(body.settings).toEqual(queueSettings);
return response.once(
context.json({
success: true,
Expand Down Expand Up @@ -199,7 +216,10 @@ describe("wrangler", () => {
-c, --config Path to .toml configuration file [string]
-e, --env Environment to use for operations and .env files [string]
-h, --help Show help [boolean]
-v, --version Show version number [boolean]"
-v, --version Show version number [boolean]
Options:
--delivery-delay-secs How long a published message should be delayed for, in seconds. Must be a positive integer [number]"
`);
});

Expand Down Expand Up @@ -251,6 +271,30 @@ describe("wrangler", () => {
"
`);
});

it("should send queue settings with delivery delay", async () => {
const requests = mockCreateRequest("testQueue", { delivery_delay: 10 });
await runWrangler("queues create testQueue --delivery-delay-secs=10");
expect(std.out).toMatchInlineSnapshot(`
"Creating queue testQueue.
Created queue testQueue."
`);
expect(requests.count).toEqual(1);
});

it("should show an error when two delivery delays are set", async () => {
const requests = mockCreateRequest("testQueue", { delivery_delay: 0 });

await expect(
runWrangler(
"queues create testQueue --delivery-delay-secs=5 --delivery-delay-secs=10"
)
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Cannot specify --delivery-delay-secs multiple times"`
);

expect(requests.count).toEqual(0);
});
});

describe("delete", () => {
Expand Down Expand Up @@ -385,7 +429,8 @@ describe("wrangler", () => {
--batch-timeout Maximum number of seconds to wait to fill a batch with messages [number]
--message-retries Maximum number of retries for each message [number]
--dead-letter-queue Queue to send messages that failed to be consumed [string]
--max-concurrency The maximum number of concurrent consumer Worker invocations. Must be a positive integer [number]"
--max-concurrency The maximum number of concurrent consumer Worker invocations. Must be a positive integer [number]
--retry-delay-secs The number of seconds to wait before retrying a message [number]"
`);
});

Expand All @@ -398,6 +443,7 @@ describe("wrangler", () => {
max_retries: undefined,
max_wait_time_ms: undefined,
max_concurrency: undefined,
retry_delay: undefined,
},
dead_letter_queue: undefined,
};
Expand All @@ -418,20 +464,47 @@ describe("wrangler", () => {
max_retries: 3,
max_wait_time_ms: 10 * 1000,
max_concurrency: 3,
retry_delay: 10,
},
dead_letter_queue: "myDLQ",
};
mockPostRequest("testQueue", expectedBody);

await runWrangler(
"queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ"
"queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay-secs=10"
);
expect(std.out).toMatchInlineSnapshot(`
"Adding consumer to queue testQueue.
Added consumer to queue testQueue."
`);
});

it("should show an error when two retry delays are set", async () => {
const expectedBody: PostConsumerBody = {
script_name: "testScript",
environment_name: "myEnv",
settings: {
batch_size: 20,
max_retries: 3,
max_wait_time_ms: 10 * 1000,
max_concurrency: 3,
retry_delay: 0,
},
dead_letter_queue: "myDLQ",
};
const requests = mockPostRequest("testQueue", expectedBody);

await expect(
runWrangler(
"queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay-secs=5 --retry-delay-secs=10"
)
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Cannot specify --retry-delay-secs multiple times"`
);

expect(requests.count).toEqual(0);
});

it("should show link to dash when not enabled", async () => {
const queueName = "testQueue";
msw.use(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { readConfig } from "../../../../../config";
import { CommandLineArgsError } from "../../../../../index";
import { logger } from "../../../../../logger";
import { postTypedConsumer } from "../../../../client";
import type {
Expand Down Expand Up @@ -44,6 +45,12 @@ export async function handler(
) {
const config = readConfig(args.config, args);

if (Array.isArray(args.retryDelaySecs)) {
throw new CommandLineArgsError(
`Cannot specify --retry-delay-secs multiple times`
);
}

const postTypedConsumerBody: PostTypedConsumerBody = {
type: "http_pull",
settings: {
Expand Down
12 changes: 12 additions & 0 deletions packages/wrangler/src/queues/cli/commands/consumer/worker/add.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { readConfig } from "../../../../../config";
import { CommandLineArgsError } from "../../../../../index";
import { logger } from "../../../../../logger";
import { postConsumer } from "../../../../client";
import type {
Expand Down Expand Up @@ -42,6 +43,10 @@ export function options(yargs: CommonYargsArgv) {
describe:
"The maximum number of concurrent consumer Worker invocations. Must be a positive integer",
},
"retry-delay-secs": {
type: "number",
describe: "The number of seconds to wait before retrying a message",
},
});
}

Expand All @@ -50,6 +55,12 @@ export async function handler(
) {
const config = readConfig(args.config, args);

if (Array.isArray(args.retryDelaySecs)) {
throw new CommandLineArgsError(
`Cannot specify --retry-delay-secs multiple times`
);
}

const body: PostConsumerBody = {
script_name: args.scriptName,
// TODO(soon) is this still the correct usage of the environment?
Expand All @@ -61,6 +72,7 @@ export async function handler(
? 1000 * args.batchTimeout
: undefined,
max_concurrency: args.maxConcurrency,
retry_delay: args.retryDelaySecs,
},
dead_letter_queue: args.deadLetterQueue,
};
Expand Down
55 changes: 46 additions & 9 deletions packages/wrangler/src/queues/cli/commands/create.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,62 @@
import { readConfig } from "../../../config";
import { CommandLineArgsError } from "../../../index";
import { logger } from "../../../logger";
import { createQueue } from "../../client";
import { handleFetchError } from "../../utils";
import type {
CommonYargsArgv,
StrictYargsOptionsToInterface,
} from "../../../yargs-types";
import type { CreateQueueBody } from "../../client";

export function options(yargs: CommonYargsArgv) {
return yargs.positional("name", {
type: "string",
demandOption: true,
description: "The name of the queue",
});
return yargs
.positional("name", {
type: "string",
demandOption: true,
description: "The name of the queue",
})
.options({
"delivery-delay-secs": {
type: "number",
describe:
"How long a published message should be delayed for, in seconds. Must be a positive integer",
},
});
}

function createBody(
args: StrictYargsOptionsToInterface<typeof options>
): CreateQueueBody {
const body: CreateQueueBody = {
queue_name: args.name,
};

if (Array.isArray(args.deliveryDelaySecs)) {
throw new CommandLineArgsError(
"Cannot specify --delivery-delay-secs multiple times"
);
}

if (args.deliveryDelaySecs != undefined) {
body.settings = {
delivery_delay: args.deliveryDelaySecs,
};
}

return body;
}

export async function handler(
args: StrictYargsOptionsToInterface<typeof options>
) {
const config = readConfig(args.config, args);

logger.log(`Creating queue ${args.name}.`);
await createQueue(config, { queue_name: args.name });
logger.log(`Created queue ${args.name}.`);
const body = createBody(args);
try {
logger.log(`Creating queue ${args.name}.`);
await createQueue(config, body);
logger.log(`Created queue ${args.name}.`);
} catch (e) {
handleFetchError(e as { code?: number });
}
}
6 changes: 6 additions & 0 deletions packages/wrangler/src/queues/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ export async function createQueue(

export interface CreateQueueBody {
queue_name: string;
settings?: QueueSettings;
}

export interface QueueSettings {
delivery_delay: number;
}

export interface ScriptReference {
Expand All @@ -42,6 +47,7 @@ export interface QueueResponse {
producers_total_count: number;
consumers: Consumer[];
consumers_total_count: number;
settings?: QueueSettings;
}

export async function deleteQueue(
Expand Down
2 changes: 2 additions & 0 deletions packages/wrangler/src/queues/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const INVALID_CONSUMER_SETTINGS_ERROR = 100127;
export const INVALID_QUEUE_SETTINGS_ERROR = 100128;
19 changes: 18 additions & 1 deletion packages/wrangler/src/queues/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { UserError } from "../errors";
import { logger } from "../logger";
import { type ParseError } from "../parse";
import { getAccountId } from "../user";
import {
INVALID_CONSUMER_SETTINGS_ERROR,
INVALID_QUEUE_SETTINGS_ERROR,
} from "./constants";

const isFetchError = (err: unknown): err is ParseError => err instanceof Error;

Expand All @@ -14,5 +19,17 @@ export const HandleUnauthorizedError = async (_msg: string, err: Error) => {
);
}
}
throw err;
return err;
};

export function handleFetchError(e: { code?: number }): void {
if (e.code === INVALID_CONSUMER_SETTINGS_ERROR) {
throw new UserError(`The specified consumer settings are invalid.`);
}

if (e.code === INVALID_QUEUE_SETTINGS_ERROR) {
throw new UserError(`The specified queue settings are invalid.`);
}

throw e;
}
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ba52208

Please sign in to comment.