Skip to content

Commit

Permalink
[ISSUE #9112] Speedup revive scan in Pop Consumption and support serv…
Browse files Browse the repository at this point in the history
…er side reset offset
  • Loading branch information
lizhimins committed Jan 8, 2025
1 parent 9ce8345 commit a1a4354
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ public interface PopConsumerKVStore {

/**
* Scans and returns a list of expired consumer records before the current time.
* @param currentTime The current revive checkpoint timestamp.
* @param maxCount The maximum number of records to return.
* @return A list of expired consumer records.
*/
List<PopConsumerRecord> scanExpiredRecords(long currentTime, int maxCount);
List<PopConsumerRecord> scanExpiredRecords(long lowerTime, long upperTime, int maxCount);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
Expand All @@ -43,7 +45,7 @@ public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements P

private WriteOptions writeOptions;
private WriteOptions deleteOptions;
private ColumnFamilyHandle columnFamilyHandle;
protected ColumnFamilyHandle columnFamilyHandle;

public PopConsumerRocksdbStore(String filePath) {
super(filePath);
Expand All @@ -60,8 +62,7 @@ protected void initOptions() {
this.writeOptions.setNoSlowdown(false);

this.deleteOptions = new WriteOptions();
this.deleteOptions.setSync(false);
this.deleteOptions.setLowPri(true);
this.deleteOptions.setSync(true);
this.deleteOptions.setDisableWAL(false);
this.deleteOptions.setNoSlowdown(false);

Expand Down Expand Up @@ -135,18 +136,19 @@ public void deleteRecords(List<PopConsumerRecord> consumerRecordList) {
}

@Override
public List<PopConsumerRecord> scanExpiredRecords(long currentTime, int maxCount) {
// https://github.com/facebook/rocksdb/issues/10300
public List<PopConsumerRecord> scanExpiredRecords(long lower, long upper, int maxCount) {
// In RocksDB, we can use SstPartitionerFixedPrefixFactory in cfOptions
// and new ColumnFamilyOptions().useFixedLengthPrefixExtractor() to
// configure prefix indexing to improve the performance of scans.
// However, in the current implementation, this is not the bottleneck.
List<PopConsumerRecord> consumerRecordList = new ArrayList<>();
try (RocksIterator iterator = db.newIterator(this.columnFamilyHandle)) {
iterator.seekToFirst();
try (ReadOptions scanOptions = new ReadOptions()
.setIterateLowerBound(new Slice(ByteBuffer.allocate(Long.BYTES).putLong(lower).array()))
.setIterateUpperBound(new Slice(ByteBuffer.allocate(Long.BYTES).putLong(upper).array()));
RocksIterator iterator = db.newIterator(this.columnFamilyHandle, scanOptions)) {
iterator.seek(ByteBuffer.allocate(Long.BYTES).putLong(lower).array());
while (iterator.isValid() && consumerRecordList.size() < maxCount) {
if (ByteBuffer.wrap(iterator.key()).getLong() > currentTime) {
break;
}
consumerRecordList.add(PopConsumerRecord.decode(iterator.value()));
iterator.next();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class PopConsumerService extends ServiceThread {
private final AtomicBoolean consumerRunning;
private final BrokerConfig brokerConfig;
private final BrokerController brokerController;
private final AtomicLong currentTime;
private final AtomicLong lastCleanupLockTime;
private final PopConsumerCache popConsumerCache;
private final PopConsumerKVStore popConsumerStore;
Expand All @@ -88,6 +89,7 @@ public PopConsumerService(BrokerController brokerController) {

this.consumerRunning = new AtomicBoolean(false);
this.requestCountTable = new ConcurrentHashMap<>();
this.currentTime = new AtomicLong(TimeUnit.SECONDS.toMillis(3));
this.lastCleanupLockTime = new AtomicLong(System.currentTimeMillis());
this.consumerLockService = new PopConsumerLockService(TimeUnit.MINUTES.toMillis(2));
this.popConsumerStore = new PopConsumerRocksdbStore(Paths.get(
Expand Down Expand Up @@ -195,12 +197,27 @@ public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMes
return context;
}

public Long getPopOffset(String groupId, String topicId, int queueId) {
Long resetOffset =
this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topicId, groupId, queueId);
if (resetOffset != null) {
this.clearCache(groupId, topicId, queueId);
this.brokerController.getConsumerOrderInfoManager().clearBlock(topicId, groupId, queueId);
this.brokerController.getConsumerOffsetManager()
.commitOffset("ResetPopOffset", groupId, topicId, queueId, resetOffset);
}
return resetOffset;
}

public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost,
String groupId, String topicId, int queueId, long offset, int batchSize, MessageFilter filter) {

log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={}, queueId={}, offset={}, batchSize={}, filter={}",
groupId, topicId, offset, queueId, batchSize, filter != null);

Long resetOffset = this.getPopOffset(groupId, topicId, queueId);
final long currentOffset = resetOffset != null ? resetOffset : offset;

CompletableFuture<GetMessageResult> getMessageFuture =
brokerController.getMessageStore().getMessageAsync(groupId, topicId, queueId, offset, batchSize, filter);

Expand All @@ -223,7 +240,7 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String clientHost,

log.warn("PopConsumerService getMessageAsync, initial offset because store is no correct, " +
"groupId={}, topicId={}, queueId={}, batchSize={}, offset={}->{}",
groupId, topicId, queueId, batchSize, offset, result.getNextBeginOffset());
groupId, topicId, queueId, batchSize, currentOffset, result.getNextBeginOffset());

return brokerController.getMessageStore().getMessageAsync(
groupId, topicId, queueId, result.getNextBeginOffset(), batchSize, filter);
Expand Down Expand Up @@ -482,10 +499,12 @@ public void clearCache(String groupId, String topicId, int queueId) {
}
}

public long revive(long currentTime, int maxCount) {
public long revive(AtomicLong currentTime, int maxCount) {
Stopwatch stopwatch = Stopwatch.createStarted();
List<PopConsumerRecord> consumerRecords =
this.popConsumerStore.scanExpiredRecords(currentTime, maxCount);
long upperTime = System.currentTimeMillis() - 50L;
List<PopConsumerRecord> consumerRecords = this.popConsumerStore.scanExpiredRecords(
currentTime.get() - TimeUnit.SECONDS.toMillis(3), upperTime, maxCount);
long scanCostTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
Queue<PopConsumerRecord> failureList = new LinkedBlockingQueue<>();
List<CompletableFuture<?>> futureList = new ArrayList<>(consumerRecords.size());

Expand All @@ -497,9 +516,9 @@ public long revive(long currentTime, int maxCount) {
long backoffInterval = 1000L * REWRITE_INTERVALS_IN_SECONDS[
Math.min(REWRITE_INTERVALS_IN_SECONDS.length, record.getAttemptTimes())];
long nextInvisibleTime = record.getInvisibleTime() + backoffInterval;
PopConsumerRecord retryRecord = new PopConsumerRecord(record.getPopTime(), record.getGroupId(),
record.getTopicId(), record.getQueueId(), record.getRetryFlag(), nextInvisibleTime,
record.getOffset(), record.getAttemptId());
PopConsumerRecord retryRecord = new PopConsumerRecord(System.currentTimeMillis(),
record.getGroupId(), record.getTopicId(), record.getQueueId(),
record.getRetryFlag(), nextInvisibleTime, record.getOffset(), record.getAttemptId());
retryRecord.setAttemptTimes(record.getAttemptTimes() + 1);
failureList.add(retryRecord);
log.warn("PopConsumerService revive backoff retry, record={}", retryRecord);
Expand All @@ -513,14 +532,18 @@ public long revive(long currentTime, int maxCount) {
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
this.popConsumerStore.writeRecords(new ArrayList<>(failureList));
this.popConsumerStore.deleteRecords(consumerRecords);
currentTime.set(consumerRecords.isEmpty() ?
upperTime : consumerRecords.get(consumerRecords.size() - 1).getVisibilityTimeout());

if (brokerConfig.isEnablePopBufferMerge()) {
log.info("PopConsumerService, key size={}, cache size={}, revive count={}, failure count={}, cost={}ms",
popConsumerCache.getCacheKeySize(), popConsumerCache.getCacheSize(), consumerRecords.size(),
failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
log.info("PopConsumerService, key size={}, cache size={}, revive count={}, failure count={}, behind={}ms, scan={}ms, cost={}ms",
popConsumerCache.getCacheKeySize(), popConsumerCache.getCacheSize(),
consumerRecords.size(), failureList.size(), upperTime - currentTime.get(),
scanCostTime, stopwatch.elapsed(TimeUnit.MILLISECONDS));
} else {
log.info("PopConsumerService, revive count={}, failure count={}, cost={}ms",
consumerRecords.size(), failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
log.info("PopConsumerService, revive count={}, failure count={}, behind={}ms, scan={}ms, cost={}ms",
consumerRecords.size(), failureList.size(), upperTime - currentTime.get(),
scanCostTime, stopwatch.elapsed(TimeUnit.MILLISECONDS));
}

return consumerRecords.size();
Expand Down Expand Up @@ -588,11 +611,6 @@ public boolean reviveRetry(PopConsumerRecord record, MessageExt messageExt) {
PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);

if (brokerConfig.isEnablePopLog()) {
log.debug("PopConsumerService revive retry msg, put status={}, ck={}, delay={}ms",
putMessageResult, JSON.toJSONString(record), System.currentTimeMillis() - record.getVisibilityTimeout());
}

if (putMessageResult.getAppendMessageResult() == null ||
putMessageResult.getAppendMessageResult().getStatus() != AppendMessageStatus.PUT_OK) {
log.error("PopConsumerService revive retry msg error, put status={}, ck={}, delay={}ms",
Expand All @@ -616,7 +634,7 @@ public synchronized void transferToFsStore() {
while (true) {
try {
List<PopConsumerRecord> consumerRecords = this.popConsumerStore.scanExpiredRecords(
Long.MAX_VALUE, brokerConfig.getPopReviveMaxReturnSizePerRead());
0, Long.MAX_VALUE, brokerConfig.getPopReviveMaxReturnSizePerRead());
if (consumerRecords == null || consumerRecords.isEmpty()) {
break;
}
Expand Down Expand Up @@ -695,7 +713,7 @@ public void run() {
while (!isStopped()) {
try {
// to prevent concurrency issues during read and write operations
long reviveCount = this.revive(System.currentTimeMillis() - 50L,
long reviveCount = this.revive(this.currentTime,
brokerConfig.getPopReviveMaxReturnSizePerRead());

long current = System.currentTimeMillis();
Expand All @@ -704,7 +722,7 @@ public void run() {
this.lastCleanupLockTime.set(current);
}

if (reviveCount == 0) {
if (reviveCount < brokerConfig.getPopReviveMaxReturnSizePerRead()) {
this.waitForRunning(500);
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,26 @@
*/
package org.apache.rocketmq.broker.pop;

import com.google.common.base.Stopwatch;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,18 +93,101 @@ public void rocksdbStoreWriteDeleteTest() {
.collect(Collectors.toList()));

List<PopConsumerRecord> consumerRecords =
consumerStore.scanExpiredRecords(20002, 2);
consumerStore.scanExpiredRecords(0, 20002, 2);
Assert.assertEquals(2, consumerRecords.size());
consumerStore.deleteRecords(consumerRecords);

consumerRecords = consumerStore.scanExpiredRecords(20002, 2);
consumerRecords = consumerStore.scanExpiredRecords(0, 20003, 2);
Assert.assertEquals(1, consumerRecords.size());
consumerStore.deleteRecords(consumerRecords);

consumerRecords = consumerStore.scanExpiredRecords(20004, 3);
consumerRecords = consumerStore.scanExpiredRecords(0, 20005, 3);
Assert.assertEquals(2, consumerRecords.size());

consumerStore.shutdown();
deleteStoreDirectory(filePath);
}

private long getDirectorySizeRecursive(File directory) {
long size = 0;
File[] files = directory.listFiles();
if (files != null) {
for (File file : files) {
if (file.isFile()) {
size += file.length();
} else if (file.isDirectory()) {
size += getDirectorySizeRecursive(file);
}
}
}
return size;
}

@Test
@Ignore
@SuppressWarnings("ConstantValue")
public void tombstoneDeletionTest() throws IllegalAccessException, NoSuchFieldException {
PopConsumerRocksdbStore rocksdbStore = new PopConsumerRocksdbStore(getRandomStorePath());
rocksdbStore.start();

int iterCount = 1000 * 1000;
boolean useSeekFirstDelete = false;
Field dbField = AbstractRocksDBStorage.class.getDeclaredField("db");
dbField.setAccessible(true);
RocksDB rocksDB = (RocksDB) dbField.get(rocksdbStore);

long currentTime = 0L;
Stopwatch stopwatch = Stopwatch.createStarted();
for (int i = 0; i < iterCount; i++) {
List<PopConsumerRecord> records = new ArrayList<>();
for (int j = 0; j < 1000; j++) {
PopConsumerRecord record = getConsumerRecord();
record.setPopTime((long) i * iterCount + j);
record.setGroupId("GroupTest");
record.setTopicId("TopicTest");
record.setQueueId(i % 10);
record.setRetryFlag(0);
record.setInvisibleTime(TimeUnit.SECONDS.toMillis(30));
record.setOffset(i);
records.add(record);
}
rocksdbStore.writeRecords(records);

long start = stopwatch.elapsed(TimeUnit.MILLISECONDS);
List<PopConsumerRecord> deleteList = new ArrayList<>();
if (useSeekFirstDelete) {
try (RocksIterator iterator = rocksDB.newIterator(rocksdbStore.columnFamilyHandle)) {
iterator.seekToFirst();
if (i % 10 == 0) {
long fileSize = getDirectorySizeRecursive(new File(rocksdbStore.getFilePath()));
log.info("DirectorySize={}, Cost={}ms",
MessageStoreUtil.toHumanReadable(fileSize), stopwatch.elapsed(TimeUnit.MILLISECONDS) - start);
}
while (iterator.isValid() && deleteList.size() < 1024) {
deleteList.add(PopConsumerRecord.decode(iterator.value()));
iterator.next();
}
}
} else {
long upper = System.currentTimeMillis();
deleteList = rocksdbStore.scanExpiredRecords(currentTime, upper, 800);
if (!deleteList.isEmpty()) {
currentTime = deleteList.get(deleteList.size() - 1).getVisibilityTimeout();
}
long scanCost = stopwatch.elapsed(TimeUnit.MILLISECONDS) - start;
if (i % 100 == 0) {
long fileSize = getDirectorySizeRecursive(new File(rocksdbStore.getFilePath()));
long seekTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
try (RocksIterator iterator = rocksDB.newIterator(rocksdbStore.columnFamilyHandle)) {
iterator.seekToFirst();
}
log.info("DirectorySize={}, Cost={}ms, SeekFirstCost={}ms", MessageStoreUtil.toHumanReadable(fileSize),
scanCost, stopwatch.elapsed(TimeUnit.MILLISECONDS) - seekTime);
}
}
rocksdbStore.deleteRecords(deleteList);
}
rocksdbStore.shutdown();
deleteStoreDirectory(rocksdbStore.getFilePath());
}
}
Loading

0 comments on commit a1a4354

Please sign in to comment.