Plumbing Module

The plumbing module provides the core functionality for data pipeline creation and management.

class tide.plumbing.Plumber(data=None, pipe_dict=None)[source]

Bases: object

A powerful class for managing and transforming time series data through configurable processing pipelines.

The Plumber class is the core component of the Tide library, providing a comprehensive interface for:
  - Managing time series data with hierarchical column naming (name__unit__bloc__sub_bloc)
  - Creating and executing data processing pipelines with column-wise transformations
  - Analyzing and visualizing data gaps and quality
  - Plotting time series with customizable multi-axis layouts

The class uses a tree structure to organize data columns based on their tags, allowing for:
  - Flexible data selection using tag-based queries
  - Hierarchical organization of data by unit, bloc, and sub-bloc
  - Automatic handling of data transformations at different steps

Parameters:
  • data (pd.Series or pd.DataFrame, optional) – Input time series data. Must have a datetime index with timezone information.

  • pipe_dict (dict, optional) – Pipeline configuration dictionary. Each key represents a processing step and contains either:
      - A list of transformations to apply to all columns
      - A dictionary mapping column tags to specific transformations

Variables:
  • data (pd.DataFrame) – The input time series data with datetime index

  • root (Node) – Root node of the tree structure organizing column names

  • pipe_dict (dict) – Configuration dictionary defining the processing pipeline steps
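
The two step shapes accepted by pipe_dict can be told apart with a simple type check. The sketch below is illustrative only: describe_steps is a hypothetical helper, not part of tide, and it assumes each transformation is written as a [name, arguments] pair, as in the examples on this page.

```python
def describe_steps(pipe_dict):
    """Summarize how each step of a pipeline configuration is applied.

    Hypothetical helper for illustration; not part of tide.
    """
    summary = {}
    for step_name, step in pipe_dict.items():
        if isinstance(step, list):
            # A list of transformations applies to every column.
            summary[step_name] = "all columns"
        elif isinstance(step, dict):
            # A dict maps a column tag to its own list of transformations.
            summary[step_name] = "tags: " + ", ".join(step)
        else:
            raise TypeError(f"step {step_name!r} must be a list or a dict")
    return summary


pipe_dict = {
    "pre_processing": {
        "°C": [["ReplaceThreshold", {"upper": 25}]],
        "%HR": [["ReplaceThreshold", {"upper": 100}]],
    },
    "common": [["Interpolate", ["linear"]]],
}
print(describe_steps(pipe_dict))
```

Here "pre_processing" is reported as tag-specific and "common" as applying to all columns.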

Examples

>>> from tide.plumbing import Plumber
>>> import numpy as np
>>> import pandas as pd
>>> # Create sample data with hierarchical column names
>>> data = pd.DataFrame(
...     {
...         "temp__°C__zone1": [20, 21, np.nan, 23],
...         "humid__%HR__zone1": [50, 55, 60, np.nan],
...         "power__kW__hvac": [1.5, 1.8, 1.6, 1.7],
...     },
...     index=pd.date_range("2023", freq="h", periods=4, tz="UTC"),
... )
>>> # Define pipeline configuration
>>> pipe_dict = {
...     "pre_processing": {
...         "°C": [["ReplaceThreshold", {"upper": 25}]],
...         "%HR": [["ReplaceThreshold", {"upper": 100}]],
...     },
...     "common": [["Interpolate", ["linear"]]],
... }
>>> # Initialize and process data
>>> plumber = Plumber(data, pipe_dict)
>>> corrected = plumber.get_corrected_data()
>>> # Analyze gaps
>>> gaps = plumber.get_gaps_description()
>>> # Visualize data
>>> plumber.plot(y_axis_level="unit")

Notes

  • Column names can use any combination of tags (name, unit, bloc, sub_bloc) separated by double underscores. Examples:
      - Simple: “temperature”
      - With unit: “temperature__°C”
      - Full: “temperature__°C__zone1__room1”

  • Input data must have a datetime index with timezone information

  • Pipeline steps can be applied globally or to specific column groups

  • Supports all transformations from the processing module

  • Provides comprehensive gap analysis and visualization tools

  • Uses plotly for interactive data visualization
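
The naming convention in the first note can be illustrated with plain string handling. This is a minimal sketch, not tide's own parser: parse_tags and TAG_LEVELS are hypothetical names, and the sketch assumes tags are always given in the order name, unit, bloc, sub_bloc.

```python
TAG_LEVELS = ("name", "unit", "bloc", "sub_bloc")


def parse_tags(column):
    """Split a column name into its tags (illustrative, not tide's parser)."""
    parts = column.split("__")
    if len(parts) > len(TAG_LEVELS):
        raise ValueError(f"too many tags in {column!r}")
    # zip stops at the shorter sequence, so missing trailing tags are omitted.
    return dict(zip(TAG_LEVELS, parts))


for column in ("temperature", "temperature__°C", "temperature__°C__zone1__room1"):
    print(column, "->", parse_tags(column))
```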

__init__(data=None, pipe_dict=None)[source]

show(select=None, steps=slice(None, None, None), depth_level=None)[source]

Display the tree structure of selected data columns at selected steps for a given depth level.

Parameters:
  • select (str or pd.Index or list[str], optional) – Data selection using tide’s tag system

  • steps (None or str or list[str] or slice, default slice(None)) – Pipeline steps to apply before showing the tree

  • depth_level (int or str, optional) – Maximum depth level to display in the tree

get_gaps_description(select=None, steps=slice(None, None, None), verbose=False, gaps_lte=None, gaps_gte=None, return_combination=True)[source]

Get a statistical description of gap durations in the data.

Parameters:
  • select (str or pd.Index or list[str], optional) – Data selection using tide’s tag system

  • steps (None or str or list[str] or slice, default slice(None)) – Pipeline steps to apply before analyzing gaps

  • verbose (bool, default False) – Whether to print information about pipeline steps

  • gaps_lte (str or pd.Timedelta or dt.timedelta, optional) – Upper threshold for gap duration

  • gaps_gte (str or pd.Timedelta or dt.timedelta, optional) – Lower threshold for gap duration

  • return_combination (bool, default True) – Whether to also include statistics for the combination of gaps across all selected columns. Useful for assessing the periods when all data are available simultaneously.

Returns:

DataFrame containing statistics about gap durations for each column. Statistics include:
  - data_presence_%: percentage of non-gap data points
  - count: number of gaps
  - mean: average gap duration
  - std: standard deviation of gap durations
  - min: shortest gap
  - 25%: first quartile
  - 50%: median
  - 75%: third quartile
  - max: longest gap

Returns an empty DataFrame if no gaps are found.

Return type:

pd.DataFrame
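
To make the statistics concrete, the following standard-library sketch computes a few of them for one small series. It is not tide's implementation: gap_durations is a hypothetical helper, missing values are represented by None rather than NaN, and a gap is assumed to last (number of consecutive missing samples) × (sampling step), which may differ from how tide measures gaps.

```python
from datetime import timedelta
from itertools import groupby
from statistics import mean


def gap_durations(values, step):
    """Return one duration per run of consecutive missing (None) values.

    Assumes regularly sampled data and that a gap lasts
    (number of missing samples) * step; tide may measure gaps differently.
    """
    durations = []
    for is_missing, run in groupby(values, key=lambda v: v is None):
        if is_missing:
            durations.append(len(list(run)) * step)
    return durations


values = [20.0, None, None, 23.0, None, 25.0]
gaps = gap_durations(values, step=timedelta(hours=1))
presence = 100 * sum(v is not None for v in values) / len(values)

print("count:", len(gaps))
print("min:", min(gaps), "max:", max(gaps))
print("mean:", timedelta(seconds=mean(g.total_seconds() for g in gaps)))
print(f"data_presence_%: {presence:.1f}")
```

With this input there are two gaps, of two hours and one hour, and half of the samples are present.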

set_data(data)[source]

Set new data for the Plumber instance.

Parameters:

data (pd.Series or pd.DataFrame) – New time series data to process. Must have a datetime index with timezone information.
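
The timezone requirement can be illustrated with the standard library. The sketch below applies Python's definition of an "aware" datetime to individual timestamps; is_tz_aware is a hypothetical helper, not a tide function. For a pandas DatetimeIndex, the analogous check is whether its tz attribute is not None.

```python
from datetime import datetime, timezone


def is_tz_aware(timestamp):
    """Return True if a datetime carries usable timezone information."""
    return timestamp.tzinfo is not None and timestamp.utcoffset() is not None


aware = datetime(2023, 1, 1, tzinfo=timezone.utc)
naive = datetime(2023, 1, 1)

print(is_tz_aware(aware))  # True
print(is_tz_aware(naive))  # False
```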

select(select=None)[source]

Select columns based on tags.

Parameters:

select (str or pd.Index or list[str], optional) – Selection criteria using tide’s tag system. Can be a unit (e.g., “°C”), a location (e.g., “zone1”), or any other tag in the column names.

Returns:

Selected column names

Return type:

pd.Index
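
As a rough mental model, tag-based selection keeps the columns whose name contains the requested tag. The sketch below is a simplified stand-in, not tide's implementation (which organizes columns in a tree): select_columns is a hypothetical function that handles a single tag or a full column name only.

```python
def select_columns(columns, tag=None):
    """Return the columns that carry ``tag`` or match it as a full name.

    Simplified stand-in for tag-based selection; not tide's implementation.
    """
    if tag is None:
        return list(columns)
    return [col for col in columns if col == tag or tag in col.split("__")]


columns = ["temp__°C__zone1", "humid__%HR__zone1", "power__kW__hvac"]
print(select_columns(columns, "°C"))     # ['temp__°C__zone1']
print(select_columns(columns, "zone1"))  # ['temp__°C__zone1', 'humid__%HR__zone1']
```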

get_pipeline(select=None, steps=slice(None, None, None), verbose=False)[source]

Create a scikit-learn pipeline from the configuration.

This method builds a scikit-learn Pipeline object based on the current configuration and selected data columns. The pipeline can be used to transform data according to the defined processing steps.

Parameters:
  • select (str or pd.Index or list[str], optional) – Data selection using tide’s tag system. Can be:
      - A single tag (e.g., “°C” to select all temperature columns)
      - A full column name pattern (e.g., “temp__°C__zone1”)
    If None, selects all columns.

  • steps (None or str or list[str] or slice, default slice(None)) – Pipeline steps to include. Can be:
      - A single step name (e.g., “pre_processing”)
      - A list of step names (e.g., [“pre_processing”, “common”])
      - A slice object (e.g., slice(“pre_processing”, “common”))
      - None to return an Identity transformer
      - slice(None) to include all steps

  • verbose (bool, default False) – Whether to print information about pipeline steps during creation

Returns:

A scikit-learn Pipeline object configured with the selected steps and columns. The pipeline will transform the data according to the processing steps defined in pipe_dict.

Return type:

Pipeline

Raises:

ValueError – If data is not set (self.data is None)
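
The accepted forms of the steps argument can be pictured as resolving to an ordered list of step names. The following sketch shows one plausible resolution and is not tide's code: resolve_steps is a hypothetical helper, and it assumes that a slice of step names includes both ends, as pandas label slicing does. Tide's actual slice handling may differ.

```python
def resolve_steps(step_names, steps=slice(None)):
    """Resolve a ``steps`` argument to an ordered list of step names.

    Illustrative only. Label slices are assumed to include both ends,
    as pandas label slicing does; tide's behavior may differ.
    """
    if steps is None:
        return []  # no step selected: data would pass through unchanged
    if isinstance(steps, str):
        return [steps]
    if isinstance(steps, slice):
        first = 0 if steps.start is None else step_names.index(steps.start)
        if steps.stop is None:
            last = len(step_names) - 1
        else:
            last = step_names.index(steps.stop)
        return step_names[first : last + 1]
    return list(steps)


names = ["pre_processing", "common"]
print(resolve_steps(names))
print(resolve_steps(names, "pre_processing"))
print(resolve_steps(names, slice("pre_processing", "common")))
print(resolve_steps(names, None))
```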

Examples

>>> from tide.plumbing import Plumber
>>> import numpy as np
>>> import pandas as pd
>>> # Create sample data
>>> data = pd.DataFrame(
...     {
...         "temp__°C__zone1": [20, 21, np.nan, 23],
...         "humid__%HR__zone1": [50, 55, 60, np.nan],
...         "power__kW__hvac": [1.5, 1.8, 1.6, 1.7],
...     },
...     index=pd.date_range("2023", freq="h", periods=4, tz="UTC"),
... )
>>> # Define pipeline configuration
>>> pipe_dict = {
...     "pre_processing": {
...         "°C": [["ReplaceThreshold", {"upper": 25}]],
...         "%HR": [["ReplaceThreshold", {"upper": 100}]],
...     },
...     "common": [["Interpolate", ["linear"]]],
... }
>>> # Initialize Plumber
>>> plumber = Plumber(data, pipe_dict)
>>> # Get pipeline for temperature columns only
>>> temp_pipe = plumber.get_pipeline(select="°C")
>>> # Get pipeline for all columns with only pre-processing step
>>> pre_pipe = plumber.get_pipeline(steps="pre_processing")
>>> # Get pipeline for specific columns and steps
>>> custom_pipe = plumber.get_pipeline(
...     select=["temp__°C__zone1", "power__kW__hvac"],
...     steps=["pre_processing", "common"],
... )

get_corrected_data(select=None, start=None, stop=None, steps=slice(None, None, None), verbose=False)[source]

Apply pipeline transformations to selected data.

This method applies the configured processing pipeline to the selected data columns within the specified time range. It returns a new DataFrame with the transformed data.

Parameters:
  • select (str or pd.Index or list[str], optional) – Data selection using tide’s tag system. Can be:
      - A single tag (e.g., “°C” to select all temperature columns)
      - A full column name pattern (e.g., “temp__°C__zone1”)
    If None, selects all columns.

  • start (str or datetime or Timestamp, optional) – Start time for data slice. Can be:
      - A string in ISO format (e.g., “2023-01-01”)
      - A datetime object
      - A pandas Timestamp
    If None, uses the first timestamp in the data.

  • stop (str or datetime or Timestamp, optional) – End time for data slice. Can be:
      - A string in ISO format (e.g., “2023-12-31”)
      - A datetime object
      - A pandas Timestamp
    If None, uses the last timestamp in the data.

  • steps (None or str or list[str] or slice, default slice(None)) – Pipeline steps to apply. Can be:
      - A single step name (e.g., “pre_processing”)
      - A list of step names (e.g., [“pre_processing”, “common”])
      - A slice object (e.g., slice(“pre_processing”, “common”))
      - None to apply no processing (Identity transformer)
      - slice(None) to include all steps

  • verbose (bool, default False) – Whether to print information about pipeline steps during processing

Return type:

pd.DataFrame

Raises:

ValueError – If data is not set (self.data is None)

Examples

>>> from tide.plumbing import Plumber
>>> import numpy as np
>>> import pandas as pd
>>> # Create sample data
>>> data = pd.DataFrame(
...     {
...         "temp__°C__zone1": [20, 21, np.nan, 23],
...         "humid__%HR__zone1": [50, 55, 60, np.nan],
...         "power__kW__hvac": [1.5, 1.8, 1.6, 1.7],
...     },
...     index=pd.date_range("2023", freq="h", periods=4, tz="UTC"),
... )
>>> # Define pipeline configuration
>>> pipe_dict = {
...     "pre_processing": {
...         "°C": [["ReplaceThreshold", {"upper": 25}]],
...         "%HR": [["ReplaceThreshold", {"upper": 100}]],
...     },
...     "common": [["Interpolate", ["linear"]]],
... }
>>> # Initialize Plumber
>>> plumber = Plumber(data, pipe_dict)
>>> # Get corrected data for temperature columns only
>>> temp_data = plumber.get_corrected_data(select="°C")
>>> # Get corrected data for a specific time range
>>> time_slice = plumber.get_corrected_data(
...     start="2023-01-01T00:00:00", stop="2023-01-01T12:00:00"
... )
>>> # Get corrected data with specific steps
>>> pre_processed = plumber.get_corrected_data(
...     select=["temp__°C__zone1", "power__kW__hvac"], steps="pre_processing"
... )

plot_gaps_heatmap(select=None, start=None, stop=None, steps=slice(None, None, None), time_step=None, title=None, verbose=False)[source]

Create a heatmap visualization of data gaps.

This method generates an interactive heatmap using plotly that shows the presence and distribution of data gaps across different columns and time periods. The heatmap helps identify patterns in missing data and potential data quality issues.

Parameters:
  • select (str or pd.Index or list[str], optional) – Data selection using tide’s tag system. Can be:
      - A single tag (e.g., “°C” to select all temperature columns)
      - A full column name pattern (e.g., “temp__°C__zone1”)
    If None, selects all columns.

  • start (str or datetime or Timestamp, optional) – Start time for visualization. Can be:
      - A string in ISO format (e.g., “2023-01-01”)
      - A datetime object
      - A pandas Timestamp
    If None, uses the first timestamp in the data.

  • stop (str or datetime or Timestamp, optional) – End time for visualization. Can be:
      - A string in ISO format (e.g., “2023-12-31”)
      - A datetime object
      - A pandas Timestamp
    If None, uses the last timestamp in the data.

  • steps (None or str or list[str] or slice, default slice(None)) – Pipeline steps to apply before visualization. Can be:
      - A single step name (e.g., “pre_processing”)
      - A list of step names (e.g., [“pre_processing”, “common”])
      - A slice object (e.g., slice(“pre_processing”, “common”))
      - None to apply no processing (Identity transformer)
      - slice(None) to include all steps

  • time_step (str or Timedelta or timedelta, optional) – Time step for aggregating gaps. Can be:
      - A string (e.g., “1h”, “1d”, “1w”)
      - A timedelta object
      - A pandas Timedelta
    If None, uses the original data frequency.

  • title (str, optional) – Plot title. If None, uses a default title based on the data selection.

  • verbose (bool, default False) – Whether to print information about pipeline steps during processing

Returns:

A plotly Figure object containing the heatmap with:
  - Rows representing different columns
  - Columns representing time periods
  - Colors indicating presence (white) or absence (colored) of data
  - Interactive features (zoom, pan, hover information)

Return type:

go.Figure

Examples

>>> from tide.plumbing import Plumber
>>> import numpy as np
>>> import pandas as pd
>>> # Create sample data with gaps
>>> data = pd.DataFrame(
...     {
...         "temp__°C__zone1": [20, np.nan, 23, np.nan, 25],
...         "humid__%HR__zone1": [50, 55, np.nan, 60, np.nan],
...         "power__kW__hvac": [1.5, 1.8, 1.6, np.nan, 1.7],
...     },
...     index=pd.date_range("2023", freq="h", periods=5, tz="UTC"),
... )
>>> # Initialize Plumber
>>> plumber = Plumber(data)
>>> # Create heatmap for all columns
>>> fig = plumber.plot_gaps_heatmap()
>>> fig.show()
>>> # Create heatmap for temperature data with daily aggregation
>>> fig = plumber.plot_gaps_heatmap(
...     select="°C", time_step="1d", title="Temperature Data Gaps"
... )
>>> fig.show()
>>> # Create heatmap for specific time range
>>> fig = plumber.plot_gaps_heatmap(
...     start="2023-01-01T00:00:00", stop="2023-01-01T12:00:00"
... )
>>> fig.show()

plot(select=None, start=None, stop=None, y_axis_level=None, y_tag_list=None, steps=slice(None, None, None), data_mode='lines', steps_2=None, data_2_mode='markers', markers_opacity=0.8, lines_width=2.0, title=None, plot_gaps=False, gaps_lower_td=None, gaps_rgb=(31, 73, 125), gaps_alpha=0.5, plot_gaps_2=False, gaps_2_lower_td=None, gaps_2_rgb=(254, 160, 34), gaps_2_alpha=0.5, axis_space=0.03, y_title_standoff=5, verbose=False, use_resampler=False)[source]

Create an interactive time series plot.

This method generates a highly customizable interactive plot using plotly that can show:
  - Multiple time series, automatically assigned to separate y-axes based on unit
  - Two different versions of the data (e.g., raw and processed)
  - Data gaps, highlighted with customizable colors and opacity
  - Custom styling and layout options

Parameters:
  • select (str or pd.Index or list[str], optional) – Data selection using tide’s tag system. Can be:
      - A single tag (e.g., “°C” to select all temperature columns)
      - A full column name pattern (e.g., “temp__°C__zone1”)
    If None, selects all columns.

  • start (str or datetime or Timestamp, optional) – Start time for plot. Can be:
      - A string in ISO format (e.g., “2023-01-01”)
      - A datetime object
      - A pandas Timestamp
    If None, uses the first timestamp in the data.

  • stop (str or datetime or Timestamp, optional) – End time for plot. Can be:
      - A string in ISO format (e.g., “2023-12-31”)
      - A datetime object
      - A pandas Timestamp
    If None, uses the last timestamp in the data.

  • y_axis_level (str, optional) – Tag level to use for y-axis grouping. Can be:
      - “unit” to group by measurement unit
      - “bloc” to group by data bloc
      - “sub_bloc” to group by sub-bloc
    If None, uses a single y-axis for all data.

  • y_tag_list (list[str], optional) – List of tags for custom y-axis ordering. The order of tags in this list determines the order of y-axes from left to right.

  • steps (None or str or list[str] or slice, default slice(None)) – Pipeline steps to apply for main data. Can be:
      - A single step name (e.g., “pre_processing”)
      - A list of step names (e.g., [“pre_processing”, “common”])
      - A slice object (e.g., slice(“pre_processing”, “common”))
      - None to apply no processing (Identity transformer)
      - slice(None) to include all steps

  • data_mode (str, default "lines") – Plot mode for main data. Can be:
      - “lines” for line plots
      - “markers” for scatter plots
      - “lines+markers” for combined line and marker plots

  • steps_2 (None or str or list[str] or slice, optional) – Pipeline steps to apply for secondary data. Used to compare different processing steps or versions of the data.

  • data_2_mode (str, default "markers") – Plot mode for secondary data. Same options as data_mode.

  • markers_opacity (float, default 0.8) – Opacity for markers (0.0 to 1.0)

  • lines_width (float, default 2.0) – Width of plot lines in pixels

  • title (str, optional) – Plot title. If None, uses a default title based on the data selection.

  • plot_gaps (bool, default False) – Whether to highlight gaps in main data

  • gaps_lower_td (str or Timedelta or timedelta, optional) – Minimum duration for gap highlighting. Can be:
      - A string (e.g., “1h”, “1d”)
      - A timedelta object
      - A pandas Timedelta

  • gaps_rgb (tuple[int, int, int], default (31, 73, 125)) – RGB color for main data gaps (0-255 range)

  • gaps_alpha (float, default 0.5) – Opacity for main data gaps (0.0 to 1.0)

  • plot_gaps_2 (bool, default False) – Whether to highlight gaps in secondary data

  • gaps_2_lower_td (str or Timedelta or timedelta, optional) – Minimum duration for secondary data gap highlighting

  • gaps_2_rgb (tuple[int, int, int], default (254, 160, 34)) – RGB color for secondary data gaps (0-255 range)

  • gaps_2_alpha (float, default 0.5) – Opacity for secondary data gaps (0.0 to 1.0)

  • axis_space (float, default 0.03) – Space between multiple y-axes (0.0 to 1.0)

  • y_title_standoff (int or float, default 5) – Distance between y-axis title and axis in pixels

  • verbose (bool, default False) – Whether to print information about pipeline steps during processing

  • use_resampler (bool, default False) – Whether to use plotly-resampler for dynamic data aggregation. Requires the optional dependency plotly-resampler (pip install python-tide[resampler]). When enabled, the figure dynamically resamples data on zoom/pan, making it practical for large datasets (e.g. 1-minute resolution over a year). Dynamic resampling requires a live Python server to respond to zoom events, so it does not work with a static fig.show() call. The returned object depends on the environment:
      - In a Jupyter environment (with an active kernel), returns a FigureWidgetResampler. Display the figure by evaluating fig in a cell; do not call fig.show().
      - In a non-Jupyter environment, returns a FigureResampler. Call fig.show_dash() to launch the interactive Dash server.

Returns:

A plotly Figure object containing the plot with:
  - Multiple y-axes if y_axis_level is specified
  - Interactive features (zoom, pan, hover information)
  - Legend with all series
  - Optional gap highlighting
  - Customizable styling
  - Dynamic resampling on zoom/pan if use_resampler=True

Return type:

go.Figure or FigureResampler or FigureWidgetResampler
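
The gap color parameters pair an RGB tuple (0-255 per channel) with a separate opacity (0.0 to 1.0). Plotly accepts colors as CSS-style rgba() strings, so one way to picture how the two values combine is the sketch below. rgba_string is a hypothetical helper; whether tide builds its colors this way internally is an assumption.

```python
def rgba_string(rgb, alpha):
    """Format an RGB tuple and an opacity as a CSS-style rgba() string."""
    if len(rgb) != 3 or not all(0 <= c <= 255 for c in rgb):
        raise ValueError("rgb must hold three values in the 0-255 range")
    if not 0.0 <= alpha <= 1.0:
        raise ValueError("alpha must be between 0.0 and 1.0")
    red, green, blue = rgb
    return f"rgba({red}, {green}, {blue}, {alpha})"


print(rgba_string((31, 73, 125), 0.5))   # rgba(31, 73, 125, 0.5)
print(rgba_string((254, 160, 34), 0.5))  # rgba(254, 160, 34, 0.5)
```

The two calls use the default values of gaps_rgb/gaps_alpha and gaps_2_rgb/gaps_2_alpha.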

Examples

>>> from tide.plumbing import Plumber
>>> import numpy as np
>>> import pandas as pd
>>> # Create sample data
>>> data = pd.DataFrame(
...     {
...         "temp__°C__zone1": [20, 21, np.nan, 23],
...         "humid__%HR__zone1": [50, 55, 60, np.nan],
...         "power__kW__hvac": [1.5, 1.8, 1.6, 1.7],
...     },
...     index=pd.date_range("2023", freq="h", periods=4, tz="UTC"),
... )
>>> # Initialize Plumber
>>> plumber = Plumber(data)
>>> # Create basic plot with automatic y-axes
>>> fig = plumber.plot(y_axis_level="unit")
>>> fig.show()
>>> # Create plot with custom styling and gap highlighting
>>> fig = plumber.plot(
...     select=["temp__°C__zone1", "power__kW__hvac"],
...     data_mode="lines+markers",
...     plot_gaps=True,
...     gaps_lower_td="1h",
...     title="Temperature and Power Data",
... )
>>> fig.show()
>>> # Create plot comparing raw and processed data
>>> fig = plumber.plot(
...     steps="pre_processing",
...     steps_2=None,
...     data_mode="lines",
...     data_2_mode="markers",
...     title="Raw vs Processed Data",
... )
>>> fig.show()
>>> # Use dynamic resampling for large datasets (e.g. 1-min data over 1 year)
>>> # In a Jupyter notebook:
>>> fig = plumber.plot(use_resampler=True)
>>> fig  # displays as an interactive widget with on-the-fly resampling
>>> # Outside Jupyter:
>>> fig = plumber.plot(use_resampler=True)
>>> fig.show_dash()  # launches a local Dash server for interactive resampling

plot_dash(select=None, start=None, stop=None, y_axis_level=None, y_tag_list=None, steps=slice(None, None, None), data_mode='lines', steps_2=None, data_2_mode='markers', markers_opacity=0.8, lines_width=2.0, title=None, plot_gaps=False, gaps_lower_td=None, gaps_rgb=(31, 73, 125), gaps_alpha=0.5, plot_gaps_2=False, gaps_2_lower_td=None, gaps_2_rgb=(254, 160, 34), gaps_2_alpha=0.5, axis_space=0.03, y_title_standoff=5, verbose=False, port=8050)[source]

Launch an interactive Dash application for time series exploration.

Provides the same visualisation options as plot() with two additions:

  • A collapsible sidebar listing all available columns with per-series visibility toggles and colour pickers.

  • When plotly-resampler is installed, traces are dynamically downsampled on zoom/pan so arbitrarily large datasets remain responsive.

The server runs in a background daemon thread and the default browser opens automatically. It stays alive as long as the Python process runs. Calling this method multiple times on different ports is supported.
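
The daemon-thread behavior described above follows a general Python pattern, sketched here with the standard library's http.server in place of Dash. This is a generic illustration, not tide's launch code: serve_in_background is a hypothetical helper. A daemon thread does not keep the interpreter alive, which is why the server lasts only as long as the Python process.

```python
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


def serve_in_background(port=0):
    """Start an HTTP server in a daemon thread and return (server, thread).

    Generic illustration of the pattern; tide's Dash launch code may differ.
    Port 0 asks the operating system for any free port.
    """
    server = HTTPServer(("127.0.0.1", port), BaseHTTPRequestHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    return server, thread


server, thread = serve_in_background()
print("serving on port", server.server_address[1])
print("daemon thread:", thread.daemon, "alive:", thread.is_alive())

# The caller is not blocked; shut down explicitly when finished.
server.shutdown()
server.server_close()
thread.join(timeout=5)
```

Because each server binds its own port, several can run side by side, which matches the note about calling the method multiple times on different ports.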

Parameters:
  • select (str or pd.Index or list[str], optional) – Columns pre-selected (visible) when the app opens. Accepts a tide tag query string, an explicit list of column names, or a pd.Index. Defaults to all columns.

  • start (str or datetime or Timestamp, optional) – Start of the displayed time range.

  • stop (str or datetime or Timestamp, optional) – End of the displayed time range.

  • y_axis_level (str, optional) – Tag level used to group columns onto separate y-axes ("unit", "bloc", etc.).

  • y_tag_list (list[str], optional) – Explicit list of tag values for y-axis grouping; overrides y_axis_level.

  • steps (None or str or list[str] or slice, default slice(None)) – Pipeline step(s) applied to the primary data before plotting.

  • data_mode (str, default "lines") – Plotly trace mode for the primary data ("lines", "markers", "lines+markers").

  • steps_2 (None or str or list[str] or slice, optional) – Pipeline step(s) for the optional secondary dataset overlay.

  • data_2_mode (str, default "markers") – Plotly trace mode for the secondary data.

  • markers_opacity (float, default 0.8) – Opacity of markers (0.0 – 1.0).

  • lines_width (float, default 2.0) – Width of line traces in pixels.

  • title (str, optional) – Figure title.

  • plot_gaps (bool, default False) – Highlight gaps in the primary data.

  • gaps_lower_td (str or Timedelta or timedelta, optional) – Minimum gap duration to highlight (e.g. "1h").

  • gaps_rgb (tuple[int, int, int], default (31, 73, 125)) – RGB colour for primary-data gap rectangles.

  • gaps_alpha (float, default 0.5) – Opacity for primary-data gap rectangles.

  • plot_gaps_2 (bool, default False) – Highlight gaps in the secondary data.

  • gaps_2_lower_td (str or Timedelta or timedelta, optional) – Minimum gap duration to highlight in the secondary data.

  • gaps_2_rgb (tuple[int, int, int], default (254, 160, 34)) – RGB colour for secondary-data gap rectangles.

  • gaps_2_alpha (float, default 0.5) – Opacity for secondary-data gap rectangles.

  • axis_space (float, default 0.03) – Horizontal space reserved per additional right-side y-axis.

  • y_title_standoff (int or float, default 5) – Distance between a y-axis title and the axis line (pixels).

  • verbose (bool, default False) – Print pipeline processing information.

  • port (int, default 8050) – TCP port for the Dash server.

Returns:

The method launches a Dash server as a side effect; it does not return a figure object.

Return type:

None

Examples

>>> plumber.plot_dash()
>>> # Pre-select columns and separate axes by unit
>>> plumber.plot_dash(
...     select=["temp__°C__zone1", "power__kW__hvac"],
...     y_axis_level="unit",
...     title="Zone 1 monitoring",
... )
>>> # Compare raw vs processed data on port 8051
>>> plumber.plot_dash(
...     steps=slice(None),
...     steps_2=None,
...     data_2_mode="markers",
...     port=8051,
... )

Pipeline Creation Functions

tide.plumbing._get_pipe_from_proc_list(data_columns, proc_list, tz, verbose=False)[source]

Return type:

Pipeline

tide.plumbing._get_column_wise_transformer(proc_dict, data_columns, tz, process_name=None, verbose=False)[source]

Return type:

ColumnTransformer | None

tide.plumbing.get_pipeline_from_dict(data_columns, pipe_dict=None, tz='UTC', verbose=False)[source]

Helper Functions

tide.plumbing._dummy_df(columns, tz)[source]