diff --git a/Cargo.lock b/Cargo.lock index 262b4f97fb9..1e81f75863e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -559,7 +559,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -570,7 +570,7 @@ checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -992,7 +992,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -1294,7 +1294,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -1752,9 +1752,9 @@ dependencies = [ [[package]] name = "either" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" +checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b" dependencies = [ "serde", ] @@ -1991,7 +1991,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -2057,9 +2057,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.15" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c" dependencies = [ "cfg-if", "libc", @@ -2596,6 +2596,7 @@ dependencies = [ "iox_query_influxql", "iox_query_influxql_rewrite", "iox_query_params", + "iox_system_tables", "iox_time", "metric", "metric_exporters", @@ -2909,6 +2910,18 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "iox_system_tables" +version = "0.1.0" +source = "git+https://github.com/influxdata/influxdb3_core?rev=0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94#0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" +dependencies = [ + "arrow", + "async-trait", + "datafusion", + "futures", + "workspace-hack", +] + [[package]] name = "iox_time" version = "0.1.0" @@ -3110,9 +3123,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.154" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libm" @@ -3139,9 +3152,9 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" [[package]] name = "linux-raw-sys" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" [[package]] name = "lock_api" @@ -3842,7 +3855,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -3962,7 +3975,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" dependencies = [ "proc-macro2", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -4037,7 +4050,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922" dependencies = [ "bytes", - "prost-derive 0.12.5", + "prost-derive 0.12.4", ] [[package]] @@ -4057,7 +4070,7 @@ dependencies = [ "prost 0.12.4", "prost-types 0.12.4", "regex", - "syn 2.0.63", + "syn 2.0.64", "tempfile", ] @@ -4076,15 +4089,15 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.12.5" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9554e3ab233f0a932403704f1a1d08c30d5ccd931adfdfa1e8b5a19b52c1d55a" +checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48" dependencies = [ "anyhow", "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -4440,7 +4453,7 @@ dependencies = [ "log", "ring", "rustls-pki-types", - "rustls-webpki 0.102.3", + "rustls-webpki 0.102.4", "subtle", "zeroize", ] @@ -4507,9 +4520,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.102.3" +version = "0.102.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3bce581c0dd41bce533ce695a1437fa16a7ab5ac3ccfa99fe1a620a7885eabf" +checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" dependencies = [ "ring", "rustls-pki-types", @@ -4649,7 +4662,7 @@ checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -4853,7 +4866,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -4926,7 +4939,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -5183,7 +5196,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -5205,9 +5218,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.63" +version = "2.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf5be731623ca1a1fb7d8be6f261a3be6d3e2337b8a1f97be944d020c8fcb704" +checksum = "7ad3dee41f36859875573074334c200d1add8e4a87bb37113ebd31d926b7b11f" dependencies = [ "proc-macro2", "quote", @@ -5358,7 +5371,7 @@ checksum = "e2470041c06ec3ac1ab38d0356a6119054dedaea53e12fbefc0de730a1c08524" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -5486,7 +5499,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -5626,7 +5639,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -5791,7 +5804,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -6087,7 +6100,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", "wasm-bindgen-shared", ] @@ -6121,7 +6134,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -6471,7 +6484,7 @@ dependencies = [ "strum", "subtle", "syn 1.0.109", - "syn 2.0.63", + "syn 2.0.64", "thrift", "tokio", "tokio-stream", @@ -6525,7 +6538,7 @@ checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] @@ -6545,7 +6558,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.64", ] [[package]] diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index ad63b57331f..8b850294194 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -156,6 +156,17 @@ pub struct Config { action )] pub segment_duration: SegmentDuration, + + // TODO - tune this default: + /// The size of the query log. Up to this many queries will remain in the log before + /// old queries are evicted to make room for new ones. + #[clap( + long = "query-log-size", + env = "INFLUXDB3_QUERY_LOG_SIZE", + default_value = "1000", + action + )] + pub query_log_size: usize, } /// If `p` does not exist, try to create it as a directory. @@ -275,6 +286,7 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&metrics), Arc::new(config.datafusion_config), 10, + config.query_log_size, )); let builder = ServerBuilder::new(common_state) diff --git a/influxdb3/tests/server/main.rs b/influxdb3/tests/server/main.rs index 775012b67eb..9da022a4e73 100644 --- a/influxdb3/tests/server/main.rs +++ b/influxdb3/tests/server/main.rs @@ -17,6 +17,7 @@ mod flight; mod limits; mod ping; mod query; +mod system_tables; mod write; /// Configuration for a [`TestServer`] @@ -115,6 +116,14 @@ impl TestServer { /// Get a [`FlightSqlClient`] for making requests to the running service over gRPC pub async fn flight_sql_client(&self, database: &str) -> FlightSqlClient { + self.flight_sql_client_debug_mode(database, false).await + } + + pub async fn flight_sql_client_debug_mode( + &self, + database: &str, + debug_mode: bool, + ) -> FlightSqlClient { let channel = tonic::transport::Channel::from_shared(self.client_addr()) .expect("create tonic channel") .connect() @@ -122,7 +131,9 @@ impl TestServer { .expect("connect to gRPC client"); let mut client = FlightSqlClient::new(channel); client.add_header("database", database).unwrap(); - client.add_header("iox-debug", "true").unwrap(); + if debug_mode { + client.add_header("iox-debug", "true").unwrap(); + } client } diff --git a/influxdb3/tests/server/system_tables.rs b/influxdb3/tests/server/system_tables.rs new file mode 100644 index 00000000000..755012c7843 --- /dev/null +++ b/influxdb3/tests/server/system_tables.rs @@ -0,0 +1,92 @@ +use arrow_util::assert_batches_sorted_eq; +use influxdb3_client::Precision; + +use crate::{collect_stream, TestServer}; + +#[tokio::test] +async fn queries_table() { + let server = TestServer::spawn().await; + + server + .write_lp_to_db( + "foo", + "cpu,host=s1,region=us-east usage=0.9 1\n\ + cpu,host=s1,region=us-east usage=0.89 2\n\ + cpu,host=s1,region=us-east usage=0.85 3", + Precision::Nanosecond, + ) + .await + .expect("write some lp"); + + let mut client = server.flight_sql_client_debug_mode("foo", true).await; + + // Check queries table for completed queries, will be empty: + { + let response = client + .query("SELECT COUNT(*) FROM system.queries WHERE running = false") + .await + .unwrap(); + + let batches = collect_stream(response).await; + assert_batches_sorted_eq!( + [ + "+----------+", + "| COUNT(*) |", + "+----------+", + "| 0 |", + "+----------+", + ], + &batches + ); + } + + // Do some queries, to produce some query logs: + { + let queries = [ + "SELECT * FROM cpu", // valid + "SELECT * FROM mem", // not valid table, will fail, and not be logged + "SELECT usage, time FROM cpu", // specific columns + ]; + for q in queries { + let response = client.query(q).await; + // collect the stream to make sure the query completes: + if let Ok(stream) = response { + let _batches = collect_stream(stream).await; + } + } + } + + // Now check the log: + { + // A sub-set of columns is selected in this query, because the queries + // table contains may columns whose values change in susequent test runs + let response = client + .query( + "SELECT \ + phase, \ + query_type, \ + query_text, \ + success, \ + running, \ + cancelled \ + FROM system.queries \ + WHERE success = true", + ) + .await + .unwrap(); + + let batches = collect_stream(response).await; + assert_batches_sorted_eq!( + [ + "+---------+------------+--------------------------------------------------------------------------------+---------+---------+-----------+", + "| phase | query_type | query_text | success | running | cancelled |", + "+---------+------------+--------------------------------------------------------------------------------+---------+---------+-----------+", + "| success | flightsql | CommandStatementQuerySELECT * FROM cpu | true | false | false |", + "| success | flightsql | CommandStatementQuerySELECT COUNT(*) FROM system.queries WHERE running = false | true | false | false |", + "| success | flightsql | CommandStatementQuerySELECT usage, time FROM cpu | true | false | false |", + "+---------+------------+--------------------------------------------------------------------------------+---------+---------+-----------+", + ], + &batches + ); + } +} diff --git a/influxdb3_server/Cargo.toml b/influxdb3_server/Cargo.toml index f3c7497ca8c..4822dbc1f4d 100644 --- a/influxdb3_server/Cargo.toml +++ b/influxdb3_server/Cargo.toml @@ -16,6 +16,7 @@ iox_http.workspace = true iox_query.workspace = true iox_query_params.workspace = true iox_query_influxql.workspace = true +iox_system_tables.workspace = true iox_time.workspace = true metric.workspace = true metric_exporters.workspace = true diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index 0d11a3ee04d..65881bbdb0e 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -290,6 +290,7 @@ mod tests { Arc::clone(&metrics), Arc::new(HashMap::new()), 10, + 10, )); let server = ServerBuilder::new(common_state) @@ -449,6 +450,7 @@ mod tests { Arc::clone(&metrics), Arc::new(HashMap::new()), 10, + 10, ); let server = ServerBuilder::new(common_state) @@ -655,6 +657,7 @@ mod tests { Arc::clone(&metrics), Arc::new(HashMap::new()), 10, + 10, ); let server = ServerBuilder::new(common_state) diff --git a/influxdb3_server/src/query_executor.rs b/influxdb3_server/src/query_executor.rs index d27d193875d..bffb5ffbdbd 100644 --- a/influxdb3_server/src/query_executor.rs +++ b/influxdb3_server/src/query_executor.rs @@ -1,9 +1,12 @@ //! module for query executor use crate::{QueryExecutor, QueryKind}; -use arrow::array::{ArrayRef, Int64Builder, StringBuilder, StructArray}; +use arrow::array::{ + ArrayRef, BooleanArray, DurationNanosecondArray, Int64Array, Int64Builder, StringBuilder, + StructArray, TimestampNanosecondArray, +}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use arrow_schema::ArrowError; +use arrow_schema::{ArrowError, TimeUnit}; use async_trait::async_trait; use data_types::NamespaceId; use datafusion::catalog::schema::SchemaProvider; @@ -26,14 +29,15 @@ use influxdb3_write::{ use iox_query::exec::{Executor, IOxSessionContext, QueryConfig}; use iox_query::frontend::sql::SqlQueryPlanner; use iox_query::provider::ProviderBuilder; -use iox_query::query_log::QueryLog; -use iox_query::query_log::QueryText; use iox_query::query_log::StateReceived; use iox_query::query_log::{QueryCompletedToken, QueryLogEntries}; +use iox_query::query_log::{QueryLog, QueryLogEntryState}; +use iox_query::query_log::{QueryPhase, QueryText}; use iox_query::QueryDatabase; use iox_query::{QueryChunk, QueryNamespace}; use iox_query_influxql::frontend::planner::InfluxQLQueryPlanner; use iox_query_params::StatementParams; +use iox_system_tables::{IoxSystemTable, SystemTableProvider}; use metric::Registry; use observability_deps::tracing::{debug, info, trace}; use schema::Schema; @@ -66,6 +70,7 @@ impl QueryExecutorImpl { metrics: Arc, datafusion_config: Arc>, concurrent_query_limit: usize, + query_log_size: usize, ) -> Self { let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new( &metrics, @@ -73,10 +78,8 @@ impl QueryExecutorImpl { )); let query_execution_semaphore = Arc::new(semaphore_metrics.new_semaphore(concurrent_query_limit)); - // TODO Fine tune this number or make configurable - const QUERY_LOG_LIMIT: usize = 1_000; let query_log = Arc::new(QueryLog::new( - QUERY_LOG_LIMIT, + query_log_size, Arc::new(iox_time::SystemProvider::new()), )); Self { @@ -117,16 +120,15 @@ impl QueryExecutor for QueryExecutorImpl { // TODO - configure query here? let ctx = db.new_query_context(span_ctx, Default::default()); + let params = params.unwrap_or_default(); let token = db.record_query( external_span_ctx.as_ref().map(RequestLogContext::ctx), "sql", Box::new(q.to_string()), - // TODO - ignoring params for now: - StatementParams::default(), + params.clone(), ); info!("plan"); - let params = params.unwrap_or_default(); let plan = match kind { QueryKind::Sql => { let planner = SqlQueryPlanner::new(); @@ -137,7 +139,14 @@ impl QueryExecutor for QueryExecutorImpl { planner.query(q, params, &ctx).await } } - .map_err(Error::QueryPlanning)?; + .map_err(Error::QueryPlanning); + let plan = match plan { + Ok(plan) => plan, + Err(e) => { + token.fail(); + return Err(e); + } + }; let token = token.planned(&ctx, Arc::clone(&plan)); // TODO: Enforce concurrency limit here @@ -318,7 +327,7 @@ impl QueryDatabase for QueryExecutorImpl { } fn query_log(&self) -> QueryLogEntries { - todo!(); + self.query_log.entries() } } @@ -448,13 +457,14 @@ impl CatalogProvider for Database { fn schema_names(&self) -> Vec { info!("CatalogProvider schema_names"); - vec![DEFAULT_SCHEMA.to_string()] + vec![DEFAULT_SCHEMA.to_string(), SYSTEM_SCHEMA.to_string()] } fn schema(&self, name: &str) -> Option> { info!("CatalogProvider schema {}", name); match name { DEFAULT_SCHEMA => Some(Arc::new(Self::from_namespace(self))), + SYSTEM_SCHEMA => Some(Arc::clone(&self.system_schema_provider) as _), _ => None, } } @@ -554,7 +564,9 @@ impl TableProvider for QueryTable { } } -const _QUERIES_TABLE: &str = "queries"; +pub const SYSTEM_SCHEMA: &str = "system"; + +const QUERIES_TABLE: &str = "queries"; const _PARQUET_FILES_TABLE: &str = "parquet_files"; struct SystemSchemaProvider { @@ -573,17 +585,251 @@ impl std::fmt::Debug for SystemSchemaProvider { } impl SystemSchemaProvider { - fn new(_catalog: Arc, _query_log: Arc, include_debug_info: bool) -> Self { - let tables = HashMap::new(); + fn new(_catalog: Arc, query_log: Arc, include_debug_info: bool) -> Self { + let mut tables = HashMap::<&'static str, Arc>::new(); if include_debug_info { - // Using todo!() here causes gRPC integration tests to fail, likely because they - // enable debug mode by default, thus entering this if block. So, just leaving this - // here in lieu of todo!(). - // - // Eventually, we will implement the queries and parquet_files tables and they will - // be injected to the provider's table hashmap here... - info!("TODO - gather system tables"); + // TODO - remaining system tables gathered here... + let queries = Arc::new(SystemTableProvider::new(Arc::new(QueriesTable::new( + query_log, + )))); + tables.insert(QUERIES_TABLE, queries); } Self { tables } } } + +#[async_trait] +impl SchemaProvider for SystemSchemaProvider { + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } + + fn table_names(&self) -> Vec { + let mut names = self + .tables + .keys() + .map(|s| (*s).to_owned()) + .collect::>(); + names.sort(); + names + } + + async fn table(&self, name: &str) -> Result>, DataFusionError> { + Ok(self.tables.get(name).cloned()) + } + + fn table_exist(&self, name: &str) -> bool { + self.tables.contains_key(name) + } +} + +struct QueriesTable { + schema: SchemaRef, + query_log: Arc, +} + +impl QueriesTable { + fn new(query_log: Arc) -> Self { + Self { + schema: queries_schema(), + query_log, + } + } +} + +#[async_trait::async_trait] +impl IoxSystemTable for QueriesTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + async fn scan( + &self, + _filters: Option>, + _limit: Option, + ) -> Result { + let schema = self.schema(); + + let entries = self + .query_log + .entries() + .entries + .into_iter() + .map(|e| e.state()) + .collect::>(); + + from_query_log_entries(Arc::clone(&schema), &entries) + } +} + +fn queries_schema() -> SchemaRef { + let columns = vec![ + Field::new("id", DataType::Utf8, false), + Field::new("phase", DataType::Utf8, false), + Field::new( + "issue_time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("query_type", DataType::Utf8, false), + Field::new("query_text", DataType::Utf8, false), + Field::new("partitions", DataType::Int64, true), + Field::new("parquet_files", DataType::Int64, true), + Field::new( + "plan_duration", + DataType::Duration(TimeUnit::Nanosecond), + true, + ), + Field::new( + "permit_duration", + DataType::Duration(TimeUnit::Nanosecond), + true, + ), + Field::new( + "execute_duration", + DataType::Duration(TimeUnit::Nanosecond), + true, + ), + Field::new( + "end2end_duration", + DataType::Duration(TimeUnit::Nanosecond), + true, + ), + Field::new( + "compute_duration", + DataType::Duration(TimeUnit::Nanosecond), + true, + ), + Field::new("max_memory", DataType::Int64, true), + Field::new("success", DataType::Boolean, false), + Field::new("running", DataType::Boolean, false), + Field::new("cancelled", DataType::Boolean, false), + Field::new("trace_id", DataType::Utf8, true), + ]; + + Arc::new(DatafusionSchema::new(columns)) +} + +fn from_query_log_entries( + schema: SchemaRef, + entries: &[Arc], +) -> Result { + let mut columns: Vec = vec![]; + + columns.push(Arc::new( + entries + .iter() + .map(|e| Some(e.id.to_string())) + .collect::(), + )); + + columns.push(Arc::new( + entries + .iter() + .map(|e| Some(e.phase.name())) + .collect::(), + )); + + columns.push(Arc::new( + entries + .iter() + .map(|e| e.issue_time) + .map(|ts| Some(ts.timestamp_nanos())) + .collect::(), + )); + + columns.push(Arc::new( + entries + .iter() + .map(|e| Some(&e.query_type)) + .collect::(), + )); + + columns.push(Arc::new( + entries + .iter() + .map(|e| Some(e.query_text.to_string())) + .collect::(), + )); + + columns.push(Arc::new( + entries.iter().map(|e| e.partitions).collect::(), + )); + + columns.push(Arc::new( + entries + .iter() + .map(|e| e.parquet_files) + .collect::(), + )); + + columns.push(Arc::new( + entries + .iter() + .map(|e| e.plan_duration.map(|d| d.as_nanos() as i64)) + .collect::(), + )); + + columns.push(Arc::new( + entries + .iter() + .map(|e| e.permit_duration.map(|d| d.as_nanos() as i64)) + .collect::(), + )); + + columns.push(Arc::new( + entries + .iter() + .map(|e| e.execute_duration.map(|d| d.as_nanos() as i64)) + .collect::(), + )); + + columns.push(Arc::new( + entries + .iter() + .map(|e| e.end2end_duration.map(|d| d.as_nanos() as i64)) + .collect::(), + )); + + columns.push(Arc::new( + entries + .iter() + .map(|e| e.compute_duration.map(|d| d.as_nanos() as i64)) + .collect::(), + )); + + columns.push(Arc::new( + entries.iter().map(|e| e.max_memory).collect::(), + )); + + columns.push(Arc::new( + entries + .iter() + .map(|e| Some(e.success)) + .collect::(), + )); + + columns.push(Arc::new( + entries + .iter() + .map(|e| Some(e.running)) + .collect::(), + )); + + columns.push(Arc::new( + entries + .iter() + .map(|e| Some(e.phase == QueryPhase::Cancel)) + .collect::(), + )); + + columns.push(Arc::new( + entries + .iter() + .map(|e| e.trace_id.map(|x| format!("{:x}", x.0))) + .collect::(), + )); + + let batch = RecordBatch::try_new(schema, columns)?; + Ok(batch) +}