Skip to content

Commit

Permalink
[QUEUES] Delay messages (#13584)
Browse files Browse the repository at this point in the history
* queues: HTTP Pull docs

* queue: changelog

* queues: http pull

* queues: pull consumers

* queues: pull auth

* queues: typo

* queues: fix url

* queues: fix header

* queues: update pull guide

* queues: update pull guide

* queues: add breadcrumb on pull to tutorial

* queues: MORE PULLING

* queues: ack ack ack

* queues: update ack payload schema

* queues: refactor

* queues: short polling

* queues: 1200/5

* queues: update wrangler consumer commands

* queues: Apply suggestions from code review

Co-authored-by: Maddy <130055405+Maddy-Cloudflare@users.noreply.github.com>

* queues: remove examples for now

* queues: fix limits for push vs. pull

* queues: retries

* queues: more docs on delays

* queues: add limit docs for delaySeconds

* queues: Update javascript-apis.md

* queue: changelog

* queues: pull consumers

* queues: fix pull

* queues: fix

* queues: Apply suggestions from code review

* queues: update pull consumer for wrangler

* queues: clarify required wrangler version

* queues: apply suggestions from code review

Co-authored-by: Maddy <130055405+Maddy-Cloudflare@users.noreply.github.com>

---------

Co-authored-by: Maddy <130055405+Maddy-Cloudflare@users.noreply.github.com>
  • Loading branch information
elithrar and Maddy-Cloudflare authored Mar 26, 2024
1 parent a93a215 commit eba825f
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 38 deletions.
21 changes: 11 additions & 10 deletions content/queues/platform/limits.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ Many of these limits will increase during Queues' public beta period. [Follow ou
| Feature | Limit |
| -------------------------------------------------- | --------------------------------------- |
| Queues | 10,000 per account <sup>beta</sup> |
| Maximum message size | 128 KB <sup>1</sup> |
| Maximum message retries | 100 |
| Maximum batch size | 100 messages <sup>beta</sup> |
| Maximum batch wait time | 30 seconds |
| Maximum per-queue message throughput <sup>2</sup> | 400 messages per second <sup>3</sup> |
| Maximum message retention period <sup>4</sup> | 4 days (96 hours) |
| Maximum per-queue backlog size <sup>5</sup> | 25GB |
| Maximum concurrent consumer invocations | 20 <sup>push-based only</sup> |
| Maximum consumer invocation duration | 15 minutes <sup>6</sup> |
| Maximum `visibilityTimeout` (pull-based queues) | 12 hours |
| Message size | 128 KB <sup>1</sup> |
| Message retries | 100 |
| Batch size | 100 messages <sup>beta</sup> |
| Batch wait time | 30 seconds |
| Per-queue message throughput <sup>2</sup> | 400 messages per second <sup>3</sup> |
| Message retention period <sup>4</sup> | 4 days (96 hours) |
| Per-queue backlog size <sup>5</sup> | 25GB |
| Concurrent consumer invocations | 20 <sup>push-based only</sup> |
| Consumer invocation duration | 15 minutes <sup>6</sup> |
| `visibilityTimeout` (pull-based queues) | 12 hours |
| `delaySeconds` (when sending or retrying) | 12 hours |
| Requests to the Queues API (incl. pulls/acks) | [1200 requests / 5 mins](/fundamentals/api/reference/limits/) |

{{</table-wrap>}}
Expand Down
9 changes: 9 additions & 0 deletions content/queues/queues-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
pcx_content_type: navigation
title: Queues REST API
external_link: /api/operations/queue-create-queue
weight: 100
_build:
publishResources: false
render: never
---
107 changes: 92 additions & 15 deletions content/queues/reference/batching-retries.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
title: Batching and Retries
pcx_content_type: concept
weight: 3
weight: 4
---

# Batching and Retries
Expand Down Expand Up @@ -40,7 +40,7 @@ The following batch-level settings can be configured to adjust how Queues delive

{{</table-wrap>}}

## Explicit acknowledgement
## Explicit acknowledgement and retries

You can acknowledge individual messages with a batch by explicitly acknowledging each message as it is processed. Messages that are explicitly acknowledged will not be re-delivered, even if your queue consumer fails on a subsequent message and/or fails to return successfully when processing a batch.

Expand Down Expand Up @@ -70,7 +70,7 @@ You can also call `retry()` to explicitly force a message to be redelivered in a

```ts
---
header: index.js
header: index.ts
---
export default {
async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
Expand All @@ -91,37 +91,114 @@ Note that calls to `ack()`, `retry()` and their `ackAll()` / `retryAll` equivale
* If you call `retry()` on a message and then call `ack()`: the `ack()` is ignored. The first method call wins in all cases.
* If you call either `ack()` or `retry()` on a single message, and then either/any of `ackAll()` or `retryAll()` on the batch, the call on the single message takes precedence. That is, the batch-level call does not apply to that message (or messages, if multiple calls were made).

## Retries
## Delivery falure

When a message is failed to be delivered, the default behaviour is to retry delivery three times before marking the delivery as failed. You can set `max_retries` (defaults to 3) when configuring your consumer, but in most cases we recommend leaving this as the default.

When a message is failed to be delivered, the default behaviour is to retry delivery three times before marking the delivery as failed (refer to [Dead Letter Queues](#dead-letter-queues)). You can set `max_retries` (defaults to 3) when configuring your consumer, but in most cases we recommend leaving this as the default.
Messages that reach the configured maximum retries will be deleted from the queue, or if a [dead-letter queue](/queues/reference/dead-letter-queues/) (DLQ) is configured, written to the DLQ instead.

{{<Aside type="note">}}

Each retry counts as an additional read operation per [Queues pricing](/queues/platform/pricing/).

{{</Aside>}}

When a single message within a batch fails to be delivered, the entire batch is retried, unless you have [explicitly acknowledged](#explicit-acknowledgement) a message (or messages) within that batch. For example, if a batch of 10 messages is delivered, but the 8th message fails to be delivered, all 10 messages will be retried and thus redelivered to your consumer in full.
When a single message within a batch fails to be delivered, the entire batch is retried, unless you have [explicitly acknowledged](#explicit-acknowledgement-and-retries) a message (or messages) within that batch. For example, if a batch of 10 messages is delivered, but the 8th message fails to be delivered, all 10 messages will be retried and thus redelivered to your consumer in full.

{{<Aside type="warning" header="Retried messages and consumer concurrency">}}

Retrying messages with `.retry()` or calling `.retryAll()` on a batch will cause the consumer to autoscale down if consumer concurrency is enabled. Refer to [Consumer concurrency](/queues/reference/consumer-concurrency/) to learn more.

{{</Aside>}}

## Dead Letter Queues
## Delay messages

When publishing messages to a queue, or when [marking a messsage or batch for retry](#explicit-acknowledgement-and-retries), you can choose to delay messages from being processed for a period of time.

Delaying messages allows you to defer tasks until later, and/or respond to backpressure when consuming from a queue. For example, if an upstream API you are calling to returns a `HTTP 429: Too Many Requests`, you can delay messages to slow down how quickly you are consuming them before they are re-processed.

{{<Aside type="note">}}

Configuring delivery and retry delays via the `wrangler` CLI requires `wrangler` version `3.38.0` or greater. Use `npx wrangler@latest` to always use the latest version of `wrangler`.

{{</Aside>}}

### Delay on send

To delay a message or batch of messages when sending to a queue, you can provide a `delaySeconds` parameter when sending a message.

```ts
// Delay a singular message by 600 seconds (10 minutes)
await env.YOUR_QUEUE.send(message, { delaySeconds: 600 })

// Delay a batch of messages by 300 seconds (5 minutes)
await env.YOUR_QUEUE.sendBatch(messages, { delaySeconds: 300 })

// Do not delay this message.
// If there is a global delay configured on the queue, ignore it.
await env.YOUR_QUEUE.sendBatch(messages, { delaySeconds: 0 })
```

You can also configure a default, global delay on a per-queue basis by passing `--delivery-delay-secs` when creating a queue via the `wrangler` CLI:

```sh
# Delay all messages by 5 minutes as a default
$ npx wrangler queues create $QUEUE_NAME --delivery-delay-secs=300
```

### Delay on retry

When [consuming messages from a queue](/queues/reference/how-queues-works/#consumers), you can choose to [explicitly mark messages to be retried](#explicit-acknowledgement-and-retries). Messages can be retried and delayed individually, or as an entire batch.

To delay an individual message within a batch:

A Dead Letter Queue (DLQ) is a common concept in a messaging system, and represents where messages are sent when a delivery failure occurs with a consumer after `max_retries` is reached. A Dead Letter Queue is just like any other queue, and can be produced to and consumed from independently.
```ts
---
header: index.ts
---
export default {
async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
for (const msg of batch.messages) {
// Mark for retry and delay a singular message
// by 3600 seconds (1 hour)
msg.retry({delaySeconds: 3600})

}
},
};
```

With Cloudflare Queues, a Dead Letter Queue is configured as part of your consumer. For example, the following consumer configuration would send messages to our DLQ named `"my-other-queue"` after retrying delivery (by default, 3 times):
To delay a batch of messages:

```toml
```ts
---
filename: wrangler.toml
header: index.ts
---
[[queues.consumers]]
queue = "my-queue"
dead_letter_queue = "my-other-queue"
export default {
async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
// Mark for retry and delay a batch of messages
// by 600 seconds (10 minutes)
batch.retryAll({ delaySeconds: 600 })
},
};
```

To process messages placed on your DLQ, you need to set up a consumer associated with that queue. Messages delivered to a DLQ without an active consumer will persist for four (4) days before being deleted from the queue.
You can also choose to set a default retry delay to any messages that are retried due to either implicit failure or when calling `retry()` explicitly. This is set at the consumer level, and is supported in both push-based (Worker) and pull-based (HTTP) consumers:

```sh
# Push-based consumers
# Delay any messages that are retried by 60 seconds (1 minute) by default.
$ npx wrangler@latest queues consumer worker add $QUEUE_NAME $WORKER_SCRIPT_NAME --retry-delay-secs=60

# Pull-based consumers
# Delay any messages that are retried by 60 seconds (1 minute) by default.
$ npx wrangler@latest queues consumer http add $QUEUE_NAME --retry-delay-secs=60
```

Refer to the [Queues REST API documentation](/api/operations/queue-list-queue-consumers) to learn how to configure message delays and retry delays programmatically.

## Related

* Review the [JavaScript API](/queues/reference/javascript-apis/) documentation for Queues.
* Learn more about [How Queues Works](/queues/reference/how-queues-works/).
* Understand the [metrics available](/queues/reference/metrics/) for your queues, including backlog and delayed message counts.
2 changes: 1 addition & 1 deletion content/queues/reference/configuration.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
pcx_content_type: reference
title: Configuration
weight: 7
weight: 3
meta:
title: Cloudflare Queues - Configuration
---
Expand Down
2 changes: 1 addition & 1 deletion content/queues/reference/consumer-concurrency.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
title: Consumer concurrency
pcx_content_type: concept
weight: 4
weight: 9
---

# Consumer concurrency
Expand Down
32 changes: 32 additions & 0 deletions content/queues/reference/dead-letter-queues.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
title: Dead Letter Queues
pcx_content_type: concept
weight: 5
---

# Dead Letter Queues

A Dead Letter Queue (DLQ) is a common concept in a messaging system, and represents where messages are sent when a delivery failure occurs with a consumer after `max_retries` is reached. A Dead Letter Queue is like any other queue, and can be produced to and consumed from independently.

With Cloudflare Queues, a Dead Letter Queue is defined within your [consumer configuration](/queues/reference/configuration/). Messages are delivered to the DLQ when they reach the configured retry limit for the consumer. Without a DLQ configured, messages that reach the retry limit are deleted permanently.

For example, the following consumer configuration would send messages to our DLQ named `"my-other-queue"` after retrying delivery (by default, 3 times):

```toml
---
filename: wrangler.toml
---
[[queues.consumers]]
queue = "my-queue"
dead_letter_queue = "my-other-queue"
```

You can also configure a DLQ when creating a consumer from the command-line using `wrangler`:

```sh
$ wrangler queues consumer add $QUEUE_NAME $SCRIPT_NAME --dead-letter-queue=$NAME_OF_OTHER_QUEUE
```

To process messages placed on your DLQ, you need to [configure a consumer](/queues/reference/configuration/) for that queue as you would with any other queue.

Messages delivered to a DLQ without an active consumer will persist for four (4) days before being deleted from the queue.
60 changes: 50 additions & 10 deletions content/queues/reference/javascript-apis.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
pcx_content_type: reference
title: JavaScript APIs
weight: 8
weight: 5
meta:
title: Cloudflare Queues - JavaScript APIs
---
Expand Down Expand Up @@ -55,8 +55,8 @@ A binding that allows a producer to send messages to a Queue.

```ts
interface Queue<Body = unknown> {
send(body: Body, options?: { contentType?: QueuesContentType }): Promise<void>;
sendBatch(messages: Iterable<MessageSendRequest<Body>>): Promise<void>;
send(body: Body, options?: QueueSendOptions): Promise<void>;
sendBatch(messages: Iterable<MessageSendRequest<Body>>, options?: QueueSendBatchOptions): Promise<void>;
}
```

Expand All @@ -81,7 +81,7 @@ A wrapper type used for sending message batches.
```ts
type MessageSendRequest<Body = unknown> = {
body: Body;
contentType?: QueuesContentType;
options?: QueueSendOptions;
};
```

Expand All @@ -92,13 +92,36 @@ type MessageSendRequest<Body = unknown> = {
- The body of the message.
- The body can be any type supported by the [structured clone algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm#supported_types), as long as its size is less than 128 KB.

- {{<code>}}options{{<param-type>}}QueueSendOptions{{</param-type>}}{{</code>}}

- Options to apply to the current message, including content type and message delay settings.


{{</definitions>}}

### `QueueSendOptions`

Optional configuration that applies when sending a message to a queue.

- {{<code>}}contentType{{<param-type>}}QueuesContentType{{</param-type>}}{{</code>}}

- The explicit content type of a message so it can be previewed correctly with the [List messages from the dashboard](/queues/examples/list-messages-from-dash/) feature. Optional argument.
- As of now, this option is for internal use. In the future, `contentType` will be used by alternative consumer types to explicitly mark messages as serialized so they can be consumed in the desired type.
- See [QueuesContentType](#queuescontenttype) for possible values.

{{</definitions>}}
- {{<code>}}delaySeconds{{<param-type>}}number{{</param-type>}}{{</code>}}

- The number of seconds to [delay a message](/queues/reference/batching-retries/) for within the queue, before it can be delivered to a consumer.
- Must be an integer between 0 and 43200 (12 hours). Setting this value to zero will explicitly prevent the message from being delayed, even if there is a global (default) delay at the queue level.

### `QueueSendBatchOptions`

Optional configuration that applies when sending a batch of messages to a queue.

- {{<code>}}delaySeconds{{<param-type>}}number{{</param-type>}}{{</code>}}

- The number of seconds to [delay messages](/queues/reference/batching-retries/) for within the queue, before it can be delivered to a consumer.
- Must be a positive integer.

### `QueuesContentType`

Expand Down Expand Up @@ -158,7 +181,7 @@ export default {

The `env` and `ctx` fields are as [documented in the Workers documentation](/workers/reference/migrate-to-module-workers/).

Or alternatively, a queue consumer can be written using service worker syntax:
Or alternatively, a queue consumer can be written using the (deprecated) service worker syntax:

```js
addEventListener('queue', (event) => {
Expand All @@ -183,7 +206,7 @@ interface MessageBatch<Body = unknown> {
readonly queue: string;
readonly messages: Message<Body>[];
ackAll(): void;
retryAll(): void;
retryAll(options?: QueueRetryOptions): void;
}
```

Expand All @@ -201,9 +224,10 @@ interface MessageBatch<Body = unknown> {

- Marks every message as successfully delivered, regardless of whether your `queue()` consumer handler returns successfully or not.

- {{<code>}}retryAll(){{</code>}} {{<type>}}void{{</type>}}
- {{<code>}}retryAll(options?: QueueRetryOptions){{</code>}} {{<type>}}void{{</type>}}

- Marks every message to be retried in the next batch.
- Supports an optional `options` object.

{{</definitions>}}

Expand All @@ -217,7 +241,7 @@ interface Message<Body = unknown> {
readonly timestamp: Date;
readonly body: Body;
ack(): void;
retry(): void;
retry(options?: QueueRetryOption): void;
}
```

Expand All @@ -240,8 +264,24 @@ interface Message<Body = unknown> {

- Marks a message as successfully delivered, regardless of whether your `queue()` consumer handler returns successfully or not.

- {{<code>}}retry(){{</code>}} {{<type>}}void{{</type>}}
- {{<code>}}retry(options?: QueueRetryOptions){{</code>}} {{<type>}}void{{</type>}}

- Marks a message to be retried in the next batch.
- Supports an optional `options` object.

{{</definitions>}}

### `QueueRetryOptions`

Optional configuration when marking a message or a batch of messages for retry.

```ts
declare interface QueueRetryOptions {
delaySeconds?: number;
}
```

- {{<code>}}delaySeconds{{<param-type>}}number{{</param-type>}}{{</code>}}

- The number of seconds to [delay a message](/queues/reference/batching-retries/) for within the queue, before it can be delivered to a consumer.
- Must be a positive integer.
2 changes: 1 addition & 1 deletion content/queues/reference/pull-consumers.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
pcx_content_type: reference
title: Pull consumers
weight: 15
weight: 6
meta:
title: Cloudflare Queues - Pull consumers
---
Expand Down
6 changes: 6 additions & 0 deletions data/changelogs/queues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
link: "/queues/platform/changelog/"
productName: Queues
entries:
- publish_date: '2024-03-27'
title: Delay messages published to a queue
description: |-
Messages published to a queue and/or marked for retry from a queue consumer can now be explicitly delayed. Delaying messages allows you to defer tasks until later, and/or respond to backpressure when consuming from a queue.
Refer to [Batching and Retries](/queues/reference/batching-retries/) to learn how to delay messages written to a queue.
- publish_date: '2024-03-25'
title: Support for pull-based consumers
description: |-
Expand Down

0 comments on commit eba825f

Please sign in to comment.