qinglong/shell/preload/client.py

114 lines
3.4 KiB
Python

import subprocess
import json
import tempfile
import os
from typing import Dict, List
from functools import wraps
def error_handler(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except json.JSONDecodeError as e:
raise Exception(f"parse json error: {str(e)}")
except subprocess.SubprocessError as e:
raise Exception(f"node process error: {str(e)}")
except Exception as e:
raise Exception(f"unknown error: {str(e)}")
return wrapper
class Client:
def __init__(self):
self.temp_dir = tempfile.mkdtemp(prefix="node_client_")
self.temp_script = os.path.join(self.temp_dir, "temp_script.js")
def __del__(self):
try:
if os.path.exists(self.temp_script):
os.remove(self.temp_script)
os.rmdir(self.temp_dir)
except Exception:
pass
@error_handler
def _execute_node(self, method: str, params: Dict = None) -> Dict:
node_code = f"""
const api = require('{os.getenv("QL_DIR")}/shell/preload/client.js');
(async () => {{
try {{
const result = await api.{method}({json.dumps(params) if params else ''});
console.log(JSON.stringify(result));
}} catch (error) {{
console.error(JSON.stringify({{
error: error.message,
stack: error.stack
}}));
process.exit(1);
}}
}})();
"""
with open(self.temp_script, "w", encoding="utf-8") as f:
f.write(node_code)
try:
result = subprocess.run(
["node", self.temp_script],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
error_data = json.loads(result.stderr)
raise Exception(f"{error_data.get('stack')}")
return json.loads(result.stdout)
except subprocess.TimeoutExpired:
raise Exception("node process timeout")
@error_handler
def getEnvs(self, params: Dict = None) -> Dict:
return self._execute_node("getEnvs", params)
@error_handler
def createEnv(self, data: Dict) -> Dict:
return self._execute_node("createEnv", data)
@error_handler
def updateEnv(self, data: Dict) -> Dict:
return self._execute_node("updateEnv", data)
@error_handler
def deleteEnvs(self, env_ids: List[str]) -> Dict:
return self._execute_node("deleteEnvs", {"ids": env_ids})
@error_handler
def moveEnv(self, data: Dict) -> Dict:
return self._execute_node("moveEnv", data)
@error_handler
def disableEnvs(self, env_ids: List[str]) -> Dict:
return self._execute_node("disableEnvs", {"ids": env_ids})
@error_handler
def enableEnvs(self, env_ids: List[str]) -> Dict:
return self._execute_node("enableEnvs", {"ids": env_ids})
@error_handler
def updateEnvNames(self, data: Dict) -> Dict:
return self._execute_node("updateEnvNames", data)
@error_handler
def getEnvById(self, env_id: str) -> Dict:
return self._execute_node("getEnvById", {"id": env_id})
@error_handler
def systemNotify(self, data: Dict) -> Dict:
return self._execute_node("systemNotify", data)