Skip to content

构建简单回测框架

回测框架的整体架构

上一章我们了解了回测的基本概念。本章将动手构建一个简单但完整的回测框架。这个框架虽然不追求工业级的复杂度,但会覆盖真实回测中最核心的环节。

一个完整的回测框架需要包含以下模块:

初始化(设置参数、加载数据)

逐日遍历历史数据

信号生成(策略逻辑判断)

下单(模拟交易执行)

更新持仓和净值

计算绩效指标

每个模块的职责如下:

  • 初始化:设置初始资金、交易成本参数,加载历史K线数据
  • 逐日遍历:按时间顺序逐日(或逐周、逐月)扫描历史数据
  • 信号生成:根据策略规则,在每一天判断是否产生买入或卖出信号
  • 下单:根据信号和可用资金,模拟买入或卖出操作
  • 更新持仓和净值:维护当前持仓列表,计算每日账户净值
  • 绩效计算:回测结束后,计算各项绩效指标

数据准备

回测的第一步是准备好高质量的历史数据。对于A股而言,通常需要以下字段:

字段说明
date交易日期
open开盘价(前复权)
high最高价(前复权)
low最低价(前复权)
close收盘价(前复权)
volume成交量
turnover成交额
adj_factor复权因子
is_st是否ST股
is_paused是否停牌

数据的获取可以使用 aksharetushare 等Python库。数据质量是回测的基石,务必在使用前进行以下检查:

python
import pandas as pd

def check_data_quality(df):
    """检查数据质量"""
    issues = []

    # 检查缺失值
    null_counts = df.isnull().sum()
    if null_counts.any():
        issues.append(f"存在缺失值:\n{null_counts[null_counts > 0]}")

    # 检查价格异常(负数或零)
    price_cols = ['open', 'high', 'low', 'close']
    for col in price_cols:
        if (df[col] <= 0).any():
            issues.append(f"{col} 列存在非正值")

    # 检查日期连续性(排除周末和节假日)
    date_diff = df['date'].diff().dt.days
    large_gaps = date_diff[date_diff > 7]  # 超过7天的间隔可能有问题
    if len(large_gaps) > 0:
        issues.append(f"存在 {len(large_gaps)} 个超过7天的日期间隔")

    # 检查高低价关系
    if (df['high'] < df['low']).any():
        issues.append("存在最高价小于最低价的异常数据")

    return issues

核心框架实现

下面我们逐步实现回测框架的各个组件。

第一步:定义数据结构

python
from dataclasses import dataclass, field
from typing import List, Dict
from enum import Enum

class OrderSide(Enum):
    BUY = "buy"
    SELL = "sell"

@dataclass
class Order:
    """订单"""
    date: str           # 下单日期
    code: str           # 股票代码
    side: OrderSide     # 买卖方向
    price: float        # 成交价格
    shares: int         # 成交数量(股)
    amount: float       # 成交金额
    commission: float   # 佣金
    tax: float          # 印花税
    slippage_cost: float  # 滑点成本

@dataclass
class Position:
    """持仓"""
    code: str           # 股票代码
    shares: int         # 持仓数量
    avg_cost: float     # 平均成本价
    buy_date: str       # 首次买入日期

@dataclass
class DailyRecord:
    """每日净值记录"""
    date: str
    cash: float             # 现金
    market_value: float     # 持仓市值
    total_value: float      # 总资产
    positions: Dict         # 当日持仓快照

第二步:初始化引擎

python
class BacktestEngine:
    def __init__(
        self,
        initial_cash: float = 1_000_000,
        commission_rate: float = 0.00025,    # 佣金万2.5
        min_commission: float = 5.0,          # 最低佣金5元
        stamp_tax_rate: float = 0.0005,       # 印花税千0.5(卖出)
        transfer_fee_rate: float = 0.00001,   # 过户费十万分之一
        slippage_rate: float = 0.001,         # 滑点0.1%
    ):
        # 账户状态
        self.cash = initial_cash
        self.initial_cash = initial_cash
        self.positions: Dict[str, Position] = {}
        self.orders: List[Order] = []
        self.daily_records: List[DailyRecord] = []

        # 交易成本参数
        self.commission_rate = commission_rate
        self.min_commission = min_commission
        self.stamp_tax_rate = stamp_tax_rate
        self.transfer_fee_rate = transfer_fee_rate
        self.slippage_rate = slippage_rate

        # 数据
        self.data = None

第三步:交易成本模拟

交易成本模拟是回测框架中最容易被忽略、但对结果影响最大的部分之一。不准确的成本模拟会导致回测结果严重失真。

A股交易成本构成:

买入成本 = 股价 × 数量 + 佣金 + 过户费 + 滑点成本
卖出收入 = 股价 × 数量 - 佣金 - 印花税 - 过户费 - 滑点成本
python
def calculate_buy_cost(self, price: float, shares: int) -> dict:
    """计算买入成本"""
    amount = price * shares

    # 佣金(万2.5,最低5元)
    commission = max(amount * self.commission_rate, self.min_commission)

    # 过户费(十万分之一,双向)
    transfer_fee = amount * self.transfer_fee_rate

    # 滑点成本(假设买入价格上浮0.1%)
    slippage_cost = amount * self.slippage_rate

    total_cost = amount + commission + transfer_fee + slippage_cost

    return {
        'amount': amount,
        'commission': commission,
        'transfer_fee': transfer_fee,
        'slippage_cost': slippage_cost,
        'total_cost': total_cost,
    }

def calculate_sell_cost(self, price: float, shares: int) -> dict:
    """计算卖出成本"""
    amount = price * shares

    # 佣金
    commission = max(amount * self.commission_rate, self.min_commission)

    # 印花税(千分之0.5,仅卖出)
    stamp_tax = amount * self.stamp_tax_rate

    # 过户费
    transfer_fee = amount * self.transfer_fee_rate

    # 滑点成本(假设卖出价格下浮0.1%)
    slippage_cost = amount * self.slippage_rate

    total_deduction = commission + stamp_tax + transfer_fee + slippage_cost
    net_proceeds = amount - total_deduction

    return {
        'amount': amount,
        'commission': commission,
        'stamp_tax': stamp_tax,
        'transfer_fee': transfer_fee,
        'slippage_cost': slippage_cost,
        'total_deduction': total_deduction,
        'net_proceeds': net_proceeds,
    }

下面用一个具体例子来看交易成本的影响:

python
# 假设买入10万元某股票
engine = BacktestEngine()
buy_cost = engine.calculate_buy_cost(price=10.0, shares=10000)
# amount: 100000
# commission: 25 (100000 × 0.00025)
# transfer_fee: 1 (100000 × 0.00001)
# slippage_cost: 100 (100000 × 0.001)
# total_cost: 100126

# 卖出同样的股票(假设价格不变)
sell_cost = engine.calculate_sell_cost(price=10.0, shares=10000)
# amount: 100000
# commission: 25
# stamp_tax: 50 (100000 × 0.0005)
# transfer_fee: 1
# slippage_cost: 100
# total_deduction: 176
# net_proceeds: 99824

# 一买一卖的总成本:126 + 176 = 302 元(约占本金的0.3%)

这意味着,如果你做一次完整的买卖(先买后卖),仅交易成本就会损失约0.3%。如果你的策略是高频交易,交易成本的累积将非常可观。这也是为什么策略的换手率越高,对交易成本的模拟就越重要。

第四步:下单逻辑

python
def buy(self, code: str, price: float, date: str, max_shares: int = None):
    """买入股票"""
    # A股最小交易单位为100股(1手)
    # 计算可买数量
    cost_info = self.calculate_buy_cost(price, 100)  # 先算1手的成本

    if self.cash < cost_info['total_cost']:
        return None  # 资金不足

    # 计算最大可买数量(按100股取整)
    max_possible = int(self.cash / cost_info['total_cost'] * 100)
    max_possible = (max_possible // 100) * 100  # 取整到100股

    if max_shares is not None:
        max_possible = min(max_possible, max_shares)

    if max_possible < 100:
        return None  # 连1手都买不起

    shares = max_possible
    cost_info = self.calculate_buy_cost(price, shares)

    # 扣除资金
    self.cash -= cost_info['total_cost']

    # 更新持仓
    if code in self.positions:
        pos = self.positions[code]
        total_shares = pos.shares + shares
        pos.avg_cost = (pos.avg_cost * pos.shares + price * shares) / total_shares
        pos.shares = total_shares
    else:
        self.positions[code] = Position(
            code=code,
            shares=shares,
            avg_cost=price,
            buy_date=date,
        )

    # 记录订单
    order = Order(
        date=date, code=code, side=OrderSide.BUY,
        price=price, shares=shares, amount=cost_info['amount'],
        commission=cost_info['commission'],
        tax=0,  # 买入无印花税
        slippage_cost=cost_info['slippage_cost'],
    )
    self.orders.append(order)
    return order

def sell(self, code: str, price: float, date: str, shares: int = None):
    """卖出股票"""
    if code not in self.positions:
        return None  # 未持有该股票

    pos = self.positions[code]
    sell_shares = shares if shares is not None else pos.shares
    sell_shares = min(sell_shares, pos.shares)
    sell_shares = (sell_shares // 100) * 100  # 取整到100股

    if sell_shares < 100:
        return None

    cost_info = self.calculate_sell_cost(price, sell_shares)

    # 增加资金
    self.cash += cost_info['net_proceeds']

    # 更新持仓
    pos.shares -= sell_shares
    if pos.shares == 0:
        del self.positions[code]

    # 记录订单
    order = Order(
        date=date, code=code, side=OrderSide.SELL,
        price=price, shares=sell_shares, amount=cost_info['amount'],
        commission=cost_info['commission'],
        tax=cost_info['stamp_tax'],
        slippage_cost=cost_info['slippage_cost'],
    )
    self.orders.append(order)
    return order

第五步:净值更新

python
def update_daily_value(self, date: str, price_dict: Dict[str, float]):
    """更新每日净值"""
    market_value = 0.0
    for code, pos in self.positions.items():
        if code in price_dict:
            market_value += price_dict[code] * pos.shares
        # 如果没有当日价格(停牌等),使用上一次的价格

    total_value = self.cash + market_value

    record = DailyRecord(
        date=date,
        cash=self.cash,
        market_value=market_value,
        total_value=total_value,
        positions={code: {'shares': pos.shares, 'avg_cost': pos.avg_cost}
                   for code, pos in self.positions.items()},
    )
    self.daily_records.append(record)

第六步:主循环

python
def run(self, data: pd.DataFrame, strategy):
    """运行回测"""
    self.data = data

    for i in range(len(data)):
        row = data.iloc[i]
        date = row['date']

        # 获取截至当日的所有历史数据(供策略使用)
        history = data.iloc[:i + 1]

        # 生成交易信号
        signals = strategy.generate_signals(history, self.positions, self.cash)

        # 执行交易信号
        for signal in signals:
            if signal['action'] == 'buy':
                # 使用次日开盘价执行(避免前视偏差)
                if i + 1 < len(data):
                    exec_price = data.iloc[i + 1]['open']
                    self.buy(signal['code'], exec_price, date, signal.get('shares'))
            elif signal['action'] == 'sell':
                if i + 1 < len(data):
                    exec_price = data.iloc[i + 1]['open']
                    self.sell(signal['code'], exec_price, date, signal.get('shares'))

        # 更新每日净值
        price_dict = {row.get('code', 'default'): row['close']}
        self.update_daily_value(date, price_dict)

除权除息处理

A股上市公司会进行分红(派息、送股、转增等),这些事件会导致股价在除权除息日发生跳变。如果不做处理,回测中的价格序列会出现虚假的巨大跌幅,导致错误的交易信号和失真的收益计算。

什么是除权除息

  • 除息(XD):公司派发现金红利后,股价扣除每股红利金额。如某股收盘价20元,每股派1元,除息后基准价为19元。
  • 除权(XR):公司送股或转增后,股数增加,每股价值稀释。如每10股送10股,股价减半。
  • 除权除息(DR):同时发生上述两种情况。

复权处理方式

复权的目的是消除除权除息对价格序列的影响,使得价格具有连续可比性。

前复权(Forward Adjustment):以最新价格为基准,向前调整历史价格。这是回测中最常用的方式,因为前复权价格可以直接反映从过去持有到现在的真实收益率。

python
# 前复权价格 = 原始价格 × 复权因子
# 复权因子会随着除权除息事件不断调整
# 前复权价格从最新一天往回看是准确的

# 示例:某股票经历了每10股送10股
# 原始价格:10, 11, 12, 除权后 → 6, 6.5, 7
# 前复权价格:5, 5.5, 6, 6, 6.5, 7
# (除权前的价格除以2,与除权后的价格连续)

后复权(Backward Adjustment):以最早价格为基准,向后调整价格。后复权价格反映了从上市第一天持有到现在的累计收益率。

在回测框架中,建议直接使用前复权数据(大多数数据源都提供),这样就不需要自己在回测过程中处理复权问题。但如果你使用的是不复权数据,则需要手动处理。

在回测中处理除权除息

如果你使用前复权数据,回测框架本身不需要额外处理。但有一个需要注意的点:持仓成本的计算

假设你以10元买入某股票,之后该股票进行了10送10的除权,前复权价格调整为5元。此时你的持仓成本在前复权体系下也应该调整为5元(因为你的股数翻倍了)。

如果你的回测框架使用前复权价格计算盈亏,就需要确保持仓的成本价也使用前复权价格。最简单的做法是在计算盈亏时始终使用当前的前复权价格与前复权成本价对比。

如果你使用不复权数据,则需要在回测过程中跟踪除权事件:

python
def adjust_position_for_dividend(self, code: str, dividend_info: dict):
    """根据除权除息信息调整持仓"""
    if code not in self.positions:
        return

    pos = self.positions[code]

    # 送股/转增:增加持仓数量,降低平均成本
    bonus_shares = int(pos.shares * dividend_info['bonus_ratio'])  # 送股比例
    if bonus_shares > 0:
        new_total = pos.shares + bonus_shares
        pos.avg_cost = pos.avg_cost * pos.shares / new_total
        pos.shares = new_total

    # 派息:现金分红,增加账户现金
    dividend_cash = pos.shares * dividend_info['cash_per_share']
    # 注意:派息需要扣除红利税(持股超过1年免征,1个月到1年10%,1个月以内20%)
    self.cash += dividend_cash * (1 - dividend_info.get('tax_rate', 0))

策略接口设计

回测框架应该与策略逻辑解耦。策略只需要实现一个生成信号的接口:

python
class Strategy:
    """策略基类"""

    def generate_signals(self, history, positions, cash):
        """
        根据历史数据和当前状态生成交易信号

        Parameters:
            history: 截至当日的DataFrame历史数据
            positions: 当前持仓字典
            cash: 当前可用现金

        Returns:
            list: 交易信号列表,每个信号是包含 action/code/shares 的字典
        """
        raise NotImplementedError

class DualMAStrategy(Strategy):
    """双均线策略示例"""

    def __init__(self, short_window=5, long_window=20):
        self.short_window = short_window
        self.long_window = long_window

    def generate_signals(self, history, positions, cash):
        signals = []

        if len(history) < self.long_window:
            return signals

        short_ma = history['close'].rolling(self.short_window).mean()
        long_ma = history['close'].rolling(self.long_window).mean()

        # 金叉买入
        if short_ma.iloc[-1] > long_ma.iloc[-1] and short_ma.iloc[-2] <= long_ma.iloc[-2]:
            if len(positions) == 0:  # 空仓时买入
                signals.append({'action': 'buy', 'code': history['code'].iloc[-1]})

        # 死叉卖出
        elif short_ma.iloc[-1] < long_ma.iloc[-1] and short_ma.iloc[-2] >= long_ma.iloc[-2]:
            if len(positions) > 0:  # 持仓时卖出
                for code in list(positions.keys()):
                    signals.append({'action': 'sell', 'code': code})

        return signals

运行一个完整的回测示例

python
# 1. 准备数据(这里用模拟数据演示)
import numpy as np
dates = pd.date_range('2022-01-01', '2024-12-31', freq='B')  # 工作日
np.random.seed(42)
prices = 10 + np.cumsum(np.random.randn(len(dates)) * 0.1)

data = pd.DataFrame({
    'date': dates,
    'code': 'DEMO',
    'open': prices * (1 + np.random.randn(len(dates)) * 0.005),
    'high': prices * (1 + abs(np.random.randn(len(dates)) * 0.01)),
    'low': prices * (1 - abs(np.random.randn(len(dates)) * 0.01)),
    'close': prices,
    'volume': np.random.randint(100000, 1000000, len(dates)),
})

# 2. 创建引擎和策略
engine = BacktestEngine(initial_cash=1_000_000)
strategy = DualMAStrategy(short_window=5, long_window=20)

# 3. 运行回测
engine.run(data, strategy)

# 4. 查看结果
print(f"初始资金: {engine.initial_cash:,.0f}")
print(f"最终净值: {engine.daily_records[-1].total_value:,.0f}")
print(f"总交易次数: {len(engine.orders)}")

框架的改进方向

以上框架是一个最小可行版本。在实际使用中,你可能需要逐步添加以下功能:

  1. 多股票支持:支持同时回测多只股票的策略
  2. 仓位管理:支持按比例建仓、分批买卖
  3. 止损止盈:支持固定比例止损、移动止损等
  4. 涨跌停检查:模拟涨停无法买入、跌停无法卖出的情况
  5. 停牌处理:标记停牌日,跳过停牌股票的交易信号
  6. 绩效指标计算:自动计算年化收益率、最大回撤、夏普比率等

这些改进方向我们将在后续章节中逐步展开。下一章将详细介绍策略绩效指标的计算方法,让你能够科学地评价回测结果。

仅供学习交流,不构成任何投资建议