Skip to content

Commit

Permalink
Check credentials in the health check
Browse files Browse the repository at this point in the history
  • Loading branch information
goakley authored and AndrooTheChen committed Sep 23, 2024
1 parent 908e9ff commit 918cd25
Showing 1 changed file with 17 additions and 3 deletions.
20 changes: 17 additions & 3 deletions src/sinks/gcp/bigquery/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ impl GenerateConfig for BigqueryConfig {
}

/// Create a future that sends a single nothing-request to BigQuery
async fn healthcheck_future(uri: Uri, auth: GcpAuthenticator) -> crate::Result<()> {
async fn healthcheck_future(
uri: Uri,
auth: GcpAuthenticator,
write_stream: String,
) -> crate::Result<()> {
let channel = Channel::builder(uri)
.tls_config(tonic::transport::channel::ClientTlsConfig::new())
.unwrap()
Expand All @@ -116,10 +120,15 @@ async fn healthcheck_future(uri: Uri, auth: GcpAuthenticator) -> crate::Result<(
channel,
AuthInterceptor { auth },
);
let stream = tokio_stream::once(proto::AppendRowsRequest::default());
// specify the write_stream so that there's enough information to perform an IAM check
let stream = tokio_stream::once(proto::AppendRowsRequest {
write_stream,
..proto::AppendRowsRequest::default()
});
let mut response = client.append_rows(stream).await?;
// the result is expected to be `InvalidArgument`
// because we use a bunch of empty values in the request
// (and `InvalidArgument` specifically means we made it past the auth check)
if let Err(status) = response.get_mut().message().await {
if status.code() != tonic::Code::InvalidArgument {
return Err(status.into());
Expand All @@ -141,7 +150,12 @@ impl SinkConfig for BigqueryConfig {

// Kick off the healthcheck
let healthcheck: Healthcheck = if cx.healthcheck.enabled {
healthcheck_future(self.endpoint.parse()?, auth.clone()).boxed()
healthcheck_future(
self.endpoint.parse()?,
auth.clone(),
self.get_write_stream(),
)
.boxed()
} else {
Box::pin(async move { Ok(()) })
};
Expand Down

0 comments on commit 918cd25

Please sign in to comment.