From 4e5ad6d5f393abd28f9479dc319295e31920d1bd Mon Sep 17 00:00:00 2001 From: whyour Date: Fri, 23 Aug 2024 23:06:50 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E6=9C=8D=E5=8A=A1=E5=8C=BA?= =?UTF-8?q?=E5=88=86=E7=B3=BB=E7=BB=9F=E3=80=81=E8=AE=A2=E9=98=85=E3=80=81?= =?UTF-8?q?=E8=84=9A=E6=9C=AC=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- back/loaders/initTask.ts | 32 ++++++++++++------- back/services/schedule.ts | 27 ++++++++++------ back/services/script.ts | 2 +- back/services/subscription.ts | 5 +-- back/services/system.ts | 16 +++++++--- back/shared/interface.ts | 33 +++++++++++++++++++ back/shared/pLimit.ts | 58 +++++++++++++++++++++++++--------- src/pages/script/editModal.tsx | 4 +-- 8 files changed, 132 insertions(+), 45 deletions(-) create mode 100644 back/shared/interface.ts diff --git a/back/loaders/initTask.ts b/back/loaders/initTask.ts index 33a249ad..01319bc5 100644 --- a/back/loaders/initTask.ts +++ b/back/loaders/initTask.ts @@ -12,7 +12,10 @@ export default async () => { const subscriptionService = Container.get(SubscriptionService); // 生成内置token - let tokenCommand = `ts-node-transpile-only ${join(config.rootPath, 'back/token.ts')}`; + let tokenCommand = `ts-node-transpile-only ${join( + config.rootPath, + 'back/token.ts', + )}`; const tokenFile = join(config.rootPath, 'static/build/token.js'); if (await fileExist(tokenFile)) { @@ -22,11 +25,16 @@ export default async () => { id: NaN, name: '生成token', command: tokenCommand, + runOrigin: 'system', } as ScheduleTaskType; await scheduleService.cancelIntervalTask(cron); - scheduleService.createIntervalTask(cron, { - days: 28, - }); + scheduleService.createIntervalTask( + cron, + { + days: 28, + }, + true, + ); // 运行删除日志任务 const data = await systemService.getSystemConfig(); @@ -35,17 +43,17 @@ export default async () => { id: data.id as number, name: '删除日志', command: `ql rmlog ${data.info.logRemoveFrequency}`, + runOrigin: 'system' as const, }; await scheduleService.cancelIntervalTask(rmlogCron); - scheduleService.createIntervalTask(rmlogCron, { - days: data.info.logRemoveFrequency, - }); + scheduleService.createIntervalTask( + rmlogCron, + { + days: data.info.logRemoveFrequency, + }, + true, + ); } - // 运行所有订阅 await subscriptionService.setSshConfig(); - const subs = await subscriptionService.list(); - for (const sub of subs) { - subscriptionService.handleTask(sub, !sub.is_disabled, !sub.is_disabled); - } }; diff --git a/back/services/schedule.ts b/back/services/schedule.ts index 5365d8d6..124645a6 100644 --- a/back/services/schedule.ts +++ b/back/services/schedule.ts @@ -17,6 +17,7 @@ export interface ScheduleTaskType { command: string; name?: string; schedule?: string; + runOrigin: 'subscription' | 'system' | 'script'; } export interface TaskCallbacks { @@ -40,7 +41,11 @@ export default class ScheduleService { private intervalSchedule = new ToadScheduler(); - private maxBuffer = 200 * 1024 * 1024; + private taskLimitMap = { + system: 'runWithSystemLimit' as const, + script: 'runWithScriptLimit' as const, + subscription: 'runWithSubscriptionLimit' as const, + }; constructor(@Inject('logger') private logger: winston.Logger) {} @@ -52,15 +57,17 @@ export default class ScheduleService { name?: string; command?: string; id: string; + runOrigin: 'subscription' | 'system' | 'script'; }, completionTime: 'start' | 'end' = 'end', ) { - return taskLimit.runWithCronLimit(params, () => { + const { runOrigin, ...others } = params; + + return taskLimit[this.taskLimitMap[runOrigin]](others, () => { return new Promise(async (resolve, reject) => { - taskLimit.removeQueuedCron(params.id); this.logger.info( `[panel][开始执行任务] 参数 ${JSON.stringify({ - ...params, + ...others, command, })}`, ); @@ -103,7 +110,7 @@ export default class ScheduleService { endTime, endTime.diff(startTime, 'seconds'), ); - resolve({ ...params, pid: cp.pid, code }); + resolve({ ...others, pid: cp.pid, code }); }); } catch (error) { this.logger.error( @@ -118,7 +125,7 @@ export default class ScheduleService { } async createCronTask( - { id = 0, command, name, schedule = '' }: ScheduleTaskType, + { id = 0, command, name, schedule = '', runOrigin }: ScheduleTaskType, callbacks?: TaskCallbacks, runImmediately = false, ) { @@ -139,6 +146,7 @@ export default class ScheduleService { schedule, command, id: _id, + runOrigin, }); }), ); @@ -149,6 +157,7 @@ export default class ScheduleService { schedule, command, id: _id, + runOrigin, }); } } @@ -157,14 +166,13 @@ export default class ScheduleService { const _id = this.formatId(id); this.logger.info('[panel][取消定时任务], 任务名: %s', name); if (this.scheduleStacks.has(_id)) { - taskLimit.removeQueuedCron(_id); this.scheduleStacks.get(_id)?.cancel(); this.scheduleStacks.delete(_id); } } async createIntervalTask( - { id = 0, command, name = '' }: ScheduleTaskType, + { id = 0, command, name = '', runOrigin }: ScheduleTaskType, schedule: SimpleIntervalSchedule, runImmediately = true, callbacks?: TaskCallbacks, @@ -183,6 +191,7 @@ export default class ScheduleService { name, command, id: _id, + runOrigin, }); }, (err) => { @@ -207,6 +216,7 @@ export default class ScheduleService { name, command, id: _id, + runOrigin, }); } } @@ -214,7 +224,6 @@ export default class ScheduleService { async cancelIntervalTask({ id = 0, name }: ScheduleTaskType) { const _id = this.formatId(id); this.logger.info('[取消interval任务], 任务ID: %s, 任务名: %s', _id, name); - taskLimit.removeQueuedCron(_id); this.intervalSchedule.removeById(_id); } diff --git a/back/services/script.ts b/back/services/script.ts index a8d965ac..084d1292 100644 --- a/back/services/script.ts +++ b/back/services/script.ts @@ -44,7 +44,7 @@ export default class ScriptService { const pid = await this.scheduleService.runTask( `real_time=true ${command}`, this.taskCallbacks(filePath), - { command, id: relativePath.replace(/ /g, '-') }, + { command, id: relativePath.replace(/ /g, '-'), runOrigin: 'script' }, 'start', ); diff --git a/back/services/subscription.ts b/back/services/subscription.ts index b7acbe2a..974b6b70 100644 --- a/back/services/subscription.ts +++ b/back/services/subscription.ts @@ -88,7 +88,7 @@ export default class SubscriptionService { this.scheduleService.cancelCronTask(doc as any); needCreate && (await this.scheduleService.createCronTask( - doc as any, + { ...doc, runOrigin: 'subscription' } as any, this.taskCallbacks(doc), runImmediately, )); @@ -97,7 +97,7 @@ export default class SubscriptionService { const { type, value } = doc.interval_schedule; needCreate && (await this.scheduleService.createIntervalTask( - doc as any, + { ...doc, runOrigin: 'subscription' } as any, { [type]: value } as SimpleIntervalSchedule, runImmediately, this.taskCallbacks(doc), @@ -329,6 +329,7 @@ export default class SubscriptionService { schedule: subscription.schedule, command, id: String(subscription.id), + runOrigin: 'subscription', }); } diff --git a/back/services/system.ts b/back/services/system.ts index a463fb52..33f3e64c 100644 --- a/back/services/system.ts +++ b/back/services/system.ts @@ -92,17 +92,22 @@ export default class SystemService { info: { ...oDoc.info, ...info }, }); const cron = { - id: result.id || NaN, + id: result.id as number, name: '删除日志', command: `ql rmlog ${info.logRemoveFrequency}`, + runOrigin: 'system' as const, }; if (oDoc.info?.logRemoveFrequency) { await this.scheduleService.cancelIntervalTask(cron); } if (info.logRemoveFrequency && info.logRemoveFrequency > 0) { - this.scheduleService.createIntervalTask(cron, { - days: info.logRemoveFrequency, - }); + this.scheduleService.createIntervalTask( + cron, + { + days: info.logRemoveFrequency, + }, + true, + ); } return { code: 200, data: info }; } @@ -179,6 +184,7 @@ export default class SystemService { { command, id: 'update-node-mirror', + runOrigin: 'system', }, ); } @@ -254,6 +260,7 @@ export default class SystemService { { command, id: 'update-linux-mirror', + runOrigin: 'system', }, ); } @@ -366,6 +373,7 @@ export default class SystemService { this.scheduleService.runTask(`real_time=true ${command}`, callback, { command, id: command.replace(/ /g, '-'), + runOrigin: 'system', }); } diff --git a/back/shared/interface.ts b/back/shared/interface.ts new file mode 100644 index 00000000..01fd6cb7 --- /dev/null +++ b/back/shared/interface.ts @@ -0,0 +1,33 @@ +import { Dependence } from '../data/dependence'; +import { ICron } from '../protos/cron'; + +export type Override< + T, + K extends Partial<{ [P in keyof T]: any }> | string, +> = K extends string + ? Omit & { [P in keyof T]: T[P] | unknown } + : Omit & K; + +export type TCron = Override, { id: string }>; + +export interface IDependencyFn { + (): Promise; + dependency?: Dependence; +} + +export interface ICronFn { + (): Promise; + cron?: TCron; +} + +export interface ISchedule { + schedule?: string; + name?: string; + command?: string; + id: string; +} + +export interface IScheduleFn { + (): Promise; + schedule?: ISchedule; +} diff --git a/back/shared/pLimit.ts b/back/shared/pLimit.ts index 9215665d..6ccebcd5 100644 --- a/back/shared/pLimit.ts +++ b/back/shared/pLimit.ts @@ -6,22 +6,14 @@ import { Dependence } from '../data/dependence'; import { ICron } from '../protos/cron'; import NotificationService from '../services/notify'; import { Inject } from 'typedi'; +import { + ICronFn, + IDependencyFn, + ISchedule, + IScheduleFn, + TCron, +} from './interface'; -export type Override< - T, - K extends Partial<{ [P in keyof T]: any }> | string, -> = K extends string - ? Omit & { [P in keyof T]: T[P] | unknown } - : Omit & K; -type TCron = Override, { id: string }>; -interface IDependencyFn { - (): Promise; - dependency?: Dependence; -} -interface ICronFn { - (): Promise; - cron?: TCron; -} class TaskLimit { private dependenyLimit = new PQueue({ concurrency: 1 }); private queuedDependencyIds = new Set([]); @@ -33,6 +25,15 @@ class TaskLimit { private manualCronoLimit = new PQueue({ concurrency: Math.max(os.cpus().length, 4), }); + private subscriptionLimit = new PQueue({ + concurrency: Math.max(os.cpus().length, 4), + }); + private scriptLimit = new PQueue({ + concurrency: Math.max(os.cpus().length, 4), + }); + private systemLimit = new PQueue({ + concurrency: Math.max(os.cpus().length, 4), + }); @Inject((type) => NotificationService) private notificationService!: NotificationService; @@ -143,6 +144,33 @@ class TaskLimit { return this.manualCronoLimit.add(fn, options); } + public async runWithSubscriptionLimit( + schedule: TCron, + fn: IScheduleFn, + options?: Partial, + ): Promise { + fn.schedule = schedule; + return this.subscriptionLimit.add(fn, options); + } + + public async runWithSystemLimit( + schedule: TCron, + fn: IScheduleFn, + options?: Partial, + ): Promise { + fn.schedule = schedule; + return this.systemLimit.add(fn, options); + } + + public async runWithScriptLimit( + schedule: ISchedule, + fn: IScheduleFn, + options?: Partial, + ): Promise { + fn.schedule = schedule; + return this.scriptLimit.add(fn, options); + } + public runDependeny( dependency: Dependence, fn: IDependencyFn, diff --git a/src/pages/script/editModal.tsx b/src/pages/script/editModal.tsx index c898e31b..ef7d4eea 100644 --- a/src/pages/script/editModal.tsx +++ b/src/pages/script/editModal.tsx @@ -34,7 +34,7 @@ const EditModal = ({ handleCancel: () => void; }) => { const [value, setValue] = useState(''); - const [language, setLanguage] = useState('javascript'); + const [language, setLanguage] = useState(); const [cNode, setCNode] = useState(); const [selectedKey, setSelectedKey] = useState(); const [saveModalVisible, setSaveModalVisible] = useState(false); @@ -242,7 +242,7 @@ const EditModal = ({ minimap: { enabled: false }, lineNumbersMinChars: 3, glyphMargin: false, - accessibilitySupport: 'off' + accessibilitySupport: 'off', }} onMount={(editor) => { editorRef.current = editor;