Skip to content

Commit

Permalink
improve supercache
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Oct 12, 2022
1 parent 889965f commit d116218
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 50 deletions.
140 changes: 91 additions & 49 deletions backend/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ const PIP_CACHE_DIR: &str = "/tmp/windmill/cache/pip";
const DENO_CACHE_DIR: &str = "/tmp/windmill/cache/deno";
const GO_CACHE_DIR: &str = "/tmp/windmill/cache/go";
const NUM_SECS_ENV_CHECK: u64 = 15;
const DEFAULT_HEAVY_DEPS: [&str; 16] = [
"numpy",
"pandas",
"anyio",
"attrs",
"certifi",
"h11",
"httpcore",
"httpx",
"idna",
"python-dateutil",
"rfc3986",
"six",
"sniffio",
"windmill-api",
"wmill",
"psycopg2-binary",
];

const INCLUDE_DEPS_PY_SH_CONTENT: &str = include_str!("../../nsjail/download_deps.py.sh");
const NSJAIL_CONFIG_DOWNLOAD_PY_CONTENT: &str =
Expand Down Expand Up @@ -604,11 +622,7 @@ async fn handle_code_execution_job(
|| matches!(job.job_kind, JobKind::Script_Hub)
{
let code = (job.raw_code.as_ref().unwrap_or(&"no raw code".to_owned())).to_owned();
let reqs = match job.language {
Some(ScriptLang::Python3) => Some(parser_py::parse_python_imports(&code)?.join("\n")),
_ => None,
};
(code, reqs, job.language.to_owned())
(code, None, job.language.to_owned())
} else {
sqlx::query_as::<_, (String, Option<String>, Option<ScriptLang>)>(
"SELECT content, lock, language FROM script WHERE hash = $1 AND (workspace_id = $2 OR \
Expand Down Expand Up @@ -1043,7 +1057,7 @@ async fn create_args_and_out_file(

async fn handle_python_job(
WorkerConfig { base_internal_url, base_url, disable_nuser, disable_nsjail, .. }: &WorkerConfig,
Envs {
envs @ Envs {
nsjail_path,
python_path,
python_heavy_deps,
Expand All @@ -1065,13 +1079,26 @@ async fn handle_python_job(
inner_content: &String,
shared_mount: &str,
) -> error::Result<serde_json::Value> {
let requirements =
requirements_o.ok_or_else(|| Error::InternalErr(format!("lockfile missing")))?;

create_dependencies_dir(job_dir).await;

let mut additional_python_paths: Vec<String> = vec![];

let requirements = match requirements_o {
Some(r) => r,
None => {
let requirements = parser_py::parse_python_imports(&inner_content)?.join("\n");
if requirements.is_empty() {
"".to_string()
} else {
pip_compile(job, &requirements, logs, job_dir, envs, db, timeout)
.await?
.map_err(|e| {
Error::ExecutionErr(format!("pip compile failed: {}", e.to_string()))
})?
}
}
};

if requirements.len() > 0 {
if !disable_nsjail {
let _ = write_file(
Expand All @@ -1086,14 +1113,17 @@ async fn handle_python_job(
.await?;
}

let mut heavy_deps = vec!["numpy".to_string(), "pandas".to_string()];
let mut heavy_deps = DEFAULT_HEAVY_DEPS
.iter()
.map(|s| s.to_string())
.collect::<Vec<String>>();
heavy_deps.extend(python_heavy_deps.into_iter().map(|s| s.to_string()));

let (heavy, regular): (Vec<&str>, Vec<&str>) = requirements
.split("\n")
.partition(|d| heavy_deps.iter().any(|hd| d.starts_with(hd)));

let _ = write_file(job_dir, "requirements.txt", &requirements).await?;
let _ = write_file(job_dir, "requirements.txt", &regular.join("\n")).await?;

let mut vars = vec![];
if let Some(url) = pip_extra_index_url {
Expand Down Expand Up @@ -1137,6 +1167,7 @@ async fn handle_python_job(
"-m",
"pip",
"install",
"--no-deps",
"--no-color",
"--isolated",
"--no-warn-conflicts",
Expand Down Expand Up @@ -1351,49 +1382,17 @@ async fn handle_dependency_job(
job_dir: &str,
db: &sqlx::Pool<sqlx::Postgres>,
timeout: i32,
Envs { go_path, pip_extra_index_url, pip_index_url, pip_trusted_host, .. }: &Envs,
envs: &Envs,
) -> error::Result<serde_json::Value> {
let content = match job.language {
Some(ScriptLang::Python3) => {
create_dependencies_dir(job_dir).await;

let requirements = job
let requirements = &job
.raw_code
.as_ref()
.ok_or_else(|| Error::ExecutionErr("missing requirements".to_string()))?;
logs.push_str(&format!("content of requirements:\n{}\n", &requirements));
let file = "requirements.in";
write_file(job_dir, file, &requirements).await?;
let mut args = vec!["-q", "--no-header", file];
if let Some(url) = pip_extra_index_url {
args.extend(["--extra-index-url", url]);
}
if let Some(url) = pip_index_url {
args.extend(["--index-url", url]);
}
if let Some(host) = pip_trusted_host {
args.extend(["--trusted-host", host]);
}
let child = Command::new("pip-compile")
.current_dir(job_dir)
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
handle_child(&job.id, db, logs, timeout, child)
.await
.map_err(|e| Error::ExecutionErr(format!("Lock file generation failed: {e:?}")))?;
let path_lock = format!("{job_dir}/requirements.txt");
let mut file = File::open(path_lock).await?;

let mut req_content = "".to_string();
file.read_to_string(&mut req_content).await?;
Ok(req_content
.lines()
.filter(|x| !x.trim_start().starts_with('#'))
.map(|x| x.to_string())
.collect::<Vec<String>>()
.join("\n"))
.ok_or_else(|| Error::ExecutionErr("missing requirements".to_string()))?
.clone();
pip_compile(job, requirements, logs, job_dir, envs, db, timeout).await?
}
Some(ScriptLang::Go) => {
let requirements = job
Expand All @@ -1407,7 +1406,7 @@ async fn handle_dependency_job(
job_dir,
db,
timeout,
go_path,
&envs.go_path,
false,
)
.await
Expand Down Expand Up @@ -1442,6 +1441,49 @@ async fn handle_dependency_job(
}
}

async fn pip_compile(
job: &QueuedJob,
requirements: &str,
logs: &mut String,
job_dir: &str,
Envs { pip_extra_index_url, pip_index_url, pip_trusted_host, .. }: &Envs,
db: &DB,
timeout: i32,
) -> Result<Result<String, String>, Error> {
logs.push_str(&format!("content of requirements:\n{}\n", requirements));
let file = "requirements.in";
write_file(job_dir, file, &requirements).await?;
let mut args = vec!["-q", "--no-header", file];
if let Some(url) = pip_extra_index_url {
args.extend(["--extra-index-url", url]);
}
if let Some(url) = pip_index_url {
args.extend(["--index-url", url]);
}
if let Some(host) = pip_trusted_host {
args.extend(["--trusted-host", host]);
}
let child = Command::new("pip-compile")
.current_dir(job_dir)
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
handle_child(&job.id, db, logs, timeout, child)
.await
.map_err(|e| Error::ExecutionErr(format!("Lock file generation failed: {e:?}")))?;
let path_lock = format!("{job_dir}/requirements.txt");
let mut file = File::open(path_lock).await?;
let mut req_content = "".to_string();
file.read_to_string(&mut req_content).await?;
Ok(Ok(req_content
.lines()
.filter(|x| !x.trim_start().starts_with('#'))
.map(|x| x.to_string())
.collect::<Vec<String>>()
.join("\n")))
}

async fn install_go_dependencies(
job_id: &Uuid,
code: &str,
Expand Down
2 changes: 1 addition & 1 deletion nsjail/download_deps.py.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ fi
mkdir -p /tmp/dependencies
touch /tmp/dependencies/_windmill
/usr/local/bin/python3 -m pip install --cache-dir /tmp/.cache/pip -t /tmp/dependencies -r /user/requirements.txt\
--no-color --isolated --no-warn-conflicts --disable-pip-version-check $INDEX_URL_ARG $EXTRA_INDEX_URL_ARG $TRUSTED_HOST_ARG
--no-color --no-deps --isolated --no-warn-conflicts --disable-pip-version-check $INDEX_URL_ARG $EXTRA_INDEX_URL_ARG $TRUSTED_HOST_ARG

mv /tmp/dependencies/* /out

0 comments on commit d116218

Please sign in to comment.