From b3399a1aab6ab1ab9d1debb9f480bfe5634f5a2a Mon Sep 17 00:00:00 2001 From: Pedro Leal Date: Thu, 21 Mar 2024 15:13:57 +0000 Subject: [PATCH] Sync with HTTP typed consumers --- .changeset/fuzzy-mangos-sparkle.md | 7 + .changeset/warm-seahorses-trade.md | 2 +- .../wrangler/src/__tests__/deploy.test.ts | 140 ++++++++- .../wrangler/src/__tests__/queues.test.ts | 273 +++++++++++++++++- packages/wrangler/src/config/environment.ts | 11 +- packages/wrangler/src/config/validation.ts | 6 + packages/wrangler/src/deploy/deploy.ts | 108 +++++-- .../cli/commands/consumer/http-pull/add.ts | 70 +++++ .../cli/commands/consumer/http-pull/index.ts | 25 ++ .../cli/commands/consumer/http-pull/remove.ts | 26 ++ .../src/queues/cli/commands/consumer/index.ts | 29 +- .../cli/commands/consumer/{ => worker}/add.ts | 63 ++-- .../cli/commands/consumer/worker/index.ts | 19 ++ .../commands/consumer/{ => worker}/remove.ts | 8 +- .../src/queues/cli/commands/create.ts | 10 +- .../wrangler/src/queues/cli/commands/index.ts | 2 +- packages/wrangler/src/queues/client.ts | 67 ++++- 17 files changed, 766 insertions(+), 100 deletions(-) create mode 100644 .changeset/fuzzy-mangos-sparkle.md create mode 100644 packages/wrangler/src/queues/cli/commands/consumer/http-pull/add.ts create mode 100644 packages/wrangler/src/queues/cli/commands/consumer/http-pull/index.ts create mode 100644 packages/wrangler/src/queues/cli/commands/consumer/http-pull/remove.ts rename packages/wrangler/src/queues/cli/commands/consumer/{ => worker}/add.ts (62%) create mode 100644 packages/wrangler/src/queues/cli/commands/consumer/worker/index.ts rename packages/wrangler/src/queues/cli/commands/consumer/{ => worker}/remove.ts (79%) diff --git a/.changeset/fuzzy-mangos-sparkle.md b/.changeset/fuzzy-mangos-sparkle.md new file mode 100644 index 000000000000..5e2bbc2c0b23 --- /dev/null +++ b/.changeset/fuzzy-mangos-sparkle.md @@ -0,0 +1,7 @@ +--- +"wrangler": minor +--- + +feature: Add support for configuring HTTP Pull consumers for Queues + +HTTP Pull consumers can be used to pull messages from queues via https request. diff --git a/.changeset/warm-seahorses-trade.md b/.changeset/warm-seahorses-trade.md index 20d14b40a65b..7298aa003199 100644 --- a/.changeset/warm-seahorses-trade.md +++ b/.changeset/warm-seahorses-trade.md @@ -2,4 +2,4 @@ "wrangler": minor --- -feature: add support for queue delivery controls on `wrangler queues create` and `wrangler queues consumer add` +feature: add support for queue delivery controls on `wrangler queues create` diff --git a/packages/wrangler/src/__tests__/deploy.test.ts b/packages/wrangler/src/__tests__/deploy.test.ts index 4d850920d293..79cf78a3f92c 100644 --- a/packages/wrangler/src/__tests__/deploy.test.ts +++ b/packages/wrangler/src/__tests__/deploy.test.ts @@ -51,7 +51,7 @@ import writeWranglerToml from "./helpers/write-wrangler-toml"; import type { Config } from "../config"; import type { CustomDomain, CustomDomainChangeset } from "../deploy/deploy"; import type { KVNamespaceInfo } from "../kv/helpers"; -import type { PutConsumerBody } from "../queues/client"; +import type { PostTypedConsumerBody, PutConsumerBody } from "../queues/client"; import type { RestRequest } from "msw"; describe("deploy", () => { @@ -8643,6 +8643,112 @@ export default{ `); }); + it("should post queue http consumers on deploy", async () => { + writeWranglerToml({ + queues: { + consumers: [ + { + queue: "queue1", + type: "http_pull", + dead_letter_queue: "myDLQ", + max_batch_size: 5, + visibility_timeout_ms: 4000, + max_retries: 10, + retry_delay: 1, + }, + ], + }, + }); + await fs.promises.writeFile("index.js", `export default {};`); + mockSubDomainRequest(); + mockUploadWorkerRequest(); + mockGetQueue("queue1", "queue1-queue-id"); + mockPostQueueHTTPConsumer("queue1-queue-id", { + type: "http_pull", + dead_letter_queue: "myDLQ", + settings: { + batch_size: 5, + max_retries: 10, + visibility_timeout_ms: 4000, + retry_delay: 1, + }, + }); + await runWrangler("deploy index.js"); + expect(std.out).toMatchInlineSnapshot(` + "Total Upload: xx KiB / gzip: xx KiB + Uploaded test-name (TIMINGS) + Published test-name (TIMINGS) + https://test-name.test-sub-domain.workers.dev + Consumer for queue1 + Current Deployment ID: Galaxy-Class" + `); + }); + + it("should update queue http consumers when one already exists for queue", async () => { + writeWranglerToml({ + queues: { + consumers: [ + { + queue: "queue1", + type: "http_pull", + }, + ], + }, + }); + await fs.promises.writeFile("index.js", `export default {};`); + mockSubDomainRequest(); + mockUploadWorkerRequest(); + msw.use( + rest.get( + `*/accounts/:accountId/workers/queues/queue1`, + (req, res, ctx) => { + expect(req.params.accountId).toEqual("some-account-id"); + return res( + ctx.json({ + success: true, + errors: [], + messages: [], + result: { + queue: "queue1", + queue_id: "queue1-queue-id", + consumers: [ + { type: "http_pull", consumer_id: "queue1-consumer-id" }, + ], + }, + }) + ); + } + ) + ); + msw.use( + rest.put( + `*/accounts/:accountId/queues/:queueId/consumers/:consumerId`, + async (req, res, ctx) => { + expect(req.params.queueId).toEqual("queue1-queue-id"); + expect(req.params.consumerId).toEqual("queue1-consumer-id"); + expect(req.params.accountId).toEqual("some-account-id"); + return res( + ctx.json({ + success: true, + errors: [], + messages: [], + result: null, + }) + ); + } + ) + ); + await runWrangler("deploy index.js"); + expect(std.out).toMatchInlineSnapshot(` + "Total Upload: xx KiB / gzip: xx KiB + Uploaded test-name (TIMINGS) + Published test-name (TIMINGS) + https://test-name.test-sub-domain.workers.dev + Consumer for queue1 + Current Deployment ID: Galaxy-Class" + `); + }); + it("should support queue consumer concurrency with a max concurrency specified", async () => { writeWranglerToml({ queues: { @@ -9541,7 +9647,7 @@ function mockServiceScriptData(options: { } } -function mockGetQueue(expectedQueueName: string) { +function mockGetQueue(expectedQueueName: string, expectedQueueId?: string) { const requests = { count: 0 }; msw.use( rest.get( @@ -9554,7 +9660,7 @@ function mockGetQueue(expectedQueueName: string) { success: true, errors: [], messages: [], - result: { queue: expectedQueueName }, + result: { queue: expectedQueueName, queue_id: expectedQueueId }, }) ); } @@ -9619,6 +9725,34 @@ function mockPutQueueConsumer( return requests; } +function mockPostQueueHTTPConsumer( + expectedQueueId: string, + expectedBody: PostTypedConsumerBody +) { + const requests = { count: 0 }; + msw.use( + rest.post( + `*/accounts/:accountId/queues/:queueId/consumers`, + async (req, res, ctx) => { + const body = await req.json(); + expect(req.params.queueId).toEqual(expectedQueueId); + expect(req.params.accountId).toEqual("some-account-id"); + expect(body).toEqual(expectedBody); + requests.count += 1; + return res( + ctx.json({ + success: true, + errors: [], + messages: [], + result: {}, + }) + ); + } + ) + ); + return requests; +} + // MSW FormData & Blob polyfills to test FormData requests function mockFormDataToString(this: FormData) { const entries = []; diff --git a/packages/wrangler/src/__tests__/queues.test.ts b/packages/wrangler/src/__tests__/queues.test.ts index 3132fc058854..9ca0a8e7e38c 100644 --- a/packages/wrangler/src/__tests__/queues.test.ts +++ b/packages/wrangler/src/__tests__/queues.test.ts @@ -4,7 +4,11 @@ import { mockConsoleMethods } from "./helpers/mock-console"; import { msw } from "./helpers/msw"; import { runInTempDir } from "./helpers/run-in-tmp"; import { runWrangler } from "./helpers/run-wrangler"; -import type { PostConsumerBody, QueueResponse } from "../queues/client"; +import type { + PostConsumerBody, + PostTypedConsumerBody, + QueueResponse, +} from "../queues/client"; describe("wrangler", () => { mockAccountId(); @@ -215,7 +219,7 @@ describe("wrangler", () => { -v, --version Show version number [boolean] Options: - --delivery-delay How long a published message should be delayed for, in seconds. Must be a positive integer [number]" + --delivery-delay-secs How long a published message should be delayed for, in seconds. Must be a positive integer [number]" `); }); @@ -270,7 +274,7 @@ describe("wrangler", () => { it("should send queue settings with delivery delay", async () => { const requests = mockCreateRequest("testQueue", { delivery_delay: 10 }); - await runWrangler("queues create testQueue --delivery-delay=10"); + await runWrangler("queues create testQueue --delivery-delay-secs=10"); expect(std.out).toMatchInlineSnapshot(` "Creating queue testQueue. Created queue testQueue." @@ -283,10 +287,10 @@ describe("wrangler", () => { await expect( runWrangler( - "queues create testQueue --delivery-delay=5 --delivery-delay=10" + "queues create testQueue --delivery-delay-secs=5 --delivery-delay-secs=10" ) ).rejects.toThrowErrorMatchingInlineSnapshot( - `"Cannot specify --delivery-delay multiple times"` + `"Cannot specify --delivery-delay-secs multiple times"` ); expect(requests.count).toEqual(0); @@ -359,8 +363,10 @@ describe("wrangler", () => { Configure Queue Consumers Commands: - wrangler queues consumer add Add a Queue Consumer - wrangler queues consumer remove Remove a Queue Consumer + wrangler queues consumer add Add a Queue Worker Consumer + wrangler queues consumer remove Remove a Queue Worker Consumer + wrangler queues consumer http Configure Queue HTTP Pull Consumers + wrangler queues consumer worker Configure Queue Worker Consumers Flags: -j, --experimental-json-config Experimental: Support wrangler.json [boolean] @@ -405,7 +411,7 @@ describe("wrangler", () => { expect(std.out).toMatchInlineSnapshot(` "wrangler queues consumer add - Add a Queue Consumer + Add a Queue Worker Consumer Positionals: queue-name Name of the queue to configure [string] [required] @@ -424,7 +430,7 @@ describe("wrangler", () => { --message-retries Maximum number of retries for each message [number] --dead-letter-queue Queue to send messages that failed to be consumed [string] --max-concurrency The maximum number of concurrent consumer Worker invocations. Must be a positive integer [number] - --retry-delay How long a retried message should be delayed for, in seconds. Must be a positive integer [number]" + --retry-delay-secs The number of seconds to wait before retrying a message [number]" `); }); @@ -465,7 +471,7 @@ describe("wrangler", () => { mockPostRequest("testQueue", expectedBody); await runWrangler( - "queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay=10" + "queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay-secs=10" ); expect(std.out).toMatchInlineSnapshot(` "Adding consumer to queue testQueue. @@ -490,10 +496,10 @@ describe("wrangler", () => { await expect( runWrangler( - "queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay=5 --retry-delay=10" + "queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay-secs=5 --retry-delay-secs=10" ) ).rejects.toThrowErrorMatchingInlineSnapshot( - `"Cannot specify --retry-delay multiple times"` + `"Cannot specify --retry-delay-secs multiple times"` ); expect(requests.count).toEqual(0); @@ -584,7 +590,7 @@ describe("wrangler", () => { expect(std.out).toMatchInlineSnapshot(` "wrangler queues consumer remove - Remove a Queue Consumer + Remove a Queue Worker Consumer Positionals: queue-name Name of the queue to configure [string] [required] @@ -628,5 +634,246 @@ describe("wrangler", () => { }); }); }); + + describe("http_pull consumers", () => { + it("should show the correct help text", async () => { + await runWrangler("queues consumer http --help"); + + expect(std.err).toMatchInlineSnapshot(`""`); + expect(std.out).toMatchInlineSnapshot(` + "wrangler queues consumer http + + Configure Queue HTTP Pull Consumers + + Commands: + wrangler queues consumer http add Add a Queue HTTP Pull Consumer + wrangler queues consumer http remove Remove a Queue HTTP Pull Consumer + + Flags: + -j, --experimental-json-config Experimental: Support wrangler.json [boolean] + -c, --config Path to .toml configuration file [string] + -e, --env Environment to use for operations and .env files [string] + -h, --help Show help [boolean] + -v, --version Show version number [boolean]" + `); + }); + + describe("add", () => { + function mockGetQueueRequest(expectedQueueName: string) { + const requests = { count: 0 }; + msw.use( + rest.get( + "*/accounts/:accountId/workers/queues/:queueName", + async (request, response, context) => { + requests.count += 1; + expect(request.params.queueName).toEqual(expectedQueueName); + expect(request.params.accountId).toEqual("some-account-id"); + return response.once( + context.json({ + success: true, + errors: [], + messages: [], + result: { + queue_id: "fake-queue-id", + }, + }) + ); + } + ) + ); + return requests; + } + function mockPostRequest( + expectedQueueId: string, + expectedBody: PostTypedConsumerBody + ) { + const requests = { count: 0 }; + msw.use( + rest.post( + "*/accounts/:accountId/queues/:queueId/consumers", + async (request, response, context) => { + requests.count += 1; + expect(request.params.queueId).toEqual(expectedQueueId); + expect(request.params.accountId).toEqual("some-account-id"); + expect(await request.json()).toEqual(expectedBody); + return response.once( + context.json({ + success: true, + errors: [], + messages: [], + result: {}, + }) + ); + } + ) + ); + return requests; + } + + it("should show the correct help text", async () => { + await runWrangler("queues consumer http add --help"); + expect(std.err).toMatchInlineSnapshot(`""`); + expect(std.out).toMatchInlineSnapshot(` + "wrangler queues consumer http add + + Add a Queue HTTP Pull Consumer + + Positionals: + queue-name Name of the queue for the consumer [string] [required] + + Flags: + -j, --experimental-json-config Experimental: Support wrangler.json [boolean] + -c, --config Path to .toml configuration file [string] + -e, --env Environment to use for operations and .env files [string] + -h, --help Show help [boolean] + -v, --version Show version number [boolean] + + Options: + --batch-size Maximum number of messages per batch [number] + --message-retries Maximum number of retries for each message [number] + --dead-letter-queue Queue to send messages that failed to be consumed [string] + --visibility-timeout-secs The number of seconds a message will wait for an acknowledgement before being returned to the queue. [number] + --retry-delay-secs The number of seconds to wait before retrying a message [number]" + `); + }); + + it("should add a consumer using defaults", async () => { + const expectedBody: PostTypedConsumerBody = { + type: "http_pull", + settings: { + batch_size: undefined, + max_retries: undefined, + visibility_timeout_ms: undefined, + retry_delay: undefined, + }, + dead_letter_queue: undefined, + }; + mockPostRequest("fake-queue-id", expectedBody); + mockGetQueueRequest("testQueue"); + + await runWrangler("queues consumer http add testQueue"); + expect(std.out).toMatchInlineSnapshot(` + "Adding consumer to queue testQueue. + Added consumer to queue testQueue." + `); + }); + + it("should add a consumer using custom values", async () => { + const expectedBody: PostTypedConsumerBody = { + type: "http_pull", + settings: { + batch_size: 20, + max_retries: 3, + visibility_timeout_ms: 6000, + retry_delay: 3, + }, + dead_letter_queue: "myDLQ", + }; + mockPostRequest("fake-queue-id", expectedBody); + mockGetQueueRequest("testQueue"); + + await runWrangler( + "queues consumer http add testQueue --batch-size 20 --message-retries 3 --visibility-timeout-secs 6 --retry-delay-secs 3 --dead-letter-queue myDLQ" + ); + expect(std.out).toMatchInlineSnapshot(` + "Adding consumer to queue testQueue. + Added consumer to queue testQueue." + `); + }); + }); + + describe("delete", () => { + function mockGetQueueRequest(expectedQueueName: string) { + const requests = { count: 0 }; + msw.use( + rest.get( + "*/accounts/:accountId/workers/queues/:queueName", + async (request, response, context) => { + requests.count += 1; + expect(request.params.queueName).toEqual(expectedQueueName); + expect(request.params.accountId).toEqual("some-account-id"); + return response.once( + context.json({ + success: true, + errors: [], + messages: [], + result: { + queue_id: "fake-queue-id", + consumers: [ + { consumer_id: "fake-consumer-id", type: "http_pull" }, + ], + }, + }) + ); + } + ) + ); + return requests; + } + function mockDeleteRequest( + expectedQueueId: string, + expectedConsumerId: string + ) { + const requests = { count: 0 }; + const resource = `accounts/:accountId/queues/:expectedQueueId/consumers/:expectedConsumerId`; + msw.use( + rest.delete(`*/${resource}`, async (request, response, context) => { + requests.count++; + expect(request.params.accountId).toBe("some-account-id"); + expect(request.params.expectedQueueId).toBe(expectedQueueId); + expect(request.params.expectedConsumerId).toBe( + expectedConsumerId + ); + return response.once( + context.status(200), + context.json({ + success: true, + errors: [], + messages: [], + result: {}, + }) + ); + }) + ); + + return requests; + } + + it("should show the correct help text", async () => { + await runWrangler("queues consumer http remove --help"); + expect(std.err).toMatchInlineSnapshot(`""`); + expect(std.out).toMatchInlineSnapshot(` + "wrangler queues consumer http remove + + Remove a Queue HTTP Pull Consumer + + Positionals: + queue-name Name of the queue for the consumer [string] [required] + + Flags: + -j, --experimental-json-config Experimental: Support wrangler.json [boolean] + -c, --config Path to .toml configuration file [string] + -e, --env Environment to use for operations and .env files [string] + -h, --help Show help [boolean] + -v, --version Show version number [boolean]" + `); + }); + + it("should delete a pull consumer", async () => { + mockGetQueueRequest("testQueue"); + const requests = mockDeleteRequest( + "fake-queue-id", + "fake-consumer-id" + ); + await runWrangler("queues consumer http remove testQueue"); + + expect(requests.count).toEqual(1); + expect(std.out).toMatchInlineSnapshot(` + "Removing consumer from queue testQueue. + Removed consumer from queue testQueue." + `); + }); + }); + }); }); }); diff --git a/packages/wrangler/src/config/environment.ts b/packages/wrangler/src/config/environment.ts index dbf977e80a40..3839917a5798 100644 --- a/packages/wrangler/src/config/environment.ts +++ b/packages/wrangler/src/config/environment.ts @@ -438,9 +438,12 @@ export interface EnvironmentNonInheritable { /** Consumer configuration */ consumers?: { - /** The name of the queue from which this script should consume. */ + /** The name of the queue from which this consumer should consume. */ queue: string; + /** The consumer type, e.g., worker, http-pull, r2-bucket, etc. Default is worker. */ + type?: string; + /** The maximum number of messages per batch */ max_batch_size?: number; @@ -455,6 +458,12 @@ export interface EnvironmentNonInheritable { /** The maximum number of concurrent consumer Worker invocations. Leaving this unset will allow your consumer to scale to the maximum concurrency needed to keep up with the message backlog. */ max_concurrency?: number | null; + + /** The number of milliseconds to wait for pulled messages to become visible again */ + visibility_timeout_ms?: number; + + /** The number of seconds to wait before retrying a message */ + retry_delay?: number; }[]; }; diff --git a/packages/wrangler/src/config/validation.ts b/packages/wrangler/src/config/validation.ts index 1246e51d2192..d65b8c9cf919 100644 --- a/packages/wrangler/src/config/validation.ts +++ b/packages/wrangler/src/config/validation.ts @@ -2902,11 +2902,14 @@ const validateConsumer: ValidatorFn = (diagnostics, field, value, _config) => { if ( !validateAdditionalProperties(diagnostics, field, Object.keys(value), [ "queue", + "type", "max_batch_size", "max_batch_timeout", "max_retries", "dead_letter_queue", "max_concurrency", + "visibility_timeout_ms", + "retry_delay", ]) ) { isValid = false; @@ -2924,11 +2927,14 @@ const validateConsumer: ValidatorFn = (diagnostics, field, value, _config) => { key: string; type: "number" | "string" | "boolean"; }[] = [ + { key: "type", type: "string" }, { key: "max_batch_size", type: "number" }, { key: "max_batch_timeout", type: "number" }, { key: "max_retries", type: "number" }, { key: "dead_letter_queue", type: "string" }, { key: "max_concurrency", type: "number" }, + { key: "visibility_timeout_ms", type: "number" }, + { key: "retry_delay", type: "number" }, ]; for (const optionalOpt of options) { if (!isOptionalProperty(value, optionalOpt.key, optionalOpt.type)) { diff --git a/packages/wrangler/src/deploy/deploy.ts b/packages/wrangler/src/deploy/deploy.ts index 1585c8c92f67..f6fa34e61eaa 100644 --- a/packages/wrangler/src/deploy/deploy.ts +++ b/packages/wrangler/src/deploy/deploy.ts @@ -29,7 +29,12 @@ import { getMetricsUsageHeaders } from "../metrics"; import { isNavigatorDefined } from "../navigator-user-agent"; import { APIError, ParseError } from "../parse"; import { getWranglerTmpDir } from "../paths"; -import { getQueue, putConsumer } from "../queues/client"; +import { + getQueue, + postTypedConsumer, + putConsumer, + putTypedConsumer, +} from "../queues/client"; import { getWorkersDevSubdomain } from "../routes"; import { syncAssets } from "../sites"; import { @@ -48,7 +53,7 @@ import type { } from "../config/environment"; import type { Entry } from "../deployment-bundle/entry"; import type { CfPlacement, CfWorkerInit } from "../deployment-bundle/worker"; -import type { PutConsumerBody } from "../queues/client"; +import type { PostTypedConsumerBody, PutConsumerBody } from "../queues/client"; import type { AssetPaths } from "../sites"; import type { RetrieveSourceMapFunction } from "../sourcemap"; @@ -935,7 +940,9 @@ See https://developers.cloudflare.com/workers/platform/compatibility-dates for m } if (config.queues.consumers && config.queues.consumers.length) { - deployments.push(...updateQueueConsumers(config)); + const updateConsumers = await updateQueueConsumers(config); + + deployments.push(...updateConsumers); } const targets = await Promise.all(deployments); @@ -1162,31 +1169,82 @@ async function ensureQueuesExist(config: Config) { } } -function updateQueueConsumers(config: Config): Promise[] { +async function updateQueueConsumers( + config: Config +): Promise[]> { const consumers = config.queues.consumers || []; - return consumers.map((consumer) => { - const body: PutConsumerBody = { - dead_letter_queue: consumer.dead_letter_queue, - settings: { - batch_size: consumer.max_batch_size, - max_retries: consumer.max_retries, - max_wait_time_ms: consumer.max_batch_timeout - ? 1000 * consumer.max_batch_timeout - : undefined, - max_concurrency: consumer.max_concurrency, - }, - }; + const updateConsumers: Promise[] = []; + for (const consumer of consumers) { + if (consumer.type === "http_pull") { + const queue = await getQueue(config, consumer.queue); + const existingConsumer = queue.consumers && queue.consumers[0]; + if (existingConsumer) { + const body: PostTypedConsumerBody = { + type: consumer.type, + dead_letter_queue: consumer.dead_letter_queue, + settings: { + batch_size: consumer.max_batch_size, + max_retries: consumer.max_retries, + visibility_timeout_ms: consumer.visibility_timeout_ms, + retry_delay: consumer.retry_delay, + }, + }; + updateConsumers.push( + putTypedConsumer( + config, + queue.queue_id, + existingConsumer.consumer_id, + body + ).then(() => [`Consumer for ${consumer.queue}`]) + ); + continue; + } - if (config.name === undefined) { - // TODO: how can we reliably get the current script name? - throw new UserError("Script name is required to update queue consumers"); + const body: PostTypedConsumerBody = { + type: consumer.type, + dead_letter_queue: consumer.dead_letter_queue, + settings: { + batch_size: consumer.max_batch_size, + max_retries: consumer.max_retries, + visibility_timeout_ms: consumer.visibility_timeout_ms, + retry_delay: consumer.retry_delay, + }, + }; + updateConsumers.push( + postTypedConsumer(config, consumer.queue, body).then(() => [ + `Consumer for ${consumer.queue}`, + ]) + ); + } else { + const body: PutConsumerBody = { + dead_letter_queue: consumer.dead_letter_queue, + settings: { + batch_size: consumer.max_batch_size, + max_retries: consumer.max_retries, + max_wait_time_ms: consumer.max_batch_timeout + ? 1000 * consumer.max_batch_timeout + : undefined, + max_concurrency: consumer.max_concurrency, + }, + }; + + if (config.name === undefined) { + // TODO: how can we reliably get the current script name? + throw new UserError( + "Script name is required to update queue consumers" + ); + } + const scriptName = config.name; + const envName = undefined; // TODO: script environment for wrangler deploy? + updateConsumers.push( + putConsumer(config, consumer.queue, scriptName, envName, body).then( + () => [`Consumer for ${consumer.queue}`] + ) + ); } - const scriptName = config.name; - const envName = undefined; // TODO: script environment for wrangler deploy? - return putConsumer(config, consumer.queue, scriptName, envName, body).then( - () => [`Consumer for ${consumer.queue}`] - ); - }); + } + + return updateConsumers; } async function noBundleWorker( diff --git a/packages/wrangler/src/queues/cli/commands/consumer/http-pull/add.ts b/packages/wrangler/src/queues/cli/commands/consumer/http-pull/add.ts new file mode 100644 index 000000000000..c0be2f0b2521 --- /dev/null +++ b/packages/wrangler/src/queues/cli/commands/consumer/http-pull/add.ts @@ -0,0 +1,70 @@ +import { readConfig } from "../../../../../config"; +import { logger } from "../../../../../logger"; +import { postTypedConsumer } from "../../../../client"; +import type { + CommonYargsArgv, + StrictYargsOptionsToInterface, +} from "../../../../../yargs-types"; +import type { PostTypedConsumerBody } from "../../../../client"; +import { CommandLineArgsError } from "../../../../../index"; + +export function options(yargs: CommonYargsArgv) { + return yargs + .positional("queue-name", { + type: "string", + demandOption: true, + description: "Name of the queue for the consumer", + }) + .options({ + "batch-size": { + type: "number", + describe: "Maximum number of messages per batch", + }, + "message-retries": { + type: "number", + describe: "Maximum number of retries for each message", + }, + "dead-letter-queue": { + type: "string", + describe: "Queue to send messages that failed to be consumed", + }, + "visibility-timeout-secs": { + type: "number", + describe: + "The number of seconds a message will wait for an acknowledgement before being returned to the queue.", + }, + "retry-delay-secs": { + type: "number", + describe: "The number of seconds to wait before retrying a message", + }, + }); +} + +export async function handler( + args: StrictYargsOptionsToInterface +) { + const config = readConfig(args.config, args); + + if (Array.isArray(args.retryDelaySecs)) { + throw new CommandLineArgsError( + `Cannot specify --retry-delay multiple times` + ); + } + + const postTypedConsumerBody: PostTypedConsumerBody = { + type: "http_pull", + settings: { + batch_size: args.batchSize, + max_retries: args.messageRetries, + visibility_timeout_ms: args.visibilityTimeoutSecs + ? args.visibilityTimeoutSecs * 1000 + : undefined, + retry_delay: args.retryDelaySecs, + }, + dead_letter_queue: args.deadLetterQueue, + }; + logger.log(`Adding consumer to queue ${args.queueName}.`); + + await postTypedConsumer(config, args.queueName, postTypedConsumerBody); + logger.log(`Added consumer to queue ${args.queueName}.`); +} diff --git a/packages/wrangler/src/queues/cli/commands/consumer/http-pull/index.ts b/packages/wrangler/src/queues/cli/commands/consumer/http-pull/index.ts new file mode 100644 index 000000000000..ac99ddd06ef2 --- /dev/null +++ b/packages/wrangler/src/queues/cli/commands/consumer/http-pull/index.ts @@ -0,0 +1,25 @@ +import { + handler as addHTTPConsumerHandler, + options as addOptions, +} from "./add"; +import { + handler as removeHTTPConsumerHandler, + options as removeOptions, +} from "./remove"; +import type { CommonYargsArgv } from "../../../../../yargs-types"; + +export function pullConsumers(yargs: CommonYargsArgv) { + yargs.command( + "add ", + "Add a Queue HTTP Pull Consumer", + addOptions, + addHTTPConsumerHandler + ); + + yargs.command( + "remove ", + "Remove a Queue HTTP Pull Consumer", + removeOptions, + removeHTTPConsumerHandler + ); +} diff --git a/packages/wrangler/src/queues/cli/commands/consumer/http-pull/remove.ts b/packages/wrangler/src/queues/cli/commands/consumer/http-pull/remove.ts new file mode 100644 index 000000000000..bc3d10df641b --- /dev/null +++ b/packages/wrangler/src/queues/cli/commands/consumer/http-pull/remove.ts @@ -0,0 +1,26 @@ +import { readConfig } from "../../../../../config"; +import { logger } from "../../../../../logger"; +import { deletePullConsumer } from "../../../../client"; +import type { + CommonYargsArgv, + StrictYargsOptionsToInterface, +} from "../../../../../yargs-types"; + +export function options(yargs: CommonYargsArgv) { + return yargs.positional("queue-name", { + type: "string", + demandOption: true, + description: "Name of the queue for the consumer", + }); +} + +export async function handler( + args: StrictYargsOptionsToInterface +) { + const config = readConfig(args.config, args); + + logger.log(`Removing consumer from queue ${args.queueName}.`); + await deletePullConsumer(config, args.queueName); + + logger.log(`Removed consumer from queue ${args.queueName}.`); +} diff --git a/packages/wrangler/src/queues/cli/commands/consumer/index.ts b/packages/wrangler/src/queues/cli/commands/consumer/index.ts index 0c6a117041ca..51ab5f87c721 100644 --- a/packages/wrangler/src/queues/cli/commands/consumer/index.ts +++ b/packages/wrangler/src/queues/cli/commands/consumer/index.ts @@ -1,19 +1,40 @@ -import { handler as addHandler, options as addOptions } from "./add"; -import { handler as removeHandler, options as removeOptions } from "./remove"; +import { pullConsumers } from "./http-pull"; +import { workerConsumers } from "./worker"; +import { handler as addHandler, options as addOptions } from "./worker/add"; +import { + handler as removeHandler, + options as removeOptions, +} from "./worker/remove"; import type { CommonYargsArgv } from "../../../../yargs-types"; export function consumers(yargs: CommonYargsArgv) { yargs.command( "add ", - "Add a Queue Consumer", + "Add a Queue Worker Consumer", addOptions, addHandler ); yargs.command( "remove ", - "Remove a Queue Consumer", + "Remove a Queue Worker Consumer", removeOptions, removeHandler ); + + yargs.command( + "http", + "Configure Queue HTTP Pull Consumers", + async (consumersYargs) => { + await pullConsumers(consumersYargs); + } + ); + + yargs.command( + "worker", + "Configure Queue Worker Consumers", + async (consumersYargs) => { + await workerConsumers(consumersYargs); + } + ); } diff --git a/packages/wrangler/src/queues/cli/commands/consumer/add.ts b/packages/wrangler/src/queues/cli/commands/consumer/worker/add.ts similarity index 62% rename from packages/wrangler/src/queues/cli/commands/consumer/add.ts rename to packages/wrangler/src/queues/cli/commands/consumer/worker/add.ts index a9cb4ffb0e35..90afd68ffad0 100644 --- a/packages/wrangler/src/queues/cli/commands/consumer/add.ts +++ b/packages/wrangler/src/queues/cli/commands/consumer/worker/add.ts @@ -1,13 +1,12 @@ -import { readConfig } from "../../../../config"; -import { CommandLineArgsError } from "../../../../index"; -import { logger } from "../../../../logger"; -import { postConsumer } from "../../../client"; -import { handleFetchError } from "../../../utils"; +import { readConfig } from "../../../../../config"; +import { logger } from "../../../../../logger"; +import { postConsumer } from "../../../../client"; import type { CommonYargsArgv, StrictYargsOptionsToInterface, -} from "../../../../yargs-types"; -import type { PostConsumerBody } from "../../../client"; +} from "../../../../../yargs-types"; +import type { PostConsumerBody } from "../../../../client"; +import { CommandLineArgsError } from "../../../../../index"; export function options(yargs: CommonYargsArgv) { return yargs @@ -44,18 +43,24 @@ export function options(yargs: CommonYargsArgv) { describe: "The maximum number of concurrent consumer Worker invocations. Must be a positive integer", }, - "retry-delay": { + "retry-delay-secs": { type: "number", - describe: - "How long a retried message should be delayed for, in seconds. Must be a positive integer", - number: true, + describe: "The number of seconds to wait before retrying a message", }, }); } -function createBody( +export async function handler( args: StrictYargsOptionsToInterface -): PostConsumerBody { +) { + const config = readConfig(args.config, args); + + if (Array.isArray(args.retryDelaySecs)) { + throw new CommandLineArgsError( + `Cannot specify --retry-delay-secs multiple times` + ); + } + const body: PostConsumerBody = { script_name: args.scriptName, // TODO(soon) is this still the correct usage of the environment? @@ -67,34 +72,12 @@ function createBody( ? 1000 * args.batchTimeout : undefined, max_concurrency: args.maxConcurrency, + retry_delay: args.retryDelaySecs, }, dead_letter_queue: args.deadLetterQueue, }; - if (Array.isArray(args.retryDelay)) { - throw new CommandLineArgsError( - `Cannot specify --retry-delay multiple times` - ); - } - - if (args.retryDelay != undefined) { - body.settings.retry_delay = args.retryDelay; - } - - return body; -} - -export async function handler( - args: StrictYargsOptionsToInterface -) { - const config = readConfig(args.config, args); - const body = createBody(args); - - try { - logger.log(`Adding consumer to queue ${args.queueName}.`); - await postConsumer(config, args.queueName, body); - logger.log(`Added consumer to queue ${args.queueName}.`); - } catch (e) { - handleFetchError(e as { code?: number }); - } -} + logger.log(`Adding consumer to queue ${args.queueName}.`); + await postConsumer(config, args.queueName, body); + logger.log(`Added consumer to queue ${args.queueName}.`); +} \ No newline at end of file diff --git a/packages/wrangler/src/queues/cli/commands/consumer/worker/index.ts b/packages/wrangler/src/queues/cli/commands/consumer/worker/index.ts new file mode 100644 index 000000000000..3b54550aaf28 --- /dev/null +++ b/packages/wrangler/src/queues/cli/commands/consumer/worker/index.ts @@ -0,0 +1,19 @@ +import { handler as addHandler, options as addOptions } from "./add"; +import { handler as removeHandler, options as removeOptions } from "./remove"; +import type { CommonYargsArgv } from "../../../../../yargs-types"; + +export function workerConsumers(yargs: CommonYargsArgv) { + yargs.command( + "add ", + "Add a Queue Worker Consumer", + addOptions, + addHandler + ); + + yargs.command( + "remove ", + "Remove a Queue Worker Consumer", + removeOptions, + removeHandler + ); +} diff --git a/packages/wrangler/src/queues/cli/commands/consumer/remove.ts b/packages/wrangler/src/queues/cli/commands/consumer/worker/remove.ts similarity index 79% rename from packages/wrangler/src/queues/cli/commands/consumer/remove.ts rename to packages/wrangler/src/queues/cli/commands/consumer/worker/remove.ts index 4896010c3e51..2ffdef797e4a 100644 --- a/packages/wrangler/src/queues/cli/commands/consumer/remove.ts +++ b/packages/wrangler/src/queues/cli/commands/consumer/worker/remove.ts @@ -1,10 +1,10 @@ -import { readConfig } from "../../../../config"; -import { logger } from "../../../../logger"; -import { deleteConsumer } from "../../../client"; +import { readConfig } from "../../../../../config"; +import { logger } from "../../../../../logger"; +import { deleteConsumer } from "../../../../client"; import type { CommonYargsArgv, StrictYargsOptionsToInterface, -} from "../../../../yargs-types"; +} from "../../../../../yargs-types"; export function options(yargs: CommonYargsArgv) { return yargs diff --git a/packages/wrangler/src/queues/cli/commands/create.ts b/packages/wrangler/src/queues/cli/commands/create.ts index e897fab85154..c51da73bdf96 100644 --- a/packages/wrangler/src/queues/cli/commands/create.ts +++ b/packages/wrangler/src/queues/cli/commands/create.ts @@ -17,7 +17,7 @@ export function options(yargs: CommonYargsArgv) { description: "The name of the queue", }) .options({ - "delivery-delay": { + "delivery-delay-secs": { type: "number", describe: "How long a published message should be delayed for, in seconds. Must be a positive integer", @@ -32,15 +32,15 @@ function createBody( queue_name: args.name, }; - if (Array.isArray(args.deliveryDelay)) { + if (Array.isArray(args.deliveryDelaySecs)) { throw new CommandLineArgsError( - "Cannot specify --delivery-delay multiple times" + "Cannot specify --delivery-delay-secs multiple times" ); } - if (args.deliveryDelay != undefined) { + if (args.deliveryDelaySecs != undefined) { body.settings = { - delivery_delay: args.deliveryDelay, + delivery_delay: args.deliveryDelaySecs, }; } diff --git a/packages/wrangler/src/queues/cli/commands/index.ts b/packages/wrangler/src/queues/cli/commands/index.ts index 0ebb7f082298..13ab2699b2e1 100644 --- a/packages/wrangler/src/queues/cli/commands/index.ts +++ b/packages/wrangler/src/queues/cli/commands/index.ts @@ -1,5 +1,5 @@ import { HandleUnauthorizedError } from "../../utils"; -import { consumers } from "./consumer"; +import { consumers } from "./consumer/index"; import { handler as createHandler, options as createOptions } from "./create"; import { handler as deleteHandler, options as deleteOptions } from "./delete"; import { handler as listHandler, options as listOptions } from "./list"; diff --git a/packages/wrangler/src/queues/client.ts b/packages/wrangler/src/queues/client.ts index c69ddc369164..5887a269d519 100644 --- a/packages/wrangler/src/queues/client.ts +++ b/packages/wrangler/src/queues/client.ts @@ -1,5 +1,6 @@ import { fetchResult } from "../cfetch"; import { type Config } from "../config"; +import { UserError } from "../errors"; import { requireAuth } from "../user"; export async function createQueue( @@ -15,11 +16,11 @@ export async function createQueue( export interface CreateQueueBody { queue_name: string; - settings?: QueueSettings; + settings?: QueueSettings } export interface QueueSettings { - delivery_delay?: number; + delivery_delay: number } export interface ScriptReference { @@ -46,7 +47,6 @@ export interface QueueResponse { producers_total_count: number; consumers: Consumer[]; consumers_total_count: number; - settings: QueueSettings; } export async function deleteQueue( @@ -102,6 +102,50 @@ export async function postConsumer( ); } +export async function postTypedConsumer( + config: Config, + queueName: string, + body: PostTypedConsumerBody +): Promise { + const accountId = await requireAuth(config); + const queue = await getQueue(config, queueName); + return fetchResult( + `/accounts/${accountId}/queues/${queue.queue_id}/consumers`, + { + method: "POST", + body: JSON.stringify(body), + } + ); +} + +export async function putTypedConsumer( + config: Config, + queueId: string, + consumerId: string, + body: PostTypedConsumerBody +): Promise { + const accountId = await requireAuth(config); + + return fetchResult( + `/accounts/${accountId}/queues/${queueId}/consumers/${consumerId}`, + { + method: "PUT", + body: JSON.stringify(body), + } + ); +} + +export interface TypedConsumerResponse extends Consumer { + queue_name: string; + created_on: string; +} + +export interface PostTypedConsumerBody extends PutConsumerBody { + type: string; + script_name?: string; + environment_name?: string; +} + export interface PutConsumerBody { settings: ConsumerSettings; dead_letter_queue?: string; @@ -117,6 +161,7 @@ export interface ConsumerSettings { max_retries?: number; max_wait_time_ms?: number; max_concurrency?: number | null; + visibility_timeout_ms?: number; retry_delay?: number; } @@ -128,6 +173,22 @@ export interface ConsumerResponse extends PostConsumerBody { dead_letter_queue?: string; } +export async function deletePullConsumer( + config: Config, + queueName: string +): Promise { + const accountId = await requireAuth(config); + const queue = await getQueue(config, queueName); + const consumer = queue.consumers[0]; + if (consumer?.type !== "http_pull") { + throw new UserError(`No http_pull consumer exists for queue ${queueName}`); + } + const resource = `/accounts/${accountId}/queues/${queue.queue_id}/consumers/${consumer.consumer_id}`; + return fetchResult(resource, { + method: "DELETE", + }); +} + export async function deleteConsumer( config: Config, queueName: string,