Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add await #1184

Merged
merged 5 commits into from
Feb 4, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix workflow
  • Loading branch information
thibaultleouay committed Feb 4, 2025
commit bf7c83778566073d8802418cb65e599102014912
81 changes: 53 additions & 28 deletions apps/workflows/src/cron/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,37 +123,62 @@ export async function LaunchMonitorWorkflow() {
// Let's merge both results
const users = [...u, ...u1];
// iterate over users

const allResult = [];

for (const user of users) {
console.log(`Starting workflow for ${user.userId}`);
// Let's check if the user is in the workflow
const isMember = await redis.sismember("workflow:users", user.userId);
if (isMember) {
continue;
}
// check if user has some running monitors
const nbRunningMonitor = await db.$count(
schema.monitor,
and(
eq(schema.monitor.workspaceId, user.workspaceId),
eq(schema.monitor.active, true),
isNull(schema.monitor.deletedAt)
)
);
if (nbRunningMonitor > 0) {
continue;
}
await CreateTask({
parent,
client: client,
step: "14days",
userId: user.userId,
initialRun: new Date().getTime(),
});
// // Add our user to the list of users that have started the workflow
const workflow = workflowInit({ user });
allResult.push(workflow);
}

const allRequests = await Promise.allSettled(allResult);

const success = allRequests.filter((r) => r.status === "fulfilled").length;
const failed = allRequests.filter((r) => r.status === "rejected").length;

console.log(
`End cron with ${allResult.length} jobs with ${success} success and ${failed} failed`
);
}

await redis.sadd("workflow:users", user.userId);
console.log(`user workflow started for ${user.userId}`);
async function workflowInit({
user,
}: {
user: {
userId: number;
email: string | null;
workspaceId: number;
};
}) {
console.log(`Starting workflow for ${user.userId}`);
// Let's check if the user is in the workflow
const isMember = await redis.sismember("workflow:users", user.userId);
if (isMember) {
return;
}
// check if user has some running monitors
const nbRunningMonitor = await db.$count(
schema.monitor,
and(
eq(schema.monitor.workspaceId, user.workspaceId),
eq(schema.monitor.active, true),
isNull(schema.monitor.deletedAt)
)
);
if (nbRunningMonitor > 0) {
return;
}
await CreateTask({
parent,
client: client,
step: "14days",
userId: user.userId,
initialRun: new Date().getTime(),
});
// // Add our user to the list of users that have started the workflow

await redis.sadd("workflow:users", user.userId);
console.log(`user workflow started for ${user.userId}`);
}

export async function Step14Days(userId: number) {
Expand Down
Loading