Skip to content

Commit

Permalink
[ISSUE #9025] [RIP-73] Pop Consumption Improvement Based on RocksDB
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhimins committed Dec 14, 2024
1 parent eed2d19 commit 4f3e30a
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 13 deletions.
1 change: 1 addition & 0 deletions broker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ java_library(
"//srvutil",
"//store",
"//tieredstore",
"@maven//:org_slf4j_slf4j_api",
"@maven//:ch_qos_logback_logback_classic",
"@maven//:com_alibaba_fastjson",
"@maven//:com_alibaba_fastjson2_fastjson2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class PopLongPollingService extends ServiceThread {

private static final Logger POP_LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);

private final BrokerController brokerController;
private final NettyRequestProcessor processor;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,19 @@ public CompletableFuture<PopConsumerContext> popAsync(String clientHost, long po
}
return CompletableFuture.completedFuture(result);
}).whenComplete((result, throwable) -> {
if (throwable != null) {
log.error("PopConsumerService popAsync get message error",
throwable instanceof CompletionException ? throwable.getCause() : throwable);
}
if (result.getMessageCount() > 0) {
log.debug("PopConsumerService popAsync result, found={}, groupId={}, topicId={}, queueId={}, " +
"batchSize={}, invisibleTime={}, fifo={}, attemptId={}, filter={}", result.getMessageCount(),
groupId, topicId, queueId, batchSize, invisibleTime, fifo, attemptId, filter);
try {
if (throwable != null) {
log.error("PopConsumerService popAsync get message error",
throwable instanceof CompletionException ? throwable.getCause() : throwable);
}
if (result.getMessageCount() > 0) {
log.debug("PopConsumerService popAsync result, found={}, groupId={}, topicId={}, queueId={}, " +
"batchSize={}, invisibleTime={}, fifo={}, attemptId={}, filter={}", result.getMessageCount(),
groupId, topicId, queueId, batchSize, invisibleTime, fifo, attemptId, filter);
}
} finally {
consumerLockService.unlock(groupId, topicId);
}
consumerLockService.unlock(groupId, topicId);
});
} catch (Throwable t) {
log.error("PopConsumerService popAsync error", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,9 @@ private void appendAckNew(final AckMessageRequestHeader requestHeader, final Bat

if (requestHeader != null && batchAck == null) {
String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
brokerName = ExtraInfoUtil.getBrokerName(extraInfo);
String groupId = requestHeader.getConsumerGroup();
String topicId = requestHeader.getTopic();
int queueId = requestHeader.getQueueId();
long startOffset = ExtraInfoUtil.getCkQueueOffset(extraInfo);
long ackOffset = requestHeader.getOffset();
long popTime = ExtraInfoUtil.getPopTime(extraInfo);
long invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
Expand Down Expand Up @@ -373,7 +371,7 @@ private void appendAckNew(final AckMessageRequestHeader requestHeader, final Bat
this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount);
this.brokerController.getBrokerStatsManager().incGroupAckNums(groupId, topicId, ackCount);
} catch (ConsumeQueueException e) {
throw new RemotingCommandException("Failed to get max offset in queue", e);
throw new RemotingCommandException("Failed to ack message", e);
}
}
}
Expand Down

0 comments on commit 4f3e30a

Please sign in to comment.