Skip to content

Commit

Permalink
feat: improve shared data structure performance
Browse files Browse the repository at this point in the history
The data that is synced across threads for incoming requests needs to
be stored in a data structure that facilitates updating the underlying
data at runtime so that up-to-date search results can be served.

The previous implementation utilized Mutexes to acquire read access for
searching and write access to update the underlying data structure.
With an optimized data structure for frequent reads and infrequent
writes the performance can be improved significantly while eliminating
cross-thread synchronization slowdowns.

This commit also introduces some profiling tools for the development
environment provided in the nix flake.
  • Loading branch information
PhilTaken committed Mar 17, 2024
1 parent 7f93e32 commit e137b71
Show file tree
Hide file tree
Showing 13 changed files with 569 additions and 423 deletions.
299 changes: 291 additions & 8 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ askama = { version = "0.12.1", features = ["with-axum"] }
askama_axum = "0.4.0"
axum = { version = "0.7.4", features = ["macros"] }
clap = { version = "4.5.1", features = ["derive"] }
console-subscriber = "0.2.0"
ctrlc = "3.4.2"
itertools = "0.12.1"
markdown = "1.0.0-alpha.16"
Expand All @@ -36,3 +37,10 @@ strip = "debuginfo"
codegen-units = 1
lto = true
opt-level = 'z'

[profile.profiling]
inherits = "release"
debug = true

[build]
rustflags = ["--cfg", "tokio_unstable"]
3 changes: 3 additions & 0 deletions bacon.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ command = [
"cargo", "run",
"--bin", "fc-search",
"--color", "always",
"--",
"--test",
"--state-dir", "./test-state-dir"
]
need_stdout = true
watch = ["templates", "src"]
Expand Down
5 changes: 2 additions & 3 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,11 @@
{
packages =
(old.packages or [])
++ [pkgs.bacon pkgs.samply pkgs.tailwindcss pkgs.drill];
++ [pkgs.bacon pkgs.samply pkgs.tailwindcss pkgs.drill pkgs.wrk pkgs.inferno pkgs.tokio-console];
shellHook = ''
${old.shellHook or ""}
${config.pre-commit.installationScript}
'';
RUST_LOG = "fc_search=debug";
}
// (pkgs.lib.optionalAttrs (system == "aarch64-linux") {
# use mold in the devshell on aarch64-linux for quicker iteration
Expand All @@ -89,7 +88,7 @@
(pkgs.lib.makeLibraryPath [pkgs.openssl])
}";
CARGO_TARGET_AARCH64_UNKNOWN_LINUX_GNU_LINKER = "${pkgs.llvmPackages.clangUseLLVM}/bin/clang";
RUSTFLAGS = "-Clink-arg=-fuse-ld=${pkgs.mold}/bin/mold -Zthreads=0 -Zshare-generics=n";
RUSTFLAGS = "--cfg tokio_unstable -Clink-arg=-fuse-ld=${pkgs.mold}/bin/mold -Zthreads=0 -Zshare-generics=n";
}));
};
};
Expand Down
201 changes: 116 additions & 85 deletions src/backend.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,3 @@
use rust_embed::RustEmbed;
use std::{
collections::HashMap,
path::Path,
sync::{Arc, Mutex, Weak},
time::Duration,
};
use tokio::time::interval_at;

use anyhow::Context;
use askama::Template;
use axum::{
Expand All @@ -16,81 +7,74 @@ use axum::{
routing::get,
Router,
};
use itertools::Itertools;
use tracing::{debug, info};

use fc_search::{
get_fcio_flake_uris, nix::NixPackage, search::ChannelSearcher, Flake, NaiveNixosOption, NixHtml,
};

use itertools::Itertools;
use rust_embed::RustEmbed;
use serde::Deserialize;
use std::{
collections::HashMap,
path::Path,
sync::{Arc, RwLock},
time::Duration,
};
use tokio::time::interval;
use tracing::{debug, error, info};

#[derive(Clone)]
struct AppState {
// Arc to prevent clones for every request, just need read access in the search handler
channels: Arc<HashMap<String, Weak<Mutex<ChannelSearcher>>>>,
}

fn default_channel() -> String {
"fc-23.11-dev".to_string()
channels: Arc<HashMap<String, RwLock<ChannelSearcher>>>,
}

#[derive(Deserialize, Debug)]
struct SearchForm {
#[serde(default)]
q: String,
#[serde(default = "default_channel")]
channel: String,
channel: Option<String>,
}

impl AppState {
fn active_branches(&self) -> Vec<&String> {
self.channels
.iter()
.filter(|(_, searcher)| {
searcher
.upgrade()
.and_then(|s| s.lock().map(|s| s.active()).ok())
.unwrap_or(false)
})
.map(|(name, _)| name)
.sorted()
.collect_vec()
async fn active_branches(&self) -> Vec<&String> {
let mut channels = Vec::new();
for channel in self.channels.iter() {
if channel.1.read().unwrap().active() {
channels.push(channel.0)
}
}
channels
}

fn in_dir(state_dir: &Path, branches: Vec<Flake>, start_timers: bool) -> anyhow::Result<Self> {
fn in_dir(state_dir: &Path, branches: Vec<Flake>) -> anyhow::Result<Self> {
debug!("initializing app state");

if !state_dir.exists() {
std::fs::create_dir_all(state_dir)?;
}

let mut channels = HashMap::new();
for (i, flake) in branches.iter().enumerate() {
for mut flake in branches {
let branchname = flake.branch.clone();
let branch_path = state_dir.join(branchname.clone());

debug!("starting searcher for branch {}", &branchname);
let searcher = ChannelSearcher::new(&branch_path, flake);

// attempt not to (re)build multiple channels at the same time by spreading them 5
// minutes apart
let weak = if start_timers {
let freq = Duration::from_hours(5);
let start_time = tokio::time::Instant::now() + Duration::from_mins(i as u64 * 5);
let interval = interval_at(start_time, freq);
searcher.start_timer(interval)
} else {
let start_time = if !searcher.active() {
tokio::time::Instant::now()
} else {
tokio::time::Instant::now() + Duration::from_days(100_000)

let flake_info_path = branch_path.join("flake_info.json");
if matches!(flake.rev, fc_search::FlakeRev::FallbackToCached)
&& flake_info_path.exists()
{
if let Ok(saved_flake) = serde_json::from_str::<Flake>(
&std::fs::read_to_string(flake_info_path)
.expect("flake_info.json exists but could not be read"),
) {
info!("loaded flake from file cache: {:#?}", saved_flake);
flake = saved_flake;
};
let freq = Duration::from_days(100_000);
let interval = interval_at(start_time, freq);
searcher.start_timer(interval)
};
channels.insert(branchname, weak);
}

let searcher = RwLock::new(ChannelSearcher::new(&branch_path, &flake));
channels.insert(branchname, searcher);
}

Ok(Self {
Expand All @@ -106,9 +90,7 @@ pub async fn run(port: u16, state_dir: &Path, test: bool) -> anyhow::Result<()>
owner: "flyingcircusio".to_string(),
name: "fc-nixos".to_string(),
branch: "fc-23.11-dev".to_string(),
rev: fc_search::FlakeRev::Specific(
"62dd02d70222ffc1f3841fb8308952bedb2bfe96".to_string(),
),
rev: fc_search::FlakeRev::FallbackToCached,
}]
};

Expand All @@ -121,7 +103,7 @@ pub async fn run(port: u16, state_dir: &Path, test: bool) -> anyhow::Result<()>
};

// in release mode try to load the cached index from disk
AppState::in_dir(state_dir, branches, !test)?
AppState::in_dir(state_dir, branches)?
};

let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));
Expand All @@ -135,50 +117,70 @@ pub async fn run(port: u16, state_dir: &Path, test: bool) -> anyhow::Result<()>
.route("/search/options", get(search_options_handler))
.route("/search/packages", get(search_packages_handler))
.route("/assets/*file", get(static_handler))
.with_state(state);
.with_state(state.clone());

let listener = tokio::net::TcpListener::bind(addr).await?;
info!(
"router initialized, now listening on http://{}",
listener.local_addr().unwrap()
);

axum::serve(listener, router.into_make_service())
let updater_channels = state.channels.clone();
let updater_handle = if !test {
// run update loop in the background
tokio::spawn(async move {
let freq = Duration::from_hours(5);
let mut interval = interval(freq);
loop {
interval.tick().await;
for (branch, searcher) in updater_channels.iter() {
update_channel(branch, searcher).await;
}
}
})
} else {
// just update once, no need for a timed update
tokio::spawn(async move {
for (branch, searcher) in updater_channels.iter() {
update_channel(branch, searcher).await;
}
})
};

if let Err(e) = axum::serve(listener, router.into_make_service())
.await
.context("error while starting server")?;
Ok(())
.context("error while starting server")
{
let _ = updater_handle.abort();
Err(e)
} else {
Ok(())
}
}

/// Handler for the bare root path: permanently redirect clients to `/search`.
async fn index_handler() -> impl IntoResponse {
    // "/" serves no content of its own; a permanent redirect lets browsers
    // and caches remember the canonical entry point.
    let redirect = Redirect::permanent("/search");
    redirect.into_response()
}

fn search_with_channel<F, V>(state: &AppState, channel: &str, f: F) -> Vec<V>
where
F: FnOnce(&ChannelSearcher) -> Vec<&V>,
V: Clone,
{
state
.channels
.get(channel)
.and_then(|c| {
let channel = c.upgrade()?;
channel
.lock()
.map(|c| f(&c).into_iter().cloned().collect_vec())
.ok()
})
.unwrap_or_default()
}

#[tracing::instrument(skip(state, headers))]
async fn search_options_handler<'a>(
State(state): State<AppState>,
headers: HeaderMap,
form: axum::extract::Form<SearchForm>,
) -> impl IntoResponse {
let search_results = if !form.q.is_empty() {
search_with_channel(&state, &form.channel, |c| c.search_options(&form.q))
let channel = form.channel.as_ref().unwrap_or_else(|| {
state
.channels
.keys()
.sorted()
.next()
.context("no channels active")
.unwrap()
});
match state.channels.get(channel) {
Some(c) => c.read().unwrap().search_options(&form.q),
None => Vec::new(),
}
} else {
Vec::new()
};
Expand All @@ -191,21 +193,32 @@ async fn search_options_handler<'a>(
}

HtmlTemplate(OptionsIndexTemplate {
branches: state.active_branches(),
branches: state.active_branches().await,
results: search_results,
search_value: &form.q,
})
.into_response()
}

#[tracing::instrument(skip(state))]
async fn search_packages_handler<'a>(
State(state): State<AppState>,
headers: HeaderMap,
form: axum::extract::Form<SearchForm>,
) -> impl IntoResponse {
let search_results = if !form.q.is_empty() {
search_with_channel(&state, &form.channel, |c| c.search_packages(&form.q))
let channel = form.channel.as_ref().unwrap_or_else(|| {
state
.channels
.keys()
.sorted()
.next()
.context("no channels active")
.unwrap()
});
match state.channels.get(channel) {
Some(c) => c.read().unwrap().search_packages(&form.q),
None => Vec::new(),
}
} else {
Vec::new()
};
Expand All @@ -218,7 +231,7 @@ async fn search_packages_handler<'a>(
}

HtmlTemplate(PackagesIndexTemplate {
branches: state.active_branches(),
branches: state.active_branches().await,
results: search_results,
search_value: &form.q,
})
Expand Down Expand Up @@ -303,3 +316,21 @@ where
}
}
}

async fn update_channel(branch: &str, channel: &RwLock<ChannelSearcher>) {
// obtain the current searcher
let mut cs: ChannelSearcher = channel.read().unwrap().clone();

// no lock on the channel searcher here, so we can update it
// and replace the value on success while search is still running
// in an error case the old status is retained and the error logged
info!("starting update for branch {}", branch);
match cs.update().await {
Err(e) => error!("error updating branch {}: {e:?}", branch),
Ok(()) => {
// replace the old searcher with the updated one on success
let mut old = channel.write().unwrap();
*old = cs;
}
}
}
Loading

0 comments on commit e137b71

Please sign in to comment.