diff --git a/mqtt/mqtt-bridge/src/persist/waking_state/ring_buffer/mod.rs b/mqtt/mqtt-bridge/src/persist/waking_state/ring_buffer/mod.rs index 9b67e8c691e..d431d92287f 100644 --- a/mqtt/mqtt-bridge/src/persist/waking_state/ring_buffer/mod.rs +++ b/mqtt/mqtt-bridge/src/persist/waking_state/ring_buffer/mod.rs @@ -220,17 +220,6 @@ impl StreamWakeableState for RingBuffer { let start = write_index; - // Check if an existing block header is present. If the block is there - // and if the overwrite flag is not set then we shouldn't write. - let result = load_block_header(&mut self.file, start, block_size, self.max_file_size); - if let Ok(block_header) = result { - let BlockVersion::Version1(inner) = block_header.inner(); - let should_not_overwrite = inner.should_not_overwrite(); - if should_not_overwrite { - return Err(PersistError::RingBuffer(RingBufferError::Full)); - } - } - let v1 = BlockHeaderV1::new(BLOCK_HINT, data_crc, data_size, order, write_index); let versioned_block = BlockVersion::Version1(v1); @@ -952,6 +941,57 @@ mod tests { } } + #[test] + fn it_inserts_ok_after_leftover_and_wrap_around_is_smaller() { + let mut rb = TestRingBuffer::default(); + let publication = Publication { + topic_name: "test".to_owned(), + qos: QoS::AtMostOnce, + retain: true, + payload: Bytes::new(), + }; + + let block_size = *SERIALIZED_BLOCK_SIZE; + + let result = bincode::serialize(&publication); + let data = result.unwrap(); + + let data_size = data.len() as u64; + + let total_size = block_size + data_size; + + let inserts = MAX_FILE_SIZE / total_size; + for _ in 0..inserts { + rb.0.insert(&publication) + .expect("Failed to insert into RingBuffer"); + } + + let result = rb.0.batch(2); + let batch = result.unwrap(); + for entry in batch { + rb.0.remove(entry.0) + .expect("Failed to remove from RingBuffer"); + } + + let smaller_publication = Publication { + topic_name: "t".to_owned(), + qos: QoS::AtMostOnce, + retain: true, + payload: Bytes::new(), + }; + + rb.0.insert(&smaller_publication) + .expect("Failed to insert into RingBuffer"); + + rb.0.insert(&publication) + .expect("Failed to insert into RingBuffer"); + + assert_eq!(rb.0.metadata.order, inserts + 2); + let result = rb.0.insert(&publication); + assert_matches!(result, Err(PersistError::RingBuffer(RingBufferError::Full))); + assert_eq!(rb.0.metadata.order, inserts + 2); + } + #[test] fn it_errs_on_insert_when_full() { let mut rb = TestRingBuffer::default();