Skip to content

Commit

Permalink
send a webhook on new user + lazy static refactor (windmill-labs#1203)
Browse files Browse the repository at this point in the history
* supercharge

* supercharge

* progress

* progress

* display config

* display config

* display config

* display config

* fix extensions

* fix build

* disable nsjail = false for test
  • Loading branch information
rubenfiszel authored Feb 15, 2023
1 parent 81f64a4 commit c0b87cc
Show file tree
Hide file tree
Showing 31 changed files with 684 additions and 993 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/backend-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,4 @@ jobs:
backend -> target
- name: cargo test
timeout-minutes: 10
run: mkdir frontend/build && cd backend && touch windmill-api/openapi-deref.yaml && DATABASE_URL=postgres://postgres:changeme@postgres:5432/windmill cargo test --all -- --nocapture
run: mkdir frontend/build && cd backend && touch windmill-api/openapi-deref.yaml && DATABASE_URL=postgres://postgres:changeme@postgres:5432/windmill DISABLE_NSJAIL=false cargo test --all -- --nocapture
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ you to have it being synced automatically everyday.
| Environment Variable name | Default | Description | Api Server/Worker/All |
| ------------------------- | ---------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------- |
| DATABASE_URL | | The Postgres database url. | All |
| DISABLE_NSJAIL | true | Disable Nsjail Sandboxing | |
| DISABLE_NSJAIL | true | Disable Nsjail Sandboxing | Worker |
| PORT | 8000 | Exposed port | Server | |
| NUM_WORKERS | 3 | The number of worker per Worker instance (set to 1 on Eks to have 1 pod = 1 worker, set to 0 for an API only instance) | Worker |
| DISABLE_SERVER | false | Binary would operate as a worker only instance | Worker |
| METRICS_ADDR | None | The socket addr at which to expose Prometheus metrics at the /metrics path. Set to "true" to expose it on port 8001 | All |
Expand All @@ -318,8 +319,7 @@ you to have it being synced automatically everyday.
| S3_CACHE_BUCKET (EE only) | None | The S3 bucket to sync the cache of the workers to | Worker |
| TAR_CACHE_RATE (EE only) | 100 | The rate at which to tar the cache of the workers. 100 means every 100th job in average (uniformly randomly distributed). | Worker |
| SLACK_SIGNING_SECRET | None | The signing secret of your Slack app. See [Slack documentation](https://api.slack.com/authentication/verifying-requests-from-slack) | Server |
| COOKIE_DOMAIN | None | The domain of the cookie. If not set, the cookie will be set by the browser based on the full origin | Server |
| SERVE_CSP | None | The CSP directives to use when serving the frontend static assets | Server |
| COOKIE_DOMAIN | None | The domain of the cookie. If not set, the cookie will be set by the browser based on the full origin | Server | |
| DENO_PATH | /usr/bin/deno | The path to the deno binary. | Worker |
| PYTHON_PATH | /usr/local/bin/python3 | The path to the python binary. | Worker |
| GO_PATH | /usr/bin/go | The path to the go binary. | Worker |
Expand Down
1 change: 1 addition & 0 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

166 changes: 81 additions & 85 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use std::net::SocketAddr;
use git_version::git_version;
use sqlx::{Pool, Postgres};
use windmill_common::utils::rd_string;
use windmill_worker::WorkerConfig;

const GIT_VERSION: &str = git_version!(args = ["--tag", "--always"], fallback = "unknown-version");
const DEFAULT_NUM_WORKERS: usize = 3;
const DEFAULT_PORT: u16 = 8000;

mod ee;

Expand All @@ -26,7 +27,7 @@ async fn main() -> anyhow::Result<()> {
let num_workers = std::env::var("NUM_WORKERS")
.ok()
.and_then(|x| x.parse::<i32>().ok())
.unwrap_or(windmill_common::DEFAULT_NUM_WORKERS as i32);
.unwrap_or(DEFAULT_NUM_WORKERS as i32);

let metrics_addr: Option<SocketAddr> = std::env::var("METRICS_ADDR")
.ok()
Expand All @@ -38,6 +39,13 @@ async fn main() -> anyhow::Result<()> {
.transpose()?
.flatten();

let port: u16 = std::env::var("PORT")
.ok()
.and_then(|x| x.parse::<u16>().ok())
.unwrap_or(DEFAULT_PORT as u16);
let base_internal_url: String = std::env::var("BASE_INTERNAL_URL")
.unwrap_or_else(|_| format!("http://localhost:{}", port.to_string()));

let server_mode = !std::env::var("DISABLE_SERVER")
.ok()
.and_then(|x| x.parse::<bool>().ok())
Expand All @@ -52,58 +60,25 @@ async fn main() -> anyhow::Result<()> {
let (tx, rx) = tokio::sync::broadcast::channel::<()>(3);
let shutdown_signal = windmill_common::shutdown_signal(tx);

let base_url = std::env::var("BASE_URL").unwrap_or_else(|_| "http://localhost".to_string());

let base_internal_url =
std::env::var("BASE_INTERNAL_URL").unwrap_or_else(|_| "http://localhost:8000".to_string());
let timeout = std::env::var("TIMEOUT")
.ok()
.and_then(|x| x.parse::<i32>().ok())
.unwrap_or(windmill_common::DEFAULT_TIMEOUT);

if server_mode || num_workers > 0 {
let addr = SocketAddr::from(([0, 0, 0, 0], 8000));
let addr = SocketAddr::from(([0, 0, 0, 0], port));

let base_url2 = base_url.clone();
let server_f = async {
if server_mode {
windmill_api::run_server(db.clone(), addr, base_url, rx.resubscribe()).await?;
windmill_api::run_server(db.clone(), addr, rx.resubscribe()).await?;
}
Ok(()) as anyhow::Result<()>
};

let base_url = base_url2.clone();
let workers_f = async {
if num_workers > 0 {
let sleep_queue = std::env::var("SLEEP_QUEUE")
.ok()
.and_then(|x| x.parse::<u64>().ok())
.unwrap_or(windmill_common::DEFAULT_SLEEP_QUEUE);
let disable_nuser = std::env::var("DISABLE_NUSER")
.ok()
.and_then(|x| x.parse::<bool>().ok())
.unwrap_or(false);
let disable_nsjail = std::env::var("DISABLE_NSJAIL")
.ok()
.and_then(|x| x.parse::<bool>().ok())
.unwrap_or(true);
let keep_job_dir = std::env::var("KEEP_JOB_DIR")
.ok()
.and_then(|x| x.parse::<bool>().ok())
.unwrap_or(false);
let license_key = std::env::var("LICENSE_KEY").ok();
let sync_bucket = std::env::var("S3_CACHE_BUCKET")
.ok()
.map(|e| Some(e))
.unwrap_or(None);

#[cfg(feature = "enterprise")]
tracing::info!(
"
"
##############################
Windmill Enterprise Edition {GIT_VERSION} LICENSE_KEY: {license_key:?}, S3_CACHE_BUCKET: {sync_bucket:?}
Windmill Enterprise Edition {GIT_VERSION}
##############################"
);
);

#[cfg(not(feature = "enterprise"))]
tracing::info!(
Expand All @@ -113,38 +88,58 @@ Windmill Community Edition {GIT_VERSION}
##############################"
);

tracing::info!(
"DISABLE_NSJAIL: {disable_nsjail}, DISABLE_NUSER: {disable_nuser}, BASE_URL: \
{base_url}, SLEEP_QUEUE: {sleep_queue}, NUM_WORKERS: {num_workers}, TIMEOUT: \
{timeout}, KEEP_JOB_DIR: {keep_job_dir}"
);
display_config(vec![
"DISABLE_NSJAIL",
"DISABLE_SERVER",
"NUM_WORKERS",
"METRICS_ADDR",
"JSON_FMT",
"BASE_URL",
"BASE_INTERNAL_URL",
"TIMEOUT",
"SLEEP_QUEUE",
"MAX_LOG_SIZE",
"PORT",
"KEEP_JOB_DIR",
"S3_CACHE_BUCKET",
"TAR_CACHE_RATE",
"COOKIE_DOMAIN",
"PYTHON_PATH",
"DENO_PATH",
"GO_PATH",
"PIP_INDEX_URL",
"PIP_EXTRA_INDEX_URL",
"PIP_TRUSTED_HOST",
"PATH",
"HOME",
"DATABASE_CONNECTIONS",
"TIMEOUT_WAIT_RESULT",
"QUEUE_LIMIT_WAIT_RESULT",
"DENO_AUTH_TOKENS",
"DENO_FLAGS",
"PIP_LOCAL_DEPENDENCIES",
"ADDITIONAL_PYTHON_PATHS",
"INCLUDE_HEADERS",
"WHITELIST_WORKSPACES",
"BLACKLIST_WORKSPACES",
"NEW_USER_WEBHOOK",
"CLOUD_HOSTED",
]);

run_workers(
db.clone(),
addr,
timeout,
num_workers,
sleep_queue,
WorkerConfig {
disable_nsjail,
disable_nuser,
base_internal_url,
base_url,
keep_job_dir,
},
rx.resubscribe(),
sync_bucket,
license_key,
num_workers,
base_internal_url.clone(),
)
.await?;
}
Ok(()) as anyhow::Result<()>
};

let base_url = base_url2;
let monitor_f = async {
if server_mode {
monitor_db(&db, timeout, base_url, rx.resubscribe());
monitor_db(&db, rx.resubscribe(), &base_internal_url);
}
Ok(()) as anyhow::Result<()>
};
Expand All @@ -163,47 +158,53 @@ Windmill Community Edition {GIT_VERSION}
Ok(())
}

fn display_config(envs: Vec<&str>) {
tracing::info!(
"config: {}",
envs.iter()
.filter(|env| std::env::var(env).is_ok())
.map(|env| {
format!(
"{}: {}",
env,
std::env::var(env).unwrap_or_else(|_| "not set".to_string())
)
})
.collect::<Vec<String>>()
.join(", ")
)
}

pub fn monitor_db(
db: &Pool<Postgres>,
timeout: i32,
base_url: String,
rx: tokio::sync::broadcast::Receiver<()>,
base_internal_url: &str,
) {
let db1 = db.clone();
let db2 = db.clone();

let rx2 = rx.resubscribe();

let base_internal_url = base_internal_url.to_string();
tokio::spawn(async move {
windmill_worker::handle_zombie_jobs_periodically(&db1, timeout, &base_url, rx).await
windmill_worker::handle_zombie_jobs_periodically(&db1, rx, &base_internal_url).await
});
tokio::spawn(async move { windmill_api::delete_expired_items_perdiodically(&db2, rx2).await });
}

pub async fn run_workers(
db: Pool<Postgres>,
addr: SocketAddr,
timeout: i32,
num_workers: i32,
sleep_queue: u64,
worker_config: WorkerConfig,
rx: tokio::sync::broadcast::Receiver<()>,
mut periodic_script: Option<String>,
license_key: Option<String>,
num_workers: i32,
base_internal_url: String,
) -> anyhow::Result<()> {
let license_key = std::env::var("LICENSE_KEY").ok();
#[cfg(feature = "enterprise")]
ee::verify_license_key(license_key)?;

#[cfg(not(feature = "enterprise"))]
if license_key.is_some() {
panic!("License key is required ONLY for the enterprise edition");
}
#[cfg(not(feature = "enterprise"))]
if !worker_config.disable_nsjail {
tracing::warn!(
"NSJAIL to sandbox process in untrusted environments is an enterprise feature but allowed to be used for testing purposes"
);
}

let instance_name = rd_string(5);
let monitor = tokio_metrics::TaskMonitor::new();
Expand All @@ -223,22 +224,17 @@ pub async fn run_workers(
let worker_name = format!("dt-worker-{}-{}", &instance_name, rd_string(5));
let ip = ip.clone();
let rx = rx.resubscribe();
let worker_config = worker_config.clone();
let wp = periodic_script.take();
let base_internal_url = base_internal_url.clone();
handles.push(tokio::spawn(monitor.instrument(async move {
tracing::info!(addr = %addr.to_string(), worker = %worker_name, "starting worker");
tracing::info!(worker = %worker_name, "starting worker");
windmill_worker::run_worker(
&db1,
timeout,
&instance_name,
worker_name,
i as u64,
num_workers as u64,
&ip,
sleep_queue,
worker_config,
wp,
rx,
&base_internal_url,
)
.await
})));
Expand Down
38 changes: 3 additions & 35 deletions backend/tests/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ use windmill_common::{
flow_status::{FlowStatus, FlowStatusModule},
flows::{FlowModule, FlowModuleValue, FlowValue, InputTransform},
scripts::ScriptLang,
DEFAULT_SLEEP_QUEUE,
};
use windmill_queue::{get_queued_job, JobPayload, RawCode};
use windmill_worker::WorkerConfig;

async fn initialize_tracing() {
use std::sync::Once;
Expand Down Expand Up @@ -89,14 +87,7 @@ impl ApiServer {
let addr = sock.local_addr().unwrap();
drop(sock);

let task = tokio::task::spawn({
windmill_api::run_server(
db.clone(),
addr,
format!("http://localhost:{}", addr.port()),
rx,
)
});
let task = tokio::task::spawn(windmill_api::run_server(db.clone(), addr, rx));

return Self { addr, tx, task };
}
Expand Down Expand Up @@ -917,43 +908,20 @@ fn spawn_test_worker(
) {
let (tx, rx) = tokio::sync::broadcast::channel(1);
let db = db.to_owned();
let timeout = 4_000;
let worker_instance: &str = "test worker instance";
let worker_name: String = next_worker_name();
let i_worker: u64 = Default::default();
let num_workers: u64 = 2;
let ip: &str = Default::default();
let sleep_queue: u64 = DEFAULT_SLEEP_QUEUE / num_workers;
let port = port;
let worker_config = WorkerConfig {
base_internal_url: format!("http://localhost:{port}"),
base_url: format!("http://localhost:{port}"),
disable_nuser: std::env::var("DISABLE_NUSER")
.ok()
.and_then(|x| x.parse::<bool>().ok())
.unwrap_or(false),
disable_nsjail: std::env::var("DISABLE_NSJAIL")
.ok()
.and_then(|x| x.parse::<bool>().ok())
.unwrap_or(false),
keep_job_dir: std::env::var("KEEP_JOB_DIR")
.ok()
.and_then(|x| x.parse::<bool>().ok())
.unwrap_or(false),
};
let future = async move {
let base_internal_url = format!("http://localhost:{}", port);
windmill_worker::run_worker(
&db,
timeout,
worker_instance,
worker_name,
i_worker,
num_workers,
ip,
sleep_queue,
worker_config,
None,
rx,
&base_internal_url,
)
.await
};
Expand Down
4 changes: 0 additions & 4 deletions backend/windmill-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ edition.workspace = true
name = "windmill_api"
path = "src/lib.rs"

[[bin]]
name = "windmill_api"
path = "src/main.rs"

[features]
enterprise = ["windmill-queue/enterprise"]

Expand Down
Loading

0 comments on commit c0b87cc

Please sign in to comment.