Skip to content

Commit

Permalink
web/run-worker: subscribe to queue & kill worker when removed from store
Browse files Browse the repository at this point in the history
& also clear the interval
  • Loading branch information
wukko committed Jan 25, 2025
1 parent d4684fa commit de66ac6
Showing 1 changed file with 32 additions and 24 deletions.
56 changes: 32 additions & 24 deletions web/src/lib/queen-bee/run-worker.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,20 @@
import RemuxWorker from "$lib/workers/remux?worker";

import type { CobaltPipelineItem } from "$lib/types/workers";
import { itemDone, itemError } from "$lib/state/queen-bee/queue";
import { itemDone, itemError, queue } from "$lib/state/queen-bee/queue";
import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks";

const workerError = (parentId: string, workerId: string, worker: Worker, error: string) => {
itemError(parentId, workerId, error);
worker.terminate();
}
import type { CobaltQueue } from "$lib/types/queue";
import type { CobaltPipelineItem } from "$lib/types/workers";

const workerSuccess = (parentId: string, workerId: string, worker: Worker, file: File) => {
itemDone(parentId, workerId, file);
const killWorker = (worker: Worker, unsubscribe: () => void, interval: NodeJS.Timeout) => {
unsubscribe();
worker.terminate();
clearInterval(interval);
}

export const runRemuxWorker = async (workerId: string, parentId: string, file: File) => {
const worker = new RemuxWorker();

worker.postMessage({ file });

worker.onerror = (e) => {
console.error("remux worker exploded:", e);

// TODO: proper error code
workerError(parentId, workerId, worker, "internal error");
};

// sometimes chrome refuses to start libav wasm,
// so we check the health and kill self if it doesn't spawn

Expand All @@ -34,25 +23,43 @@ export const runRemuxWorker = async (workerId: string, parentId: string, file: F
bumpAttempts++;

if (bumpAttempts === 8) {
worker.terminate();
killWorker(worker, unsubscribe, startCheck);
console.error("worker didn't start after 4 seconds, so it was killed");

// TODO: proper error code
return workerError(parentId, workerId, worker, "worker didn't start");
return itemError(parentId, workerId, "worker didn't start");
}
}, 500);

const unsubscribe = queue.subscribe((queue: CobaltQueue) => {
if (!queue[parentId]) {
// TODO: remove logging
console.log("worker's parent is gone, so it killed itself");
killWorker(worker, unsubscribe, startCheck);
}
});

worker.postMessage({ file });

worker.onerror = (e) => {
console.error("remux worker exploded:", e);
killWorker(worker, unsubscribe, startCheck);

// TODO: proper error code
return itemError(parentId, workerId, "internal error");
};

let totalDuration: number | null = null;

worker.onmessage = (event) => {
const eventData = event.data.cobaltRemuxWorker;
if (!eventData) return;

clearInterval(startCheck);

// temporary debug logging
console.log(JSON.stringify(eventData, null, 2));

clearInterval(startCheck);

if (eventData.progress) {
if (eventData.progress.duration) {
totalDuration = eventData.progress.duration;
Expand All @@ -65,18 +72,19 @@ export const runRemuxWorker = async (workerId: string, parentId: string, file: F
}

if (eventData.render) {
return workerSuccess(
killWorker(worker, unsubscribe, startCheck);
return itemDone(
parentId,
workerId,
worker,
new File([eventData.render], eventData.filename, {
type: eventData.render.type,
})
);
}

if (eventData.error) {
return workerError(parentId, workerId, worker, eventData.error);
killWorker(worker, unsubscribe, startCheck);
return itemError(parentId, workerId, eventData.error);
}
};
}
Expand Down

0 comments on commit de66ac6

Please sign in to comment.