Skip to content

Commit

Permalink
增加任务重复运行提醒
Browse files Browse the repository at this point in the history
  • Loading branch information
whyour committed Aug 23, 2024
1 parent f4cb3ea commit 8b8eae2
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 17 deletions.
16 changes: 8 additions & 8 deletions back/schedule/addCron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,29 @@ const addCron = (
);

if (extraSchedules?.length) {
extraSchedules.forEach(x => {
extraSchedules.forEach((x) => {
Logger.info(
'[schedule][创建定时任务], 任务ID: %s, 名称: %s, cron: %s, 执行命令: %s',
id,
name,
x.schedule,
command,
);
})
});
}

scheduleStacks.set(id, [
nodeSchedule.scheduleJob(id, schedule, async () => {
Logger.info(`[schedule][准备运行任务] 命令: ${command}`);
runCron(command, { name, schedule, extraSchedules });
runCron(command, item);
}),
...(extraSchedules?.length
? extraSchedules.map((x) =>
nodeSchedule.scheduleJob(id, x.schedule, async () => {
Logger.info(`[schedule][准备运行任务] 命令: ${command}`);
runCron(command, { name, schedule, extraSchedules });
}),
)
nodeSchedule.scheduleJob(id, x.schedule, async () => {
Logger.info(`[schedule][准备运行任务] 命令: ${command}`);
runCron(command, item);
}),
)
: []),
]);
}
Expand Down
19 changes: 16 additions & 3 deletions back/services/schedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export default class ScheduleService {

private maxBuffer = 200 * 1024 * 1024;

constructor(@Inject('logger') private logger: winston.Logger) { }
constructor(@Inject('logger') private logger: winston.Logger) {}

async runTask(
command: string,
Expand All @@ -51,12 +51,19 @@ export default class ScheduleService {
schedule?: string;
name?: string;
command?: string;
id: string;
},
completionTime: 'start' | 'end' = 'end',
) {
return taskLimit.runWithCronLimit(() => {
return taskLimit.runWithCronLimit(params, () => {
return new Promise(async (resolve, reject) => {
this.logger.info(`[panel][开始执行任务] 参数 ${JSON.stringify({ ...params, command })}`);
taskLimit.removeQueuedCron(params.id);
this.logger.info(
`[panel][开始执行任务] 参数 ${JSON.stringify({
...params,
command,
})}`,
);

try {
const startTime = dayjs();
Expand Down Expand Up @@ -131,6 +138,7 @@ export default class ScheduleService {
name,
schedule,
command,
id: _id,
});
}),
);
Expand All @@ -140,6 +148,7 @@ export default class ScheduleService {
name,
schedule,
command,
id: _id,
});
}
}
Expand All @@ -148,6 +157,7 @@ export default class ScheduleService {
const _id = this.formatId(id);
this.logger.info('[panel][取消定时任务], 任务名: %s', name);
if (this.scheduleStacks.has(_id)) {
taskLimit.removeQueuedCron(_id);
this.scheduleStacks.get(_id)?.cancel();
this.scheduleStacks.delete(_id);
}
Expand All @@ -172,6 +182,7 @@ export default class ScheduleService {
this.runTask(command, callbacks, {
name,
command,
id: _id,
});
},
(err) => {
Expand All @@ -195,13 +206,15 @@ export default class ScheduleService {
this.runTask(command, callbacks, {
name,
command,
id: _id,
});
}
}

async cancelIntervalTask({ id = 0, name }: ScheduleTaskType) {
const _id = this.formatId(id);
this.logger.info('[取消interval任务], 任务ID: %s, 任务名: %s', _id, name);
taskLimit.removeQueuedCron(_id);
this.intervalSchedule.removeById(_id);
}

Expand Down
4 changes: 3 additions & 1 deletion back/services/script.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import ScheduleService, { TaskCallbacks } from './schedule';
import config from '../config';
import { TASK_COMMAND } from '../config/const';
import { getFileContentByName, getPid, killTask, rmPath } from '../config/util';
import taskLimit from '../shared/pLimit';

@Service()
export default class ScriptService {
Expand Down Expand Up @@ -43,7 +44,7 @@ export default class ScriptService {
const pid = await this.scheduleService.runTask(
`real_time=true ${command}`,
this.taskCallbacks(filePath),
{ command },
{ command, id: relativePath.replace(/ /g, '-') },
'start',
);

Expand All @@ -53,6 +54,7 @@ export default class ScriptService {
public async stopScript(filePath: string, pid: number) {
if (!pid) {
const relativePath = path.relative(config.scriptPath, filePath);
taskLimit.removeQueuedCron(relativePath.replace(/ /g, '-'));
pid = (await getPid(`${TASK_COMMAND} ${relativePath} now`)) as number;
}
try {
Expand Down
3 changes: 3 additions & 0 deletions back/services/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { LOG_END_SYMBOL } from '../config/const';
import { formatCommand, formatUrl } from '../config/subscription';
import { CrontabModel } from '../data/cron';
import CrontabService from './cron';
import taskLimit from '../shared/pLimit';

@Service()
export default class SubscriptionService {
Expand Down Expand Up @@ -301,6 +302,7 @@ export default class SubscriptionService {
for (const doc of docs) {
if (doc.pid) {
try {
taskLimit.removeQueuedCron(String(doc.id));
await killTask(doc.pid);
} catch (error) {
this.logger.error(error);
Expand All @@ -326,6 +328,7 @@ export default class SubscriptionService {
name: subscription.name,
schedule: subscription.schedule,
command,
id: String(subscription.id),
});
}

Expand Down
4 changes: 4 additions & 0 deletions back/services/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ export default class SystemService {
},
{
command,
id: 'update-node-mirror',
},
);
}
Expand Down Expand Up @@ -252,6 +253,7 @@ export default class SystemService {
},
{
command,
id: 'update-linux-mirror',
},
);
}
Expand Down Expand Up @@ -363,6 +365,7 @@ export default class SystemService {
}
this.scheduleService.runTask(`real_time=true ${command}`, callback, {
command,
id: command.replace(/ /g, '-'),
});
}

Expand All @@ -371,6 +374,7 @@ export default class SystemService {
return { code: 400, message: '参数错误' };
}

taskLimit.removeQueuedCron(command.replace(/ /g, '-'));
if (pid) {
await killTask(pid);
return { code: 200 };
Expand Down
44 changes: 43 additions & 1 deletion back/shared/pLimit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,38 @@ import os from 'os';
import { AuthDataType, SystemModel } from '../data/system';
import Logger from '../loaders/logger';
import { Dependence } from '../data/dependence';
import { ICron } from '../protos/cron';
import NotificationService from '../services/notify';
import { Inject } from 'typedi';

export type Override<
T,
K extends Partial<{ [P in keyof T]: any }> | string,
> = K extends string
? Omit<T, K> & { [P in keyof T]: T[P] | unknown }
: Omit<T, keyof K> & K;
type TCron = Override<Partial<ICron>, { id: string }>;
interface IDependencyFn<T> {
(): Promise<T>;
dependency?: Dependence;
}
interface ICronFn<T> {
(): Promise<T>;
cron?: TCron;
}
class TaskLimit {
private dependenyLimit = new PQueue({ concurrency: 1 });
private queuedDependencyIds = new Set<number>([]);
private queuedCrons = new Map<string, TCron[]>();
private updateLogLimit = new PQueue({ concurrency: 1 });
private cronLimit = new PQueue({
concurrency: Math.max(os.cpus().length, 4),
});
private manualCronoLimit = new PQueue({
concurrency: Math.max(os.cpus().length, 4),
});
@Inject((type) => NotificationService)
private notificationService!: NotificationService;

get cronLimitActiveCount() {
return this.cronLimit.pending;
Expand Down Expand Up @@ -71,6 +88,16 @@ class TaskLimit {
}
}

public removeQueuedCron(id: string) {
if (this.queuedCrons.has(id)) {
const runs = this.queuedCrons.get(id);
if (runs && runs.length > 0) {
runs.pop();
this.queuedCrons.set(id, runs);
}
}
}

public async setCustomLimit(limit?: number) {
if (limit) {
this.cronLimit.concurrency = limit;
Expand All @@ -88,9 +115,24 @@ class TaskLimit {
}

public async runWithCronLimit<T>(
fn: () => Promise<T>,
cron: TCron,
fn: ICronFn<T>,
options?: Partial<QueueAddOptions>,
): Promise<T | void> {
let runs = this.queuedCrons.get(cron.id);
if (!runs?.length) {
runs = [];
}
runs.push(cron);
if (runs.length >= 5) {
this.notificationService.notify(
'任务重复运行',
`任务 ${cron.name} ${cron.command} 处于运行中的已达 5 个,请检查系统日志`,
);
return;
}
this.queuedCrons.set(cron.id, runs);
fn.cron = cron;
return this.cronLimit.add(fn, options);
}

Expand Down
15 changes: 11 additions & 4 deletions back/shared/runCron.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import { spawn } from 'cross-spawn';
import taskLimit from './pLimit';
import Logger from '../loaders/logger';
import { ICron } from '../protos/cron';

export function runCron(cmd: string, options?: { schedule: string; extraSchedules: Array<{ schedule: string }>; name: string }): Promise<number | void> {
return taskLimit.runWithCronLimit(() => {
export function runCron(cmd: string, cron: ICron): Promise<number | void> {
return taskLimit.runWithCronLimit(cron, () => {
return new Promise(async (resolve: any) => {
Logger.info(`[schedule][开始执行任务] 参数 ${JSON.stringify({ ...options, command: cmd })}`);
taskLimit.removeQueuedCron(cron.id);
Logger.info(
`[schedule][开始执行任务] 参数 ${JSON.stringify({
...cron,
command: cmd,
})}`,
);
const cp = spawn(cmd, { shell: '/bin/bash' });

cp.stderr.on('data', (data) => {
Expand All @@ -24,7 +31,7 @@ export function runCron(cmd: string, options?: { schedule: string; extraSchedule
});

cp.on('exit', async (code) => {
resolve({ ...options, command: cmd, pid: cp.pid, code });
resolve({ ...cron, command: cmd, pid: cp.pid, code });
});
});
});
Expand Down

0 comments on commit 8b8eae2

Please sign in to comment.