Skip to content

Commit

Permalink
Updated commands to support HTTP Pull consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
pmiguel committed Mar 21, 2024
1 parent 4926222 commit e12cb0f
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 57 deletions.
2 changes: 1 addition & 1 deletion .changeset/warm-seahorses-trade.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
"wrangler": minor
---

feature: add support for queue delivery controls on `wrangler queues create` and `wrangler queues consumer add`
feature: add support for queue delivery controls on `wrangler queues create`
16 changes: 8 additions & 8 deletions packages/wrangler/src/__tests__/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ describe("wrangler", () => {
-v, --version Show version number [boolean]
Options:
--delivery-delay How long a published message should be delayed for, in seconds. Must be a positive integer [number]"
--delivery-delay-secs How long a published message should be delayed for, in seconds. Must be a positive integer [number]"
`);
});

Expand Down Expand Up @@ -274,7 +274,7 @@ describe("wrangler", () => {

it("should send queue settings with delivery delay", async () => {
const requests = mockCreateRequest("testQueue", { delivery_delay: 10 });
await runWrangler("queues create testQueue --delivery-delay=10");
await runWrangler("queues create testQueue --delivery-delay-secs=10");
expect(std.out).toMatchInlineSnapshot(`
"Creating queue testQueue.
Created queue testQueue."
Expand All @@ -287,10 +287,10 @@ describe("wrangler", () => {

await expect(
runWrangler(
"queues create testQueue --delivery-delay=5 --delivery-delay=10"
"queues create testQueue --delivery-delay-secs=5 --delivery-delay-secs=10"
)
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Cannot specify --delivery-delay multiple times"`
`"Cannot specify --delivery-delay-secs multiple times"`
);

expect(requests.count).toEqual(0);
Expand Down Expand Up @@ -430,7 +430,7 @@ describe("wrangler", () => {
--message-retries Maximum number of retries for each message [number]
--dead-letter-queue Queue to send messages that failed to be consumed [string]
--max-concurrency The maximum number of concurrent consumer Worker invocations. Must be a positive integer [number]
--retry-delay How long a retried message should be delayed for, in seconds. Must be a positive integer [number]"
--retry-delay-secs The number of seconds to wait before retrying a message [number]"
`);
});

Expand Down Expand Up @@ -471,7 +471,7 @@ describe("wrangler", () => {
mockPostRequest("testQueue", expectedBody);

await runWrangler(
"queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay=10"
"queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay-secs=10"
);
expect(std.out).toMatchInlineSnapshot(`
"Adding consumer to queue testQueue.
Expand All @@ -496,10 +496,10 @@ describe("wrangler", () => {

await expect(
runWrangler(
"queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay=5 --retry-delay=10"
"queues consumer add testQueue testScript --env myEnv --batch-size 20 --batch-timeout 10 --message-retries 3 --max-concurrency 3 --dead-letter-queue myDLQ --retry-delay-secs=5 --retry-delay-secs=10"
)
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Cannot specify --retry-delay multiple times"`
`"Cannot specify --retry-delay-secs multiple times"`
);

expect(requests.count).toEqual(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type {
StrictYargsOptionsToInterface,
} from "../../../../../yargs-types";
import type { PostTypedConsumerBody } from "../../../../client";
import { CommandLineArgsError } from "../../../../../index";

export function options(yargs: CommonYargsArgv) {
return yargs
Expand Down Expand Up @@ -44,6 +45,12 @@ export async function handler(
) {
const config = readConfig(args.config, args);

if (Array.isArray(args.retryDelaySecs)) {
throw new CommandLineArgsError(
`Cannot specify --retry-delay-secs multiple times`
);
}

const postTypedConsumerBody: PostTypedConsumerBody = {
type: "http_pull",
settings: {
Expand Down
63 changes: 23 additions & 40 deletions packages/wrangler/src/queues/cli/commands/consumer/worker/add.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import { readConfig } from "../../../../config";
import { CommandLineArgsError } from "../../../../index";
import { logger } from "../../../../logger";
import { postConsumer } from "../../../client";
import { handleFetchError } from "../../../utils";
import { readConfig } from "../../../../../config";
import { logger } from "../../../../../logger";
import { postConsumer } from "../../../../client";
import type {
CommonYargsArgv,
StrictYargsOptionsToInterface,
} from "../../../../yargs-types";
import type { PostConsumerBody } from "../../../client";
} from "../../../../../yargs-types";
import type { PostConsumerBody } from "../../../../client";
import { CommandLineArgsError } from "../../../../../index";

export function options(yargs: CommonYargsArgv) {
return yargs
Expand Down Expand Up @@ -44,18 +43,24 @@ export function options(yargs: CommonYargsArgv) {
describe:
"The maximum number of concurrent consumer Worker invocations. Must be a positive integer",
},
"retry-delay": {
"retry-delay-secs": {
type: "number",
describe:
"How long a retried message should be delayed for, in seconds. Must be a positive integer",
number: true,
describe: "The number of seconds to wait before retrying a message",
},
});
}

function createBody(
export async function handler(
args: StrictYargsOptionsToInterface<typeof options>
): PostConsumerBody {
) {
const config = readConfig(args.config, args);

if (Array.isArray(args.retryDelaySecs)) {
throw new CommandLineArgsError(
`Cannot specify --retry-delay-secs multiple times`
);
}

const body: PostConsumerBody = {
script_name: args.scriptName,
// TODO(soon) is this still the correct usage of the environment?
Expand All @@ -67,34 +72,12 @@ function createBody(
? 1000 * args.batchTimeout
: undefined,
max_concurrency: args.maxConcurrency,
retry_delay: args.retryDelaySecs,
},
dead_letter_queue: args.deadLetterQueue,
};

if (Array.isArray(args.retryDelay)) {
throw new CommandLineArgsError(
`Cannot specify --retry-delay multiple times`
);
}

if (args.retryDelay != undefined) {
body.settings.retry_delay = args.retryDelay;
}

return body;
}

export async function handler(
args: StrictYargsOptionsToInterface<typeof options>
) {
const config = readConfig(args.config, args);
const body = createBody(args);

try {
logger.log(`Adding consumer to queue ${args.queueName}.`);
await postConsumer(config, args.queueName, body);
logger.log(`Added consumer to queue ${args.queueName}.`);
} catch (e) {
handleFetchError(e as { code?: number });
}
}
logger.log(`Adding consumer to queue ${args.queueName}.`);
await postConsumer(config, args.queueName, body);
logger.log(`Added consumer to queue ${args.queueName}.`);
}
10 changes: 5 additions & 5 deletions packages/wrangler/src/queues/cli/commands/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export function options(yargs: CommonYargsArgv) {
description: "The name of the queue",
})
.options({
"delivery-delay": {
"delivery-delay-secs": {
type: "number",
describe:
"How long a published message should be delayed for, in seconds. Must be a positive integer",
Expand All @@ -32,15 +32,15 @@ function createBody(
queue_name: args.name,
};

if (Array.isArray(args.deliveryDelay)) {
if (Array.isArray(args.deliveryDelaySecs)) {
throw new CommandLineArgsError(
"Cannot specify --delivery-delay multiple times"
"Cannot specify --delivery-delay-secs multiple times"
);
}

if (args.deliveryDelay != undefined) {
if (args.deliveryDelaySecs != undefined) {
body.settings = {
delivery_delay: args.deliveryDelay,
delivery_delay: args.deliveryDelaySecs,
};
}

Expand Down
68 changes: 65 additions & 3 deletions packages/wrangler/src/queues/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { fetchResult } from "../cfetch";
import { type Config } from "../config";
import { UserError } from "../errors";
import { requireAuth } from "../user";

export async function createQueue(
Expand All @@ -15,11 +16,11 @@ export async function createQueue(

export interface CreateQueueBody {
queue_name: string;
settings?: QueueSettings;
settings?: QueueSettings
}

export interface QueueSettings {
delivery_delay?: number;
delivery_delay: number
}

export interface ScriptReference {
Expand All @@ -46,7 +47,7 @@ export interface QueueResponse {
producers_total_count: number;
consumers: Consumer[];
consumers_total_count: number;
settings: QueueSettings;
settings?: QueueSettings
}

export async function deleteQueue(
Expand Down Expand Up @@ -102,6 +103,50 @@ export async function postConsumer(
);
}

export async function postTypedConsumer(
config: Config,
queueName: string,
body: PostTypedConsumerBody
): Promise<TypedConsumerResponse> {
const accountId = await requireAuth(config);
const queue = await getQueue(config, queueName);
return fetchResult(
`/accounts/${accountId}/queues/${queue.queue_id}/consumers`,
{
method: "POST",
body: JSON.stringify(body),
}
);
}

export async function putTypedConsumer(
config: Config,
queueId: string,
consumerId: string,
body: PostTypedConsumerBody
): Promise<TypedConsumerResponse> {
const accountId = await requireAuth(config);

return fetchResult(
`/accounts/${accountId}/queues/${queueId}/consumers/${consumerId}`,
{
method: "PUT",
body: JSON.stringify(body),
}
);
}

export interface TypedConsumerResponse extends Consumer {
queue_name: string;
created_on: string;
}

export interface PostTypedConsumerBody extends PutConsumerBody {
type: string;
script_name?: string;
environment_name?: string;
}

export interface PutConsumerBody {
settings: ConsumerSettings;
dead_letter_queue?: string;
Expand All @@ -117,6 +162,7 @@ export interface ConsumerSettings {
max_retries?: number;
max_wait_time_ms?: number;
max_concurrency?: number | null;
visibility_timeout_ms?: number;
retry_delay?: number;
}

Expand All @@ -128,6 +174,22 @@ export interface ConsumerResponse extends PostConsumerBody {
dead_letter_queue?: string;
}

export async function deletePullConsumer(
config: Config,
queueName: string
): Promise<void> {
const accountId = await requireAuth(config);
const queue = await getQueue(config, queueName);
const consumer = queue.consumers[0];
if (consumer?.type !== "http_pull") {
throw new UserError(`No http_pull consumer exists for queue ${queueName}`);
}
const resource = `/accounts/${accountId}/queues/${queue.queue_id}/consumers/${consumer.consumer_id}`;
return fetchResult(resource, {
method: "DELETE",
});
}

export async function deleteConsumer(
config: Config,
queueName: string,
Expand Down

0 comments on commit e12cb0f

Please sign in to comment.