增加新建订阅

This commit is contained in:
whyour
2022-05-10 14:38:06 +08:00
parent 85d8565241
commit ebb6290468
13 changed files with 489 additions and 31 deletions
+17 -3
View File
@@ -3,8 +3,10 @@ import { DataTypes, Model, ModelDefined } from 'sequelize';
import { SimpleIntervalSchedule } from 'toad-scheduler';
export class Subscription {
id?: number;
name?: string;
type?: 'public-repo' | 'private-repo' | 'file';
schedule_type?: 'crontab' | 'interval';
schedule?: string | SimpleIntervalSchedule;
url?: string;
whitelist?: string;
@@ -14,11 +16,15 @@ export class Subscription {
status?: SubscriptionStatus;
pull_type?: 'ssh-key' | 'user-pwd';
pull_option?:
| { private_key: string; key_alias: string }
| { private_key: string }
| { username: string; password: string };
pid?: string;
pid?: number;
isDisabled?: 1 | 0;
log_path?: string;
alias: string;
constructor(options: Subscription) {
this.id = options.id;
this.name = options.name;
this.type = options.type;
this.schedule = options.schedule;
@@ -31,6 +37,10 @@ export class Subscription {
this.pull_type = options.pull_type;
this.pull_option = options.pull_option;
this.pid = options.pid;
this.isDisabled = options.isDisabled;
this.log_path = options.log_path;
this.schedule_type = options.schedule_type;
this.alias = options.alias;
}
}
@@ -44,7 +54,7 @@ export enum SubscriptionStatus {
interface SubscriptionInstance
extends Model<Subscription, Subscription>,
Subscription {}
export const CrontabModel = sequelize.define<SubscriptionInstance>(
export const SubscriptionModel = sequelize.define<SubscriptionInstance>(
'Subscription',
{
name: {
@@ -67,5 +77,9 @@ export const CrontabModel = sequelize.define<SubscriptionInstance>(
pull_type: DataTypes.STRING,
pull_option: DataTypes.JSON,
pid: DataTypes.NUMBER,
isDisabled: DataTypes.NUMBER,
log_path: DataTypes.STRING,
schedule_type: DataTypes.STRING,
alias: DataTypes.STRING,
},
);
+254
View File
@@ -0,0 +1,254 @@
import { Service, Inject } from 'typedi';
import winston from 'winston';
import config from '../config';
import {
Subscription,
SubscriptionModel,
SubscriptionStatus,
} from '../data/subscription';
import { exec, execSync, spawn } from 'child_process';
import fs from 'fs';
import cron_parser from 'cron-parser';
import { getFileContentByName, concurrentRun, fileExist } from '../config/util';
import { promises, existsSync } from 'fs';
import { promisify } from 'util';
import { Op } from 'sequelize';
import path from 'path';
@Service()
export default class SubscriptionService {
constructor(@Inject('logger') private logger: winston.Logger) {}
public async create(payload: Subscription): Promise<Subscription> {
const tab = new Subscription(payload);
const doc = await this.insert(tab);
return doc;
}
public async insert(payload: Subscription): Promise<Subscription> {
return await SubscriptionModel.create(payload, { returning: true });
}
public async update(payload: Subscription): Promise<Subscription> {
const newDoc = await this.updateDb(payload);
return newDoc;
}
public async updateDb(payload: Subscription): Promise<Subscription> {
await SubscriptionModel.update(payload, { where: { id: payload.id } });
return await this.getDb({ id: payload.id });
}
public async status({
ids,
status,
pid,
log_path,
last_running_time = 0,
last_execution_time = 0,
}: {
ids: number[];
status: SubscriptionStatus;
pid: number;
log_path: string;
last_running_time: number;
last_execution_time: number;
}) {
const options: any = {
status,
pid,
log_path,
last_execution_time,
};
if (last_running_time > 0) {
options.last_running_time = last_running_time;
}
return await SubscriptionModel.update(
{ ...options },
{ where: { id: ids } },
);
}
public async remove(ids: number[]) {
await SubscriptionModel.destroy({ where: { id: ids } });
}
public async getDb(query: any): Promise<Subscription> {
const doc: any = await SubscriptionModel.findOne({ where: { ...query } });
return doc && (doc.get({ plain: true }) as Subscription);
}
public async run(ids: number[]) {
await SubscriptionModel.update(
{ status: SubscriptionStatus.queued },
{ where: { id: ids } },
);
concurrentRun(
ids.map((id) => async () => await this.runSingle(id)),
10,
);
}
public async stop(ids: number[]) {
const docs = await SubscriptionModel.findAll({ where: { id: ids } });
for (const doc of docs) {
if (doc.pid) {
try {
process.kill(-doc.pid);
} catch (error) {
this.logger.silly(error);
}
}
const err = await this.killTask(doc.command);
const absolutePath = path.resolve(config.logPath, `${doc.log_path}`);
const logFileExist = doc.log_path && (await fileExist(absolutePath));
if (logFileExist) {
const str = err ? `\n${err}` : '';
fs.appendFileSync(
`${absolutePath}`,
`${str}\n## 执行结束... ${new Date()
.toLocaleString('zh', { hour12: false })
.replace(' 24:', ' 00:')} `,
);
}
}
await SubscriptionModel.update(
{ status: SubscriptionStatus.idle, pid: undefined },
{ where: { id: ids } },
);
}
public async killTask(name: string) {
let taskCommand = `ps -ef | grep "${name}" | grep -v grep | awk '{print $1}'`;
const execAsync = promisify(exec);
try {
let pid = (await execAsync(taskCommand)).stdout;
if (pid) {
pid = (await execAsync(`pstree -p ${pid}`)).stdout;
} else {
return;
}
let pids = pid.match(/\(\d+/g);
const killLogs = [];
if (pids && pids.length > 0) {
// node 执行脚本时还会有10个子进程,但是ps -ef中不存在,所以截取前三个
for (const id of pids) {
const c = `kill -9 ${id.slice(1)}`;
try {
const { stdout, stderr } = await execAsync(c);
if (stderr) {
killLogs.push(stderr);
}
if (stdout) {
killLogs.push(stdout);
}
} catch (error: any) {
killLogs.push(error.message);
}
}
}
return killLogs.length > 0 ? JSON.stringify(killLogs) : '';
} catch (e) {
return JSON.stringify(e);
}
}
private async runSingle(cronId: number): Promise<number> {
return new Promise(async (resolve: any) => {
const cron = await this.getDb({ id: cronId });
if (cron.status !== SubscriptionStatus.queued) {
resolve();
return;
}
let { id, log_path } = cron;
const absolutePath = path.resolve(config.logPath, `${log_path}`);
const logFileExist = log_path && (await fileExist(absolutePath));
this.logger.silly('Running job');
this.logger.silly('ID: ' + id);
this.logger.silly('Original command: ');
let cmdStr = '';
const cp = spawn(cmdStr, { shell: '/bin/bash' });
await SubscriptionModel.update(
{ status: SubscriptionStatus.running, pid: cp.pid },
{ where: { id } },
);
cp.stderr.on('data', (data) => {
if (logFileExist) {
fs.appendFileSync(`${absolutePath}`, `${data}`);
}
});
cp.on('error', (err) => {
if (logFileExist) {
fs.appendFileSync(`${absolutePath}`, `${JSON.stringify(err)}`);
}
});
cp.on('exit', async (code, signal) => {
this.logger.info(`${''} pid: ${cp.pid} exit ${code} signal ${signal}`);
await SubscriptionModel.update(
{ status: SubscriptionStatus.idle, pid: undefined },
{ where: { id } },
);
resolve();
});
cp.on('close', async (code) => {
this.logger.info(`${''} pid: ${cp.pid} closed ${code}`);
await SubscriptionModel.update(
{ status: SubscriptionStatus.idle, pid: undefined },
{ where: { id } },
);
resolve();
});
});
}
public async disabled(ids: number[]) {
await SubscriptionModel.update({ isDisabled: 1 }, { where: { id: ids } });
}
public async enabled(ids: number[]) {
await SubscriptionModel.update({ isDisabled: 0 }, { where: { id: ids } });
}
public async log(id: number) {
const doc = await this.getDb({ id });
if (!doc) {
return '';
}
const absolutePath = path.resolve(config.logPath, `${doc.log_path}`);
const logFileExist = doc.log_path && (await fileExist(absolutePath));
if (logFileExist) {
return getFileContentByName(`${absolutePath}`);
}
}
public async logs(id: number) {
const doc = await this.getDb({ id });
if (!doc) {
return [];
}
if (doc.log_path) {
const relativeDir = path.dirname(`${doc.log_path}`);
const dir = path.resolve(config.logPath, relativeDir);
if (existsSync(dir)) {
let files = await promises.readdir(dir);
return files
.map((x) => ({
filename: x,
directory: relativeDir.replace(config.logPath, ''),
time: fs.statSync(`${dir}/${x}`).mtime.getTime(),
}))
.sort((a, b) => b.time - a.time);
}
}
}
}