mirror of
https://github.com/whyour/qinglong.git
synced 2026-07-01 04:40:38 +08:00
依赖管理支持取消安装和状态筛选
This commit is contained in:
@@ -134,4 +134,20 @@ export default (app: Router) => {
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
route.put(
|
||||
'/cancel',
|
||||
celebrate({
|
||||
body: Joi.array().items(Joi.number().required()),
|
||||
}),
|
||||
async (req: Request, res: Response, next: NextFunction) => {
|
||||
try {
|
||||
const dependenceService = Container.get(DependenceService);
|
||||
await dependenceService.cancel(req.body);
|
||||
return res.send({ code: 200 });
|
||||
} catch (e) {
|
||||
return next(e);
|
||||
}
|
||||
},
|
||||
);
|
||||
};
|
||||
|
||||
+2
-2
@@ -435,8 +435,8 @@ export async function killTask(pid: number) {
|
||||
}
|
||||
}
|
||||
|
||||
export async function getPid(name: string) {
|
||||
const taskCommand = `ps -eo pid,command | grep "${name}" | grep -v grep | awk '{print $1}' | head -1 | xargs echo -n`;
|
||||
export async function getPid(cmd: string) {
|
||||
const taskCommand = `ps -eo pid,command | grep "${cmd}" | grep -v grep | awk '{print $1}' | head -1 | xargs echo -n`;
|
||||
const pid = await promiseExec(taskCommand);
|
||||
return pid ? Number(pid) : undefined;
|
||||
}
|
||||
|
||||
@@ -14,7 +14,12 @@ import {
|
||||
import { spawn } from 'cross-spawn';
|
||||
import SockService from './sock';
|
||||
import { FindOptions, Op } from 'sequelize';
|
||||
import { fileExist, promiseExecSuccess } from '../config/util';
|
||||
import {
|
||||
fileExist,
|
||||
getPid,
|
||||
killTask,
|
||||
promiseExecSuccess,
|
||||
} from '../config/util';
|
||||
import dayjs from 'dayjs';
|
||||
import taskLimit from '../shared/pLimit';
|
||||
|
||||
@@ -86,11 +91,21 @@ export default class DependenceService {
|
||||
}
|
||||
|
||||
public async dependencies(
|
||||
{ searchValue, type }: { searchValue: string; type: string },
|
||||
sort: any = { position: -1 },
|
||||
{
|
||||
searchValue,
|
||||
type,
|
||||
status,
|
||||
}: { searchValue: string; type: string; status: string },
|
||||
sort: any = [],
|
||||
query: any = {},
|
||||
): Promise<Dependence[]> {
|
||||
let condition = { ...query, type: DependenceTypes[type as any] };
|
||||
let condition = {
|
||||
...query,
|
||||
type: DependenceTypes[type as any],
|
||||
};
|
||||
if (status) {
|
||||
condition.status = status.split(',').map(Number);
|
||||
}
|
||||
if (searchValue) {
|
||||
const encodeText = encodeURI(searchValue);
|
||||
const reg = {
|
||||
@@ -106,7 +121,7 @@ export default class DependenceService {
|
||||
};
|
||||
}
|
||||
try {
|
||||
const result = await this.find(condition);
|
||||
const result = await this.find(condition, sort);
|
||||
return result as any;
|
||||
} catch (error) {
|
||||
throw error;
|
||||
@@ -134,6 +149,18 @@ export default class DependenceService {
|
||||
return docs;
|
||||
}
|
||||
|
||||
public async cancel(ids: number[]) {
|
||||
const docs = await DependenceModel.findAll({ where: { id: ids } });
|
||||
for (const doc of docs) {
|
||||
taskLimit.removeQueuedDependency(doc);
|
||||
const depRunCommand = InstallDependenceCommandTypes[doc.type];
|
||||
const cmd = `${depRunCommand} ${doc.name.trim()}`;
|
||||
const pid = await getPid(cmd);
|
||||
pid && (await killTask(pid));
|
||||
}
|
||||
await this.removeDb(ids);
|
||||
}
|
||||
|
||||
private async find(query: any, sort: any = []): Promise<Dependence[]> {
|
||||
const docs = await DependenceModel.findAll({
|
||||
where: { ...query },
|
||||
@@ -168,8 +195,14 @@ export default class DependenceService {
|
||||
isInstall: boolean = true,
|
||||
force: boolean = false,
|
||||
) {
|
||||
return taskLimit.runOneByOne(() => {
|
||||
return taskLimit.runDependeny(dependency, () => {
|
||||
return new Promise(async (resolve) => {
|
||||
if (taskLimit.firstDependencyId !== dependency.id) {
|
||||
return resolve(null);
|
||||
}
|
||||
|
||||
taskLimit.removeQueuedDependency(dependency);
|
||||
|
||||
const depIds = [dependency.id!];
|
||||
const status = isInstall
|
||||
? DependenceStatus.installing
|
||||
|
||||
+46
-20
@@ -2,11 +2,19 @@ import PQueue, { QueueAddOptions } from 'p-queue-cjs';
|
||||
import os from 'os';
|
||||
import { AuthDataType, SystemModel } from '../data/system';
|
||||
import Logger from '../loaders/logger';
|
||||
import { Dependence } from '../data/dependence';
|
||||
|
||||
interface IDependencyFn<T> {
|
||||
(): Promise<T>;
|
||||
dependency?: Dependence;
|
||||
}
|
||||
class TaskLimit {
|
||||
private oneLimit = new PQueue({ concurrency: 1 });
|
||||
private dependenyLimit = new PQueue({ concurrency: 1 });
|
||||
private queuedDependencyIds = new Set<number>([]);
|
||||
private updateLogLimit = new PQueue({ concurrency: 1 });
|
||||
private cronLimit = new PQueue({ concurrency: Math.max(os.cpus().length, 4) });
|
||||
private cronLimit = new PQueue({
|
||||
concurrency: Math.max(os.cpus().length, 4),
|
||||
});
|
||||
|
||||
get cronLimitActiveCount() {
|
||||
return this.cronLimit.pending;
|
||||
@@ -16,6 +24,10 @@ class TaskLimit {
|
||||
return this.cronLimit.size;
|
||||
}
|
||||
|
||||
get firstDependencyId() {
|
||||
return [...this.queuedDependencyIds.values()][0];
|
||||
}
|
||||
|
||||
constructor() {
|
||||
this.setCustomLimit();
|
||||
this.handleEvents();
|
||||
@@ -26,21 +38,19 @@ class TaskLimit {
|
||||
Logger.info(
|
||||
`[schedule][任务加入队列] 运行中任务数: ${this.cronLimitActiveCount}, 等待中任务数: ${this.cronLimitPendingCount}`,
|
||||
);
|
||||
})
|
||||
});
|
||||
this.cronLimit.on('active', () => {
|
||||
Logger.info(
|
||||
`[schedule][开始处理任务] 运行中任务数: ${this.cronLimitActiveCount + 1}, 等待中任务数: ${this.cronLimitPendingCount}`,
|
||||
);
|
||||
})
|
||||
this.cronLimit.on('completed', (param) => {
|
||||
Logger.info(
|
||||
`[schedule][任务处理成功] 参数 ${JSON.stringify(param)}`,
|
||||
`[schedule][开始处理任务] 运行中任务数: ${
|
||||
this.cronLimitActiveCount + 1
|
||||
}, 等待中任务数: ${this.cronLimitPendingCount}`,
|
||||
);
|
||||
});
|
||||
this.cronLimit.on('error', error => {
|
||||
Logger.error(
|
||||
`[schedule][任务处理错误] 参数 ${JSON.stringify(error)}`,
|
||||
);
|
||||
this.cronLimit.on('completed', (param) => {
|
||||
Logger.info(`[schedule][任务处理成功] 参数 ${JSON.stringify(param)}`);
|
||||
});
|
||||
this.cronLimit.on('error', (error) => {
|
||||
Logger.error(`[schedule][任务处理错误] 参数 ${JSON.stringify(error)}`);
|
||||
});
|
||||
this.cronLimit.on('next', () => {
|
||||
Logger.info(
|
||||
@@ -48,12 +58,16 @@ class TaskLimit {
|
||||
);
|
||||
});
|
||||
this.cronLimit.on('idle', () => {
|
||||
Logger.info(
|
||||
`[schedule][任务队列] 空闲中...`,
|
||||
);
|
||||
Logger.info(`[schedule][任务队列] 空闲中...`);
|
||||
});
|
||||
}
|
||||
|
||||
public removeQueuedDependency(dependency: Dependence) {
|
||||
if (this.queuedDependencyIds.has(dependency.id!)) {
|
||||
this.queuedDependencyIds.delete(dependency.id!);
|
||||
}
|
||||
}
|
||||
|
||||
public async setCustomLimit(limit?: number) {
|
||||
if (limit) {
|
||||
this.cronLimit.concurrency = limit;
|
||||
@@ -68,15 +82,27 @@ class TaskLimit {
|
||||
}
|
||||
}
|
||||
|
||||
public async runWithCronLimit<T>(fn: () => Promise<T>, options?: Partial<QueueAddOptions>): Promise<T | void> {
|
||||
public async runWithCronLimit<T>(
|
||||
fn: () => Promise<T>,
|
||||
options?: Partial<QueueAddOptions>,
|
||||
): Promise<T | void> {
|
||||
return this.cronLimit.add(fn, options);
|
||||
}
|
||||
|
||||
public runOneByOne<T>(fn: () => Promise<T>, options?: Partial<QueueAddOptions>): Promise<T | void> {
|
||||
return this.oneLimit.add(fn, options);
|
||||
public runDependeny<T>(
|
||||
dependency: Dependence,
|
||||
fn: IDependencyFn<T>,
|
||||
options?: Partial<QueueAddOptions>,
|
||||
): Promise<T | void> {
|
||||
this.queuedDependencyIds.add(dependency.id!);
|
||||
fn.dependency = dependency;
|
||||
return this.dependenyLimit.add(fn, options);
|
||||
}
|
||||
|
||||
public updateDepLog<T>(fn: () => Promise<T>, options?: Partial<QueueAddOptions>): Promise<T | void> {
|
||||
public updateDepLog<T>(
|
||||
fn: () => Promise<T>,
|
||||
options?: Partial<QueueAddOptions>,
|
||||
): Promise<T | void> {
|
||||
return this.updateLogLimit.add(fn, options);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user