Skip to content

Commit

Permalink
Merge pull request #262 from superfly/cheaper-updates
Browse files Browse the repository at this point in the history
Add endpoint to receive updates for tables
  • Loading branch information
pborzenkov authored Dec 10, 2024
2 parents 7836045 + a8c4c63 commit e66f039
Show file tree
Hide file tree
Showing 15 changed files with 1,251 additions and 239 deletions.
1 change: 1 addition & 0 deletions crates/corro-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use corro_types::{
broadcast::{FocaCmd, FocaInput, Timestamp},
sqlite::SqlitePoolError,
sync::generate_sync,
updates::Handle,
};
use futures::{SinkExt, TryStreamExt};
use rusqlite::{named_params, params, OptionalExtension};
Expand Down
2 changes: 2 additions & 0 deletions crates/corro-agent/src/agent/run_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
rx_foca,
subs_manager,
subs_bcast_cache,
updates_bcast_cache,
rtt_rx,
} = opts;

Expand Down Expand Up @@ -96,6 +97,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
&agent,
&tripwire,
subs_bcast_cache,
updates_bcast_cache,
&subs_manager,
api_listeners,
)
Expand Down
12 changes: 11 additions & 1 deletion crates/corro-agent/src/agent/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ use tripwire::Tripwire;
use crate::{
api::{
peer::gossip_server_endpoint,
public::pubsub::{process_sub_channel, MatcherBroadcastCache, SharedMatcherBroadcastCache},
public::{
pubsub::{process_sub_channel, MatcherBroadcastCache, SharedMatcherBroadcastCache},
update::SharedUpdateBroadcastCache,
},
},
transport::Transport,
};
use corro_types::updates::UpdatesManager;
use corro_types::{
actor::ActorId,
agent::{migrate, Agent, AgentConfig, Booked, BookedVersions, LockRegistry, SplitPool},
Expand Down Expand Up @@ -59,6 +63,7 @@ pub struct AgentOptions {
pub rtt_rx: TokioReceiver<(SocketAddr, Duration)>,
pub subs_manager: SubsManager,
pub subs_bcast_cache: SharedMatcherBroadcastCache,
pub updates_bcast_cache: SharedUpdateBroadcastCache,
pub tripwire: Tripwire,
}

Expand Down Expand Up @@ -108,6 +113,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age

let subs_manager = SubsManager::default();

let updates_manager = UpdatesManager::default();
// Setup subscription handlers
let subs_bcast_cache = setup_spawn_subscriptions(
&subs_manager,
Expand All @@ -118,6 +124,8 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
)
.await?;

let updates_bcast_cache = SharedUpdateBroadcastCache::default();

let cluster_id = {
let conn = pool.read().await?;
conn.query_row(
Expand Down Expand Up @@ -235,6 +243,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
rtt_rx,
subs_manager: subs_manager.clone(),
subs_bcast_cache,
updates_bcast_cache,
tripwire: tripwire.clone(),
};

Expand All @@ -258,6 +267,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
schema: RwLock::new(schema),
cluster_id,
subs_manager,
updates_manager,
tripwire,
});

Expand Down
37 changes: 30 additions & 7 deletions crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
api::public::{
api_v1_db_schema, api_v1_queries, api_v1_table_stats, api_v1_transactions,
pubsub::{api_v1_sub_by_id, api_v1_subs},
update::SharedUpdateBroadcastCache,
},
transport::Transport,
};
Expand All @@ -26,6 +27,7 @@ use corro_types::{
channel::CorroReceiver,
config::AuthzConfig,
pubsub::SubsManager,
updates::{match_changes, match_changes_from_db_version},
};
use std::{
cmp,
Expand All @@ -37,6 +39,7 @@ use std::{
time::{Duration, Instant},
};

use crate::api::public::update::api_v1_updates;
use axum::{
error_handling::HandleErrorLayer,
extract::DefaultBodyLimit,
Expand Down Expand Up @@ -169,6 +172,7 @@ pub async fn setup_http_api_handler(
agent: &Agent,
tripwire: &Tripwire,
subs_bcast_cache: BcastCache,
updates_bcast_cache: SharedUpdateBroadcastCache,
subs_manager: &SubsManager,
api_listeners: Vec<TcpListener>,
) -> eyre::Result<()> {
Expand Down Expand Up @@ -217,6 +221,20 @@ pub async fn setup_http_api_handler(
.layer(ConcurrencyLimitLayer::new(128)),
),
)
.route(
"/v1/updates/:table",
post(api_v1_updates).route_layer(
tower::ServiceBuilder::new()
.layer(HandleErrorLayer::new(|_error: BoxError| async {
Ok::<_, Infallible>((
StatusCode::SERVICE_UNAVAILABLE,
"max concurrency limit reached".to_string(),
))
}))
.layer(LoadShedLayer::new())
.layer(ConcurrencyLimitLayer::new(128)),
),
)
.route(
"/v1/subscriptions/:id",
get(api_v1_sub_by_id).route_layer(
Expand Down Expand Up @@ -265,6 +283,7 @@ pub async fn setup_http_api_handler(
.layer(Extension(Arc::new(AtomicI64::new(0))))
.layer(Extension(agent.clone()))
.layer(Extension(subs_bcast_cache))
.layer(Extension(updates_bcast_cache))
.layer(Extension(subs_manager.clone()))
.layer(Extension(tripwire.clone())),
)
Expand Down Expand Up @@ -712,11 +731,16 @@ pub async fn process_fully_buffered_changes(
if let Some(db_version) = db_version {
let conn = agent.pool().read().await?;
block_in_place(|| {
if let Err(e) = agent
.subs_manager()
.match_changes_from_db_version(&conn, db_version)
if let Err(e) = match_changes_from_db_version(agent.subs_manager(), &conn, db_version) {
error!(%db_version, "could not match changes for subs from db version: {e}");
}
});

block_in_place(|| {
if let Err(e) =
match_changes_from_db_version(agent.updates_manager(), &conn, db_version)
{
error!(%db_version, "could not match changes from db version: {e}");
error!(%db_version, "could not match changes for updates from db version: {e}");
}
});
}
Expand Down Expand Up @@ -1063,9 +1087,8 @@ pub async fn process_multiple_changes(

for (_actor_id, changeset, db_version, _src) in changesets {
change_chunk_size += changeset.changes().len();
agent
.subs_manager()
.match_changes(changeset.changes(), db_version);
match_changes(agent.subs_manager(), changeset.changes(), db_version);
match_changes(agent.updates_manager(), changeset.changes(), db_version);
}

histogram!("corro.agent.changes.processing.time.seconds").record(start.elapsed());
Expand Down
2 changes: 2 additions & 0 deletions crates/corro-agent/src/api/public/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use corro_types::broadcast::broadcast_changes;

pub mod pubsub;

pub mod update;

pub async fn make_broadcastable_changes<F, T>(
agent: &Agent,
f: F,
Expand Down
Loading

0 comments on commit e66f039

Please sign in to comment.