Skip to content

Commit

Permalink
chore: use latest influxdb3_core changes (influxdata#24982)
Browse files Browse the repository at this point in the history
Introduction of the `TokioDatafusionConfig` clap block for configuring the DataFusion runtime - this exposes many new `--datafusion-*` options on start, including `--datafusion-num-threads`

To accommodate renaming of `QueryNamespaceProvider` to `QueryDatabase` in `influxdb3_core`, I renamed the `QueryDatabase` type to `Database`.

Fixed tests that broke as a result of sync.
  • Loading branch information
hiltontj authored May 13, 2024
1 parent 6f3d6b1 commit 8f72bf0
Show file tree
Hide file tree
Showing 11 changed files with 938 additions and 1,114 deletions.
1,735 changes: 749 additions & 986 deletions Cargo.lock

Large diffs are not rendered by default.

105 changes: 53 additions & 52 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ license = "MIT OR Apache-2.0"

[workspace.dependencies]
anyhow = "1.0"
arrow = { version = "50.0.0", features = ["prettyprint", "chrono-tz"] }
arrow-array = "50.0.0"
arrow-buffer = "50.0.0"
arrow-csv = "50.0.0"
arrow-flight = { version = "50.0.0", features = ["flight-sql-experimental"] }
arrow-json = "50.0.0"
arrow-schema = "50.0.0"
arrow = { version = "51.0.0", features = ["prettyprint", "chrono-tz"] }
arrow-array = "51.0.0"
arrow-buffer = "51.0.0"
arrow-csv = "51.0.0"
arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] }
arrow-json = "51.0.0"
arrow-schema = "51.0.0"
assert_cmd = "2.0.14"
async-trait = "0.1"
backtrace = "0.3"
Expand All @@ -49,8 +49,8 @@ chrono = "0.4"
clap = { version = "4", features = ["derive", "env", "string"] }
crc32fast = "1.2.0"
crossbeam-channel = "0.5.11"
datafusion = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "581e74785b876615d6a63db8c2e5ba372bf78828" }
datafusion-proto = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "581e74785b876615d6a63db8c2e5ba372bf78828" }
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "e0245296792eabdc35e83e8c5872345ff38c1fdf" }
datafusion-proto = { git = "https://github.com/apache/datafusion.git", rev = "e0245296792eabdc35e83e8c5872345ff38c1fdf" }
csv = "1.3.0"
dotenvy = "0.15.7"
flate2 = "1.0.27"
Expand All @@ -67,7 +67,7 @@ num_cpus = "1.16.0"
object_store = "0.9.1"
once_cell = { version = "1.18", features = ["parking_lot"] }
parking_lot = "0.12.1"
parquet = { version = "50.0.0", features = ["object_store"] }
parquet = { version = "51.0.0", features = ["object_store"] }
pbjson = "0.6.0"
pbjson-build = "0.6.2"
pbjson-types = "0.6.0"
Expand All @@ -80,7 +80,6 @@ rand = "0.8.5"
reqwest = { version = "0.11.24", default-features = false, features = ["rustls-tls", "stream"] }
secrecy = "0.8.0"
serde = { version = "1.0", features = ["derive"] }
serde_arrow = { version = "0.10", features = ["arrow-50"] }
serde_json = "1.0"
serde_urlencoded = "0.7.0"
sha2 = "0.10.8"
Expand All @@ -90,48 +89,49 @@ sysinfo = "0.30.8"
thiserror = "1.0"
tokio = { version = "1.35", features = ["full"] }
tokio-util = "0.7.9"
tonic = { version = "0.10.2", features = ["tls", "tls-roots"] }
tonic-build = "0.10.2"
tonic-health = "0.10.2"
tonic-reflection = "0.10.2"
tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
tonic-build = "0.11.0"
tonic-health = "0.11.0"
tonic-reflection = "0.11.0"
tower = "0.4.13"
unicode-segmentation = "1.11.0"
url = "2.5.0"
urlencoding = "1.1"
uuid = { version = "1", features = ["v4"] }

# Core.git crates we depend on
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a"}
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a", features = ["http"] }
clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
iox_catalog = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
ioxd_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
iox_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
iox_query = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
iox_query_params = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
iox_query_influxql = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
iox_time = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
metric = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
metric_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
observability_deps = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
panic_logging = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
parquet_file = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
schema = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
service_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
service_grpc_flight = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
test_helpers = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
test_helpers_end_to_end = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
tokio_metrics_bridge = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "b546e7f86ee9adbff0dd3c5e687140848397604a", default-features = true, features = ["clap"] }
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e"}
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e", features = ["http"] }
clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
iox_catalog = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
ioxd_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
iox_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
iox_query = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
iox_query_params = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
iox_query_influxql = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
iox_system_tables = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
iox_time = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
metric = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
metric_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
observability_deps = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
panic_logging = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
parquet_file = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
schema = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
service_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
service_grpc_flight = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
test_helpers = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
test_helpers_end_to_end = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
tokio_metrics_bridge = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "ab23285fe62f29a60652011b0bee9cc6ee3a899e", default-features = true, features = ["clap"] }

[workspace.lints.rust]
rust_2018_idioms = "deny"
Expand Down Expand Up @@ -175,9 +175,10 @@ opt-level = 3
# patch arrow-flight crate to allow for prepared statement parameters
# see related arrow-rs PR https://github.com/apache/arrow-rs/pull/5433
[patch.crates-io]
arrow-array = { git = "https://github.com/erratic-pattern/arrow-rs.git", branch = "50.0.0" }
arrow-schema = { git = "https://github.com/erratic-pattern/arrow-rs.git", branch = "50.0.0" }
arrow-data = { git = "https://github.com/erratic-pattern/arrow-rs.git", branch = "50.0.0" }
arrow-buffer = { git = "https://github.com/erratic-pattern/arrow-rs.git", branch = "50.0.0" }
arrow-ipc = { git = "https://github.com/erratic-pattern/arrow-rs.git", branch = "50.0.0" }
arrow-flight = { git = "https://github.com/erratic-pattern/arrow-rs.git", branch = "50.0.0" }
arrow-array = { git = "https://github.com/influxdata/arrow-rs.git", rev = "ea454d74707357731535d4bf20e9508e838f5f5d" }
arrow-schema = { git = "https://github.com/influxdata/arrow-rs.git", rev = "ea454d74707357731535d4bf20e9508e838f5f5d" }
arrow-data = { git = "https://github.com/influxdata/arrow-rs.git", rev = "ea454d74707357731535d4bf20e9508e838f5f5d" }
arrow-buffer = { git = "https://github.com/influxdata/arrow-rs.git", rev = "ea454d74707357731535d4bf20e9508e838f5f5d" }
arrow-ipc = { git = "https://github.com/influxdata/arrow-rs.git", rev = "ea454d74707357731535d4bf20e9508e838f5f5d" }
arrow-flight = { git = "https://github.com/influxdata/arrow-rs.git", rev = "ea454d74707357731535d4bf20e9508e838f5f5d" }
parquet = { git = "https://github.com/influxdata/arrow-rs.git", rev = "ea454d74707357731535d4bf20e9508e838f5f5d" }
62 changes: 41 additions & 21 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use clap_blocks::{
memory_size::MemorySize,
object_store::{make_object_store, ObjectStoreConfig},
socket_addr::SocketAddr,
tokio::TokioDatafusionConfig,
};
use datafusion_util::config::register_iox_object_store;
use influxdb3_process::{
Expand All @@ -17,7 +18,7 @@ use influxdb3_write::persister::PersisterImpl;
use influxdb3_write::wal::WalImpl;
use influxdb3_write::write_buffer::WriteBufferImpl;
use influxdb3_write::SegmentDuration;
use iox_query::exec::{Executor, ExecutorConfig};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_time::SystemProvider;
use ioxd_common::reexport::trace_http::ctx::TraceHeaderParser;
use object_store::DynObjectStore;
Expand Down Expand Up @@ -50,6 +51,9 @@ pub enum Error {
#[error("Tracing config error: {0}")]
TracingConfig(#[from] trace_exporters::Error),

#[error("Error initializing tokio runtime: {0}")]
TokioRuntime(#[source] std::io::Error),

#[error("Server error: {0}")]
Server(#[from] influxdb3_server::Error),

Expand All @@ -67,6 +71,22 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug, clap::Parser)]
pub struct Config {
/// object store options
#[clap(flatten)]
object_store_config: ObjectStoreConfig,

/// logging options
#[clap(flatten)]
pub(crate) logging_config: LoggingConfig,

/// tracing options
#[clap(flatten)]
pub(crate) tracing_config: TracingConfig,

/// tokio datafusion config
#[clap(flatten)]
pub(crate) tokio_datafusion_config: TokioDatafusionConfig,

/// Maximum size of HTTP requests.
#[clap(
long = "max-http-request-size",
Expand All @@ -76,9 +96,6 @@ pub struct Config {
)]
pub max_http_request_size: usize,

#[clap(flatten)]
object_store_config: ObjectStoreConfig,

/// The directory to store the write ahead log
///
/// If not specified, defaults to INFLUXDB3_DB_DIR/wal
Expand Down Expand Up @@ -116,14 +133,6 @@ pub struct Config {
)]
pub exec_mem_pool_bytes: MemorySize,

/// logging options
#[clap(flatten)]
pub(crate) logging_config: LoggingConfig,

/// tracing options
#[clap(flatten)]
pub(crate) tracing_config: TracingConfig,

/// DataFusion config.
#[clap(
long = "datafusion-config",
Expand Down Expand Up @@ -194,25 +203,36 @@ pub async fn command(config: Config) -> Result<()> {

let trace_exporter = config.tracing_config.build()?;

// TODO: make this a parameter
let num_threads =
NonZeroUsize::new(num_cpus::get()).unwrap_or_else(|| NonZeroUsize::new(1).unwrap());

info!(%num_threads, "Creating shared query executor");
let parquet_store =
ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
let exec = Arc::new(Executor::new_with_config(
"datafusion",

let mut tokio_datafusion_config = config.tokio_datafusion_config;
tokio_datafusion_config.num_threads = tokio_datafusion_config
.num_threads
.or_else(|| NonZeroUsize::new(num_cpus::get()))
.or_else(|| NonZeroUsize::new(1));
info!(
num_threads = tokio_datafusion_config.num_threads.map(|n| n.get()),
"Creating shared query executor"
);

let exec = Arc::new(Executor::new_with_config_and_executor(
ExecutorConfig {
num_threads,
target_query_partitions: num_threads,
target_query_partitions: tokio_datafusion_config.num_threads.unwrap(),
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
mem_pool_size: config.exec_mem_pool_bytes.bytes(),
},
DedicatedExecutor::new(
"datafusion",
tokio_datafusion_config
.builder()
.map_err(Error::TokioRuntime)?,
Arc::clone(&metrics),
),
));
let runtime_env = exec.new_context().inner().runtime_env();
register_iox_object_store(runtime_env, parquet_store.id(), Arc::clone(&object_store));
Expand Down
1 change: 0 additions & 1 deletion influxdb3_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ parking_lot.workspace = true
pin-project-lite.workspace = true
secrecy.workspace = true
serde.workspace = true
serde_arrow.workspace = true
serde_json.workspace = true
serde_urlencoded.workspace = true
sha2.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions influxdb3_server/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use arrow_flight::flight_service_server::{
FlightService as Flight, FlightServiceServer as FlightServer,
};
use authz::Authorizer;
use iox_query::QueryNamespaceProvider;
use iox_query::QueryDatabase;

pub(crate) fn make_flight_server<Q: QueryNamespaceProvider>(
pub(crate) fn make_flight_server<Q: QueryDatabase>(
server: Arc<Q>,
authz: Option<Arc<dyn Authorizer>>,
) -> FlightServer<impl Flight> {
Expand Down
2 changes: 2 additions & 0 deletions influxdb3_server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,8 @@ async fn record_batch_stream_to_body(
) -> Result<Body, Error> {
fn to_json(batches: Vec<RecordBatch>) -> Result<Bytes> {
let batches: Vec<&RecordBatch> = batches.iter().collect();
// See https://github.com/influxdata/influxdb/issues/24981
#[allow(deprecated)]
Ok(Bytes::from(serde_json::to_string(
&arrow_json::writer::record_batches_to_json_rows(batches.as_slice())?,
)?))
Expand Down
4 changes: 4 additions & 0 deletions influxdb3_server/src/http/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use arrow::{
compute::{cast_with_options, CastOptions},
record_batch::RecordBatch,
};
// Note: see https://github.com/influxdata/influxdb/issues/24981
#[allow(deprecated)]
use arrow_json::writer::record_batches_to_json_rows;

use arrow_schema::DataType;
Expand Down Expand Up @@ -335,6 +337,8 @@ impl QueryResponseStream {
}))
.context("failed to cast batch time column with `epoch` parameter specified")?;
}
// See https://github.com/influxdata/influxdb/issues/24981
#[allow(deprecated)]
let json_rows = record_batches_to_json_rows(&[&batch])
.context("failed to convert RecordBatch to JSON rows")?;
for json_row in json_rows {
Expand Down
Loading

0 comments on commit 8f72bf0

Please sign in to comment.