重构六位定时任务服务

This commit is contained in:
whyour
2023-04-08 17:07:06 +08:00
parent 59c26d90d3
commit 6fb39ce835
13 changed files with 854 additions and 68 deletions
+22
View File
@@ -0,0 +1,22 @@
syntax = "proto3";
package com.ql.cron;
service CronService {
rpc addCron(AddCronRequest) returns (AddCronResponse);
rpc delCron(DeleteCronRequest) returns (DeleteCronResponse);
}
message Cron {
string id = 1;
string schedule = 2;
string command = 3;
}
message AddCronRequest { repeated Cron crons = 1; }
message AddCronResponse {}
message DeleteCronRequest { repeated string ids = 1; }
message DeleteCronResponse {}
+482
View File
@@ -0,0 +1,482 @@
/* eslint-disable */
import {
CallOptions,
ChannelCredentials,
Client,
ClientOptions,
ClientUnaryCall,
handleUnaryCall,
makeGenericClientConstructor,
Metadata,
ServiceError,
UntypedServiceImplementation,
} from '@grpc/grpc-js';
import _m0 from 'protobufjs/minimal';
export const protobufPackage = 'com.ql.cron';
export interface Cron {
id: string;
schedule: string;
command: string;
}
export interface AddCronRequest {
crons: Cron[];
}
export interface AddCronResponse {}
export interface DeleteCronRequest {
ids: string[];
}
export interface DeleteCronResponse {}
function createBaseCron(): Cron {
return { id: '', schedule: '', command: '' };
}
export const Cron = {
encode(message: Cron, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.id !== '') {
writer.uint32(10).string(message.id);
}
if (message.schedule !== '') {
writer.uint32(18).string(message.schedule);
}
if (message.command !== '') {
writer.uint32(26).string(message.command);
}
return writer;
},
decode(input: _m0.Reader | Uint8Array, length?: number): Cron {
const reader =
input instanceof _m0.Reader ? input : _m0.Reader.create(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseCron();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag != 10) {
break;
}
message.id = reader.string();
continue;
case 2:
if (tag != 18) {
break;
}
message.schedule = reader.string();
continue;
case 3:
if (tag != 26) {
break;
}
message.command = reader.string();
continue;
}
if ((tag & 7) == 4 || tag == 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},
fromJSON(object: any): Cron {
return {
id: isSet(object.id) ? String(object.id) : '',
schedule: isSet(object.schedule) ? String(object.schedule) : '',
command: isSet(object.command) ? String(object.command) : '',
};
},
toJSON(message: Cron): unknown {
const obj: any = {};
message.id !== undefined && (obj.id = message.id);
message.schedule !== undefined && (obj.schedule = message.schedule);
message.command !== undefined && (obj.command = message.command);
return obj;
},
create<I extends Exact<DeepPartial<Cron>, I>>(base?: I): Cron {
return Cron.fromPartial(base ?? {});
},
fromPartial<I extends Exact<DeepPartial<Cron>, I>>(object: I): Cron {
const message = createBaseCron();
message.id = object.id ?? '';
message.schedule = object.schedule ?? '';
message.command = object.command ?? '';
return message;
},
};
function createBaseAddCronRequest(): AddCronRequest {
return { crons: [] };
}
export const AddCronRequest = {
encode(
message: AddCronRequest,
writer: _m0.Writer = _m0.Writer.create(),
): _m0.Writer {
for (const v of message.crons) {
Cron.encode(v!, writer.uint32(10).fork()).ldelim();
}
return writer;
},
decode(input: _m0.Reader | Uint8Array, length?: number): AddCronRequest {
const reader =
input instanceof _m0.Reader ? input : _m0.Reader.create(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseAddCronRequest();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag != 10) {
break;
}
message.crons.push(Cron.decode(reader, reader.uint32()));
continue;
}
if ((tag & 7) == 4 || tag == 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},
fromJSON(object: any): AddCronRequest {
return {
crons: Array.isArray(object?.crons)
? object.crons.map((e: any) => Cron.fromJSON(e))
: [],
};
},
toJSON(message: AddCronRequest): unknown {
const obj: any = {};
if (message.crons) {
obj.crons = message.crons.map((e) => (e ? Cron.toJSON(e) : undefined));
} else {
obj.crons = [];
}
return obj;
},
create<I extends Exact<DeepPartial<AddCronRequest>, I>>(
base?: I,
): AddCronRequest {
return AddCronRequest.fromPartial(base ?? {});
},
fromPartial<I extends Exact<DeepPartial<AddCronRequest>, I>>(
object: I,
): AddCronRequest {
const message = createBaseAddCronRequest();
message.crons = object.crons?.map((e) => Cron.fromPartial(e)) || [];
return message;
},
};
function createBaseAddCronResponse(): AddCronResponse {
return {};
}
export const AddCronResponse = {
encode(
_: AddCronResponse,
writer: _m0.Writer = _m0.Writer.create(),
): _m0.Writer {
return writer;
},
decode(input: _m0.Reader | Uint8Array, length?: number): AddCronResponse {
const reader =
input instanceof _m0.Reader ? input : _m0.Reader.create(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseAddCronResponse();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
}
if ((tag & 7) == 4 || tag == 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},
fromJSON(_: any): AddCronResponse {
return {};
},
toJSON(_: AddCronResponse): unknown {
const obj: any = {};
return obj;
},
create<I extends Exact<DeepPartial<AddCronResponse>, I>>(
base?: I,
): AddCronResponse {
return AddCronResponse.fromPartial(base ?? {});
},
fromPartial<I extends Exact<DeepPartial<AddCronResponse>, I>>(
_: I,
): AddCronResponse {
const message = createBaseAddCronResponse();
return message;
},
};
function createBaseDeleteCronRequest(): DeleteCronRequest {
return { ids: [] };
}
export const DeleteCronRequest = {
encode(
message: DeleteCronRequest,
writer: _m0.Writer = _m0.Writer.create(),
): _m0.Writer {
for (const v of message.ids) {
writer.uint32(10).string(v!);
}
return writer;
},
decode(input: _m0.Reader | Uint8Array, length?: number): DeleteCronRequest {
const reader =
input instanceof _m0.Reader ? input : _m0.Reader.create(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseDeleteCronRequest();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag != 10) {
break;
}
message.ids.push(reader.string());
continue;
}
if ((tag & 7) == 4 || tag == 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},
fromJSON(object: any): DeleteCronRequest {
return {
ids: Array.isArray(object?.ids)
? object.ids.map((e: any) => String(e))
: [],
};
},
toJSON(message: DeleteCronRequest): unknown {
const obj: any = {};
if (message.ids) {
obj.ids = message.ids.map((e) => e);
} else {
obj.ids = [];
}
return obj;
},
create<I extends Exact<DeepPartial<DeleteCronRequest>, I>>(
base?: I,
): DeleteCronRequest {
return DeleteCronRequest.fromPartial(base ?? {});
},
fromPartial<I extends Exact<DeepPartial<DeleteCronRequest>, I>>(
object: I,
): DeleteCronRequest {
const message = createBaseDeleteCronRequest();
message.ids = object.ids?.map((e) => e) || [];
return message;
},
};
function createBaseDeleteCronResponse(): DeleteCronResponse {
return {};
}
export const DeleteCronResponse = {
encode(
_: DeleteCronResponse,
writer: _m0.Writer = _m0.Writer.create(),
): _m0.Writer {
return writer;
},
decode(input: _m0.Reader | Uint8Array, length?: number): DeleteCronResponse {
const reader =
input instanceof _m0.Reader ? input : _m0.Reader.create(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseDeleteCronResponse();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
}
if ((tag & 7) == 4 || tag == 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},
fromJSON(_: any): DeleteCronResponse {
return {};
},
toJSON(_: DeleteCronResponse): unknown {
const obj: any = {};
return obj;
},
create<I extends Exact<DeepPartial<DeleteCronResponse>, I>>(
base?: I,
): DeleteCronResponse {
return DeleteCronResponse.fromPartial(base ?? {});
},
fromPartial<I extends Exact<DeepPartial<DeleteCronResponse>, I>>(
_: I,
): DeleteCronResponse {
const message = createBaseDeleteCronResponse();
return message;
},
};
export type CronServiceService = typeof CronServiceService;
export const CronServiceService = {
addCron: {
path: '/com.ql.cron.CronService/addCron',
requestStream: false,
responseStream: false,
requestSerialize: (value: AddCronRequest) =>
Buffer.from(AddCronRequest.encode(value).finish()),
requestDeserialize: (value: Buffer) => AddCronRequest.decode(value),
responseSerialize: (value: AddCronResponse) =>
Buffer.from(AddCronResponse.encode(value).finish()),
responseDeserialize: (value: Buffer) => AddCronResponse.decode(value),
},
delCron: {
path: '/com.ql.cron.CronService/delCron',
requestStream: false,
responseStream: false,
requestSerialize: (value: DeleteCronRequest) =>
Buffer.from(DeleteCronRequest.encode(value).finish()),
requestDeserialize: (value: Buffer) => DeleteCronRequest.decode(value),
responseSerialize: (value: DeleteCronResponse) =>
Buffer.from(DeleteCronResponse.encode(value).finish()),
responseDeserialize: (value: Buffer) => DeleteCronResponse.decode(value),
},
} as const;
export interface CronServiceServer extends UntypedServiceImplementation {
addCron: handleUnaryCall<AddCronRequest, AddCronResponse>;
delCron: handleUnaryCall<DeleteCronRequest, DeleteCronResponse>;
}
export interface CronServiceClient extends Client {
addCron(
request: AddCronRequest,
callback: (error: ServiceError | null, response: AddCronResponse) => void,
): ClientUnaryCall;
addCron(
request: AddCronRequest,
metadata: Metadata,
callback: (error: ServiceError | null, response: AddCronResponse) => void,
): ClientUnaryCall;
addCron(
request: AddCronRequest,
metadata: Metadata,
options: Partial<CallOptions>,
callback: (error: ServiceError | null, response: AddCronResponse) => void,
): ClientUnaryCall;
delCron(
request: DeleteCronRequest,
callback: (
error: ServiceError | null,
response: DeleteCronResponse,
) => void,
): ClientUnaryCall;
delCron(
request: DeleteCronRequest,
metadata: Metadata,
callback: (
error: ServiceError | null,
response: DeleteCronResponse,
) => void,
): ClientUnaryCall;
delCron(
request: DeleteCronRequest,
metadata: Metadata,
options: Partial<CallOptions>,
callback: (
error: ServiceError | null,
response: DeleteCronResponse,
) => void,
): ClientUnaryCall;
}
export const CronServiceClient = makeGenericClientConstructor(
CronServiceService,
'com.ql.cron.CronService',
) as unknown as {
new (
address: string,
credentials: ChannelCredentials,
options?: Partial<ClientOptions>,
): CronServiceClient;
service: typeof CronServiceService;
};
type Builtin =
| Date
| Function
| Uint8Array
| string
| number
| boolean
| undefined;
export type DeepPartial<T> = T extends Builtin
? T
: T extends Array<infer U>
? Array<DeepPartial<U>>
: T extends ReadonlyArray<infer U>
? ReadonlyArray<DeepPartial<U>>
: T extends {}
? { [K in keyof T]?: DeepPartial<T[K]> }
: Partial<T>;
type KeysOfUnion<T> = T extends T ? keyof T : never;
export type Exact<P, I extends P> = P extends Builtin
? P
: P & { [K in keyof P]: Exact<P[K], I[K]> } & {
[K in Exclude<keyof I, KeysOfUnion<P>>]: never;
};
function isSet(value: any): boolean {
return value !== null && value !== undefined;
}
-56
View File
@@ -1,56 +0,0 @@
import schedule from 'node-schedule';
import express from 'express';
import { exec } from 'child_process';
import Logger from './loaders/logger';
import { CrontabModel, CrontabStatus } from './data/cron';
import config from './config';
import { QL_PREFIX, TASK_PREFIX } from './config/const';
const app = express();
const run = async () => {
CrontabModel.findAll({ where: {} })
.then((docs) => {
if (docs && docs.length > 0) {
for (let i = 0; i < docs.length; i++) {
const task = docs[i];
const _schedule = task.schedule && task.schedule.split(/ +/);
if (
_schedule &&
_schedule.length > 5 &&
task.status !== CrontabStatus.disabled &&
!task.isDisabled &&
task.schedule
) {
schedule.scheduleJob(task.schedule, function () {
let command = task.command as string;
if (
!command.startsWith(TASK_PREFIX) &&
!command.startsWith(QL_PREFIX)
) {
command = `${TASK_PREFIX}${command}`;
}
exec(`ID=${task.id} ${command}`);
});
}
}
}
})
.catch((err) => {
Logger.error(err);
process.exit(1);
});
};
app
.listen(config.cronPort, async () => {
await require('./loaders/sentry').default({ expressApp: app });
await require('./loaders/db').default();
await run();
Logger.debug('定时任务服务启动成功!');
})
.on('error', (err) => {
Logger.error(err);
process.exit(1);
});
+24
View File
@@ -0,0 +1,24 @@
import { ServerUnaryCall, sendUnaryData } from '@grpc/grpc-js';
import { AddCronRequest, AddCronResponse } from '../protos/cron';
import nodeSchedule from 'node-schedule';
import { scheduleStacks } from './data';
import { exec } from 'child_process';
const addCron = (
call: ServerUnaryCall<AddCronRequest, AddCronResponse>,
callback: sendUnaryData<AddCronResponse>,
) => {
for (const item of call.request.crons) {
const { id, schedule, command } = item;
scheduleStacks.set(
id,
nodeSchedule.scheduleJob(id, schedule, async () => {
exec(`ID=${id} ${command}`);
}),
);
}
callback(null, null);
};
export { addCron };
+40
View File
@@ -0,0 +1,40 @@
import { credentials } from '@grpc/grpc-js';
import {
AddCronRequest,
AddCronResponse,
CronServiceClient,
DeleteCronRequest,
DeleteCronResponse,
} from '../protos/cron';
import config from '../config';
class Client {
private client = new CronServiceClient(
`localhost:${config.cronPort}`,
credentials.createInsecure(),
);
addCron(request: AddCronRequest['crons']): Promise<AddCronResponse> {
return new Promise((resolve, reject) => {
this.client.addCron({ crons: request }, (err, res) => {
if (err) {
reject(err);
}
resolve(res);
});
});
}
delCron(request: DeleteCronRequest['ids']): Promise<DeleteCronResponse> {
return new Promise((resolve, reject) => {
this.client.delCron({ ids: request }, (err, res) => {
if (err) {
reject(err);
}
resolve(res);
});
});
}
}
export default new Client();
+6
View File
@@ -0,0 +1,6 @@
import nodeSchedule from 'node-schedule';
import { ToadScheduler } from 'toad-scheduler';
export const scheduleStacks = new Map<string, nodeSchedule.Job>();
export const intervalSchedule = new ToadScheduler();
+19
View File
@@ -0,0 +1,19 @@
import { ServerUnaryCall, sendUnaryData } from '@grpc/grpc-js';
import { DeleteCronRequest, DeleteCronResponse } from '../protos/cron';
import { scheduleStacks } from './data';
const delCron = (
call: ServerUnaryCall<DeleteCronRequest, DeleteCronResponse>,
callback: sendUnaryData<DeleteCronResponse>,
) => {
for (const id of call.request.ids) {
if (scheduleStacks.has(id)) {
scheduleStacks.get(id)?.cancel();
scheduleStacks.delete(id);
}
}
callback(null, null);
};
export { delCron };
+15
View File
@@ -0,0 +1,15 @@
import { Server, ServerCredentials } from '@grpc/grpc-js';
import { CronServiceService } from '../protos/cron';
import { addCron } from './addCron';
import { delCron } from './delCron';
import config from '../config';
const server = new Server();
server.addService(CronServiceService, { addCron, delCron });
server.bindAsync(
`localhost:${config.cronPort}`,
ServerCredentials.createInsecure(),
() => {
server.start();
},
);
+45 -7
View File
@@ -15,10 +15,11 @@ import { promises, existsSync } from 'fs';
import { Op, where, col as colFn, FindOptions } from 'sequelize';
import path from 'path';
import { TASK_PREFIX, QL_PREFIX } from '../config/const';
import cronClient from '../schedule/client';
@Service()
export default class CronService {
constructor(@Inject('logger') private logger: winston.Logger) { }
constructor(@Inject('logger') private logger: winston.Logger) {}
private isSixCron(cron: Crontab) {
const { schedule } = cron;
@@ -32,6 +33,11 @@ export default class CronService {
const tab = new Crontab(payload);
tab.saved = false;
const doc = await this.insert(tab);
if (this.isSixCron(doc)) {
await cronClient.addCron([
{ id: String(doc.id), schedule: doc.schedule!, command: doc.command },
]);
}
await this.set_crontab();
return doc;
}
@@ -41,10 +47,22 @@ export default class CronService {
}
public async update(payload: Crontab): Promise<Crontab> {
const doc = await this.getDb({ id: payload.id })
const doc = await this.getDb({ id: payload.id });
const tab = new Crontab({ ...doc, ...payload });
tab.saved = false;
const newDoc = await this.updateDb(tab);
if (this.isSixCron(doc)) {
await cronClient.delCron([String(newDoc.id)]);
}
if (this.isSixCron(newDoc)) {
await cronClient.addCron([
{
id: String(newDoc.id),
schedule: newDoc.schedule!,
command: newDoc.command,
},
]);
}
await this.set_crontab();
return newDoc;
}
@@ -84,6 +102,7 @@ export default class CronService {
public async remove(ids: number[]) {
await CrontabModel.destroy({ where: { id: ids } });
await cronClient.delCron(ids.map(String));
await this.set_crontab();
}
@@ -431,11 +450,21 @@ export default class CronService {
public async disabled(ids: number[]) {
await CrontabModel.update({ isDisabled: 1 }, { where: { id: ids } });
await cronClient.delCron(ids.map(String));
await this.set_crontab();
}
public async enabled(ids: number[]) {
await CrontabModel.update({ isDisabled: 0 }, { where: { id: ids } });
const docs = await CrontabModel.findAll({ where: { id: ids } });
const sixCron = docs
.filter((x) => this.isSixCron(x))
.map((doc) => ({
id: String(doc.id),
schedule: doc.schedule!,
command: doc.command,
}));
await cronClient.addCron(sixCron);
await this.set_crontab();
}
@@ -487,8 +516,8 @@ export default class CronService {
return crontab_job_string;
}
private async set_crontab() {
const tabs = await this.crontabs();
private async set_crontab(data?: { data: Crontab[]; total: number }) {
const tabs = data ?? (await this.crontabs());
var crontab_string = '';
tabs.data.forEach((tab) => {
const _schedule = tab.schedule && tab.schedule.split(/ +/);
@@ -510,7 +539,6 @@ export default class CronService {
fs.writeFileSync(config.crontabFile, crontab_string);
execSync(`crontab ${config.crontabFile}`);
exec(`pm2 reload schedule`);
await CrontabModel.update({ saved: true }, { where: {} });
}
@@ -548,7 +576,17 @@ export default class CronService {
});
}
public autosave_crontab() {
return this.set_crontab();
public async autosave_crontab() {
const tabs = await this.crontabs();
this.set_crontab(tabs);
const sixCron = tabs.data
.filter((x) => this.isSixCron(x))
.map((doc) => ({
id: String(doc.id),
schedule: doc.schedule!,
command: doc.command,
}));
await cronClient.addCron(sixCron);
}
}
+4 -1
View File
@@ -138,7 +138,10 @@ export default class ScheduleService {
async cancelCronTask({ id = 0, name }: ScheduleTaskType) {
const _id = this.formatId(id);
this.logger.info('[取消定时任务],任务名:%s', name);
this.scheduleStacks.has(_id) && this.scheduleStacks.get(_id)?.cancel();
if (this.scheduleStacks.has(_id)) {
this.scheduleStacks.get(_id)?.cancel();
this.scheduleStacks.delete(_id);
}
}
async createIntervalTask(