Skip to content

Commit

Permalink
Implement message compatibility conversion mechanism (iggy-rs#781)
Browse files Browse the repository at this point in the history
Co-authored-by: Piotr Gankiewicz <[email protected]>
  • Loading branch information
numinnex and spetz authored Mar 18, 2024
1 parent 122c6db commit c2da419
Show file tree
Hide file tree
Showing 30 changed files with 1,017 additions and 18 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@
"database": {
"path": "database"
},
"backup" : {
"path": "backup",
"compatibility": {
"path": "compatibility"
}
},
"runtime": {
"path": "runtime"
},
Expand Down
10 changes: 10 additions & 0 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,16 @@ interval = "1m"
# Base path for system data storage.
path = "local_data"

# Backup configuration
[system.backup]
# Path for storing backup data.
path = "backup"

# Compatibility conversion configuration
[system.backup.compatibility]
# Subpath of the backup directory where converted segment data is stored after compatibility conversion.
path = "compatibility"

# Database configuration.
[system.database]
# Path for storing database files.
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ pub enum IggyError {
CannotReadLastOffsetDelta = 7002,
#[error("Cannot read batch maximum timestamp")]
CannotReadMaxTimestamp = 7003,
#[error("Cannot read batch attributes")]
#[error("Cannot read batch payload")]
CannotReadBatchPayload = 7004,
}

Expand Down
3 changes: 2 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.2.6"
version = "0.2.7"
edition = "2021"
build = "src/build.rs"

Expand All @@ -12,6 +12,7 @@ tokio-console = ["dep:console-subscriber", "tokio/tracing"]
[dependencies]
aes-gcm = "0.10.3"
anyhow = "1.0.81"
async-stream = "0.3.5"
async-trait = "0.1.77"
atone = "0.3.7"
axum = "0.7.4"
Expand Down
5 changes: 5 additions & 0 deletions server/src/compat/binary_schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/// Identifies the on-disk binary layout of a segment's message data.
///
/// Used by the compatibility samplers to decide which format a segment
/// was written with, so the conversion pipeline can pick the right reader.
/// `PartialEq`/`Eq` are derived so detected schemas can be compared directly
/// instead of pattern-matched everywhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BinarySchema {
    /// Per-message format (sampled by `MessageSampler`).
    RetainedMessageSchema,
    /// Batched format (sampled by `RetainedMessageBatchSampler`).
    RetainedMessageBatchSchema,
}
18 changes: 18 additions & 0 deletions server/src/compat/chunks_error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use futures::stream::TryChunksError;
use iggy::error::IggyError;

/// Adapts an error into the [`TryChunksError`] shape expected by the
/// `try_chunks` stream combinator, pairing it with an empty chunk of items.
pub trait IntoTryChunksError<T> {
    /// Wraps `self` in a `TryChunksError` carrying no buffered items.
    fn into_try_chunks_error(self) -> TryChunksError<T, IggyError>;
}

/// Blanket implementation: any error convertible into [`IggyError`] can be
/// wrapped into a `TryChunksError` with an empty item chunk. This covers
/// `IggyError` itself (reflexive `From`) and `std::io::Error`
/// (via `IggyError: From<std::io::Error>`), replacing the two previously
/// duplicated impls with a single one.
impl<T, E> IntoTryChunksError<T> for E
where
    E: Into<IggyError>,
{
    fn into_try_chunks_error(self) -> TryChunksError<T, IggyError> {
        TryChunksError(Vec::new(), self.into())
    }
}
113 changes: 113 additions & 0 deletions server/src/compat/conversion_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use crate::streaming::utils::file;
use iggy::error::IggyError;
use tracing::trace;

//TODO (numinex) - Make this writer transactional
/// Coordinates the on-disk file juggling for a segment format conversion:
/// writes converted data to temporary "alt" files, moves the original files
/// into a backup directory, then swaps the converted files into place.
pub struct ConversionWriter<'w> {
    /// Path to the segment's log file being converted.
    pub log_path: &'w str,
    /// Path to the segment's index file being converted.
    pub index_path: &'w str,
    /// Path to the segment's time index file being converted.
    pub time_index_path: &'w str,

    /// Temporary destination for the converted log file.
    pub alt_log_path: String,
    /// Temporary destination for the converted index file.
    pub alt_index_path: String,
    /// Temporary destination for the converted time index file.
    pub alt_time_index_path: String,

    // Root directory under which originals are preserved before replacement.
    compat_backup_path: &'w str,
}

impl<'w> ConversionWriter<'w> {
pub fn init(
log_path: &'w str,
index_path: &'w str,
time_index_path: &'w str,
compat_backup_path: &'w str,
) -> ConversionWriter<'w> {
ConversionWriter {
log_path,
index_path,
time_index_path,
alt_log_path: format!("{}_temp.{}", log_path.split('.').next().unwrap(), "log"),
alt_index_path: format!("{}_temp.{}", index_path.split('.').next().unwrap(), "index"),
alt_time_index_path: format!(
"{}_temp.{}",
time_index_path.split('.').next().unwrap(),
"timeindex"
),
compat_backup_path,
}
}

pub async fn create_alt_directories(&self) -> Result<(), IggyError> {
tokio::fs::File::create(&self.alt_log_path).await?;
tokio::fs::File::create(&self.alt_index_path).await?;
tokio::fs::File::create(&self.alt_time_index_path).await?;

trace!(
"Created temporary files for conversion, log: {}, index: {}, time_index: {}",
&self.alt_log_path,
&self.alt_index_path,
&self.alt_time_index_path
);
Ok(())
}

pub async fn create_old_segment_backup(&self) -> Result<(), IggyError> {
let log_backup_path = &self
.log_path
.split_once('/')
.map(|(_, path)| format!("{}/{}", &self.compat_backup_path, path))
.unwrap();
let index_backup_path = &self
.index_path
.split_once('/')
.map(|(_, path)| format!("{}/{}", self.compat_backup_path, path))
.unwrap();
let time_index_backup_path = &self
.time_index_path
.split_once('/')
.map(|(_, path)| format!("{}/{}", self.compat_backup_path, path))
.unwrap();

let log_path_last_idx = log_backup_path.rfind('/').unwrap();
let index_path_last_idx = index_backup_path.rfind('/').unwrap();
let time_index_path_last_idx = time_index_backup_path.rfind('/').unwrap();
if tokio::fs::metadata(&log_backup_path[..log_path_last_idx])
.await
.is_err()
{
tokio::fs::create_dir_all(&log_backup_path[..log_path_last_idx]).await?;
}
if tokio::fs::metadata(&index_backup_path[..index_path_last_idx])
.await
.is_err()
{
tokio::fs::create_dir_all(&index_backup_path[..index_path_last_idx]).await?;
}
if tokio::fs::metadata(&time_index_backup_path[..time_index_path_last_idx])
.await
.is_err()
{
tokio::fs::create_dir_all(&time_index_backup_path[..time_index_path_last_idx]).await?;
}
file::rename(self.log_path, log_backup_path).await?;
file::rename(self.index_path, index_backup_path).await?;
file::rename(self.time_index_path, time_index_backup_path).await?;

trace!(
"Created backup of converted segment, log: {}, index: {}, time_index: {}",
&log_backup_path,
&index_backup_path,
&time_index_backup_path
);
Ok(())
}

pub async fn replace_with_converted(&self) -> Result<(), IggyError> {
file::rename(&self.alt_log_path, self.log_path).await?;
file::rename(&self.alt_index_path, self.index_path).await?;
file::rename(&self.alt_time_index_path, self.time_index_path).await?;

trace!("Replaced old segment with newly converted files");
Ok(())
}
}
99 changes: 99 additions & 0 deletions server/src/compat/message_converter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use crate::compat::samplers::message_sampler::MessageSampler;
use crate::compat::samplers::retained_batch_sampler::RetainedMessageBatchSampler;
use crate::compat::schema_sampler::BinarySchemaSampler;
use crate::streaming::sizeable::Sizeable;
use bytes::{BufMut, BytesMut};
use iggy::error::IggyError;

use crate::streaming::segments::storage::{INDEX_SIZE, TIME_INDEX_SIZE};
use tokio::io::{AsyncWrite, AsyncWriteExt};

/// Appends the implementor's serialized byte representation to a buffer;
/// used by the persister to build the converted on-disk payload in memory.
pub trait Extendable {
    /// Writes this value's bytes onto the end of `bytes`.
    fn extend(&self, bytes: &mut BytesMut);
}

/// Persists converted message data and its index entries through any
/// asynchronous writer.
pub trait MessageFormatConverterPersister<W: AsyncWrite> {
    /// Serializes `self` and writes the full payload to `writer`.
    async fn persist(&self, writer: &mut W) -> Result<(), IggyError>;
    /// Writes one index entry (relative offset + byte position) to `writer`.
    async fn persist_index(
        &self,
        position: u32,
        relative_offset: u32,
        writer: &mut W,
    ) -> Result<(), IggyError>;
    /// Writes one time index entry (relative offset + timestamp) to `writer`.
    async fn persist_time_index(
        &self,
        timestamp: u64,
        relative_offset: u32,
        writer: &mut W,
    ) -> Result<(), IggyError>;
}

/// Blanket persister: anything that knows its size and can serialize itself
/// (`Sizeable + Extendable`) can be persisted through any unpinned async writer.
impl<W: AsyncWrite + Unpin, T> MessageFormatConverterPersister<W> for T
where
    T: Sizeable + Extendable,
{
    /// Serializes the value into a pre-sized buffer, then writes it out whole.
    async fn persist(&self, writer: &mut W) -> Result<(), IggyError> {
        let capacity = self.get_size_bytes() as usize;
        let mut payload = BytesMut::with_capacity(capacity);
        self.extend(&mut payload);
        writer.write_all(&payload).await?;
        Ok(())
    }

    /// Writes a single index entry: relative offset then position,
    /// both little-endian u32.
    async fn persist_index(
        &self,
        position: u32,
        relative_offset: u32,
        writer: &mut W,
    ) -> Result<(), IggyError> {
        let mut entry = BytesMut::with_capacity(INDEX_SIZE as usize);
        entry.put_u32_le(relative_offset);
        entry.put_u32_le(position);
        writer.write_all(&entry).await?;
        Ok(())
    }

    /// Writes a single time index entry: little-endian u32 relative offset
    /// followed by a little-endian u64 timestamp.
    async fn persist_time_index(
        &self,
        timestamp: u64,
        relative_offset: u32,
        writer: &mut W,
    ) -> Result<(), IggyError> {
        let mut entry = BytesMut::with_capacity(TIME_INDEX_SIZE as usize);
        entry.put_u32_le(relative_offset);
        entry.put_u64_le(timestamp);
        writer.write_all(&entry).await?;
        Ok(())
    }
}

/// Holds the ordered set of schema samplers used to detect which binary
/// format a segment was written with.
pub struct MessageFormatConverter {
    /// Samplers tried in order; newer schemas come first in the vec.
    pub samplers: Vec<Box<dyn BinarySchemaSampler>>,
}

impl MessageFormatConverter {
pub fn init(
segment_start_offset: u64,
log_path: String,
index_path: String,
) -> MessageFormatConverter {
// Always append new schemas to beginning of vec
MessageFormatConverter {
samplers: vec![
Box::new(RetainedMessageBatchSampler::new(
segment_start_offset,
log_path.clone(),
index_path.clone(),
)),
Box::new(MessageSampler::new(
segment_start_offset,
log_path,
index_path,
)),
],
}
}
}
6 changes: 6 additions & 0 deletions server/src/compat/message_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use futures::Stream;

/// Converts an owned source into an asynchronous stream of items.
pub trait MessageStream {
    /// The element type yielded by the resulting stream.
    type Item;
    /// Consumes `self` and produces the stream.
    fn into_stream(self) -> impl Stream<Item = Self::Item>;
}
9 changes: 9 additions & 0 deletions server/src/compat/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Schema identifiers for on-disk message formats.
pub(crate) mod binary_schema;
// Adapters that wrap errors into `TryChunksError` for stream combinators.
pub(crate) mod chunks_error;
// File juggling (temp files, backup, swap) for segment conversion.
pub(crate) mod conversion_writer;
// Persister trait and sampler registry driving the format conversion.
pub(crate) mod message_converter;
// Trait turning a source into an async stream of messages.
pub(crate) mod message_stream;
// Schema detection samplers for each supported binary format.
pub(crate) mod samplers;
// The `BinarySchemaSampler` trait the samplers implement.
pub(crate) mod schema_sampler;
// Deserialized snapshot types for the legacy formats.
pub(crate) mod snapshots;
// Stream implementations for reading legacy segment data.
pub(crate) mod streams;
53 changes: 53 additions & 0 deletions server/src/compat/samplers/message_sampler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use crate::compat::binary_schema::BinarySchema;
use crate::compat::schema_sampler::BinarySchemaSampler;
use crate::compat::snapshots::message_snapshot::MessageSnapshot;
use crate::server_error::ServerError;
use crate::streaming::utils::file;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use tokio::io::AsyncReadExt;

/// Samples a segment's files to check whether they use the legacy
/// per-message binary schema.
pub struct MessageSampler {
    // First offset of the segment being sampled — presumably matches the
    // offset of the first stored message; verified in `try_sample`.
    pub segment_start_offset: u64,
    // Path to the segment's log file.
    pub log_path: String,
    // Path to the segment's index file.
    pub index_path: String,
}

impl MessageSampler {
    /// Creates a sampler for the segment described by the given paths.
    pub fn new(segment_start_offset: u64, log_path: String, index_path: String) -> MessageSampler {
        Self {
            segment_start_offset,
            log_path,
            index_path,
        }
    }
}

// NOTE(review): the previous `unsafe impl Send`/`unsafe impl Sync` for
// `MessageSampler` were removed — the struct is composed of `u64` and
// `String` fields, all of which are `Send + Sync`, so the compiler derives
// both auto traits automatically. The manual `unsafe` impls were redundant
// no-ops and left unjustified `unsafe` in the codebase.

#[async_trait]
impl BinarySchemaSampler for MessageSampler {
    /// Reads the first message of the segment and confirms it matches the
    /// legacy per-message schema; an empty log is accepted as legacy too.
    async fn try_sample(&self) -> Result<BinarySchema, ServerError> {
        let mut index_file = file::open(&self.index_path).await?;
        let mut log_file = file::open(&self.log_path).await?;

        // An empty log has nothing to contradict the legacy schema.
        if log_file.metadata().await?.len() == 0 {
            return Ok(BinarySchema::RetainedMessageSchema);
        }

        // First index entry: relative offset (unused) + end position of the
        // first message, which bounds how many bytes to read from the log.
        let _relative_offset = index_file.read_u32_le().await?;
        let end_position = index_file.read_u32_le().await?;

        let length = end_position as usize;
        let mut buffer = BytesMut::with_capacity(length);
        buffer.put_bytes(0, length);
        let _ = log_file.read_exact(&mut buffer).await?;

        let message = MessageSnapshot::try_from(buffer.freeze())?;
        if message.offset == self.segment_start_offset {
            Ok(BinarySchema::RetainedMessageSchema)
        } else {
            Err(ServerError::InvalidMessageOffsetFormatConversion)
        }
    }
}
2 changes: 2 additions & 0 deletions server/src/compat/samplers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Detects the legacy per-message binary schema.
pub(crate) mod message_sampler;
// Detects the batched retained-message binary schema.
pub(crate) mod retained_batch_sampler;
Loading

0 comments on commit c2da419

Please sign in to comment.