From 949d956aef64e8f16839ebccf463be2ff7b681ac Mon Sep 17 00:00:00 2001 From: whyour Date: Sat, 13 Jun 2026 19:35:47 +0800 Subject: [PATCH] =?UTF-8?q?grpc=20=E6=9C=8D=E5=8A=A1=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E8=AF=81=E4=B9=A6=E6=A0=A1=E9=AA=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- back/app.ts | 4 ++ back/config/grpcCerts.ts | 150 +++++++++++++++++++++++++++++++++++++++ back/config/index.ts | 2 + back/schedule/client.ts | 23 ++++-- back/services/grpc.ts | 10 ++- back/shared/pLimit.ts | 26 +++++-- shell/preload/client.js | 20 +++++- 7 files changed, 223 insertions(+), 12 deletions(-) create mode 100644 back/config/grpcCerts.ts diff --git a/back/app.ts b/back/app.ts index fc95a72f..7a265da5 100644 --- a/back/app.ts +++ b/back/app.ts @@ -232,6 +232,10 @@ class Application { } private async startHttpService() { + // 在导入任何 gRPC 客户端模块之前初始化 mTLS 证书 + const { initGrpcCerts } = await import('./config/grpcCerts'); + await initGrpcCerts(); + this.setupMiddlewares(); const { HttpServerService } = await import('./services/http'); diff --git a/back/config/grpcCerts.ts b/back/config/grpcCerts.ts new file mode 100644 index 00000000..63af59aa --- /dev/null +++ b/back/config/grpcCerts.ts @@ -0,0 +1,150 @@ +import { execSync } from 'child_process'; +import * as fs from 'fs/promises'; +import * as os from 'os'; +import path from 'path'; +import config from './index'; +import { fileExist } from './util'; +import Logger from '../loaders/logger'; + +export interface GrpcTlsConfig { + caCert: string; + serverCert: string; + serverKey: string; + clientCert: string; + clientKey: string; +} + +const certDir = path.join(config.configPath, 'grpc'); +const caKeyPath = path.join(certDir, 'ca.key'); +const caCertPath = path.join(certDir, 'ca.crt'); +const serverKeyPath = path.join(certDir, 'server.key'); +const serverCertPath = path.join(certDir, 'server.crt'); +const clientKeyPath = path.join(certDir, 'client.key'); +const clientCertPath = path.join(certDir, 'client.crt'); + +let cachedConfig: GrpcTlsConfig | null = null; + +function run(cmd: string, execOpts?: Record): string { + const opts = { stdio: 'pipe', timeout: 30000, encoding: 'utf-8', ...execOpts } as any; + return (execSync(cmd, opts) as string).trim(); +} + +async function tmpFile(prefix: string): Promise { + const dir = (await fileExist(certDir)) ? certDir : os.tmpdir(); + await fs.mkdir(dir, { recursive: true }); + return path.join(dir, `.${prefix}_${Date.now()}_${Math.random().toString(36).slice(2)}.pem`); +} + +async function generateAllCerts(): Promise { + Logger.info('Generating gRPC mTLS certificates...'); + + const caKeyTmp = await tmpFile('ca_key'); + const caCertTmp = await tmpFile('ca_cert'); + const serverKeyTmp = await tmpFile('server_key'); + const serverCsrTmp = await tmpFile('server_csr'); + const serverExtTmp = await tmpFile('server_ext'); + const clientKeyTmp = await tmpFile('client_key'); + const clientCsrTmp = await tmpFile('client_csr'); + const clientExtTmp = await tmpFile('client_ext'); + const srlTmp = path.join(path.dirname(caKeyTmp), '.grpc_ca.srl'); + + const cleanup = async () => { + for (const f of [caKeyTmp, caCertTmp, serverKeyTmp, serverCsrTmp, serverExtTmp, + clientKeyTmp, clientCsrTmp, clientExtTmp, srlTmp]) { + try { await fs.unlink(f); } catch {} + } + }; + + try { + // 1. CA(私钥直接存盘,证书写入临时文件供签发使用) + run(`openssl genrsa -out '${caKeyTmp}' 2048 2>/dev/null`); + run(`openssl req -new -x509 -days 3650 -key '${caKeyTmp}' -out '${caCertTmp}' -subj '/CN=qinglong-ca/O=qinglong/C=CN' 2>/dev/null`); + const caKey = await fs.readFile(caKeyTmp, 'utf-8'); + const caCert = await fs.readFile(caCertTmp, 'utf-8'); + await fs.mkdir(certDir, { recursive: true }); + await fs.writeFile(caKeyPath, caKey, { mode: 0o600 }); + + // 2. 服务端 + run(`openssl genrsa -out '${serverKeyTmp}' 2048 2>/dev/null`); + run(`openssl req -new -key '${serverKeyTmp}' -out '${serverCsrTmp}' -subj '/CN=grpc-server' 2>/dev/null`); + await fs.writeFile(serverExtTmp, 'subjectAltName=DNS:localhost,IP:127.0.0.1,IP:::1\n'); + const serverCert = run( + `openssl x509 -req -days 3650 -in '${serverCsrTmp}' -CA '${caCertTmp}' -CAkey '${caKeyTmp}' -CAcreateserial -extfile '${serverExtTmp}' 2>/dev/null`, + ); + const serverKey = await fs.readFile(serverKeyTmp, 'utf-8'); + + // 3. 客户端 + run(`openssl genrsa -out '${clientKeyTmp}' 2048 2>/dev/null`); + run(`openssl req -new -key '${clientKeyTmp}' -out '${clientCsrTmp}' -subj '/CN=grpc-client' 2>/dev/null`); + await fs.writeFile(clientExtTmp, 'extendedKeyUsage=clientAuth\n'); + const clientCert = run( + `openssl x509 -req -days 3650 -in '${clientCsrTmp}' -CA '${caCertTmp}' -CAkey '${caKeyTmp}' -CAcreateserial -extfile '${clientExtTmp}' 2>/dev/null`, + ); + const clientKey = await fs.readFile(clientKeyTmp, 'utf-8'); + + await cleanup(); + Logger.info('gRPC mTLS certificates generated successfully'); + + return { caCert, serverCert, serverKey, clientCert, clientKey }; + } catch (e) { + await cleanup(); + throw e; + } +} + +async function saveCerts(tlsConfig: GrpcTlsConfig): Promise { + await fs.mkdir(certDir, { recursive: true }); + + await fs.writeFile(caCertPath, tlsConfig.caCert, { mode: 0o644 }); + await fs.writeFile(serverCertPath, tlsConfig.serverCert, { mode: 0o644 }); + await fs.writeFile(serverKeyPath, tlsConfig.serverKey, { mode: 0o600 }); + await fs.writeFile(clientCertPath, tlsConfig.clientCert, { mode: 0o644 }); + await fs.writeFile(clientKeyPath, tlsConfig.clientKey, { mode: 0o600 }); + + Logger.info(`gRPC mTLS certificates saved to ${certDir}`); +} + +async function loadExistingCerts(): Promise { + const exists = await Promise.all([ + fileExist(caCertPath), + fileExist(serverCertPath), + fileExist(serverKeyPath), + fileExist(clientCertPath), + fileExist(clientKeyPath), + ]); + + if (exists.some((e) => !e)) { + return null; + } + + const [caCert, serverCert, serverKey, clientCert, clientKey] = await Promise.all([ + fs.readFile(caCertPath, 'utf-8'), + fs.readFile(serverCertPath, 'utf-8'), + fs.readFile(serverKeyPath, 'utf-8'), + fs.readFile(clientCertPath, 'utf-8'), + fs.readFile(clientKeyPath, 'utf-8'), + ]); + + Logger.info('Loaded existing gRPC mTLS certificates from disk'); + return { caCert, serverCert, serverKey, clientCert, clientKey }; +} + +export async function initGrpcCerts(): Promise { + if (cachedConfig) { + return cachedConfig; + } + + let tlsConfig = await loadExistingCerts(); + + if (!tlsConfig) { + tlsConfig = await generateAllCerts(); + await saveCerts(tlsConfig); + } + + cachedConfig = tlsConfig; + return tlsConfig; +} + +export function getGrpcCerts(): GrpcTlsConfig | null { + return cachedConfig; +} diff --git a/back/config/index.ts b/back/config/index.ts index 72596f1f..869f5d10 100644 --- a/back/config/index.ts +++ b/back/config/index.ts @@ -173,6 +173,8 @@ export default { 'env.js', 'env.py', 'token.json', + 'grpc', + '__pycache__', ], writePathList: [configPath, scriptPath], bakPath, diff --git a/back/schedule/client.ts b/back/schedule/client.ts index 6b086452..30daacf9 100644 --- a/back/schedule/client.ts +++ b/back/schedule/client.ts @@ -7,13 +7,26 @@ import { DeleteCronResponse, } from '../protos/cron'; import config from '../config'; +import { getGrpcCerts } from '../config/grpcCerts'; class Client { - private client = new CronClient( - `localhost:${config.grpcPort}`, - credentials.createInsecure(), - { 'grpc.enable_http_proxy': 0 }, - ); + private _client: CronClient | null = null; + + private get client(): CronClient { + if (!this._client) { + const tlsConfig = getGrpcCerts()!; + this._client = new CronClient( + `localhost:${config.grpcPort}`, + credentials.createSsl( + Buffer.from(tlsConfig.caCert), + Buffer.from(tlsConfig.clientKey), + Buffer.from(tlsConfig.clientCert), + ), + { 'grpc.enable_http_proxy': 0 }, + ); + } + return this._client; + } addCron(request: AddCronRequest['crons']): Promise { return new Promise((resolve, reject) => { diff --git a/back/services/grpc.ts b/back/services/grpc.ts index 5edd9d2c..7432ca70 100644 --- a/back/services/grpc.ts +++ b/back/services/grpc.ts @@ -11,6 +11,7 @@ import { promisify } from 'util'; import config from '../config'; import { metricsService } from './metrics'; import { Service } from 'typedi'; +import { initGrpcCerts } from '../config/grpcCerts'; @Service() export class GrpcServerService { @@ -29,6 +30,13 @@ export class GrpcServerService { this.server.addService(CronService, { addCron, delCron }); this.server.addService(ApiService, Api); + const tlsConfig = await initGrpcCerts(); + const credentials = ServerCredentials.createSsl( + Buffer.from(tlsConfig.caCert), + [{ cert_chain: Buffer.from(tlsConfig.serverCert), private_key: Buffer.from(tlsConfig.serverKey) }], + true, + ); + const grpcPort = config.grpcPort; const hostsToTry = [ config.bindHostGrpc, @@ -41,7 +49,7 @@ export class GrpcServerService { for (const host of hostsToTry) { try { const address = this.formatGrpcAddress(host, grpcPort); - await bindAsync(address, ServerCredentials.createInsecure()); + await bindAsync(address, credentials); Logger.debug(`✌️ gRPC service started successfully on ${address}`); metricsService.record('grpc_service_start', 1, { port: grpcPort.toString(), diff --git a/back/shared/pLimit.ts b/back/shared/pLimit.ts index 2f04d5be..7d75678f 100644 --- a/back/shared/pLimit.ts +++ b/back/shared/pLimit.ts @@ -14,6 +14,7 @@ import { import config from '../config'; import { credentials } from '@grpc/grpc-js'; import { ApiClient } from '../protos/api'; +import { getGrpcCerts } from '../config/grpcCerts'; class TaskLimit { private dependenyLimit = new PQueue({ concurrency: 1 }); @@ -36,11 +37,26 @@ class TaskLimit { private systemLimit = new PQueue({ concurrency: Math.max(os.cpus().length, 4), }); - private client = new ApiClient( - `localhost:${config.grpcPort}`, - credentials.createInsecure(), - { 'grpc.enable_http_proxy': 0 }, - ); + private _client: ApiClient | null = null; + + private get client(): ApiClient { + if (!this._client) { + const tlsConfig = getGrpcCerts(); + const creds = tlsConfig + ? credentials.createSsl( + Buffer.from(tlsConfig.caCert), + Buffer.from(tlsConfig.clientKey), + Buffer.from(tlsConfig.clientCert), + ) + : credentials.createInsecure(); + this._client = new ApiClient( + `localhost:${config.grpcPort}`, + creds, + { 'grpc.enable_http_proxy': 0 }, + ); + } + return this._client; + } get cronLimitActiveCount() { return this.cronLimit.pending; diff --git a/shell/preload/client.js b/shell/preload/client.js index 3a94c0c2..7b0420f8 100644 --- a/shell/preload/client.js +++ b/shell/preload/client.js @@ -1,8 +1,26 @@ const grpc = require('@grpc/grpc-js'); const protoLoader = require('@grpc/proto-loader'); +const { readFileSync } = require('fs'); const { join } = require('path'); class GrpcClient { + static #certDir = join( + process.env.QL_DATA_DIR || join(process.env.QL_DIR, 'data'), + 'config/grpc', + ); + + static #loadTlsCredentials() { + try { + return grpc.credentials.createSsl( + readFileSync(join(GrpcClient.#certDir, 'ca.crt')), + readFileSync(join(GrpcClient.#certDir, 'client.key')), + readFileSync(join(GrpcClient.#certDir, 'client.crt')), + ); + } catch { + return grpc.credentials.createInsecure(); + } + } + static #config = { protoPath: join(process.env.QL_DIR, 'back/protos/api.proto'), serverAddress: `localhost:${process.env.GRPC_PORT || '5500'}`, @@ -58,7 +76,7 @@ class GrpcClient { this.#client = new apiProto.Api( serverAddress, - grpc.credentials.createInsecure(), + GrpcClient.#loadTlsCredentials(), grpcOptions, ); } catch (error) {