
Commit

chore: revert fill sort_key_ids
NGA-TRAN committed Aug 11, 2023
1 parent fcdf7dc commit 9bf1c8c
Showing 21 changed files with 113 additions and 388 deletions.
15 changes: 6 additions & 9 deletions compactor_test_utils/src/lib.rs
@@ -42,7 +42,7 @@ use compactor::{
PartitionInfo,
};
use compactor_scheduler::SchedulerConfig;
use data_types::{ColumnSet, ColumnType, CompactionLevel, ParquetFile, TableId};
use data_types::{ColumnType, CompactionLevel, ParquetFile, TableId};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion_util::config::register_iox_object_store;
use futures::TryStreamExt;
@@ -93,20 +93,17 @@ impl TestSetupBuilder<false> {
let ns = catalog.create_namespace_1hr_retention("ns").await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
let tag1 = table.create_column("tag1", ColumnType::Tag).await;
let tag2 = table.create_column("tag2", ColumnType::Tag).await;
let tag3 = table.create_column("tag3", ColumnType::Tag).await;
let col_time = table.create_column("time", ColumnType::Time).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;

let partition = table.create_partition("2022-07-13").await;

// The sort key comes from the catalog and should be the union of all tags the
// ingester has seen
let sort_key = SortKey::from_columns(["tag1", "tag2", "tag3", "time"]);
let sort_key_col_ids = ColumnSet::from([tag1.id(), tag2.id(), tag3.id(), col_time.id()]);
let partition = partition
.update_sort_key(sort_key.clone(), &sort_key_col_ids)
.await;
let partition = partition.update_sort_key(sort_key.clone()).await;

// Ensure the input scenario conforms to the expected invariants.
let invariant_check = Arc::new(CatalogInvariants {
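The test's comment ("the union of all tags the ingester has seen") is the rule the sort key follows. Below is a standalone sketch of that rule; the first-seen ordering of tags is an assumption here, not something this diff states.

```rust
// Hypothetical illustration, not code from this commit: derive a partition
// sort key as the union of the tag columns seen across writes, `time` last.
fn union_sort_key(writes: &[Vec<&'static str>]) -> Vec<&'static str> {
    let mut key: Vec<&'static str> = Vec::new();
    for write in writes {
        for tag in write {
            // Assumed: tags are appended in first-seen order, no duplicates.
            if !key.contains(tag) {
                key.push(*tag);
            }
        }
    }
    key.push("time"); // the `time` column always sorts last
    key
}

fn main() {
    let writes = vec![vec!["tag1", "tag2"], vec!["tag2", "tag3"]];
    // Matches the key the test builds: SortKey::from_columns(["tag1", "tag2", "tag3", "time"])
    assert_eq!(union_sort_key(&writes), vec!["tag1", "tag2", "tag3", "time"]);
}
```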
80 changes: 1 addition & 79 deletions data_types/src/columns.rs
@@ -90,17 +90,6 @@ impl ColumnsByName {
self.0.values().map(|c| c.id)
}

/// Return column ids of the given column names
/// Will panic if any of the names are not found
pub fn ids_for_names(&self, names: &[&str]) -> ColumnSet {
ColumnSet::from(names.iter().map(|name| {
self.get(name)
.unwrap_or_else(|| panic!("column name not found: {}", name))
.id
.get()
}))
}

/// Get a column by its name.
pub fn get(&self, name: &str) -> Option<&ColumnSchema> {
self.0.get(name)
@@ -342,7 +331,7 @@ impl TryFrom<proto::column_schema::ColumnType> for ColumnType {
}

/// Set of columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default, sqlx::Type)]
#[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)]
#[sqlx(transparent, no_pg_array)]
pub struct ColumnSet(Vec<ColumnId>);

@@ -382,15 +371,6 @@ impl From<ColumnSet> for Vec<ColumnId> {
}
}

impl<I> From<I> for ColumnSet
where
I: IntoIterator<Item = i64>,
{
fn from(ids: I) -> Self {
Self(ids.into_iter().map(ColumnId::new).collect())
}
}

impl Deref for ColumnSet {
type Target = [ColumnId];

@@ -402,7 +382,6 @@ impl Deref for ColumnSet {
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use std::collections::BTreeMap;

use super::*;

@@ -471,61 +450,4 @@ mod tests {

ColumnSchema::try_from(&proto).expect_err("should fail");
}

#[test]
fn test_columns_by_names_exist() {
let columns = build_columns_by_names();

let ids = columns.ids_for_names(&["foo", "bar"]);
assert_eq!(ids, ColumnSet::from([1, 2]));
}

#[test]
fn test_columns_by_names_exist_different_order() {
let columns = build_columns_by_names();

let ids = columns.ids_for_names(&["bar", "foo"]);
assert_eq!(ids, ColumnSet::from([2, 1]));
}

#[test]
#[should_panic = "column name not found: baz"]
fn test_columns_by_names_not_exist() {
let columns = build_columns_by_names();
columns.ids_for_names(&["foo", "baz"]);
}

fn build_columns_by_names() -> ColumnsByName {
let mut columns: BTreeMap<String, ColumnSchema> = BTreeMap::new();
columns.insert(
"foo".to_string(),
ColumnSchema {
id: ColumnId::new(1),
column_type: ColumnType::I64,
},
);
columns.insert(
"bar".to_string(),
ColumnSchema {
id: ColumnId::new(2),
column_type: ColumnType::I64,
},
);
columns.insert(
"time".to_string(),
ColumnSchema {
id: ColumnId::new(3),
column_type: ColumnType::Time,
},
);
columns.insert(
"tag1".to_string(),
ColumnSchema {
id: ColumnId::new(4),
column_type: ColumnType::Tag,
},
);

ColumnsByName(columns)
}
}
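For context on what this revert deletes from `ColumnSet`: the blanket `From` impl let the removed tests write `ColumnSet::from([1, 2])` over raw `i64` ids. A self-contained sketch with simplified stand-in types (the real `ColumnId`/`ColumnSet` carry `sqlx` derives and more):

```rust
// Stand-ins for the crate's ColumnId/ColumnSet, just to show the shape of
// the removed conversion; not the actual data_types definitions.
#[derive(Debug, PartialEq)]
struct ColumnId(i64);

impl ColumnId {
    fn new(v: i64) -> Self {
        Self(v)
    }
}

#[derive(Debug, PartialEq)]
struct ColumnSet(Vec<ColumnId>);

// The impl this commit removes: build a ColumnSet from any iterator of i64 ids.
impl<I> From<I> for ColumnSet
where
    I: IntoIterator<Item = i64>,
{
    fn from(ids: I) -> Self {
        Self(ids.into_iter().map(ColumnId::new).collect())
    }
}

fn main() {
    // Mirrors the removed test assertion:
    // ids_for_names(&["foo", "bar"]) == ColumnSet::from([1, 2])
    assert_eq!(ColumnSet::from([1i64, 2]), ColumnSet(vec![ColumnId(1), ColumnId(2)]));
}
```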
18 changes: 3 additions & 15 deletions data_types/src/partition.rs
@@ -1,7 +1,5 @@
//! Types having to do with partitions.
use crate::ColumnSet;

use super::{TableId, Timestamp};

use schema::sort::SortKey;
@@ -360,15 +358,9 @@ pub struct Partition {
pub table_id: TableId,
/// the string key of the partition
pub partition_key: PartitionKey,

// TODO: This `sort_key` will be removed after `sort_key_ids` is fully implemented
/// See comments of [`Partition::sort_key_ids`] for this as it plays the
/// same role but stores column IDs instead of names
pub sort_key: Vec<String>,

/// vector of column IDs that describes how *every* parquet file
/// vector of column names that describes how *every* parquet file
/// in this [`Partition`] is sorted. The sort_key contains all the
/// ID of primary key (PK) columns that have been persisted, and nothing
/// primary key (PK) columns that have been persisted, and nothing
/// else. The PK columns are all `tag` columns and the `time`
/// column.
///
@@ -391,7 +383,7 @@ pub struct Partition {
/// For example, updating `A,B,C` to either `A,D,B,C` or `A,B,C,D`
/// is legal. However, updating to `A,C,D,B` is not because the
/// relative order of B and C has been reversed.
pub sort_key_ids: ColumnSet,
pub sort_key: Vec<String>,

/// The time at which the newest file of the partition is created
pub new_file_at: Option<Timestamp>,
@@ -407,7 +399,6 @@ impl Partition {
table_id: TableId,
partition_key: PartitionKey,
sort_key: Vec<String>,
sort_key_ids: ColumnSet,
new_file_at: Option<Timestamp>,
) -> Self {
let hash_id = PartitionHashId::new(table_id, &partition_key);
@@ -417,7 +408,6 @@ impl Partition {
table_id,
partition_key,
sort_key,
sort_key_ids,
new_file_at,
}
}
@@ -434,7 +424,6 @@ impl Partition {
table_id: TableId,
partition_key: PartitionKey,
sort_key: Vec<String>,
sort_key_ids: ColumnSet,
new_file_at: Option<Timestamp>,
) -> Self {
Self {
@@ -443,7 +432,6 @@ impl Partition {
table_id,
partition_key,
sort_key,
sort_key_ids,
new_file_at,
}
}
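The doc comment above states the update invariant precisely: new columns may be inserted anywhere, but the relative order of existing columns must not change. A small checker, illustrative only and not part of the catalog, makes the `A,B,C` examples concrete:

```rust
// Returns true iff `new` contains every column of `old` in the same relative
// order; new columns may be inserted at any position.
fn preserves_relative_order(old: &[&str], new: &[&str]) -> bool {
    let mut last_pos = None;
    for col in old {
        match new.iter().position(|c| c == col) {
            // An existing column may never be dropped...
            None => return false,
            // ...and must appear after the previous existing column.
            Some(pos) => {
                if let Some(prev) = last_pos {
                    if pos <= prev {
                        return false;
                    }
                }
                last_pos = Some(pos);
            }
        }
    }
    true
}

fn main() {
    let old = ["A", "B", "C"];
    assert!(preserves_relative_order(&old, &["A", "D", "B", "C"])); // legal
    assert!(preserves_relative_order(&old, &["A", "B", "C", "D"])); // legal
    assert!(!preserves_relative_order(&old, &["A", "C", "D", "B"])); // B and C reversed
}
```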
[File header not captured: protobuf file defining `message Partition`]
@@ -50,10 +50,6 @@ message Partition {
repeated string array_sort_key = 6;

PartitionIdentifier identifier = 8;

// the sort key ids (`sort_key_ids`) for data in parquet files of this partition, which
// is an array of column ids of the sort key columns
repeated int64 array_sort_key_ids = 9;
}

message GetPartitionsByTableIdRequest {
67 changes: 15 additions & 52 deletions import_export/src/file/import.rs
Original file line number Diff line number Diff line change
@@ -6,9 +6,8 @@ use data_types::{
partition_template::{
NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, PARTITION_BY_DAY_PROTO,
},
ColumnSet, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceName,
NamespaceNameError, ParquetFileParams, Partition, PartitionKey, Statistics, Table, TableId,
Timestamp,
ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceName, NamespaceNameError,
ParquetFileParams, Partition, Statistics, Table, TableId, Timestamp,
};
use generated_types::influxdata::iox::catalog::v1 as proto;
// ParquetFile as ProtoParquetFile, Partition as ProtoPartition,
@@ -370,10 +369,8 @@ impl RemoteImporter {
let table_id = table.id;
debug!(%table_id, "Inserting catalog records into table");

// Create a new partition
let partition_key = iox_metadata.partition_key.clone();
let mut partition = self
.create_partition(repos.as_mut(), &table, partition_key)
let partition = self
.partition_for_parquet_file(repos.as_mut(), &table, &iox_metadata)
.await?;

// Note that for some reason, the object_store_id that is
@@ -419,11 +416,6 @@ impl RemoteImporter {
}
};

// Update partition sort key
let partition = self
.update_partition(&mut partition, repos.as_mut(), &table, &iox_metadata)
.await?;

// Now copy the parquet files into the object store
//let partition_id = TransitionPartitionId::Deprecated(partition.id);
let transition_partition_id = partition.transition_partition_id();
@@ -479,40 +471,25 @@ impl RemoteImporter {
Ok(table)
}

/// Create the catalog [`Partition`] into which the specified parquet
/// Return the catalog [`Partition`] into which the specified parquet
/// file should be inserted.
///
/// The sort_key and sort_key_ids of the partition should be empty when it is first created
/// because there are no columns in any parquet files to use for sorting yet.
/// The sort_key and sort_key_ids will be updated after the parquet files are created.
async fn create_partition(
&self,
repos: &mut dyn RepoCollection,
table: &Table,
partition_key: PartitionKey,
) -> Result<Partition> {
let partition = repos
.partitions()
.create_or_get(partition_key, table.id)
.await?;

Ok(partition)
}

/// Update sort keys of the partition
///
/// First attempts to use any available metadata from the
/// catalog export, and falls back to what is in the iox
/// metadata stored in the parquet file, if needed
async fn update_partition(
async fn partition_for_parquet_file(
&self,
partition: &mut Partition,
repos: &mut dyn RepoCollection,
table: &Table,
iox_metadata: &IoxMetadata,
) -> Result<Partition> {
let partition_key = iox_metadata.partition_key.clone();

let partition = repos
.partitions()
.create_or_get(partition_key.clone(), table.id)
.await?;

// Note we use the table_id embedded in the file's metadata
// from the source catalog to match the exported catalog (which
// is different from the new table we just created in the
@@ -521,35 +498,22 @@ impl RemoteImporter {
.exported_contents
.partition_metadata(iox_metadata.table_id.get(), partition_key.inner());

let (new_sort_key, new_sort_key_ids) = if let Some(proto_partition) =
proto_partition.as_ref()
{
let new_sort_key: Vec<&str> = if let Some(proto_partition) = proto_partition.as_ref() {
// Use the sort key from the source catalog
debug!(array_sort_key=?proto_partition.array_sort_key, "Using sort key from catalog export");
let new_sort_key = proto_partition
proto_partition
.array_sort_key
.iter()
.map(|s| s.as_str())
.collect::<Vec<&str>>();

let new_sort_key_ids =
ColumnSet::from(proto_partition.array_sort_key_ids.iter().cloned());

(new_sort_key, new_sort_key_ids)
.collect()
} else {
warn!("Could not find sort key in catalog metadata export, falling back to embedded metadata");
let sort_key = iox_metadata
.sort_key
.as_ref()
.ok_or_else(|| Error::NoSortKey)?;

let new_sort_key = sort_key.to_columns().collect::<Vec<_>>();

// fetch table columns
let columns = ColumnsByName::new(repos.columns().list_by_table_id(table.id).await?);
let new_sort_key_ids = columns.ids_for_names(&new_sort_key);

(new_sort_key, new_sort_key_ids)
sort_key.to_columns().collect()
};

if !partition.sort_key.is_empty() && partition.sort_key != new_sort_key {
Expand All @@ -565,7 +529,6 @@ impl RemoteImporter {
&partition.transition_partition_id(),
Some(partition.sort_key.clone()),
&new_sort_key,
&new_sort_key_ids,
)
.await;

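The rewritten `partition_for_parquet_file` keeps the lookup order the comments describe: prefer the sort key from the catalog export, fall back to the one embedded in the file's IOx metadata, and fail only if neither exists. A simplified sketch of that resolution, using stand-in types rather than the importer's real signatures:

```rust
// Stand-in error type; the importer's real variant is Error::NoSortKey.
#[derive(Debug, PartialEq)]
enum ImportError {
    NoSortKey,
}

// Resolution order sketched from the code above: catalog export first,
// embedded parquet metadata second, error if both are missing.
fn resolve_sort_key(
    exported: Option<Vec<String>>, // from the catalog export, if any
    embedded: Option<Vec<String>>, // from the file's IoxMetadata, if any
) -> Result<Vec<String>, ImportError> {
    exported.or(embedded).ok_or(ImportError::NoSortKey)
}

fn main() {
    let export = Some(vec!["tag1".to_string(), "time".to_string()]);
    assert_eq!(
        resolve_sort_key(export, None),
        Ok(vec!["tag1".to_string(), "time".to_string()])
    );
    assert_eq!(resolve_sort_key(None, None), Err(ImportError::NoSortKey));
}
```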
8 changes: 1 addition & 7 deletions ingester/src/buffer_tree/partition.rs
@@ -423,7 +423,6 @@ mod tests {
use arrow_util::assert_batches_eq;
use assert_matches::assert_matches;
use backoff::BackoffConfig;
use data_types::ColumnSet;
use datafusion::{
physical_expr::PhysicalSortExpr,
physical_plan::{expressions::col, memory::MemoryExec, ExecutionPlan},
@@ -1007,12 +1006,7 @@
.repositories()
.await
.partitions()
.cas_sort_key(
&partition.transition_partition_id(),
None,
&["terrific"],
&ColumnSet::from([1]),
)
.cas_sort_key(&partition.transition_partition_id(), None, &["terrific"])
.await
.unwrap();

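The `cas_sort_key` call in this test is a compare-and-swap: the caller passes the sort key it last observed (`None` meaning "not yet set"), and the update applies only if the stored value still matches. A toy single-process model of that contract follows; the real method is async against the catalog, and everything here is a stand-in.

```rust
// Toy compare-and-swap over a partition sort key. On mismatch, the current
// stored value is returned so the caller can recompute and retry.
fn cas_sort_key(
    stored: &mut Option<Vec<String>>,
    observed: Option<Vec<String>>,
    new: Vec<String>,
) -> Result<(), Option<Vec<String>>> {
    if *stored == observed {
        *stored = Some(new);
        Ok(())
    } else {
        Err(stored.clone())
    }
}

fn main() {
    let mut stored = None;

    // First writer: observed None matches the unset key, so the update lands.
    cas_sort_key(&mut stored, None, vec!["terrific".to_string()]).unwrap();

    // A stale writer still claiming None loses the race and sees the winner's key.
    let err = cas_sort_key(&mut stored, None, vec!["other".to_string()]).unwrap_err();
    assert_eq!(err, Some(vec!["terrific".to_string()]));
}
```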
[Diffs for the remaining changed files were not loaded]
