#!/usr/bin/env python
"""InfluxDB Data Feed Module - InfluxDB time series data.
This module provides the InfluxDB data feed for reading market data
from InfluxDB time series database.
Classes:
InfluxDB: InfluxDB data feed.
Example:
>>> data = bt.feeds.InfluxDB(
... host='localhost',
... port=8086,
... database='market_data'
... )
>>> cerebro.adddata(data)
"""
import datetime as dt
from ..dataseries import TimeFrame
from ..feed import DataBase
from ..utils import date2num
# The influxdb client is an optional dependency: when it cannot be imported,
# fall back to placeholders so this module can still be imported.
try:
    from influxdb import InfluxDBClient as idbclient
    from influxdb.exceptions import InfluxDBClientError
except Exception:  # pragma: no cover - optional dependency, handled at runtime
    # Sentinel meaning "client library unavailable"
    idbclient = None

    # Defined only in this fallback branch so that ``except
    # InfluxDBClientError`` clauses elsewhere in the module still resolve
    # without shadowing the real exception when the package is present.
    class InfluxDBClientError(Exception):
        """Exception raised for InfluxDB client errors."""

        pass
# Maps a backtrader timeframe to the unit suffix appended to the compression
# multiple when building the query's GROUP BY time(...) interval.
# NOTE(review): Minutes and Months both map to "m". InfluxQL reads "m" as
# minutes and does not appear to document a month or year duration unit, so
# the Months and Years entries probably do not behave as intended - confirm
# against the InfluxQL duration-literal reference before relying on them.
TIMEFRAMES = {
    TimeFrame.Seconds: "s",
    TimeFrame.Minutes: "m",
    TimeFrame.Days: "d",
    TimeFrame.Weeks: "w",
    TimeFrame.Months: "m",
    TimeFrame.Years: "y",
}
# backtrader fetches data from InfluxDB
class InfluxDB(DataBase):
    """InfluxDB data feed for time series market data.

    Fetches OHLCV data from an InfluxDB time series database. ``start``
    runs a single aggregated query and buffers the returned points;
    ``_load`` then delivers them one bar at a time.

    Params:
        host, port, username, password, database: Passed positionally, in
            that order, to the InfluxDB client constructor.
        timeframe: Timeframe whose unit suffix (looked up in ``TIMEFRAMES``,
            falling back to ``"d"``) sizes the ``GROUP BY time(...)``
            bucket. The suffix is prefixed with ``compression`` (1 when
            unset).
        startdate: When set, only points with ``time >= startdate`` are
            requested; otherwise the filter is ``time <= now()``.
        high, low, open, close, volume, ointerest: Names of the fields in
            the measurement that are averaged into each bar.

    The measurement that is queried is named by ``dataname``.
    """

    # Import packages
    frompackages = (
        ("influxdb", [("InfluxDBClient", "idbclient")]),
        ("influxdb.exceptions", "InfluxDBClientError"),
    )

    # Parameters
    params = (
        ("host", "127.0.0.1"),
        ("port", "8086"),
        ("username", None),
        ("password", None),
        ("database", None),
        ("timeframe", TimeFrame.Days),
        ("startdate", None),
        ("high", "high_p"),
        ("low", "low_p"),
        ("open", "open_p"),
        ("close", "close_p"),
        ("volume", "volume"),
        ("ointerest", "oi"),
    )

    def __init__(self):
        """Initialize the InfluxDB data feed."""
        self.biter = None  # iterator over the buffered bars, set by start()
        self.ndb = None  # InfluxDB client handle, set by start()

    def start(self):
        """Start the InfluxDB data feed.

        Connects to InfluxDB and executes the query to fetch data. A client
        error while connecting or querying is reported on stdout and leaves
        the feed empty (``_load`` returns ``False`` immediately) instead of
        failing later on an unbound or ``None`` name.

        Raises:
            ImportError: If the ``influxdb`` client library is unavailable.
        """
        super().start()

        if idbclient is None:
            # Without this guard the call below fails with an opaque
            # "'NoneType' object is not callable".
            raise ImportError(
                "The 'influxdb' package is required to use the InfluxDB data feed"
            )

        # Try to connect to database
        try:
            self.ndb = idbclient(
                self.p.host, self.p.port, self.p.username, self.p.password, self.p.database
            )
        except InfluxDBClientError as err:
            print("Failed to establish connection to InfluxDB: %s" % err)
            self.ndb = None

        # Get data; stays empty when there is no client or the query fails
        dbars = []
        if self.ndb is not None:
            try:
                dbars = list(self.ndb.query(self._build_query()).get_points())
            except InfluxDBClientError as err:
                print("InfluxDB query failed: %s" % err)

        # Iterate data
        self.biter = iter(dbars)

    def _build_query(self):
        """Return the query string that aggregates bars for this feed."""
        # Specific time period: compression multiple followed by unit suffix
        interval = "{multiple}{timeframe}".format(
            multiple=self.p.compression or 1,
            timeframe=TIMEFRAMES.get(self.p.timeframe, "d"),
        )

        # Start time
        if self.p.startdate:
            time_filter = ">= '%s'" % self.p.startdate
        else:
            time_filter = "<= now()"

        # The query could already consider parameters like fromdate and todate
        # to have the database skip them and not the internal code
        return (
            'SELECT mean("{open_f}") AS "open", mean("{high_f}") AS "high", '
            'mean("{low_f}") AS "low", mean("{close_f}") AS "close", '
            'mean("{vol_f}") AS "volume", mean("{oi_f}") AS "openinterest" '
            'FROM "{dataname}" '
            "WHERE time {begin} "
            "GROUP BY time({timeframe}) fill(none)"
        ).format(
            open_f=self.p.open,
            high_f=self.p.high,
            low_f=self.p.low,
            close_f=self.p.close,
            vol_f=self.p.volume,
            oi_f=self.p.ointerest,
            timeframe=interval,
            begin=time_filter,
            dataname=self.p.dataname,
        )

    def _load(self):
        """Copy the next buffered bar into the lines.

        Returns:
            ``True`` when a bar was loaded, ``False`` once the buffered
            points are exhausted.
        """
        # Try to get next bar data, then add to line
        try:
            bar = next(self.biter)
        except StopIteration:
            return False

        self.l.datetime[0] = date2num(dt.datetime.strptime(bar["time"], "%Y-%m-%dT%H:%M:%SZ"))
        self.l.open[0] = bar["open"]
        self.l.high[0] = bar["high"]
        self.l.low[0] = bar["low"]
        self.l.close[0] = bar["close"]
        self.l.volume[0] = bar["volume"]

        # The query also selects open interest; store it when a value came
        # back (skipped when the point carries no value for it).
        oi = bar.get("openinterest")
        if oi is not None:
            self.l.openinterest[0] = oi

        return True