Skip to content

Commit

Permalink
Fix BlocksByRange RPC requests when count is 0 (#8341)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov authored May 29, 2024
1 parent 45b612d commit 015f7bd
Show file tree
Hide file tree
Showing 19 changed files with 164 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public UInt64 getStep() {
}

public UInt64 getMaxSlot() {
return getStartSlot().plus(getCount().minus(1).times(getStep()));
return getStartSlot().plus(getCount().minusMinZero(1).times(getStep()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public UInt64 getCount() {
}

public UInt64 getMaxSlot() {
return getStartSlot().plus(getCount()).minusMinZero(1);
return getStartSlot().plus(getCount().minusMinZero(1));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public NavigableMap<UInt64, Bytes32> getAncestors(
final UInt64 count) {
final NavigableMap<UInt64, Bytes32> roots = new TreeMap<>();
// minus(ONE) because the start block is included
final UInt64 endSlot = startSlot.plus(step.times(count)).minus(UInt64.ONE);
final UInt64 endSlot = startSlot.plus(step.times(count)).minusMinZero(1);
Bytes32 parentRoot = root;
Optional<UInt64> parentSlot = forkChoiceStrategy.blockSlot(parentRoot);
while (parentSlot.isPresent() && parentSlot.get().compareTo(startSlot) > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public static Stream<Arguments> getMaxSlotParams() {
Arguments.of(0, 2, 2, 2),
Arguments.of(10, 2, 2, 12),
Arguments.of(0, 5, 2, 8),
Arguments.of(10, 5, 2, 18));
Arguments.of(10, 5, 2, 18),
Arguments.of(10, 3, 5, 20),
Arguments.of(0, 0, 1, 0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public static Stream<Arguments> getMaxSlotParams() {
Arguments.of(0, 2, 1),
Arguments.of(10, 2, 11),
Arguments.of(0, 5, 4),
Arguments.of(10, 5, 14));
Arguments.of(10, 5, 14),
Arguments.of(1, 0, 1),
Arguments.of(0, 0, 0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ public void shouldSendEmptyResponseWhenNoBlocksAreAvailable() throws Exception {
assertThat(response).isEmpty();
}

@Test
public void shouldSendEmptyResponseWhenCountIsZero() throws Exception {
final Eth2Peer peer = createPeer();

final SignedBlockAndState block = peerStorage.chainUpdater().advanceChain();
peerStorage.chainUpdater().updateBestBlock(block);

final List<SignedBeaconBlock> blocks = new ArrayList<>();
waitFor(
peer.requestBlocksByRange(UInt64.ONE, UInt64.ZERO, RpcResponseListener.from(blocks::add)));
assertThat(peer.getOutstandingRequests()).isEqualTo(0);
assertThat(blocks).isEmpty();
}

@Test
public void shouldRespondWithBlocksFromCanonicalChain() throws Exception {
final Eth2Peer peer = createPeer();
Expand All @@ -74,7 +88,7 @@ public void shouldRespondWithBlocksFromCanonicalChain() throws Exception {
}

@Test
public void requestBlocksByRangeAfterPeerDisconnectedImmediately() throws Exception {
public void requestBlocksByRangeAfterPeerDisconnectedImmediately() {
final Eth2Peer peer = createPeer();

// Setup chain
Expand Down Expand Up @@ -116,7 +130,7 @@ public void requestBlocksByRangeAfterPeerDisconnected() throws Exception {
}

@Test
public void requestBlockBySlotAfterPeerDisconnectedImmediately() throws Exception {
public void requestBlockBySlotAfterPeerDisconnectedImmediately() {
final Eth2Peer peer = createPeer();

// Setup chain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,26 @@ public void requestBlobSidecars_shouldReturnEmptyBlobSidecarsOnDenebMilestone()
}

@Test
public void requestBlobSidecars_shouldReturnCanonicalBlobSidecarsOnDenebMilestone()
public void requestBlobSidecars_shouldReturnEmptyBlobSidecarsWhenCountIsZero()
throws ExecutionException, InterruptedException, TimeoutException {
final Eth2Peer peer = createPeer(TestSpecFactory.createMinimalDeneb());

// finalize chain 2 blobs per block
peerStorage.chainUpdater().blockOptions.setGenerateRandomBlobs(true);
peerStorage.chainUpdater().blockOptions.setGenerateRandomBlobsCount(Optional.of(2));
finalizeChainWithBlobs(2);

final List<SignedBlockAndState> finalizedBlocksAndStates =
peerStorage
.chainBuilder()
.finalizeCurrentChain(Optional.of(peerStorage.chainUpdater().blockOptions));
finalizedBlocksAndStates.forEach(
blockAndState -> {
final List<BlobSidecar> blobSidecars =
peerStorage.chainBuilder().getBlobSidecars(blockAndState.getRoot());
peerStorage.chainUpdater().saveBlock(blockAndState, blobSidecars);
peerStorage.chainUpdater().updateBestBlock(blockAndState);
});
final List<BlobSidecar> blobSidecars =
requestBlobSidecarsByRange(peer, UInt64.ONE, UInt64.ZERO);

assertThat(blobSidecars).isEmpty();
}

@Test
public void requestBlobSidecars_shouldReturnCanonicalBlobSidecarsOnDenebMilestone()
throws ExecutionException, InterruptedException, TimeoutException {
final Eth2Peer peer = createPeer(TestSpecFactory.createMinimalDeneb());

// finalize chain 2 blobs per block
finalizeChainWithBlobs(2);

final ChainBuilder fork = peerStorage.chainBuilder().fork();

Expand Down Expand Up @@ -118,6 +119,23 @@ public void requestBlobSidecars_shouldReturnCanonicalBlobSidecarsOnDenebMileston
assertThat(blobSidecars).doesNotContainAnyElementsOf(nonCanonicalBlobSidecars);
}

private void finalizeChainWithBlobs(final int blobsPerBlock) {
peerStorage.chainUpdater().blockOptions.setGenerateRandomBlobs(true);
peerStorage.chainUpdater().blockOptions.setGenerateRandomBlobsCount(Optional.of(blobsPerBlock));

final List<SignedBlockAndState> finalizedBlocksAndStates =
peerStorage
.chainBuilder()
.finalizeCurrentChain(Optional.of(peerStorage.chainUpdater().blockOptions));
finalizedBlocksAndStates.forEach(
blockAndState -> {
final List<BlobSidecar> blobSidecars =
peerStorage.chainBuilder().getBlobSidecars(blockAndState.getRoot());
peerStorage.chainUpdater().saveBlock(blockAndState, blobSidecars);
peerStorage.chainUpdater().updateBestBlock(blockAndState);
});
}

private List<BlobSidecar> requestBlobSidecarsByRange(
final Eth2Peer peer, final UInt64 from, final UInt64 count)
throws InterruptedException, ExecutionException, TimeoutException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ private SafeFuture<RequestState> sendMatchingBlocks(
final UInt64 startSlot = message.getStartSlot();
final UInt64 count = message.getCount();
final UInt64 step = message.getStep();
final UInt64 endSlot = startSlot.plus(step.times(count)).minus(ONE);

return combinedChainDataClient
.getEarliestAvailableBlockSlot()
Expand All @@ -183,7 +182,7 @@ private SafeFuture<RequestState> sendMatchingBlocks(
.map(MinimalBeaconBlockSummary::getSlot)
.orElse(ZERO);
final NavigableMap<UInt64, Bytes32> hotRoots;
if (combinedChainDataClient.isFinalized(endSlot)) {
if (combinedChainDataClient.isFinalized(message.getMaxSlot())) {
// All blocks are finalized so skip scanning the protoarray
hotRoots = new TreeMap<>();
} else {
Expand All @@ -195,8 +194,12 @@ private SafeFuture<RequestState> sendMatchingBlocks(
// finalized
// so we don't need to worry about inconsistent blocks
final UInt64 headSlot = hotRoots.isEmpty() ? headBlockSlot : hotRoots.lastKey();
return sendNextBlock(
new RequestState(startSlot, step, count, headSlot, hotRoots, callback));
final RequestState initialState =
new RequestState(startSlot, step, count, headSlot, hotRoots, callback);
if (initialState.isComplete()) {
return SafeFuture.completedFuture(initialState);
}
return sendNextBlock(initialState);
});
}

Expand All @@ -220,7 +223,11 @@ private SafeFuture<Boolean> processNextBlock(final RequestState requestState) {
// Ensure blocks are loaded off of the event thread
return requestState
.loadNextBlock()
.thenCompose(block -> handleLoadedBlock(requestState, block));
.thenCompose(
block -> {
requestState.decrementRemainingBlocks();
return handleLoadedBlock(requestState, block);
});
}

/** Sends the block and returns true if the request is now complete. */
Expand All @@ -241,12 +248,14 @@ private SafeFuture<Boolean> handleLoadedBlock(
}

private class RequestState {

private final UInt64 headSlot;
private final ResponseCallback<SignedBeaconBlock> callback;
private final UInt64 step;
private final NavigableMap<UInt64, Bytes32> knownBlockRoots;
private UInt64 currentSlot;
private UInt64 remainingBlocks;

private final AtomicInteger sentBlocks = new AtomicInteger(0);

RequestState(
Expand All @@ -258,9 +267,7 @@ private class RequestState {
final ResponseCallback<SignedBeaconBlock> callback) {
this.currentSlot = startSlot;
this.knownBlockRoots = knownBlockRoots;
// Minus 1 to account for sending the block at startSlot.
// We only decrement this when moving to the next slot but we're already at the first slot
this.remainingBlocks = count.minus(ONE);
this.remainingBlocks = count;
this.step = step;
this.headSlot = headSlot;
this.callback = callback;
Expand All @@ -287,8 +294,11 @@ SafeFuture<Void> sendBlock(final SignedBeaconBlock block) {
return callback.respond(block).thenRun(sentBlocks::incrementAndGet);
}

void decrementRemainingBlocks() {
remainingBlocks = remainingBlocks.minusMinZero(1);
}

void incrementCurrentSlot() {
remainingBlocks = remainingBlocks.minus(ONE);
currentSlot = currentSlot.plus(step);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,29 @@ public BeaconBlocksByRootMessageHandler(
}

@Override
public void onIncomingMessage(
final String protocolId,
final Eth2Peer peer,
final BeaconBlocksByRootRequestMessage message,
final ResponseCallback<SignedBeaconBlock> callback) {
LOG.trace(
"Peer {} requested {} BeaconBlocks with roots: {}", peer.getId(), message.size(), message);

public Optional<RpcException> validateRequest(
final String protocolId, final BeaconBlocksByRootRequestMessage request) {
final UInt64 maxRequestBlocks = getMaxRequestBlocks();

if (message.size() > maxRequestBlocks.intValue()) {
if (request.size() > maxRequestBlocks.intValue()) {
requestCounter.labels("count_too_big").inc();
callback.completeWithErrorResponse(
return Optional.of(
new RpcException(
INVALID_REQUEST_CODE,
"Only a maximum of " + maxRequestBlocks + " blocks can be requested per request"));
return;
}

SafeFuture<Void> future = SafeFuture.COMPLETE;
return Optional.empty();
}

@Override
public void onIncomingMessage(
final String protocolId,
final Eth2Peer peer,
final BeaconBlocksByRootRequestMessage message,
final ResponseCallback<SignedBeaconBlock> callback) {
LOG.trace(
"Peer {} requested {} BeaconBlocks with roots: {}", peer.getId(), message.size(), message);

final Optional<RequestApproval> blocksRequestApproval =
peer.approveBlocksRequest(callback, message.size());
Expand All @@ -104,7 +107,9 @@ public void onIncomingMessage(
totalBlocksRequestedCounter.inc(message.size());
final AtomicInteger sentBlocks = new AtomicInteger(0);

for (SszBytes32 blockRoot : message) {
SafeFuture<Void> future = SafeFuture.COMPLETE;

for (final SszBytes32 blockRoot : message) {
future =
future.thenCompose(
__ ->
Expand Down
Loading

0 comments on commit 015f7bd

Please sign in to comment.