backtrader.filters.daysteps 源代码
#!/usr/bin/env python
"""Day Steps Filter Module - Bar replay simulation.
This module provides the BarReplayerOpen filter for splitting bars
to simulate replay behavior.
Classes:
BarReplayerOpen: Splits bars into open and OHLC parts.
Example:
>>> data = bt.feeds.GenericCSVData(dataname='data.csv')
>>> data.addfilter(bt.filters.BarReplayerOpen())
>>> cerebro.adddata(data)
"""
[文档]
class BarReplayerOpen:
"""
This filters splits a bar in two parts:
- ``Open``: the opening price of the bar will be used to deliver an
initial price bar in which the four components (OHLC) are equal
The volume/openinterest fields are zero for this initial bar
- ``OHLC``: the original bar is delivered complete with the original
``volume``/``openinterest``
The split simulates a replay without the need to use the *replay* filter.
"""
[文档]
def __init__(self, data):
"""Initialize the BarReplayerOpen filter.
Args:
data: The data feed to apply the filter to.
The filter sets resampling=1 and replaying=True on the data.
"""
self.pendingbar = None
data.resampling = 1
data.replaying = True
[文档]
def __call__(self, data):
"""Process the data feed to split bars into open and OHLC parts.
This method is called for each bar in the data feed. It splits the bar
into two parts - an initial bar with only the open price (OHLC=Open)
and the original OHLC bar. This simulates intraday replay behavior.
Args:
data: The data feed containing the bar to process.
Returns:
bool: True if the length of the stream was changed,
False if it remained unchanged.
"""
ret = True
# Make a copy of the new bar and remove it from stream
newbar = [data.lines[i][0] for i in range(data.size())]
data.backwards() # remove the copied bar from stream
openbar = newbar[:] # Make an open only bar
o = newbar[data.Open]
for field_idx in [data.High, data.Low, data.Close]:
openbar[field_idx] = o
# Nullify Volume/OpenInteres at the open
openbar[data.Volume] = 0.0
openbar[data.OpenInterest] = 0.0
# Overwrite the new data bar with our pending data - except start point
if self.pendingbar is not None:
data._updatebar(self.pendingbar)
ret = False
self.pendingbar = newbar # update the pending bar to the new bar
data._add2stack(openbar) # Add the openbar to the stack for processing
return ret # the length of the stream was not changed
[文档]
def last(self, data):
"""Called when the data is no longer producing bars
Can be called multiple times. It has the chance to (for example)
produce extra bars"""
if self.pendingbar is not None:
data.backwards() # remove delivered open bar
data._add2stack(self.pendingbar) # add remaining
self.pendingbar = None # No further action
return True # something delivered
return False # nothing delivered here
# Alias
DayStepsFilter = BarReplayerOpen