Source code for tide.processing

from collections import defaultdict

import pandas as pd
import numpy as np
import datetime as dt
import warnings
from functools import partial
from collections.abc import Callable

from sklearn.utils.validation import check_is_fitted
from scipy.ndimage import gaussian_filter1d
from scipy.signal import detrend

from tide.base import BaseProcessing, BaseFiller, BaseOikoMeteo
from tide.math import time_gradient, time_integrate
from tide.utils import (
    get_data_blocks,
    get_outer_timestamps,
    check_and_return_dt_index_df,
    tide_request,
    ensure_list,
)
from tide.regressors import SkSTLForecast, SkProphet
from tide.classifiers import STLEDetector
from tide.meteo import sun_position, beam_component, sky_diffuse, ground_diffuse
from tide.utils import get_tags_max_level

FUNCTION_MAP = {
    "mean": np.mean,
    "average": np.average,
    "sum": np.sum,
    "dot": np.dot,
    "time_integrate": time_integrate,
}

MODEL_MAP = {"STL": SkSTLForecast, "Prophet": SkProphet}

OIKOLAB_DEFAULT_MAP = {
    "temperature": "t_ext__°C__outdoor__meteo",
    "dewpoint_temperature": "t_dp__°C__outdoor__meteo",
    "mean_sea_level_pressure": "pressure__Pa__outdoor__meteo",
    "wind_speed": "wind_speed__m/s__outdoor__meteo",
    "100m_wind_speed": "100m_wind_speed__m/s__outdoor__meteo",
    "relative_humidity": "rh__0-1RH__outdoor__meteo",
    "surface_solar_radiation": "gho__w/m²__outdoor__meteo",
    "direct_normal_solar_radiation": "dni__w/m²__outdoor__meteo",
    "surface_diffuse_solar_radiation": "dhi__w/m²__outdoor__meteo",
    "surface_thermal_radiation": "thermal_radiation__w/m²__outdoor__meteo",
    "total_cloud_cover": "total_cloud_cover__0-1cover__outdoor__meteo",
    "total_precipitation": "total_precipitation__mm__outdoor__meteo",
}

AUTO_BLOC = "%auto_bloc%"
AUTO_SUB_BLOC = "%auto_sub_bloc%"
DEFAULT_UNIT = "DIMENSIONLESS"


def _parse_result_name_template(
    result_column_name: str, n_tag_levels: int
) -> list[str]:
    """Parse the result column name template and pad to n_tag_levels + 1 parts.

    The template follows the TIDE convention: name[__unit[__bloc[__sub_bloc]]].
    Missing parts are filled with defaults:
      - unit  → DIMENSIONLESS
      - bloc  → %auto_bloc%
      - sub_bloc → %auto_sub_bloc%

    Parameters
    ----------
    result_column_name : str
        Template string such as "loss__J__%auto_bloc%__room_1".
    n_tag_levels : int
        Maximum number of tag levels (beyond name) present in the source DataFrame.
        Determines how many parts the output column name should have.

    Returns
    -------
    list[str]
        Parts list of length n_tag_levels + 1, e.g. ["loss", "J", "building_1", "room_1"].
    """
    defaults = [DEFAULT_UNIT, AUTO_BLOC, AUTO_SUB_BLOC]
    parts = result_column_name.split("__")

    # Pad up to n_tag_levels + 1 parts (name + up to 3 tags)
    max_parts = n_tag_levels + 1
    while len(parts) < max_parts:
        parts.append(defaults[len(parts) - 1])

    return parts[:max_parts]


def _extract_bloc_info(column_name: str) -> tuple[str | None, str | None]:
    """Extract bloc and sub_bloc from a TIDE column name.

    Parameters
    ----------
    column_name : str
        A column name like "Tin__°C__building_1__room_1".

    Returns
    -------
    tuple[str | None, str | None]
        (bloc, sub_bloc), either of which may be None if not present.
    """
    parts = column_name.split("__")
    bloc = parts[2] if len(parts) > 2 else None
    sub_bloc = parts[3] if len(parts) > 3 else None
    return bloc, sub_bloc


def _infer_blocs_for_group(group: dict[str, str]) -> tuple[str | None, str | None]:
    """Infer the dominant bloc/sub_bloc for a group of variable→column mappings.

    Uses the first non-None value seen across all columns in the group,
    preserving insertion order (priority to first variable declared).

    Parameters
    ----------
    group : dict[str, str]
        Mapping of variable name → actual column name for one group.

    Returns
    -------
    tuple[str | None, str | None]
        (bloc, sub_bloc) inferred for this group.
    """
    inferred_bloc, inferred_sub_bloc = None, None
    for col in group.values():
        bloc, sub_bloc = _extract_bloc_info(col)
        if inferred_bloc is None and bloc is not None:
            inferred_bloc = bloc
        if inferred_sub_bloc is None and sub_bloc is not None:
            inferred_sub_bloc = sub_bloc
        if inferred_bloc and inferred_sub_bloc:
            break
    return inferred_bloc, inferred_sub_bloc


def _build_result_column_name(
    template_parts: list[str],
    inferred_bloc: str | None,
    inferred_sub_bloc: str | None,
) -> str:
    """Resolve auto-placeholders in the template and return the final column name.

    Parameters
    ----------
    template_parts : list[str]
        Parts from _parse_result_name_template, may contain %auto_bloc% / %auto_sub_bloc%.
    inferred_bloc : str | None
        Bloc inferred from the source columns for this group.
    inferred_sub_bloc : str | None
        Sub-bloc inferred from the source columns for this group.

    Returns
    -------
    str
        Final column name, e.g. "loss__J__building_1__room_1".
    """
    resolved = []
    for i, part in enumerate(template_parts):
        if part == AUTO_BLOC:
            resolved.append(inferred_bloc or AUTO_BLOC)
        elif part == AUTO_SUB_BLOC:
            resolved.append(inferred_sub_bloc or AUTO_SUB_BLOC)
        else:
            resolved.append(part)
    return "__".join(resolved)


[docs] class Identity(BaseProcessing): """A transformer that returns input data unchanged. Parameters ---------- None Attributes ---------- feature_names_in_ : list[str] Names of input columns (set during fit). feature_names_out_ : list[str] Names of output columns (same as input). Methods ------- fit(X, y=None) No-op, returns self. transform(X) Returns input unchanged. Examples -------- >>> import pandas as pd >>> df = pd.DataFrame({"temp__°C": [20, 21, 22], "humid__%": [45, 50, 55]}) >>> identity = Identity() >>> result = identity.fit_transform(df) >>> assert (result == df).all().all() # Data unchanged >>> assert list(result.columns) == list(df.columns) # Column order preserved Returns ------- pd.DataFrame The input data without any modifications. """
[docs] def __init__(self): super().__init__()
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): return self def _transform_implementation(self, X: pd.Series | pd.DataFrame): return X
[docs] class ReplaceDuplicated(BaseProcessing): """A transformer that replaces duplicated values in each column with a specified value. This transformer identifies and replaces duplicated values in each column of a pandas DataFrame, keeping either the first, last, or no occurrence of duplicated values. Parameters ---------- keep : str, default 'first' Specify which of the duplicated (if any) value to keep. Allowed arguments : 'first', 'last', False. - 'first': Keep first occurrence of duplicated values - 'last': Keep last occurrence of duplicated values - False: Keep no occurrence (replace all duplicates) value : float, default np.nan Value used to replace the non-kept duplicated values. Attributes ---------- feature_names_in_ : list[str] Names of input columns (set during fit). feature_names_out_ : list[str] Names of output columns (same as input). Examples -------- >>> import pandas as pd >>> import numpy as np >>> from datetime import datetime, timezone >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:04:00", freq="1min" ... ).tz_localize("UTC") >>> df = pd.DataFrame( ... {"temp__°C": [20, 20, 22, 22, 23], "humid__%": [45, 45, 50, 50, 55]}, ... index=dates, ... ) >>> # Keep first occurrence of duplicates >>> replacer = ReplaceDuplicated(keep="first", value=np.nan) >>> result = replacer.fit_transform(df) >>> print(result) temp__°C humid__% 2024-01-01 00:00:00+00:00 20.0 45.0 2024-01-01 00:01:00+00:00 NaN NaN 2024-01-01 00:02:00+00:00 22.0 50.0 2024-01-01 00:03:00+00:00 NaN NaN 2024-01-01 00:04:00+00:00 23.0 55.0 Returns ------- pd.DataFrame The DataFrame with duplicated values replaced according to the specified strategy. The output maintains the same DateTimeIndex as the input. """
[docs] def __init__(self, keep="first", value=np.nan): super().__init__() self.keep = keep self.value = value
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): return self def _transform_implementation(self, X: pd.Series | pd.DataFrame): for col in X.columns: X.loc[X[col].duplicated(keep=self.keep), col] = self.value return X
[docs] class Dropna(BaseProcessing): """A transformer that removes rows containing missing values from a DataFrame. This transformer removes rows from a DataFrame based on the presence of missing values (NaN) according to the specified strategy. Parameters ---------- how : str, default 'all' How to drop missing values in the data: - 'all': Drop row if all values are missing - 'any': Drop row if any value is missing - int: Drop row if at least this many values are missing Attributes ---------- feature_names_in_ : list[str] Names of input columns (set during fit). feature_names_out_ : list[str] Names of output columns (same as input). Examples -------- >>> import pandas as pd >>> import numpy as np >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:04:00", freq="1min" ... ).tz_localize("UTC") >>> df = pd.DataFrame( ... { ... "temp__°C": [20, np.nan, 22, np.nan, np.nan], ... "humid__%": [45, 50, np.nan, np.nan, np.nan], ... }, ... index=dates, ... ) >>> # Drop rows where all values are missing >>> dropper = Dropna(how="all") >>> result = dropper.fit_transform(df) >>> print(result) temp__°C humid__% 2024-01-01 00:00:00+00:00 20.0 45.0 2024-01-01 00:01:00+00:00 NaN 50.0 2024-01-01 00:02:00+00:00 22.0 NaN >>> # Drop rows with any missing value >>> dropper_strict = Dropna(how="any") >>> result_strict = dropper_strict.fit_transform(df) >>> print(result_strict) temp__°C humid__% 2024-01-01 00:00:00+00:00 20.0 45.0 Returns ------- pd.DataFrame The DataFrame with rows containing missing values removed according to the specified strategy. The output maintains the same DateTimeIndex structure as the input, with rows removed. """
[docs] def __init__(self, how="all"): super().__init__() self.how = how
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): return self def _transform_implementation(self, X: pd.Series | pd.DataFrame): return X.dropna(how=self.how)
[docs] class RenameColumns(BaseProcessing): """A transformer that renames columns in a DataFrame. This transformer allows renaming DataFrame columns either by providing a list of new names in the same order as the current columns, or by providing a dictionary mapping old names to new names. Parameters ---------- new_names : list[str] | dict[str, str] New names for the columns. Can be specified in two ways: - list[str]: List of new names in the same order as current columns. Must have the same length as the number of columns. - dict[str, str]: Dictionary mapping old column names to new names. Keys must be existing column names, values are the new names. Attributes ---------- feature_names_in_ : list[str] Names of input columns (set during fit). feature_names_out_ : list[str] Names of output columns after renaming. Examples -------- >>> import pandas as pd >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:02:00", freq="1min" ... ).tz_localize("UTC") >>> df = pd.DataFrame( ... {"temp__°C": [20, 21, 22], "humid__%": [45, 50, 55]}, index=dates ... ) >>> # Rename using a list (maintains order) >>> renamer_list = RenameColumns(["temperature__°C", "humidity__%"]) >>> result_list = renamer_list.fit_transform(df) >>> print(result_list) temperature__°C humidity__% 2024-01-01 00:00:00+00:00 20.0 45.0 2024-01-01 00:01:00+00:00 21.0 50.0 2024-01-01 00:02:00+00:00 22.0 55.0 >>> # Rename using a dictionary (selective renaming) >>> renamer_dict = RenameColumns({"temp__°C": "temperature__°C"}) >>> result_dict = renamer_dict.fit_transform(df) >>> print(result_dict) temperature__°C humid__% 2024-01-01 00:00:00+00:00 20.0 45.0 2024-01-01 00:01:00+00:00 21.0 50.0 2024-01-01 00:02:00+00:00 22.0 55.0 Returns ------- pd.DataFrame The DataFrame with renamed columns. """
[docs] def __init__(self, new_names: list[str] | dict[str, str]): super().__init__() self.new_names = new_names
def _fit_implementation(self, X, y=None): if isinstance(self.new_names, list): if len(self.new_names) != len(X.columns): raise ValueError( "Length of new_names list must match the number " "of columns in the DataFrame." ) self.feature_names_out_ = self.new_names elif isinstance(self.new_names, dict): self.feature_names_out_ = list(X.rename(columns=self.new_names)) def _transform_implementation(self, X: pd.Series | pd.DataFrame): check_is_fitted(self, attributes=["feature_names_in_", "feature_names_out_"]) if isinstance(self.new_names, list): X.columns = self.new_names elif isinstance(self.new_names, dict): X.rename(columns=self.new_names, inplace=True) return X
[docs] class SkTransform(BaseProcessing): """A transformer that applies scikit-learn transformers to a pandas DataFrame. This transformer wraps any scikit-learn transformer and applies it to a pandas DataFrame while preserving the DataFrame's index and column structure. It is particularly useful when you want to use scikit-learn's preprocessing tools (like StandardScaler, MinMaxScaler, etc.) while maintaining the time series nature of your data. Parameters ---------- transformer : object A scikit-learn transformer to apply on the data. Must implement fit(), transform(), and optionally inverse_transform() methods. Attributes ---------- transformer_ : object The fitted scikit-learn transformer. feature_names_in_ : list[str] Names of input columns (set during fit). feature_names_out_ : list[str] Names of output columns (same as input). Examples -------- >>> import pandas as pd >>> from sklearn.preprocessing import StandardScaler >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:02:00", freq="1min" ... ).tz_localize("UTC") >>> df = pd.DataFrame( ... {"temp__°C": [20, 21, 22], "humid__%": [45, 50, 55]}, index=dates ... ) >>> # Apply StandardScaler while preserving DataFrame structure >>> sk_transform = SkTransform(StandardScaler()) >>> result = sk_transform.fit_transform(df) >>> print(result) temp__°C humid__% 2024-01-01 00:00:00+00:00 -1.0 -1.0 2024-01-01 00:01:00+00:00 0.0 0.0 2024-01-01 00:02:00+00:00 1.0 1.0 >>> # Inverse transform to get back original values >>> original = sk_transform.inverse_transform(result) >>> print(original) temp__°C humid__% 2024-01-01 00:00:00+00:00 20.0 45.0 2024-01-01 00:01:00+00:00 21.0 50.0 2024-01-01 00:02:00+00:00 22.0 55.0 Returns ------- pd.DataFrame The transformed DataFrame with the same index and column structure as the input. The values are transformed according to the specified scikit-learn transformer. """
[docs] def __init__(self, transformer): super().__init__() self.transformer = transformer
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): self.transformer.fit(X) return self def _transform_implementation(self, X: pd.Series | pd.DataFrame): check_is_fitted(self, attributes=["feature_names_in_"]) return pd.DataFrame( data=self.transformer.transform(X), index=X.index, columns=X.columns )
[docs] def inverse_transform(self, X: pd.Series | pd.DataFrame): check_is_fitted(self, attributes=["feature_names_in_"]) X = check_and_return_dt_index_df(X) return pd.DataFrame( data=self.transformer.inverse_transform(X), index=X.index, columns=X.columns )
[docs] class ReplaceThreshold(BaseProcessing): """A transformer that replaces values in a DataFrame based on threshold values. This transformer replaces values in a DataFrame that fall outside specified upper and lower thresholds with a given replacement value. It is useful for handling outliers or extreme values in time series data. Parameters ---------- upper : float, optional (default=None) The upper threshold value. Values greater than this threshold will be replaced with the specified value. lower : float, optional (default=None) The lower threshold value. Values less than this threshold will be replaced with the specified value. value : float, optional (default=np.nan) The value to use for replacing values that fall outside the thresholds. Attributes ---------- feature_names_in_ : list[str] Names of input columns (set during fit). feature_names_out_ : list[str] Names of output columns (same as input). Examples -------- >>> import pandas as pd >>> import numpy as np >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:04:00", freq="1min" ... ).tz_localize("UTC") >>> df = pd.DataFrame( ... {"temp__°C": [20, 25, 30, 35, 40], "humid__%": [45, 50, 55, 60, 65]}, ... index=dates, ... ) >>> # Replace values outside thresholds with NaN >>> replacer = ReplaceThreshold(upper=35, lower=20, value=np.nan) >>> result = replacer.fit_transform(df) >>> print(result) temp__°C humid__% 2024-01-01 00:00:00+00:00 20.0 NaN 2024-01-01 00:01:00+00:00 25.0 NaN 2024-01-01 00:02:00+00:00 30.0 NaN 2024-01-01 00:03:00+00:00 NaN NaN 2024-01-01 00:04:00+00:00 NaN NaN Returns ------- pd.DataFrame The DataFrame with values outside the specified thresholds replaced with the given value. The output maintains the same DateTimeIndex and column structure as the input. """
[docs] def __init__(self, upper=None, lower=None, value=np.nan): super().__init__() self.lower = lower self.upper = upper self.value = value
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): pass def _transform_implementation(self, X: pd.Series | pd.DataFrame): if self.lower is not None: lower_mask = X < self.lower else: lower_mask = pd.DataFrame( np.full(X.shape, False), index=X.index, columns=X.columns ) if self.upper is not None: upper_mask = X > self.upper else: upper_mask = pd.DataFrame( np.full(X.shape, False), index=X.index, columns=X.columns ) X[np.logical_or(lower_mask, upper_mask)] = self.value return X
[docs] class DropTimeGradient(BaseProcessing): """ A transformer that removes values in a DataFrame based on the time gradient. The time gradient is calculated as the difference of consecutive values in the time series divided by the time delta between each value (in seconds). If the gradient is below the `lower_rate` or above the `upper_rate`, then the value is set to NaN. Parameters ---------- dropna : bool, default=True Whether to remove NaN values from the DataFrame before processing. upper_rate : float, optional The upper rate threshold in units of value/second. If the gradient is greater than or equal to this value, the value will be set to NaN. Example: For a temperature change of 5°C per minute, set upper_rate=5/60 ≈ 0.083 lower_rate : float, optional The lower rate threshold in units of value/second. If the gradient is less than or equal to this value, the value will be set to NaN. Example: For a pressure change of 100 Pa per minute, set lower_rate=100/60 ≈ 1.67 Attributes ---------- feature_names_in_ : list[str] Names of input columns (set during fit). feature_names_out_ : list[str] Names of output columns (same as input). Examples -------- >>> import pandas as pd >>> import numpy as np >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:04:00", freq="1min" ... ).tz_localize("UTC") >>> df = pd.DataFrame( ... { ... "temp__°C": [20, 25, 30, 35, 40], # Steady increase of 5°C/min ... "humid__%": [45, 45, 45, 45, 45], # Constant ... "press__Pa": [1000, 1000, 900, 1000, 1000], # Sudden change ... }, ... index=dates, ... ) >>> # Remove values with gradients outside thresholds >>> # For temperature: 5°C/min = 5/60 ≈ 0.083°C/s >>> # For pressure: 100 Pa/min = 100/60 ≈ 1.67 Pa/s >>> dropper = DropTimeGradient(upper_rate=0.083, lower_rate=0.001) >>> result = dropper.fit_transform(df) >>> print(result) temp__°C humid__% press__Pa 2024-01-01 00:00:00+00:00 20.0 45.0 1000.0 2024-01-01 00:01:00+00:00 25.0 NaN 1000.0 2024-01-01 00:02:00+00:00 30.0 NaN NaN 2024-01-01 00:03:00+00:00 35.0 NaN 1000.0 2024-01-01 00:04:00+00:00 40.0 45.0 1000.0 Notes ----- - The gradient is calculated as (value2 - value1) / (time2 - time1 in seconds) - For the upper_rate threshold, both the current and next gradient must exceed the threshold for a value to be removed - For the lower_rate threshold, only the current gradient needs to be below the threshold for a value to be removed - NaN values are handled according to the dropna parameter: - If True (default): NaN values are removed before processing - If False: NaN values are kept and may affect gradient calculations - The rate parameters (upper_rate and lower_rate) must be specified in units of value/second. To convert from per-minute rates, divide by 60. Returns ------- pd.DataFrame The DataFrame with values removed based on their time gradients. The output maintains the same DateTimeIndex and column structure as the input. """
[docs] def __init__(self, dropna=True, upper_rate=None, lower_rate=None): super().__init__() self.dropna = dropna self.upper_rate = upper_rate self.lower_rate = lower_rate
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): pass def _transform_implementation(self, X: pd.Series | pd.DataFrame): X_transformed = [] for column in X.columns: X_column = X[column] if self.dropna: original_index = X_column.index.copy() X_column = X_column.dropna() time_delta = X_column.index.to_series().diff().dt.total_seconds() abs_der = abs(X_column.diff().divide(time_delta, axis=0)) abs_der_two = abs(X_column.diff(periods=2).divide(time_delta, axis=0)) if self.upper_rate is not None: mask_der = abs_der >= self.upper_rate mask_der_two = abs_der_two >= self.upper_rate else: mask_der = pd.Series( np.full(X_column.shape, False), index=X_column.index, name=X_column.name, ) mask_der_two = mask_der if self.lower_rate is not None: mask_constant = abs_der <= self.lower_rate else: mask_constant = pd.Series( np.full(X_column.shape, False), index=X_column.index, name=X_column.name, ) mask_to_remove = np.logical_and(mask_der, mask_der_two) mask_to_remove = np.logical_or(mask_to_remove, mask_constant) X_column[mask_to_remove] = np.nan if self.dropna: X_column = X_column.reindex(original_index) X_transformed.append(X_column) return pd.concat(X_transformed, axis=1)
[docs] class ApplyExpression(BaseProcessing): """A transformer that applies a mathematical expression to a pandas DataFrame. This transformer allows you to apply any valid Python mathematical expression to a pandas DataFrame. The expression is evaluated using pandas' `eval` function, which provides efficient evaluation of mathematical expressions. Parameters ---------- expression : str A string representing a valid Python mathematical expression. The expression can use the input DataFrame `X` as a variable. Common operations include: - Basic arithmetic: +, -, *, /, **, % - Comparison: >, <, >=, <=, ==, != - Boolean operations: &, |, ~ - Mathematical functions: abs(), sqrt(), pow(), etc. Example: "X * 2" or "X / 1000" or "X ** 2" new_unit : str, optional (default=None) The new unit to apply to the column names after transformation. If provided, the transformer will update the unit part of the column names (the part after the second "__" in the Tide naming convention). Example: If input columns are "power__W__building" and new_unit="kW", output columns will be "power__kW__building". Examples -------- >>> import pandas as pd >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:02:00", freq="1min" ... ).tz_localize("UTC") >>> df = pd.DataFrame( ... { ... "power__W__building": [1000, 2000, 3000], ... }, ... index=dates, ... ) >>> # Convert power from W to kW >>> transformer = ApplyExpression("X / 1000", "kW") >>> result = transformer.fit_transform(df) >>> print(result) power__kW__building 2024-01-01 00:00:00+00:00 1.0 2024-01-01 00:01:00+00:00 2.0 2024-01-01 00:02:00+00:00 3.0 Notes ----- - The expression is evaluated using pandas' `eval` function, which is optimized for numerical operations on DataFrames. - The input DataFrame `X` is available in the expression context. - When using `new_unit`, the transformer follows the Tide naming convention of "name__unit__block" for column names. - The transformer preserves the DataFrame's index and column structure. - All mathematical operations are applied element-wise to the DataFrame. Returns ------- pd.DataFrame The transformed DataFrame with the mathematical expression applied to all values. If new_unit is specified, the column names are updated accordingly. """
[docs] def __init__(self, expression: str, new_unit: str = None): super().__init__() self.expression = expression self.new_unit = new_unit
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): if self.new_unit is not None: self.feature_names_out_ = self.get_set_tags_values_columns( X.copy(), 1, self.new_unit ) def _transform_implementation(self, X: pd.Series | pd.DataFrame): X = eval(self.expression) if self.new_unit is not None: X.columns = self.feature_names_out_ return X
[docs] class TimeGradient(BaseProcessing): """A transformer that calculates the time gradient (derivative) of a pandas DataFrame. This transformer computes the rate of change of values with respect to time. The gradient is calculated using np centered gradient method. d(i) = (yi-1 - yi+1) / (ti-1 - ti+1) Method assume linear variation at boundaries Parameters ---------- new_unit : str, optional (default=None) The new unit to apply to the column names after transformation. If provided, the transformer will update the unit part of the column names (the part after the second "__" in the Tide naming convention). Example: If input columns are "energy__J__building" and new_unit="W", output columns will be "energy__W__building". Examples -------- >>> import pandas as pd >>> import numpy as np >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:04:00", freq="1min" ... ).tz_localize("UTC") >>> # Create energy data (in Joules) with varying consumption >>> df = pd.DataFrame( ... { ... "energy__J__building": [ ... 0, # Start at 0 J ... 360000, # 1 kWh = 3600000 J ... 720000, # 2 kWh ... 1080000, # 3 kWh ... 1440000, # 4 kWh ... ] ... }, ... index=dates, ... ) >>> # Calculate power (W) from energy (J) using time gradient >>> # Power = Energy / time (in seconds) >>> transformer = TimeGradient(new_unit="W") >>> result = transformer.fit_transform(df) >>> print(result) energy__W__building 2024-01-01 00:00:00+00:00 NaN 2024-01-01 00:01:00+00:00 6000.0 2024-01-01 00:02:00+00:00 6000.0 2024-01-01 00:03:00+00:00 6000.0 2024-01-01 00:04:00+00:00 6000.0 Notes ----- - The time gradient is calculated as (value2 - value1) / (time2 - time1 in seconds) - The first and last values in each column will be NaN since they don't have enough neighbors to calculate the gradient - When using new_unit, the transformer follows the Tide naming convention of "name__unit__block" for column names Returns ------- pd.DataFrame The DataFrame with time gradients calculated for each column. The output maintains the same DateTimeIndex as the input. If new_unit is specified, the column names are updated accordingly. """
[docs] def __init__(self, new_unit: str = None): super().__init__() self.new_unit = new_unit
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): if self.new_unit is not None: self.feature_names_out_ = self.get_set_tags_values_columns( X.copy(), 1, self.new_unit ) def _transform_implementation(self, X: pd.Series | pd.DataFrame): original_index = X.index.copy() derivative = time_gradient(X) derivative = derivative.reindex(original_index) if self.new_unit is not None: derivative.columns = self.feature_names_out_ return derivative
class TimeIntegrate(BaseProcessing): """ A transformer that calculates the time integral (cumulative sum over time) of a pandas DataFrame. This transformer computes the cumulative integral of values with respect to time. The integral is calculated using the trapezoidal rule between consecutive data points. Int[i] = Int[i - 1] + dt * (Yi-1 + Yi) / 2 CAUTION given that TimeGradient is a centered derivative. This is not an exact inverse transform. Parameters ---------- new_unit : str, optional (default=None) The new unit to apply to the column names after transformation. If provided, the transformer will update the unit part of the column names (the part after the second "__" in the Tide naming convention). Example: If input columns are "power__W__building" and new_unit="J", output columns will be "power__J__building". If not provided and drop_columns=False, the unit will be "originalunit.s" initial_value : float or dict, optional (default=0.0) The initial value(s) for the integral at the first timestamp. Can be: - float: Same initial value for all columns - dict: Mapping of column names to their initial values Example: initial_value={"power__W__building": 1000.0} drop_columns : bool, optional (default=True) If True, only returns the integrated columns (replaces original data). If False, keeps both the original columns and adds new integrated columns with updated units. Examples -------- >>> import pandas as pd >>> import numpy as np >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:04:00", freq="1min" ... ).tz_localize("UTC") >>> # Create power data (in Watts) - constant 6000W consumption >>> df = pd.DataFrame( ... {"power__W__building": [6000, 6000, 6000, 6000, 6000]}, ... index=dates, ... ) >>> # Calculate energy (J) from power (W) using time integration >>> # Energy = integral of Power over time (in seconds) >>> transformer = TimeIntegral(new_unit="J") >>> result = transformer.fit_transform(df) >>> print(result) power__J__building 2024-01-01 00:00:00+00:00 0.0 2024-01-01 00:01:00+00:00 360000.0 2024-01-01 00:02:00+00:00 720000.0 2024-01-01 00:03:00+00:00 1080000.0 2024-01-01 00:04:00+00:00 1440000.0 >>> # Keep original columns alongside integrated ones >>> transformer_keep = TimeIntegral(new_unit="J", drop_columns=False) >>> result_keep = transformer_keep.fit_transform(df) >>> print(result_keep) power__W__building power__J__building 2024-01-01 00:00:00+00:00 6000 0.0 2024-01-01 00:01:00+00:00 6000 360000.0 2024-01-01 00:02:00+00:00 6000 720000.0 2024-01-01 00:03:00+00:00 6000 1080000.0 2024-01-01 00:04:00+00:00 6000 1440000.0 >>> # Without specifying new_unit, the unit becomes "W.s" >>> transformer_auto = TimeIntegral(drop_columns=False) >>> result_auto = transformer_auto.fit_transform(df) >>> print(result_auto) power__W__building power__W.s__building 2024-01-01 00:00:00+00:00 6000 0.0 2024-01-01 00:01:00+00:00 6000 360000.0 2024-01-01 00:02:00+00:00 6000 720000.0 2024-01-01 00:03:00+00:00 6000 1080000.0 2024-01-01 00:04:00+00:00 6000 1440000.0 Notes ----- - The time integral is calculated using the trapezoidal rule: integral += (time2 - time1) * (value1 + value2) / 2 - This simulates an energy meter that accumulates energy over time - The first value is set to initial_value (default 0.0) - Time differences are calculated in seconds - When using new_unit, the transformer follows the Tide naming convention of "name__unit__block" for column names - For irregular time series, the integration automatically adapts to the varying time steps - When drop_columns=False, the original columns are preserved and new integrated columns are added with the appropriate unit suffix Returns ------- pd.DataFrame If drop_columns=True: DataFrame with cumulative time integrals for each column. If drop_columns=False: DataFrame with both original and integrated columns. The output maintains the same DateTimeIndex as the input. If new_unit is specified, the integrated column names are updated accordingly. """ def __init__( self, new_unit: str = None, initial_value: float | dict = 0.0, drop_columns: bool = True, ): super().__init__() self.new_unit = new_unit self.initial_value = initial_value self.drop_columns = drop_columns def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): # Determine the unit to use for integrated columns if self.new_unit is not None: unit_suffix = self.new_unit else: # Extract original unit and append ".s" # Assuming Tide naming convention: "name__unit__block__sub_block" unit_suffix = None sample_col = X.columns[0] if len(X.columns) > 0 else None if sample_col and "__" in sample_col: parts = sample_col.split("__") if len(parts) >= 2: original_unit = parts[1] unit_suffix = f"{original_unit}.s" # Create feature names for integrated columns self.integrated_feature_names_ = self.get_set_tags_values_columns( X.copy(), 1, unit_suffix ) # Determine final output feature names if self.drop_columns: self.feature_names_out_ = self.integrated_feature_names_ else: self.feature_names_out_ = list(X.columns) + list( self.integrated_feature_names_ ) def _transform_implementation(self, X: pd.Series | pd.DataFrame): check_is_fitted(self, attributes=["feature_names_in_", "feature_names_out_"]) if len(X) == 0: return X time_diffs = np.diff(X.index.view("int64")) * 1e-9 # Create DataFrame for integrated values integrated = pd.DataFrame(index=X.index, columns=X.columns, dtype=float) for col in X.columns: values = X[col].to_numpy() if isinstance(self.initial_value, dict): init_val = self.initial_value.get(col, 0.0) else: init_val = self.initial_value integrals = np.zeros(len(values)) integrals[0] = init_val for i in range(1, len(values)): dt = time_diffs[i - 1] # Trapezoidal rule: area = dt * (y1 + y2) / 2 increment = dt * (values[i - 1] + values[i]) / 2 integrals[i] = integrals[i - 1] + increment integrated[col] = integrals integrated.columns = self.integrated_feature_names_ if self.drop_columns: return integrated else: result = pd.concat([X, integrated], axis=1) return result
[docs] class Ffill(BaseFiller, BaseProcessing): """A transformer that forward-fills missing values in a pandas DataFrame. This transformer fills missing values (NaN) in a DataFrame by propagating the last valid observation forward. It is particularly useful when past values are more relevant for filling gaps than future values. Parameters ---------- limit : int, optional (default=None) The maximum number of consecutive NaN values to forward-fill. If specified, only gaps with this many or fewer consecutive NaN values will be filled. Must be greater than 0 if not None. Example: If limit=2, a gap of 3 or more NaN values will only be partially filled. gaps_lte : str | pd.Timedelta | dt.timedelta, optional (default=None) Only fill gaps with duration less than or equal to this value. gaps_gte : str | pd.Timedelta | dt.timedelta, optional (default=None) Only fill gaps with duration greater than or equal to this value. Examples -------- >>> import pandas as pd >>> import numpy as np >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:04:00", freq="1min" ... ).tz_localize("UTC") >>> df = pd.DataFrame( ... { ... "temp__°C__room": [20, np.nan, np.nan, 23, 24], ... "press__Pa__room": [1000, np.nan, 900, np.nan, 1000], ... }, ... index=dates, ... ) >>> # Forward-fill all missing values >>> filler = Ffill() >>> result = filler.fit_transform(df) >>> print(result) temp__°C__room press__Pa__room 2024-01-01 00:00:00+00:00 20.0 1000.0 2024-01-01 00:01:00+00:00 20.0 1000.0 2024-01-01 00:02:00+00:00 20.0 900.0 2024-01-01 00:03:00+00:00 23.0 900.0 2024-01-01 00:04:00+00:00 24.0 1000.0 >>> # Forward-fill with limit of 1 >>> filler_limited = Ffill(limit=1) >>> result_limited = filler_limited.fit_transform(df) >>> print(result_limited) temp__°C__room press__Pa__room 2024-01-01 00:00:00+00:00 20.0 1000.0 2024-01-01 00:01:00+00:00 20.0 1000.0 2024-01-01 00:02:00+00:00 NaN 900.0 2024-01-01 00:03:00+00:00 23.0 900.0 2024-01-01 00:04:00+00:00 24.0 1000.0 >>> # Forward-fill only gaps of 1 hour or less >>> filler_timed = Ffill(gaps_lte="1h") >>> result_timed = filler_timed.fit_transform(df) >>> print(result_timed) temp__°C__room press__Pa__room 2024-01-01 00:00:00+00:00 20.0 1000.0 2024-01-01 00:01:00+00:00 NaN 1000.0 2024-01-01 00:02:00+00:00 NaN 900.0 2024-01-01 00:03:00+00:00 23.0 900.0 2024-01-01 00:04:00+00:00 24.0 1000.0 Notes ----- - NaN values at the beginning of the time series will remain unfilled since there are no past values to propagate Returns ------- pd.DataFrame The DataFrame with missing values forward-filled according to the specified parameters. The output maintains the same DateTimeIndex and column structure as the input. """
[docs] def __init__( self, limit: int = None, gaps_lte: str | pd.Timedelta | dt.timedelta = None, gaps_gte: str | pd.Timedelta | dt.timedelta = None, ): BaseFiller.__init__(self) BaseProcessing.__init__(self) self.limit = limit self.gaps_gte = gaps_gte self.gaps_lte = gaps_lte
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): return self def _transform_implementation(self, X: pd.Series | pd.DataFrame): filled_x = X.ffill(limit=self.limit) if not (self.gaps_gte or self.gaps_lte): return filled_x gaps_mask = self.get_gaps_mask(X) X[gaps_mask] = filled_x[gaps_mask] return X
[docs] class Bfill(BaseFiller, BaseProcessing): """A transformer that back-fills missing values in a pandas DataFrame. This transformer fills missing values (NaN) in a DataFrame by propagating the next valid observation backward. It is particularly useful when future values are more relevant for filling gaps than past values. Parameters ---------- limit : int, optional (default=None) The maximum number of consecutive NaN values to back-fill. If specified, only gaps with this many or fewer consecutive NaN values will be filled. Must be greater than 0 if not None. Example: If limit=2, a gap of 3 or more NaN values will only be partially filled. gaps_lte : str | pd.Timedelta | dt.timedelta, optional (default=None) Only fill gaps with duration less than or equal to this value. gaps_gte : str | pd.Timedelta | dt.timedelta, optional (default=None) Only fill gaps with duration greater than or equal to this value. Examples -------- >>> import pandas as pd >>> import numpy as np >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:04:00", freq="1min" ... ).tz_localize("UTC") >>> df = pd.DataFrame( ... { ... "temp__°C__room": [20, np.nan, np.nan, 23, 24], ... "press__Pa__room": [1000, np.nan, 900, np.nan, 1000], ... }, ... index=dates, ... ) >>> # Back-fill all missing values >>> filler = Bfill() >>> result = filler.fit_transform(df) >>> print(result) temp__°C__room press__Pa__room 2024-01-01 00:00:00+00:00 20.0 1000.0 2024-01-01 00:01:00+00:00 23.0 900.0 2024-01-01 00:02:00+00:00 23.0 900.0 2024-01-01 00:03:00+00:00 23.0 1000.0 2024-01-01 00:04:00+00:00 24.0 1000.0 >>> # Back-fill with limit of 1 >>> filler_limited = Bfill(limit=1) >>> result_limited = filler_limited.fit_transform(df) >>> print(result_limited) temp__°C__room press__Pa__room 2024-01-01 00:00:00+00:00 20.0 1000.0 2024-01-01 00:01:00+00:00 23.0 900.0 2024-01-01 00:02:00+00:00 NaN 900.0 2024-01-01 00:03:00+00:00 23.0 1000.0 2024-01-01 00:04:00+00:00 24.0 1000.0 Notes ----- - The transformer fills NaN values by propagating the next valid observation backward in time - When limit is specified, only gaps with that many or fewer consecutive NaN values will be filled - The gaps_lte and gaps_gte parameters allow filtering gaps based on their duration before filling - The transformer preserves the DataFrame's index and column structure - NaN values at the end of the time series will remain unfilled since there are no future values to propagate Returns ------- pd.DataFrame The DataFrame with missing values back-filled according to the specified parameters. The output maintains the same DateTimeIndex and column structure as the input. """
[docs] def __init__( self, limit: int = None, gaps_lte: str | pd.Timedelta | dt.timedelta = None, gaps_gte: str | pd.Timedelta | dt.timedelta = None, ): BaseFiller.__init__(self) BaseProcessing.__init__(self) self.limit = limit self.gaps_gte = gaps_gte self.gaps_lte = gaps_lte
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): return self def _transform_implementation(self, X: pd.Series | pd.DataFrame): filled_x = X.bfill(limit=self.limit) if not (self.gaps_gte or self.gaps_lte): return filled_x gaps_mask = self.get_gaps_mask(X) X[gaps_mask] = filled_x[gaps_mask] return X
[docs] class FillNa(BaseFiller, BaseProcessing): """ A transformer that fills missing values in a pandas DataFrame with a specified value. Parameters ---------- value : float The value to use for filling missing values. gaps_lte : str | pd.Timedelta | dt.timedelta, optional (default=None) Only fill gaps with duration less than or equal to this value. gaps_gte : str | pd.Timedelta | dt.timedelta, optional (default=None) Only fill gaps with duration greater than or equal to this value. Examples -------- >>> import pandas as pd >>> import numpy as np >>> from datetime import datetime, timedelta >>> from tide.processing import FillNa >>> # Create a DataFrame with missing values and timezone-aware index >>> dates = pd.date_range(start="2024-01-01", periods=5, freq="1h", tz="UTC") >>> df = pd.DataFrame( ... { ... "temperature__°C": [20.0, np.nan, np.nan, 22.0, 23.0], ... "pressure__Pa": [1013.0, np.nan, 1015.0, np.nan, 1014.0], ... }, ... index=dates, ... ) >>> # Fill all missing values with 0 >>> filler = FillNa(value=0) >>> df_filled = filler.fit_transform(df) >>> print(df_filled) temperature__°C pressure__Pa 2024-01-01 00:00:00+00:00 20.0 1013.0 2024-01-01 01:00:00+00:00 0.0 0.0 2024-01-01 02:00:00+00:00 0.0 1015.0 2024-01-01 03:00:00+00:00 22.0 0.0 2024-01-01 04:00:00+00:00 23.0 1014.0 >>> # Fill only gaps of 1 hour or less with -999 >>> filler = FillNa(value=-999, gaps_lte="1h") >>> df_filled = filler.fit_transform(df) >>> print(df_filled) temperature__°C pressure__Pa 2024-01-01 00:00:00+00:00 20.0 1013.0 2024-01-01 01:00:00+00:00 np.nan -999.0 2024-01-01 02:00:00+00:00 np.nan 1015.0 2024-01-01 03:00:00+00:00 22.0 -999.0 2024-01-01 04:00:00+00:00 23.0 1014.0 Notes ----- - When using gap duration parameters (gaps_lte or gaps_gte), only gaps within the specified time ranges will be filled - This transformer is particularly useful for: - Replacing missing values with a known default value - Handling sensor errors or invalid measurements Returns ------- pd.DataFrame A DataFrame with missing values filled according to the specified parameters. The output maintains the same structure and index as the input DataFrame. """
[docs] def __init__( self, value: float, gaps_lte: str | pd.Timedelta | dt.timedelta = None, gaps_gte: str | pd.Timedelta | dt.timedelta = None, ): BaseFiller.__init__(self) BaseProcessing.__init__(self) self.value = value self.gaps_gte = gaps_gte self.gaps_lte = gaps_lte
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): return self def _transform_implementation(self, X: pd.Series | pd.DataFrame): if self.gaps_gte or self.gaps_lte: gaps = self.get_gaps_dict_to_fill(X) for col, gaps in gaps.items(): for gap in gaps: X.loc[gap, col] = X.loc[gap, col].fillna(self.value) return X else: return X.fillna(self.value)
[docs] class Interpolate(BaseFiller, BaseProcessing): """ A transformer that interpolates missing values in a pandas DataFrame using various methods. Parameters ---------- method : str, default="linear" The interpolation method to use. Sample of useful available methods: - "linear": Linear interpolation (default) - "slinear": Spline interpolation of order 1 - "quadratic": Spline interpolation of order 2 - "cubic": Spline interpolation of order 3 - "barycentric": Barycentric interpolation - "polynomial": Polynomial interpolation - "krogh": Krogh interpolation - "piecewise_polynomial": Piecewise polynomial interpolation - "spline": Spline interpolation - "pchip": Piecewise cubic Hermite interpolation - "akima": Akima interpolation - "cubicspline": Cubic spline interpolation - "from_derivatives": Interpolation from derivatives gaps_lte : str | pd.Timedelta | dt.timedelta, optional (default=None) Only interpolate gaps with duration less than or equal to this value. gaps_gte : str | pd.Timedelta | dt.timedelta, optional (default=None) Only interpolate gaps with duration greater than or equal to this value. Examples -------- >>> import pandas as pd >>> import numpy as np >>> from datetime import datetime, timedelta >>> from tide.processing import Interpolate >>> # Create a DataFrame with missing values and timezone-aware index >>> dates = pd.date_range(start="2024-01-01", periods=5, freq="1h", tz="UTC") >>> df = pd.DataFrame( ... { ... "temperature__°C": [20.0, np.nan, np.nan, 22.0, 23.0], ... "pressure__Pa": [1013.0, np.nan, 1015.0, np.nan, 1014.0], ... }, ... index=dates, ... ) >>> # Linear interpolation of all missing values >>> interpolator = Interpolate(method="linear") >>> df_interpolated = interpolator.fit_transform(df) >>> print(df_interpolated) temperature__°C pressure__Pa 2024-01-01 00:00:00+00:00 20.0 1013.0 2024-01-01 01:00:00+00:00 20.7 1014.0 2024-01-01 02:00:00+00:00 21.3 1015.0 2024-01-01 03:00:00+00:00 22.0 1014.5 2024-01-01 04:00:00+00:00 23.0 1014.0 Notes ----- - When using gap duration parameters (gaps_lte or gaps_gte), only gaps within the specified time ranges will be interpolated - Different interpolation methods may produce different results: - Linear interpolation is simple but may not capture complex patterns - Cubic interpolation provides smoother curves but may overshoot Returns ------- pd.DataFrame A DataFrame with missing values interpolated according to the specified parameters. The output maintains the same structure and index as the input DataFrame. """
[docs] def __init__( self, method: str = "linear", gaps_lte: str | pd.Timedelta | dt.timedelta = None, gaps_gte: str | pd.Timedelta | dt.timedelta = None, ): BaseFiller.__init__(self) BaseProcessing.__init__(self) self.method = method self.gaps_gte = gaps_gte self.gaps_lte = gaps_lte
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): return self def _transform_implementation(self, X: pd.Series | pd.DataFrame): gaps_mask = self.get_gaps_mask(X) X_full = X.interpolate(method=self.method) X[gaps_mask] = X_full[gaps_mask] return X
[docs] class Resample(BaseProcessing): """A transformer that resamples time series data to a different frequency. This transformer allows you to resample time series data to a different frequency while applying specified aggregation methods. It supports both simple resampling with a single method for all columns and custom methods for specific columns using Tide's naming convention. Parameters ---------- rule : str | pd.Timedelta | dt.timedelta The frequency to resample to. Can be specified as: - String: '1min', '5min', '1h', '1D', etc. - Timedelta object: pd.Timedelta('1 hour') - datetime.timedelta object: dt.timedelta(hours=1) method : str | Callable, default='mean' The default aggregation method to use for resampling. Can be: - String: 'mean', 'sum', 'min', 'max', 'std', etc. - Custom string from FUNCTION_MAP: 'time_integrate', 'average', etc. - Callable: Any function that can be used with pandas' resample tide_format_methods : dict[str, str | Callable], optional (default=None) A dictionary mapping Tide tag components to specific aggregation methods. Keys are the components to match (name, unit, block, sub_block). Values are the aggregation methods to use for matching columns. Example: {'name': 'power', 'method': 'sum'} will use sum aggregation for all columns with 'power' in their name. columns_methods : list[tuple[list[str], str | Callable]], optional (default=None) A list of tuples specifying custom methods for specific columns. Each tuple contains: - list[str]: List of column names to apply the method to - str | Callable: The aggregation method to use Example: [(['power__W__building'], 'sum')] Examples -------- >>> import pandas as pd >>> import numpy as np >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:04:00", freq="1min" ... ).tz_localize("UTC") >>> df = pd.DataFrame( ... { ... "power__W__building": [1000, 1200, 1100, 1300, 1400], ... "temp__°C__room": [20, 21, 22, 23, 24], ... "humid__%__room": [45, 46, 47, 48, 49], ... }, ... index=dates, ... ) >>> # Resample to 5-minute intervals using mean >>> resampler = Resample(rule="5min") >>> result = resampler.fit_transform(df) >>> print(result) power__W__building temp__°C__room humid__%__room 2024-01-01 00:00:00+00:00 1100.0 21.0 46.0 2024-01-01 00:05:00+00:00 1350.0 23.5 48.5 >>> # Resample with custom methods >>> resampler_custom = Resample( ... rule="5min", ... tide_format_methods={"name": "power", "method": "min"}, ... columns_methods=[(["temp__°C__room"], "max")], ... ) Notes ----- - When using tide_format_methods, the matching is done on the Tide tag components (name__unit__block__sub_block) - If tide_format_methods is provided, it takes precedence over columns_methods and completely replaces it during fitting - If no custom method is specified for a column, the default method is used - The output frequency is determined by the rule parameter - Missing values in the input are handled according to the specified methods Returns ------- pd.DataFrame The resampled DataFrame with the specified frequency and aggregation methods. The output maintains the same column structure as the input, with values aggregated according to the specified methods. """
[docs] def __init__( self, rule: str | pd.Timedelta | dt.timedelta, method: str | Callable = "mean", tide_format_methods: dict[str, str | Callable] = None, columns_methods: list[tuple[list[str], str | Callable]] = None, ): super().__init__() self.rule = rule self.method = method self.tide_format_methods = tide_format_methods self.columns_methods = columns_methods
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): if self.tide_format_methods: self.columns_methods = [] for req, method in self.tide_format_methods.items(): self.columns_methods.append((tide_request(X.columns, req), method)) def _transform_implementation(self, X: pd.DataFrame): check_is_fitted(self, attributes=["feature_names_in_"]) if not self.columns_methods: col_to_method = {col: self.method for col in X.columns} else: col_to_method = {} for cols, method in self.columns_methods: for col in cols: col_to_method[col] = method for col in X.columns: col_to_method.setdefault(col, self.method) method_to_cols = defaultdict(list) for col, method in col_to_method.items(): method_to_cols[method].append(col) results = [] for method, cols in method_to_cols.items(): if ( isinstance(method, str) and method in FUNCTION_MAP and method not in ["mean", "sum", "min", "max", "std"] ): method = FUNCTION_MAP[method] res = X[cols].resample(self.rule).agg(method) results.append(res) out = pd.concat(results, axis=1) return out[X.columns]
[docs] class AddTimeLag(BaseProcessing): """A transformer that adds time-lagged features to a pandas DataFrame. This transformer creates new features by shifting existing features in time, allowing the creation of past or future values as new features. This is particularly useful for time series analysis where historical or future values might be relevant predictors. Parameters ---------- time_lag : str | pd.Timedelta | dt.timedelta, default="1h" The time lag to apply when creating new features. Can be specified as: - A string (e.g., "1h", "30min", "1d") - A pandas Timedelta object - A datetime timedelta object A positive time lag creates features with past values, while a negative time lag creates features with future values. features_to_lag : str | list[str] | None, default=None The features to create lagged versions of. If None, all features in the input DataFrame will be lagged. Can be specified as: - A single feature name (string) - A list of feature names - None (to lag all features) feature_marker : str | None, default=None The prefix to use for the new lagged feature names. If None, the string representation of time_lag followed by an underscore is used. For example, with time_lag="1h", features will be prefixed with "1h_". drop_resulting_nan : bool, default=False Whether to drop rows containing NaN values that result from the lag operation. This is useful when you want to ensure complete data for the lagged features. Examples -------- >>> import pandas as pd >>> from tide.processing import AddTimeLag >>> # Create sample data >>> dates = pd.date_range(start="2024-01-01", periods=5, freq="1h", tz="UTC") >>> df = pd.DataFrame( ... { ... "power__W__building": [100, 200, 300, 400, 500], ... "temp__°C__room": [20, 21, 22, 23, 24], ... }, ... index=dates, ... ) >>> # Add 1-hour lagged features >>> lagger = AddTimeLag(time_lag="1h") >>> result = lagger.fit_transform(df) >>> print(result) power__W__building temp__°C__room 1h_power__W__building 1h_temp__°C__room 2024-01-01 00:00:00 100.0 20.0 NaN NaN 2024-01-01 01:00:00 200.0 21.0 100.0 20.0 2024-01-01 02:00:00 300.0 22.0 200.0 21.0 2024-01-01 03:00:00 400.0 23.0 300.0 22.0 2024-01-01 04:00:00 500.0 24.0 400.0 23.0 >>> # Add custom lagged features with specific marker >>> lagger_custom = AddTimeLag( ... time_lag="1h", ... features_to_lag=["power__W__building"], ... feature_marker="prev_", ... drop_resulting_nan=True, ... ) >>> result_custom = lagger_custom.fit_transform(df) >>> print(result_custom) power__W__building temp__°C__room prev_power__W__building 2024-01-01 00:00:00 200.0 21.0 100.0 2024-01-01 01:00:00 300.0 22.0 200.0 2024-01-01 02:00:00 400.0 23.0 300.0 2024-01-01 03:00:00 500.0 24.0 400.0 Notes ----- - The transformer preserves the original features and adds new lagged versions - Lagged features are created by shifting the index and concatenating with the original data - When drop_resulting_nan=True, rows with NaN values in lagged features are removed from the output - The feature_marker parameter allows for custom naming of lagged features - The transformer supports both positive (past) and negative (future) lags Returns ------- pd.DataFrame The input DataFrame with additional lagged features. The original features are preserved, and new lagged features are added with the specified prefix. """
[docs] def __init__( self, time_lag: str | pd.Timedelta | dt.timedelta = "1h", features_to_lag: str | list[str] = None, feature_marker: str = None, drop_resulting_nan=False, ): BaseProcessing.__init__(self) self.time_lag = time_lag self.features_to_lag = features_to_lag self.feature_marker = feature_marker self.drop_resulting_nan = drop_resulting_nan
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): if self.features_to_lag is None: self.features_to_lag = X.columns else: self.features_to_lag = ( [self.features_to_lag] if isinstance(self.features_to_lag, str) else self.features_to_lag ) self.feature_marker = ( str(self.time_lag) + "_" if self.feature_marker is None else self.feature_marker ) self.required_columns = self.features_to_lag self.feature_names_out_.extend( [self.feature_marker + name for name in self.required_columns] ) def _transform_implementation(self, X: pd.Series | pd.DataFrame): check_is_fitted(self, attributes=["feature_names_in_", "feature_names_out_"]) to_lag = X[self.features_to_lag].copy() to_lag.index = to_lag.index + self.time_lag to_lag.columns = self.feature_marker + to_lag.columns X_transformed = pd.concat([X, to_lag], axis=1) if self.drop_resulting_nan: X_transformed = X_transformed.dropna() return X_transformed
[docs] class WindowAggregate(BaseProcessing): """A transformer that adds window-aggregated features to a pandas DataFrame. This transformer creates new features by applying aggregation functions over rolling time windows of existing features. Parameters ---------- window_interval : tuple[str, str], default=("-5min", "0min") The time window interval for aggregation, specified as a tuple of (start_offset, stop_offset) from each timestamp. Can be specified as: - String tuples (e.g., ("-5min", "0min"), ("-1h", "-30min")) - Both offsets must be parseable as pandas Timedelta objects The window includes all data points between [t + start_offset, t + stop_offset]. Note: Future windows (stop_offset > 0) are not supported. features_to_aggregate : str | list[str] | None, default=None The features to create aggregated versions of. If None, all features in the input DataFrame will be aggregated. Can be specified as: - A single feature name (string) - A list of feature names - None (to aggregate all features) feature_marker : str | None, default=None The prefix to use for the new aggregated feature names. If None, the string representation of window_interval followed by an underscore is used. For example, with window_interval=("-5min", "0min"), features will be prefixed with "('-5min', '0min')_". agg_method : str, default="mean" The aggregation method to apply over the rolling window. Must be a valid pandas rolling aggregation method such as: - "mean": Average value over the window - "sum": Sum of values over the window - "min": Minimum value over the window - "max": Maximum value over the window - "std": Standard deviation over the window - "median": Median value over the window - "count": Count of non-null values over the window Examples -------- >>> import pandas as pd >>> from tide.processing import WindowAggregate >>> # Create sample data >>> dates = pd.date_range(start="2024-01-01", periods=6, freq="5min", tz="UTC") >>> df = pd.DataFrame( ... { ... "power__W__building": [100, 150, 200, 180, 220, 250], ... "temp__°C__room": [20, 20.5, 21, 21.5, 22, 22.5], ... }, ... index=dates, ... ) >>> # Add 10-minute rolling mean features (last 10 minutes up to current time) >>> aggregator = WindowAggregate(window_interval=("-10min", "0min")) >>> result = aggregator.fit_transform(df) >>> print(result) power__W__building temp__°C__room ('-10min', '0min')_power__W__building ('-10min', '0min')_temp__°C__room 2024-01-01 00:00:00 100.0 20.00 100.0 20.00 2024-01-01 00:05:00 150.0 20.50 125.0 20.25 2024-01-01 00:10:00 200.0 21.00 150.0 20.50 2024-01-01 00:15:00 180.0 21.50 176.7 21.00 2024-01-01 00:20:00 220.0 22.00 200.0 21.50 2024-01-01 00:25:00 250.0 22.50 216.7 22.00 >>> # Add custom aggregated features with specific marker and method >>> aggregator_max = WindowAggregate( ... window_interval=("-15min", "0min"), ... feature_marker="max_15min_", ... agg_method="max", ... ) >>> result_max = aggregator_max.fit_transform(df) >>> print(result_max) power__W__building temp__°C__room max_15min_power__W__building max_15min_temp__°C__room 2024-01-01 00:00:00 100.0 20.00 100.0 20.00 2024-01-01 00:05:00 150.0 20.50 150.0 20.50 2024-01-01 00:10:00 200.0 21.00 200.0 21.00 2024-01-01 00:15:00 180.0 21.50 200.0 21.50 2024-01-01 00:20:00 220.0 22.00 220.0 22.00 2024-01-01 00:25:00 250.0 22.50 250.0 22.50 Notes ----- - The transformer preserves the original features and adds new aggregated versions - The rolling window is inclusive on both ends ("closed='both'") - The minimum number of periods for aggregation is set to 1 (min_periods=1) - Future windows (where stop_offset > 0) are not supported and will raise ValueError - The window_interval must satisfy start_offset < stop_offset - The aggregation method must be a valid pandas rolling method - For each timestamp t, the aggregation is computed over [t + start_offset, t + stop_offset] - The window is achieved by first shifting the data by -stop_offset, then applying a rolling window of size (stop_offset - start_offset) Returns ------- pd.DataFrame The input DataFrame with additional window-aggregated features. The original features are preserved, and new aggregated features are added with the specified prefix. Raises ------ ValueError If stop_offset > 0 (future windows not supported) If start_offset >= stop_offset (invalid window interval) If agg_method is not a valid pandas rolling aggregation method """
[docs] def __init__( self, window_interval: tuple[str, str] = ("-5min", "0min"), feature_marker: str | None = None, agg_method: str = "mean", ): super().__init__() self.window_interval = window_interval self.feature_marker = feature_marker self.agg_method = agg_method
def _fit_implementation(self, X: pd.DataFrame, y=None): start, stop = self.window_interval self.offset_start_ = pd.Timedelta(start) self.offset_stop_ = pd.Timedelta(stop) if self.offset_stop_ > pd.Timedelta(0): raise ValueError("Future windows are not supported with rolling") if self.offset_start_ >= self.offset_stop_: raise ValueError("window_interval must satisfy start < stop") self.feature_marker = ( f"{self.window_interval}_" if self.feature_marker is None else self.feature_marker ) self.required_columns = list(X.columns) self.new_columns_ = [self.feature_marker + col for col in self.required_columns] self.feature_names_out_ = self.feature_names_in_ + self.new_columns_ self.shift_ = -self.offset_stop_ # e.g. 5min self.rolling_window_ = self.offset_stop_ - self.offset_start_ # e.g. 10min return self def _transform_implementation(self, X: pd.DataFrame): check_is_fitted( self, attributes=[ "required_columns", "feature_names_out_", "shift_", "rolling_window_", "new_columns_", ], ) X_shifted = X.shift(freq=self.shift_) rolling = X_shifted.rolling( window=self.rolling_window_, closed="both", min_periods=1, ) if not hasattr(rolling, self.agg_method): raise ValueError( f"Aggregation '{self.agg_method}' not supported by rolling" ) X_agg = getattr(rolling, self.agg_method)() X_agg.columns = self.new_columns_ return pd.concat([X, X_agg], axis=1)
[docs] class GaussianFilter1D(BaseProcessing): """A transformer that applies a 1D Gaussian filter to smooth time series data. This transformer applies a one-dimensional Gaussian filter to each column of the input DataFrame, effectively reducing high-frequency noise while preserving the overall trend and important features of the time series. Parameters ---------- sigma : float, default=5 Standard deviation of the Gaussian kernel. Controls the level of smoothing: - Larger values result in smoother output but may lose fine details - Smaller values preserve more details but may not reduce noise effectively - Must be positive mode : str, default='nearest' How to handle values outside the input boundaries. Options are: - 'nearest': Use the nearest edge value (default) - 'reflect': Reflect values around the edge - 'mirror': Mirror values around the edge - 'constant': Use a constant value (0) - 'wrap': Wrap values around the edge truncate : float, default=4.0 The filter window size in terms of standard deviations. Values outside the range (mean ± truncate * sigma) are ignored. This parameter: - Controls the effective size of the filter window - Affects the computational efficiency - Must be positive Examples -------- >>> import pandas as pd >>> from tide.processing import GaussianFilter1D >>> # Create sample data with timezone-aware index >>> dates = pd.date_range(start="2024-01-01", periods=5, freq="1h", tz="UTC") >>> df = pd.DataFrame( ... { ... "power__W__building": [100, 150, 200, 180, 220], ... "temp__°C__room": [20, 21, 22, 21, 23], ... }, ... index=dates, ... ) >>> # Apply Gaussian filter with default settings >>> smoother = GaussianFilter1D(sigma=2) >>> result = smoother.fit_transform(df) >>> print(result) power__W__building temp__°C__room 2024-01-01 00:00:00+00:00 130.0 20.0 2024-01-01 01:00:00+00:00 149.0 20.0 2024-01-01 02:00:00+00:00 169.0 21.0 2024-01-01 03:00:00+00:00 187.0 21.0 2024-01-01 04:00:00+00:00 201.0 22.0 Notes ----- - The input DataFrame must have a timezone-aware DatetimeIndex - The filter is applied independently to each column - The output maintains the same index and column structure as the input - The smoothing effect is more pronounced at the edges of the time series Returns ------- pd.DataFrame The smoothed DataFrame with the same structure as the input. Each column has been smoothed using the 1D Gaussian filter with the specified parameters. """
[docs] def __init__(self, sigma=5, mode="nearest", truncate=4.0): super().__init__() self.sigma = sigma self.mode = mode self.truncate = truncate
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): return self def _transform_implementation(self, X: pd.Series | pd.DataFrame): check_is_fitted(self, attributes=["feature_names_in_"]) gauss_filter = partial( gaussian_filter1d, sigma=self.sigma, mode=self.mode, truncate=self.truncate ) return X.apply(gauss_filter)
[docs] class CombineColumns(BaseProcessing): """A transformer that combines multiple columns in a DataFrame using various aggregation methods. This transformer creates a new column by combining values from multiple input columns using specified aggregation methods. It supports weighted and unweighted combinations, and can optionally drop the original columns. Parameters ---------- function : str The aggregation function to use for combining columns. Valid options are: - "mean": Arithmetic mean of the columns - "sum": Sum of the columns - "average": Weighted average of the columns (requires weights) - "dot": Dot product of the columns with weights (weighted sum) weights : list[float | int] | np.ndarray, default=None Weights to apply when using 'average' or 'dot' functions. Must be provided for these functions and must match the number of columns. Ignored for 'mean' and 'sum' functions. drop_columns : bool, default=False Whether to drop the original columns after combining them. If True, only the combined result column is returned. result_column_name : str, default="combined" The name for the new column containing the combined values. Examples -------- >>> import pandas as pd >>> from tide.processing import CombineColumns >>> # Create sample data with timezone-aware index >>> dates = pd.date_range(start="2024-01-01", periods=3, freq="1h", tz="UTC") >>> df = pd.DataFrame( ... { ... "power__W__building1": [100, 200, 300], ... "power__W__building2": [150, 250, 350], ... "power__W__building3": [200, 300, 400], ... }, ... index=dates, ... ) >>> # Combine columns using mean >>> combiner = CombineColumns(function="mean", result_column_name="power__W__avg") >>> result = combiner.fit_transform(df) >>> print(result) power__W__building1 power__W__building2 power__W__building3 power__W__avg 2024-01-01 00:00:00+00:00 100.0 150.0 200.0 150.0 2024-01-01 01:00:00+00:00 200.0 250.0 300.0 250.0 2024-01-01 02:00:00+00:00 300.0 350.0 400.0 350.0 >>> # Combine columns using weighted average >>> combiner_weighted = CombineColumns( ... function="average", ... weights=[0.5, 0.3, 0.2], ... result_column_name="power__W__weighted", ... drop_columns=True, ... ) >>> result_weighted = combiner_weighted.fit_transform(df) >>> print(result_weighted) power__W__weighted 2024-01-01 00:00:00+00:00 135.0 2024-01-01 01:00:00+00:00 235.0 2024-01-01 02:00:00+00:00 335.0 Notes ----- - The input DataFrame must have a timezone-aware DatetimeIndex - Weights must be provided when using 'average' or 'dot' functions - Weights are ignored for 'mean' and 'sum' functions - The number of weights must match the number of columns being combined - When drop_columns=True, only the combined result column is returned - The transformer preserves the index of the input DataFrame Returns ------- pd.DataFrame The DataFrame with the combined column added. If drop_columns=True, only the combined column is returned. The output maintains the same index as the input. """
[docs] def __init__( self, function: str, weights: list[float | int] | np.ndarray = None, drop_columns: bool = False, result_column_name: str = "combined", ): BaseProcessing.__init__(self) self.function = function self.weights = weights self.drop_columns = drop_columns self.result_column_name = result_column_name
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): self.fit_check_features(X) self.method_ = FUNCTION_MAP[self.function] if self.function in ["mean", "sum"] and self.weights is not None: raise ValueError( f"Weights have been provided, but {self.function} " f"cannot use it. Use one of 'average' or 'dot'" ) if self.drop_columns: self.feature_names_out_ = [] self.feature_names_out_.append(self.result_column_name) def _transform_implementation(self, X: pd.Series | pd.DataFrame): check_is_fitted( self, attributes=["feature_names_in_", "feature_names_out_", "method_"] ) if self.function == "average": X[self.result_column_name] = self.method_(X, axis=1, weights=self.weights) elif self.function == "dot": X[self.result_column_name] = self.method_(X, self.weights) else: X[self.result_column_name] = self.method_(X, axis=1) return X[self.feature_names_out_]
[docs] class STLFilter(BaseProcessing): """A transformer that applies Seasonal-Trend decomposition using LOESS (STL) to detect and filter outliers in time series data. This transformer decomposes each column of the input DataFrame into seasonal, trend, and residual components using STL decomposition. It then identifies outliers in the residual component based on an absolute threshold and replaces them with NaN values. Parameters ---------- period : int | str | dt.timedelta The periodicity of the seasonal component. Can be specified as: - An integer for the number of observations in one seasonal cycle - A string representing the time frequency (e.g., '15T' for 15 minutes) - A timedelta object representing the duration of the seasonal cycle trend : int | str | dt.timedelta The length of the trend smoother. Must be odd and larger than season. Typically set to around 150% of the seasonal period. The choice depends on the characteristics of your time series. absolute_threshold : int | float The threshold for detecting anomalies in the residual component. Any value in the residual that exceeds this threshold (in absolute value) is considered an anomaly and replaced by NaN. seasonal : int | str | dt.timedelta, default=None The length of the smoothing window for the seasonal component. If not provided, it is inferred based on the period. Must be an odd integer if specified as an int. Can also be specified as a string representing a time frequency or a timedelta object. stl_additional_kwargs : dict[str, float], default=None Additional keyword arguments to pass to the STL decomposition. Notes ----- - The STL decomposition is applied independently to each column - Outliers are detected based on the residual component of the decomposition - Detected outliers are replaced with NaN values - The trend parameter should be larger than the period parameter - The seasonal parameter is optional and defaults to an inferred value - The transformer preserves the index and column structure of the input Returns ------- pd.DataFrame The input DataFrame with outliers replaced by NaN values. The output maintains the same index and column structure as the input. """
[docs] def __init__( self, period: int | str | dt.timedelta, trend: int | str | dt.timedelta, absolute_threshold: int | float, seasonal: int | str | dt.timedelta = None, stl_additional_kwargs: dict[str, float] = None, ): super().__init__() self.period = period self.trend = trend self.absolute_threshold = absolute_threshold self.seasonal = seasonal self.stl_additional_kwargs = stl_additional_kwargs
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): self.stl_ = STLEDetector( self.period, self.trend, self.absolute_threshold, self.seasonal, self.stl_additional_kwargs, ) self.stl_.fit(X) return self def _transform_implementation(self, X: pd.Series | pd.DataFrame): check_is_fitted(self, attributes=["feature_names_in_", "stl_"]) errors = self.stl_.predict(X) errors = errors.astype(bool) for col in errors: X.loc[errors[col], col] = np.nan return X
[docs] class FillGapsAR(BaseFiller, BaseProcessing): """ A transformer that fills gaps in time series data using autoregressive models. This transformer identifies and fills gaps in time series data using a specified model (e.g., Prophet). The filling process depends on the `recursive_fill` parameter: When recursive_fill=True: 1. Identifies gaps in the data and filters them based on size thresholds 2. Uses the largest continuous block of valid data to fit the model 3. Fills neighboring gaps using backcasting or forecasting 4. Optionally handles high-frequency data by: - Resampling to a larger timestep for better pattern recognition - Performing predictions at the resampled timestep - Using linear interpolation to restore original resolution 5. Repeats steps 2-4 until no more gaps remain When recursive_fill=False: 1. Identifies gaps in the data and filters them based on size thresholds 2. Uses the entire dataset to fit the model 3. Fills all gaps in a single pass using the fitted model 4. Optionally handles high-frequency data as described above Parameters ---------- model_name : str, default="Prophet" The name of the model to use for gap filling. Currently supports "Prophet" and "STL". Note: STL model requires recursive_fill=True as it cannot handle NaN values. model_kwargs : dict, default={} Additional keyword arguments to pass to the model during initialization. gaps_lte : str | datetime | pd.Timestamp, default=None Upper threshold for gap size. Gaps larger than this will not be filled. Can be a string (e.g., "1D"), datetime object, or pd.Timestamp. gaps_gte : str | datetime | pd.Timestamp, default=None Lower threshold for gap size. Gaps smaller than this will not be filled. Can be a string (e.g., "1h"), datetime object, or pd.Timestamp. resample_at_td : str | timedelta | pd.Timedelta, default=None Optional resampling period for high-frequency data. If provided, data will be resampled to this frequency before model fitting and prediction. recursive_fill : bool, default=False Whether to recursively fill gaps until no more gaps remain. If False, only performs one pass of gap filling. Must be True when using STL model. Examples -------- >>> import pandas as pd >>> from tide.processing import FillGapsAR >>> # Create sample data with gaps >>> dates = pd.date_range(start="2024-01-01", periods=24, freq="1h", tz="UTC") >>> df = pd.DataFrame( ... { ... "power__W__building": [ ... 100, ... np.nan, ... np.nan, ... 180, ... 220, ... 190, ... np.nan, ... 230, ... 180, ... 160, ... 140, ... 120, ... 110, ... 130, ... 150, ... 170, ... 190, ... 210, ... 230, ... 220, ... 200, ... 180, ... 160, ... 140, ... ] ... }, ... index=dates, ... ) >>> # Fill gaps using Prophet model (non-recursive) >>> filler = FillGapsAR( ... model_name="Prophet", gaps_lte="1D", gaps_gte="1h", resample_at_td="1h" ... ) >>> result = filler.fit_transform(df) >>> # Fill gaps using STL model (recursive required) >>> filler = FillGapsAR( ... model_name="STL", ... gaps_lte="1D", ... gaps_gte="1h", ... recursive_fill=True, # Required for STL ... ) >>> result = filler.fit_transform(df) Notes ----- - Gaps are filled independently for each column - For high-frequency data, resampling can improve pattern recognition - When recursive_fill=True, the model is fitted on the largest continuous block of valid data for each gap - When recursive_fill=False, the model is fitted on the entire dataset - STL model requires recursive_fill=True as it cannot handle NaN values - Prophet model requires additional dependencies (prophet package) Returns ------- pd.DataFrame DataFrame with gaps filled using the specified model. The output maintains the same structure and timezone information as the input. """
[docs] def __init__( self, model_name: str = "Prophet", model_kwargs: dict = {}, gaps_lte: str | dt.datetime | pd.Timestamp = None, gaps_gte: str | dt.datetime | pd.Timestamp = None, resample_at_td: str | dt.timedelta | pd.Timedelta = None, recursive_fill: bool = False, ): BaseFiller.__init__(self, gaps_lte, gaps_gte) BaseProcessing.__init__(self) self.model_name = model_name self.model_kwargs = model_kwargs self.resample_at_td = resample_at_td gaps_lte = pd.Timedelta(gaps_lte) if isinstance(gaps_lte, str) else gaps_lte resample_at_td = ( pd.Timedelta(resample_at_td) if isinstance(resample_at_td, str) else resample_at_td ) if ( resample_at_td is not None and gaps_lte is not None and gaps_lte < resample_at_td ): raise ValueError( f"Cannot predict data for gaps LTE to {gaps_lte} with data" f"at a {resample_at_td} timestep" ) self.recursive_fill = recursive_fill
def _check_forecast_horizon(self, idx): idx_dt = idx[-1] - idx[0] if idx_dt == dt.timedelta(0): idx_dt = idx.freq if idx_dt < pd.to_timedelta(self.resample_at_td): raise ValueError( f"Forecaster is asked to predict at {idx_dt} in the future " f"or in the past." f" But data used for fitting have a {self.resample_at_td} frequency" ) def _get_x_and_idx_at_freq(self, x, idx, backcast): if self.resample_at_td is not None: self._check_forecast_horizon(idx) x_out = x.resample(self.resample_at_td).mean() idx_out = pd.date_range(idx[0], idx[-1], freq=self.resample_at_td).floor( self.resample_at_td ) idx_out.freq = idx_out.inferred_freq else: x_out = x idx_out = idx return x_out, idx_out def _fill_up_sampling(self, X, idx, col): beg = idx[0] - idx.freq end = idx[-1] + idx.freq # Interpolate linearly between inferred values and using neighbor data X.loc[idx, col] = X.loc[beg:end, col].interpolate() # If gap is at boundaries if beg < X.index[0]: X.loc[idx, col] = X.loc[idx, col].bfill() if end > X.index[-1]: X.loc[idx, col] = X.loc[idx, col].ffill()
[docs] def fill_x(self, X, group, col, idx, backcast): check_is_fitted(self, attributes=["model_"]) bc_model = self.model_(backcast=backcast, **self.model_kwargs) if self.resample_at_td: self._check_forecast_horizon(idx) x_fit, idx_pred = self._get_x_and_idx_at_freq(X.loc[group, col], idx, backcast) bc_model.fit(X=x_fit.index, y=x_fit) to_predict = idx_pred to_predict = to_predict[to_predict.isin(X.index)] # Here a bit dirty. STL doesn't allow forecast on its fitting set if self.model_name == "STL": to_predict = to_predict[~to_predict.isin(x_fit.index)] X.loc[to_predict, col] = bc_model.predict(to_predict).to_numpy().flatten() if self.resample_at_td is not None: self._fill_up_sampling(X, idx, col)
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): self.model_ = MODEL_MAP[self.model_name] def _transform_implementation(self, X: pd.Series | pd.DataFrame): check_is_fitted(self, attributes=["model_"]) gaps = self.get_gaps_dict_to_fill(X) for col in X: if not self.recursive_fill: for idx in gaps[col]: self.fill_x(X, X.index, col, idx, backcast=None) else: while gaps[col]: data_blocks = get_data_blocks(X[col], return_combination=False)[col] data_timedelta = [block[-1] - block[0] for block in data_blocks] biggest_group = data_blocks[ data_timedelta.index(max(data_timedelta)) ] start, end = get_outer_timestamps(biggest_group, X.index) indices_to_delete = [] for i, idx in enumerate(gaps[col]): if start in idx: self.fill_x(X, biggest_group, col, idx, backcast=True) indices_to_delete.append(i) elif end in idx: self.fill_x(X, biggest_group, col, idx, backcast=False) indices_to_delete.append(i) for i in sorted(indices_to_delete, reverse=True): del gaps[col][i] return X
[docs] class ExpressionCombine(BaseProcessing): """A transformer that combines DataFrame columns using a mathematical expression. Evaluates a mathematical expression over specified columns from a DataFrame and stores the result in one or more new columns following the TIDE naming convention (``name__unit__bloc__sub_bloc``). Supports **broadcasting**: when a tag pattern matches multiple columns, the expression is evaluated once per group. Patterns matching a single column are reused (broadcast) across all groups. Parameters ---------- columns_dict : dict[str, str] Mapping of expression variable names to TIDE-style tag patterns. Keys are the variable names used in ``expression``; values are tag patterns passed to ``tide_request`` to select matching columns (e.g. ``"Tin__°C__building"``). expression : str Mathematical expression to evaluate, referencing variables defined in ``columns_dict``. Must be a valid expression for ``pandas.eval()``. Standard arithmetic operators and NumPy functions are supported. result_column_name : str Template for the output column name(s), following the TIDE convention ``name__unit__bloc__sub_bloc``. - The minimum required part is the **name** (e.g. ``"Q_vent"``). - Missing parts are filled with defaults: ``unit`` → ``"DIMENSIONLESS"``, ``bloc`` → ``"%auto_bloc%"``, ``sub_bloc`` → ``"%auto_sub_bloc%"``. - ``%auto_bloc%`` and ``%auto_sub_bloc%`` are resolved at fit time from the source columns of each group (first non-None value found). Examples of valid templates:: "loss_ventilation" # name only "loss_ventilation__J" # name + unit "loss_ventilation__J__%auto_bloc%" # auto bloc "loss_ventilation__J__%auto_bloc%__room_1" # auto bloc, fixed sub-bloc "loss_ventilation__J__building_1__room_1" # fully explicit drop_columns : bool, default=False If ``True``, the source columns referenced in ``columns_dict`` are dropped from the output. Columns not used in the expression are kept. Attributes ---------- column_groups_ : list[dict[str, str]] One dict per group, mapping variable names to the actual column names selected for that group. Set during ``fit()``. result_columns_ : list[str] Final output column names (one per group), with all placeholders resolved. Set during ``fit()``. required_columns : list[str] Union of all source columns across all groups. Updated during ``fit()``. feature_names_out_ : list[str] Ordered list of all column names present in the transformed DataFrame. Notes ----- **Broadcasting rules** - If every pattern matches exactly one column → one group, one result column. - If some patterns match *N* > 1 columns, all multi-match patterns must resolve to the same *N*. Single-match patterns are broadcast across groups. - Patterns resolving to different counts > 1 raise a ``ValueError``. **Bloc inference for** ``%auto_bloc%`` / ``%auto_sub_bloc%`` For each group the bloc and sub_bloc are inferred from the source columns of that group: the first non-``None`` value found (in ``columns_dict`` declaration order) is used. Variables that are broadcast (single match) also contribute to inference, so declare the most representative variable first if ordering matters. **Column name length** The number of parts in the result column name is capped at ``get_tags_max_level(X.columns) + 1`` to stay consistent with the depth of the source DataFrame. Raises ------ ValueError - If a tag pattern matches no columns. - If multi-match patterns resolve to different counts. - If a resolved result column name already exists in the DataFrame. Examples -------- **Broadcasting across multiple rooms** ``Text`` and ``pump_mass_flwr`` each match a single column and are broadcast to both rooms; ``Tin`` matches two columns, producing two result columns: >>> combiner = ExpressionCombine( ... columns_dict={ ... "Tin": "Tin__°C__building", ... "Text": "Text__°C__outdoor", ... "mdot": "mass_flwr__kg/s__hvac", ... }, ... expression="(Tin - Text) * mdot * 1004", ... result_column_name="Q_vent__W__%auto_bloc%", ... ) >>> result = combiner.fit_transform(df) >>> # Creates: Q_vent__W__building **Fully explicit output name** >>> combiner = ExpressionCombine( ... columns_dict={"P": "P__W__hvac", "t": "t__s__hvac"}, ... expression="P * t", ... result_column_name="E__J__hvac", ... ) >>> result = combiner.fit_transform(df) >>> # Creates: E__J__hvac **Name-only template (all tags inferred)** >>> combiner = ExpressionCombine( ... columns_dict={"T": "Tin__°C__building__room_1"}, ... expression="T + 273.15", ... result_column_name="T_kelvin", ... ) >>> result = combiner.fit_transform(df) >>> # Creates: T_kelvin__DIMENSIONLESS__building__room_1 **Dropping source columns** >>> combiner = ExpressionCombine( ... columns_dict={ ... "Tin": "Tin__°C__building", ... "Text": "Text__°C__outdoor__meteo", ... }, ... expression="Tin - Text", ... result_column_name="deltaT__K", ... drop_columns=True, ... ) >>> result = combiner.fit_transform(df) >>> # Tin and Text columns are removed; deltaT columns are added """
[docs] def __init__( self, columns_dict: dict[str, str], expression: str, result_column_name: str, drop_columns: bool = False, ): self.columns_dict = columns_dict self.expression = expression self.result_column_name = result_column_name self.drop_columns = drop_columns BaseProcessing.__init__(self, required_columns=[])
[docs] def _resolve_column_groups(self, X: pd.DataFrame) -> tuple[list[dict], list[str]]: """Resolve tag patterns to column groups and validate broadcasting rules. Returns ------- column_groups : list[dict] List of dicts mapping variable names → actual column names, one per group. result_columns : list[str] Final column names for each group, with placeholders resolved. """ # --- 1. Resolve each variable's tag pattern to matched columns --- resolved: dict[str, list[str]] = {} for var, tag_pattern in self.columns_dict.items(): matched = tide_request(X.columns, tag_pattern) if not matched: raise ValueError( f"Tag pattern '{tag_pattern}' for variable '{var}' " f"matched no columns in DataFrame." ) resolved[var] = matched # --- 2. Validate broadcasting: all counts must be 1 or the same N --- counts = {var: len(cols) for var, cols in resolved.items()} multi_counts = {c for c in counts.values() if c > 1} if len(multi_counts) > 1: raise ValueError( f"Incompatible column counts for broadcasting: {counts}. " f"All counts must be 1 or equal to each other." ) n_groups = multi_counts.pop() if multi_counts else 1 # --- 3. Build one group dict per group index --- column_groups: list[dict[str, str]] = [ { var: cols[0] if len(cols) == 1 else cols[i] for var, cols in resolved.items() } for i in range(n_groups) ] # --- 4. Parse the result name template once --- n_tag_levels = get_tags_max_level(X.columns) template_parts = _parse_result_name_template( self.result_column_name, n_tag_levels ) # --- 5. Build one result column name per group --- result_columns: list[str] = [ _build_result_column_name(template_parts, *_infer_blocs_for_group(group)) for group in column_groups ] return column_groups, result_columns
[docs] def _fit_implementation(self, X, y=None): """Fit the transformer by resolving column groups.""" self.column_groups_, self.result_columns_ = self._resolve_column_groups(X) # Update required columns for BaseProcessing all_cols = set() for group in self.column_groups_: all_cols.update(group.values()) self.required_columns = list(all_cols) if self.drop_columns: self.feature_names_out_ = [ col for col in X.columns if col not in self.required_columns ] else: self.feature_names_out_ = list(X.columns) # Check for conflicts for result_col in self.result_columns_: if result_col in self.feature_names_out_: raise ValueError( f"Result column '{result_col}' already exists in DataFrame. " f"It cannot be overwritten." ) self.feature_names_out_.extend(self.result_columns_) return self
def _transform_implementation(self, X: pd.DataFrame) -> pd.DataFrame: result = ( X if not self.drop_columns else X[[col for col in X.columns if col not in self.required_columns]] ).copy() for group, result_col in zip(self.column_groups_, self.result_columns_): # Build local_dict for pd.eval local_dict = {var: X[col] for var, col in group.items()} result.loc[:, result_col] = pd.eval( self.expression, local_dict=local_dict, ) return result[self.feature_names_out_]
class FillOikoMeteo(BaseFiller, BaseOikoMeteo, BaseProcessing): """A transformer that fills data gaps using meteorological data from the Oikolab API. This transformer identifies gaps in time series data and fills them with corresponding meteorological data retrieved from the Oikolab API. It supports filtering gaps based on their size and can handle different data frequencies through automatic interpolation or resampling. Parameters ---------- gaps_lte : str | pd.Timedelta | dt.timedelta, default=None Maximum gap size to fill. Gaps larger than this will be ignored. Can be specified as a string (e.g., "24h") or timedelta object. gaps_gte : str | pd.Timedelta | dt.timedelta, default=None Minimum gap size to fill. Gaps smaller than this will be ignored. Can be specified as a string (e.g., "1h") or timedelta object. lat : float, default=43.47 Latitude of the location for which to retrieve meteorological data. lon : float, default=-1.51 Longitude of the location for which to retrieve meteorological data. columns_param_map : dict[str, str], default=None Mapping of input columns to Oikolab API parameters. If None, all columns will be filled with temperature data. Available Oikolab parameters are: - temperature - dewpoint_temperature - mean_sea_level_pressure - wind_speed - 100m_wind_speed - relative_humidity - surface_solar_radiation - direct_normal_solar_radiation - surface_diffuse_solar_radiation - surface_thermal_radiation - total_cloud_cover - total_precipitation model : str, default="era5" The meteorological model to use for data retrieval. env_oiko_api_key : str, default="OIKO_API_KEY" Name of the environment variable containing the Oikolab API key. Examples -------- >>> from tide.processing import FillOikoMeteo >>> import pandas as pd >>> # Create sample data with gaps >>> df = pd.DataFrame( ... { ... "temperature": [20, None, 22, None, 24], ... "humidity": [50, None, 55, None, 60], ... }, ... index=pd.date_range("2024-01-01", periods=5, freq="H"), ... ) >>> # Initialize and fit the transformer >>> filler = FillOikoMeteo( ... gaps_gte="1h", ... gaps_lte="24h", ... lat=43.47, ... lon=-1.51, ... columns_param_map={ ... "temperature": "temperature", ... "humidity": "relative_humidity", ... }, ... ) >>> # Transform the data >>> result = filler.fit_transform(df) Notes ----- - Requires an Oikolab API key to be set as an environment variable. - If columns_param_map is not provided, all columns will be filled with temperature data to comply with scikit-learn API recommendations. - Automatically handles different data frequencies through interpolation or resampling. """ def __init__( self, gaps_lte: str | pd.Timedelta | dt.timedelta = None, gaps_gte: str | pd.Timedelta | dt.timedelta = None, lat: float = 43.47, lon: float = -1.51, columns_param_map: dict[str, str] = None, model: str = "era5", env_oiko_api_key: str = "OIKO_API_KEY", ): BaseFiller.__init__(self, gaps_lte, gaps_gte) BaseOikoMeteo.__init__(self, lat, lon, model, env_oiko_api_key) BaseProcessing.__init__(self) self.columns_param_map = columns_param_map def _fit_implementation(self, X, y=None): if self.columns_param_map is None: # Dumb action fill everything with temperature self.columns_param_map = {col: "temperature" for col in X.columns} self.get_api_key_from_env() def _transform_implementation(self, X: pd.Series | pd.DataFrame): check_is_fitted(self, attributes=["api_key_"]) gaps_dict = self.get_gaps_dict_to_fill(X) for col, idx_list in gaps_dict.items(): if col in self.columns_param_map.keys(): for idx in idx_list: df = self.get_meteo_from_idx(idx, [self.columns_param_map[col]]) X.loc[idx, col] = df.loc[idx, self.columns_param_map[col]] return X
[docs] class AddOikoData(BaseOikoMeteo, BaseProcessing): """ A transformer class to fetch and integrate Oikolab meteorological data into a given time-indexed DataFrame or Series. It retrieves weather data such as temperature, wind speed, or humidity at specified latitude and longitude, and adds it to the input DataFrame under user-specified column names. Parameters ---------- lat : float, optional Latitude of the location for which meteorological data is to be fetched. Default is 43.47. lon : float, optional Longitude of the location for which meteorological data is to be fetched. Default is -1.51. param_columns_map : dict[str, str], optional A mapping of meteorological parameter names (keys) to column names (values) in the resulting DataFrame. Default is `OIKOLAB_DEFAULT_MAP`. Example: `{"temperature": "text__°C__meteo", "wind_speed": "wind__m/s__meteo"}` model : str, optional The meteorological model to use for fetching data. Default is "era5". env_oiko_api_key : str, optional The name of the environment variable containing the Oikolab API key. Default is "OIKO_API_KEY". Methods ------- fit(X: pd.Series | pd.DataFrame, y=None) Checks the input DataFrame for conflicts with target column names and validates the API key availability. transform(X: pd.Series | pd.DataFrame) Fetches meteorological data and appends it to the input DataFrame under the specified column names at given frequency. Notes ----- - This class requires access to the Oikolab API, and a valid API key must be set as an environment variable. - The input DataFrame must have a DateTimeIndex for fetching data at specific time frequencies. """
[docs] def __init__( self, lat: float = 43.47, lon: float = -1.51, param_columns_map: dict[str, str] = OIKOLAB_DEFAULT_MAP, model: str = "era5", env_oiko_api_key: str = "OIKO_API_KEY", ): BaseOikoMeteo.__init__(self, lat, lon, model, env_oiko_api_key) BaseProcessing.__init__(self) self.param_columns_map = param_columns_map
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): mask = X.columns.isin(self.param_columns_map.values()) if mask.any(): raise ValueError( f"Cannot add Oikolab meteo data. {X.columns[mask]} already in columns" ) self.get_api_key_from_env() self.feature_names_out_.extend(list(self.param_columns_map.values())) def _transform_implementation(self, X: pd.Series | pd.DataFrame): check_is_fitted( self, attributes=["api_key_", "feature_names_in_", "feature_names_out_"] ) df = self.get_meteo_from_idx(X.index, list(self.param_columns_map.keys())) X.loc[:, list(self.param_columns_map.values())] = df.to_numpy() return X
[docs] class AddSolarAngles(BaseProcessing): """A transformer that adds solar angles (azimuth and elevation) to a DataFrame. This transformer calculates and adds solar azimuth and elevation angles for a given location and time series. The angles are calculated using the Astronomical Almanac's algorithm (1950-2050) as described in Michalsky (1988) and subsequent papers. Parameters ---------- lat : float, default=43.47 Latitude of the location in decimal degrees. lon : float, default=-1.51 Longitude of the location in decimal degrees. data_bloc : str, default="OTHER" Name of the data block to store the solar angles. data_sub_bloc : str, default="OTHER_SUB_BLOC" Name of the sub-block to store the solar angles. Examples -------- >>> from tide.processing import AddSolarAngles >>> import pandas as pd >>> # Create sample data with datetime index >>> df = pd.DataFrame( ... {"temperature": [20, 21, 22]}, ... index=pd.date_range("2024-01-01", periods=3, freq="H"), ... ) >>> # Add solar angles >>> transformer = AddSolarAngles( ... lat=43.47, lon=-1.51, data_bloc="SOLAR", data_sub_bloc="ANGLES" ... ) >>> # Transform the data >>> result = transformer.fit_transform(df) Notes ----- - Requires a DataFrame with a DateTimeIndex. - Adds two new columns: solar_azimuth and solar_elevation. - Uses the Astronomical Almanac's algorithm for solar position calculations. - Valid for years 1950-2050. Given the course of the world right now, I don't think anyone will need to use this transformer for dates after 2050. """
[docs] def __init__( self, lat: float = 43.47, lon: float = -1.51, data_bloc: str = "OTHER", data_sub_bloc: str = "OTHER_SUB_BLOC", ): self.lat = lat self.lon = lon self.data_bloc = data_bloc self.data_sub_bloc = data_sub_bloc BaseProcessing.__init__(self)
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): self.fit_check_features(X) if self.data_sub_bloc and not self.data_bloc: raise ValueError( "Cannot provide a subbloc name if bloc name is not provided" ) parts = [p for p in (self.data_bloc, self.data_sub_bloc) if p] suffix = "__".join(parts) if parts else "" suffix = f"__{suffix}" if suffix else "" new_feat_names = [ f"sun_el__angle_deg{suffix}", f"sun_az__angle_deg{suffix}", ] self.feature_names_out_.extend(new_feat_names) def _transform_implementation(self, X: pd.Series | pd.DataFrame): df = pd.DataFrame( data=np.array([sun_position(date, self.lat, self.lon) for date in X.index]), columns=self.feature_names_out_[-2:], index=X.index, ) return pd.concat([X, df], axis=1)
[docs] class ProjectSolarRadOnSurfaces(BaseProcessing): """ A transformer that projects solar radiation onto surfaces with specific orientations and tilts. This transformer calculates the total solar radiation incident on surfaces by combining: - Direct beam radiation (projected onto the tilted surface) - Diffuse sky radiation (from the sky dome) - Ground-reflected radiation (albedo effect) Parameters ---------- bni_column_name : str Name of the column containing beam normal irradiance (BNI) data in W/m². This is the direct solar radiation perpendicular to the sun's rays. dhi_column_name : str Name of the column containing diffuse horizontal irradiance (DHI) data in W/m². This is the scattered solar radiation from the sky dome. ghi_column_name : str Name of the column containing global horizontal irradiance (GHI) data in W/m². This is the total solar radiation on a horizontal surface. lat : float, default=43.47 Latitude of the location in degrees. Positive for northern hemisphere. lon : float, default=-1.51 Longitude of the location in degrees. Positive for eastern hemisphere. surface_azimuth_angles : int | float | list[int | float], default=180.0 Azimuth angles of the surfaces in degrees east of north. - 0°: North-facing - 90°: East-facing - 180°: South-facing surface_tilt_angle : float | list[float], default=35.0 Tilt angles of the surfaces in degrees from horizontal. - 0°: Horizontal surface - 90°: Vertical surface - 180°: Horizontal surface facing down albedo : float, default=0.25 Ground reflectivity or albedo coefficient. Typical values: - 0.1-0.2: Dark surfaces (asphalt, forest) - 0.2-0.3: Grass, soil - 0.3-0.4: Light surfaces (concrete, sand) - 0.4-0.5: Snow - 0.8-0.9: Fresh snow surface_name : str | list[str], default="az_180_tilt_35" Names for the output columns following Tide naming convention. Example: "south_facing_35deg" will create "south_facing_35deg__W/m²__OTHER__OTHER_SUB_BLOC" data_bloc : str, default="OTHER" Tide bloc name for the output columns. data_sub_bloc : str, default="OTHER_SUB_BLOC" Tide sub_bloc name for the output columns. Examples -------- >>> import pandas as pd >>> import numpy as np >>> from datetime import datetime, timedelta >>> from tide.processing import ProjectSolarRadOnSurfaces >>> import pytz >>> # Create a DataFrame with solar radiation data and timezone-aware index >>> dates = pd.date_range(start="2024-01-01", periods=3, freq="1h", tz="UTC") >>> df = pd.DataFrame( ... { ... "bni__W/m²__outdoor__meteo": [ ... 800, ... 900, ... 1000, ... ], # Direct normal irradiance ... "dhi__W/m²__outdoor__meteo": [ ... 200, ... 250, ... 300, ... ], # Diffuse horizontal irradiance ... "ghi__W/m²__outdoor__meteo": [ ... 600, ... 700, ... 800, ... ], # Global horizontal irradiance ... }, ... index=dates, ... ) >>> # Project radiation on a south-facing surface tilted at 35 degrees >>> projector = ProjectSolarRadOnSurfaces( ... bni_column_name="bni__W/m²__outdoor__meteo", ... dhi_column_name="dhi__W/m²__outdoor__meteo", ... ghi_column_name="ghi__W/m²__outdoor__meteo", ... surface_azimuth_angles=180.0, # South-facing ... surface_tilt_angle=35.0, # 35-degree tilt ... surface_name="south_facing_35deg", ... data_bloc="SOLAR", ... data_sub_bloc="ROOF", ... ) >>> result = projector.fit_transform(df) >>> print(result) bni__W/m²__outdoor__meteo dhi__W/m²__outdoor__meteo ghi__W/m²__outdoor__meteo south_facing_35deg__W/m²__SOLAR__ROOF 2024-01-01 00:00:00+00:00 800.0 200.0 600.0 850.5 2024-01-01 01:00:00+00:00 900.0 250.0 700.0 950.2 2024-01-01 02:00:00+00:00 1000.0 300.0 800.0 1050.8 Notes ----- - All input radiation values must be in W/m² - The output radiation values are also in W/m² Returns ------- pd.DataFrame The input DataFrame with additional columns containing the total solar radiation projected onto each specified surface. The output maintains the same DateTimeIndex as the input. """
[docs] def __init__( self, bni_column_name: str, dhi_column_name: str, ghi_column_name: str, lat: float = 43.47, lon: float = -1.51, surface_azimuth_angles: int | float | list[int | float] = 180.0, surface_tilt_angle: float | list[float] = 35.0, albedo: float = 0.25, surface_name: str | list[str] = "az_180_tilt_35", data_bloc: str = "OTHER", data_sub_bloc: str = "OTHER_SUB_BLOC", ): BaseProcessing.__init__(self) self.bni_column_name = bni_column_name self.dhi_column_name = dhi_column_name self.ghi_column_name = ghi_column_name self.lat = lat self.lon = lon self.surface_azimuth_angles = surface_azimuth_angles self.surface_tilt_angle = surface_tilt_angle self.albedo = albedo self.surface_name = surface_name self.data_bloc = data_bloc self.data_sub_bloc = data_sub_bloc
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): if ( not len(ensure_list(self.surface_azimuth_angles)) == len(ensure_list(self.surface_tilt_angle)) == len(ensure_list(self.surface_name)) ): raise ValueError("Number of surface azimuth, tilt and name does not match") self.required_columns = [ self.bni_column_name, self.dhi_column_name, self.ghi_column_name, ] self.added_columns = [ f"{name}__W/m²__{self.data_bloc}__{self.data_sub_bloc}" for name in ensure_list(self.surface_name) ] self.feature_names_out_.extend(self.added_columns) def _transform_implementation(self, X: pd.Series | pd.DataFrame): sun_pos = np.array([sun_position(date, self.lat, self.lon) for date in X.index]) for az, til, name in zip( ensure_list(self.surface_azimuth_angles), ensure_list(self.surface_tilt_angle), self.added_columns, ): X[name] = ( beam_component( til, az, 90 - sun_pos[:, 0], sun_pos[:, 1], X[self.bni_column_name] ) + sky_diffuse(til, X[self.dhi_column_name]) + ground_diffuse(til, X[self.ghi_column_name], self.albedo) ) return X
[docs] class FillOtherColumns(BaseFiller, BaseProcessing): """A transformer that fills missing values in specified columns using values from corresponding filler columns. This transformer is useful when you have multiple columns measuring the same quantity (e.g., temperature from different sensors) and want to use one column to fill gaps in another. Or fill gaps with computed values, for example solar radiations on a pyranometer from projected radiations based on meteo services. Parameters ---------- gaps_lte : str | pd.Timedelta | dt.timedelta, optional (default=None) Only fill gaps with duration less than or equal to this value. gaps_gte : str | pd.Timedelta | dt.timedelta, optional (default=None) Only fill gaps with duration greater than or equal to this value. columns_map : dict[str, str], optional (default={}) A mapping of target columns to their corresponding filler columns. Keys are the columns with gaps to be filled. Values are the columns to use for filling the gaps. Example: {'temp__°C__room1': 'temp__°C__room2'} drop_filling_columns : bool, default=False Whether to remove the filler columns after filling the gaps. If True, only the target columns remain in the output. Examples -------- >>> import pandas as pd >>> import numpy as np >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:04:00", freq="1min" ... ).tz_localize("UTC") >>> df = pd.DataFrame( ... { ... "temp__°C__room1": [20, np.nan, np.nan, 23, 24], ... "temp__°C__room2": [21, 22, 22, 22, 23], ... "humid__%__room1": [45, np.nan, 47, np.nan, 49], ... "humid__%__room2": [46, 46, 48, 48, 50], ... }, ... index=dates, ... ) >>> # Fill gaps in room1 using room2 data >>> filler = FillOtherColumns( ... columns_map={ ... "temp__°C__room1": "temp__°C__room2", ... "humid__%__room1": "humid__%__room2", ... } ... ) >>> result = filler.fit_transform(df) >>> print(result) temp__°C__room1 temp__°C__room2 humid__%__room1 humid__%__room2 2024-01-01 00:00:00+00:00 20.0 21.0 45.0 46.0 2024-01-01 00:01:00+00:00 22.0 22.0 46.0 46.0 2024-01-01 00:02:00+00:00 22.0 22.0 47.0 48.0 2024-01-01 00:03:00+00:00 23.0 22.0 48.0 48.0 2024-01-01 00:04:00+00:00 24.0 23.0 49.0 50.0 >>> # Fill gaps and drop filler columns >>> filler_drop = FillOtherColumns( ... columns_map={ ... "temp__°C__room1": "temp__°C__room2", ... "humid__%__room1": "humid__%__room2", ... }, ... drop_filling_columns=True, ... ) >>> result_drop = filler_drop.fit_transform(df) >>> print(result_drop) temp__°C__room1 humid__%__room1 2024-01-01 00:00:00+00:00 20.0 45.0 2024-01-01 00:01:00+00:00 22.0 46.0 2024-01-01 00:02:00+00:00 22.0 47.0 2024-01-01 00:03:00+00:00 23.0 48.0 2024-01-01 00:04:00+00:00 24.0 49.0 Notes ----- - When using gap duration parameters (gaps_lte or gaps_gte), only gaps within the specified time ranges will be filled - The filler columns must contain valid values at the timestamps where the target columns have gaps - If drop_filling_columns is True, the output DataFrame will only contain the target columns with filled gaps Returns ------- pd.DataFrame The DataFrame with gaps filled using values from the specified filler columns. If drop_filling_columns is True, the filler columns are removed from the output. """
[docs] def __init__( self, gaps_lte: str | pd.Timedelta | dt.timedelta = None, gaps_gte: str | pd.Timedelta | dt.timedelta = None, columns_map: dict[str, str] = {}, drop_filling_columns: bool = False, ): BaseFiller.__init__(self, gaps_lte, gaps_gte) BaseProcessing.__init__(self) self.columns_map = columns_map self.drop_filling_columns = drop_filling_columns
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): self.required_columns = list(self.columns_map.keys()) + list( self.columns_map.values() ) if self.drop_filling_columns: self.removed_columns = list(self.columns_map.values()) self.feature_names_out_ = list(X.columns.drop(self.removed_columns)) def _transform_implementation(self, X: pd.Series | pd.DataFrame): gap_dict = self.get_gaps_dict_to_fill(X[list(self.columns_map.keys())]) for col, idxs in gap_dict.items(): for idx in idxs: X.loc[idx, col] = X.loc[idx, self.columns_map[col]] return ( X.drop(self.removed_columns, axis="columns") if self.drop_filling_columns else X )
[docs] class DropColumns(BaseProcessing): """A transformer that removes specified columns from a pandas DataFrame. It is particularly useful for data preprocessing when certain columns are no longer needed or for removing intermediate calculation columns. Parameters ---------- columns : str | list[str], optional (default=None) The column name or a list of column names to be dropped. If None, ALL columns are dropped, only the index is kept. Example: 'temp__°C' or ['temp__°C', 'humid__%'] or '°C|%' Attributes ---------- feature_names_in_ : list[str] Names of input columns (set during fit). feature_names_out_ : list[str] Names of output columns (input columns minus dropped columns). Examples -------- >>> import pandas as pd >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:02:00", freq="1min" ... ).tz_localize("UTC") >>> df = pd.DataFrame( ... { ... "temp__°C": [20, 21, 22], ... "humid__%": [45, 50, 55], ... "press__Pa": [1000, 1010, 1020], ... }, ... index=dates, ... ) >>> # Drop a single column >>> dropper = DropColumns(columns="temp__°C") >>> result = dropper.fit_transform(df) >>> print(result) humid__% press__Pa 2024-01-01 00:00:00+00:00 45.0 1000.0 2024-01-01 00:01:00+00:00 50.0 1010.0 2024-01-01 00:02:00+00:00 55.0 1020.0 >>> # Drop multiple columns >>> dropper_multi = DropColumns(columns="°C|%") >>> result_multi = dropper_multi.fit_transform(df) >>> print(result_multi) press__Pa 2024-01-01 00:00:00+00:00 1000.0 2024-01-01 00:01:00+00:00 1010.0 2024-01-01 00:02:00+00:00 1020.0 Notes ----- - If a specified column doesn't exist in the DataFrame, it will be silently ignored - The order of remaining columns is preserved - If no columns are specified (columns=None), a DataFrame with no values is returned Returns ------- pd.DataFrame The DataFrame with specified columns removed. The output maintains the same DateTimeIndex as the input, with only the specified columns removed. """
[docs] def __init__(self, columns: str | list[str] = None): self.columns = columns BaseProcessing.__init__(self)
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): self.required_columns = tide_request(X, self.columns) self.feature_names_out_ = list( X.drop(self.required_columns, axis="columns").columns ) def _transform_implementation(self, X: pd.Series | pd.DataFrame): return X.drop(self.required_columns, axis="columns")
[docs] class KeepColumns(BaseProcessing): """ A transformer that keeps specified columns from a pandas DataFrame. It is particularly useful at the final step of data preprocessing. When only some columns are passed to a model Parameters ---------- columns : str | list[str], optional (default=None) The column name or a list of column names to be kept. If None, no columns are dropped and the DataFrame is returned unchanged. Example: 'temp__°C' or ['temp__°C', 'humid__%'] or '°C|%' Attributes ---------- feature_names_in_ : list[str] Names of input columns (set during fit). feature_names_out_ : list[str] Names of output columns (input columns minus dropped columns). Examples -------- >>> import pandas as pd >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:02:00", freq="1min" ... ).tz_localize("UTC") >>> df = pd.DataFrame( ... { ... "temp__°C": [20, 21, 22], ... "humid__%": [45, 50, 55], ... "press__Pa": [1000, 1010, 1020], ... }, ... index=dates, ... ) >>> # Keep a single column >>> keeper = KeepColumns(columns="temp__°C") >>> result = keeper.fit_transform(df) >>> print(result) temp__°C 2024-01-01 00:00:00+00:00 20 2024-01-01 00:01:00+00:00 21 2024-01-01 00:02:00+00:00 22 >>> # Keep multiple columns >>> keeper_multi = KeepColumns(columns="°C|%") >>> result_multi = keeper_multi.fit_transform(df) >>> print(result_multi) temp__°C humid__% 2024-01-01 00:00:00+00:00 20 45 2024-01-01 00:01:00+00:00 21 50 2024-01-01 00:02:00+00:00 22 55 Notes ----- - If a specified column doesn't exist in the DataFrame, it will be silently ignored - The order of selected columns is preserved - If no columns are specified (columns=None), the DataFrame is returned unchanged Returns ------- pd.DataFrame The DataFrame with specified columns removed. The output maintains the same DateTimeIndex as the input, with only the specified columns removed. """
[docs] def __init__(self, columns: str | list[str] = None): self.columns = columns BaseProcessing.__init__(self)
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): self.required_columns = tide_request(X, self.columns) self.feature_names_out_ = self.required_columns def _transform_implementation(self, X: pd.Series | pd.DataFrame): return X[self.feature_names_out_]
[docs] class ReplaceTag(BaseProcessing): """A transformer that replaces components or full Tide tag names with new values. This transformer allows you to selectively replace parts of Tide tag names (components separated by "__") or full tags with new values. It is particularly useful for standardizing tag names, updating units, or changing block/sub-block names across multiple columns. Parameters ---------- tag_map : dict[str, str], optional (default=None) A dictionary mapping old tag components or full tags to new values. Keys are the components/tags to replace, values are their replacements. Example: {'°C': 'K', 'room1': 'room2', 'P__bool': 'Cycle_P__n_cycle'} If None, no replacements are made and the DataFrame is returned unchanged. Attributes ---------- feature_names_in_ : list[str] Names of input columns (set during fit). feature_names_out_ : list[str] Names of output columns with replaced tag components. Examples -------- >>> import pandas as pd >>> # Create DataFrame with DateTimeIndex >>> dates = pd.date_range( ... start="2024-01-01 00:00:00", end="2024-01-01 00:02:00", freq="1min" ... ).tz_localize("UTC") >>> df = pd.DataFrame( ... { ... "temp__°C__room1__north": [20, 21, 22], ... "humid__%__room1__north": [45, 50, 55], ... "press__Pa__room1__north": [1000, 1010, 1020], ... "P__bool__ecs": [0, 1, 0], ... }, ... index=dates, ... ) >>> # Replace room1 with room2 and P__bool with Cycle_P__n_cycle >>> replacer = ReplaceTag( ... tag_map={ ... "room1": "room2", ... "P__bool": "Cycle_P__n_cycle", ... } ... ) >>> result = replacer.fit_transform(df) >>> print(result.columns) Index(['temp__°C__room2__north', 'humid__%__room2__north', 'press__Pa__room2__north', 'Cycle_P__n_cycle__ecs'], dtype='object') Notes ----- - Tide tags follow the format "name__unit__block__sub_block" - The transformer identifies components to replace regardless of their order or position in the tag. - Components not specified in tag_map remain unchanged - If tag_map is None, the DataFrame is returned unchanged - Multi-component replacements (like "P__bool" -> "Cycle_P__n_cycle") are performed while maintaining the remaining tag components. If the number of components to replace matches the number of replacements, they are substituted in their original positions. Otherwise, the first component found is replaced by all new components, and others are removed. Returns ------- pd.DataFrame The DataFrame with updated column names based on the tag replacements. The output maintains the same DateTimeIndex and data values as the input. """
[docs] def __init__(self, tag_map: dict[str, str] = None): self.tag_map = tag_map BaseProcessing.__init__(self)
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): self.fit_check_features(X) if self.tag_map is None: self.feature_names_out_ = self.feature_names_in_ return # Trier par nombre de composants décroissant pour matcher les plus longs en premier sorted_map = dict( sorted( self.tag_map.items(), key=lambda item: len(item[0].split("__")), reverse=True, ) ) self.feature_names_out_ = [ self._replace_col(col, sorted_map) for col in self.feature_names_in_ ] def _replace_col(self, col: str, sorted_map: dict[str, str]) -> str: parts = col.split("__") for key, value in sorted_map.items(): key_parts = key.split("__") val_parts = value.split("__") # Trouver les indices des composants du key dans parts indices = self._find_indices(parts, key_parts) if indices is None: continue if len(key_parts) == len(val_parts): # Substitution 1-pour-1 en place for idx, vp in zip(indices, val_parts): parts[idx] = vp else: # Tout mettre au premier indice, supprimer les autres first, *rest = sorted(indices) parts[first] = None # placeholder for idx in rest: parts[idx] = None # Reconstruire en remplaçant le placeholder par val_parts new_parts = [] for p in parts: if p is None and val_parts: new_parts.extend(val_parts) val_parts = [] # n'insérer qu'une fois elif p is not None: new_parts.append(p) parts = new_parts return "__".join(parts)
[docs] def _find_indices(self, parts: list[str], key_parts: list[str]) -> list[int] | None: """Retourne les indices de key_parts dans parts, ou None si introuvable.""" indices = [] available = list(range(len(parts))) # indices non encore utilisés for kp in key_parts: for i in available: if parts[i] == kp: indices.append(i) available.remove(i) break else: return None # kp introuvable return indices
def _transform_implementation(self, X: pd.Series | pd.DataFrame): check_is_fitted(self, attributes=["feature_names_in_", "feature_names_out_"]) X.columns = self.feature_names_out_ return X
[docs] class AddFourierPairs(BaseProcessing): """A transformer that adds a pair of new columns with sine and cosine signal of given period. Based on time series index, phase shift is computed from the beginning of the year. Parameters ---------- period: str | pd.Timedelta | dt.timedelta = "24h" Period of thte signal. Will automaticaly convert a string to pandas TimeDelta order: int = 1, Sinus and Cosinus order. Will add a pair of feature for order "n", with a pulsation n * 2 * pi * f amplitude: float | int = 1.0, Amplitude of the signal unit: str = "-", Unit of the signal block: str = "BLOCK", block tag according to tide taging system. Will only be used if level 2 tags or more are already used sub_block: str = "SUB_BLOCK", sub_block tag according to tide taging system. Will only be used if level 3 tags or more are already used Attributes ---------- feature_names_in_ : list[str] Names of input columns (set during fit). feature_names_out_ : list[str] Names of output columns with replaced tag components. Examples -------- >>> import pandas as pd >>> # Create DataFrame with DateTimeIndex >>> data = pd.DataFrame( ... data=np.arange(24).astype("float64"), ... index=pd.date_range("2009-01-01 00:00:00", freq="H", periods=24, tz="UTC"), ... columns=["feat_1"], ... ) >>> signal = AddFourierPairs(period="24h", order=2) >>> result = signal.fit_transform(data) >>> print(result.head()) feat_1 1 days 00:00:00_order_1_Sine 1 days 00:00:00_order_1_Cosine 1 days 00:00:00_order_2_Sine 1 days 00:00:00_order_2_Cosine 2009-01-01 00:00:00+00:00 0.0 0.000000 1.000000 0.000000 1.000000e+00 2009-01-01 01:00:00+00:00 1.0 0.258819 0.965926 0.500000 8.660254e-01 2009-01-01 02:00:00+00:00 2.0 0.500000 0.866025 0.866025 5.000000e-01 2009-01-01 03:00:00+00:00 3.0 0.707107 0.707107 1.000000 6.123234e-17 2009-01-01 04:00:00+00:00 4.0 0.866025 0.500000 0.866025 -5.000000e-01 Notes ----- - Tide tags follow the format "name__unit__block__sub_block" - If unit, block or sub_block is given, but data have a lower level tag, it will be ignored. - The transformer preserves the order of tag components Returns ------- pd.DataFrame The DataFrame with new columns corresponding to the Fourier pairs """
[docs] def __init__( self, period: str | pd.Timedelta | dt.timedelta = "24h", order: int = 1, amplitude: float | int = 1.0, unit: str = "-", block: str = "BLOCK", sub_block: str = "SUB_BLOCK", ): BaseProcessing.__init__(self) self.period = pd.to_timedelta(period) if isinstance(period, str) else period self.order = order self.amplitude = amplitude self.unit = unit self.block = block self.sub_block = sub_block
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): self.fit_check_features(X) max_level = get_tags_max_level(X.columns) self.new_columns_ = [] for od in range(1, self.order + 1): for trig in ["Sine", "Cosine"]: name = f"{self.period}_order_{od}_{trig}" if max_level > 0: name += f"__{self.unit}" if max_level > 1: name += f"__{self.block}" if max_level > 2: name += f"__{self.sub_block}" self.new_columns_.append(name) self.feature_names_out_.extend(self.new_columns_) return self def _transform_implementation(self, X: pd.Series | pd.DataFrame): check_is_fitted( self, attributes=["feature_names_in_", "feature_names_out_", "new_columns_"] ) begin = X.index[0] frequency = 1 / self.period.total_seconds() year_start = pd.Timestamp(begin.year, 1, 1) if begin.tz: year_start = year_start.tz_localize(begin.tz) seconds_from_start_of_year = (begin - year_start).total_seconds() phi = 2 * np.pi * frequency * seconds_from_start_of_year new_index = X.index.to_frame().diff().squeeze() sec_dt = [element.total_seconds() for element in new_index] increasing_seconds = pd.Series(sec_dt).cumsum().to_numpy() increasing_seconds[0] = 0 omega = 2 * np.pi * frequency for od, idx in zip( range(1, self.order + 1), range(0, len(self.new_columns_), 2) ): X[self.new_columns_[idx]] = self.amplitude * np.sin( od * omega * increasing_seconds + phi ) X[self.new_columns_[idx + 1]] = self.amplitude * np.cos( od * omega * increasing_seconds + phi ) return X
QUANTILE_METHODS = { "Gaussian": (gaussian_filter1d, dict(sigma=5, truncate=5, mode="wrap")), "Detrend": (detrend, dict(type="linear")), }
[docs] class DropQuantile(BaseProcessing): """ Filter outliers in time series data using quantile-based thresholds. This processor identifies and replaces outlier values with NaN based on quantile boundaries. It can optionally apply detrending methods before computing quantiles, making it robust against seasonal patterns or trends. Parameters ---------- upper_quantile : float, default=1.0 Upper quantile threshold for outlier detection. Values above this quantile are considered potential outliers. Must be in range [0, 1]. lower_quantile : float, default=0.0 Lower quantile threshold for outlier detection. Values below this quantile are considered potential outliers. Must be in range [0, 1]. n_iqr : float, optional Number of interquartile ranges (IQR) to extend beyond the quantile thresholds. When specified, the bounds are adjusted as: lower_bound = q_low - n_iqr * IQR upper_bound = q_up + n_iqr * IQR Commonly used with quantiles 0.25 and 0.75 (default quartiles). detrend_method : str, optional Name of the detrending method to apply before computing quantiles. Available methods depend on QUANTILE_METHODS dictionary. At the moment only 'Gaussian' and "Detrend" are available. Detrend is a linear or constant detrending. See scipy.signal. If None, quantiles are computed directly on raw data. method_args : dict, optional Additional keyword arguments to pass to the detrending method. For example: {'window': 48} for moving average window size. Attributes ---------- upper_quantile : float Stored upper quantile threshold. lower_quantile : float Stored lower quantile threshold. n_iqr : float or None Stored IQR multiplier. detrend_method : str or None Stored detrending method name. method_args : dict or None Stored detrending method arguments. Examples -------- Filter temperature data with outliers using IQR-based detection: >>> import pandas as pd >>> import numpy as np >>> from datetime import timedelta >>> >>> # Create toy temperature dataset with daily and semi-daily patterns >>> index = pd.date_range( ... "2009-01-01", "2009-01-01 23:00:00", freq="15min", tz="UTC" ... ) >>> t_seconds = np.arange(0, 24 * 3600 + 1, 15 * 60) >>> >>> # Daily sinusoidal pattern >>> daily_pattern = 5 * np.sin(2 * np.pi * t_seconds / (24 * 3600)) >>> >>> # Semi-daily sinusoidal pattern >>> semidaily_pattern = 5 * np.sin(2 * np.pi * t_seconds / (12 * 3600)) >>> >>> # Add random noise >>> rng = np.random.default_rng(42) >>> temp_data = pd.DataFrame( ... { ... "Temp_1": daily_pattern + rng.standard_normal(len(daily_pattern)), ... "Temp_2": semidaily_pattern ... + 2 * rng.standard_normal(len(semidaily_pattern)), ... }, ... index=index, ... ) >>> >>> # Apply outlier detection with Gaussian detrending >>> dropper = DropQuantile( ... upper_quantile=0.75, ... lower_quantile=0.25, ... n_iqr=1.5, ... detrend_method="Gaussian", ... ) >>> filtered_data = dropper.fit_transform(temp_data) >>> >>> # Check detected outliers >>> print(f"Outliers in Temp_1: {filtered_data['Temp_1'].isna().sum()}") >>> print(f"Outliers in Temp_2: {filtered_data['Temp_2'].isna().sum()}") Notes ----- - When using `n_iqr`, it is conventional to set `upper_quantile=0.75` and `lower_quantile=0.25` (the first and third quartiles). Other quantile values will trigger a warning. - The `detrend_method` parameter is critical for time series with trends or seasonality. Without detrending, quantile thresholds may incorrectly flag normal seasonal variations as outliers. Consider using: * 'Gaussian' for smooth trends * 'Detrend' linear dtrending - If `detrend_method=None`, the method operates on raw values, which may be appropriate only for stationary data without seasonal patterns. - The transformation replaces outliers with `np.nan` rather than removing rows, preserving the time series structure for subsequent processing. See Also -------- pandas.DataFrame.quantile : Compute quantiles of DataFrame columns. """
[docs] def __init__( self, upper_quantile: float = 1.0, lower_quantile: float = 0.0, n_iqr: float = None, detrend_method: str = None, method_args: dict = None, ): super().__init__() self.upper_quantile = upper_quantile self.lower_quantile = lower_quantile self.n_iqr = n_iqr self.detrend_method = detrend_method self.method_args = method_args if self.n_iqr and (self.upper_quantile != 0.75 or self.lower_quantile != 0.25): warnings.warn("n_iqr is tipicaly used with quantile of 0.25 et 0.75")
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): pass def _transform_implementation(self, X: pd.Series | pd.DataFrame): x = X.copy() if self.detrend_method and self.detrend_method in QUANTILE_METHODS.keys(): try: method, kwargs = QUANTILE_METHODS[self.detrend_method] except KeyError: raise NotImplementedError( f"The method {self.detrend_method} is not yet implemented" ) if self.method_args: kwargs.update(self.method_args) # Separate methods that returns the trend from the ones that returns # detrended series if self.detrend_method in ["Gaussian"]: residue = X - X.apply(partial(method, **kwargs)) else: residue = X.apply(partial(method, **kwargs)) else: residue = X.copy() assert True for col in x: q_low = np.quantile(residue[col].dropna(), self.lower_quantile) q_up = np.quantile(residue[col].dropna(), self.upper_quantile) if self.n_iqr: iqr = q_up - q_low q_low -= self.n_iqr * iqr q_up += self.n_iqr * iqr mask = (residue[col] < q_low) | (residue[col] > q_up) x.loc[mask, col] = np.nan return x
[docs] class TrimSequence(BaseProcessing): """ Trim the beginning and end of valid data sequences in time series. This processor identifies continuous sequences of non-NaN values and removes a specified duration from the start and/or end of each sequence. This is useful for removing potentially unreliable measurements at sequence boundaries, such as sensor warm-up periods or shutdown transients. Parameters ---------- trim_beginning : str, pd.Timedelta, or dt.timedelta, default=pd.Timedelta(0) Duration to trim from the beginning of each valid sequence. Can be specified as a string (e.g., "1h", "30min") or as a Timedelta object. Values within this duration from the start of each sequence are set to NaN. trim_end : str, pd.Timedelta, or dt.timedelta, default=pd.Timedelta(0) Duration to trim from the end of each valid sequence. Can be specified as a string (e.g., "1h", "30min") or as a Timedelta object. Values within this duration from the end of each sequence are set to NaN. Attributes ---------- trim_beginning : pd.Timedelta Stored duration to trim from sequence beginnings. trim_end : pd.Timedelta Stored duration to trim from sequence ends. Examples -------- Remove the first and last hour of each measurement sequence: >>> import pandas as pd >>> import numpy as np >>> >>> # Create toy dataset with gaps (sensor downtime) >>> index = pd.date_range("2024-01-01", "2024-01-03", freq="1h", tz="UTC") >>> data = pd.DataFrame( ... {"Temperature": np.random.randn(len(index)) + 20}, ... index=index, ... ) >>> >>> # Introduce gaps to simulate sensor downtime >>> data.loc["2024-01-01 12:00":"2024-01-01 18:00", "Temperature"] = np.nan >>> data.loc["2024-01-02 06:00":"2024-01-02 10:00", "Temperature"] = np.nan >>> >>> # Trim first and last hour of each sequence >>> trimmer = TrimSequence(trim_beginning="1h", trim_end="1h") >>> trimmed_data = trimmer.fit_transform(data) >>> >>> # Check how many values were trimmed >>> original_valid = data["Temperature"].notna().sum() >>> trimmed_valid = trimmed_data["Temperature"].notna().sum() >>> print(f"Values removed: {original_valid - trimmed_valid}") Notes ----- - A "sequence" is defined as a continuous block of non-NaN values. Each sequence is detected independently, and trimming is applied separately to each one. - If a sequence is shorter than `trim_beginning + trim_end`, the entire sequence will be set to NaN. - The transformation preserves the DataFrame structure and index, replacing trimmed values with `np.nan` rather than removing rows. - This processor is particularly useful for: * Removing sensor warm-up periods at measurement start * Excluding shutdown transients at measurement end * Filtering edge effects after data gaps or sensor restarts """
[docs] def __init__( self, trim_beginning: str | pd.Timedelta | dt.timedelta = pd.Timedelta(0), trim_end: str | pd.Timedelta | dt.timedelta = pd.Timedelta(0), ): super().__init__() self.trim_beginning = ( pd.Timedelta(trim_beginning) if isinstance(trim_beginning, str) else trim_beginning ) self.trim_end = ( pd.Timedelta(trim_end) if isinstance(trim_end, str) else trim_end )
def _fit_implementation(self, X: pd.Series | pd.DataFrame, y=None): pass def _transform_implementation(self, X: pd.Series | pd.DataFrame): out_cols = [] for col in X.columns: s = X[col] notna = s.notna() seq_id = (notna & ~notna.shift(fill_value=False)).cumsum() bounds = ( s.loc[notna] .groupby(seq_id[notna]) .agg(start=lambda x: x.index[0], end=lambda x: x.index[-1]) ) bounds["start"] = bounds["start"] + self.trim_beginning bounds["end"] = bounds["end"] - self.trim_end start_map = seq_id.map(bounds["start"]) end_map = seq_id.map(bounds["end"]) keep = ( notna & start_map.notna() & (s.index >= start_map) & (s.index <= end_map) ) out = s.copy() out[~keep] = pd.NA out_cols.append(out) return pd.concat(out_cols, axis=1)