Compare commits

..

33 Commits

Author SHA1 Message Date
zhiyong
f29f22e3cb update example 2025-05-11 22:25:45 +08:00
zhiyong
d3c1fa1246 删除冗余的log 2025-05-11 21:50:06 +08:00
zhiyong
f57ec0451c remove debug codes 2025-05-11 21:47:36 +08:00
zhiyong
1d4d52c053 fix: restart mini qmt 2025-05-11 21:29:23 +08:00
zhiyong
f76b38d1f5 feat: restart mini qmt software on working day 2025-05-11 21:05:48 +08:00
zhiyong
2e094d2776 update get_trader 2025-05-11 20:06:33 +08:00
zhiyong
a93e51f273 update results when clear strategy positions but strategy does not exist 2025-05-11 02:42:43 +08:00
zhiyong
e85d8bcdc8 fix logs folder position 2025-05-11 02:37:06 +08:00
zhiyong
0d491d14e3 feat: 实盘登陆失败, 发送邮件, 并每小时尝试连接 2025-05-11 02:24:54 +08:00
zhiyong
1c1b19383c feat: 自动重连, 并在重连失败时发送邮件警报 2025-05-11 02:05:39 +08:00
zhiyong
a9e074a116 添加断线重连功能 2025-05-11 01:18:04 +08:00
zhiyong
363cb17ea4 如果position data 文件不存在, 创建它 2025-05-10 23:39:11 +08:00
zhiyong
ffb2952fce update todayentrust to todayorders 2025-05-10 23:34:11 +08:00
zhiyong
e525c9e5f7 fix get_positions in simulation trader 2025-05-10 23:29:52 +08:00
zhiyong
89b8ffe767 fix sell in simulation trader 2025-05-10 23:22:29 +08:00
zhiyong
ba943ce9c6 fix sell in simulation trader 2025-05-10 23:20:29 +08:00
zhiyong
675ffb46a6 update logs dir and data dir 2025-05-10 23:03:41 +08:00
zhiyong
42f5064e77 fix no is_trading_hours in simulation trader 2025-05-10 22:58:16 +08:00
zhiyong
f8821fe1ad update data folder 2025-05-10 22:55:57 +08:00
zhiyong
cd0a747646 change log folder path 2025-05-10 22:48:01 +08:00
zhiyong
58c8bafe45 fix cannot buy for simulation trader 2025-05-10 22:41:56 +08:00
zhiyong
28de80a779 fix: no is_trading_hours in simulation trader 2025-05-10 22:37:31 +08:00
zhiyong
e818699c13 fix logger in base_trader 2025-05-10 22:14:51 +08:00
zhiyong
6ad14c689e fix buy sell for simulation trader 2025-05-10 22:11:22 +08:00
zhiyong
5e732bf97a update log in trade server 2025-05-10 21:43:12 +08:00
zhiyong
d9800ba094 remove unused method 2025-05-10 21:38:42 +08:00
zhiyong
f8448e0323 fix: cannot launch trader server 2025-05-10 21:33:57 +08:00
zhiyong
f971a95320 update python to 3.10.5 2025-05-10 21:20:47 +08:00
zhiyong
5b97619411 update login 2025-05-10 18:21:32 +08:00
zhiyong
ee165eb6fe updated trade server 2025-05-10 18:08:18 +08:00
zhiyong
a9f654d359 update position manager 2025-05-09 18:31:41 +08:00
zhiyong
978834772b refactor position manager 2025-05-01 15:19:27 +08:00
zhiyong
a407ce1f2f refactor trader 2025-05-01 05:33:28 +08:00
29 changed files with 13486 additions and 2006 deletions

View File

@ -1 +1 @@
3.12.8
3.10.5

View File

@ -41,7 +41,7 @@ Real Trader是一个量化交易执行平台专为中国A股市场设计
## 环境要求
- Python 3.12.8+
- Python 3.10.5+
- 依赖库:
- chinese-calendar
- flask
@ -57,7 +57,7 @@ Real Trader是一个量化交易执行平台专为中国A股市场设计
- **PORT**服务端口默认9527
- **HOST**服务监听地址默认0.0.0.0
- **DEBUG**调试模式默认False
- **SIMULATION_ONLY**是否仅使用模拟交易默认False
- **SIMULATION_MODE**是否仅使用模拟交易默认False
- **XT_ACCOUNT**XtQuant账号
- **XT_PATH**XtQuant路径
@ -100,3 +100,26 @@ print(response.json())
- 系统默认会根据交易时间自动判断是否使用模拟交易
- 交易日判断基于chinese-calendar库
- 请确保配置正确的交易账号和路径
## design
### strategy position manager
策略仓位管理是用于保存,更新基于策略名的股票仓位, 和未完成订单的
父类: BasePositionManager
子类: RealPositionManager(放入real 模块), SimulationPositionManager(放入simulation 模块)
position manager 中保存两个字典, positions, pending_orders, key都是策略名
position manager在trade_server中初始化, 作为参数传入trader
完整的交易流程是:
1. 下单
用户调用trader下单, trader在发出下单信号的同时添加一个pending_order给position manager
pending_order的结构是{order_id, order_status}, 当order_status是完成状态时, 应该从字典中删除
下单没有给策略名的, 策略名默认为"default_strategy"
2. 更新pending order状态
模拟盘立刻全部成交, 在下单后立刻更新仓位, 并删除pending order, 需要打印日志
实盘由real_trader_manager管理pending order状态, 具体是
- 下单后立刻尝试更新pending order状态, 比如状态变为部分成交, 全部成交等, 同时更新持仓,并计划一个1分钟后的任务
- 1分钟后再次更新订单状态, 如果全部成交, 则更新持仓, 否则(部分成交, 无成交), 撤单, 并下一个市价单数量是原先订单数量, 或者补单数量(部分成交)
- 如果下单发生错误, 表示没有成功下单, 则不添加pending order, 也不更新仓位, 即忽略这笔订单, 打印错误日志
3. 收盘后保存策略持仓(模拟盘, 实盘单独保存)
4. server启动时载入持仓文件
以上设计基于简洁, 逻辑清晰, 流程简单的思路, 如果有更好的建议, 可以提供

View File

@ -3,7 +3,7 @@ import requests
# 服务器地址
URL = "http://trader.biggerfish.tech:9527/yu"
# 策略名称常量
STRATEGY = "港股ETF网格"
STRATEGY = "香港证券ETF网格"
def buy(code: str, price: float, amount: int, strategy_name: str = STRATEGY) -> dict:
"""买入股票
@ -41,28 +41,8 @@ def sell(code: str, price: float, amount: int, strategy_name: str = STRATEGY) ->
response = requests.post(f"{URL}/sell", json=data)
return response.json()
def get_positions(strategy_name: str = STRATEGY) -> dict:
"""获取持仓信息
Args:
strategy_name: 策略名称默认为STRATEGY常量
"""
params = {}
if strategy_name:
params["strategy_name"] = strategy_name
response = requests.get(f"{URL}/positions", params=params)
return response.json()
def get_target_positions(strategy_name: str = STRATEGY) -> dict:
"""获取目标持仓信息(仅实盘模式有效)
Args:
strategy_name: 策略名称默认为STRATEGY常量
"""
params = {"target": "true"}
if strategy_name:
params["strategy_name"] = strategy_name
response = requests.get(f"{URL}/positions", params=params)
def get_positions() -> dict:
response = requests.get(f"{URL}/positions")
return response.json()
def clear_strategy(strategy_name: str = STRATEGY) -> dict:
@ -92,49 +72,25 @@ def get_today_trades() -> dict:
response = requests.get(f"{URL}/todaytrades")
return response.json()
def get_today_entrust() -> dict:
def get_today_orders() -> dict:
"""获取今日委托记录(仅实盘模式)
Returns:
字典形式的今日委托记录
"""
response = requests.get(f"{URL}/todayentrust")
response = requests.get(f"{URL}/todayorders")
return response.json()
def cancel_order(entrust_no: str) -> dict:
def cancel_order(order_id: str) -> dict:
"""取消订单
Args:
entrust_no: 委托编号
order_id: 委托编号
Returns:
取消结果
"""
response = requests.delete(f"{URL}/cancel/{entrust_no}")
return response.json()
def get_order_status() -> dict:
"""获取订单状态
Returns:
字典形式的订单状态信息
"""
response = requests.get(f"{URL}/order_status")
return response.json()
def get_strategy_targets(strategy_name: str = STRATEGY) -> dict:
"""获取策略目标持仓(仅实盘模式)
Args:
strategy_name: 策略名称默认为STRATEGY常量
Returns:
字典形式的策略目标持仓信息
"""
params = {}
if strategy_name:
params["strategy_name"] = strategy_name
response = requests.get(f"{URL}/strategy_targets", params=params)
response = requests.delete(f"{URL}/cancel/{order_id}")
return response.json()
def check_health() -> str:
@ -168,10 +124,6 @@ if __name__ == "__main__":
# 模拟输出:
#{'data': {'message': '模拟买入 - 代码: 601988.SH, 价格: 3.45, 数量: 3000', 'order_id': 'simulation'}, 'success': True}
# 示例:获取订单状态
order_status = get_order_status()
print("订单状态:", order_status)
# 示例卖出中国银行1000股价格3.48
result = sell("601988.SH", 3.48, 1000)
print("卖出结果:", result)
@ -183,8 +135,8 @@ if __name__ == "__main__":
print("今日成交:", trades)
# 示例:获取今日委托记录
entrusts = get_today_entrust()
print("今日委托:", entrusts)
orders = get_today_orders()
print("今日委托:", orders)
# 示例:再次查询持仓变化
positions = get_positions()
@ -192,20 +144,8 @@ if __name__ == "__main__":
# 模拟输出:
# {'data': [{'601988.SH': {'closeable_amount': 2000, 'total_amount': 2000}}], 'success': True}
# 示例:获取目标持仓
target_positions = get_target_positions()
print("目标持仓:", target_positions)
# 示例:获取策略目标持仓
strategy_targets = get_strategy_targets()
print("策略目标持仓:", strategy_targets)
# 示例:取消订单(需要真实的委托编号)
# cancel_result = cancel_order("123456")
# print("取消订单结果:", cancel_result)
# 示例:清除策略持仓数据
result = clear_strategy()
result = clear_strategy("测试策略名")
print("清除策略持仓结果:", result)
# 模拟输出:
# {'message': "成功清除策略 '追梦投资港股ETF' 的持仓数据", 'success': True}

View File

@ -1,6 +0,0 @@
def main():
print("Hello from real-trader!")
if __name__ == "__main__":
main()

View File

@ -3,8 +3,9 @@ name = "real-trader"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12.8"
requires-python = ">=3.10.5"
dependencies = [
"black>=25.1.0",
"chinese-calendar>=1.10.0",
"flask>=3.1.0",
"flask-limiter>=3.12",

11160
resources/grouped_etf.json Normal file

File diff suppressed because it is too large Load Diff

231
src/base_trader.py Normal file
View File

@ -0,0 +1,231 @@
import datetime as dt
from chinese_calendar import is_workday
from abc import ABC, abstractmethod
from logger_config import get_logger
from position_manager import PositionManager
from typing import Dict
# 获取日志记录器
class BaseTrader(ABC):
"""交易基类,定义交易接口的通用方法"""
def __init__(self, logger):
"""初始化交易基类"""
self.position_managers: Dict[str, PositionManager] = {}
self.logger = logger
pass
@abstractmethod
def is_logged_in(self):
"""检查交易系统是否已经登录
Returns:
bool: True表示已登录False表示未登录
"""
pass
@abstractmethod
def login(self):
"""登录交易系统
Returns:
bool: 登录是否成功
"""
pass
@abstractmethod
def logout(self):
"""登出交易系统"""
pass
@abstractmethod
def get_balance(self):
"""获取账户资金情况
Returns:
dict: 账户资金信息若失败返回None
"""
pass
@abstractmethod
def get_positions(self):
"""获取所有持仓
Returns:
list: 持仓列表若无持仓返回空列表
"""
pass
@abstractmethod
def get_position(self, stock_code):
"""查询指定股票代码的持仓信息
Args:
stock_code: 股票代码例如 "600000.SH"
Returns:
dict: 持仓详情如果未持有则返回None
"""
pass
@abstractmethod
def get_today_trades(self):
"""获取当日成交
Returns:
list: 成交列表若无成交返回空列表
"""
pass
@abstractmethod
def get_today_orders(self):
"""获取当日委托
Returns:
list: 委托列表若无委托返回空列表
"""
pass
@abstractmethod
def get_order(self, order_id):
"""查询指定订单ID的详细信息
Args:
order_id: 订单ID
Returns:
dict: 订单详情如果未找到则返回None
"""
pass
@abstractmethod
def buy(self, code, price, amount, order_type='limit'):
"""买入股票
Args:
code: 股票代码
price: 买入价格市价单时可为0
amount: 买入数量
order_type: 订单类型'limit'=限价单'market'=市价单默认为'limit'
Returns:
dict: 包含订单ID的字典
"""
pass
@abstractmethod
def sell(self, code, price, amount, order_type='limit'):
"""卖出股票
Args:
code: 股票代码
price: 卖出价格市价单时可为0
amount: 卖出数量
order_type: 订单类型'limit'=限价单'market'=市价单默认为'limit'
Returns:
dict: 包含订单ID的字典
"""
pass
@abstractmethod
def cancel(self, order_id):
"""撤销订单
Args:
order_id: 订单ID
Returns:
dict: 撤单结果
"""
pass
@staticmethod
def is_trading_time(self):
"""判断当前是否为交易时间
Returns:
bool: True 表示当前为交易时间False 表示当前休市
"""
try:
now = dt.datetime.now()
# 先判断是否为交易日
if not self.is_trading_date():
return False
# 判断是否在交易时间段内
current_time = now.time()
morning_start = dt.time(9, 30) # 上午开市时间 9:30
morning_end = dt.time(11, 30) # 上午休市时间 11:30
afternoon_start = dt.time(13, 0) # 下午开市时间 13:00
afternoon_end = dt.time(15, 0) # 下午休市时间 15:00
# 判断是否在上午或下午的交易时段
is_morning_session = morning_start <= current_time <= morning_end
is_afternoon_session = afternoon_start <= current_time <= afternoon_end
return is_morning_session or is_afternoon_session
except Exception as e:
self.logger.error(f"判断交易时间发生错误: {str(e)}")
return False
@staticmethod
def is_trading_date(date=None):
"""判断指定日期是否为交易日
Args:
date: 日期默认为当前日期
Returns:
bool: True 表示是交易日False 表示非交易日
"""
try:
# 如果未指定日期,使用当前日期
if date is None:
date = dt.datetime.now()
# 使用 chinese_calendar 判断是否为工作日(考虑节假日和调休)
return is_workday(date)
except Exception as e:
logger = get_logger("BaseTrader")
logger.error(f"判断交易日期发生错误: {str(e)}")
return False
def get_position_manager(self, strategy_name) -> PositionManager:
"""获取指定策略的持仓管理器
Args:
strategy_name: 策略名称
Returns:
PositionManager: 指定策略的持仓管理器
"""
if strategy_name not in self.position_managers:
self.position_managers[strategy_name] = PositionManager(strategy_name)
return self.position_managers[strategy_name]
def get_all_position_managers(self) -> Dict[str, PositionManager]:
"""获取所有持仓管理器"""
return self.position_managers
def is_today(self, datetime: dt.datetime) -> bool:
"""判断指定日期是否为当前日期
Args:
datetime: 日期时间
Returns:
bool: True 表示是当前日期False 表示不是当前日期
"""
return datetime.date() == dt.datetime.now().date()
def clear_position_manager(self, strategy_name):
"""清除指定策略的持仓管理器"""
if strategy_name in self.position_managers:
self.position_managers[strategy_name].clear()
return True
return False

View File

@ -3,53 +3,60 @@ import datetime
class Config:
# Server settings
PORT = int(os.environ.get("PORT", 9527))
HOST = os.environ.get("HOST", "0.0.0.0")
DEBUG = os.environ.get("DEBUG", "False").lower() == "true"
PORT = 9527
HOST = "0.0.0.0"
DEBUG = False
# Trading settings
TRADE_TIMEOUT = int(os.environ.get("TRADE_TIMEOUT", 5)) # 交易超时时间(秒)
SIMULATION_ONLY = os.environ.get("SIMULATION_ONLY", "False").lower() == "true"
TRADE_TIMEOUT = 5 # 交易超时时间(秒)
SIMULATION_MODE = False
# Trading hours
MARKET_OPEN_TIME = os.environ.get("MARKET_OPEN_TIME", "09:15")
MARKET_CLOSE_TIME = os.environ.get("MARKET_CLOSE_TIME", "15:30")
MARKET_OPEN_TIME = "09:20"
MARKET_CLOSE_TIME = "15:10"
# Logging
LOG_DIR = "logs"
LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "logs")
LOG_LEVEL = "INFO"
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
LOG_MAX_BYTES = 10 * 1024 * 1024 # 10MB
LOG_BACKUP_COUNT = 5
# strategy data
DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data")
# API Rate limiting
RATE_LIMIT_REQUESTS = 100
RATE_LIMIT_PERIOD = 60 # seconds
# XtQuant 相关配置
XT_ACCOUNT = os.environ.get("XT_ACCOUNT", "80391818")
XT_PATH = os.environ.get("XT_PATH", r'C:\\江海证券QMT实盘_交易\\userdata_mini')
XT_ACCOUNT = "80391818"
XT_PATH = r'C:\\江海证券QMT实盘_交易\\userdata_mini'
XT_LAUNCHER = r'C:\\江海证券QMT实盘_交易\\bin.x64\\XtItClient.exe'
XT_PROCESS1 = r'miniquote.exe'
XT_PROCESS2 = r'XtMiniQmt.exe'
XT_RESTART_TIME = "09:00"
# 重连相关配置
XT_RECONNECT_INTERVAL = 3600 # 重连尝试间隔(秒)
XT_MAX_SESSION_ID = 999999 # 最大会话ID
XT_MIN_SESSION_ID = 100000 # 最小会话ID
XT_SESSION_ID_RANGE = 20 # 一次尝试的会话ID数量
# 邮件通知配置
MAIL_ENABLED = True
MAIL_SERVER = "mail.yushaoyou.com"
MAIL_PORT = 465
MAIL_USERNAME = "jq@yushaoyou.com"
MAIL_PASSWORD = "zhiyong214"
MAIL_FROM = "自动交易服务器"
MAIL_TO = ["jq@yushaoyou.com"] # 可以是多个邮箱
# RealTraderManager配置
RTM_ORDER_TIMEOUT = int(os.environ.get("RTM_ORDER_TIMEOUT", 60)) # 订单超时时间(秒)
RTM_MAX_RETRIES = int(os.environ.get("RTM_MAX_RETRIES", 3)) # 最大重试次数
RTM_USE_MARKET_ORDER = os.environ.get("RTM_USE_MARKET_ORDER", "True").lower() == "true" # 是否使用市价单进行补单
RTM_ORDER_TIMEOUT = 60 # 订单超时时间(秒)
RTM_MAX_RETRIES = 3 # 最大重试次数
RTM_USE_MARKET_ORDER = True # 是否使用市价单进行补单
# 计划任务运行时间
STRATEGY_SAVE_TIME = "15:10" # 每天保存策略数据的时间
CLEAN_ORDERS_TIME = "15:05" # 每天清理超时委托的时间
@staticmethod
def is_market_open():
"""判断当前是否在交易时间内
Returns:
bool: 是否在交易时间内
"""
now = datetime.datetime.now().time()
morning_start = datetime.time(9, 30)
morning_end = datetime.time(11, 30)
afternoon_start = datetime.time(13, 0)
afternoon_end = datetime.time(15, 0)
return (morning_start <= now <= morning_end) or (afternoon_start <= now <= afternoon_end)

16
src/local_order.py Normal file
View File

@ -0,0 +1,16 @@
from trade_constants import ORDER_STATUS_PENDING
from datetime import datetime
class LocalOrder:
def __init__(self, order_id, code, price, amount, direction, order_type='limit'):
self.order_id = order_id
self.code = code
self.price = price
self.amount = amount
self.filled = 0
self.direction = direction
self.order_type = order_type
self.status = ORDER_STATUS_PENDING
self.created_time = datetime.now()

5
src/local_position.py Normal file
View File

@ -0,0 +1,5 @@
class LocalPosition:
def __init__(self, code, total_amount, closeable_amount):
self.code = code
self.total_amount = total_amount
self.closeable_amount = closeable_amount

View File

@ -16,7 +16,7 @@ def create_formatter(format_str=None):
return logging.Formatter(format_str)
# 创建服务器日志记录器
def setup_server_logger(log_dir="logs", max_bytes=10*1024*1024, backup_count=5, level=logging.INFO, log_format=None):
def setup_server_logger(log_dir, max_bytes=10*1024*1024, backup_count=5, level=logging.INFO, log_format=None):
"""
创建服务器日志记录器
@ -56,7 +56,7 @@ def setup_server_logger(log_dir="logs", max_bytes=10*1024*1024, backup_count=5,
return logger
# 创建真实交易日志记录器
def setup_real_trader_logger(log_dir="logs", level=logging.INFO, log_format=None):
def setup_real_trader_logger(log_dir, level=logging.INFO, log_format=None):
"""
创建真实交易日志记录器
@ -97,7 +97,7 @@ def setup_real_trader_logger(log_dir="logs", level=logging.INFO, log_format=None
return logger
# 创建模拟交易日志记录器
def setup_simulation_logger(log_dir="logs", max_bytes=10*1024*1024, backup_count=5, level=logging.INFO, log_format=None):
def setup_simulation_logger(log_dir, max_bytes=10*1024*1024, backup_count=5, level=logging.INFO, log_format=None):
"""
创建模拟交易日志记录器
@ -147,7 +147,7 @@ def setup_strategy_logger():
return logging.getLogger('trade_server')
# 根据配置创建合适的日志记录器
def get_logger(module_name, log_dir="logs", level=Config.LOG_LEVEL if hasattr(Config, 'LOG_LEVEL') else logging.INFO):
def get_logger(module_name, log_dir=Config.LOG_DIR, level=Config.LOG_LEVEL if hasattr(Config, 'LOG_LEVEL') else logging.INFO):
"""
获取适合模块的日志记录器

253
src/position_manager.py Normal file
View File

@ -0,0 +1,253 @@
import os
import json
from logger_config import get_logger
from config import Config
from trade_constants import (
ORDER_DIRECTION_BUY,
ORDER_TYPE_LIMIT,
ORDER_TYPE_MARKET,
ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED,
ORDER_STATUS_FAILED,
)
from local_position import LocalPosition
from local_order import LocalOrder
from t0_stocks import is_t0
from typing import Dict
# 获取日志记录器
logger = get_logger("position_manager")
class PositionManager:
"""实盘策略持仓管理器,负责管理不同策略在实盘环境下的持仓情况"""
def __init__(self, strategy_name="default_strategy"):
"""初始化实盘持仓管理器"""
super().__init__()
self.strategy_name = strategy_name
# 策略持仓信息
self.positions: Dict[str, LocalPosition] = {} # {股票代码 -> LocalPosition}
# 待处理订单信息
self.pending_orders = {} # {order_id -> LocalOrder}
self.data_path = os.path.join(Config.DATA_DIR, self.strategy_name + "_positions.json")
# 确保数据目录存在
os.makedirs(os.path.dirname(self.data_path), exist_ok=True)
# 如果文件不存在,创建一个空文件
if not os.path.exists(self.data_path):
with open(self.data_path, 'w') as f:
f.write('{}')
self.load_data()
def update_position(self, code, direction, amount):
# 如果股票代码在持仓字典中不存在,初始化它
if code not in self.positions:
self.positions[code] = LocalPosition(code, 0, 0)
# 根据方向更新持仓
position = self.positions[code]
is_t0_stock = is_t0(code)
if direction == ORDER_DIRECTION_BUY:
position.total_amount += amount
if is_t0_stock:
position.closeable_amount += amount
else: # sell
position.total_amount -= amount
position.closeable_amount -= amount
logger.info(
f"更新策略持仓 - 策略: {self.strategy_name}, 代码: {code}, 方向: {direction}, 数量: {amount}, "
f"更新后总量: {position.total_amount}, "
f"可用: {position.closeable_amount}"
)
# 移除total_amount为0的持仓
if code in self.positions and self.positions[code].total_amount <= 0:
del self.positions[code]
logger.info(f"移除空持仓 - 策略: {self.strategy_name}, 代码: {code}")
def add_pending_order(
self, order_id, code, price, amount, direction, order_type=ORDER_TYPE_LIMIT
):
if not self.strategy_name:
return
order = LocalOrder(order_id, code, price, amount, direction, order_type)
self.pending_orders[order_id] = order
logger.info(
f"添加订单 - ID: {order_id}, 策略: {self.strategy_name}, 代码: {code}, 方向: {direction}, "
f"数量: {amount}, 价格: {price}, 类型: {order_type}"
)
def update_order_status(self, order_id, filled, new_status):
if order_id in self.pending_orders:
_order = self.pending_orders[order_id]
# 记录之前的状态用于日志
previous_status = _order.status
# 更新状态
_order.status = new_status
_order.filled = filled
# 记录状态变化日志
if previous_status != new_status:
code = self.pending_orders[order_id].code
logger.info(
f"订单状态变化: ID={order_id}, 代码={code}, 旧状态={previous_status}, 新状态={new_status}"
)
# 如果订单已完成,移除它
if new_status in [
ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED,
ORDER_STATUS_FAILED,
]:
# 保留订单信息以供参考,但标记为已完成
del self.pending_orders[order_id]
logger.info(f"订单已删除 - ID: {order_id}, 状态: {new_status}")
return True
return False
def get_pending_order(self, order_id) -> LocalOrder:
"""获取未完成委托信息
Args:
order_id: 订单ID
Returns:
dict: 委托信息如果不存在返回None
"""
return self.pending_orders.get(order_id)
def get_pending_orders(self):
"""获取所有未完成委托
Returns:
dict: 订单ID到委托信息的映射
"""
return self.pending_orders
def get_positions(self) -> Dict[str, LocalPosition]:
"""获取策略持仓
Returns:
Dict[str, LocalPosition]:
key为股票代码strvalue为LocalPosition对象若无持仓则返回空字典
"""
return self.positions
def save_data(self):
"""保存策略数据"""
try:
# 将对象转换为可序列化的字典
positions_dict = {}
for code, pos in self.positions.items():
positions_dict[code] = {
"code": pos.code,
"total_amount": pos.total_amount,
"closeable_amount": pos.closeable_amount,
}
pending_orders_dict = {}
for order_id, order in self.pending_orders.items():
pending_orders_dict[order_id] = {
"order_id": order.order_id,
"code": order.code,
"price": order.price,
"amount": order.amount,
"filled": order.filled,
"direction": order.direction,
"order_type": order.order_type,
"status": order.status,
"created_time": (
order.created_time.isoformat()
if hasattr(order, "created_time")
else None
),
}
with open(self.data_path, "w") as f:
json.dump(
{
"positions": positions_dict,
"pending_orders": pending_orders_dict,
},
f,
)
logger.info("成功保存实盘策略数据")
except Exception as e:
logger.error(f"保存实盘策略数据失败: {str(e)}")
def load_data(self):
"""加载策略数据"""
try:
if os.path.exists(self.data_path):
from datetime import datetime
with open(self.data_path, "r") as f:
data = json.load(f)
# 还原positions对象
self.positions = {}
positions_dict = data.get("positions", {})
for code, pos_data in positions_dict.items():
self.positions[code] = LocalPosition(
pos_data["code"],
pos_data["total_amount"],
pos_data["closeable_amount"],
)
# 还原pending_orders对象
self.pending_orders = {}
pending_orders_dict = data.get("pending_orders", {})
for order_id, order_data in pending_orders_dict.items():
order = LocalOrder(
order_data["order_id"],
order_data["code"],
order_data["price"],
order_data["amount"],
order_data["direction"],
order_data["order_type"],
)
order.filled = order_data["filled"]
order.status = order_data["status"]
if order_data.get("created_time"):
try:
order.created_time = datetime.fromisoformat(
order_data["created_time"]
)
except (ValueError, TypeError):
order.created_time = datetime.now()
self.pending_orders[order_id] = order
logger.info("已加载实盘策略数据")
logger.info(f"策略数: {len(self.positions)}")
else:
logger.info(f"实盘策略数据文件不存在: {self.data_path}")
self.positions = {}
self.pending_orders = {}
except Exception as e:
logger.error(f"加载实盘策略数据失败: {str(e)}")
# 初始化空数据结构
self.positions = {}
self.pending_orders = {}
def clear(self):
"""清除所有持仓管理数据"""
self.positions = {}
self.pending_orders = {}
self.save_data()
def update_closeable_amount(self):
"""更新可卖持仓"""
for _, position in self.positions.items():
if position.closeable_amount != position.total_amount:
position.closeable_amount = position.total_amount
def clear_pending_orders(self):
"""清除所有未完成订单"""
self.pending_orders = {}
self.save_data()

10
src/real/__init__.py Normal file
View File

@ -0,0 +1,10 @@
"""
实盘交易模块
此模块提供实盘交易的功能使用xtquant接口连接到实际交易系统
"""
from .xt_trader import XtTrader
from .real_trader_manager import RealTraderManager
__all__ = ['XtTrader', 'RealTraderManager']

View File

@ -0,0 +1,413 @@
import time
import threading
import schedule
from xtquant import xtconstant
from logger_config import get_logger
from config import Config
import json
from typing import Dict
from position_manager import PositionManager
from functools import wraps
from trade_constants import (
ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED,
ORDER_STATUS_PENDING,
ORDER_STATUS_FAILED,
ORDER_STATUS_PARTIAL,
ORDER_DIRECTION_BUY,
ORDER_DIRECTION_SELL,
ORDER_TYPE_LIMIT,
ORDER_TYPE_MARKET,
)
from real.xt_trader import XtTrader
# 获取日志记录器
logger = get_logger("real_trader_manager")
def run_threaded(func):
@wraps(func)
def wrapper(*args, **kwargs):
thread = threading.Thread(target=func, args=args, kwargs=kwargs)
thread.start()
return wrapper
class RealTraderManager:
"""实盘交易管理器,处理实盘下单失败、部分成交等问题,尽量保证仓位与策略信号一致"""
def __init__(self, trader: XtTrader):
"""初始化实盘交易管理器
Args:
trader: XtTrader实例
position_manager: StrategyPositionManager实例
"""
# 使用传入的trader和position_manager实例
self.trader = trader
# 启动调度器
self._start_scheduler()
logger.info("实盘交易管理器初始化完成")
def _start_scheduler(self):
# 每日定时清理(增加配置校验)
if hasattr(Config, "STRATEGY_SAVE_TIME"):
try:
schedule.every().day.at(Config.STRATEGY_SAVE_TIME).do(
run_threaded(self.clean_expired_orders)
)
schedule.every().day.at(Config.STRATEGY_SAVE_TIME).do(
run_threaded(self.update_closeable_amount)
)
except Exception as e:
logger.error(f"清理任务配置错误: {e}")
else:
logger.error("STRATEGY_SAVE_TIME 未配置")
# 启动高精度调度线程
def run_scheduler():
while True:
try:
schedule.run_pending()
time.sleep(1) # 将休眠时间缩短至1秒提高精度
except Exception as e:
logger.error(f"调度器异常: {e}", exc_info=True)
time.sleep(10) # 发生错误时延长休眠避免日志风暴
scheduler_thread = threading.Thread(
target=run_scheduler, name="SchedulerThread"
)
scheduler_thread.daemon = True # 设为守护线程随主进程退出
scheduler_thread.start()
logger.info("交易管理器调度器已启动")
def place_order(
self, strategy_name, code, direction, amount, price, order_type=ORDER_TYPE_LIMIT
):
"""下单接口,处理买入/卖出请求
Args:
strategy_name: 策略名称
code: 股票代码
direction: 交易方向 'buy''sell'
amount: 交易数量
price: 交易价格市价单时可为0
order_type: 订单类型'limit'表示限价单'market'表示市价单默认为'limit'
Returns:
dict: 包含订单ID和状态信息
"""
if not strategy_name or not code or not direction:
logger.error("下单参数不完整")
return {"success": False, "error": "参数不完整"}
# 检查交易方向
if direction not in [ORDER_DIRECTION_BUY, ORDER_DIRECTION_SELL]:
logger.error(f"无效的交易方向: {direction}")
return {"success": False, "error": "无效的交易方向"}
# 检查订单类型
if order_type not in [ORDER_TYPE_LIMIT, ORDER_TYPE_MARKET]:
logger.error(f"无效的订单类型: {order_type}")
return {
"success": False,
"error": "无效的订单类型,必须是'limit''market'",
}
try:
# 对于限价单,检查资金和持仓是否足够
if order_type == ORDER_TYPE_LIMIT and not self._check_order_feasibility(
code, direction, amount, price
):
logger.warning(
f"资金或持仓不足,忽略订单: {direction} {code} {amount}{price}"
)
return {"success": False, "error": "资金或持仓不足"}
# 下单
logger.info(
f"准备{direction}订单: 代码={code}, 数量={amount}, 价格={price}, 订单类型={order_type}"
)
if direction == ORDER_DIRECTION_BUY:
result = self.trader.buy(code, price, amount, order_type)
else:
result = self.trader.sell(code, price, amount, order_type)
order_id = result.get("order_id")
if not order_id:
logger.error(f"下单失败: {result}")
return {"success": False, "error": "下单失败"}
# 添加未完成委托到position_manager
position_manager = self.trader.get_position_manager(strategy_name)
position_manager.add_pending_order(order_id, code, price, amount, direction, order_type)
logger.info(
f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}"
)
# 立即更新一次订单状态
self.check_and_retry(order_id, strategy_name, code, direction, amount, 1)
return {"success": True, "order_id": order_id}
except Exception as e:
logger.error(f"下单过程发生异常: {str(e)}")
return {"success": False, "error": f"下单异常: {str(e)}"}
def _place_market_order_for_remainder(self, strategy_name, code, direction, left_amount):
"""对未完成的订单进行补单,下市价单
Args:
strategy_name: 策略名称
code: 股票代码
direction: 交易方向
left_amount: 剩余数量
available_retry_count: 重试次数
Returns:
bool: 补单是否成功
"""
if left_amount <= 0:
logger.info(f"无需补单,剩余数量为零或负数: {left_amount}")
return True
logger.info(f"限价单补单: 市价单, 剩余数量={left_amount}")
new_order = self.place_order(strategy_name, code, direction, left_amount, 0, ORDER_TYPE_MARKET)
new_order_id = new_order.get("order_id")
if new_order.get("success") and new_order_id:
# 立即检查新市价单
self.check_and_retry(new_order_id, strategy_name, code, direction, left_amount)
return True
else:
logger.error(f"补单失败: {new_order}")
return False
def check_and_retry(self, order_id, strategy_name, code, direction, amount, available_retry_count=1):
position_manager = self.trader.get_position_manager(strategy_name)
order_info = position_manager.get_pending_order(order_id)
filled = order_info.filled
target_amount = order_info.amount
if not order_info:
logger.warning(f"订单信息不存在: ID={order_id}")
return
order_type = order_info.order_type
status = self._update_order_status(order_id, strategy_name)
if order_type == ORDER_TYPE_MARKET:
# 市价单,只递归检查
if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]:
logger.info(f"市价单未完成1分钟后继续检查: ID={order_id}, 状态={status}")
threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name, code, direction, amount)).start()
else:
logger.info(f"市价单已完成: ID={order_id}, 状态={status}")
elif order_type == ORDER_TYPE_LIMIT:
# 限价单,未完成则撤单补市价单
if status in [ORDER_STATUS_PENDING, ORDER_STATUS_PARTIAL]:
if available_retry_count > 0:
logger.info(f"限价单未完成1分钟后继续检查: ID={order_id}, 状态={status}")
threading.Timer(60, self.check_and_retry, args=(order_id, strategy_name, code, direction, amount, 0)).start()
else:
# 尝试撤单
try:
logger.info(f"限价单未完成,尝试撤单: ID={order_id}, 状态={status}")
self.trader.cancel(order_id)
position_manager.update_order_status(order_id, 0, ORDER_STATUS_CANCELLED)
except Exception as e:
logger.error(f"撤单失败: order_id={order_id}, error={str(e)}")
# 计算剩余数量, 如果剩余数量大于0, 则补单
left_amount = target_amount - filled
self._place_market_order_for_remainder(strategy_name, code, direction, left_amount)
else:
logger.info(f"限价单已完成: ID={order_id}, 状态={status}")
else:
logger.warning(f"未知订单类型: ID={order_id}, type={order_type}")
def _update_order_status(self, order_id, strategy_name):
"""更新单个订单状态
Args:
order_id: 订单ID
"""
# 检查订单是否存在
position_manager = self.trader.get_position_manager(strategy_name)
order_info = position_manager.get_pending_order(order_id)
if not order_info:
return None
try:
# 获取订单之前的状态,用于判断是否发生变化
previous_status = order_info.status
previous_volume = order_info.filled
updated_order = self.trader.get_order(order_id)
# 根据委托状态更新订单状态
if updated_order["order_status"] == xtconstant.ORDER_SUCCEEDED:
# 全部成交
filled = updated_order["traded_volume"]
position_manager.update_order_status(order_id, filled, ORDER_STATUS_COMPLETED)
# 更新持仓
position_manager.update_position(
order_info.code,
order_info.direction,
filled,
)
return ORDER_STATUS_COMPLETED
elif updated_order["order_status"] == xtconstant.ORDER_PART_SUCC:
# 部分成交
filled = updated_order.get("traded_volume", 0)
position_manager.update_order_status(
order_id, filled, ORDER_STATUS_PARTIAL
)
# 如果成交量有变化,记录日志并更新持仓
if filled != previous_volume:
target_amount = order_info.amount
logger.info(
f"订单部分成交更新: ID={order_id}, 代码={order_info.code}, 目标数量={target_amount}, 已成交数量={filled}, 剩余数量={target_amount - filled}"
)
# 更新持仓(仅更新已成交部分)
if filled > 0:
position_manager.update_position(
order_info.code,
order_info.direction,
filled,
)
return ORDER_STATUS_PARTIAL
elif updated_order["order_status"] in [
xtconstant.ORDER_CANCELED,
xtconstant.ORDER_JUNK,
]:
# 已撤单或废单
position_manager.update_order_status(
order_id,
0,
ORDER_STATUS_CANCELLED
)
return ORDER_STATUS_CANCELLED
elif updated_order["order_status"] in [
xtconstant.ORDER_UNREPORTED,
xtconstant.ORDER_WAIT_REPORTING,
xtconstant.ORDER_REPORTED,
]:
# 未报、待报、已报
if previous_status != ORDER_STATUS_PENDING:
position_manager.update_order_status(order_id, 0, ORDER_STATUS_PENDING)
return ORDER_STATUS_PENDING
except Exception as e:
logger.error(f"更新订单状态时发生异常: order_id={order_id}, error={str(e)}")
return None
def _check_order_feasibility(self, code, direction, amount, price):
"""检查订单是否可行(资金或持仓是否足够)
Args:
code: 股票代码
direction: 交易方向
amount: 交易数量
price: 交易价格
Returns:
bool: 订单是否可行
"""
try:
if direction == ORDER_DIRECTION_BUY:
# 检查资金是否足够
balance = self.trader.get_balance()
if not balance:
logger.error("获取账户余额失败")
return False
# 计算所需资金加上3%的手续费作为缓冲)
required_cash = price * amount * 1.03
available_cash = balance.get("cash", 0) - balance.get("frozen_cash", 0)
if required_cash > available_cash:
logger.warning(
f"资金不足: 需要 {required_cash:.2f}, 可用 {available_cash:.2f}"
)
return False
return True
elif direction == "sell":
# 检查持仓是否足够
position = self.trader.get_position(code)
if not position:
logger.warning(f"没有持仓: {code}")
return False
available_volume = position.get("can_use_volume", 0)
if amount > available_volume:
logger.warning(
f"可用持仓不足: 需要 {amount}, 可用 {available_volume}"
)
return False
return True
return False
except Exception as e:
logger.error(f"检查订单可行性时发生异常: {str(e)}")
return False
def clean_expired_orders(self):
"""清理过期的未完成订单"""
try:
logger.info("开始清理过期未完成订单...")
# 获取所有未完成订单
position_managers = self.trader.get_all_position_managers()
# 遍历所有持仓管理器
for position_manager in position_managers.values():
# 获取所有未完成订单
pending_orders = position_manager.get_pending_orders()
# 遍历未完成订单,检查是否有无法成交的订单(如跌停无法卖出)
for order_id, order_info in pending_orders.items():
try:
logger.warning(
f"清理无法成交订单: ID={order_id}, 代码={order_info.code}, 方向={order_info.direction}, "
f"数量={order_info.amount}, 已成交数量={order_info.filled}"
)
except Exception as e:
logger.error(f"清理订单 {order_id} 时出错: {str(e)}")
position_manager.clear_pending_orders()
logger.info("过期未完成订单清理完毕")
except Exception as e:
logger.error(f"清理过期未完成订单时发生异常: {str(e)}")
def update_closeable_amount(self):
"""更新可卖持仓"""
try:
logger.info("开始更新可卖持仓...")
# 获取所有持仓
position_managers = self.trader.get_all_position_managers()
# 遍历持仓,更新可卖持仓
for position_manager in position_managers.values():
position_manager.update_closeable_amount()
logger.info("可卖持仓更新完毕")
except Exception as e:
logger.error(f"更新可卖持仓时发生异常: {str(e)}")

480
src/real/xt_trader.py Normal file
View File

@ -0,0 +1,480 @@
import os
import random
import time
from datetime import datetime
from config import Config
from base_trader import BaseTrader
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount
from xtquant import xtconstant
from xtquant.xtdata import get_instrument_detail
from logger_config import get_logger
from utils.mail_util import MailUtil
# 获取日志记录器
logger = get_logger('real_trader')
class MyXtQuantTraderCallback:
def __init__(self, trader_instance):
self.trader_instance = trader_instance
def on_connected(self):
logger.info("连接成功")
def on_disconnected(self):
"""连接断开回调
当交易连接断开时调用会自动尝试重连
如果重连失败将设置连接状态为失败并通过邮件通知
"""
logger.warning("连接断开")
if self.trader_instance:
# 设置连接状态
self.trader_instance.connected = False
self.trader_instance.subscribed = False
# 尝试重连
if not self.trader_instance.reconnect():
logger.error("重连失败")
# 通知重连失败
self.trader_instance.connection_failed = True
self.trader_instance.last_reconnect_time = time.time()
self.trader_instance.notify_connection_failure()
def on_account_status(self, status):
pass
def on_stock_asset(self, asset):
logger.info(f"资金变动: {asset.account_id} {asset.cash} {asset.total_asset}")
def on_stock_order(self, order):
logger.info(f"委托回报: {order.stock_code} {order.order_status} {order.order_sysid}")
def on_stock_trade(self, trade):
logger.info(f"成交变动: {trade.account_id} {trade.stock_code} {trade.order_id}")
def on_stock_position(self, position):
logger.info(f"持仓变动: {position.stock_code} {position.volume}")
def on_order_error(self, order_error):
logger.error(f"委托失败: {order_error.order_id} {order_error.error_id} {order_error.error_msg}")
def on_cancel_error(self, cancel_error):
logger.error(f"撤单失败: {cancel_error.order_id} {cancel_error.error_id} {cancel_error.error_msg}")
def on_order_stock_async_response(self, response):
logger.info(f"异步下单反馈: {response.order_id}")
def on_cancel_order_stock_async_response(self, response):
logger.info(f"异步撤单反馈: {response.order_id}")
def on_smt_appointment_async_response(self, response):
logger.info(f"约券异步反馈: {response.seq}")
class XtTrader(BaseTrader):
def __init__(self, connect_failed_callback=None):
super().__init__(logger)
self.started = False
self.connected = False
self.subscribed = False
self._ACCOUNT = Config.XT_ACCOUNT
self._PATH = Config.XT_PATH
self._SESSION_ID = random.randint(Config.XT_MIN_SESSION_ID, Config.XT_MAX_SESSION_ID)
self._account_type = os.environ.get("XT_ACCOUNT_TYPE", "STOCK")
self._strategy_name = os.environ.get("XT_STRATEGY_NAME", "xt_strategy")
self._remark = os.environ.get("XT_REMARK", "remark")
# 重连相关
self.connection_failed = False
self.last_reconnect_time = None
self.reconnect_interval = Config.XT_RECONNECT_INTERVAL
self.connect_failed_callback = connect_failed_callback
self.connection_error_message = None
# 初始化trader
self._callback = MyXtQuantTraderCallback(self)
self.xt_trader = XtQuantTrader(self._PATH, self._SESSION_ID)
self.account = StockAccount(self._ACCOUNT, self._account_type)
self.xt_trader.register_callback(self._callback)
def is_logged_in(self):
"""检查交易系统是否已经登录
Returns:
bool: True表示已登录False表示未登录
"""
return self.started and self.connected and self.subscribed
def is_available(self):
"""检查交易接口是否可用
Returns:
bool: True表示可用False表示不可用
"""
return self.is_logged_in() and not self.connection_failed
def login(self):
"""尝试登录交易系统
Returns:
bool: 登录是否成功
"""
success = True
try:
if not self.started:
self.xt_trader.start()
self.started = True
if not self.connected:
result = self.xt_trader.connect()
self.connected = (result == 0)
if not self.connected:
success = False
if not self.subscribed and self.connected:
result = self.xt_trader.subscribe(self.account)
self.subscribed = (result == 0)
if not self.subscribed:
success = False
# 登录失败,设置失败状态
if not success:
self.connection_failed = True
self.last_reconnect_time = time.time()
self.notify_connection_failure("登录失败")
return success
except Exception as e:
logger.error(f"登录异常: {str(e)}")
# 设置失败状态
self.connection_failed = True
self.last_reconnect_time = time.time()
self.notify_connection_failure(f"登录异常: {str(e)}")
return False
def logout(self):
if self.started:
self.xt_trader.stop()
self.started = False
self.connected = False
self.subscribed = False
def get_balance(self):
if not self.is_available():
return None
asset = self.xt_trader.query_stock_asset(self.account)
if asset:
return {
"account_id": asset.account_id,
"cash": asset.cash,
"frozen_cash": asset.frozen_cash,
"market_value": asset.market_value,
"total_asset": asset.total_asset
}
return None
def get_positions(self):
if not self.is_available():
return []
positions = self.xt_trader.query_stock_positions(self.account)
if positions:
return [
{
"account_id": p.account_id,
"stock_code": p.stock_code,
"volume": p.volume,
"can_use_volume": p.can_use_volume,
"open_price": p.open_price,
"market_value": p.market_value,
"frozen_volume": p.frozen_volume,
"on_road_volume": p.on_road_volume,
"yesterday_volume": p.yesterday_volume,
"avg_price": p.avg_price
} for p in positions
]
return []
def get_position(self, stock_code):
if not self.is_available():
return None
position = self.xt_trader.query_stock_position(self.account, stock_code)
if position:
return {
"account_id": position.account_id,
"stock_code": position.stock_code,
"volume": position.volume,
"can_use_volume": position.can_use_volume,
"open_price": position.open_price,
"market_value": position.market_value,
"frozen_volume": position.frozen_volume,
"on_road_volume": position.on_road_volume,
"yesterday_volume": position.yesterday_volume,
"avg_price": position.avg_price
}
return None
def get_today_trades(self):
if not self.is_available():
return []
trades = self.xt_trader.query_stock_trades(self.account)
if trades:
return [
{
"account_id": t.account_id,
"stock_code": t.stock_code,
"stock_name": self.get_stock_name(t.stock_code),
"order_id": t.order_id,
"traded_id": t.traded_id,
"traded_time": t.traded_time,
"traded_price": t.traded_price,
"traded_volume": t.traded_volume,
"traded_amount": t.traded_amount,
"trade_type": "buy" if t.order_type == xtconstant.STOCK_BUY else "sell"
} for t in trades
]
return []
def get_today_orders(self):
if not self.is_available():
return []
orders = self.xt_trader.query_stock_orders(self.account)
if orders:
return [
{
"account_id": o.account_id,
"stock_code": o.stock_code,
"order_id": o.order_id,
"order_time": o.order_time,
"order_type": "buy" if o.order_type == xtconstant.STOCK_BUY else "sell",
"order_volume": o.order_volume,
"price_type": self._convert_price_type(o.price_type),
"price": o.price,
"traded_volume": o.traded_volume,
"traded_price": o.traded_price,
"order_status": o.order_status,
"status_msg": o.status_msg
} for o in orders
]
return []
def get_order(self, order_id):
if not self.is_available():
return None
order = self.xt_trader.query_stock_order(self.account, int(order_id))
if order:
return {
"account_id": order.account_id,
"stock_code": order.stock_code,
"order_id": order.order_id,
"order_time": order.order_time,
"order_type": "buy" if order.order_type == xtconstant.STOCK_BUY else "sell",
"order_volume": order.order_volume,
"price_type": self._convert_price_type(order.price_type),
"price": order.price,
"traded_volume": order.traded_volume,
"traded_price": order.traded_price,
"order_status": order.order_status,
"status_msg": order.status_msg
}
return None
def _convert_price_type(self, price_type):
"""Convert numeric price type to readable string"""
price_type_map = {
xtconstant.LATEST_PRICE: "latest_price", # 最新价
xtconstant.FIX_PRICE: "limit_price", # 指定价/限价
xtconstant.MARKET_BEST: "market_best", # 市价最优价
xtconstant.MARKET_CANCEL: "market_cancel", # 市价即成剩撤
xtconstant.MARKET_CANCEL_ALL: "market_cancel_all", # 市价全额成交或撤销
xtconstant.MARKET_PEER_PRICE_FIRST: "market_peer_best", # 对手方最优价格
xtconstant.MARKET_MINE_PRICE_FIRST: "market_mine_best", # 本方最优价格
}
return price_type_map.get(price_type, f"unknown_{price_type}")
def get_stock_name(self, stock_code):
"""获取股票名称
Args:
stock_code: 股票代码例如 "600000.SH"
Returns:
str: 股票名称如果获取失败则返回空字符串
"""
try:
instrument_info = get_instrument_detail(stock_code)
if instrument_info and "InstrumentName" in instrument_info:
return instrument_info["InstrumentName"]
return ""
except Exception as e:
logger.error(f"获取股票名称失败: {stock_code}, {str(e)}")
return ""
def buy(self, code, price, amount, order_type='limit'):
if not self.is_available():
return {"error": self.connection_error_message or "交易系统连接失败"}
# 确定价格类型
price_type = xtconstant.MARKET_BEST if order_type == 'market' else xtconstant.FIX_PRICE
# 如果是市价单价格可以设为0
if price_type != xtconstant.FIX_PRICE:
price = 0
order_id = self.xt_trader.order_stock(
self.account, code, xtconstant.STOCK_BUY, amount, price_type, price, self._strategy_name, self._remark
)
return {"order_id": order_id}
def sell(self, code, price, amount, order_type='limit'):
if not self.is_available():
return {"error": self.connection_error_message or "交易系统连接失败"}
# 确定价格类型
price_type = xtconstant.MARKET_BEST if order_type == 'market' else xtconstant.FIX_PRICE
# 如果是市价单价格可以设为0
if price_type != xtconstant.FIX_PRICE:
price = 0
order_id = self.xt_trader.order_stock(
self.account, code, xtconstant.STOCK_SELL, amount, price_type, price, self._strategy_name, self._remark
)
return {"order_id": order_id}
def cancel(self, order_id):
if not self.is_available():
return {"success": False, "message": self.connection_error_message or "交易系统连接失败"}
# 撤单接口需要订单编号
result = self.xt_trader.cancel_order_stock(self.account, int(order_id))
return {"success": result == 0, "message": f"撤单结果: {result}"}
def get_quote(self, code):
"""获取行情数据
Args:
code: 股票代码
Returns:
dict: 行情数据如果获取失败则返回None
"""
if not self.is_available():
return None
try:
quote = self.xt_trader.query_quote(code)
if quote:
return {
"code": quote.stock_code,
"last": quote.last,
"open": quote.open,
"high": quote.high,
"low": quote.low,
"ask_price": [quote.ask_price1, quote.ask_price2, quote.ask_price3, quote.ask_price4, quote.ask_price5],
"ask_volume": [quote.ask_volume1, quote.ask_volume2, quote.ask_volume3, quote.ask_volume4, quote.ask_volume5],
"bid_price": [quote.bid_price1, quote.bid_price2, quote.bid_price3, quote.bid_price4, quote.bid_price5],
"bid_volume": [quote.bid_volume1, quote.bid_volume2, quote.bid_volume3, quote.bid_volume4, quote.bid_volume5],
}
return None
except Exception as e:
logger.error(f"获取行情失败: {code}, {str(e)}")
return None
def notify_connection_failure(self, message="交易连接断开且重连失败"):
"""通知交易连接失败
Args:
message: 错误信息
"""
self.connection_error_message = f"交易系统连接失败:{message},将在{self.reconnect_interval//60}分钟后自动尝试重连"
# 调用回调通知上层应用
if self.connect_failed_callback:
self.connect_failed_callback()
# 发送邮件通知
trader_info = f"账户:{self._ACCOUNT}会话ID{self._SESSION_ID}"
time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
subject = f"[交易系统] 连接失败通知 - {time_str}"
body = f"""
交易系统连接失败请尽快检查
时间{time_str}
{trader_info}
错误信息{message}
系统将在{self.reconnect_interval//60}分钟后自动尝试重连如需立即恢复请手动重启交易系统
"""
MailUtil.send_mail(subject, body)
def reconnect(self):
"""尝试重新连接交易系统
Returns:
bool: 重连是否成功
"""
# 关闭旧连接
if self.started:
self.xt_trader.stop()
self.started = False
self.connected = False
self.subscribed = False
# 尝试范围内的新session_id
start_id = Config.XT_MIN_SESSION_ID
end_id = start_id + Config.XT_SESSION_ID_RANGE
session_id_range = range(start_id, end_id)
for session_id in random.sample(list(session_id_range), len(session_id_range)):
self._SESSION_ID = session_id
self.xt_trader = XtQuantTrader(self._PATH, self._SESSION_ID)
self._callback = MyXtQuantTraderCallback(self) # 传入self引用
self.xt_trader.register_callback(self._callback)
# 重新连接
self.xt_trader.start()
self.started = True
result = self.xt_trader.connect()
if result == 0:
self.connected = True
result = self.xt_trader.subscribe(self.account)
if result == 0:
self.subscribed = True
logger.info(f"重连成功使用session_id: {self._SESSION_ID}")
# 重置连接失败状态
if self.connection_failed:
self.connection_failed = False
self.connection_error_message = None
# 通知连接已恢复
time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
subject = f"[交易系统] 连接恢复通知 - {time_str}"
body = f"""
交易系统连接已恢复
时间{time_str}
账户{self._ACCOUNT}会话ID{self._SESSION_ID}
系统已自动恢复连接交易功能现已正常
"""
MailUtil.send_mail(subject, body)
return True
logger.error("所有尝试都失败,无法重连")
return False
def check_reconnect(self):
"""检查是否需要尝试重连
此方法应在主程序循环中定期调用检查是否需要尝试重连
"""
if (self.connection_failed and
self.last_reconnect_time and
(time.time() - self.last_reconnect_time) > self.reconnect_interval):
logger.info("尝试定期重连...")
if self.reconnect():
logger.info("定期重连成功")
self.connection_failed = False
self.last_reconnect_time = None
else:
logger.warning("定期重连失败")
self.last_reconnect_time = time.time()

View File

@ -1,528 +0,0 @@
import time
import threading
import schedule
from xtquant import xtconstant
from logger_config import get_logger
from config import Config
from strategy_position_manager import StrategyPositionManager
import json
# 获取日志记录器
logger = get_logger('real_trader_manager')
class RealTraderManager:
"""实盘交易管理器,处理实盘下单失败、部分成交等问题,尽量保证仓位与策略信号一致"""
def __init__(self, trader):
"""初始化实盘交易管理器
Args:
trader: XtTrader实例如果为None则自动获取
"""
# 使用传入的trader实例或获取单例
self.trader = trader
# 确保已登录
if not self.trader.is_logged_in():
self.trader.login()
# 不再自己维护pending_orders改用StrategyPositionManager管理
# self.pending_orders = {}
# 启动调度器
self._start_scheduler()
# 记录策略期望持仓状态
# 格式: {strategy_name: {code: target_amount}}
self.strategy_targets = {}
logger.info("实盘交易管理器初始化完成")
def _start_scheduler(self):
"""启动定时任务调度器"""
# 每分钟检查一次未完成订单状态并处理
schedule.every(1).minutes.do(self.check_pending_orders)
# 每天收盘后清理过期未完成订单
schedule.every().day.at(Config.STRATEGY_SAVE_TIME).do(self.clean_expired_orders)
# 启动调度线程
def run_scheduler():
while True:
try:
schedule.run_pending()
time.sleep(10)
except Exception as e:
logger.error(f"调度器运行错误: {str(e)}")
scheduler_thread = threading.Thread(target=run_scheduler)
scheduler_thread.daemon = True
scheduler_thread.start()
logger.info("交易管理器调度器已启动")
def place_order(self, strategy_name, code, direction, amount, price, order_type='limit'):
"""下单接口,处理买入/卖出请求
Args:
strategy_name: 策略名称
code: 股票代码
direction: 交易方向 'buy''sell'
amount: 交易数量
price: 交易价格市价单时可为0
order_type: 订单类型'limit'表示限价单'market'表示市价单默认为'limit'
Returns:
dict: 包含订单ID和状态信息
"""
if not strategy_name or not code or not direction:
logger.error("下单参数不完整")
return {"success": False, "error": "参数不完整"}
# 检查交易方向
if direction not in ['buy', 'sell']:
logger.error(f"无效的交易方向: {direction}")
return {"success": False, "error": "无效的交易方向"}
# 检查订单类型
if order_type not in ['limit', 'market']:
logger.error(f"无效的订单类型: {order_type}")
return {"success": False, "error": "无效的订单类型,必须是'limit''market'"}
try:
# 对于限价单,检查资金和持仓是否足够
if order_type == 'limit' and not self._check_order_feasibility(code, direction, amount, price):
logger.warning(f"资金或持仓不足,忽略订单: {direction} {code} {amount}{price}")
return {"success": False, "error": "资金或持仓不足"}
# 更新策略目标持仓
self._update_strategy_target(strategy_name, code, direction, amount)
# 下单
logger.info(f"准备{direction}订单: 代码={code}, 数量={amount}, 价格={price}, 订单类型={order_type}")
if direction == 'buy':
result = self.trader.buy(code, price, amount, order_type)
else:
result = self.trader.sell(code, price, amount, order_type)
order_id = result.get('order_id')
if not order_id or order_id == 'simulation':
logger.error(f"下单失败: {result}")
return {"success": False, "error": "下单失败"}
# 使用StrategyPositionManager添加未完成委托
StrategyPositionManager.add_pending_order(
self.trader,
order_id,
strategy_name,
code,
price,
amount,
direction,
order_type
)
logger.info(f"已提交订单: ID={order_id}, 策略={strategy_name}, 代码={code}, 方向={direction}, 数量={amount}, 价格={price}, 类型={order_type}")
# 立即更新一次订单状态
self._update_order_status(order_id)
return {"success": True, "order_id": order_id}
except Exception as e:
logger.error(f"下单过程发生异常: {str(e)}")
return {"success": False, "error": f"下单异常: {str(e)}"}
def check_pending_orders(self):
"""检查所有未完成订单状态并处理,定时任务调用"""
try:
logger.info("开始检查未完成订单...")
# 获取所有未完成订单
pending_orders = StrategyPositionManager.get_pending_orders(self.trader)
# 如果没有未完成订单,直接返回
if not pending_orders:
logger.info("没有未完成订单需要检查")
return
# 更新StrategyPositionManager中的未完成委托状态
try:
StrategyPositionManager.update_pending_orders(self.trader)
except Exception as e:
logger.error(f"更新StrategyPositionManager未完成委托状态失败: {str(e)}")
# 获取最新的委托列表
try:
entrusts = self.trader.get_today_entrust()
if entrusts is None:
logger.error("获取今日委托失败,跳过本次检查")
return
entrust_map = {str(e['order_id']): e for e in entrusts}
except Exception as e:
logger.error(f"获取今日委托失败: {str(e)},跳过本次检查")
return
# 检查每个未完成订单
for order_id, order_info in list(pending_orders.items()):
try:
# 跳过已完成的订单
if order_info['status'] in ['completed', 'cancelled', 'failed']:
continue
# 更新订单状态
self._update_order_status(order_id, entrust_map)
# 获取最新的订单信息
order_info = StrategyPositionManager.get_pending_order(self.trader, order_id)
if not order_info:
continue
# 处理超时未成交或部分成交的订单
current_time = time.time()
order_age = current_time - order_info['created_time']
# 如果订单超过配置的超时时间且状态仍为pending或partial
if order_age > Config.RTM_ORDER_TIMEOUT and order_info['status'] in ['pending', 'partial']:
# 记录超时信息
logger.warning(f"订单已超时({order_age:.0f}秒 > {Config.RTM_ORDER_TIMEOUT}秒): ID={order_id}, 代码={order_info['code']}, 状态={order_info['status']}")
# 如果是部分成交,记录详情
if order_info['status'] == 'partial' and 'traded_volume' in order_info:
original = order_info['target_amount']
traded = order_info['traded_volume']
remaining = original - traded
logger.info(f"订单部分成交详情: ID={order_id}, 原始数量={original}, 已成交={traded}, 剩余={remaining}")
self._handle_timeout_order(order_id, order_info)
except Exception as e:
logger.error(f"处理订单 {order_id} 时出错: {str(e)}")
# 同步策略持仓和实际持仓
try:
self._sync_strategy_positions()
except Exception as e:
logger.error(f"同步策略持仓和实际持仓失败: {str(e)}")
logger.info("未完成订单检查完毕")
except Exception as e:
logger.error(f"检查未完成订单时发生异常: {str(e)}")
def _update_order_status(self, order_id, entrust_map=None):
"""更新单个订单状态
Args:
order_id: 订单ID
entrust_map: 可选的委托字典如果为None则重新获取
"""
# 检查订单是否存在
order_info = StrategyPositionManager.get_pending_order(self.trader, order_id)
if not order_info:
return
try:
# 如果没有提供委托字典,则获取当前委托
if entrust_map is None:
entrusts = self.trader.get_today_entrust()
entrust_map = {str(e['order_id']): e for e in entrusts}
# 查找对应的委托记录
entrust = entrust_map.get(str(order_id))
if entrust:
# 获取订单之前的状态,用于判断是否发生变化
previous_status = order_info.get('status')
previous_volume = order_info.get('traded_volume', 0)
# 根据委托状态更新订单状态
if entrust['order_status'] == xtconstant.ORDER_SUCCEEDED:
# 全部成交
StrategyPositionManager.update_order_status(self.trader, order_id, 'completed')
# 日志记录在update_order_status中处理
elif entrust['order_status'] == xtconstant.ORDER_PART_SUCC:
# 部分成交
current_volume = entrust.get('traded_volume', 0)
StrategyPositionManager.update_order_status(
self.trader,
order_id,
'partial',
traded_volume=current_volume
)
# 如果成交量有变化,记录日志
if current_volume != previous_volume:
target_amount = order_info['target_amount']
logger.info(f"订单部分成交更新: ID={order_id}, 代码={entrust['stock_code']}, 目标数量={target_amount}, 已成交数量={current_volume}, 剩余数量={target_amount - current_volume}")
elif entrust['order_status'] in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_JUNK]:
# 已撤单或废单
StrategyPositionManager.update_order_status(
self.trader,
order_id,
'cancelled',
err_msg=entrust.get('err_msg', '未知原因')
)
elif entrust['order_status'] == xtconstant.ORDER_UNREPORTED:
# 未报
if previous_status != 'pending':
StrategyPositionManager.update_order_status(self.trader, order_id, 'pending')
elif entrust['order_status'] == xtconstant.ORDER_WAIT_REPORTING:
# 待报
if previous_status != 'pending':
StrategyPositionManager.update_order_status(self.trader, order_id, 'pending')
elif entrust['order_status'] == xtconstant.ORDER_REPORTED:
# 已报
if previous_status != 'pending':
StrategyPositionManager.update_order_status(self.trader, order_id, 'pending')
else:
# 委托列表中找不到该订单,可能已经太久
current_time = time.time()
if current_time - order_info['created_time'] > 24 * 60 * 60:
previous_status = order_info.get('status')
StrategyPositionManager.update_order_status(self.trader, order_id, 'failed')
logger.warning(f"订单状态未知且过期: ID={order_id}, 旧状态={previous_status}, 新状态=failed, 创建时长={(current_time - order_info['created_time'])/3600:.1f}小时")
except Exception as e:
logger.error(f"更新订单状态时发生异常: order_id={order_id}, error={str(e)}")
def _handle_timeout_order(self, order_id, order_info):
"""处理超时或部分成交的订单
Args:
order_id: 订单ID
order_info: 订单信息字典
"""
try:
# 首先尝试撤单
logger.info(f"尝试撤销超时订单: ID={order_id}, 代码={order_info['code']}, 超时时间={(time.time() - order_info['created_time']):.0f}")
cancel_result = self.trader.cancel(order_id)
# 记录撤单结果
if isinstance(cancel_result, dict):
result_str = json.dumps(cancel_result)
else:
result_str = str(cancel_result)
logger.info(f"撤单结果: ID={order_id}, 结果={result_str}")
# 计算未成交数量
original_amount = order_info['target_amount']
traded_amount = order_info.get('traded_volume', 0)
remaining_amount = original_amount - traded_amount
# 记录详细的成交情况
logger.info(f"订单成交情况: ID={order_id}, 代码={order_info['code']}, 原始数量={original_amount}, 已成交={traded_amount}, 剩余={remaining_amount}")
# 如果有未成交的部分,使用市价单补充交易
if remaining_amount > 0:
# 递增重试计数
new_retry_count = StrategyPositionManager.increment_retry_count(self.trader, order_id)
# 决定是否使用市价单进行补单
use_market_order = Config.RTM_USE_MARKET_ORDER
logger.info(f"准备补充交易: 代码={order_info['code']}, 方向={order_info['direction']}, 补充数量={remaining_amount}, 重试次数={new_retry_count}/{Config.RTM_MAX_RETRIES}, 使用市价单={use_market_order}")
# 如果重试次数少于最大重试次数,则进行补单
if new_retry_count <= Config.RTM_MAX_RETRIES:
# 决定使用的订单类型
new_order_type = 'market' if use_market_order else 'limit'
# 对于市价单价格参数可设为0对于限价单使用原价格
new_price = 0 if new_order_type == 'market' else order_info['price']
# 下新订单
new_order = self.place_order(
order_info['strategy_name'],
order_info['code'],
order_info['direction'],
remaining_amount,
new_price,
new_order_type
)
if new_order.get('success'):
logger.info(f"补单成功: 原订单ID={order_id}, 新订单ID={new_order['order_id']}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}, 订单类型={new_order_type}")
else:
logger.error(f"补单失败: 原订单ID={order_id}, 错误={new_order.get('error')}, 代码={order_info['code']}, 方向={order_info['direction']}, 数量={remaining_amount}, 订单类型={new_order_type}")
else:
logger.warning(f"订单重试次数过多,不再尝试: ID={order_id}, 重试次数={new_retry_count}/{Config.RTM_MAX_RETRIES}, 代码={order_info['code']}, 方向={order_info['direction']}, 未成交数量={remaining_amount}")
else:
logger.info(f"订单已全部成交,无需补单: ID={order_id}, 代码={order_info['code']}, 成交数量={traded_amount}")
# 更新原订单状态
previous_status = order_info['status']
StrategyPositionManager.update_order_status(self.trader, order_id, 'cancelled')
logger.info(f"更新原订单状态: ID={order_id}, 旧状态={previous_status}, 新状态=cancelled")
except Exception as e:
logger.error(f"处理超时订单时发生异常: order_id={order_id}, error={str(e)}")
def _check_order_feasibility(self, code, direction, amount, price):
"""检查订单是否可行(资金或持仓是否足够)
Args:
code: 股票代码
direction: 交易方向
amount: 交易数量
price: 交易价格
Returns:
bool: 订单是否可行
"""
try:
if direction == 'buy':
# 检查资金是否足够
balance = self.trader.get_balance()
if not balance:
logger.error("获取账户余额失败")
return False
# 计算所需资金加上3%的手续费作为缓冲)
required_cash = price * amount * 1.03
available_cash = balance.get('cash', 0)
if required_cash > available_cash:
logger.warning(f"资金不足: 需要 {required_cash:.2f}, 可用 {available_cash:.2f}")
return False
return True
elif direction == 'sell':
# 检查持仓是否足够
positions = self.trader.get_positions()
position = next((p for p in positions if p.get('stock_code') == code), None)
if not position:
logger.warning(f"没有持仓: {code}")
return False
available_volume = position.get('can_use_volume', 0)
if amount > available_volume:
logger.warning(f"可用持仓不足: 需要 {amount}, 可用 {available_volume}")
return False
return True
return False
except Exception as e:
logger.error(f"检查订单可行性时发生异常: {str(e)}")
return False
def _update_strategy_target(self, strategy_name, code, direction, amount):
"""更新策略目标持仓
Args:
strategy_name: 策略名称
code: 股票代码
direction: 交易方向
amount: 交易数量
"""
# 确保策略存在于目标字典中
if strategy_name not in self.strategy_targets:
self.strategy_targets[strategy_name] = {}
# 确保股票代码存在于策略目标中
if code not in self.strategy_targets[strategy_name]:
self.strategy_targets[strategy_name][code] = 0
# 根据交易方向更新目标持仓
if direction == 'buy':
self.strategy_targets[strategy_name][code] += amount
else: # sell
self.strategy_targets[strategy_name][code] -= amount
# 避免负数持仓
if self.strategy_targets[strategy_name][code] < 0:
self.strategy_targets[strategy_name][code] = 0
logger.info(f"更新策略目标持仓: 策略={strategy_name}, 代码={code}, 目标持仓={self.strategy_targets[strategy_name][code]}")
def _sync_strategy_positions(self):
"""同步策略持仓和实际持仓"""
try:
# 获取实际持仓
actual_positions = self.trader.get_positions()
if actual_positions is None:
logger.error("获取实际持仓失败,跳过同步")
return
position_map = {p['stock_code']: p for p in actual_positions}
# 如果没有策略目标持仓,直接返回
if not self.strategy_targets:
logger.info("没有策略目标持仓需要同步")
return
# 遍历每个策略的目标持仓
for strategy_name, targets in self.strategy_targets.items():
# 该策略的实际持仓映射
strategy_actual_positions = {}
# 遍历该策略的目标持仓
for code, target_amount in targets.items():
try:
# 获取股票的实际持仓
actual_position = position_map.get(code, {})
actual_amount = actual_position.get('volume', 0)
if actual_amount > 0:
strategy_actual_positions[code] = actual_amount
# 更新策略持仓管理器中的持仓记录
try:
StrategyPositionManager.update_strategy_position(
self.trader,
strategy_name,
code,
'sync', # 使用同步模式
actual_amount
)
except Exception as e:
logger.error(f"更新策略持仓管理器持仓记录失败: {str(e)}")
# 检查是否需要调整持仓
if actual_amount != target_amount:
diff = target_amount - actual_amount
if diff != 0:
logger.warning(f"持仓不一致: 策略={strategy_name}, 代码={code}, 目标={target_amount}, 实际={actual_amount}")
except Exception as e:
logger.error(f"同步股票 {code} 持仓时出错: {str(e)}")
# 记录日志
logger.info(f"策略 {strategy_name} 的目标持仓: {targets}")
logger.info(f"策略 {strategy_name} 的实际持仓: {strategy_actual_positions}")
except Exception as e:
logger.error(f"同步策略持仓时发生异常: {str(e)}")
def clean_expired_orders(self):
"""清理过期的未完成订单"""
# 直接调用StrategyPositionManager的方法
StrategyPositionManager.clean_timeout_orders()
def get_pending_orders(self):
"""获取所有未完成订单
Returns:
list: 未完成订单列表
"""
# 从StrategyPositionManager获取未完成订单
pending_orders = StrategyPositionManager.get_pending_orders(self.trader)
return [{
'order_id': order_id,
**order_info
} for order_id, order_info in pending_orders.items()]
def get_strategy_targets(self):
"""获取策略目标持仓
Returns:
dict: 策略目标持仓
"""
return self.strategy_targets

4
src/settlement_type.py Normal file
View File

@ -0,0 +1,4 @@
from enum import Enum
class SettlementType(Enum):
T0 = 0
T1 = 1

View File

@ -0,0 +1,9 @@
"""
模拟交易模块
此模块提供模拟交易的功能用于在不涉及真实资金的情况下测试交易策略
"""
from .simulation_trader import SimulationTrader
__all__ = ['SimulationTrader']

View File

@ -0,0 +1,193 @@
from logger_config import get_logger
from trade_constants import (
TRADE_TYPE_SIMULATION,
ORDER_DIRECTION_BUY,
ORDER_DIRECTION_SELL,
ORDER_STATUS_COMPLETED,
ORDER_STATUS_CANCELLED,
)
from position_manager import PositionManager
from base_trader import BaseTrader
import random
from typing import Dict
from local_position import LocalPosition
class SimulationTrader(BaseTrader):
def __init__(self, logger=None):
super().__init__(logger)
self.logger = logger or get_logger("simulation_trader")
# 模拟资金账户信息
self.sim_balance = {"account_id": "simulation", "cash": 1000000.00, "frozen": 0.00, "total": 1000000.00}
def is_logged_in(self):
"""检查交易系统是否已经登录
Returns:
bool: True表示已登录模拟交易系统总是返回已登录状态
"""
return True
def login(self):
self.logger.info("模拟交易:登录成功")
return True
def logout(self):
self.logger.info("模拟交易:登出成功")
return True
def buy(self, code, price, amount, strategy_name="default_strategy"):
message = f"模拟买入 - 代码: {code}, 价格: {price}, 数量: {amount}, 策略: {strategy_name}"
self.logger.info(message)
# 计算交易成本
cost = price * amount
# 检查余额是否足够
if self.sim_balance["cash"] < cost:
message = f"模拟买入失败 - 代码: {code}, 资金不足"
self.logger.warning(message)
return {"order_id": None, "message": message, "success": False}
# 更新资金
self.sim_balance["cash"] -= cost
# 更新持仓管理器
position_manager = self.get_position_manager(strategy_name)
position_manager.update_position(
code, ORDER_DIRECTION_BUY, amount
)
order_id = random.randint(1, 999999) # 使用随机函数生成小于1000000的随机整数
position_manager.add_pending_order(
order_id, code, price, amount, ORDER_DIRECTION_BUY
)
# 假设立刻全部成交
position_manager.update_order_status(order_id, amount, ORDER_STATUS_COMPLETED)
# 更新总资产
self._update_total_assets()
return {"order_id": order_id, "message": message, "success": True}
def sell(self, code, price, amount, strategy_name="default_strategy"):
message = f"模拟卖出 - 代码: {code}, 价格: {price}, 数量: {amount}, 策略: {strategy_name}"
self.logger.info(message)
# 获取策略持仓
position_manager = self.get_position_manager(strategy_name)
strategy_positions = position_manager.get_positions()
# 检查持仓是否足够
if (
code not in strategy_positions
or strategy_positions[code].closeable_amount < amount
):
message = f"模拟卖出失败 - 代码: {code}, 可用数量不足"
self.logger.warning(message)
return {"order_id": None, "message": message, "success": False}
# 更新资金
proceeds = price * amount
self.sim_balance["cash"] += proceeds
# 更新持仓管理器
position_manager.update_position(
code, ORDER_DIRECTION_SELL, amount
)
order_id = random.randint(1, 999999) # 使用随机函数生成小于1000000的随机整数
position_manager.add_pending_order(
order_id, code, price, amount, ORDER_DIRECTION_SELL
)
# 假设立刻全部成交
position_manager.update_order_status(order_id, amount, ORDER_STATUS_COMPLETED)
# 更新总资产
self._update_total_assets()
return {"order_id": order_id, "message": message, "success": True}
def _update_total_assets(self):
"""更新总资产"""
# 此处简化处理,在实际情况中应该计算所有持仓的市值
self.sim_balance["total"] = self.sim_balance["cash"]
def cancel(self, order_id):
message = f"模拟撤单 - 委托号: {order_id}"
self.logger.info(message)
position_managers = self.get_all_position_managers()
for position_manager in position_managers.values():
if order_id in position_manager.pending_orders:
position_manager.update_order_status(order_id, 0, ORDER_STATUS_CANCELLED)
return {"order_id": "order_id", "message": message, "success": True}
else:
return {"order_id": None, "message": "订单不存在", "success": False}
def get_balance(self):
message = "模拟交易:查询余额"
self.logger.info(message)
return self.sim_balance
def get_positions(self):
message = "模拟交易:查询持仓"
self.logger.info(message)
position_managers = self.get_all_position_managers()
positions: Dict[str, LocalPosition] = {}
for position_manager in position_managers.values():
positions.update(position_manager.get_positions())
# convert to json list
return [{"account_id": "simulation", "code": position.code, "total_amount": position.total_amount, "closeable_amount": position.closeable_amount} for position in positions.values()]
def get_today_trades(self):
message = "模拟交易:查询今日成交"
self.logger.info(message)
return {"message": "模拟交易:查询今日成交未实现", "success": True}
def get_today_orders(self):
message = "模拟交易:查询今日委托"
self.logger.info(message)
return {"message": "模拟交易:查询今日委托未实现", "success": True}
def is_trading_time(self):
return True
def get_position(self, stock_code, strategy_name="default_strategy"):
"""查询指定股票代码的持仓信息
Args:
stock_code: 股票代码例如 "600000.SH"
strategy_name: 策略名称默认为"default_strategy"
Returns:
dict: 持仓详情如果未持有则返回None
"""
position_manager = self.get_position_manager(strategy_name)
positions = position_manager.get_positions()
if stock_code in positions:
position_info = positions[stock_code]
return {
"account_id": "simulation",
"code": stock_code,
"strategy_name": strategy_name,
"total_amount": position_info.total_amount,
"closeable_amount": position_info.closeable_amount,
}
return None
def get_order(self, order_id):
position_managers = self.get_all_position_managers()
for position_manager in position_managers.values():
if order_id in position_manager.pending_orders:
order_info = position_manager.pending_orders[order_id]
return {
"order_id": order_id,
"stock_code": order_info.stock_code,
"price": order_info.price,
"amount": order_info.amount,
"direction": order_info.direction,
"status": order_info.status,
"strategy_name": order_info.strategy_name,
}
return None
def is_trading_time(self):
return True

View File

@ -1,128 +0,0 @@
from logger_config import get_logger
class SimulationTrader:
def __init__(self, logger=None):
self.logger = logger or get_logger('simulation_trader')
# 添加模拟持仓字典,用于追踪模拟交易的持仓
self.sim_positions = {}
# 模拟资金账户信息
self.sim_balance = {"cash": 1000000.00, "frozen": 0.00, "total": 1000000.00}
def is_logged_in(self):
"""检查交易系统是否已经登录
Returns:
bool: True表示已登录模拟交易系统总是返回已登录状态
"""
return True
def login(self):
self.logger.info("模拟交易:登录成功")
return True
def logout(self):
self.logger.info("模拟交易:登出成功")
return True
def buy(self, code, price, amount):
message = f"模拟买入 - 代码: {code}, 价格: {price}, 数量: {amount}"
self.logger.info(message)
# 更新模拟持仓
if code not in self.sim_positions:
self.sim_positions[code] = {
"stock_code": code,
"volume": 0,
"can_use_volume": 0,
"frozen_volume": 0,
"avg_price": 0.0,
"market_value": 0.0
}
# 计算新的平均成本
current_cost = self.sim_positions[code]["avg_price"] * self.sim_positions[code]["volume"]
new_cost = price * amount
total_volume = self.sim_positions[code]["volume"] + amount
# 更新持仓信息
self.sim_positions[code]["volume"] += amount
self.sim_positions[code]["can_use_volume"] += amount
self.sim_positions[code]["avg_price"] = (current_cost + new_cost) / total_volume if total_volume > 0 else 0
self.sim_positions[code]["market_value"] = self.sim_positions[code]["volume"] * price
# 更新资金
self.sim_balance["cash"] -= price * amount
self.sim_balance["total"] = self.sim_balance["cash"] + sum(pos["market_value"] for pos in self.sim_positions.values())
return {"order_id": "simulation", "message": message}
def sell(self, code, price, amount):
message = f"模拟卖出 - 代码: {code}, 价格: {price}, 数量: {amount}"
self.logger.info(message)
# 更新模拟持仓
if code in self.sim_positions:
# 确保可用数量足够
if self.sim_positions[code]["can_use_volume"] >= amount:
# 更新持仓信息
self.sim_positions[code]["volume"] -= amount
self.sim_positions[code]["can_use_volume"] -= amount
self.sim_positions[code]["market_value"] = self.sim_positions[code]["volume"] * price
# 如果持仓为0删除该股票
if self.sim_positions[code]["volume"] <= 0:
del self.sim_positions[code]
# 更新资金
self.sim_balance["cash"] += price * amount
self.sim_balance["total"] = self.sim_balance["cash"] + sum(pos["market_value"] for pos in self.sim_positions.values())
else:
message = f"模拟卖出失败 - 代码: {code}, 可用数量不足"
self.logger.warning(message)
else:
message = f"模拟卖出失败 - 代码: {code}, 无持仓"
self.logger.warning(message)
return {"order_id": "simulation", "message": message}
def cancel(self, entrust_no):
message = f"模拟撤单 - 委托号: {entrust_no}"
self.logger.info(message)
return {"order_id": "simulation", "message": message}
def get_balance(self):
message = "模拟交易:查询余额"
self.logger.info(message)
return self.sim_balance
def get_positions(self):
message = "模拟交易:查询持仓"
self.logger.info(message)
# 返回与XtTrader格式一致的持仓数据
return [
{
"account_id": "simulation",
"stock_code": code,
"volume": pos["volume"],
"can_use_volume": pos["can_use_volume"],
"open_price": pos["avg_price"],
"avg_price": pos["avg_price"],
"market_value": pos["market_value"],
"frozen_volume": pos["frozen_volume"],
"on_road_volume": 0
} for code, pos in self.sim_positions.items()
]
def get_today_trades(self):
message = "模拟交易:查询今日成交"
self.logger.info(message)
return []
def get_today_entrust(self):
message = "模拟交易:查询今日委托"
self.logger.info(message)
return []
def is_trading_time(self):
return True

View File

@ -1,438 +0,0 @@
import time
import os
import json
from simulation_trader import SimulationTrader
from xtquant import xtconstant
from logger_config import get_logger
# 获取日志记录器
logger = get_logger('strategy')
# 策略仓位管理
strategy_positions = {
'real': {}, # 存储实盘策略持仓
'simulation': {} # 存储模拟交易策略持仓
}
strategy_trades = {
'real': {}, # 存储实盘策略交易记录
'simulation': {} # 存储模拟交易策略交易记录
}
pending_orders = {
'real': {}, # 存储实盘未完成委托
'simulation': {} # 存储模拟交易未完成委托
}
class StrategyPositionManager:
"""策略持仓管理器,负责管理不同策略的持仓情况"""
@staticmethod
def get_trader_type(trader):
"""根据交易实例确定交易类型
Args:
trader: 交易实例
Returns:
str: 'simulation''real'
"""
return 'simulation' if isinstance(trader, SimulationTrader) else 'real'
@staticmethod
def update_strategy_position(trader, strategy_name, code, direction, amount):
"""更新策略持仓
Args:
trader: 交易实例
strategy_name: 策略名称
code: 股票代码
direction: 'buy''sell'
amount: 交易数量
"""
if not strategy_name:
return
# 判断交易类型
trader_type = StrategyPositionManager.get_trader_type(trader)
# 确保策略在字典中
if strategy_name not in strategy_positions[trader_type]:
strategy_positions[trader_type][strategy_name] = {}
try:
# 获取交易实例持仓情况
actual_positions = trader.get_positions()
code_position = next((pos for pos in actual_positions if pos.get('stock_code') == code), None)
# 记录实际持仓总量
actual_total = code_position.get('volume', 0) if code_position else 0
actual_can_use = code_position.get('can_use_volume', 0) if code_position else 0
logger.info(f"实际持仓 - 代码: {code}, 总量: {actual_total}, 可用: {actual_can_use}")
# 如果股票代码在持仓字典中不存在,初始化它
if code not in strategy_positions[trader_type][strategy_name]:
strategy_positions[trader_type][strategy_name][code] = {
'total_amount': 0,
'closeable_amount': 0
}
# 直接使用实际持仓数据更新策略持仓
strategy_positions[trader_type][strategy_name][code]['total_amount'] = actual_total
strategy_positions[trader_type][strategy_name][code]['closeable_amount'] = actual_can_use
logger.info(f"更新策略持仓 - 交易类型: {trader_type}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}, 数量: {amount}, 总量: {strategy_positions[trader_type][strategy_name][code]['total_amount']}, 可用: {strategy_positions[trader_type][strategy_name][code]['closeable_amount']}")
except Exception as e:
logger.error(f"获取实际持仓失败: {str(e)}")
# 异常情况下只记录错误,不尝试更新持仓
# 移除total_amount为0的持仓
if code in strategy_positions[trader_type][strategy_name] and strategy_positions[trader_type][strategy_name][code]['total_amount'] <= 0:
del strategy_positions[trader_type][strategy_name][code]
@staticmethod
def update_pending_orders(trader):
"""更新未完成委托状态
Args:
trader: 交易实例
"""
try:
# 判断当前交易类型
trader_type = StrategyPositionManager.get_trader_type(trader)
# 获取今日委托
today_entrusts = trader.get_today_entrust()
# 更新委托状态
for order_id, order_info in list(pending_orders[trader_type].items()):
entrust = next((e for e in today_entrusts if e.get('order_id') == order_id), None)
if entrust:
if entrust.get('order_status') in [xtconstant.ORDER_SUCCEEDED, xtconstant.ORDER_PART_SUCC]:
# 成交量计算
traded_amount = int(entrust.get('traded_volume', 0))
# 更新策略持仓
StrategyPositionManager.update_strategy_position(
trader,
order_info['strategy_name'],
order_info['code'],
order_info['direction'],
traded_amount
)
# 如果完全成交,从待处理列表中移除
if entrust.get('order_status') == xtconstant.ORDER_SUCCEEDED:
del pending_orders[trader_type][order_id]
# 如果已撤单、废单等终态,也从待处理列表中移除
elif entrust.get('order_status') in [xtconstant.ORDER_CANCELED, xtconstant.ORDER_JUNK]:
del pending_orders[trader_type][order_id]
except Exception as e:
logger.error(f"更新未完成委托状态失败: {str(e)}")
@staticmethod
def add_pending_order(trader, order_id, strategy_name, code, price, amount, direction, order_type='limit'):
"""添加未完成委托
Args:
trader: 交易实例
order_id: 委托编号
strategy_name: 策略名称
code: 股票代码
price: 委托价格
amount: 委托数量
direction: 交易方向'buy''sell'
order_type: 订单类型'limit''market'默认为'limit'
"""
if not order_id or order_id == 'simulation':
return
# 判断当前交易类型
trader_type = StrategyPositionManager.get_trader_type(trader)
# 添加到未完成委托列表
pending_orders[trader_type][order_id] = {
'strategy_name': strategy_name,
'code': code,
'price': price,
'amount': amount,
'direction': direction,
'created_time': time.time(),
'target_amount': amount,
'status': 'pending',
'last_check_time': time.time(),
'retry_count': 0,
'order_type': order_type
}
# 同时记录到交易历史
if strategy_name:
if strategy_name not in strategy_trades[trader_type]:
strategy_trades[trader_type][strategy_name] = []
strategy_trades[trader_type][strategy_name].append({
'time': time.strftime('%Y-%m-%d %H:%M:%S'),
'type': direction,
'code': code,
'price': price,
'amount': amount,
'order_id': order_id,
'status': 'pending'
})
logger.info(f"添加未完成委托: {order_id}, 交易类型: {trader_type}, 策略: {strategy_name}, 代码: {code}, 方向: {direction}")
@staticmethod
def get_pending_orders(trader):
"""获取指定交易类型的所有未完成委托
Args:
trader: 交易实例
Returns:
dict: 未完成委托字典以order_id为键
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
return pending_orders[trader_type]
@staticmethod
def get_pending_order(trader, order_id):
"""获取指定订单信息
Args:
trader: 交易实例
order_id: 订单ID
Returns:
dict: 订单信息字典如果不存在则返回None
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
return pending_orders[trader_type].get(order_id)
@staticmethod
def update_order_status(trader, order_id, new_status, **additional_data):
"""更新订单状态
Args:
trader: 交易实例
order_id: 订单ID
new_status: 新状态
additional_data: 附加数据如成交量重试次数等
Returns:
bool: 是否成功更新
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
if order_id in pending_orders[trader_type]:
# 记录之前的状态用于日志
previous_status = pending_orders[trader_type][order_id].get('status')
# 更新状态和最后检查时间
pending_orders[trader_type][order_id]['status'] = new_status
pending_orders[trader_type][order_id]['last_check_time'] = time.time()
# 更新附加数据
for key, value in additional_data.items():
pending_orders[trader_type][order_id][key] = value
# 记录状态变化日志
if previous_status != new_status:
code = pending_orders[trader_type][order_id].get('code')
logger.info(f"订单状态变化: ID={order_id}, 代码={code}, 旧状态={previous_status}, 新状态={new_status}")
return True
return False
@staticmethod
def increment_retry_count(trader, order_id):
"""增加订单重试次数
Args:
trader: 交易实例
order_id: 订单ID
Returns:
int: 新的重试次数如果订单不存在则返回-1
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
if order_id in pending_orders[trader_type]:
current = pending_orders[trader_type][order_id].get('retry_count', 0)
pending_orders[trader_type][order_id]['retry_count'] = current + 1
return current + 1
return -1
@staticmethod
def remove_pending_order(trader, order_id):
"""移除未完成委托
Args:
trader: 交易实例
order_id: 订单ID
Returns:
bool: 是否成功移除
"""
trader_type = StrategyPositionManager.get_trader_type(trader)
if order_id in pending_orders[trader_type]:
del pending_orders[trader_type][order_id]
return True
return False
@staticmethod
def clean_timeout_orders():
"""清理超时委托"""
current_time = time.time()
# 遍历实盘和模拟两种类型的委托
for trader_type in ['real', 'simulation']:
for order_id, order_info in list(pending_orders[trader_type].items()):
# 超过24小时的委托视为超时
if current_time - order_info['created_time'] > 24 * 60 * 60:
del pending_orders[trader_type][order_id]
logger.warning(f"清理超时委托: ID={order_id}, 状态={order_info.get('status')}")
@staticmethod
def load_strategy_data():
"""加载策略数据"""
global strategy_positions, strategy_trades, pending_orders
try:
if os.path.exists('strategy_data.json'):
with open('strategy_data.json', 'r') as f:
data = json.load(f)
# 直接使用新版数据结构,不再兼容旧版格式
strategy_positions = data.get('positions', {'real': {}, 'simulation': {}})
strategy_trades = data.get('trades', {'real': {}, 'simulation': {}})
pending_orders = data.get('pending_orders', {'real': {}, 'simulation': {}})
# 确保数据结构完整
if 'real' not in strategy_positions:
strategy_positions['real'] = {}
if 'simulation' not in strategy_positions:
strategy_positions['simulation'] = {}
if 'real' not in strategy_trades:
strategy_trades['real'] = {}
if 'simulation' not in strategy_trades:
strategy_trades['simulation'] = {}
if 'real' not in pending_orders:
pending_orders['real'] = {}
if 'simulation' not in pending_orders:
pending_orders['simulation'] = {}
logger.info("已加载策略数据")
logger.info(f"实盘策略数: {len(strategy_positions['real'])}, 模拟策略数: {len(strategy_positions['simulation'])}")
except Exception as e:
logger.error(f"加载策略数据失败: {str(e)}")
# 初始化空数据结构
strategy_positions = {'real': {}, 'simulation': {}}
strategy_trades = {'real': {}, 'simulation': {}}
pending_orders = {'real': {}, 'simulation': {}}
@staticmethod
def save_strategy_data():
"""保存策略数据"""
try:
with open('strategy_data.json', 'w') as f:
json.dump({
'positions': strategy_positions,
'trades': strategy_trades,
'pending_orders': pending_orders
}, f)
except Exception as e:
logger.error(f"保存策略数据失败: {str(e)}")
@staticmethod
def get_strategy_positions(trader, strategy_name=None):
"""获取策略持仓
Args:
trader: 交易实例
strategy_name: 策略名称如果为None返回所有持仓
Returns:
如果strategy_name为None返回交易实例的所有持仓
否则返回指定策略的持仓
"""
# 判断当前交易类型
trader_type = StrategyPositionManager.get_trader_type(trader)
# 如果指定了策略名称,返回该策略的持仓
if strategy_name:
# 获取真实账户持仓,用于计算可交易量
real_positions = trader.get_positions()
real_positions_map = {}
for pos in real_positions:
# 使用xt_trader返回的字段名
if 'stock_code' in pos and 'can_use_volume' in pos:
real_positions_map[pos['stock_code']] = pos
# 如果该策略没有记录,返回空列表
if strategy_name not in strategy_positions[trader_type]:
logger.info(f"Strategy {strategy_name} has no positions in {trader_type} mode")
return []
# 合并策略持仓和真实持仓的可交易量
result = []
for code, pos_info in strategy_positions[trader_type][strategy_name].items():
# 忽略total_amount为0的持仓
if pos_info['total_amount'] <= 0:
continue
# 使用真实账户的可交易量作为策略的可交易量上限
real_pos = real_positions_map.get(code, {})
closeable = min(pos_info['total_amount'], real_pos.get('can_use_volume', 0))
result.append({
code: {
'total_amount': pos_info['total_amount'],
'closeable_amount': closeable
}
})
logger.info(f"Strategy {strategy_name} positions in {trader_type} mode: {result}")
return result
# 否则返回原始持仓
positions = trader.get_positions()
logger.info(f"Positions in {trader_type} mode: {positions}")
return positions
@staticmethod
def clear_strategy(trader, strategy_name):
"""清除指定策略的持仓管理数据
Args:
trader: 交易实例
strategy_name: 策略名称
Returns:
tuple: (success, message)
success: 是否成功清除
message: 提示信息
"""
if not strategy_name:
return False, "缺少策略名称参数"
# 判断当前交易类型
trader_type = StrategyPositionManager.get_trader_type(trader)
# 检查策略是否存在于当前交易类型中
if strategy_name in strategy_positions[trader_type]:
# 从策略持仓字典中删除该策略
del strategy_positions[trader_type][strategy_name]
# 清除该策略的交易记录
if strategy_name in strategy_trades[trader_type]:
del strategy_trades[trader_type][strategy_name]
# 清除与该策略相关的未完成委托
for order_id, order_info in list(pending_orders[trader_type].items()):
if order_info.get('strategy_name') == strategy_name:
del pending_orders[trader_type][order_id]
# 保存更新后的策略数据
StrategyPositionManager.save_strategy_data()
logger.info(f"成功清除策略持仓数据: {strategy_name} (交易类型: {trader_type})")
return True, f"成功清除策略 '{strategy_name}' 的持仓数据 (交易类型: {trader_type})"
else:
logger.info(f"策略不存在或没有持仓数据: {strategy_name} (交易类型: {trader_type})")
return True, f"策略 '{strategy_name}' 不存在或没有持仓数据 (交易类型: {trader_type})"

117
src/t0_stocks.py Normal file
View File

@ -0,0 +1,117 @@
# 读取所有ETF文件 /resources/grouped_etf.json
import json
import os
from typing import List
def get_all_t0() -> List[str]:
"""
读取/resources/grouped_etf.json文件获取所有T+0交易的ETF代码
除了"其他ETF"分类外其余都是T+0
Returns:
List[str]: 所有T+0交易的ETF代码列表
"""
# 获取当前文件所在目录的路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建json文件的绝对路径
json_path = os.path.join(os.path.dirname(current_dir), 'resources', 'grouped_etf.json')
# 读取json文件
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 初始化结果列表
t0_stocks = []
# 遍历所有分类
for category, stocks in data.items():
# 跳过"其他ETF"分类
if category == "其他ETF":
continue
# 将当前分类下的所有股票代码添加到结果列表中
for stock in stocks:
t0_stocks.append(stock["code"])
return t0_stocks
def normalize_stock_code(stock: str) -> str:
"""
标准化股票代码格式
Args:
stock (str): 可能是"123456.XSHE""123456.SH""123456"格式的股票代码
Returns:
str: 标准化后的股票代码格式为"123456.XSHE""123456.XSHG"
"""
if '.' not in stock:
# 如果没有后缀,则根据第一位数字判断交易所
code = stock.strip()
if code[0] in ['0', '3']: # 深交所
return f"{code}.XSHE"
else: # 上交所
return f"{code}.XSHG"
else:
# 已经有后缀,判断是否需要转换
code, exchange = stock.split('.')
if exchange.upper() in ['SH', 'XSHG']:
return f"{code}.XSHG"
elif exchange.upper() in ['SZ', 'XSHE']:
return f"{code}.XSHE"
else:
# 已经是标准格式或其他格式,直接返回
return stock
def is_t0(stock: str) -> bool:
"""
判断给定的股票代码是否属于T+0交易的ETF
Args:
stock (str): 股票代码可能是"123456.XSHE""123456.SH""123456"格式
Returns:
bool: 如果是T+0交易的ETF则返回True否则返回False
"""
# 获取所有T+0股票列表
t0_list = get_all_t0()
# 标准化输入的股票代码
normalized_stock = normalize_stock_code(stock)
# 判断标准化后的代码是否在T+0列表中
return normalized_stock in t0_list
def get_hk_t0() -> List[str]:
"""
获取所有T+0交易的香港ETF代码
Returns:
List[str]: 所有T+0交易的香港ETF代码列表
"""
# 获取当前文件所在目录的路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建json文件的绝对路径
json_path = os.path.join(os.path.dirname(current_dir), 'resources', 'grouped_etf.json')
# 读取json文件
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 初始化结果列表
hk_t0_stocks = []
# 检查"港股ETF"分类是否存在
if "港股ETF" in data:
# 获取所有港股ETF
for stock in data["港股ETF"]:
hk_t0_stocks.append(stock["code"])
return hk_t0_stocks
if __name__ == "__main__":
print(get_hk_t0())

19
src/trade_constants.py Normal file
View File

@ -0,0 +1,19 @@
# 交易常量
TRADE_TYPE_REAL = 'real'
TRADE_TYPE_SIMULATION = 'simulation'
# 订单状态
ORDER_STATUS_PENDING = 'pending'
ORDER_STATUS_PARTIAL = 'partial'
ORDER_STATUS_COMPLETED = 'completed'
ORDER_STATUS_CANCELLED = 'cancelled'
ORDER_STATUS_FAILED = 'failed'
# 订单类型
ORDER_TYPE_LIMIT = 'limit'
ORDER_TYPE_MARKET = 'market'
# 订单方向
ORDER_DIRECTION_BUY = 'buy'
ORDER_DIRECTION_SELL = 'sell'

File diff suppressed because it is too large Load Diff

1
src/trade_tools.py Normal file
View File

@ -0,0 +1 @@

4
src/utils/__init__.py Normal file
View File

@ -0,0 +1,4 @@
"""
工具模块
包含各种辅助功能
"""

68
src/utils/mail_util.py Normal file
View File

@ -0,0 +1,68 @@
import smtplib
import logging
import sys
import os
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from email.utils import formataddr
# 添加项目根目录到sys.path
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
from config import Config
from logger_config import get_logger
# 获取正确配置的日志记录器
logger = get_logger('mail_util')
class MailUtil:
@staticmethod
def send_mail(subject, body, recipients=None):
"""发送邮件
Args:
subject: 邮件主题
body: 邮件内容
recipients: 收件人列表如果为None则使用配置中的默认收件人
Returns:
bool: 发送是否成功
"""
if not Config.MAIL_ENABLED:
logger.info("邮件通知未启用")
return False
recipients = recipients or Config.MAIL_TO
if not recipients:
logger.warning("未配置收件人")
return False
try:
msg = MIMEMultipart()
msg['From'] = formataddr((Config.MAIL_FROM, Config.MAIL_USERNAME))
msg['To'] = ', '.join(recipients)
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
server = smtplib.SMTP_SSL(Config.MAIL_SERVER, Config.MAIL_PORT)
if Config.MAIL_USERNAME and Config.MAIL_PASSWORD:
server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
server.send_message(msg)
server.quit()
logger.info(f"邮件发送成功: {subject}")
return True
except Exception as e:
logger.error(f"邮件发送失败: {str(e)}")
return False
if __name__ == "__main__":
# 测试邮件发送
result = MailUtil.send_mail("测试邮件", "这是一封测试邮件")

View File

@ -1,273 +0,0 @@
import os
import random
from config import Config
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount
from xtquant import xtconstant
from xtquant.xtdata import get_instrument_detail, get_trading_time
import datetime as dt
from chinese_calendar import is_workday
from logger_config import get_logger
# 获取日志记录器
logger = get_logger('real_trader')
class MyXtQuantTraderCallback:
def on_connected(self):
logger.info("连接成功")
def on_disconnected(self):
logger.warning("连接断开")
def on_account_status(self, status):
logger.info(f"账号状态: {status.account_id} {status.status}")
def on_stock_asset(self, asset):
logger.info(f"资金变动: {asset.account_id} {asset.cash} {asset.total_asset}")
def on_stock_order(self, order):
logger.info(f"委托回报: {order.stock_code} {order.order_status} {order.order_sysid}")
def on_stock_trade(self, trade):
logger.info(f"成交变动: {trade.account_id} {trade.stock_code} {trade.order_id}")
def on_stock_position(self, position):
logger.info(f"持仓变动: {position.stock_code} {position.volume}")
def on_order_error(self, order_error):
logger.error(f"委托失败: {order_error.order_id} {order_error.error_id} {order_error.error_msg}")
def on_cancel_error(self, cancel_error):
logger.error(f"撤单失败: {cancel_error.order_id} {cancel_error.error_id} {cancel_error.error_msg}")
def on_order_stock_async_response(self, response):
logger.info(f"异步下单反馈: {response.order_id}")
def on_cancel_order_stock_async_response(self, response):
logger.info(f"异步撤单反馈: {response.order_id}")
def on_smt_appointment_async_response(self, response):
logger.info(f"约券异步反馈: {response.seq}")
class XtTrader:
def __init__(self):
self._ACCOUNT = Config.XT_ACCOUNT
self._PATH = Config.XT_PATH
self._SESSION_ID = random.randint(100000, 99999999)
self._account_type = os.environ.get("XT_ACCOUNT_TYPE", "STOCK")
self._strategy_name = os.environ.get("XT_STRATEGY_NAME", "xt_strategy")
self._remark = os.environ.get("XT_REMARK", "remark")
self._callback = MyXtQuantTraderCallback()
self.xt_trader = XtQuantTrader(self._PATH, self._SESSION_ID)
self.account = StockAccount(self._ACCOUNT, self._account_type)
self.xt_trader.register_callback(self._callback)
self.started = False
self.connected = False
self.subscribed = False
def is_logged_in(self):
"""检查交易系统是否已经登录
Returns:
bool: True表示已登录False表示未登录
"""
return self.started and self.connected and self.subscribed
def login(self):
if not self.started:
self.xt_trader.start()
self.started = True
if not self.connected:
result = self.xt_trader.connect()
self.connected = (result == 0)
if not self.subscribed:
result = self.xt_trader.subscribe(self.account)
self.subscribed = (result == 0)
return self.connected and self.subscribed
def logout(self):
if self.started:
self.xt_trader.stop()
self.started = False
self.connected = False
self.subscribed = False
def get_balance(self):
asset = self.xt_trader.query_stock_asset(self.account)
if asset:
return {
"account_id": asset.account_id,
"cash": asset.cash,
"frozen_cash": asset.frozen_cash,
"market_value": asset.market_value,
"total_asset": asset.total_asset
}
return None
def get_positions(self):
positions = self.xt_trader.query_stock_positions(self.account)
if positions:
return [
{
"account_id": p.account_id,
"stock_code": p.stock_code,
"volume": p.volume,
"can_use_volume": p.can_use_volume,
"open_price": p.open_price,
"market_value": p.market_value,
"frozen_volume": p.frozen_volume,
"on_road_volume": p.on_road_volume,
"yesterday_volume": p.yesterday_volume,
"avg_price": p.avg_price
} for p in positions
]
return []
def get_today_trades(self):
trades = self.xt_trader.query_stock_trades(self.account)
if trades:
return [
{
"account_id": t.account_id,
"stock_code": t.stock_code,
"stock_name": get_instrument_detail(t.stock_code)["InstrumentName"] if get_instrument_detail(t.stock_code) else "",
"order_id": t.order_id,
"traded_id": t.traded_id,
"traded_time": t.traded_time,
"traded_price": t.traded_price,
"traded_volume": t.traded_volume,
"traded_amount": t.traded_amount,
"trade_type": "buy" if t.order_type == xtconstant.STOCK_BUY else "sell"
} for t in trades
]
return []
def get_today_entrust(self):
orders = self.xt_trader.query_stock_orders(self.account)
if orders:
return [
{
"account_id": o.account_id,
"stock_code": o.stock_code,
"order_id": o.order_id,
"order_time": o.order_time,
"order_type": "buy" if o.order_type == xtconstant.STOCK_BUY else "sell",
"order_volume": o.order_volume,
"price_type": self._convert_price_type(o.price_type),
"price": o.price,
"traded_volume": o.traded_volume,
"traded_price": o.traded_price,
"order_status": o.order_status,
"status_msg": o.status_msg
} for o in orders
]
return []
def _convert_price_type(self, price_type):
"""Convert numeric price type to readable string"""
price_type_map = {
xtconstant.LATEST_PRICE: "latest_price", # 最新价
xtconstant.FIX_PRICE: "limit_price", # 指定价/限价
xtconstant.MARKET_BEST: "market_best", # 市价最优价
xtconstant.MARKET_CANCEL: "market_cancel", # 市价即成剩撤
xtconstant.MARKET_CANCEL_ALL: "market_cancel_all", # 市价全额成交或撤销
xtconstant.MARKET_PEER_PRICE_FIRST: "market_peer_best", # 对手方最优价格
xtconstant.MARKET_MINE_PRICE_FIRST: "market_mine_best", # 本方最优价格
}
return price_type_map.get(price_type, f"unknown_{price_type}")
def buy(self, code, price, amount, order_type='limit'):
"""买入股票
Args:
code: 股票代码
price: 买入价格市价单时可为0
amount: 买入数量
order_type: 订单类型'limit'=限价单'market'=市价单默认为'limit'
Returns:
dict: 包含订单ID的字典
"""
# 确定价格类型
price_type = xtconstant.FIX_PRICE # 默认为限价单
if order_type == 'market':
# 市价单,根据不同市场选择合适的市价单类型
if code.startswith('1') or code.startswith('5'):
# 基金等可能需要不同的市价单类型
price_type = xtconstant.MARKET_BEST
else:
price_type = xtconstant.MARKET_BEST # 市价最优价
# 如果是市价单价格可以设为0
if price_type != xtconstant.FIX_PRICE:
price = 0
order_id = self.xt_trader.order_stock(
self.account, code, xtconstant.STOCK_BUY, amount, price_type, price, self._strategy_name, self._remark
)
return {"order_id": order_id}
def sell(self, code, price, amount, order_type='limit'):
"""卖出股票
Args:
code: 股票代码
price: 卖出价格市价单时可为0
amount: 卖出数量
order_type: 订单类型'limit'=限价单'market'=市价单默认为'limit'
Returns:
dict: 包含订单ID的字典
"""
# 确定价格类型
price_type = xtconstant.FIX_PRICE # 默认为限价单
if order_type == 'market':
# 市价单,根据不同市场选择合适的市价单类型
if code.startswith('1') or code.startswith('5'):
# 基金等可能需要不同的市价单类型
price_type = xtconstant.MARKET_BEST
else:
price_type = xtconstant.MARKET_BEST # 市价最优价
# 如果是市价单价格可以设为0
if price_type != xtconstant.FIX_PRICE:
price = 0
order_id = self.xt_trader.order_stock(
self.account, code, xtconstant.STOCK_SELL, amount, price_type, price, self._strategy_name, self._remark
)
return {"order_id": order_id}
def cancel(self, entrust_no):
# 撤单接口需要订单编号
result = self.xt_trader.cancel_order_stock(self.account, int(entrust_no))
return {"cancel_result": result}
def is_trading_time(self):
"""判断当前是否为交易时间
Returns:
bool: True 表示当前为交易时间False 表示当前休市
"""
try:
now = dt.datetime.now()
# 判断是否为工作日(使用 chinese_calendar 判断,会考虑节假日和调休)
if not is_workday(now):
return False
# 判断是否在交易时间段内
current_time = now.time()
morning_start = dt.time(9, 30) # 上午开市时间 9:30
morning_end = dt.time(11, 30) # 上午休市时间 11:30
afternoon_start = dt.time(13, 0) # 下午开市时间 13:00
afternoon_end = dt.time(15, 0) # 下午休市时间 15:00
# 判断是否在上午或下午的交易时段
is_morning_session = morning_start <= current_time <= morning_end
is_afternoon_session = afternoon_start <= current_time <= afternoon_end
return is_morning_session or is_afternoon_session
except Exception as e:
logger.error(f"判断交易时间发生错误: {str(e)}")
return False
if __name__ == "__main__":
trader = XtTrader()
trader.login()
logger.info(f"账户余额: {trader.get_balance()}")
logger.info(f"持仓: {trader.get_positions()}")
logger.info(f"当日成交: {trader.get_today_trades()}")
logger.info(f"当日委托: {trader.get_today_entrust()}")

55
uv.lock generated
View File

@ -1,6 +1,30 @@
version = 1
revision = 1
requires-python = ">=3.12.8"
requires-python = ">=3.10.5"
[[package]]
name = "black"
version = "25.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "click" },
{ name = "mypy-extensions" },
{ name = "packaging" },
{ name = "pathspec" },
{ name = "platformdirs" },
]
sdist = { url = "https://files.pythonhosted.org/packages/94/49/26a7b0f3f35da4b5a65f081943b7bcd22d7002f5f0fb8098ec1ff21cb6ef/black-25.1.0.tar.gz", hash = "sha256:33496d5cd1222ad73391352b4ae8da15253c5de89b93a80b3e2c8d9a19ec2666", size = 649449 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/83/71/3fe4741df7adf015ad8dfa082dd36c94ca86bb21f25608eb247b4afb15b2/black-25.1.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:4b60580e829091e6f9238c848ea6750efed72140b91b048770b64e74fe04908b", size = 1650988 },
{ url = "https://files.pythonhosted.org/packages/13/f3/89aac8a83d73937ccd39bbe8fc6ac8860c11cfa0af5b1c96d081facac844/black-25.1.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:1e2978f6df243b155ef5fa7e558a43037c3079093ed5d10fd84c43900f2d8ecc", size = 1453985 },
{ url = "https://files.pythonhosted.org/packages/6f/22/b99efca33f1f3a1d2552c714b1e1b5ae92efac6c43e790ad539a163d1754/black-25.1.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3b48735872ec535027d979e8dcb20bf4f70b5ac75a8ea99f127c106a7d7aba9f", size = 1783816 },
{ url = "https://files.pythonhosted.org/packages/18/7e/a27c3ad3822b6f2e0e00d63d58ff6299a99a5b3aee69fa77cd4b0076b261/black-25.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:ea0213189960bda9cf99be5b8c8ce66bb054af5e9e861249cd23471bd7b0b3ba", size = 1440860 },
{ url = "https://files.pythonhosted.org/packages/98/87/0edf98916640efa5d0696e1abb0a8357b52e69e82322628f25bf14d263d1/black-25.1.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:8f0b18a02996a836cc9c9c78e5babec10930862827b1b724ddfe98ccf2f2fe4f", size = 1650673 },
{ url = "https://files.pythonhosted.org/packages/52/e5/f7bf17207cf87fa6e9b676576749c6b6ed0d70f179a3d812c997870291c3/black-25.1.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:afebb7098bfbc70037a053b91ae8437c3857482d3a690fefc03e9ff7aa9a5fd3", size = 1453190 },
{ url = "https://files.pythonhosted.org/packages/e3/ee/adda3d46d4a9120772fae6de454c8495603c37c4c3b9c60f25b1ab6401fe/black-25.1.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:030b9759066a4ee5e5aca28c3c77f9c64789cdd4de8ac1df642c40b708be6171", size = 1782926 },
{ url = "https://files.pythonhosted.org/packages/cc/64/94eb5f45dcb997d2082f097a3944cfc7fe87e071907f677e80788a2d7b7a/black-25.1.0-cp313-cp313-win_amd64.whl", hash = "sha256:a22f402b410566e2d1c950708c77ebf5ebd5d0d88a6a2e87c86d9fb48afa0d18", size = 1442613 },
{ url = "https://files.pythonhosted.org/packages/09/71/54e999902aed72baf26bca0d50781b01838251a462612966e9fc4891eadd/black-25.1.0-py3-none-any.whl", hash = "sha256:95e8176dae143ba9097f351d174fdaf0ccd29efb414b362ae3fd72bf0f710717", size = 207646 },
]
[[package]]
name = "blinker"
@ -231,6 +255,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979 },
]
[[package]]
name = "mypy-extensions"
version = "1.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/a2/6e/371856a3fb9d31ca8dac321cda606860fa4548858c0cc45d9d1d4ca2628b/mypy_extensions-1.1.0.tar.gz", hash = "sha256:52e68efc3284861e772bbcd66823fde5ae21fd2fdb51c62a211403730b916558", size = 6343 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/79/7b/2c79738432f5c924bef5071f933bcc9efd0473bac3b4aa584a6f7c1c8df8/mypy_extensions-1.1.0-py3-none-any.whl", hash = "sha256:1be4cccdb0f2482337c4743e60421de3a356cd97508abadd57d47403e94f5505", size = 4963 },
]
[[package]]
name = "ordered-set"
version = "4.1.0"
@ -249,6 +282,24 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469 },
]
[[package]]
name = "pathspec"
version = "0.12.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ca/bc/f35b8446f4531a7cb215605d100cd88b7ac6f44ab3fc94870c120ab3adbf/pathspec-0.12.1.tar.gz", hash = "sha256:a482d51503a1ab33b1c67a6c3813a26953dbdc71c31dacaef9a838c4e29f5712", size = 51043 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cc/20/ff623b09d963f88bfde16306a54e12ee5ea43e9b597108672ff3a408aad6/pathspec-0.12.1-py3-none-any.whl", hash = "sha256:a0d503e138a4c123b27490a4f7beda6a01c6f288df0e4a8b79c7eb0dc7b4cc08", size = 31191 },
]
[[package]]
name = "platformdirs"
version = "4.3.8"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/fe/8b/3c73abc9c759ecd3f1f7ceff6685840859e8070c4d947c93fae71f6a0bf2/platformdirs-4.3.8.tar.gz", hash = "sha256:3d512d96e16bcb959a814c9f348431070822a6496326a4be0911c40b5a74c2bc", size = 21362 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/fe/39/979e8e21520d4e47a0bbe349e2713c0aac6f3d853d0e5b34d76206c439aa/platformdirs-4.3.8-py3-none-any.whl", hash = "sha256:ff7059bb7eb1179e2685604f4aaf157cfd9535242bd23742eadc3c13542139b4", size = 18567 },
]
[[package]]
name = "pygments"
version = "2.19.1"
@ -263,6 +314,7 @@ name = "real-trader"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "black" },
{ name = "chinese-calendar" },
{ name = "flask" },
{ name = "flask-limiter" },
@ -272,6 +324,7 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "black", specifier = ">=25.1.0" },
{ name = "chinese-calendar", specifier = ">=1.10.0" },
{ name = "flask", specifier = ">=3.1.0" },
{ name = "flask-limiter", specifier = ">=3.12" },