backtrader.indicators.crossover 源代码

#!/usr/bin/env python
"""Crossover Indicator Module - Crossover detection indicators.

This module provides indicators for detecting when two data series cross
each other (upward or downward).

Classes:
    NonZeroDifference: Tracks difference, memorizing last non-zero value (alias: NZD).
    CrossUp: Detects upward crossover.
    CrossDown: Detects downward crossover.
    CrossOver: Detects both directional crossovers.

Example:
    class MyStrategy(bt.Strategy):
        def __init__(self):
            self.sma_fast = bt.indicators.SMA(self.data, period=10)
            self.sma_slow = bt.indicators.SMA(self.data, period=20)
            self.crossover = bt.indicators.CrossOver(self.sma_fast, self.sma_slow)

        def next(self):
            if self.crossover[0] > 0:
                self.buy()
            elif self.crossover[0] < 0:
                self.sell()
"""

from . import Indicator


[文档] class NonZeroDifference(Indicator): """ Keeps track of the difference between two data inputs, memorizing the last non-zero value if the current difference is zero """ _mindatas = 2 alias = ("NZD",) lines = ("nzd",) def __init__(self): """Initialize the NonZeroDifference indicator. Tracks difference between two data sources. """ super().__init__()
[文档] def nextstart(self): """Initialize NZD on first valid bar. Sets initial difference value. """ self.l.nzd[0] = self.data0[0] - self.data1[0]
[文档] def next(self): """Calculate NZD for the current bar. Memorizes last non-zero difference when current difference is zero. """ d = self.data0[0] - self.data1[0] # Memorize last non-zero value new_val = d if d else self.l.nzd[-1] self.l.nzd[0] = new_val
# Don't override once() - let framework call next() for each bar class _CrossBase(Indicator): _mindatas = 2 lines = ("cross",) plotinfo = dict(plotymargin=0.05, plotyhlines=[0.0, 1.0]) def __init__(self): """Initialize the crossover base indicator. Creates NonZeroDifference for crossover detection. """ super().__init__() # CRITICAL: Call parent init first self.nzd = NonZeroDifference(self.data0, self.data1) def next(self): """Detect crossover for the current bar. Returns 1.0 if crossover detected, 0.0 otherwise. """ # Check for crossover if hasattr(self, "_crossup"): if self._crossup: # Upward cross: previous diff < 0 (strictly), now data0 > data1 before = self.nzd(-1) < 0.0 # STRICT inequality after = self.data0[0] > self.data1[0] else: # Downward cross: previous diff > 0 (strictly), now data0 < data1 before = self.nzd(-1) > 0.0 # STRICT inequality after = self.data0[0] < self.data1[0] self.lines.cross[0] = 1.0 if (before and after) else 0.0 else: self.lines.cross[0] = 0.0 # Don't override once() - let framework call next() for each bar
[文档] class CrossUp(_CrossBase): """Upward cross indicator""" _crossup = True
[文档] class CrossDown(_CrossBase): """Downward cross indicator""" _crossup = False
[文档] class CrossOver(Indicator): """ Gives signal for data crossover: 1.0 for upward cross, -1.0 for downward cross, 0.0 otherwise """ _mindatas = 2 lines = ("crossover",) plotinfo = dict(plotymargin=0.05, plotyhlines=[-1.0, 1.0]) def __init__(self): """Initialize the CrossOver indicator. Sets up minperiod and tracking variables for crossover detection. """ super().__init__() # CRITICAL FIX: Inherit minperiod from data sources first # This is needed because the framework's automatic inheritance isn't working if hasattr(self, "datas") and self.datas: data_minperiods = [getattr(d, "_minperiod", 1) for d in self.datas] self._minperiod = max([self._minperiod] + data_minperiods) # CRITICAL FIX: Add minperiod for lookback requirement (nzd(-1) in master) # addminperiod(n) adds n-1 to minperiod, so addminperiod(2) adds 1 self.addminperiod(2) # For next() mode: track last non-zero difference self._last_nzd = None # CRITICAL FIX: Track owner's data length to detect replay mode # In replay mode, we should only calculate crossover when the bar is complete # The owner (strategy) has the actual data feed whose length changes when bars complete self._last_owner_data_len = 0 self._owner_data = None # Will be set to owner's data feed in next()
[文档] def prenext(self): """Track difference during warmup period. Updates _last_nzd for use in nextstart/next crossover detection. """ # Track difference during warmup period so _last_nzd is available in nextstart # This is similar to MACD's prenext that calculates MACD values during warmup diff = self.data0[0] - self.data1[0] # Update _last_nzd (memorize non-zero) if self._last_nzd is None: self._last_nzd = diff else: self._last_nzd = diff if diff != 0.0 else self._last_nzd
[文档] def nextstart(self): """Calculate crossover on first valid bar. Handles replay mode special case and calculates initial crossover. """ # CRITICAL FIX: In replay mode, the first bar after minperiod doesn't have a valid # "previous" bar in the compressed timeframe context. Skip crossover calculation # on the first bar ONLY when in replay mode. For normal mode, calculate normally. diff = self.data0[0] - self.data1[0] # Check if we're in replay mode by checking owner's datas for replaying attribute is_replay = False if hasattr(self, "_owner") and hasattr(self._owner, "datas"): for data in self._owner.datas: if hasattr(data, "replaying") and data.replaying > 0: is_replay = True break if is_replay: # In replay mode, skip crossover on first bar - set to 0 and update _last_nzd self.lines.crossover[0] = 0.0 # Update _last_nzd for next() prev_nzd = self._last_nzd if self._last_nzd is not None else diff self._last_nzd = diff if diff != 0.0 else prev_nzd return # Normal mode: calculate crossover normally # Get previous non-zero difference (set during prenext) prev_nzd = self._last_nzd if self._last_nzd is not None else diff # Check for crossover up_cross = 1.0 if (prev_nzd < 0.0 and self.data0[0] > self.data1[0]) else 0.0 down_cross = 1.0 if (prev_nzd > 0.0 and self.data0[0] < self.data1[0]) else 0.0 self.lines.crossover[0] = up_cross - down_cross # Update _last_nzd for next() self._last_nzd = diff if diff != 0.0 else prev_nzd
[文档] def next(self): """Calculate crossover for the current bar. Returns 1.0 for upward cross, -1.0 for downward cross, 0.0 otherwise. Handles replay mode correctly by deferring calculation when bars are updating. """ # Current difference diff = self.data0[0] - self.data1[0] # CRITICAL FIX: In replay mode with runonce, the same bar is updated multiple times. # The key insight is that we should only calculate crossover when we're at a NEW bar # (idx has advanced), not when we're updating the same bar multiple times. # We detect this by checking if idx < len - 1, which means we haven't advanced yet. # IMPORTANT: Only apply this logic in replay mode, not in exactbars mode! # Check if we're in replay mode is_replay = False if hasattr(self, "_owner") and hasattr(self._owner, "datas"): for data in self._owner.datas: if hasattr(data, "replaying") and data.replaying > 0: is_replay = True break # Only defer crossover calculation in replay mode if is_replay and hasattr(self.lines[0], "idx") and hasattr(self.lines[0], "__len__"): current_idx = self.lines[0].idx current_len = len(self.lines[0]) # If idx < len - 1, we're still filling the current bar, not at a new bar yet # Defer crossover calculation by updating _last_nzd but setting crossover to 0 if current_idx < current_len - 1: # Still updating current bar - defer crossover calculation if self._last_nzd is None: self._last_nzd = diff elif diff != 0.0: self._last_nzd = diff self.lines.crossover[0] = 0.0 return # At this point, we're at a new bar (idx == len - 1), calculate crossover # using the previous bar's difference (stored in _last_nzd or from data[-1]) try: prev_diff = self.data0[-1] - self.data1[-1] # Find last non-zero difference by looking at line values if prev_diff == 0.0: prev_nzd = self._last_nzd if self._last_nzd is not None else diff else: prev_nzd = prev_diff except (IndexError, TypeError): # Fall back to cached value prev_nzd = self._last_nzd if self._last_nzd is not None else diff # Update _last_nzd for next bar if self._last_nzd is None: self._last_nzd = diff elif diff != 0.0: self._last_nzd = diff # Check for crossover using STRICT inequalities # Upward: prev < 0 and now data0 > data1 up_cross = 1.0 if (prev_nzd < 0.0 and self.data0[0] > self.data1[0]) else 0.0 # Downward: prev > 0 and now data0 < data1 down_cross = 1.0 if (prev_nzd > 0.0 and self.data0[0] < self.data1[0]) else 0.0 # Combine self.lines.crossover[0] = up_cross - down_cross
[文档] def once(self, start, end): """Calculate crossover in runonce mode. Vectorized implementation that processes all bars at once. """ # Vectorized once() implementation matching next() behavior d0array = self.data0.array d1array = self.data1.array crossarray = self.line.array # Handle case where data is shorter than minperiod if start >= end: # No bars to process - initialize all to 0 while len(crossarray) < len(d0array): crossarray.append(0.0) return # Ensure array is large enough while len(crossarray) < end: crossarray.append(0.0) # Initialize prev_nzd from prenext period (bar before start) # This matches next() which uses _last_nzd set during prenext if start > 0 and start - 1 < len(d0array): prev_nzd = d0array[start - 1] - d1array[start - 1] # Scan backwards to find last non-zero difference (like prenext does) for j in range(start - 1, -1, -1): diff_j = d0array[j] - d1array[j] if diff_j != 0.0: prev_nzd = diff_j break else: prev_nzd = 0.0 # CRITICAL FIX: For replay mode, skip crossover on the very first bar. # The first bar after minperiod doesn't have a valid "previous" bar in the # compressed timeframe context. Defer crossover to the second bar. # This prevents false positive crossovers at the start of replay data. # ONLY apply this fix when in replay mode. is_replay = False if hasattr(self, "_owner") and hasattr(self._owner, "datas"): for data in self._owner.datas: if hasattr(data, "replaying") and data.replaying > 0: is_replay = True break first_bar = start if is_replay else -1 # -1 means never skip # Process ALL bars from start for i in range(start, end): d0_val = d0array[i] d1_val = d1array[i] diff = d0_val - d1_val # Skip crossover calculation on first bar ONLY in replay mode if i == first_bar: crossarray[i] = 0.0 # Still update prev_nzd for next iteration prev_nzd = diff if diff != 0.0 else prev_nzd continue # Check crossover using prev_nzd (from previous bar) up_cross = 1.0 if (prev_nzd < 0.0 and d0_val > d1_val) else 0.0 down_cross = 1.0 if (prev_nzd > 0.0 and d0_val < d1_val) else 0.0 crossarray[i] = up_cross - down_cross # Update prev_nzd for next iteration (memorize non-zero) prev_nzd = diff if diff != 0.0 else prev_nzd