#!/usr/bin/env python
"""Analyzer Module - Strategy performance analysis framework.

This module provides the base classes for analyzers that calculate and
report performance metrics for trading strategies. Analyzers can track
trades, returns, drawdowns, Sharpe ratios, and other statistics.

Key Classes:
    Analyzer: Base class for all analyzers.
    TimeFrameAnalyzerBase: Base for time-frame aware analyzers.

Analyzers receive notifications from the strategy during backtesting:
    - notify_trade: Called when a trade's status changes (for example,
      when it is closed)
    - notify_order: Called when an order status changes
    - notify_cashvalue: Called when cash/value changes
    - notify_fund: Called when fund data changes

Example:
    Creating a custom analyzer:

    >>> class MyAnalyzer(Analyzer):
    ...     def __init__(self):
    ...         super().__init__()
    ...         self.trades = 0
    ...
    ...     def notify_trade(self, trade):
    ...         if trade.isclosed:
    ...             self.trades += 1
    ...
    ...     def get_analysis(self):
    ...         return {'trade_count': self.trades}
"""
import calendar
import datetime
import pprint as pp
from collections import OrderedDict
from .dataseries import TimeFrame
from .metabase import findowner
from .observer import Observer
from .parameters import ParameterizedBase
from .strategy import Strategy
from .utils.py3 import MAXINT
# Analyzer class - refactored to not use a metaclass
class Analyzer(ParameterizedBase):
    """Base class from which every analyzer derives.

    An analyzer works inside a strategy and produces an analysis of that
    strategy.

    Attributes populated by ``__init__``:

      - ``self.strategy``: the owning strategy located with ``findowner``
        (``None`` when no owner is found)

      - ``self.datas``: the data feeds of the strategy, or an empty list
        when there is no strategy

      - ``self.data``: shortcut for ``self.datas[0]`` (only set when
        ``self.datas`` is non-empty)

      - ``self.dataX``: shortcut for ``self.datas[X]``

      - ``self.data_<alias>`` / ``self.data_<index>``: lines of
        ``self.datas[0]``, by line alias and by line index

      - ``self.dataX_<alias>`` / ``self.dataX_<index>``: lines of
        ``self.datas[X]``, by line alias and by line index

    This is not a *Lines* object, but its methods follow the same design:

      - ``__init__`` for instantiation and initial setup

      - ``start`` / ``stop`` to signal the beginning and end of operations

      - ``prenext`` / ``nextstart`` / ``next``, mirroring the calls made to
        the methods of the same name in the strategy

      - ``notify_trade`` / ``notify_order`` / ``notify_cashvalue`` /
        ``notify_fund``, which receive the same notifications as the
        equivalent strategy methods

    No particular mode of operation is preferred: the analysis may be built
    up in ``next``, computed in ``stop`` or derived from a single hook such
    as ``notify_trade``.

    The important part is to override ``get_analysis`` so that it returns a
    *dict-like* object holding the results (the exact format is up to the
    implementation).
    """

    # Save results to csv
    csv = True

    def __init__(self, *args, **kwargs):
        """Build the analyzer and wire it to its owners.

        All positional and keyword arguments are forwarded unchanged to the
        parent class initializer.

        The steps performed here (formerly spread over ``__new__`` and a
        post-init hook) are, in order: create the child list, locate the
        owning strategy / parent analyzer / master observer, create the data
        aliases, call ``create_analysis`` and finally register with the
        parent analyzer, if any.
        """
        # The child list must exist before any sub-analyzer can register
        self._children = list()

        super().__init__(*args, **kwargs)

        # Owners are looked up with findowner; each lookup may yield None
        owner_strategy = findowner(self, Strategy)
        self.strategy = owner_strategy
        self._parent = findowner(self, Analyzer)

        # An analyzer created inside an observer registers itself with it
        master_observer = findowner(self, Observer)
        if master_observer is not None:
            master_observer._register_analyzer(self)

        # Without a strategy there are no data feeds to expose
        self.datas = [] if owner_strategy is None else owner_strategy.datas

        if self.datas:
            # Aliases for the first data feed: data, data_<alias>, data_<n>
            primary = self.datas[0]
            self.data = primary
            for idx, line in enumerate(primary.lines):
                alias = primary._getlinealias(idx)
                if alias:
                    setattr(self, "data_%s" % alias, line)
                setattr(self, "data_%d" % idx, line)

            # Aliases for every data feed: dataX, dataX_<alias>, dataX_<n>
            for pos, feed in enumerate(self.datas):
                setattr(self, "data%d" % pos, feed)
                for idx, line in enumerate(feed.lines):
                    alias = feed._getlinealias(idx)
                    if alias:
                        setattr(self, "data%d_%s" % (pos, alias), line)
                    setattr(self, "data%d_%d" % (pos, idx), line)

        # Let the (possibly overridden) hook prepare the results container
        self.create_analysis()

        # Registration with the parent analyzer happens last
        if self._parent is not None:
            self._parent._register(self)

    def __len__(self):
        """Return the current length of the strategy this analyzer works on,
        so that ``len`` can be invoked directly on the analyzer."""
        return len(self.strategy)

    def _register(self, child):
        """Append ``child`` to the list of child analyzers."""
        self._children.append(child)

    def _prenext(self):
        """Forward ``_prenext`` to every child, then call ``prenext``."""
        for sub in self._children:
            sub._prenext()

        self.prenext()

    def _notify_cashvalue(self, cash, value):
        """Forward the cash/value notification to the children, then to
        ``notify_cashvalue`` of this analyzer."""
        # PERFORMANCE: hot path - single attribute lookup, skip empty lists
        subs = self._children
        if subs:
            for sub in subs:
                sub._notify_cashvalue(cash, value)

        self.notify_cashvalue(cash, value)

    def _notify_fund(self, cash, value, fundvalue, shares):
        """Forward the fund notification to the children, then to
        ``notify_fund`` of this analyzer."""
        # PERFORMANCE: hot path - single attribute lookup, skip empty lists
        subs = self._children
        if subs:
            for sub in subs:
                sub._notify_fund(cash, value, fundvalue, shares)

        self.notify_fund(cash, value, fundvalue, shares)

    def _notify_trade(self, trade):
        """Forward ``trade`` to every child, then to ``notify_trade``."""
        for sub in self._children:
            sub._notify_trade(trade)

        self.notify_trade(trade)

    def _notify_order(self, order):
        """Forward ``order`` to every child, then to ``notify_order``."""
        for sub in self._children:
            sub._notify_order(order)

        self.notify_order(order)

    def _nextstart(self):
        """Forward ``_nextstart`` to every child, then call ``nextstart``."""
        for sub in self._children:
            sub._nextstart()

        self.nextstart()

    def _next(self):
        """Forward ``_next`` to every child, then call ``next``."""
        for sub in self._children:
            sub._next()

        self.next()

    def _start(self):
        """Forward ``_start`` to every child, then call ``start``."""
        for sub in self._children:
            sub._start()

        self.start()

    def _stop(self):
        """Forward ``_stop`` to every child, then call ``stop``."""
        for sub in self._children:
            sub._stop()

        self.stop()

    def notify_cashvalue(self, cash, value):
        """Hook receiving cash and value notifications.

        Args:
            cash: Current available cash.
            value: Current portfolio value.

        Note:
            Does nothing by default; override to react to the notification.
        """
        pass

    def notify_fund(self, cash, value, fundvalue, shares):
        """Hook receiving fund-related notifications.

        Args:
            cash: Current available cash.
            value: Current portfolio value.
            fundvalue: Current fund value.
            shares: Number of fund shares.

        Note:
            Does nothing by default; override to react to the notification.
        """
        pass

    def notify_order(self, order):
        """Hook receiving order notifications.

        Args:
            order: The order that was updated.

        Note:
            Does nothing by default; override to track orders.
        """
        pass

    def notify_trade(self, trade):
        """Hook receiving trade notifications.

        Args:
            trade: The trade that was updated.

        Note:
            Does nothing by default; override to track trades.
        """
        pass

    def next(self):
        """Per-bar hook, called once the minimum period has been reached.

        Note:
            Does nothing by default; override to implement per-bar logic.
        """
        pass

    def prenext(self):
        """Per-bar hook used before the minimum period has been reached.

        The default implementation simply delegates to ``next``; override it
        when the warm-up phase needs different behavior.
        """
        self.next()

    def nextstart(self):
        """Hook called once, when the minimum period is first reached.

        The default implementation simply delegates to ``next``.
        """
        self.next()

    def start(self):
        """Hook called at the start of operations.

        Note:
            Does nothing by default; override to initialize analyzer state.
        """
        pass

    def stop(self):
        """Hook called at the end of operations.

        Note:
            Does nothing by default; override to perform final calculations.
        """
        pass

    def create_analysis(self):
        """Create the container for the analysis results.

        The default implementation stores an empty ``OrderedDict`` in
        ``self.rets``, which is what the default ``get_analysis`` returns.
        Override to use a different results structure.
        """
        self.rets = OrderedDict()

    def get_analysis(self):
        """Return a *dict-like* object with the results of the analysis.

        Keys and format of the results are implementation dependent; a
        *dict-like* result is a convention and is not enforced.

        The default implementation returns ``self.rets``, the
        ``OrderedDict`` created by the default ``create_analysis``.
        """
        return self.rets

    def print(self, *args, **kwargs):
        """Print the results returned by ``get_analysis`` with a standard
        ``print`` call.

        Positional and keyword arguments are accepted but not used.
        """
        print(self.get_analysis())

    def pprint(self, *args, **kwargs):
        """Pretty-print the results returned by ``get_analysis``.

        Positional and keyword arguments are forwarded to ``pprint.pprint``.
        """
        pp.pprint(self.get_analysis(), *args, **kwargs)
# TimeFrameAnalyzerBase class - refactored to not use a metaclass
class TimeFrameAnalyzerBase(Analyzer):
    """Base class for time-frame aware analyzers.

    On every ``_prenext`` / ``_nextstart`` / ``_next`` cycle the current
    strategy datetime is mapped to a comparison value for the configured
    timeframe; when that value grows, ``on_dt_over`` is called before the
    regular per-bar hook.

    Params:
        timeframe: TimeFrame to use (a falsy value such as ``None`` means
            "use the timeframe of ``self.data``").
        compression: Compression factor (a falsy value such as ``None``
            means "use the compression of ``self.data``").
        _doprenext: Whether ``prenext`` is called during ``_prenext``
            (default: True). When false, ``_nextstart`` always calls
            ``on_dt_over``.

    Attributes maintained while running:
        timeframe, compression: effective values resolved in ``_start``.
        dtcmp, dtkey: comparison value and key of the current period.
        dtcmp1, dtkey1: the values ``dtcmp`` / ``dtkey`` held before the
            last period change (set by ``_dt_over``).

    Methods:
        on_dt_over(): Override to handle period changes.
    """

    params = (
        ("timeframe", None),
        ("compression", None),
        ("_doprenext", True),
    )

    def __init__(self, *args, **kwargs):
        """Initialize the analyzer (logic formerly held by a metaclass).

        All arguments are forwarded to ``Analyzer.__init__``.
        """
        super().__init__(*args, **kwargs)

        # Hack to support an alternative method name for ``on_dt_over``.
        # NOTE(review): this class itself defines ``on_dt_over``, so
        # ``hasattr(self, "on_dt_over")`` is always true and the alias below
        # is never installed - confirm which legacy name was meant to be
        # supported before relying on it.
        if hasattr(self, "on_dt_over_orig") and not hasattr(self, "on_dt_over"):
            self.on_dt_over = self.on_dt_over_orig

    def _start(self):
        """Resolve timeframe/compression, seed the period state and start.

        Reads ``self.data`` when a parameter is falsy, so at least one data
        feed must be present in that case.
        """
        # Fall back to the values of the first data feed when not specified
        self.timeframe = self.p.timeframe or self.data._timeframe
        self.compression = self.p.compression or self.data._compression

        # Seed with the smallest datetime so that the first ``_dt_over``
        # call made with a real datetime detects a period change
        self.dtcmp, self.dtkey = self._get_dt_cmpkey(datetime.datetime.min)
        super()._start()

    def _prenext(self):
        """Run children, report a period change, then ``prenext`` if enabled."""
        for child in self._children:
            child._prenext()

        if self._dt_over():
            self.on_dt_over()

        if self.p._doprenext:
            self.prenext()

    def _nextstart(self):
        """Run children, report a period change, then ``nextstart``.

        ``on_dt_over`` is also called unconditionally when ``_doprenext`` is
        disabled.
        """
        for child in self._children:
            child._nextstart()

        if self._dt_over() or not self.p._doprenext:
            self.on_dt_over()

        self.nextstart()

    def _next(self):
        """Run children, report a period change, then ``next``."""
        for child in self._children:
            child._next()

        if self._dt_over():
            self.on_dt_over()

        self.next()

    def on_dt_over(self):
        """Hook called when the comparison value of the period increases.

        That is, when the strategy datetime has moved into a later period of
        the configured timeframe (for example a new week or a new month).

        Note:
            Does nothing by default; override to implement period-based
            analysis logic.
        """
        pass

    def _dt_over(self):
        """Detect a period change and update the period state.

        Returns:
            True when the comparison value computed for the current strategy
            datetime is greater than the stored one (or when none is stored);
            in that case ``dtcmp`` / ``dtkey`` are replaced and the previous
            values are kept in ``dtcmp1`` / ``dtkey1``. False otherwise, with
            no state change.
        """
        # PERFORMANCE: hot path - attributes are read once into locals
        tf = self.timeframe
        if tf == TimeFrame.NoTimeFrame:
            # A single period spanning everything: use the largest values
            dtcmp, dtkey = MAXINT, datetime.datetime.max
        else:
            dt = self.strategy.datetime.datetime()
            dtcmp, dtkey = self._get_dt_cmpkey(dt, tf)

        cur_dtcmp = self.dtcmp
        if cur_dtcmp is None or dtcmp > cur_dtcmp:
            # Shift current -> previous and store the new period atomically
            self.dtkey, self.dtkey1 = dtkey, self.dtkey
            self.dtcmp, self.dtcmp1 = dtcmp, cur_dtcmp
            return True

        return False

    def _get_dt_cmpkey(self, dt, tf=None):
        """Map ``dt`` to the comparison value and key of its period.

        Args:
            dt: datetime to classify.
            tf: timeframe to use; defaults to ``self.timeframe``. Sub-day
                timeframes are delegated to ``_get_subday_cmpkey``, which
                always reads ``self.timeframe``.

        Returns:
            ``(dtcmp, dtkey)``: ``dtcmp`` grows from one period to the next,
            ``dtkey`` marks the end of the period. ``(None, None)`` for
            ``TimeFrame.NoTimeFrame``.
        """
        if tf is None:
            tf = self.timeframe

        if tf == TimeFrame.NoTimeFrame:
            return None, None

        if tf == TimeFrame.Years:
            dtcmp = dt.year
            # NOTE: the yearly key is a ``date``; the other keys are
            # ``datetime`` instances
            dtkey = datetime.date(dt.year, 12, 31)

        elif tf == TimeFrame.Months:
            dtcmp = dt.year * 100 + dt.month
            # Key is the last calendar day of the month
            _, lastday = calendar.monthrange(dt.year, dt.month)
            dtkey = datetime.datetime(dt.year, dt.month, lastday)

        elif tf == TimeFrame.Weeks:
            isoyear, isoweek, isoweekday = dt.isocalendar()
            dtcmp = isoyear * 1000 + isoweek
            # Key is the Sunday (ISO weekday 7) closing the ISO week
            sunday = dt + datetime.timedelta(days=7 - isoweekday)
            dtkey = datetime.datetime(sunday.year, sunday.month, sunday.day)

        elif tf == TimeFrame.Days:
            dtcmp = dt.year * 10000 + dt.month * 100 + dt.day
            dtkey = datetime.datetime(dt.year, dt.month, dt.day)

        else:
            dtcmp, dtkey = self._get_subday_cmpkey(dt)

        return dtcmp, dtkey

    def _get_subday_cmpkey(self, dt):
        """Compute the comparison value/key for timeframes below one day.

        The intraday position of ``dt`` is expressed in the unit of the
        timeframe (minutes, seconds or microseconds), moved to the end of its
        ``self.compression``-sized slot and then shifted back by one unit so
        that the result is the last instant belonging to the slot.

        The unit used to decode the position is chosen with the same ``<``
        comparisons used to encode it, so every timeframe that reaches this
        method gets a result. Previously only Minutes, Seconds and
        MicroSeconds were decoded and any other value raised
        ``UnboundLocalError``. Timeframes finer than ``TimeFrame.Seconds``
        are handled with microsecond resolution (presumably this covers a
        tick timeframe, if the enumeration orders it below MicroSeconds -
        TODO confirm against ``TimeFrame``).

        Args:
            dt: datetime to classify.

        Returns:
            ``(dtcmp, dtkey)``, both the same ``datetime``: the last unit of
            the compressed slot containing ``dt``.
        """
        tf = self.timeframe
        compression = self.compression

        # Intraday position in minutes, refined to seconds / microseconds.
        # Integer arithmetic keeps the position exact.
        point = dt.hour * 60 + dt.minute
        if tf < TimeFrame.Minutes:
            point = point * 60 + dt.second
        if tf < TimeFrame.Seconds:
            point = point * 1000000 + dt.microsecond

        # Slot index (comp 5 -> 200 // 5), moved to the next boundary and
        # converted back to timeframe units
        point = (point // compression + 1) * compression

        # Split the boundary into clock parts; ``tadjust`` is one unit of the
        # resolution, used below to step back inside the slot
        if tf < TimeFrame.Seconds:
            ph, rest = divmod(point, 60 * 60 * 1000000)
            pm, rest = divmod(rest, 60 * 1000000)
            ps, pus = divmod(rest, 1000000)
            tadjust = datetime.timedelta(microseconds=1)
        elif tf < TimeFrame.Minutes:
            ph, rest = divmod(point, 60 * 60)
            pm, ps = divmod(rest, 60)
            pus = 0
            tadjust = datetime.timedelta(seconds=1)
        else:
            ph, pm = divmod(point, 60)
            ps = pus = 0
            tadjust = datetime.timedelta(minutes=1)

        # A boundary at or past midnight belongs to a following day
        extradays, ph = divmod(ph, 24)
        if extradays:
            dt += datetime.timedelta(days=int(extradays))

        # Replace the intraday parts with the boundary and step one unit
        # back so the value lies inside the slot
        dtcmp = dt.replace(
            hour=int(ph), minute=int(pm), second=int(ps), microsecond=int(pus)
        )
        dtcmp -= tadjust

        # Below one day the key is the comparison value itself
        dtkey = dtcmp
        return dtcmp, dtkey