Skip to content

Commit

Permalink
Add Dead Letter Queue to QueueInputSource (#93)
Browse files Browse the repository at this point in the history
* Add Dead Letter Queue to QueueInputSource

* Addressing comments

* pass in a modified config to dead letter queue

* addressing comment

* changing API to expect values in milliseconds

* Updating docs

* Docs update with example dead letter queue config

* update package.jsoon

* doc nit

Co-authored-by: Plamen Ivanov <plamen.ivanov@jet.com>
  • Loading branch information
plameniv and Plamen Ivanov authored May 4, 2020
1 parent 08dc035 commit 483a9a6
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 15 deletions.
49 changes: 48 additions & 1 deletion docs/docs/Module_Azure.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ Application.create()
storageAccessKey: "[SOME_KEY]",
queueName: "[QUEUE_NAME]",
encoder: new JsonMessageEncoder(),
visibilityTimeout: 30, // default 30 seconds
visibilityTimeout: 30000, // default 30 seconds
numOfMessages: 32, // default 32
}))
.done()
Expand All @@ -305,3 +305,50 @@ Application.create()
It is recommended to run the service in Serial mode with `queueSource` because once the message is received from Azure Queues its visibility timeout window starts and running the service in serial mode will decrease the chance of hitting the window timeout as messages are queued up internally in Cookie Cutter in Concurrent mode.

Queues items will be reprocessed if you throw an error in the message handler function. The `DequeueCount` metadata can be used to detect reprocessed messages and skip over those if appropriate.

### Dead Letter Queue

It is possible to designate a queue to serve as a dead letter queue. `maxDequeueCount` specifies how many times a message can be dequeued before it is sent to the dead letter queue. The visibility timeout and message time to live will default to the values of the main queue unless the values are explicitly overwritten.

```typescript
Application.create()
.input()
.add(Streaming.queueSource({
storageAccount: "[SOME_ACCOUNT]",
storageAccessKey: "[SOME_KEY]",
queueName: "[QUEUE_NAME]",
encoder: new JsonMessageEncoder(),
deadLetterQueue: {
queueName: "[OTHER_QUEUE_NAME]",
maxDequeueCount: 10,
visibilityTimeout: 30000,
messageTimeToLive: 120000,
}
}))
.done()
.dispatch({
onSomeTask: (_msg: ISomeTask, _ctx: IDispatchContext) => {
// ...
},
})
.run(ErrorHandlingMode.LogAndContinue, ParallelismMode.Serial);
```

### Metadata

The following metadata is available

| Name | Description |
|------|-------------|
| GrpcMetadata.Peer | the host and port of the client sending the request |
| QueueMetadata.QueueName | Queue name |
| QueueMetadata.VisibilityTimeout | When passed into msg metadata via `publish`/`store`: Specifies the new visibility timeout value, in seconds, relative to server time |
| QueueMetadata.VisibilityTimeoutMs | When passed into msg metadata via `publish`/`store`: Specifies the new visibility timeout value, in milliseconds, relative to server time |
| QueueMetadata.VisibilityTimeout | When read from the MessageRef metadata: Returns the date when the message will next be visible in string format: "Tue, 21 Apr 2020 16:33:23 GMT" |
| QueueMetadata.TimeToLive | When passed into msg metadata via `publish`/`store`: The time-to-live interval for the message, in seconds. |
| QueueMetadata.TimeToLiveMs | When passed into msg metadata via `publish`/`store`: The time-to-live interval for the message, in milliseconds. |
| QueueMetadata.TimeToLive | When read from the MessageRef metadata: Returns the date when the message will expire in string format: "Tue, 21 Apr 2020 16:33:23 GMT" |
| QueueMetadata.DequeueCount | Number of times a message has been dequeued |
| QueueMetadata.TimeToNextVisible | not used |
| QueueMetadata.MessageId | The message identifier of the message |
| QueueMetadata.PopReceipt | A valid pop receipt value returned from an earlier call to the Get Messages or Update Message operation |
2 changes: 1 addition & 1 deletion packages/azure/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@walmartlabs/cookie-cutter-azure",
"version": "1.3.0-beta.1",
"version": "1.3.0-beta.2",
"license": "Apache-2.0",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
21 changes: 21 additions & 0 deletions packages/azure/src/streaming/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,24 @@ export interface IQueueMessagePreprocessor {
process(payload: string): IQueueMessage;
}

export interface IDeadLetterQueueConfiguration {
readonly queueName: string;
readonly maxDequeueCount: number;
/**
* The time-to-live interval for the message, in milliseconds. The maximum time-to-live allowed is 7 days. If this parameter
* is omitted, the default time-to-live is 7 days (604800000 milliseconds)
*/
messageTimeToLive?: number;
/**
* Specifies the new visibility timeout value, in milliseconds, relative to server time. The new value must be larger than or
* equal to 0, and cannot be larger than 7 days (604800000 milliseconds). The visibility timeout of a message cannot be set to a value later than
* the expiry time (calculated based on time-to-live when updating message). visibilitytimeout should be set to a value smaller than the time-to-live value.
*/
visibilityTimeout?: number;
readonly retryCount?: number;
readonly retryInterval?: number;
}

export interface IQueueConfiguration {
readonly url?: string;
readonly storageAccount: string;
Expand Down Expand Up @@ -61,12 +79,15 @@ export interface IQueueSourceConfiguration {
* The visibility timeout of a message can be set to a value later than the expiry time.
*/
visibilityTimeout?: number;
readonly deadLetterQueue?: IDeadLetterQueueConfiguration;
}

export enum QueueMetadata {
QueueName = "queue.name",
VisibilityTimeout = "queue.visibility_timeout",
VisibilityTimeoutMs = "queue.visibility_timeout_ms",
TimeToLive = "queue.time_to_live",
TimeToLiveMs = "queue.time_to_live_ms",
DequeueCount = "queue.dequeue_count",
TimeToNextVisible = "queue.time_to_next_visible",
MessageId = "queue.message_id",
Expand Down
49 changes: 49 additions & 0 deletions packages/azure/src/streaming/internal/QueueInputSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ interface IBufferToJSON {

export class QueueInputSource implements IInputSource, IRequireInitialization {
private readonly client: QueueClient & IRequireInitialization;
private readonly deadLetterClient: (QueueClient & IRequireInitialization) | undefined;
private readonly readOptions: IQueueReadOptions;
private readonly encoder: IMessageEncoder;
private metrics: IMetrics;
Expand All @@ -57,13 +58,31 @@ export class QueueInputSource implements IInputSource, IRequireInitialization {
this.metrics = DefaultComponentContext.metrics;
this.tracer = DefaultComponentContext.tracer;
this.logger = DefaultComponentContext.logger;
if (config.deadLetterQueue) {
const deadLetterConfig: IQueueConfiguration = {
createQueueIfNotExists: config.createQueueIfNotExists,
encoder: config.encoder,
queueName: config.deadLetterQueue.queueName,
storageAccessKey: config.storageAccessKey,
storageAccount: config.storageAccount,
largeItemBlobContainer: config.largeItemBlobContainer,
preprocessor: config.preprocessor,
retryCount: config.deadLetterQueue.retryCount || config.retryCount,
retryInterval: config.deadLetterQueue.retryInterval || config.retryInterval,
url: config.url,
};
this.deadLetterClient = QueueClientWithLargeItemSupport.create(deadLetterConfig);
}
}

public async initialize(context: IComponentContext): Promise<void> {
this.metrics = context.metrics;
this.logger = context.logger;
this.tracer = context.tracer;
await this.client.initialize(context);
if (this.deadLetterClient) {
await this.deadLetterClient.initialize(context);
}
}

public async *start(): AsyncIterableIterator<MessageRef> {
Expand Down Expand Up @@ -107,6 +126,36 @@ export class QueueInputSource implements IInputSource, IRequireInitialization {
[QueueMetadata.PopReceipt]: message.headers[QueueMetadata.PopReceipt],
};

if (
this.deadLetterClient &&
metadata[QueueMetadata.DequeueCount] >
this.config.deadLetterQueue.maxDequeueCount
) {
try {
await this.deadLetterClient.write(
spanContext,
payload,
{ [EventSourcedMetadata.EventType]: event_type },
{
visibilityTimeout: this.config.deadLetterQueue.visibilityTimeout,
messageTimeToLive: this.config.deadLetterQueue.messageTimeToLive,
}
);
await this.client.markAsProcessed(
spanContext,
message.headers[QueueMetadata.MessageId],
message.headers[QueueMetadata.PopReceipt],
message.headers[QueueMetadata.QueueName]
);
} catch (e) {
span.log({ reprocess: true });
failSpan(span, e);
} finally {
span.finish();
}
continue;
}

const msgRef = new MessageRef(metadata, msg, span.context());
msgRef.once("released", async (_msg: MessageRef, _value?: any, error?: Error) => {
try {
Expand Down
12 changes: 10 additions & 2 deletions packages/azure/src/streaming/internal/QueueOutputSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,16 @@ export class QueueOutputSink implements IOutputSink<IPublishedMessage>, IRequire
public async sink(output: IterableIterator<IPublishedMessage>): Promise<void> {
for (const msg of output) {
const queueName = msg.metadata[QueueMetadata.QueueName];
const visibilityTimeout = msg.metadata[QueueMetadata.VisibilityTimeout];
const messageTimeToLive = msg.metadata[QueueMetadata.TimeToLive];
const visibilityTimeoutSeconds = msg.metadata[QueueMetadata.VisibilityTimeout];
const messageTimeToLiveSeconds = msg.metadata[QueueMetadata.TimeToLive];
const visibilityTimeoutMs = msg.metadata[QueueMetadata.VisibilityTimeoutMs];
const messageTimeToLiveMs = msg.metadata[QueueMetadata.TimeToLiveMs];
const visibilityTimeout = visibilityTimeoutMs
? Math.floor(visibilityTimeoutMs / 1000)
: visibilityTimeoutSeconds;
const messageTimeToLive = messageTimeToLiveMs
? Math.floor(messageTimeToLiveMs / 1000)
: messageTimeToLiveSeconds;
const headers = {
[EventSourcedMetadata.EventType]: msg.message.type,
};
Expand Down
66 changes: 65 additions & 1 deletion packages/azure/src/streaming/internal/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,63 @@ LICENSE file in the root directory of this source tree.
*/

import { config, IMessageEncoder } from "@walmartlabs/cookie-cutter-core";
import { IQueueConfiguration, IQueueMessagePreprocessor, IQueueSourceConfiguration } from "..";
import {
IQueueConfiguration,
IQueueMessagePreprocessor,
IQueueSourceConfiguration,
IDeadLetterQueueConfiguration,
} from "..";

@config.section
export class DeadLetterQueueConfiguration implements IDeadLetterQueueConfiguration {
@config.field(config.converters.string)
public set queueName(_: string) {
config.noop();
}
public get queueName(): string {
return config.noop();
}

@config.field(config.converters.number)
public set maxDequeueCount(_: number) {
config.noop();
}
public get maxDequeueCount(): number {
return config.noop();
}

@config.field(config.converters.timespanOf(config.TimeSpanTargetUnit.Seconds))
public set visibilityTimeout(_: number) {
config.noop();
}
public get visibilityTimeout(): number {
return config.noop();
}

@config.field(config.converters.timespanOf(config.TimeSpanTargetUnit.Seconds))
public set messageTimeToLive(_: number) {
config.noop();
}
public get messageTimeToLive(): number {
return config.noop();
}

@config.field(config.converters.number)
public set retryCount(_: number) {
config.noop();
}
public get retryCount(): number {
return config.noop();
}

@config.field(config.converters.timespan)
public set retryInterval(_: number) {
config.noop();
}
public get retryInterval(): number {
return config.noop();
}
}

@config.section
export class QueueConfiguration implements IQueueConfiguration {
Expand Down Expand Up @@ -109,4 +165,12 @@ export class QueueSourceConfiguration extends QueueConfiguration
public get visibilityTimeout(): number {
return config.noop();
}

@config.field<IDeadLetterQueueConfiguration>(DeadLetterQueueConfiguration)
public set deadLetterQueue(_: IDeadLetterQueueConfiguration) {
config.noop();
}
public get deadLetterQueue(): IDeadLetterQueueConfiguration {
return config.noop();
}
}
28 changes: 18 additions & 10 deletions packages/azure/src/utils/QueueClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ export enum QueueOpenTracingTagKeys {
QueueName = "queue.name",
}

export interface IDeadLetterQueueOptions extends IQueueCreateMessageOptions {
readonly maxDequeueCount: number;
readonly retryCount?: number;
readonly retryInterval?: number;
}

export interface IQueueCreateMessageOptions extends IQueueRequestOptions {
/**
* (FROM AZURE DOCS)
Expand Down Expand Up @@ -80,6 +86,7 @@ export interface IQueueReadOptions extends IQueueRequestOptions {
* The visibility timeout of a message can be set to a value later than the expiry time.
*/
visibilityTimeout?: number;
readonly deadLetterQueue?: IDeadLetterQueueOptions;
}

export interface IQueueMessage {
Expand Down Expand Up @@ -337,13 +344,13 @@ export class QueueClient implements IRequireInitialization {
);
resolve(
results.reduce((messages, result) => {
const mesageObj = this.config.preprocessor.process(
const messageObj = this.config.preprocessor.process(
result.messageText
);

if (
!mesageObj.headers ||
!mesageObj.headers[EventSourcedMetadata.EventType]
!messageObj.headers ||
!messageObj.headers[EventSourcedMetadata.EventType]
) {
span.log({ messageId: result.messageId });
failSpan(
Expand All @@ -360,17 +367,18 @@ export class QueueClient implements IRequireInitialization {
return messages;
}

mesageObj.headers[QueueMetadata.DequeueCount] = (
messageObj.headers[QueueMetadata.DequeueCount] = (
result.dequeueCount || 1
).toString();
mesageObj.headers[QueueMetadata.QueueName] = result.queue;
mesageObj.headers[QueueMetadata.TimeToLive] = result.expirationTime;
mesageObj.headers[QueueMetadata.VisibilityTimeout] =
messageObj.headers[QueueMetadata.QueueName] = result.queue;
messageObj.headers[QueueMetadata.TimeToLive] =
result.expirationTime;
messageObj.headers[QueueMetadata.VisibilityTimeout] =
result.timeNextVisible;
mesageObj.headers[QueueMetadata.MessageId] = result.messageId;
mesageObj.headers[QueueMetadata.PopReceipt] = result.popReceipt;
messageObj.headers[QueueMetadata.MessageId] = result.messageId;
messageObj.headers[QueueMetadata.PopReceipt] = result.popReceipt;

messages.push(mesageObj);
messages.push(messageObj);

return messages;
}, [])
Expand Down

0 comments on commit 483a9a6

Please sign in to comment.