Skip to content

Commit

Permalink
feat: unified logging interface with tracing-subscribers (paradedb#1527)
Browse files Browse the repository at this point in the history
  • Loading branch information
neilyio authored Aug 14, 2024
1 parent 16a2d2a commit bb9938b
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 15 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pg_search/src/index/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tantivy::{query::QueryParser, Executor, Index, Searcher};
use tantivy::{schema::Value, IndexReader, IndexWriter, TantivyDocument, TantivyError};
use thiserror::Error;
use tokenizers::{create_normalizer_manager, create_tokenizer_manager};
use tracing::{error, info};
use tracing::trace;

use super::state::SearchState;
use crate::schema::{
Expand Down Expand Up @@ -109,7 +109,7 @@ impl SearchIndex {
.filter_map(|field| {
let field_config = &field.config;
let field_name: &str = field.name.as_ref();
info!(field_name, "attempting to create tokenizer");
trace!(field_name, "attempting to create tokenizer");
match field_config {
SearchFieldConfig::Text { tokenizer, .. }
| SearchFieldConfig::Json { tokenizer, .. } => Some(tokenizer),
Expand Down
20 changes: 18 additions & 2 deletions pg_search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use shared::gucs::PostgresGlobalGucSettings;
use shared::telemetry::setup_telemetry_background_worker;
use std::process;
use std::time::Duration;
use tracing::debug;

// A static variable is required to host grand unified configuration settings.
pub static GUCS: PostgresGlobalGucSettings = PostgresGlobalGucSettings::new();
Expand All @@ -43,6 +44,8 @@ pgrx::pg_module_magic!();

extension_sql!("GRANT ALL ON SCHEMA paradedb TO PUBLIC;" name = "paradedb_grant_all");

static mut TRACE_HOOK: shared::trace::TraceHook = shared::trace::TraceHook;

/// Initializes option parsing and telemetry
#[allow(clippy::missing_safety_doc)]
#[allow(non_snake_case)]
Expand All @@ -59,6 +62,11 @@ pub unsafe extern "C" fn _PG_init() {
setup_background_workers();

setup_telemetry_background_worker(shared::telemetry::ParadeExtension::PgSearch);

// Register our tracing / logging hook, so that we can ensure that the logger
// is initialized for all connections.
#[allow(static_mut_refs)]
pgrx::hooks::register_hook(&mut TRACE_HOOK);
}

#[pg_guard]
Expand Down Expand Up @@ -98,7 +106,11 @@ pub fn setup_background_workers() {
#[pg_guard]
#[no_mangle]
pub extern "C" fn pg_search_insert_worker(_arg: pg_sys::Datum) {
pgrx::log!("starting pg_search insert worker at PID {}", process::id());
// This function runs in the spawned background worker process. That means
// that we need to re-initialize logging.
shared::trace::init_ereport_logger("pg_search");

debug!("starting pg_search insert worker at PID {}", process::id());
let writer = writer::Writer::new();
let mut server = writer::Server::new(writer).expect("error starting writer server");

Expand Down Expand Up @@ -127,7 +139,11 @@ pub extern "C" fn pg_search_insert_worker(_arg: pg_sys::Datum) {
#[pg_guard]
#[no_mangle]
pub extern "C" fn pg_search_shutdown_worker(_arg: pg_sys::Datum) {
pgrx::log!(
// This function runs in the spawned background worker process. That means
// that we need to re-initialize logging.
shared::trace::init_ereport_logger("pg_search");

debug!(
"starting pg_search shutdown worker at PID {}",
process::id()
);
Expand Down
4 changes: 2 additions & 2 deletions pg_search/src/writer/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::marker::PhantomData;
use std::path::Path;
use std::{cell::RefCell, io::Cursor};
use thiserror::Error;
use tracing::{error, info};
use tracing::{debug, error};

/// A generic server for receiving requests and transfers from a client.
pub struct Server<'a, T, H>
Expand Down Expand Up @@ -93,7 +93,7 @@ where
}

fn listen_request(&mut self) -> Result<(), ServerError> {
info!("listening to incoming requests at {:?}", self.addr);
debug!(address = %self.addr, "listening to incoming requests");
for mut incoming in self.http.incoming_requests() {
let reader = incoming.as_reader();
let request: Result<ServerRequest<T>, ServerError> = bincode::deserialize_from(reader)
Expand Down
2 changes: 2 additions & 0 deletions shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ humansize = "2.1.3"
anyhow = "1.0.83"
datafusion = "38.0.0"
tempfile = "3.10.1"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
libc = "0.2.155"

[dev-dependencies]
mockall = "0.12.1"
Expand Down
4 changes: 4 additions & 0 deletions shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod github;
pub mod gucs;
pub mod postgres;
pub mod telemetry;
pub mod trace;

#[cfg(feature = "fixtures")]
pub mod fixtures;
Expand All @@ -27,3 +28,6 @@ pub mod fixtures;
// This lets consumers of the macros use them without needing to also install these dependencies.
pub use pgrx;
pub use serde_json;
pub use trace::init_ereport_logger;
pub use tracing;
pub use tracing_subscriber;
15 changes: 10 additions & 5 deletions shared/src/telemetry/bgworker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::path::PathBuf;
use std::process;
use std::sync::Mutex;
use std::time::Duration;
use tracing::debug;

/// Enumerating our extensions. It's important that these can be enumerated as integers
/// so that this enum can be passed as an i32 datum to a background worker.
Expand Down Expand Up @@ -100,6 +101,10 @@ pub fn setup_telemetry_background_worker(extension: ParadeExtension) {
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn telemetry_worker(extension_name_datum: pg_sys::Datum) {
// This function runs in the spawned background worker process. That means
// that we need to re-initialize logging.
crate::trace::init_ereport_logger("pg_search");

let extension_i32 = unsafe { i32::from_datum(extension_name_datum, false) }
.expect("extension enum i32 not passed to bgworker");
let extension = ParadeExtension::from_i32(extension_i32)
Expand All @@ -108,11 +113,11 @@ pub unsafe extern "C" fn telemetry_worker(extension_name_datum: pg_sys::Datum) {

// If telemetry is not enabled at compile time, return early.
if option_env!("PARADEDB_TELEMETRY") != Some("true") {
pgrx::log!("PARADEDB_TELEMETRY var not set at compile time for {extension_name}");
debug!("PARADEDB_TELEMETRY var not set at compile time for {extension_name}");
return;
}

pgrx::log!(
debug!(
"starting {extension_name} telemetry worker at PID {}",
process::id()
);
Expand Down Expand Up @@ -146,9 +151,9 @@ pub unsafe extern "C" fn telemetry_worker(extension_name_datum: pg_sys::Datum) {
term_poll: Box::new(sigterm_handler),
};

pgrx::log!("starting {extension_name} telemetry event loop");
debug!("starting {extension_name} telemetry event loop");
controller.run().expect("error in telemetry server");
pgrx::log!("exiting {extension_name} telemetry event loop");
debug!("exiting {extension_name} telemetry event loop");
}

// The bgworker must only connect once to SPI, or it will segfault. We'll
Expand Down Expand Up @@ -210,7 +215,7 @@ impl BgWorkerTelemetryConfig {
Ok(Some("on")) => Ok(true),
Err(err) => Err(anyhow!("error checking telemetry guc setting: {err}")),
other => {
pgrx::log!("{guc_setting_query} = {other:?}");
debug!("{guc_setting_query} = {other:?}");
Ok(false)
}
})
Expand Down
5 changes: 3 additions & 2 deletions shared/src/telemetry/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
thread,
time::{Duration, Instant},
};
use tracing::debug;

use super::{
event::TelemetryEvent, DirectoryStore, TelemetryConfigStore, TelemetryStore, TermPoll,
Expand All @@ -40,7 +41,7 @@ impl TelemetrySender {
if self.config_store.telemetry_enabled()? {
conn.send(uuid, event)
} else {
pgrx::log!(
debug!(
"paradedb telemetry is disabled, not sending event: {}",
event.name()
);
Expand All @@ -49,7 +50,7 @@ impl TelemetrySender {
}
pub fn send_deployment(&self) -> Result<()> {
if self.directory_store.extension_uuid_path()?.exists() {
pgrx::log!("extension has been deployed before, skipping deployment telemetry");
debug!("extension has been deployed before, skipping deployment telemetry");
return Ok(());
}
let uuid = self.directory_store.extension_uuid()?;
Expand Down
Loading

0 comments on commit bb9938b

Please sign in to comment.