Skip to content

Commit

Permalink
refactor: extract RepositoryAccess to abstract out scheduler reading …
Browse files Browse the repository at this point in the history
…(from config / database) (TabbyML#1388)

* hide all enterprise commands

* refactor(scheduler): extract RepositoryAccess for scheduler

* ensure community build always success

* remove scheduler related code
  • Loading branch information
wsxiaoys authored Feb 6, 2024
1 parent 99d2947 commit 4ec8b39
Show file tree
Hide file tree
Showing 19 changed files with 155 additions and 207 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/test-rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,8 @@ jobs:
- run: bash ./ci/prepare_build_environment.sh

- name: Run unit tests on community build
run: cargo test --bin tabby --no-default-features

- name: Run unit tests
run: cargo test --bin tabby --lib
60 changes: 9 additions & 51 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ serde_json = "1"
serdeconv = "0.4.1"
tokio = "1.28"
tracing = "0.1"
tokio-cron-scheduler = "0.9.4"
tracing-subscriber = "0.3"
anyhow = "1.0.71"
serde-jsonlines = "0.4.0"
Expand Down
15 changes: 15 additions & 0 deletions crates/tabby-common/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::HashSet, path::PathBuf};

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use filenamify::filenamify;
use lazy_static::lazy_static;
use regex::Regex;
Expand Down Expand Up @@ -132,6 +133,20 @@ impl Default for ServerConfig {
}
}

#[async_trait]
pub trait RepositoryAccess: Send + Sync {
async fn list_repositories(&self) -> Result<Vec<RepositoryConfig>>;
}

pub struct ConfigRepositoryAccess;

#[async_trait]
impl RepositoryAccess for ConfigRepositoryAccess {
async fn list_repositories(&self) -> Result<Vec<RepositoryConfig>> {
Ok(Config::load()?.repositories)
}
}

#[cfg(test)]
mod tests {
use super::{Config, RepositoryConfig};
Expand Down
1 change: 1 addition & 0 deletions crates/tabby-common/src/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub static USER_HEADER_FIELD_NAME: &str = "x-tabby-user";
1 change: 1 addition & 0 deletions crates/tabby-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! Defines common types and utilities used across multiple tabby subprojects, especially serialization and deserialization targets.
pub mod api;
pub mod config;
pub mod constants;
pub mod index;
pub mod languages;
pub mod path;
Expand Down
3 changes: 2 additions & 1 deletion crates/tabby-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ edition = "2021"

[dependencies]
anyhow = { workspace = true }
job_scheduler = "1.2.1"
tabby-common = { path = "../tabby-common" }
tantivy = { workspace = true }
tracing = { workspace = true }
Expand All @@ -28,10 +27,12 @@ kdam = { version = "0.5.0" }
requirements = "0.3.0"
serdeconv.workspace = true
cargo-lock = { version = "9.0.0", features = ["dependency-tree"] }
tokio-cron-scheduler = { workspace = true }

[dev-dependencies]
temp_testdir = "0.2"
tabby-common = { path = "../tabby-common", features = [ "testutils" ] }
tracing-test = "0.1"
tokio = { workspace = true, features = ["rt", "macros"] }
serde_json = { workspace = true }
async-trait = { workspace = true }
60 changes: 38 additions & 22 deletions crates/tabby-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,57 @@ mod index;
mod repository;
mod utils;

use anyhow::Result;
use job_scheduler::{Job, JobScheduler};
use tabby_common::config::RepositoryConfig;
use tracing::{error, info};

pub async fn scheduler(now: bool, access: Vec<RepositoryConfig>) -> Result<()> {
let mut scheduler = JobScheduler::new();
use std::sync::Arc;

let job1 = || job_sync(&access);

let job2 = || job_index(&access);
use anyhow::Result;
use tabby_common::config::{RepositoryAccess, RepositoryConfig};
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{error, info, warn};

pub async fn scheduler<T: RepositoryAccess + 'static>(now: bool, access: T) -> Result<()> {
if now {
job1();
job2();
let repositories = access.list_repositories().await?;
job_sync(&repositories);
job_index(&repositories);
} else {
let access = Arc::new(access);
let scheduler = JobScheduler::new().await?;
// Every 5 minutes.
scheduler.add(Job::new("0 1/5 * * * * *".parse().unwrap(), job1));
let access_clone = access.clone();
scheduler
.add(Job::new_async("0 1/5 * * * * *", move |_, _| {
let access = access_clone.clone();
Box::pin(async move {
match access.list_repositories().await {
Ok(repositories) => job_sync(&repositories),
Err(err) => warn!("Failed to list_repositories: {}", err),
}
})
})?)
.await?;

// Every 5 hours.
scheduler.add(Job::new("0 0 1/5 * * * *".parse().unwrap(), job2));
let access_clone = access.clone();
scheduler
.add(Job::new_async("0 0 1/5 * * * *", move |_, _| {
let access = access_clone.clone();
Box::pin(async move {
match access.list_repositories().await {
Ok(repositories) => job_index(&repositories),
Err(err) => warn!("Failed to list_repositories: {}", err),
}
})
})?)
.await?;

info!("Scheduler activated...");
loop {
scheduler.tick();
let duration = scheduler.time_till_next_job();
info!("Sleep {:?} for next job ...", duration);
std::thread::sleep(duration);
}
scheduler.start().await?;
}

Ok(())
}

pub fn job_index(repositories: &[RepositoryConfig]) {
fn job_index(repositories: &[RepositoryConfig]) {
println!("Indexing repositories...");
let ret = index::index_repositories(repositories);
if let Err(err) = ret {
Expand All @@ -48,7 +64,7 @@ pub fn job_index(repositories: &[RepositoryConfig]) {
println!();
}

pub fn job_sync(repositories: &[RepositoryConfig]) {
fn job_sync(repositories: &[RepositoryConfig]) {
println!("Syncing repositories...");
let ret = repository::sync_repositories(repositories);
if let Err(err) = ret {
Expand Down
26 changes: 19 additions & 7 deletions crates/tabby-scheduler/tests/integration_test.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,41 @@
use async_trait::async_trait;
use tabby_common::config::{RepositoryAccess, RepositoryConfig};

struct StaticRepositoryAccess {
repositories: Vec<RepositoryConfig>,
}

#[async_trait]
impl RepositoryAccess for StaticRepositoryAccess {
async fn list_repositories(&self) -> anyhow::Result<Vec<RepositoryConfig>> {
Ok(self.repositories.clone())
}
}

#[cfg(test)]
mod tests {
use std::fs::create_dir_all;

use tabby_common::{
config::{Config, RepositoryConfig, ServerConfig},
path::set_tabby_root,
};
use tabby_common::{config::RepositoryConfig, path::set_tabby_root};
use temp_testdir::*;
use tracing_test::traced_test;

use super::StaticRepositoryAccess;

#[traced_test]
#[tokio::test]
async fn end_to_end() {
let root = TempDir::default();
create_dir_all(&root).expect("Failed to create tabby root");
set_tabby_root(root.to_path_buf());

let config = Config {
let config = StaticRepositoryAccess {
repositories: vec![RepositoryConfig::new(
"https://github.com/TabbyML/interview-questions".to_owned(),
)],
server: ServerConfig::default(),
};

let res = tabby_scheduler::scheduler(true, config.repositories).await;
let res = tabby_scheduler::scheduler(true, config).await;
res.expect("Failed to run scheduler");
}
}
Loading

0 comments on commit 4ec8b39

Please sign in to comment.