diff --git a/core/src/repair/repair_generic_traversal.rs b/core/src/repair/repair_generic_traversal.rs
index c4b5736200..3e704149cb 100644
--- a/core/src/repair/repair_generic_traversal.rs
+++ b/core/src/repair/repair_generic_traversal.rs
@@ -270,7 +270,7 @@ pub mod test {
             &mut processed_slots,
             1,
         );
-        assert_eq!(repairs, [ShredRepairType::Shred(1, 4)]);
+        assert_eq!(repairs, [ShredRepairType::Shred(1, 30)]);
     }

     fn add_tree_with_missing_shreds(
diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs
index 9a293f6a8b..c98360401f 100644
--- a/core/src/repair/repair_service.rs
+++ b/core/src/repair/repair_service.rs
@@ -1665,7 +1665,7 @@ mod test {
             vec![
                 ShredRepairType::Shred(2, 0),
                 ShredRepairType::HighestShred(82, 0),
-                ShredRepairType::HighestShred(7, 3),
+                ShredRepairType::Shred(7, 3),
             ],
         );
     }
diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs
index a4c676bf76..67163a45e0 100644
--- a/core/src/repair/serve_repair.rs
+++ b/core/src/repair/serve_repair.rs
@@ -2163,7 +2163,7 @@ mod tests {
         // TODO: The test previously relied on corrupting shred payload
         // size which we no longer want to expose. Current test no longer
         // covers packet size check in repair_response_packet_from_bytes.
-        shreds.remove(0);
+        shreds.retain(|shred| shred.slot() != 1);
         blockstore
             .insert_shreds(shreds, None, false)
             .expect("Expect successful ledger write");
@@ -2192,7 +2192,7 @@ mod tests {
         let expected = vec![repair_response::repair_response_packet(
             &blockstore,
             2,
-            0,
+            31, // shred_index
             &socketaddr_any!(),
             nonce,
         )
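
Note on the three repair test updates above: once every slot-closing erasure batch is padded to 32 data shreds, even a slot whose payload fits in a handful of shreds ends with data shred indices 0..=31, which is why the expected repair targets move to indices 30 and 31. A minimal standalone sketch of that arithmetic, not code from this patch (the constant mirrors DATA_SHREDS_PER_FEC_BLOCK in ledger/src/shredder.rs):

    // Standalone sketch: highest data shred index of a padded final batch.
    const DATA_SHREDS_PER_FEC_BLOCK: u32 = 32;

    fn main() {
        // A slot that previously produced a few data shreds now closes with
        // a full batch, so its last data shred index is 31.
        let last_index = DATA_SHREDS_PER_FEC_BLOCK - 1;
        assert_eq!(last_index, 31);
    }
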
diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs
index 84c50ea602..f7ac0153a5 100644
--- a/gossip/src/duplicate_shred.rs
+++ b/gossip/src/duplicate_shred.rs
@@ -595,30 +595,14 @@ pub(crate) mod tests {
                 ),
                 new_rand_data_shred(
                     &mut rng,
-                    next_shred_index + 1,
-                    &shredder,
-                    &leader,
-                    merkle_variant,
-                    false,
-                ),
-            ),
-            (
-                new_rand_data_shred(
-                    &mut rng,
-                    next_shred_index + 1,
+                    // With Merkle shreds, last erasure batch is padded with
+                    // empty data shreds.
+                    next_shred_index + if merkle_variant { 30 } else { 1 },
                     &shredder,
                     &leader,
                     merkle_variant,
                     false,
                 ),
-                new_rand_data_shred(
-                    &mut rng,
-                    next_shred_index,
-                    &shredder,
-                    &leader,
-                    merkle_variant,
-                    true,
-                ),
             ),
             (
                 new_rand_data_shred(
@@ -638,26 +622,8 @@ pub(crate) mod tests {
                     true,
                 ),
             ),
-            (
-                new_rand_data_shred(
-                    &mut rng,
-                    next_shred_index,
-                    &shredder,
-                    &leader,
-                    merkle_variant,
-                    true,
-                ),
-                new_rand_data_shred(
-                    &mut rng,
-                    next_shred_index + 100,
-                    &shredder,
-                    &leader,
-                    merkle_variant,
-                    true,
-                ),
-            ),
         ];
-        for (shred1, shred2) in test_cases.into_iter() {
+        for (shred1, shred2) in test_cases.iter().flat_map(|(a, b)| [(a, b), (b, a)]) {
             let chunks: Vec<_> = from_shred(
                 shred1.clone(),
                 Pubkey::new_unique(), // self_pubkey
@@ -670,8 +636,8 @@ pub(crate) mod tests {
             .collect();
             assert!(chunks.len() > 4);
             let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap();
-            assert_eq!(shred1, shred3);
-            assert_eq!(shred2, shred4);
+            assert_eq!(shred1, &shred3);
+            assert_eq!(shred2, &shred4);
         }
     }

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 06a3ef6d94..d96a3f8148 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -7665,7 +7665,7 @@ pub mod tests {
         assert_eq!(slot_meta.last_index, Some(num_shreds - 1));
         assert!(slot_meta.is_full());

-        let (shreds, _) = make_slot_entries(0, 0, 22, /*merkle_variant:*/ true);
+        let (shreds, _) = make_slot_entries(0, 0, 600, /*merkle_variant:*/ true);
         assert!(shreds.len() > num_shreds as usize);
         blockstore.insert_shreds(shreds, None, false).unwrap();
         let slot_meta = blockstore.meta(0).unwrap().unwrap();
@@ -10240,15 +10240,13 @@ pub mod tests {
             .flat_map(|x| x.0)
             .collect();
         blockstore.insert_shreds(shreds, None, false).unwrap();
-        // Should only be one shred in slot 9
-        assert!(blockstore
-            .get_data_shred(unconfirmed_slot, 0)
-            .unwrap()
-            .is_some());
-        assert!(blockstore
-            .get_data_shred(unconfirmed_slot, 1)
-            .unwrap()
-            .is_none());
+        // There are 32 data shreds in slot 9.
+        for index in 0..32 {
+            assert_matches!(
+                blockstore.get_data_shred(unconfirmed_slot, index as u64),
+                Ok(Some(_))
+            );
+        }
         blockstore.set_dead_slot(unconfirmed_slot).unwrap();

         // Purge the slot
@@ -10281,10 +10279,10 @@ pub mod tests {
             .into_iter()
             .flat_map(|x| x.0)
             .collect();
-        assert_eq!(shreds.len(), 2);
+        assert_eq!(shreds.len(), 2 * 32);

-        // Save off unconfirmed_slot for later, just one shred at shreds[1]
-        let unconfirmed_slot_shreds = vec![shreds[1].clone()];
+        // Save off unconfirmed_slot for later, just one shred at shreds[32]
+        let unconfirmed_slot_shreds = vec![shreds[32].clone()];
         assert_eq!(unconfirmed_slot_shreds[0].slot(), unconfirmed_slot);

         // Insert into slot 9
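
The rewritten loop in duplicate_shred.rs halves the hand-written test cases by exercising each pair in both orders, and the blockstore tests now expect a full batch of 32 data shreds in slot 9. A self-contained sketch of the iterator shape used for the pair ordering, with plain integers standing in for shreds:

    // Standalone sketch: yield every (a, b) pair plus its mirror (b, a).
    fn main() {
        let test_cases = vec![(1, 2), (3, 4)];
        let pairs: Vec<(&i32, &i32)> = test_cases
            .iter()
            .flat_map(|(a, b)| [(a, b), (b, a)])
            .collect();
        // Two orderings per case, so twice as many iterations.
        assert_eq!(pairs.len(), 2 * test_cases.len());
    }
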
diff --git a/ledger/src/shred/merkle.rs b/ledger/src/shred/merkle.rs
index a7cc134824..a6ffb9757e 100644
--- a/ledger/src/shred/merkle.rs
+++ b/ledger/src/shred/merkle.rs
@@ -127,6 +127,10 @@ impl Shred {
     dispatch!(fn merkle_root(&self) -> Result<Hash, Error>);
     dispatch!(fn proof_size(&self) -> Result<u8, Error>);

+    fn fec_set_index(&self) -> u32 {
+        self.common_header().fec_set_index
+    }
+
     fn index(&self) -> u32 {
         self.common_header().index
     }
@@ -1098,17 +1102,29 @@ pub(super) fn make_shreds_from_data(
     // If shreds.is_empty() then the data argument was empty. In that case we
     // want to generate one data shred with empty data.
     if !data.is_empty() || shreds.is_empty() {
+        // Should generate at least one data shred (which may have no data).
+        // Last erasure batch should also be padded with empty data shreds to
+        // make >= 32 data shreds. This guarantees that the batch cannot be
+        // recovered unless 32+ shreds are received from turbine or repair.
+        let min_num_data_shreds = if is_last_in_slot {
+            DATA_SHREDS_PER_FEC_BLOCK
+        } else {
+            1
+        };
         // Find the Merkle proof_size and data_buffer_size
         // which can embed the remaining data.
-        let (proof_size, data_buffer_size) = (1u8..32)
+        let (proof_size, data_buffer_size, num_data_shreds) = (1u8..32)
             .find_map(|proof_size| {
                 let data_buffer_size = ShredData::capacity(proof_size, chained, resigned).ok()?;
                 let num_data_shreds = (data.len() + data_buffer_size - 1) / data_buffer_size;
-                let num_data_shreds = num_data_shreds.max(1);
+                let num_data_shreds = num_data_shreds.max(min_num_data_shreds);
                 let erasure_batch_size =
                     shredder::get_erasure_batch_size(num_data_shreds, is_last_in_slot);
-                (proof_size == get_proof_size(erasure_batch_size))
-                    .then_some((proof_size, data_buffer_size))
+                (proof_size == get_proof_size(erasure_batch_size)).then_some((
+                    proof_size,
+                    data_buffer_size,
+                    num_data_shreds,
+                ))
             })
             .ok_or(Error::UnknownProofSize)?;
         common_header.shred_variant = ShredVariant::MerkleData {
@@ -1117,13 +1133,11 @@
             resigned,
         };
         common_header.fec_set_index = common_header.index;
-        let chunks = if data.is_empty() {
-            // Generate one data shred with empty data.
-            Either::Left(std::iter::once(data))
-        } else {
-            Either::Right(data.chunks(data_buffer_size))
-        };
-        for shred in chunks {
+        for shred in data
+            .chunks(data_buffer_size)
+            .chain(std::iter::repeat(&[][..]))
+            .take(num_data_shreds)
+        {
             let shred = new_shred_data(common_header, data_header, shred);
             shreds.push(shred);
             common_header.index += 1;
@@ -1132,12 +1146,17 @@
             stats.data_buffer_residual += data_buffer_size - shred.data()?.len();
         }
     }
-    // Only the very last shred may have residual data buffer.
-    debug_assert!(shreds.iter().rev().skip(1).all(|shred| {
-        let proof_size = shred.proof_size().unwrap();
-        let capacity = ShredData::capacity(proof_size, chained, resigned).unwrap();
-        shred.data().unwrap().len() == capacity
-    }));
+    // Only the trailing data shreds may have residual data buffer.
+    debug_assert!(shreds
+        .iter()
+        .rev()
+        .skip_while(|shred| is_last_in_slot && shred.data().unwrap().is_empty())
+        .skip(1)
+        .all(|shred| {
+            let proof_size = shred.proof_size().unwrap();
+            let capacity = ShredData::capacity(proof_size, chained, resigned).unwrap();
+            shred.data().unwrap().len() == capacity
+        }));
     // Adjust flags for the very last shred.
     if let Some(shred) = shreds.last_mut() {
         shred.data_header.flags |= if is_last_in_slot {
@@ -1890,6 +1909,18 @@ mod test {
                 .contains(ShredFlags::LAST_SHRED_IN_SLOT),
             is_last_in_slot
         );
+        // Assert that the last erasure batch has 32+ data shreds.
+        if is_last_in_slot {
+            let fec_set_index = shreds.iter().map(Shred::fec_set_index).max().unwrap();
+            assert!(
+                shreds
+                    .iter()
+                    .filter(|shred| shred.fec_set_index() == fec_set_index)
+                    .filter(|shred| shred.shred_type() == ShredType::Data)
+                    .count()
+                    >= 32
+            )
+        }
         // Assert that data shreds can be recovered from coding shreds.
         let recovered_data_shreds: Vec<_> = shreds
             .iter()
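
The heart of the patch is the emission loop in make_shreds_from_data above: chunk the payload as before, then keep appending empty chunks until the batch reaches min_num_data_shreds. A self-contained sketch of that scheme under an assumed 40-byte per-shred capacity (the real figure comes from ShredData::capacity):

    // Standalone sketch of the padding loop; the 40-byte capacity is
    // hypothetical, not the actual Merkle shred layout.
    fn main() {
        let data = vec![7u8; 100];
        let data_buffer_size = 40;
        let min_num_data_shreds = 32; // DATA_SHREDS_PER_FEC_BLOCK, last in slot
        let num_data_shreds =
            ((data.len() + data_buffer_size - 1) / data_buffer_size).max(min_num_data_shreds);
        let chunks: Vec<&[u8]> = data
            .chunks(data_buffer_size)
            .chain(std::iter::repeat(&[][..]))
            .take(num_data_shreds)
            .collect();
        // Three chunks carry payload; the other 29 are empty padding, so the
        // batch cannot be recovered from fewer than 32 received shreds.
        assert_eq!(chunks.len(), 32);
        assert_eq!(chunks.iter().filter(|c| !c.is_empty()).count(), 3);
    }
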
diff --git a/ledger/src/shredder.rs b/ledger/src/shredder.rs
index ba127ef009..e23ac208a6 100644
--- a/ledger/src/shredder.rs
+++ b/ledger/src/shredder.rs
@@ -513,7 +513,7 @@ mod tests {
         assert_eq!(verify, shred.verify(pk));
     }

-    fn run_test_data_shredder(slot: Slot, chained: bool) {
+    fn run_test_data_shredder(slot: Slot, chained: bool, is_last_in_slot: bool) {
         let keypair = Arc::new(Keypair::new());

         // Test that parent cannot be > current slot
@@ -538,11 +538,15 @@ mod tests {
             })
             .collect();

-        let is_last_in_slot = true;
         let size = serialized_size(&entries).unwrap() as usize;
         // Integer division to ensure we have enough shreds to fit all the data
         let data_buffer_size = ShredData::capacity(/*merkle_proof_size:*/ None).unwrap();
         let num_expected_data_shreds = (size + data_buffer_size - 1) / data_buffer_size;
+        let num_expected_data_shreds = num_expected_data_shreds.max(if is_last_in_slot {
+            DATA_SHREDS_PER_FEC_BLOCK
+        } else {
+            1
+        });
         let num_expected_coding_shreds =
             get_erasure_batch_size(num_expected_data_shreds, is_last_in_slot)
                 - num_expected_data_shreds;
@@ -574,8 +578,8 @@ mod tests {
                 slot,
                 parent_slot,
                 &keypair.pubkey(),
-                true,
-                is_last,
+                true, // verify
+                is_last && is_last_in_slot,
                 is_last,
             );
             assert!(!data_shred_indexes.contains(&index));
@@ -607,10 +611,12 @@ mod tests {
         assert_eq!(entries, deshred_entries);
     }

-    #[test_case(false)]
-    #[test_case(true)]
-    fn test_data_shredder(chained: bool) {
-        run_test_data_shredder(0x1234_5678_9abc_def0, chained);
+    #[test_case(false, false)]
+    #[test_case(false, true)]
+    #[test_case(true, false)]
+    #[test_case(true, true)]
+    fn test_data_shredder(chained: bool, is_last_in_slot: bool) {
+        run_test_data_shredder(0x1234_5678_9abc_def0, chained, is_last_in_slot);
     }

     #[test_case(false)]
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index 804c0db788..a03d068e0e 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -3769,6 +3769,7 @@ fn test_kill_partition_switch_threshold_progress() {

 #[test]
 #[serial]
+#[ignore]
 #[allow(unused_attributes)]
 fn test_duplicate_shreds_broadcast_leader() {
     run_duplicate_shreds_broadcast_leader(true);
diff --git a/turbine/src/broadcast_stage/broadcast_duplicates_run.rs b/turbine/src/broadcast_stage/broadcast_duplicates_run.rs
index adca69ed49..2fd7dbf3b9 100644
--- a/turbine/src/broadcast_stage/broadcast_duplicates_run.rs
+++ b/turbine/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -237,9 +237,11 @@ impl BroadcastRun for BroadcastDuplicatesRun {
                     sigs,
                 );

-                assert_eq!(original_last_data_shred.len(), 1);
-                assert_eq!(partition_last_data_shred.len(), 1);
-                self.next_shred_index += 1;
+                assert_eq!(
+                    original_last_data_shred.len(),
+                    partition_last_data_shred.len()
+                );
+                self.next_shred_index += u32::try_from(original_last_data_shred.len()).unwrap();
                 (original_last_data_shred, partition_last_data_shred)
             });
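
Closing context for the find_map search in the merkle.rs hunk: proof_size, data_buffer_size, and now num_data_shreds must be solved for together because the Merkle proof depth has to match the erasure batch those choices produce. A rough standalone sketch of that consistency check; get_proof_size here is an assumed ceil(log2) stand-in, and the data-to-coding doubling for a last-in-slot batch is an illustrative assumption, not the shredder's actual table:

    // Standalone sketch: a Merkle proof over n leaves needs ceil(log2(n)) levels.
    fn get_proof_size(erasure_batch_size: usize) -> u32 {
        erasure_batch_size.next_power_of_two().trailing_zeros()
    }

    fn main() {
        // Assume a padded last-in-slot batch: 32 data + 32 coding shreds.
        let erasure_batch_size = 2 * 32;
        // 64 leaves -> a 6-level proof, so only proof_size == 6 round-trips.
        assert_eq!(get_proof_size(erasure_batch_size), 6);
    }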