mirror of
https://github.com/whyour/qinglong.git
synced 2026-07-01 04:40:38 +08:00
使用sqlite替换nedb
This commit is contained in:
+102
-192
@@ -1,25 +1,17 @@
|
||||
import { Service, Inject } from 'typedi';
|
||||
import winston from 'winston';
|
||||
import DataStore from 'nedb';
|
||||
import config from '../config';
|
||||
import { Crontab, CrontabStatus } from '../data/cron';
|
||||
import { Crontab, CrontabModel, CrontabStatus } from '../data/cron';
|
||||
import { exec, execSync, spawn } from 'child_process';
|
||||
import fs from 'fs';
|
||||
import cron_parser from 'cron-parser';
|
||||
import { getFileContentByName, concurrentRun } from '../config/util';
|
||||
import PQueue from 'p-queue';
|
||||
import { promises, existsSync } from 'fs';
|
||||
import { promisify } from 'util';
|
||||
import { dbs } from '../loaders/db';
|
||||
import { Op } from 'sequelize';
|
||||
|
||||
@Service()
|
||||
export default class CronService {
|
||||
private cronDb = dbs.cronDb;
|
||||
|
||||
private queue = new PQueue({
|
||||
concurrency: parseInt(process.env.MaxConcurrentNum as string) || 5,
|
||||
});
|
||||
|
||||
constructor(@Inject('logger') private logger: winston.Logger) {}
|
||||
|
||||
private isSixCron(cron: Crontab) {
|
||||
@@ -32,7 +24,6 @@ export default class CronService {
|
||||
|
||||
public async create(payload: Crontab): Promise<Crontab> {
|
||||
const tab = new Crontab(payload);
|
||||
tab.created = new Date().valueOf();
|
||||
tab.saved = false;
|
||||
const doc = await this.insert(tab);
|
||||
await this.set_crontab(this.isSixCron(doc));
|
||||
@@ -40,20 +31,12 @@ export default class CronService {
|
||||
}
|
||||
|
||||
public async insert(payload: Crontab): Promise<Crontab> {
|
||||
return new Promise((resolve) => {
|
||||
this.cronDb.insert(payload, (err, docs) => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
} else {
|
||||
resolve(docs);
|
||||
}
|
||||
});
|
||||
});
|
||||
return await CrontabModel.create(payload);
|
||||
}
|
||||
|
||||
public async update(payload: Crontab): Promise<Crontab> {
|
||||
const { _id, ...other } = payload;
|
||||
const doc = await this.get(_id);
|
||||
const { id, ...other } = payload;
|
||||
const doc = await this.get(id as number);
|
||||
const tab = new Crontab({ ...doc, ...other });
|
||||
tab.saved = false;
|
||||
const newDoc = await this.updateDb(tab);
|
||||
@@ -62,20 +45,11 @@ export default class CronService {
|
||||
}
|
||||
|
||||
public async updateDb(payload: Crontab): Promise<Crontab> {
|
||||
return new Promise((resolve) => {
|
||||
this.cronDb.update(
|
||||
{ _id: payload._id },
|
||||
payload,
|
||||
{ returnUpdatedDocs: true },
|
||||
(err, num, docs: any) => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
} else {
|
||||
resolve(docs);
|
||||
}
|
||||
},
|
||||
);
|
||||
});
|
||||
const result = await CrontabModel.update(
|
||||
{ ...payload },
|
||||
{ where: { id: payload.id } },
|
||||
);
|
||||
return result[1][0];
|
||||
}
|
||||
|
||||
public async status({
|
||||
@@ -86,7 +60,7 @@ export default class CronService {
|
||||
last_running_time = 0,
|
||||
last_execution_time = 0,
|
||||
}: {
|
||||
ids: string[];
|
||||
ids: number[];
|
||||
status: CrontabStatus;
|
||||
pid: number;
|
||||
log_path: string;
|
||||
@@ -103,67 +77,35 @@ export default class CronService {
|
||||
options.last_running_time = last_running_time;
|
||||
}
|
||||
|
||||
return new Promise((resolve) => {
|
||||
this.cronDb.update(
|
||||
{ _id: { $in: ids } },
|
||||
{
|
||||
$set: options,
|
||||
},
|
||||
{ multi: true, returnUpdatedDocs: true },
|
||||
(err) => {
|
||||
resolve(null);
|
||||
},
|
||||
);
|
||||
});
|
||||
return await CrontabModel.update({ ...options }, { where: { id: ids } });
|
||||
}
|
||||
|
||||
public async remove(ids: string[]) {
|
||||
return new Promise((resolve: any) => {
|
||||
this.cronDb.remove(
|
||||
{ _id: { $in: ids } },
|
||||
{ multi: true },
|
||||
async (err) => {
|
||||
await this.set_crontab(true);
|
||||
resolve();
|
||||
},
|
||||
);
|
||||
});
|
||||
public async remove(ids: number[]) {
|
||||
await CrontabModel.destroy({ where: { id: ids } });
|
||||
await this.set_crontab(true);
|
||||
}
|
||||
|
||||
public async pin(ids: string[]) {
|
||||
return new Promise((resolve: any) => {
|
||||
this.cronDb.update(
|
||||
{ _id: { $in: ids } },
|
||||
{ $set: { isPinned: 1 } },
|
||||
{ multi: true },
|
||||
async (err) => {
|
||||
resolve();
|
||||
},
|
||||
);
|
||||
});
|
||||
public async pin(ids: number[]) {
|
||||
await CrontabModel.update({ isPinned: 1 }, { where: { id: ids } });
|
||||
}
|
||||
|
||||
public async unPin(ids: string[]) {
|
||||
return new Promise((resolve: any) => {
|
||||
this.cronDb.update(
|
||||
{ _id: { $in: ids } },
|
||||
{ $set: { isPinned: 0 } },
|
||||
{ multi: true },
|
||||
async (err) => {
|
||||
resolve();
|
||||
},
|
||||
);
|
||||
});
|
||||
public async unPin(ids: number[]) {
|
||||
await CrontabModel.update({ isPinned: 0 }, { where: { id: ids } });
|
||||
}
|
||||
|
||||
public async crontabs(searchText?: string): Promise<Crontab[]> {
|
||||
let query = {};
|
||||
if (searchText) {
|
||||
const encodeText = encodeURIComponent(searchText);
|
||||
const reg = new RegExp(`${searchText}|${encodeText}`, 'i');
|
||||
const reg = {
|
||||
[Op.or]: [
|
||||
{ [Op.like]: `%${searchText}&` },
|
||||
{ [Op.like]: `%${encodeText}%` },
|
||||
],
|
||||
};
|
||||
|
||||
query = {
|
||||
$or: [
|
||||
[Op.or]: [
|
||||
{
|
||||
name: reg,
|
||||
},
|
||||
@@ -176,69 +118,56 @@ export default class CronService {
|
||||
],
|
||||
};
|
||||
}
|
||||
return new Promise((resolve) => {
|
||||
this.cronDb
|
||||
.find(query)
|
||||
.sort({ created: -1 })
|
||||
.exec((err, docs) => {
|
||||
resolve(docs);
|
||||
});
|
||||
});
|
||||
try {
|
||||
const result = await CrontabModel.findAll({ where: query });
|
||||
return result as any;
|
||||
} catch (error) {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
public async get(_id: string): Promise<Crontab> {
|
||||
return new Promise((resolve) => {
|
||||
this.cronDb.find({ _id }).exec((err, docs) => {
|
||||
resolve(docs[0]);
|
||||
});
|
||||
});
|
||||
public async get(id: number): Promise<Crontab> {
|
||||
const result = await CrontabModel.findAll({ where: { id } });
|
||||
return result[0] as any;
|
||||
}
|
||||
|
||||
public async run(ids: string[]) {
|
||||
this.cronDb.update(
|
||||
{ _id: { $in: ids } },
|
||||
{ $set: { status: CrontabStatus.queued } },
|
||||
{ multi: true },
|
||||
public async run(ids: number[]) {
|
||||
await CrontabModel.update(
|
||||
{ status: CrontabStatus.queued },
|
||||
{ where: { id: ids } },
|
||||
);
|
||||
concurrentRun(
|
||||
ids.map((id) => () => this.runSingle(id)),
|
||||
ids.map((id) => async () => await this.runSingle(id)),
|
||||
10,
|
||||
);
|
||||
}
|
||||
|
||||
public async stop(ids: string[]) {
|
||||
return new Promise((resolve: any) => {
|
||||
this.cronDb
|
||||
.find({ _id: { $in: ids } })
|
||||
.exec(async (err, docs: Crontab[]) => {
|
||||
for (const doc of docs) {
|
||||
if (doc.pid) {
|
||||
try {
|
||||
process.kill(-doc.pid);
|
||||
} catch (error) {
|
||||
this.logger.silly(error);
|
||||
}
|
||||
}
|
||||
const err = await this.killTask(doc.command);
|
||||
if (doc.log_path) {
|
||||
const str = err ? `\n${err}` : '';
|
||||
fs.appendFileSync(
|
||||
`${doc.log_path}`,
|
||||
`${str}\n## 执行结束... ${new Date()
|
||||
.toLocaleString('zh', { hour12: false })
|
||||
.replace(' 24:', ' 00:')} `,
|
||||
);
|
||||
}
|
||||
}
|
||||
this.cronDb.update(
|
||||
{ _id: { $in: ids } },
|
||||
{ $set: { status: CrontabStatus.idle }, $unset: { pid: true } },
|
||||
{ multi: true },
|
||||
);
|
||||
this.queue.clear();
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
public async stop(ids: number[]) {
|
||||
const docs = await CrontabModel.findAll({ where: { id: ids } });
|
||||
for (const doc of docs) {
|
||||
if (doc.pid) {
|
||||
try {
|
||||
process.kill(-doc.pid);
|
||||
} catch (error) {
|
||||
this.logger.silly(error);
|
||||
}
|
||||
}
|
||||
const err = await this.killTask(doc.command);
|
||||
if (doc.log_path) {
|
||||
const str = err ? `\n${err}` : '';
|
||||
fs.appendFileSync(
|
||||
`${doc.log_path}`,
|
||||
`${str}\n## 执行结束... ${new Date()
|
||||
.toLocaleString('zh', { hour12: false })
|
||||
.replace(' 24:', ' 00:')} `,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
await CrontabModel.update(
|
||||
{ status: CrontabStatus.queued, pid: undefined },
|
||||
{ where: { id: ids } },
|
||||
);
|
||||
}
|
||||
|
||||
public async killTask(name: string) {
|
||||
@@ -273,18 +202,18 @@ export default class CronService {
|
||||
}
|
||||
}
|
||||
|
||||
private async runSingle(id: string): Promise<number> {
|
||||
private async runSingle(cronId: number): Promise<number> {
|
||||
return new Promise(async (resolve: any) => {
|
||||
const cron = await this.get(id);
|
||||
const cron = await this.get(cronId);
|
||||
if (cron.status !== CrontabStatus.queued) {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
|
||||
let { _id, command, log_path } = cron;
|
||||
let { id, command, log_path } = cron;
|
||||
|
||||
this.logger.silly('Running job');
|
||||
this.logger.silly('ID: ' + _id);
|
||||
this.logger.silly('ID: ' + id);
|
||||
this.logger.silly('Original command: ' + command);
|
||||
|
||||
let cmdStr = command;
|
||||
@@ -296,9 +225,10 @@ export default class CronService {
|
||||
}
|
||||
|
||||
const cp = spawn(cmdStr, { shell: '/bin/bash' });
|
||||
this.cronDb.update(
|
||||
{ _id },
|
||||
{ $set: { status: CrontabStatus.running, pid: cp.pid } },
|
||||
|
||||
await CrontabModel.update(
|
||||
{ status: CrontabStatus.running, pid: cp.pid },
|
||||
{ where: { id } },
|
||||
);
|
||||
cp.stderr.on('data', (data) => {
|
||||
if (log_path) {
|
||||
@@ -311,57 +241,39 @@ export default class CronService {
|
||||
}
|
||||
});
|
||||
|
||||
cp.on('exit', (code, signal) => {
|
||||
cp.on('exit', async (code, signal) => {
|
||||
this.logger.info(
|
||||
`${command} pid: ${cp.pid} exit ${code} signal ${signal}`,
|
||||
);
|
||||
this.cronDb.update(
|
||||
{ _id },
|
||||
{ $set: { status: CrontabStatus.idle }, $unset: { pid: true } },
|
||||
await CrontabModel.update(
|
||||
{ status: CrontabStatus.idle, pid: undefined },
|
||||
{ where: { id } },
|
||||
);
|
||||
resolve();
|
||||
});
|
||||
cp.on('close', (code) => {
|
||||
cp.on('close', async (code) => {
|
||||
this.logger.info(`${command} pid: ${cp.pid} closed ${code}`);
|
||||
this.cronDb.update(
|
||||
{ _id },
|
||||
{ $set: { status: CrontabStatus.idle }, $unset: { pid: true } },
|
||||
await CrontabModel.update(
|
||||
{ status: CrontabStatus.idle, pid: undefined },
|
||||
{ where: { id } },
|
||||
);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public async disabled(ids: string[]) {
|
||||
return new Promise((resolve: any) => {
|
||||
this.cronDb.update(
|
||||
{ _id: { $in: ids } },
|
||||
{ $set: { isDisabled: 1 } },
|
||||
{ multi: true },
|
||||
async (err) => {
|
||||
await this.set_crontab(true);
|
||||
resolve();
|
||||
},
|
||||
);
|
||||
});
|
||||
public async disabled(ids: number[]) {
|
||||
await CrontabModel.update({ isDisabled: 1 }, { where: { id: ids } });
|
||||
await this.set_crontab(true);
|
||||
}
|
||||
|
||||
public async enabled(ids: string[]) {
|
||||
return new Promise((resolve: any) => {
|
||||
this.cronDb.update(
|
||||
{ _id: { $in: ids } },
|
||||
{ $set: { isDisabled: 0 } },
|
||||
{ multi: true },
|
||||
async (err) => {
|
||||
await this.set_crontab(true);
|
||||
resolve();
|
||||
},
|
||||
);
|
||||
});
|
||||
public async enabled(ids: number[]) {
|
||||
await CrontabModel.update({ isDisabled: 0 }, { where: { id: ids } });
|
||||
await this.set_crontab(true);
|
||||
}
|
||||
|
||||
public async log(_id: string) {
|
||||
const doc = await this.get(_id);
|
||||
public async log(id: number) {
|
||||
const doc = await this.get(id);
|
||||
if (!doc) {
|
||||
return '';
|
||||
}
|
||||
@@ -401,7 +313,7 @@ export default class CronService {
|
||||
}
|
||||
|
||||
private make_command(tab: Crontab) {
|
||||
const crontab_job_string = `ID=${tab._id} ${tab.command}`;
|
||||
const crontab_job_string = `ID=${tab.id} ${tab.command}`;
|
||||
return crontab_job_string;
|
||||
}
|
||||
|
||||
@@ -431,7 +343,7 @@ export default class CronService {
|
||||
if (needReloadSchedule) {
|
||||
exec(`pm2 reload schedule`);
|
||||
}
|
||||
this.cronDb.update({}, { $set: { saved: true } }, { multi: true });
|
||||
await CrontabModel.update({ saved: true }, { where: {} });
|
||||
}
|
||||
|
||||
public import_crontab() {
|
||||
@@ -439,7 +351,7 @@ export default class CronService {
|
||||
var lines = stdout.split('\n');
|
||||
var namePrefix = new Date().getTime();
|
||||
|
||||
lines.reverse().forEach((line, index) => {
|
||||
lines.reverse().forEach(async (line, index) => {
|
||||
line = line.replace(/\t+/g, ' ');
|
||||
var regex =
|
||||
/^((\@[a-zA-Z]+\s+)|(([^\s]+)\s+([^\s]+)\s+([^\s]+)\s+([^\s]+)\s+([^\s]+)\s+))/;
|
||||
@@ -453,18 +365,16 @@ export default class CronService {
|
||||
) {
|
||||
var name = namePrefix + '_' + index;
|
||||
|
||||
this.cronDb.findOne({ command, schedule }, (err, doc) => {
|
||||
if (err) {
|
||||
throw err;
|
||||
}
|
||||
if (!doc) {
|
||||
this.create({ name, command, schedule });
|
||||
} else {
|
||||
doc.command = command;
|
||||
doc.schedule = schedule;
|
||||
this.update(doc);
|
||||
}
|
||||
const _crontab = await CrontabModel.findOne({
|
||||
where: { command, schedule },
|
||||
});
|
||||
if (!_crontab) {
|
||||
await this.create({ name, command, schedule });
|
||||
} else {
|
||||
_crontab.command = command;
|
||||
_crontab.schedule = schedule;
|
||||
await this.update(_crontab);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user