Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ impl Client {
operation_name,
start_time,
timeout,
self.options().tracing_max_document_length_bytes,
);
#[cfg(feature = "tracing-unstable")]
event_emitter.emit_started_event(self.inner.topology.latest().description.clone());
Expand Down
2 changes: 1 addition & 1 deletion src/client/csfle/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio::{
use crate::{
client::{csfle::options::KmsProvidersTlsOptions, options::ServerAddress, WeakClient},
error::{Error, Result},
operation::{run_command::RunCommand, RawOutput},
operation::{raw_output::RawOutput, run_command::RunCommand},
options::ReadConcern,
runtime::{process::Process, AsyncStream, TlsConfig},
Client,
Expand Down
20 changes: 12 additions & 8 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,13 @@ impl Client {
let start_time = Instant::now();
let command_result = match connection.send_message(message).await {
Ok(response) => {
let is_sharded =
connection.stream_description()?.initial_server_type == ServerType::Mongos;
self.parse_response(op, session, is_sharded, response).await
match self
.parse_response(op, session, connection.is_sharded()?, &response)
.await
{
Ok(()) => Ok(response),
Err(error) => Err(error.with_server_response(&response)),
}
}
Err(err) => Err(err),
};
Expand Down Expand Up @@ -612,15 +616,15 @@ impl Client {
effective_criteria: effective_criteria.clone(),
};

match op.handle_response(response, context).await {
match op.handle_response(&response, context).await {
Ok(response) => Ok(response),
Err(mut err) => {
err.add_labels_and_update_pin(
Some(connection.stream_description()?),
session,
Some(retryability),
);
Err(err)
Err(err.with_server_response(&response))
}
}
}
Expand Down Expand Up @@ -820,8 +824,8 @@ impl Client {
op: &T,
session: &mut Option<&mut ClientSession>,
is_sharded: bool,
response: RawCommandResponse,
) -> Result<RawCommandResponse> {
response: &RawCommandResponse,
) -> Result<()> {
let raw_doc = RawDocument::from_bytes(response.as_bytes())?;

let ok = match raw_doc.get("ok")? {
Expand Down Expand Up @@ -870,7 +874,7 @@ impl Client {
}
}

Ok(response)
Ok(())
} else {
Err(response
.body::<CommandErrorBody>()
Expand Down
18 changes: 15 additions & 3 deletions src/cmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,23 @@ impl ConnectionPool {
address: ServerAddress,
connection_establisher: ConnectionEstablisher,
server_updater: TopologyUpdater,
topology_id: ObjectId,
options: Option<ConnectionPoolOptions>,
#[cfg(feature = "tracing-unstable")] topology_id: ObjectId,
) -> Self {
let event_handler = options
.as_ref()
.and_then(|opts| opts.cmap_event_handler.clone());

let event_emitter = CmapEventEmitter::new(event_handler, topology_id);
#[cfg(feature = "tracing-unstable")]
let event_emitter = CmapEventEmitter::new(
event_handler,
topology_id,
options
.as_ref()
.and_then(|options| options.max_document_length_bytes),
);
#[cfg(not(feature = "tracing-unstable"))]
let event_emitter = CmapEventEmitter::new(event_handler);

let (manager, connection_requester, generation_subscriber) = ConnectionPoolWorker::start(
address.clone(),
Expand Down Expand Up @@ -114,7 +123,10 @@ impl ConnectionPool {
manager,
connection_requester,
generation_subscriber,
event_emitter: CmapEventEmitter::new(None, ObjectId::new()),
#[cfg(feature = "tracing-unstable")]
event_emitter: CmapEventEmitter::new(None, ObjectId::new(), None),
#[cfg(not(feature = "tracing-unstable"))]
event_emitter: CmapEventEmitter::new(None),
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/cmap/conn/pooled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
ConnectionReadyEvent,
},
runtime::AsyncStream,
ServerType,
};

/// A wrapper around the [`Connection`] type that represents a connection within a connection pool.
Expand Down Expand Up @@ -205,6 +206,11 @@ impl PooledConnection {
Instant::now().duration_since(available_time) >= max_idle_time
}

/// Whether this connection is to a mongos.
pub(crate) fn is_sharded(&self) -> Result<bool> {
Ok(self.stream_description()?.initial_server_type == ServerType::Mongos)
}

/// Nullifies the internal state of this connection and returns it in a new [PooledConnection]
/// with the given state.
fn take(&mut self, new_state: PooledConnectionState) -> Self {
Expand Down
6 changes: 6 additions & 0 deletions src/cmap/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ pub(crate) struct ConnectionPoolOptions {
///
/// The default is 2.
pub(crate) max_connecting: Option<u32>,

/// The maximum length for documents in tracing events.
#[cfg(feature = "tracing-unstable")]
pub(crate) max_document_length_bytes: Option<usize>,
}

impl ConnectionPoolOptions {
Expand All @@ -86,6 +90,8 @@ impl ConnectionPoolOptions {
load_balanced: options.load_balanced,
credential: options.credential.clone(),
max_connecting: options.max_connecting,
#[cfg(feature = "tracing-unstable")]
max_document_length_bytes: options.tracing_max_document_length_bytes,
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/cmap/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ impl Executor {
ConnectionEstablisher::new(EstablisherOptions::from(get_client_options().await))
.unwrap(),
updater,
crate::bson::oid::ObjectId::new(),
Some(self.pool_options),
#[cfg(feature = "tracing-unstable")]
crate::bson::oid::ObjectId::new(),
);

// Mock a monitoring task responding to errors reported by the pool.
Expand Down
9 changes: 6 additions & 3 deletions src/cmap/test/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ async fn acquire_connection_and_send_command() {
client_options.hosts[0].clone(),
ConnectionEstablisher::new(EstablisherOptions::from(&client_options)).unwrap(),
TopologyUpdater::channel().0,
crate::bson::oid::ObjectId::new(),
Some(pool_options),
#[cfg(feature = "tracing-unstable")]
crate::bson::oid::ObjectId::new(),
);
let mut connection = pool.check_out().await.unwrap();

Expand Down Expand Up @@ -116,8 +117,9 @@ async fn concurrent_connections() {
get_client_options().await.hosts[0].clone(),
ConnectionEstablisher::new(EstablisherOptions::from(&client_options)).unwrap(),
TopologyUpdater::channel().0,
crate::bson::oid::ObjectId::new(),
Some(options),
#[cfg(feature = "tracing-unstable")]
crate::bson::oid::ObjectId::new(),
);

let tasks = (0..2).map(|_| {
Expand Down Expand Up @@ -191,8 +193,9 @@ async fn connection_error_during_establishment() {
client_options.hosts[0].clone(),
ConnectionEstablisher::new(EstablisherOptions::from(&client_options)).unwrap(),
TopologyUpdater::channel().0,
crate::bson::oid::ObjectId::new(),
Some(options),
#[cfg(feature = "tracing-unstable")]
crate::bson::oid::ObjectId::new(),
);

pool.check_out().await.expect_err("check out should fail");
Expand Down
41 changes: 38 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;

use crate::{
bson::{Bson, Document},
bson::{doc, Bson, Document},
cmap::RawCommandResponse,
options::ServerAddress,
sdam::{ServerType, TopologyVersion},
};
Expand Down Expand Up @@ -52,20 +53,32 @@ pub type Result<T> = std::result::Result<T, Error>;
#[derive(Clone, Debug, Error)]
#[cfg_attr(
test,
error("Kind: {kind}, labels: {labels:?}, source: {source:?}, backtrace: {bt}")
error(
"Kind: {kind}, labels: {labels:?}, source: {source:?}, backtrace: {bt}, server response: \
{server_response:?}"
)
)]
#[cfg_attr(
not(test),
error("Kind: {kind}, labels: {labels:?}, source: {source:?}")
error(
"Kind: {kind}, labels: {labels:?}, source: {source:?}, server response: \
{server_response:?}"
)
)]
#[non_exhaustive]
pub struct Error {
/// The type of error that occurred.
pub kind: Box<ErrorKind>,

labels: HashSet<String>,

pub(crate) wire_version: Option<i32>,

#[source]
pub(crate) source: Option<Box<Error>>,

pub(crate) server_response: Option<Box<Document>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd lean towards storing and exposing this as a RawDocumentBuf - this feels like a "lower level" API where the wire representation is more appropriate, and also it allows inspecting the bytes in case the server sends a response that doesn't fully parse.


#[cfg(test)]
bt: Arc<std::backtrace::Backtrace>,
}
Expand Down Expand Up @@ -99,6 +112,7 @@ impl Error {
labels,
wire_version: None,
source: None,
server_response: None,
#[cfg(test)]
bt: Arc::new(std::backtrace::Backtrace::capture()),
}
Expand Down Expand Up @@ -288,6 +302,23 @@ impl Error {
self.labels.insert(label);
}

/// The full response returned from the server. This can be used to inspect error fields that
/// are not represented in the `Error` type.
pub fn server_response(&self) -> Option<&Document> {
self.server_response.as_deref()
}

/// Adds the server's response to this error if it is not already present.
pub(crate) fn with_server_response(mut self, response: &RawCommandResponse) -> Self {
if self.server_response.is_none() {
self.server_response =
Some(Box::new(response.body::<Document>().unwrap_or_else(
|e| doc! { "serialization error": e.to_string() },
)));
}
self
}

#[cfg(feature = "dns-resolver")]
pub(crate) fn from_resolve_error(error: hickory_resolver::error::ResolveError) -> Self {
ErrorKind::DnsResolve {
Expand Down Expand Up @@ -496,6 +527,10 @@ impl Error {
source.redact();
}

if self.server_response.is_some() {
self.server_response = Some(Box::new(doc! { "redacted": true }));
}

// This is intentionally written without a catch-all branch so that if new error
// kinds are added we remember to reason about whether they need to be redacted.
match *self.kind {
Expand Down
10 changes: 6 additions & 4 deletions src/event/cmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,16 +427,18 @@ pub(crate) struct CmapEventEmitter {
}

impl CmapEventEmitter {
// the topology ID is only used when the tracing feature is on.
#[allow(unused_variables)]
pub(crate) fn new(
user_handler: Option<EventHandler<CmapEvent>>,
topology_id: ObjectId,
#[cfg(feature = "tracing-unstable")] topology_id: ObjectId,
#[cfg(feature = "tracing-unstable")] max_document_length_bytes: Option<usize>,
) -> CmapEventEmitter {
Self {
user_handler,
#[cfg(feature = "tracing-unstable")]
tracing_emitter: ConnectionTracingEventEmitter::new(topology_id),
tracing_emitter: ConnectionTracingEventEmitter::new(
topology_id,
max_document_length_bytes,
),
}
}

Expand Down
12 changes: 5 additions & 7 deletions src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(crate) mod list_collections;
pub(crate) mod list_databases;
mod list_indexes;
#[cfg(feature = "in-use-encryption")]
mod raw_output;
pub(crate) mod raw_output;
pub(crate) mod run_command;
pub(crate) mod run_cursor_command;
mod search_index;
Expand Down Expand Up @@ -70,8 +70,6 @@ pub(crate) use find_and_modify::FindAndModify;
pub(crate) use get_more::GetMore;
pub(crate) use insert::Insert;
pub(crate) use list_indexes::ListIndexes;
#[cfg(feature = "in-use-encryption")]
pub(crate) use raw_output::RawOutput;
pub(crate) use search_index::{CreateSearchIndexes, DropSearchIndex, UpdateSearchIndex};
pub(crate) use update::{Update, UpdateOrReplace};

Expand Down Expand Up @@ -141,7 +139,7 @@ pub(crate) trait Operation {
/// Interprets the server response to the command.
fn handle_response<'a>(
&'a self,
response: RawCommandResponse,
response: &'a RawCommandResponse,
context: ExecutionContext<'a>,
) -> BoxFuture<'a, Result<Self::O>>;

Expand Down Expand Up @@ -205,7 +203,7 @@ pub(crate) trait OperationWithDefaults: Send + Sync {
/// Interprets the server response to the command.
fn handle_response<'a>(
&'a self,
_response: RawCommandResponse,
_response: &'a RawCommandResponse,
_context: ExecutionContext<'a>,
) -> Result<Self::O> {
Err(ErrorKind::Internal {
Expand All @@ -218,7 +216,7 @@ pub(crate) trait OperationWithDefaults: Send + Sync {
/// async code is required to handle the response.
fn handle_response_async<'a>(
&'a self,
response: RawCommandResponse,
response: &'a RawCommandResponse,
context: ExecutionContext<'a>,
) -> BoxFuture<'a, Result<Self::O>> {
async move { self.handle_response(response, context) }.boxed()
Expand Down Expand Up @@ -295,7 +293,7 @@ where
}
fn handle_response<'a>(
&'a self,
response: RawCommandResponse,
response: &'a RawCommandResponse,
context: ExecutionContext<'a>,
) -> BoxFuture<'a, Result<Self::O>> {
self.handle_response_async(response, context)
Expand Down
2 changes: 1 addition & 1 deletion src/operation/abort_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl OperationWithDefaults for AbortTransaction {

fn handle_response<'a>(
&'a self,
response: RawCommandResponse,
response: &RawCommandResponse,
_context: ExecutionContext<'a>,
) -> Result<Self::O> {
let response: WriteConcernOnlyBody = response.body()?;
Expand Down
2 changes: 1 addition & 1 deletion src/operation/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl OperationWithDefaults for Aggregate {

fn handle_response<'a>(
&'a self,
response: RawCommandResponse,
response: &RawCommandResponse,
context: ExecutionContext<'a>,
) -> Result<Self::O> {
let cursor_response: CursorBody = response.body()?;
Expand Down
2 changes: 1 addition & 1 deletion src/operation/aggregate/change_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl OperationWithDefaults for ChangeStreamAggregate {

fn handle_response<'a>(
&'a self,
response: RawCommandResponse,
response: &RawCommandResponse,
mut context: ExecutionContext<'a>,
) -> Result<Self::O> {
let op_time = response
Expand Down
2 changes: 1 addition & 1 deletion src/operation/bulk_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ where

fn handle_response_async<'b>(
&'b self,
response: RawCommandResponse,
response: &'b RawCommandResponse,
mut context: ExecutionContext<'b>,
) -> BoxFuture<'b, Result<Self::O>> {
async move {
Expand Down
Loading