Skip to content

Commit

Permalink
feat: secure suspended resume event + configurable timeout (windmill-…
Browse files Browse the repository at this point in the history
…labs#721)

* fix suspend count for early message + delete resume_job

* all

* done everything

* done everything
  • Loading branch information
rubenfiszel authored Oct 11, 2022
1 parent 9acee22 commit ff7fb0f
Show file tree
Hide file tree
Showing 16 changed files with 563 additions and 684 deletions.
742 changes: 252 additions & 490 deletions backend/Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ url = "^2"
async-oauth2 = "^0"
reqwest = { version = "^0", features = ["json"] }
time = "^0"
slack-http-verifier = "^0"
serde_urlencoded = "^0"
tokio-tar = "^0"
tempfile = "^3"
Expand All @@ -60,6 +59,8 @@ swc_ecma_parser = "^0"
swc_ecma_ast = "^0"
base64 = "^0"
unicode-general-category = "^0"
hmac = "^0"
sha2 = "^0"

sqlx = { version = "^0", features = ["offline", "macros", "migrate", "uuid", "json", "chrono", "postgres", "runtime-tokio-rustls"]}
dotenv = "^0"
Expand Down
24 changes: 22 additions & 2 deletions backend/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2551,7 +2551,7 @@ paths:
schema:
type: string

/w/{workspace}/jobs/resume/{id}:
/w/{workspace}/jobs/resume/{job_id}/{resume_id}/{signature}:
get:
summary: resume a job for a suspended flow
operationId: resumeSuspendedJob
Expand All @@ -2560,6 +2560,16 @@ paths:
parameters:
- $ref: "#/components/parameters/WorkspaceId"
- $ref: "#/components/parameters/JobId"
- name: resume_id
in: path
required: true
schema:
type: integer
- name: signature
in: path
required: true
schema:
type: string
- name: payload
in: query
schema:
Expand Down Expand Up @@ -2593,7 +2603,7 @@ paths:
schema:
type: string

/w/{workspace}/jobs/cancel/{id}:
/w/{workspace}/jobs/cancel/{job_id}/{resume_id}/{signature}:
get:
summary: cancel a job for a suspended flow
operationId: cancelSuspendedJob
Expand All @@ -2602,6 +2612,16 @@ paths:
parameters:
- $ref: "#/components/parameters/WorkspaceId"
- $ref: "#/components/parameters/JobId"
- name: resume_id
in: path
required: true
schema:
type: integer
- name: signature
in: path
required: true
schema:
type: string
- name: payload
in: query
schema:
Expand Down
64 changes: 32 additions & 32 deletions backend/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -1178,38 +1178,6 @@
},
"query": "SELECT * from resource_type WHERE name = $1 AND (workspace_id = $2 OR workspace_id = 'starter')"
},
"5a8eeff8ac0f52408720f8204bae085045cc4313bb31ab57442b093182633101": {
"describe": {
"columns": [
{
"name": "id",
"ordinal": 0,
"type_info": "Uuid"
},
{
"name": "flow_status",
"ordinal": 1,
"type_info": "Jsonb"
},
{
"name": "suspend",
"ordinal": 2,
"type_info": "Int4"
}
],
"nullable": [
false,
true,
false
],
"parameters": {
"Left": [
"Uuid"
]
}
},
"query": "\n SELECT id, flow_status, suspend\n FROM queue\n WHERE id = ( SELECT parent_job FROM queue WHERE id = $1\n UNION\n SELECT parent_job from completed_job WHERE id = $1 )\n FOR UPDATE\n "
},
"5b7a1d16d8109a65479ab33d411c60d14ea91d870fdff8606d7aa4ad39f0ba00": {
"describe": {
"columns": [
Expand Down Expand Up @@ -1887,6 +1855,38 @@
},
"query": "INSERT INTO completed_job AS cj\n ( workspace_id\n , id\n , parent_job\n , created_by\n , created_at\n , started_at\n , duration_ms\n , success\n , script_hash\n , script_path\n , args\n , result\n , logs\n , raw_code\n , canceled\n , canceled_by\n , canceled_reason\n , job_kind\n , schedule_path\n , permissioned_as\n , flow_status\n , raw_flow\n , is_flow_step\n , is_skipped\n , language )\n VALUES ($1, $2, $3, $4, $5, $6, EXTRACT(milliseconds FROM (now() - $6)), $7, $8, $9,$10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24)\n ON CONFLICT (id) DO UPDATE SET success = $7, result = $11, logs = concat(cj.logs, $12)"
},
"853788436dbe987853433e8dc83665f68bd127de31d4c807abafeead896f6ac4": {
"describe": {
"columns": [
{
"name": "id",
"ordinal": 0,
"type_info": "Uuid"
},
{
"name": "flow_status",
"ordinal": 1,
"type_info": "Jsonb"
},
{
"name": "suspend",
"ordinal": 2,
"type_info": "Int4"
}
],
"nullable": [
false,
true,
false
],
"parameters": {
"Left": [
"Uuid"
]
}
},
"query": "\n SELECT id, flow_status, suspend\n FROM queue\n WHERE id = ( SELECT parent_job FROM queue WHERE id = $1 UNION ALL SELECT parent_job FROM completed_job WHERE id = $1)\n FOR UPDATE\n "
},
"88a3f58a1a315200fdd2e4bb8638246ee21818f8aaaf56f6e9d7ddce1490d886": {
"describe": {
"columns": [
Expand Down
31 changes: 0 additions & 31 deletions backend/src/email.rs

This file was deleted.

8 changes: 8 additions & 0 deletions backend/src/fixtures/base.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ INSERT INTO workspace
(id, name, owner, domain)
VALUES ('test-workspace', 'test-workspace', 'test-user', null);

INSERT INTO usr(workspace_id, email, username, is_admin, role) VALUES
('test-workspace', '[email protected]', 'test-user', true, 'Admin');

INSERT INTO workspace_key(workspace_id, kind, key) VALUES
('test-workspace', 'cloud', 'test-key');

GRANT ALL PRIVILEGES ON TABLE workspace_key TO windmill_admin;
GRANT ALL PRIVILEGES ON TABLE workspace_key TO windmill_user;

CREATE FUNCTION "notify_insert_on_completed_job" ()
RETURNS TRIGGER AS $$
Expand Down
13 changes: 10 additions & 3 deletions backend/src/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ impl Default for ExponentialDelay {
}
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct Suspend {
#[serde(skip_serializing_if = "Option::is_none")]
pub required_events: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub timeout: Option<u32>,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct FlowModule {
#[serde(default)]
Expand All @@ -159,9 +167,8 @@ pub struct FlowModule {
pub value: FlowModuleValue,
pub stop_after_if: Option<StopAfterIf>,
pub summary: Option<String>,
#[serde(default)]
#[serde(skip_serializing_if = "is_default")]
pub suspend: u16,
#[serde(skip_serializing_if = "Option::is_none")]
pub suspend: Option<Suspend>,
#[serde(skip_serializing_if = "Option::is_none")]
pub retry: Option<Retry>,
#[serde(skip_serializing_if = "Option::is_none")]
Expand Down
Loading

0 comments on commit ff7fb0f

Please sign in to comment.