From 0ba29f185773da8d83d4f266c0a8749d6d7bc59c Mon Sep 17 00:00:00 2001 From: prakashelango Date: Sun, 14 Jul 2024 23:15:42 +0800 Subject: [PATCH] fix: Nack on sqs message --- .../messaging/aws/sqs/SqsConnector.java | 1 + .../messaging/aws/sqs/SqsInboundChannel.java | 2 +- .../reactive/messaging/aws/sqs/SqsMessage.java | 5 +---- .../aws/sqs/ack/SqsNothingAckHandler.java | 18 +++++++++++++++++- .../sqs/locals/LocalPropagationAckTest.java | 2 +- 5 files changed, 21 insertions(+), 7 deletions(-) diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java index 10026ab79..38528bfd5 100644 --- a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java +++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsConnector.java @@ -49,6 +49,7 @@ @ConnectorAttribute(name = "receive.request.retries", type = "long", direction = ConnectorAttribute.Direction.INCOMING, description = "If set to a positive number, the connector will try to retry the request that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled.", defaultValue = "2147483647") @ConnectorAttribute(name = "receive.request.pause.resume", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused.", defaultValue = "true") @ConnectorAttribute(name = "ack.delete", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the acknowledgement deletes the message from the queue", defaultValue = "true") +@ConnectorAttribute(name = "nack.visibility-timeout", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "The duration in seconds that the nacked messages to set Visibility Timeout", defaultValue = "0") public class SqsConnector implements InboundConnector, OutboundConnector, HealthReporter { @Inject diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsInboundChannel.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsInboundChannel.java index 73ec245f2..a735dd284 100644 --- a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsInboundChannel.java +++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsInboundChannel.java @@ -63,7 +63,7 @@ public SqsInboundChannel(SqsConnectorIncomingConfiguration conf, Vertx vertx, Sq this.customizer = customizer; SqsAckHandler ackHandler = conf.getAckDelete() ? new SqsDeleteAckHandler(client, queueUrlUni) - : new SqsNothingAckHandler(); + : new SqsNothingAckHandler(client, queueUrlUni); PausablePollingStream, software.amazon.awssdk.services.sqs.model.Message> pollingStream = new PausablePollingStream<>( channel, request(null, 0), (messages, processor) -> { if (messages != null) { diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsMessage.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsMessage.java index ab6206ab8..6d81455c2 100644 --- a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsMessage.java +++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsMessage.java @@ -3,7 +3,6 @@ import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; import java.util.function.Function; @@ -118,9 +117,7 @@ public Function> getAckWithMetadata() { @Override public CompletionStage nack(Throwable reason, Metadata metadata) { - CompletableFuture nack = new CompletableFuture<>(); - runOnMessageContext(() -> nack.complete(null)); - return nack; + return ackHandler.handle(this).subscribeAsCompletionStage(); } @Override diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/ack/SqsNothingAckHandler.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/ack/SqsNothingAckHandler.java index e7d790183..1e2eb3177 100644 --- a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/ack/SqsNothingAckHandler.java +++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/ack/SqsNothingAckHandler.java @@ -3,12 +3,28 @@ import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.aws.sqs.SqsAckHandler; import io.smallrye.reactive.messaging.aws.sqs.SqsMessage; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; public class SqsNothingAckHandler implements SqsAckHandler { + private final SqsAsyncClient client; + private final Uni queueUrlUni; + + public SqsNothingAckHandler(SqsAsyncClient client, Uni queueUrlUni) { + this.client = client; + this.queueUrlUni = queueUrlUni; + } + @Override public Uni handle(SqsMessage message) { - return Uni.createFrom().voidItem() + return queueUrlUni.map(queueUrl -> ChangeMessageVisibilityRequest.builder() + .queueUrl(queueUrl) + .receiptHandle(message.getMessage().receiptHandle()) + .visibilityTimeout(0) + .build()) + .chain(request -> Uni.createFrom().completionStage(() -> client.changeMessageVisibility(request))) + .replaceWithVoid() .emitOn(message::runOnMessageContext); } } diff --git a/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/locals/LocalPropagationAckTest.java b/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/locals/LocalPropagationAckTest.java index 93cde9422..0da818dd1 100644 --- a/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/locals/LocalPropagationAckTest.java +++ b/smallrye-reactive-messaging-aws-sqs/src/test/java/io/smallrye/reactive/messaging/aws/sqs/locals/LocalPropagationAckTest.java @@ -54,7 +54,7 @@ public void testChannelWithAckOnMessageContextNothingAck() { SqsClientProvider.client = getSqsClient(); addBeans(SqsClientProvider.class); IncomingChannelWithAckOnMessageContext bean = runApplication(dataconfig() - .with("mp.messaging.incoming.data.ack.delete", false), + .with("mp.messaging.incoming.data.ack.delete", true), IncomingChannelWithAckOnMessageContext.class); bean.process(i -> i + 1); await().until(() -> bean.getResults().size() >= 5);