diff --git a/.changeset/warm-seahorses-trade.md b/.changeset/warm-seahorses-trade.md index 20d14b40a65b..7298aa003199 100644 --- a/.changeset/warm-seahorses-trade.md +++ b/.changeset/warm-seahorses-trade.md @@ -2,4 +2,4 @@ "wrangler": minor --- -feature: add support for queue delivery controls on `wrangler queues create` and `wrangler queues consumer add` +feature: add support for queue delivery controls on `wrangler queues create` diff --git a/packages/wrangler/src/__tests__/queues.test.ts b/packages/wrangler/src/__tests__/queues.test.ts index ddaf14619b16..9ca0a8e7e38c 100644 --- a/packages/wrangler/src/__tests__/queues.test.ts +++ b/packages/wrangler/src/__tests__/queues.test.ts @@ -219,7 +219,7 @@ describe("wrangler", () => { -v, --version Show version number [boolean] Options: - --delivery-delay How long a published message should be delayed for, in seconds. Must be a positive integer [number]" + --delivery-delay-secs How long a published message should be delayed for, in seconds. Must be a positive integer [number]" `); }); @@ -274,7 +274,7 @@ describe("wrangler", () => { it("should send queue settings with delivery delay", async () => { const requests = mockCreateRequest("testQueue", { delivery_delay: 10 }); - await runWrangler("queues create testQueue --delivery-delay=10"); + await runWrangler("queues create testQueue --delivery-delay-secs=10"); expect(std.out).toMatchInlineSnapshot(` "Creating queue testQueue. Created queue testQueue." @@ -287,10 +287,10 @@ describe("wrangler", () => { await expect( runWrangler( - "queues create testQueue --delivery-delay=5 --delivery-delay=10" + "queues create testQueue --delivery-delay-secs=5 --delivery-delay-secs=10" ) ).rejects.toThrowErrorMatchingInlineSnapshot( - `"Cannot specify --delivery-delay multiple times"` + `"Cannot specify --delivery-delay-secs multiple times"` ); expect(requests.count).toEqual(0); @@ -430,7 +430,7 @@ describe("wrangler", () => { --message-retries Maximum number of retries for each message [number] --dead-letter-queue Queue to send messages that failed to be consumed [string] --max-concurrency The maximum number of concurrent consumer Worker invocations. Must be a positive integer [number] - --retry-delay How long a retried message should be delayed for, in seconds. Must be a positive integer [number]" + --retry-delay-secs The number of seconds to wait before retrying a message [number]" `); }); @@ -471,7 +471,7 @@ describe("wrangler", () => { mockPostRequest("testQueue", expectedBody); await runWrangler( - "queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay=10" + "queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay-secs=10" ); expect(std.out).toMatchInlineSnapshot(` "Adding consumer to queue testQueue. @@ -496,10 +496,10 @@ describe("wrangler", () => { await expect( runWrangler( - "queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay=5 --retry-delay=10" + "queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay-secs=5 --retry-delay-secs=10" ) ).rejects.toThrowErrorMatchingInlineSnapshot( - `"Cannot specify --retry-delay multiple times"` + `"Cannot specify --retry-delay-secs multiple times"` ); expect(requests.count).toEqual(0); diff --git a/packages/wrangler/src/queues/cli/commands/consumer/http-pull/add.ts b/packages/wrangler/src/queues/cli/commands/consumer/http-pull/add.ts index 9e0d0601a27d..8a80d4e53d6e 100644 --- a/packages/wrangler/src/queues/cli/commands/consumer/http-pull/add.ts +++ b/packages/wrangler/src/queues/cli/commands/consumer/http-pull/add.ts @@ -1,4 +1,5 @@ import { readConfig } from "../../../../../config"; +import { CommandLineArgsError } from "../../../../../index"; import { logger } from "../../../../../logger"; import { postTypedConsumer } from "../../../../client"; import type { @@ -44,6 +45,12 @@ export async function handler( ) { const config = readConfig(args.config, args); + if (Array.isArray(args.retryDelaySecs)) { + throw new CommandLineArgsError( + `Cannot specify --retry-delay-secs multiple times` + ); + } + const postTypedConsumerBody: PostTypedConsumerBody = { type: "http_pull", settings: { diff --git a/packages/wrangler/src/queues/cli/commands/consumer/worker/add.ts b/packages/wrangler/src/queues/cli/commands/consumer/worker/add.ts index a9cb4ffb0e35..5c02e86314dc 100644 --- a/packages/wrangler/src/queues/cli/commands/consumer/worker/add.ts +++ b/packages/wrangler/src/queues/cli/commands/consumer/worker/add.ts @@ -1,13 +1,12 @@ -import { readConfig } from "../../../../config"; -import { CommandLineArgsError } from "../../../../index"; -import { logger } from "../../../../logger"; -import { postConsumer } from "../../../client"; -import { handleFetchError } from "../../../utils"; +import { readConfig } from "../../../../../config"; +import { CommandLineArgsError } from "../../../../../index"; +import { logger } from "../../../../../logger"; +import { postConsumer } from "../../../../client"; import type { CommonYargsArgv, StrictYargsOptionsToInterface, -} from "../../../../yargs-types"; -import type { PostConsumerBody } from "../../../client"; +} from "../../../../../yargs-types"; +import type { PostConsumerBody } from "../../../../client"; export function options(yargs: CommonYargsArgv) { return yargs @@ -44,18 +43,24 @@ export function options(yargs: CommonYargsArgv) { describe: "The maximum number of concurrent consumer Worker invocations. Must be a positive integer", }, - "retry-delay": { + "retry-delay-secs": { type: "number", - describe: - "How long a retried message should be delayed for, in seconds. Must be a positive integer", - number: true, + describe: "The number of seconds to wait before retrying a message", }, }); } -function createBody( +export async function handler( args: StrictYargsOptionsToInterface -): PostConsumerBody { +) { + const config = readConfig(args.config, args); + + if (Array.isArray(args.retryDelaySecs)) { + throw new CommandLineArgsError( + `Cannot specify --retry-delay-secs multiple times` + ); + } + const body: PostConsumerBody = { script_name: args.scriptName, // TODO(soon) is this still the correct usage of the environment? @@ -67,34 +72,12 @@ function createBody( ? 1000 * args.batchTimeout : undefined, max_concurrency: args.maxConcurrency, + retry_delay: args.retryDelaySecs, }, dead_letter_queue: args.deadLetterQueue, }; - if (Array.isArray(args.retryDelay)) { - throw new CommandLineArgsError( - `Cannot specify --retry-delay multiple times` - ); - } - - if (args.retryDelay != undefined) { - body.settings.retry_delay = args.retryDelay; - } - - return body; -} - -export async function handler( - args: StrictYargsOptionsToInterface -) { - const config = readConfig(args.config, args); - const body = createBody(args); - - try { - logger.log(`Adding consumer to queue ${args.queueName}.`); - await postConsumer(config, args.queueName, body); - logger.log(`Added consumer to queue ${args.queueName}.`); - } catch (e) { - handleFetchError(e as { code?: number }); - } + logger.log(`Adding consumer to queue ${args.queueName}.`); + await postConsumer(config, args.queueName, body); + logger.log(`Added consumer to queue ${args.queueName}.`); } diff --git a/packages/wrangler/src/queues/cli/commands/create.ts b/packages/wrangler/src/queues/cli/commands/create.ts index e897fab85154..c51da73bdf96 100644 --- a/packages/wrangler/src/queues/cli/commands/create.ts +++ b/packages/wrangler/src/queues/cli/commands/create.ts @@ -17,7 +17,7 @@ export function options(yargs: CommonYargsArgv) { description: "The name of the queue", }) .options({ - "delivery-delay": { + "delivery-delay-secs": { type: "number", describe: "How long a published message should be delayed for, in seconds. Must be a positive integer", @@ -32,15 +32,15 @@ function createBody( queue_name: args.name, }; - if (Array.isArray(args.deliveryDelay)) { + if (Array.isArray(args.deliveryDelaySecs)) { throw new CommandLineArgsError( - "Cannot specify --delivery-delay multiple times" + "Cannot specify --delivery-delay-secs multiple times" ); } - if (args.deliveryDelay != undefined) { + if (args.deliveryDelaySecs != undefined) { body.settings = { - delivery_delay: args.deliveryDelay, + delivery_delay: args.deliveryDelaySecs, }; } diff --git a/packages/wrangler/src/queues/client.ts b/packages/wrangler/src/queues/client.ts index c69ddc369164..3c35dbfcc953 100644 --- a/packages/wrangler/src/queues/client.ts +++ b/packages/wrangler/src/queues/client.ts @@ -1,5 +1,6 @@ import { fetchResult } from "../cfetch"; import { type Config } from "../config"; +import { UserError } from "../errors"; import { requireAuth } from "../user"; export async function createQueue( @@ -19,7 +20,7 @@ export interface CreateQueueBody { } export interface QueueSettings { - delivery_delay?: number; + delivery_delay: number; } export interface ScriptReference { @@ -46,7 +47,7 @@ export interface QueueResponse { producers_total_count: number; consumers: Consumer[]; consumers_total_count: number; - settings: QueueSettings; + settings?: QueueSettings; } export async function deleteQueue( @@ -102,6 +103,50 @@ export async function postConsumer( ); } +export async function postTypedConsumer( + config: Config, + queueName: string, + body: PostTypedConsumerBody +): Promise { + const accountId = await requireAuth(config); + const queue = await getQueue(config, queueName); + return fetchResult( + `/accounts/${accountId}/queues/${queue.queue_id}/consumers`, + { + method: "POST", + body: JSON.stringify(body), + } + ); +} + +export async function putTypedConsumer( + config: Config, + queueId: string, + consumerId: string, + body: PostTypedConsumerBody +): Promise { + const accountId = await requireAuth(config); + + return fetchResult( + `/accounts/${accountId}/queues/${queueId}/consumers/${consumerId}`, + { + method: "PUT", + body: JSON.stringify(body), + } + ); +} + +export interface TypedConsumerResponse extends Consumer { + queue_name: string; + created_on: string; +} + +export interface PostTypedConsumerBody extends PutConsumerBody { + type: string; + script_name?: string; + environment_name?: string; +} + export interface PutConsumerBody { settings: ConsumerSettings; dead_letter_queue?: string; @@ -117,6 +162,7 @@ export interface ConsumerSettings { max_retries?: number; max_wait_time_ms?: number; max_concurrency?: number | null; + visibility_timeout_ms?: number; retry_delay?: number; } @@ -128,6 +174,22 @@ export interface ConsumerResponse extends PostConsumerBody { dead_letter_queue?: string; } +export async function deletePullConsumer( + config: Config, + queueName: string +): Promise { + const accountId = await requireAuth(config); + const queue = await getQueue(config, queueName); + const consumer = queue.consumers[0]; + if (consumer?.type !== "http_pull") { + throw new UserError(`No http_pull consumer exists for queue ${queueName}`); + } + const resource = `/accounts/${accountId}/queues/${queue.queue_id}/consumers/${consumer.consumer_id}`; + return fetchResult(resource, { + method: "DELETE", + }); +} + export async function deleteConsumer( config: Config, queueName: string,