real_trader/src/trade_server.py
2025-04-30 14:31:19 +08:00

758 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import schedule
import threading
import time
from xt_trader import XtTrader
from flask import Flask, request, abort, jsonify
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from config import Config
import logging
import os
from logging.handlers import RotatingFileHandler
import concurrent.futures
from concurrent.futures import TimeoutError
import json
import atexit
from simulation_trader import SimulationTrader
import datetime
from xtquant import xtconstant
# 策略仓位管理
strategy_positions = {} # 存储各策略持仓
strategy_trades = {} # 存储各策略交易记录
pending_orders = {} # 存储未完成委托
# 全局交易实例(采用单例模式)
_sim_trader_instance = None # 模拟交易实例(单例)
_real_trader_instance = None # 实盘交易实例(单例)
# 获取交易实例
def get_trader(use_sim_trader=False):
"""获取交易实例 - 采用单例模式
Args:
use_sim_trader (bool): 是否强制使用模拟交易True表示必定返回模拟交易实例
Returns:
返回交易实例,根据参数和配置决定是模拟交易还是实盘交易
"""
global _sim_trader_instance, _real_trader_instance
# 如果强制使用模拟交易,返回模拟交易单例
if use_sim_trader:
# 如果模拟交易实例不存在,创建一个
if _sim_trader_instance is None:
_sim_trader_instance = SimulationTrader()
return _sim_trader_instance
# 如果配置为仅模拟交易,返回模拟交易单例
if Config.SIMULATION_ONLY:
# 如果模拟交易实例不存在,创建一个
if _sim_trader_instance is None:
_sim_trader_instance = SimulationTrader()
return _sim_trader_instance
# 判断当前是否为交易时间
now = datetime.datetime.now()
current_time = now.time()
# 是否在交易时间段内9:30-11:30, 13:00-15:00
morning_start = datetime.time(9, 30)
morning_end = datetime.time(11, 30)
afternoon_start = datetime.time(13, 0)
afternoon_end = datetime.time(15, 0)
is_trading_hour = (morning_start <= current_time <= morning_end) or (afternoon_start <= current_time <= afternoon_end)
# 尝试导入chinese_calendar判断是否为交易日
try:
from chinese_calendar import is_workday, is_holiday
is_trading_day = is_workday(now) and not is_holiday(now)
except ImportError:
# 如果无法导入chinese_calendar则简单地用工作日判断
is_trading_day = now.weekday() < 5 # 0-4 为周一至周五
# 如果不是交易日或不在交易时间内,返回模拟交易单例
if not is_trading_day or not is_trading_hour:
logger.info(f"当前非交易时段 - 日期: {now.date()}, 时间: {current_time}, 使用模拟交易")
# 如果模拟交易实例不存在,创建一个
if _sim_trader_instance is None:
_sim_trader_instance = SimulationTrader()
return _sim_trader_instance
# 否则返回真实交易单例(如果存在)或创建一个新的
if _real_trader_instance is None:
_real_trader_instance = XtTrader()
# 检查交易实例是否已登录,如果未登录则进行登录
if not _real_trader_instance.is_logged_in():
logger.info("创建新的XtTrader实例并登录")
_real_trader_instance.login()
return _real_trader_instance
# 配置日志
def setup_logger():
log_dir = Config.LOG_DIR
try:
if not os.path.exists(log_dir):
os.makedirs(log_dir)
logger = logging.getLogger('trade_server')
logger.info(f"Created logs directory at: {os.path.abspath(log_dir)}")
except Exception as e:
print(f"Error creating logs directory: {str(e)}")
# 如果无法创建目录,使用当前目录
log_dir = "."
logger = logging.getLogger('trade_server')
logger.setLevel(getattr(logging, Config.LOG_LEVEL))
# 确保没有重复的处理器
for handler in logger.handlers[:]:
logger.removeHandler(handler)
# 文件处理器
file_handler = RotatingFileHandler(
os.path.join(log_dir, 'trade_server.log'),
maxBytes=Config.LOG_MAX_BYTES,
backupCount=Config.LOG_BACKUP_COUNT
)
file_handler.setFormatter(logging.Formatter(Config.LOG_FORMAT))
logger.addHandler(file_handler)
# 添加控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(Config.LOG_FORMAT))
logger.addHandler(console_handler)
return logger
logger = setup_logger()
def run_daily(time_str, job_func):
"""设置每天在指定时间运行的任务
Args:
time_str: 运行时间,格式为"HH:MM"
job_func: 要运行的函数
"""
# 不再区分周一到周五,而是每天执行
# 交易日判断逻辑已移到get_trader函数中
schedule.every().day.at(time_str).do(job_func)
# 策略持仓管理辅助函数
def update_strategy_position(strategy_name, code, direction, amount):
"""更新策略持仓
Args:
strategy_name: 策略名称
code: 股票代码
direction: 'buy''sell'
amount: 交易数量
"""
if not strategy_name:
return
# 确保策略在字典中
if strategy_name not in strategy_positions:
strategy_positions[strategy_name] = {}
try:
# 获取交易实例持仓情况(不论真实还是模拟)
current_trader = get_trader()
# get_trader 已经确保交易实例是登录状态的,无需再次检查
actual_positions = current_trader.get_positions()
code_position = next((pos for pos in actual_positions if pos.get('stock_code') == code), None)
# 记录实际持仓总量
actual_total = code_position.get('volume', 0) if code_position else 0
actual_can_use = code_position.get('can_use_volume', 0) if code_position else 0
logger.info(f"实际持仓 - 代码: {code}, 总量: {actual_total}, 可用: {actual_can_use}")
# 如果股票代码在持仓字典中不存在,初始化它
if code not in strategy_positions[strategy_name]:
strategy_positions[strategy_name][code] = {
'total_amount': 0,
'closeable_amount': 0
}
# 直接使用实际持仓数据更新策略持仓
strategy_positions[strategy_name][code]['total_amount'] = actual_total
strategy_positions[strategy_name][code]['closeable_amount'] = actual_can_use
logger.info(f"更新策略持仓 - 策略: {strategy_name}, 代码: {code}, 方向: {direction}, 数量: {amount}, 总量: {strategy_positions[strategy_name][code]['total_amount']}, 可用: {strategy_positions[strategy_name][code]['closeable_amount']}")
except Exception as e:
logger.error(f"获取实际持仓失败: {str(e)}")
# 异常情况下只记录错误,不尝试更新持仓
# 移除total_amount为0的持仓
if code in strategy_positions[strategy_name] and strategy_positions[strategy_name][code]['total_amount'] <= 0:
del strategy_positions[strategy_name][code]
def update_pending_orders():
"""更新未完成委托状态"""
try:
# 获取今日委托
current_trader = get_trader()
# get_trader 已经确保交易实例是登录状态的,无需再次检查
today_entrusts = current_trader.get_today_entrust()
# 更新委托状态
for order_id, order_info in list(pending_orders.items()):
entrust = next((e for e in today_entrusts if e.get('order_id') == order_id), None)
if entrust:
if entrust.get('order_status') in [xtconstant.ORDER_SUCCEEDED, xtconstant.ORDER_PART_SUCC]:
# 成交量计算
traded_amount = int(entrust.get('traded_volume', 0))
# 更新策略持仓
update_strategy_position(
order_info['strategy_name'],
order_info['code'],
order_info['direction'],
traded_amount
)
# 如果完全成交,从待处理列表中移除
if entrust.get('order_status') == xtconstant.ORDER_SUCCEEDED:
del pending_orders[order_id]
# 如果已撤单、废单等终态,也从待处理列表中移除
elif entrust.get('order_status') in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_JUNK]:
del pending_orders[order_id]
except Exception as e:
logger.error(f"更新未完成委托状态失败: {str(e)}")
def add_pending_order(order_id, strategy_name, code, price, amount, direction):
"""添加未完成委托
Args:
order_id: 委托编号
strategy_name: 策略名称
code: 股票代码
price: 委托价格
amount: 委托数量
direction: 交易方向,'buy''sell'
"""
if not order_id or order_id == 'simulation':
return
# 添加到未完成委托列表
pending_orders[order_id] = {
'strategy_name': strategy_name,
'code': code,
'price': price,
'amount': amount,
'direction': direction,
'created_time': time.time()
}
# 同时记录到交易历史
if strategy_name:
if strategy_name not in strategy_trades:
strategy_trades[strategy_name] = []
strategy_trades[strategy_name].append({
'time': time.strftime('%Y-%m-%d %H:%M:%S'),
'type': direction,
'code': code,
'price': price,
'amount': amount,
'order_id': order_id,
'status': 'pending'
})
logger.info(f"添加未完成委托: {order_id}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}")
def clean_timeout_orders():
"""清理超时委托"""
current_time = time.time()
for order_id, order_info in list(pending_orders.items()):
# 超过24小时的委托视为超时
if current_time - order_info['created_time'] > 24 * 60 * 60:
del pending_orders[order_id]
def load_strategy_data():
"""加载策略数据"""
global strategy_positions, strategy_trades, pending_orders
try:
if os.path.exists('strategy_data.json'):
with open('strategy_data.json', 'r') as f:
data = json.load(f)
strategy_positions = data.get('positions', {})
strategy_trades = data.get('trades', {})
pending_orders = data.get('pending_orders', {})
except Exception as e:
logger.error(f"加载策略数据失败: {str(e)}")
def save_strategy_data():
"""保存策略数据"""
try:
with open('strategy_data.json', 'w') as f:
json.dump({
'positions': strategy_positions,
'trades': strategy_trades,
'pending_orders': pending_orders
}, f)
except Exception as e:
logger.error(f"保存策略数据失败: {str(e)}")
def run_pending_tasks():
while True:
try:
schedule.run_pending()
time.sleep(1)
except Exception as e:
logger.error(f"Error in scheduler: {str(e)}")
# Run the task scheduler in a new thread
threading.Thread(target=run_pending_tasks).start()
# 初始化交易环境
get_trader().login()
# 添加请求频率限制
app = Flask(__name__)
# 添加策略数据相关的定期任务
schedule.every().day.at("00:01").do(clean_timeout_orders) # 每天清理超时委托
schedule.every().day.at("15:30").do(save_strategy_data) # 每天收盘后保存策略数据
# 程序启动时加载策略数据
load_strategy_data()
# 程序退出时保存策略数据
atexit.register(save_strategy_data)
# 使用配置文件中的时间
run_daily(Config.MARKET_OPEN_TIME, lambda: get_trader().login())
run_daily(Config.MARKET_ACTIVE_TIME, lambda: get_trader().get_balance())
run_daily(Config.MARKET_CLOSE_TIME, lambda: get_trader().logout())
@app.route("/yu/healthcheck", methods=["GET"])
def health_check():
return "ok", 200
def should_use_simulation():
"""判断是否应该使用模拟交易
Returns:
tuple: (should_simulate: bool, simulation_reason: str)
should_simulate: 是否应该使用模拟交易
simulation_reason: 使用模拟交易的原因
"""
# 直接使用get_trader()返回的实例类型判断
trader = get_trader()
if isinstance(trader, SimulationTrader):
# 获取原因
if Config.SIMULATION_ONLY:
return True, "配置为仅模拟交易"
else:
now = datetime.datetime.now()
return True, f"当前非交易时段 - {now.strftime('%Y-%m-%d %H:%M:%S')}"
# 如果是实盘交易实例
return False, ""
@app.route("/yu/buy", methods=["POST"])
def buy():
"""Buy an item with given parameters."""
logger.info("Received buy request")
# 每次操作前更新未完成委托状态
update_pending_orders()
# Get data from request body
data = request.get_json()
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get("strategy_name", "") # 新增策略名称参数,默认为空
try:
if not all([code, price_str, amount_str]):
raise ValueError("Missing required parameters")
price = float(price_str)
amount = int(amount_str)
if price <= 0 or amount <= 0:
raise ValueError("Price and amount must be positive")
# 检查是否需要模拟交易
should_simulate, simulation_reason = should_use_simulation()
if should_simulate:
# 使用模拟交易
logger.info(f"使用模拟交易 - {simulation_reason} - 代码: {code}, 价格: {price}, 数量: {amount}")
# 获取模拟交易实例并执行买入操作
sim_trader = get_trader(True)
result = sim_trader.buy(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 模拟交易立即生效,更新策略持仓
update_strategy_position(strategy_name, code, 'buy', amount)
return jsonify({"success": True, "data": result}), 200
# 尝试实盘交易
logger.info(f"Executing buy order: code={code}, price={price}, amount={amount}, strategy_name={strategy_name}")
try:
current_trader = get_trader()
# get_trader 已经确保交易实例是登录状态的,无需再次检查
result = execute_with_timeout(current_trader.buy, Config.TRADE_TIMEOUT, code, price, amount)
if result is None:
# 超时时使用模拟交易
logger.warning(f"Buy order timeout after {Config.TRADE_TIMEOUT} seconds, switching to simulation mode")
# 创建模拟交易实例并执行买入操作
sim_trader = get_trader(True)
result = sim_trader.buy(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 超时情况下,使用模拟交易,立即更新策略持仓
update_strategy_position(strategy_name, code, 'buy', amount)
return jsonify({"success": True, "data": result}), 200
# 如果指定了策略名称且是真实交易
if strategy_name and 'order_id' in result and result['order_id'] != 'simulation':
order_id = result['order_id']
# 添加到未完成委托
add_pending_order(
order_id,
strategy_name,
code,
price,
amount,
'buy'
)
# 注意不在这里调用update_strategy_position
# 持仓更新将由update_pending_orders函数处理
# 这避免了持仓更新的冗余操作
logger.info(f"Buy order result: {result}")
return jsonify({"success": True, "data": result}), 200
except Exception as e:
# 发生错误时使用模拟交易
logger.error(f"Buy order failed: {str(e)}, switching to simulation mode")
# 创建模拟交易实例并执行买入操作
sim_trader = get_trader(True)
result = sim_trader.buy(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 错误情况下,使用模拟交易,立即更新策略持仓
update_strategy_position(strategy_name, code, 'buy', amount)
return jsonify({"success": True, "data": result}), 200
except ValueError as e:
logger.error(f"Invalid request parameters: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"Error processing buy request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/sell", methods=["POST"])
def sell():
"""Sell an item with given parameters."""
logger.info("Received sell request")
# 每次操作前更新未完成委托状态
update_pending_orders()
# Get data from request body
data = request.get_json()
code = data.get("code")
price_str = data.get("price")
amount_str = data.get("amount")
strategy_name = data.get("strategy_name", "") # 新增策略名称参数,默认为空
try:
if not all([code, price_str, amount_str]):
raise ValueError("Missing required parameters")
price = float(price_str)
amount = int(amount_str)
if price <= 0 or amount <= 0:
raise ValueError("Price and amount must be positive")
# 检查是否需要模拟交易
should_simulate, simulation_reason = should_use_simulation()
if should_simulate:
# 使用模拟交易
logger.info(f"使用模拟交易 - {simulation_reason} - 代码: {code}, 价格: {price}, 数量: {amount}")
# 获取模拟交易实例并执行卖出操作
sim_trader = get_trader(True)
result = sim_trader.sell(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 模拟交易下,使用简单更新模式
update_strategy_position(strategy_name, code, 'sell', amount)
return jsonify({"success": True, "data": result}), 200
# 尝试实盘交易
logger.info(f"Executing sell order: code={code}, price={price}, amount={amount}, strategy_name={strategy_name}")
try:
current_trader = get_trader()
# get_trader 已经确保交易实例是登录状态的,无需再次检查
result = execute_with_timeout(current_trader.sell, Config.TRADE_TIMEOUT, code, price, amount)
if result is None:
# 超时时使用模拟交易
logger.warning(f"Sell order timeout after {Config.TRADE_TIMEOUT} seconds, switching to simulation mode")
# 创建模拟交易实例并执行卖出操作
sim_trader = get_trader(True)
result = sim_trader.sell(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 超时情况下,使用简单更新模式
update_strategy_position(strategy_name, code, 'sell', amount)
return jsonify({"success": True, "data": result}), 200
# 如果指定了策略名称,记录到未完成委托
if strategy_name and 'order_id' in result and result['order_id'] != 'simulation':
order_id = result['order_id']
# 添加到未完成委托
add_pending_order(
order_id,
strategy_name,
code,
price,
amount,
'sell'
)
logger.info(f"Sell order result: {result}")
return jsonify({"success": True, "data": result}), 200
except Exception as e:
# 发生错误时使用模拟交易
logger.error(f"Sell order failed: {str(e)}, switching to simulation mode")
# 创建模拟交易实例并执行卖出操作
sim_trader = get_trader(True)
result = sim_trader.sell(code, price, amount)
# 如果指定了策略名称,记录到策略持仓
if strategy_name:
# 错误情况下,使用简单更新模式
update_strategy_position(strategy_name, code, 'sell', amount)
return jsonify({"success": True, "data": result}), 200
except ValueError as e:
logger.error(f"Invalid request parameters: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"Error processing sell request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/cancel/<entrust_no>", methods=["DELETE"])
def cancel(entrust_no):
logger.info(f"Received cancel request for entrust_no={entrust_no}")
try:
current_trader = get_trader()
# get_trader 已经确保交易实例是登录状态的,无需再次检查
result = current_trader.cancel(entrust_no)
logger.info(f"Cancel result: {result}")
# 如果取消成功从pending_orders中移除该订单
if entrust_no in pending_orders:
order_info = pending_orders[entrust_no]
logger.info(f"从待处理委托中移除已取消订单: {entrust_no}, 代码: {order_info.get('code', 'unknown')}")
del pending_orders[entrust_no]
response = {"success": True, "data": result}
return jsonify(response), 200
except Exception as e:
logger.error(f"Error processing cancel request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/balance", methods=["GET"])
def get_balance():
"""Get the balance of the account."""
logger.info("Received balance request")
try:
current_trader = get_trader()
# get_trader 已经确保交易实例是登录状态的,无需再次检查
balance = current_trader.get_balance()
logger.info(f"Balance: {balance}")
response = {"success": True, "data": balance}
return jsonify(response), 200
except Exception as e:
print(e)
logger.error(f"Error processing balance request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/positions", methods=["GET"])
def get_positions():
"""Get the positions of the account."""
logger.info("Received positions request")
# 每次查询前更新未完成委托状态
update_pending_orders()
try:
# 获取查询参数中的策略名称
strategy_name = request.args.get("strategy_name", "")
# 如果指定了策略名称,返回该策略的持仓
if strategy_name:
# 获取真实账户持仓,用于计算可交易量
current_trader = get_trader()
# get_trader 已经确保交易实例是登录状态的,无需再次检查
real_positions = current_trader.get_positions()
real_positions_map = {}
for pos in real_positions:
# 使用xt_trader返回的字段名
if 'stock_code' in pos and 'can_use_volume' in pos:
real_positions_map[pos['stock_code']] = pos
# 如果该策略没有记录,返回空列表
if strategy_name not in strategy_positions:
logger.info(f"Strategy {strategy_name} has no positions")
return jsonify({"success": True, "data": []}), 200
# 合并策略持仓和真实持仓的可交易量
result = []
for code, pos_info in strategy_positions[strategy_name].items():
# 忽略total_amount为0的持仓
if pos_info['total_amount'] <= 0:
continue
# 使用真实账户的可交易量作为策略的可交易量上限
real_pos = real_positions_map.get(code, {})
closeable = min(pos_info['total_amount'], real_pos.get('can_use_volume', 0))
result.append({
code: {
'total_amount': pos_info['total_amount'],
'closeable_amount': closeable
}
})
logger.info(f"Strategy {strategy_name} positions: {result}")
return jsonify({"success": True, "data": result}), 200
# 否则返回原始持仓
current_trader = get_trader()
# get_trader 已经确保交易实例是登录状态的,无需再次检查
positions = current_trader.get_positions()
logger.info(f"Positions: {positions}")
response = {"success": True, "data": positions}
return jsonify(response), 200
except Exception as e:
logger.error(f"Error processing positions request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/todaytrades", methods=["GET"])
def get_today_trades():
"""Get the today's trades of the account."""
logger.info("Received today trades request")
try:
current_trader = get_trader()
# get_trader 已经确保交易实例是登录状态的,无需再次检查
trades = current_trader.get_today_trades()
logger.info(f"Today trades: {trades}")
response = {"success": True, "data": trades}
return jsonify(response), 200
except Exception as e:
logger.error(f"Error processing today trades request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/todayentrust", methods=["GET"])
def get_today_entrust():
"""Get the today's entrust of the account."""
logger.info("Received today entrust request")
try:
current_trader = get_trader()
# get_trader 已经确保交易实例是登录状态的,无需再次检查
entrust = current_trader.get_today_entrust()
logger.info(f"Today entrust: {entrust}")
response = {"success": True, "data": entrust}
return jsonify(response), 200
except Exception as e:
logger.error(f"Error processing today entrust request: {str(e)}")
abort(500, description="Internal server error")
@app.route("/yu/clear/<strategy_name>", methods=["DELETE"])
def clear_strategy(strategy_name):
"""清除指定策略的持仓管理数据"""
logger.info(f"接收到清除策略持仓请求: {strategy_name}")
try:
if not strategy_name:
raise ValueError("缺少策略名称参数")
# 检查策略是否存在
if strategy_name in strategy_positions:
# 从策略持仓字典中删除该策略
del strategy_positions[strategy_name]
# 清除该策略的交易记录
if strategy_name in strategy_trades:
del strategy_trades[strategy_name]
# 清除与该策略相关的未完成委托
for order_id, order_info in list(pending_orders.items()):
if order_info.get('strategy_name') == strategy_name:
del pending_orders[order_id]
# 保存更新后的策略数据
save_strategy_data()
logger.info(f"成功清除策略持仓数据: {strategy_name}")
return jsonify({"success": True, "message": f"成功清除策略 '{strategy_name}' 的持仓数据"}), 200
else:
logger.info(f"策略不存在或没有持仓数据: {strategy_name}")
return jsonify({"success": True, "message": f"策略 '{strategy_name}' 不存在或没有持仓数据"}), 200
except ValueError as e:
logger.error(f"无效的请求参数: {str(e)}")
abort(400, description=str(e))
except Exception as e:
logger.error(f"清除策略持仓时出错: {str(e)}")
abort(500, description="服务器内部错误")
# 超时处理函数
def execute_with_timeout(func, timeout, *args, **kwargs):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(func, *args, **kwargs)
try:
return future.result(timeout=timeout)
except TimeoutError:
return None
if __name__ == "__main__":
logger.info(f"Server starting on {Config.HOST}:{Config.PORT}")
app.run(debug=Config.DEBUG, host=Config.HOST, port=Config.PORT)