From 08ef509e27c975bd4b1583c91ca32214d122c5d4 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Sun, 16 Nov 2025 21:11:10 +0800 Subject: [PATCH] Optimize log file writes using stream pooling (#2835) * Initial plan * Implement LogStreamManager for optimized log writing Co-authored-by: whyour <22700758+whyour@users.noreply.github.com> * Fix error handler in LogStreamManager to avoid race conditions Co-authored-by: whyour <22700758+whyour@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: whyour <22700758+whyour@users.noreply.github.com> --- back/api/system.ts | 7 +- back/services/cron.ts | 9 ++- back/services/subscription.ts | 18 ++++-- back/shared/logStreamManager.ts | 110 ++++++++++++++++++++++++++++++++ 4 files changed, 132 insertions(+), 12 deletions(-) create mode 100644 back/shared/logStreamManager.ts diff --git a/back/api/system.ts b/back/api/system.ts index 8074e6ff..30742a52 100644 --- a/back/api/system.ts +++ b/back/api/system.ts @@ -14,6 +14,7 @@ import { } from '../config/util'; import dayjs from 'dayjs'; import multer from 'multer'; +import { logStreamManager } from '../shared/logStreamManager'; const route = Router(); const storage = multer.diskStorage({ @@ -276,17 +277,19 @@ export default (app: Router) => { res.setHeader('QL-Task-Log', `${logPath}`); }, onEnd: async (cp, endTime, diff) => { + // Close the stream after task completion + await logStreamManager.closeStream(await handleLogPath(logPath)); res.end(); }, onError: async (message: string) => { res.write(message); const absolutePath = await handleLogPath(logPath); - await fs.appendFile(absolutePath, message); + await logStreamManager.write(absolutePath, message); }, onLog: async (message: string) => { res.write(message); const absolutePath = await handleLogPath(logPath); - await fs.appendFile(absolutePath, message); + await logStreamManager.write(absolutePath, message); }, }, ); diff --git a/back/services/cron.ts b/back/services/cron.ts index 58f7cb7b..cd2975bd 100644 --- a/back/services/cron.ts +++ b/back/services/cron.ts @@ -24,6 +24,7 @@ import pickBy from 'lodash/pickBy'; import omit from 'lodash/omit'; import { writeFileWithLock } from '../shared/utils'; import { ScheduleType } from '../interface/schedule'; +import { logStreamManager } from '../shared/logStreamManager'; @Service() export default class CronService { @@ -516,7 +517,7 @@ export default class CronService { { where: { id } }, ); cp.stdout.on('data', async (data) => { - await fs.appendFile(absolutePath, data.toString()); + await logStreamManager.write(absolutePath, data.toString()); }); cp.stderr.on('data', async (data) => { this.logger.info( @@ -524,7 +525,7 @@ export default class CronService { command, data.toString(), ); - await fs.appendFile(absolutePath, data.toString()); + await logStreamManager.write(absolutePath, data.toString()); }); cp.on('error', async (err) => { this.logger.error( @@ -532,7 +533,7 @@ export default class CronService { command, err, ); - await fs.appendFile(absolutePath, JSON.stringify(err)); + await logStreamManager.write(absolutePath, JSON.stringify(err)); }); cp.on('exit', async (code) => { @@ -541,6 +542,8 @@ export default class CronService { JSON.stringify(params), code, ); + // Close the stream after task completion + await logStreamManager.closeStream(absolutePath); await CrontabModel.update( { status: CrontabStatus.idle, pid: undefined }, { where: { id } }, diff --git a/back/services/subscription.ts b/back/services/subscription.ts index 5b13161b..1a8638f8 100644 --- a/back/services/subscription.ts +++ b/back/services/subscription.ts @@ -31,6 +31,7 @@ import { formatCommand, formatUrl } from '../config/subscription'; import { CrontabModel } from '../data/cron'; import CrontabService from './cron'; import taskLimit from '../shared/pLimit'; +import { logStreamManager } from '../shared/logStreamManager'; @Service() export default class SubscriptionService { @@ -136,7 +137,7 @@ export default class SubscriptionService { let beforeStr = ''; try { if (doc.sub_before) { - await fs.appendFile(absolutePath, `\n## 执行before命令...\n\n`); + await logStreamManager.write(absolutePath, `\n## 执行before命令...\n\n`); beforeStr = await promiseExec(doc.sub_before); } } catch (error: any) { @@ -144,7 +145,7 @@ export default class SubscriptionService { (error.stderr && error.stderr.toString()) || JSON.stringify(error); } if (beforeStr) { - await fs.appendFile(absolutePath, `${beforeStr}\n`); + await logStreamManager.write(absolutePath, `${beforeStr}\n`); } }, onStart: async (cp: ChildProcessWithoutNullStreams, startTime) => { @@ -163,7 +164,7 @@ export default class SubscriptionService { let afterStr = ''; try { if (sub.sub_after) { - await fs.appendFile(absolutePath, `\n\n## 执行after命令...\n\n`); + await logStreamManager.write(absolutePath, `\n\n## 执行after命令...\n\n`); afterStr = await promiseExec(sub.sub_after); } } catch (error: any) { @@ -171,16 +172,19 @@ export default class SubscriptionService { (error.stderr && error.stderr.toString()) || JSON.stringify(error); } if (afterStr) { - await fs.appendFile(absolutePath, `${afterStr}\n`); + await logStreamManager.write(absolutePath, `${afterStr}\n`); } - await fs.appendFile( + await logStreamManager.write( absolutePath, `\n## 执行结束... ${endTime.format( 'YYYY-MM-DD HH:mm:ss', )} 耗时 ${diff} 秒${LOG_END_SYMBOL}`, ); + // Close the stream after task completion + await logStreamManager.closeStream(absolutePath); + await SubscriptionModel.update( { status: SubscriptionStatus.idle, pid: undefined }, { where: { id: sub.id } }, @@ -195,12 +199,12 @@ export default class SubscriptionService { onError: async (message: string) => { const sub = await this.getDb({ id: doc.id }); const absolutePath = await handleLogPath(sub.log_path as string); - await fs.appendFile(absolutePath, `\n${message}`); + await logStreamManager.write(absolutePath, `\n${message}`); }, onLog: async (message: string) => { const sub = await this.getDb({ id: doc.id }); const absolutePath = await handleLogPath(sub.log_path as string); - await fs.appendFile(absolutePath, `\n${message}`); + await logStreamManager.write(absolutePath, `\n${message}`); }, }; } diff --git a/back/shared/logStreamManager.ts b/back/shared/logStreamManager.ts new file mode 100644 index 00000000..815ce409 --- /dev/null +++ b/back/shared/logStreamManager.ts @@ -0,0 +1,110 @@ +import { createWriteStream, WriteStream } from 'fs'; +import { EventEmitter } from 'events'; + +/** + * Manages write streams for log files to improve performance by avoiding repeated file opens + */ +export class LogStreamManager extends EventEmitter { + private streams: Map = new Map(); + private pendingWrites: Map> = new Map(); + + /** + * Write data to a log file using a managed stream + * @param filePath - Absolute path to the log file + * @param data - Data to write to the log file + */ + async write(filePath: string, data: string): Promise { + // Wait for any pending writes to this file to complete + const pending = this.pendingWrites.get(filePath); + if (pending) { + await pending; + } + + // Create a new promise for this write operation + const writePromise = new Promise((resolve, reject) => { + let stream = this.streams.get(filePath); + + if (!stream) { + // Create a new write stream if one doesn't exist + stream = createWriteStream(filePath, { flags: 'a' }); + this.streams.set(filePath, stream); + + // Handle stream errors + stream.on('error', (error) => { + this.emit('error', { filePath, error }); + // Remove the stream from the map on error + this.streams.delete(filePath); + reject(error); + }); + } + + // Write the data + const canContinue = stream.write(data, 'utf8', (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + + // Handle backpressure + if (!canContinue) { + stream.once('drain', () => { + // Stream is ready for more data + }); + } + }); + + this.pendingWrites.set(filePath, writePromise); + + try { + await writePromise; + } finally { + this.pendingWrites.delete(filePath); + } + } + + /** + * Close the stream for a specific file path + * @param filePath - Absolute path to the log file + */ + async closeStream(filePath: string): Promise { + // Wait for any pending writes to complete + const pending = this.pendingWrites.get(filePath); + if (pending) { + await pending.catch(() => { + // Ignore errors on pending writes during close + }); + } + + const stream = this.streams.get(filePath); + if (stream) { + return new Promise((resolve) => { + stream.end(() => { + this.streams.delete(filePath); + resolve(); + }); + }); + } + } + + /** + * Close all open streams + */ + async closeAll(): Promise { + const closePromises = Array.from(this.streams.keys()).map((filePath) => + this.closeStream(filePath), + ); + await Promise.all(closePromises); + } + + /** + * Get the number of open streams + */ + getOpenStreamCount(): number { + return this.streams.size; + } +} + +// Export a singleton instance for shared use +export const logStreamManager = new LogStreamManager();