Optimize log file writes using stream pooling (#2835)

* Initial plan

* Implement LogStreamManager for optimized log writing

Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>

* Fix error handler in LogStreamManager to avoid race conditions

Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>
This commit is contained in:
Copilot 2025-11-16 21:11:10 +08:00 committed by GitHub
parent fbeb4f4a6c
commit 08ef509e27
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 132 additions and 12 deletions

View File

@ -14,6 +14,7 @@ import {
} from '../config/util'; } from '../config/util';
import dayjs from 'dayjs'; import dayjs from 'dayjs';
import multer from 'multer'; import multer from 'multer';
import { logStreamManager } from '../shared/logStreamManager';
const route = Router(); const route = Router();
const storage = multer.diskStorage({ const storage = multer.diskStorage({
@ -276,17 +277,19 @@ export default (app: Router) => {
res.setHeader('QL-Task-Log', `${logPath}`); res.setHeader('QL-Task-Log', `${logPath}`);
}, },
onEnd: async (cp, endTime, diff) => { onEnd: async (cp, endTime, diff) => {
// Close the stream after task completion
await logStreamManager.closeStream(await handleLogPath(logPath));
res.end(); res.end();
}, },
onError: async (message: string) => { onError: async (message: string) => {
res.write(message); res.write(message);
const absolutePath = await handleLogPath(logPath); const absolutePath = await handleLogPath(logPath);
await fs.appendFile(absolutePath, message); await logStreamManager.write(absolutePath, message);
}, },
onLog: async (message: string) => { onLog: async (message: string) => {
res.write(message); res.write(message);
const absolutePath = await handleLogPath(logPath); const absolutePath = await handleLogPath(logPath);
await fs.appendFile(absolutePath, message); await logStreamManager.write(absolutePath, message);
}, },
}, },
); );

View File

@ -24,6 +24,7 @@ import pickBy from 'lodash/pickBy';
import omit from 'lodash/omit'; import omit from 'lodash/omit';
import { writeFileWithLock } from '../shared/utils'; import { writeFileWithLock } from '../shared/utils';
import { ScheduleType } from '../interface/schedule'; import { ScheduleType } from '../interface/schedule';
import { logStreamManager } from '../shared/logStreamManager';
@Service() @Service()
export default class CronService { export default class CronService {
@ -516,7 +517,7 @@ export default class CronService {
{ where: { id } }, { where: { id } },
); );
cp.stdout.on('data', async (data) => { cp.stdout.on('data', async (data) => {
await fs.appendFile(absolutePath, data.toString()); await logStreamManager.write(absolutePath, data.toString());
}); });
cp.stderr.on('data', async (data) => { cp.stderr.on('data', async (data) => {
this.logger.info( this.logger.info(
@ -524,7 +525,7 @@ export default class CronService {
command, command,
data.toString(), data.toString(),
); );
await fs.appendFile(absolutePath, data.toString()); await logStreamManager.write(absolutePath, data.toString());
}); });
cp.on('error', async (err) => { cp.on('error', async (err) => {
this.logger.error( this.logger.error(
@ -532,7 +533,7 @@ export default class CronService {
command, command,
err, err,
); );
await fs.appendFile(absolutePath, JSON.stringify(err)); await logStreamManager.write(absolutePath, JSON.stringify(err));
}); });
cp.on('exit', async (code) => { cp.on('exit', async (code) => {
@ -541,6 +542,8 @@ export default class CronService {
JSON.stringify(params), JSON.stringify(params),
code, code,
); );
// Close the stream after task completion
await logStreamManager.closeStream(absolutePath);
await CrontabModel.update( await CrontabModel.update(
{ status: CrontabStatus.idle, pid: undefined }, { status: CrontabStatus.idle, pid: undefined },
{ where: { id } }, { where: { id } },

View File

@ -31,6 +31,7 @@ import { formatCommand, formatUrl } from '../config/subscription';
import { CrontabModel } from '../data/cron'; import { CrontabModel } from '../data/cron';
import CrontabService from './cron'; import CrontabService from './cron';
import taskLimit from '../shared/pLimit'; import taskLimit from '../shared/pLimit';
import { logStreamManager } from '../shared/logStreamManager';
@Service() @Service()
export default class SubscriptionService { export default class SubscriptionService {
@ -136,7 +137,7 @@ export default class SubscriptionService {
let beforeStr = ''; let beforeStr = '';
try { try {
if (doc.sub_before) { if (doc.sub_before) {
await fs.appendFile(absolutePath, `\n## 执行before命令...\n\n`); await logStreamManager.write(absolutePath, `\n## 执行before命令...\n\n`);
beforeStr = await promiseExec(doc.sub_before); beforeStr = await promiseExec(doc.sub_before);
} }
} catch (error: any) { } catch (error: any) {
@ -144,7 +145,7 @@ export default class SubscriptionService {
(error.stderr && error.stderr.toString()) || JSON.stringify(error); (error.stderr && error.stderr.toString()) || JSON.stringify(error);
} }
if (beforeStr) { if (beforeStr) {
await fs.appendFile(absolutePath, `${beforeStr}\n`); await logStreamManager.write(absolutePath, `${beforeStr}\n`);
} }
}, },
onStart: async (cp: ChildProcessWithoutNullStreams, startTime) => { onStart: async (cp: ChildProcessWithoutNullStreams, startTime) => {
@ -163,7 +164,7 @@ export default class SubscriptionService {
let afterStr = ''; let afterStr = '';
try { try {
if (sub.sub_after) { if (sub.sub_after) {
await fs.appendFile(absolutePath, `\n\n## 执行after命令...\n\n`); await logStreamManager.write(absolutePath, `\n\n## 执行after命令...\n\n`);
afterStr = await promiseExec(sub.sub_after); afterStr = await promiseExec(sub.sub_after);
} }
} catch (error: any) { } catch (error: any) {
@ -171,16 +172,19 @@ export default class SubscriptionService {
(error.stderr && error.stderr.toString()) || JSON.stringify(error); (error.stderr && error.stderr.toString()) || JSON.stringify(error);
} }
if (afterStr) { if (afterStr) {
await fs.appendFile(absolutePath, `${afterStr}\n`); await logStreamManager.write(absolutePath, `${afterStr}\n`);
} }
await fs.appendFile( await logStreamManager.write(
absolutePath, absolutePath,
`\n## 执行结束... ${endTime.format( `\n## 执行结束... ${endTime.format(
'YYYY-MM-DD HH:mm:ss', 'YYYY-MM-DD HH:mm:ss',
)} ${diff} ${LOG_END_SYMBOL}`, )} ${diff} ${LOG_END_SYMBOL}`,
); );
// Close the stream after task completion
await logStreamManager.closeStream(absolutePath);
await SubscriptionModel.update( await SubscriptionModel.update(
{ status: SubscriptionStatus.idle, pid: undefined }, { status: SubscriptionStatus.idle, pid: undefined },
{ where: { id: sub.id } }, { where: { id: sub.id } },
@ -195,12 +199,12 @@ export default class SubscriptionService {
onError: async (message: string) => { onError: async (message: string) => {
const sub = await this.getDb({ id: doc.id }); const sub = await this.getDb({ id: doc.id });
const absolutePath = await handleLogPath(sub.log_path as string); const absolutePath = await handleLogPath(sub.log_path as string);
await fs.appendFile(absolutePath, `\n${message}`); await logStreamManager.write(absolutePath, `\n${message}`);
}, },
onLog: async (message: string) => { onLog: async (message: string) => {
const sub = await this.getDb({ id: doc.id }); const sub = await this.getDb({ id: doc.id });
const absolutePath = await handleLogPath(sub.log_path as string); const absolutePath = await handleLogPath(sub.log_path as string);
await fs.appendFile(absolutePath, `\n${message}`); await logStreamManager.write(absolutePath, `\n${message}`);
}, },
}; };
} }

View File

@ -0,0 +1,110 @@
import { createWriteStream, WriteStream } from 'fs';
import { EventEmitter } from 'events';
/**
* Manages write streams for log files to improve performance by avoiding repeated file opens
*/
export class LogStreamManager extends EventEmitter {
private streams: Map<string, WriteStream> = new Map();
private pendingWrites: Map<string, Promise<void>> = new Map();
/**
* Write data to a log file using a managed stream
* @param filePath - Absolute path to the log file
* @param data - Data to write to the log file
*/
async write(filePath: string, data: string): Promise<void> {
// Wait for any pending writes to this file to complete
const pending = this.pendingWrites.get(filePath);
if (pending) {
await pending;
}
// Create a new promise for this write operation
const writePromise = new Promise<void>((resolve, reject) => {
let stream = this.streams.get(filePath);
if (!stream) {
// Create a new write stream if one doesn't exist
stream = createWriteStream(filePath, { flags: 'a' });
this.streams.set(filePath, stream);
// Handle stream errors
stream.on('error', (error) => {
this.emit('error', { filePath, error });
// Remove the stream from the map on error
this.streams.delete(filePath);
reject(error);
});
}
// Write the data
const canContinue = stream.write(data, 'utf8', (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
// Handle backpressure
if (!canContinue) {
stream.once('drain', () => {
// Stream is ready for more data
});
}
});
this.pendingWrites.set(filePath, writePromise);
try {
await writePromise;
} finally {
this.pendingWrites.delete(filePath);
}
}
/**
* Close the stream for a specific file path
* @param filePath - Absolute path to the log file
*/
async closeStream(filePath: string): Promise<void> {
// Wait for any pending writes to complete
const pending = this.pendingWrites.get(filePath);
if (pending) {
await pending.catch(() => {
// Ignore errors on pending writes during close
});
}
const stream = this.streams.get(filePath);
if (stream) {
return new Promise<void>((resolve) => {
stream.end(() => {
this.streams.delete(filePath);
resolve();
});
});
}
}
/**
* Close all open streams
*/
async closeAll(): Promise<void> {
const closePromises = Array.from(this.streams.keys()).map((filePath) =>
this.closeStream(filePath),
);
await Promise.all(closePromises);
}
/**
* Get the number of open streams
*/
getOpenStreamCount(): number {
return this.streams.size;
}
}
// Export a singleton instance for shared use
export const logStreamManager = new LogStreamManager();