backtrader.filters.datafiller 源代码

#!/usr/bin/env python
"""Data Filler Filter Module - Gap filling for data feeds.

This module provides the DataFiller for filling gaps in data feeds
when bars are missing.

Classes:
    DataFiller: Fills gaps in data with specified values.

Example:
    >>> data = bt.feeds.GenericCSVData(dataname='data.csv')
    >>> data.addfilter(bt.filters.DataFiller())
    >>> cerebro.adddata(data)
"""

import collections
from datetime import datetime, timedelta

from ..dataseries import TimeFrame
from ..feed import AbstractDataBase


[文档] class DataFiller(AbstractDataBase): """This class will fill gaps in the source data using the following information bits from the underlying data source - timeframe and compression are to dimension the output bars - sessionstart and sessionend If a data feed has missing bars in between 10:31 and 10:34 and the timeframe is minutes, the output will be filled with bars for minutes 10:32 and 10:33 using the closing price of the last bar (10:31) Bars can be missing amongst other things because Params: - ``fill_price`` (def: None): if None (or evaluates to False), the closing price will be used, else the passed value (which can be for example 'NaN' to have a missing bar in terms of evaluation but present in terms of time - ``fill_vol`` (def: NaN): used to fill the volume with missing bars - ``fill_oi`` (def: NaN): used to fill the openinterest with missing bars """ params = ( ("fill_price", None), ("fill_vol", float("NaN")), ("fill_oi", float("NaN")), ) def __init__(self): """Initialize the DataFiller. Sets up internal state variables for tracking timeframe, compression, data bars, and fill bars queue. """ self._timeframe = None self._compression = None self._dbar = None self._fillbars = None
[文档] def start(self): """Start the data filler. Initializes the fill bars queue and data bar flag. This method is called when the data feed starts processing. """ super().start() self._fillbars = collections.deque() self._dbar = False
[文档] def preload(self): """Preload data from the underlying data source. If the underlying data is not preloaded, loads it completely. Copies timeframe and compression settings from the source data after it has started (some sources do autodetection). This method ensures all necessary data is available before processing. """ if len(self.p.dataname) == self.p.dataname.buflen(): # if data is not preloaded … do it self.p.dataname.start() self.p.dataname.preload() self.p.dataname.home() # Copy timeframe from data after start (some sources do autodetection) self.p.timeframe = self._timeframe = self.p.dataname._timeframe self.p.compression = self._compression = self.p.dataname._compression super().preload()
def _copyfromdata(self): # Data is allowed - Copy size which is "number of lines" for i in range(self.p.dataname.size()): self.lines[i][0] = self.p.dataname.lines[i][0] self._dbar = False # invalidate a flag for read bar return True def _frombars(self): dtime, price = self._fillbars.popleft() price = self.p.fill_price or price self.lines.datetime[0] = self.p.dataname.date2num(dtime) self.lines.open[0] = price self.lines.high[0] = price self.lines.low[0] = price self.lines.close[0] = price self.lines.volume[0] = self.p.fill_vol self.lines.openinterest[0] = self.p.fill_oi return True # Minimum delta unit in between bars _tdeltas = { TimeFrame.Minutes: timedelta(seconds=60), TimeFrame.Seconds: timedelta(seconds=1), TimeFrame.MicroSeconds: timedelta(microseconds=1), } def _load(self): if not len(self.p.dataname): self.p.dataname.start() # start data if not done somewhere else # Copy from underlying data self._timeframe = self.p.dataname._timeframe self._compression = self.p.dataname._compression self.p.timeframe = self._timeframe self.p.compression = self._compression # Calculate and save timedelta for timeframe self._tdunit = self._tdeltas[self._timeframe] self._tdunit *= self._compression if self._fillbars: return self._frombars() # use existing bar or fetch a bar self._dbar = self._dbar or self.p.dataname.next() if not self._dbar: return False # no more data if len(self) == 1: # Cannot yet look backwards - deliver data as is return self._copyfromdata() # previous (delivered) close pclose = self.lines.close[-1] # Get time of previous (already delivered) bar dtime_prev = self.lines.datetime.datetime(-1) # Get time of current (from a data source) bar dtime_cur = self.p.dataname.datetime.datetime(0) # Calculate the session end for previous bar send = datetime.combine(dtime_prev.date(), self.p.dataname.sessionend) if dtime_cur > send: # if jumped boundary # 1. check for missing bars until boundary (end) dtime_prev += self._tdunit while dtime_prev < send: self._fillbars.append((dtime_prev, pclose)) dtime_prev += self._tdunit # Calculate session start for new bar sstart = datetime.combine(dtime_cur.date(), self.p.dataname.sessionstart) # 2. check for missing bars from new boundary (start) # check a gap from new sessionstart while sstart < dtime_cur: self._fillbars.append((sstart, pclose)) sstart += self._tdunit else: # no boundary jumped - check the gap until current time dtime_prev += self._tdunit while dtime_prev < dtime_cur: self._fillbars.append((dtime_prev, pclose)) dtime_prev += self._tdunit if self._fillbars: self._dbar = True # flag a pending data bar is available # return an accumulated bar in the current cycle return self._frombars() return self._copyfromdata()