Fix wrap-around bug (Azure#4518)
# Description
This fixes a bug I recently found. I assume we have simply been lucky so far, which is why it took this long to surface.

# The problem
When the RingBuffer wraps around, it is possible to write a smaller block than the one that was previously at that position. This leaves the buffer looking like this:
```
+-----------------+
|  2  |  1  | 000 |
+-----------------+
```

Here block 2 is smaller than the block it overwrote, so stale bytes from the old block remain immediately after it. When the next insert happens and we check whether we can overwrite at that position, we end up reading that leftover garbage as if it were a block header, and the write may or may not be allowed.
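The failure mode can be shown with a toy layout (hypothetical, much simpler than the real `RingBuffer` block format: here each block is just `[len: 1 byte][payload]`). A smaller block written at the wrap-around point leaves tail bytes of the old block where the next "header" is read:

```rust
// Toy byte buffer: each block is [len: 1 byte][payload]. The real
// RingBuffer's block header is larger, but the failure mode is the same.
fn stale_byte_after_overwrite() -> u8 {
    let mut buf = [0u8; 8];

    // First pass around the buffer: a block with a 4-byte payload
    // occupies bytes 0..5.
    let old = [4u8, b'a', b'a', b'a', b'a'];
    buf[..old.len()].copy_from_slice(&old);

    // Wrap-around: a smaller block (2-byte payload) overwrites the front,
    // occupying only bytes 0..3.
    let new = [2u8, b'b', b'b'];
    buf[..new.len()].copy_from_slice(&new);

    // The next insert reads a "header" right after the new block, but that
    // offset still holds a leftover payload byte from the old block.
    buf[new.len()]
}

fn main() {
    // Prints 97 (b'a'): garbage from the old block, not a valid header.
    println!("{}", stale_byte_after_overwrite());
}
```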

# The fix
We now allow the write when we cannot determine that a valid block header is present. The other existing conditions still protect against most corruptions; we only need to permit the write in the wrap-around case where a smaller block was written beforehand. We deliberately do not skip past the fragmented block/data to sidestep the problem, as that would cause heavy fragmentation. Instead we do as described above: allow the write and continue on, relying on the invariant that writes never pass reads and reads never pass writes.
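The decision change can be sketched as follows (assumed, simplified types, not the real crate API; the real buffer also tracks fullness separately when the write and read indices coincide, which this sketch omits):

```rust
// Simplified stand-in for the real BlockHeaderV1.
struct BlockHeader {
    should_not_overwrite: bool,
}

// Pre-fix check: if anything at the write offset parses as a header with
// the flag set, refuse the write -- even if those bytes are wrap-around
// leftovers that merely happen to look like a header.
fn old_check_allows_write(parsed: Option<BlockHeader>) -> bool {
    match parsed {
        Some(h) if h.should_not_overwrite => false,
        _ => true,
    }
}

// Post-fix: the possibly-stale header is not consulted at all; the
// pointer invariant (write never passes read) gates the insert instead.
fn new_check_allows_write(write_index: u64, read_index: u64, capacity: u64, size: u64) -> bool {
    let free = if write_index >= read_index {
        capacity - (write_index - read_index)
    } else {
        read_index - write_index
    };
    size <= free
}

fn main() {
    let stale = BlockHeader { should_not_overwrite: true };
    println!("old check allows: {}", old_check_allows_write(Some(stale)));
    println!("new check allows: {}", new_check_allows_write(6, 2, 8, 3));
}
```

The point is that the old check's answer depended on whatever bytes happened to sit at the write offset, while the new one depends only on the indices the buffer itself maintains.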

# Todo (later prs)
- [ ] Should merge into release 1.2
nyanzebra authored Mar 2, 2021
1 parent 436bada commit 0998859
Showing 1 changed file with 51 additions and 11 deletions.
62 changes: 51 additions & 11 deletions mqtt/mqtt-bridge/src/persist/waking_state/ring_buffer/mod.rs
@@ -220,17 +220,6 @@ impl StreamWakeableState for RingBuffer {

let start = write_index;

// Check if an existing block header is present. If the block is there
// and if the overwrite flag is not set then we shouldn't write.
let result = load_block_header(&mut self.file, start, block_size, self.max_file_size);
if let Ok(block_header) = result {
let BlockVersion::Version1(inner) = block_header.inner();
let should_not_overwrite = inner.should_not_overwrite();
if should_not_overwrite {
return Err(PersistError::RingBuffer(RingBufferError::Full));
}
}

let v1 = BlockHeaderV1::new(BLOCK_HINT, data_crc, data_size, order, write_index);
let versioned_block = BlockVersion::Version1(v1);

@@ -952,6 +941,57 @@ mod tests {
}
}

#[test]
fn it_inserts_ok_after_leftover_and_wrap_around_is_smaller() {
let mut rb = TestRingBuffer::default();
let publication = Publication {
topic_name: "test".to_owned(),
qos: QoS::AtMostOnce,
retain: true,
payload: Bytes::new(),
};

let block_size = *SERIALIZED_BLOCK_SIZE;

let result = bincode::serialize(&publication);
let data = result.unwrap();

let data_size = data.len() as u64;

let total_size = block_size + data_size;

let inserts = MAX_FILE_SIZE / total_size;
for _ in 0..inserts {
rb.0.insert(&publication)
.expect("Failed to insert into RingBuffer");
}

let result = rb.0.batch(2);
let batch = result.unwrap();
for entry in batch {
rb.0.remove(entry.0)
.expect("Failed to remove from RingBuffer");
}

let smaller_publication = Publication {
topic_name: "t".to_owned(),
qos: QoS::AtMostOnce,
retain: true,
payload: Bytes::new(),
};

rb.0.insert(&smaller_publication)
.expect("Failed to insert into RingBuffer");

rb.0.insert(&publication)
.expect("Failed to insert into RingBuffer");

assert_eq!(rb.0.metadata.order, inserts + 2);
let result = rb.0.insert(&publication);
assert_matches!(result, Err(PersistError::RingBuffer(RingBufferError::Full)));
assert_eq!(rb.0.metadata.order, inserts + 2);
}

#[test]
fn it_errs_on_insert_when_full() {
let mut rb = TestRingBuffer::default();
