import schedule import threading import time import subprocess import os from real.xt_trader import XtTrader from flask import Flask, request, abort, jsonify from config import Config from simulation.simulation_trader import SimulationTrader from logger_config import get_logger from real.real_trader_manager import RealTraderManager from trade_constants import * from base_trader import BaseTrader # 获取日志记录器 logger = get_logger("server") # 全局交易实例(采用单例模式) _trader_instance = None # 交易实例(单例) _real_trader_manager_instance = None # 实盘交易管理器实例(单例) def is_real_mode(): return not Config.SIMULATION_MODE # 交易接口连接失败回调 def on_connect_failed(): """交易连接失败的回调函数""" logger.critical("交易系统连接失败,API将返回错误状态") # 获取实盘交易管理器实例的辅助函数 def get_real_trader_manager(): global _real_trader_manager_instance if _real_trader_manager_instance is None: _real_trader_manager_instance = ( None if Config.SIMULATION_MODE else RealTraderManager(get_trader()) ) return _real_trader_manager_instance # 获取交易实例 - 根据情况返回模拟或实盘交易实例 def get_trader(): global _trader_instance # 实盘模式下 if is_real_mode() and _trader_instance is None: # 检查是否为交易日 if not BaseTrader.is_trading_date(): # 非交易日创建临时实例提供错误信息 temp_trader = XtTrader(connect_failed_callback=on_connect_failed) temp_trader.connection_failed = True temp_trader.connection_error_message = "当前为非交易日,交易系统未连接" return temp_trader # 交易日但非交易时间 if not BaseTrader.is_trading_time(): # 非交易时间创建临时实例提供错误信息 temp_trader = XtTrader(connect_failed_callback=on_connect_failed) temp_trader.connection_failed = True temp_trader.connection_error_message = "当前为非交易时间,交易系统未连接" return temp_trader # 返回已存在的实例 return _trader_instance def login(): global _trader_instance try: # 如果已经有实例,先销毁 if _trader_instance is not None and is_real_mode(): _trader_instance.logout() _trader_instance = None # 创建新的XtTrader实例 _trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader(connect_failed_callback=on_connect_failed) logger.info("开始登录") # 模拟交易直接返回成功 if Config.SIMULATION_MODE: return True # 如果是实盘交易,调用login方法 login_success = _trader_instance.login() # 验证连接 if login_success: result = _trader_instance.get_balance() if result and result.get("account_id") is not None: logger.info(f"查询余额成功: {result}") return True else: logger.error(f"登录成功但查询余额失败: {result}") _trader_instance.connection_failed = True _trader_instance.last_reconnect_time = time.time() _trader_instance.notify_connection_failure("登录成功但查询余额失败") return False else: logger.error("登录失败") # 不需要在这里设置失败状态,在XtTrader.login方法中已设置 return False except Exception as e: logger.error(f"登录初始化异常: {str(e)}") # 如果是实盘模式并且已经成功创建了实例,设置失败状态 if is_real_mode() and _trader_instance is not None: _trader_instance.connection_failed = True _trader_instance.last_reconnect_time = time.time() _trader_instance.notify_connection_failure(f"登录初始化异常: {str(e)}") return False # 如果创建实例失败,则抛出异常 raise Exception(f"登录失败,无法创建交易实例: {e}") def logout(): global _trader_instance if _trader_instance is not None: _trader_instance.logout() logger.info("登出成功") # 销毁实例 if is_real_mode(): _trader_instance = None logger.info("XtTrader实例已销毁") def _run_scheduler(): """定时任务执行线程""" while True: try: schedule.run_pending() time.sleep(1) except Exception as e: logger.error(f"调度器执行错误: {str(e)}") def _check_reconnect(): """重连检查线程,定期检查是否需要重连""" while True: try: if is_real_mode() and _trader_instance is not None: _trader_instance.check_reconnect() time.sleep(60) # 每分钟检查一次 except Exception as e: logger.error(f"重连检查错误: {str(e)}") # 定时任务:检查是否为交易日,并在交易日执行登录操作 def login_on_trading_day(): """仅在交易日执行登录操作""" # 使用BaseTrader中的静态方法检查是否为交易日 if BaseTrader.is_trading_date(): logger.info("今天是交易日,执行登录操作") login() else: logger.info("今天不是交易日,跳过登录操作") # 定时任务:检查是否为交易日,并在交易日执行登出操作 def logout_on_trading_day(): """仅在交易日执行登出操作""" # 使用BaseTrader中的静态方法检查是否为交易日 if BaseTrader.is_trading_date(): logger.info("今天是交易日,执行登出操作") logout() else: logger.info("今天不是交易日,跳过登出操作") def restart_qmt(): try: # 检查QMT路径是否存在 if not os.path.exists(Config.XT_LAUNCHER): logger.error(f"QMT启动路径不存在: {Config.XT_LAUNCHER}") return # 先关闭所有QMT相关进程 for proc_name in [Config.XT_PROCESS1, Config.XT_PROCESS2]: try: kill_cmd = f'taskkill /F /IM {proc_name}' result = subprocess.run(kill_cmd, shell=True, capture_output=True, text=True) if result.returncode == 0: logger.info(f"已关闭进程: {proc_name}") else: logger.warning(f"关闭进程{proc_name}时返回码: {result.returncode}, 输出: {result.stdout}, 错误: {result.stderr}") except Exception as e: logger.error(f"关闭进程{proc_name}时发生异常: {str(e)}") # 尝试启动QMT软件 subprocess.Popen(Config.XT_LAUNCHER) logger.info(f"已启动QMT软件: {Config.XT_LAUNCHER}") except Exception as e: logger.error(f"重启QMT软件时发生错误: {str(e)}") # 定时任务:在交易日重启QMT软件 def restart_qmt_on_trading_day(): """仅在交易日执行QMT软件重启操作""" if BaseTrader.is_trading_date(): logger.info("今天是交易日,执行QMT软件重启操作") restart_qmt() else: logger.info("今天不是交易日,跳过QMT软件重启") def setup_scheduler(): # 设置每日任务,仅在交易日执行 schedule.every().day.at(Config.MARKET_OPEN_TIME).do(login_on_trading_day) schedule.every().day.at(Config.MARKET_CLOSE_TIME).do(logout_on_trading_day) schedule.every().day.at(Config.XT_RESTART_TIME).do(restart_qmt_on_trading_day) # 启动调度线程 scheduler_thread = threading.Thread(target=_run_scheduler, daemon=True) scheduler_thread.start() logger.info("定时任务调度器已启动") # 启动重连检查线程(仅在实盘模式下) if is_real_mode(): reconnect_thread = threading.Thread(target=_check_reconnect, daemon=True) reconnect_thread.start() logger.info("重连检查线程已启动") return scheduler_thread # 初始化交易系统 try: # 程序启动时初始化交易实例 - 尝试登录,但即使失败也继续启动服务 login_success = login() if not login_success and is_real_mode(): logger.warning("初始登录失败,系统将在稍后定期尝试重连") # 设置并启动调度器 setup_scheduler() logger.info("交易系统初始化完成") except Exception as e: logger.error(f"交易系统初始化异常: {e}") # 即使初始化失败也尝试启动调度器 try: setup_scheduler() logger.info("调度器启动成功,将尝试定期重新初始化交易系统") except Exception as scheduler_e: logger.error(f"调度器启动失败: {scheduler_e}") raise app = Flask(__name__) # 添加中间件,拦截所有不以"/yu"开头的请求并返回404 @app.before_request def check_path_prefix(): if not request.path.startswith('/yu'): logger.warning(f"拦截非法请求: {request.path}") abort(404) @app.route("/yu/healthcheck", methods=["GET"]) def health_check(): if is_real_mode() and _trader_instance and not _trader_instance.is_available(): return jsonify({ "status": "error", "message": _trader_instance.connection_error_message or "交易系统连接失败" }), 503 return jsonify({"status": "ok"}), 200 @app.route("/yu/buy", methods=["POST"]) def buy(): """Buy an item with given parameters.""" logger.info("Received buy request") # Get data from request body data = request.get_json() code = data.get("code") price_str = data.get("price") amount_str = data.get("amount") strategy_name = data.get( "strategy_name", "default_strategy" ) # 新增策略名称参数,默认为空 order_type = data.get("order_type", ORDER_TYPE_LIMIT) try: if not all([code, price_str, amount_str]): raise ValueError("Missing required parameters") price = float(price_str) amount = int(float(amount_str)) if order_type == ORDER_TYPE_LIMIT and (price <= 0 or amount <= 0): raise ValueError("Price and amount must be positive") trader = get_trader() if is_real_mode(): # 检查是否在交易时间内 if not BaseTrader.is_trading_time(): logger.warning( f"交易失败 - 非交易时间不能交易 - 代码: {code}, 价格: {price}, 数量: {amount}" ) return ( jsonify( {"success": False, "error": f"交易失败: 非交易时间不能实盘交易"} ), 400, ) # 检查交易系统是否可用 if is_real_mode() and not trader.is_available(): return jsonify({ "success": False, "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" }), 503 # 开始买入 logger.info( f"使用RealTraderManager执行买入: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}" ) rtm = get_real_trader_manager() result = rtm.place_order( strategy_name, code, ORDER_DIRECTION_BUY, amount, price, order_type ) else: result = trader.buy(code, price, amount, strategy_name) if result.get("success"): logger.info(f"买入下单成功: {result}") return jsonify({"success": True, "order_id": result.get("order_id")}), 200 else: logger.error(f"买入下单失败: {result}") return jsonify({"success": False, "error": result}), 400 except ValueError as e: logger.error(f"Invalid request parameters: {str(e)}") abort(400, description=str(e)) except Exception as e: logger.error(f"Error processing buy request: {str(e)}") abort(500, description="Internal server error") @app.route("/yu/sell", methods=["POST"]) def sell(): """Sell an item with given parameters.""" logger.info("Received sell request") # Get data from request body data = request.get_json() code = data.get("code") price_str = data.get("price") amount_str = data.get("amount") strategy_name = data.get("strategy_name", "") # 新增策略名称参数,默认为空 order_type = data.get("order_type", ORDER_TYPE_LIMIT) try: if not all([code, price_str, amount_str]): raise ValueError("Missing required parameters") price = float(price_str) amount = int(float(amount_str)) if order_type == ORDER_TYPE_LIMIT and (price <= 0 or amount <= 0): raise ValueError("Price and amount must be positive") trader = get_trader() if is_real_mode(): # 检查是否在交易时间内 if not BaseTrader.is_trading_time(): logger.warning( f"交易失败 - 非交易时间不能交易 - 代码: {code}, 价格: {price}, 数量: {amount}" ) return ( jsonify( {"success": False, "error": f"交易失败: 非交易时间不能实盘交易"} ), 400, ) # 检查交易系统是否可用 if not trader.is_available(): return jsonify({ "success": False, "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" }), 503 # 开始卖出 logger.info( f"使用RealTraderManager执行卖出: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}" ) rtm = get_real_trader_manager() result = rtm.place_order( strategy_name, code, ORDER_DIRECTION_SELL, amount, price, order_type ) else: result = trader.sell(code, price, amount, strategy_name) if result.get("success"): logger.info(f"卖出下单成功: {result}") return jsonify({"success": True, "order_id": result.get("order_id")}), 200 elif "error" in result: logger.error(f"卖出下单失败: {result}") return jsonify({"success": False, "error": result["error"]}), 400 else: logger.error(f"卖出下单失败: {result}") return jsonify({"success": False, "error": result}), 400 except ValueError as e: logger.error(f"Invalid request parameters: {str(e)}") abort(400, description=str(e)) except Exception as e: logger.error(f"Error processing sell request: {str(e)}") abort(500, description="Internal server error") @app.route("/yu/cancel/", methods=["DELETE"]) def cancel(order_id): """Cancel an order by order_id.""" logger.info(f"Received cancel request for order {order_id}") try: # 检查交易系统是否可用 trader = get_trader() if is_real_mode() and not trader.is_available(): return jsonify({ "success": False, "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" }), 503 result = trader.cancel(order_id) logger.info(f"撤单结果: {result}") return jsonify(result), 200 except Exception as e: logger.error(f"Error processing cancel request: {str(e)}") abort(500, description="Internal server error") @app.route("/yu/balance", methods=["GET"]) def get_balance(): """Get balance information.""" logger.info("Received balance request") try: # 检查交易系统是否可用 trader = get_trader() if is_real_mode() and not trader.is_available(): return jsonify({ "success": False, "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" }), 503 balance = trader.get_balance() return jsonify(balance), 200 except Exception as e: logger.error(f"Error processing balance request: {str(e)}") abort(500, description="Internal server error") @app.route("/yu/positions", methods=["GET"]) def get_positions(): """Get position information.""" logger.info("Received positions request") try: # 检查交易系统是否可用 trader = get_trader() if is_real_mode() and not trader.is_available(): return jsonify({ "success": False, "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" }), 503 positions = trader.get_positions() return jsonify(positions), 200 except Exception as e: logger.error(f"Error processing positions request: {str(e)}") abort(500, description="Internal server error") @app.route("/yu/todaytrades", methods=["GET"]) def get_today_trades(): """Get today's trade information.""" logger.info("Received today trades request") try: # 检查交易系统是否可用 trader = get_trader() if is_real_mode() and not trader.is_available(): return jsonify({ "success": False, "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" }), 503 trades = trader.get_today_trades() return jsonify(trades), 200 except Exception as e: logger.error(f"Error processing today trades request: {str(e)}") abort(500, description="Internal server error") @app.route("/yu/todayorders", methods=["GET"]) def get_today_orders(): """Get today's order information.""" logger.info("Received today orders request") try: # 检查交易系统是否可用 trader = get_trader() if is_real_mode() and not trader.is_available(): return jsonify({ "success": False, "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" }), 503 orders = trader.get_today_orders() return jsonify(orders), 200 except Exception as e: logger.error(f"Error processing today orders request: {str(e)}") abort(500, description="Internal server error") @app.route("/yu/order/", methods=["GET"]) def get_order(order_id): """Get order information by order_id.""" logger.info(f"Received order request for order {order_id}") try: # 检查交易系统是否可用 trader = get_trader() if is_real_mode() and not trader.is_available(): return jsonify({ "success": False, "error": trader.connection_error_message or "交易系统连接失败,请稍后再试" }), 503 order = trader.get_order(order_id) return jsonify(order), 200 except Exception as e: logger.error(f"Error processing order request: {str(e)}") abort(500, description="Internal server error") @app.route("/yu/clear/", methods=["DELETE"]) def clear_strategy(strategy_name): """清除指定策略的持仓管理数据""" logger.info(f"接收到清除策略持仓请求: {strategy_name}") try: if get_trader().clear_position_manager(strategy_name): return jsonify({"success": True, "message": "clear success"}), 200 else: return jsonify({"success": False, "message": "策略不存在: " + strategy_name}), 400 except Exception as e: logger.error(f"清除策略持仓时出错: {str(e)}") abort(500, description="服务器内部错误") @app.route("/yu/test", methods=["GET"]) def test_route(): """测试路由,用于验证中间件是否正常工作""" return jsonify({"success": True, "message": "API路由前缀验证成功"}), 200 if __name__ == "__main__": logger.info(f"Server starting on {Config.HOST}:{Config.PORT}") app.run(debug=Config.DEBUG, host=Config.HOST, port=Config.PORT)