Skip to content

Commit

Permalink
Changes in forward sync and rate limit constants (#8982)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov authored Jan 15, 2025
1 parent 9abd34c commit c315305
Show file tree
Hide file tree
Showing 17 changed files with 191 additions and 77 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
## Unreleased Changes

### Breaking Changes
- `--Xvalidators-builder-registration-default-gas-limit` is removed in favour of `--validators-builder-registration-default-gas-limit`
- `--Xvalidators-builder-registration-default-gas-limit` CLI option is replaced by `--validators-builder-registration-default-gas-limit`
- `--Xp2p-sync-rate-limit` CLI option is removed in favour of `--Xp2p-sync-blocks-rate-limit` and `--Xp2p-sync-blob-sidecars-rate-limit`
- `--Xpeer-rate-limit` CLI options is removed in favour of `--Xpeer-blocks-rate-limit` and `--Xpeer-blob-sidecars-rate-limit`
- With the upgrade of the Prometheus Java Metrics library, there are the following changes:
- Gauge names are not allowed to end with `total`, therefore metrics as `beacon_proposers_data_total` and `beacon_eth1_current_period_votes_total` are dropping the `_total` suffix
- The `_created` timestamps are not returned by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ protected ForwardSyncService createForwardSyncService() {
syncConfig.getForwardSyncBatchSize(),
syncConfig.getForwardSyncMaxPendingBatches(),
syncConfig.getForwardSyncMaxBlocksPerMinute(),
syncConfig.getForwardSyncMaxBlobSidecarsPerMinute(),
spec);
} else {
LOG.info("Using single peer sync");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,24 @@

import static com.google.common.base.Preconditions.checkNotNull;

import tech.pegasys.teku.networking.eth2.P2PConfig;

public class SyncConfig {

public static final boolean DEFAULT_MULTI_PEER_SYNC_ENABLED = true;
public static final boolean DEFAULT_RECONSTRUCT_HISTORIC_STATES_ENABLED = false;
public static final boolean DEFAULT_FETCH_ALL_HISTORIC_BLOCKS = true;
public static final int DEFAULT_FORWARD_SYNC_BATCH_SIZE = 50;

public static final int DEFAULT_HISTORICAL_SYNC_BATCH_SIZE = 50;
public static final int DEFAULT_FORWARD_SYNC_BATCH_SIZE = 25;
public static final int DEFAULT_FORWARD_SYNC_MAX_PENDING_BATCHES = 5;

/** Aligned with {@link P2PConfig#DEFAULT_PEER_BLOCKS_RATE_LIMIT} */
public static final int DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE = 500;

/** Aligned with {@link P2PConfig#DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT} */
public static final int DEFAULT_FORWARD_SYNC_MAX_BLOB_SIDECARS_PER_MINUTE = 2000;

private final boolean isEnabled;
private final boolean isMultiPeerSyncEnabled;
private final boolean reconstructHistoricStatesEnabled;
Expand All @@ -33,6 +41,7 @@ public class SyncConfig {
private final int forwardSyncBatchSize;
private final int forwardSyncMaxPendingBatches;
private final int forwardSyncMaxBlocksPerMinute;
private final int forwardSyncMaxBlobSidecarsPerMinute;

private SyncConfig(
final boolean isEnabled,
Expand All @@ -42,7 +51,8 @@ private SyncConfig(
final int historicalSyncBatchSize,
final int forwardSyncBatchSize,
final int forwardSyncMaxPendingBatches,
final int forwardSyncMaxBlocksPerMinute) {
final int forwardSyncMaxBlocksPerMinute,
final int forwardSyncMaxBlobSidecarsPerMinute) {
this.isEnabled = isEnabled;
this.isMultiPeerSyncEnabled = isMultiPeerSyncEnabled;
this.reconstructHistoricStatesEnabled = reconstructHistoricStatesEnabled;
Expand All @@ -51,6 +61,7 @@ private SyncConfig(
this.forwardSyncBatchSize = forwardSyncBatchSize;
this.forwardSyncMaxPendingBatches = forwardSyncMaxPendingBatches;
this.forwardSyncMaxBlocksPerMinute = forwardSyncMaxBlocksPerMinute;
this.forwardSyncMaxBlobSidecarsPerMinute = forwardSyncMaxBlobSidecarsPerMinute;
}

public static Builder builder() {
Expand Down Expand Up @@ -89,6 +100,10 @@ public int getForwardSyncMaxBlocksPerMinute() {
return forwardSyncMaxBlocksPerMinute;
}

public int getForwardSyncMaxBlobSidecarsPerMinute() {
return forwardSyncMaxBlobSidecarsPerMinute;
}

public static class Builder {
private Boolean isEnabled;
private Boolean isMultiPeerSyncEnabled = DEFAULT_MULTI_PEER_SYNC_ENABLED;
Expand All @@ -98,6 +113,8 @@ public static class Builder {
private Integer forwardSyncBatchSize = DEFAULT_FORWARD_SYNC_BATCH_SIZE;
private Integer forwardSyncMaxPendingBatches = DEFAULT_FORWARD_SYNC_MAX_PENDING_BATCHES;
private Integer forwardSyncMaxBlocksPerMinute = DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE;
private Integer forwardSyncMaxBlobSidecarsPerMinute =
DEFAULT_FORWARD_SYNC_MAX_BLOB_SIDECARS_PER_MINUTE;

private Builder() {}

Expand All @@ -111,7 +128,8 @@ public SyncConfig build() {
historicalSyncBatchSize,
forwardSyncBatchSize,
forwardSyncMaxPendingBatches,
forwardSyncMaxBlocksPerMinute);
forwardSyncMaxBlocksPerMinute,
forwardSyncMaxBlobSidecarsPerMinute);
}

private void initMissingDefaults() {
Expand Down Expand Up @@ -163,6 +181,13 @@ public Builder forwardSyncMaxBlocksPerMinute(final Integer forwardSyncMaxBlocksP
return this;
}

public Builder forwardSyncMaxBlobSidecarsPerMinute(
final Integer forwardSyncMaxBlobSidecarsPerMinute) {
checkNotNull(forwardSyncMaxBlobSidecarsPerMinute);
this.forwardSyncMaxBlobSidecarsPerMinute = forwardSyncMaxBlobSidecarsPerMinute;
return this;
}

public Builder reconstructHistoricStatesEnabled(
final Boolean reconstructHistoricStatesEnabled) {
checkNotNull(reconstructHistoricStatesEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public static MultipeerSyncService create(
final int batchSize,
final int maxPendingBatches,
final int maxBlocksPerMinute,
final int maxBlobSidecarsPerMinute,
final Spec spec) {
final EventThread eventThread = new AsyncRunnerEventThread("sync", asyncRunnerFactory);
final SettableLabelledGauge targetChainCountGauge =
Expand Down Expand Up @@ -117,7 +118,8 @@ eventThread, blobSidecarManager, new PeerScoringConflictResolutionStrategy()),
recentChainData.getSpec(),
eventThread,
p2pNetwork,
new SyncSourceFactory(asyncRunner, timeProvider, maxBlocksPerMinute, batchSize),
new SyncSourceFactory(
asyncRunner, timeProvider, batchSize, maxBlocksPerMinute, maxBlobSidecarsPerMinute),
finalizedTargetChains,
nonfinalizedTargetChains);
peerChainTracker.subscribeToTargetChainUpdates(syncController::onTargetChainsUpdated);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,33 @@ public class SyncSourceFactory {

private final AsyncRunner asyncRunner;
private final TimeProvider timeProvider;
private final Map<Eth2Peer, SyncSource> syncSourcesByPeer = new HashMap<>();
private final int maxBlocksPerMinute;
private final int batchSize;
private final int maxBlocksPerMinute;
private final int maxBlobSidecarsPerMinute;

private final Map<Eth2Peer, SyncSource> syncSourcesByPeer = new HashMap<>();

public SyncSourceFactory(
final AsyncRunner asyncRunner,
final TimeProvider timeProvider,
final int batchSize,
final int maxBlocksPerMinute,
final int batchSize) {
final int maxBlobSidecarsPerMinute) {
this.asyncRunner = asyncRunner;
this.timeProvider = timeProvider;
this.maxBlocksPerMinute = maxBlocksPerMinute;
this.batchSize = batchSize;
this.maxBlocksPerMinute = maxBlocksPerMinute;
this.maxBlobSidecarsPerMinute = maxBlobSidecarsPerMinute;
}

public SyncSource getOrCreateSyncSource(final Eth2Peer peer, final Spec spec) {
// Limit request rate to just a little under what we'd accept
// Limit request rates for blocks/blobs to just a little under what we'd accept (see
// Eth2PeerFactory)
final int maxBlocksPerMinute = this.maxBlocksPerMinute - batchSize - 1;
final Optional<Integer> maybeMaxBlobsPerBlock = spec.getMaxBlobsPerBlockForHighestMilestone();
final Optional<Integer> maxBlobSidecarsPerMinute =
maybeMaxBlobsPerBlock.map(maxBlobsPerBlock -> maxBlocksPerMinute * maxBlobsPerBlock);

final Optional<Integer> maybeMaxBlobSidecarsPerMinute =
maybeMaxBlobsPerBlock.map(
maxBlobsPerBlock -> this.maxBlobSidecarsPerMinute - (batchSize * maxBlobsPerBlock) - 1);
return syncSourcesByPeer.computeIfAbsent(
peer,
source ->
Expand All @@ -57,7 +62,7 @@ public SyncSource getOrCreateSyncSource(final Eth2Peer peer, final Spec spec) {
source,
maxBlocksPerMinute,
maybeMaxBlobsPerBlock,
maxBlobSidecarsPerMinute));
maybeMaxBlobSidecarsPerMinute));
}

public void onPeerDisconnected(final Eth2Peer peer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class PeerSyncTest extends AbstractSyncTest {
UInt64.valueOf(SyncConfig.DEFAULT_FORWARD_SYNC_BATCH_SIZE);

private static final Bytes32 PEER_HEAD_BLOCK_ROOT = Bytes32.fromHexString("0x1234");
private static final UInt64 PEER_HEAD_SLOT = UInt64.valueOf(30);
private static final UInt64 PEER_HEAD_SLOT = UInt64.valueOf(20);
private static final UInt64 PEER_FINALIZED_EPOCH = UInt64.valueOf(3);

private final int slotsPerEpoch = spec.getGenesisSpecConfig().getSlotsPerEpoch();
Expand All @@ -73,7 +73,7 @@ public class PeerSyncTest extends AbstractSyncTest {
PEER_HEAD_BLOCK_ROOT,
PEER_HEAD_SLOT));

private final UInt64 denebPeerSlotsAhead = UInt64.valueOf(30);
private final UInt64 denebPeerSlotsAhead = UInt64.valueOf(20);
private final UInt64 denebPeerHeadSlot = denebFirstSlot.plus(denebPeerSlotsAhead);
private final UInt64 denebPeerFinalizedEpoch = spec.computeEpochAtSlot(denebPeerHeadSlot);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ public Eth2P2PNetwork build() {
eth2RpcOutstandingPingThreshold,
eth2StatusUpdateInterval,
timeProvider,
config.getPeerRateLimit(),
config.getPeerBlocksRateLimit(),
config.getPeerBlobSidecarsRateLimit(),
config.getPeerRequestLimit(),
spec,
kzg,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@

public class P2PConfig {

public static final int DEFAULT_PEER_RATE_LIMIT = 500;
public static final int DEFAULT_PEER_BLOCKS_RATE_LIMIT = 500;
// 250 MB per peer per minute (~ 4.16 MB/s)
public static final int DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT = 2000;

public static final int DEFAULT_PEER_REQUEST_LIMIT = 100;

public static final boolean DEFAULT_PEER_ALL_TOPIC_FILTER_ENABLED = true;
public static final int DEFAULT_PEER_REQUEST_LIMIT = 50;
public static final int DEFAULT_P2P_TARGET_SUBNET_SUBSCRIBER_COUNT = 2;
public static final boolean DEFAULT_SUBSCRIBE_ALL_SUBNETS_ENABLED = false;
public static final boolean DEFAULT_GOSSIP_SCORING_ENABLED = true;
Expand All @@ -54,7 +57,8 @@ public class P2PConfig {
private final GossipEncoding gossipEncoding;
private final int targetSubnetSubscriberCount;
private final boolean subscribeAllSubnetsEnabled;
private final int peerRateLimit;
private final int peerBlocksRateLimit;
private final int peerBlobSidecarsRateLimit;
private final int peerRequestLimit;
private final int batchVerifyMaxThreads;
private final int batchVerifyQueueCapacity;
Expand All @@ -71,7 +75,8 @@ private P2PConfig(
final GossipEncoding gossipEncoding,
final int targetSubnetSubscriberCount,
final boolean subscribeAllSubnetsEnabled,
final int peerRateLimit,
final int peerBlocksRateLimit,
final int peerBlobSidecarsRateLimit,
final int peerRequestLimit,
final int batchVerifyMaxThreads,
final int batchVerifyQueueCapacity,
Expand All @@ -86,7 +91,8 @@ private P2PConfig(
this.gossipEncoding = gossipEncoding;
this.targetSubnetSubscriberCount = targetSubnetSubscriberCount;
this.subscribeAllSubnetsEnabled = subscribeAllSubnetsEnabled;
this.peerRateLimit = peerRateLimit;
this.peerBlocksRateLimit = peerBlocksRateLimit;
this.peerBlobSidecarsRateLimit = peerBlobSidecarsRateLimit;
this.peerRequestLimit = peerRequestLimit;
this.batchVerifyMaxThreads = batchVerifyMaxThreads;
this.batchVerifyQueueCapacity = batchVerifyQueueCapacity;
Expand Down Expand Up @@ -129,8 +135,12 @@ public boolean isSubscribeAllSubnetsEnabled() {
return subscribeAllSubnetsEnabled;
}

public int getPeerRateLimit() {
return peerRateLimit;
public int getPeerBlocksRateLimit() {
return peerBlocksRateLimit;
}

public int getPeerBlobSidecarsRateLimit() {
return peerBlobSidecarsRateLimit;
}

public int getPeerRequestLimit() {
Expand Down Expand Up @@ -174,7 +184,8 @@ public static class Builder {
private final GossipEncoding gossipEncoding = GossipEncoding.SSZ_SNAPPY;
private Integer targetSubnetSubscriberCount = DEFAULT_P2P_TARGET_SUBNET_SUBSCRIBER_COUNT;
private Boolean subscribeAllSubnetsEnabled = DEFAULT_SUBSCRIBE_ALL_SUBNETS_ENABLED;
private Integer peerRateLimit = DEFAULT_PEER_RATE_LIMIT;
private Integer peerBlocksRateLimit = DEFAULT_PEER_BLOCKS_RATE_LIMIT;
private Integer peerBlobSidecarsRateLimit = DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT;
private Integer peerRequestLimit = DEFAULT_PEER_REQUEST_LIMIT;
private int batchVerifyMaxThreads = DEFAULT_BATCH_VERIFY_MAX_THREADS;
private OptionalInt batchVerifyQueueCapacity = OptionalInt.empty();
Expand Down Expand Up @@ -225,7 +236,8 @@ public P2PConfig build() {
gossipEncoding,
targetSubnetSubscriberCount,
subscribeAllSubnetsEnabled,
peerRateLimit,
peerBlocksRateLimit,
peerBlobSidecarsRateLimit,
peerRequestLimit,
batchVerifyMaxThreads,
batchVerifyQueueCapacity.orElse(DEFAULT_BATCH_VERIFY_QUEUE_CAPACITY),
Expand Down Expand Up @@ -277,13 +289,23 @@ public Builder subscribeAllSubnetsEnabled(final Boolean subscribeAllSubnetsEnabl
return this;
}

public Builder peerRateLimit(final Integer peerRateLimit) {
checkNotNull(peerRateLimit);
if (peerRateLimit < 0) {
public Builder peerBlocksRateLimit(final Integer peerBlocksRateLimit) {
checkNotNull(peerBlocksRateLimit);
if (peerBlocksRateLimit < 0) {
throw new InvalidConfigurationException(
String.format("Invalid peerBlocksRateLimit: %d", peerBlocksRateLimit));
}
this.peerBlocksRateLimit = peerBlocksRateLimit;
return this;
}

public Builder peerBlobSidecarsRateLimit(final Integer peerBlobSidecarsRateLimit) {
checkNotNull(peerBlobSidecarsRateLimit);
if (peerBlobSidecarsRateLimit < 0) {
throw new InvalidConfigurationException(
String.format("Invalid peerRateLimit: %d", peerRateLimit));
String.format("Invalid peerBlobSidecarsRateLimit: %d", peerBlobSidecarsRateLimit));
}
this.peerRateLimit = peerRateLimit;
this.peerBlobSidecarsRateLimit = peerBlobSidecarsRateLimit;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public class Eth2PeerFactory {
private final CombinedChainDataClient chainDataClient;
private final TimeProvider timeProvider;
private final Optional<Checkpoint> requiredCheckpoint;
private final int peerRateLimit;
private final int peerBlocksRateLimit;
private final int peerBlobSidecarsRateLimit;
private final int peerRequestLimit;
private final KZG kzg;
private final DiscoveryNodeIdExtractor discoveryNodeIdExtractor;
Expand All @@ -48,7 +49,8 @@ public Eth2PeerFactory(
final MetadataMessagesFactory metadataMessagesFactory,
final TimeProvider timeProvider,
final Optional<Checkpoint> requiredCheckpoint,
final int peerRateLimit,
final int peerBlocksRateLimit,
final int peerBlobSidecarsRateLimit,
final int peerRequestLimit,
final KZG kzg,
final DiscoveryNodeIdExtractor discoveryNodeIdExtractor) {
Expand All @@ -59,7 +61,8 @@ public Eth2PeerFactory(
this.statusMessageFactory = statusMessageFactory;
this.metadataMessagesFactory = metadataMessagesFactory;
this.requiredCheckpoint = requiredCheckpoint;
this.peerRateLimit = peerRateLimit;
this.peerBlocksRateLimit = peerBlocksRateLimit;
this.peerBlobSidecarsRateLimit = peerBlobSidecarsRateLimit;
this.peerRequestLimit = peerRequestLimit;
this.kzg = kzg;
this.discoveryNodeIdExtractor = discoveryNodeIdExtractor;
Expand All @@ -74,11 +77,8 @@ public Eth2Peer create(final Peer peer, final BeaconChainMethods rpcMethods) {
statusMessageFactory,
metadataMessagesFactory,
PeerChainValidator.create(spec, metricsSystem, chainDataClient, requiredCheckpoint),
RateTracker.create(peerRateLimit, TIME_OUT, timeProvider),
RateTracker.create(
peerRateLimit * spec.getMaxBlobsPerBlockForHighestMilestone().orElse(1),
TIME_OUT,
timeProvider),
RateTracker.create(peerBlocksRateLimit, TIME_OUT, timeProvider),
RateTracker.create(peerBlobSidecarsRateLimit, TIME_OUT, timeProvider),
RateTracker.create(peerRequestLimit, TIME_OUT, timeProvider),
kzg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public static Eth2PeerManager create(
final int eth2RpcOutstandingPingThreshold,
final Duration eth2StatusUpdateInterval,
final TimeProvider timeProvider,
final int peerRateLimit,
final int peerBlocksRateLimit,
final int peerBlobSidecarsRateLimit,
final int peerRequestLimit,
final Spec spec,
final KZG kzg,
Expand All @@ -140,7 +141,8 @@ public static Eth2PeerManager create(
metadataMessagesFactory,
timeProvider,
requiredCheckpoint,
peerRateLimit,
peerBlocksRateLimit,
peerBlobSidecarsRateLimit,
peerRequestLimit,
kzg,
discoveryNodeIdExtractor),
Expand Down
Loading

0 comments on commit c315305

Please sign in to comment.