fs 文件操作替换为 fs.promise

This commit is contained in:
whyour
2023-11-01 16:44:34 +08:00
parent 66a2769e7c
commit 20f615eadf
17 changed files with 284 additions and 260 deletions
+66 -33
View File
@@ -3,10 +3,15 @@ import winston from 'winston';
import config from '../config';
import { Crontab, CrontabModel, CrontabStatus } from '../data/cron';
import { exec, execSync } from 'child_process';
import fs from 'fs';
import fs from 'fs/promises';
import cron_parser from 'cron-parser';
import { getFileContentByName, fileExist, killTask, getUniqPath, safeJSONParse } from '../config/util';
import { promises, existsSync } from 'fs';
import {
getFileContentByName,
fileExist,
killTask,
getUniqPath,
safeJSONParse,
} from '../config/util';
import { Op, where, col as colFn, FindOptions, fn } from 'sequelize';
import path from 'path';
import { TASK_PREFIX, QL_PREFIX } from '../config/const';
@@ -19,7 +24,7 @@ import omit from 'lodash/omit';
@Service()
export default class CronService {
constructor(@Inject('logger') private logger: winston.Logger) { }
constructor(@Inject('logger') private logger: winston.Logger) {}
private isSixCron(cron: Crontab) {
const { schedule } = cron;
@@ -35,7 +40,13 @@ export default class CronService {
const doc = await this.insert(tab);
if (this.isSixCron(doc) || doc.extra_schedules?.length) {
await cronClient.addCron([
{ name: doc.name || '', id: String(doc.id), schedule: doc.schedule!, command: this.makeCommand(doc), extraSchedules: doc.extra_schedules || [] },
{
name: doc.name || '',
id: String(doc.id),
schedule: doc.schedule!,
command: this.makeCommand(doc),
extraSchedules: doc.extra_schedules || [],
},
]);
}
await this.set_crontab();
@@ -64,7 +75,7 @@ export default class CronService {
id: String(newDoc.id),
schedule: newDoc.schedule!,
command: this.makeCommand(newDoc),
extraSchedules: newDoc.extra_schedules || []
extraSchedules: newDoc.extra_schedules || [],
},
]);
}
@@ -107,7 +118,10 @@ export default class CronService {
if (status === CrontabStatus.idle && log_path !== cron.log_path) {
options = omit(options, ['status', 'log_path', 'pid']);
}
await CrontabModel.update({ ...pickBy(options, (v) => v === 0 || !!v) }, { where: { id } });
await CrontabModel.update(
{ ...pickBy(options, (v) => v === 0 || !!v) },
{ where: { id } },
);
}
}
@@ -396,35 +410,45 @@ export default class CronService {
return taskLimit.runWithCronLimit(() => {
return new Promise(async (resolve: any) => {
const cron = await this.getDb({ id: cronId });
const params = { name: cron.name, command: cron.command, schedule: cron.schedule, extraSchedules: cron.extra_schedules };
const params = {
name: cron.name,
command: cron.command,
schedule: cron.schedule,
extraSchedules: cron.extra_schedules,
};
if (cron.status !== CrontabStatus.queued) {
resolve(params);
return;
}
this.logger.info(`[panel][开始执行任务] 参数 ${JSON.stringify(params)}`);
this.logger.info(
`[panel][开始执行任务] 参数 ${JSON.stringify(params)}`,
);
let { id, command, log_path } = cron;
const uniqPath = await getUniqPath(command, `${id}`);
const logTime = dayjs().format('YYYY-MM-DD-HH-mm-ss-SSS');
const logDirPath = path.resolve(config.logPath, `${uniqPath}`);
if (log_path?.split('/')?.every(x => x !== uniqPath)) {
fs.mkdirSync(logDirPath, { recursive: true });
if (log_path?.split('/')?.every((x) => x !== uniqPath)) {
await fs.mkdir(logDirPath, { recursive: true });
}
const logPath = `${uniqPath}/${logTime}.log`;
const absolutePath = path.resolve(config.logPath, `${logPath}`);
const cp = spawn(`real_log_path=${logPath} no_delay=true ${this.makeCommand(cron)}`, { shell: '/bin/bash' });
const cp = spawn(
`real_log_path=${logPath} no_delay=true ${this.makeCommand(cron)}`,
{ shell: '/bin/bash' },
);
await CrontabModel.update(
{ status: CrontabStatus.running, pid: cp.pid, log_path: logPath },
{ where: { id } },
);
cp.stderr.on('data', (data) => {
fs.appendFileSync(`${absolutePath}`, `${data.toString()}`);
cp.stderr.on('data', async (data) => {
await fs.appendFile(`${absolutePath}`, `${data.toString()}`);
});
cp.on('error', (err) => {
fs.appendFileSync(`${absolutePath}`, `${JSON.stringify(err)}`);
cp.on('error', async (err) => {
await fs.appendFile(`${absolutePath}`, `${JSON.stringify(err)}`);
});
cp.on('exit', async (code) => {
@@ -454,7 +478,7 @@ export default class CronService {
id: String(doc.id),
schedule: doc.schedule!,
command: this.makeCommand(doc),
extraSchedules: doc.extra_schedules || []
extraSchedules: doc.extra_schedules || [],
}));
await cronClient.addCron(sixCron);
await this.set_crontab();
@@ -469,7 +493,7 @@ export default class CronService {
const absolutePath = path.resolve(config.logPath, `${doc.log_path}`);
const logFileExist = doc.log_path && (await fileExist(absolutePath));
if (logFileExist) {
return getFileContentByName(`${absolutePath}`);
return await getFileContentByName(`${absolutePath}`);
} else {
return '任务未运行';
}
@@ -483,15 +507,18 @@ export default class CronService {
const relativeDir = path.dirname(`${doc.log_path}`);
const dir = path.resolve(config.logPath, relativeDir);
if (existsSync(dir)) {
let files = await promises.readdir(dir);
return files
.map((x) => ({
filename: x,
directory: relativeDir.replace(config.logPath, ''),
time: fs.statSync(`${dir}/${x}`).mtime.getTime(),
}))
.sort((a, b) => b.time - a.time);
const dirExist = await fileExist(dir);
if (dirExist) {
let files = await fs.readdir(dir);
return (
await Promise.all(
files.map(async (x) => ({
filename: x,
directory: relativeDir.replace(config.logPath, ''),
time: (await fs.stat(`${dir}/${x}`)).mtime.getTime(),
})),
)
).sort((a, b) => b.time - a.time);
} else {
return [];
}
@@ -502,13 +529,15 @@ export default class CronService {
if (!command.startsWith(TASK_PREFIX) && !command.startsWith(QL_PREFIX)) {
command = `${TASK_PREFIX}${tab.command}`;
}
let commandVariable = `no_tee=true ID=${tab.id} `
let commandVariable = `no_tee=true ID=${tab.id} `;
if (tab.task_before) {
commandVariable += `task_before='${tab.task_before.replace(/'/g, "'\\''")
commandVariable += `task_before='${tab.task_before
.replace(/'/g, "'\\''")
.trim()}' `;
}
if (tab.task_after) {
commandVariable += `task_after='${tab.task_after.replace(/'/g, "'\\''")
commandVariable += `task_after='${tab.task_after
.replace(/'/g, "'\\''")
.trim()}' `;
}
@@ -521,7 +550,11 @@ export default class CronService {
var crontab_string = '';
tabs.data.forEach((tab) => {
const _schedule = tab.schedule && tab.schedule.split(/ +/);
if (tab.isDisabled === 1 || _schedule!.length !== 5 || tab.extra_schedules?.length) {
if (
tab.isDisabled === 1 ||
_schedule!.length !== 5 ||
tab.extra_schedules?.length
) {
crontab_string += '# ';
crontab_string += tab.schedule;
crontab_string += ' ';
@@ -535,7 +568,7 @@ export default class CronService {
}
});
fs.writeFileSync(config.crontabFile, crontab_string);
await fs.writeFile(config.crontabFile, crontab_string);
execSync(`crontab ${config.crontabFile}`);
await CrontabModel.update({ saved: true }, { where: {} });
@@ -586,7 +619,7 @@ export default class CronService {
id: String(doc.id),
schedule: doc.schedule!,
command: this.makeCommand(doc),
extraSchedules: doc.extra_schedules || []
extraSchedules: doc.extra_schedules || [],
}));
await cronClient.addCron(sixCron);
}