refactor: logger

This commit is contained in:
zhiyong 2025-04-30 16:30:07 +08:00
parent edc072230e
commit 414dca6ee4
5 changed files with 238 additions and 125 deletions

191
src/logger_config.py Normal file
View File

@ -0,0 +1,191 @@
import logging
from config import Config
import os
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
# 确保日志目录存在
def ensure_log_dir(log_dir):
if not os.path.exists(log_dir):
os.makedirs(log_dir)
return log_dir
# 创建通用的日志格式化器
def create_formatter(format_str=None):
if not format_str:
format_str = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
return logging.Formatter(format_str)
# 创建服务器日志记录器
def setup_server_logger(log_dir="logs", max_bytes=10*1024*1024, backup_count=5, level=logging.INFO, log_format=None):
"""
创建服务器日志记录器
Args:
log_dir: 日志目录
max_bytes: 单个日志文件最大大小
backup_count: 保留的备份文件数量
level: 日志级别
log_format: 日志格式
Returns:
logging.Logger: 日志记录器
"""
log_dir = ensure_log_dir(log_dir)
logger = logging.getLogger('trade_server')
logger.setLevel(level)
# 清除已有的处理器
for handler in logger.handlers[:]:
logger.removeHandler(handler)
# 创建文件处理器
file_handler = RotatingFileHandler(
os.path.join(log_dir, 'trade_server.log'),
maxBytes=max_bytes,
backupCount=backup_count
)
formatter = create_formatter(log_format)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# 添加控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
# 创建真实交易日志记录器
def setup_real_trader_logger(log_dir="logs", level=logging.INFO, log_format=None):
"""
创建真实交易日志记录器
Args:
log_dir: 日志目录
level: 日志级别
log_format: 日志格式
Returns:
logging.Logger: 日志记录器
"""
log_dir = ensure_log_dir(log_dir)
logger = logging.getLogger('real_trader')
logger.setLevel(level)
# 清除已有的处理器
for handler in logger.handlers[:]:
logger.removeHandler(handler)
# 创建每日滚动文件处理器
file_handler = TimedRotatingFileHandler(
os.path.join(log_dir, 'real_trader.log'),
when="midnight",
interval=1,
backupCount=7,
encoding="utf-8"
)
file_handler.suffix = "%Y-%m-%d"
formatter = create_formatter(log_format or '%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# 添加控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
# 创建模拟交易日志记录器
def setup_simulation_logger(log_dir="logs", max_bytes=10*1024*1024, backup_count=5, level=logging.INFO, log_format=None):
"""
创建模拟交易日志记录器
Args:
log_dir: 日志目录
max_bytes: 单个日志文件最大大小
backup_count: 保留的备份文件数量
level: 日志级别
log_format: 日志格式
Returns:
logging.Logger: 日志记录器
"""
log_dir = ensure_log_dir(log_dir)
logger = logging.getLogger('simulation_trader')
logger.setLevel(level)
# 清除已有的处理器
for handler in logger.handlers[:]:
logger.removeHandler(handler)
# 创建文件处理器
file_handler = RotatingFileHandler(
os.path.join(log_dir, 'simulation_trader.log'),
maxBytes=max_bytes,
backupCount=backup_count
)
formatter = create_formatter(log_format or '%(asctime)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# 添加控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
# 创建策略持仓管理日志记录器 (使用与服务器相同的日志记录器)
def setup_strategy_logger():
"""
创建策略持仓管理日志记录器
Returns:
logging.Logger: 日志记录器
"""
return logging.getLogger('trade_server')
# 根据配置创建合适的日志记录器
def get_logger(module_name, log_dir="logs", level=Config.LOG_LEVEL if hasattr(Config, 'LOG_LEVEL') else logging.INFO):
"""
获取适合模块的日志记录器
Args:
module_name: 模块名称 'server', 'real_trader', 'simulation_trader', 'strategy'
log_dir: 日志目录
level: 日志级别
Returns:
logging.Logger: 日志记录器
"""
if module_name == 'server':
return setup_server_logger(log_dir, level=level)
elif module_name == 'real_trader':
return setup_real_trader_logger(log_dir, level=level)
elif module_name == 'simulation_trader':
return setup_simulation_logger(log_dir, level=level)
elif module_name == 'strategy':
return setup_strategy_logger()
else:
# 默认日志记录器
logger = logging.getLogger(module_name)
logger.setLevel(level)
# 确保日志记录器至少有一个处理器
if not logger.handlers:
log_dir = ensure_log_dir(log_dir)
handler = logging.StreamHandler()
handler.setFormatter(create_formatter())
logger.addHandler(handler)
# 添加文件处理器
file_handler = RotatingFileHandler(
os.path.join(log_dir, f'{module_name}.log'),
maxBytes=10*1024*1024,
backupCount=5
)
file_handler.setFormatter(create_formatter())
logger.addHandler(file_handler)
return logger

View File

@ -1,45 +1,14 @@
import logging
from logging.handlers import RotatingFileHandler from logger_config import get_logger
import os
import time
class SimulationTrader: class SimulationTrader:
def __init__(self, logger=None): def __init__(self, logger=None):
self.logger = logger or self._setup_default_logger() self.logger = logger or get_logger('simulation_trader')
# 添加模拟持仓字典,用于追踪模拟交易的持仓 # 添加模拟持仓字典,用于追踪模拟交易的持仓
self.sim_positions = {} self.sim_positions = {}
# 模拟资金账户信息 # 模拟资金账户信息
self.sim_balance = {"cash": 1000000.00, "frozen": 0.00, "total": 1000000.00} self.sim_balance = {"cash": 1000000.00, "frozen": 0.00, "total": 1000000.00}
def _setup_default_logger(self):
"""设置默认日志记录器"""
sim_logger = logging.getLogger('trade_simulation')
sim_logger.setLevel(logging.INFO)
# 确保没有重复的处理器
for handler in sim_logger.handlers[:]:
sim_logger.removeHandler(handler)
# 文件处理器
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
file_handler = RotatingFileHandler(
os.path.join(log_dir, 'trade_simulation.log'),
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
sim_logger.addHandler(file_handler)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
sim_logger.addHandler(console_handler)
return sim_logger
def is_logged_in(self): def is_logged_in(self):
"""检查交易系统是否已经登录 """检查交易系统是否已经登录

View File

@ -1,9 +1,12 @@
import time import time
import os import os
import json import json
import logging
from simulation_trader import SimulationTrader from simulation_trader import SimulationTrader
from xtquant import xtconstant from xtquant import xtconstant
from logger_config import get_logger
# 获取日志记录器
logger = get_logger('strategy')
# 策略仓位管理 # 策略仓位管理
strategy_positions = { strategy_positions = {
@ -19,9 +22,6 @@ pending_orders = {
'simulation': {} # 存储模拟交易未完成委托 'simulation': {} # 存储模拟交易未完成委托
} }
# 获取日志记录器
logger = logging.getLogger('trade_server')
class StrategyPositionManager: class StrategyPositionManager:
"""策略持仓管理器,负责管理不同策略的持仓情况""" """策略持仓管理器,负责管理不同策略的持仓情况"""

View File

@ -3,26 +3,50 @@ import threading
import time import time
from xt_trader import XtTrader from xt_trader import XtTrader
from flask import Flask, request, abort, jsonify from flask import Flask, request, abort, jsonify
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from config import Config from config import Config
import logging
import os
from logging.handlers import RotatingFileHandler
import concurrent.futures
from concurrent.futures import TimeoutError from concurrent.futures import TimeoutError
import json import concurrent.futures
import atexit import atexit
from simulation_trader import SimulationTrader from simulation_trader import SimulationTrader
import datetime import datetime
from xtquant import xtconstant
from strategy_position_manager import StrategyPositionManager from strategy_position_manager import StrategyPositionManager
from logger_config import get_logger
# 获取日志记录器
logger = get_logger('server')
# 全局交易实例(采用单例模式) # 全局交易实例(采用单例模式)
_sim_trader_instance = None # 模拟交易实例(单例) _sim_trader_instance = None # 模拟交易实例(单例)
_real_trader_instance = None # 实盘交易实例(单例) _real_trader_instance = None # 实盘交易实例(单例)
# 获取模拟交易实例的辅助函数
def get_sim_trader():
"""获取模拟交易实例 - 保证单例模式
Returns:
返回模拟交易单例实例
"""
global _sim_trader_instance
if _sim_trader_instance is None:
_sim_trader_instance = SimulationTrader()
return _sim_trader_instance
# 获取实盘交易实例的辅助函数
def get_real_trader():
"""获取实盘交易实例 - 保证单例模式
Returns:
返回实盘交易单例实例
"""
global _real_trader_instance
if _real_trader_instance is None:
_real_trader_instance = XtTrader()
# 检查交易实例是否已登录,如果未登录则进行登录
if not _real_trader_instance.is_logged_in():
logger.info("创建新的XtTrader实例并登录")
_real_trader_instance.login()
return _real_trader_instance
# 获取交易实例 # 获取交易实例
def get_trader(use_sim_trader=False): def get_trader(use_sim_trader=False):
"""获取交易实例 - 采用单例模式 """获取交易实例 - 采用单例模式
@ -33,21 +57,13 @@ def get_trader(use_sim_trader=False):
Returns: Returns:
返回交易实例根据参数和配置决定是模拟交易还是实盘交易 返回交易实例根据参数和配置决定是模拟交易还是实盘交易
""" """
global _sim_trader_instance, _real_trader_instance
# 如果强制使用模拟交易,返回模拟交易单例 # 如果强制使用模拟交易,返回模拟交易单例
if use_sim_trader: if use_sim_trader:
# 如果模拟交易实例不存在,创建一个 return get_sim_trader()
if _sim_trader_instance is None:
_sim_trader_instance = SimulationTrader()
return _sim_trader_instance
# 如果配置为仅模拟交易,返回模拟交易单例 # 如果配置为仅模拟交易,返回模拟交易单例
if Config.SIMULATION_ONLY: if Config.SIMULATION_ONLY:
# 如果模拟交易实例不存在,创建一个 return get_sim_trader()
if _sim_trader_instance is None:
_sim_trader_instance = SimulationTrader()
return _sim_trader_instance
# 判断当前是否为交易时间 # 判断当前是否为交易时间
now = datetime.datetime.now() now = datetime.datetime.now()
@ -72,59 +88,10 @@ def get_trader(use_sim_trader=False):
# 如果不是交易日或不在交易时间内,返回模拟交易单例 # 如果不是交易日或不在交易时间内,返回模拟交易单例
if not is_trading_day or not is_trading_hour: if not is_trading_day or not is_trading_hour:
logger.info(f"当前非交易时段 - 日期: {now.date()}, 时间: {current_time}, 使用模拟交易") logger.info(f"当前非交易时段 - 日期: {now.date()}, 时间: {current_time}, 使用模拟交易")
# 如果模拟交易实例不存在,创建一个 return get_sim_trader()
if _sim_trader_instance is None:
_sim_trader_instance = SimulationTrader()
return _sim_trader_instance
# 否则返回真实交易单例(如果存在)或创建一个新的
if _real_trader_instance is None:
_real_trader_instance = XtTrader()
# 检查交易实例是否已登录,如果未登录则进行登录
if not _real_trader_instance.is_logged_in():
logger.info("创建新的XtTrader实例并登录")
_real_trader_instance.login()
return _real_trader_instance
# 配置日志
def setup_logger():
log_dir = Config.LOG_DIR
try:
if not os.path.exists(log_dir):
os.makedirs(log_dir)
logger = logging.getLogger('trade_server')
logger.info(f"Created logs directory at: {os.path.abspath(log_dir)}")
except Exception as e:
print(f"Error creating logs directory: {str(e)}")
# 如果无法创建目录,使用当前目录
log_dir = "."
logger = logging.getLogger('trade_server')
logger.setLevel(getattr(logging, Config.LOG_LEVEL))
# 确保没有重复的处理器
for handler in logger.handlers[:]:
logger.removeHandler(handler)
# 文件处理器
file_handler = RotatingFileHandler(
os.path.join(log_dir, 'trade_server.log'),
maxBytes=Config.LOG_MAX_BYTES,
backupCount=Config.LOG_BACKUP_COUNT
)
file_handler.setFormatter(logging.Formatter(Config.LOG_FORMAT))
logger.addHandler(file_handler)
# 添加控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(Config.LOG_FORMAT))
logger.addHandler(console_handler)
return logger
logger = setup_logger()
# 否则返回真实交易单例
return get_real_trader()
def run_daily(time_str, job_func): def run_daily(time_str, job_func):
"""设置每天在指定时间运行的任务 """设置每天在指定时间运行的任务

View File

@ -1,7 +1,5 @@
import os import os
import random import random
import logging
from logging.handlers import TimedRotatingFileHandler
from config import Config from config import Config
from xtquant.xttrader import XtQuantTrader from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount from xtquant.xttype import StockAccount
@ -9,22 +7,10 @@ from xtquant import xtconstant
from xtquant.xtdata import get_instrument_detail, get_trading_time from xtquant.xtdata import get_instrument_detail, get_trading_time
import datetime as dt import datetime as dt
from chinese_calendar import is_workday from chinese_calendar import is_workday
from logger_config import get_logger
# 日志配置 # 获取日志记录器
LOG_DIR = "log" logger = get_logger('real_trader')
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
log_path = os.path.join(LOG_DIR, "%Y-%m-%d.log")
logger = logging.getLogger("xt_trader")
logger.setLevel(logging.INFO)
handler = TimedRotatingFileHandler(
os.path.join(LOG_DIR, "xt_trader.log"), when="midnight", interval=1, backupCount=7, encoding="utf-8"
)
handler.suffix = "%Y-%m-%d"
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
if not logger.handlers:
logger.addHandler(handler)
class MyXtQuantTraderCallback: class MyXtQuantTraderCallback:
def on_connected(self): def on_connected(self):