Skip to content

Commit

Permalink
Added a timeout to state regeneration (Consensys#8097)
Browse files Browse the repository at this point in the history
Added a development flag to allow us to set state regeneration timeout, with a 120 second timeout by default, and a tiny bit of sanity around the flag not being less than 1.

Signed-off-by: Paul Harris <[email protected]>
  • Loading branch information
rolfyone authored Apr 11, 2024
1 parent 91b4e3d commit 514539b
Show file tree
Hide file tree
Showing 13 changed files with 274 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ protected SafeFuture<?> doStart() {
}
final EventChannels eventChannels = serviceConfig.getEventChannels();
chainStorage =
ChainStorage.create(database, config.getSpec(), config.getDataStorageMode());
ChainStorage.create(
database,
config.getSpec(),
config.getDataStorageMode(),
config.getStateRebuildTimeoutSeconds());
final DepositStorage depositStorage =
DepositStorage.create(
eventChannels.getPublisher(Eth1EventsChannel.class),
Expand Down
2 changes: 2 additions & 0 deletions storage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies {
testImplementation testFixtures(project(':infrastructure:metrics'))
testImplementation project(':ethereum:networks')
testImplementation testFixtures(project(':ethereum:spec'))
testImplementation testFixtures(project(':infrastructure:logging'))
testImplementation testFixtures(project(':infrastructure:async'))
testImplementation testFixtures(project(':infrastructure:time'))
testImplementation testFixtures(project(':storage'))
Expand Down Expand Up @@ -67,6 +68,7 @@ dependencies {
testFixturesImplementation 'org.hyperledger.besu.internal:metrics-core'
testFixturesImplementation 'org.hyperledger.besu:plugin-api'


jmhImplementation testFixtures(project(':storage'))
jmhImplementation testFixtures(project(':ethereum:spec'))
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,15 @@ private ChainStorage(
}

public static ChainStorage create(
final Database database, final Spec spec, final StateStorageMode dataStorageMode) {
final Database database,
final Spec spec,
final StateStorageMode dataStorageMode,
int stateRebuildTimeoutSeconds) {
final int finalizedStateCacheSize = spec.getSlotsPerEpoch(SpecConfig.GENESIS_EPOCH) * 3;
return new ChainStorage(
database,
new FinalizedStateCache(spec, database, finalizedStateCacheSize, true),
new FinalizedStateCache(
spec, database, finalizedStateCacheSize, true, stateRebuildTimeoutSeconds),
dataStorageMode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public class StorageConfiguration {

public static final boolean DEFAULT_STORE_NON_CANONICAL_BLOCKS_ENABLED = false;

public static final int DEFAULT_STATE_REBUILD_TIMEOUT_SECONDS = 120;
public static final long DEFAULT_STORAGE_FREQUENCY = 2048L;
public static final int DEFAULT_MAX_KNOWN_NODE_CACHE_SIZE = 100_000;
public static final Duration DEFAULT_BLOCK_PRUNING_INTERVAL = Duration.ofMinutes(15);
Expand All @@ -59,6 +59,8 @@ public class StorageConfiguration {
private final Duration blobsPruningInterval;
private final int blobsPruningLimit;

private final int stateRebuildTimeoutSeconds;

private StorageConfiguration(
final Eth1Address eth1DepositContract,
final StateStorageMode dataStorageMode,
Expand All @@ -70,6 +72,7 @@ private StorageConfiguration(
final int blockPruningLimit,
final Duration blobsPruningInterval,
final int blobsPruningLimit,
int stateRebuildTimeoutSeconds,
final Spec spec) {
this.eth1DepositContract = eth1DepositContract;
this.dataStorageMode = dataStorageMode;
Expand All @@ -81,6 +84,7 @@ private StorageConfiguration(
this.blockPruningLimit = blockPruningLimit;
this.blobsPruningInterval = blobsPruningInterval;
this.blobsPruningLimit = blobsPruningLimit;
this.stateRebuildTimeoutSeconds = stateRebuildTimeoutSeconds;
this.spec = spec;
}

Expand All @@ -96,6 +100,10 @@ public StateStorageMode getDataStorageMode() {
return dataStorageMode;
}

public int getStateRebuildTimeoutSeconds() {
return stateRebuildTimeoutSeconds;
}

public long getDataStorageFrequency() {
return dataStorageFrequency;
}
Expand Down Expand Up @@ -146,6 +154,7 @@ public static final class Builder {
private int blockPruningLimit = DEFAULT_BLOCK_PRUNING_LIMIT;
private Duration blobsPruningInterval = DEFAULT_BLOBS_PRUNING_INTERVAL;
private int blobsPruningLimit = DEFAULT_BLOBS_PRUNING_LIMIT;
private int stateRebuildTimeoutSeconds = DEFAULT_STATE_REBUILD_TIMEOUT_SECONDS;

private Builder() {}

Expand Down Expand Up @@ -251,6 +260,7 @@ public StorageConfiguration build() {
blockPruningLimit,
blobsPruningInterval,
blobsPruningLimit,
stateRebuildTimeoutSeconds,
spec);
}

Expand Down Expand Up @@ -285,6 +295,17 @@ private Optional<StateStorageMode> getStorageModeFromPersistedDatabase(
throw new UncheckedIOException("Failed to read storage mode from file", ex);
}
}

public Builder stateRebuildTimeoutSeconds(int stateRebuildTimeoutSeconds) {
if (stateRebuildTimeoutSeconds < 10 || stateRebuildTimeoutSeconds > 300) {
LOG.warn(
"State rebuild timeout is set outside of sensible defaults of 10 -> 300, {} was defined. Cannot be below 1, will allow the value to exceed 300.",
stateRebuildTimeoutSeconds);
}
this.stateRebuildTimeoutSeconds = Math.max(stateRebuildTimeoutSeconds, 1);
LOG.debug("stateRebuildTimeoutSeconds = {}", stateRebuildTimeoutSeconds);
return this;
}
}

static StateStorageMode determineStorageDefault(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,24 @@

package tech.pegasys.teku.storage.server.state;

import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ONE;

import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Stream;
import tech.pegasys.teku.dataproviders.generators.StreamingStateRegenerator;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.storage.server.Database;

public class FinalizedStateCache {

private static final long MAX_REGENERATE_LOTS = 10_000L;

/**
* Note this is a best effort basis to track what states are cached. Slots are added here slightly
* before the stateCache is actually updated and removed slightly after they are evicted from the
Expand All @@ -42,24 +39,40 @@ public class FinalizedStateCache {
private final NavigableSet<UInt64> availableSlots = new ConcurrentSkipListSet<>();

private final LoadingCache<UInt64, BeaconState> stateCache;
private final Spec spec;
private final Database database;

public FinalizedStateCache(
final Spec spec,
final Database database,
final int maximumCacheSize,
final boolean useSoftReferences) {
this.spec = spec;
this.database = database;
final boolean useSoftReferences,
final int stateRebuildTimeoutSeconds) {
this(
spec,
database,
maximumCacheSize,
useSoftReferences,
stateRebuildTimeoutSeconds,
MAX_REGENERATE_LOTS);
}

FinalizedStateCache(
final Spec spec,
final Database database,
final int maximumCacheSize,
final boolean useSoftReferences,
int stateRebuildTimeoutSeconds,
final long maxRegenerateSlots) {
final CacheBuilder<UInt64, BeaconState> cacheBuilder =
CacheBuilder.newBuilder()
.maximumSize(maximumCacheSize)
.removalListener(this::onRemovedFromCache);
if (useSoftReferences) {
cacheBuilder.softValues();
}
this.stateCache = cacheBuilder.build(new StateCacheLoader());
this.stateCache =
cacheBuilder.build(
new StateCacheLoader(
spec, database, stateRebuildTimeoutSeconds, maxRegenerateSlots, this));
}

private void onRemovedFromCache(
Expand All @@ -80,46 +93,17 @@ public Optional<BeaconState> getFinalizedState(final UInt64 slot) {
}
}

private Optional<BeaconState> getLatestStateFromCache(final UInt64 slot) {
Optional<BeaconState> getLatestStateFromCache(final UInt64 slot) {
return Optional.ofNullable(availableSlots.floor(slot)).map(stateCache::getIfPresent);
}

private class StateCacheLoader extends CacheLoader<UInt64, BeaconState> {

@Override
public BeaconState load(final UInt64 key) {
return regenerateState(key).orElseThrow(StateUnavailableException::new);
}

private Optional<BeaconState> regenerateState(final UInt64 slot) {
return database
.getLatestAvailableFinalizedState(slot)
.map(state -> regenerateState(slot, state));
}

private BeaconState regenerateState(final UInt64 slot, final BeaconState stateFromDisk) {
final Optional<BeaconState> latestStateFromCache = getLatestStateFromCache(slot);
final BeaconState preState =
latestStateFromCache
.filter(
stateFromCache ->
stateFromCache.getSlot().compareTo(stateFromDisk.getSlot()) >= 0)
.orElse(stateFromDisk);
if (preState.getSlot().equals(slot)) {
return preState;
}
try (final Stream<SignedBeaconBlock> blocks =
database.streamFinalizedBlocks(preState.getSlot().plus(ONE), slot)) {
final BeaconState state = StreamingStateRegenerator.regenerate(spec, preState, blocks);
availableSlots.add(state.getSlot());
return state;
}
}
NavigableSet<UInt64> getAvailableSlots() {
return availableSlots;
}

/**
* Cache doesn't allow returning null but we may not be able to regenerate a state so throw this
* exception and catch it in {@link #getFinalizedState(UInt64)}
*/
private static class StateUnavailableException extends RuntimeException {}
static class StateUnavailableException extends RuntimeException {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.storage.server.state;

import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ONE;

import com.google.common.cache.CacheLoader;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.dataproviders.generators.StreamingStateRegenerator;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.storage.server.Database;

class StateCacheLoader extends CacheLoader<UInt64, BeaconState> {
private static final Logger LOG = LogManager.getLogger();
private final int stateRebuildTimeoutSeconds;
private final Database database;
private final long maxRegenerateSlots;
private final FinalizedStateCache finalizedStateCache;
private final Spec spec;

StateCacheLoader(
final Spec spec,
final Database database,
final int stateRebuildTimeoutSeconds,
final long maxRegenerateSlots,
final FinalizedStateCache finalizedStateCache) {
this.database = database;
this.stateRebuildTimeoutSeconds = stateRebuildTimeoutSeconds;
this.maxRegenerateSlots = maxRegenerateSlots;
this.finalizedStateCache = finalizedStateCache;
this.spec = spec;
}

@Override
public BeaconState load(final UInt64 key) {
return regenerateState(key).orElseThrow(FinalizedStateCache.StateUnavailableException::new);
}

private Optional<BeaconState> regenerateState(final UInt64 slot) {
final Optional<BeaconState> maybeState = database.getLatestAvailableFinalizedState(slot);
if (maybeState.isEmpty()) {
return Optional.empty();
}
final BeaconState state = maybeState.get();
try {
return Optional.of(
regenerateStateWithinReasonableTime(slot, state)
.get(stateRebuildTimeoutSeconds, TimeUnit.SECONDS));
} catch (ExecutionException | InterruptedException e) {
LOG.warn("Failed to regenerate state for slot {}", slot, e);
return Optional.empty();
} catch (TimeoutException e) {
LOG.error(
"Timed out trying to regenerate state at slot {} starting from slot {} within {} seconds",
slot,
state.getSlot(),
stateRebuildTimeoutSeconds);
return Optional.empty();
}
}

private SafeFuture<BeaconState> regenerateStateWithinReasonableTime(
final UInt64 slot, final BeaconState stateFromDisk) {
final Optional<BeaconState> latestStateFromCache =
finalizedStateCache.getLatestStateFromCache(slot);
final BeaconState preState =
latestStateFromCache
.filter(
stateFromCache -> stateFromCache.getSlot().compareTo(stateFromDisk.getSlot()) >= 0)
.orElse(stateFromDisk);
if (preState.getSlot().equals(slot)) {
return SafeFuture.completedFuture(preState);
}
final long regenerateSlotCount = slot.minusMinZero(stateFromDisk.getSlot()).longValue();
LOG.trace("Slots to regenerate state from: {}", regenerateSlotCount);
if (regenerateSlotCount > maxRegenerateSlots) {
LOG.error(
"Refusing to regenerate a state that is {} slots from what we have stored",
regenerateSlotCount);
return SafeFuture.failedFuture(new FinalizedStateCache.StateUnavailableException());
}
try (final Stream<SignedBeaconBlock> blocks =
database.streamFinalizedBlocks(preState.getSlot().plus(ONE), slot)) {
final BeaconState state = StreamingStateRegenerator.regenerate(spec, preState, blocks);
finalizedStateCache.getAvailableSlots().add(state.getSlot());
return SafeFuture.completedFuture(state);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ private StorageSystem createStorageSystem(
.dataDir(tempDir.toPath())
.version(DatabaseVersion.LEVELDB2)
.storageMode(storageMode)
.stateRebuildTimeoutSeconds(12)
.stateStorageFrequency(1L)
.storeConfig(storeConfig)
.storeNonCanonicalBlocks(storeNonCanonicalBlocks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class FinalizedStateCacheTest {
private final Database database = mock(Database.class);
// We don't use soft references in unit tests to avoid intermittency
private final FinalizedStateCache cache =
new FinalizedStateCache(spec, database, MAXIMUM_CACHE_SIZE, false);
new FinalizedStateCache(spec, database, MAXIMUM_CACHE_SIZE, false, 120);

@BeforeEach
public void setUp() {
Expand Down
Loading

0 comments on commit 514539b

Please sign in to comment.