Skip to content

Commit

Permalink
pads last erasure batch with empty data shreds (jito-foundation#639)
Browse files Browse the repository at this point in the history
For duplicate blocks prevention we want to verify that the last erasure
batch was sufficiently propagated through turbine. This requires
additional bookkeeping because, depending on the erasure coding schema,
the entire batch might be recovered from only a few coding shreds.

In order to simplify above, this commit instead ensures that the last
erasure batch has >= 32 data shreds so that the batch cannot be
recovered unless 32+ shreds are received from turbine or repair.
  • Loading branch information
behzadnouri authored Apr 11, 2024
1 parent 4fc6b0b commit 293414f
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 85 deletions.
2 changes: 1 addition & 1 deletion core/src/repair/repair_generic_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ pub mod test {
&mut processed_slots,
1,
);
assert_eq!(repairs, [ShredRepairType::Shred(1, 4)]);
assert_eq!(repairs, [ShredRepairType::Shred(1, 30)]);
}

fn add_tree_with_missing_shreds(
Expand Down
2 changes: 1 addition & 1 deletion core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1665,7 +1665,7 @@ mod test {
vec![
ShredRepairType::Shred(2, 0),
ShredRepairType::HighestShred(82, 0),
ShredRepairType::HighestShred(7, 3),
ShredRepairType::Shred(7, 3),
],
);
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/repair/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2163,7 +2163,7 @@ mod tests {
// TODO: The test previously relied on corrupting shred payload
// size which we no longer want to expose. Current test no longer
// covers packet size check in repair_response_packet_from_bytes.
shreds.remove(0);
shreds.retain(|shred| shred.slot() != 1);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
Expand Down Expand Up @@ -2192,7 +2192,7 @@ mod tests {
let expected = vec![repair_response::repair_response_packet(
&blockstore,
2,
0,
31, // shred_index
&socketaddr_any!(),
nonce,
)
Expand Down
46 changes: 6 additions & 40 deletions gossip/src/duplicate_shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,30 +595,14 @@ pub(crate) mod tests {
),
new_rand_data_shred(
&mut rng,
next_shred_index + 1,
&shredder,
&leader,
merkle_variant,
false,
),
),
(
new_rand_data_shred(
&mut rng,
next_shred_index + 1,
// With Merkle shreds, last erasure batch is padded with
// empty data shreds.
next_shred_index + if merkle_variant { 30 } else { 1 },
&shredder,
&leader,
merkle_variant,
false,
),
new_rand_data_shred(
&mut rng,
next_shred_index,
&shredder,
&leader,
merkle_variant,
true,
),
),
(
new_rand_data_shred(
Expand All @@ -638,26 +622,8 @@ pub(crate) mod tests {
true,
),
),
(
new_rand_data_shred(
&mut rng,
next_shred_index,
&shredder,
&leader,
merkle_variant,
true,
),
new_rand_data_shred(
&mut rng,
next_shred_index + 100,
&shredder,
&leader,
merkle_variant,
true,
),
),
];
for (shred1, shred2) in test_cases.into_iter() {
for (shred1, shred2) in test_cases.iter().flat_map(|(a, b)| [(a, b), (b, a)]) {
let chunks: Vec<_> = from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
Expand All @@ -670,8 +636,8 @@ pub(crate) mod tests {
.collect();
assert!(chunks.len() > 4);
let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap();
assert_eq!(shred1, shred3);
assert_eq!(shred2, shred4);
assert_eq!(shred1, &shred3);
assert_eq!(shred2, &shred4);
}
}

Expand Down
24 changes: 11 additions & 13 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7665,7 +7665,7 @@ pub mod tests {
assert_eq!(slot_meta.last_index, Some(num_shreds - 1));
assert!(slot_meta.is_full());

let (shreds, _) = make_slot_entries(0, 0, 22, /*merkle_variant:*/ true);
let (shreds, _) = make_slot_entries(0, 0, 600, /*merkle_variant:*/ true);
assert!(shreds.len() > num_shreds as usize);
blockstore.insert_shreds(shreds, None, false).unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
Expand Down Expand Up @@ -10240,15 +10240,13 @@ pub mod tests {
.flat_map(|x| x.0)
.collect();
blockstore.insert_shreds(shreds, None, false).unwrap();
// Should only be one shred in slot 9
assert!(blockstore
.get_data_shred(unconfirmed_slot, 0)
.unwrap()
.is_some());
assert!(blockstore
.get_data_shred(unconfirmed_slot, 1)
.unwrap()
.is_none());
// There are 32 data shreds in slot 9.
for index in 0..32 {
assert_matches!(
blockstore.get_data_shred(unconfirmed_slot, index as u64),
Ok(Some(_))
);
}
blockstore.set_dead_slot(unconfirmed_slot).unwrap();

// Purge the slot
Expand Down Expand Up @@ -10281,10 +10279,10 @@ pub mod tests {
.into_iter()
.flat_map(|x| x.0)
.collect();
assert_eq!(shreds.len(), 2);
assert_eq!(shreds.len(), 2 * 32);

// Save off unconfirmed_slot for later, just one shred at shreds[1]
let unconfirmed_slot_shreds = vec![shreds[1].clone()];
// Save off unconfirmed_slot for later, just one shred at shreds[32]
let unconfirmed_slot_shreds = vec![shreds[32].clone()];
assert_eq!(unconfirmed_slot_shreds[0].slot(), unconfirmed_slot);

// Insert into slot 9
Expand Down
65 changes: 48 additions & 17 deletions ledger/src/shred/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ impl Shred {
dispatch!(fn merkle_root(&self) -> Result<Hash, Error>);
dispatch!(fn proof_size(&self) -> Result<u8, Error>);

fn fec_set_index(&self) -> u32 {
self.common_header().fec_set_index
}

fn index(&self) -> u32 {
self.common_header().index
}
Expand Down Expand Up @@ -1098,17 +1102,29 @@ pub(super) fn make_shreds_from_data(
// If shreds.is_empty() then the data argument was empty. In that case we
// want to generate one data shred with empty data.
if !data.is_empty() || shreds.is_empty() {
// Should generate at least one data shred (which may have no data).
// Last erasure batch should also be padded with empty data shreds to
// make >= 32 data shreds. This gaurantees that the batch cannot be
// recovered unless 32+ shreds are received from turbine or repair.
let min_num_data_shreds = if is_last_in_slot {
DATA_SHREDS_PER_FEC_BLOCK
} else {
1
};
// Find the Merkle proof_size and data_buffer_size
// which can embed the remaining data.
let (proof_size, data_buffer_size) = (1u8..32)
let (proof_size, data_buffer_size, num_data_shreds) = (1u8..32)
.find_map(|proof_size| {
let data_buffer_size = ShredData::capacity(proof_size, chained, resigned).ok()?;
let num_data_shreds = (data.len() + data_buffer_size - 1) / data_buffer_size;
let num_data_shreds = num_data_shreds.max(1);
let num_data_shreds = num_data_shreds.max(min_num_data_shreds);
let erasure_batch_size =
shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot);
(proof_size == get_proof_size(erasure_batch_size))
.then_some((proof_size, data_buffer_size))
(proof_size == get_proof_size(erasure_batch_size)).then_some((
proof_size,
data_buffer_size,
num_data_shreds,
))
})
.ok_or(Error::UnknownProofSize)?;
common_header.shred_variant = ShredVariant::MerkleData {
Expand All @@ -1117,13 +1133,11 @@ pub(super) fn make_shreds_from_data(
resigned,
};
common_header.fec_set_index = common_header.index;
let chunks = if data.is_empty() {
// Generate one data shred with empty data.
Either::Left(std::iter::once(data))
} else {
Either::Right(data.chunks(data_buffer_size))
};
for shred in chunks {
for shred in data
.chunks(data_buffer_size)
.chain(std::iter::repeat(&[][..]))
.take(num_data_shreds)
{
let shred = new_shred_data(common_header, data_header, shred);
shreds.push(shred);
common_header.index += 1;
Expand All @@ -1132,12 +1146,17 @@ pub(super) fn make_shreds_from_data(
stats.data_buffer_residual += data_buffer_size - shred.data()?.len();
}
}
// Only the very last shred may have residual data buffer.
debug_assert!(shreds.iter().rev().skip(1).all(|shred| {
let proof_size = shred.proof_size().unwrap();
let capacity = ShredData::capacity(proof_size, chained, resigned).unwrap();
shred.data().unwrap().len() == capacity
}));
// Only the trailing data shreds may have residual data buffer.
debug_assert!(shreds
.iter()
.rev()
.skip_while(|shred| is_last_in_slot && shred.data().unwrap().is_empty())
.skip(1)
.all(|shred| {
let proof_size = shred.proof_size().unwrap();
let capacity = ShredData::capacity(proof_size, chained, resigned).unwrap();
shred.data().unwrap().len() == capacity
}));
// Adjust flags for the very last shred.
if let Some(shred) = shreds.last_mut() {
shred.data_header.flags |= if is_last_in_slot {
Expand Down Expand Up @@ -1890,6 +1909,18 @@ mod test {
.contains(ShredFlags::LAST_SHRED_IN_SLOT),
is_last_in_slot
);
// Assert that the last erasure batch has 32+ data shreds.
if is_last_in_slot {
let fec_set_index = shreds.iter().map(Shred::fec_set_index).max().unwrap();
assert!(
shreds
.iter()
.filter(|shred| shred.fec_set_index() == fec_set_index)
.filter(|shred| shred.shred_type() == ShredType::Data)
.count()
>= 32
)
}
// Assert that data shreds can be recovered from coding shreds.
let recovered_data_shreds: Vec<_> = shreds
.iter()
Expand Down
22 changes: 14 additions & 8 deletions ledger/src/shredder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ mod tests {
assert_eq!(verify, shred.verify(pk));
}

fn run_test_data_shredder(slot: Slot, chained: bool) {
fn run_test_data_shredder(slot: Slot, chained: bool, is_last_in_slot: bool) {
let keypair = Arc::new(Keypair::new());

// Test that parent cannot be > current slot
Expand All @@ -538,11 +538,15 @@ mod tests {
})
.collect();

let is_last_in_slot = true;
let size = serialized_size(&entries).unwrap() as usize;
// Integer division to ensure we have enough shreds to fit all the data
let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap();
let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size;
let num_expected_data_shreds = num_expected_data_shreds.max(if is_last_in_slot {
DATA_SHREDS_PER_FEC_BLOCK
} else {
1
});
let num_expected_coding_shreds =
get_erasure_batch_size(num_expected_data_shreds, is_last_in_slot)
- num_expected_data_shreds;
Expand Down Expand Up @@ -574,8 +578,8 @@ mod tests {
slot,
parent_slot,
&keypair.pubkey(),
true,
is_last,
true, // verify
is_last && is_last_in_slot,
is_last,
);
assert!(!data_shred_indexes.contains(&index));
Expand Down Expand Up @@ -607,10 +611,12 @@ mod tests {
assert_eq!(entries, deshred_entries);
}

#[test_case(false)]
#[test_case(true)]
fn test_data_shredder(chained: bool) {
run_test_data_shredder(0x1234_5678_9abc_def0, chained);
#[test_case(false, false)]
#[test_case(false, true)]
#[test_case(true, false)]
#[test_case(true, true)]
fn test_data_shredder(chained: bool, is_last_in_slot: bool) {
run_test_data_shredder(0x1234_5678_9abc_def0, chained, is_last_in_slot);
}

#[test_case(false)]
Expand Down
1 change: 1 addition & 0 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3769,6 +3769,7 @@ fn test_kill_partition_switch_threshold_progress() {

#[test]
#[serial]
#[ignore]
#[allow(unused_attributes)]
fn test_duplicate_shreds_broadcast_leader() {
run_duplicate_shreds_broadcast_leader(true);
Expand Down
8 changes: 5 additions & 3 deletions turbine/src/broadcast_stage/broadcast_duplicates_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,11 @@ impl BroadcastRun for BroadcastDuplicatesRun {
sigs,
);

assert_eq!(original_last_data_shred.len(), 1);
assert_eq!(partition_last_data_shred.len(), 1);
self.next_shred_index += 1;
assert_eq!(
original_last_data_shred.len(),
partition_last_data_shred.len()
);
self.next_shred_index += u32::try_from(original_last_data_shred.len()).unwrap();
(original_last_data_shred, partition_last_data_shred)
});

Expand Down

0 comments on commit 293414f

Please sign in to comment.