Source code for tide.influx

import time
import datetime as dt
from urllib3.exceptions import ReadTimeoutError

import pandas as pd
from influxdb_client import InfluxDBClient

from tide.utils import check_and_return_dt_index_df, date_objects_tostring


def _single_influx_request(
    start: str | pd.Timestamp | dt.datetime,
    stop: str | pd.Timestamp | dt.datetime,
    bucket: str,
    measurement: str,
    tide_tags: list[str],
    url: str,
    org: str,
    token: str,
    tz_info: str = "UTC",
    timeout: int = 10000,
) -> pd.DataFrame:
    client = InfluxDBClient(url=url, org=org, token=token, timeout=timeout)
    query_api = client.query_api()
    query = f"""
        from(bucket: "{bucket}")
        |> range(start: {date_objects_tostring(start, tz_info)}, 
                 stop: {date_objects_tostring(stop, tz_info)})
        |> filter(fn: (r) => r["_measurement"] == "{measurement}")
        |> map(fn: (r) => ({{r with tide: r.{tide_tags[0]}
    """
    if len(tide_tags) > 1:
        for tag in tide_tags[1:]:
            query += f' + "__" + r.{tag}'
    query += "}))"
    query += """
        |> keep(columns: ["_time", "_value", "tide"])
        |> pivot(rowKey: ["_time"], columnKey: ["tide"], valueColumn: "_value")
        |> sort(columns: ["_time"])
        """

    tables = query_api.query(query)

    records = []
    for table in tables:
        for record in table.records:
            records.append(record.values)

    df = pd.DataFrame(records)
    if not df.empty:
        df["_time"] = pd.to_datetime(df["_time"])
        df.set_index("_time", inplace=True)
        df.drop(["result", "table"], axis=1, inplace=True)
    return df


[docs] def get_influx_data( start: str | pd.Timestamp | dt.datetime, stop: str | pd.Timestamp | dt.datetime, bucket: str, measurement: str, tide_tags: list[str], url: str, org: str, token: str, split_td: str | dt.timedelta | pd.Timedelta = None, tz_info: str = "UTC", request_timeout: int = 10000, max_retry: int = 5, waited_seconds_at_retry: int = 5, verbose: bool = False, ) -> pd.DataFrame: """Fetch time series data from an InfluxDB instance. This function retrieves data from InfluxDB and formats it according to Tide's hierarchical column naming convention. It supports: - Flexible time range specification - Automatic query splitting for large time ranges - Retry mechanism for handling timeouts - Timezone-aware data handling Parameters ---------- start : str or pd.Timestamp or datetime.datetime Start time for the query. Can be: - A relative time string (e.g., "-1d", "-2h") - A pandas Timestamp - A datetime object If using relative time strings, they are interpreted relative to the current time. stop : str or pd.Timestamp or datetime.datetime End time for the query. Accepts the same formats as start. bucket : str Name of the InfluxDB bucket to query. measurement : str Name of the InfluxDB measurement to filter data. tide_tags : list[str] List of InfluxDB fields/tags to combine into Tide column names. Must be specified in order: [name, unit, bloc, sub_bloc]. Example: ["name", "unit", "location", "room"] will create columns like "temperature__°C__zone1__room1" url : str URL of the InfluxDB instance (e.g., "http://localhost:8086") org : str InfluxDB organization name token : str Authentication token for InfluxDB access split_td : str or datetime.timedelta or pd.Timedelta, optional Time interval for splitting large queries into smaller chunks. Useful for handling large time ranges or rate limits. Example: "1d" for daily chunks, "12h" for half-day chunks. If None, queries the entire time range at once. tz_info : str, default "UTC" Timezone for interpreting start and stop times. Must be a valid timezone name from the IANA Time Zone Database. request_timeout : int, default 10 000 Number of ms to wait before the request timeout max_retry : int, default 5 Maximum number of retry attempts for failed queries. Only applies to ReadTimeoutError exceptions. waited_seconds_at_retry : int, default 5 Number of seconds to wait between retry attempts. verbose : bool, default False Whether to print progress information during data fetching. Returns ------- pd.DataFrame DataFrame containing the fetched data with: - Datetime index in UTC - Columns named according to Tide's convention (name__unit__bloc__sub_bloc) - Values from the InfluxDB _value field Raises ------ ReadTimeoutError If all retry attempts fail to fetch data ValueError If tz_info is required but not provided for naive datetime objects Examples -------- >>> from tide.influx import get_influx_data >>> import pandas as pd >>> # Fetch last 24 hours of data >>> df = get_influx_data( ... start="-24h", ... stop="now", ... bucket="my_bucket", ... measurement="sensors", ... tide_tags=["name", "unit", "location"], ... url="http://localhost:8086", ... org="my_org", ... token="my_token", ... ) >>> # Fetch specific time range with daily splitting >>> df = get_influx_data( ... start="2023-01-01", ... stop="2023-01-07", ... bucket="my_bucket", ... measurement="sensors", ... tide_tags=["name", "unit", "location", "room"], ... url="http://localhost:8086", ... org="my_org", ... token="my_token", ... split_td="1d", ... verbose=True, ... ) >>> # Fetch data with custom timezone >>> df = get_influx_data( ... start="2023-01-01T00:00:00", ... stop="2023-01-01T23:59:59", ... bucket="my_bucket", ... measurement="sensors", ... tide_tags=["name", "unit", "location"], ... url="http://localhost:8086", ... org="my_org", ... token="my_token", ... tz_info="Europe/Paris", ... ) """ if isinstance(start, str) and isinstance(stop, str): start = dt.datetime.now() + pd.Timedelta(start) stop = dt.datetime.now() + pd.Timedelta(stop) if split_td is not None: dates_index = pd.date_range(start, stop, freq=split_td) else: dates_index = pd.Index([start, stop]) df_list = [] for i in range(len(dates_index) - 1): if verbose: print(f"Getting period {i + 1} / {len(dates_index) - 1}") for attempt in range(max_retry): try: df_list.append( _single_influx_request( start=dates_index[i], stop=dates_index[i + 1], bucket=bucket, measurement=measurement, tide_tags=tide_tags, url=url, org=org, token=token, tz_info=tz_info, timeout=request_timeout, ) ) break except ReadTimeoutError: if attempt < max_retry - 1: if verbose: print(f"Attempt {attempt + 1} failed") time.sleep(waited_seconds_at_retry) else: if verbose: print("Max retries reached. Unable to get data.") raise return df_list[0] if len(df_list) == 1 else pd.concat(df_list)
[docs] def push_influx_data( data: pd.DataFrame, tide_tags: list[str], bucket: str, url: str, org: str, token: str, measurement: str = "tide", ): """ Pushes data from a pandas DataFrame to an InfluxDB bucket. This function processes a DataFrame indexed by datetime and writes the data to an InfluxDB bucket. Each row in the DataFrame is expanded based on Tide tags extracted from a specific column and written to InfluxDB with corresponding timestamp and tag values. Parameters: data (pd.DataFrame): Input DataFrame with a datetime index and one or more columns of values. tide_tags (list[str]): List of tag names to extract from the "full_index" column after splitting it. For exemple : ["Name", "Unit", "bloc", "sub_bloc" bucket (str): InfluxDB bucket name where the data will be written. url (str): URL of the InfluxDB instance. org (str): InfluxDB organization name. token (str): Authentication token for the InfluxDB instance. measurement (str, optional): Name of the measurement to use in InfluxDB. Defaults to "tide". Raises: ValueError: If the input `data` is not a DataFrame with a datetime index. Example: >>> data = pd.DataFrame( { "name1__°C__bloc1": [1.0, 2.0], "name2__W__bloc1": [3.0, 4.0], }, index=pd.to_datetime(["2009-01-01T00:00:00Z", "2009-01-01T01:00:00Z"]), ) >>> push_influx_data( data=data, tide_tags=['Name', 'Unit', "bloc"], bucket='my-bucket', url='http://localhost:8086', org='my-org', token='my-token' ) """ data = check_and_return_dt_index_df(data) influx_df_list = [] for t, row in data.iterrows(): df = row.reset_index() df.columns = ["full_index", "_value"] df[tide_tags] = df["full_index"].str.split("__", expand=True) df = df.drop("full_index", axis=1) df.index = pd.to_datetime([t] * df.shape[0]) influx_df_list.append(df) influx_df = pd.concat(influx_df_list).dropna() with InfluxDBClient(url=url, token=token, org=org) as client: with client.write_api() as write_client: write_client.write( bucket=bucket, record=influx_df, data_frame_measurement_name=measurement, data_frame_tag_columns=tide_tags, )