From 7a2867b98bc540237bfc3a56f748c602efdb529c Mon Sep 17 00:00:00 2001 From: Michael Gattozzi Date: Wed, 8 May 2024 13:05:24 -0400 Subject: [PATCH] feat: Store precision in WAL for replayability (#24966) Up to this point we assumed that a precision for everything was in nanoseconds. While we do write and persist data as nanoseconds we made this assumption for the WAL. However, we store the original line protocol data. If we want it to be replayable we would need to include the precision and use that when loading the WAL from disk. This commit changes the code to do that and we can see that the data is definitely persisted as the WAL is now bigger in the tests. --- influxdb3_write/src/lib.rs | 5 +- influxdb3_write/src/wal.rs | 94 +++++++++++++++++++ .../src/write_buffer/buffer_segment.rs | 7 +- influxdb3_write/src/write_buffer/loader.rs | 9 +- influxdb3_write/src/write_buffer/mod.rs | 2 + 5 files changed, 111 insertions(+), 6 deletions(-) diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index 6d8b86d2f89..c654629917b 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -447,6 +447,7 @@ pub struct LpWriteOp { pub db_name: String, pub lp: String, pub default_time: i64, + pub precision: Precision, } /// A single write request can have many lines in it. A writer can request to accept all lines that are valid, while @@ -533,8 +534,8 @@ impl ParquetFile { } } -/// The summary data for a persisted parquet file in a segment. 
-#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +/// The precision of the timestamp +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum Precision { Auto, diff --git a/influxdb3_write/src/wal.rs b/influxdb3_write/src/wal.rs index 4faddb2f91c..945c315e83c 100644 --- a/influxdb3_write/src/wal.rs +++ b/influxdb3_write/src/wal.rs @@ -660,7 +660,9 @@ fn segment_id_from_file_name(name: &str) -> Result { #[cfg(test)] mod tests { use super::*; + use crate::Catalog; use crate::LpWriteOp; + use crate::Precision; #[test] fn segment_writer_reader() { @@ -673,6 +675,7 @@ mod tests { db_name: "foo".to_string(), lp: "cpu host=a val=10i 10".to_string(), default_time: 1, + precision: Precision::Nanosecond, }); writer.write_batch(vec![wal_op.clone()]).unwrap(); @@ -690,6 +693,7 @@ mod tests { db_name: "foo".to_string(), lp: "cpu host=a val=10i 10".to_string(), default_time: 1, + precision: Precision::Nanosecond, }); // open the file, write and close it @@ -729,6 +733,7 @@ mod tests { db_name: "foo".to_string(), lp: "cpu host=a val=10i 10".to_string(), default_time: 1, + precision: Precision::Nanosecond, }); let wal = WalImpl::new(dir.clone()).unwrap(); @@ -752,4 +757,93 @@ mod tests { assert_eq!(batch.ops, vec![wal_op.clone()]); assert_eq!(batch.sequence_number, SequenceNumber::new(1)); } + + #[test] + fn wal_written_and_read_with_different_precisions() { + let dir = test_helpers::tmp_dir().unwrap().into_path(); + let wal = WalImpl::new(dir.clone()).unwrap(); + let wal_ops = vec![ + WalOp::LpWrite(LpWriteOp { + db_name: "foo".to_string(), + lp: "cpu,host=a val=1i 1".to_string(), + default_time: 1, + precision: Precision::Second, + }), + WalOp::LpWrite(LpWriteOp { + db_name: "foo".to_string(), + lp: "cpu,host=b val=2i 1000".to_string(), + default_time: 1, + precision: Precision::Millisecond, + }), + WalOp::LpWrite(LpWriteOp { + db_name: "foo".to_string(), + lp: "cpu,host=c val=3i 1000000".to_string(), + default_time: 
1, + precision: Precision::Microsecond, + }), + WalOp::LpWrite(LpWriteOp { + db_name: "foo".to_string(), + lp: "cpu,host=d val=4i 1000000000".to_string(), + default_time: 1, + precision: Precision::Nanosecond, + }), + WalOp::LpWrite(LpWriteOp { + db_name: "foo".to_string(), + lp: "cpu,host=e val=5i 1".to_string(), + default_time: 1, + precision: Precision::Auto, + }), + ]; + + let segment = SegmentId::new(0); + // open the file, write and close it + { + let mut writer = wal + .new_segment_writer(segment, SegmentRange::test_range()) + .unwrap(); + writer.write_batch(wal_ops).unwrap(); + // close the wal + drop(wal); + } + + // Reopen the wal and make sure it loads the precision via + // `load_buffer_from_segment` + let catalog = Catalog::default(); + let wal = WalImpl::new(dir).unwrap(); + let schema = schema::SchemaBuilder::new() + .tag("host") + .influx_column( + "val", + schema::InfluxColumnType::Field(schema::InfluxFieldType::Integer), + ) + .timestamp() + .build() + .unwrap(); + + // Load the data into a buffer. 
+ let buffer = crate::write_buffer::buffer_segment::load_buffer_from_segment( + &catalog, + wal.open_segment_reader(segment).unwrap(), + ) + .unwrap() + .0; + + // Get the buffer data as record batches + let batch = buffer + .table_record_batches("foo", "cpu", schema.as_arrow(), &[]) + .unwrap() + .unwrap(); + let mut writer = arrow::json::LineDelimitedWriter::new(Vec::new()); + writer.write_batches(&[&batch]).unwrap(); + writer.finish().unwrap(); + + pretty_assertions::assert_eq!( + "{\"host\":\"a\",\"time\":\"1970-01-01T00:00:01\",\"val\":1}\n\ + {\"host\":\"b\",\"time\":\"1970-01-01T00:00:01\",\"val\":2}\n\ + {\"host\":\"c\",\"time\":\"1970-01-01T00:00:01\",\"val\":3}\n\ + {\"host\":\"d\",\"time\":\"1970-01-01T00:00:01\",\"val\":4}\n\ + {\"host\":\"e\",\"time\":\"1970-01-01T00:00:01\",\"val\":5}\n", + String::from_utf8(writer.into_inner()).unwrap() + ) + } } diff --git a/influxdb3_write/src/write_buffer/buffer_segment.rs b/influxdb3_write/src/write_buffer/buffer_segment.rs index d9dbcd9946a..cb3139af75c 100644 --- a/influxdb3_write/src/write_buffer/buffer_segment.rs +++ b/influxdb3_write/src/write_buffer/buffer_segment.rs @@ -12,8 +12,8 @@ use crate::write_buffer::{ }; use crate::{ wal, write_buffer, write_buffer::Result, DatabaseTables, ParquetFile, PersistedSegment, - Persister, Precision, SegmentDuration, SegmentId, SegmentRange, SequenceNumber, - TableParquetFiles, WalOp, WalSegmentReader, WalSegmentWriter, + Persister, SegmentDuration, SegmentId, SegmentRange, SequenceNumber, TableParquetFiles, WalOp, + WalSegmentReader, WalSegmentWriter, }; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -196,7 +196,7 @@ pub(crate) fn load_buffer_from_segment( Time::from_timestamp_nanos(write.default_time), segment_duration, false, - Precision::Nanosecond, + write.precision, )?; let db_name = &write.db_name; @@ -736,6 +736,7 @@ pub(crate) mod tests { db_name: "db1".to_string(), lp: lp.to_string(), default_time: 0, + precision: 
crate::Precision::Nanosecond, }); let write_batch = lp_to_write_batch(&catalog, "db1", lp); diff --git a/influxdb3_write/src/write_buffer/loader.rs b/influxdb3_write/src/write_buffer/loader.rs index 8bc097f3cff..88e27ea8918 100644 --- a/influxdb3_write/src/write_buffer/loader.rs +++ b/influxdb3_write/src/write_buffer/loader.rs @@ -145,6 +145,7 @@ mod tests { use crate::persister::PersisterImpl; use crate::test_helpers::lp_to_write_batch; use crate::wal::{WalImpl, WalSegmentWriterNoopImpl}; + use crate::Precision; use crate::{ DatabaseTables, LpWriteOp, ParquetFile, SegmentRange, SequenceNumber, TableParquetFiles, WalOp, @@ -180,6 +181,7 @@ mod tests { db_name: "db1".to_string(), lp: lp.to_string(), default_time: 0, + precision: Precision::Nanosecond, }); let write_batch = lp_to_write_batch(&catalog, "db1", lp); @@ -267,6 +269,7 @@ mod tests { db_name: db_name.to_string(), lp: lp.to_string(), default_time: 0, + precision: Precision::Nanosecond, }); let write_batch = lp_to_write_batch(&catalog, db_name, lp); @@ -352,6 +355,7 @@ mod tests { db_name: db_name.to_string(), lp: lp.to_string(), default_time: 0, + precision: Precision::Nanosecond, }); let write_batch = lp_to_write_batch(&catalog, db_name, lp); @@ -379,6 +383,7 @@ mod tests { db_name: db_name.to_string(), lp: lp.to_string(), default_time: 0, + precision: Precision::Nanosecond, }); let write_batch = lp_to_write_batch(&catalog, db_name, lp); @@ -417,7 +422,7 @@ mod tests { loaded_state.persisted_segments[0], PersistedSegment { segment_id, - segment_wal_size_bytes: 227, + segment_wal_size_bytes: 252, segment_parquet_size_bytes: 3458, segment_row_count: 3, segment_min_time: 10, @@ -526,6 +531,7 @@ mod tests { db_name: db_name.to_string(), lp: lp.to_string(), default_time: 0, + precision: Precision::Nanosecond, }); let write_batch = lp_to_write_batch(&catalog, db_name, lp); @@ -546,6 +552,7 @@ mod tests { db_name: db_name.to_string(), lp: lp.to_string(), default_time: 0, + precision: Precision::Nanosecond, }); 
let write_batch = lp_to_write_batch(&catalog, db_name, lp); diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index fdb84b56a04..29b9f36321b 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -577,6 +577,7 @@ pub(crate) fn validate_or_insert_schema_and_partitions( db_name: db_name.to_string(), lp: table_batches.lines.join("\n"), default_time: ingest_time.timestamp_nanos(), + precision, }), starting_catalog_sequence_number, }) @@ -928,6 +929,7 @@ mod tests { db_name: "foo".to_string(), lp: "cpu bar=1 10".to_string(), default_time: 123, + precision: Precision::Nanosecond, })], }; assert_eq!(batch, expected_batch);