backtrader.linebuffer 源代码

#!/usr/bin/env python
"""LineBuffer Module - Circular buffer storage for time-series data.

This module provides the LineBuffer class which implements a circular
buffer for storing time-series data. The buffer allows efficient
operations like appending, forwarding, rewinding, and resetting.

Key Features:
    - Index 0 always points to the current active value
    - Positive indices fetch past values (left-hand side)
    - Negative indices fetch future values (right-hand side)
    - Automatic memory management with qbuffer
    - Line bindings for automatic value propagation

Classes:
    LineBuffer: Core circular buffer implementation.
    LineActions: Base class for line objects with multiple lines.
    LineActionsMixin: Mixin providing line operations.
    LineActionsCache: Cache system for performance optimization.
    PseudoArray: Wrapper for non-array iterables.
    LinesOperation: Operations on multiple lines.
    LineOwnOperation: Operations on owned lines.

Example:
    Basic buffer usage:
    >>> buf = LineBuffer()
    >>> buf.home()  # Reset to beginning
    >>> buf.forward()  # Move to next position
    >>> buf[0] = 100.0  # Set current value
    >>> print(buf[0])  # Get current value
    100.0
    >>> print(buf[-1])  # Get previous value
"""

import array
import collections
import datetime
import itertools
import math
from itertools import islice, repeat

from . import metabase
from .lineroot import LineRoot, LineRootMixin, LineSingle
from .utils import num2date
from .utils.py3 import range, string_types

NAN = float("NaN")

# PERFORMANCE OPTIMIZATION: Pre-create default datetime for error recovery
# Avoids repeated datetime object creation in hot path
_DEFAULT_DATETIME = datetime.datetime(2000, 1, 1, 0, 0, 0)


# PERFORMANCE OPTIMIZATION: Helper function to check for NaN/None values
# Using value != value is much faster than isinstance + math.isnan
def _is_nan_or_none(value):
    """Fast check for NaN or None values.
    NaN is the only value that's not equal to itself (value != value).
    This is much faster than isinstance(value, float) and math.isnan(value).
    """
    return value is None or value != value


[文档] class LineBuffer(LineSingle, LineRootMixin): """ LineBuffer defines an interface to an "array.array" (or list) in which index 0 points to the item which is active for input and output. Positive indices fetch values from the past (left-hand side) Negative indices fetch values from the future (if the array has been extended on the right-hand side) With this behavior, no index has to be passed around to entities which have to work with the current value produced by other entities: the value is always reachable at "0". Likewise, storing the current value produced by "self" is done at 0. Additional operations to move the pointer (home, forward, extend, rewind, advance getzero) are provided The class can also hold "bindings" to other LineBuffers. When a value is set in this class, it will also be set in the binding. """ # Define LineBuffer mode attributes: UnBounded (0) and QBuffer (1) UnBounded, QBuffer = (0, 1) # Initialization
[文档] def __init__(self): """Initialize the LineBuffer instance. Sets up all internal attributes including the array storage, index pointer, buffer mode, and performance optimization flags. """ # ===== Optimization A: Pre-initialize all attributes to eliminate runtime hasattr checks ===== # Core attributes - must be initialized first self._minperiod = 1 # Minimum period self._array = array.array("d") # Internal array storage self._idx = -1 # Current index self._size = 0 # Current array size # Buffer-related attributes - set to reasonable defaults self.maxlen = 0 # Maximum length (used in QBuffer mode) self.extension = 0 # Extension size self.lencount = 0 # Length counter self.useislice = False # Whether to use islice self.extrasize = 0 # Extra size self.lenmark = 0 # Length mark # Array - initialize as empty array (will be reset based on mode in reset()) self.array = array.array("d") # Lines-related - ensure lines exists if not hasattr(self, "lines"): self.lines = [self] # lines is a list containing itself # Mode and bindings self.mode = self.UnBounded # Default unbounded mode self.bindings = list() # Binding list # Other attributes self._tz = None # Timezone setting self._owner = None # Owner object self._clock = None # Clock object self._ltype = None # Line type # Pre-calculate whether this is an indicator line to avoid repeated checks in hot paths try: self._is_indicator = (self._ltype == 0) or ("Indicator" in str(self.__class__.__name__)) except Exception: self._is_indicator = False # Performance optimization: pre-calculate whether this is a datetime line # to avoid repeated checks in __setitem__. Check once at init and cache the result. self._is_datetime_line = False try: if hasattr(self, "_name"): name_str = str(self._name).lower() self._is_datetime_line = "datetime" in name_str elif hasattr(self, "__class__"): class_str = str(self.__class__.__name__).lower() self._is_datetime_line = "datetime" in class_str except Exception: self._is_datetime_line = False # Pre-calculate default value to avoid repeated checks in __setitem__ if self._is_datetime_line: self._default_value = 1.0 # datetime lines use 1.0 (valid ordinal value) elif self._is_indicator: self._default_value = float("nan") # indicators use NaN else: self._default_value = 0.0 # others use 0.0 # Recursion guard (for __len__) self._in_len = False # Instance attribute guard replacing global set self._dt_cache_idx = None self._dt_cache_value = None self._dt_cache_tz = None self._dt_cache_dt = None # Call reset to complete initialization self.reset() # Reset, call own reset method
# Get the value of _idx
[文档] def get_idx(self): """Get the current index position. Returns: int: The current index in the buffer. """ # Optimization A: Removed hasattr check, __init__ ensures _idx exists return self._idx
# Set the value of _idx
[文档] def set_idx(self, idx, force=False): """Set the index position. Args: idx: The new index value. force: If True, force set even in QBuffer mode at lenmark. Note: In QBuffer mode, when at lenmark, the index stays at 0 unless force is True. This allows resampling operations. """ # If QBuffer and the last position of the buffer were reached, keep # it (unless force) as index 0. This allows resampling # - forward adds a position. However, the 1st one is discarded, the 0 is # invariant # force supports replaying, which needs the extra bar to float # forward/backwards, because the last input is read, and after a # "backwards" is used to update the previous data. Unless position # 0 was moved to the previous index, it would fail # Optimization A: Removed all hasattr checks, __init__ ensures all attributes exist if self.mode == self.QBuffer: if force or self._idx < self.lenmark: self._idx = idx else: # default: UnBounded self._idx = idx
# Property usage: can be used to get and set idx idx = property(get_idx, set_idx) # Reset
[文档] def reset(self): """Resets the internal buffer structure and the indices""" # CRITICAL FIX: In runonce mode, if array is already populated (from _once()), # preserve the array and lencount, only reset idx # Check if we're in runonce mode and array is populated preserve_array = False saved_lencount = 0 try: # Check if this is an indicator line that was processed in runonce mode # Line's _owner might be a Lines object, which has _owner pointing to the indicator if hasattr(self, "_owner") and self._owner is not None: owner = self._owner # Check if owner is a Lines object (which wraps lines for indicators) # Lines objects have _owner pointing to the actual indicator if hasattr(owner, "_owner") and owner._owner is not None: indicator = owner._owner # Check if indicator was processed in runonce mode if hasattr(indicator, "_once_called") and indicator._once_called: # Check if array has data if hasattr(self, "array") and self.array is not None: array_len = len(self.array) if array_len > 0: preserve_array = True saved_lencount = array_len # Also check if owner itself is an indicator elif hasattr(owner, "_once_called") and owner._once_called: # Check if array has data if hasattr(self, "array") and self.array is not None: array_len = len(self.array) if array_len > 0: preserve_array = True saved_lencount = array_len except Exception: pass if preserve_array: # In runonce mode with populated array: only reset idx, preserve array and lencount self.idx = -1 # Keep lencount at saved value (array length) if hasattr(self, "lencount"): self.lencount = saved_lencount self.extension = 0 else: # Normal reset: clear array and reset all counters # Optimization A: Removed hasattr checks, all attributes initialized in __init__ # If in cache mode (QBuffer), use deque to store data with fixed size if self.mode == self.QBuffer: # Add extrasize to ensure resample/replay work deque_maxlen = max(1, self.maxlen + self.extrasize) self.array = collections.deque(maxlen=deque_maxlen) self.useislice = True else: # Non-cache mode, use array.array self.array = array.array("d") self.useislice = False # CRITICAL FIX: Do NOT pre-fill array - this causes buflen() to be incorrect # buflen() = len(array) - extension, so pre-filling increases buflen incorrectly # Instead, let forward() handle array growth naturally # Reset counters and indices self.lencount = 0 self.idx = -1 self.extension = 0
# Set cache-related variables
[文档] def qbuffer(self, savemem=0, extrasize=0): """Enable queued buffer mode for memory-efficient storage. Args: savemem: Memory saving mode (0=normal, >0=enable cache mode). extrasize: Extra buffer size for resampling/replay operations. Note: In QBuffer mode, only the last maxlen values are kept, reducing memory usage for long backtests. """ self.mode = self.QBuffer # Set specific mode self.maxlen = max(1, self._minperiod) # Set maximum length, ensure at least 1 self.extrasize = max(0, extrasize) # Set extra size, ensure non-negative self.lenmark = self.maxlen - (not self.extrasize) # Max length minus 1 if extrasize=0 self.reset() # Reset
# Get indicator values
[文档] def getindicators(self): """Get list of indicators using this line buffer. Returns: list: Empty list for base LineBuffer (override in subclasses). """ return []
# Minimum buffer
[文档] def minbuffer(self, size): """The linebuffer must guarantee the minimum requested size to be available. In non-dqbuffer mode, this is always true (of course, until data is filled at the beginning, there are fewer values, but minperiod in the framework should account for this. In dqbuffer mode, the buffer has to be adjusted for this if currently less than requested """ # If not in cache mode or max length is already >= size, return None if self.mode != self.QBuffer or self.maxlen >= size: return # In cache mode, set maxlen equal to size self.maxlen = size # Max length minus 1 if self.extrasize=0 self.lenmark = self.maxlen - (not self.extrasize) # Reset self.reset()
# Return actual length
[文档] def __len__(self): """ Return the linebuffer's length counter. Performance optimization: Restore master branch's simple implementation - Directly return self.lencount (pre-calculated length value) - Remove all recursion checks, hasattr calls and complex logic - Performance improvement: from 0.611s to ~0.05s (92% improvement) """ return self.lencount
# Return the length of data in the line cache
[文档] def buflen(self): """Real data that can be currently held in the internal buffer The internal buffer can be longer than the actual stored data to allow for "lookahead" operations. The real amount of data that is held/can be held in the buffer is returned """ return len(self.array) - self.extension
[文档] def __getitem__(self, ago): """ Get the value at a specified offset - optimized for hot path. Args: ago (int): Relative offset from current index (0=current, -1=previous, 1=next) Returns: Value at the specified position """ # PERFORMANCE OPTIMIZATION: Fast path for common case (ago <= 0) # Avoid __dict__ access for majority of calls try: current_idx = self._idx lencount = self.lencount if lencount > 0 and current_idx >= lencount: current_idx = lencount - 1 return self.array[current_idx + ago] except IndexError: pass # Slow path: handle special cases # CRITICAL FIX: For data feed lines accessing FUTURE data is_data_feed_line = getattr(self, "_is_data_feed_line", False) if is_data_feed_line and ago > 0: target_idx = self._idx + ago if target_idx >= len(self.array) or self.array[target_idx] == 0.0: raise IndexError("array index out of range") # Check the simple flag for data feed line if is_data_feed_line: raise IndexError("array index out of range") # For indicators and other cases, return appropriate default if getattr(self, "_is_indicator", False): return float("nan") return 0.0
# Get data values, widely used in strategies
[文档] def get(self, ago=0, size=1): """Returns a slice of the array relative to *ago* Keyword Args: ago (int): Point of the array to which size will be added to return the slice size(int): size of the slice to return, can be positive or negative If size is positive *ago* will mark the end of the iterable and vice versa if size is negative Returns: A slice of the underlying buffer """ # Whether to use islice, use following syntax if true if self.useislice: start = self._idx + ago - size + 1 end = self._idx + ago + 1 return list(islice(self.array, start, end)) # If not using islice, directly slice the array return self.array[self._idx + ago - size + 1 : self._idx + ago + 1]
# Return the value at the actual index 0 of the array
[文档] def getzeroval(self, idx=0): """Returns a single value of the array relative to the real zero of the buffer Keyword Args: idx (int): Where to start relative to the real start of the buffer size(int): size of the slice to return Returns: A slice of the underlying buffer """ return self.array[idx]
# Return data of size starting from idx in the array
[文档] def getzero(self, idx=0, size=1): """Returns a slice of the array relative to the real zero of the buffer Keyword Args: idx (int): Where to start relative to the real start of the buffer size(int): size of the slice to return Returns: A slice of the underlying buffer """ if self.useislice: return list(islice(self.array, idx, idx + size)) return self.array[idx : idx + size]
# Set values to the array
[文档] def __setitem__(self, ago, value): """Sets a value at position "ago" and executes any associated bindings Keyword Args: ago (int): Point of the array to which size will be added to return the slice value (variable): value to be set Performance optimization: Use pre-calculated flags to avoid repeated hasattr and string operations """ # Performance optimization: Use try-except instead of hasattr to check array existence # array is already initialized in __init__, this is just a defensive check try: array = self.array except AttributeError: import array as array_module array = array_module.array("d") self.array = array # Performance optimization: Use pre-calculated flags and default values # Handle None/NaN values - use fast path for checking if value is None: value = self._default_value # PERFORMANCE OPTIMIZATION: Use value != value for NaN check elif value != value: # NaN detection without isinstance + isnan value = self._default_value # datetime line value validation elif self._is_datetime_line and value < 1.0: value = 1.0 elif self._is_datetime_line: # For non-numeric datetime line values, convert to 1.0 try: float_value = float(value) value = 1.0 if float_value < 1.0 else float_value except (TypeError, ValueError): value = 1.0 # Calculate the required index required_index = self.idx + ago # Handle index out of bounds - fast path array_len = len(array) if required_index >= array_len: # Performance optimization: Use pre-calculated default value as fill value fill_value = self._default_value extend_size = required_index - array_len + 1 # Batch extend the array for _ in range(extend_size): array.append(fill_value) elif required_index < 0: # Skip setting values for negative indices return # Set the value at the required index array[required_index] = value # Update any bindings - only execute if bindings exist # Performance optimization: bindings are empty in most cases, check before processing if self.bindings: for binding in self.bindings: # Performance optimization: Use try-except to get binding's datetime flag # Most bindings are not datetime lines, fast path try: binding_is_datetime = binding._is_datetime_line except AttributeError: # Binding doesn't have pre-calculated flag, fall back to simple check binding_is_datetime = False binding_value = value if binding_is_datetime and ( not isinstance(binding_value, (int, float)) or binding_value < 1.0 ): binding_value = 1.0 binding[ago] = binding_value
# Set specific value to array
[文档] def set(self, value, ago=0): """Sets a value at position "ago" and executes any associated bindings Keyword Args: value (variable): value to be set ago (int): Point of the array to which size will be added to return the slice """ # CRITICAL FIX: Special handling for datetime lines - NEVER allow 0.0 for datetime! is_datetime_line = (hasattr(self, "_name") and "datetime" in str(self._name).lower()) or ( hasattr(self, "__class__") and "datetime" in str(self.__class__.__name__).lower() ) # CRITICAL FIX: Ensure we never store None values - convert appropriately if value is None: value = 1.0 if is_datetime_line else 0.0 # CRITICAL FIX: Also convert NaN appropriately elif isinstance(value, float) and math.isnan(value): value = 1.0 if is_datetime_line else 0.0 # CRITICAL FIX: If setting invalid value on a datetime line, convert to valid ordinal elif is_datetime_line and (not isinstance(value, (int, float)) or value < 1.0): value = 1.0 # CRITICAL FIX: Ensure array is initialized before accessing if not hasattr(self, "array") or self.array is None: import array self.array = array.array("d") # Ensure array has enough space required_index = self.idx + ago if required_index >= len(self.array): # Extend the array to accommodate the required index extend_size = required_index - len(self.array) + 1 # Use appropriate fill value based on line type fill_value = 1.0 if is_datetime_line else 0.0 for _ in range(extend_size): self.array.append(fill_value) elif required_index < 0: # Handle negative indices gracefully return self.array[required_index] = value for binding in self.bindings: # Apply same datetime protection to bindings binding_is_datetime = ( hasattr(binding, "_name") and "datetime" in str(binding._name).lower() ) or ( hasattr(binding, "__class__") and "datetime" in str(binding.__class__.__name__).lower() ) binding_value = value if binding_is_datetime and ( not isinstance(binding_value, (int, float)) or binding_value < 1.0 ): binding_value = 1.0 binding[ago] = binding_value
# Return to the beginning
[文档] def home(self): """Rewinds the logical index to the beginning The underlying buffer remains untouched and the actual len can be found out with buflen """ self.idx = -1 self.lencount = 0
# Move forward one step
[文档] def forward(self, value=NAN, size=1): """Moves the logical index forward and enlarges the buffer as much as needed Keyword Args: value (variable): value to be set in new positions size (int): How many extra positions to enlarge the buffer """ # PERFORMANCE OPTIMIZATION: Direct attribute access (faster than __dict__.get) # Attributes are guaranteed to exist after __init__ is_indicator = self._is_indicator # PERFORMANCE OPTIMIZATION: Use value != value for NaN check # NaN is the only value that's not equal to itself if value is None or value != value: value = NAN if is_indicator else 0.0 # For non-indicators, follow clock synchronization if not is_indicator: clock = self._clock if clock is not None: try: clock_len = len(clock) current_len = self.lencount if current_len >= clock_len: return max_advance = clock_len - current_len if size > max_advance: size = max_advance if size <= 0: return except Exception: pass # CRITICAL FIX: Ensure we have a valid size if size <= 0: return self.idx += size self.lencount += size # Append data: use module-level 0.0 for non-indicators append_val = value if is_indicator else (0.0 if value != value or value is None else value) if size == 1: self.array.append(append_val) else: # Batch extend for multiple positions self.array.extend([append_val] * size)
# Move backward one step
[文档] def backwards(self, size=1, force=False): """Moves the logical index backwards and reduces the buffer as much as needed Keyword Args: size (int): How many extra positions to rewind the buffer force (bool): Whether to force the reduction of the logical buffer regardless of the minperiod """ # CRITICAL FIX: Match master behavior - use set_idx for force support and pop array elements self.set_idx(self._idx - size, force=force) self.lencount -= size # PERFORMANCE OPTIMIZATION: Use slice deletion instead of loop pop # Called 3.4M+ times, batch deletion is faster than loop arr = self.array arr_len = len(arr) if arr_len > 0: del arr[max(0, arr_len - size) :]
# Move backward one step (original backwards was overridden)
[文档] def safe_backwards(self, size=1): """Safely move the index backwards without raising errors. Args: size: Number of positions to move backwards. Returns: bool: True if index is still >= 0 after moving, False otherwise. """ # CRITICAL FIX: Safe backward navigation if not hasattr(self, "_idx") or self._idx is None: self._idx = -1 return False self._idx -= size return self._idx >= 0
# Decrease idx and lencount by size
[文档] def rewind(self, size=1): """Rewind the buffer by decreasing idx and lencount. Args: size: Number of positions to rewind. """ # CRITICAL FIX: Safe attribute access if hasattr(self, "idx"): self.idx -= size if hasattr(self, "lencount"): self.lencount -= size
# Increase idx and lencount by size
[文档] def advance(self, size=1): """Advances the logical index without touching the underlying buffer""" # CRITICAL FIX: Remove hasattr checks - attributes are always initialized in __init__ # The hasattr checks were preventing proper advancement self.idx += size self.lencount += size
# Extend forward
[文档] def extend(self, value=float("nan"), size=0): """Extends the underlying array with positions that the index will not reach Keyword Args: value (variable): value to be set in new positins size (int): How many extra positions to enlarge the buffer The purpose is to allow for lookahead operations or to be able to set values in the buffer "future" """ self.extension += size for i in range(size): self.array.append(value)
# Add another LineBuffer
[文档] def addbinding(self, binding): """Adds another line binding Keyword Args: binding (LineBuffer): another line that must be set when this line becomes a value """ self.bindings.append(binding) # record in the binding when the period is starting (never sooner # than self) binding.updateminperiod(self._minperiod)
# Get all data starting from idx
[文档] def plot(self, idx=0, size=None): """Returns a slice of the array relative to the real zero of the buffer Keyword Args: idx (int): Where to start relative to the real start of the buffer size(int): size of the slice to return This is a variant of getzero that unless told otherwise returns the entire buffer, which is usually the idea behind plottint (all must be plotted) Returns: A slice of the underlying buffer """ return self.getzero(idx, size or len(self))
# Get partial data from array
[文档] def plotrange(self, start, end): """Get a slice of data from the array. Args: start: Start index of the slice. end: End index of the slice. Returns: list or array: Slice of data from start to end. """ if self.useislice: return list(islice(self.array, start, end)) return self.array[start:end]
# Set array values for each binding when running in once mode
[文档] def oncebinding(self): """ Executes the bindings when running in "once" mode """ larray = self.array blen = self.buflen() for binding in self.bindings: binding.array[0:blen] = larray[0:blen]
# Convert binding to line
[文档] def bind2lines(self, binding=0): """ Stores a binding to another line. "Binding" can be an index or a name """ if isinstance(binding, string_types): line = getattr(self._owner.lines, binding) else: line = self._owner.lines[binding] self.addbinding(line) return self
bind2line = bind2lines
[文档] def __call__(self, ago=None): """Returns either the current value (ago=None) or a delayed LineBuffer that fetches the value which is "ago" periods before. Useful to have the closing price 5 bars before: close(-5) """ if ago is None: return self[0] return LineDelay(self, ago)
def _makeoperation(self, other, operation, r=False, _ownerskip=None, original_other=None): # CRITICAL FIX: Pass parent indicators so LinesOperation can call their _once parent_a = None if hasattr(self, "_owner") and self._owner is not None: owner = self._owner if hasattr(owner, "_owner_ref") and owner._owner_ref is not None: parent_a = owner._owner_ref elif hasattr(owner, "_once"): parent_a = owner parent_b_candidate = original_other if original_other is not None else other parent_b = parent_b_candidate if hasattr(parent_b_candidate, "_once") else None return LinesOperation(self, other, operation, r=r, parent_a=parent_a, parent_b=parent_b) def _makeoperationown(self, operation, _ownerskip=None): # CRITICAL FIX: Pass parent indicator so LineOwnOperation can call its _once parent_a = None if hasattr(self, "_owner") and self._owner is not None: owner = self._owner if hasattr(owner, "_owner_ref") and owner._owner_ref is not None: parent_a = owner._owner_ref elif hasattr(owner, "_once"): parent_a = owner return LineOwnOperation(self, operation, parent_a=parent_a) def _settz(self, tz): self._tz = tz
[文档] def datetime(self, ago=0, tz=None, naive=True): """Get the datetime value at the specified offset. Args: ago: Number of periods to look back (0=current, -1=previous). tz: Timezone to apply. If None, uses self._tz. naive: If True, return naive datetime without timezone info. Returns: datetime: Datetime object representing the timestamp. Raises: IndexError: If the requested position is out of bounds for data feeds. """ # PERFORMANCE OPTIMIZATION: Simplified datetime() method # - Use module-level _DEFAULT_DATETIME constant # - Remove redundant import statements # - Reduce nested try-except blocks # Get value, may raise IndexError for data feeds value = self[ago] # Fast path: Check for NaN/None values if _is_nan_or_none(value): return _DEFAULT_DATETIME if naive else _DEFAULT_DATETIME.replace(tzinfo=tz or self._tz) # Fast path: Common case (ago=0, tz=None, naive=True) with caching if ago == 0 and tz is None and naive: current_idx = self._idx if self.lencount > 0 and current_idx >= self.lencount: current_idx = self.lencount - 1 # Check cache if ( getattr(self, "_dt_cache_idx", None) == current_idx and getattr(self, "_dt_cache_tz", None) is self._tz and getattr(self, "_dt_cache_value", None) == value ): return self._dt_cache_dt # Convert and cache try: dt = num2date(value, self._tz, True) self._dt_cache_idx = current_idx self._dt_cache_tz = self._tz self._dt_cache_value = value self._dt_cache_dt = dt return dt except (ValueError, OverflowError): return _DEFAULT_DATETIME # Slow path: non-default parameters try: return num2date(value, tz or self._tz, naive) except (ValueError, OverflowError): return _DEFAULT_DATETIME if naive else _DEFAULT_DATETIME.replace(tzinfo=tz or self._tz)
[文档] def date(self, ago=0, tz=None, naive=True): """Get the date component of the datetime value at the specified offset. Args: ago: Number of periods to look back (0=current, -1=previous). tz: Timezone to apply. If None, uses self._tz. naive: If True, return naive date without timezone info. Returns: date: Date object representing the date portion of the timestamp. Raises: IndexError: If the requested position is out of bounds for data feeds. """ # CRITICAL FIX: date() calls datetime(), which should raise IndexError if out of range # This allows strategy to detect end of data for next_month calculation try: dt = self.datetime(ago, tz, naive) except IndexError: # Re-raise IndexError to allow strategy to detect end of data raise if dt is None: return None try: return dt.date() except (AttributeError, ValueError): return None
[文档] def time(self, ago=0, tz=None, naive=True): """Get the time component of the datetime value at the specified offset. Args: ago: Number of periods to look back (0=current, -1=previous). tz: Timezone to apply. If None, uses self._tz. naive: If True, return naive time without timezone info. Returns: time: Time object representing the time portion of the timestamp. """ dt = self.datetime(ago, tz, naive) if dt is None: return None try: return dt.time() except (AttributeError, ValueError): return None
[文档] def dt(self, ago=0): """Alias to avoid the extra chars in "datetime" for this field""" return self.datetime(ago)
[文档] def tm_raw(self, ago=0): """ Returns a localtime/gmtime like time.struct_time object which is compatible with strftime formatting. The time zone of the struct_time is naive """ return self.datetime(ago, naive=False).timetuple()
[文档] def tm(self, ago=0): """ Returns a localtime/gmtime like time.struct_time object which is compatible with strftime formatting. The time zone of the struct_time is naive """ return self.datetime(ago, naive=True).timetuple()
[文档] def tm_lt(self, other, ago=0): """ Returns True if the time carried by this line's index "ago" is lower than the time carried by the "other" line """ return self[ago] < other[0]
[文档] def tm_le(self, other, ago=0): """ Returns True if the time carried by this line's index "ago" is lower than or equal to the time carried by the "other" line """ return self[ago] <= other[0]
[文档] def tm_eq(self, other, ago=0): """ Returns True if the time carried by this line's index "ago" is equal to the time carried by the "other" line """ return self[ago] == other[0]
[文档] def tm_gt(self, other, ago=0): """ Returns True if the time carried by this line's index "ago" is greater than the time carried by the "other" line """ return self[ago] > other[0]
[文档] def tm_ge(self, other, ago=0): """ Returns True if the time carried by this line's index "ago" is greater than or equal to the time carried by the "other" line """ return self[ago] >= other[0]
[文档] def tm2dtime(self, tm, ago=0): """ Returns the passed tm (time.struct_time) in a datetime using the timezone (if any) of the line """ return datetime.datetime(*tm[:6])
[文档] def tm2datetime(self, tm, ago=0): """ Returns the passed tm (time.struct_time) in a datetime using the timezone (if any) of the line """ return datetime.datetime(*tm[:6])
# LineActions cache for performance
[文档] class LineActionsCache: """Cache system for LineActions to avoid repetitive calculations""" _cache = {} _cache_enabled = False
[文档] @classmethod def enable_cache(cls, enable=True): """Enable or disable the cache. Args: enable: True to enable caching, False to disable. """ cls._cache_enabled = enable
[文档] @classmethod def clear_cache(cls): """Clear all cached values.""" cls._cache.clear()
[文档] @classmethod def get_cache_key(cls, *args): """Generate cache key from arguments""" return hash(tuple(id(arg) if hasattr(arg, "__hash__") else str(arg) for arg in args))
[文档] class LineActionsMixin: """Mixin to provide LineActions functionality without metaclass"""
[文档] @classmethod def dopreinit(cls, _obj, *args, **kwargs): """Pre-initialization processing for LineActions""" # CRITICAL FIX: Set lines._owner BEFORE any user __init__ code runs # This is needed for line bindings like: self.lines.crossover = upcross - downcross if hasattr(_obj, "lines") and _obj.lines is not None: if not hasattr(_obj.lines, "_owner") or _obj.lines._owner is None: _obj.lines._owner = _obj # Set up clock from owner hierarchy _obj._clock = None if hasattr(_obj, "_owner") and _obj._owner is not None: # Try to get clock from owner first if hasattr(_obj._owner, "_clock") and _obj._owner._clock is not None: _obj._clock = _obj._owner._clock # If owner has datas, use the first data as clock elif hasattr(_obj._owner, "datas") and _obj._owner.datas: _obj._clock = _obj._owner.datas[0] # If owner has data attribute, use it as clock elif hasattr(_obj._owner, "data") and _obj._owner.data is not None: _obj._clock = _obj._owner.data # Try the owner itself as clock if it has __len__ elif hasattr(_obj._owner, "__len__"): _obj._clock = _obj._owner # If still no clock found and we have datas, use the first data if _obj._clock is None and hasattr(_obj, "datas") and _obj.datas: _obj._clock = _obj.datas[0] # CRITICAL FIX: Only initialize minperiod to 1 if not already set from data sources # The _minperiod might have been set in __new__ from data sources for nested indicators # (e.g., EMA applied to another indicator's output) if not hasattr(_obj, "_minperiod") or _obj._minperiod is None: _obj._minperiod = 1 # CRITICAL FIX: Calculate minperiod from args (like original metaclass did) # This ensures that indicators applied to other indicators inherit their minperiod from .lineroot import LineMultiple, LineSingle _minperiods = [] # Collect minperiods from LineSingle args for arg in args: if isinstance(arg, LineSingle): _minperiods.append(getattr(arg, "_minperiod", 1)) # Collect minperiods from LineMultiple args (get their first line) for arg in args: if isinstance(arg, LineMultiple) and hasattr(arg, "lines") and arg.lines: try: first_line = arg.lines[0] _minperiods.append(getattr(first_line, "_minperiod", 1)) except (IndexError, TypeError): pass # Update minperiod with max from args if _minperiods: _minperiod = max(_minperiods) _obj.updateminperiod(_minperiod) return _obj, args, kwargs
[文档] @classmethod def dopostinit(cls, _obj, *args, **kwargs): """Post-initialization processing for LineActions""" # NOTE: Indicator registration is now handled in lineiterator.py dopostinit # with proper duplicate checking. No registration needed here. pass
[文档] class PseudoArray: """Wrapper for non-array iterables to provide array-like access. This class wraps iterables (including itertools.repeat) and provides array-like indexing access. It handles cases where the wrapped object doesn't support direct indexing. Attributes: wrapped: The wrapped iterable object. _minperiod: Minimum period inherited from the wrapped object. Example: >>> from itertools import repeat >>> pseudo = PseudoArray(repeat(1.0)) >>> print(pseudo[0]) 1.0 """
[文档] def __init__(self, wrapped): """Initialize PseudoArray with a wrapped iterable. Args: wrapped: The iterable object to wrap. """ self.wrapped = wrapped # CRITICAL FIX: Ensure PseudoArray has _minperiod attribute self._minperiod = getattr(wrapped, "_minperiod", 1)
def __getitem__(self, key): try: # Try normal indexing first return self.wrapped[key] except TypeError: # Handle itertools.repeat objects and other iterables that don't support indexing if hasattr(self.wrapped, "__iter__"): # For repeat objects, all values are the same, so just get the first one try: # Convert to list if it's a repeat object if str(type(self.wrapped)) == "<class 'itertools.repeat'>": # For repeat, all values are the same return next(iter(self.wrapped)) else: # Convert iterable to list and index wrapped_list = list(self.wrapped) return wrapped_list[key] except (StopIteration, IndexError): return float("nan") else: # If not iterable, return the wrapped object itself for index 0 if key == 0: return self.wrapped else: return float("nan") @property def array(self): """Get the array representation of the wrapped object. Returns: list or array: Array representation of the wrapped object. """ # Handle repeat objects specially if str(type(self.wrapped)) == "<class 'itertools.repeat'>": # For repeat objects, return a list with one element repeated return [next(iter(self.wrapped))] elif hasattr(self.wrapped, "array"): return self.wrapped.array else: return self.wrapped
[文档] class LineActions(LineBuffer, LineActionsMixin, metabase.ParamsMixin): """ Base class for *Line Clases* with different lines, derived from a LineBuffer """ _ltype = LineRoot.IndType # Add plotlines attribute for plotting support plotlines = object()
[文档] def __new__(cls, *args, **kwargs): """Handle data processing for indicators and other LineActions objects""" # Create the instance using the normal Python object creation instance = super().__new__(cls) # Initialize basic attributes import collections instance._lineiterators = collections.defaultdict(list) # CRITICAL FIX: Define mindatas before using it mindatas = getattr(cls, "_mindatas", getattr(cls, "mindatas", 1)) # Set up parameters for this instance (needed for self.p.period etc.) if hasattr(cls, "_params") and cls._params is not None: params_cls = cls._params # Create parameter instance for this object instance.p = params_cls() # Update with kwargs for key, value in kwargs.items(): if hasattr(instance.p, key): setattr(instance.p, key, value) else: # Fallback to empty parameter object from .utils import DotDict instance.p = DotDict(**kwargs) # Create and set up Lines instance lines_cls = getattr(cls, "lines", None) if lines_cls is not None: instance.lines = lines_cls() # CRITICAL FIX: Set lines._owner immediately after creating lines instance # Use object.__setattr__ to directly set _owner_ref (bypasses Lines.__setattr__) object.__setattr__(instance.lines, "_owner_ref", instance) # Ensure lines are properly initialized with their own buffers if hasattr(instance.lines, "_obj"): instance.lines._obj = instance # CRITICAL FIX: Ensure lines instance has the essential methods # If the lines instance doesn't have advance method, add it if not hasattr(instance.lines, "advance"): def advance_method(size=1): """Forward all lines in the collection""" for line in getattr(instance.lines, "lines", []): if hasattr(line, "advance"): line.advance(size=size) instance.lines.advance = advance_method # CRITICAL FIX: Set up line references for indicators # Each line should be a separate LineBuffer with its own array if hasattr(instance.lines, "lines") and instance.lines.lines: # Ensure each line is a LineBuffer with its own array for i, line_obj in enumerate(instance.lines.lines): if not isinstance(line_obj, LineBuffer): # Create a new LineBuffer for this line - no import needed, we're in linebuffer.py new_line = LineBuffer() # Copy any existing attributes if hasattr(line_obj, "__dict__"): new_line.__dict__.update(line_obj.__dict__) instance.lines.lines[i] = new_line line_obj = new_line # Ensure the line has its own array if not hasattr(line_obj, "array") or not line_obj.array: import array line_obj.array = array.array("d") line_obj._idx = -1 line_obj.lencount = 0 # Set up convenience references - first line as .line instance.line = instance.lines.lines[0] if instance.lines.lines else instance instance.l = instance.lines # Common shorthand else: # No individual lines, use the instance itself instance.line = instance instance.l = instance.lines else: # Create default lines using the proper Lines class from .lineseries import Lines instance.lines = Lines() # Add the advance method if it doesn't exist if not hasattr(instance.lines, "advance"): def advance_method(size=1): """Forward all lines in the collection""" for line in getattr(instance.lines, "lines", []): if hasattr(line, "advance"): line.advance(size=size) instance.lines.advance = advance_method instance.line = instance instance.l = instance.lines # CRITICAL FIX: Auto-assign data from owner if no data provided and mindatas > 0 if mindatas > 0: # Try to get owner and auto-assign data using multiple strategies from . import metabase owner = None # Strategy 1: Use findowner try: from .strategy import Strategy except ImportError: Strategy = None if Strategy is not None: owner = metabase.findowner(instance, Strategy) # If we found an owner with data, auto-assign it if owner is not None and hasattr(owner, "data") and owner.data is not None: # Check if we already have data in args data_count = 0 for arg in args: if ( hasattr(arg, "lines") or hasattr(arg, "_name") or str(type(arg).__name__).endswith("Data") ): data_count += 1 # If we need more data sources than we have, auto-assign from owner if data_count < mindatas: # Add owner's data as needed missing_data_count = mindatas - data_count for _ in range(missing_data_count): args = (owner.data,) + args # Process arguments to identify data sources data_count = 0 processed_datas = [] for i, arg in enumerate(args): if ( hasattr(arg, "lines") or hasattr(arg, "_name") or str(type(arg).__name__).endswith("Data") ): processed_datas.append(arg) data_count += 1 if data_count >= mindatas: break instance.datas = processed_datas if processed_datas: instance.data = processed_datas[0] else: instance.data = None # Set up dnames if available try: from .utils import DotDict instance.dnames = DotDict( [(d._name, d) for d in instance.datas if getattr(d, "_name", "")] ) except Exception: instance.dnames = {} return instance
def __init__(self, *args, **kwargs): """Initialize LineActions instance. Sets up lines, owner references, data sources, and clock. This is a complex initialization that handles multiple scenarios including indicators, strategies, and data feeds. Args: *args: Positional arguments including data feeds. **kwargs: Keyword arguments for parameters. """ # CRITICAL FIX: Set lines._owner FIRST, before any other initialization # This ensures line bindings in user's __init__ can find the owner if hasattr(self, "lines") and self.lines is not None: # If lines is still a class, create an instance first if isinstance(self.lines, type): self.lines = self.lines() # Now set owner using object.__setattr__ to directly set _owner_ref if self.lines is not None: object.__setattr__(self.lines, "_owner_ref", self) # Set up _owner from call stack BEFORE calling dopreinit from . import metabase # Try to find any LineIterator-like owner # Try findowner first with different classes self._owner = None # First try to find a Strategy specifically try: from .strategy import Strategy self._owner = metabase.findowner(self, Strategy) except Exception: pass # If no Strategy found, try LineIterator if self._owner is None: try: from .lineiterator import LineIterator self._owner = metabase.findowner(self, LineIterator) except Exception: pass # If still no owner, try a broader search # findowner() uses OwnerContext for owner lookup if self._owner is None: self._owner = metabase.findowner(self, None) # Call pre-init self.__class__.dopreinit(self, *args, **kwargs) # Call parent init super().__init__() # Call post-init self.__class__.dopostinit(self, *args, **kwargs)
[文档] def getindicators(self): """Get list of indicators using this line actions object. Returns: list: Empty list for base LineActions (override in subclasses). """ return []
[文档] def qbuffer(self, savemem=0): """Enable queued buffer mode for memory-efficient storage. Args: savemem: Memory saving mode (0=normal, >0=enable cache mode). """ super().qbuffer(savemem=1)
[文档] def plotlabel(self): """Return the plot label for this line object""" # Try to get plot label from _plotlabel method if hasattr(self, "_plotlabel"): label_dict = self._plotlabel() # Convert dict to string format if isinstance(label_dict, dict): # Format as 'ClassName(param1=value1, param2=value2)' params_str = ", ".join(f"{k}={v}" for k, v in label_dict.items()) if params_str: return f"{self.__class__.__name__}({params_str})" else: return self.__class__.__name__ else: return str(label_dict) # Fallback: return class name return self.__class__.__name__
def _plotlabel(self): """Default implementation of plot label""" # Try to get params if available if hasattr(self, "params") and hasattr(self.params, "_getkwargs"): return self.params._getkwargs() # Otherwise return empty dict return {}
[文档] @staticmethod def arrayize(obj): """Convert an object to an array-compatible object. Args: obj: Object to convert. Can be a value, iterable, or array-like. Returns: The original object if it has an array attribute, otherwise a LineNum or PseudoArray wrapper. """ if not hasattr(obj, "array"): if not hasattr(obj, "__getitem__"): # CRITICAL FIX: Create a LineNum that properly handles _minperiod line_num = LineNum(obj) # Ensure the LineNum has the _minperiod attribute if not hasattr(line_num, "_minperiod"): line_num._minperiod = 1 return line_num # make it a LineNum if not hasattr(obj, "__len__"): pseudo_array = PseudoArray(obj) # CRITICAL FIX: Ensure PseudoArray objects have _minperiod for compatibility if not hasattr(pseudo_array, "_minperiod"): pseudo_array._minperiod = 1 return pseudo_array # Can iterate (for once) return obj
def _next_old(self): """DEPRECATED: This method is no longer used. LineIterator._next() is used instead.""" # CRITICAL FIX: Prevent double processing if _once was already called if hasattr(self, "_once_called") and self._once_called: return # Already processed in once mode, don't process again # CRITICAL FIX: Ensure data synchronization without over-advancing if hasattr(self, "_clock") and self._clock is not None: try: clock_len = len(self._clock) self_len = len(self) # Only advance if we're behind the clock and not already at or ahead if self_len < clock_len and (clock_len - self_len) <= 1: # Forward one step to match the clock self.forward() except Exception: # If clock access fails, just forward once self.forward() else: # No clock, just forward once self.forward() # Call prenext or nextstart/next depending on minperiod if len(self) < self._minperiod: self.prenext() elif len(self) == self._minperiod: self.nextstart() # called once for the 1st value over minperiod else: self.next() # called for each value over minperiod def _once(self, start, end): # Mark that once was called to prevent double processing in _next self._once_called = True # CRITICAL FIX: Ensure array exists but don't pre-fill it # Pre-filling causes incorrect buflen() calculations if not hasattr(self, "array") or self.array is None: import array as array_module self.array = array_module.array("d") # CRITICAL FIX: Ensure proper range for once processing if start < 0: start = 0 if end < start: end = start # CRITICAL FIX: Get the actual buffer length if available if hasattr(self, "_clock") and self._clock and hasattr(self._clock, "buflen"): max_len = self._clock.buflen() if end > max_len: end = max_len # CRITICAL FIX: Call _once() on all child line iterators first # This ensures dependencies are calculated before this indicator if hasattr(self, "_lineiterators"): from .lineiterator import LineIterator for indicator in self._lineiterators.get(LineIterator.IndType, []): try: if hasattr(indicator, "_once"): indicator._once(start, end) except Exception: # Continue processing other indicators if one fails pass # CRITICAL FIX: Call preonce before main processing try: if hasattr(self, "preonce"): self.preonce(start, end) except Exception: pass # CRITICAL FIX: Process the main once calculation # Try to call once method if it exists try: if hasattr(self, "once") and callable(self.once): self.once(start, end) except Exception: # If once fails or doesn't exist, skip it # The indicator will be calculated via next() calls during strategy execution pass # CRITICAL FIX: Update lencount after once processing to match the data length # In runonce mode, lencount should equal the number of data points processed # Get the actual data length from the clock or data source actual_data_len = end try: # Try to get the actual data length from clock or data sources if hasattr(self, "_clock") and self._clock: try: actual_data_len = self._clock.buflen() except Exception: try: actual_data_len = len(self._clock) except Exception: pass elif hasattr(self, "datas") and self.datas and len(self.datas) > 0: try: actual_data_len = self.datas[0].buflen() except Exception: try: actual_data_len = len(self.datas[0]) except Exception: pass # Use the maximum of end and actual_data_len to ensure we don't truncate final_len = max(end, actual_data_len) if actual_data_len > 0 else end except Exception: final_len = end if hasattr(self, "lines") and hasattr(self.lines, "lines") and self.lines.lines: # Update lencount for all lines to match the data length for line in self.lines.lines: if hasattr(line, "lencount"): # CRITICAL FIX: Set lencount to final_len (actual data length) # This ensures len(indicator) == len(strategy) in runonce mode line.lencount = final_len if hasattr(line, "_idx"): # Set _idx to the last processed position line._idx = final_len - 1 if final_len > 0 else -1
[文档] @classmethod def cleancache(cls): """Clean the cache - called by cerebro""" LineActionsCache.clear_cache()
[文档] @classmethod def usecache(cls, enable=True): """Enable or disable the cache""" LineActionsCache.enable_cache(enable)
[文档] def LineDelay(a, ago=0, **kwargs): """Create a delayed line object. Args: a: Source line object. ago: Number of periods to delay. Negative for lookback. **kwargs: Additional keyword arguments. Returns: _LineDelay or _LineForward: A delayed line object. """ if ago <= 0: return _LineDelay(a, ago, **kwargs) return _LineForward(a, ago)
[文档] def LineNum(num): """Create a constant line from a number. Args: num: The constant value. Returns: _LineDelay: A line object that always returns the constant value. """ return _LineDelay(PseudoArray(repeat(num)), 0)
class _LineDelay(LineActions): """Delayed line object for negative ago values (lookback). This class represents a line that accesses historical values from another line. For example, data(-1) returns the previous bar's value. Attributes: a: The source line object. ago: Number of periods to look back (negative value). """ def __init__(self, a, ago): """Initialize the delayed line. Args: a: Source line object. ago: Number of periods to look back (negative value). """ super().__init__() self.a = self.arrayize(a) self.ago = ago # CRITICAL FIX: Inherit minperiod from source's owner (indicator) if available # When called as nzd(-1), 'a' is nzd.lines[0] which has minperiod=1, # but the indicator nzd has minperiod=20. We need to use the indicator's minperiod. source_minperiod = getattr(a, "_minperiod", 1) # Check if source has an owner with a higher minperiod if hasattr(a, "_owner") and a._owner is not None: owner = a._owner # Check for _owner_ref (Lines object pointing to indicator) if hasattr(owner, "_owner_ref") and owner._owner_ref is not None: owner_minperiod = getattr(owner._owner_ref, "_minperiod", 1) source_minperiod = max(source_minperiod, owner_minperiod) else: owner_minperiod = getattr(owner, "_minperiod", 1) source_minperiod = max(source_minperiod, owner_minperiod) # Update our minperiod with the source's minperiod if source_minperiod > 1: self.updateminperiod(source_minperiod) # Need to add the delay to the period. "ago" is 0 based and therefore # we need to pass an extra 1 which is the minimum defined period for # any data (which will be subtracted inside addminperiod) # CRITICAL FIX: Must add abs(ago) + 1, NOT just abs(ago) self.addminperiod(abs(ago) + 1) def __getitem__(self, idx): """CRITICAL FIX: Override __getitem__ to compute delayed value dynamically. This handles constants wrapped in PseudoArray correctly. For ago=-10 (lookback), accessing [0] should return self.a[-10] (10 bars back). Formula: self.a[idx + ago] where ago is negative for lookback. """ try: # For delay operations, get value from source with delay applied # ago is negative for lookback, so idx + ago gives historical index return self.a[idx + self.ago] except (IndexError, TypeError): return 0.0 def next(self): """Calculate and set the delayed value for the current bar. Gets the value from the source line at the delayed position and stores it at position 0. """ # CRITICAL FIX: Proper delay operation # ago is negative for lookback (e.g., ago=-10 means 10 bars back) # We need self.a[ago] to get the historical value try: # Get the delayed value - ago is already negative for lookback delayed_val = self.a[self.ago] # Ensure value is never None or NaN if delayed_val is None: delayed_val = 0.0 elif isinstance(delayed_val, float) and math.isnan(delayed_val): delayed_val = 0.0 self[0] = delayed_val except (IndexError, AttributeError): # If we can't get the delayed value, use 0.0 self[0] = 0.0 def once(self, start, end): """Calculate delayed values in batch mode (runonce). Args: start: Starting index. end: Ending index. """ # cache python dictionary lookups dst = self.array ago = self.ago # CRITICAL FIX: Ensure destination array is properly sized while len(dst) < end: dst.append(0.0) # CRITICAL FIX: Ensure source has computed its values before we access them # This is necessary for LinesOperation sources that haven't run once() yet if hasattr(self.a, "once") and hasattr(self.a, "array") and len(self.a.array) < end: self.a.once(start, end) # CRITICAL FIX: Check if source is a constant value (PseudoArray with repeat) # We need to check the wrapped object, not just the array, because # PseudoArray.array returns a new list each time is_constant = False constant_value = None # Check if self.a is a PseudoArray wrapping a repeat object # OR if self.a is a _LineDelay that wraps a PseudoArray with repeat source_obj = self.a if hasattr(self.a, "a"): # self.a is a _LineDelay, check its source source_obj = self.a.a if hasattr(source_obj, "wrapped"): wrapped = source_obj.wrapped # Check if it's a repeat object if ( isinstance(wrapped, itertools.repeat) or str(type(wrapped)) == "<class 'itertools.repeat'>" ): is_constant = True try: # Get the constant value from the repeat object # Create a new iterator to avoid consuming it constant_value = next(iter(wrapped)) # Ensure constant value is not None or NaN if constant_value is None: constant_value = 0.0 elif isinstance(constant_value, float) and math.isnan(constant_value): constant_value = 0.0 except (StopIteration, TypeError): constant_value = 0.0 # If not a constant, get the source array if not is_constant: src = self.a.array for i in range(start, end): if is_constant: # For constant values, just use the constant dst[i] = constant_value else: # CRITICAL FIX: Proper bounds checking for delayed access # For ago=-26 (forward shift), we need to access src[i + ago] = src[i - 26] # to get the value calculated 26 bars ago src_index = i + ago if src_index >= 0 and src_index < len(src): val = src[src_index] # Ensure value is never None or NaN if val is None: val = 0.0 elif isinstance(val, float) and math.isnan(val): val = 0.0 dst[i] = val elif len(src) > 0: # If index is out of bounds but we have source data, use the last available value val = src[-1] if val is None: val = 0.0 elif isinstance(val, float) and math.isnan(val): val = 0.0 dst[i] = val else: # If no source data available, use 0.0 dst[i] = 0.0 class _LineForward(LineActions): """Forward line object for positive ago values (lookahead). This class represents a line that accesses future values from another line. For example, data(1) returns the next bar's value. Attributes: a: The source line object. ago: Number of periods to look ahead (positive value). """ def __init__(self, a, ago): """Initialize the forward line. Args: a: Source line object. ago: Number of periods to look ahead (positive value). """ super().__init__() self.a = self.arrayize(a) self.ago = ago # Need to add the delay to the period if hasattr(a, "_minperiod"): self.addminperiod(ago) def next(self): """Calculate and set the forwarded value for the current bar. Gets the value from the source line at the forward position and stores it at position 0. """ # operation(float, other) ... expecting other to be a float # CRITICAL FIX: Ensure we get valid numeric values for indicator calculations try: # Get operand values with proper type checking if hasattr(self.a, "__getitem__"): # LineBuffer-like object - get current value try: a_val = self.a[0] except (IndexError, TypeError): a_val = 0.0 else: # Direct value a_val = self.a if hasattr(self.b, "__getitem__"): # LineBuffer-like object - get current value try: b_val = self.b[0] except (IndexError, TypeError): b_val = 0.0 else: # Direct value b_val = self.b # CRITICAL FIX: Ensure values are numeric and not None/NaN if a_val is None: a_val = 0.0 elif isinstance(a_val, float) and math.isnan(a_val): a_val = 0.0 elif not isinstance(a_val, (int, float)): try: a_val = float(a_val) except (ValueError, TypeError): a_val = 0.0 if b_val is None: b_val = 0.0 elif isinstance(b_val, float) and math.isnan(b_val): b_val = 0.0 elif not isinstance(b_val, (int, float)): try: b_val = float(b_val) except (ValueError, TypeError): b_val = 0.0 # CRITICAL FIX: Actually perform the operation and store the result # Handle both normal and reverse operations if hasattr(self, "operation") and self.operation: # CRITICAL FIX: Handle reverse operations properly if getattr(self, "r", False): result = self.operation(b_val, a_val) # Reverse: b op a else: result = self.operation(a_val, b_val) # Normal: a op b # Ensure result is a valid number if result is None: result = 0.0 elif isinstance(result, float) and math.isnan(result): result = 0.0 elif not isinstance(result, (int, float)): try: result = float(result) except (ValueError, TypeError): result = 0.0 # Store the result in the current position self[0] = result else: # Fallback: store a_val if no operation is defined self[0] = a_val except Exception: # If anything fails, store 0.0 to prevent crashes # print(f"LinesOperation.next() error: {e}") # Removed for performance self[0] = 0.0 def once(self, start, end): """Calculate forwarded values in batch mode (runonce). Args: start: Starting index. end: Ending index. """ # cache python dictionary lookups dst = self.array srca = self.a.array op = self.operation # CRITICAL FIX: Ensure destination array is properly sized while len(dst) < end: dst.append(0.0) # CRITICAL FIX: Ensure source array has required data if len(srca) < end: # If source array is shorter than required range, only process available data end = min(end, len(srca)) for i in range(start, end): try: # CRITICAL FIX: Bounds checking for source array a_val = srca[i] if i < len(srca) else 0.0 # Ensure value is numeric if a_val is None or (isinstance(a_val, float) and math.isnan(a_val)): a_val = 0.0 result = op(a_val) # Ensure result is valid if result is None or (isinstance(result, float) and math.isnan(result)): result = 0.0 dst[i] = result except Exception: # If operation fails, store 0.0 dst[i] = 0.0
[文档] class LinesOperation(LineActions): """Operation between two line objects (binary operations). This class represents binary operations (addition, subtraction, etc.) between two line objects. The result is a new line that contains the element-wise operation result. Attributes: operation: The binary function to apply (e.g., operator.add). a: First operand (left-hand side). b: Second operand (right-hand side). r: If True, reverse operation order. _parent_a: Parent indicator for operand a. _parent_b: Parent indicator for operand b. Example: >>> result = LinesOperation(indicator1, indicator2, operator.sub) >>> # result[0] = indicator1[0] - indicator2[0] """ def __init__(self, a, b, operation, r=False, parent_a=None, parent_b=None): """Initialize a binary operation between two line objects. Args: a: First operand (left-hand side). b: Second operand (right-hand side). operation: The binary function to apply (e.g., operator.add). r: If True, reverse operation order (b op a instead of a op b). parent_a: Parent indicator for operand a. parent_b: Parent indicator for operand b. """ super().__init__() self.operation = operation self.a = a # always a linebuffer-like object self.b = self.arrayize(b) self.r = r # CRITICAL FIX: Store references to parent indicators for _once processing # Use passed parent references if available, otherwise try to find them self._parent_a = parent_a if parent_a is not None else self._find_parent_indicator(a) self._parent_b = parent_b if parent_b is not None else self._find_parent_indicator(b) # ensure a is added if it's a lineiterator-like object # self.addminperiod(1) already done by the base class # CRITICAL FIX: Handle _minperiod attribute access more safely a_minperiod = getattr(a, "_minperiod", 1) if hasattr(a, "_minperiod") else 1 b_minperiod = getattr(b, "_minperiod", 1) if hasattr(b, "_minperiod") else 1 # Use updateminperiod to take max of operand minperiods # For me1 - me2, minperiod = max(me1._minperiod, me2._minperiod) max_minperiod = max(a_minperiod, b_minperiod) self.updateminperiod(max_minperiod) def _find_parent_indicator(self, operand): """Find the parent indicator that owns this operand (LineBuffer)""" # If operand is already an indicator (has _once method), return it if hasattr(operand, "_once") and hasattr(operand, "_lineiterators"): return operand # If operand is a LineBuffer, try to find its owner indicator if hasattr(operand, "_owner") and operand._owner is not None: owner = operand._owner # Check if owner has _owner_ref pointing to the indicator if hasattr(owner, "_owner_ref") and owner._owner_ref is not None: return owner._owner_ref # Check if owner itself is an indicator if hasattr(owner, "_once") and hasattr(owner, "_lineiterators"): return owner return None
[文档] def __getitem__(self, ago): """CRITICAL FIX: Override __getitem__ to compute value dynamically from source operands. This ensures correct values in runonce mode where LinesOperation's _idx may not be properly advanced because it's not registered as IndType. """ try: # Get values from source operands at the same relative position a_val = self.a[ago] if hasattr(self.a, "__getitem__") else self.a b_val = self.b[ago] if hasattr(self.b, "__getitem__") else self.b # Handle None/NaN values if a_val is None or (isinstance(a_val, float) and a_val != a_val): return float("nan") if b_val is None or (isinstance(b_val, float) and b_val != b_val): return float("nan") # Compute and return the operation result if self.r: return self.operation(b_val, a_val) else: return self.operation(a_val, b_val) except (IndexError, TypeError): return float("nan")
def _next(self): """CRITICAL FIX: _next() method for compatibility with LineIterator processing loop. This method is called by LineIterator._next() for items in _lineiterators[IndType]. """ # Advance the line buffer self.advance() # Call next() to compute the value self.next() # Update bindings so bound lines get the computed value for binding in self.bindings: binding[0] = self[0]
[文档] def next(self): """Calculate and set the operation result for the current bar. Performs the binary operation on the current values of both operands and stores the result at position 0. """ # operation(float, other) ... expecting other to be a float # CRITICAL FIX: Ensure we get valid numeric values for indicator calculations try: # Get operand values with proper type checking if hasattr(self.a, "__getitem__"): # LineBuffer-like object - get current value try: a_val = self.a[0] except (IndexError, TypeError): a_val = 0.0 else: # Direct value a_val = self.a if hasattr(self.b, "__getitem__"): # LineBuffer-like object - get current value try: b_val = self.b[0] except (IndexError, TypeError): b_val = 0.0 else: # Direct value b_val = self.b # CRITICAL FIX: Ensure values are numeric and not None/NaN if a_val is None: a_val = 0.0 elif isinstance(a_val, float) and math.isnan(a_val): a_val = 0.0 elif not isinstance(a_val, (int, float)): try: a_val = float(a_val) except (ValueError, TypeError): a_val = 0.0 if b_val is None: b_val = 0.0 elif isinstance(b_val, float) and math.isnan(b_val): b_val = 0.0 elif not isinstance(b_val, (int, float)): try: b_val = float(b_val) except (ValueError, TypeError): b_val = 0.0 # CRITICAL FIX: Actually perform the operation and store the result # Handle both normal and reverse operations if hasattr(self, "operation") and self.operation: # CRITICAL FIX: Handle reverse operations properly if getattr(self, "r", False): result = self.operation(b_val, a_val) # Reverse: b op a else: result = self.operation(a_val, b_val) # Normal: a op b # Ensure result is a valid number if result is None: result = 0.0 elif isinstance(result, float) and math.isnan(result): result = 0.0 elif not isinstance(result, (int, float)): try: result = float(result) except (ValueError, TypeError): result = 0.0 # Store the result in the current position self[0] = result else: # Fallback: store a_val if no operation is defined self[0] = a_val except Exception: # If anything fails, store 0.0 to prevent crashes # print(f"LinesOperation.next() error: {e}") # Removed for performance self[0] = 0.0
[文档] def once(self, start, end): """Calculate operation results in batch mode (runonce). Args: start: Starting index. end: Ending index. """ # Check if array is already populated (avoid redundant work) if len(self.array) >= end: return # CRITICAL FIX: Always use start=0 for nested operations # This ensures historical values are available for indicators like SMA nested_start = 0 # CRITICAL FIX: Call parent indicators' once() methods to populate their arrays # This is needed for cases like dif = ema_1 - ema_2 where ema_1/ema_2 must be computed first if self._parent_a is not None and hasattr(self._parent_a, "once"): try: self._parent_a.once(nested_start, end) except Exception: pass if self._parent_b is not None and hasattr(self._parent_b, "once"): try: self._parent_b.once(nested_start, end) except Exception: pass # CRITICAL FIX: Call once() on ALL operands that have it (not just LinesOperations) # This ensures LineBuffer operands (like indicator outputs) are also computed if hasattr(self.a, "once"): try: self.a.once(nested_start, end) except Exception: pass if hasattr(self.b, "once"): try: self.b.once(nested_start, end) except Exception: pass # CRITICAL FIX: Always process from 0 to populate historical values if hasattr(self.b, "array"): self._once_op(nested_start, end) else: if isinstance(self.b, float): ( self._once_val_op_r(nested_start, end) if self.r else self._once_val_op(nested_start, end) ) else: self._once_time_op(nested_start, end) # CRITICAL FIX: Call oncebinding to copy computed values to bound lines # This is needed in runonce mode where once() computes all values at once self.oncebinding()
def _once_op(self, start, end): # CRITICAL FIX: Ensure b's array is populated if b is a _LineDelay or similar if hasattr(self.b, "once") and len(self.b.array) < end: try: self.b.once(start, end) except Exception: pass # cache python dictionary lookups dst = self.array srca = self.a.array srcb = self.b.array op = self.operation # Ensure destination array is sized for direct index assignment while len(dst) < end: dst.append(float("nan")) # Clip processing range to available source data # CRITICAL FIX: Check if b is a _LineDelay wrapping a constant (PseudoArray) # In this case, srcb will be empty but b[i] will return the constant is_constant_b = ( len(srcb) == 0 and hasattr(self.b, "a") and type(self.b.a).__name__ == "PseudoArray" ) if is_constant_b: # b is a _LineDelay wrapping a constant - use srca length only end = min(end, len(srca)) else: end = min(end, len(srca), len(srcb)) # Use dynamic access for constant values wrapped in _LineDelay use_dynamic_b = is_constant_b # CRITICAL FIX: Always process from 0 to ensure historical values are available # This is needed for indicators like SMA that need historical values for their calculations actual_start = 0 for i in range(actual_start, end): try: a_val = srca[i] if use_dynamic_b: b_val = self.b[i] # Use __getitem__ for constants else: b_val = srcb[i] # Preserve NaN semantics for indicators: if any operand is None/NaN -> NaN if a_val is None or a_val != a_val or b_val is None or b_val != b_val: dst[i] = float("nan") continue if self.r: result = op(b_val, a_val) else: result = op(a_val, b_val) # Preserve NaN semantics if result is None or result != result: result = float("nan") dst[i] = result except Exception: # If operation fails, store NaN for indicator semantics dst[i] = float("nan") def _once_time_op(self, start, end): # cache python dictionary lookups dst = self.array srca = self.a.array srcb = self.b[0] op = self.operation # Ensure destination array is sized for direct index assignment while len(dst) < end: dst.append(float("nan")) # Clip processing range to available source data end = min(end, len(srca)) for i in range(start, end): try: a_val = srca[i] # Preserve NaN semantics if a_val is None or a_val != a_val or srcb is None or srcb != srcb: dst[i] = float("nan") continue if self.r: result = op(srcb, a_val) else: result = op(a_val, srcb) if result is None or result != result: result = float("nan") dst[i] = result except Exception: dst[i] = float("nan") def _once_val_op(self, start, end): # cache python dictionary lookups dst = self.array srca = self.a.array srcb = self.b op = self.operation # Ensure destination array is sized for direct index assignment while len(dst) < end: dst.append(float("nan")) # Clip processing range to available source data end = min(end, len(srca)) for i in range(start, end): try: a_val = srca[i] if a_val is None or a_val != a_val or srcb is None or srcb != srcb: dst[i] = float("nan") continue result = op(a_val, srcb) if result is None or result != result: result = float("nan") dst[i] = result except Exception: dst[i] = float("nan") def _once_val_op_r(self, start, end): # cache python dictionary lookups dst = self.array srca = self.a.array srcb = self.b op = self.operation # Ensure destination array is sized for direct index assignment while len(dst) < end: dst.append(float("nan")) # Clip processing range to available source data end = min(end, len(srca)) for i in range(start, end): try: a_val = srca[i] if a_val is None or a_val != a_val or srcb is None or srcb != srcb: dst[i] = float("nan") continue result = op(srcb, a_val) if result is None or result != result: result = float("nan") dst[i] = result except Exception: dst[i] = float("nan")
[文档] class LineOwnOperation(LineActions): """Operation on a single line object (unary operations). This class represents unary operations (negation, absolute value, etc.) on a single line object. The result is a new line that contains the element-wise operation result. Attributes: operation: The unary function to apply (e.g., operator.neg). a: The operand (line object). _parent_a: Parent indicator for the operand. Example: >>> result = LineOwnOperation(indicator, operator.neg) >>> # result[0] = -indicator[0] """ def __init__(self, a, operation, parent_a=None): """Initialize a unary operation on a line object. Args: a: The operand (line object). operation: The unary function to apply (e.g., operator.neg). parent_a: Parent indicator for the operand. """ super().__init__() self.operation = operation self.a = a # CRITICAL FIX: Store reference to parent indicator for _once processing self._parent_a = parent_a if parent_a is not None else self._find_parent_indicator(a) # CRITICAL FIX: Handle _minperiod attribute access more safely a_minperiod = getattr(a, "_minperiod", 1) if hasattr(a, "_minperiod") else 1 self.addminperiod(a_minperiod) def _find_parent_indicator(self, operand): """Find the parent indicator that owns this operand (LineBuffer)""" if hasattr(operand, "_once") and hasattr(operand, "_lineiterators"): return operand if hasattr(operand, "_owner") and operand._owner is not None: owner = operand._owner if hasattr(owner, "_owner_ref") and owner._owner_ref is not None: return owner._owner_ref if hasattr(owner, "_once") and hasattr(owner, "_lineiterators"): return owner return None
[文档] def __getitem__(self, ago): """CRITICAL FIX: Override __getitem__ to compute value dynamically from source operand.""" try: a_val = self.a[ago] if hasattr(self.a, "__getitem__") else self.a if a_val is None or (isinstance(a_val, float) and a_val != a_val): return float("nan") return self.operation(a_val) except (IndexError, TypeError): return float("nan")
[文档] def next(self): """Calculate and set the unary operation result for the current bar. Performs the unary operation on the current value of the operand and stores the result at position 0. """ self[0] = self.operation(self.a[0])
[文档] def once(self, start, end): """Calculate unary operation results in batch mode (runonce). Args: start: Starting index. end: Ending index. """ # CRITICAL FIX: Ensure source operand is processed first if self._parent_a is not None and hasattr(self._parent_a, "_once"): try: self._parent_a._once(start, end) except Exception: pass # cache python dictionary lookups dst = self.array srca = self.a.array op = self.operation # CRITICAL FIX: Ensure destination array is properly sized while len(dst) < end: dst.append(float("nan")) # CRITICAL FIX: Ensure source array has required data if len(srca) < end: # If source array is shorter than required range, only process available data end = min(end, len(srca)) for i in range(start, end): try: # CRITICAL FIX: Bounds checking for source array a_val = srca[i] if i < len(srca) else 0.0 # Ensure value is numeric if a_val is None or (isinstance(a_val, float) and math.isnan(a_val)): a_val = 0.0 result = op(a_val) # Ensure result is valid if result is None or (isinstance(result, float) and math.isnan(result)): result = 0.0 dst[i] = result except Exception: # If operation fails, store 0.0 dst[i] = 0.0
[文档] def size(self): """Return the number of lines in this LineActions object""" if hasattr(self, "lines") and hasattr(self.lines, "size"): return self.lines.size() elif hasattr(self, "lines") and hasattr(self.lines, "__len__"): return len(self.lines) else: return 1 # Default to 1 line if no lines object available