#!/usr/bin/env python
"""Cerebro - The main engine of the Backtrader framework.
This module contains the Cerebro class, which is the central orchestrator for
backtesting and live trading operations. Cerebro manages data feeds, strategies,
brokers, analyzers, observers, and all other components of the trading system.
Key Features:
- Data feed management and synchronization
- Strategy instantiation and execution
- Broker integration for order execution
- Multi-core optimization support
- Live trading and backtesting modes
- Plotting and analysis capabilities
Example:
Basic backtest setup::
import backtrader as bt
cerebro = bt.Cerebro()
data = bt.feeds.GenericCSVData(dataname='data.csv')
cerebro.adddata(data)
cerebro.addstrategy(MyStrategy)
cerebro.broker.setcash(100000)
results = cerebro.run()
cerebro.plot()
Classes:
OptReturn: Lightweight result object for optimization runs.
Cerebro: Main backtesting/trading engine.
"""
import collections
import datetime
import itertools
import multiprocessing
import traceback
from datetime import timezone
try: # For new Python versions
collectionsAbc = collections.abc # collections.Iterable -> collections.abc.Iterable
except AttributeError: # For old Python versions
collectionsAbc = collections # collections.Iterable
from . import errors, feeds, indicator, linebuffer, observers
from .brokers import BackBroker
from .dataseries import TimeFrame
from .metabase import OwnerContext
from .parameters import ParameterDescriptor, ParameterizedBase
from .strategy import SignalStrategy, Strategy
from .timer import Timer
from .tradingcal import PandasMarketCalendar, TradingCalendarBase
from .utils import OrderedDict, date2num, num2date, tzparse
from .utils.py3 import integer_types, map, range, string_types, zip
from .writer import WriterFile
# Python 3.11+ has datetime.UTC, earlier versions use timezone.utc
UTC = timezone.utc
[文档]
class OptReturn:
    """Slim result object handed back from optimization runs.

    The class lives at module level so that it can be pickled for
    multiprocessing. It carries only the essentials of a strategy run in
    order to keep memory usage low during optimization.

    Attributes:
        params: Strategy parameters used in this optimization run.
        p: The very same object as ``params`` (short alias).
        analyzers: Analyzer results, when supplied through ``kwargs``.

    Note:
        Every extra keyword argument given to the constructor becomes an
        attribute of the instance.
    """

    def __init__(self, params, **kwargs):
        """Store the parameters and expose ``kwargs`` as attributes.

        Args:
            params: Strategy parameters used in this optimization run.
            **kwargs: Additional name/value pairs to set as attributes. They
                are applied after ``p``/``params`` and can therefore
                overwrite them.
        """
        self.p = params
        self.params = params
        for attr_name in kwargs:
            setattr(self, attr_name, kwargs[attr_name])
[文档]
class Cerebro(ParameterizedBase):
"""Params:
- ``preload`` (default: ``True``)
Whether to preload the different ``data feeds`` passed to cerebro for
the Strategies
Note: When True (default), data is loaded into memory before backtesting,
which uses more memory but significantly improves execution speed.
- ``runonce`` (default: ``True``)
Run `Indicators` in vectorized mode to speed up the entire system.
Strategies and Observers will always be run on an event-based basis
Note: When True, indicators are calculated using vectorized operations
for better performance. Strategies and observers still run event-by-event.
- ``live`` (default: ``False``)
If no data has reported itself as *live* (via the data's ``islive``
method) but the end user still wants to run in ``live`` mode, this
parameter can be set to true
This will simultaneously deactivate ``preload`` and ``runonce``. It
will have no effect on memory saving schemes.
Note: Setting to True forces live mode behavior, disabling preload and
runonce optimizations, which slows down backtesting.
- ``maxcpus`` (default: None -> all available cores)
How many cores to use simultaneously for optimization
Note: Set to number of CPU cores minus 1 to avoid system overload.
Use None (default) to use all available cores.
- ``stdstats`` (default: ``True``)
If True, default Observers will be added: Broker (Cash and Value),
Trades and BuySell
Note: These observers are used for plotting. Set to False if not needed.
- ``oldbuysell`` (default: ``False``)
If ``stdstats`` is ``True`` and observers are getting automatically
added, this switch controls the main behavior of the ``BuySell``
observer
- ``False``: use the modern behavior in which the buy / sell signals
are plotted below / above the low / high prices respectively to avoid
cluttering the plot
- ``True``: use the deprecated behavior in which the buy / sell signals
are plotted where the average price of the order executions for the
given moment in time is. This will, of course, be on top of an OHLC bar
or on a Line on Close bar, making the plot harder to read.
Note: False (modern) plots signals outside the price bars for clarity.
True (old) plots signals at execution price, overlapping with bars.
- ``oldtrades`` (default: ``False``)
If ``stdstats`` is ``True`` and observers are getting automatically
added, this switch controls the main behavior of the ``Trades``
observer
- ``False``: use the modern behavior in which trades for all datas are
plotted with different markers
- ``True``: use the old Trades observer which plots the trades with the
same markers, differentiating only if they are positive or negative
Note: False uses different markers for different trades.
True uses same markers, only distinguishing positive/negative.
- ``exactbars`` (default: ``False``)
With the default value, each and every value stored in a line is kept in
memory
Possible values:
- ``True`` or ``1``: all "lines" objects reduce memory usage to the
automatically calculated minimum period.
If a Simple Moving Average has a period of 30, the underlying data
will have always a running buffer of 30 bars to allow the
calculation of the Simple Moving Average
- This setting will deactivate ``preload`` and ``runonce``
- Using this setting also deactivates **plotting**
- ``-1``: datafeeds and indicators/operations at strategy level will
keep all data in memory.
For example: a ``RSI`` internally uses the indicator ``UpDay`` to
make calculations. This subindicator will not keep all data in
memory
- This allows keeping ``plotting`` and ``preloading`` active.
- ``runonce`` will be deactivated
- ``-2``: data feeds and indicators kept as attributes of the
strategy will keep all points in memory.
For example: a ``RSI`` internally uses the indicator ``UpDay`` to
make calculations. This subindicator will not keep all data in
memory
If in the ``__init__`` something like
``a = self.data.close - self.data.high`` is defined, then ``a``
will not keep all data in memory
- This allows keeping ``plotting`` and ``preloading`` active.
- ``runonce`` will be deactivated
Note on exactbars values:
- True/1: Minimum memory, disables preload/runonce/plotting
- -1: Keeps data/indicators but not sub-indicator internals, disables runonce
- -2: Keeps strategy-level data/indicators, sub-indicators not using self are discarded
- ``objcache`` (default: ``False``)
Experimental option to implement a cache of lines objects and reduce
the amount of them. Example from UltimateOscillator:
bp = self.data.close - TrueLow(self.data)
tr = TrueRange(self.data) # -> creates another TrueLow(self.data)
If this is `True`, the second ``TrueLow(self.data)`` inside ``TrueRange``
matches the signature of the one in the ``bp`` calculation. It will be
reused.
Corner cases may happen in which this drives a line object off its
minimum period and breaks things, and it is therefore disabled.
Note: When True, identical indicator calculations are cached and reused
to reduce computation. Disabled by default due to edge cases.
- ``writer`` (default: ``False``)
If set to ``True`` a default WriterFile will be created which will
print to stdout. It will be added to the strategy (in addition to any
other writers added by the user code)
Note: Outputs trading information to stdout. Custom logging in strategy
is usually preferred for more control.
- ``tradehistory`` (default: ``False``)
If set to ``True``, it will activate update event logging in each trade
for all strategies. This can also be achieved on a per-strategy
basis with the strategy method ``set_tradehistory``
Note: Enables trade update logging for all strategies. Can also be
enabled per-strategy using set_tradehistory method.
- ``optdatas`` (default: ``True``)
If ``True`` and optimizing (and the system can ``preload`` and use
``runonce``), data preloading will be done only once in the main process
to save time and resources.
The tests show an approximate ``20%`` speed-up moving from a sample
execution in ``83`` seconds to ``66``
Note: When True with preload/runonce, data is preloaded once in the
main process and shared across optimization workers (~20% speedup).
- ``optreturn`` (default: ``True``)
If `True`, the optimization results will not be full ``Strategy``
objects (and all *datas*, *indicators*, *observers* ...) but object
with the following attributes (same as in ``Strategy``):
- ``params`` (or ``p``) the strategy had for the execution
- ``analyzers`` the strategy has executed
On most occasions, only the *analyzers* and with which *params* are
the things needed to evaluate the performance of a strategy. If
detailed analysis of the generated values for (for example)
*indicators* is needed, turn this off
The tests show a 13% - 15% improvement in execution time. Combined
with `optdatas` the total gain increases to a total speed-up of
`32%` in an optimization run.
Note: Returns only params and analyzers during optimization, discarding
data/indicators/observers for ~15% speedup (32% combined with optdatas).
- ``oldsync`` (default: ``False``)
Starting with release 1.9.0.99, the synchronization of multiple datas
(same or different timeframes) has been changed to allow datas of
different lengths.
If the old behavior with data0 as the master of the system is wished,
set this parameter to true
Note: False allows data feeds of different lengths.
True uses data0 as master (legacy behavior).
- ``tz`` (default: ``None``)
Adds a global timezone for strategies. The argument ``tz`` can be
- ``None``: in this case the datetime displayed by strategies will be
in UTC, which has always been the standard behavior
- ``pytz`` instance. It will be used as such to convert UTC times to
the chosen timezone
- ``string``. Instantiating a ``pytz`` instance will be attempted.
- ``integer``. Use, for the strategy, the same timezone as the
corresponding ``data`` in the ``self.datas`` iterable (``0`` would
use the timezone from ``data0``)
Note: None=UTC, pytz instance converts from UTC, string creates pytz,
integer uses timezone from corresponding data feed index.
- ``cheat_on_open`` (default: ``False``)
The ``next_open`` method of strategies will be called. This happens
before ``next`` and before the broker has had a chance to evaluate
orders. The indicators have not yet been recalculated. This allows
issuing an order which takes into account the indicators of the previous
day but uses the ``open`` price for stake calculations
For cheat_on_open order execution, it is also necessary to make the
call ``cerebro.broker.set_coo(True)`` or instantiate a broker with
``BackBroker(coo=True)`` (where *coo* stands for cheat-on-open) or set
the ``broker_coo`` parameter to ``True``. Cerebro will do it
automatically unless disabled below.
Note: Enables using next bar's open price for position sizing.
Useful for precise capital allocation. Requires broker_coo=True.
- ``broker_coo`` (default: ``True``)
This will automatically invoke the ``set_coo`` method of the broker
with ``True`` to activate ``cheat_on_open`` execution. Will only do it
if ``cheat_on_open`` is also ``True``
Note: Works together with cheat_on_open parameter.
- ``quicknotify`` (default: ``False``)
Broker notifications are delivered right before the delivery of the
*next* prices. For backtesting, this has no implications, but with live
brokers, a notification can take place long before the bar is
delivered. When set to ``True`` notifications will be delivered as soon
as possible (see ``qcheck`` in live feeds)
Set to ``False`` for compatibility. May be changed to ``True``
Note: False delays notifications until next bar. True sends immediately.
Mainly relevant for live trading.
"""
# Parameter descriptors using new system.
# Each assignment below declares one Cerebro parameter; the ``default``,
# ``type_`` (where given) and ``doc`` values are handed to
# ParameterDescriptor. How ParameterDescriptor uses ``type_`` is defined
# outside this file.
preload = ParameterDescriptor(
    default=True, type_=bool, doc="Whether to preload the different data feeds"
)
runonce = ParameterDescriptor(default=True, type_=bool, doc="Run Indicators in vectorized mode")
# No ``type_`` is declared for maxcpus; its default is None.
maxcpus = ParameterDescriptor(default=None, doc="How many cores to use for optimization")
stdstats = ParameterDescriptor(default=True, type_=bool, doc="Add default Observers")
oldbuysell = ParameterDescriptor(
    default=False, type_=bool, doc="Use old BuySell observer behavior"
)
oldtrades = ParameterDescriptor(
    default=False, type_=bool, doc="Use old Trades observer behavior"
)
# NOTE(review): ``lookahead`` is declared here but is not described in the
# class docstring -- confirm its meaning before relying on it.
lookahead = ParameterDescriptor(default=0, type_=int, doc="Lookahead parameter")
# No ``type_`` is declared for exactbars; the class docstring lists
# True/1, -1 and -2 as possible values besides the default False.
exactbars = ParameterDescriptor(default=False, doc="Memory usage control for lines objects")
optdatas = ParameterDescriptor(
    default=True, type_=bool, doc="Optimize data preloading during optimization"
)
optreturn = ParameterDescriptor(
    default=True, type_=bool, doc="Return simplified objects during optimization"
)
objcache = ParameterDescriptor(
    default=False, type_=bool, doc="Cache lines objects to reduce memory"
)
live = ParameterDescriptor(default=False, type_=bool, doc="Run in live mode")
writer = ParameterDescriptor(default=False, type_=bool, doc="Add a default WriterFile")
tradehistory = ParameterDescriptor(
    default=False, type_=bool, doc="Activate trade history logging"
)
oldsync = ParameterDescriptor(default=False, type_=bool, doc="Use old synchronization behavior")
# No ``type_`` is declared for tz; the class docstring allows None, a pytz
# instance, a string or an integer.
tz = ParameterDescriptor(default=None, doc="Global timezone for strategies")
cheat_on_open = ParameterDescriptor(
    default=False, type_=bool, doc="Enable cheat-on-open execution"
)
broker_coo = ParameterDescriptor(
    default=True, type_=bool, doc="Auto-activate broker cheat-on-open"
)
quicknotify = ParameterDescriptor(
    default=False, type_=bool, doc="Deliver broker notifications quickly"
)
[文档]
def __init__(self, **kwargs):
    """Set up a Cerebro instance and apply parameter overrides.

    Args:
        **kwargs: Parameter overrides (preload, runonce, maxcpus, etc.).
            They are forwarded to the base-class initializer; afterwards
            every key that names a known parameter is also written onto
            ``self.params``.
    """
    super().__init__(**kwargs)

    # Attributes that start out unset.
    for unset_attr in (
        "_timerscheat",
        "_timers",
        "runningstrats",
        "runstrats",
        "writers_csv",
        "runwriters",
        "_dopreload",
        "_dorunonce",
        "_exactbars",
        "_event_stop",
    ):
        setattr(self, unset_attr, None)

    # Mode switches: live trading, data replay, optimization.
    self._dolive = False
    self._doreplay = False
    self._dooptimize = False

    # Registries filled by the add*/opt* methods.
    self.stores = []  # data stores
    self.feeds = []  # data feeds
    self.datas = []  # data objects
    self.datasbyname = collections.OrderedDict()  # data lookup by name
    self.strats = []  # strategy classes/instances
    self.optcbs = []  # optimization callbacks
    self.observers = []  # observer classes
    self.analyzers = []  # analyzer classes
    self.indicators = []  # indicator classes
    self.sizers = {}  # position sizers
    self.writers = []  # output writers
    self.storecbs = []  # store callbacks
    self.datacbs = []  # data callbacks
    self.signals = []  # signal definitions

    # Signal strategy setup: (class, args, kwargs) plus behaviour flags.
    self._signal_strat = (None, None, None)
    self._signal_concurrent = False  # allow concurrent signals
    self._signal_accumulate = False  # allow accumulating positions

    # Counters and collaborating objects.
    self._dataid = itertools.count(1)  # ids handed out to data feeds
    self._broker = BackBroker()  # default broker
    self._broker.cerebro = self  # let the broker reach back to cerebro
    self._tradingcal = None  # trading calendar
    self._pretimers = []  # timers created before running
    self._ohistory = []  # order history
    self._fhistory = None  # fund history

    # Write every keyword that names a known parameter onto the params object.
    known_params = self.params._getkeys()
    for key in kwargs:
        if key not in known_params:
            continue
        setattr(self.params, key, kwargs[key])
[文档]
@staticmethod
def iterize(iterable):
    """Return a list in which every element of ``iterable`` is iterable.

    Strings are treated as scalars: a string element is wrapped in a
    one-element tuple, exactly like any non-iterable element. Elements that
    are already (non-string) iterables are passed through untouched.

    Args:
        iterable: Input iterable whose elements may not be iterable.

    Returns:
        list: New list where each element is guaranteed to be iterable.
    """

    def _as_iterable(item):
        # Strings are iterable but must be handled as single values.
        if isinstance(item, string_types):
            return (item,)
        if isinstance(item, collectionsAbc.Iterable):
            return item
        return (item,)

    return [_as_iterable(item) for item in iterable]
[文档]
def set_fund_history(self, fund):
    """Record a fund history on this instance.

    The value is stored as-is in ``self._fhistory``; nothing is validated or
    copied here.

    Args:
        fund: An iterable (ex: list, tuple, iterator, generator) in which
            each element is itself an iterable (with length) holding

            ``[datetime, share_value, net_asset_value]``

            where:

            - ``datetime`` is a python ``date/datetime`` instance or a
              string with format YYYY-MM-DD[THH:MM:SS[.us]] where the
              elements in brackets are optional
            - ``share_value`` is a float/integer
            - ``net_asset_value`` is a float/integer

            **Note**: it must be sorted (or produce sorted elements) by
            datetime ascending.
    """
    self._fhistory = fund
[文档]
def add_order_history(self, orders, notify=True):
    """Queue a history of orders together with its ``notify`` flag.

    The pair ``(orders, notify)`` is appended to ``self._ohistory``. Per the
    original description, such a history is meant to be executed directly in
    the broker for performance evaluation.

    Args:
        orders: An iterable (ex: list, tuple, iterator, generator) in which
            each element is itself an iterable (with length) in one of two
            formats:

            ``[datetime, size, price]`` or ``[datetime, size, price, data]``

            **Note**: it must be sorted (or produce sorted elements) by
            datetime ascending.

            where:

            - ``datetime`` is a python ``date/datetime`` instance or a
              string with format YYYY-MM-DD[THH:MM:SS[.us]] where the
              elements in brackets are optional
            - ``size`` is an integer (positive to *buy*, negative to *sell*)
            - ``price`` is a float/integer
            - ``data`` if present can take any of the following values

              - *None* - The 1st data feed will be used as target
              - *integer* - The data with that index (insertion order in
                **Cerebro**) will be used
              - *string* - a data with that name, assigned for example with
                ``cerebro.adddata(data, name=value)``, will be the target

        notify: (default: *True*) If ``True``, the first strategy inserted
            in the system will be notified of the artificial orders created
            following the information from each order in ``orders``.

    **Note**: Implicit in the description is the need to add a data feed
    which is the target of the orders. This is, for example, needed by
    analyzers which track, for example, the returns.
    """
    history_entry = (orders, notify)
    self._ohistory.append(history_entry)
[文档]
def notify_timer(self, timer, when, *args, **kwargs):
    """Hook that receives timer notifications; does nothing by default.

    Args:
        timer: The timer that was returned by ``add_timer``.
        when: The calling time. The actual time can be later, because the
            system may not have been able to call the timer before; this
            value is the timer value and not the system time.
        *args: Additional positional arguments passed to ``add_timer``.
        **kwargs: Additional keyword arguments passed to ``add_timer``.
    """
    return None
def _add_timer(
    self,
    owner,
    when,
    offset=datetime.timedelta(),
    repeat=datetime.timedelta(),
    weekdays=None,
    weekcarry=False,
    monthdays=None,
    monthcarry=True,
    allow=None,
    tzdata=None,
    strats=False,
    cheat=False,
    *args,
    **kwargs,
):
    """Create a timer (not started yet) and register it as a pre-run timer.

    Can be called by cerebro instances or by other objects which can access
    cerebro.

    Args:
        owner: Object recorded as the owner of the timer.
        when: Reference time of the timer.
        offset: ``datetime.timedelta`` used to offset ``when``.
        repeat: ``datetime.timedelta`` for repeated calls.
        weekdays: Iterable of week days on which the timer may fire.
            ``None`` (default) is replaced by a fresh empty list.
        weekcarry: Passed through to ``Timer``.
        monthdays: Iterable of month days on which the timer may fire.
            ``None`` (default) is replaced by a fresh empty list.
        monthcarry: Passed through to ``Timer``.
        allow: Passed through to ``Timer``.
        tzdata: Passed through to ``Timer``.
        strats: Passed through to ``Timer``.
        cheat: Passed through to ``Timer``.
        *args: Extra positional arguments handed to ``Timer``.
        **kwargs: Extra keyword arguments handed to ``Timer``.

    Returns:
        The newly created ``Timer`` instance. Its ``tid`` is the number of
        timers registered before it.
    """
    # A list literal as default would be one object shared by every call;
    # build a new list per call instead.
    weekdays = [] if weekdays is None else weekdays
    monthdays = [] if monthdays is None else monthdays

    timer = Timer(
        *args,
        tid=len(self._pretimers),
        owner=owner,
        strats=strats,
        when=when,
        offset=offset,
        repeat=repeat,
        weekdays=weekdays,
        weekcarry=weekcarry,
        monthdays=monthdays,
        monthcarry=monthcarry,
        allow=allow,
        tzdata=tzdata,
        cheat=cheat,
        **kwargs,
    )
    self._pretimers.append(timer)
    return timer
[文档]
def add_timer(
    self,
    when,
    offset=datetime.timedelta(),
    repeat=datetime.timedelta(),
    weekdays=None,
    weekcarry=False,
    monthdays=None,
    monthcarry=True,
    allow=None,
    tzdata=None,
    strats=False,
    cheat=False,
    *args,
    **kwargs,
):
    """Schedule a timer to invoke ``notify_timer``.

    Args:
        when: Can be

            - ``datetime.time`` instance (see below ``tzdata``)
            - ``bt.timer.SESSION_START`` to reference a session start
            - ``bt.timer.SESSION_END`` to reference a session end

        offset: Must be a ``datetime.timedelta`` instance. Used to offset
            the value ``when``. It has a meaningful use in combination with
            ``SESSION_START`` and ``SESSION_END``, to indicate things like a
            timer being called ``15 minutes`` after the session starts.
        repeat: Must be a ``datetime.timedelta`` instance. Indicates if
            after a first call, further calls will be scheduled within the
            same session at the scheduled ``repeat`` delta. Once the timer
            goes over the end of the session, it is reset to the original
            value for ``when``.
        weekdays: A **sorted** iterable with integers indicating on which
            days (iso codes, Monday is 1, Sunday is 7) the timers can be
            actually invoked. If not specified (``None``), an empty list is
            used and the timer will be active on all days.
        weekcarry: (default: ``False``). If ``True`` and the weekday was not
            seen (ex: trading holiday), the timer will be executed on the
            next day (even if in a new week).
        monthdays: A **sorted** iterable with integers indicating on which
            days of the month a timer has to be executed. For example,
            always on day *15* of the month. If not specified (``None``), an
            empty list is used and the timer will be active on all days.
        monthcarry: (default: ``True``). If the day was not seen (weekend,
            trading holiday), the timer will be executed on the next
            available day.
        allow: (default: ``None``). A callback which receives a
            ``datetime.date`` instance and returns ``True`` if the date is
            allowed for timers or else returns ``False``.
        tzdata: Either ``None`` (default), a ``pytz`` instance or a
            ``data feed`` instance.

            ``None``: ``when`` is interpreted at face value (which
            translates to handling it as if it is UTC even if it's not)

            ``pytz`` instance: ``when`` will be interpreted as being
            specified in the local time specified by the timezone instance.

            ``data feed`` instance: ``when`` will be interpreted as being
            specified in the local time specified by the ``tz`` parameter of
            the data feed instance.

            **Note**: If ``when`` is either ``SESSION_START`` or
            ``SESSION_END`` and ``tzdata`` is ``None``, the first *data
            feed* in the system (aka ``self.data0``) will be used as the
            reference to find out the session times.
        strats: (default: ``False``) call also the ``notify_timer`` of
            strategies.
        cheat: (default ``False``) if ``True`` the timer will be called
            before the broker has a chance to evaluate the orders. This
            opens the chance to issue orders based on opening price, for
            example, right before the session starts.
        *args: Any extra args will be passed to ``notify_timer``.
        **kwargs: Any extra kwargs will be passed to ``notify_timer``.

    Returns:
        The created timer.
    """
    # A list literal as default would be one object shared by every call;
    # build a new list per call instead.
    weekdays = [] if weekdays is None else weekdays
    monthdays = [] if monthdays is None else monthdays

    # Forward positionally, in the parameter order of ``_add_timer``
    # (owner, when, offset, ..., cheat). Passing these as keywords together
    # with ``*args`` would bind the first extra positional argument to
    # ``owner`` and fail with "multiple values for argument 'owner'".
    return self._add_timer(
        self,
        when,
        offset,
        repeat,
        weekdays,
        weekcarry,
        monthdays,
        monthcarry,
        allow,
        tzdata,
        strats,
        cheat,
        *args,
        **kwargs,
    )
[文档]
def addtz(self, tz):
    """Set the global timezone for strategies (same as the ``tz`` parameter).

    The value is written to ``self.p.tz``.

    Args:
        tz: One of

            - ``None``: in this case the datetime displayed by strategies
              will be in UTC, which has always been the standard behavior
            - ``pytz`` instance. It will be used as such to convert UTC
              times to the chosen timezone
            - ``string``. Instantiating a ``pytz`` instance will be
              attempted.
            - ``integer``. Use, for the strategy, the same timezone as the
              corresponding ``data`` in the ``self.datas`` iterable (``0``
              would use the timezone from ``data0``)
    """
    params = self.p
    params.tz = tz
[文档]
def addcalendar(self, cal):
    """Install a global trading calendar for the system.

    Individual data feeds may have separate calendars which override the
    global one.

    Args:
        cal: May be

            - a string, or an object exposing a ``valid_days`` attribute:
              it is wrapped in a ``PandasMarketCalendar``
            - a subclass of ``TradingCalendarBase`` (not an instance): it is
              instantiated without arguments
            - anything else (for example a calendar instance): it is stored
              unchanged
    """
    needs_pandas_wrapper = isinstance(cal, string_types) or hasattr(cal, "valid_days")
    if needs_pandas_wrapper:
        self._tradingcal = PandasMarketCalendar(calendar=cal)
        return

    try:
        is_calendar_class = issubclass(cal, TradingCalendarBase)
        if is_calendar_class:
            cal = cal()
    except TypeError:
        # issubclass rejects non-class arguments: cal is already an instance.
        pass

    self._tradingcal = cal
[文档]
def add_signal(self, sigtype, sigcls, *sigargs, **sigkwargs):
    """Register a signal to be used with SignalStrategy.

    Args:
        sigtype: Type of the signal.
        sigcls: Signal class.
        *sigargs: Positional arguments stored alongside the class.
        **sigkwargs: Keyword arguments stored alongside the class.
    """
    signal_spec = (sigtype, sigcls, sigargs, sigkwargs)
    self.signals.append(signal_spec)
[文档]
def signal_strategy(self, stratcls, *args, **kwargs):
    """Choose the SignalStrategy subclass that will receive signals.

    Args:
        stratcls: The strategy class.
        *args: Positional arguments stored alongside the class.
        **kwargs: Keyword arguments stored alongside the class.
    """
    strategy_spec = (stratcls, args, kwargs)
    self._signal_strat = strategy_spec
[文档]
def signal_concurrent(self, onoff):
    """Switch concurrent orders on or off while signals are pending.

    Args:
        onoff: Value stored in ``self._signal_concurrent``.
    """
    setattr(self, "_signal_concurrent", onoff)
[文档]
def signal_accumulate(self, onoff):
    """Switch position accumulation for signals on or off.

    If signals are added to the system and the ``accumulate`` value is set
    to True, entering the market when already in the market will be allowed
    to increase a position.

    Args:
        onoff: Value stored in ``self._signal_accumulate``.
    """
    setattr(self, "_signal_accumulate", onoff)
[文档]
def addstore(self, store):
    """Register a Store instance, ignoring stores that are already known.

    Args:
        store: The store to append to ``self.stores``.
    """
    if store in self.stores:
        return
    self.stores.append(store)
[文档]
def addwriter(self, wrtcls, *args, **kwargs):
    """Register a ``Writer`` class; instantiation happens at ``run`` time.

    Args:
        wrtcls: The writer class.
        *args: Positional arguments stored alongside the class.
        **kwargs: Keyword arguments stored alongside the class.
    """
    writer_spec = (wrtcls, args, kwargs)
    self.writers.append(writer_spec)
[文档]
def addsizer(self, sizercls, *args, **kwargs):
    """Set the ``Sizer`` class (and args) used by default for any strategy.

    The entry is stored in ``self.sizers`` under the key ``None``, replacing
    any previous default.

    Args:
        sizercls: The sizer class.
        *args: Positional arguments stored alongside the class.
        **kwargs: Keyword arguments stored alongside the class.
    """
    default_key = None
    self.sizers[default_key] = (sizercls, args, kwargs)
[文档]
def addsizer_byidx(self, idx, sizercls, *args, **kwargs):
    """Set the ``Sizer`` class for the strategy referenced by ``idx``.

    Args:
        idx: A reference compatible with the one returned by
            ``addstrategy``. Only the strategy referenced by ``idx`` will
            receive this sizer.
        sizercls: The sizer class.
        *args: Positional arguments stored alongside the class.
        **kwargs: Keyword arguments stored alongside the class.
    """
    sizer_spec = (sizercls, args, kwargs)
    self.sizers[idx] = sizer_spec
[文档]
def addindicator(self, indcls, *args, **kwargs):
    """Register an Indicator class to be instantiated at run time.

    Args:
        indcls: The indicator class.
        *args: Positional arguments stored alongside the class.
        **kwargs: Keyword arguments stored alongside the class.
    """
    indicator_spec = (indcls, args, kwargs)
    self.indicators.append(indicator_spec)
[文档]
def addanalyzer(self, ancls, *args, **kwargs):
    """Register an Analyzer class to be instantiated at run time.

    Args:
        ancls: The analyzer class.
        *args: Positional arguments stored alongside the class.
        **kwargs: Keyword arguments stored alongside the class.
    """
    analyzer_spec = (ancls, args, kwargs)
    self.analyzers.append(analyzer_spec)
[文档]
def addobserver(self, obscls, *args, **kwargs):
    """Register an ``Observer`` class; instantiation happens at ``run`` time.

    The entry is stored with a leading ``False`` flag, in contrast to
    ``addobservermulti`` which stores ``True``.

    Args:
        obscls: The observer class.
        *args: Positional arguments stored alongside the class.
        **kwargs: Keyword arguments stored alongside the class.
    """
    per_data = False
    self.observers.append((per_data, obscls, args, kwargs))
[文档]
def addobservermulti(self, obscls, *args, **kwargs):
    """Register an ``Observer`` class to be added once per "data".

    A use case is a buy/sell observer that observes individual data. A
    counter-example is the CashValue, which observes system-wide values.
    The entry is stored with a leading ``True`` flag.

    Args:
        obscls: The observer class.
        *args: Positional arguments stored alongside the class.
        **kwargs: Keyword arguments stored alongside the class.
    """
    per_data = True
    self.observers.append((per_data, obscls, args, kwargs))
[文档]
def addstorecb(self, callback):
    """Register a callback for messages otherwise handled by ``notify_store``.

    The signature of the callback must support the following:

    - callback(msg, *args, **kwargs)

    The actual ``msg``, ``*args`` and ``**kwargs`` received are
    implementation defined (depend entirely on the *data/broker/store*) but
    in general one should expect them to be *printable* to allow for
    reception and experimentation.

    Args:
        callback: Callable appended to ``self.storecbs``.
    """
    registered = self.storecbs
    registered.append(callback)
def _notify_store(self, msg, *args, **kwargs):
    """Dispatch one store notification.

    Every callback in ``self.storecbs`` is called first, in order; after
    that ``self.notify_store`` receives the same arguments.

    Args:
        msg: The notification message.
        *args: Positional arguments of the notification.
        **kwargs: Keyword arguments of the notification.
    """
    for store_callback in self.storecbs:
        store_callback(msg, *args, **kwargs)

    own_hook = self.notify_store
    own_hook(msg, *args, **kwargs)
[文档]
def notify_store(self, msg, *args, **kwargs):
    """Hook that receives store notifications in cerebro; no-op by default.

    This method can be overridden in ``Cerebro`` subclasses.

    The actual ``msg``, ``*args`` and ``**kwargs`` received are
    implementation defined (depend entirely on the *data/broker/store*) but
    in general one should expect them to be *printable* to allow for
    reception and experimentation.

    Args:
        msg: The notification message.
        *args: Positional arguments of the notification.
        **kwargs: Keyword arguments of the notification.
    """
    return None
def _storenotify(self):
    """Drain the notifications of every store and pass them on.

    Each notification is a ``(msg, args, kwargs)`` triple obtained from
    ``store.get_notifications()``. It goes first to ``self._notify_store``
    and then to ``notify_store`` of every strategy in
    ``self.runningstrats``.
    """
    for store in self.stores:
        for msg, args, kwargs in store.get_notifications():
            self._notify_store(msg, *args, **kwargs)
            for strategy in self.runningstrats:
                strategy.notify_store(msg, *args, **kwargs)
[文档]
def adddatacb(self, callback):
    """Register a callback for messages otherwise handled by ``notify_data``.

    The signature of the callback must support the following:

    - callback(data, status, *args, **kwargs)

    The actual ``*args`` and ``**kwargs`` received are implementation
    defined (depend entirely on the *data/broker/store*), but in general one
    should expect them to be *printable* to allow for reception and
    experimentation.

    Args:
        callback: Callable appended to ``self.datacbs``.
    """
    registered = self.datacbs
    registered.append(callback)
def _datanotify(self):
    """Drain the notifications of every data feed and pass them on.

    Each notification is a ``(status, args, kwargs)`` triple obtained from
    ``data.get_notifications()``. It goes first to ``self._notify_data`` and
    then to ``notify_data`` of every strategy in ``self.runningstrats``.
    """
    for feed in self.datas:
        for status, args, kwargs in feed.get_notifications():
            self._notify_data(feed, status, *args, **kwargs)
            for strategy in self.runningstrats:
                strategy.notify_data(feed, status, *args, **kwargs)
def _notify_data(self, data, status, *args, **kwargs):
    """Dispatch one data notification.

    Every callback in ``self.datacbs`` is called first, in order; after that
    ``self.notify_data`` receives the same arguments.

    Args:
        data: The data feed the notification belongs to.
        status: The notified status.
        *args: Positional arguments of the notification.
        **kwargs: Keyword arguments of the notification.
    """
    for data_callback in self.datacbs:
        data_callback(data, status, *args, **kwargs)

    own_hook = self.notify_data
    own_hook(data, status, *args, **kwargs)
[文档]
def notify_data(self, data, status, *args, **kwargs):
    """Hook that receives data notifications in cerebro; no-op by default.

    This method can be overridden in ``Cerebro`` subclasses.

    The actual ``*args`` and ``**kwargs`` received are implementation
    defined (depend entirely on the *data/broker/store*), but in general one
    should expect them to be *printable* to allow for reception and
    experimentation.

    Args:
        data: The data feed the notification belongs to.
        status: The notified status.
        *args: Positional arguments of the notification.
        **kwargs: Keyword arguments of the notification.
    """
    return None
[文档]
def adddata(self, data, name=None):
    """Register a ``Data Feed`` instance with cerebro.

    Args:
        data: The data feed instance to add.
        name: If not None, it is put into ``data._name`` (meant for
            decoration/plotting purposes) and ``data.name``.

    Returns:
        The same ``data`` object that was passed in.
    """
    if name is not None:
        data._name = data.name = name

    # Give the feed its id and point it back at this cerebro.
    data._id = next(self._dataid)
    data.setenvironment(self)

    # Keep the feed both in insertion order and reachable by name.
    self.datas.append(data)
    self.datasbyname[data._name] = data

    # Track the feed container once; a falsy container is skipped.
    feed = data.getfeed()
    skip_feed = not feed or feed in self.feeds
    if not skip_feed:
        self.feeds.append(feed)

    # A single live data feed switches the whole run to live mode.
    if data.islive():
        self._dolive = True

    return data
[文档]
def chaindata(self, *args, **kwargs):
    """Chain several data feeds into one and add the result to cerebro.

    Args:
        *args: The data feeds to chain, handed positionally to
            ``feeds.Chainer``.
        **kwargs: Only ``name`` is read. If it is passed and not None, it is
            put into ``data._name`` which is meant for decoration/plotting
            purposes. If None, the ``_dataname`` of the first data is used.

    Returns:
        The created ``feeds.Chainer`` instance.
    """
    chain_name = kwargs.pop("name", None)
    if chain_name is None:
        first_feed = args[0]
        chain_name = first_feed._dataname

    chained = feeds.Chainer(*args, dataname=chain_name)
    self.adddata(chained, name=chain_name)
    return chained
[文档]
def rolloverdata(self, *args, **kwargs):
    """Combine several data feeds with ``feeds.RollOver`` and add the result.

    Args:
        *args: The data feeds, handed positionally to ``feeds.RollOver``.
        **kwargs: If ``name`` is passed and is not None, it is put into
            ``data._name`` which is meant for decoration/plotting purposes.
            If None, the ``_dataname`` of the first data is used. Any other
            kwargs are passed to the RollOver class.

    Returns:
        The created ``feeds.RollOver`` instance.
    """
    rollover_name = kwargs.pop("name", None)
    if rollover_name is None:
        first_feed = args[0]
        rollover_name = first_feed._dataname

    rolled = feeds.RollOver(*args, dataname=rollover_name, **kwargs)
    self.adddata(rolled, name=rollover_name)
    return rolled
[文档]
def replaydata(self, dataname, name=None, **kwargs):
    """Add a ``Data Feed`` to be replayed by the system.

    If the very same object is already present in ``self.datas``, a clone of
    it is replayed and added instead.

    Args:
        dataname: The data feed to replay.
        name: If not None, it will be put into ``data._name`` which is meant
            for decoration/plotting purposes.
        **kwargs: Any other kwargs like ``timeframe``, ``compression``,
            ``todate`` which are supported by the replay filter are passed
            transparently to ``replay``.

    Returns:
        The data feed that was added (the clone, if one was made).
    """
    feed = dataname
    for known in self.datas:
        if known is feed:
            # Already registered: work on a clone, not the original.
            feed = feed.clone()
            break

    feed.replay(**kwargs)
    self.adddata(feed, name=name)
    self._doreplay = True
    return feed
[文档]
def resampledata(self, dataname, name=None, **kwargs):
    """
    Add a ``Data Feed`` that will be resampled by the system.

    If the very same feed object has already been added, a clone of it is
    resampled and added instead.

    ``name`` is forwarded to ``adddata``.

    Any other keyword argument (``timeframe``, ``compression``, ``todate``,
    ...) is passed untouched to the ``resample`` method of the feed.

    Returns the feed which was actually added (the clone, if one was made).
    """
    already_present = any(dataname is known for known in self.datas)
    feed = dataname.clone() if already_present else dataname

    feed.resample(**kwargs)
    self.adddata(feed, name=name)
    # resampling raises the same flag as replaying does
    self._doreplay = True
    return feed
[文档]
def optcallback(self, cb):
    """
    Register ``cb`` in the list of optimization callbacks.

    The callbacks are invoked during an optimization each time a set of
    strategies has finished running.

    The signature: cb(strategy)
    """
    registered = self.optcbs
    registered.append(cb)
[文档]
def optstrategy(self, strategy, *args, **kwargs):
    """
    Add a ``Strategy`` class to the mix for optimization. Instantiation
    happens during ``run`` time.

    args and kwargs MUST BE iterables holding the values to try.

    Example: if a Strategy accepts a parameter ``period``, the call for
    optimization purposes looks like:

      - cerebro.optstrategy(MyStrategy, period=(15, 25))

    which runs an optimization for the values 15 and 25. Whereas

      - cerebro.optstrategy(MyStrategy, period=range(15, 25))

    runs MyStrategy with ``period`` values 15 -> 25 (25 not included,
    because ranges are semi-open in Python)

    A parameter which is passed but shall not be optimized looks like:

      - cerebro.optstrategy(MyStrategy, period=(15,))

    i.e. it is still an iterable ... with just one element.

    Values are passed through ``iterize``, so a call like:

      - cerebro.optstrategy(MyStrategy, period=15)

    is wrapped into a pseudo-iterable if possible.
    """
    self._dooptimize = True

    # every combination of the positional values
    positional = itertools.product(*self.iterize(args))

    # every combination of the keyword values, re-paired with their names
    names = list(kwargs)
    value_combos = itertools.product(*self.iterize(kwargs.values()))
    keyword = (dict(zip(names, combo)) for combo in value_combos)

    # one (strategy, args, kwargs) triple per overall combination
    self.strats.append(itertools.product([strategy], positional, keyword))
[文档]
def addstrategy(self, strategy, *args, **kwargs):
    """
    Add a ``Strategy`` class to the mix for a single pass run.
    Instantiation happens during ``run`` time.

    ``args`` and ``kwargs`` are stored as they are and handed to the
    strategy during instantiation.

    Returns the index of the new entry, with which the addition of other
    objects (like sizers) can be referenced.
    """
    entry = (strategy, args, kwargs)
    # stored as a one-element list: a single combination to run
    self.strats.append([entry])
    return len(self.strats) - 1
[文档]
def setbroker(self, broker):
    """
    Install ``broker`` as the broker instance used by this object,
    replacing the one previously held.

    The broker gets a back-reference to this object in its ``cerebro``
    attribute.

    Returns the very same ``broker``.
    """
    self._broker = broker
    # let the broker reach back to its owner
    broker.cerebro = self
    return broker
[文档]
def getbroker(self):
    """
    Return the broker instance currently in use.

    The same value is reachable through the ``broker`` property.
    """
    current = self._broker
    return current
# Read/write ``broker`` attribute: reads go through getbroker, writes through setbroker
broker = property(getbroker, setbroker)
[文档]
def plot(
    self,
    plotter=None,
    numfigs=1,
    iplot=True,
    start=None,
    end=None,
    width=16,
    height=9,
    dpi=300,
    tight=True,
    use=None,
    backend="matplotlib",
    **kwargs,
):
    """
    Plots the strategies inside cerebro

    If ``plotter`` is None, a default plotter instance is created and
    ``kwargs`` are passed to it during instantiation.

    ``numfigs`` split the plot in the indicated number of charts reducing
    chart density if wished

    ``iplot``: if ``True`` and running in a ``notebook`` the charts will be
    displayed inline

    ``use``: set it to the name of the desired matplotlib backend. It will
    take precedence over ``iplot``

    ``backend``: plotting backend to use when no ``plotter`` is given.
    Options:

      - 'matplotlib': traditional matplotlib plotting (default)
      - 'plotly': interactive Plotly charts (better for large data)

    ``start``: An index to the datetime line array of the strategy or a
    ``datetime.date``, ``datetime.datetime`` instance indicating the start
    of the plot

    ``end``: An index to the datetime line array of the strategy or a
    ``datetime.date``, ``datetime.datetime`` instance indicating the end
    of the plot

    ``width``: in inches of the saved figure

    ``height``: in inches of the saved figure

    ``dpi``: quality in dots per inches of the saved figure

    ``tight``: only save actual content and not the frame of the figure

    Note: ``width``, ``height``, ``dpi`` and ``tight`` are accepted for
    interface compatibility but are not referenced by this method.

    Returns the list of whatever ``plotter.plot`` returned for each
    strategy, or ``None`` (nothing is plotted) if ``exactbars`` was
    greater than 0 during the run.
    """
    if self._exactbars > 0:
        return

    # NOTE: an earlier pre-pass walked every strategy's analyzers looking
    # for a "Transactions" analyzer when backend == "plotly", but did
    # nothing with the result (an analyzer cannot be added after the run).
    # It has been removed: it was pure overhead and failed on result
    # objects without an ``analyzers`` attribute.

    if not plotter:
        from . import plot

        if backend == "plotly":
            plotter = plot.PlotlyPlot(**kwargs)
        elif self.p.oldsync:
            plotter = plot.Plot_OldSync(**kwargs)
        else:
            plotter = plot.Plot(**kwargs)

    figs = []
    for stratlist in self.runstrats:
        for si, strat in enumerate(stratlist):
            # figure ids are spaced by 100 per strategy within a run
            rfig = plotter.plot(
                strat,
                figid=si * 100,
                numfigs=numfigs,
                iplot=iplot,
                start=start,
                end=end,
                use=use,
            )
            figs.append(rfig)

    plotter.show()
    return figs
# Module passed to cerebro for multiprocessing during optimization
[文档]
def __call__(self, iterstrat):
    """
    Make the instance callable so that it can be handed as the worker
    function to the multiprocessing module during optimization.

    Runs ``runstrategies`` for ``iterstrat``; ``predata`` is the result of
    ``optdatas and _dopreload and _dorunonce``.
    """
    return self.runstrategies(
        iterstrat,
        predata=self.p.optdatas and self._dopreload and self._dorunonce,
    )
# Delete runstrats when pickling
[文档]
def __getstate__(self):
    """
    Return the state to pickle: a shallow copy of the instance attributes
    without ``runstrats``, so that results of previous runs are not sent
    to the subprocesses during optimization.
    """
    state = dict(vars(self))
    state.pop("runstrats", None)
    return state
# When called from within a strategy or elsewhere, stops execution quickly
[文档]
def runstop(self):
    """Request a stop of the execution.

    May be invoked from inside a strategy or from anywhere else, including
    other threads. It only raises the ``_event_stop`` flag; the run loops
    check the flag and return as soon as they see it.
    """
    self._event_stop = True
# Core method for backtesting. Any passed kwargs affect cerebro standard parameters.
# If no data added, will stop immediately. Return value differs based on optimization.
[文档]
def run(self, **kwargs):
    """The core method to perform backtesting. Any ``kwargs`` passed to it
    will affect the value of the standard parameters ``Cerebro`` was
    instantiated with (unknown keys are ignored).

    If ``cerebro`` has no data, the method will immediately bail out and
    return an empty list.

    It has different return values:

      - For No Optimization: a list containing instances of the Strategy
        classes added with ``addstrategy``

      - For Optimization: a list of lists which contain instances of the
        Strategy classes added with ``addstrategy``
    """
    self._event_stop = False  # no stop has been requested yet

    if not self.datas:
        return []  # nothing can be run

    # Override standard parameters with the matching kwargs
    pkeys = self.params._getkeys()
    for key, val in kwargs.items():
        if key in pkeys:
            setattr(self.params, key, val)

    # Clean the object caches and (de)activate them as requested
    linebuffer.LineActions.cleancache()
    indicator.Indicator.cleancache()
    linebuffer.LineActions.usecache(self.p.objcache)
    indicator.Indicator.usecache(self.p.objcache)

    # Working copies of the execution mode flags
    self._dorunonce = self.p.runonce
    self._dopreload = self.p.preload
    self._exactbars = int(self.p.exactbars)

    if self._exactbars:
        self._dorunonce = False  # something is saving memory, no runonce
        # preload only survives a negative exactbars
        self._dopreload = self._dopreload and self._exactbars < 1

    self._doreplay = self._doreplay or any(x.replaying for x in self.datas)
    if self._doreplay:
        # preloading is not supported with replay. full timeframe bars
        # are constructed in realtime
        self._dopreload = False

    if self._dolive or self.p.live:
        # in this case, both preload and runonce must be off
        self._dorunonce = False
        self._dopreload = False

    self.runwriters = list()

    # Add the system default writer if requested
    if self.p.writer is True:
        wr = WriterFile()
        self.runwriters.append(wr)

    # Instantiate any other writers
    for wrcls, wrargs, wrkwargs in self.writers:
        wr = wrcls(*wrargs, **wrkwargs)
        self.runwriters.append(wr)

    # Write down if any writer wants the full csv output
    self.writers_csv = any(wr.p.csv for wr in self.runwriters)

    self.runstrats = list()

    if self.signals:  # allow processing of signals
        signalst, sargs, skwargs = self._signal_strat
        if signalst is None and self.strats:
            # Try to see if the 1st regular strategy is a signal strategy.
            # ``addstrategy`` stores each entry as a one-element list
            # ``[(cls, args, kwargs)]``, so the triple has to be taken from
            # inside the entry, and the class has to be checked with
            # ``issubclass``. The entry is only removed when it really is
            # a signal strategy; anything else (including the lazy
            # iterators added by ``optstrategy``) is left in place.
            first = self.strats[0]
            if isinstance(first, list) and len(first) == 1:
                candcls, candargs, candkwargs = first[0]
                if isinstance(candcls, type) and issubclass(candcls, SignalStrategy):
                    self.strats.pop(0)
                    signalst, sargs, skwargs = candcls, candargs, candkwargs

        if signalst is None:
            # Still None, create a default one
            signalst, sargs, skwargs = SignalStrategy, tuple(), dict()

        # Add the signal strategy
        self.addstrategy(
            signalst,
            *sargs,
            _accumulate=self._signal_accumulate,
            _concurrent=self._signal_concurrent,
            signals=self.signals,
            **skwargs,
        )

    if not self.strats:  # Datas are present, add a strategy
        self.addstrategy(Strategy)

    # Every combination of one entry from each added strategy
    iterstrats = itertools.product(*self.strats)

    if not self._dooptimize or self.p.maxcpus == 1:
        # If no optimization is wished ... or 1 core is to be used
        # let's skip process "spawning"
        for iterstrat in iterstrats:
            runstrat = self.runstrategies(iterstrat)
            self.runstrats.append(runstrat)
            if self._dooptimize:
                for cb in self.optcbs:
                    cb(runstrat)  # callback receives finished strategy
    else:
        if self.p.optdatas and self._dopreload and self._dorunonce:
            # Prepare the datas once here; the workers are then run with
            # ``predata`` set (see ``__call__``)
            for data in self.datas:
                data.reset()
                if self._exactbars < 1:  # datas can be a full length
                    data.extend(size=self.params.lookahead)
                data._start()
                # _dopreload is known to be True in this branch
                data.preload()

        # ``maxcpus`` of 0/None lets the pool pick the number of processes
        pool = multiprocessing.Pool(self.p.maxcpus or None)
        for r in pool.imap(self, iterstrats):
            self.runstrats.append(r)
            for cb in self.optcbs:
                cb(r)  # callback receives finished strategy

        pool.close()

        if self.p.optdatas and self._dopreload and self._dorunonce:
            for data in self.datas:
                data.stop()

    if not self._dooptimize:
        # avoid a list of list for regular cases
        return self.runstrats[0]

    return self.runstrats
# Initialize count
def _init_stcount(self):
    """(Re)start the strategy id counter at 0."""
    self.stcount = itertools.count()
# Call next count
def _next_stid(self):
    """Return the next value of the strategy id counter."""
    counter = self.stcount
    return next(counter)
# Run strategy
[文档]
def runstrategies(self, iterstrat, predata=False):
    """
    Internal method invoked by ``run`` to run a set of strategies

    Args:
        iterstrat: iterable of ``(strategy class, args, kwargs)`` triples,
            one per strategy to instantiate for this pass.
        predata: if true, the data feeds are neither reset/started/
            preloaded nor stopped here, and ``qbuffer`` is not called on
            the strategies (the caller is expected to have prepared the
            datas already).

    Returns:
        The list of strategy instances which were run, or, when optimizing
        with ``optreturn`` enabled, a list of ``OptReturn`` objects built
        from them.
    """
    # Restart the strategy id counter for this pass
    self._init_stcount()
    # Strategies of this pass; also published as ``self.runningstrats``
    self.runningstrats = runstrats = list()
    # Start the stores
    for store in self.stores:
        store.start()
    # cheat-on-open: activate it in the broker if the broker supports it
    if self.p.cheat_on_open and self.p.broker_coo:
        # try to activate in broker
        if hasattr(self._broker, "set_coo"):
            self._broker.set_coo(True)
    # Hand over the fund history, if one was set
    if self._fhistory is not None:
        self._broker.set_fund_history(self._fhistory)
    # Hand over any order history
    for orders, onotify in self._ohistory:
        self._broker.add_order_history(orders, onotify)
    # Start the broker
    self._broker.start()
    # Start the feeds
    for feed in self.feeds:
        feed.start()
    # If some writer wants csv output, send the headers of the datas
    if self.writers_csv:
        # headers
        wheaders = list()
        # Only datas flagged with ``csv`` contribute headers
        for data in self.datas:
            if data.csv:
                wheaders.extend(data.getwriterheaders())
        # Deliver the headers to the writers which have csv enabled
        for writer in self.runwriters:
            if writer.p.csv:
                writer.addheaders(wheaders)
    # self._plotfillers = [list() for d in self.datas]
    # self._plotfillers2 = [list() for d in self.datas]
    # Prepare the datas unless the caller already did (predata)
    if not predata:
        for data in self.datas:
            data.reset()
            if self._exactbars < 1:  # datas can be a full length
                data.extend(size=self.params.lookahead)
            data._start()
            if self._dopreload:
                data.preload()
    # Instantiate the strategies
    for stratcls, sargs, skwargs in iterstrat:
        # The datas are prepended to the positional arguments
        sargs = self.datas + list(sargs)
        try:
            # Instantiate under OwnerContext with this object set as owner
            with OwnerContext.set_owner(self):
                # Prefer the class' own safe factory if it offers one
                if hasattr(stratcls, "_create_strategy_safely"):
                    strat = stratcls._create_strategy_safely(*sargs, **skwargs)
                else:
                    # Fallback to direct instantiation
                    strat = stratcls(*sargs, **skwargs)
        except errors.StrategySkipError:
            continue  # do not add strategy to the mix
        # Old data synchronization method
        if self.p.oldsync:
            strat._oldsync = True  # tell strategy to use old clock update
        # Keep the trade history if requested
        if self.p.tradehistory:
            strat.set_tradehistory()
        # The strategy takes part in this pass
        runstrats.append(strat)
    # Timezone: an integer is an index into the datas, whose ``_tz`` is
    # taken; anything else goes through ``tzparse``
    tz = self.p.tz
    if isinstance(tz, integer_types):
        tz = self.datas[tz]._tz
    else:
        tz = tzparse(tz)
    # Nothing below runs if every strategy was skipped
    if runstrats:
        # loop separated for clarity
        # Sizer registered under ``None`` is the default for all strategies
        defaultsizer = self.sizers.get(None, (None, None, None))
        for idx, strat in enumerate(runstrats):
            # Standard observers
            if self.p.stdstats:
                strat._addobserver(False, observers.Broker)
                if self.p.oldbuysell:
                    strat._addobserver(True, observers.BuySell)
                else:
                    strat._addobserver(True, observers.BuySell, barplot=True)
                # Trades observer: per-data variant with several datas
                # unless ``oldtrades`` is set
                if self.p.oldtrades or len(self.datas) == 1:
                    strat._addobserver(False, observers.Trades)
                else:
                    strat._addobserver(False, observers.DataTrades)
            # Observers registered in this object
            for multi, obscls, obsargs, obskwargs in self.observers:
                strat._addobserver(multi, obscls, *obsargs, **obskwargs)
            # Indicators registered in this object
            for indcls, indargs, indkwargs in self.indicators:
                strat._addindicator(indcls, *indargs, **indkwargs)
            # Analyzers registered in this object
            for ancls, anargs, ankwargs in self.analyzers:
                strat._addanalyzer(ancls, *anargs, **ankwargs)
            # Sizer registered for this strategy index, else the default
            # (note: reuses the names sargs/skwargs from the loop above)
            sizer, sargs, skwargs = self.sizers.get(idx, defaultsizer)
            if sizer is not None:
                strat._addsizer(sizer, *sargs, **skwargs)
            # Set timezone
            strat._settz(tz)
            # Start the strategy
            strat._start()
            # Writers with csv enabled also get the strategy headers
            for writer in self.runwriters:
                if writer.p.csv:
                    writer.addheaders(strat.getwriterheaders())
        # qbuffer is only applied when the datas were prepared here
        if not predata:
            for strat in runstrats:
                strat.qbuffer(self._exactbars, replaying=self._doreplay)
        # Start the writers
        for writer in self.runwriters:
            writer.start()
        # Prepare timers
        self._timers = []
        self._timerscheat = []
        for timer in self._pretimers:
            # preprocess tzdata if needed
            # Timers are started against the first data
            timer.start(self.datas[0])
            # "cheat" timers are kept apart from the regular ones
            if timer.params.cheat:
                self._timerscheat.append(timer)
            else:
                self._timers.append(timer)
        # Vector mode needs both preload and runonce
        if self._dopreload and self._dorunonce:
            if self.p.oldsync:
                self._runonce_old(runstrats)
            else:
                self._runonce(runstrats)
        # Otherwise run bar by bar
        else:
            if self.p.oldsync:
                self._runnext_old(runstrats)
            else:
                self._runnext(runstrats)
        # Stop the strategies
        for strat in runstrats:
            strat._stop()
    # Stop broker
    self._broker.stop()
    # The datas are only stopped here if they were started here
    if not predata:
        for data in self.datas:
            data.stop()
    # Stop the feeds
    for feed in self.feeds:
        feed.stop()
    # Stop the stores
    for store in self.stores:
        store.stop()
    # Write the final information and stop the writers
    self.stop_writers(runstrats)
    # Optimization with ``optreturn``: return OptReturn objects built from
    # the params and analyzers instead of the strategies themselves
    if self._dooptimize and self.p.optreturn:
        # Results can be optimized
        results = list()
        for strat in runstrats:
            for a in strat.analyzers:
                # Drop the references from the analyzer to strategy/parent
                a.strategy = None
                a._parent = None
                # ... and any instance attribute whose name starts with
                # "data" (only the instance __dict__ is scanned)
                for attrname in list(a.__dict__.keys()):
                    if attrname.startswith("data"):
                        setattr(a, attrname, None)
            oreturn = OptReturn(
                strat.params, analyzers=strat.analyzers, strategycls=type(strat)
            )
            results.append(oreturn)
        return results
    return runstrats
# Stop writer
[文档]
def stop_writers(self, runstrats):
    """Write the final information to every writer and stop it.

    Args:
        runstrats: List of strategy instances that were run.

    The writer info of each data feed (keyed ``Data0``, ``Data1``, ...)
    and of each strategy (keyed by its class name) is gathered under the
    ``Datas`` and ``Strategies`` sections and delivered to each running
    writer as ``{"Cerebro": ...}``; the writer is then stopped.
    """
    data_section = OrderedDict()
    for idx, feed in enumerate(self.datas):
        data_section["Data%d" % idx] = feed.getwriterinfo()

    # keyed by class name: a later strategy of the same class overwrites
    strat_section = dict()
    for strat in runstrats:
        strat_section[type(strat).__name__] = strat.getwriterinfo()

    report = OrderedDict()
    report["Datas"] = data_section
    report["Strategies"] = strat_section

    for writer in self.runwriters:
        writer.writedict({"Cerebro": report})
        writer.stop()
# Notify broker info
def _brokernotify(self):
    """
    Internal method which kicks the broker (``next``) and then drains its
    notifications, delivering each one to the owner of the order.

    Orders without an owner are delivered to the first running strategy.
    """
    self._broker.next()

    order = self._broker.get_notification()
    while order is not None:
        recipient = order.owner
        if recipient is None:
            recipient = self.runningstrats[0]  # default

        recipient._addnotification(order, quicknotify=self.p.quicknotify)
        order = self._broker.get_notification()
# Old runnext method, similar to runnext
def _runnext_old(self, runstrats):
    """
    Actual implementation of run in full next mode (old synchronization
    variant). All objects have its `next` method invoked on each data
    arrival

    The first data in ``self.datas`` acts as the master for the others.
    The method returns as soon as ``self._event_stop`` is seen set.
    """
    data0 = self.datas[0]
    d0ret = True
    # Loop while the master delivers (truthy) or has nothing *yet* (None)
    while d0ret or d0ret is None:
        lastret = False
        # Notify anything from the store even before moving datas
        # because datas may not move due to an error reported by the store
        self._storenotify()
        if self._event_stop:  # stop if requested
            return
        self._datanotify()
        if self._event_stop:  # stop if requested
            return
        d0ret = data0.next()
        if d0ret:
            # The master delivered: move the rest, forcing output once
            # if a data did not deliver on the first attempt
            for data in self.datas[1:]:
                if not data.next(datamaster=data0):  # no delivery
                    data._check(forcedata=data0)  # check forcing output
                    data.next(datamaster=data0)  # retry
        elif d0ret is None:
            # meant for things like live feeds which may not produce a bar
            # at the moment but need the loop to run for notifications and
            # getting resample and others to produce timely bars
            data0._check()
            for data in self.datas[1:]:
                data._check()
        else:
            # The master is exhausted: give every data a last chance
            lastret = data0._last()
            for data in self.datas[1:]:
                lastret += data._last(datamaster=data0)
            if not lastret:
                # Only go extra round if something was changed by "lasts"
                break
        # Datas may have generated a new notification after next
        self._datanotify()
        if self._event_stop:  # stop if requested
            return
        self._brokernotify()
        if self._event_stop:  # stop if requested
            return
        if d0ret or lastret:  # bars produced by data or filters
            for strat in runstrats:
                strat._next()
                if self._event_stop:  # stop if requested
                    return
            self._next_writers(runstrats)
    # Last notification chance before stopping
    self._datanotify()
    if self._event_stop:  # stop if requested
        return
    self._storenotify()
    if self._event_stop:  # stop if requested
        return
# Old runonce method, similar to runonce
def _runonce_old(self, runstrats):
    """
    Actual implementation of run in vector mode (old synchronization
    variant).

    ``_once`` is invoked on every strategy first. Afterwards the
    strategies are still driven in a pseudo-event mode: for each bar in
    the buffer of the first data, the datas are advanced, the broker is
    notified and ``_oncepost`` is called on each strategy.

    The method returns as soon as ``self._event_stop`` is seen set.
    """
    for strat in runstrats:
        strat._once()

    # The default once for strategies does nothing and therefore
    # has not moved forward all datas/indicators/observers that
    # were homed before calling once, Hence no "need" to do it
    # here again, because pointers are at 0
    master = self.datas[0]
    followers = self.datas[1:]

    for _ in range(master.buflen()):
        master.advance()
        for follower in followers:
            follower.advance(datamaster=master)

        self._brokernotify()
        if self._event_stop:  # stop if requested
            return

        for strat in runstrats:
            # master.datetime[0] for compat. w/ new strategy's oncepost
            strat._oncepost(master.datetime[0])
            if self._event_stop:  # stop if requested
                return

        self._next_writers(runstrats)
# Run writer's next
def _next_writers(self, runstrats):
    """Deliver the values of the current bar to the csv writers.

    Does nothing if there are no running writers or no writer asked for
    csv output. Otherwise one row is built from the writer values of the
    datas flagged with ``csv`` followed by those of every strategy; each
    writer with csv enabled receives the row and has ``next`` called.
    """
    if not self.runwriters:
        return

    if not self.writers_csv:
        return

    row = list()
    for data in self.datas:
        if data.csv:
            row.extend(data.getwritervalues())

    for strat in runstrats:
        row.extend(strat.getwritervalues())

    for writer in self.runwriters:
        if not writer.p.csv:
            continue

        writer.addvalues(row)
        writer.next()
# Disable runonce
def _disable_runonce(self):
    """API for lineiterators to switch runonce off (see HeikinAshi).

    Clears the ``_dorunonce`` flag of this instance.
    """
    self._dorunonce = False
# runnext method, core of the framework, event-driven core for data execution
def _runnext(self, runstrats):
    """
    Actual implementation of run in full next mode. All objects have its
    `next` method invoked on each data arrival

    The datas are processed sorted by ``(_timeframe, _compression)``. On
    each round the data with the smallest delivered datetime is the
    master; datas which delivered a later datetime are rewound.

    The method returns as soon as ``self._event_stop`` is seen set.

    Any exception raised while running is NOT propagated: the loop ends
    and the method returns ``None``. The exception is reported, with its
    traceback, through the ``logging`` module at error level so that a
    failure does not go unnoticed.
    """
    try:
        # Sort datas by timeframe/compression
        datas = sorted(self.datas, key=lambda x: (x._timeframe, x._compression))
        datas1 = datas[1:]
        data0 = datas[0]
        d0ret = True

        # indices of the datas which resample but do not replay
        rsonly = [i for i, x in enumerate(datas) if x.resampling and not x.replaying]
        onlyresample = len(datas) == len(rsonly)
        noresample = not rsonly

        # number of datas which are not clones
        clonecount = sum(d._clone for d in datas)
        ldatas = len(datas)
        ldatas_noclones = ldatas - clonecount

        dt0 = date2num(datetime.datetime.max) - 2  # default at max

        # The loop is left with ``break`` (nothing delivered and nothing
        # changed by the "lasts") or with ``return``
        while True:
            # if any has live data in the buffer, no data will wait anything
            newqcheck = not any(d.haslivedata() for d in datas)
            if not newqcheck:
                # If no data has reached the live status or all, wait for
                # the next incoming data
                livecount = sum(d._laststatus == d.LIVE for d in datas)
                newqcheck = not livecount or livecount == ldatas_noclones

            lastret = False
            # Notify anything from the store even before moving datas
            # because datas may not move due to an error reported by the store
            self._storenotify()
            if self._event_stop:  # stop if requested
                return
            self._datanotify()
            if self._event_stop:  # stop if requested
                return

            # record starting time and tell feeds to discount the elapsed time
            # from the qcheck value
            drets = []
            qstart = datetime.datetime.now(UTC)
            for d in datas:
                qlapse = datetime.datetime.now(UTC) - qstart
                d.do_qcheck(newqcheck, qlapse.total_seconds())
                d_next = d.next(ticks=False)
                drets.append(d_next)

            # truthy if any data delivered; None if nothing was delivered
            # but some data answered None; falsy otherwise
            d0ret = any(dret for dret in drets)
            if not d0ret and any(dret is None for dret in drets):
                d0ret = None

            if d0ret:
                # datetime of each data which delivered, None for the rest
                dts = []
                for i, ret in enumerate(drets):
                    dts.append(datas[i].datetime[0] if ret else None)

                # Get the minimum datetime; resample-only datas are left
                # out unless all or none of the datas are resample-only
                if onlyresample or noresample:
                    dt0 = min(d for d in dts if d is not None)
                else:
                    dt0 = min(
                        (d for i, d in enumerate(dts) if d is not None and i not in rsonly)
                    )
                # NOTE(review): flagged in the original as a check that
                # needs revisiting; kept as is
                if dt0 < 1:
                    return

                # Get master data and time
                dmaster = datas[dts.index(dt0)]  # and timemaster
                self._dtmaster = dmaster.num2date(dt0)
                self._udtmaster = num2date(dt0)

                # Try to get something for those that didn't return
                for i, ret in enumerate(drets):
                    if ret:  # dts already contains a valid datetime for this i
                        continue

                    # try to get data by checking with a master
                    d = datas[i]
                    d._check(forcedata=dmaster)  # check to force output
                    if d.next(datamaster=dmaster, ticks=False):  # retry
                        dts[i] = d.datetime[0]  # good -> store

                # make sure only those at dmaster level end up delivering
                for i, dti in enumerate(dts):
                    if dti is not None:
                        di = datas[i]
                        if dti > dt0:
                            di.rewind()  # cannot deliver yet
                        elif not di.replaying:
                            # Replay forces tick fill, else force here
                            di._tick_fill(force=True)

            elif d0ret is None:
                # meant for things like live feeds which may not produce a bar
                # at the moment but need the loop to run for notifications and
                # getting resample and others to produce timely bars
                for data in datas:
                    data._check()
            else:
                lastret = data0._last()
                for data in datas1:
                    lastret += data._last(datamaster=data0)

                if not lastret:
                    # Only go extra round if something was changed by "lasts"
                    break

            # Datas may have generated a new notification after next
            self._datanotify()
            if self._event_stop:  # stop if requested
                return

            if d0ret or lastret:  # if any bar, check timers before broker
                self._check_timers(runstrats, dt0, cheat=True)
                if self.p.cheat_on_open:
                    for strat in runstrats:
                        strat._next_open()
                        if self._event_stop:  # stop if requested
                            return

            self._brokernotify()
            if self._event_stop:  # stop if requested
                return

            if d0ret or lastret:  # bars produced by data or filters
                self._check_timers(runstrats, dt0, cheat=False)
                for strat in runstrats:
                    strat._next()
                    if self._event_stop:  # stop if requested
                        return

                    # NOTE(review): nesting reconstructed; the writers are
                    # kicked after each strategy - confirm this is intended
                    self._next_writers(runstrats)

        # Last notification chance before stopping
        self._datanotify()
        if self._event_stop:  # stop if requested
            return
        self._storenotify()
        if self._event_stop:  # stop if requested
            return
    except Exception:
        # The run is ended without propagating the error (callers rely on
        # this), but the failure is reported instead of being discarded
        import logging

        logging.getLogger(__name__).exception(
            "Exception in _runnext; the run loop has been terminated"
        )
# runonce
def _runonce(self, runstrats):
    """
    Actual implementation of run in vector mode.

    Strategies are still invoked on a pseudo-event mode in which `next`
    is called for each data arrival

    The method returns as soon as ``self._event_stop`` is seen set. When
    the loop ends normally, the datas are repositioned (best effort) and
    the broker is notified one more time.
    """
    # Run ``_once`` on each strategy and reset it afterwards
    for strat in runstrats:
        strat._once()
        strat.reset()  # strat called next by next - reset lines
    # The default once for strategies does nothing and therefore
    # has not moved forward all datas/indicators/observers that
    # were homed before calling once, Hence no "need" to do it
    # here again, because pointers are at 0
    # Sort datas by timeframe/compression
    datas = sorted(self.datas, key=lambda x: (x._timeframe, x._compression))
    while True:
        # Check the next incoming date in the datas
        dts = [d.advance_peek() for d in datas]
        dt0 = min(dts)
        # A minimum of infinity ends the loop
        if dt0 == float("inf"):
            break  # no data delivers anything
        # Timemaster if needed be
        # dmaster = datas[dts.index(dt0)]  # and timemaster
        # Advance only the datas whose next datetime is the minimum
        for i, dti in enumerate(dts):
            if dti <= dt0:
                datas[i].advance()
                # self._plotfillers2[i].append(slen)  # mark as fill
            else:
                # self._plotfillers[i].append(slen)
                pass
        # "cheat" timers are checked before the broker
        self._check_timers(runstrats, dt0, cheat=True)
        # cheat-on-open: let the strategies act before the broker
        if self.p.cheat_on_open:
            for strat in runstrats:
                strat._oncepost_open()
                if self._event_stop:  # stop if requested
                    return
        self._brokernotify()
        if self._event_stop:  # stop if requested
            return
        # Regular timers are checked after the broker
        self._check_timers(runstrats, dt0, cheat=False)
        for strat in runstrats:
            strat._oncepost(dt0)
            if self._event_stop:  # stop if requested
                return
            # NOTE(review): nesting reconstructed; the writers are kicked
            # after each strategy - confirm this is intended
            self._next_writers(runstrats)
    # The broker is notified once more after the loop (see the end of the
    # method). Per the original author's note, this is so that orders
    # submitted in the last ``_oncepost`` call are processed before the
    # strategies are stopped. Before that, the datas are repositioned at
    # the last valid datetime reported by the first strategy. Everything in
    # this repositioning is best effort: any exception is silently ignored.
    try:
        # Take the reference datetime from the first strategy, if it has a
        # positive ``_last_valid_datetime``
        if runstrats and len(runstrats) > 0:
            strat = runstrats[0]
            if hasattr(strat, "_last_valid_datetime") and strat._last_valid_datetime > 0:
                last_dt = strat._last_valid_datetime
                for data in self.datas:
                    try:
                        if hasattr(data, "lines") and hasattr(data.lines, "datetime"):
                            # Only datas whose datetime line has an ``array``
                            if hasattr(data.lines.datetime, "array"):
                                dt_array = data.lines.datetime.array
                                # Scan backwards for the last index with a
                                # positive datetime <= last_dt
                                for i in range(len(dt_array) - 1, -1, -1):
                                    if dt_array[i] <= last_dt and dt_array[i] > 0:
                                        # Point the data and its datetime
                                        # line at that index
                                        if hasattr(data, "_idx"):
                                            data._idx = i
                                        if hasattr(data.lines.datetime, "_idx"):
                                            data.lines.datetime._idx = i
                                        # Same for every other line which
                                        # is long enough
                                        if hasattr(data.lines, "lines"):
                                            for line in data.lines.lines:
                                                if hasattr(line, "_idx") and hasattr(
                                                    line, "array"
                                                ):
                                                    if i < len(line.array):
                                                        line._idx = i
                                        # NOTE(review): nesting reconstructed;
                                        # stops the scan at the first match
                                        break
                    except Exception:
                        pass
    except Exception:
        pass
    # Kick the broker and deliver its notifications one last time
    self._brokernotify()
# Check timer
def _check_timers(self, runstrats, dt0, cheat=False):
    """Fire every timer that reports itself due for ``dt0``.

    Args:
        runstrats: Iterable of strategies that are notified in addition to
            the timer owner when the timer's ``params.strats`` is truthy.
        dt0: Value handed to each timer's ``check`` method.
        cheat: When truthy the timers in ``self._timerscheat`` are
            evaluated, otherwise the ones in ``self._timers``.
    """
    pending = self._timerscheat if cheat else self._timers

    for timer in pending:
        if timer.check(dt0):
            # ``lastwhen`` is already passed positionally as the second
            # argument, so a stored "when" keyword is dropped from a copy
            # of the timer kwargs to avoid clashing with it.
            extra = dict(timer.kwargs.items())
            extra.pop("when", None)

            # The owner is always notified first
            timer.params.owner.notify_timer(timer, timer.lastwhen, *timer.args, **extra)

            # Strategies are only notified when the timer asks for it
            if not timer.params.strats:
                continue
            for strategy in runstrats:
                strategy.notify_timer(timer, timer.lastwhen, *timer.args, **extra)
# [docs]
def add_report_analyzers(self, riskfree_rate=0.01):
    """Register the analyzers required for reporting.

    Every analyzer is added through ``addanalyzer`` under a fixed name:

    - ``sharperatio``: SharpeRatio (monthly timeframe, uses ``riskfree_rate``)
    - ``drawdown``: DrawDown
    - ``tradeanalyzer``: TradeAnalyzer
    - ``sqn``: SQN (System Quality Number)
    - ``annualreturn``: AnnualReturn
    - ``timereturn``: TimeReturn (daily timeframe)

    Args:
        riskfree_rate: Risk-free rate passed to SharpeRatio as
            ``riskfreerate``, default 0.01 (1%)
    """
    from . import analyzers

    # (analyzer class name, registration name, extra keyword arguments),
    # listed in registration order
    plan = (
        ("SharpeRatio", "sharperatio", {"riskfreerate": riskfree_rate, "timeframe": TimeFrame.Months}),
        ("DrawDown", "drawdown", {}),
        ("TradeAnalyzer", "tradeanalyzer", {}),
        ("SQN", "sqn", {}),
        ("AnnualReturn", "annualreturn", {}),
        ("TimeReturn", "timereturn", {"timeframe": TimeFrame.Days}),
    )

    for class_name, label, options in plan:
        # Each class is looked up right before it is registered
        self.addanalyzer(getattr(analyzers, class_name), _name=label, **options)
# [docs]
def generate_report(
    self, output_path, format="html", template="default", user=None, memo=None, **kwargs
):
    """Generate a backtest report for the first strategy of the last run.

    The requested format is validated before the report machinery is
    imported or instantiated, so an unsupported format fails fast.

    Args:
        output_path: Output file path
        format: Report format ('html', 'pdf', 'json'), case-insensitive
        template: Template name or path handed to the report generator
            (documented as only relevant for HTML/PDF)
        user: Username (forwarded for HTML/PDF reports only)
        memo: Remarks/notes (forwarded for HTML/PDF reports only)
        **kwargs: Additional parameters forwarded to the generator method

    Returns:
        The value returned by the selected generator method (expected to
        be the output file path)

    Raises:
        RuntimeError: If no strategy has been run yet
        ValueError: If ``format`` is not one of 'html', 'pdf' or 'json'

    Example:
        cerebro = bt.Cerebro()
        cerebro.addstrategy(MyStrategy)
        cerebro.adddata(data)
        cerebro.run()
        cerebro.generate_report('report.html')
    """
    # A missing ``runstrats`` attribute, an empty list and an empty first
    # strategy list are all reported as "nothing has been run"
    runstrats = getattr(self, "runstrats", None)
    if not runstrats or not runstrats[0]:
        raise RuntimeError("No strategy has been run. Call cerebro.run() first.")

    # Validate up front; non-string values are rejected the same way as
    # unknown format names instead of failing on ``.lower()``
    format_lower = format.lower() if isinstance(format, str) else None
    if format_lower not in ("html", "pdf", "json"):
        raise ValueError(f"Unsupported format: {format}. Use 'html', 'pdf', or 'json'.")

    # Get the first strategy
    strategy = runstrats[0][0]

    # Imported lazily, only once a report is actually going to be built
    from .reports import ReportGenerator

    report = ReportGenerator(strategy, template=template)

    if format_lower == "json":
        # The JSON generator does not receive ``user``/``memo``
        return report.generate_json(output_path, **kwargs)
    if format_lower == "html":
        return report.generate_html(output_path, user=user, memo=memo, **kwargs)
    return report.generate_pdf(output_path, user=user, memo=memo, **kwargs)