real_trader/src/trade_server.py
2025-05-12 13:39:10 +08:00

556 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import schedule
import threading
import time
import subprocess
import os
from real.xt_trader import XtTrader
from flask import Flask, request, abort, jsonify
from config import Config
from simulation.simulation_trader import SimulationTrader
from logger_config import get_logger
from real.real_trader_manager import RealTraderManager
from trade_constants import *
from base_trader import BaseTrader
# 获取日志记录器
logger = get_logger("server")
# 全局交易实例(采用单例模式)
_trader_instance = None # 交易实例(单例)
_real_trader_manager_instance = None # 实盘交易管理器实例(单例)
def is_real_mode():
return not Config.SIMULATION_MODE
# 交易接口连接失败回调
def on_connect_failed():
"""交易连接失败的回调函数"""
logger.critical("交易系统连接失败API将返回错误状态")
# 获取实盘交易管理器实例的辅助函数
def get_real_trader_manager():
global _real_trader_manager_instance
if _real_trader_manager_instance is None:
_real_trader_manager_instance = (
None if Config.SIMULATION_MODE else RealTraderManager(get_trader())
)
return _real_trader_manager_instance
# 获取交易实例 - 根据情况返回模拟或实盘交易实例
def get_trader():
global _trader_instance
# 实盘模式下
if is_real_mode() and _trader_instance is None:
# 检查是否为交易日
if not BaseTrader.is_trading_date():
# 非交易日创建临时实例提供错误信息
temp_trader = XtTrader(connect_failed_callback=on_connect_failed)
temp_trader.connection_failed = True
temp_trader.connection_error_message = "当前为非交易日,交易系统未连接"
return temp_trader
# 交易日但非交易时间
if not BaseTrader.is_trading_time():
# 非交易时间创建临时实例提供错误信息
temp_trader = XtTrader(connect_failed_callback=on_connect_failed)
temp_trader.connection_failed = True
temp_trader.connection_error_message = "当前为非交易时间,交易系统未连接"
return temp_trader
# 返回已存在的实例
return _trader_instance
def login():
global _trader_instance
try:
# 如果已经有实例,先销毁
if _trader_instance is not None and is_real_mode():
_trader_instance.logout()
_trader_instance = None
# 创建新的XtTrader实例
_trader_instance = SimulationTrader() if Config.SIMULATION_MODE else XtTrader(connect_failed_callback=on_connect_failed)
logger.info("开始登录")
# 模拟交易直接返回成功
if Config.SIMULATION_MODE:
return True
# 如果是实盘交易调用login方法
login_success = _trader_instance.login()
# 验证连接
if login_success:
result = _trader_instance.get_balance()
if result and result.get("account_id") is not None:
logger.info(f"查询余额成功: {result}")
return True
else:
logger.error(f"登录成功但查询余额失败: {result}")
_trader_instance.connection_failed = True
_trader_instance.last_reconnect_time = time.time()
_trader_instance.notify_connection_failure("登录成功但查询余额失败")
return False
else:
logger.error("登录失败")
# 不需要在这里设置失败状态在XtTrader.login方法中已设置
return False
except Exception as e:
logger.error(f"登录初始化异常: {str(e)}")
# 如果是实盘模式并且已经成功创建了实例,设置失败状态
if is_real_mode() and _trader_instance is not None:
_trader_instance.connection_failed = True
_trader_instance.last_reconnect_time = time.time()
_trader_instance.notify_connection_failure(f"登录初始化异常: {str(e)}")
return False
# 如果创建实例失败,则抛出异常
raise Exception(f"登录失败,无法创建交易实例: {e}")
def logout():
global _trader_instance
if _trader_instance is not None:
_trader_instance.logout()
logger.info("登出成功")
# 销毁实例
if is_real_mode():
_trader_instance = None
logger.info("XtTrader实例已销毁")
def _run_scheduler():
"""定时任务执行线程"""
while True:
try:
schedule.run_pending()
time.sleep(1)
except Exception as e:
logger.error(f"调度器执行错误: {str(e)}")
def _check_reconnect():
"""重连检查线程,定期检查是否需要重连"""
while True:
try:
if is_real_mode() and _trader_instance is not None:
_trader_instance.check_reconnect()
time.sleep(60) # 每分钟检查一次
except Exception as e:
logger.error(f"重连检查错误: {str(e)}")
# 定时任务:检查是否为交易日,并在交易日执行登录操作
def login_on_trading_day():
"""仅在交易日执行登录操作"""
# 使用BaseTrader中的静态方法检查是否为交易日
if BaseTrader.is_trading_date():
logger.info("今天是交易日,执行登录操作")
login()
else:
logger.info("今天不是交易日,跳过登录操作")
# 定时任务:检查是否为交易日,并在交易日执行登出操作
def logout_on_trading_day():
"""仅在交易日执行登出操作"""
# 使用BaseTrader中的静态方法检查是否为交易日
if BaseTrader.is_trading_date():
logger.info("今天是交易日,执行登出操作")
logout()
else:
logger.info("今天不是交易日,跳过登出操作")
def restart_qmt():
try:
# 检查QMT路径是否存在
if not os.path.exists(Config.XT_LAUNCHER):
logger.error(f"QMT启动路径不存在: {Config.XT_LAUNCHER}")
return
# 先关闭所有QMT相关进程
for proc_name in [Config.XT_PROCESS1, Config.XT_PROCESS2]:
try:
kill_cmd = f'taskkill /F /IM {proc_name}'
result = subprocess.run(kill_cmd, shell=True, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"已关闭进程: {proc_name}")
else:
logger.warning(f"关闭进程{proc_name}时返回码: {result.returncode}, 输出: {result.stdout}, 错误: {result.stderr}")
except Exception as e:
logger.error(f"关闭进程{proc_name}时发生异常: {str(e)}")
# 尝试启动QMT软件
subprocess.Popen(Config.XT_LAUNCHER)
logger.info(f"已启动QMT软件: {Config.XT_LAUNCHER}")
except Exception as e:
logger.error(f"重启QMT软件时发生错误: {str(e)}")
# 定时任务在交易日重启QMT软件
def restart_qmt_on_trading_day():
"""仅在交易日执行QMT软件重启操作"""
if BaseTrader.is_trading_date():
logger.info("今天是交易日执行QMT软件重启操作")
restart_qmt()
else:
logger.info("今天不是交易日跳过QMT软件重启")
def setup_scheduler():
# 设置每日任务,仅在交易日执行
schedule.every().day.at(Config.MARKET_OPEN_TIME).do(login_on_trading_day)
schedule.every().day.at(Config.MARKET_CLOSE_TIME).do(logout_on_trading_day)
schedule.every().day.at(Config.XT_RESTART_TIME).do(restart_qmt_on_trading_day)
# 启动调度线程
scheduler_thread = threading.Thread(target=_run_scheduler, daemon=True)
scheduler_thread.start()
logger.info("定时任务调度器已启动")
# 启动重连检查线程(仅在实盘模式下)
if is_real_mode():
reconnect_thread = threading.Thread(target=_check_reconnect, daemon=True)
reconnect_thread.start()
logger.info("重连检查线程已启动")
return scheduler_thread
# 初始化交易系统
try:
# 程序启动时初始化交易实例 - 尝试登录,但即使失败也继续启动服务
login_success = login()
if not login_success and is_real_mode():
logger.warning("初始登录失败,系统将在稍后定期尝试重连")
# 设置并启动调度器
setup_scheduler()
logger.info("交易系统初始化完成")
except Exception as e:
logger.error(f"交易系统初始化异常: {e}")
# 即使初始化失败也尝试启动调度器
try:
setup_scheduler()
logger.info("调度器启动成功,将尝试定期重新初始化交易系统")
except Exception as scheduler_e:
logger.error(f"调度器启动失败: {scheduler_e}")
raise
app = Flask(__name__)
@app.route("/yu/healthcheck", methods=["GET"])
def health_check():
if is_real_mode() and _trader_instance and not _trader_instance.is_available():
return jsonify({
"status": "error",
"message": _trader_instance.connection_error_message or "交易系统连接失败"
}), 503
return jsonify({"status": "ok"}), 200
@app.route("/yu/buy", methods=["POST"])
def buy():
"""Buy an item with given parameters."""
logger.info("Received buy request")
# Get data from request body
data = request.get_json()
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get(
"strategy_name", "default_strategy"
) # 新增策略名称参数,默认为空
try:
if not all([code, price_str, amount_str]):
raise ValueError("Missing required parameters")
price = float(price_str)
amount = int(amount_str)
if price <= 0 or amount <= 0:
raise ValueError("Price and amount must be positive")
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
# 检查是否在交易时间内
if not BaseTrader.is_trading_time():
logger.warning(
f"交易失败 - 非交易时间不能交易 - 代码: {code}, 价格: {price}, 数量: {amount}"
)
return (
jsonify(
{"success": False, "error": f"交易失败: 非交易时间不能实盘交易"}
),
400,
)
if is_real_mode():
logger.info(
f"使用RealTraderManager执行买入: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}"
)
rtm = get_real_trader_manager()
result = rtm.place_order(
strategy_name, code, ORDER_DIRECTION_BUY, amount, price
)
else:
result = trader.buy(code, price, amount, strategy_name)
if result.get("success"):
logger.info(f"买入成功: {result}")
return jsonify({"success": True, "order_id": result.get("order_id")}), 200
else:
logger.error(f"买入失败: {result}")
return jsonify({"success": False, "error": result}), 400
except ValueError as e:
logger.error(f"Invalid request parameters: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"Error processing buy request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/sell", methods=["POST"])
def sell():
"""Sell an item with given parameters."""
logger.info("Received sell request")
# Get data from request body
data = request.get_json()
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get("strategy_name", "") # 新增策略名称参数,默认为空
try:
if not all([code, price_str, amount_str]):
raise ValueError("Missing required parameters")
price = float(price_str)
amount = int(amount_str)
if price <= 0 or amount <= 0:
raise ValueError("Price and amount must be positive")
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
# 检查是否在交易时间内
if not BaseTrader.is_trading_time():
logger.warning(
f"交易失败 - 非交易时间不能交易 - 代码: {code}, 价格: {price}, 数量: {amount}"
)
return (
jsonify(
{"success": False, "error": f"交易失败: 非交易时间不能实盘交易"}
),
400,
)
if is_real_mode():
logger.info(
f"使用RealTraderManager执行卖出: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}"
)
rtm = get_real_trader_manager()
result = rtm.place_order(
strategy_name, code, ORDER_DIRECTION_SELL, amount, price
)
else:
result = trader.sell(code, price, amount)
if result.get("success"):
logger.info(f"卖出成功: {result}")
return jsonify({"success": True, "order_id": result.get("order_id")}), 200
elif "error" in result:
logger.error(f"卖出失败: {result}")
return jsonify({"success": False, "error": result["error"]}), 400
else:
logger.error(f"卖出失败: {result}")
return jsonify({"success": False, "error": result}), 400
except ValueError as e:
logger.error(f"Invalid request parameters: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"Error processing sell request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/cancel/<order_id>", methods=["DELETE"])
def cancel(order_id):
"""Cancel an order by order_id."""
logger.info(f"Received cancel request for order {order_id}")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
result = trader.cancel(order_id)
logger.info(f"撤单结果: {result}")
return jsonify(result), 200
except Exception as e:
logger.error(f"Error processing cancel request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/balance", methods=["GET"])
def get_balance():
"""Get balance information."""
logger.info("Received balance request")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
balance = trader.get_balance()
return jsonify(balance), 200
except Exception as e:
logger.error(f"Error processing balance request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/positions", methods=["GET"])
def get_positions():
"""Get position information."""
logger.info("Received positions request")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
positions = trader.get_positions()
return jsonify(positions), 200
except Exception as e:
logger.error(f"Error processing positions request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/todaytrades", methods=["GET"])
def get_today_trades():
"""Get today's trade information."""
logger.info("Received today trades request")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
trades = trader.get_today_trades()
return jsonify(trades), 200
except Exception as e:
logger.error(f"Error processing today trades request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/todayorders", methods=["GET"])
def get_today_orders():
"""Get today's order information."""
logger.info("Received today orders request")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
orders = trader.get_today_orders()
return jsonify(orders), 200
except Exception as e:
logger.error(f"Error processing today orders request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/order/<order_id>", methods=["GET"])
def get_order(order_id):
"""Get order information by order_id."""
logger.info(f"Received order request for order {order_id}")
try:
# 检查交易系统是否可用
trader = get_trader()
if is_real_mode() and not trader.is_available():
return jsonify({
"success": False,
"error": trader.connection_error_message or "交易系统连接失败,请稍后再试"
}), 503
order = trader.get_order(order_id)
return jsonify(order), 200
except Exception as e:
logger.error(f"Error processing order request: {str(e)}")
abort(500, description="Internal server error")
def get_order(order_id):
"""Get order information by order_id."""
logger.info(f"Received order request for order {order_id}")
@app.route("/yu/clear/<strategy_name>", methods=["DELETE"])
def clear_strategy(strategy_name):
"""清除指定策略的持仓管理数据"""
logger.info(f"接收到清除策略持仓请求: {strategy_name}")
try:
if get_trader().clear_position_manager(strategy_name):
return jsonify({"success": True, "message": "clear success"}), 200
else:
return jsonify({"success": False, "message": "策略不存在: " + strategy_name}), 400
except Exception as e:
logger.error(f"清除策略持仓时出错: {str(e)}")
abort(500, description="服务器内部错误")
if __name__ == "__main__":
logger.info(f"Server starting on {Config.HOST}:{Config.PORT}")
app.run(debug=Config.DEBUG, host=Config.HOST, port=Config.PORT)