backtrader.resamplerfilter 源代码

#!/usr/bin/env python
"""Resampler and Filter Module - Data resampling and replay functionality.

This module provides classes for resampling data to different timeframes
and replaying data at compressed timeframes. It includes the base
resampler and replayer classes along with specific implementations for
different time periods.

Key Classes:
    Resampler: Base class for resampling data to different timeframes.
    Replayer: Base class for replaying data with session information.
    DTFaker: Provides fake datetime for live/real-time data feeds.

Example:
    Resampling daily data to weekly:
    >>> data = bt.feeds.GenericCSVData(dataname='daily.csv')
    >>> cerebro.adddata(data)
    >>> cerebro.resampledata(data, timeframe=bt.TimeFrame.Weeks)
"""

from datetime import datetime, timedelta, timezone

from .dataseries import TimeFrame, _Bar
from .parameters import ParameterizedBase
from .utils.date import date2num, num2date

# Python 3.11+ has datetime.UTC, earlier versions use timezone.utc
UTC = timezone.utc


# This class is only used in the _checkbarover function
# chkdata = DTFaker(data, forcedata) if fromcheck else data
[文档] class DTFaker: """Provides fake datetime for data sources that need periodic checks. This class is used for real-time data feeds that return None from _load to indicate that a check of the resampler and/or notification queue is needed. It provides the current time in both UTC and localized formats. Attributes: data: The underlying data source. _dt: UTC-like time as numeric value. _dtime: Localized datetime. sessionend: Trading day end time. Example: >>> faker = DTFaker(data) >>> print(faker.datetime()) # Current localized time """ # This will only be used for data sources which at some point in time # return None from _load to indicate that a check of the resampler and/or # notification queue is needed # This is meant (at least initially) for real-time feeds, because those are # the ones in need of events like the ones described above. # These data sources should also be producing ``utc`` time directly because # the real-time feed is (more often than not) timestamped and utc provides # a universal reference, # That's why below the timestamp is chosen in UTC and passed directly to # date2num to avoid localization.But it is extracted from data.num2date # to ensure the returned datetime object is localized according to the # expected output by the user (local timezone or any specified) # Initialize
[文档] def __init__(self, data, forcedata=None): """Initialize the DTFaker with current time. Args: data: The underlying data source. forcedata: Optional data source to force time from. """ # Data self.data = data # Aliases self.datetime = self self.p = self # If forcedata is None if forcedata is None: # Get current utc time and add data time offset _dtime = datetime.now(UTC) + data._timeoffset() # Convert calculated utc time to number self._dt = dt = date2num(_dtime) # utc-like time # Convert numeric time to localized time format self._dtime = data.num2date(dt) # localized time # If forcedata is not None else: # Get corresponding time from forcedata's datetime column as utc time self._dt = forcedata.datetime[0] # utc-like time # Get local time directly from forcedata self._dtime = forcedata.datetime.datetime() # localized time # Trading day end time self.sessionend = data.p.sessionend
# Length def __len__(self): return len(self.data) # Return localized date and time when called
[文档] def __call__(self, idx=0): """Return the localized datetime. Args: idx: Index (ignored, for compatibility). Returns: Localized datetime object. """ return self._dtime # simulates data.datetime.datetime()
# datetime returns localized date and time
[文档] def datetime(self, idx=0): """Return the localized datetime. Args: idx: Index (ignored, for compatibility). Returns: Localized datetime object. """ return self._dtime
# Return localized date
[文档] def date(self, idx=0): """Return the localized date. Args: idx: Index (ignored, for compatibility). Returns: Date object. """ return self._dtime.date()
# Return localized time
[文档] def time(self, idx=0): """Return the localized time. Args: idx: Index (ignored, for compatibility). Returns: Time object. """ return self._dtime.time()
# Return data calendar @property def _calendar(self): return self.data._calendar # If idx=0, return utc numeric time, otherwise return -inf def __getitem__(self, idx): return self._dt if idx == 0 else float("-inf") # Convert number to date and time
[文档] def num2date(self, *args, **kwargs): """Convert numeric time to datetime. Delegates to the underlying data source's num2date method. Returns: Datetime object. """ return self.data.num2date(*args, **kwargs)
# Convert date and time to number
[文档] def date2num(self, *args, **kwargs): """Convert datetime to numeric time. Delegates to the underlying data source's date2num method. Returns: Float representing the datetime. """ return self.data.date2num(*args, **kwargs)
# Get trading day end time def _getnexteos(self): return self.data._getnexteos()
# Base class for resampler class _BaseResampler(ParameterizedBase): # Parameters params = ( ("bar2edge", True), ("adjbartime", True), ("rightedge", True), ("boundoff", 0), ("timeframe", TimeFrame.Days), ("compression", 1), ("takelate", True), ("sessionend", True), ) # Initialize def __init__(self, data, **kwargs): """Initialize the base resampler. Sets up the resampling configuration based on timeframe and compression parameters, and modifies the data source accordingly. Args: data: The data source to resample. **kwargs: Additional parameters for the resampler. """ super().__init__(**kwargs) # If timeframe is less than day but greater than tick, subdays is True, subdays represents intraday timeframe self.subdays = TimeFrame.Ticks < self.p.timeframe < TimeFrame.Days # If timeframe is less than week, subweeks is True self.subweeks = self.p.timeframe < TimeFrame.Weeks # If not subdays, and data timeframe equals parameter timeframe, and parameter compression divided by data compression remainder is 0, componly is True self.componly = ( not self.subdays and data._timeframe == self.p.timeframe and not (self.p.compression % data._compression) ) # Create an object to save bar data self.bar = _Bar(maxdate=True) # bar holder # Number of bars produced, used to control compression count self.compcount = 0 # count of produced bars to control compression # Whether it is the first bar self._firstbar = True # If bar2edge, adjbartime, subweeks are all True, doadjusttime is True self.doadjusttime = self.p.bar2edge and self.p.adjbartime and self.subweeks # The end time of this trading day self._nexteos = None # Modify data information according to own parameters # During initialization, modify data attributes based on parameters # Data resampling is 1 data.resampling = 1 # replaying equals replaying data.replaying = self.replaying # Data timeframe equals parameter timeframe data._timeframe = self.p.timeframe # Data compression equals parameter compression data._compression = self.p.compression self.data = data # How to handle late-arriving data, if not subdays return False, if data length > 1 and current time <= previous time return True def _latedata(self, data): # new data at position 0, still untouched from stream if not self.subdays: return False # Time already delivered return len(data) > 1 and data.datetime[0] <= data.datetime[-1] # Whether to check if bar is over def _checkbarover(self, data, fromcheck=False, forcedata=None): # Data to check, if fromcheck is True, use DTFaker to generate instance, otherwise use data chkdata = DTFaker(data, forcedata) if fromcheck else data # Whether finished isover = False # If not componly and _barover(chkdata) is False, return False if not self.componly and not self._barover(chkdata): return isover # If intraday and bar2edge is True, return True if self.subdays and self.p.bar2edge: isover = True # If fromcheck is False elif not fromcheck: # fromcheck doesn't increase compcount # compcount+1 self.compcount += 1 # If compcount divided by compression equals 0, return True if not (self.compcount % self.p.compression): # boundary crossed and enough bars for compression ... proceed isover = True return isover # Determine if data has finished def _barover(self, data): # Timeframe tframe = self.p.timeframe # If timeframe equals tick, return bar.isopen() if tframe == TimeFrame.Ticks: # Ticks is already the lowest level return self.bar.isopen() # If timeframe is less than day, call _barover_subdays(data) elif tframe < TimeFrame.Days: return self._barover_subdays(data) # If timeframe equals day, call _barover_days(data) elif tframe == TimeFrame.Days: return self._barover_days(data) # If timeframe equals week, call _barover_weeks(data) elif tframe == TimeFrame.Weeks: return self._barover_weeks(data) # If timeframe equals month, call _barover_months(data) elif tframe == TimeFrame.Months: return self._barover_months(data) # If timeframe equals year, call _barover_years(data) elif tframe == TimeFrame.Years: return self._barover_years(data) # Set session end time def _eosset(self): if self._nexteos is None: self._nexteos, self._nextdteos = self.data._getnexteos() return # Check session end time def _eoscheck(self, data, seteos=True, exact=False): # If seteos is True, directly call _eosset to calculate session end time if seteos: self._eosset() # Compare current data time with session end time equal = data.datetime[0] == self._nextdteos grter = data.datetime[0] > self._nextdteos # If exact is True, ret equals equal, # Otherwise, if grter is True, if bar.isopen() is True and bar.datetime < next end time, ret equals True # Otherwise, ret equals equal if exact: ret = equal else: # if the compared data goes over the endofsession # make sure the resampled bar is open and has something before that # end of the session, It could be a weekend and nothing was delivered # until Monday if grter: ret = self.bar.isopen() and self.bar.datetime <= self._nextdteos else: ret = equal # If ret is True, _lasteos equals _nexteos, _lastdteos equals _nextdteos # And set _nexteos and _nextdteos to None and -inf respectively if ret: self._lasteos = self._nexteos self._lastdteos = self._nextdteos self._nexteos = None self._nextdteos = float("-inf") return ret # Check days def _barover_days(self, data): return self._eoscheck(data) # Check weeks def _barover_weeks(self, data): # If data's _calendar is None if self.data._calendar is None: # Get specific year, week number and day from date year, week, _ = data.num2date(self.bar.datetime).date().isocalendar() # Get bar's week number yearweek = year * 100 + week # Get data's year, week number and day, and get data's week number baryear, barweek, _ = data.datetime.date().isocalendar() bar_yearweek = baryear * 100 + barweek # If data's week number is greater than bar's week number, return True, otherwise return False return bar_yearweek > yearweek # If data's _calendar is not None, call last_weekday else: return data._calendar.last_weekday(data.datetime.date()) # Check months def _barover_months(self, data): dt = data.num2date(self.bar.datetime).date() yearmonth = dt.year * 100 + dt.month bardt = data.datetime.datetime() bar_yearmonth = bardt.year * 100 + bardt.month return bar_yearmonth > yearmonth # Check years def _barover_years(self, data): return data.datetime.datetime().year > data.num2date(self.bar.datetime).year # Get time point def _gettmpoint(self, tm): """ Returns the point of time intraday for a given time according to the timeframe - Ex 1: 00:05:00 in minutes -> point = 5 - Ex 2: 00:05:20 in seconds -> point = 5 * 60 + 20 = 320 """ # Minute point point = tm.hour * 60 + tm.minute # Remaining point restpoint = 0 # If timeframe is less than minutes if self.p.timeframe < TimeFrame.Minutes: # Second point point = point * 60 + tm.second # If timeframe is less than seconds if self.p.timeframe < TimeFrame.Seconds: # Convert point to microseconds point = point * 1e6 + tm.microsecond # If timeframe is not less than seconds, remaining point is microseconds else: restpoint = tm.microsecond # If timeframe is not less than minutes, remaining point is seconds and microseconds else: restpoint = tm.second + tm.microsecond # Add boundoff to point point += self.p.boundoff return point, restpoint # Intraday bar over def _barover_subdays(self, data): # If _eoscheck(data) returns True, then function returns True if self._eoscheck(data): return True # If data time is less than bar time, return False if data.datetime[0] < self.bar.datetime: return False # Get time objects for the comparisons - in utc-like format # Get bar and data time tm = num2date(self.bar.datetime).time() bartm = num2date(data.datetime[0]).time() # Get self.bar's time point and data's time point respectively point, _ = self._gettmpoint(tm) barpoint, _ = self._gettmpoint(bartm) # Set ret to False ret = False # If data's time point is less than bar's time point, return False # If data's time point is greater than bar's time point, further analyze if barpoint > point: # The data bar has surpassed the internal bar # If bar2edge is False, return True if not self.p.bar2edge: # Compression done on a simple bar basis (like days) ret = True # If compression is 1, return True elif self.p.compression == 1: # no bar compression requested -> internal bar done ret = True # If bar2edge is True and compression is not 1, calculate remainder of dividing points by compression # If data's point remainder is greater than bar's point remainder, return True else: point_comp = point // self.p.compression barpoint_comp = barpoint // self.p.compression # Went over boundary including compression if barpoint_comp > point_comp: ret = True return ret # Check whether to submit currently stored bar when data hasn't moved forward def check(self, data, _forcedata=None): """Called to check if the current stored bar has to be delivered in spite of the data not having moved forward. If no ticks from a live feed come in, a 5-second resampled bar could be delivered 20 seconds later. When this method is called the wall clock (incl data time offset) is called to check if the time has gone so far as to have to deliver the already stored data """ if not self.bar.isopen(): return return self(data, fromcheck=True, forcedata=_forcedata) # Determine if data is about to form a bar def _dataonedge(self, data): # If subweek is False, if data._calendar is None, return False and True if not self.subweeks: if data._calendar is None: return False, True # nothing can be done # Timeframe tframe = self.p.timeframe # Set ret to False ret = False # If timeframe equals week, call last_weekday to check # If timeframe equals month, call last_monthday to check # If timeframe equals year, call last_yearday to check if tframe == TimeFrame.Weeks: # Ticks is already the lowest ret = data._calendar.last_weekday(data.datetime.date()) elif tframe == TimeFrame.Months: ret = data._calendar.last_monthday(data.datetime.date()) elif tframe == TimeFrame.Years: ret = data._calendar.last_yearday(data.datetime.date()) # If ret is True if ret: # Data must be consumed but compression may not be met yet # Prevent barcheckover from being called because it could again # increase compcount # Set docheckover to False docheckover = False # compcount+1 self.compcount += 1 # If compcount divided by compression remainder equals 0, return True, otherwise return False ret = not (self.compcount % self.p.compression) # If ret equals False, docheckover equals True else: docheckover = True # Return ret, docheckover return ret, docheckover # _eoscheck check, return two True if self._eoscheck(data, exact=True): return True, True # If intraday if self.subdays: # Get data's point and remaining point point, prest = self._gettmpoint(data.datetime.time()) # If remaining point is not 0, return False and True if prest: return False, True # cannot be on boundary, subunits present # Pass through compression to get boundary and rest over boundary # Calculate boundary and remaining boundary bound, brest = divmod(point, self.p.compression) # if no extra and decomp bound is point # If divmod result remainder is 0, return two True return brest == 0 and point == (bound * self.p.compression), True # Code overriden by eoscheck # This code will not run if False and self.p.sessionend: # Days scenario - get datetime to compare in output timezone # because p.sessionend is expected in output timezone bdtime = data.datetime.datetime() bsend = datetime.combine(bdtime.date(), data.p.sessionend) return bdtime == bsend # If none of above reached return, return False, True return False, True # subweeks, not subdays and not sessionend # Calculate adjusted time def _calcadjtime(self, greater=False): if self._nexteos is None: # Session has been exceeded - end of session is the mark return self._lastdteos # utc-like dt = self.data.num2date(self.bar.datetime) # Get current time tm = dt.time() # Get the point of the day in the time frame unit (ex: minute 200) point, _ = self._gettmpoint(tm) # Apply compression to update the point position (comp 5 -> 200 // 5) # point = (point // self.p.compression) point = point // self.p.compression # If rightedge (end of boundary is activated) add it unless recursing point += self.p.rightedge # Restore point to the timeframe units by de-applying compression point *= self.p.compression # Get hours, minutes, seconds and microseconds extradays = 0 if self.p.timeframe == TimeFrame.Minutes: ph, pm = divmod(point, 60) ps = 0 pus = 0 elif self.p.timeframe == TimeFrame.Seconds: ph, pm = divmod(point, 60 * 60) pm, ps = divmod(pm, 60) pus = 0 elif self.p.timeframe <= TimeFrame.MicroSeconds: ph, pm = divmod(point, 60 * 60 * 1e6) pm, psec = divmod(pm, 60 * 1e6) ps, pus = divmod(psec, 1e6) elif self.p.timeframe == TimeFrame.Days: # last resort eost = self._nexteos.time() ph = eost.hour pm = eost.minute ps = eost.second pus = eost.microsecond if ph > 23: # went over midnight: extradays = ph // 24 ph %= 24 # Replace intraday parts with the calculated ones and update it dt = dt.replace(hour=int(ph), minute=int(pm), second=int(ps), microsecond=int(pus)) if extradays: dt += timedelta(days=extradays) dtnum = self.data.date2num(dt) return dtnum # Adjust bar time def _adjusttime(self, greater=False, forcedata=None): """ Adjusts the time of calculated bar (from the underlying data source) by using the timeframe to the appropriate boundary, with compression taken into account Depending on param ``rightedge`` uses the starting boundary or the ending one """ dtnum = self._calcadjtime(greater=greater) if greater and dtnum <= self.bar.datetime: return False self.bar.datetime = dtnum return True # Resample small period data to form large period data
[文档] class Resampler(_BaseResampler): """This class resamples data of a given timeframe to a larger timeframe. Params - Bar2edge (default: True) Resamples using time boundaries as the target.For example, with a "ticks -> 5 seconds" the resulting 5-seconds bars will be aligned to xx:00, xx:05, xx:10 ... # When resampling, use time boundary as target, for example if ticks data wants to resample to 5 seconds, bars will be formed at xx:00, xx:05, xx:10 - Adjbartime (default: True) Use the time at the boundary to adjust the time of the delivered resampled bar instead of the last seen timestamp. If resampling to "5 seconds" the time of the bar will be adjusted, for example, to hh:mm:05 even if the last seen timestamp was hh:mm:04.33 :note:: Time will only be adjusted if "bar2edge" is True. It wouldn't make sense to adjust the time if the bar has not been aligned to a boundary # Adjust the last bar's final time, when bar2edge is True, use the final boundary as the last bar's time - Rightedge (default: True) Use the right edge of the time boundaries to set the time. If False and compressing to 5 seconds, the time of a resampled bar for seconds between hh:mm:00 and hh:mm:04 will be hh:mm:00 (the starting boundary If True, the used boundary for the time will be hh:mm:05 (the ending boundary) # Whether to use the right time boundary, for example if time boundary is hh:mm:00:hh:mm:05, if set to True, will use hh:mm:05 # Set to False, will use hh:mm:00 """ # Parameters params = ( ("bar2edge", True), ("adjbartime", True), ("rightedge", True), ) replaying = False # Called when data no longer produces bars, can be called multiple times, has chance to produce extra bars when must deliver bar
[文档] def last(self, data): """Called when the data is no longer producing bars Can be called multiple times. It has the chance to (for example) produce extra bars which may still be accumulated and have to be delivered """ if self.bar.isopen(): if self.doadjusttime: self._adjusttime() data._add2stack(self.bar.lvalues()) self.bar.bstart(maxdate=True) # close the bar to avoid dups return True return False
# Used when calling resampler
[文档] def __call__(self, data, fromcheck=False, forcedata=None): """Called for each set of values produced by the data source""" consumed = False onedge = False docheckover = True if not fromcheck: if self._latedata(data): if not self.p.takelate: data.backwards() return True # get a new bar self.bar.bupdate(data) # update new or existing bar # push time beyond reference self.bar.datetime = data.datetime[-1] + 0.000001 data.backwards() # remove used bar return True if self.componly: # only if not subdays # Get a session ref before rewinding _, self._lastdteos = self.data._getnexteos() consumed = True else: onedge, docheckover = self._dataonedge(data) # for subdays consumed = onedge if consumed: self.bar.bupdate(data) # update new or existing bar data.backwards() # remove used bar # if self.bar.isopen and (onedge or (docheckover and checkbarover)) cond = self.bar.isopen() if cond: # original is and, the 2nd term must also be true if not onedge: # onedge true is sufficient if docheckover: cond = self._checkbarover(data, fromcheck=fromcheck, forcedata=forcedata) if cond: dodeliver = False if forcedata is not None: # check our delivery time is not larger than that of forcedata tframe = self.p.timeframe if tframe == TimeFrame.Ticks: # Ticks is already the lowest dodeliver = True elif tframe == TimeFrame.Minutes: dtnum = self._calcadjtime(greater=True) dodeliver = dtnum <= forcedata.datetime[0] elif tframe == TimeFrame.Days: dtnum = self._calcadjtime(greater=True) dodeliver = dtnum <= forcedata.datetime[0] else: dodeliver = True if dodeliver: if not onedge and self.doadjusttime: self._adjusttime(greater=True, forcedata=forcedata) data._add2stack(self.bar.lvalues()) self.bar.bstart(maxdate=True) # bar delivered -> restart if not fromcheck: if not consumed: self.bar.bupdate(data) # update new or existing bar data.backwards() # remove used bar return True
# Replayer class
[文档] class Replayer(_BaseResampler): """This class replays data of a given timeframe to a larger timeframe. It simulates the action of the market by slowly building up (for ex.) a daily bar from tick/seconds/minutes data Only when the bar is complete will the "length" of the data be changed effectively delivering a closed bar Params - Bar2edge (default: True) Replays using time boundaries as the target of the closed bar.For example, with a "ticks -> 5 seconds" the resulting 5-second bars will be aligned to xx:00, xx:05, xx:10 ... - Adjbartime (default: False) Use the time at the boundary to adjust the time of the delivered resampled bar instead of the last seen timestamp. If resampling to "5 seconds" the time of the bar will be adjusted, for example, to hh:mm:05 even if the last seen timestamp was hh:mm:04.33 *Note* Time will only be adjusted if "bar2edge" is True. It wouldn't make sense to adjust the time if the bar has not been aligned to a boundary *Note* if this parameter is True, an extra tick with the *adjusted* time will be introduced at the end of the *replayed* bar - Rightedge (default: True) Use the right edge of the time boundaries to set the time. If False and compressing to 5 seconds, the time of a resampled bar for seconds between hh:mm:00 and hh:mm:04 will be hh:mm:00 (the starting boundary If True, the used boundary for the time will be hh:mm:05 (the ending boundary) """ params = ( ("bar2edge", True), ("adjbartime", False), ("rightedge", True), ) replaying = True # Run when calling class
[文档] def __call__(self, data, fromcheck=False, forcedata=None): """Process the data for replaying. Manages bar replaying with session information and time alignment. Args: data: The data source to replay. fromcheck: Whether this is being called from a periodic check. forcedata: Optional data source to force timing from. Returns: bool: True if a new bar was generated, False otherwise. """ # Consume consumed = False # At bar generation time point onedge = False # Late-arriving data takinglate = False # Whether to check bar end docheckover = True # If fromcheck is False if not fromcheck: # Call _latedata to see how to handle late data, if returns True if self._latedata(data): # If takelate is False, generate a new bar if not self.p.takelate: data.backwards(force=True) return True # get a new bar # Set these two parameters consumed = True takinglate = True # If not intraday elif self.componly: # only if not subdays consumed = True else: # Call _dataonedge to determine if at bar generation time and if bar is over onedge, docheckover = self._dataonedge(data) # for subdays consumed = onedge data._tick_fill(force=True) # update # If consumed is True, update data, if takinglate is True, set a new time for bar if consumed: self.bar.bupdate(data) if takinglate: self.bar.datetime = data.datetime[-1] + 0.000001 # if onedge or (checkbarover and self._checkbarover) cond = onedge # If currently not at bar generation time point, if check is needed, need to check if bar is over if not cond: # original is or, if true, it would suffice if docheckover: cond = self._checkbarover(data, fromcheck=fromcheck) # If check result returns True if cond: # If not exactly at bar generation time and need to adjust time if not onedge and self.doadjusttime: # insert tick with adjtime adjusted = self._adjusttime(greater=True) # If adjustment is needed, adjust time and update bar if adjusted: ago = 0 if (consumed or fromcheck) else -1 # Update to the point right before the new data data._updatebar(self.bar.lvalues(), forward=False, ago=ago) # If no check needed if not fromcheck: # If not in consume mode, use _save2stack to save data if not consumed: # Reopen bar with real new data and save data to queue self.bar.bupdate(data, reopen=True) # erase is True, but the tick will not be seen below # and therefore no need to mark as 1st data._save2stack(erase=True, force=True) # If in consume mode, data starts, next bar is first bar else: self.bar.bstart(maxdate=True) self._firstbar = True # next is first # If check is needed else: # from check # fromcheck or consumed have forced delivery, reopen self.bar.bstart(maxdate=True) self._firstbar = True # next is first if adjusted: # after adjusting need to redeliver if this was a check data._save2stack(erase=True, force=True) # If no check needed elif not fromcheck: if not consumed: # Data already "forwarded" and we replay to new bar # No need to go backwards.reopen the internal cache self.bar.bupdate(data, reopen=True) else: # compression only, used data to update bar, hence remove # from stream, update existing data, reopen bar if not self._firstbar: # only discard data if not firstbar data.backwards(force=True) data._updatebar(self.bar.lvalues(), forward=False, ago=0) self.bar.bstart(maxdate=True) self._firstbar = True # make sure the next tick moves forward # If no check needed elif not fromcheck: # not over, update, remove new entry, deliver if not consumed: self.bar.bupdate(data) if not self._firstbar: # only discard data if not firstbar data.backwards(force=True) data._updatebar(self.bar.lvalues(), forward=False, ago=0) self._firstbar = False return False # the system can process the existing bar
[文档] class ResamplerTicks(Resampler): """Resampler for tick-level data.""" params = (("timeframe", TimeFrame.Ticks),)
[文档] class ResamplerSeconds(Resampler): """Resampler for seconds-level data.""" params = (("timeframe", TimeFrame.Seconds),)
[文档] class ResamplerMinutes(Resampler): """Resampler for minute-level data.""" params = (("timeframe", TimeFrame.Minutes),)
[文档] class ResamplerDaily(Resampler): """Resampler for daily data.""" params = (("timeframe", TimeFrame.Days),)
[文档] class ResamplerWeekly(Resampler): """Resampler for weekly data.""" params = (("timeframe", TimeFrame.Weeks),)
[文档] class ResamplerMonthly(Resampler): """Resampler for monthly data.""" params = (("timeframe", TimeFrame.Months),)
[文档] class ResamplerYearly(Resampler): """Resampler for yearly data.""" params = (("timeframe", TimeFrame.Years),)
[文档] class ReplayerTicks(Replayer): """Replayer for tick-level data.""" params = (("timeframe", TimeFrame.Ticks),)
[文档] class ReplayerSeconds(Replayer): """Replayer for seconds-level data.""" params = (("timeframe", TimeFrame.Seconds),)
[文档] class ReplayerMinutes(Replayer): """Replayer for minute-level data.""" params = (("timeframe", TimeFrame.Minutes),)
[文档] class ReplayerDaily(Replayer): """Replayer for daily data.""" params = (("timeframe", TimeFrame.Days),)
[文档] class ReplayerWeekly(Replayer): """Replayer for weekly data.""" params = (("timeframe", TimeFrame.Weeks),)
[文档] class ReplayerMonthly(Replayer): """Replayer for monthly data.""" params = (("timeframe", TimeFrame.Months),)