Skip to content

Commit

Permalink
feat: email triggers (windmill-labs#4163)
Browse files Browse the repository at this point in the history
* feat: email triggers v0

* update docker compose to nginx with tcp reverse proxy + move smtp to private

* fix: open source build

* test: update ee ref for testing

* feat: use caddy with layer4

* fix: nit

* feat: configurable email domain

* fix: nit

* fix: nit

* fix: get l4 from main

* fix: default email domain to mail.domain

* update ee ref
  • Loading branch information
HugoCasa authored Aug 6, 2024
1 parent 50599a5 commit 80a4166
Show file tree
Hide file tree
Showing 16 changed files with 248 additions and 19 deletions.
10 changes: 10 additions & 0 deletions Caddyfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
{
layer4 {
:25 {
proxy {
to windmill_server:2525
}
}
}
}

{$BASE_URL} {
bind {$ADDRESS}
reverse_proxy /ws/* http://lsp:3001
Expand Down
14 changes: 14 additions & 0 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ candle-nn = "0.3.0"
tiberius = { git = "https://github.com/prisma/tiberius", rev = "8f66a699dfa041e7b5f736c7e94f92c945453c9e", default-features = false, features = ["rustls", "tds73", "chrono", "sql-browser-tokio"]}
pin-project = "1"
indexmap = { version = "2.2.5", features = ["serde"]}
tokio-native-tls = "^0"
openssl = "=0.10"
mail-parser = "^0"

datafusion = "39.0.0"
object_store = { version = "0.10.0", features = ["aws", "azure"] }
Expand Down
2 changes: 1 addition & 1 deletion backend/ee-repo-ref.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1a2febc371c907789b25860366872fca888d6477
70e475a5cec356d2fe992038f303aea8988b5046
1 change: 1 addition & 0 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ Windmill Community Edition {GIT_VERSION}
server_killpill_rx,
base_internal_tx,
server_mode,
base_internal_url.clone(),
)
.await?;
} else {
Expand Down
1 change: 1 addition & 0 deletions backend/tests/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ impl ApiServer {
rx,
port_tx,
false,
format!("http://localhost:{}", addr.port()),
));

_port_rx.await.unwrap();
Expand Down
4 changes: 4 additions & 0 deletions backend/windmill-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ tracing-subscriber.workspace = true
quick_cache.workspace = true
rand.workspace = true
time.workspace = true
native-tls.workspace = true
tokio-native-tls.workspace = true
openssl.workspace = true
mail-parser = { workspace = true, features = ["serde_support"] }
magic-crypt.workspace = true
tempfile.workspace = true
tokio-util.workspace = true
Expand Down
81 changes: 77 additions & 4 deletions backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ pub struct ListableCompletedJob {
pub labels: Option<serde_json::Value>,
}

#[derive(Deserialize, Clone)]
#[derive(Deserialize, Clone, Default)]
pub struct RunJobQuery {
scheduled_for: Option<chrono::DateTime<chrono::Utc>>,
scheduled_in_secs: Option<i64>,
Expand Down Expand Up @@ -2751,6 +2751,23 @@ pub async fn run_flow_by_path(
Path((w_id, flow_path)): Path<(String, StripPath)>,
Query(run_query): Query<RunJobQuery>,
args: PushArgsOwned,
) -> error::Result<(StatusCode, String)> {
run_flow_by_path_inner(
authed, db, user_db, rsmq, w_id, flow_path, run_query, args, None,
)
.await
}

pub async fn run_flow_by_path_inner(
authed: ApiAuthed,
db: DB,
user_db: UserDB,
rsmq: Option<rsmq_async::MultiplexedRsmq>,
w_id: String,
flow_path: StripPath,
run_query: RunJobQuery,
args: PushArgsOwned,
label_prefix: Option<String>,
) -> error::Result<(StatusCode, String)> {
#[cfg(feature = "enterprise")]
check_license_key_valid().await?;
Expand Down Expand Up @@ -2784,7 +2801,9 @@ pub async fn run_flow_by_path(
&w_id,
JobPayload::Flow { path: flow_path.to_string(), dedicated_worker },
PushArgs { args: &args.args, extra: args.extra },
authed.display_username(),
&label_prefix
.map(|x| x + authed.display_username())
.unwrap_or_else(|| authed.display_username().to_string()),
&authed.email,
username_to_permissioned_as(&authed.username),
scheduled_for,
Expand Down Expand Up @@ -2910,6 +2929,31 @@ pub async fn run_script_by_path(
Path((w_id, script_path)): Path<(String, StripPath)>,
Query(run_query): Query<RunJobQuery>,
args: PushArgsOwned,
) -> error::Result<(StatusCode, String)> {
run_script_by_path_inner(
authed,
db,
user_db,
rsmq,
w_id,
script_path,
run_query,
args,
None,
)
.await
}

pub async fn run_script_by_path_inner(
authed: ApiAuthed,
db: DB,
user_db: UserDB,
rsmq: Option<rsmq_async::MultiplexedRsmq>,
w_id: String,
script_path: StripPath,
run_query: RunJobQuery,
args: PushArgsOwned,
label_prefix: Option<String>,
) -> error::Result<(StatusCode, String)> {
#[cfg(feature = "enterprise")]
check_license_key_valid().await?;
Expand All @@ -2935,7 +2979,9 @@ pub async fn run_script_by_path(
&w_id,
job_payload,
PushArgs { args: &args.args, extra: args.extra },
authed.display_username(),
&label_prefix
.map(|x| x + authed.display_username())
.unwrap_or_else(|| authed.display_username().to_string()),
&authed.email,
username_to_permissioned_as(&authed.username),
scheduled_for,
Expand Down Expand Up @@ -4352,6 +4398,31 @@ pub async fn run_job_by_hash(
Path((w_id, script_hash)): Path<(String, ScriptHash)>,
Query(run_query): Query<RunJobQuery>,
args: PushArgsOwned,
) -> error::Result<(StatusCode, String)> {
run_job_by_hash_inner(
authed,
db,
user_db,
rsmq,
w_id,
script_hash,
run_query,
args,
None,
)
.await
}

pub async fn run_job_by_hash_inner(
authed: ApiAuthed,
db: DB,
user_db: UserDB,
rsmq: Option<rsmq_async::MultiplexedRsmq>,
w_id: String,
script_hash: ScriptHash,
run_query: RunJobQuery,
args: PushArgsOwned,
label_prefix: Option<String>,
) -> error::Result<(StatusCode, String)> {
#[cfg(feature = "enterprise")]
check_license_key_valid().await?;
Expand Down Expand Up @@ -4398,7 +4469,9 @@ pub async fn run_job_by_hash(
priority,
},
PushArgs { args: &args.args, extra: args.extra },
authed.display_username(),
&label_prefix
.map(|x| x + authed.display_username())
.unwrap_or_else(|| authed.display_username().to_string()),
&authed.email,
username_to_permissioned_as(&authed.username),
scheduled_for,
Expand Down
20 changes: 17 additions & 3 deletions backend/windmill-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::ee::ExternalJwks;
#[cfg(feature = "embedding")]
use crate::embeddings::load_embeddings_db;
use crate::oauth2_ee::AllClients;
use crate::smtp_server_ee::SmtpServer;
use crate::tracing_init::MyOnFailure;
use crate::{
oauth2_ee::SlackVerifier,
Expand Down Expand Up @@ -76,6 +77,7 @@ mod schedule;
mod scim_ee;
mod scripts;
mod settings;
pub mod smtp_server_ee;
mod static_assets;
mod stripe_ee;
mod tracing_init;
Expand Down Expand Up @@ -163,6 +165,7 @@ pub async fn run_server(
mut rx: tokio::sync::broadcast::Receiver<()>,
port_tx: tokio::sync::oneshot::Sender<String>,
server_mode: bool,
base_internal_url: String,
) -> anyhow::Result<()> {
if let Some(mut rsmq) = rsmq.clone() {
for tag in ALL_TAGS.read().await.iter() {
Expand Down Expand Up @@ -194,8 +197,8 @@ pub async fn run_server(

let middleware_stack = ServiceBuilder::new()
.layer(Extension(db.clone()))
.layer(Extension(rsmq))
.layer(Extension(user_db))
.layer(Extension(rsmq.clone()))
.layer(Extension(user_db.clone()))
.layer(Extension(auth_cache.clone()))
.layer(Extension(index_reader))
.layer(Extension(index_writer))
Expand All @@ -214,7 +217,18 @@ pub async fn run_server(

if server_mode {
#[cfg(feature = "embedding")]
load_embeddings_db(&db)
load_embeddings_db(&db);

let smtp_server = Arc::new(SmtpServer {
db: db.clone(),
user_db: user_db,
auth_cache: auth_cache.clone(),
rsmq: rsmq,
base_internal_url: base_internal_url.clone(),
});
if let Err(err) = smtp_server.start_listener_thread(addr).await {
tracing::error!("Error starting SMTP server: {err:#}");
}
}

let job_helpers_service = {
Expand Down
6 changes: 5 additions & 1 deletion backend/windmill-api/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use axum::{
use serde::Deserialize;
use windmill_common::{
error::{self, JsonResult, Result},
global_settings::{AUTOMATE_USERNAME_CREATION_SETTING, ENV_SETTINGS, HUB_BASE_URL_SETTING},
global_settings::{
AUTOMATE_USERNAME_CREATION_SETTING, EMAIL_DOMAIN_SETTING, ENV_SETTINGS,
HUB_BASE_URL_SETTING,
},
server::Smtp,
utils::send_email,
};
Expand Down Expand Up @@ -256,6 +259,7 @@ pub async fn get_global_setting(
&& !key.starts_with("default_recovery_handler_")
&& key != AUTOMATE_USERNAME_CREATION_SETTING
&& key != HUB_BASE_URL_SETTING
&& key != EMAIL_DOMAIN_SETTING
{
require_super_admin(&db, &authed.email).await?;
}
Expand Down
17 changes: 17 additions & 0 deletions backend/windmill-api/src/smtp_server_ee.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use crate::{db::DB, users::AuthCache};
use std::{net::SocketAddr, sync::Arc};
use windmill_common::db::UserDB;

pub struct SmtpServer {
pub auth_cache: Arc<AuthCache>,
pub db: DB,
pub user_db: UserDB,
pub rsmq: Option<rsmq_async::MultiplexedRsmq>,
pub base_internal_url: String,
}

impl SmtpServer {
pub async fn start_listener_thread(self: Arc<Self>, _addr: SocketAddr) -> anyhow::Result<()> {
Err(anyhow::anyhow!("Implementation not open source"))
}
}
1 change: 1 addition & 0 deletions backend/windmill-common/src/global_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub const HUB_BASE_URL_SETTING: &str = "hub_base_url";
pub const CRITICAL_ERROR_CHANNELS_SETTING: &str = "critical_error_channels";
pub const DEV_INSTANCE_SETTING: &str = "dev_instance";
pub const JWT_SECRET_SETTING: &str = "jwt_secret";
pub const EMAIL_DOMAIN_SETTING: &str = "email_domain";

pub const ENV_SETTINGS: [&str; 50] = [
"DISABLE_NSJAIL",
Expand Down
28 changes: 28 additions & 0 deletions backend/windmill-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2278,6 +2278,34 @@ async fn handle_queued_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>(
return Err(Error::ExecutionErr(e.to_string()));
}

#[cfg(not(feature = "enterprise"))]
if job.created_by.starts_with("email-trigger-") {
let daily_count = sqlx::query!(
"SELECT value FROM metrics WHERE id = 'email_trigger_usage' AND created_at > NOW() - INTERVAL '1 day' ORDER BY created_at DESC LIMIT 1"
).fetch_optional(db).await?.map(|x| serde_json::from_value::<i64>(x.value).unwrap_or(1));

if let Some(count) = daily_count {
if count >= 100 {
return Err(error::Error::QuotaExceeded(format!(
"Email trigger usage limit of 100 per day has been reached."
)));
} else {
sqlx::query!(
"UPDATE metrics SET value = $1 WHERE id = 'email_trigger_usage' AND created_at > NOW() - INTERVAL '1 day'",
serde_json::json!(count + 1)
)
.execute(db)
.await?;
}
} else {
sqlx::query!(
"INSERT INTO metrics (id, value) VALUES ('email_trigger_usage', to_jsonb(1))"
)
.execute(db)
.await?;
}
}

let step = if job.is_flow_step {
let r = update_flow_status_in_progress(
db,
Expand Down
5 changes: 3 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ services:
restart: unless-stopped
expose:
- 8000
- 2525
environment:
- DATABASE_URL=${DATABASE_URL}
- MODE=server
Expand Down Expand Up @@ -154,16 +155,16 @@ services:
- 3002

caddy:
image: caddy:2.5.2-alpine
image: ghcr.io/windmill-labs/caddy-l4:latest
restart: unless-stopped

# Configure the mounted Caddyfile and the exposed ports or use another reverse proxy if needed
volumes:
- ./Caddyfile:/etc/caddy/Caddyfile
# - ./certs:/certs # Provide custom certificate files like cert.pem and key.pem to enable HTTPS - See the corresponding section in the Caddyfile
ports:
# To change the exposed port, simply change 80:80 to <desired_port>:80. No other changes needed
- 80:80
- 25:25
# - 443:443 # Uncomment to enable HTTPS handling by Caddy
environment:
- BASE_URL=":80"
Expand Down
Loading

0 comments on commit 80a4166

Please sign in to comment.