real_trader/src/trade_server.py
2025-05-01 05:33:28 +08:00

682 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import schedule
import threading
import time
from xt_trader import XtTrader
from flask import Flask, request, abort, jsonify
from config import Config
from concurrent.futures import TimeoutError
import concurrent.futures
import atexit
from simulation_trader import SimulationTrader
import datetime
from strategy_position_manager import StrategyPositionManager
from logger_config import get_logger
# 获取日志记录器
logger = get_logger('server')
# 全局交易实例(采用单例模式)
_sim_trader_instance = None # 模拟交易实例(单例)
_real_trader_instance = None # 实盘交易实例(单例)
_real_trader_manager_instance = None # 实盘交易管理器实例(单例)
# 添加线程锁,保护单例实例的创建
_instance_lock = threading.RLock()
# 后台任务执行线程
_scheduler_thread = None
# 获取模拟交易实例的辅助函数
def get_sim_trader():
"""获取模拟交易实例 - 保证单例模式
Returns:
返回模拟交易单例实例
"""
global _sim_trader_instance
with _instance_lock:
if _sim_trader_instance is None:
_sim_trader_instance = SimulationTrader()
return _sim_trader_instance
# 获取实盘交易实例的辅助函数
def get_real_trader():
"""获取实盘交易实例 - 保证单例模式
Returns:
返回实盘交易单例实例
"""
global _real_trader_instance
with _instance_lock:
if _real_trader_instance is None:
_real_trader_instance = XtTrader()
# 检查交易实例是否已登录,如果未登录则进行登录
if not _real_trader_instance.is_logged_in():
logger.info("创建新的XtTrader实例并登录")
login_success = _real_trader_instance.login()
if not login_success:
logger.error("XtTrader登录失败")
return _real_trader_instance
# 获取实盘交易管理器实例的辅助函数
def get_real_trader_manager():
"""获取实盘交易管理器实例 - 保证单例模式
Returns:
返回实盘交易管理器单例实例
"""
global _real_trader_manager_instance
with _instance_lock:
if _real_trader_manager_instance is None:
# 延迟导入避免循环依赖
from real_trader_manager import RealTraderManager
_real_trader_manager_instance = RealTraderManager(get_real_trader())
logger.info("创建新的RealTraderManager实例")
return _real_trader_manager_instance
# 判断当前是否应该使用模拟交易
def should_use_simulation():
"""判断是否应该使用模拟交易
Returns:
tuple: (should_simulate: bool, simulation_reason: str)
should_simulate: 是否应该使用模拟交易
simulation_reason: 使用模拟交易的原因
"""
# 如果配置为仅模拟交易返回True
if Config.SIMULATION_ONLY:
return True, "配置为仅模拟交易"
# 判断当前是否为交易日(只基于日期,不考虑时间)
now = datetime.datetime.now()
# 使用chinese_calendar判断是否为交易日
from chinese_calendar import is_workday, is_holiday
is_trading_day = is_workday(now) and not is_holiday(now)
logger.debug(f"使用chinese_calendar判断交易日: {now.date()}, 是交易日: {is_trading_day}")
# 如果不是交易日返回True使用模拟交易
if not is_trading_day:
return True, f"当前非交易日 - {now.date()}"
# 如果是交易日无论是否在交易时间都返回False使用实盘
return False, ""
# 判断当前是否在交易时间内
def is_trading_hours():
"""判断当前是否在交易时间内
Returns:
tuple: (is_trading: bool, message: str)
is_trading: 是否在交易时间
message: 相关信息
"""
now = datetime.datetime.now()
current_time = now.time()
# 是否在交易时间段内9:30-11:30, 13:00-15:00
morning_start = datetime.time(9, 30)
morning_end = datetime.time(11, 30)
afternoon_start = datetime.time(13, 0)
afternoon_end = datetime.time(15, 0)
is_trading_hour = (morning_start <= current_time <= morning_end) or (afternoon_start <= current_time <= afternoon_end)
if is_trading_hour:
return True, ""
else:
return False, f"当前非交易时段 - 时间: {current_time.strftime('%H:%M:%S')}"
# 获取交易实例 - 根据情况返回模拟或实盘交易实例
def get_trader():
"""获取交易实例 - 根据当前状态决定返回模拟还是实盘交易实例
Returns:
返回交易实例,根据配置和当前时间决定是模拟交易还是实盘交易
"""
should_simulate, _ = should_use_simulation()
if should_simulate:
return get_sim_trader()
else:
return get_real_trader()
# 获取指定类型的交易实例 - 供内部API查询等使用
def get_trader_by_type(trader_type='auto'):
"""获取指定类型的交易实例
Args:
trader_type: 'simulation'=模拟交易, 'real'=实盘交易, 'auto'=自动判断
Returns:
指定类型的交易实例
"""
if trader_type == 'simulation':
return get_sim_trader()
elif trader_type == 'real':
return get_real_trader()
else: # 'auto'
return get_trader()
def run_daily(time_str, job_func):
"""设置每天在指定时间运行的任务
Args:
time_str: 运行时间,格式为"HH:MM"
job_func: 要运行的函数
"""
# 不再区分周一到周五,而是每天执行
# 交易日判断逻辑已移到get_trader函数中
schedule.every().day.at(time_str).do(job_func)
def run_pending_tasks():
"""定时任务执行线程"""
global _scheduler_thread_running
logger.info("定时任务调度线程已启动")
while _scheduler_thread_running:
try:
schedule.run_pending()
time.sleep(1)
except Exception as e:
logger.error(f"Error in scheduler: {str(e)}")
logger.info("定时任务调度线程已停止")
# 程序启动时初始化线程
_scheduler_thread_running = True
_scheduler_thread = threading.Thread(target=run_pending_tasks, daemon=True)
_scheduler_thread.start()
# 程序退出清理函数
def cleanup():
"""程序退出时执行的清理操作"""
logger.info("开始执行程序退出清理...")
# 停止调度线程
global _scheduler_thread_running
_scheduler_thread_running = False
# 等待调度线程结束最多等待5秒
if _scheduler_thread and _scheduler_thread.is_alive():
_scheduler_thread.join(timeout=5)
# 保存策略数据
try:
StrategyPositionManager.save_strategy_data()
logger.info("策略数据已保存")
except Exception as e:
logger.error(f"保存策略数据失败: {str(e)}")
# 登出交易实例
try:
# 登出模拟交易实例
if _sim_trader_instance is not None:
_sim_trader_instance.logout()
logger.info("模拟交易实例已登出")
# 登出实盘交易实例
if _real_trader_instance is not None:
_real_trader_instance.logout()
logger.info("实盘交易实例已登出")
except Exception as e:
logger.error(f"登出交易实例失败: {str(e)}")
logger.info("程序退出清理完成")
# 注册程序退出处理函数
atexit.register(cleanup)
# 初始化交易环境
get_trader().login()
# 添加请求频率限制
app = Flask(__name__)
# 添加策略数据相关的定期任务
schedule.every().day.at(Config.CLEAN_ORDERS_TIME).do(StrategyPositionManager.clean_timeout_orders) # 每天清理超时委托
schedule.every().day.at(Config.STRATEGY_SAVE_TIME).do(StrategyPositionManager.save_strategy_data) # 每天收盘后保存策略数据
# 程序启动时加载策略数据
StrategyPositionManager.load_strategy_data()
# 程序退出时保存策略数据
atexit.register(StrategyPositionManager.save_strategy_data)
# 使用配置文件中的时间
run_daily(Config.MARKET_OPEN_TIME, lambda: get_trader().login())
run_daily(Config.MARKET_CLOSE_TIME, lambda: get_trader().logout())
@app.route("/yu/healthcheck", methods=["GET"])
def health_check():
return "ok", 200
@app.route("/yu/buy", methods=["POST"])
def buy():
"""Buy an item with given parameters."""
logger.info("Received buy request")
# Get data from request body
data = request.get_json()
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get("strategy_name", "") # 新增策略名称参数,默认为空
try:
if not all([code, price_str, amount_str]):
raise ValueError("Missing required parameters")
price = float(price_str)
amount = int(amount_str)
if price <= 0 or amount <= 0:
raise ValueError("Price and amount must be positive")
# 检查是否需要模拟交易
should_simulate, simulation_reason = should_use_simulation()
# 自动判断需要使用模拟交易
if should_simulate:
# 使用模拟交易
logger.info(f"使用模拟交易 - {simulation_reason} - 代码: {code}, 价格: {price}, 数量: {amount}")
# 获取模拟交易实例并执行买入操作
sim_trader = get_sim_trader()
result = sim_trader.buy(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 模拟交易立即生效,更新策略持仓
StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'buy', amount)
return jsonify({"success": True, "data": result, "simulation": True}), 200
# 检查是否在交易时间内
trading_hours, hours_message = is_trading_hours()
if not trading_hours:
logger.warning(f"实盘交易失败 - {hours_message} - 代码: {code}, 价格: {price}, 数量: {amount}")
return jsonify({"success": False, "error": f"交易失败: {hours_message},非交易时间不能实盘交易"}), 400
# 使用RealTraderManager执行实盘交易
logger.info(f"使用RealTraderManager执行买入: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}")
rtm = get_real_trader_manager()
result = rtm.place_order(strategy_name, code, 'buy', amount, price)
if result.get('success'):
logger.info(f"RealTraderManager买入成功: {result}")
return jsonify({"success": True, "data": result, "simulation": False}), 200
else:
logger.error(f"RealTraderManager买入失败: {result.get('error')}")
return jsonify({"success": False, "error": result.get('error')}), 400
except ValueError as e:
logger.error(f"Invalid request parameters: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"Error processing buy request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/sell", methods=["POST"])
def sell():
"""Sell an item with given parameters."""
logger.info("Received sell request")
# Get data from request body
data = request.get_json()
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get("strategy_name", "") # 新增策略名称参数,默认为空
try:
if not all([code, price_str, amount_str]):
raise ValueError("Missing required parameters")
price = float(price_str)
amount = int(amount_str)
if price <= 0 or amount <= 0:
raise ValueError("Price and amount must be positive")
# 检查是否需要模拟交易
should_simulate, simulation_reason = should_use_simulation()
# 自动判断需要使用模拟交易
if should_simulate:
# 使用模拟交易
logger.info(f"使用模拟交易 - {simulation_reason} - 代码: {code}, 价格: {price}, 数量: {amount}")
# 获取模拟交易实例并执行卖出操作
sim_trader = get_sim_trader()
result = sim_trader.sell(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 模拟交易下,使用简单更新模式
StrategyPositionManager.update_strategy_position(sim_trader, strategy_name, code, 'sell', amount)
return jsonify({"success": True, "data": result, "simulation": True}), 200
# 检查是否在交易时间内
trading_hours, hours_message = is_trading_hours()
if not trading_hours:
logger.warning(f"实盘交易失败 - {hours_message} - 代码: {code}, 价格: {price}, 数量: {amount}")
return jsonify({"success": False, "error": f"交易失败: {hours_message},非交易时间不能实盘交易"}), 400
# 使用RealTraderManager执行实盘交易
logger.info(f"使用RealTraderManager执行卖出: 代码={code}, 价格={price}, 数量={amount}, 策略={strategy_name}")
rtm = get_real_trader_manager()
result = rtm.place_order(strategy_name, code, 'sell', amount, price)
if result.get('success'):
logger.info(f"RealTraderManager卖出成功: {result}")
return jsonify({"success": True, "data": result, "simulation": False}), 200
else:
logger.error(f"RealTraderManager卖出失败: {result.get('error')}")
return jsonify({"success": False, "error": result.get('error')}), 400
except ValueError as e:
logger.error(f"Invalid request parameters: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"Error processing sell request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/cancel/<entrust_no>", methods=["DELETE"])
def cancel(entrust_no):
logger.info(f"Received cancel request for entrust_no={entrust_no}")
try:
# 不考虑是否为模拟交易,直接使用实盘
# 使用RealTraderManager
rtm = get_real_trader_manager()
# 在RealTraderManager的待处理订单中查找
found_in_rtm = False
for order in rtm.get_pending_orders():
if str(order['order_id']) == str(entrust_no):
found_in_rtm = True
# 使用RealTraderManager中的trader进行撤单
result = rtm.trader.cancel(entrust_no)
logger.info(f"通过RealTraderManager撤单结果: {result}")
# 更新订单状态
rtm.check_pending_orders()
return jsonify({"success": True, "data": result, "simulation": False}), 200
# 如果RealTraderManager中未找到使用普通实盘撤单
if not found_in_rtm:
logger.info(f"在RealTraderManager中未找到订单{entrust_no},使用普通实盘撤单")
real_trader = get_real_trader()
result = real_trader.cancel(entrust_no)
logger.info(f"普通实盘撤单结果: {result}")
# 更新未完成委托状态
StrategyPositionManager.update_pending_orders(real_trader)
return jsonify({"success": True, "data": result, "simulation": False}), 200
except Exception as e:
logger.error(f"Error processing cancel request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/balance", methods=["GET"])
def get_balance():
"""Get the balance of the account."""
logger.info("Received balance request")
try:
# 直接使用实盘交易实例,不考虑模拟盘
trader = get_real_trader()
balance = execute_with_timeout(trader.get_balance, Config.TRADE_TIMEOUT)
if balance is None:
logger.error("获取实盘余额超时")
return jsonify({"success": False, "error": "获取余额超时,请稍后重试", "simulation": False}), 500
logger.info(f"实盘交易余额: {balance}")
return jsonify({"success": True, "data": balance, "simulation": False}), 200
except Exception as e:
logger.error(f"Error processing balance request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/positions", methods=["GET"])
def get_positions():
"""Get the positions of the account."""
logger.info("Received positions request")
try:
# 获取查询参数
strategy_name = request.args.get("strategy_name", "")
# 判断当前交易模式
should_simulate, _ = should_use_simulation()
# 选择相应的交易实例
trader = get_sim_trader() if should_simulate else get_real_trader()
# 更新未完成委托状态
StrategyPositionManager.update_pending_orders(trader)
# 如果实盘且指定要查询RealTraderManager中的目标持仓
if not should_simulate and request.args.get("target", "").lower() == "true":
rtm = get_real_trader_manager()
targets = rtm.get_strategy_targets()
# 如果指定了策略名称
if strategy_name:
strategy_target = targets.get(strategy_name, {})
return jsonify({"success": True, "data": {strategy_name: strategy_target}, "simulation": False}), 200
return jsonify({"success": True, "data": targets, "simulation": False}), 200
# 使用StrategyPositionManager获取持仓信息
result = StrategyPositionManager.get_strategy_positions(trader, strategy_name if strategy_name else None)
return jsonify({"success": True, "data": result, "simulation": should_simulate}), 200
except Exception as e:
logger.error(f"Error processing positions request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/todaytrades", methods=["GET"])
def get_today_trades():
"""Get the today's trades of the account."""
logger.info("Received today trades request")
try:
# 直接使用实盘交易实例,不考虑模拟盘
trader = get_real_trader()
trades = trader.get_today_trades()
logger.info(f"今日成交: {trades}")
return jsonify({"success": True, "data": trades, "simulation": False}), 200
except Exception as e:
logger.error(f"Error processing today trades request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/todayentrust", methods=["GET"])
def get_today_orders():
"""Get the today's entrust of the account."""
logger.info("Received today entrust request")
try:
# 直接使用实盘交易实例,不考虑模拟盘
trader = get_real_trader()
entrust = trader.get_today_orders()
logger.info(f"今日委托: {entrust}")
return jsonify({"success": True, "data": entrust, "simulation": False}), 200
except Exception as e:
logger.error(f"Error processing today entrust request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/clear/<strategy_name>", methods=["DELETE"])
def clear_strategy(strategy_name):
"""清除指定策略的持仓管理数据"""
logger.info(f"接收到清除策略持仓请求: {strategy_name}")
try:
# 判断当前交易模式
should_simulate, _ = should_use_simulation()
# 如果是实盘模式使用RealTraderManager
if not should_simulate:
# 清除RealTraderManager中的策略目标
rtm = get_real_trader_manager()
if strategy_name in rtm.strategy_targets:
with _instance_lock: # 使用锁保护操作
if strategy_name in rtm.strategy_targets:
del rtm.strategy_targets[strategy_name]
logger.info(f"已清除RealTraderManager中的策略目标: {strategy_name}")
# 清除RealTraderManager中相关的待处理订单
pending_orders_to_remove = []
for order_id, order_info in rtm.pending_orders.items():
if order_info.get('strategy_name') == strategy_name:
pending_orders_to_remove.append(order_id)
# 删除相关订单
for order_id in pending_orders_to_remove:
with _instance_lock: # 使用锁保护操作
if order_id in rtm.pending_orders:
del rtm.pending_orders[order_id]
logger.info(f"已清除RealTraderManager中的订单: {order_id}")
# 获取相应的交易实例
trader = get_sim_trader() if should_simulate else get_real_trader()
# 如果是模拟交易实例,则重置模拟交易实例
if should_simulate and isinstance(trader, SimulationTrader):
with _instance_lock: # 使用锁保护操作
global _sim_trader_instance
if _sim_trader_instance is not None:
logger.info("重置模拟交易实例")
# 创建一个新的模拟交易实例,替换原有实例
_sim_trader_instance = SimulationTrader()
trader = _sim_trader_instance
# 使用StrategyPositionManager清除策略
success, message = StrategyPositionManager.clear_strategy(trader, strategy_name)
if success:
return jsonify({"success": True, "message": message, "simulation": should_simulate}), 200
else:
abort(400, description=message)
except Exception as e:
logger.error(f"清除策略持仓时出错: {str(e)}")
abort(500, description="服务器内部错误")
# 超时处理函数
def execute_with_timeout(func, timeout, *args, **kwargs):
"""执行函数并设置超时时间如果超时则返回None
Args:
func: 要执行的函数
timeout: 超时时间(秒)
args, kwargs: 传递给func的参数
Returns:
func的返回值如果超时则返回None
"""
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(func, *args, **kwargs)
try:
return future.result(timeout=timeout)
except TimeoutError:
logger.warning(f"函数 {func.__name__} 执行超时 (>{timeout}秒)")
return None
except Exception as e:
logger.error(f"函数 {func.__name__} 执行出错: {str(e)}")
return None
# 添加新的API端点查询订单状态
@app.route("/yu/order_status", methods=["GET"])
def get_order_status():
"""获取订单状态"""
logger.info("Received order status request")
try:
# 判断当前交易模式
should_simulate, _ = should_use_simulation()
if not should_simulate:
# 实盘模式使用RealTraderManager
try:
rtm = get_real_trader_manager()
pending_orders = rtm.get_pending_orders()
if pending_orders is None:
logger.error("从RealTraderManager获取订单状态失败")
return jsonify({"success": False, "error": "获取订单状态失败", "simulation": False}), 500
return jsonify({"success": True, "data": pending_orders, "simulation": False}), 200
except Exception as e:
logger.error(f"从RealTraderManager获取订单状态时出错: {str(e)}")
# 发生错误时,回退到使用普通交易实例
logger.info("回退到使用普通交易实例获取订单状态")
trader = get_real_trader()
try:
entrusts = execute_with_timeout(trader.get_today_orders, Config.TRADE_TIMEOUT)
if entrusts is None:
logger.error("获取今日委托超时")
return jsonify({"success": False, "error": "获取今日委托超时", "simulation": False}), 500
return jsonify({"success": True, "data": entrusts, "simulation": False}), 200
except Exception as e:
logger.error(f"获取今日委托时出错: {str(e)}")
return jsonify({"success": False, "error": f"获取今日委托时出错: {str(e)}", "simulation": False}), 500
else:
# 模拟交易模式
trader = get_sim_trader()
try:
entrusts = trader.get_today_orders()
return jsonify({"success": True, "data": entrusts, "simulation": True}), 200
except Exception as e:
logger.error(f"获取今日委托时出错: {str(e)}")
return jsonify({"success": False, "error": f"获取今日委托时出错: {str(e)}", "simulation": True}), 500
except Exception as e:
logger.error(f"处理订单状态请求时出错: {str(e)}")
abort(500, description="Internal server error")
# 添加新的API端点查询策略目标持仓
@app.route("/yu/strategy_targets", methods=["GET"])
def get_strategy_targets():
"""获取策略目标持仓"""
logger.info("Received strategy targets request")
try:
# 获取查询参数
strategy_name = request.args.get("strategy_name")
# 检查是否是实盘模式
should_simulate, _ = should_use_simulation()
if should_simulate:
return jsonify({"success": False, "error": "模拟交易模式下不支持目标持仓", "simulation": True}), 400
try:
rtm = get_real_trader_manager()
targets = rtm.get_strategy_targets()
# 如果指定了策略名称,则只返回该策略的目标持仓
if strategy_name:
strategy_target = targets.get(strategy_name, {})
return jsonify({"success": True, "data": {strategy_name: strategy_target}, "simulation": False}), 200
return jsonify({"success": True, "data": targets, "simulation": False}), 200
except Exception as e:
logger.error(f"获取策略目标持仓时出错: {str(e)}")
return jsonify({"success": False, "error": f"获取策略目标持仓时出错: {str(e)}", "simulation": False}), 500
except Exception as e:
logger.error(f"处理策略目标持仓请求时出错: {str(e)}")
abort(500, description="Internal server error")
if __name__ == "__main__":
logger.info(f"Server starting on {Config.HOST}:{Config.PORT}")
app.run(debug=Config.DEBUG, host=Config.HOST, port=Config.PORT)