From 80a41669b247ed9c70d348a9660b803aa215476b Mon Sep 17 00:00:00 2001 From: HugoCasa Date: Tue, 6 Aug 2024 16:17:33 +0200 Subject: [PATCH] feat: email triggers (#4163) * feat: email triggers v0 * update docker compose to nginx with tcp reverse proxy + move smtp to private * fix: open source build * test: update ee ref for testing * feat: use caddy with layer4 * fix: nit * feat: configurable email domain * fix: nit * fix: nit * fix: get l4 from main * fix: default email domain to mail.domain * update ee ref --- Caddyfile | 10 +++ backend/Cargo.lock | 14 ++++ backend/Cargo.toml | 3 + backend/ee-repo-ref.txt | 2 +- backend/src/main.rs | 1 + backend/tests/worker.rs | 1 + backend/windmill-api/Cargo.toml | 4 + backend/windmill-api/src/jobs.rs | 81 ++++++++++++++++++- backend/windmill-api/src/lib.rs | 20 ++++- backend/windmill-api/src/settings.rs | 6 +- backend/windmill-api/src/smtp_server_ee.rs | 17 ++++ .../windmill-common/src/global_settings.rs | 1 + backend/windmill-worker/src/worker.rs | 28 +++++++ docker-compose.yml | 5 +- .../components/details/WebhooksPanel.svelte | 65 +++++++++++++-- .../src/lib/components/instanceSettings.ts | 9 +++ 16 files changed, 248 insertions(+), 19 deletions(-) create mode 100644 backend/windmill-api/src/smtp_server_ee.rs diff --git a/Caddyfile b/Caddyfile index 8d3cc962653da..67925e6492865 100644 --- a/Caddyfile +++ b/Caddyfile @@ -1,3 +1,13 @@ +{ + layer4 { + :25 { + proxy { + to windmill_server:2525 + } + } + } +} + {$BASE_URL} { bind {$ADDRESS} reverse_proxy /ws/* http://lsp:3001 diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 4d34ff0e8c17c..d9bf8090b57b7 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -4856,6 +4856,16 @@ dependencies = [ "gethostname", ] +[[package]] +name = "mail-parser" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed5a1335c3a964788c90cb42ae04a34b5f2628e89566949ce3bd4ada695c0bcd" +dependencies = [ + "encoding_rs", + "serde", +] + [[package]] name = "mail-send" version = "0.4.9" @@ -10455,9 +10465,12 @@ dependencies = [ "jsonwebtoken", "lazy_static", "magic-crypt", + "mail-parser", "mime_guess", + "native-tls", "object_store", "openidconnect", + "openssl", "pin-project", "prometheus", "quick_cache", @@ -10479,6 +10492,7 @@ dependencies = [ "tinyvector", "tokenizers", "tokio", + "tokio-native-tls", "tokio-tar", "tokio-util", "tower", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index aebecc37b79a4..e8f53be76f0b9 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -240,6 +240,9 @@ candle-nn = "0.3.0" tiberius = { git = "https://github.com/prisma/tiberius", rev = "8f66a699dfa041e7b5f736c7e94f92c945453c9e", default-features = false, features = ["rustls", "tds73", "chrono", "sql-browser-tokio"]} pin-project = "1" indexmap = { version = "2.2.5", features = ["serde"]} +tokio-native-tls = "^0" +openssl = "=0.10" +mail-parser = "^0" datafusion = "39.0.0" object_store = { version = "0.10.0", features = ["aws", "azure"] } diff --git a/backend/ee-repo-ref.txt b/backend/ee-repo-ref.txt index 428f05b845728..e02e4aa6a6722 100644 --- a/backend/ee-repo-ref.txt +++ b/backend/ee-repo-ref.txt @@ -1 +1 @@ -1a2febc371c907789b25860366872fca888d6477 +70e475a5cec356d2fe992038f303aea8988b5046 \ No newline at end of file diff --git a/backend/src/main.rs b/backend/src/main.rs index 90758997cbb7a..c26695d7c57a6 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -405,6 +405,7 @@ Windmill Community Edition {GIT_VERSION} server_killpill_rx, base_internal_tx, server_mode, + base_internal_url.clone(), ) .await?; } else { diff --git a/backend/tests/worker.rs b/backend/tests/worker.rs index 48eb9f2642688..7ed4e56085d7c 100644 --- a/backend/tests/worker.rs +++ b/backend/tests/worker.rs @@ -128,6 +128,7 @@ impl ApiServer { rx, port_tx, false, + format!("http://localhost:{}", addr.port()), )); _port_rx.await.unwrap(); diff --git a/backend/windmill-api/Cargo.toml b/backend/windmill-api/Cargo.toml index a443a19698c29..ce5addbd38259 100644 --- a/backend/windmill-api/Cargo.toml +++ b/backend/windmill-api/Cargo.toml @@ -59,6 +59,10 @@ tracing-subscriber.workspace = true quick_cache.workspace = true rand.workspace = true time.workspace = true +native-tls.workspace = true +tokio-native-tls.workspace = true +openssl.workspace = true +mail-parser = { workspace = true, features = ["serde_support"] } magic-crypt.workspace = true tempfile.workspace = true tokio-util.workspace = true diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 706f588c9cc95..c102ec6fb41e3 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -1034,7 +1034,7 @@ pub struct ListableCompletedJob { pub labels: Option, } -#[derive(Deserialize, Clone)] +#[derive(Deserialize, Clone, Default)] pub struct RunJobQuery { scheduled_for: Option>, scheduled_in_secs: Option, @@ -2751,6 +2751,23 @@ pub async fn run_flow_by_path( Path((w_id, flow_path)): Path<(String, StripPath)>, Query(run_query): Query, args: PushArgsOwned, +) -> error::Result<(StatusCode, String)> { + run_flow_by_path_inner( + authed, db, user_db, rsmq, w_id, flow_path, run_query, args, None, + ) + .await +} + +pub async fn run_flow_by_path_inner( + authed: ApiAuthed, + db: DB, + user_db: UserDB, + rsmq: Option, + w_id: String, + flow_path: StripPath, + run_query: RunJobQuery, + args: PushArgsOwned, + label_prefix: Option, ) -> error::Result<(StatusCode, String)> { #[cfg(feature = "enterprise")] check_license_key_valid().await?; @@ -2784,7 +2801,9 @@ pub async fn run_flow_by_path( &w_id, JobPayload::Flow { path: flow_path.to_string(), dedicated_worker }, PushArgs { args: &args.args, extra: args.extra }, - authed.display_username(), + &label_prefix + .map(|x| x + authed.display_username()) + .unwrap_or_else(|| authed.display_username().to_string()), &authed.email, username_to_permissioned_as(&authed.username), scheduled_for, @@ -2910,6 +2929,31 @@ pub async fn run_script_by_path( Path((w_id, script_path)): Path<(String, StripPath)>, Query(run_query): Query, args: PushArgsOwned, +) -> error::Result<(StatusCode, String)> { + run_script_by_path_inner( + authed, + db, + user_db, + rsmq, + w_id, + script_path, + run_query, + args, + None, + ) + .await +} + +pub async fn run_script_by_path_inner( + authed: ApiAuthed, + db: DB, + user_db: UserDB, + rsmq: Option, + w_id: String, + script_path: StripPath, + run_query: RunJobQuery, + args: PushArgsOwned, + label_prefix: Option, ) -> error::Result<(StatusCode, String)> { #[cfg(feature = "enterprise")] check_license_key_valid().await?; @@ -2935,7 +2979,9 @@ pub async fn run_script_by_path( &w_id, job_payload, PushArgs { args: &args.args, extra: args.extra }, - authed.display_username(), + &label_prefix + .map(|x| x + authed.display_username()) + .unwrap_or_else(|| authed.display_username().to_string()), &authed.email, username_to_permissioned_as(&authed.username), scheduled_for, @@ -4352,6 +4398,31 @@ pub async fn run_job_by_hash( Path((w_id, script_hash)): Path<(String, ScriptHash)>, Query(run_query): Query, args: PushArgsOwned, +) -> error::Result<(StatusCode, String)> { + run_job_by_hash_inner( + authed, + db, + user_db, + rsmq, + w_id, + script_hash, + run_query, + args, + None, + ) + .await +} + +pub async fn run_job_by_hash_inner( + authed: ApiAuthed, + db: DB, + user_db: UserDB, + rsmq: Option, + w_id: String, + script_hash: ScriptHash, + run_query: RunJobQuery, + args: PushArgsOwned, + label_prefix: Option, ) -> error::Result<(StatusCode, String)> { #[cfg(feature = "enterprise")] check_license_key_valid().await?; @@ -4398,7 +4469,9 @@ pub async fn run_job_by_hash( priority, }, PushArgs { args: &args.args, extra: args.extra }, - authed.display_username(), + &label_prefix + .map(|x| x + authed.display_username()) + .unwrap_or_else(|| authed.display_username().to_string()), &authed.email, username_to_permissioned_as(&authed.username), scheduled_for, diff --git a/backend/windmill-api/src/lib.rs b/backend/windmill-api/src/lib.rs index 1540395ed43d6..ed842aeb42a67 100644 --- a/backend/windmill-api/src/lib.rs +++ b/backend/windmill-api/src/lib.rs @@ -12,6 +12,7 @@ use crate::ee::ExternalJwks; #[cfg(feature = "embedding")] use crate::embeddings::load_embeddings_db; use crate::oauth2_ee::AllClients; +use crate::smtp_server_ee::SmtpServer; use crate::tracing_init::MyOnFailure; use crate::{ oauth2_ee::SlackVerifier, @@ -76,6 +77,7 @@ mod schedule; mod scim_ee; mod scripts; mod settings; +pub mod smtp_server_ee; mod static_assets; mod stripe_ee; mod tracing_init; @@ -163,6 +165,7 @@ pub async fn run_server( mut rx: tokio::sync::broadcast::Receiver<()>, port_tx: tokio::sync::oneshot::Sender, server_mode: bool, + base_internal_url: String, ) -> anyhow::Result<()> { if let Some(mut rsmq) = rsmq.clone() { for tag in ALL_TAGS.read().await.iter() { @@ -194,8 +197,8 @@ pub async fn run_server( let middleware_stack = ServiceBuilder::new() .layer(Extension(db.clone())) - .layer(Extension(rsmq)) - .layer(Extension(user_db)) + .layer(Extension(rsmq.clone())) + .layer(Extension(user_db.clone())) .layer(Extension(auth_cache.clone())) .layer(Extension(index_reader)) .layer(Extension(index_writer)) @@ -214,7 +217,18 @@ pub async fn run_server( if server_mode { #[cfg(feature = "embedding")] - load_embeddings_db(&db) + load_embeddings_db(&db); + + let smtp_server = Arc::new(SmtpServer { + db: db.clone(), + user_db: user_db, + auth_cache: auth_cache.clone(), + rsmq: rsmq, + base_internal_url: base_internal_url.clone(), + }); + if let Err(err) = smtp_server.start_listener_thread(addr).await { + tracing::error!("Error starting SMTP server: {err:#}"); + } } let job_helpers_service = { diff --git a/backend/windmill-api/src/settings.rs b/backend/windmill-api/src/settings.rs index 6650b60ae53b3..c0a8c2960fbc1 100644 --- a/backend/windmill-api/src/settings.rs +++ b/backend/windmill-api/src/settings.rs @@ -24,7 +24,10 @@ use axum::{ use serde::Deserialize; use windmill_common::{ error::{self, JsonResult, Result}, - global_settings::{AUTOMATE_USERNAME_CREATION_SETTING, ENV_SETTINGS, HUB_BASE_URL_SETTING}, + global_settings::{ + AUTOMATE_USERNAME_CREATION_SETTING, EMAIL_DOMAIN_SETTING, ENV_SETTINGS, + HUB_BASE_URL_SETTING, + }, server::Smtp, utils::send_email, }; @@ -256,6 +259,7 @@ pub async fn get_global_setting( && !key.starts_with("default_recovery_handler_") && key != AUTOMATE_USERNAME_CREATION_SETTING && key != HUB_BASE_URL_SETTING + && key != EMAIL_DOMAIN_SETTING { require_super_admin(&db, &authed.email).await?; } diff --git a/backend/windmill-api/src/smtp_server_ee.rs b/backend/windmill-api/src/smtp_server_ee.rs new file mode 100644 index 0000000000000..88837c5974102 --- /dev/null +++ b/backend/windmill-api/src/smtp_server_ee.rs @@ -0,0 +1,17 @@ +use crate::{db::DB, users::AuthCache}; +use std::{net::SocketAddr, sync::Arc}; +use windmill_common::db::UserDB; + +pub struct SmtpServer { + pub auth_cache: Arc, + pub db: DB, + pub user_db: UserDB, + pub rsmq: Option, + pub base_internal_url: String, +} + +impl SmtpServer { + pub async fn start_listener_thread(self: Arc, _addr: SocketAddr) -> anyhow::Result<()> { + Err(anyhow::anyhow!("Implementation not open source")) + } +} diff --git a/backend/windmill-common/src/global_settings.rs b/backend/windmill-common/src/global_settings.rs index 6e86646dcc4c5..a49b755ddb3ba 100644 --- a/backend/windmill-common/src/global_settings.rs +++ b/backend/windmill-common/src/global_settings.rs @@ -28,6 +28,7 @@ pub const HUB_BASE_URL_SETTING: &str = "hub_base_url"; pub const CRITICAL_ERROR_CHANNELS_SETTING: &str = "critical_error_channels"; pub const DEV_INSTANCE_SETTING: &str = "dev_instance"; pub const JWT_SECRET_SETTING: &str = "jwt_secret"; +pub const EMAIL_DOMAIN_SETTING: &str = "email_domain"; pub const ENV_SETTINGS: [&str; 50] = [ "DISABLE_NSJAIL", diff --git a/backend/windmill-worker/src/worker.rs b/backend/windmill-worker/src/worker.rs index e274f1790da24..af4b4a9234194 100644 --- a/backend/windmill-worker/src/worker.rs +++ b/backend/windmill-worker/src/worker.rs @@ -2278,6 +2278,34 @@ async fn handle_queued_job( return Err(Error::ExecutionErr(e.to_string())); } + #[cfg(not(feature = "enterprise"))] + if job.created_by.starts_with("email-trigger-") { + let daily_count = sqlx::query!( + "SELECT value FROM metrics WHERE id = 'email_trigger_usage' AND created_at > NOW() - INTERVAL '1 day' ORDER BY created_at DESC LIMIT 1" + ).fetch_optional(db).await?.map(|x| serde_json::from_value::(x.value).unwrap_or(1)); + + if let Some(count) = daily_count { + if count >= 100 { + return Err(error::Error::QuotaExceeded(format!( + "Email trigger usage limit of 100 per day has been reached." + ))); + } else { + sqlx::query!( + "UPDATE metrics SET value = $1 WHERE id = 'email_trigger_usage' AND created_at > NOW() - INTERVAL '1 day'", + serde_json::json!(count + 1) + ) + .execute(db) + .await?; + } + } else { + sqlx::query!( + "INSERT INTO metrics (id, value) VALUES ('email_trigger_usage', to_jsonb(1))" + ) + .execute(db) + .await?; + } + } + let step = if job.is_flow_step { let r = update_flow_status_in_progress( db, diff --git a/docker-compose.yml b/docker-compose.yml index a867f9a29233d..0aefa4549617b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,6 +31,7 @@ services: restart: unless-stopped expose: - 8000 + - 2525 environment: - DATABASE_URL=${DATABASE_URL} - MODE=server @@ -154,9 +155,8 @@ services: - 3002 caddy: - image: caddy:2.5.2-alpine + image: ghcr.io/windmill-labs/caddy-l4:latest restart: unless-stopped - # Configure the mounted Caddyfile and the exposed ports or use another reverse proxy if needed volumes: - ./Caddyfile:/etc/caddy/Caddyfile @@ -164,6 +164,7 @@ services: ports: # To change the exposed port, simply change 80:80 to :80. No other changes needed - 80:80 + - 25:25 # - 443:443 # Uncomment to enable HTTPS handling by Caddy environment: - BASE_URL=":80" diff --git a/frontend/src/lib/components/details/WebhooksPanel.svelte b/frontend/src/lib/components/details/WebhooksPanel.svelte index 24dac2fba5ef2..963f70f055e37 100644 --- a/frontend/src/lib/components/details/WebhooksPanel.svelte +++ b/frontend/src/lib/components/details/WebhooksPanel.svelte @@ -18,6 +18,8 @@ import ClipboardPanel from './ClipboardPanel.svelte' import { copyToClipboard, generateRandomString } from '$lib/utils' import HighlightTheme from '../HighlightTheme.svelte' + import Alert from '../common/alert/Alert.svelte' + import { SettingService } from '$lib/gen' let userSettings: UserSettings @@ -28,6 +30,8 @@ export let hash: string | undefined = undefined export let path: string + let selectedTab: string = 'rest' + let webhooks: { async: { hash?: string @@ -40,6 +44,15 @@ } } + let emailDomain: string = "mail." + $page.url.hostname + async function getEmailDomain() { + emailDomain = + ((await SettingService.getGlobal({ + key: 'email_domain' + })) as any) ?? ("mail." + $page.url.hostname) + } + getEmailDomain() + $: webhooks = isFlow ? computeFlowWebhooks(path) : computeScriptWebhooks(hash, path) function computeScriptWebhooks(hash: string | undefined, path: string) { @@ -82,6 +95,10 @@ requestType = 'hash' } + $: if (webhookType === 'sync' && selectedTab === 'email') { + webhookType = 'async' + } + $: url = webhooks[webhookType][requestType] + (tokenType === 'query' @@ -108,6 +125,12 @@ return headers } + function emailAddress() { + return `${$workspaceStore}+${ + requestType === 'hash' ? 'hash.' + hash : (isFlow ? 'flow.' : '') + path.replaceAll('/', '.') + }+${token}@${emailDomain}` + } + function fetchCode() { if (webhookType === 'sync') { return ` @@ -261,6 +284,7 @@ done` label="Sync" value="sync" tooltip="Triggers the execution, wait for the job to complete and return it as a response." + disabled={selectedTab === 'email'} /> @@ -291,22 +315,25 @@ done` /> -
-
Token configuration
- - - - -
+ {#if selectedTab !== 'email'} +
+
Token configuration
+ + + + +
+ {/if} - + REST {#if SCRIPT_VIEW_SHOW_EXAMPLE_CURL} Curl {/if} Fetch + Email {#key token} @@ -365,6 +392,28 @@ done` {/key}{/key}{/key}{/key} {/key} + +
+ {#key args} + {#key requestType} + {#key webhookType} + {#key tokenType} + {#key token} +
+ +
+ {/key} + {/key} + {/key} + {/key} + {/key} + + To trigger the job by email, send an email to the address above. The job will receive + two arguments: `raw_email` containing the raw email as string, and `parsed_email` + containing the parsed email as an object. + +
+
{/key}
diff --git a/frontend/src/lib/components/instanceSettings.ts b/frontend/src/lib/components/instanceSettings.ts index da244110c61f2..5f1d4f765f0cb 100644 --- a/frontend/src/lib/components/instanceSettings.ts +++ b/frontend/src/lib/components/instanceSettings.ts @@ -44,6 +44,15 @@ export const settings: Record = { !value?.endsWith(' ') : false }, + { + label: 'Email domain', + description: + 'Domain to display in webhooks for email triggers, default is the webpage domain prefixed by "mail."', + key: 'email_domain', + fieldType: 'text', + storage: 'setting', + placeholder: 'mail.windmill.com' + }, { label: 'Request Size Limit In MB', description: 'Maximum size of HTTP requests in MB.',