Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(history_migration): don't migrate post-merge headers and adding tests #1688

Merged
merged 3 commits into from
Feb 19, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: add test when content_key is undecodable
  • Loading branch information
morph-dev committed Feb 19, 2025
commit 70aeb9397bd0e0c5b35a70f45bf59d51c6e6f851
180 changes: 143 additions & 37 deletions crates/subnetworks/history/src/storage_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use ethportal_api::{
},
network::Subnetwork,
},
ContentValue, HistoryContentKey, OverlayContentKey, RawContentValue,
ContentValue, HistoryContentKey, OverlayContentKey, RawContentKey, RawContentValue,
};
use rusqlite::{named_params, types::Type};
use rusqlite::named_params;
use tracing::{debug, error, info, warn};
use trin_storage::{
error::ContentStoreError,
Expand Down Expand Up @@ -72,35 +72,54 @@ pub fn maybe_migrate(config: &PortalStorageConfig) -> Result<(), ContentStoreErr
.query_map(named_params! { ":limit": BATCH_DELETE_LIMIT }, |row| {
let key_bytes: Vec<u8> = row.get("content_key")?;
let value_bytes: Vec<u8> = row.get("content_value")?;
let key = RawContentKey::from(key_bytes);
let value = RawContentValue::from(value_bytes);
HistoryContentKey::try_from_bytes(key_bytes)
.map(|key| (key, value))
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(0, Type::Blob, e.into()))
Ok((key, value))
})?
.collect::<Result<Vec<(HistoryContentKey, RawContentValue)>, rusqlite::Error>>()?;
.collect::<Result<Vec<(RawContentKey, RawContentValue)>, rusqlite::Error>>()?;

if deleted.is_empty() {
break;
}

for (content_key, old_content_value) in deleted {
for (raw_content_key, raw_content_value) in deleted {
// Decode content key
let content_key = match HistoryContentKey::try_from_bytes(&raw_content_key) {
Ok(content_value) => content_value,
Err(err) => {
error!(
err=%err,
"Error decoding content key",
);
continue;
}
};

// Decode content value
let old_content_value =
match OldHistoryContentValue::decode(&content_key, &raw_content_value) {
Ok(old_content_value) => old_content_value,
Err(err) => {
error!(
key=content_key.to_hex(),
err=%err,
"Error decoding content value",
);
continue;
}
};

// Convert and write content value
match convert_content_value(&content_key, old_content_value) {
Ok(Some(new_content_value)) => {
Some(new_content_value) => {
store.insert(&content_key, new_content_value)?;
}
Ok(None) => {
None => {
debug!(
key=%content_key.to_bytes(),
"Not migrating content item",
"Dropping content item",
)
}
Err(err) => {
error!(
key=%content_key.to_bytes(),
err=%err,
"Error converting content item",
);
}
}
}
}
Expand All @@ -114,40 +133,43 @@ pub fn maybe_migrate(config: &PortalStorageConfig) -> Result<(), ContentStoreErr

fn convert_content_value(
content_key: &HistoryContentKey,
raw_content_value: RawContentValue,
) -> Result<Option<RawContentValue>, ContentStoreError> {
let old_content_value = OldHistoryContentValue::decode(content_key, &raw_content_value)?;
old_content_value: OldHistoryContentValue,
) -> Option<RawContentValue> {
match old_content_value {
OldHistoryContentValue::BlockHeaderWithProof(old_header_with_proof) => {
let proof = match old_header_with_proof.proof {
OldBlockHeaderProof::None(_) => return Ok(None),
OldBlockHeaderProof::None(_) => return None,
OldBlockHeaderProof::PreMergeAccumulatorProof(pre_merge_accumulator_proof) => {
let proof = BlockProofHistoricalHashesAccumulator::new(
pre_merge_accumulator_proof.proof.to_vec(),
)
.map_err(|err| ContentStoreError::InvalidData {
message: format!("Invalid HistoricalHashes proof: {err:?}"),
})?;
.expect("[B256; 15] should convert to FixedVector<B256, U15>");
BlockHeaderProof::HistoricalHashes(proof)
}
OldBlockHeaderProof::HistoricalRootsBlockProof(_) => {
warn!(content_key=%content_key.to_hex(), "Unexpected HistoricalRootsBlockProof");
return Ok(None);
warn!(
content_key = content_key.to_hex(),
"Unexpected HistoricalRootsBlockProof"
);
return None;
}
OldBlockHeaderProof::HistoricalSummariesBlockProof(_) => {
warn!(content_key=%content_key.to_hex(), "Unexpected HistoricalSummariesBlockProof");
return Ok(None);
warn!(
content_key = content_key.to_hex(),
"Unexpected HistoricalSummariesBlockProof"
);
return None;
}
};
let new_content_value = NewHistoryContentValue::BlockHeaderWithProof(HeaderWithProof {
header: old_header_with_proof.header,
proof,
});
Ok(Some(new_content_value.encode()))
Some(new_content_value.encode())
}
OldHistoryContentValue::BlockBody(_) | OldHistoryContentValue::Receipts(_) => {
// TODO: consider whether to filter post-merge bodies and receipts
Ok(Some(raw_content_value))
Some(old_content_value.encode())
}
}
}
Expand All @@ -156,10 +178,14 @@ fn convert_content_value(
mod tests {
use std::{collections::HashMap, fs, str::FromStr};

use alloy::primitives::map::HashSet;
use ethportal_api::types::execution::header_with_proof::HeaderWithProof as OldHeaderWithProof;
use alloy::primitives::{map::HashSet, B256};
use ethportal_api::types::{
distance::{Metric, XorMetric},
execution::header_with_proof::HeaderWithProof as OldHeaderWithProof,
};
use rand::seq::SliceRandom;
use rstest::{fixture, rstest};
use rusqlite::params;
use serde::Deserialize;
use ssz::Decode;
use trin_storage::test_utils::create_test_portal_storage_config_with_capacity;
Expand Down Expand Up @@ -248,6 +274,16 @@ mod tests {
);
}

// 3.3 Verify that old table doesn't exist
assert!(
!config
.sql_connection_pool
.get()?
.prepare(sql::TABLE_EXISTS)?
.exists(())?,
"Old table should no longer exist"
);

// 4. Cleanup

drop(new_store);
Expand All @@ -258,13 +294,10 @@ mod tests {

// Fixtures

/// Migration shouldn't crash with undecodable content.
/// Migration shouldn't crash with undecodable content value.
#[fixture]
fn invalid_content() -> MigrationContentItem {
let content_key = HistoryContentKey::try_from_hex(
"0x00000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f",
)
.unwrap();
let content_key = HistoryContentKey::random().unwrap();
let invalid_content_value =
RawContentValue::from_str("0x000102030405060708090a0b0c0d0e0f").unwrap();
(content_key, invalid_content_value, None)
Expand Down Expand Up @@ -456,4 +489,77 @@ mod tests {
verify_migration(&content)?;
Ok(())
}

#[rstest]
fn invalid_content_key() -> anyhow::Result<()> {
let (temp_dir, config) =
create_test_portal_storage_config_with_capacity(/* capacity_mb= */ 1000)?;

// 1. Manully write content with undecodable content key

let content_id = B256::random();
let content_key = vec![0x00u8, 0x01, 0x02, 0x03, 0x04];
let content_value = RawContentValue::from_str(
"0x080000000c020000f90201a0f27cf46c7051211f7dc78a3e837b84afc52a3d17397ff7f3d45cb325d7bfc452a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d4934794388c818ca8b9251b393131c08a736a67ccb19297a0d6389937b3f9b463a5a6ea4a404eca63cb53c625e7a2768f1ec1c232295adc50a0a3d66862764ad81dd1384712247f53e171ecd40553328e722d3a627494b678c8a0c5f1cc46e949ce9a6607f1ffb4018b0a893e071e9fd63123d9e8f3647f74d99bb901005920c0e4011a3db1367828408091036a7c2830229442428128d5321ed4d213621584ca0422a290480846eef83203558c07a840541a0136c044720b2e64b7a2c040802208530e053a6d05451c074bc170009102f10ed4100322a234419ac03120422b18498a27c2ba3420219184301f50441b0c5a19260cf06036467cce8b0dc4183105225a3fb04a3acb644a46a320200cc282c18bf55078022502c8427839809ac019eb0022e4f21ca14085071990809345808b21462a8b06242b2e4ccc11c96f5e5d87240134801c4801428a2cc854202008cd9088a0b665661f6f3c10b25ef61d24a8042006480408dca20787385401188164c0aca14221462808eb3070568083ee09808401c9c3808383c6d984632e607f80a0fab4b7eb057ad749b436c2bd93321ecd6bc7ad58d12e5ac72e7e20b1f55e96c388000000000000000085018422588900",
)?;
let distance = XorMetric::distance(&content_id, &config.node_id.raw());
let content_size = content_id.len() + content_key.len() + content_value.len();

let conn = config.sql_connection_pool.get()?;
conn.execute(
"
CREATE TABLE IF NOT EXISTS 'ii1_history' (
content_id BLOB PRIMARY KEY,
content_key BLOB NOT NULL,
content_value BLOB NOT NULL,
distance_short INTEGER NOT NULL,
content_size INTEGER NOT NULL
);",
(),
)?;
conn.execute(
"INSERT INTO 'ii1_history' (content_id, content_key, content_value, distance_short, content_size)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![
content_id.as_slice(),
content_key.as_slice(),
content_value.as_ref(),
distance.big_endian_u32(),
content_size,
],
)?;

// 2. Do migration, shouldn't crash

maybe_migrate(&config)?;

// 3. Verify that old table doesn't exist and content was migrated

let new_store: IdIndexedV1Store<HistoryContentKey> = create_store(
ContentType::HistoryEternal,
IdIndexedV1StoreConfig::new(
ContentType::HistoryEternal,
Subnetwork::History,
config.clone(),
),
config.sql_connection_pool.clone(),
)?;

assert!(
!conn.prepare(sql::TABLE_EXISTS)?.exists(())?,
"Old table should no longer exist"
);
assert_eq!(
new_store.paginate(0, 10)?.entry_count,
0,
"New table should be empty"
);

// 4. Cleanup

drop(new_store);
temp_dir.close()?;

Ok(())
}
}