# backtrader.filters.datafiller source code
#!/usr/bin/env python
"""Data Filler Filter Module - Gap filling for data feeds.

This module provides the DataFiller for filling gaps in data feeds
when bars are missing.

Classes:
    DataFiller: Fills gaps in data with specified values.

Example:
    >>> data = bt.feeds.GenericCSVData(dataname='data.csv')
    >>> data.addfilter(bt.filters.DataFiller())
    >>> cerebro.adddata(data)
"""
import collections
from datetime import datetime, timedelta
from ..dataseries import TimeFrame
from ..feed import AbstractDataBase
[文档]
class DataFiller(AbstractDataBase):
    """Fill gaps in the source data with synthetic bars.

    The filler uses the following information bits from the underlying
    data source (read from ``self.p.dataname``):

      - timeframe and compression, to dimension the output bars
      - sessionstart and sessionend, to bound the filling to the session

    If a data feed has missing bars in between 10:31 and 10:34 and the
    timeframe is minutes, the output will be filled with bars for minutes
    10:32 and 10:33 using the closing price of the last bar (10:31).

    Bars can be missing from the source for a variety of reasons.

    Only the timeframes listed in ``_tdeltas`` (Minutes, Seconds and
    MicroSeconds) can be filled.

    NOTE(review): the wrapped feed is always accessed as
    ``self.p.dataname``; how callers are expected to pass it in is not
    visible in this class - confirm against the base class.

    Params:

      - ``fill_price`` (def: None): if None (or evaluates to False), the
        closing price will be used, else the passed value (which can be
        for example 'NaN' to have a missing bar in terms of evaluation but
        present in terms of time)

      - ``fill_vol`` (def: NaN): used to fill the volume with missing bars

      - ``fill_oi`` (def: NaN): used to fill the openinterest with missing
        bars
    """

    params = (
        ("fill_price", None),
        ("fill_vol", float("NaN")),
        ("fill_oi", float("NaN")),
    )

    # Minimum delta unit in between bars, keyed by timeframe. Timeframes
    # which are not listed here cannot be filled.
    _tdeltas = {
        TimeFrame.Minutes: timedelta(seconds=60),
        TimeFrame.Seconds: timedelta(seconds=1),
        TimeFrame.MicroSeconds: timedelta(microseconds=1),
    }

    def __init__(self):
        """Initialize the DataFiller.

        Sets up internal state variables for tracking timeframe,
        compression, bar spacing, the pending-bar flag and the fill bars
        queue.
        """
        self._timeframe = None
        self._compression = None
        # timedelta between two consecutive bars; computed in ``_load``
        self._tdunit = None
        # True while a bar already fetched from the source awaits delivery
        self._dbar = None
        # queue of pending (datetime, price) fill bars; created in ``start``
        self._fillbars = None

    def start(self):
        """Start the data filler.

        Calls the parent ``start`` and then resets the fill bars queue and
        the pending-bar flag.
        """
        super().start()
        self._fillbars = collections.deque()
        self._dbar = False

    def preload(self):
        """Preload data through the filler.

        When the length of the underlying data equals its buffer length
        the source is treated as not preloaded yet: it is started,
        preloaded and rewound with ``home()``. The timeframe and
        compression are then copied from the source (after it has started,
        because some sources do autodetection) before delegating to the
        parent ``preload``.
        """
        if len(self.p.dataname) == self.p.dataname.buflen():
            # if data is not preloaded ... do it
            self.p.dataname.start()
            self.p.dataname.preload()
            self.p.dataname.home()

        # Copy timeframe from data after start (some sources do autodetection)
        self.p.timeframe = self._timeframe = self.p.dataname._timeframe
        self.p.compression = self._compression = self.p.dataname._compression

        super().preload()

    def _copyfromdata(self):
        """Deliver the current bar of the underlying data unchanged.

        Returns:
            True, to signal that a bar has been delivered.
        """
        # Data is allowed - Copy size which is "number of lines"
        for i in range(self.p.dataname.size()):
            self.lines[i][0] = self.p.dataname.lines[i][0]

        self._dbar = False  # invalidate a flag for read bar
        return True

    def _frombars(self):
        """Deliver the oldest queued fill bar.

        All four price lines get the same value: ``fill_price`` if it
        evaluates to True, else the price stored with the queued bar.
        Volume and open interest get ``fill_vol`` and ``fill_oi``.

        Returns:
            True, to signal that a bar has been delivered.
        """
        dtime, price = self._fillbars.popleft()
        price = self.p.fill_price or price

        self.lines.datetime[0] = self.p.dataname.date2num(dtime)
        self.lines.open[0] = price
        self.lines.high[0] = price
        self.lines.low[0] = price
        self.lines.close[0] = price
        self.lines.volume[0] = self.p.fill_vol
        self.lines.openinterest[0] = self.p.fill_oi

        return True

    def _fillrange(self, dtime, dtend, price):
        """Queue one fill bar per ``_tdunit`` step in ``[dtime, dtend)``."""
        while dtime < dtend:
            self._fillbars.append((dtime, price))
            dtime += self._tdunit

    def _load(self):
        """Deliver the next bar: a queued fill bar or the next source bar.

        Returns:
            True if a bar has been delivered, False if the underlying data
            has no more bars.

        Raises:
            KeyError: if the timeframe of the underlying data is not one
                of the timeframes in ``_tdeltas``.
        """
        if not len(self.p.dataname):
            self.p.dataname.start()  # start data if not done somewhere else

            # Copy from underlying data
            self._timeframe = self.p.dataname._timeframe
            self._compression = self.p.dataname._compression

            self.p.timeframe = self._timeframe
            self.p.compression = self._compression

            # Calculate and save timedelta for timeframe
            tdbase = self._tdeltas.get(self._timeframe)
            if tdbase is None:
                raise KeyError(
                    "DataFiller does not support timeframe {!r}; supported "
                    "timeframes are Minutes, Seconds and "
                    "MicroSeconds".format(self._timeframe)
                )
            self._tdunit = tdbase * self._compression

        if self._fillbars:
            return self._frombars()

        # use existing bar or fetch a bar
        self._dbar = self._dbar or self.p.dataname.next()
        if not self._dbar:
            return False  # no more data

        if len(self) == 1:
            # Cannot yet look backwards - deliver data as is
            return self._copyfromdata()

        # previous (delivered) close
        pclose = self.lines.close[-1]
        # Get time of previous (already delivered) bar
        dtime_prev = self.lines.datetime.datetime(-1)
        # Get time of current (from a data source) bar
        dtime_cur = self.p.dataname.datetime.datetime(0)

        # Calculate the session end for previous bar
        send = datetime.combine(dtime_prev.date(), self.p.dataname.sessionend)

        if dtime_cur > send:  # if jumped boundary
            # 1. check for missing bars until boundary (end)
            self._fillrange(dtime_prev + self._tdunit, send, pclose)

            # Calculate session start for new bar
            sstart = datetime.combine(
                dtime_cur.date(), self.p.dataname.sessionstart
            )

            # 2. check for missing bars from new boundary (start)
            # Only fill when the new session start lies after the previous
            # session end. If the current bar is merely past the session
            # end on the same day, filling from that day's session start
            # would queue bars with timestamps before bars which have
            # already been delivered.
            if sstart > send:
                self._fillrange(sstart, dtime_cur, pclose)
        else:
            # no boundary jumped - check the gap until current time
            self._fillrange(dtime_prev + self._tdunit, dtime_cur, pclose)

        if self._fillbars:
            self._dbar = True  # flag a pending data bar is available

            # return an accumulated bar in the current cycle
            return self._frombars()

        return self._copyfromdata()