From b7a2eb1b3c715e1b4f135cd9e727cefa99f9f230 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 20 Jun 2025 02:27:03 -0400 Subject: [PATCH 1/6] refactor: make token a watch channel --- examples/oauth.rs | 2 +- src/perms/mod.rs | 2 +- src/perms/oauth.rs | 197 +++++++++++++++++++++++++++++++++++------- src/perms/tx_cache.rs | 9 +- 4 files changed, 173 insertions(+), 37 deletions(-) diff --git a/examples/oauth.rs b/examples/oauth.rs index 1320190..f538051 100644 --- a/examples/oauth.rs +++ b/examples/oauth.rs @@ -9,7 +9,7 @@ async fn main() -> eyre::Result<()> { let _jh = authenticator.spawn(); tokio::time::sleep(std::time::Duration::from_secs(5)).await; - dbg!(token.read()); + dbg!(token.secret().await.unwrap()); Ok(()) } diff --git a/src/perms/mod.rs b/src/perms/mod.rs index fb9193b..f50f2a2 100644 --- a/src/perms/mod.rs +++ b/src/perms/mod.rs @@ -5,7 +5,7 @@ pub(crate) mod config; pub use config::{SlotAuthzConfig, SlotAuthzConfigEnvError}; pub(crate) mod oauth; -pub use oauth::{Authenticator, OAuthConfig, SharedToken}; +pub use oauth::{Authenticator, OAuthConfig, OldSharedToken}; /// Contains [`BuilderTxCache`] client and related types for interacting with /// the transaction cache. diff --git a/src/perms/oauth.rs b/src/perms/oauth.rs index c0e9002..5ca2015 100644 --- a/src/perms/oauth.rs +++ b/src/perms/oauth.rs @@ -4,13 +4,18 @@ use crate::{ deps::tracing::{error, info}, utils::from_env::FromEnv, }; +use core::fmt; use oauth2::{ basic::{BasicClient, BasicTokenType}, - AuthUrl, ClientId, ClientSecret, EmptyExtraTokenFields, EndpointNotSet, EndpointSet, - HttpClientError, RequestTokenError, StandardErrorResponse, StandardTokenResponse, TokenUrl, + AccessToken, AuthUrl, ClientId, ClientSecret, EmptyExtraTokenFields, EndpointNotSet, + EndpointSet, HttpClientError, RefreshToken, RequestTokenError, Scope, StandardErrorResponse, + StandardTokenResponse, TokenResponse, TokenUrl, }; use std::sync::{Arc, Mutex}; -use tokio::task::JoinHandle; +use tokio::{ + sync::watch::{self, Ref}, + task::JoinHandle, +}; type Token = StandardTokenResponse; @@ -59,9 +64,9 @@ impl OAuthConfig { /// A shared token that can be read and written to by multiple threads. #[derive(Debug, Clone, Default)] -pub struct SharedToken(Arc>>); +pub struct OldSharedToken(Arc>>); -impl SharedToken { +impl OldSharedToken { /// Read the token from the shared token. pub fn read(&self) -> Option { self.0.lock().unwrap().clone() @@ -87,8 +92,9 @@ pub struct Authenticator { /// Configuration pub config: OAuthConfig, client: MyOAuthClient, - token: SharedToken, reqwest: reqwest::Client, + + token: watch::Sender>, } impl Authenticator { @@ -107,8 +113,8 @@ impl Authenticator { Self { config: config.clone(), client, - token: Default::default(), reqwest: rq_client, + token: watch::channel(None).0, } } @@ -129,20 +135,20 @@ impl Authenticator { /// Returns true if there is Some token set pub fn is_authenticated(&self) -> bool { - self.token.is_authenticated() + self.token.borrow().is_some() } /// Sets the Authenticator's token to the provided value fn set_token(&self, token: StandardTokenResponse) { - self.token.write(token); + self.token.send_replace(Some(token)); } /// Returns the currently set token pub fn token(&self) -> SharedToken { - self.token.clone() + self.token.subscribe().into() } - /// Fetches an oauth token + /// Fetches an oauth token. pub async fn fetch_oauth_token( &self, ) -> Result< @@ -161,25 +167,158 @@ impl Authenticator { Ok(token_result) } - /// Spawns a task that periodically fetches a new token every 300 seconds. - pub fn spawn(self) -> JoinHandle<()> { + /// Create a future that contains the periodic refresh loop. + async fn task_future(self) { let interval = self.config.oauth_token_refresh_interval; - let handle: JoinHandle<()> = tokio::spawn(async move { - loop { - info!("Refreshing oauth token"); - match self.authenticate().await { - Ok(_) => { - info!("Successfully refreshed oauth token"); - } - Err(e) => { - error!(%e, "Failed to refresh oauth token"); - } - }; - let _sleep = tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await; - } - }); - - handle + loop { + info!("Refreshing oauth token"); + match self.authenticate().await { + Ok(_) => { + info!("Successfully refreshed oauth token"); + } + Err(e) => { + error!(%e, "Failed to refresh oauth token"); + } + }; + let _sleep = tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await; + } + } + + /// Spawns a task that periodically fetches a new token. The refresh + /// interval may be configured via the + /// [`OAuthConfig::oauth_token_refresh_interval`] property. + pub fn spawn(self) -> JoinHandle<()> { + tokio::spawn(self.task_future()) + } +} + +/// A shared token, wrapped in a [`tokio::sync::watch`] Receiver. The token is +/// periodically refreshed by an [`Authenticator`] task, and can be awaited +/// for when it becomes available. +/// +/// This allows multiple tasks to wait for the token to be available, and +/// provides a way to check if the token is authenticated without blocking. +/// Please consult the [`Receiver`] documentation for caveats regarding +/// usage. +/// +/// [`Receiver`]: tokio::sync::watch::Receiver +#[derive(Debug, Clone)] +pub struct SharedToken(watch::Receiver>); + +impl From>> for SharedToken { + fn from(inner: watch::Receiver>) -> Self { + Self(inner) + } +} + +impl SharedToken { + /// Wait for the token to be available, and get a reference to the secret. + /// + /// This is implemented using [`Receiver::wait_for`], and has the same + /// blocking, panics, errors, and cancel safety. However, it uses a clone + /// of the [`watch::Receiver`] and will not update the local view of the + /// channel. + /// + /// [`Receiver::wait_for`]: tokio::sync::watch::Receiver::wait_for + pub async fn secret(&self) -> Result { + Ok(self + .clone() + .token() + .await? + .access_token() + .secret() + .to_owned()) + } + + /// Wait for the token to be available, then get a reference to it. + /// + /// This is implemented using [`Receiver::wait_for`], and has the same + /// blocking, panics, errors, and cancel safety. Unlike [`Self::secret`] + /// it is NOT implemented using a clone, and will update the local view of + /// the channel. + /// + /// Generally, prefer using [`Self::secret`] for simple use cases, and + /// this when deeper inspection of the token is required. + /// + /// [`Receiver::wait_for`]: tokio::sync::watch::Receiver::wait_for + pub async fn token(&mut self) -> Result, watch::error::RecvError> { + self.0.wait_for(Option::is_some).await.map(Into::into) + } + + /// Create a future that will resolve when the token is ready. + /// + /// This is implemented using [`Receiver::wait_for`], and has the same + /// blocking, panics, errors, and cancel safety. + /// + /// [`Receiver::wait_for`]: tokio::sync::watch::Receiver::wait_for + pub async fn wait(&self) -> Result<(), watch::error::RecvError> { + self.clone().0.wait_for(Option::is_some).await.map(drop) + } + + /// Borrow the current token, if available. If called before the token is + /// set by the authentication task, this will return `None`. + /// + /// This is implemented using [`Receiver::borrow`]. + /// + /// [`Receiver::borrow`]: tokio::sync::watch::Receiver::borrow + pub fn borrow(&mut self) -> Ref<'_, Option> { + self.0.borrow() + } + + /// Check if the background task has produced an authentication token. + /// + /// This is implemented using [`Receiver::borrow`], and checks if the + /// borrowed token is `Some`. + /// + /// [`Receiver::borrow`]: tokio::sync::watch::Receiver::borrow + pub fn is_authenticated(&self) -> bool { + self.0.borrow().is_some() + } +} + +/// A reference to token data, contained in a [`SharedToken`]. +/// +/// This is implemented using [`watch::Ref`], and as a result holds a lock on +/// the token data. It is recommended that this be dropped +pub struct TokenRef<'a> { + inner: Ref<'a, Option>, +} + +impl<'a> From>> for TokenRef<'a> { + fn from(inner: Ref<'a, Option>) -> Self { + Self { inner } + } +} + +impl fmt::Debug for TokenRef<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TokenRef").finish_non_exhaustive() + } +} + +impl<'a> TokenRef<'a> { + pub fn inner(&'a self) -> &'a Token { + self.inner.as_ref().unwrap() + } + + pub fn access_token(&self) -> &AccessToken { + self.inner().access_token() + } + + pub fn token_type(&self) -> &::TokenType { + self.inner().token_type() + } + + pub fn expires_in(&self) -> Option { + self.inner().expires_in() + } + + pub fn refresh_token(&self) -> Option<&RefreshToken> { + self.inner().refresh_token() + } + + pub fn scopes(&self) -> Option<&Vec> { + self.inner().scopes() } } diff --git a/src/perms/tx_cache.rs b/src/perms/tx_cache.rs index 25113bc..b575176 100644 --- a/src/perms/tx_cache.rs +++ b/src/perms/tx_cache.rs @@ -1,6 +1,5 @@ use crate::perms::oauth::SharedToken; -use eyre::{bail, Result}; -use oauth2::TokenResponse; +use eyre::Result; use serde::de::DeserializeOwned; use signet_tx_cache::{ client::TxCache, @@ -53,14 +52,12 @@ impl BuilderTxCache { async fn get_inner_with_token(&self, join: &str) -> Result { let url = self.tx_cache.url().join(join)?; - let Some(token) = self.token.read() else { - bail!("No token available for authentication"); - }; + let secret = self.token.secret().await?; self.tx_cache .client() .get(url) - .bearer_auth(token.access_token().secret()) + .bearer_auth(secret) .send() .await .inspect_err(|e| warn!(%e, "Failed to get object from transaction cache"))? From 0def8bbec23853d1ca7a1410a26d85c3084484cd Mon Sep 17 00:00:00 2001 From: James Date: Fri, 20 Jun 2025 08:50:45 -0400 Subject: [PATCH 2/6] fix: docs --- src/perms/mod.rs | 2 +- src/perms/oauth.rs | 23 ----------------------- 2 files changed, 1 insertion(+), 24 deletions(-) diff --git a/src/perms/mod.rs b/src/perms/mod.rs index f50f2a2..fb9193b 100644 --- a/src/perms/mod.rs +++ b/src/perms/mod.rs @@ -5,7 +5,7 @@ pub(crate) mod config; pub use config::{SlotAuthzConfig, SlotAuthzConfigEnvError}; pub(crate) mod oauth; -pub use oauth::{Authenticator, OAuthConfig, OldSharedToken}; +pub use oauth::{Authenticator, OAuthConfig, SharedToken}; /// Contains [`BuilderTxCache`] client and related types for interacting with /// the transaction cache. diff --git a/src/perms/oauth.rs b/src/perms/oauth.rs index 5ca2015..2d1f713 100644 --- a/src/perms/oauth.rs +++ b/src/perms/oauth.rs @@ -11,7 +11,6 @@ use oauth2::{ EndpointSet, HttpClientError, RefreshToken, RequestTokenError, Scope, StandardErrorResponse, StandardTokenResponse, TokenResponse, TokenUrl, }; -use std::sync::{Arc, Mutex}; use tokio::{ sync::watch::{self, Ref}, task::JoinHandle, @@ -62,28 +61,6 @@ impl OAuthConfig { } } -/// A shared token that can be read and written to by multiple threads. -#[derive(Debug, Clone, Default)] -pub struct OldSharedToken(Arc>>); - -impl OldSharedToken { - /// Read the token from the shared token. - pub fn read(&self) -> Option { - self.0.lock().unwrap().clone() - } - - /// Write a new token to the shared token. - pub fn write(&self, token: Token) { - let mut lock = self.0.lock().unwrap(); - *lock = Some(token); - } - - /// Check if the token is authenticated. - pub fn is_authenticated(&self) -> bool { - self.0.lock().unwrap().is_some() - } -} - /// A self-refreshing, periodically fetching authenticator for the block /// builder. This task periodically fetches a new token, and stores it in a /// [`SharedToken`]. From 7b5900cae9269e4fca57dff6aa0eecf19748c10c Mon Sep 17 00:00:00 2001 From: James Date: Fri, 20 Jun 2025 11:25:39 -0400 Subject: [PATCH 3/6] nit: drive-by add security note --- src/perms/oauth.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/perms/oauth.rs b/src/perms/oauth.rs index 2d1f713..31de951 100644 --- a/src/perms/oauth.rs +++ b/src/perms/oauth.rs @@ -82,6 +82,8 @@ impl Authenticator { .set_auth_uri(AuthUrl::from_url(config.oauth_authenticate_url.clone())) .set_token_uri(TokenUrl::from_url(config.oauth_token_url.clone())); + // NB: this is MANDATORY + // https://docs.rs/oauth2/latest/oauth2/#security-warning let rq_client = reqwest::Client::builder() .redirect(reqwest::redirect::Policy::none()) .build() From 270f8b4b635702fcf4db3548c440656006caa7ef Mon Sep 17 00:00:00 2001 From: James Date: Fri, 27 Jun 2025 09:38:43 -0400 Subject: [PATCH 4/6] chore: bump version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ba5111f..204d9b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ name = "init4-bin-base" description = "Internal utilities for binaries produced by the init4 team" keywords = ["init4", "bin", "base"] -version = "0.4.3" +version = "0.5.0" edition = "2021" rust-version = "1.81" authors = ["init4", "James Prestwich"] From d0077f3b89b45c106be0c4a2fe89de8184104148 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 1 Jul 2025 14:21:16 -0400 Subject: [PATCH 5/6] chore: misc cleanup and documentation --- src/perms/oauth.rs | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/src/perms/oauth.rs b/src/perms/oauth.rs index 31de951..55652e1 100644 --- a/src/perms/oauth.rs +++ b/src/perms/oauth.rs @@ -62,12 +62,12 @@ impl OAuthConfig { } /// A self-refreshing, periodically fetching authenticator for the block -/// builder. This task periodically fetches a new token, and stores it in a -/// [`SharedToken`]. +/// builder. This task periodically fetches a new token, and sends it to all +/// active [`SharedToken`]s via a [`tokio::sync::watch`] channel.. #[derive(Debug)] pub struct Authenticator { /// Configuration - pub config: OAuthConfig, + config: OAuthConfig, client: MyOAuthClient, reqwest: reqwest::Client, @@ -146,6 +146,11 @@ impl Authenticator { Ok(token_result) } + /// Get a reference to the OAuth configuration. + pub fn config(&self) -> &OAuthConfig { + &self.config + } + /// Create a future that contains the periodic refresh loop. async fn task_future(self) { let interval = self.config.oauth_token_refresh_interval; @@ -212,6 +217,10 @@ impl SharedToken { /// Wait for the token to be available, then get a reference to it. /// + /// Holding this reference will block the background task from updating + /// the token until it is dropped, so it is recommended to drop this + /// reference as soon as possible. + /// /// This is implemented using [`Receiver::wait_for`], and has the same /// blocking, panics, errors, and cancel safety. Unlike [`Self::secret`] /// it is NOT implemented using a clone, and will update the local view of @@ -238,6 +247,10 @@ impl SharedToken { /// Borrow the current token, if available. If called before the token is /// set by the authentication task, this will return `None`. /// + /// Holding this reference will block the background task from updating + /// the token until it is dropped, so it is recommended to drop this + /// reference as soon as possible. + /// /// This is implemented using [`Receiver::borrow`]. /// /// [`Receiver::borrow`]: tokio::sync::watch::Receiver::borrow @@ -247,8 +260,11 @@ impl SharedToken { /// Check if the background task has produced an authentication token. /// - /// This is implemented using [`Receiver::borrow`], and checks if the - /// borrowed token is `Some`. + /// Holding this reference will block the background task from updating + /// the token until it is dropped, so it is recommended to drop this + /// reference as soon as possible. + /// + /// This is implemented using [`Receiver::borrow`]. /// /// [`Receiver::borrow`]: tokio::sync::watch::Receiver::borrow pub fn is_authenticated(&self) -> bool { @@ -259,7 +275,9 @@ impl SharedToken { /// A reference to token data, contained in a [`SharedToken`]. /// /// This is implemented using [`watch::Ref`], and as a result holds a lock on -/// the token data. It is recommended that this be dropped +/// the token data. Holding this reference will block the background task +/// from updating the token until it is dropped, so it is recommended to drop +/// this reference as soon as possible. pub struct TokenRef<'a> { inner: Ref<'a, Option>, } @@ -277,26 +295,34 @@ impl fmt::Debug for TokenRef<'_> { } impl<'a> TokenRef<'a> { + /// Get a reference to the inner token. pub fn inner(&'a self) -> &'a Token { self.inner.as_ref().unwrap() } + /// Get a reference to the [`AccessToken`] contained in the token. pub fn access_token(&self) -> &AccessToken { self.inner().access_token() } + /// Get a reference to the [`TokenType`] instance contained in the token. + /// + /// [`TokenType`]: oauth2::TokenType pub fn token_type(&self) -> &::TokenType { self.inner().token_type() } + /// Get a reference to the current token's expiration time, if it has one. pub fn expires_in(&self) -> Option { self.inner().expires_in() } + /// Get a reference to the refresh token, if it exists. pub fn refresh_token(&self) -> Option<&RefreshToken> { self.inner().refresh_token() } + /// Get a reference to the scopes associated with the token, if any. pub fn scopes(&self) -> Option<&Vec> { self.inner().scopes() } From 5f96be533b97d11d12090ca9c72feb979384f006 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 1 Jul 2025 14:26:46 -0400 Subject: [PATCH 6/6] lint: clippy --- src/perms/oauth.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/perms/oauth.rs b/src/perms/oauth.rs index 55652e1..654cc38 100644 --- a/src/perms/oauth.rs +++ b/src/perms/oauth.rs @@ -147,7 +147,7 @@ impl Authenticator { } /// Get a reference to the OAuth configuration. - pub fn config(&self) -> &OAuthConfig { + pub const fn config(&self) -> &OAuthConfig { &self.config }