mirror of
				https://github.com/whyour/qinglong.git
				synced 2025-11-04 02:56:07 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			477 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
			
		
		
	
	
			477 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
import { Service, Inject } from 'typedi';
 | 
						||
import winston from 'winston';
 | 
						||
import DataStore from 'nedb';
 | 
						||
import config from '../config';
 | 
						||
import { Crontab, CrontabStatus } from '../data/cron';
 | 
						||
import { exec, execSync, spawn } from 'child_process';
 | 
						||
import fs from 'fs';
 | 
						||
import cron_parser from 'cron-parser';
 | 
						||
import { getFileContentByName } from '../config/util';
 | 
						||
import PQueue from 'p-queue';
 | 
						||
import { promises, existsSync } from 'fs';
 | 
						||
import { promisify } from 'util';
 | 
						||
import { dbs } from '../loaders/db';
 | 
						||
 | 
						||
@Service()
 | 
						||
export default class CronService {
 | 
						||
  private cronDb = dbs.cronDb;
 | 
						||
 | 
						||
  private queue = new PQueue({
 | 
						||
    concurrency: parseInt(process.env.MaxConcurrentNum as string) || 5,
 | 
						||
  });
 | 
						||
 | 
						||
  constructor(@Inject('logger') private logger: winston.Logger) {}
 | 
						||
 | 
						||
  private isSixCron(cron: Crontab) {
 | 
						||
    const { schedule } = cron;
 | 
						||
    if (schedule.split(/ +/).length === 6) {
 | 
						||
      return true;
 | 
						||
    }
 | 
						||
    return false;
 | 
						||
  }
 | 
						||
 | 
						||
  public async create(payload: Crontab): Promise<Crontab> {
 | 
						||
    const tab = new Crontab(payload);
 | 
						||
    tab.created = new Date().valueOf();
 | 
						||
    tab.saved = false;
 | 
						||
    const doc = await this.insert(tab);
 | 
						||
    await this.set_crontab(this.isSixCron(doc));
 | 
						||
    return doc;
 | 
						||
  }
 | 
						||
 | 
						||
  public async insert(payload: Crontab): Promise<Crontab> {
 | 
						||
    return new Promise((resolve) => {
 | 
						||
      this.cronDb.insert(payload, (err, docs) => {
 | 
						||
        if (err) {
 | 
						||
          this.logger.error(err);
 | 
						||
        } else {
 | 
						||
          resolve(docs);
 | 
						||
        }
 | 
						||
      });
 | 
						||
    });
 | 
						||
  }
 | 
						||
 | 
						||
  public async update(payload: Crontab): Promise<Crontab> {
 | 
						||
    const { _id, ...other } = payload;
 | 
						||
    const doc = await this.get(_id);
 | 
						||
    const tab = new Crontab({ ...doc, ...other });
 | 
						||
    tab.saved = false;
 | 
						||
    const newDoc = await this.updateDb(tab);
 | 
						||
    await this.set_crontab(this.isSixCron(newDoc));
 | 
						||
    return newDoc;
 | 
						||
  }
 | 
						||
 | 
						||
  public async updateDb(payload: Crontab): Promise<Crontab> {
 | 
						||
    return new Promise((resolve) => {
 | 
						||
      this.cronDb.update(
 | 
						||
        { _id: payload._id },
 | 
						||
        payload,
 | 
						||
        { returnUpdatedDocs: true },
 | 
						||
        (err, num, docs: any) => {
 | 
						||
          if (err) {
 | 
						||
            this.logger.error(err);
 | 
						||
          } else {
 | 
						||
            resolve(docs);
 | 
						||
          }
 | 
						||
        },
 | 
						||
      );
 | 
						||
    });
 | 
						||
  }
 | 
						||
 | 
						||
  public async status({
 | 
						||
    ids,
 | 
						||
    status,
 | 
						||
    pid,
 | 
						||
    log_path,
 | 
						||
    last_running_time = 0,
 | 
						||
    last_execution_time = 0,
 | 
						||
  }: {
 | 
						||
    ids: string[];
 | 
						||
    status: CrontabStatus;
 | 
						||
    pid: number;
 | 
						||
    log_path: string;
 | 
						||
    last_running_time: number;
 | 
						||
    last_execution_time: number;
 | 
						||
  }) {
 | 
						||
    const options: any = {
 | 
						||
      status,
 | 
						||
      pid,
 | 
						||
      log_path,
 | 
						||
      last_execution_time,
 | 
						||
    };
 | 
						||
    if (last_running_time > 0) {
 | 
						||
      options.last_running_time = last_running_time;
 | 
						||
    }
 | 
						||
 | 
						||
    return new Promise((resolve) => {
 | 
						||
      this.cronDb.update(
 | 
						||
        { _id: { $in: ids } },
 | 
						||
        {
 | 
						||
          $set: options,
 | 
						||
        },
 | 
						||
        { multi: true, returnUpdatedDocs: true },
 | 
						||
        (err) => {
 | 
						||
          resolve(null);
 | 
						||
        },
 | 
						||
      );
 | 
						||
    });
 | 
						||
  }
 | 
						||
 | 
						||
  public async remove(ids: string[]) {
 | 
						||
    return new Promise((resolve: any) => {
 | 
						||
      this.cronDb.remove(
 | 
						||
        { _id: { $in: ids } },
 | 
						||
        { multi: true },
 | 
						||
        async (err) => {
 | 
						||
          await this.set_crontab(true);
 | 
						||
          resolve();
 | 
						||
        },
 | 
						||
      );
 | 
						||
    });
 | 
						||
  }
 | 
						||
 | 
						||
  public async pin(ids: string[]) {
 | 
						||
    return new Promise((resolve: any) => {
 | 
						||
      this.cronDb.update(
 | 
						||
        { _id: { $in: ids } },
 | 
						||
        { $set: { isPinned: 1 } },
 | 
						||
        { multi: true },
 | 
						||
        async (err) => {
 | 
						||
          resolve();
 | 
						||
        },
 | 
						||
      );
 | 
						||
    });
 | 
						||
  }
 | 
						||
 | 
						||
  public async unPin(ids: string[]) {
 | 
						||
    return new Promise((resolve: any) => {
 | 
						||
      this.cronDb.update(
 | 
						||
        { _id: { $in: ids } },
 | 
						||
        { $set: { isPinned: 0 } },
 | 
						||
        { multi: true },
 | 
						||
        async (err) => {
 | 
						||
          resolve();
 | 
						||
        },
 | 
						||
      );
 | 
						||
    });
 | 
						||
  }
 | 
						||
 | 
						||
  public async crontabs(searchText?: string): Promise<Crontab[]> {
 | 
						||
    let query = {};
 | 
						||
    if (searchText) {
 | 
						||
      const encodeText = encodeURIComponent(searchText);
 | 
						||
      const reg = new RegExp(`${searchText}|${encodeText}`, 'i');
 | 
						||
 | 
						||
      query = {
 | 
						||
        $or: [
 | 
						||
          {
 | 
						||
            name: reg,
 | 
						||
          },
 | 
						||
          {
 | 
						||
            command: reg,
 | 
						||
          },
 | 
						||
          {
 | 
						||
            schedule: reg,
 | 
						||
          },
 | 
						||
        ],
 | 
						||
      };
 | 
						||
    }
 | 
						||
    return new Promise((resolve) => {
 | 
						||
      this.cronDb
 | 
						||
        .find(query)
 | 
						||
        .sort({ created: -1 })
 | 
						||
        .exec((err, docs) => {
 | 
						||
          resolve(docs);
 | 
						||
        });
 | 
						||
    });
 | 
						||
  }
 | 
						||
 | 
						||
  public async get(_id: string): Promise<Crontab> {
 | 
						||
    return new Promise((resolve) => {
 | 
						||
      this.cronDb.find({ _id }).exec((err, docs) => {
 | 
						||
        resolve(docs[0]);
 | 
						||
      });
 | 
						||
    });
 | 
						||
  }
 | 
						||
 | 
						||
  public async run(ids: string[]) {
 | 
						||
    this.cronDb.update(
 | 
						||
      { _id: { $in: ids } },
 | 
						||
      { $set: { status: CrontabStatus.queued } },
 | 
						||
      { multi: true },
 | 
						||
    );
 | 
						||
    for (let i = 0; i < ids.length; i++) {
 | 
						||
      const id = ids[i];
 | 
						||
      this.queue.add(() => this.runSingle(id));
 | 
						||
    }
 | 
						||
  }
 | 
						||
 | 
						||
  public async stop(ids: string[]) {
 | 
						||
    return new Promise((resolve: any) => {
 | 
						||
      this.cronDb
 | 
						||
        .find({ _id: { $in: ids } })
 | 
						||
        .exec(async (err, docs: Crontab[]) => {
 | 
						||
          for (const doc of docs) {
 | 
						||
            if (doc.pid) {
 | 
						||
              try {
 | 
						||
                process.kill(-doc.pid);
 | 
						||
              } catch (error) {
 | 
						||
                this.logger.silly(error);
 | 
						||
              }
 | 
						||
            }
 | 
						||
            const err = await this.killTask(doc.command);
 | 
						||
            if (doc.log_path) {
 | 
						||
              const str = err ? `\n${err}` : '';
 | 
						||
              fs.appendFileSync(
 | 
						||
                `${doc.log_path}`,
 | 
						||
                `${str}\n## 执行结束...  ${new Date()
 | 
						||
                  .toLocaleString('zh', { hour12: false })
 | 
						||
                  .replace(' 24:', ' 00:')} `,
 | 
						||
              );
 | 
						||
            }
 | 
						||
          }
 | 
						||
          this.cronDb.update(
 | 
						||
            { _id: { $in: ids } },
 | 
						||
            { $set: { status: CrontabStatus.idle }, $unset: { pid: true } },
 | 
						||
            { multi: true },
 | 
						||
          );
 | 
						||
          this.queue.clear();
 | 
						||
          resolve();
 | 
						||
        });
 | 
						||
    });
 | 
						||
  }
 | 
						||
 | 
						||
  private async killTask(name: string) {
 | 
						||
    let taskCommand = `ps -ef | grep "${name}" | grep -v grep | awk '{print $1}'`;
 | 
						||
    const execAsync = promisify(exec);
 | 
						||
    try {
 | 
						||
      let pid = (await execAsync(taskCommand)).stdout;
 | 
						||
      if (pid) {
 | 
						||
        pid = (await execAsync(`pstree -p ${pid}`)).stdout;
 | 
						||
      } else {
 | 
						||
        return;
 | 
						||
      }
 | 
						||
      let pids = pid.match(/\(\d+/g);
 | 
						||
      const killLogs = [];
 | 
						||
      if (pids && pids.length > 0) {
 | 
						||
        // node 执行脚本时还会有10个子进程,但是ps -ef中不存在,所以截取前三个
 | 
						||
        pids = pids.slice(0, 3);
 | 
						||
        for (const id of pids) {
 | 
						||
          const c = `kill -9 ${id.slice(1)}`;
 | 
						||
          const { stdout, stderr } = await execAsync(c);
 | 
						||
          if (stderr) {
 | 
						||
            killLogs.push(stderr);
 | 
						||
          }
 | 
						||
          if (stdout) {
 | 
						||
            killLogs.push(stdout);
 | 
						||
          }
 | 
						||
        }
 | 
						||
      }
 | 
						||
      return killLogs.length > 0 ? JSON.stringify(killLogs) : '';
 | 
						||
    } catch (e) {
 | 
						||
      return JSON.stringify(e);
 | 
						||
    }
 | 
						||
  }
 | 
						||
 | 
						||
  private async runSingle(id: string): Promise<number> {
 | 
						||
    return new Promise(async (resolve: any) => {
 | 
						||
      const cron = await this.get(id);
 | 
						||
      if (cron.status !== CrontabStatus.queued) {
 | 
						||
        resolve();
 | 
						||
        return;
 | 
						||
      }
 | 
						||
 | 
						||
      let { _id, command, log_path } = cron;
 | 
						||
 | 
						||
      this.logger.silly('Running job');
 | 
						||
      this.logger.silly('ID: ' + _id);
 | 
						||
      this.logger.silly('Original command: ' + command);
 | 
						||
 | 
						||
      let cmdStr = command;
 | 
						||
      if (!cmdStr.includes('task ') && !cmdStr.includes('ql ')) {
 | 
						||
        cmdStr = `task ${cmdStr}`;
 | 
						||
      }
 | 
						||
      if (cmdStr.endsWith('.js')) {
 | 
						||
        cmdStr = `${cmdStr} now`;
 | 
						||
      }
 | 
						||
 | 
						||
      const cp = spawn(cmdStr, { shell: '/bin/bash' });
 | 
						||
      this.cronDb.update(
 | 
						||
        { _id },
 | 
						||
        { $set: { status: CrontabStatus.running, pid: cp.pid } },
 | 
						||
      );
 | 
						||
      cp.stderr.on('data', (data) => {
 | 
						||
        if (log_path) {
 | 
						||
          fs.appendFileSync(`${log_path}`, `${data}`);
 | 
						||
        }
 | 
						||
      });
 | 
						||
      cp.on('error', (err) => {
 | 
						||
        if (log_path) {
 | 
						||
          fs.appendFileSync(`${log_path}`, `${JSON.stringify(err)}`);
 | 
						||
        }
 | 
						||
      });
 | 
						||
 | 
						||
      cp.on('exit', (code, signal) => {
 | 
						||
        this.logger.info(
 | 
						||
          `${command} pid: ${cp.pid} exit ${code} signal ${signal}`,
 | 
						||
        );
 | 
						||
        this.cronDb.update(
 | 
						||
          { _id },
 | 
						||
          { $set: { status: CrontabStatus.idle }, $unset: { pid: true } },
 | 
						||
        );
 | 
						||
        resolve();
 | 
						||
      });
 | 
						||
      cp.on('close', (code) => {
 | 
						||
        this.logger.info(`${command} pid: ${cp.pid} closed ${code}`);
 | 
						||
        this.cronDb.update(
 | 
						||
          { _id },
 | 
						||
          { $set: { status: CrontabStatus.idle }, $unset: { pid: true } },
 | 
						||
        );
 | 
						||
        resolve();
 | 
						||
      });
 | 
						||
    });
 | 
						||
  }
 | 
						||
 | 
						||
  public async disabled(ids: string[]) {
 | 
						||
    return new Promise((resolve: any) => {
 | 
						||
      this.cronDb.update(
 | 
						||
        { _id: { $in: ids } },
 | 
						||
        { $set: { isDisabled: 1 } },
 | 
						||
        { multi: true },
 | 
						||
        async (err) => {
 | 
						||
          await this.set_crontab(true);
 | 
						||
          resolve();
 | 
						||
        },
 | 
						||
      );
 | 
						||
    });
 | 
						||
  }
 | 
						||
 | 
						||
  public async enabled(ids: string[]) {
 | 
						||
    return new Promise((resolve: any) => {
 | 
						||
      this.cronDb.update(
 | 
						||
        { _id: { $in: ids } },
 | 
						||
        { $set: { isDisabled: 0 } },
 | 
						||
        { multi: true },
 | 
						||
        async (err) => {
 | 
						||
          await this.set_crontab(true);
 | 
						||
          resolve();
 | 
						||
        },
 | 
						||
      );
 | 
						||
    });
 | 
						||
  }
 | 
						||
 | 
						||
  public async log(_id: string) {
 | 
						||
    const doc = await this.get(_id);
 | 
						||
    if (!doc) {
 | 
						||
      return '';
 | 
						||
    }
 | 
						||
 | 
						||
    if (doc.log_path) {
 | 
						||
      return getFileContentByName(`${doc.log_path}`);
 | 
						||
    }
 | 
						||
    const [, commandStr, url] = doc.command.split(/ +/);
 | 
						||
    let logPath = this.getKey(commandStr);
 | 
						||
    const isQlCommand = doc.command.startsWith('ql ');
 | 
						||
    const key =
 | 
						||
      (url && ['repo', 'raw'].includes(commandStr) && this.getKey(url)) ||
 | 
						||
      logPath;
 | 
						||
    if (isQlCommand) {
 | 
						||
      logPath = 'update';
 | 
						||
    }
 | 
						||
    let logDir = `${config.logPath}${logPath}`;
 | 
						||
    if (existsSync(logDir)) {
 | 
						||
      let files = await promises.readdir(logDir);
 | 
						||
      if (isQlCommand) {
 | 
						||
        files = files.filter((x) => x.includes(key));
 | 
						||
      }
 | 
						||
      return getFileContentByName(`${logDir}/${files[files.length - 1]}`);
 | 
						||
    } else {
 | 
						||
      return '';
 | 
						||
    }
 | 
						||
  }
 | 
						||
 | 
						||
  private getKey(command: string) {
 | 
						||
    const start =
 | 
						||
      command.lastIndexOf('/') !== -1 ? command.lastIndexOf('/') + 1 : 0;
 | 
						||
    const end =
 | 
						||
      command.lastIndexOf('.') !== -1
 | 
						||
        ? command.lastIndexOf('.')
 | 
						||
        : command.length;
 | 
						||
    return command.substring(start, end);
 | 
						||
  }
 | 
						||
 | 
						||
  private make_command(tab: Crontab) {
 | 
						||
    const crontab_job_string = `ID=${tab._id} ${tab.command}`;
 | 
						||
    return crontab_job_string;
 | 
						||
  }
 | 
						||
 | 
						||
  private async set_crontab(needReloadSchedule: boolean = false) {
 | 
						||
    const tabs = await this.crontabs();
 | 
						||
    var crontab_string = '';
 | 
						||
    tabs.forEach((tab) => {
 | 
						||
      const _schedule = tab.schedule && tab.schedule.split(/ +/);
 | 
						||
      if (tab.isDisabled === 1 || _schedule.length !== 5) {
 | 
						||
        crontab_string += '# ';
 | 
						||
        crontab_string += tab.schedule;
 | 
						||
        crontab_string += ' ';
 | 
						||
        crontab_string += this.make_command(tab);
 | 
						||
        crontab_string += '\n';
 | 
						||
      } else {
 | 
						||
        crontab_string += tab.schedule;
 | 
						||
        crontab_string += ' ';
 | 
						||
        crontab_string += this.make_command(tab);
 | 
						||
        crontab_string += '\n';
 | 
						||
      }
 | 
						||
    });
 | 
						||
 | 
						||
    this.logger.silly(crontab_string);
 | 
						||
    fs.writeFileSync(config.crontabFile, crontab_string);
 | 
						||
 | 
						||
    execSync(`crontab ${config.crontabFile}`);
 | 
						||
    if (needReloadSchedule) {
 | 
						||
      exec(`pm2 reload schedule`);
 | 
						||
    }
 | 
						||
    this.cronDb.update({}, { $set: { saved: true } }, { multi: true });
 | 
						||
  }
 | 
						||
 | 
						||
  public import_crontab() {
 | 
						||
    exec('crontab -l', (error, stdout, stderr) => {
 | 
						||
      var lines = stdout.split('\n');
 | 
						||
      var namePrefix = new Date().getTime();
 | 
						||
 | 
						||
      lines.reverse().forEach((line, index) => {
 | 
						||
        line = line.replace(/\t+/g, ' ');
 | 
						||
        var regex =
 | 
						||
          /^((\@[a-zA-Z]+\s+)|(([^\s]+)\s+([^\s]+)\s+([^\s]+)\s+([^\s]+)\s+([^\s]+)\s+))/;
 | 
						||
        var command = line.replace(regex, '').trim();
 | 
						||
        var schedule = line.replace(command, '').trim();
 | 
						||
 | 
						||
        if (
 | 
						||
          command &&
 | 
						||
          schedule &&
 | 
						||
          cron_parser.parseExpression(schedule).hasNext()
 | 
						||
        ) {
 | 
						||
          var name = namePrefix + '_' + index;
 | 
						||
 | 
						||
          this.cronDb.findOne({ command, schedule }, (err, doc) => {
 | 
						||
            if (err) {
 | 
						||
              throw err;
 | 
						||
            }
 | 
						||
            if (!doc) {
 | 
						||
              this.create({ name, command, schedule });
 | 
						||
            } else {
 | 
						||
              doc.command = command;
 | 
						||
              doc.schedule = schedule;
 | 
						||
              this.update(doc);
 | 
						||
            }
 | 
						||
          });
 | 
						||
        }
 | 
						||
      });
 | 
						||
    });
 | 
						||
  }
 | 
						||
 | 
						||
  public autosave_crontab() {
 | 
						||
    return this.set_crontab();
 | 
						||
  }
 | 
						||
}
 |