Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: read cloud creds for obstore from env #556

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ jobs:
- uses: Swatinem/rust-cache@v2
- name: Test
run: cargo test -p stac-extensions
test-object-store-cache:
name: Test stac-object-store-cache
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: Swatinem/rust-cache@v2
- name: Test
run: cargo test -p stac-object-store-cache --all-features
test-pgstac:
name: Test pgstac
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"crates/derive",
"crates/duckdb",
"crates/extensions",
"crates/object-store-cache",
"crates/pgstac",
"crates/server",
]
Expand Down Expand Up @@ -57,6 +58,7 @@ log = "0.4.22"
mime = "0.3.17"
mockito = "1.5"
object_store = "0.11.0"
once_cell = "1.20.2"
openssl = { version = "0.10.68", features = ["vendored"] }
openssl-src = "=300.4.1" # joinked from https://github.com/iopsystems/rpc-perf/commit/705b290d2105af6f33150da04b217422c6d68701#diff-2e9d962a08321605940b5a657135052fbcef87b5e360662bb527c96d9a615542R41 to cross-compile Python
parquet = { version = "52.2", default-features = false }
Expand All @@ -75,6 +77,7 @@ stac-api = { version = "0.6.2", path = "crates/api" }
stac-derive = { version = "0.1.0", path = "crates/derive" }
stac-duckdb = { version = "0.0.3", path = "crates/duckdb" }
stac-server = { version = "0.3.2", path = "crates/server" }
stac-object-store-cache = { version = "0.1.0", path = "crates/object-store-cache" }
syn = "2.0"
tempfile = "3.13"
thiserror = "2.0"
Expand Down
3 changes: 2 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ geoparquet-compression = [
"parquet/lz4",
"parquet/zstd",
]
object-store = ["dep:object_store", "dep:tokio"]
object-store = ["dep:object_store", "dep:tokio", "dep:stac-object-store-cache"]
object-store-aws = ["object-store", "object_store/aws"]
object-store-azure = ["object-store", "object_store/azure"]
object-store-gcp = ["object-store", "object_store/gcp"]
Expand Down Expand Up @@ -68,6 +68,7 @@ reqwest = { workspace = true, features = ["json", "blocking"], optional = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, features = ["preserve_order"] }
stac-derive.workspace = true
stac-object-store-cache = { workspace = true, optional = true }
thiserror.workspace = true
tokio = { workspace = true, optional = true }
tracing.workspace = true
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ pub enum Error {
#[error("json value is not an object")]
NotAnObject(serde_json::Value),

/// [stac_object_store_cache::Error]
#[error(transparent)]
#[cfg(feature = "object-store")]
ObjectStoreCache(#[from] stac_object_store_cache::Error),

/// [object_store::Error]
#[error(transparent)]
#[cfg(feature = "object-store")]
Expand Down
10 changes: 4 additions & 6 deletions crates/core/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,8 @@ impl Format {
let href = href.into();
match href.realize() {
RealizedHref::Url(url) => {
use object_store::ObjectStore;

let (object_store, path) = object_store::parse_url_opts(&url, options)?;
let (object_store, path) =
stac_object_store_cache::parse_url_opts(&url, options).await?;
let get_result = object_store.get(&path).await?;
let mut value: T = self.from_bytes(get_result.bytes().await?)?;
*value.self_href_mut() = Some(Href::Url(url));
Expand Down Expand Up @@ -237,9 +236,8 @@ impl Format {
{
let href = href.to_string();
if let Ok(url) = url::Url::parse(&href) {
use object_store::ObjectStore;

let (object_store, path) = object_store::parse_url_opts(&url, options)?;
let (object_store, path) =
stac_object_store_cache::parse_url_opts(&url, options).await?;
let bytes = self.into_vec(value)?;
let put_result = object_store.put(&path, bytes.into()).await?;
Ok(Some(put_result))
Expand Down
35 changes: 35 additions & 0 deletions crates/object-store-cache/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "stac-object-store-cache"
description = "Create and cache object stores based on url in stac."
version = "0.1.0"
authors.workspace = true
edition.workspace = true
homepage.workspace = true
repository.workspace = true
license.workspace = true
categories.workspace = true
rust-version.workspace = true

[features]
object-store-aws = ["object_store/aws"]
object-store-azure = ["object_store/azure"]
object-store-gcp = ["object_store/gcp"]
object-store-http = ["object_store/http"]
object-store-all = [
"object-store-aws",
"object-store-azure",
"object-store-gcp",
"object-store-http",
]


[dependencies]
object_store = { workspace = true }
once_cell = { workspace = true }
thiserror.workspace = true
tokio = { workspace = true }
url = { workspace = true, features = ["serde"] }

[dev-dependencies]
tokio = { workspace = true, features = ["macros"] }
tokio-test.workspace = true
Empty file.
247 changes: 247 additions & 0 deletions crates/object-store-cache/src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
use std::{collections::HashMap, sync::Arc};

use crate::{Error, Result};

use object_store::{local::LocalFileSystem, path::Path, DynObjectStore, ObjectStoreScheme};
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
use url::Url;

// To avoid memory leaks, we clear the cache when it grows too big.
// The limit itself has no special meaning; it simply mirrors the value polars uses.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you link to the polars code that uses it? Credit where credit is due.

/// Number of cached object stores at which the whole cache is cleared before the next insert.
const CACHE_SIZE: usize = 8;

/// Process-wide cache of created object stores, keyed by [ObjectStoreIdentifier].
///
/// Guarded by an async [RwLock] so lookups can share the lock while inserts take it exclusively.
static OBJECT_STORE_CACHE: Lazy<RwLock<HashMap<ObjectStoreIdentifier, Arc<DynObjectStore>>>> =
Lazy::new(Default::default);

/// Parameter set to identify and cache an object store
#[derive(PartialEq, Eq, Hash, Debug)]
struct ObjectStoreIdentifier {
/// A base url to the bucket.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not always a bucket, right? I.e. Azure doesn't call them buckets.

base_url: Url,

/// Object Store options
options: Vec<(String, String)>,
}

impl ObjectStoreIdentifier {
    /// Builds an identifier from a base url and any iterable of key/value options.
    ///
    /// Every pair is converted into an owned `(String, String)`; the input order is kept.
    fn new<I, K, V>(base_url: Url, options: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: AsRef<str>,
        V: Into<String>,
    {
        let mut owned_options = Vec::new();
        for (key, value) in options {
            owned_options.push((key.as_ref().into(), value.into()));
        }
        Self {
            base_url,
            options: owned_options,
        }
    }

    /// Returns an owned copy of the stored options.
    fn get_options(&self) -> Vec<(String, String)> {
        self.options.clone()
    }
}

#[cfg(any(
    feature = "object-store-aws",
    feature = "object-store-gcp",
    feature = "object-store-azure"
))]
/// Builds a cloud object store of type `$builder` and wraps it in an [Arc].
///
/// The builder starts from `from_env()`, is pointed at `$url`, and then receives every
/// `(key, value)` pair of `$options` whose key parses as a config key for that builder.
/// Keys that fail to parse are skipped. A build failure is propagated with `?`, so the
/// macro must be expanded inside a function returning a compatible `Result`.
macro_rules! builder_env_opts {
    ($builder:ty, $url:expr, $options:expr) => {{
        let mut configured = <$builder>::from_env().with_url($url.to_string());
        for (key, value) in $options {
            if let Ok(config_key) = key.as_ref().parse() {
                configured = configured.with_config(config_key, value);
            }
        }
        Arc::new(configured.build()?)
    }};
}

/// This was yanked from [object_store::parse_url_opts] with the following changes:
///
/// - Build [object_store::ObjectStore] with environment variables
/// - Return [Arc] instead of [Box]
#[cfg_attr(
not(any(
feature = "object-store-aws",
feature = "object-store-gcp",
feature = "object-store-azure",
feature = "object-store-http"
)),
allow(unused_variables)
)]
fn create_object_store<I, K, V>(
scheme: ObjectStoreScheme,
url: &Url,
options: I,
) -> Result<Arc<DynObjectStore>>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
let store: Arc<DynObjectStore> = match scheme {
ObjectStoreScheme::Local => Arc::new(LocalFileSystem::new()),
#[cfg(feature = "object-store-aws")]
ObjectStoreScheme::AmazonS3 => {
builder_env_opts!(object_store::aws::AmazonS3Builder, url, options)
}
#[cfg(feature = "object-store-gcp")]
ObjectStoreScheme::GoogleCloudStorage => {
builder_env_opts!(object_store::gcp::GoogleCloudStorageBuilder, url, options)
}
#[cfg(feature = "object-store-azure")]
ObjectStoreScheme::MicrosoftAzure => {
builder_env_opts!(object_store::azure::MicrosoftAzureBuilder, url, options)
}
#[cfg(feature = "object-store-http")]
ObjectStoreScheme::Http => {
let url = &url[..url::Position::BeforePath];
let builder = options.into_iter().fold(
object_store::http::HttpBuilder::new().with_url(url.to_string()),
|builder, (key, value)| match key.as_ref().parse() {
Ok(k) => builder.with_config(k, value),
Err(_) => builder,
},
);
Arc::new(builder.build()?)
}
s => return Err(Error::ObjectStoreCreate { scheme: s }),
};
Ok(store)
}

/// Drop-in replacement for [object_store::parse_url_opts] with caching and env vars.
///
/// It will create or retrieve object store based on passed `url` and `options`.
/// Keeps global cache
pub async fn parse_url_opts<I, K, V>(
url: &Url,
options: I,
) -> crate::Result<(Arc<DynObjectStore>, Path)>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
let (scheme, path) = ObjectStoreScheme::parse(url).map_err(object_store::Error::from)?;

let base_url = url
.as_ref()
.strip_suffix(path.as_ref())
.unwrap_or_default()
.try_into()?;

let object_store_id = ObjectStoreIdentifier::new(base_url, options);
let options = object_store_id.get_options();

{
let cache = OBJECT_STORE_CACHE.read().await;
if let Some(store) = (*cache).get(&object_store_id) {
return Ok((store.clone(), path));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some tracing::debug statements here and Below when we write to (and sometimes clear) the cache?

}
}
let store = create_object_store(scheme, url, options)?;
{
let mut cache = OBJECT_STORE_CACHE.write().await;

if cache.len() >= CACHE_SIZE {
(*cache).clear()
}
_ = (*cache).insert(object_store_id, store.clone());
}

Ok((store.clone(), path))
}

#[cfg(test)]
mod tests {
    use url::Url;

    use super::*;

    /// Two `file://` urls with different paths and equal options share one cached store.
    #[tokio::test]
    async fn file_different_path() {
        let options: Vec<(String, String)> = Vec::new();

        let url = Url::parse("file:///some/path").unwrap();
        let (store, path) = parse_url_opts(&url, options.clone()).await.unwrap();

        let url2 = Url::parse("file:///other/path").unwrap();
        let (store2, _) = parse_url_opts(&url2, options).await.unwrap();

        assert!(Arc::ptr_eq(&store, &store2));
        assert!(std::ptr::addr_eq(Arc::as_ptr(&store), Arc::as_ptr(&store2)));
        assert_eq!(path.as_ref(), "some/path");
    }

    /// The same `file://` url with different options yields distinct stores.
    #[tokio::test]
    async fn file_different_options() {
        let options: Vec<(String, String)> = Vec::new();

        let url = Url::parse("file:///some/path").unwrap();
        let (store, _) = parse_url_opts(&url, options).await.unwrap();

        let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))];
        let url2 = Url::parse("file:///some/path").unwrap();
        let (store2, _) = parse_url_opts(&url2, options2).await.unwrap();

        assert!(!Arc::ptr_eq(&store, &store2));
    }

    /// Two objects in the same bucket with equal options share one cached store.
    #[cfg(feature = "object-store-aws")]
    #[tokio::test]
    async fn cache_works() {
        let url = Url::parse("s3://bucket/item").unwrap();
        let options: Vec<(String, String)> = Vec::new();

        let (store1, path) = parse_url_opts(&url, options.clone()).await.unwrap();

        let url2 = Url::parse("s3://bucket/item2").unwrap();
        let (store2, _path) = parse_url_opts(&url2, options).await.unwrap();

        assert!(Arc::ptr_eq(&store1, &store2));
        assert_eq!(path.as_ref(), "item");
    }

    /// The same bucket with different options yields distinct stores.
    #[cfg(feature = "object-store-aws")]
    #[tokio::test]
    async fn different_options() {
        let url = Url::parse("s3://bucket/item").unwrap();
        let options: Vec<(String, String)> = Vec::new();

        let (store, _path) = parse_url_opts(&url, options).await.unwrap();

        let url2 = Url::parse("s3://bucket/item2").unwrap();
        let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))];
        let (store2, _path) = parse_url_opts(&url2, options2).await.unwrap();

        assert!(!Arc::ptr_eq(&store, &store2));
    }

    /// Different buckets with equal options yield distinct stores.
    #[cfg(feature = "object-store-aws")]
    #[tokio::test]
    async fn different_urls() {
        let url = Url::parse("s3://bucket/item").unwrap();
        let options: Vec<(String, String)> = Vec::new();

        let (store, _path) = parse_url_opts(&url, options.clone()).await.unwrap();

        let url2 = Url::parse("s3://other-bucket/item").unwrap();
        let (store2, _path) = parse_url_opts(&url2, options).await.unwrap();

        assert!(!Arc::ptr_eq(&store, &store2));
    }
}
Loading
Loading