Skip to content

Commit

Permalink
feat(webserver): Implement new job runs APIs (TabbyML#1720)
Browse files Browse the repository at this point in the history
* feat(webserver): Implement new job runs APIs

* [autofix.ci] apply automated fixes

* Apply suggestions, add unit test

* Add test, use created_at

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
  • Loading branch information
boxbeam and autofix-ci[bot] authored Mar 27, 2024
1 parent f05563a commit be17d8b
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 17 deletions.
1 change: 1 addition & 0 deletions ee/tabby-db/migrations/0020_job-run-index.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Revert 0020: remove the (job, created_at) lookup index from job_runs.
DROP INDEX idx_job_created_at;
1 change: 1 addition & 0 deletions ee/tabby-db/migrations/0020_job-run-index.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Composite index supporting job-run queries that filter by job name and
-- by creation time (e.g. the 7-day stats window in compute_job_stats).
CREATE INDEX idx_job_created_at ON job_runs(job, created_at);
52 changes: 47 additions & 5 deletions ee/tabby-db/src/job_runs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Result;
use chrono::{Duration, Utc};
use sqlx::{query, FromRow};
use tabby_db_macros::query_paged_as;

Expand All @@ -20,6 +21,13 @@ pub struct JobRunDAO {
pub finished_at: DbOption<DateTimeUtc>,
}

/// Aggregate counts of job runs, decoded straight from the SQL aggregation
/// row produced by `DbConn::compute_job_stats`.
#[derive(FromRow)]
pub struct JobStatsDAO {
    /// Runs that finished with exit code 0.
    pub success: i32,
    /// Runs that finished with a non-zero exit code.
    pub failed: i32,
    /// Runs with no exit code recorded (not yet completed).
    pub pending: i32,
}

/// db read/write operations for `job_runs` table
impl DbConn {
pub async fn create_job_run(&self, job: String) -> Result<i32> {
Expand Down Expand Up @@ -61,17 +69,26 @@ impl DbConn {
pub async fn list_job_runs_with_filter(
&self,
ids: Option<Vec<i32>>,
jobs: Option<Vec<String>>,
limit: Option<usize>,
skip_id: Option<i32>,
backwards: bool,
) -> Result<Vec<JobRunDAO>> {
let condition = if let Some(ids) = ids {
let mut conditions = vec![];

if let Some(ids) = ids {
let ids: Vec<String> = ids.iter().map(i32::to_string).collect();
let ids = ids.join(", ");
Some(format!("id in ({ids})"))
} else {
None
};
conditions.push(format!("id in ({ids})"));
}

if let Some(jobs) = jobs {
let jobs: Vec<String> = jobs.iter().map(|s| format!("{s:?}")).collect();
let jobs = jobs.join(", ");
conditions.push(format!("job in ({jobs})"));
}

let condition = (!conditions.is_empty()).then_some(conditions.join(" AND "));
let job_runs: Vec<JobRunDAO> = query_paged_as!(
JobRunDAO,
"job_runs",
Expand All @@ -96,6 +113,31 @@ impl DbConn {
Ok(job_runs)
}

/// Computes success/failed/pending counts for job runs created within the
/// last 7 days, optionally restricted to the given job names.
///
/// `exit_code` semantics (as encoded in the query below):
/// 0 = success, non-zero = failure, NULL = still pending.
pub async fn compute_job_stats(&self, jobs: Option<Vec<String>>) -> Result<JobStatsDAO> {
    // Build an optional `AND job IN (...)` clause from the requested names.
    let condition = match jobs {
        Some(jobs) => {
            // NOTE(review): names are quoted via Rust's Debug formatting
            // (`{s:?}`), which yields Rust-style — not SQL-style — escaping.
            // This is only safe if job names are internal constants; confirm
            // they are never user-controlled before relying on it.
            let jobs: Vec<_> = jobs.into_iter().map(|s| format!("{s:?}")).collect();
            let jobs = jobs.join(", ");
            format!("AND job IN ({jobs})")
        }
        None => "".into(),
    };

    // Only runs created within the past week are counted.
    let cutoff = Utc::now() - Duration::days(7);

    // NOTE(review): SUM over zero matching rows yields NULL in SQLite —
    // confirm the i32 fields of JobStatsDAO decode cleanly when no run
    // matches the filter.
    let stats = sqlx::query_as(&format!(
        r#"SELECT
        SUM(exit_code == 0) AS success,
        SUM(exit_code != 0 AND exit_code IS NOT NULL) AS failed,
        SUM(exit_code IS NULL) AS pending FROM job_runs
        WHERE created_at > ? {condition};"#
    ))
    .bind(cutoff)
    .fetch_one(&self.pool)
    .await?;
    Ok(stats)
}

pub async fn cleanup_stale_job_runs(&self) -> Result<()> {
query!("DELETE FROM job_runs WHERE exit_code IS NULL;")
.execute(&self.pool)
Expand Down
16 changes: 12 additions & 4 deletions ee/tabby-webserver/graphql/schema.graphql
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
enum OAuthProvider {
GITHUB
GOOGLE
}

input RequestInvitationInput {
email: String!
}
Expand Down Expand Up @@ -69,7 +74,8 @@ type Query {
me: User!
users(after: String, before: String, first: Int, last: Int): UserConnection!
invitations(after: String, before: String, first: Int, last: Int): InvitationConnection!
jobRuns(ids: [ID!], after: String, before: String, first: Int, last: Int): JobRunConnection!
jobRuns(ids: [ID!], jobs: [String!], after: String, before: String, first: Int, last: Int): JobRunConnection!
jobRunStats(jobs: [String!]): JobStats!
emailSetting: EmailSetting
isEmailConfigured: Boolean! @deprecated
networkSetting: NetworkSetting!
Expand All @@ -79,6 +85,7 @@ type Query {
oauthCallbackUrl(provider: OAuthProvider!): String!
serverInfo: ServerInfo!
license: LicenseInfo!
jobs: [String!]!
}

input NetworkSettingInput {
Expand Down Expand Up @@ -249,9 +256,10 @@ type InvitationEdge {
cursor: String!
}

enum OAuthProvider {
GITHUB
GOOGLE
type JobStats {
success: Int!
failed: Int!
pending: Int!
}

type PageInfo {
Expand Down
9 changes: 9 additions & 0 deletions ee/tabby-webserver/src/schema/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ pub struct JobRun {
pub stderr: String,
}

/// GraphQL object mirroring the per-outcome job-run counts returned by
/// `JobService::compute_job_run_stats`.
#[derive(Debug, GraphQLObject)]
pub struct JobStats {
    /// Runs that completed successfully (exit code 0).
    pub success: i32,
    /// Runs that completed with a failure (non-zero exit code).
    pub failed: i32,
    /// Runs that have not recorded an exit code yet.
    pub pending: i32,
}

impl relay::NodeType for JobRun {
type Cursor = String;

Expand Down Expand Up @@ -47,9 +54,11 @@ pub trait JobService: Send + Sync {
async fn list_job_runs(
&self,
ids: Option<Vec<ID>>,
jobs: Option<Vec<String>>,
after: Option<String>,
before: Option<String>,
first: Option<usize>,
last: Option<usize>,
) -> Result<Vec<JobRun>>;
async fn compute_job_run_stats(&self, jobs: Option<Vec<String>>) -> Result<JobStats>;
}
16 changes: 14 additions & 2 deletions ee/tabby-webserver/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ use self::{
NetworkSetting, NetworkSettingInput, SecuritySetting, SecuritySettingInput, SettingService,
},
};
use crate::schema::auth::{JWTPayload, OAuthCredential, OAuthProvider};
use crate::schema::{
auth::{JWTPayload, OAuthCredential, OAuthProvider},
job::JobStats,
};

pub trait ServiceLocator: Send + Sync {
fn auth(&self) -> Arc<dyn AuthenticationService>;
Expand Down Expand Up @@ -220,6 +223,7 @@ impl Query {
async fn job_runs(
ctx: &Context,
ids: Option<Vec<ID>>,
jobs: Option<Vec<String>>,
after: Option<String>,
before: Option<String>,
first: Option<i32>,
Expand All @@ -235,13 +239,17 @@ impl Query {
Ok(ctx
.locator
.job()
.list_job_runs(ids, after, before, first, last)
.list_job_runs(ids, jobs, after, before, first, last)
.await?)
},
)
.await
}

/// Resolver for the `jobRunStats` query: aggregate success/failed/pending
/// counts for recent job runs, optionally filtered by job name.
///
/// NOTE(review): unlike some sibling resolvers there is no explicit
/// `check_admin` here — confirm authorization is enforced upstream.
async fn job_run_stats(ctx: &Context, jobs: Option<Vec<String>>) -> FieldResult<JobStats> {
    let service = ctx.locator.job();
    let stats = service.compute_job_run_stats(jobs).await?;
    Ok(stats)
}

async fn email_setting(ctx: &Context) -> Result<Option<EmailSetting>> {
check_admin(ctx).await?;
ctx.locator.email().read_email_setting().await
Expand Down Expand Up @@ -320,6 +328,10 @@ impl Query {
async fn license(ctx: &Context) -> Result<LicenseInfo> {
ctx.locator.license().read_license().await
}

/// Resolver for the `jobs` query: the known job names accepted by the
/// `jobRuns` / `jobRunStats` filters.
async fn jobs() -> Result<Vec<String>> {
    let known_jobs = ["scheduler"];
    Ok(known_jobs.iter().map(|name| name.to_string()).collect())
}
}

#[derive(GraphQLObject)]
Expand Down
65 changes: 59 additions & 6 deletions ee/tabby-webserver/src/service/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tabby_db::DbConn;

use super::{graphql_pagination_to_filter, AsID, AsRowid};
use crate::schema::{
job::{JobRun, JobService},
job::{JobRun, JobService, JobStats},
Result,
};

Expand Down Expand Up @@ -37,6 +37,7 @@ impl JobService for DbConn {
async fn list_job_runs(
&self,
ids: Option<Vec<ID>>,
jobs: Option<Vec<String>>,
after: Option<String>,
before: Option<String>,
first: Option<usize>,
Expand All @@ -45,12 +46,21 @@ impl JobService for DbConn {
let (limit, skip_id, backwards) = graphql_pagination_to_filter(after, before, first, last)?;
let rowids = ids.map(|ids| ids.into_iter().filter_map(|x| x.as_rowid().ok()).collect());
Ok(self
.list_job_runs_with_filter(rowids, limit, skip_id, backwards)
.list_job_runs_with_filter(rowids, jobs, limit, skip_id, backwards)
.await?
.into_iter()
.map(Into::into)
.collect())
}

/// Fetches job-run stats from the database layer and converts the DAO row
/// into the GraphQL-facing `JobStats`.
async fn compute_job_run_stats(&self, jobs: Option<Vec<String>>) -> Result<JobStats> {
    // Re-type `self` so the inherent `DbConn` method is called rather than
    // recursing into this trait method of the same name.
    let db: &DbConn = self;
    let dao = db.compute_job_stats(jobs).await?;
    Ok(JobStats {
        success: dao.success,
        failed: dao.failed,
        pending: dao.pending,
    })
}
}

#[cfg(test)]
Expand All @@ -71,7 +81,7 @@ mod tests {
svc.complete_job_run(&id, 0).await.unwrap();

let job = svc
.list_job_runs(None, None, None, None, None)
.list_job_runs(None, None, None, None, None, None)
.await
.unwrap();
let job = job.first().unwrap();
Expand All @@ -80,10 +90,53 @@ mod tests {
assert_eq!(job.stderr, "stderr");
assert_eq!(job.exit_code, Some(0));

let job = svc
.list_job_runs(Some(vec![id]), None, None, None, None)
let jobs = svc
.list_job_runs(Some(vec![id]), None, None, None, None, None)
.await
.unwrap();
assert_eq!(jobs.len(), 1);

svc.create_job_run("another-job".into()).await.unwrap();
let jobs = svc
.list_job_runs(
None,
Some(vec!["another-job".into()]),
None,
None,
None,
None,
)
.await
.unwrap();
assert_eq!(jobs.len(), 1);
}

/// Verifies `compute_job_run_stats`: one success, one failure, and one
/// pending run are counted correctly, both unfiltered and when filtered
/// by job name.
///
/// Fix: removed the stray `assert_eq!(job.len(), 1)` line — `job` was never
/// defined in this function and the statement lacked a semicolon, so the
/// test did not compile. It was leftover from the list_job_runs test above.
#[tokio::test]
async fn test_job_stats() {
    let db = DbConn::new_in_memory().await.unwrap();
    let jobs: Box<dyn JobService> = Box::new(db);

    // One successful run (exit code 0).
    let id = jobs.create_job_run("test-job".into()).await.unwrap();
    jobs.complete_job_run(&id, 0).await.unwrap();

    // One failed run (non-zero exit code).
    let id2 = jobs.create_job_run("test-job".into()).await.unwrap();
    jobs.complete_job_run(&id2, 1).await.unwrap();

    // One run left pending (never completed).
    jobs.create_job_run("pending-job".into()).await.unwrap();

    // Unfiltered stats see all three runs.
    let stats = jobs.compute_job_run_stats(None).await.unwrap();

    assert_eq!(stats.success, 1);
    assert_eq!(stats.failed, 1);
    assert_eq!(stats.pending, 1);

    // Filtering by "test-job" excludes the pending run of "pending-job".
    let stats = jobs
        .compute_job_run_stats(Some(vec!["test-job".into()]))
        .await
        .unwrap();

    assert_eq!(stats.success, 1);
    assert_eq!(stats.failed, 1);
    assert_eq!(stats.pending, 0);
}
}

0 comments on commit be17d8b

Please sign in to comment.