From c2da419b390cbed1c2bd3c106ab8fb10840862b4 Mon Sep 17 00:00:00 2001
From: Grzegorz Koszyk <112548209+numinnex@users.noreply.github.com>
Date: Mon, 18 Mar 2024 16:23:25 +0100
Subject: [PATCH] Implement message compatibility conversion mechanism (#781)

Co-authored-by: Piotr Gankiewicz
---
 Cargo.lock                                    |   3 +-
 configs/server.json                           |   6 +
 configs/server.toml                           |  10 +
 sdk/src/error.rs                              |   2 +-
 server/Cargo.toml                             |   3 +-
 server/src/compat/binary_schema.rs            |   5 +
 server/src/compat/chunks_error.rs             |  18 ++
 server/src/compat/conversion_writer.rs        | 113 +++++++++++
 server/src/compat/message_converter.rs        |  99 ++++++++++
 server/src/compat/message_stream.rs           |   6 +
 server/src/compat/mod.rs                      |   9 +
 server/src/compat/samplers/message_sampler.rs |  53 ++++++
 server/src/compat/samplers/mod.rs             |   2 +
 .../compat/samplers/retained_batch_sampler.rs |  63 +++++++
 server/src/compat/schema_sampler.rs           |   8 +
 .../src/compat/snapshots/message_snapshot.rs  | 176 ++++++++++++++++++
 server/src/compat/snapshots/mod.rs            |   2 +
 .../snapshots/retained_batch_snapshot.rs      | 140 ++++++++++++++
 server/src/compat/streams/mod.rs              |   2 +
 server/src/compat/streams/retained_batch.rs   |  20 ++
 server/src/compat/streams/retained_message.rs |  82 ++++++++
 server/src/configs/defaults.rs                |  26 ++-
 server/src/configs/system.rs                  |  24 +++
 server/src/lib.rs                             |   1 +
 server/src/server_error.rs                    |  12 ++
 server/src/streaming/partitions/storage.rs    |  36 ++++
 server/src/streaming/segments/segment.rs      |  86 +++++++++
 server/src/streaming/segments/storage.rs      |  15 +-
 server/src/streaming/storage.rs               |   4 +-
 server/src/streaming/utils/file.rs            |   9 +-
 30 files changed, 1017 insertions(+), 18 deletions(-)
 create mode 100644 server/src/compat/binary_schema.rs
 create mode 100644 server/src/compat/chunks_error.rs
 create mode 100644 server/src/compat/conversion_writer.rs
 create mode 100644 server/src/compat/message_converter.rs
 create mode 100644 server/src/compat/message_stream.rs
 create mode 100644 server/src/compat/mod.rs
 create mode 100644 server/src/compat/samplers/message_sampler.rs
 create mode 100644 server/src/compat/samplers/mod.rs
 create mode 100644 server/src/compat/samplers/retained_batch_sampler.rs
 create mode 100644 server/src/compat/schema_sampler.rs
 create mode 100644 server/src/compat/snapshots/message_snapshot.rs
 create mode 100644 server/src/compat/snapshots/mod.rs
 create mode 100644 server/src/compat/snapshots/retained_batch_snapshot.rs
 create mode 100644 server/src/compat/streams/mod.rs
 create mode 100644 server/src/compat/streams/retained_batch.rs
 create mode 100644 server/src/compat/streams/retained_message.rs

diff --git a/Cargo.lock b/Cargo.lock
index 70909b948..fdb1d1ce2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3830,10 +3830,11 @@ dependencies = [
 
 [[package]]
 name = "server"
-version = "0.2.6"
+version = "0.2.7"
 dependencies = [
  "aes-gcm",
  "anyhow",
+ "async-stream",
  "async-trait",
  "atone",
  "axum 0.7.4",
diff --git a/configs/server.json b/configs/server.json
index 3db287d24..c9864cdd1 100644
--- a/configs/server.json
+++ b/configs/server.json
@@ -94,6 +94,12 @@
     "database": {
       "path": "database"
     },
+    "backup": {
+      "path": "backup",
+      "compatibility": {
+        "path": "compatibility"
+      }
+    },
     "runtime": {
       "path": "runtime"
     },
diff --git a/configs/server.toml b/configs/server.toml
index 766181c83..33b49c5e6 100644
--- a/configs/server.toml
+++ b/configs/server.toml
@@ -220,6 +220,16 @@ interval = "1m"
 # Base path for system data storage.
 path = "local_data"
 
+# Backup configuration.
+[system.backup]
+# Path for storing backups.
+path = "backup"
+
+# Compatibility conversion configuration.
+[system.backup.compatibility]
+# Subpath of the backup directory where converted segment data is stored after compatibility conversion.
+path = "compatibility"
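+# Note: these paths are joined as {system.path}/{backup.path}/{compatibility.path}
+# (see get_compatibility_backup_path in server/src/configs/system.rs), so with the
+# defaults above converted segment data is backed up under "local_data/backup/compatibility".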
+
 # Database configuration.
 [system.database]
 # Path for storing database files.
diff --git a/sdk/src/error.rs b/sdk/src/error.rs
index 124e10381..697507161 100644
--- a/sdk/src/error.rs
+++ b/sdk/src/error.rs
@@ -322,7 +322,7 @@ pub enum IggyError {
     CannotReadLastOffsetDelta = 7002,
     #[error("Cannot read batch maximum timestamp")]
     CannotReadMaxTimestamp = 7003,
-    #[error("Cannot read batch attributes")]
+    #[error("Cannot read batch payload")]
     CannotReadBatchPayload = 7004,
 }
diff --git a/server/Cargo.toml b/server/Cargo.toml
index f205816b8..8ecc74905 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "server"
-version = "0.2.6"
+version = "0.2.7"
 edition = "2021"
 build = "src/build.rs"
 
@@ -12,6 +12,7 @@ tokio-console = ["dep:console-subscriber", "tokio/tracing"]
 [dependencies]
 aes-gcm = "0.10.3"
 anyhow = "1.0.81"
+async-stream = "0.3.5"
 async-trait = "0.1.77"
 atone = "0.3.7"
 axum = "0.7.4"
diff --git a/server/src/compat/binary_schema.rs b/server/src/compat/binary_schema.rs
new file mode 100644
index 000000000..f26328aba
--- /dev/null
+++ b/server/src/compat/binary_schema.rs
@@ -0,0 +1,5 @@
+#[derive(Debug, Clone, Copy)]
+pub enum BinarySchema {
+    RetainedMessageSchema,
+    RetainedMessageBatchSchema,
+}
diff --git a/server/src/compat/chunks_error.rs b/server/src/compat/chunks_error.rs
new file mode 100644
index 000000000..531497642
--- /dev/null
+++ b/server/src/compat/chunks_error.rs
@@ -0,0 +1,18 @@
+use futures::stream::TryChunksError;
+use iggy::error::IggyError;
+
+pub trait IntoTryChunksError<T> {
+    fn into_try_chunks_error(self) -> TryChunksError<T, IggyError>;
+}
+
+impl<T> IntoTryChunksError<T> for IggyError {
+    fn into_try_chunks_error(self) -> TryChunksError<T, IggyError> {
+        TryChunksError(Vec::new(), self)
+    }
+}
+
+impl<T> IntoTryChunksError<T> for std::io::Error {
+    fn into_try_chunks_error(self) -> TryChunksError<T, IggyError> {
+        TryChunksError(Vec::new(), IggyError::from(self))
+    }
+}
diff --git a/server/src/compat/conversion_writer.rs b/server/src/compat/conversion_writer.rs
new file mode 100644
index 000000000..c38fbea37
--- /dev/null
+++ b/server/src/compat/conversion_writer.rs
@@ -0,0 +1,113 @@
+use crate::streaming::utils::file;
+use iggy::error::IggyError;
+use tracing::trace;
+
+//TODO (numinex) - Make this writer transactional
+pub struct ConversionWriter<'w> {
+    pub log_path: &'w str,
+    pub index_path: &'w str,
+    pub time_index_path: &'w str,
+
+    pub alt_log_path: String,
+    pub alt_index_path: String,
+    pub alt_time_index_path: String,
+
+    compat_backup_path: &'w str,
+}
+
+impl<'w> ConversionWriter<'w> {
+    pub fn init(
+        log_path: &'w str,
+        index_path: &'w str,
+        time_index_path: &'w str,
+        compat_backup_path: &'w str,
+    ) -> ConversionWriter<'w> {
+        ConversionWriter {
+            log_path,
+            index_path,
+            time_index_path,
+            alt_log_path: format!("{}_temp.{}", log_path.split('.').next().unwrap(), "log"),
+            alt_index_path: format!("{}_temp.{}", index_path.split('.').next().unwrap(), "index"),
+            alt_time_index_path: format!(
+                "{}_temp.{}",
+                time_index_path.split('.').next().unwrap(),
+                "timeindex"
+            ),
+            compat_backup_path,
+        }
+    }
+
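+    // The conversion proceeds in three steps: converted data is first written
+    // to the "_temp" files created below, the old segment files are then moved
+    // into the compatibility backup directory, and finally the temporary files
+    // are renamed over the original paths (see the TODO above on making this
+    // writer fully transactional).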
+    pub async fn create_alt_directories(&self) -> Result<(), IggyError> {
+        tokio::fs::File::create(&self.alt_log_path).await?;
+        tokio::fs::File::create(&self.alt_index_path).await?;
+        tokio::fs::File::create(&self.alt_time_index_path).await?;
+
+        trace!(
+            "Created temporary files for conversion, log: {}, index: {}, time_index: {}",
+            &self.alt_log_path,
+            &self.alt_index_path,
+            &self.alt_time_index_path
+        );
+        Ok(())
+    }
+
+    pub async fn create_old_segment_backup(&self) -> Result<(), IggyError> {
+        let log_backup_path = &self
+            .log_path
+            .split_once('/')
+            .map(|(_, path)| format!("{}/{}", &self.compat_backup_path, path))
+            .unwrap();
+        let index_backup_path = &self
+            .index_path
+            .split_once('/')
+            .map(|(_, path)| format!("{}/{}", self.compat_backup_path, path))
+            .unwrap();
+        let time_index_backup_path = &self
+            .time_index_path
+            .split_once('/')
+            .map(|(_, path)| format!("{}/{}", self.compat_backup_path, path))
+            .unwrap();
+
+        let log_path_last_idx = log_backup_path.rfind('/').unwrap();
+        let index_path_last_idx = index_backup_path.rfind('/').unwrap();
+        let time_index_path_last_idx = time_index_backup_path.rfind('/').unwrap();
+        if tokio::fs::metadata(&log_backup_path[..log_path_last_idx])
+            .await
+            .is_err()
+        {
+            tokio::fs::create_dir_all(&log_backup_path[..log_path_last_idx]).await?;
+        }
+        if tokio::fs::metadata(&index_backup_path[..index_path_last_idx])
+            .await
+            .is_err()
+        {
+            tokio::fs::create_dir_all(&index_backup_path[..index_path_last_idx]).await?;
+        }
+        if tokio::fs::metadata(&time_index_backup_path[..time_index_path_last_idx])
+            .await
+            .is_err()
+        {
+            tokio::fs::create_dir_all(&time_index_backup_path[..time_index_path_last_idx]).await?;
+        }
+        file::rename(self.log_path, log_backup_path).await?;
+        file::rename(self.index_path, index_backup_path).await?;
+        file::rename(self.time_index_path, time_index_backup_path).await?;
+
+        trace!(
+            "Created backup of the old segment, log: {}, index: {}, time_index: {}",
+            &log_backup_path,
+            &index_backup_path,
+            &time_index_backup_path
+        );
+        Ok(())
+    }
+
+    pub async fn replace_with_converted(&self) -> Result<(), IggyError> {
+        file::rename(&self.alt_log_path, self.log_path).await?;
+        file::rename(&self.alt_index_path, self.index_path).await?;
+        file::rename(&self.alt_time_index_path, self.time_index_path).await?;
+
+        trace!("Replaced old segment with newly converted files");
+        Ok(())
+    }
+}
diff --git a/server/src/compat/message_converter.rs b/server/src/compat/message_converter.rs
new file mode 100644
index 000000000..7a195b0a1
--- /dev/null
+++ b/server/src/compat/message_converter.rs
@@ -0,0 +1,99 @@
+use crate::compat::samplers::message_sampler::MessageSampler;
+use crate::compat::samplers::retained_batch_sampler::RetainedMessageBatchSampler;
+use crate::compat::schema_sampler::BinarySchemaSampler;
+use crate::streaming::sizeable::Sizeable;
+use bytes::{BufMut, BytesMut};
+use iggy::error::IggyError;
+
+use crate::streaming::segments::storage::{INDEX_SIZE, TIME_INDEX_SIZE};
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+
+pub trait Extendable {
+    fn extend(&self, bytes: &mut BytesMut);
+}
+
+pub trait MessageFormatConverterPersister<W> {
+    async fn persist(&self, writer: &mut W) -> Result<(), IggyError>;
+    async fn persist_index(
+        &self,
+        position: u32,
+        relative_offset: u32,
+        writer: &mut W,
+    ) -> Result<(), IggyError>;
+    async fn persist_time_index(
+        &self,
+        timestamp: u64,
+        relative_offset: u32,
+        writer: &mut W,
+    ) -> Result<(), IggyError>;
+}
+
+impl<W: AsyncWrite + AsyncWriteExt + Unpin, T> MessageFormatConverterPersister<W> for T
+where
+    T: Sizeable + Extendable,
+{
+    async fn persist(&self, writer: &mut W) -> Result<(), IggyError> {
+        let size = self.get_size_bytes();
+        let mut batch_bytes = BytesMut::with_capacity(size as usize);
+        self.extend(&mut batch_bytes);
+
+        writer.write_all(&batch_bytes).await?;
+        Ok(())
+    }
+
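+    // Each index entry is a fixed-size little-endian pair: relative_offset (u32)
+    // followed by position (u32), 8 bytes in total (INDEX_SIZE). Time index
+    // entries pair relative_offset (u32) with timestamp (u64), 12 bytes
+    // (TIME_INDEX_SIZE).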
+    async fn persist_index(
+        &self,
+        position: u32,
+        relative_offset: u32,
+        writer: &mut W,
+    ) -> Result<(), IggyError> {
+        let mut index_bytes = BytesMut::with_capacity(INDEX_SIZE as usize);
+        index_bytes.put_u32_le(relative_offset);
+        index_bytes.put_u32_le(position);
+
+        writer.write_all(&index_bytes).await?;
+        Ok(())
+    }
+
+    async fn persist_time_index(
+        &self,
+        timestamp: u64,
+        relative_offset: u32,
+        writer: &mut W,
+    ) -> Result<(), IggyError> {
+        let mut time_index_bytes = BytesMut::with_capacity(TIME_INDEX_SIZE as usize);
+        time_index_bytes.put_u32_le(relative_offset);
+        time_index_bytes.put_u64_le(timestamp);
+
+        writer.write_all(&time_index_bytes).await?;
+        Ok(())
+    }
+}
+
+pub struct MessageFormatConverter {
+    pub samplers: Vec<Box<dyn BinarySchemaSampler>>,
+}
+
+impl MessageFormatConverter {
+    pub fn init(
+        segment_start_offset: u64,
+        log_path: String,
+        index_path: String,
+    ) -> MessageFormatConverter {
+        // Always append new schemas to the beginning of the vec
+        MessageFormatConverter {
+            samplers: vec![
+                Box::new(RetainedMessageBatchSampler::new(
+                    segment_start_offset,
+                    log_path.clone(),
+                    index_path.clone(),
+                )),
+                Box::new(MessageSampler::new(
+                    segment_start_offset,
+                    log_path,
+                    index_path,
+                )),
+            ],
+        }
+    }
+}
diff --git a/server/src/compat/message_stream.rs b/server/src/compat/message_stream.rs
new file mode 100644
index 000000000..f2b922085
--- /dev/null
+++ b/server/src/compat/message_stream.rs
@@ -0,0 +1,6 @@
+use futures::Stream;
+
+pub trait MessageStream {
+    type Item;
+    fn into_stream(self) -> impl Stream<Item = Self::Item>;
+}
diff --git a/server/src/compat/mod.rs b/server/src/compat/mod.rs
new file mode 100644
index 000000000..715891011
--- /dev/null
+++ b/server/src/compat/mod.rs
@@ -0,0 +1,9 @@
+pub(crate) mod binary_schema;
+pub(crate) mod chunks_error;
+pub(crate) mod conversion_writer;
+pub(crate) mod message_converter;
+pub(crate) mod message_stream;
+pub(crate) mod samplers;
+pub(crate) mod schema_sampler;
+pub(crate) mod snapshots;
+pub(crate) mod streams;
diff --git a/server/src/compat/samplers/message_sampler.rs b/server/src/compat/samplers/message_sampler.rs
new file mode 100644
index 000000000..c7739e215
--- /dev/null
+++ b/server/src/compat/samplers/message_sampler.rs
@@ -0,0 +1,53 @@
+use crate::compat::binary_schema::BinarySchema;
+use crate::compat::schema_sampler::BinarySchemaSampler;
+use crate::compat::snapshots::message_snapshot::MessageSnapshot;
+use crate::server_error::ServerError;
+use crate::streaming::utils::file;
+use async_trait::async_trait;
+use bytes::{BufMut, BytesMut};
+use tokio::io::AsyncReadExt;
+
+pub struct MessageSampler {
+    pub segment_start_offset: u64,
+    pub log_path: String,
+    pub index_path: String,
+}
+impl MessageSampler {
+    pub fn new(segment_start_offset: u64, log_path: String, index_path: String) -> MessageSampler {
+        MessageSampler {
+            segment_start_offset,
+            log_path,
+            index_path,
+        }
+    }
+}
+
+unsafe impl Send for MessageSampler {}
+unsafe impl Sync for MessageSampler {}
+
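+// Sampling strategy: read the first index entry to locate the end position of
+// the first message, parse those bytes as a legacy retained message, and accept
+// the schema only if the decoded offset matches the segment start offset.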
+#[async_trait]
+impl BinarySchemaSampler for MessageSampler {
+    async fn try_sample(&self) -> Result<BinarySchema, ServerError> {
+        let mut index_file = file::open(&self.index_path).await?;
+        let mut log_file = file::open(&self.log_path).await?;
+        let log_file_size = log_file.metadata().await?.len();
+
+        if log_file_size == 0 {
+            return Ok(BinarySchema::RetainedMessageSchema);
+        }
+
+        let _ = index_file.read_u32_le().await?;
+        let end_position = index_file.read_u32_le().await?;
+
+        let buffer_size = end_position as usize;
+        let mut buffer = BytesMut::with_capacity(buffer_size);
+        buffer.put_bytes(0, buffer_size);
+        let _ = log_file.read_exact(&mut buffer).await?;
+
+        let message = MessageSnapshot::try_from(buffer.freeze())?;
+        if message.offset != self.segment_start_offset {
+            return Err(ServerError::InvalidMessageOffsetFormatConversion);
+        }
+        Ok(BinarySchema::RetainedMessageSchema)
+    }
+}
diff --git a/server/src/compat/samplers/mod.rs b/server/src/compat/samplers/mod.rs
new file mode 100644
index 000000000..d75fb81ff
--- /dev/null
+++ b/server/src/compat/samplers/mod.rs
@@ -0,0 +1,2 @@
+pub(crate) mod message_sampler;
+pub(crate) mod retained_batch_sampler;
diff --git a/server/src/compat/samplers/retained_batch_sampler.rs b/server/src/compat/samplers/retained_batch_sampler.rs
new file mode 100644
index 000000000..2fbfee469
--- /dev/null
+++ b/server/src/compat/samplers/retained_batch_sampler.rs
@@ -0,0 +1,63 @@
+use crate::compat::binary_schema::BinarySchema;
+use crate::compat::schema_sampler::BinarySchemaSampler;
+use crate::compat::snapshots::retained_batch_snapshot::RetainedMessageBatchSnapshot;
+use crate::server_error::ServerError;
+use crate::streaming::utils::file;
+use async_trait::async_trait;
+use bytes::{BufMut, Bytes};
+use tokio::io::AsyncReadExt;
+
+pub struct RetainedMessageBatchSampler {
+    pub segment_start_offset: u64,
+    pub log_path: String,
+    pub index_path: String,
+}
+
+impl RetainedMessageBatchSampler {
+    pub fn new(
+        segment_start_offset: u64,
+        log_path: String,
+        index_path: String,
+    ) -> RetainedMessageBatchSampler {
+        RetainedMessageBatchSampler {
+            segment_start_offset,
+            log_path,
+            index_path,
+        }
+    }
+}
+
+unsafe impl Send for RetainedMessageBatchSampler {}
+unsafe impl Sync for RetainedMessageBatchSampler {}
+
+#[async_trait]
+impl BinarySchemaSampler for RetainedMessageBatchSampler {
+    async fn try_sample(&self) -> Result<BinarySchema, ServerError> {
+        let mut index_file = file::open(&self.index_path).await?;
+        let mut log_file = file::open(&self.log_path).await?;
+        let log_file_size = log_file.metadata().await?.len();
+
+        if log_file_size == 0 {
+            return Ok(BinarySchema::RetainedMessageBatchSchema);
+        }
+
+        let _ = index_file.read_u32_le().await?;
+        let _ = index_file.read_u32_le().await?;
+        let second_index_offset = index_file.read_u32_le().await;
+        let second_end_position = index_file.read_u32_le().await;
+
+        let mut buffer = Vec::new();
+        if second_index_offset.is_err() && second_end_position.is_err() {
+            let _ = log_file.read_to_end(&mut buffer).await?;
+        } else {
+            let buffer_size = second_end_position.unwrap() as usize;
+            buffer.put_bytes(0, buffer_size);
+            let _ = log_file.read_exact(&mut buffer).await?;
+        }
+        let batch = RetainedMessageBatchSnapshot::try_from(Bytes::from(buffer))?;
+        if batch.base_offset != self.segment_start_offset {
+            return Err(ServerError::InvalidBatchBaseOffsetFormatConversion);
+        }
+        Ok(BinarySchema::RetainedMessageBatchSchema)
+    }
+}
diff --git a/server/src/compat/schema_sampler.rs b/server/src/compat/schema_sampler.rs
new file mode 100644
index 000000000..b35dda06e
--- /dev/null
+++ b/server/src/compat/schema_sampler.rs
@@ -0,0 +1,8 @@
+use crate::compat::binary_schema::BinarySchema;
+use crate::server_error::ServerError;
+use async_trait::async_trait;
+
+#[async_trait]
+pub trait BinarySchemaSampler: Send + Sync {
+    async fn try_sample(&self) -> Result<BinarySchema, ServerError>;
+}
diff --git a/server/src/compat/snapshots/message_snapshot.rs b/server/src/compat/snapshots/message_snapshot.rs
new file mode 100644
index 000000000..c264604f5
--- /dev/null
+++ b/server/src/compat/snapshots/message_snapshot.rs
@@ -0,0 +1,176 @@
+use crate::compat::message_converter::Extendable;
+use crate::server_error::ServerError;
+use crate::streaming::sizeable::Sizeable;
+use bytes::{BufMut, Bytes, BytesMut};
+use iggy::bytes_serializable::BytesSerializable;
+use iggy::models::header::{self, HeaderKey, HeaderValue};
+use iggy::models::messages::MessageState;
+use std::collections::HashMap;
+use std::convert::TryFrom;
+
+#[derive(Debug)]
+pub struct MessageSnapshot {
+    pub offset: u64,
+    pub state: MessageState,
+    pub timestamp: u64,
+    pub id: u128,
+    pub payload: Bytes,
+    pub checksum: u32,
+    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
+}
+
+impl MessageSnapshot {
+    pub fn new(
+        offset: u64,
+        state: MessageState,
+        timestamp: u64,
+        id: u128,
+        payload: Bytes,
+        checksum: u32,
+        headers: Option<HashMap<HeaderKey, HeaderValue>>,
+    ) -> MessageSnapshot {
+        MessageSnapshot {
+            offset,
+            state,
+            timestamp,
+            id,
+            payload,
+            checksum,
+            headers,
+        }
+    }
+}
+
+impl Extendable for MessageSnapshot {
+    fn extend(&self, bytes: &mut BytesMut) {
+        let length = self.get_size_bytes() - 4;
+        let id = self.id;
+        let offset = self.offset;
+        let timestamp = self.timestamp;
+        let payload = self.payload.clone();
+        let checksum = self.checksum;
+        let message_state = self.state;
+        let headers = &self.headers;
+
+        bytes.put_u32_le(length);
+        bytes.put_u64_le(offset);
+        bytes.put_u8(message_state.as_code());
+        bytes.put_u64_le(timestamp);
+        bytes.put_u128_le(id);
+        bytes.put_u32_le(checksum);
+        if let Some(headers) = headers {
+            #[allow(clippy::cast_possible_truncation)]
+            bytes.put_u32_le(headers.len() as u32);
+            bytes.put_slice(&headers.as_bytes());
+        } else {
+            bytes.put_u32_le(0u32);
+        }
+        bytes.put_slice(&payload);
+    }
+}
+
+impl Sizeable for MessageSnapshot {
+    fn get_size_bytes(&self) -> u32 {
+        let headers_size = header::get_headers_size_bytes(&self.headers);
+        41 + headers_size + self.payload.len() as u32
+    }
+}
+
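+// Layout parsed below (all integers little-endian): offset u64 | state u8 |
+// timestamp u64 | id u128 | checksum u32 | headers_length u32 (a fixed 41-byte
+// prefix), followed by the headers block and a u32-length-prefixed payload.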
+impl TryFrom<Bytes> for MessageSnapshot {
+    type Error = ServerError;
+
+    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
+        let offset = u64::from_le_bytes(
+            value
+                .get(0..8)
+                .ok_or_else(|| {
+                    ServerError::InvalidMessageFieldFormatConversionSampling(
+                        "Invalid offset bytes for message snapshot".to_owned(),
+                    )
+                })?
+                .try_into()?,
+        );
+        let state = MessageState::from_code(*value.get(8).ok_or_else(|| {
+            ServerError::InvalidMessageFieldFormatConversionSampling(
+                "Invalid state for message snapshot".to_owned(),
+            )
+        })?)?;
+        let timestamp = u64::from_le_bytes(
+            value
+                .get(9..17)
+                .ok_or_else(|| {
+                    ServerError::InvalidMessageFieldFormatConversionSampling(
+                        "Invalid timestamp bytes for message snapshot".to_owned(),
+                    )
+                })?
+                .try_into()?,
+        );
+        let id = u128::from_le_bytes(
+            value
+                .get(17..33)
+                .ok_or_else(|| {
+                    ServerError::InvalidMessageFieldFormatConversionSampling(
+                        "Invalid id bytes for message snapshot".to_owned(),
+                    )
+                })?
+                .try_into()?,
+        );
+        let checksum = u32::from_le_bytes(
+            value
+                .get(33..37)
+                .ok_or_else(|| {
+                    ServerError::InvalidMessageFieldFormatConversionSampling(
+                        "Invalid checksum bytes for message snapshot".to_owned(),
+                    )
+                })?
+                .try_into()?,
+        );
+        let headers_length = u32::from_le_bytes(
+            value
+                .get(37..41)
+                .ok_or_else(|| {
+                    ServerError::InvalidMessageFieldFormatConversionSampling(
+                        "Invalid headers_length bytes for message snapshot".to_owned(),
+                    )
+                })?
+                .try_into()?,
+        );
+        let headers = match headers_length {
+            0 => None,
+            _ => {
+                let headers_payload = &value[41..(41 + headers_length as usize)];
+                let headers = HashMap::from_bytes(Bytes::copy_from_slice(headers_payload))
+                    .map_err(|_| {
+                        ServerError::InvalidMessageFieldFormatConversionSampling(
+                            "Invalid headers bytes for message snapshot".to_owned(),
+                        )
+                    })?;
+                Some(headers)
+            }
+        };
+
+        let position = 41 + headers_length as usize;
+        let payload_length = u32::from_le_bytes(
+            value
+                .get(position..(position + 4))
+                .ok_or_else(|| {
+                    ServerError::InvalidMessageFieldFormatConversionSampling(
+                        "Invalid payload bytes for message snapshot".to_owned(),
+                    )
+                })?
+                .try_into()?,
+        );
+        let payload =
+            Bytes::copy_from_slice(&value[position + 4..position + 4 + payload_length as usize]);
+
+        Ok(MessageSnapshot {
+            offset,
+            state,
+            timestamp,
+            id,
+            payload,
+            checksum,
+            headers,
+        })
+    }
+}
diff --git a/server/src/compat/snapshots/mod.rs b/server/src/compat/snapshots/mod.rs
new file mode 100644
index 000000000..e3a3bf69c
--- /dev/null
+++ b/server/src/compat/snapshots/mod.rs
@@ -0,0 +1,2 @@
+pub(crate) mod message_snapshot;
+pub(crate) mod retained_batch_snapshot;
diff --git a/server/src/compat/snapshots/retained_batch_snapshot.rs b/server/src/compat/snapshots/retained_batch_snapshot.rs
new file mode 100644
index 000000000..71f59ee7a
--- /dev/null
+++ b/server/src/compat/snapshots/retained_batch_snapshot.rs
@@ -0,0 +1,140 @@
+use super::message_snapshot::MessageSnapshot;
+use crate::compat::message_converter::Extendable;
+use crate::server_error::ServerError;
+use crate::streaming::sizeable::Sizeable;
+use bytes::{BufMut, Bytes, BytesMut};
+use iggy::error::IggyError;
+
+pub struct RetainedMessageBatchSnapshot {
+    pub base_offset: u64,
+    pub last_offset_delta: u32,
+    pub max_timestamp: u64,
+    pub length: u32,
+    pub bytes: Bytes,
+}
+
+impl RetainedMessageBatchSnapshot {
+    pub fn new(
+        base_offset: u64,
+        last_offset_delta: u32,
+        max_timestamp: u64,
+        length: u32,
+        bytes: Bytes,
+    ) -> RetainedMessageBatchSnapshot {
+        RetainedMessageBatchSnapshot {
+            base_offset,
+            last_offset_delta,
+            max_timestamp,
+            length,
+            bytes,
+        }
+    }
+
+    pub fn get_last_offset(&self) -> u64 {
+        self.base_offset + self.last_offset_delta as u64
+    }
+
+    pub fn try_from_messages(
+        messages: Vec<MessageSnapshot>,
+    ) -> Result<RetainedMessageBatchSnapshot, IggyError> {
+        let first_message = messages.first().unwrap();
+        let last_message = messages.last().unwrap();
+        let base_offset = first_message.offset;
+        let last_offset_delta = last_message.offset - base_offset;
+        let max_timestamp = last_message.timestamp;
+
+        let size = messages
+            .iter()
+            .map(|msg| msg.get_size_bytes() as usize)
+            .sum();
+        let mut bytes = BytesMut::with_capacity(size);
+        for message in messages.iter() {
+            message.extend(&mut bytes);
+        }
+        Ok(RetainedMessageBatchSnapshot::new(
+            base_offset,
+            last_offset_delta as u32,
+            max_timestamp,
+            bytes.len() as u32,
+            bytes.freeze(),
+        ))
+    }
+}
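+// Batch header written by Extendable::extend below (little-endian):
+// base_offset u64 | length u32 | last_offset_delta u32 | max_timestamp u64,
+// i.e. 24 bytes ahead of the packed message bytes, hence the
+// `24 + self.bytes.len()` in get_size_bytes.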
+impl Sizeable for RetainedMessageBatchSnapshot {
+    fn get_size_bytes(&self) -> u32 {
+        24 + self.bytes.len() as u32
+    }
+}
+
+impl Extendable for RetainedMessageBatchSnapshot {
+    fn extend(&self, bytes: &mut BytesMut) {
+        bytes.put_u64_le(self.base_offset);
+        bytes.put_u32_le(self.length);
+        bytes.put_u32_le(self.last_offset_delta);
+        bytes.put_u64_le(self.max_timestamp);
+        bytes.put_slice(&self.bytes);
+    }
+}
+
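+// Parsing mirrors the layout above; each field access is bounds-checked with
+// `get`, so a truncated buffer surfaces as a conversion error rather than a panic.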
+impl TryFrom<Bytes> for RetainedMessageBatchSnapshot {
+    type Error = ServerError;
+    fn try_from(value: Bytes) -> Result<Self, Self::Error> {
+        let base_offset = u64::from_le_bytes(
+            value
+                .get(0..8)
+                .ok_or_else(|| {
+                    ServerError::CannotReadMessageBatchFormatConversion(
+                        "Failed to read batch base offset".to_owned(),
+                    )
+                })?
+                .try_into()?,
+        );
+        let length = u32::from_le_bytes(
+            value
+                .get(8..12)
+                .ok_or_else(|| {
+                    ServerError::CannotReadMessageBatchFormatConversion(
+                        "Failed to read batch length".to_owned(),
+                    )
+                })?
+                .try_into()?,
+        );
+        let last_offset_delta = u32::from_le_bytes(
+            value
+                .get(12..16)
+                .ok_or_else(|| {
+                    ServerError::CannotReadMessageBatchFormatConversion(
+                        "Failed to read batch last_offset_delta".to_owned(),
+                    )
+                })?
+                .try_into()?,
+        );
+        let max_timestamp = u64::from_le_bytes(
+            value
+                .get(16..24)
+                .ok_or_else(|| {
+                    ServerError::CannotReadMessageBatchFormatConversion(
+                        "Failed to read batch max_timestamp".to_owned(),
+                    )
+                })?
+                .try_into()?,
+        );
+        let bytes = Bytes::from(
+            value
+                .get(24..length as usize)
+                .ok_or_else(|| {
+                    ServerError::CannotReadMessageBatchFormatConversion(
+                        "Failed to read batch payload".to_owned(),
+                    )
+                })?
+                .to_owned(),
+        );
+        Ok(RetainedMessageBatchSnapshot {
+            base_offset,
+            last_offset_delta,
+            max_timestamp,
+            length,
+            bytes,
+        })
+    }
+}
diff --git a/server/src/compat/streams/mod.rs b/server/src/compat/streams/mod.rs
new file mode 100644
index 000000000..eb6dc297c
--- /dev/null
+++ b/server/src/compat/streams/mod.rs
@@ -0,0 +1,2 @@
+pub(crate) mod retained_batch;
+pub(crate) mod retained_message;
diff --git a/server/src/compat/streams/retained_batch.rs b/server/src/compat/streams/retained_batch.rs
new file mode 100644
index 000000000..a01f52c6c
--- /dev/null
+++ b/server/src/compat/streams/retained_batch.rs
@@ -0,0 +1,20 @@
+use tokio::fs::File;
+use tokio::io::BufWriter;
+
+const BUF_WRITER_CAPACITY_BYTES: usize = 512 * 1000;
+
+pub struct RetainedBatchWriter {
+    pub log_writer: BufWriter<File>,
+    pub index_writer: BufWriter<File>,
+    pub time_index_writer: BufWriter<File>,
+}
+
+impl RetainedBatchWriter {
+    pub fn init(log_file: File, index_file: File, time_index_file: File) -> Self {
+        RetainedBatchWriter {
+            log_writer: BufWriter::with_capacity(BUF_WRITER_CAPACITY_BYTES, log_file),
+            index_writer: BufWriter::with_capacity(BUF_WRITER_CAPACITY_BYTES, index_file),
+            time_index_writer: BufWriter::with_capacity(BUF_WRITER_CAPACITY_BYTES, time_index_file),
+        }
+    }
+}
diff --git a/server/src/compat/streams/retained_message.rs b/server/src/compat/streams/retained_message.rs
new file mode 100644
index 000000000..37213f759
--- /dev/null
+++ b/server/src/compat/streams/retained_message.rs
@@ -0,0 +1,82 @@
+use crate::compat::message_stream::MessageStream;
+use crate::compat::snapshots::message_snapshot::MessageSnapshot;
+
+use async_stream::try_stream;
+use bytes::{BufMut, BytesMut};
+use futures::Stream;
+use iggy::bytes_serializable::BytesSerializable;
+use iggy::error::IggyError;
+use iggy::models::messages::MessageState;
+use std::collections::HashMap;
+use tokio::fs::File;
+use tokio::io::{AsyncReadExt, BufReader};
+
+const BUF_READER_CAPACITY_BYTES: usize = 512 * 1000;
+
+pub struct RetainedMessageStream {
+    pub reader: BufReader<File>,
+    read_length: u64,
+    read_bytes: u64,
+}
+impl RetainedMessageStream {
+    pub fn new(file: File, read_length: u64) -> RetainedMessageStream {
+        RetainedMessageStream {
+            reader: BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file),
+            read_bytes: 0,
+            read_length,
+        }
+    }
+}
+
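+// Reads legacy messages sequentially, tracking consumed bytes so iteration
+// stops exactly at `read_length` (the size of the old log file).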
+impl MessageStream for RetainedMessageStream {
+    type Item = Result<MessageSnapshot, IggyError>;
+
+    fn into_stream(mut self) -> impl Stream<Item = Self::Item> {
+        try_stream! {
+            while self.read_bytes < self.read_length {
+                let offset = self.reader.read_u64_le().await?;
+                self.read_bytes += 8;
+
+                let state = self.reader.read_u8().await?;
+                self.read_bytes += 1;
+
+                let state = MessageState::from_code(state)?;
+                let timestamp = self.reader.read_u64_le().await?;
+                self.read_bytes += 8;
+
+                let id = self.reader.read_u128_le().await?;
+                self.read_bytes += 16;
+
+                let checksum = self.reader.read_u32_le().await?;
+                self.read_bytes += 4;
+
+                let headers_length = self.reader.read_u32_le().await?;
+                self.read_bytes += 4;
+
+                let headers = match headers_length {
+                    0 => None,
+                    _ => {
+                        let mut headers_payload = BytesMut::with_capacity(headers_length as usize);
+                        headers_payload.put_bytes(0, headers_length as usize);
+                        self.reader.read_exact(&mut headers_payload).await?;
+
+                        let headers = HashMap::from_bytes(headers_payload.freeze())?;
+                        Some(headers)
+                    }
+                };
+                self.read_bytes += headers_length as u64;
+
+                let payload_len = self.reader.read_u32_le().await?;
+
+                let mut payload = BytesMut::with_capacity(payload_len as usize);
+                payload.put_bytes(0, payload_len as usize);
+                self.reader.read_exact(&mut payload).await?;
+                self.read_bytes += 4 + payload_len as u64;
+
+                let message =
+                    MessageSnapshot::new(offset, state, timestamp, id, payload.freeze(), checksum, headers);
+                yield message;
+            }
+        }
+    }
+}
diff --git a/server/src/configs/defaults.rs b/server/src/configs/defaults.rs
index 560bfc015..d352432d4 100644
--- a/server/src/configs/defaults.rs
+++ b/server/src/configs/defaults.rs
@@ -7,13 +7,15 @@ use crate::configs::server::{
     PersonalAccessTokenConfig, ServerConfig,
 };
 use crate::configs::system::{
-    CacheConfig, CompressionConfig, DatabaseConfig, EncryptionConfig, LoggingConfig,
-    MessageDeduplicationConfig, PartitionConfig, RetentionPolicyConfig, RuntimeConfig,
-    SegmentConfig, StreamConfig, SystemConfig, TopicConfig,
+    CacheConfig, CompatibilityConfig, CompressionConfig, DatabaseConfig, EncryptionConfig,
+    LoggingConfig, MessageDeduplicationConfig, PartitionConfig, RetentionPolicyConfig,
+    RuntimeConfig, SegmentConfig, StreamConfig, SystemConfig, TopicConfig,
 };
 use crate::configs::tcp::{TcpConfig, TcpTlsConfig};
 use std::sync::Arc;
 
+use super::system::BackupConfig;
+
 impl Default for ServerConfig {
     fn default() -> ServerConfig {
         ServerConfig {
@@ -138,6 +140,7 @@ impl Default for SystemConfig {
     fn default() -> SystemConfig {
         SystemConfig {
             path: "local_data".to_string(),
+            backup: BackupConfig::default(),
             database: DatabaseConfig::default(),
             runtime: RuntimeConfig::default(),
             logging: LoggingConfig::default(),
@@ -154,6 +157,23 @@ impl Default for SystemConfig {
     }
 }
 
+impl Default for BackupConfig {
+    fn default() -> BackupConfig {
+        BackupConfig {
+            path: "backup".to_string(),
+            compatibility: CompatibilityConfig::default(),
+        }
+    }
+}
+
+impl Default for CompatibilityConfig {
+    fn default() -> Self {
+        CompatibilityConfig {
+            path: "compatibility".to_string(),
+        }
+    }
+}
+
 impl Default for DatabaseConfig {
     fn default() -> DatabaseConfig {
         DatabaseConfig {
diff --git a/server/src/configs/system.rs b/server/src/configs/system.rs
index b1f99a315..d34bd5cf4 100644
--- a/server/src/configs/system.rs
+++ b/server/src/configs/system.rs
@@ -10,6 +10,7 @@ use serde_with::DisplayFromStr;
 #[derive(Debug, Deserialize, Serialize)]
 pub struct SystemConfig {
     pub path: String,
+    pub backup: BackupConfig,
     pub database: DatabaseConfig,
     pub runtime: RuntimeConfig,
     pub logging: LoggingConfig,
@@ -29,6 +30,17 @@ pub struct DatabaseConfig {
     pub path: String,
 }
 
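+// Mirrors the [system.backup] and [system.backup.compatibility] sections of
+// configs/server.toml; both paths are resolved relative to the system path
+// via get_backup_path/get_compatibility_backup_path below.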
+#[derive(Debug, Deserialize, Serialize)]
+pub struct BackupConfig {
+    pub path: String,
+    pub compatibility: CompatibilityConfig,
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+pub struct CompatibilityConfig {
+    pub path: String,
+}
+
 #[derive(Debug, Deserialize, Serialize)]
 pub struct RuntimeConfig {
     pub path: String,
@@ -111,6 +123,18 @@ impl SystemConfig {
         self.path.to_string()
     }
 
+    pub fn get_backup_path(&self) -> String {
+        format!("{}/{}", self.get_system_path(), self.backup.path)
+    }
+
+    pub fn get_compatibility_backup_path(&self) -> String {
+        format!(
+            "{}/{}",
+            self.get_backup_path(),
+            self.backup.compatibility.path
+        )
+    }
+
     pub fn get_database_path(&self) -> String {
         format!("{}/{}", self.get_system_path(), self.database.path)
     }
diff --git a/server/src/lib.rs b/server/src/lib.rs
index ff7f20519..b4be2d8f5 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -11,6 +11,7 @@ static GLOBAL: Jemalloc = Jemalloc;
 pub mod args;
 pub mod binary;
 pub mod channels;
+pub(crate) mod compat;
 pub mod configs;
 pub mod http;
 pub mod log;
diff --git a/server/src/server_error.rs b/server/src/server_error.rs
index 11b004551..fecfe7c49 100644
--- a/server/src/server_error.rs
+++ b/server/src/server_error.rs
@@ -33,4 +33,16 @@ pub enum ServerError {
     CacheConfigValidationFailure(String),
     #[error("Command length error: {0}")]
     CommandLengthError(String),
+    #[error("Cannot read message when performing format conversion: {0}")]
+    InvalidMessageFieldFormatConversionSampling(String),
+    #[error("Invalid message offset when performing format conversion")]
+    InvalidMessageOffsetFormatConversion,
+    #[error("Invalid batch base offset when performing format conversion")]
+    InvalidBatchBaseOffsetFormatConversion,
+    #[error("Cannot read message batch when performing format conversion: {0}")]
+    CannotReadMessageBatchFormatConversion(String),
+    #[error("Cannot remove old segment files")]
+    CannotRemoveOldSegmentFiles,
+    #[error("Cannot persist new segment files")]
+    CannotPersistNewSegmentFiles,
 }
diff --git a/server/src/streaming/partitions/storage.rs b/server/src/streaming/partitions/storage.rs
index e90df57eb..30055cee5 100644
--- a/server/src/streaming/partitions/storage.rs
+++ b/server/src/streaming/partitions/storage.rs
@@ -1,3 +1,4 @@
+use crate::compat::message_converter::MessageFormatConverter;
 use crate::streaming::partitions::partition::{ConsumerOffset, Partition};
 use crate::streaming::segments::segment::{Segment, LOG_EXTENSION};
 use crate::streaming::storage::{PartitionStorage, Storage};
@@ -220,6 +221,41 @@ impl Storage<Partition> for FilePartitionStorage {
                 partition.messages_count_of_parent_topic.clone(),
                 partition.messages_count.clone(),
             );
+
+            let log_path = segment.log_path.to_owned();
+            let index_path = segment.index_path.to_owned();
+            let message_format_converter =
+                MessageFormatConverter::init(start_offset, log_path, index_path);
+
+            info!("Attempting to detect changes in binary schema for partition with ID: {} and segment with start offset: {}", partition.partition_id, start_offset);
+            let samplers_count = message_format_converter.samplers.len();
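+            // Samplers are ordered newest-schema-first (see MessageFormatConverter::init):
+            // a hit on the first sampler means the segment is already up to date,
+            // a hit on a later sampler triggers a conversion, and exhausting all
+            // samplers without a match is an error.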
+            for (idx, sampler) in message_format_converter.samplers.iter().enumerate() {
+                trace!("Trying to sample the message format for partition with ID: {} and segment with start offset: {}", partition.partition_id, start_offset);
+                match sampler.try_sample().await {
+                    Ok(schema) if idx == 0 => {
+                        // Found a message in the newest format, no conversion needed
+                        trace!("Detected up-to-date binary schema: {:?} for partition with ID: {} and segment with start offset: {}", schema, partition.partition_id, start_offset);
+                        break;
+                    }
+                    Ok(schema) => {
+                        // Found an old format, need to convert it
+                        info!("Detected changes in binary schema for partition with ID: {} and segment with start offset: {}", partition.partition_id, start_offset);
+                        segment.convert_segment_from_schema(schema).await?;
+                    }
+                    Err(err) if idx + 1 == samplers_count => {
+                        // Didn't find any valid message format, return an error
+                        return Err(IggyError::CannotLoadResource(anyhow::anyhow!(err)
+                            .context(format!(
+                                "Failed to find a valid message format, when trying to perform a conversion for partition with ID: {} and segment with start offset: {}.",
+                                partition.partition_id,
+                                start_offset
+                            ))));
+                    }
+                    _ => {}
+                }
+            }
+
             segment.load().await?;
             if !segment.is_closed {
                 segment.unsaved_batches = Some(Vec::new())
diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs
index aca7c64e0..19de899c2 100644
--- a/server/src/streaming/segments/segment.rs
+++ b/server/src/streaming/segments/segment.rs
@@ -1,11 +1,25 @@
+use crate::compat::binary_schema::BinarySchema;
+use crate::compat::chunks_error::IntoTryChunksError;
+use crate::compat::conversion_writer::ConversionWriter;
+use crate::compat::message_converter::MessageFormatConverterPersister;
+use crate::compat::message_stream::MessageStream;
+use crate::compat::snapshots::retained_batch_snapshot::RetainedMessageBatchSnapshot;
+use crate::compat::streams::retained_batch::RetainedBatchWriter;
+use crate::compat::streams::retained_message::RetainedMessageStream;
 use crate::configs::system::SystemConfig;
 use crate::streaming::batching::message_batch::RetainedMessageBatch;
 use crate::streaming::segments::index::Index;
 use crate::streaming::segments::time_index::TimeIndex;
+use crate::streaming::sizeable::Sizeable;
 use crate::streaming::storage::SystemStorage;
+use crate::streaming::utils::file;
+use futures::{pin_mut, TryStreamExt};
+use iggy::error::IggyError;
 use iggy::utils::timestamp::IggyTimestamp;
 use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
+use tokio::io::AsyncWriteExt;
+use tracing::{info, trace};
 
 pub const LOG_EXTENSION: &str = "log";
 pub const INDEX_EXTENSION: &str = "index";
@@ -134,6 +148,78 @@ impl Segment {
     fn get_time_index_path(path: &str) -> String {
         format!("{}.{}", path, TIME_INDEX_EXTENSION)
     }
+
+    pub async fn convert_segment_from_schema(&self, schema: BinarySchema) -> Result<(), IggyError> {
+        let log_path = self.log_path.as_str();
+        let index_path = self.index_path.as_str();
+        let time_index_path = self.time_index_path.as_str();
+
+        match schema {
+            BinarySchema::RetainedMessageSchema => {
+                let file = file::open(&self.log_path).await?;
+                let file_size = file.metadata().await?.len();
+                if file_size == 0 {
+                    return Ok(());
+                }
+
+                let compat_backup_path = self.config.get_compatibility_backup_path();
+                let conversion_writer = ConversionWriter::init(
+                    log_path,
+                    index_path,
+                    time_index_path,
+                    &compat_backup_path,
+                );
+                conversion_writer.create_alt_directories().await?;
+                let retained_batch_writer = RetainedBatchWriter::init(
+                    file::append(&conversion_writer.alt_log_path).await?,
+                    file::append(&conversion_writer.alt_index_path).await?,
+                    file::append(&conversion_writer.alt_time_index_path).await?,
+                );
+
+                let stream = RetainedMessageStream::new(file, file_size).into_stream();
+                pin_mut!(stream);
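+                // Legacy messages are converted in chunks of up to 1000 per batch;
+                // the fold threads the running byte position through so each index
+                // entry records where its batch starts in the new log file.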
+                let (_, mut retained_batch_writer) = stream
+                    .try_chunks(1000)
+                    .try_fold((0u32, retained_batch_writer), |(position, mut retained_batch_writer), messages| async move {
+                        let batch = RetainedMessageBatchSnapshot::try_from_messages(messages)
+                            .map_err(|err| err.into_try_chunks_error())?;
+                        let size = batch.get_size_bytes();
+                        info!("Converted messages with start offset: {} and end offset: {}, with binary schema: {:?} to newest schema",
+                            batch.base_offset, batch.get_last_offset(), schema);
+
+                        batch
+                            .persist(&mut retained_batch_writer.log_writer)
+                            .await
+                            .map_err(|err| err.into_try_chunks_error())?;
+                        trace!("Persisted message batch with new format to log file, saved {} bytes", size);
+                        let relative_offset = (batch.get_last_offset() - self.start_offset) as u32;
+                        batch
+                            .persist_index(position, relative_offset, &mut retained_batch_writer.index_writer)
+                            .await
+                            .map_err(|err| err.into_try_chunks_error())?;
+                        trace!("Persisted index with offset: {} and position: {} to index file", relative_offset, position);
+                        batch
+                            .persist_time_index(batch.max_timestamp, relative_offset, &mut retained_batch_writer.time_index_writer)
+                            .await
+                            .map_err(|err| err.into_try_chunks_error())?;
+                        trace!("Persisted time index with offset: {} to time index file", relative_offset);
+                        let position = position + size;
+
+                        Ok((position, retained_batch_writer))
+                    })
+                    .await
+                    .map_err(|err| err.1)?; // For now: discard the partially accumulated chunk and surface only the error
+                retained_batch_writer.log_writer.flush().await?;
+                retained_batch_writer.index_writer.flush().await?;
+                retained_batch_writer.time_index_writer.flush().await?;
+
+                conversion_writer.create_old_segment_backup().await?;
+                conversion_writer.replace_with_converted().await?;
+                Ok(())
+            }
+            BinarySchema::RetainedMessageBatchSchema => Ok(()),
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/server/src/streaming/segments/storage.rs b/server/src/streaming/segments/storage.rs
index 0c76a48ad..0cf07e5a3 100644
--- a/server/src/streaming/segments/storage.rs
+++ b/server/src/streaming/segments/storage.rs
@@ -23,8 +23,8 @@ use tracing::{error, info, trace, warn};
 
 const EMPTY_INDEXES: Vec<Index> = vec![];
 const EMPTY_TIME_INDEXES: Vec<TimeIndex> = vec![];
-const INDEX_SIZE: u32 = 8;
-const TIME_INDEX_SIZE: u32 = 12;
+pub(crate) const INDEX_SIZE: u32 = 8;
+pub(crate) const TIME_INDEX_SIZE: u32 = 12;
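+// Previously private; exposed as pub(crate) so the compat message converter
+// can build index and time index entries with the same sizes.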
 const BUF_READER_CAPACITY_BYTES: usize = 512 * 1000;
 
 #[derive(Debug)]
@@ -251,16 +251,13 @@ impl SegmentStorage for FileSegmentStorage {
     async fn save_batches(
         &self,
         segment: &Segment,
-        messages: &[Arc<RetainedMessageBatch>],
+        batches: &[Arc<RetainedMessageBatch>],
     ) -> Result<u32, IggyError> {
-        let messages_size = messages
-            .iter()
-            .map(|message| message.get_size_bytes())
-            .sum();
+        let messages_size = batches.iter().map(|batch| batch.get_size_bytes()).sum();
         let mut bytes = BytesMut::with_capacity(messages_size as usize);
-        for message in messages {
-            message.extend(&mut bytes);
+        for batch in batches {
+            batch.extend(&mut bytes);
         }
 
         if let Err(err) = self
diff --git a/server/src/streaming/storage.rs b/server/src/streaming/storage.rs
index 609574bb0..ad6fdecb6 100644
--- a/server/src/streaming/storage.rs
+++ b/server/src/streaming/storage.rs
@@ -107,7 +107,7 @@ pub trait SegmentStorage: Storage<Segment> {
     async fn save_batches(
         &self,
         segment: &Segment,
-        messages: &[Arc<RetainedMessageBatch>],
+        batches: &[Arc<RetainedMessageBatch>],
     ) -> Result<u32, IggyError>;
     async fn load_message_ids(&self, segment: &Segment) -> Result<Vec<u128>, IggyError>;
     async fn load_checksums(&self, segment: &Segment) -> Result<(), IggyError>;
@@ -451,7 +451,7 @@ pub(crate) mod tests {
         async fn save_batches(
             &self,
             _segment: &Segment,
-            _messages: &[Arc<RetainedMessageBatch>],
+            _batches: &[Arc<RetainedMessageBatch>],
         ) -> Result<u32, IggyError> {
             Ok(0)
         }
diff --git a/server/src/streaming/utils/file.rs b/server/src/streaming/utils/file.rs
index 4577e550e..5c5cb5e1b 100644
--- a/server/src/streaming/utils/file.rs
+++ b/server/src/streaming/utils/file.rs
@@ -1,6 +1,6 @@
 use atone::Vc;
 use std::path::{Path, PathBuf};
-use tokio::fs::{read_dir, File, OpenOptions};
+use tokio::fs::{read_dir, remove_file, File, OpenOptions};
 
 pub async fn open(path: &str) -> Result<File, std::io::Error> {
     OpenOptions::new().read(true).open(path).await
@@ -18,6 +18,13 @@ pub async fn overwrite(path: &str) -> Result<File, std::io::Error> {
         .open(path)
         .await
 }
+pub async fn remove(path: &str) -> Result<(), std::io::Error> {
+    remove_file(path).await
+}
+
+pub async fn rename(old_path: &str, new_path: &str) -> Result<(), std::io::Error> {
+    tokio::fs::rename(Path::new(old_path), Path::new(new_path)).await
+}
 
 pub async fn folder_size<P>(path: P) -> std::io::Result<u64>
 where