API reference

Dataset

Bases: BaseModel, Generic[T]

Store and operate on a collection of Timeseries.

Attributes:

    timeseries (list[Timeseries]): A list of Timeseries objects.

Source code in gensor/core/dataset.py
class Dataset(pyd.BaseModel, Generic[T]):
    """Store and operate on a collection of Timeseries.

    Attributes:
        timeseries (list[Timeseries]): A list of Timeseries objects.
    """

    timeseries: list[T | None] = pyd.Field(default_factory=list)

    def __iter__(self) -> Any:
        """Allows to iterate directly over the dataset."""
        return iter(self.timeseries)

    def __len__(self) -> int:
        """Gives the number of timeseries in the Dataset."""
        return len(self.timeseries)

    def __repr__(self) -> str:
        return f"Dataset({len(self)})"

    def __getitem__(self, index: int) -> T | None:
        """Retrieve a Timeseries object by its index in the dataset.

        !!! warning
            Indexing returns a reference to the stored timeseries. If you need a copy,
            use .filter() instead of Dataset[index].

        Parameters:
            index (int): The index of the Timeseries to retrieve.

        Returns:
            Timeseries: The Timeseries object at the specified index.

        Raises:
            IndexError: If the index is out of range.
        """
        try:
            return self.timeseries[index]
        except IndexError:
            raise IndexOutOfRangeError(index, len(self)) from None

    def get_locations(self) -> list:
        """List all unique locations in the dataset."""
        return [ts.location for ts in self.timeseries if ts is not None]

    def add(self, other: T | list[T] | Dataset) -> Dataset:
        """Appends new Timeseries to the Dataset.

        If an equal Timeseries already exists, merge the new data into the existing
        Timeseries, dropping duplicate timestamps.

        Parameters:
            other (Timeseries | list[Timeseries] | Dataset): The Timeseries
                object(s) to add.
        """

        # Check for BaseTimeseries instances at runtime while keeping the
        # TypeVar T annotation for static typing.
        if isinstance(other, list | Dataset):
            for ts in other:
                if isinstance(ts, BaseTimeseries):
                    self._add_single_timeseries(ts)  # type: ignore[arg-type]

        elif isinstance(other, BaseTimeseries):
            self._add_single_timeseries(other)

        return self

    def _add_single_timeseries(self, ts: T) -> None:
        """Adds a single Timeseries to the Dataset or merges if an equal one exists."""
        for i, existing_ts in enumerate(self.timeseries):
            if existing_ts == ts:
                self.timeseries[i] = existing_ts.concatenate(ts)
                return

        self.timeseries.append(ts)

        return

    def filter(
        self,
        location: str | list | None = None,
        variable: str | list | None = None,
        unit: str | list | None = None,
        **kwargs: dict[str, str | list],
    ) -> T | Dataset:
        """Return a Timeseries or a new Dataset filtered by station, sensor,
        and/or variable.

        Parameters:
            location (Optional[str]): The location name.
            variable (Optional[str]): The variable being measured.
            unit (Optional[str]): Unit of the measurement.
            **kwargs (dict): Attributes of subclassed timeseries used for filtering
                (e.g., sensor, method).

        Returns:
            Timeseries | Dataset: A single Timeseries if exactly one match is found,
                                   or a new Dataset if multiple matches are found.
        """

        def matches(ts: T, attr: str, value: dict[str, str | list]) -> bool | None:
            """Check if the Timeseries object has the attribute and if it matches the value."""
            if not hasattr(ts, attr):
                message = f"'{ts.__class__.__name__}' object has no attribute '{attr}'"
                raise AttributeError(message)
            return getattr(ts, attr) in value

        if isinstance(location, str):
            location = [location]
        if isinstance(variable, str):
            variable = [variable]
        if isinstance(unit, str):
            unit = [unit]
        for key, value in kwargs.items():
            if isinstance(value, str):
                kwargs[key] = [value]

        matching_timeseries = [
            ts
            for ts in self.timeseries
            if ts is not None
            and (location is None or ts.location in location)
            and (variable is None or ts.variable in variable)
            and (unit is None or ts.unit in unit)
            and all(matches(ts, attr, value) for attr, value in kwargs.items())
        ]

        if not matching_timeseries:
            return Dataset()

        if len(matching_timeseries) == 1:
            return matching_timeseries[0].model_copy(deep=True)

        return self.model_copy(update={"timeseries": matching_timeseries})

    def to_sql(self, db: DatabaseConnection) -> None:
        """Save the entire timeseries to a SQLite database.

        Parameters:
            db (DatabaseConnection): SQLite database connection object.
        """
        for ts in self.timeseries:
            if ts:
                ts.to_sql(db)
        return

    def plot(
        self,
        include_outliers: bool = False,
        plot_kwargs: dict[str, Any] | None = None,
        legend_kwargs: dict[str, Any] | None = None,
    ) -> tuple[Figure, Axes]:
        """Plots the timeseries data, grouping by variable type.

        Parameters:
            include_outliers (bool): Whether to include outliers in the plot.
            plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
            legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

        Returns:
            (fig, ax): Matplotlib figure and axes to allow further customization.
        """

        grouped_ts = defaultdict(list)

        for ts in self.timeseries:
            if ts:
                grouped_ts[ts.variable].append(ts)

        num_variables = len(grouped_ts)

        fig, axes = plt.subplots(
            num_variables, 1, figsize=(10, 5 * num_variables), sharex=True
        )

        if num_variables == 1:
            axes = [axes]

        for ax, (variable, ts_list) in zip(axes, grouped_ts.items(), strict=False):
            for ts in ts_list:
                ts.plot(
                    include_outliers=include_outliers,
                    ax=ax,
                    plot_kwargs=plot_kwargs,
                    legend_kwargs=legend_kwargs,
                )

            ax.set_title(f"Timeseries for {variable.capitalize()}")
            ax.set_xlabel("Time")

        fig.tight_layout()
        return fig, axes
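
A quick construction sketch (ts1 and ts2 stand for existing Timeseries objects; the top-level import is an assumption):

from gensor import Dataset  # import path is an assumption

ds = Dataset(timeseries=[ts1, ts2])
len(ds)              # 2
ds.get_locations()   # e.g. ['station_1', 'station_2']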

__getitem__(index)

Retrieve a Timeseries object by its index in the dataset.

Warning

Indexing returns a reference to the stored timeseries. If you need a copy, use .filter() instead of Dataset[index].

Parameters:

    index (int, required): The index of the Timeseries to retrieve.

Returns:

    Timeseries (T | None): The Timeseries object at the specified index.

Raises:

    IndexError: If the index is out of range.

Source code in gensor/core/dataset.py
def __getitem__(self, index: int) -> T | None:
    """Retrieve a Timeseries object by its index in the dataset.

    !!! warning
        Indexing returns a reference to the stored timeseries. If you need a copy,
        use .filter() instead of Dataset[index].

    Parameters:
        index (int): The index of the Timeseries to retrieve.

    Returns:
        Timeseries: The Timeseries object at the specified index.

    Raises:
        IndexError: If the index is out of range.
    """
    try:
        return self.timeseries[index]
    except IndexError:
        raise IndexOutOfRangeError(index, len(self)) from None
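
For instance (a sketch with a populated Dataset ds): indexing hands back the stored object itself, while .filter() returns an independent copy:

first = ds[0]  # reference: mutations propagate to the dataset
safe = ds.filter(location=first.location, variable=first.variable)  # deep copy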

__iter__()

Allows iterating directly over the dataset.

Source code in gensor/core/dataset.py
def __iter__(self) -> Any:
    """Allows to iterate directly over the dataset."""
    return iter(self.timeseries)

__len__()

Gives the number of timeseries in the Dataset.

Source code in gensor/core/dataset.py
def __len__(self) -> int:
    """Gives the number of timeseries in the Dataset."""
    return len(self.timeseries)

add(other)

Appends new Timeseries to the Dataset.

If an equal Timeseries already exists, merge the new data into the existing Timeseries, dropping duplicate timestamps.

Parameters:

    other (Timeseries | list[Timeseries] | Dataset, required): The Timeseries object(s) to add.

Source code in gensor/core/dataset.py
def add(self, other: T | list[T] | Dataset) -> Dataset:
    """Appends new Timeseries to the Dataset.

    If an equal Timeseries already exists, merge the new data into the existing
    Timeseries, dropping duplicate timestamps.

    Parameters:
        other (Timeseries | list[Timeseries] | Dataset): The Timeseries
            object(s) to add.
    """

    # Check for BaseTimeseries instances at runtime while keeping the
    # TypeVar T annotation for static typing.
    if isinstance(other, list | Dataset):
        for ts in other:
            if isinstance(ts, BaseTimeseries):
                self._add_single_timeseries(ts)  # type: ignore[arg-type]

    elif isinstance(other, BaseTimeseries):
        self._add_single_timeseries(other)

    return self
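
For example (a sketch; ts1 and ts2 stand for existing Timeseries objects):

ds = Dataset()
ds.add(ts1)          # appended as a new entry
ds.add([ts1, ts2])   # ts1 is merged into the existing entry, ts2 is appended
len(ds)              # 2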

filter(location=None, variable=None, unit=None, **kwargs)

Return a Timeseries or a new Dataset filtered by location, variable, unit, and/or other attributes.

Parameters:

    location (str | list, optional): The location name. Default: None.
    variable (str | list, optional): The variable being measured. Default: None.
    unit (str | list, optional): Unit of the measurement. Default: None.
    **kwargs (dict): Attributes of subclassed timeseries used for filtering (e.g., sensor, method).

Returns:

    Timeseries | Dataset: A single Timeseries if exactly one match is found, or a new Dataset if multiple matches are found.

Source code in gensor/core/dataset.py
def filter(
    self,
    location: str | list | None = None,
    variable: str | list | None = None,
    unit: str | list | None = None,
    **kwargs: dict[str, str | list],
) -> T | Dataset:
    """Return a Timeseries or a new Dataset filtered by station, sensor,
    and/or variable.

    Parameters:
        location (Optional[str]): The location name.
        variable (Optional[str]): The variable being measured.
        unit (Optional[str]): Unit of the measurement.
        **kwargs (dict): Attributes of subclassed timeseries used for filtering
            (e.g., sensor, method).

    Returns:
        Timeseries | Dataset: A single Timeseries if exactly one match is found,
                               or a new Dataset if multiple matches are found.
    """

    def matches(ts: T, attr: str, value: dict[str, str | list]) -> bool | None:
        """Check if the Timeseries object has the attribute and if it matches the value."""
        if not hasattr(ts, attr):
            message = f"'{ts.__class__.__name__}' object has no attribute '{attr}'"
            raise AttributeError(message)
        return getattr(ts, attr) in value

    if isinstance(location, str):
        location = [location]
    if isinstance(variable, str):
        variable = [variable]
    if isinstance(unit, str):
        unit = [unit]
    for key, value in kwargs.items():
        if isinstance(value, str):
            kwargs[key] = [value]

    matching_timeseries = [
        ts
        for ts in self.timeseries
        if ts is not None
        and (location is None or ts.location in location)
        and (variable is None or ts.variable in variable)
        and (unit is None or ts.unit in unit)
        and all(matches(ts, attr, value) for attr, value in kwargs.items())
    ]

    if not matching_timeseries:
        return Dataset()

    if len(matching_timeseries) == 1:
        return matching_timeseries[0].model_copy(deep=True)

    return self.model_copy(update={"timeseries": matching_timeseries})
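
A few call patterns (a sketch, assuming a Dataset ds of Timeseries objects):

# Exactly one match returns a deep copy of that Timeseries.
pressure = ds.filter(location="station_1", variable="pressure")

# Several matches return a new Dataset.
all_pressure = ds.filter(variable="pressure")

# Subclass attributes go through kwargs, e.g. the sensor serial number.
by_sensor = ds.filter(sensor="AB123")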

get_locations()

List the locations of all timeseries in the dataset.

Source code in gensor/core/dataset.py
def get_locations(self) -> list:
    """List all unique locations in the dataset."""
    return [ts.location for ts in self.timeseries if ts is not None]

plot(include_outliers=False, plot_kwargs=None, legend_kwargs=None)

Plots the timeseries data, grouping by variable type.

Parameters:

    include_outliers (bool): Whether to include outliers in the plot. Default: False.
    plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() to customize the plot. Default: None.
    legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend. Default: None.

Returns:

    (fig, ax): Matplotlib figure and axes to allow further customization.

Source code in gensor/core/dataset.py
def plot(
    self,
    include_outliers: bool = False,
    plot_kwargs: dict[str, Any] | None = None,
    legend_kwargs: dict[str, Any] | None = None,
) -> tuple[Figure, Axes]:
    """Plots the timeseries data, grouping by variable type.

    Parameters:
        include_outliers (bool): Whether to include outliers in the plot.
        plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
        legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

    Returns:
        (fig, ax): Matplotlib figure and axes to allow further customization.
    """

    grouped_ts = defaultdict(list)

    for ts in self.timeseries:
        if ts:
            grouped_ts[ts.variable].append(ts)

    num_variables = len(grouped_ts)

    fig, axes = plt.subplots(
        num_variables, 1, figsize=(10, 5 * num_variables), sharex=True
    )

    if num_variables == 1:
        axes = [axes]

    for ax, (variable, ts_list) in zip(axes, grouped_ts.items(), strict=False):
        for ts in ts_list:
            ts.plot(
                include_outliers=include_outliers,
                ax=ax,
                plot_kwargs=plot_kwargs,
                legend_kwargs=legend_kwargs,
            )

        ax.set_title(f"Timeseries for {variable.capitalize()}")
        ax.set_xlabel("Time")

    fig.tight_layout()
    return fig, axes
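
For instance (a sketch, assuming a populated Dataset ds):

fig, axes = ds.plot(
    plot_kwargs={"linewidth": 0.8},
    legend_kwargs={"loc": "upper right"},
)
fig.savefig("dataset_overview.png", dpi=150)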

to_sql(db)

Save the entire timeseries to a SQLite database.

Parameters:

    db (DatabaseConnection, required): SQLite database connection object.

Source code in gensor/core/dataset.py
def to_sql(self, db: DatabaseConnection) -> None:
    """Save the entire timeseries to a SQLite database.

    Parameters:
        db (DatabaseConnection): SQLite database connection object.
    """
    for ts in self.timeseries:
        if ts:
            ts.to_sql(db)
    return

Timeseries

Bases: BaseTimeseries

Timeseries of groundwater sensor data.

Attributes:

    ts (pd.Series): The timeseries data.
    variable (Literal['temperature', 'pressure', 'conductivity', 'flux', 'head', 'depth']): The type of the measurement.
    unit (Literal['degc', 'cmh2o', 'ms/cm', 'm/s', 'm asl', 'm']): The unit of the measurement.
    sensor (str): The serial number of the sensor.
    sensor_alt (float): Altitude of the sensor (necessary to compute groundwater levels).

Source code in gensor/core/timeseries.py
class Timeseries(BaseTimeseries):
    """Timeseries of groundwater sensor data.

    Attributes:
        ts (pd.Series): The timeseries data.
        variable (Literal['temperature', 'pressure', 'conductivity', 'flux']):
            The type of the measurement.
        unit (Literal['degC', 'mmH2O', 'mS/cm', 'm/s']): The unit of
            the measurement.
        sensor (str): The serial number of the sensor.
        sensor_alt (float): Altitude of the sensor (ncessary to compute groundwater levels).
    """

    model_config = pyd.ConfigDict(
        arbitrary_types_allowed=True, validate_assignment=True
    )

    sensor: str | None = None
    sensor_alt: float | None = None

    def __eq__(self, other: object) -> bool:
        """Check equality based on location, sensor, variable, unit and sensor_alt."""
        if not isinstance(other, Timeseries):
            return NotImplemented

        if not super().__eq__(other):
            return False

        return self.sensor == other.sensor and self.sensor_alt == other.sensor_alt

    def plot(
        self,
        include_outliers: bool = False,
        ax: Axes | None = None,
        plot_kwargs: dict[str, Any] | None = None,
        legend_kwargs: dict[str, Any] | None = None,
    ) -> tuple[Figure, Axes]:
        """Plots the timeseries data.

        Parameters:
            include_outliers (bool): Whether to include outliers in the plot.
            ax (matplotlib.axes.Axes, optional): Matplotlib axes object to plot on.
                If None, a new figure and axes are created.
            plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
            legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

        Returns:
            (fig, ax): Matplotlib figure and axes to allow further customization.
        """
        fig, ax = super().plot(
            include_outliers=include_outliers,
            ax=ax,
            plot_kwargs=plot_kwargs,
            legend_kwargs=legend_kwargs,
        )

        ax.set_title(f"{self.variable.capitalize()} at {self.location} ({self.sensor})")

        return fig, ax

__eq__(other)

Check equality based on location, sensor, variable, unit and sensor_alt.

Source code in gensor/core/timeseries.py
def __eq__(self, other: object) -> bool:
    """Check equality based on location, sensor, variable, unit and sensor_alt."""
    if not isinstance(other, Timeseries):
        return NotImplemented

    if not super().__eq__(other):
        return False

    return self.sensor == other.sensor and self.sensor_alt == other.sensor_alt

plot(include_outliers=False, ax=None, plot_kwargs=None, legend_kwargs=None)

Plots the timeseries data.

Parameters:

    include_outliers (bool): Whether to include outliers in the plot. Default: False.
    ax (Axes, optional): Matplotlib axes object to plot on. If None, a new figure and axes are created. Default: None.
    plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() to customize the plot. Default: None.
    legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend. Default: None.

Returns:

    (fig, ax): Matplotlib figure and axes to allow further customization.

Source code in gensor/core/timeseries.py
def plot(
    self,
    include_outliers: bool = False,
    ax: Axes | None = None,
    plot_kwargs: dict[str, Any] | None = None,
    legend_kwargs: dict[str, Any] | None = None,
) -> tuple[Figure, Axes]:
    """Plots the timeseries data.

    Parameters:
        include_outliers (bool): Whether to include outliers in the plot.
        ax (matplotlib.axes.Axes, optional): Matplotlib axes object to plot on.
            If None, a new figure and axes are created.
        plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
        legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

    Returns:
        (fig, ax): Matplotlib figure and axes to allow further customization.
    """
    fig, ax = super().plot(
        include_outliers=include_outliers,
        ax=ax,
        plot_kwargs=plot_kwargs,
        legend_kwargs=legend_kwargs,
    )

    ax.set_title(f"{self.variable.capitalize()} at {self.location} ({self.sensor})")

    return fig, ax

compensate(raw, barometric, alignment_period='h', threshold_wc=None, fieldwork_dates=None, interpolate_method=None)

Compensate raw sensor timeseries for barometric pressure using the Compensator object.

Parameters:

    raw (Timeseries | Dataset, required): Raw sensor timeseries.
    barometric (Timeseries | float, required): Barometric pressure timeseries or a single float value. If a float value is provided, it is assumed to be in cmH2O.
    alignment_period (Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']): The alignment period for the timeseries. Default: 'h'. See pandas offset aliases for definitions.
    threshold_wc (float, optional): The threshold for the absolute water column. If provided, records below that threshold are dropped. Default: None.
    fieldwork_dates (dict[str, list], optional): Dictionary mapping location names to lists of fieldwork days. All records on a fieldwork day are set to None. Default: None.
    interpolate_method (str, optional): Interpolation method, as accepted by pd.Series.interpolate(). Default: None.

Source code in gensor/processing/compensation.py
def compensate(
    raw: Timeseries | Dataset,
    barometric: Timeseries | float,
    alignment_period: Literal[
        "D", "ME", "SME", "MS", "YE", "YS", "h", "min", "s"
    ] = "h",
    threshold_wc: float | None = None,
    fieldwork_dates: dict | None = None,
    interpolate_method: str | None = None,
) -> Timeseries | Dataset | None:
    """Constructor for the Comensator object.

    Parameters:
        raw (Timeseries | Dataset): Raw sensor timeseries
        barometric (Timeseries | float): Barometric pressure timeseries or a single
            float value. If a float value is provided, it is assumed to be in cmH2O.
        alignment_period (Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']): The alignment period for the timeseries.
            Default is 'h'. See pandas offset aliases for definitinos.
        threshold_wc (float): The threshold for the absolute water column. If it is
            provided, the records below that threshold are dropped.
        fieldwork_dates (Dict[str, list]): Dictionary of location name and a list of
            fieldwork days. All records on the fieldwork day are set to None.
        interpolate_method (str): String representing the interpolate method as in
            pd.Series.interpolate() method.
    """
    if fieldwork_dates is None:
        fieldwork_dates = {}

    def _compensate_one(
        raw: Timeseries, fieldwork_dates: list | None
    ) -> Timeseries | None:
        comp = Compensator(ts=raw, barometric=barometric)
        compensated = comp.compensate(
            alignment_period=alignment_period,
            threshold_wc=threshold_wc,
            fieldwork_dates=fieldwork_dates,
        )
        if compensated is not None and interpolate_method:
            # .interpolate() called on Timeseries object is wrapped to return a
            # Timeseries object from the original pandas.Series.interpolate().
            return compensated.interpolate(method=interpolate_method)  # type: ignore[no-any-return]

        else:
            return compensated

    if isinstance(raw, Timeseries):
        dates = fieldwork_dates.get(raw.location)
        return _compensate_one(raw, dates)

    elif isinstance(raw, Dataset):
        compensated_series = []
        for item in raw:
            dates = fieldwork_dates.get(item.location)
            compensated_series.append(_compensate_one(item, dates))

        return raw.model_copy(update={"timeseries": compensated_series}, deep=True)
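
A minimal usage sketch (the file paths and the top-level imports are assumptions):

from pathlib import Path
from gensor import compensate, read_from_csv  # import paths are assumptions

raw = read_from_csv(Path("data/piezometer.csv"), file_format="vanessen")
baro = read_from_csv(Path("data/barometer.csv"), file_format="vanessen")

# Align both series to hourly timestamps, drop water columns below 5 cm,
# and interpolate the resulting gaps linearly.
watercolumn = compensate(
    raw,
    barometric=baro,
    alignment_period="h",
    threshold_wc=5.0,
    interpolate_method="linear",
)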

read_from_csv(path, file_format='vanessen', **kwargs)

Loads the data from CSV files with the given file_format and returns a Timeseries or a Dataset.

Parameters:

    path (Path, required): The path to the file or directory containing the files.
    file_format (Literal['vanessen', 'plain']): The parser to use. Default: 'vanessen'.
    **kwargs (dict): Optional keyword arguments passed to the parsers:

        * serial_number_pattern (str): The regex pattern to extract the serial number from the file.
        * location_pattern (str): The regex pattern to extract the station from the file.
        * col_names (list): The column names for the dataframe.
        * location (str): Name of the location of the timeseries.
        * sensor (str): Sensor serial number.

Source code in gensor/io/read.py
def read_from_csv(
    path: Path, file_format: Literal["vanessen", "plain"] = "vanessen", **kwargs: Any
) -> Dataset | Timeseries:
    """Loads the data from csv files with given file_format and returns a list of Timeseries objects.

    Parameters:
        path (Path): The path to the file or directory containing the files.
        **kwargs (dict): Optional keyword arguments passed to the parsers:
            * serial_number_pattern (str): The regex pattern to extract the serial number from the file.
            * location_pattern (str): The regex pattern to extract the station from the file.
            * col_names (list): The column names for the dataframe.
            * location (str): Name of the location of the timeseries.
            * sensor (str): Sensor serial number.
    """

    parsers = {
        "vanessen": parse_vanessen_csv,
        "plain": parse_plain,
        # more parser to be implemented
    }

    if not isinstance(path, Path):
        message = "The path argument must be a Path object."
        raise TypeError(message)

    if path.is_dir() and not any(
        file.is_file() and file.suffix.lower() == ".csv" for file in path.iterdir()
    ):
        logger.info("No CSV files found. Operation skipped.")
        return Dataset()

    files = (
        [
            file
            for file in path.iterdir()
            if file.is_file() and file.suffix.lower() == ".csv"
        ]
        if path.is_dir()
        else [path]
        if path.suffix.lower() == ".csv"
        else []
    )

    if not files:
        logger.info("No CSV files found. Operation skipped.")
        return Dataset()

    parser = parsers[file_format]

    ds: Dataset = Dataset()

    for f in files:
        logger.info(f"Loading file: {f}")
        ts_in_file = parser(f, **kwargs)
        ds.add(ts_in_file)

    # If there is only one Timeseries in Dataset (as in the condition), ds[0] will always
    # be a Timeseries; so the line below does not introduce potential None in the return
    return ds[0] if len(ds) == 1 else ds  # type: ignore[return-value]
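
For example (a sketch; the file paths and the top-level import are assumptions):

from pathlib import Path
from gensor import read_from_csv  # import path is an assumption

# If exactly one series is parsed, a Timeseries is returned; otherwise a Dataset.
ts = read_from_csv(Path("data/logger_01.csv"), file_format="vanessen")
ds = read_from_csv(Path("data/"), file_format="vanessen")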

read_from_sql(db, load_all=True, location=None, variable=None, unit=None, timestamp_start=None, timestamp_stop=None, **kwargs)

Returns the timeseries or a dataset from a SQL database.

Parameters:

    db (DatabaseConnection, required): The database connection object.
    load_all (bool): Whether to load all timeseries from the database. Default: True.
    location (str, optional): The station name. Default: None.
    variable (str, optional): The measurement type. Default: None.
    unit (str, optional): The unit of the measurement. Default: None.
    timestamp_start (Timestamp, optional): Start timestamp filter. Default: None.
    timestamp_stop (Timestamp, optional): End timestamp filter. Default: None.
    **kwargs (dict): Any additional filters matching attributes of the particular timeseries.

Returns:

    Dataset (Timeseries | Dataset): Dataset with retrieved objects or an empty Dataset.

Source code in gensor/io/read.py
def read_from_sql(
    db: DatabaseConnection,
    load_all: bool = True,
    location: str | None = None,
    variable: str | None = None,
    unit: str | None = None,
    timestamp_start: pd.Timestamp | None = None,
    timestamp_stop: pd.Timestamp | None = None,
    **kwargs: dict,
) -> Timeseries | Dataset:
    """Returns the timeseries or a dataset from a SQL database.

    Parameters:
        db (DatabaseConnection): The database connection object.
        load_all (bool): Whether to load all timeseries from the database.
        location (str): The station name.
        variable (str): The measurement type.
        unit (str): The unit of the measurement.
        timestamp_start (pd.Timestamp, optional): Start timestamp filter.
        timestamp_stop (pd.Timestamp, optional): End timestamp filter.
        **kwargs (dict): Any additional filters matching attributes of the particular
            timeseries.

    Returns:
        Dataset: Dataset with retrieved objects or an empty Dataset.
    """

    def _read_data_from_schema(schema_name: str) -> Any:
        """Read data from the table and apply the timestamp filter.

        Parameters:
            schema_name (str): name of the schema in SQLite database.

        Returns:
            pd.Series: results of the query or an empty pd.Series if none are found.
        """
        with db as con:
            schema = db.metadata.tables[schema_name]
            data_query = select(schema)

            if timestamp_start or timestamp_stop:
                if timestamp_start:
                    data_query = data_query.where(schema.c.timestamp >= timestamp_start)
                if timestamp_stop:
                    data_query = data_query.where(schema.c.timestamp <= timestamp_stop)

            ts = pd.read_sql(
                data_query,
                con=con,
                parse_dates={"timestamp": "%Y-%m-%dT%H:%M:%S%z"},
                index_col="timestamp",
            ).squeeze()

        if ts.empty:
            message = f"No data found in table {schema_name}"
            logger.warning(message)

        return ts.sort_index()

    def _create_object(data: pd.Series, metadata: dict) -> Any:
        """Create the appropriate object for timeseries."""

        core_metadata = {
            "location": metadata["location"],
            "variable": metadata["variable"],
            "unit": metadata["unit"],
        }

        extra_metadata = metadata.get("extra", {})

        ts_metadata = {**core_metadata, **extra_metadata}

        cls = metadata["cls"]
        module_name, class_name = cls.rsplit(".", 1)
        module = import_module(module_name)

        TimeseriesClass = getattr(module, class_name)
        ts_object = TimeseriesClass(ts=data, **ts_metadata)

        return ts_object

    metadata_df = (
        db.get_timeseries_metadata(
            location=location, variable=variable, unit=unit, **kwargs
        )
        if not load_all
        else db.get_timeseries_metadata()
    )

    if metadata_df.empty:
        message = "No schemas matched the specified filters."
        raise ValueError(message)

    timeseries_list = []

    for row in metadata_df.to_dict(orient="records"):
        try:
            schema_name = row.pop("table_name")
            data = _read_data_from_schema(schema_name)
            timeseries_obj = _create_object(data, row)
            timeseries_list.append(timeseries_obj)
        except (ValueError, TypeError):
            logger.exception(f"Skipping schema {schema_name} due to error.")

    return Dataset(timeseries=timeseries_list) if timeseries_list else Dataset()
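
A usage sketch (db stands for an existing DatabaseConnection; its construction is not shown here, and the top-level import is an assumption):

import pandas as pd
from gensor import read_from_sql  # import path is an assumption

# Load only pressure records from one station within a time window.
ds = read_from_sql(
    db,
    load_all=False,
    location="station_1",
    variable="pressure",
    timestamp_start=pd.Timestamp("2023-01-01", tz="UTC"),
    timestamp_stop=pd.Timestamp("2023-06-30", tz="UTC"),
)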

set_log_level(level)

Set the logging level for the package.

Source code in gensor/log.py
def set_log_level(level: str) -> None:
    """Set the logging level for the package."""
    logger = logging.getLogger("gensor")
    logger.setLevel(level.upper())
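
For example, set_log_level("debug") enables DEBUG-level messages for the gensor logger; the level string is upper-cased internally, so case does not matter.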

analysis

outliers

OutlierDetection

Detecting outliers in groundwater timeseries data.

Each method in this class returns a pandas.Series containing predicted outliers in the dataset.

Methods:

    iqr: Use interquartile range (IQR).
    zscore: Use the z-score method.
    isolation_forest: Use the isolation forest algorithm.
    lof: Use the local outlier factor (LOF) method.

Source code in gensor/analysis/outliers.py
class OutlierDetection:
    """Detecting outliers in groundwater timeseries data.

    Each method in this class returns a pandas.Series containing predicted outliers in
    the dataset.

    Methods:
        iqr: Use interquartile range (IQR).
        zscore: Use the z-score method.
        isolation_forest: Use the isolation forest algorithm.
        lof: Use the local outlier factor (LOF) method.
    """

    def __init__(
        self,
        data: Series,
        method: Literal["iqr", "zscore", "isolation_forest", "lof"],
        rolling: bool,
        window: int,
        **kwargs: Any,
    ) -> None:
        """Find outliers in a time series using the specified method, with an option for rolling window."""

        FUNCS: dict[str, Callable] = {
            "iqr": self.iqr,
            "zscore": self.zscore,
            "isolation_forest": self.isolation_forest,
            "lof": self.lof,
        }

        method_func = FUNCS[method]

        if method in ["iqr", "zscore"]:
            # For 'iqr' and 'zscore' methods
            y = (
                kwargs.get("k", 1.5)
                if method == "iqr"
                else kwargs.get("threshold", 3.0)
            )
            if rolling:
                roll = data.rolling(window=window)
                mask = roll.apply(lambda x: method_func(x, y, rolling=True), raw=True)
            else:
                mask = method_func(data.to_numpy(), y, rolling=False)

            bool_mask = mask.astype(bool)
            bool_mask_series = Series(bool_mask, index=data.index)
            self.outliers = data[bool_mask_series]

        else:
            # For 'isolation_forest' and 'lof' methods
            self.outliers = method_func(data, **kwargs)

    @staticmethod
    def iqr(data: np.ndarray, k: float, rolling: bool) -> np.ndarray:
        """Use interquartile range (IQR).

        Parameters:
            data (np.ndarray): The time series data.

        Keyword Args:
            k (float): The multiplier for the IQR to define the range. Defaults to 1.5.

        Returns:
            np.ndarray: Binary mask representing the outliers as 1.
        """

        Q1 = np.percentile(data, 25)  # percentiles are given on a 0-100 scale
        Q3 = np.percentile(data, 75)
        IQR = Q3 - Q1

        lower_bound = Q1 - k * IQR
        upper_bound = Q3 + k * IQR

        if rolling:
            return (
                np.array([1])
                if (data[-1] < lower_bound or data[-1] > upper_bound)
                else np.array([0])
            )

        return np.where((data < lower_bound) | (data > upper_bound), 1, 0)

    @staticmethod
    def zscore(data: np.ndarray, threshold: float, rolling: bool) -> np.ndarray:
        """Use the z-score method.

        Parameters:
            data (np.ndarray): The time series data.

        Keyword Args:
            threshold (float): The threshold for the z-score method. Defaults to 3.0.

        Returns:
            np.ndarray: Binary mask representing outliers.
        """

        mean = np.mean(data)
        std_dev = np.std(data)

        z_scores = np.abs((data - mean) / std_dev)

        if rolling:
            return np.array([1]) if z_scores[-1] > threshold else np.array([0])
        return np.where(z_scores > threshold, 1, 0)

    def isolation_forest(self, data: Series, **kwargs: Any) -> Series:
        """Using the isolation forest algorithm.

        Parameters:
            data (pandas.Series): The time series data.

        Keyword Args:
            n_estimators (int): The number of base estimators in the ensemble. Defaults to 100.
            max_samples (int | 'auto' | float): The number of samples to draw from X to train each base estimator. Defaults to 'auto'.
            contamination (float): The proportion of outliers in the data. Defaults to 0.01.
            max_features (int | float): The number of features to draw from X to train each base estimator. Defaults to 1.0.
            bootstrap (bool): Whether to use bootstrapping when sampling the data. Defaults to False.
            n_jobs (int): The number of jobs to run in parallel. Defaults to 1.
            random_state (int | RandomState | None): The random state to use. Defaults to None.
            verbose (int): The verbosity level. Defaults to 0.
            warm_start (bool): Whether to reuse the solution of the previous call to fit and add more estimators to the ensemble. Defaults to False.

        Note:
            For details on kwargs see: sklearn.ensemble.IsolationForest.
        """

        X = data.to_numpy().reshape(-1, 1)

        clf = IsolationForest(**kwargs)
        clf.fit(X)

        is_outlier = clf.predict(X)
        outliers: Series = data[is_outlier == -1]

        return outliers

    def lof(self, data: Series, **kwargs: Any) -> Series:
        """Using the local outlier factor (LOF) method.

        Parameters:
            data (pandas.Series): The time series data.

        Keyword Args:
            n_neighbors (int): The number of neighbors to consider for each sample. Defaults to 20.
            algorithm (str): The algorithm to use. Either 'auto', 'ball_tree', 'kd_tree' or 'brute'. Defaults to 'auto'.
            leaf_size (int): The leaf size of the tree. Defaults to 30.
            metric (str): The distance metric to use. Defaults to 'minkowski'.
            p (int): The power parameter for the Minkowski metric. Defaults to 2.
            contamination (float): The proportion of outliers in the data. Defaults to 0.01.
            novelty (bool): Whether to consider the samples as normal or outliers. Defaults to False.
            n_jobs (int): The number of jobs to run in parallel. Defaults to 1.
        Note:
            For details on kwargs see: sklearn.neighbors.LocalOutlierFactor.
        """

        X = data.to_numpy().reshape(-1, 1)

        clf = LocalOutlierFactor(**kwargs)

        is_outlier = clf.fit_predict(X)
        outliers: Series = data[is_outlier == -1]

        return outliers
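
A short sketch of direct use (normally reached via Timeseries.detect_outliers; the synthetic series is purely illustrative):

import numpy as np
from pandas import Series, date_range
from gensor.analysis.outliers import OutlierDetection

# Synthetic hourly series with one obvious spike.
idx = date_range("2023-01-01", periods=100, freq="h", tz="UTC")
data = Series(np.random.default_rng(0).normal(0.0, 1.0, 100), index=idx)
data.iloc[50] = 15.0

# Non-rolling z-score detection; `threshold` travels through **kwargs.
detected = OutlierDetection(data, method="zscore", rolling=False, window=6, threshold=3.0)
print(detected.outliers)  # contains the spike at position 50
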
__init__(data, method, rolling, window, **kwargs)

Find outliers in a time series using the specified method, with an option for rolling window.

Source code in gensor/analysis/outliers.py
def __init__(
    self,
    data: Series,
    method: Literal["iqr", "zscore", "isolation_forest", "lof"],
    rolling: bool,
    window: int,
    **kwargs: Any,
) -> None:
    """Find outliers in a time series using the specified method, with an option for rolling window."""

    FUNCS: dict[str, Callable] = {
        "iqr": self.iqr,
        "zscore": self.zscore,
        "isolation_forest": self.isolation_forest,
        "lof": self.lof,
    }

    method_func = FUNCS[method]

    if method in ["iqr", "zscore"]:
        # For 'iqr' and 'zscore' methods
        y = (
            kwargs.get("k", 1.5)
            if method == "iqr"
            else kwargs.get("threshold", 3.0)
        )
        if rolling:
            roll = data.rolling(window=window)
            mask = roll.apply(lambda x: method_func(x, y, rolling=True), raw=True)
        else:
            mask = method_func(data.to_numpy(), y, rolling=False)

        bool_mask = mask.astype(bool)
        bool_mask_series = Series(bool_mask, index=data.index)
        self.outliers = data[bool_mask_series]

    else:
        # For 'isolation_forest' and 'lof' methods
        self.outliers = method_func(data, **kwargs)
iqr(data, k, rolling) staticmethod

Use interquartile range (IQR).

Parameters:

    data (np.ndarray, required): The time series data.

Other Parameters:

    k (float): The multiplier for the IQR to define the range. Defaults to 1.5.

Returns:

    np.ndarray: Binary mask representing the outliers as 1.

Source code in gensor/analysis/outliers.py
@staticmethod
def iqr(data: np.ndarray, k: float, rolling: bool) -> np.ndarray:
    """Use interquartile range (IQR).

    Parameters:
        data (np.ndarray): The time series data.

    Keyword Args:
        k (float): The multiplier for the IQR to define the range. Defaults to 1.5.

    Returns:
        np.ndarray: Binary mask representing the outliers as 1.
    """

    Q1 = np.percentile(data, 25)  # percentiles are given on a 0-100 scale
    Q3 = np.percentile(data, 75)
    IQR = Q3 - Q1

    lower_bound = Q1 - k * IQR
    upper_bound = Q3 + k * IQR

    if rolling:
        return (
            np.array([1])
            if (data[-1] < lower_bound or data[-1] > upper_bound)
            else np.array([0])
        )

    return np.where((data < lower_bound) | (data > upper_bound), 1, 0)
isolation_forest(data, **kwargs)

Using the isolation forest algorithm.

Parameters:

    data (pandas.Series, required): The time series data.

Other Parameters:

    n_estimators (int): The number of base estimators in the ensemble. Defaults to 100.
    max_samples (int | 'auto' | float): The number of samples to draw from X to train each base estimator. Defaults to 'auto'.
    contamination (float): The proportion of outliers in the data. Defaults to 0.01.
    max_features (int | float): The number of features to draw from X to train each base estimator. Defaults to 1.0.
    bootstrap (bool): Whether to use bootstrapping when sampling the data. Defaults to False.
    n_jobs (int): The number of jobs to run in parallel. Defaults to 1.
    random_state (int | RandomState | None): The random state to use. Defaults to None.
    verbose (int): The verbosity level. Defaults to 0.
    warm_start (bool): Whether to reuse the solution of the previous call to fit and add more estimators to the ensemble. Defaults to False.

Note:

    For details on kwargs see: sklearn.ensemble.IsolationForest.

Source code in gensor/analysis/outliers.py
def isolation_forest(self, data: Series, **kwargs: Any) -> Series:
    """Using the isolation forest algorithm.

    Parameters:
        data (pandas.Series): The time series data.

    Keyword Args:
        n_estimators (int): The number of base estimators in the ensemble. Defaults to 100.
        max_samples (int | 'auto' | float): The number of samples to draw from X to train each base estimator. Defaults to 'auto'.
        contamination (float): The proportion of outliers in the data. Defaults to 0.01.
        max_features (int | float): The number of features to draw from X to train each base estimator. Defaults to 1.0.
        bootstrap (bool): Whether to use bootstrapping when sampling the data. Defaults to False.
        n_jobs (int): The number of jobs to run in parallel. Defaults to 1.
        random_state (int | RandomState | None): The random state to use. Defaults to None.
        verbose (int): The verbosity level. Defaults to 0.
        warm_start (bool): Whether to reuse the solution of the previous call to fit and add more estimators to the ensemble. Defaults to False.

    Note:
        For details on kwargs see: sklearn.ensemble.IsolationForest.
    """

    X = data.to_numpy().reshape(-1, 1)

    clf = IsolationForest(**kwargs)
    clf.fit(X)

    is_outlier = clf.predict(X)
    outliers: Series = data[is_outlier == -1]

    return outliers
lof(data, **kwargs)

Using the local outlier factor (LOF) method.

Parameters:

    data (pandas.Series, required): The time series data.

Other Parameters:

    n_neighbors (int): The number of neighbors to consider for each sample. Defaults to 20.
    algorithm (str): The algorithm to use. Either 'auto', 'ball_tree', 'kd_tree' or 'brute'. Defaults to 'auto'.
    leaf_size (int): The leaf size of the tree. Defaults to 30.
    metric (str): The distance metric to use. Defaults to 'minkowski'.
    p (int): The power parameter for the Minkowski metric. Defaults to 2.
    contamination (float): The proportion of outliers in the data. Defaults to 0.01.
    novelty (bool): Whether to consider the samples as normal or outliers. Defaults to False.
    n_jobs (int): The number of jobs to run in parallel. Defaults to 1.

Note:

    For details on kwargs see: sklearn.neighbors.LocalOutlierFactor.

Source code in gensor/analysis/outliers.py
def lof(self, data: Series, **kwargs: Any) -> Series:
    """Using the local outlier factor (LOF) method.

    Parameters:
        data (pandas.Series): The time series data.

    Keyword Args:
        n_neighbors (int): The number of neighbors to consider for each sample. Defaults to 20.
        algorithm (str): The algorithm to use. Either 'auto', 'ball_tree', 'kd_tree' or 'brute'. Defaults to 'auto'.
        leaf_size (int): The leaf size of the tree. Defaults to 30.
        metric (str): The distance metric to use. Defaults to 'minkowski'.
        p (int): The power parameter for the Minkowski metric. Defaults to 2.
        contamination (float): The proportion of outliers in the data. Defaults to 0.01.
        novelty (bool): Whether to consider the samples as normal or outliers. Defaults to False.
        n_jobs (int): The number of jobs to run in parallel. Defaults to 1.
    Note:
        For details on kwargs see: sklearn.neighbors.LocalOutlierFactor.
    """

    X = data.to_numpy().reshape(-1, 1)

    clf = LocalOutlierFactor(**kwargs)

    is_outlier = clf.fit_predict(X)
    outliers: Series = data[is_outlier == -1]

    return outliers
zscore(data, threshold, rolling) staticmethod

Use the z-score method.

Parameters:

    data (np.ndarray, required): The time series data.

Other Parameters:

    threshold (float): The threshold for the z-score method. Defaults to 3.0.

Returns:

    np.ndarray: Binary mask representing outliers.

Source code in gensor/analysis/outliers.py
@staticmethod
def zscore(data: np.ndarray, threshold: float, rolling: bool) -> np.ndarray:
    """Use the z-score method.

    Parameters:
        data (np.ndarray): The time series data.

    Keyword Args:
        threshold (float): The threshold for the z-score method. Defaults to 3.0.

    Returns:
        np.ndarray: Binary mask representing outliers.
    """

    mean = np.mean(data)
    std_dev = np.std(data)

    z_scores = np.abs((data - mean) / std_dev)

    if rolling:
        return np.array([1]) if z_scores[-1] > threshold else np.array([0])
    return np.where(z_scores > threshold, 1, 0)

stats

Module to compute timeseries statistics, similar to the pastas.stats.signatures module and following Heudorfer et al. (2019).

To be implemented:

  • Structure
  • Flashiness
  • Distribution
  • Modality
  • Density
  • Shape
  • Scale
  • Slope

config

Warning

Whenever Timeseries objects are created via read_from_csv using a parser (e.g., 'vanessen'), the timestamps are localized and converted to UTC. Users who create their own timeseries outside of read_from_csv should therefore ensure that their timestamps are in UTC.

core

base

BaseTimeseries

Bases: BaseModel

Generic base class for timeseries with metadata.

Timeseries is a series of measurements of a single variable, in the same unit, from a single location with unique timestamps.

Attributes:

    ts (pd.Series): The timeseries data.
    variable (Literal['temperature', 'pressure', 'conductivity', 'flux', 'head', 'depth']): The type of the measurement.
    unit (Literal['degc', 'cmh2o', 'ms/cm', 'm/s', 'm asl', 'm']): The unit of the measurement.
    outliers (pd.Series): Measurements marked as outliers.
    transformation (Any): Metadata of transformations the timeseries has undergone.

Methods:

    validate_ts: If the pd.Series is not exactly what is required, coerce it.

Source code in gensor/core/base.py
class BaseTimeseries(pyd.BaseModel):
    """Generic base class for timeseries with metadata.

    Timeseries is a series of measurements of a single variable, in the same unit, from a
    single location with unique timestamps.

    Attributes:
        ts (pd.Series): The timeseries data.
        variable (Literal['temperature', 'pressure', 'conductivity', 'flux',
            'head', 'depth']): The type of the measurement.
        unit (Literal['degc', 'cmh2o', 'ms/cm', 'm/s', 'm asl', 'm']): The unit of
            the measurement.
        outliers (pd.Series): Measurements marked as outliers.
        transformation (Any): Metadata of transformations the timeseries has undergone.

    Methods:
        validate_ts: if the pd.Series is not exactly what is required, coerce.
    """

    model_config = pyd.ConfigDict(
        arbitrary_types_allowed=True, validate_assignment=True
    )

    ts: pd.Series = pyd.Field(repr=False, exclude=True)
    variable: Literal[
        "temperature", "pressure", "conductivity", "flux", "head", "depth"
    ]
    unit: Literal["degc", "cmh2o", "ms/cm", "m/s", "m asl", "m"]
    location: str | None = None
    outliers: pd.Series | None = pyd.Field(default=None, repr=False, exclude=True)
    transformation: Any = pyd.Field(default=None, repr=False, exclude=True)

    @pyd.computed_field()  # type: ignore[prop-decorator]
    @property
    def start(self) -> pd.Timestamp | Any:
        return self.ts.index.min()

    @pyd.computed_field()  # type: ignore[prop-decorator]
    @property
    def end(self) -> pd.Timestamp | Any:
        return self.ts.index.max()

    @pyd.field_serializer("start", "end")
    def serialize_timestamps(self, value: pd.Timestamp | None) -> str | None:
        """Serialize `pd.Timestamp` to ISO format."""
        return value.strftime("%Y%m%d%H%M%S") if value is not None else None

    def __eq__(self, other: object) -> bool:
        """Check equality based on location, sensor, variable, unit and sensor_alt."""
        if not isinstance(other, BaseTimeseries):
            return NotImplemented

        return (
            self.variable == other.variable
            and self.unit == other.unit
            and self.location == other.location
        )

    def __getattr__(self, attr: Any) -> Any:
        """Delegate attribute access to the underlying pandas Series if it exists.

        Special handling is implemented for pandas indexer.
        """
        if attr == "loc":
            return TimeseriesIndexer(self, self.ts.loc)

        if attr == "iloc":
            return TimeseriesIndexer(self, self.ts.iloc)

        error_message = f"'{self.__class__.__name__}' object has no attribute '{attr}'"

        if hasattr(self.ts, attr):
            # Return a function to call on the `ts` if it's a method, otherwise return the attribute
            ts_attr = getattr(self.ts, attr)
            if callable(ts_attr):

                def wrapper(*args: Any, **kwargs: Any) -> Any:
                    result = ts_attr(*args, **kwargs)
                    # If the result is a Series, return a new Timeseries; otherwise, return the result
                    if isinstance(result, pd.Series):
                        return self.model_copy(
                            update={"ts": deepcopy(result)}, deep=True
                        )

                    return result

                return wrapper
            else:
                return ts_attr
        raise AttributeError(error_message)

    @pyd.field_validator("ts")
    def validate_ts(cls, v: pd.Series) -> pd.Series:
        validated_ts = ts_schema.validate(v)

        return validated_ts

    @pyd.field_validator("outliers")
    def validate_outliers(cls, v: pd.Series) -> pd.Series:
        if v is not None:
            return ts_schema.validate(v)
        return v

    def concatenate(self: T, other: T) -> T:
        """Concatenate two Timeseries objects if they are considered equal."""
        if not isinstance(other, type(self)):
            return NotImplemented

        if self == other:
            combined_ts = pd.concat([self.ts, other.ts]).sort_index()
            combined_ts = combined_ts[~combined_ts.index.duplicated(keep="first")]

            return self.model_copy(update={"ts": combined_ts})
        else:
            raise TimeseriesUnequal()

    def resample(
        self: T,
        freq: Any,
        agg_func: Any = pd.Series.mean,
        **resample_kwargs: Any,
    ) -> T:
        """Resample the timeseries to a new frequency with a specified
        aggregation function.

        Parameters:
            freq (Any): The offset string or object representing target conversion
                (e.g., 'D' for daily, 'W' for weekly).
            agg_func (Any): The aggregation function to apply
                after resampling. Defaults to pd.Series.mean.
            **resample_kwargs: Additional keyword arguments passed to the
                pandas.Series.resample method.

        Returns:
            Updated deep copy of the Timeseries object with the
                resampled timeseries data.
        """
        resampled_ts = self.ts.resample(freq, **resample_kwargs).apply(agg_func)

        return self.model_copy(update={"ts": resampled_ts}, deep=True)

    def transform(
        self: T,
        method: Literal[
            "difference",
            "log",
            "square_root",
            "box_cox",
            "standard_scaler",
            "minmax_scaler",
            "robust_scaler",
            "maxabs_scaler",
        ],
        **transformer_kwargs: Any,
    ) -> T:
        """Transforms the timeseries using the specified method.

        Parameters:
            method (str): The method to use for transformation ('difference',
                'log', 'square_root', 'box_cox', 'standard_scaler',
                'minmax_scaler', 'robust_scaler', 'maxabs_scaler').
            transformer_kwargs: Additional keyword arguments passed to the
                transformer definition. See gensor.preprocessing.

        Returns:
            Updated deep copy of the Timeseries object with the
                transformed timeseries data.
        """

        data, transformation = Transformation(
            self.ts, method, **transformer_kwargs
        ).get_transformation()

        return self.model_copy(
            update={"ts": data, "transformation": transformation}, deep=True
        )

    def detect_outliers(
        self: T,
        method: Literal["iqr", "zscore", "isolation_forest", "lof"],
        rolling: bool = False,
        window: int = 6,
        remove: bool = True,
        **kwargs: Any,
    ) -> T:
        """Detects outliers in the timeseries using the specified method.

        Parameters:
            method (Literal['iqr', 'zscore', 'isolation_forest', 'lof']): The
                method to use for outlier detection.
            rolling (bool): Whether to apply the detection over a rolling window.
            window (int): Size of the rolling window.
            remove (bool): Whether to drop the detected outliers from the
                returned timeseries.
            **kwargs: Additional keyword arguments for OutlierDetection.

        Returns:
            Updated deep copy of the Timeseries object with outliers,
            optionally removed from the original timeseries.
        """
        self.outliers = OutlierDetection(
            self.ts, method, rolling, window, **kwargs
        ).outliers

        if remove:
            filtered_ts = self.ts.drop(self.outliers.index)
            return self.model_copy(update={"ts": filtered_ts}, deep=True)

        else:
            return self

    def mask_with(
        self: T, other: T | pd.Series, mode: Literal["keep", "remove"] = "remove"
    ) -> T:
        """
        Masks records of this timeseries using the index of 'other'.

        Parameters:
            other (Timeseries): Another Timeseries whose indices are used to mask the current one.
            mode (Literal['keep', 'remove']):
                - 'keep': Retains only the overlapping data.
                - 'remove': Removes the overlapping data.

        Returns:
            Timeseries: A new Timeseries object with the filtered data.
        """
        if isinstance(other, pd.Series):
            mask = other
        elif isinstance(other, BaseTimeseries):
            mask = other.ts

        if mode == "keep":
            masked_data = self.ts[self.ts.index.isin(mask.index)]
        elif mode == "remove":
            masked_data = self.ts[~self.ts.index.isin(mask.index)]
        else:
            message = f"Invalid mode: {mode}. Use 'keep' or 'remove'."
            raise ValueError(message)

        return self.model_copy(update={"ts": masked_data}, deep=True)

    def to_sql(self: T, db: DatabaseConnection) -> str:
        """Converts the timeseries to a list of dictionaries and uploads it to the database.

        The Timeseries data is uploaded to the SQL database by using the pandas
        `to_sql` method. Additionally, metadata about the timeseries is stored in the
        'timeseries_metadata' table.

        Parameters:
            db (DatabaseConnection): The database connection object.

        Returns:
            str: A message indicating the number of rows inserted into the database.
        """

        def separate_metadata() -> tuple:
            _core_metadata_fields = {"location", "variable", "unit", "start", "end"}

            core_metadata = self.model_dump(include=_core_metadata_fields)
            core_metadata.update({
                "cls": f"{self.__module__}.{self.__class__.__name__}"
            })

            extra_metadata = self.model_dump(exclude=_core_metadata_fields)

            return core_metadata, extra_metadata

        timestamp_start_fmt = self.start.strftime("%Y%m%d%H%M%S")
        timestamp_end_fmt = self.end.strftime("%Y%m%d%H%M%S")

        # Ensure the index is a pandas DatetimeIndex
        if isinstance(self.ts.index, pd.DatetimeIndex):
            utc_index = (
                self.ts.index.tz_convert("UTC")
                if self.ts.index.tz is not None
                else self.ts.index
            )
        else:
            message = "The index is not a DatetimeIndex and cannot be converted to UTC."
            raise TypeError(message)

        series_as_records = list(
            zip(utc_index.strftime("%Y-%m-%dT%H:%M:%S%z"), self.ts, strict=False)
        )

        # Extra metadata are attributes additional to BaseTimeseries
        core_metadata, extra_metadata = separate_metadata()

        metadata_entry = {
            **core_metadata,
            "extra": extra_metadata,
        }

        created_table = db.get_timeseries_metadata(
            location=self.location,
            variable=self.variable,
            unit=self.unit,
            **extra_metadata,
        )

        with db as con:
            if created_table.empty:
                schema_name = f"{self.location}_{self.variable}_{self.unit}".lower()
                unique_hash = str(uuid.uuid4())[:5]
                schema_name = schema_name + f"_{unique_hash}"

                # Newly created data schema
                schema = db.create_table(schema_name, self.variable)
            else:
                # Existing data schema
                schema_name = created_table["table_name"].iloc[0]
                schema = db.metadata.tables[schema_name]

            metadata_schema = db.metadata.tables["__timeseries_metadata__"]
            metadata_entry.update({"table_name": schema_name})

            if isinstance(schema, Table):
                stmt = sqlite_insert(schema).values(series_as_records)
                stmt = stmt.on_conflict_do_nothing(index_elements=["timestamp"])
                con.execute(stmt)

                metadata_stmt = sqlite_insert(metadata_schema).values(metadata_entry)
                metadata_stmt = metadata_stmt.on_conflict_do_update(
                    index_elements=["table_name"],
                    set_={
                        "start": timestamp_start_fmt,
                        "end": timestamp_end_fmt,
                    },
                )
                con.execute(metadata_stmt)

            # Commit all changes at once
            con.commit()

        return f"{schema_name} table and metadata updated."

    def plot(
        self: T,
        include_outliers: bool = False,
        ax: Axes | None = None,
        plot_kwargs: dict[str, Any] | None = None,
        legend_kwargs: dict[str, Any] | None = None,
    ) -> tuple[Figure, Axes]:
        """Plots the timeseries data.

        Parameters:
            include_outliers (bool): Whether to include outliers in the plot.
            ax (matplotlib.axes.Axes, optional): Matplotlib axes object to plot on.
                If None, a new figure and axes are created.
            plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
            legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

        Returns:
            (fig, ax): Matplotlib figure and axes to allow further customization.
        """

        plot_kwargs = plot_kwargs or {}
        legend_kwargs = legend_kwargs or {}

        if ax is None:
            fig, ax = plt.subplots(figsize=(10, 5))
        else:
            # mypy complained that the get_figure() can return None, but there is no
            # situation here in which this could be the case.
            fig = ax.get_figure()  # type: ignore [assignment]

        ax.plot(
            self.ts.index,
            self.ts,
            label=f"{self.location}",
            **plot_kwargs,
        )

        if include_outliers and self.outliers is not None:
            ax.scatter(
                self.outliers.index, self.outliers, color="red", label="Outliers"
            )
        for label in ax.get_xticklabels():
            label.set_rotation(45)

        ax.set_xlabel("Time")
        ax.set_ylabel(f"{self.variable} ({self.unit})")
        ax.set_title(f"{self.variable.capitalize()} at {self.location}")

        ax.legend(**legend_kwargs)

        return fig, ax
__eq__(other)

Check equality based on location, variable and unit.

Source code in gensor/core/base.py
def __eq__(self, other: object) -> bool:
    """Check equality based on location, sensor, variable, unit and sensor_alt."""
    if not isinstance(other, BaseTimeseries):
        return NotImplemented

    return (
        self.variable == other.variable
        and self.unit == other.unit
        and self.location == other.location
    )
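
For illustration, a minimal sketch of the equality semantics. Field names follow the docstrings on this page; the import path mirrors gensor/core/timeseries.py, and the exact constructor behaviour (e.g. the schema validation of `ts`) is assumed:

import pandas as pd

from gensor.core.timeseries import Timeseries

idx = pd.date_range("2024-01-01", periods=3, freq="h")
a = Timeseries(ts=pd.Series([1.0, 2.0, 3.0], index=idx),
               variable="pressure", unit="mmH2O", location="well_1")
b = Timeseries(ts=pd.Series([9.9], index=idx[:1]),
               variable="pressure", unit="mmH2O", location="well_1")

assert a == b  # equality compares metadata only, never the data itself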
__getattr__(attr)

Delegate attribute access to the underlying pandas Series if it exists.

Special handling is implemented for the pandas indexers (loc, iloc).

Source code in gensor/core/base.py
def __getattr__(self, attr: Any) -> Any:
    """Delegate attribute access to the underlying pandas Series if it exists.

    Special handling is implemented for the pandas indexers (loc, iloc).
    """
    if attr == "loc":
        return TimeseriesIndexer(self, self.ts.loc)

    if attr == "iloc":
        return TimeseriesIndexer(self, self.ts.iloc)

    error_message = f"'{self.__class__.__name__}' object has no attribute '{attr}'"

    if hasattr(self.ts, attr):
        # Return a function to call on the `ts` if it's a method, otherwise return the attribute
        ts_attr = getattr(self.ts, attr)
        if callable(ts_attr):

            def wrapper(*args: Any, **kwargs: Any) -> Any:
                result = ts_attr(*args, **kwargs)
                # If the result is a Series, return a new Timeseries; otherwise, return the result
                if isinstance(result, pd.Series):
                    return self.model_copy(
                        update={"ts": deepcopy(result)}, deep=True
                    )

                return result

            return wrapper
        else:
            return ts_attr
    raise AttributeError(error_message)
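
A short sketch of the delegation behaviour, continuing the example above (`head`, `interpolate` and `mean` are ordinary pandas Series methods):

shortened = a.head(2)         # pd.Series result is wrapped into a new Timeseries
filled = a.interpolate()      # likewise returns a Timeseries
average = a.mean()            # scalar results are returned as-is
window = a.loc["2024-01-01"]  # loc/iloc are wrapped by TimeseriesIndexer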
concatenate(other)

Concatenate two Timeseries objects if they are considered equal.

Source code in gensor/core/base.py
def concatenate(self: T, other: T) -> T:
    """Concatenate two Timeseries objects if they are considered equal."""
    if not isinstance(other, type(self)):
        return NotImplemented

    if self == other:
        combined_ts = pd.concat([self.ts, other.ts]).sort_index()
        combined_ts = combined_ts[~combined_ts.index.duplicated(keep="first")]

        return self.model_copy(update={"ts": combined_ts})
    else:
        raise TimeseriesUnequal()
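
A sketch of merging two overlapping series, continuing the example above (values are hypothetical):

idx2 = pd.date_range("2024-01-01 02:00", periods=3, freq="h")
c = a.model_copy(update={"ts": pd.Series([30.0, 4.0, 5.0], index=idx2)})

merged = a.concatenate(c)
len(merged.ts)  # 5: the overlapping timestamp keeps the value from `a` (keep="first")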
detect_outliers(method, rolling=False, window=6, remove=True, **kwargs)

Detects outliers in the timeseries using the specified method.

Parameters:

Name Type Description Default
method Literal['iqr', 'zscore', 'isolation_forest', 'lof']

The method to use for outlier detection.

required
**kwargs Any

Additional keyword arguments for OutlierDetection.

{}

Returns:

Type Description
T

Updated deep copy of the Timeseries object with outliers,

T

optionally removed from the original timeseries.

Source code in gensor/core/base.py
def detect_outliers(
    self: T,
    method: Literal["iqr", "zscore", "isolation_forest", "lof"],
    rolling: bool = False,
    window: int = 6,
    remove: bool = True,
    **kwargs: Any,
) -> T:
    """Detects outliers in the timeseries using the specified method.

    Parameters:
        method (Literal['iqr', 'zscore', 'isolation_forest', 'lof']): The
            method to use for outlier detection.
        rolling (bool): Whether to apply the detection over a rolling window.
        window (int): Size of the rolling window.
        remove (bool): Whether to drop the detected outliers from the
            returned timeseries.
        **kwargs: Additional keyword arguments for OutlierDetection.

    Returns:
        Updated deep copy of the Timeseries object with outliers,
        optionally removed from the original timeseries.
    """
    self.outliers = OutlierDetection(
        self.ts, method, rolling, window, **kwargs
    ).outliers

    if remove:
        filtered_ts = self.ts.drop(self.outliers.index)
        return self.model_copy(update={"ts": filtered_ts}, deep=True)

    else:
        return self
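
A sketch of both modes, continuing the example above (the method names come from the Literal signature; the detection details live in OutlierDetection):

cleaned = a.detect_outliers(method="zscore")             # outliers dropped from the returned copy
flagged = a.detect_outliers(method="iqr", remove=False)  # data kept, outliers stored in .outliers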
mask_with(other, mode='remove')

Masks records of this timeseries using the index of 'other'.

Parameters:

Name Type Description Default
other Timeseries

Another Timeseries whose indices are used to mask the current one.

required
mode Literal['keep', 'remove']
  • 'keep': Retains only the overlapping data.
  • 'remove': Removes the overlapping data.
'remove'

Returns:

Name Type Description
Timeseries T

A new Timeseries object with the filtered data.

Source code in gensor/core/base.py
def mask_with(
    self: T, other: T | pd.Series, mode: Literal["keep", "remove"] = "remove"
) -> T:
    """
    Masks records of this timeseries using the index of 'other'.

    Parameters:
        other (Timeseries): Another Timeseries whose indices are used to mask the current one.
        mode (Literal['keep', 'remove']):
            - 'keep': Retains only the overlapping data.
            - 'remove': Removes the overlapping data.

    Returns:
        Timeseries: A new Timeseries object with the filtered data.
    """
    if isinstance(other, pd.Series):
        mask = other
    elif isinstance(other, BaseTimeseries):
        mask = other.ts

    if mode == "keep":
        masked_data = self.ts[self.ts.index.isin(mask.index)]
    elif mode == "remove":
        masked_data = self.ts[~self.ts.index.isin(mask.index)]
    else:
        message = f"Invalid mode: {mode}. Use 'keep' or 'remove'."
        raise ValueError(message)

    return self.model_copy(update={"ts": masked_data}, deep=True)
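
A sketch of both masking modes, continuing the example above (any pandas Series, or a Timeseries, can serve as the mask):

reference = a.ts.iloc[:2]                        # plain pandas Series used as the mask
kept = a.mask_with(reference, mode="keep")       # only timestamps present in `reference`
dropped = a.mask_with(reference, mode="remove")  # everything except those timestamps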
plot(include_outliers=False, ax=None, plot_kwargs=None, legend_kwargs=None)

Plots the timeseries data.

Parameters:

Name Type Description Default
include_outliers bool

Whether to include outliers in the plot.

False
ax Axes

Matplotlib axes object to plot on. If None, a new figure and axes are created.

None
plot_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.

None
legend_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

None

Returns:

Type Description
(fig, ax)

Matplotlib figure and axes to allow further customization.

Source code in gensor/core/base.py
def plot(
    self: T,
    include_outliers: bool = False,
    ax: Axes | None = None,
    plot_kwargs: dict[str, Any] | None = None,
    legend_kwargs: dict[str, Any] | None = None,
) -> tuple[Figure, Axes]:
    """Plots the timeseries data.

    Parameters:
        include_outliers (bool): Whether to include outliers in the plot.
        ax (matplotlib.axes.Axes, optional): Matplotlib axes object to plot on.
            If None, a new figure and axes are created.
        plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
        legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

    Returns:
        (fig, ax): Matplotlib figure and axes to allow further customization.
    """

    plot_kwargs = plot_kwargs or {}
    legend_kwargs = legend_kwargs or {}

    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 5))
    else:
        # mypy complained that the get_figure() can return None, but there is no
        # situation here in which this could be the case.
        fig = ax.get_figure()  # type: ignore [assignment]

    ax.plot(
        self.ts.index,
        self.ts,
        label=f"{self.location}",
        **plot_kwargs,
    )

    if include_outliers and self.outliers is not None:
        ax.scatter(
            self.outliers.index, self.outliers, color="red", label="Outliers"
        )
    for label in ax.get_xticklabels():
        label.set_rotation(45)

    ax.set_xlabel("Time")
    ax.set_ylabel(f"{self.variable} ({self.unit})")
    ax.set_title(f"{self.variable.capitalize()} at {self.location}")

    ax.legend(**legend_kwargs)

    return fig, ax
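
A sketch of customizing the plot, continuing the example above (the keyword values are standard matplotlib arguments):

fig, ax = a.plot(plot_kwargs={"color": "tab:blue", "linewidth": 1.0},
                 legend_kwargs={"loc": "upper right"})
fig.savefig("well_1_pressure.png", dpi=150)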
resample(freq, agg_func=pd.Series.mean, **resample_kwargs)

Resample the timeseries to a new frequency with a specified aggregation function.

Parameters:

Name Type Description Default
freq Any

The offset string or object representing target conversion (e.g., 'D' for daily, 'W' for weekly).

required
agg_func Any

The aggregation function to apply after resampling. Defaults to pd.Series.mean.

mean
**resample_kwargs Any

Additional keyword arguments passed to the pandas.Series.resample method.

{}

Returns:

Type Description
T

Updated deep copy of the Timeseries object with the resampled timeseries data.

Source code in gensor/core/base.py
def resample(
    self: T,
    freq: Any,
    agg_func: Any = pd.Series.mean,
    **resample_kwargs: Any,
) -> T:
    """Resample the timeseries to a new frequency with a specified
    aggregation function.

    Parameters:
        freq (Any): The offset string or object representing target conversion
            (e.g., 'D' for daily, 'W' for weekly).
        agg_func (Any): The aggregation function to apply
            after resampling. Defaults to pd.Series.mean.
        **resample_kwargs: Additional keyword arguments passed to the
            pandas.Series.resample method.

    Returns:
        Updated deep copy of the Timeseries object with the
            resampled timeseries data.
    """
    resampled_ts = self.ts.resample(freq, **resample_kwargs).apply(agg_func)

    return self.model_copy(update={"ts": resampled_ts}, deep=True)
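
A sketch of resampling, continuing the example above ('D' is the pandas offset alias for daily):

daily_mean = a.resample("D")                         # defaults to pd.Series.mean
daily_max = a.resample("D", agg_func=pd.Series.max)  # any Series-compatible aggregation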
serialize_timestamps(value)

Serialize pd.Timestamp to a compact YYYYMMDDHHMMSS string.

Source code in gensor/core/base.py
@pyd.field_serializer("start", "end")
def serialize_timestamps(self, value: pd.Timestamp | None) -> str | None:
    """Serialize `pd.Timestamp` to ISO format."""
    return value.strftime("%Y%m%d%H%M%S") if value is not None else None
to_sql(db)

Converts the timeseries to a list of records and uploads it to the database.

The Timeseries data is inserted into the SQL database with SQLAlchemy insert statements, skipping duplicate timestamps. Additionally, metadata about the timeseries is stored in the '__timeseries_metadata__' table.

Parameters:

Name Type Description Default
db DatabaseConnection

The database connection object.

required

Returns:

Name Type Description
str str

A message confirming that the data table and metadata were updated.

Source code in gensor/core/base.py
def to_sql(self: T, db: DatabaseConnection) -> str:
    """Converts the timeseries to a list of dictionaries and uploads it to the database.

    The Timeseries data is uploaded to the SQL database by using the pandas
    `to_sql` method. Additionally, metadata about the timeseries is stored in the
    'timeseries_metadata' table.

    Parameters:
        db (DatabaseConnection): The database connection object.

    Returns:
        str: A message indicating the number of rows inserted into the database.
    """

    def separate_metadata() -> tuple:
        _core_metadata_fields = {"location", "variable", "unit", "start", "end"}

        core_metadata = self.model_dump(include=_core_metadata_fields)
        core_metadata.update({
            "cls": f"{self.__module__}.{self.__class__.__name__}"
        })

        extra_metadata = self.model_dump(exclude=_core_metadata_fields)

        return core_metadata, extra_metadata

    timestamp_start_fmt = self.start.strftime("%Y%m%d%H%M%S")
    timestamp_end_fmt = self.end.strftime("%Y%m%d%H%M%S")

    # Ensure the index is a pandas DatetimeIndex
    if isinstance(self.ts.index, pd.DatetimeIndex):
        utc_index = (
            self.ts.index.tz_convert("UTC")
            if self.ts.index.tz is not None
            else self.ts.index
        )
    else:
        message = "The index is not a DatetimeIndex and cannot be converted to UTC."
        raise TypeError(message)

    series_as_records = list(
        zip(utc_index.strftime("%Y-%m-%dT%H:%M:%S%z"), self.ts, strict=False)
    )

    # Extra metadata are attributes additional to BaseTimeseries
    core_metadata, extra_metadata = separate_metadata()

    metadata_entry = {
        **core_metadata,
        "extra": extra_metadata,
    }

    created_table = db.get_timeseries_metadata(
        location=self.location,
        variable=self.variable,
        unit=self.unit,
        **extra_metadata,
    )

    with db as con:
        if created_table.empty:
            schema_name = f"{self.location}_{self.variable}_{self.unit}".lower()
            unique_hash = str(uuid.uuid4())[:5]
            schema_name = schema_name + f"_{unique_hash}"

            # Newly created data schema
            schema = db.create_table(schema_name, self.variable)
        else:
            # Existing data schema
            schema_name = created_table["table_name"].iloc[0]
            schema = db.metadata.tables[schema_name]

        metadata_schema = db.metadata.tables["__timeseries_metadata__"]
        metadata_entry.update({"table_name": schema_name})

        if isinstance(schema, Table):
            stmt = sqlite_insert(schema).values(series_as_records)
            stmt = stmt.on_conflict_do_nothing(index_elements=["timestamp"])
            con.execute(stmt)

            metadata_stmt = sqlite_insert(metadata_schema).values(metadata_entry)
            metadata_stmt = metadata_stmt.on_conflict_do_update(
                index_elements=["table_name"],
                set_={
                    "start": timestamp_start_fmt,
                    "end": timestamp_end_fmt,
                },
            )
            con.execute(metadata_stmt)

        # Commit all changes at once
        con.commit()

    return f"{schema_name} table and metadata updated."
transform(method, **transformer_kwargs)

Transforms the timeseries using the specified method.

Parameters:

Name Type Description Default
method str

The method to use for transformation ('difference', 'log', 'square_root', 'box_cox', 'standard_scaler', 'minmax_scaler', 'robust_scaler', 'maxabs_scaler').

required
transformer_kwargs Any

Additional keyword arguments passed to the transformer definition. See gensor.preprocessing.

{}

Returns:

Type Description
T

Updated deep copy of the Timeseries object with the transformed timeseries data.

Source code in gensor/core/base.py
def transform(
    self: T,
    method: Literal[
        "difference",
        "log",
        "square_root",
        "box_cox",
        "standard_scaler",
        "minmax_scaler",
        "robust_scaler",
        "maxabs_scaler",
    ],
    **transformer_kwargs: Any,
) -> T:
    """Transforms the timeseries using the specified method.

    Parameters:
        method (str): The method to use for transformation ('difference',
            'log', 'square_root', 'box_cox', 'standard_scaler',
            'minmax_scaler', 'robust_scaler', 'maxabs_scaler').
        transformer_kwargs: Additional keyword arguments passed to the
            transformer definition. See gensor.preprocessing.

    Returns:
        Updated deep copy of the Timeseries object with the
            transformed timeseries data.
    """

    data, transformation = Transformation(
        self.ts, method, **transformer_kwargs
    ).get_transformation()

    return self.model_copy(
        update={"ts": data, "transformation": transformation}, deep=True
    )
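
A sketch of two transformations, continuing the example above (the method names come from the Literal signature):

standardized = a.transform("standard_scaler")
logged = a.transform("log")
logged.transformation  # the fitted transformation object is kept on the copy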

dataset

Dataset

Bases: BaseModel, Generic[T]

Store and operate on a collection of Timeseries.

Attributes:

Name Type Description
timeseries list[Timeseries]

A list of Timeseries objects.

Source code in gensor/core/dataset.py
class Dataset(pyd.BaseModel, Generic[T]):
    """Store and operate on a collection of Timeseries.

    Attributes:
        timeseries (list[Timeseries]): A list of Timeseries objects.
    """

    timeseries: list[T | None] = pyd.Field(default_factory=list)

    def __iter__(self) -> Any:
        """Allows to iterate directly over the dataset."""
        return iter(self.timeseries)

    def __len__(self) -> int:
        """Gives the number of timeseries in the Dataset."""
        return len(self.timeseries)

    def __repr__(self) -> str:
        return f"Dataset({len(self)})"

    def __getitem__(self, index: int) -> T | None:
        """Retrieve a Timeseries object by its index in the dataset.

        !!! warning
            Using index will return the reference to the timeseries. If you need a copy,
            use .filter() instead of Dataset[index]

        Parameters:
            index (int): The index of the Timeseries to retrieve.

        Returns:
            Timeseries: The Timeseries object at the specified index.

        Raises:
            IndexError: If the index is out of range.
        """
        try:
            return self.timeseries[index]
        except IndexError:
            raise IndexOutOfRangeError(index, len(self)) from None

    def get_locations(self) -> list:
        """List all unique locations in the dataset."""
        return [ts.location for ts in self.timeseries if ts is not None]

    def add(self, other: T | list[T] | Dataset) -> Dataset:
        """Appends new Timeseries to the Dataset.

        If an equal Timeseries already exists, merge the new data into the existing
        Timeseries, dropping duplicate timestamps.

        Parameters:
            other (Timeseries): The Timeseries object to add.
        """

        # I need to check for BaseTimeseries instance in the add() method, but also
        # type hint VarType T.
        if isinstance(other, list | Dataset):
            for ts in other:
                if isinstance(ts, BaseTimeseries):
                    self._add_single_timeseries(ts)  # type: ignore[arg-type]

        elif isinstance(other, BaseTimeseries):
            self._add_single_timeseries(other)

        return self

    def _add_single_timeseries(self, ts: T) -> None:
        """Adds a single Timeseries to the Dataset or merges if an equal one exists."""
        for i, existing_ts in enumerate(self.timeseries):
            if existing_ts == ts:
                self.timeseries[i] = existing_ts.concatenate(ts)
                return

        self.timeseries.append(ts)

        return

    def filter(
        self,
        location: str | list | None = None,
        variable: str | list | None = None,
        unit: str | list | None = None,
        **kwargs: dict[str, str | list],
    ) -> T | Dataset:
        """Return a Timeseries or a new Dataset filtered by station, sensor,
        and/or variable.

        Parameters:
            location (Optional[str]): The location name.
            variable (Optional[str]): The variable being measured.
            unit (Optional[str]): Unit of the measurement.
            **kwargs (dict): Attributes of subclassed timeseries used for filtering
                (e.g., sensor, method).

        Returns:
            Timeseries | Dataset: A single Timeseries if exactly one match is found,
                                   or a new Dataset if multiple matches are found.
        """

        def matches(ts: T, attr: str, value: dict[str, str | list]) -> bool | None:
            """Check if the Timeseries object has the attribute and if it matches the value."""
            if not hasattr(ts, attr):
                message = f"'{ts.__class__.__name__}' object has no attribute '{attr}'"
                raise AttributeError(message)
            return getattr(ts, attr) in value

        if isinstance(location, str):
            location = [location]
        if isinstance(variable, str):
            variable = [variable]
        if isinstance(unit, str):
            unit = [unit]
        for key, value in kwargs.items():
            if isinstance(value, str):
                kwargs[key] = [value]

        matching_timeseries = [
            ts
            for ts in self.timeseries
            if ts is not None
            and (location is None or ts.location in location)
            and (variable is None or ts.variable in variable)
            and (unit is None or ts.unit in unit)
            and all(matches(ts, attr, value) for attr, value in kwargs.items())
        ]

        if not matching_timeseries:
            return Dataset()

        if len(matching_timeseries) == 1:
            return matching_timeseries[0].model_copy(deep=True)

        return self.model_copy(update={"timeseries": matching_timeseries})

    def to_sql(self, db: DatabaseConnection) -> None:
        """Save the entire timeseries to a SQLite database.

        Parameters:
            db (DatabaseConnection): SQLite database connection object.
        """
        for ts in self.timeseries:
            if ts:
                ts.to_sql(db)
        return

    def plot(
        self,
        include_outliers: bool = False,
        plot_kwargs: dict[str, Any] | None = None,
        legend_kwargs: dict[str, Any] | None = None,
    ) -> tuple[Figure, Axes]:
        """Plots the timeseries data, grouping by variable type.

        Parameters:
            include_outliers (bool): Whether to include outliers in the plot.
            plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
            legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

        Returns:
            (fig, ax): Matplotlib figure and axes to allow further customization.
        """

        grouped_ts = defaultdict(list)

        for ts in self.timeseries:
            if ts:
                grouped_ts[ts.variable].append(ts)

        num_variables = len(grouped_ts)

        fig, axes = plt.subplots(
            num_variables, 1, figsize=(10, 5 * num_variables), sharex=True
        )

        if num_variables == 1:
            axes = [axes]

        for ax, (variable, ts_list) in zip(axes, grouped_ts.items(), strict=False):
            for ts in ts_list:
                ts.plot(
                    include_outliers=include_outliers,
                    ax=ax,
                    plot_kwargs=plot_kwargs,
                    legend_kwargs=legend_kwargs,
                )

            ax.set_title(f"Timeseries for {variable.capitalize()}")
            ax.set_xlabel("Time")

        fig.tight_layout()
        return fig, axes
__getitem__(index)

Retrieve a Timeseries object by its index in the dataset.

Warning

Using index will return the reference to the timeseries. If you need a copy, use .filter() instead of Dataset[index]

Parameters:

Name Type Description Default
index int

The index of the Timeseries to retrieve.

required

Returns:

Name Type Description
Timeseries T | None

The Timeseries object at the specified index.

Raises:

Type Description
IndexError

If the index is out of range.

Source code in gensor/core/dataset.py
def __getitem__(self, index: int) -> T | None:
    """Retrieve a Timeseries object by its index in the dataset.

    !!! warning
        Using index will return the reference to the timeseries. If you need a copy,
        use .filter() instead of Dataset[index]

    Parameters:
        index (int): The index of the Timeseries to retrieve.

    Returns:
        Timeseries: The Timeseries object at the specified index.

    Raises:
        IndexError: If the index is out of range.
    """
    try:
        return self.timeseries[index]
    except IndexError:
        raise IndexOutOfRangeError(index, len(self)) from None
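
A minimal sketch of the reference semantics the warning above describes (assuming `ds` is a populated Dataset):

ref = ds[0]
ref is ds.timeseries[0]  # True: indexing returns the stored object, not a copy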
__iter__()

Allows iterating directly over the dataset.

Source code in gensor/core/dataset.py
def __iter__(self) -> Any:
    """Allows to iterate directly over the dataset."""
    return iter(self.timeseries)
__len__()

Gives the number of timeseries in the Dataset.

Source code in gensor/core/dataset.py
def __len__(self) -> int:
    """Gives the number of timeseries in the Dataset."""
    return len(self.timeseries)
add(other)

Appends new Timeseries to the Dataset.

If an equal Timeseries already exists, merge the new data into the existing Timeseries, dropping duplicate timestamps.

Parameters:

Name Type Description Default
other Timeseries

The Timeseries object to add.

required
Source code in gensor/core/dataset.py
def add(self, other: T | list[T] | Dataset) -> Dataset:
    """Appends new Timeseries to the Dataset.

    If an equal Timeseries already exists, merge the new data into the existing
    Timeseries, dropping duplicate timestamps.

    Parameters:
        other (Timeseries): The Timeseries object to add.
    """

    # I need to check for BaseTimeseries instance in the add() method, but also
    # type hint VarType T.
    if isinstance(other, list | Dataset):
        for ts in other:
            if isinstance(ts, BaseTimeseries):
                self._add_single_timeseries(ts)  # type: ignore[arg-type]

    elif isinstance(other, BaseTimeseries):
        self._add_single_timeseries(other)

    return self
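
A sketch of adding and merging, reusing the Timeseries examples from earlier on this page (the import path mirrors gensor/core/dataset.py):

from gensor.core.dataset import Dataset

ds = Dataset()
ds.add(a)
ds.add(c)  # equal metadata, so the data is merged into the existing entry
len(ds)    # 1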
filter(location=None, variable=None, unit=None, **kwargs)

Return a Timeseries or a new Dataset filtered by location, variable, and/or unit.

Parameters:

Name Type Description Default
location Optional[str]

The location name.

None
variable Optional[str]

The variable being measured.

None
unit Optional[str]

Unit of the measurement.

None
**kwargs dict

Attributes of subclassed timeseries used for filtering (e.g., sensor, method).

{}

Returns:

Type Description
T | Dataset

Timeseries | Dataset: A single Timeseries if exactly one match is found, or a new Dataset if multiple matches are found.

Source code in gensor/core/dataset.py
def filter(
    self,
    location: str | list | None = None,
    variable: str | list | None = None,
    unit: str | list | None = None,
    **kwargs: dict[str, str | list],
) -> T | Dataset:
    """Return a Timeseries or a new Dataset filtered by station, sensor,
    and/or variable.

    Parameters:
        location (Optional[str]): The location name.
        variable (Optional[str]): The variable being measured.
        unit (Optional[str]): Unit of the measurement.
        **kwargs (dict): Attributes of subclassed timeseries used for filtering
            (e.g., sensor, method).

    Returns:
        Timeseries | Dataset: A single Timeseries if exactly one match is found,
                               or a new Dataset if multiple matches are found.
    """

    def matches(ts: T, attr: str, value: dict[str, str | list]) -> bool | None:
        """Check if the Timeseries object has the attribute and if it matches the value."""
        if not hasattr(ts, attr):
            message = f"'{ts.__class__.__name__}' object has no attribute '{attr}'"
            raise AttributeError(message)
        return getattr(ts, attr) in value

    if isinstance(location, str):
        location = [location]
    if isinstance(variable, str):
        variable = [variable]
    if isinstance(unit, str):
        unit = [unit]
    for key, value in kwargs.items():
        if isinstance(value, str):
            kwargs[key] = [value]

    matching_timeseries = [
        ts
        for ts in self.timeseries
        if ts is not None
        and (location is None or ts.location in location)
        and (variable is None or ts.variable in variable)
        and (unit is None or ts.unit in unit)
        and all(matches(ts, attr, value) for attr, value in kwargs.items())
    ]

    if not matching_timeseries:
        return Dataset()

    if len(matching_timeseries) == 1:
        return matching_timeseries[0].model_copy(deep=True)

    return self.model_copy(update={"timeseries": matching_timeseries})
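
A sketch of the possible return shapes, continuing the Dataset example above:

one = ds.filter(location="well_1")    # exactly one match: a deep-copied Timeseries
none = ds.filter(location="well_99")  # no match: an empty Dataset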
get_locations()

List all unique locations in the dataset.

Source code in gensor/core/dataset.py
def get_locations(self) -> list:
    """List all unique locations in the dataset."""
    return [ts.location for ts in self.timeseries if ts is not None]
plot(include_outliers=False, plot_kwargs=None, legend_kwargs=None)

Plots the timeseries data, grouping by variable type.

Parameters:

Name Type Description Default
include_outliers bool

Whether to include outliers in the plot.

False
plot_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.

None
legend_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

None

Returns:

Type Description
(fig, ax)

Matplotlib figure and axes to allow further customization.

Source code in gensor/core/dataset.py
def plot(
    self,
    include_outliers: bool = False,
    plot_kwargs: dict[str, Any] | None = None,
    legend_kwargs: dict[str, Any] | None = None,
) -> tuple[Figure, Axes]:
    """Plots the timeseries data, grouping by variable type.

    Parameters:
        include_outliers (bool): Whether to include outliers in the plot.
        plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
        legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

    Returns:
        (fig, ax): Matplotlib figure and axes to allow further customization.
    """

    grouped_ts = defaultdict(list)

    for ts in self.timeseries:
        if ts:
            grouped_ts[ts.variable].append(ts)

    num_variables = len(grouped_ts)

    fig, axes = plt.subplots(
        num_variables, 1, figsize=(10, 5 * num_variables), sharex=True
    )

    if num_variables == 1:
        axes = [axes]

    for ax, (variable, ts_list) in zip(axes, grouped_ts.items(), strict=False):
        for ts in ts_list:
            ts.plot(
                include_outliers=include_outliers,
                ax=ax,
                plot_kwargs=plot_kwargs,
                legend_kwargs=legend_kwargs,
            )

        ax.set_title(f"Timeseries for {variable.capitalize()}")
        ax.set_xlabel("Time")

    fig.tight_layout()
    return fig, axes
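
A sketch of plotting a whole dataset, continuing the example above (one subplot per measured variable):

fig, axes = ds.plot(include_outliers=True)
fig.savefig("dataset_overview.png", dpi=150)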
to_sql(db)

Save every timeseries in the dataset to a SQLite database.

Parameters:

Name Type Description Default
db DatabaseConnection

SQLite database connection object.

required
Source code in gensor/core/dataset.py
def to_sql(self, db: DatabaseConnection) -> None:
    """Save the entire timeseries to a SQLite database.

    Parameters:
        db (DatabaseConnection): SQLite database connection object.
    """
    for ts in self.timeseries:
        if ts:
            ts.to_sql(db)
    return

indexer

TimeseriesIndexer

A wrapper for the Pandas indexers (e.g., loc, iloc) to return Timeseries objects.

Source code in gensor/core/indexer.py
class TimeseriesIndexer:
    """A wrapper for the Pandas indexers (e.g., loc, iloc) to return Timeseries objects."""

    # `indexer` is typed as Any to silence mypy, since BaseIndexer is normally not
    # indexable. The same goes for `parent`: it should always be a Timeseries, but
    # typing it that way would require a circular import just for the type hints.

    def __init__(self, parent: Any, indexer: Any):
        self.parent = parent
        self.indexer = indexer

    def __getitem__(self, key: str) -> Any:
        """Allows using the indexer (e.g., loc) and wraps the result in the parent Timeseries."""

        result = self.indexer[key]

        if isinstance(result, pd.Series):
            return self.parent.model_copy(update={"ts": result}, deep=False)

        if isinstance(result, int | float | str | pd.Timestamp | np.float64):
            return result

        message = f"Expected pd.Series, but got {type(result)} instead."
        raise TypeError(message)

    def __setitem__(self, key: str, value: Any) -> None:
        """Allows setting values directly using the indexer (e.g., loc, iloc)."""

        self.indexer[key] = value
__getitem__(key)

Allows using the indexer (e.g., loc) and wraps the result in the parent Timeseries.

Source code in gensor/core/indexer.py
def __getitem__(self, key: str) -> Any:
    """Allows using the indexer (e.g., loc) and wraps the result in the parent Timeseries."""

    result = self.indexer[key]

    if isinstance(result, pd.Series):
        return self.parent.model_copy(update={"ts": result}, deep=False)

    if isinstance(result, int | float | str | pd.Timestamp | np.float64):
        return result

    message = f"Expected pd.Series, but got {type(result)} instead."
    raise TypeError(message)
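
A sketch of the indexer behaviour, continuing the Timeseries example from earlier on this page:

january_1st = a.loc["2024-01-01"]  # pd.Series slice is wrapped back into a Timeseries
first_value = a.iloc[0]            # scalar results pass through unchanged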
__setitem__(key, value)

Allows setting values directly using the indexer (e.g., loc, iloc).

Source code in gensor/core/indexer.py
def __setitem__(self, key: str, value: Any) -> None:
    """Allows setting values directly using the indexer (e.g., loc, iloc)."""

    self.indexer[key] = value

timeseries

Timeseries

Bases: BaseTimeseries

Timeseries of groundwater sensor data.

Attributes:

Name Type Description
ts Series

The timeseries data.

variable Literal['temperature', 'pressure', 'conductivity', 'flux']

The type of the measurement.

unit Literal['degC', 'mmH2O', 'mS/cm', 'm/s']

The unit of the measurement.

sensor str

The serial number of the sensor.

sensor_alt float

Altitude of the sensor (necessary to compute groundwater levels).

Source code in gensor/core/timeseries.py
class Timeseries(BaseTimeseries):
    """Timeseries of groundwater sensor data.

    Attributes:
        ts (pd.Series): The timeseries data.
        variable (Literal['temperature', 'pressure', 'conductivity', 'flux']):
            The type of the measurement.
        unit (Literal['degC', 'mmH2O', 'mS/cm', 'm/s']): The unit of
            the measurement.
        sensor (str): The serial number of the sensor.
        sensor_alt (float): Altitude of the sensor (necessary to compute groundwater levels).
    """

    model_config = pyd.ConfigDict(
        arbitrary_types_allowed=True, validate_assignment=True
    )

    sensor: str | None = None
    sensor_alt: float | None = None

    def __eq__(self, other: object) -> bool:
        """Check equality based on location, sensor, variable, unit and sensor_alt."""
        if not isinstance(other, Timeseries):
            return NotImplemented

        if not super().__eq__(other):
            return False

        return self.sensor == other.sensor and self.sensor_alt == other.sensor_alt

    def plot(
        self,
        include_outliers: bool = False,
        ax: Axes | None = None,
        plot_kwargs: dict[str, Any] | None = None,
        legend_kwargs: dict[str, Any] | None = None,
    ) -> tuple[Figure, Axes]:
        """Plots the timeseries data.

        Parameters:
            include_outliers (bool): Whether to include outliers in the plot.
            ax (matplotlib.axes.Axes, optional): Matplotlib axes object to plot on.
                If None, a new figure and axes are created.
            plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
            legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

        Returns:
            (fig, ax): Matplotlib figure and axes to allow further customization.
        """
        fig, ax = super().plot(
            include_outliers=include_outliers,
            ax=ax,
            plot_kwargs=plot_kwargs,
            legend_kwargs=legend_kwargs,
        )

        ax.set_title(f"{self.variable.capitalize()} at {self.location} ({self.sensor})")

        return fig, ax
__eq__(other)

Check equality based on location, sensor, variable, unit and sensor_alt.

Source code in gensor/core/timeseries.py
def __eq__(self, other: object) -> bool:
    """Check equality based on location, sensor, variable, unit and sensor_alt."""
    if not isinstance(other, Timeseries):
        return NotImplemented

    if not super().__eq__(other):
        return False

    return self.sensor == other.sensor and self.sensor_alt == other.sensor_alt
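
A sketch of the stricter subclass equality, continuing the example from earlier on this page (the sensor serial is hypothetical):

d = a.model_copy(update={"sensor": "XY1234"})
a == d  # False: the subclass also compares sensor and sensor_alt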
plot(include_outliers=False, ax=None, plot_kwargs=None, legend_kwargs=None)

Plots the timeseries data.

Parameters:

Name Type Description Default
include_outliers bool

Whether to include outliers in the plot.

False
ax Axes

Matplotlib axes object to plot on. If None, a new figure and axes are created.

None
plot_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.

None
legend_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

None

Returns:

Type Description
(fig, ax)

Matplotlib figure and axes to allow further customization.

Source code in gensor/core/timeseries.py
def plot(
    self,
    include_outliers: bool = False,
    ax: Axes | None = None,
    plot_kwargs: dict[str, Any] | None = None,
    legend_kwargs: dict[str, Any] | None = None,
) -> tuple[Figure, Axes]:
    """Plots the timeseries data.

    Parameters:
        include_outliers (bool): Whether to include outliers in the plot.
        ax (matplotlib.axes.Axes, optional): Matplotlib axes object to plot on.
            If None, a new figure and axes are created.
        plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
        legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

    Returns:
        (fig, ax): Matplotlib figure and axes to allow further customization.
    """
    fig, ax = super().plot(
        include_outliers=include_outliers,
        ax=ax,
        plot_kwargs=plot_kwargs,
        legend_kwargs=legend_kwargs,
    )

    ax.set_title(f"{self.variable.capitalize()} at {self.location} ({self.sensor})")

    return fig, ax

db

DB

Module handling the database connection when saving to and loading from a SQLite database.

Modules:

connection.py

DatabaseConnection

Bases: BaseModel

Database connection object. If no database exists at the specified path, it will be created. If no database is specified, an in-memory database will be used.

Attributes:

Name Type Description
metadata MetaData

SQLAlchemy metadata object.

db_directory Path

Path to the database to connect to.

db_name str

Name for the database to connect to.

engine Engine | None

SQLAlchemy Engine instance.

Source code in gensor/db/connection.py
class DatabaseConnection(pyd.BaseModel):
    """Database connection object.
    If no database exists at the specified path, it will be created.
    If no database is specified, an in-memory database will be used.

    Attributes:
        metadata (MetaData): SQLAlchemy metadata object.
        db_directory (Path): Path to the database to connect to.
        db_name (str): Name for the database to connect to.
        engine (Engine | None): SQLAlchemy Engine instance.
    """

    model_config = pyd.ConfigDict(
        arbitrary_types_allowed=True, validate_assignment=True
    )

    metadata: MetaData = MetaData()
    db_directory: Path = Path.cwd()
    db_name: str = "gensor.db"
    engine: Engine | None = None

    def _verify_path(self) -> str:
        """Verify database path."""

        if not self.db_directory.exists():
            raise DatabaseNotFound()
        return f"sqlite:///{self.db_directory}/{self.db_name}"

    def connect(self) -> Connection:
        """Connect to the database and initialize the engine.
        If engine is None > create it with verified path > reflect.
        After connecting, ensure the timeseries_metadata table is present.
        """
        if self.engine is None:
            sqlite_path = self._verify_path()
            self.engine = create_engine(sqlite_path)

        connection = self.engine.connect()

        self.create_metadata()

        return connection

    def dispose(self) -> None:
        """Dispose of the engine, closing all connections."""
        if self.metadata:
            self.metadata.clear()
        if self.engine:
            self.engine.dispose()

    def __enter__(self) -> Connection:
        """Enable usage in a `with` block by returning the engine."""
        con = self.connect()
        if self.engine:
            self.metadata.reflect(bind=self.engine)
        return con

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Dispose of the engine when exiting the `with` block."""
        self.dispose()

    def get_timeseries_metadata(
        self,
        location: str | None = None,
        variable: str | None = None,
        unit: str | None = None,
        **kwargs: dict,
    ) -> pd.DataFrame:
        """
        List timeseries available in the database.

        Parameters:
            location (str): Location attribute to match.
            variable (str): Variable attribute to match.
            unit (str): Unit attribute to match.
            **kwargs: Additional filters. Must match the attributes of the
                Timeseries instance user is trying to retrieve.

        Returns:
            pd.DataFrame: Metadata rows matching the filters, or an empty DataFrame if none match.
        """
        with self as con:
            if "__timeseries_metadata__" not in self.metadata.tables:
                logger.info("The metadata table does not exist in this database.")
                return pd.DataFrame()

            metadata_table = self.metadata.tables["__timeseries_metadata__"]

            base_filters = []

            if location is not None:
                base_filters.append(metadata_table.c.location.ilike(location))
            if variable is not None:
                base_filters.append(metadata_table.c.variable.ilike(variable))
            if unit is not None:
                base_filters.append(metadata_table.c.unit.ilike(unit))

            extra_filters = [
                func.json_extract(metadata_table.c.extra, f"$.{k}").ilike(v)
                for k, v in kwargs.items()
                if v is not None
            ]

            # True in and_(True, *args) fixes the FutureWarning about disallowing
            # empty filter clauses in the future.
            query = metadata_table.select().where(
                and_(True, *base_filters, *extra_filters)
            )

            result = con.execute(query).fetchall()

            return pd.DataFrame(result).set_index("id") if result else pd.DataFrame()

    def create_metadata(self) -> Table | None:
        """Create a metadata table if it doesn't exist yet and store ts metadata."""

        metadata_table = Table(
            "__timeseries_metadata__",
            self.metadata,
            Column("id", Integer, primary_key=True),
            Column("table_name", String, unique=True),
            Column("location", String),
            Column("variable", String),
            Column("unit", String),
            Column("start", String, nullable=True),
            Column("end", String, nullable=True),
            Column("extra", JSON, nullable=True),
            Column("cls", String, nullable=False),
        )

        if self.engine:
            metadata_table.create(self.engine, checkfirst=True)
            self.metadata.reflect(bind=self.engine)
            return metadata_table
        else:
            logger.info("Engine does not exist.")
            return None

    def create_table(self, schema_name: str, column_name: str) -> Table | None:
        """Create a table in the database.

        Schema name is a string representing the location, sensor, variable measured and
        unit of measurement. This is a way of preserving the metadata of the Timeseries.
        The index is always `timestamp` and the column name is dynamically created from
        the measured variable.
        """

        if schema_name in self.metadata.tables:
            return self.metadata.tables[schema_name]

        ts_table = Table(
            schema_name,
            self.metadata,
            Column("timestamp", String, primary_key=True),
            Column(column_name, Float),
            info={},
        )

        if self.engine:
            ts_table.create(self.engine, checkfirst=True)
            self.metadata.reflect(bind=self.engine)
            return ts_table
        else:
            logger.info("Engine does not exist.")
            return None
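
A sketch of basic usage (the paths are hypothetical; the directory must already exist, otherwise DatabaseNotFound is raised):

from pathlib import Path

from gensor.db.connection import DatabaseConnection

db = DatabaseConnection(db_directory=Path("data"), db_name="sensors.db")
available = db.get_timeseries_metadata(variable="pressure")  # opens and closes its own connection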

__enter__()

Enable usage in a with block by returning the engine.

Source code in gensor/db/connection.py
def __enter__(self) -> Connection:
    """Enable usage in a `with` block by returning the engine."""
    con = self.connect()
    if self.engine:
        self.metadata.reflect(bind=self.engine)
    return con

__exit__(exc_type, exc_val, exc_tb)

Dispose of the engine when exiting the with block.

Source code in gensor/db/connection.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Dispose of the engine when exiting the `with` block."""
    self.dispose()

connect()

Connect to the database and initialize the engine. If the engine is None, create it from the verified path and reflect the schema. After connecting, ensure the timeseries metadata table is present.

Source code in gensor/db/connection.py
def connect(self) -> Connection:
    """Connect to the database and initialize the engine.
    If the engine is None, create it with the verified path, then reflect.
    After connecting, ensure the timeseries_metadata table is present.
    """
    if self.engine is None:
        sqlite_path = self._verify_path()
        self.engine = create_engine(sqlite_path)

    connection = self.engine.connect()

    self.create_metadata()

    return connection
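
A minimal usage sketch of connect() and dispose(), assuming the module path shown above (gensor/db/connection.py) and that the target directory exists:

from pathlib import Path

from gensor.db.connection import DatabaseConnection

db = DatabaseConnection(db_directory=Path("."), db_name="gensor.db")
con = db.connect()  # creates the engine on first use and ensures the metadata table
# ... run queries against con ...
db.dispose()        # close all connections when done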

create_metadata()

Create a metadata table if it doesn't exist yet and store ts metadata.

Source code in gensor/db/connection.py
def create_metadata(self) -> Table | None:
    """Create a metadata table if it doesn't exist yet and store ts metadata."""

    metadata_table = Table(
        "__timeseries_metadata__",
        self.metadata,
        Column("id", Integer, primary_key=True),
        Column("table_name", String, unique=True),
        Column("location", String),
        Column("variable", String),
        Column("unit", String),
        Column("start", String, nullable=True),
        Column("end", String, nullable=True),
        Column("extra", JSON, nullable=True),
        Column("cls", String, nullable=False),
    )

    if self.engine:
        metadata_table.create(self.engine, checkfirst=True)
        self.metadata.reflect(bind=self.engine)
        return metadata_table
    else:
        logger.info("Engine does not exist.")
        return None

create_table(schema_name, column_name)

Create a table in the database.

Schema name is a string representing the location, sensor, variable measured and unit of measurement. This is a way of preserving the metadata of the Timeseries. The index is always timestamp and the column name is dynamically created from the measured variable.

Source code in gensor/db/connection.py
def create_table(self, schema_name: str, column_name: str) -> Table | None:
    """Create a table in the database.

    Schema name is a string representing the location, sensor, variable measured and
    unit of measurement. This is a way of preserving the metadata of the Timeseries.
    The index is always `timestamp` and the column name is dynamically created from
    the measured variable.
    """

    if schema_name in self.metadata.tables:
        return self.metadata.tables[schema_name]

    ts_table = Table(
        schema_name,
        self.metadata,
        Column("timestamp", String, primary_key=True),
        Column(column_name, Float),
        info={},
    )

    if self.engine:
        ts_table.create(self.engine, checkfirst=True)
        self.metadata.reflect(bind=self.engine)
        return ts_table
    else:
        logger.info("Engine does not exist.")
        return None
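
A hedged sketch of creating a data table, reusing the db instance from the connect() example above; the schema name below is hypothetical and merely follows the location/sensor/variable/unit convention described in the docstring:

table = db.create_table("stationA_AB1234_pressure_cmh2o", "pressure")
# returns the existing Table if the schema name is already known to the metadata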

dispose()

Dispose of the engine, closing all connections.

Source code in gensor/db/connection.py
def dispose(self) -> None:
    """Dispose of the engine, closing all connections."""
    if self.metadata:
        self.metadata.clear()
    if self.engine:
        self.engine.dispose()

get_timeseries_metadata(location=None, variable=None, unit=None, **kwargs)

List timeseries available in the database.

Parameters:

Name Type Description Default
location str

Location attribute to match.

None
variable str

Variable attribute to match.

None
unit str

Unit attribute to match.

None
**kwargs dict

Additional filters. Must match the attributes of the Timeseries instance the user is trying to retrieve.

{}

Returns:

Type Description
DataFrame

pd.DataFrame: Metadata of the matching timeseries, or an empty DataFrame if none match.

Source code in gensor/db/connection.py
def get_timeseries_metadata(
    self,
    location: str | None = None,
    variable: str | None = None,
    unit: str | None = None,
    **kwargs: dict,
) -> pd.DataFrame:
    """
    List timeseries available in the database.

    Parameters:
        location (str): Location attribute to match.
        variable (str): Variable attribute to match.
        unit (str): Unit attribute to match.
        **kwargs: Additional filters. Must match the attributes of the
            Timeseries instance the user is trying to retrieve.

    Returns:
        pd.DataFrame: Metadata of the matching timeseries, or an empty DataFrame if none match.
    """
    with self as con:
        if "__timeseries_metadata__" not in self.metadata.tables:
            logger.info("The metadata table does not exist in this database.")
            return pd.DataFrame()

        metadata_table = self.metadata.tables["__timeseries_metadata__"]

        base_filters = []

        if location is not None:
            base_filters.append(metadata_table.c.location.ilike(location))
        if variable is not None:
            base_filters.append(metadata_table.c.variable.ilike(variable))
        if unit is not None:
            base_filters.append(metadata_table.c.unit.ilike(unit))

        extra_filters = [
            func.json_extract(metadata_table.c.extra, f"$.{k}").ilike(v)
            for k, v in kwargs.items()
            if v is not None
        ]

        # True in and_(True, *args) avoids the FutureWarning about
        # disallowing empty filters in the future.
        query = metadata_table.select().where(
            and_(True, *base_filters, *extra_filters)
        )

        result = con.execute(query).fetchall()

        return pd.DataFrame(result).set_index("id") if result else pd.DataFrame()
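
Because the filters use ilike(), SQL wildcards are accepted in the filter strings. A small sketch, reusing the db instance from the examples above:

meta = db.get_timeseries_metadata(location="station%", variable="pressure")
if not meta.empty:
    print(meta["table_name"].tolist())  # tables whose metadata matched the filters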

connection

Module defining database connection object.

Classes:

Name Description
DatabaseConnection

Database connection object

DatabaseConnection

Bases: BaseModel

Database connection object. If no database exists at the specified path, it will be created. If no database is specified, an in-memory database will be used.

Attributes:

Name Type Description
metadata MetaData

SQLAlchemy metadata object.

db_directory Path

Path to the database to connect to.

db_name str

Name for the database to connect to.

engine Engine | None

SQLAlchemy Engine instance.

Source code in gensor/db/connection.py
class DatabaseConnection(pyd.BaseModel):
    """Database connection object.
    If no database exists at the specified path, it will be created.
    If no database is specified, an in-memory database will be used.

    Attributes:
        metadata (MetaData): SQLAlchemy metadata object.
        db_directory (Path): Path to the database to connect to.
        db_name (str): Name for the database to connect to.
        engine (Engine | None): SQLAlchemy Engine instance.
    """

    model_config = pyd.ConfigDict(
        arbitrary_types_allowed=True, validate_assignment=True
    )

    metadata: MetaData = MetaData()
    db_directory: Path = Path.cwd()
    db_name: str = "gensor.db"
    engine: Engine | None = None

    def _verify_path(self) -> str:
        """Verify database path."""

        if not self.db_directory.exists():
            raise DatabaseNotFound()
        return f"sqlite:///{self.db_directory}/{self.db_name}"

    def connect(self) -> Connection:
        """Connect to the database and initialize the engine.
        If the engine is None, create it with the verified path, then reflect.
        After connecting, ensure the timeseries_metadata table is present.
        """
        if self.engine is None:
            sqlite_path = self._verify_path()
            self.engine = create_engine(sqlite_path)

        connection = self.engine.connect()

        self.create_metadata()

        return connection

    def dispose(self) -> None:
        """Dispose of the engine, closing all connections."""
        if self.metadata:
            self.metadata.clear()
        if self.engine:
            self.engine.dispose()

    def __enter__(self) -> Connection:
        """Enable usage in a `with` block by returning the engine."""
        con = self.connect()
        if self.engine:
            self.metadata.reflect(bind=self.engine)
        return con

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Dispose of the engine when exiting the `with` block."""
        self.dispose()

    def get_timeseries_metadata(
        self,
        location: str | None = None,
        variable: str | None = None,
        unit: str | None = None,
        **kwargs: dict,
    ) -> pd.DataFrame:
        """
        List timeseries available in the database.

        Parameters:
            location (str): Location attribute to match.
            variable (str): Variable attribute to match.
            unit (str): Unit attribute to match.
            **kwargs: Additional filters. Must match the attributes of the
                Timeseries instance the user is trying to retrieve.

        Returns:
            pd.DataFrame: Metadata of the matching timeseries, or an empty DataFrame if none match.
        """
        with self as con:
            if "__timeseries_metadata__" not in self.metadata.tables:
                logger.info("The metadata table does not exist in this database.")
                return pd.DataFrame()

            metadata_table = self.metadata.tables["__timeseries_metadata__"]

            base_filters = []

            if location is not None:
                base_filters.append(metadata_table.c.location.ilike(location))
            if variable is not None:
                base_filters.append(metadata_table.c.variable.ilike(variable))
            if unit is not None:
                base_filters.append(metadata_table.c.unit.ilike(unit))

            extra_filters = [
                func.json_extract(metadata_table.c.extra, f"$.{k}").ilike(v)
                for k, v in kwargs.items()
                if v is not None
            ]

            # True in and_(True, *args) avoids the FutureWarning about
            # disallowing empty filters in the future.
            query = metadata_table.select().where(
                and_(True, *base_filters, *extra_filters)
            )

            result = con.execute(query).fetchall()

            return pd.DataFrame(result).set_index("id") if result else pd.DataFrame()

    def create_metadata(self) -> Table | None:
        """Create a metadata table if it doesn't exist yet and store ts metadata."""

        metadata_table = Table(
            "__timeseries_metadata__",
            self.metadata,
            Column("id", Integer, primary_key=True),
            Column("table_name", String, unique=True),
            Column("location", String),
            Column("variable", String),
            Column("unit", String),
            Column("start", String, nullable=True),
            Column("end", String, nullable=True),
            Column("extra", JSON, nullable=True),
            Column("cls", String, nullable=False),
        )

        if self.engine:
            metadata_table.create(self.engine, checkfirst=True)
            self.metadata.reflect(bind=self.engine)
            return metadata_table
        else:
            logger.info("Engine does not exist.")
            return None

    def create_table(self, schema_name: str, column_name: str) -> Table | None:
        """Create a table in the database.

        Schema name is a string representing the location, sensor, variable measured and
        unit of measurement. This is a way of preserving the metadata of the Timeseries.
        The index is always `timestamp` and the column name is dynamically created from
        the measured variable.
        """

        if schema_name in self.metadata.tables:
            return self.metadata.tables[schema_name]

        ts_table = Table(
            schema_name,
            self.metadata,
            Column("timestamp", String, primary_key=True),
            Column(column_name, Float),
            info={},
        )

        if self.engine:
            ts_table.create(self.engine, checkfirst=True)
            self.metadata.reflect(bind=self.engine)
            return ts_table
        else:
            logger.info("Engine does not exist.")
            return None
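
A short sketch of the context-manager usage, assuming the module path shown above:

from gensor.db.connection import DatabaseConnection

db = DatabaseConnection()  # defaults: gensor.db in the current working directory
with db as con:            # __enter__ connects and reflects the metadata
    print(list(db.metadata.tables))  # includes "__timeseries_metadata__"
# __exit__ has disposed of the engine at this point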
__enter__()

Enable usage in a with block by returning a connection.

Source code in gensor/db/connection.py
def __enter__(self) -> Connection:
    """Enable usage in a `with` block by returning the engine."""
    con = self.connect()
    if self.engine:
        self.metadata.reflect(bind=self.engine)
    return con
__exit__(exc_type, exc_val, exc_tb)

Dispose of the engine when exiting the with block.

Source code in gensor/db/connection.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Dispose of the engine when exiting the `with` block."""
    self.dispose()
connect()

Connect to the database and initialize the engine. If the engine is None, create it with the verified path, then reflect. After connecting, ensure the timeseries_metadata table is present.

Source code in gensor/db/connection.py
def connect(self) -> Connection:
    """Connect to the database and initialize the engine.
    If the engine is None, create it with the verified path, then reflect.
    After connecting, ensure the timeseries_metadata table is present.
    """
    if self.engine is None:
        sqlite_path = self._verify_path()
        self.engine = create_engine(sqlite_path)

    connection = self.engine.connect()

    self.create_metadata()

    return connection
create_metadata()

Create a metadata table if it doesn't exist yet and store ts metadata.

Source code in gensor/db/connection.py
def create_metadata(self) -> Table | None:
    """Create a metadata table if it doesn't exist yet and store ts metadata."""

    metadata_table = Table(
        "__timeseries_metadata__",
        self.metadata,
        Column("id", Integer, primary_key=True),
        Column("table_name", String, unique=True),
        Column("location", String),
        Column("variable", String),
        Column("unit", String),
        Column("start", String, nullable=True),
        Column("end", String, nullable=True),
        Column("extra", JSON, nullable=True),
        Column("cls", String, nullable=False),
    )

    if self.engine:
        metadata_table.create(self.engine, checkfirst=True)
        self.metadata.reflect(bind=self.engine)
        return metadata_table
    else:
        logger.info("Engine does not exist.")
        return None
create_table(schema_name, column_name)

Create a table in the database.

Schema name is a string representing the location, sensor, variable measured and unit of measurement. This is a way of preserving the metadata of the Timeseries. The index is always timestamp and the column name is dynamically created from the measured variable.

Source code in gensor/db/connection.py
def create_table(self, schema_name: str, column_name: str) -> Table | None:
    """Create a table in the database.

    Schema name is a string representing the location, sensor, variable measured and
    unit of measurement. This is a way of preserving the metadata of the Timeseries.
    The index is always `timestamp` and the column name is dynamically created from
    the measured variable.
    """

    if schema_name in self.metadata.tables:
        return self.metadata.tables[schema_name]

    ts_table = Table(
        schema_name,
        self.metadata,
        Column("timestamp", String, primary_key=True),
        Column(column_name, Float),
        info={},
    )

    if self.engine:
        ts_table.create(self.engine, checkfirst=True)
        self.metadata.reflect(bind=self.engine)
        return ts_table
    else:
        logger.info("Engine does not exist.")
        return None
dispose()

Dispose of the engine, closing all connections.

Source code in gensor/db/connection.py
def dispose(self) -> None:
    """Dispose of the engine, closing all connections."""
    if self.metadata:
        self.metadata.clear()
    if self.engine:
        self.engine.dispose()
get_timeseries_metadata(location=None, variable=None, unit=None, **kwargs)

List timeseries available in the database.

Parameters:

Name Type Description Default
location str

Location attribute to match.

None
variable str

Variable attribute to match.

None
unit str

Unit attribute to match.

None
**kwargs dict

Additional filters. Must match the attributes of the Timeseries instance the user is trying to retrieve.

{}

Returns:

Type Description
DataFrame

pd.DataFrame: Metadata of the matching timeseries, or an empty DataFrame if none match.

Source code in gensor/db/connection.py
def get_timeseries_metadata(
    self,
    location: str | None = None,
    variable: str | None = None,
    unit: str | None = None,
    **kwargs: dict,
) -> pd.DataFrame:
    """
    List timeseries available in the database.

    Parameters:
        location (str): Location attribute to match.
        variable (str): Variable attribute to match.
        unit (str): Unit attribute to match.
        **kwargs: Additional filters. Must match the attributes of the
            Timeseries instance the user is trying to retrieve.

    Returns:
        pd.DataFrame: Metadata of the matching timeseries, or an empty DataFrame if none match.
    """
    with self as con:
        if "__timeseries_metadata__" not in self.metadata.tables:
            logger.info("The metadata table does not exist in this database.")
            return pd.DataFrame()

        metadata_table = self.metadata.tables["__timeseries_metadata__"]

        base_filters = []

        if location is not None:
            base_filters.append(metadata_table.c.location.ilike(location))
        if variable is not None:
            base_filters.append(metadata_table.c.variable.ilike(variable))
        if unit is not None:
            base_filters.append(metadata_table.c.unit.ilike(unit))

        extra_filters = [
            func.json_extract(metadata_table.c.extra, f"$.{k}").ilike(v)
            for k, v in kwargs.items()
            if v is not None
        ]

        # True in and_(True, *args) avoids the FutureWarning about
        # disallowing empty filters in the future.
        query = metadata_table.select().where(
            and_(True, *base_filters, *extra_filters)
        )

        result = con.execute(query).fetchall()

        return pd.DataFrame(result).set_index("id") if result else pd.DataFrame()

exceptions

IndexOutOfRangeError

Bases: IndexError

Custom exception raised when an index is out of range in the dataset.

Source code in gensor/exceptions.py
class IndexOutOfRangeError(IndexError):
    """Custom exception raised when an index is out of range in the dataset."""

    def __init__(self, index: int, dataset_size: int) -> None:
        super().__init__(
            f"Index {index} is out of range for the dataset with {dataset_size} timeseries."
        )
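
A minimal runnable sketch of how this exception surfaces when indexing an empty Dataset:

from gensor.core.dataset import Dataset
from gensor.exceptions import IndexOutOfRangeError

ds = Dataset()  # empty dataset
try:
    ds[0]
except IndexOutOfRangeError as err:
    print(err)  # "Index 0 is out of range for the dataset with 0 timeseries."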

InvalidMeasurementTypeError

Bases: ValueError

Raised when a timeseries of a wrong measurement type is operated upon.

Source code in gensor/exceptions.py
class InvalidMeasurementTypeError(ValueError):
    """Raised when a timeseries of a wrong measurement type is operated upon."""

    def __init__(self, expected_type: str = "pressure") -> None:
        self.expected_type = expected_type
        message = f"Timeseries must be of measurement type '{self.expected_type}'."
        super().__init__(message)

MissingInputError

Bases: ValueError

Raised when a required input is missing.

Source code in gensor/exceptions.py
class MissingInputError(ValueError):
    """Raised when a required input is missing."""

    def __init__(self, input_name: str, message: str | None = None) -> None:
        self.input_name = input_name
        if message is None:
            message = f"Missing required input: '{self.input_name}'."
        super().__init__(message)

TimeseriesUnequal

Bases: ValueError

Raised when Timeseries objects are compared and are unequal.

Source code in gensor/exceptions.py
class TimeseriesUnequal(ValueError):
    """Raised when Timeseries objects are compared and are unequal."""

    def __init__(self, *args: object, message: str | None = None) -> None:
        # Only fall back to the default message when none is provided; the string
        # is split across literals to avoid the stray whitespace a backslash
        # continuation would inject.
        if message is None:
            message = (
                "Timeseries objects must have the same location, sensor, "
                "variable, and unit to be added together."
            )
        super().__init__(message, *args)

io

read

Fetching the data from various sources.

TODO: Fix up the read_from_sql() function to actually work properly.

read_from_api()

Fetch data from the API.

Source code in gensor/io/read.py
def read_from_api() -> Dataset:
    """Fetch data from the API."""
    return NotImplemented

read_from_csv(path, file_format='vanessen', **kwargs)

Loads data from csv files in the given file_format and returns a Dataset of Timeseries objects (or a single Timeseries when exactly one is parsed).

Parameters:

Name Type Description Default
path Path

The path to the file or directory containing the files.

required
**kwargs dict

Optional keyword arguments passed to the parsers: * serial_number_pattern (str): The regex pattern to extract the serial number from the file. * location_pattern (str): The regex pattern to extract the station from the file. * col_names (list): The column names for the dataframe. * location (str): Name of the location of the timeseries. * sensor (str): Sensor serial number.

{}
Source code in gensor/io/read.py
def read_from_csv(
    path: Path, file_format: Literal["vanessen", "plain"] = "vanessen", **kwargs: Any
) -> Dataset | Timeseries:
    """Loads the data from csv files with given file_format and returns a list of Timeseries objects.

    Parameters:
        path (Path): The path to the file or directory containing the files.
        **kwargs (dict): Optional keyword arguments passed to the parsers:
            * serial_number_pattern (str): The regex pattern to extract the serial number from the file.
            * location_pattern (str): The regex pattern to extract the station from the file.
            * col_names (list): The column names for the dataframe.
            * location (str): Name of the location of the timeseries.
            * sensor (str): Sensor serial number.
    """

    parsers = {
        "vanessen": parse_vanessen_csv,
        "plain": parse_plain,
        # more parsers to be implemented
    }

    if not isinstance(path, Path):
        message = "The path argument must be a Path object."
        raise TypeError(message)

    if path.is_dir() and not any(
        file.is_file() and file.suffix.lower() == ".csv" for file in path.iterdir()
    ):
        logger.info("No CSV files found. Operation skipped.")
        return Dataset()

    files = (
        [
            file
            for file in path.iterdir()
            if file.is_file() and file.suffix.lower() == ".csv"
        ]
        if path.is_dir()
        else [path]
        if path.suffix.lower() == ".csv"
        else []
    )

    if not files:
        logger.info("No CSV files found. Operation skipped.")
        return Dataset()

    parser = parsers[file_format]

    ds: Dataset = Dataset()

    for f in files:
        logger.info(f"Loading file: {f}")
        ts_in_file = parser(f, **kwargs)
        ds.add(ts_in_file)

    # If there is exactly one Timeseries in the Dataset, ds[0] is always a
    # Timeseries, so the line below does not introduce a potential None return.
    return ds[0] if len(ds) == 1 else ds  # type: ignore[return-value]
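
A hedged usage sketch; the directory and the override pattern below are hypothetical:

from pathlib import Path

from gensor.io.read import read_from_csv

data = read_from_csv(
    Path("data/divers"),  # directory of CSV exports
    file_format="vanessen",
    location_pattern=r"PB\d{2}[A-Z]|Barodiver",  # override the default station regex
)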

read_from_sql(db, load_all=True, location=None, variable=None, unit=None, timestamp_start=None, timestamp_stop=None, **kwargs)

Returns the timeseries or a dataset from a SQL database.

Parameters:

Name Type Description Default
db DatabaseConnection

The database connection object.

required
load_all bool

Whether to load all timeseries from the database.

True
location str

The station name.

None
variable str

The measurement type.

None
unit str

The unit of the measurement.

None
timestamp_start Timestamp

Start timestamp filter.

None
timestamp_stop Timestamp

End timestamp filter.

None
**kwargs dict

Any additional filters matching attributes of the particular timeseries.

{}

Returns:

Name Type Description
Dataset Timeseries | Dataset

Dataset with retrieved objects or an empty Dataset.

Source code in gensor/io/read.py
def read_from_sql(
    db: DatabaseConnection,
    load_all: bool = True,
    location: str | None = None,
    variable: str | None = None,
    unit: str | None = None,
    timestamp_start: pd.Timestamp | None = None,
    timestamp_stop: pd.Timestamp | None = None,
    **kwargs: dict,
) -> Timeseries | Dataset:
    """Returns the timeseries or a dataset from a SQL database.

    Parameters:
        db (DatabaseConnection): The database connection object.
        load_all (bool): Whether to load all timeseries from the database.
        location (str): The station name.
        variable (str): The measurement type.
        unit (str): The unit of the measurement.
        timestamp_start (pd.Timestamp, optional): Start timestamp filter.
        timestamp_stop (pd.Timestamp, optional): End timestamp filter.
        **kwargs (dict): Any additional filters matching attributes of the particular
            timeseries.

    Returns:
        Dataset: Dataset with retrieved objects or an empty Dataset.
    """

    def _read_data_from_schema(schema_name: str) -> Any:
        """Read data from the table and apply the timestamp filter.

        Parameters:
            schema_name (str): name of the schema in SQLite database.

        Returns:
            pd.Series: results of the query or an empty pd.Series if none are found.
        """
        with db as con:
            schema = db.metadata.tables[schema_name]
            data_query = select(schema)

            if timestamp_start or timestamp_stop:
                if timestamp_start:
                    data_query = data_query.where(schema.c.timestamp >= timestamp_start)
                if timestamp_stop:
                    data_query = data_query.where(schema.c.timestamp <= timestamp_stop)

            ts = pd.read_sql(
                data_query,
                con=con,
                parse_dates={"timestamp": "%Y-%m-%dT%H:%M:%S%z"},
                index_col="timestamp",
            ).squeeze()

        if ts.empty:
            message = f"No data found in table {schema_name}"
            logger.warning(message)

        return ts.sort_index()

    def _create_object(data: pd.Series, metadata: dict) -> Any:
        """Create the appropriate object for timeseries."""

        core_metadata = {
            "location": metadata["location"],
            "variable": metadata["variable"],
            "unit": metadata["unit"],
        }

        extra_metadata = metadata.get("extra", {})

        ts_metadata = {**core_metadata, **extra_metadata}

        cls = metadata["cls"]
        module_name, class_name = cls.rsplit(".", 1)
        module = import_module(module_name)

        TimeseriesClass = getattr(module, class_name)
        ts_object = TimeseriesClass(ts=data, **ts_metadata)

        return ts_object

    metadata_df = (
        db.get_timeseries_metadata(
            location=location, variable=variable, unit=unit, **kwargs
        )
        if not load_all
        else db.get_timeseries_metadata()
    )

    if metadata_df.empty:
        message = "No schemas matched the specified filters."
        raise ValueError(message)

    timeseries_list = []

    for row in metadata_df.to_dict(orient="records"):
        try:
            schema_name = row.pop("table_name")
            data = _read_data_from_schema(schema_name)
            timeseries_obj = _create_object(data, row)
            timeseries_list.append(timeseries_obj)
        except (ValueError, TypeError):
            logger.exception(f"Skipping schema {schema_name} due to error.")

    return Dataset(timeseries=timeseries_list) if timeseries_list else Dataset()
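
A usage sketch, assuming db is a DatabaseConnection as in the connection examples above; the station name is hypothetical:

from gensor.io.read import read_from_sql

ds = read_from_sql(
    db,
    load_all=False,       # apply the filters below instead of loading everything
    location="stationA",
    variable="pressure",
)
# raises ValueError when no schemas match the specified filters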

log

set_log_level(level)

Set the logging level for the package.

Source code in gensor/log.py
def set_log_level(level: str) -> None:
    """Set the logging level for the package."""
    logger = logging.getLogger("gensor")
    logger.setLevel(level.upper())
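
For example (the level string is case-insensitive, since it is upper-cased internally):

from gensor.log import set_log_level

set_log_level("debug")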

parse

parse_plain(path, **kwargs)

Parse a simple csv without a metadata header, just columns with variables.

Parameters:

Name Type Description Default
path Path

The path to the file.

required

Returns:

Name Type Description
list list[Timeseries]

A list of Timeseries objects.

Source code in gensor/parse/plain.py
def parse_plain(path: Path, **kwargs: Any) -> list[Timeseries]:
    """Parse a simple csv without metadata header, just columns with variables

    Parameters:
        path (Path): The path to the file.

    Returns:
        list: A list of Timeseries objects.
    """

    column_names = kwargs.get("col_names", ["timestamp", "pressure", "temperature"])

    encoding = detect_encoding(path, num_bytes=10_000)

    df = read_csv(
        path,
        encoding=encoding,
        skipfooter=1,
        skip_blank_lines=True,
        header=None,
        skiprows=1,
        index_col="timestamp",
        names=column_names,
        engine="python",
    )

    df = handle_timestamps(df, kwargs.get("timezone", "UTC"))

    ts_list = []

    for col in df.columns:
        if col in VARIABLE_TYPES_AND_UNITS:
            unit = VARIABLE_TYPES_AND_UNITS[col][0]
            ts_list.append(
                Timeseries(
                    ts=df[col],
                    # Validation will be done in Pydantic
                    variable=col,  # type: ignore[arg-type]
                    location=kwargs["location"],
                    sensor=kwargs["sensor"],
                    # Validation will be done in Pydantic
                    unit=unit,  # type: ignore[arg-type]
                )
            )
        else:
            message = f"Unsupported variable: {col}. Please provide a valid variable type."
            raise ValueError(message)

    return ts_list
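
Note that location and sensor are read with kwargs[...] above, so both are effectively required. A hedged sketch with hypothetical values:

from pathlib import Path

from gensor.parse.plain import parse_plain

series = parse_plain(
    Path("data/plain.csv"),
    location="stationA",
    sensor="AB1234",
    timezone="UTC",
)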

parse_vanessen_csv(path, **kwargs)

Parses a van Essen csv file and returns a list of Timeseries objects. At this point it does not matter whether the file is a barometric or piezometric logger file.

The function will use regex patterns to extract the serial number and station from the file. It is important to use the appropriate regex patterns, particularly for the station. If the default patterns are not working (which most likely will be the case), the user should provide their own patterns. The patterns can be provided as keyword arguments to the function and it is possible to use OR (|) in the regex pattern.

Warning

A better check for the variable type and units has to be implemented.

Parameters:

Name Type Description Default
path Path

The path to the file.

required

Other Parameters:

Name Type Description
serial_number_pattern str

The regex pattern to extract the serial number from the file.

location_pattern str

The regex pattern to extract the station from the file.

col_names list

The column names for the dataframe.

Returns:

Name Type Description
list list[Timeseries]

A list of Timeseries objects.

Source code in gensor/parse/vanessen.py
def parse_vanessen_csv(path: Path, **kwargs: Any) -> list[Timeseries]:
    """Parses a van Essen csv file and returns a list of Timeseries objects. At this point it
    does not matter whether the file is a barometric or piezometric logger file.

    The function will use regex patterns to extract the serial number and station from the file. It is
    important to use the appropriate regex patterns, particularly for the station. If the default patterns
    are not working (which most likely will be the case), the user should provide their own patterns. The patterns
    can be provided as keyword arguments to the function and it is possible to use OR (|) in the regex pattern.

    !!! warning

        A better check for the variable type and units has to be implemented.

    Parameters:
        path (Path): The path to the file.

    Other Parameters:
        serial_number_pattern (str): The regex pattern to extract the serial number from the file.
        location_pattern (str): The regex pattern to extract the station from the file.
        col_names (list): The column names for the dataframe.

    Returns:
        list: A list of Timeseries objects.
    """

    patterns = {
        "sensor": kwargs.get("serial_number_pattern", r"[A-Za-z]{2}\d{3,4}"),
        "location": kwargs.get(
            "location_pattern", r"[A-Za-z]{2}\d{2}[A-Za-z]{1}|Barodiver"
        ),
        "timezone": kwargs.get("timezone_pattern", r"UTC[+-]?\d+"),
    }

    column_names = kwargs.get("col_names", ["timestamp", "pressure", "temperature"])

    encoding = detect_encoding(path, num_bytes=10_000)

    with path.open(mode="r", encoding=encoding) as f:
        text = f.read()

        metadata = get_metadata(text, patterns)

        if not metadata:
            logger.info(f"Skipping file {path} due to missing metadata.")
            return []

        data_start = "Date/time"
        data_end = "END OF DATA FILE"

        df = get_data(text, data_start, data_end, column_names)

        df = handle_timestamps(df, metadata.get("timezone", "UTC"))

        ts_list = []

        for col in df.columns:
            if col in VARIABLE_TYPES_AND_UNITS:
                unit = VARIABLE_TYPES_AND_UNITS[col][0]
                ts_list.append(
                    Timeseries(
                        ts=df[col],
                        # Validation will be done in Pydantic
                        variable=col,  # type: ignore[arg-type]
                        location=metadata.get("location"),
                        sensor=metadata.get("sensor"),
                        # Validation will be done in Pydantic
                        unit=unit,  # type: ignore[arg-type]
                    )
                )
            else:
                message = f"Unsupported variable: {col}. Please provide a valid variable type."
                raise ValueError(message)

    return ts_list
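
A hedged sketch; the file path and the override pattern are hypothetical:

from pathlib import Path

from gensor.parse.vanessen import parse_vanessen_csv

series = parse_vanessen_csv(
    Path("data/AB1234.csv"),
    location_pattern=r"PB\d{2}[A-Z]|Barodiver",
)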

plain

parse_plain(path, **kwargs)

Parse a simple csv without a metadata header, just columns with variables.

Parameters:

Name Type Description Default
path Path

The path to the file.

required

Returns:

Name Type Description
list list[Timeseries]

A list of Timeseries objects.

Source code in gensor/parse/plain.py
def parse_plain(path: Path, **kwargs: Any) -> list[Timeseries]:
    """Parse a simple csv without metadata header, just columns with variables

    Parameters:
        path (Path): The path to the file.

    Returns:
        list: A list of Timeseries objects.
    """

    column_names = kwargs.get("col_names", ["timestamp", "pressure", "temperature"])

    encoding = detect_encoding(path, num_bytes=10_000)

    df = read_csv(
        path,
        encoding=encoding,
        skipfooter=1,
        skip_blank_lines=True,
        header=None,
        skiprows=1,
        index_col="timestamp",
        names=column_names,
        engine="python",
    )

    df = handle_timestamps(df, kwargs.get("timezone", "UTC"))

    ts_list = []

    for col in df.columns:
        if col in VARIABLE_TYPES_AND_UNITS:
            unit = VARIABLE_TYPES_AND_UNITS[col][0]
            ts_list.append(
                Timeseries(
                    ts=df[col],
                    # Validation will be done in Pydantic
                    variable=col,  # type: ignore[arg-type]
                    location=kwargs["location"],
                    sensor=kwargs["sensor"],
                    # Validation will be done in Pydantic
                    unit=unit,  # type: ignore[arg-type]
                )
            )
        else:
            message = f"Unsupported variable: {col}. Please provide a valid variable type."
            raise ValueError(message)

    return ts_list

utils

detect_encoding(path, num_bytes=1024)

Detect the encoding of a file using chardet.

Parameters:

Name Type Description Default
path Path

The path to the file.

required
num_bytes int

Number of bytes to read for encoding detection (default is 1024).

1024

Returns:

Name Type Description
str str

The detected encoding of the file.

Source code in gensor/parse/utils.py
def detect_encoding(path: Path, num_bytes: int = 1024) -> str:
    """Detect the encoding of a file using chardet.

    Parameters:
        path (Path): The path to the file.
        num_bytes (int): Number of bytes to read for encoding detection (default is 1024).

    Returns:
        str: The detected encoding of the file.
    """
    with path.open("rb") as f:
        raw_data = f.read(num_bytes)
    result = chardet.detect(raw_data)
    return result["encoding"] or "utf-8"

get_data(text, data_start, data_end, column_names)

Search for data in the file.

Parameters:

Name Type Description Default
text str

string obtained from the CSV file.

required
data_start str

string at the first row of the data.

required
data_end str

string at the last row of the data.

required
column_names list

list of expected column names.

required

Returns:

Type Description
DataFrame

pd.DataFrame

Source code in gensor/parse/utils.py
def get_data(
    text: str, data_start: str, data_end: str, column_names: list
) -> DataFrame:
    """Search for data in the file.

    Parameters:
        text (str): string obtained from the CSV file.
        data_start (str): string at the first row of the data.
        data_end (str): string at the last row of the data.
        column_names (list): list of expected column names.

    Returns:
        pd.DataFrame
    """

    data_io = StringIO(text[text.index(data_start) : text.index(data_end)])

    df = read_csv(
        data_io, skiprows=1, header=None, names=column_names, index_col="timestamp"
    )

    return df

get_metadata(text, patterns)

Search for metadata in the file header with given regex patterns.

Parameters:

Name Type Description Default
text str

string obtained from the CSV file.

required
patterns dict

regex patterns matching the location and sensor information.

required

Returns:

Name Type Description
dict dict

metadata of the timeseries.

Source code in gensor/parse/utils.py
def get_metadata(text: str, patterns: dict) -> dict:
    """Search for metadata in the file header with given regex patterns.

    Parameters:
        text (str): string obtained from the CSV file.
        patterns (dict): regex patterns matching the location and sensor information.

    Returns:
        dict: metadata of the timeseries.
    """
    metadata = {}

    for k, v in patterns.items():
        match = re.search(v, text)
        metadata[k] = match.group() if match else None

    if metadata["sensor"] is None or metadata["location"] is None:
        return {}

    return metadata
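
A small sketch using the default patterns from parse_vanessen_csv; header_text below is a hypothetical file header:

from gensor.parse.utils import get_metadata

patterns = {
    "sensor": r"[A-Za-z]{2}\d{3,4}",
    "location": r"[A-Za-z]{2}\d{2}[A-Za-z]{1}|Barodiver",
    "timezone": r"UTC[+-]?\d+",
}
header_text = "Serial number: AB1234 / Location: PB01A / UTC+1"
meta = get_metadata(header_text, patterns)
# {'sensor': 'AB1234', 'location': 'PB01A', 'timezone': 'UTC+1'};
# an empty dict is returned when sensor or location is missing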

handle_timestamps(df, tz_string)

Converts timestamps in the dataframe to the specified timezone (e.g., 'UTC+1').

Parameters:

Name Type Description Default
df DataFrame

The dataframe with timestamps.

required
tz_string str

A timezone string like 'UTC+1' or 'UTC-5'.

required

Returns:

Type Description
DataFrame

pd.DataFrame: The dataframe with timestamps converted to UTC.

Source code in gensor/parse/utils.py
def handle_timestamps(df: DataFrame, tz_string: str) -> DataFrame:
    """Converts timestamps in the dataframe to the specified timezone (e.g., 'UTC+1').

    Parameters:
        df (pd.DataFrame): The dataframe with timestamps.
        tz_string (str): A timezone string like 'UTC+1' or 'UTC-5'.

    Returns:
        pd.DataFrame: The dataframe with timestamps converted to UTC.
    """
    timezone = tz.gettz(tz_string)

    df.index = to_datetime(df.index).tz_localize(timezone)
    df.index = df.index.tz_convert("UTC")

    return df

vanessen

Logic parsing CSV files from van Essen Instruments Divers.

parse_vanessen_csv(path, **kwargs)

Parses a van Essen csv file and returns a list of Timeseries objects. At this point it does not matter whether the file is a barometric or piezometric logger file.

The function will use regex patterns to extract the serial number and station from the file. It is important to use the appropriate regex patterns, particularly for the station. If the default patterns are not working (which most likely will be the case), the user should provide their own patterns. The patterns can be provided as keyword arguments to the function and it is possible to use OR (|) in the regex pattern.

Warning

A better check for the variable type and units has to be implemented.

Parameters:

Name Type Description Default
path Path

The path to the file.

required

Other Parameters:

Name Type Description
serial_number_pattern str

The regex pattern to extract the serial number from the file.

location_pattern str

The regex pattern to extract the station from the file.

col_names list

The column names for the dataframe.

Returns:

Name Type Description
list list[Timeseries]

A list of Timeseries objects.

Source code in gensor/parse/vanessen.py
def parse_vanessen_csv(path: Path, **kwargs: Any) -> list[Timeseries]:
    """Parses a van Essen csv file and returns a list of Timeseries objects. At this point it
    does not matter whether the file is a barometric or piezometric logger file.

    The function will use regex patterns to extract the serial number and station from the file. It is
    important to use the appropriate regex patterns, particularly for the station. If the default patterns
    are not working (which most likely will be the case), the user should provide their own patterns. The patterns
    can be provided as keyword arguments to the function and it is possible to use OR (|) in the regex pattern.

    !!! warning

        A better check for the variable type and units has to be implemented.

    Parameters:
        path (Path): The path to the file.

    Other Parameters:
        serial_number_pattern (str): The regex pattern to extract the serial number from the file.
        location_pattern (str): The regex pattern to extract the station from the file.
        col_names (list): The column names for the dataframe.

    Returns:
        list: A list of Timeseries objects.
    """

    patterns = {
        "sensor": kwargs.get("serial_number_pattern", r"[A-Za-z]{2}\d{3,4}"),
        "location": kwargs.get(
            "location_pattern", r"[A-Za-z]{2}\d{2}[A-Za-z]{1}|Barodiver"
        ),
        "timezone": kwargs.get("timezone_pattern", r"UTC[+-]?\d+"),
    }

    column_names = kwargs.get("col_names", ["timestamp", "pressure", "temperature"])

    encoding = detect_encoding(path, num_bytes=10_000)

    with path.open(mode="r", encoding=encoding) as f:
        text = f.read()

        metadata = get_metadata(text, patterns)

        if not metadata:
            logger.info(f"Skipping file {path} due to missing metadata.")
            return []

        data_start = "Date/time"
        data_end = "END OF DATA FILE"

        df = get_data(text, data_start, data_end, column_names)

        df = handle_timestamps(df, metadata.get("timezone", "UTC"))

        ts_list = []

        for col in df.columns:
            if col in VARIABLE_TYPES_AND_UNITS:
                unit = VARIABLE_TYPES_AND_UNITS[col][0]
                ts_list.append(
                    Timeseries(
                        ts=df[col],
                        # Validation will be done in Pydantic
                        variable=col,  # type: ignore[arg-type]
                        location=metadata.get("location"),
                        sensor=metadata.get("sensor"),
                        # Validation will be done in Pydantic
                        unit=unit,  # type: ignore[arg-type]
                    )
                )
            else:
                message = f"Unsupported variable: {col}. Please provide a valid variable type."
                raise ValueError(message)

    return ts_list

processing

compensation

Compensating the raw data from the absolute pressure transducer to the actual water level using the barometric pressure data.

Because van Essen Instrument divers are non-vented pressure transducers, to obtain the pressure resulting from the water column above the logger (i.e. the water level), the barometric pressure must be subtracted from the raw pressure measurements. In the first step the function aligns the two series to the same time step and then subtracts the barometric pressure from the raw pressure measurements. For short time periods (when for instance a slug test is performed) the barometric pressure can be provided as a single float value.

Subsequently the function filters out all records where the absolute water column is less than or equal to the cutoff value. This is because when the logger is out of the water at measurement time, the absolute water column is close to zero, producing erroneous results and spikes in the plots. The cutoff is passed via the threshold_wc kwarg (e.g. 0.05 mH2O for a 5 cm cutoff) and is disabled by default.
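
The arithmetic reduces to two steps, shown here as a minimal pandas sketch with made-up numbers (a sensor altitude of 12.0 m asl is assumed):

import pandas as pd

raw = pd.Series([1040.0, 1038.5])   # absolute pressure at the diver, cmH2O
baro = pd.Series([1010.0, 1010.2])  # barometric pressure, cmH2O
water_column = (raw - baro) / 100   # water column above the sensor, mH2O
head = water_column + 12.0          # adding the sensor altitude gives head in m asl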

Functions:

compensate: Compensate raw sensor pressure measurement with barometric pressure.

Compensator

Bases: BaseModel

Compensate raw sensor pressure measurement with barometric pressure.

Attributes:

Name Type Description
ts Timeseries

Raw sensor timeseries

barometric Timeseries | float

Barometric pressure timeseries or a single float value. If a float value is provided, it is assumed to be in cmH2O.

Source code in gensor/processing/compensation.py
class Compensator(pyd.BaseModel):
    """Compensate raw sensor pressure measurement with barometric pressure.

    Attributes:
        ts (Timeseries): Raw sensor timeseries
        barometric (Timeseries | float): Barometric pressure timeseries or a single
            float value. If a float value is provided, it is assumed to be in cmH2O.
    """

    ts: Timeseries
    barometric: Timeseries | float

    @pyd.field_validator("ts", "barometric", mode="before")
    def validate_timeseries_type(cls, v: Timeseries) -> Timeseries:
        if isinstance(v, Timeseries) and v.variable != "pressure":
            raise InvalidMeasurementTypeError()
        return v

    @pyd.field_validator("ts")
    def validate_sensor_information(cls, v: Timeseries) -> Timeseries:
        if v.sensor is not None and not v.sensor_alt:
            raise MissingInputError("sensor_alt")
        return v

    def compensate(
        self,
        alignment_period: Literal["D", "ME", "SME", "MS", "YE", "YS", "h", "min", "s"],
        threshold_wc: float | None,
        fieldwork_dates: list | None,
    ) -> Timeseries | None:
        """Perform compensation.

        Parameters:
            alignment_period Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']: The alignment period for the timeseries.
                Default is 'h'. See pandas offset aliases for definitions.
            threshold_wc (float): The threshold for the absolute water column.
            fieldwork_dates (Optional[list]): List of dates when fieldwork was done. All
                measurements from a fieldwork day will be set to None.

        Returns:
            Timeseries: A new Timeseries instance with the compensated data and updated unit and variable. Optionally removed outliers are included.
        """

        resample_params = {"freq": alignment_period, "agg_func": pd.Series.mean}
        resampled_ts = self.ts.resample(**resample_params)

        if isinstance(self.barometric, Timeseries):
            if self.ts == self.barometric:
                print("Skipping compensation: both timeseries are the same.")
                return None
            resampled_baro = self.barometric.resample(**resample_params).ts

        elif isinstance(self.barometric, float):
            resampled_baro = pd.Series(
                [self.barometric] * len(resampled_ts.ts), index=resampled_ts.ts.index
            )

        # dividing by 100 to convert water column from cmH2O to mH2O
        watercolumn_ts = resampled_ts.ts.sub(resampled_baro).divide(100).dropna()

        if not isinstance(watercolumn_ts.index, pd.DatetimeIndex):
            watercolumn_ts.index = pd.to_datetime(watercolumn_ts.index)

        if fieldwork_dates:
            fieldwork_timestamps = pd.to_datetime(fieldwork_dates).tz_localize(
                watercolumn_ts.index.tz
            )

            watercolumn_ts.loc[
                watercolumn_ts.index.normalize().isin(fieldwork_timestamps)
            ] = None

        if threshold_wc:
            watercolumn_ts_filtered = watercolumn_ts[
                watercolumn_ts.abs() > threshold_wc
            ]

            dropped_outliers = watercolumn_ts[watercolumn_ts.abs() <= threshold_wc]

            print(f"{len(dropped_outliers)} records dropped due to low water column.")
            gwl = watercolumn_ts_filtered.add(float(resampled_ts.sensor_alt or 0))

            compensated = resampled_ts.model_copy(
                update={
                    "ts": gwl,
                    "outliers": dropped_outliers,
                    "unit": "m asl",
                    "variable": "head",
                },
                deep=True,
            )
        else:
            gwl = watercolumn_ts.add(float(resampled_ts.sensor_alt or 0))

            compensated = resampled_ts.model_copy(
                update={"ts": gwl, "unit": "m asl", "variable": "head"}, deep=True
            )

        return compensated
compensate(alignment_period, threshold_wc, fieldwork_dates)

Perform compensation.

Parameters:

Name Type Description Default
alignment_period Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']

The alignment period for the timeseries. Default is 'h'. See pandas offset aliases for definitions.

required
threshold_wc float

The threshold for the absolute water column.

required
fieldwork_dates Optional[list]

List of dates when fieldwork was done. All measurements from a fieldwork day will be set to None.

required

Returns:

Name Type Description
Timeseries Timeseries | None

A new Timeseries instance with the compensated data and updated unit and variable. Optionally removed outliers are included.

Source code in gensor/processing/compensation.py
def compensate(
    self,
    alignment_period: Literal["D", "ME", "SME", "MS", "YE", "YS", "h", "min", "s"],
    threshold_wc: float | None,
    fieldwork_dates: list | None,
) -> Timeseries | None:
    """Perform compensation.

    Parameters:
        alignment_period Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']: The alignment period for the timeseries.
            Default is 'h'. See pandas offset aliases for definitions.
        threshold_wc (float): The threshold for the absolute water column.
        fieldwork_dates (Optional[list]): List of dates when fieldwork was done. All
            measurements from a fieldwork day will be set to None.

    Returns:
        Timeseries: A new Timeseries instance with the compensated data and updated unit and variable. Optionally removed outliers are included.
    """

    resample_params = {"freq": alignment_period, "agg_func": pd.Series.mean}
    resampled_ts = self.ts.resample(**resample_params)

    if isinstance(self.barometric, Timeseries):
        if self.ts == self.barometric:
            print("Skipping compensation: both timeseries are the same.")
            return None
        resampled_baro = self.barometric.resample(**resample_params).ts

    elif isinstance(self.barometric, float):
        resampled_baro = pd.Series(
            [self.barometric] * len(resampled_ts.ts), index=resampled_ts.ts.index
        )

    # dividing by 100 to convert water column from cmH2O to mH2O
    watercolumn_ts = resampled_ts.ts.sub(resampled_baro).divide(100).dropna()

    if not isinstance(watercolumn_ts.index, pd.DatetimeIndex):
        watercolumn_ts.index = pd.to_datetime(watercolumn_ts.index)

    if fieldwork_dates:
        fieldwork_timestamps = pd.to_datetime(fieldwork_dates).tz_localize(
            watercolumn_ts.index.tz
        )

        watercolumn_ts.loc[
            watercolumn_ts.index.normalize().isin(fieldwork_timestamps)
        ] = None

    if threshold_wc:
        watercolumn_ts_filtered = watercolumn_ts[
            watercolumn_ts.abs() > threshold_wc
        ]

        dropped_outliers = watercolumn_ts[watercolumn_ts.abs() <= threshold_wc]

        print(f"{len(dropped_outliers)} records dropped due to low water column.")
        gwl = watercolumn_ts_filtered.add(float(resampled_ts.sensor_alt or 0))

        compensated = resampled_ts.model_copy(
            update={
                "ts": gwl,
                "outliers": dropped_outliers,
                "unit": "m asl",
                "variable": "head",
            },
            deep=True,
        )
    else:
        gwl = watercolumn_ts.add(float(resampled_ts.sensor_alt or 0))

        compensated = resampled_ts.model_copy(
            update={"ts": gwl, "unit": "m asl", "variable": "head"}, deep=True
        )

    return compensated

compensate(raw, barometric, alignment_period='h', threshold_wc=None, fieldwork_dates=None, interpolate_method=None)

Creates a Compensator object and runs the compensation.

Parameters:

Name Type Description Default
raw Timeseries | Dataset

Raw sensor timeseries

required
barometric Timeseries | float

Barometric pressure timeseries or a single float value. If a float value is provided, it is assumed to be in cmH2O.

required
alignment_period Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']

The alignment period for the timeseries. Default is 'h'. See pandas offset aliases for definitions.

'h'
threshold_wc float

The threshold for the absolute water column. If it is provided, records at or below that threshold are dropped.

None
fieldwork_dates Dict[str, list]

Dictionary mapping location names to lists of fieldwork days. All records on a fieldwork day are set to None.

None
interpolate_method str

Interpolation method name, as accepted by pd.Series.interpolate().

None
Source code in gensor/processing/compensation.py
def compensate(
    raw: Timeseries | Dataset,
    barometric: Timeseries | float,
    alignment_period: Literal[
        "D", "ME", "SME", "MS", "YE", "YS", "h", "min", "s"
    ] = "h",
    threshold_wc: float | None = None,
    fieldwork_dates: dict | None = None,
    interpolate_method: str | None = None,
) -> Timeseries | Dataset | None:
    """Constructor for the Comensator object.

    Parameters:
        raw (Timeseries | Dataset): Raw sensor timeseries
        barometric (Timeseries | float): Barometric pressure timeseries or a single
            float value. If a float value is provided, it is assumed to be in cmH2O.
        alignment_period (Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']): The alignment period for the timeseries.
            Default is 'h'. See pandas offset aliases for definitions.
        threshold_wc (float): The threshold for the absolute water column. If it is
            provided, records at or below that threshold are dropped.
        fieldwork_dates (Dict[str, list]): Dictionary mapping location names to lists of
            fieldwork days. All records on a fieldwork day are set to None.
        interpolate_method (str): Interpolation method name, as accepted by
            pd.Series.interpolate().
    """
    if fieldwork_dates is None:
        fieldwork_dates = {}

    def _compensate_one(
        raw: Timeseries, fieldwork_dates: list | None
    ) -> Timeseries | None:
        comp = Compensator(ts=raw, barometric=barometric)
        compensated = comp.compensate(
            alignment_period=alignment_period,
            threshold_wc=threshold_wc,
            fieldwork_dates=fieldwork_dates,
        )
        if compensated is not None and interpolate_method:
            # .interpolate() called on Timeseries object is wrapped to return a
            # Timeseries object from the original pandas.Series.interpolate().
            return compensated.interpolate(method=interpolate_method)  # type: ignore[no-any-return]

        else:
            return compensated

    if isinstance(raw, Timeseries):
        dates = fieldwork_dates.get(raw.location)
        return _compensate_one(raw, dates)

    elif isinstance(raw, Dataset):
        compensated_series = []
        for item in raw:
            dates = fieldwork_dates.get(item.location)
            compensated_series.append(_compensate_one(item, dates))

        return raw.model_copy(update={"timeseries": compensated_series}, deep=True)
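
A usage sketch for the wrapper, assuming ds is a Dataset of raw submerged-logger Timeseries and baro_ts is a barometric Timeseries; the fieldwork_dates keys are matched against each Timeseries.location:

from gensor.processing.compensation import compensate

heads = compensate(
    raw=ds,
    barometric=baro_ts,
    alignment_period="h",
    threshold_wc=0.05,
    # Keys are location names; only series with a matching location are masked.
    fieldwork_dates={"PB01A": ["2022-04-27"]},
    interpolate_method="linear",  # forwarded to pandas.Series.interpolate()
)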

smoothing

Tools for smoothing the data.

smooth_data(data, window=5, method='rolling_mean', print_statistics=False, inplace=False, plot=False)

Smooth a time series using a rolling mean or median.

Parameters:

Name Type Description Default
data Timeseries

The timeseries to smooth; the underlying data is data.ts.

required
window int

The size of the window for the rolling mean or median. Defaults to 5.

5
method str

The method to use for smoothing. Either 'rolling_mean' or 'rolling_median'. Defaults to 'rolling_mean'.

'rolling_mean'
print_statistics bool

If True, print the root mean squared error between the original and smoothed series. Defaults to False.

False
inplace bool

If True, overwrite data.ts with the smoothed series and return None. Defaults to False.

False
plot bool

If True, plot the original and smoothed series. Defaults to False.

False

Returns:

Type Description
Series | None

pandas.Series: The smoothed time series, or None when inplace=True.

Source code in gensor/processing/smoothing.py
def smooth_data(
    data: Timeseries,
    window: int = 5,
    method: str = "rolling_mean",
    print_statistics: bool = False,
    inplace: bool = False,
    plot: bool = False,
) -> Series | None:
    """Smooth a time series using a rolling mean or median.

    Args:
        data (pandas.Series): The time series data.
        window (int): The size of the window for the rolling mean or median. Defaults to 5.
        method (str): The method to use for smoothing. Either 'rolling_mean' or 'rolling_median'. Defaults to 'rolling_mean'.

    Returns:
        pandas.Series: The smoothed time series.
    """
    if method == "rolling_mean":
        smoothed_data = data.ts.rolling(window=window, center=True).mean()
    elif method == "rolling_median":
        smoothed_data = data.ts.rolling(window=window, center=True).median()
    else:
        raise NotImplementedError()

    valid_indices = smoothed_data.notna()
    original_data_aligned = data.ts[valid_indices]
    smoothed_data_aligned = smoothed_data[valid_indices]

    if print_statistics:
        rmse = root_mean_squared_error(original_data_aligned, smoothed_data_aligned)
        print(f"Root Mean Squared Error of {method}: {rmse:.2f}")

    if plot:
        plt.figure(figsize=(12, 6))
        plt.plot(
            data.ts.index, data.ts, label="Original Data", color="black"
        )
        plt.plot(
            smoothed_data.index,
            smoothed_data,
            label=f"Moving Average ({method})",
            color="green",
            linestyle="dotted",
        )

        plt.legend()
        plt.title("Groundwater Level with Moving Average")
        plt.xlabel("Date")
        plt.ylabel("Groundwater Level")
        plt.show()

    if inplace:
        data.ts = smoothed_data
        return None
    else:
        return smoothed_data
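
A short sketch, assuming ts is a Timeseries whose data lives in ts.ts:

from gensor.processing.smoothing import smooth_data

# Returns the smoothed pandas Series without modifying ts.
smoothed = smooth_data(ts, window=7, method="rolling_median", print_statistics=True)

# With inplace=True the smoothed series is written back to ts.ts and None is returned.
smooth_data(ts, window=7, method="rolling_median", inplace=True)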

transform

Transformation

Source code in gensor/processing/transform.py
class Transformation:
    def __init__(
        self,
        data: Series,
        method: Literal[
            "difference",
            "log",
            "square_root",
            "box_cox",
            "standard_scaler",
            "minmax_scaler",
            "robust_scaler",
            "maxabs_scaler",
        ],
        **kwargs: Any,
    ) -> None:
        self.data = data

        if method == "difference":
            self.transformed_data, self.scaler = self.difference(**kwargs)
        elif method == "log":
            self.transformed_data, self.scaler = self.log()
        elif method == "square_root":
            self.transformed_data, self.scaler = self.square_root()
        elif method == "box_cox":
            self.transformed_data, self.scaler = self.box_cox(**kwargs)
        elif method == "standard_scaler":
            self.transformed_data, self.scaler = self.standard_scaler()
        elif method == "minmax_scaler":
            self.transformed_data, self.scaler = self.minmax_scaler()
        elif method == "robust_scaler":
            self.transformed_data, self.scaler = self.robust_scaler()
        elif method == "maxabs_scaler":
            self.transformed_data, self.scaler = self.maxabs_scaler()
        else:
            raise NotImplementedError()

    def get_transformation(self) -> tuple:
        return self.transformed_data, self.scaler

    def difference(self, **kwargs: int) -> tuple[Series, str]:
        """Difference the time series data.

        Keyword Arguments:
            periods (int): The number of periods to shift. Defaults to 1.

        Returns:
            pandas.Series: The differenced time series data.
        """
        periods = kwargs.get("periods", 1)
        transformed = self.data.diff(periods=periods).dropna()

        return (transformed, "difference")

    def log(self) -> tuple[Series, str]:
        """Take the natural logarithm of the time series data.

        Returns:
            pandas.Series: The natural logarithm of the time series data.
        """
        # Non-positive values are passed through unchanged (log is undefined there).
        transformed = self.data.apply(lambda x: x if x <= 0 else np.log(x))
        return (transformed, "log")

    def square_root(self) -> tuple[Series, str]:
        """Take the square root of the time series data.

        Returns:
            pandas.Series: The square root of the time series data.
        """
        # Non-positive values are passed through unchanged; sqrt of a negative would be NaN.
        transformed = self.data.apply(lambda x: x if x <= 0 else np.sqrt(x))
        return (transformed, "square_root")

    def box_cox(self, **kwargs: float) -> tuple[Series, str]:
        """Apply the Box-Cox transformation to the time series data. Only works
            for all positive datasets!

        Keyword Arguments:
            lmbda (float): The transformation parameter. If not provided, it is automatically estimated.

        Returns:
            pandas.Series: The Box-Cox transformed time series data.
        """
        lmbda = kwargs.get("lmbda")

        if (self.data <= 0).any():
            message = (
                "Box-Cox transformation requires all values to be strictly positive."
            )
            raise ValueError(message)

        # scipy returns only the transformed array when lmbda is given, and a
        # (transformed_data, lmbda) tuple when lambda must be estimated.
        if lmbda is not None:
            transformed_data = stats.boxcox(self.data, lmbda=lmbda)
        else:
            transformed_data, lmbda = stats.boxcox(self.data, lmbda=lmbda)

        # Return the transformed series and mark the method used
        transformed_series = Series(transformed_data, index=self.data.index)
        return transformed_series, f"box-cox (lambda={lmbda})"

    def standard_scaler(self) -> tuple[Series, Any]:
        """Normalize a pandas Series using StandardScaler."""
        scaler = StandardScaler()
        scaled_values = scaler.fit_transform(
            self.data.to_numpy().reshape(-1, 1)
        ).flatten()
        scaled_series = Series(scaled_values, index=self.data.index)
        return scaled_series, scaler

    def minmax_scaler(self) -> tuple[Series, Any]:
        """Normalize a pandas Series using MinMaxScaler."""
        scaler = MinMaxScaler()
        scaled_values = scaler.fit_transform(
            self.data.to_numpy().reshape(-1, 1)
        ).flatten()
        scaled_series = Series(scaled_values, index=self.data.index)
        return scaled_series, scaler

    def robust_scaler(self) -> tuple[Series, Any]:
        """Normalize a pandas Series using RobustScaler."""
        scaler = RobustScaler()
        scaled_values = scaler.fit_transform(
            self.data.to_numpy().reshape(-1, 1)
        ).flatten()
        scaled_series = Series(scaled_values, index=self.data.index)
        return scaled_series, scaler

    def maxabs_scaler(self) -> tuple[Series, Any]:
        """Normalize a pandas Series using MaxAbsScaler."""
        scaler = MaxAbsScaler()
        scaled_values = scaler.fit_transform(
            self.data.to_numpy().reshape(-1, 1)
        ).flatten()
        scaled_series = Series(scaled_values, index=self.data.index)
        return scaled_series, scaler
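
A minimal sketch of the dispatch pattern above; a plain pandas Series is all the constructor needs:

import pandas as pd

from gensor.processing.transform import Transformation

s = pd.Series([1.2, 1.5, 1.1, 1.8, 2.0])

t = Transformation(s, method="standard_scaler")
scaled, scaler = t.get_transformation()

# For the scaler-based methods the second element is the fitted sklearn
# scaler; for difference/log/square_root/box_cox it is a string label.
print(type(scaler).__name__)  # StandardScaler
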
box_cox(**kwargs)

Apply the Box-Cox transformation to the time series data. Only works for all positive datasets!

Other Parameters:

Name Type Description
lmbda float

The transformation parameter. If not provided, it is automatically estimated.

Returns:

Type Description
tuple[Series, str]

pandas.Series: The Box-Cox transformed time series data.

Source code in gensor/processing/transform.py
def box_cox(self, **kwargs: float) -> tuple[Series, str]:
    """Apply the Box-Cox transformation to the time series data. Only works
        for all positive datasets!

    Keyword Arguments:
        lmbda (float): The transformation parameter. If not provided, it is automatically estimated.

    Returns:
        pandas.Series: The Box-Cox transformed time series data.
    """
    lmbda = kwargs.get("lmbda")

    if (self.data <= 0).any():
        message = (
            "Box-Cox transformation requires all values to be strictly positive."
        )
        raise ValueError(message)

    # scipy returns only the transformed array when lmbda is given, and a
    # (transformed_data, lmbda) tuple when lambda must be estimated.
    if lmbda is not None:
        transformed_data = stats.boxcox(self.data, lmbda=lmbda)
    else:
        transformed_data, lmbda = stats.boxcox(self.data, lmbda=lmbda)

    # Return the transformed series and mark the method used
    transformed_series = Series(transformed_data, index=self.data.index)
    return transformed_series, f"box-cox (lambda={lmbda})"
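
Because box_cox() returns a descriptive label rather than an invertible scaler, undoing the transform requires keeping lambda around; scipy.special.inv_boxcox performs the inversion. A self-contained sketch mirroring the estimation branch above:

import numpy as np
import pandas as pd
from scipy import stats
from scipy.special import inv_boxcox

s = pd.Series([0.5, 1.0, 2.0, 4.0])

# Same call box_cox() makes when lmbda is not supplied: scipy estimates
# lambda and returns it alongside the transformed values.
transformed, lmbda = stats.boxcox(s)

# Map the transformed values back to the original scale.
restored = inv_boxcox(transformed, lmbda)
assert np.allclose(restored, s)
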
difference(**kwargs)

Difference the time series data.

Keyword Arguments

periods (int): The number of periods to shift. Defaults to 1.

Returns:

Type Description
tuple[Series, str]

pandas.Series: The differenced time series data.

Source code in gensor/processing/transform.py
def difference(self, **kwargs: int) -> tuple[Series, str]:
    """Difference the time series data.

    Keyword Arguments:
        periods (int): The number of periods to shift. Defaults to 1.

    Returns:
        pandas.Series: The differenced time series data.
    """
    periods = kwargs.get("periods", 1)
    transformed = self.data.diff(periods=periods).dropna()

    return (transformed, "difference")
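
A first-order difference is invertible as long as the dropped initial value is kept; a small pandas-only sketch:

import numpy as np
import pandas as pd

s = pd.Series([10.0, 10.4, 10.1, 10.7])
diffed = s.diff(periods=1).dropna()  # what difference() computes

# Undo the differencing: prepend the first original value and cumulate.
restored = pd.concat([s.iloc[:1], diffed]).cumsum()
assert np.allclose(restored, s)
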
log()

Take the natural logarithm of the time series data.

Returns:

Type Description
tuple[Series, str]

pandas.Series: The natural logarithm of the time series data.

Source code in gensor/processing/transform.py
def log(self) -> tuple[Series, str]:
    """Take the natural logarithm of the time series data.

    Returns:
        pandas.Series: The natural logarithm of the time series data.
    """
    # Non-positive values are passed through unchanged (log is undefined there).
    transformed = self.data.apply(lambda x: x if x <= 0 else np.log(x))
    return (transformed, "log")
maxabs_scaler()

Normalize a pandas Series using MaxAbsScaler.

Source code in gensor/processing/transform.py
def maxabs_scaler(self) -> tuple[Series, Any]:
    """Normalize a pandas Series using MaxAbsScaler."""
    scaler = MaxAbsScaler()
    scaled_values = scaler.fit_transform(
        self.data.to_numpy().reshape(-1, 1)
    ).flatten()
    scaled_series = Series(scaled_values, index=self.data.index)
    return scaled_series, scaler
minmax_scaler()

Normalize a pandas Series using MinMaxScaler.

Source code in gensor/processing/transform.py
def minmax_scaler(self) -> tuple[Series, Any]:
    """Normalize a pandas Series using MinMaxScaler."""
    scaler = MinMaxScaler()
    scaled_values = scaler.fit_transform(
        self.data.to_numpy().reshape(-1, 1)
    ).flatten()
    scaled_series = Series(scaled_values, index=self.data.index)
    return scaled_series, scaler
robust_scaler()

Normalize a pandas Series using RobustScaler.

Source code in gensor/processing/transform.py
def robust_scaler(self) -> tuple[Series, Any]:
    """Normalize a pandas Series using RobustScaler."""
    scaler = RobustScaler()
    scaled_values = scaler.fit_transform(
        self.data.to_numpy().reshape(-1, 1)
    ).flatten()
    scaled_series = Series(scaled_values, index=self.data.index)
    return scaled_series, scaler
square_root()

Take the square root of the time series data.

Returns:

Type Description
tuple[Series, str]

pandas.Series: The square root of the time series data.

Source code in gensor/processing/transform.py
def square_root(self) -> tuple[Series, str]:
    """Take the square root of the time series data.

    Returns:
        pandas.Series: The square root of the time series data.
    """
    # Non-positive values are passed through unchanged; sqrt of a negative would be NaN.
    transformed = self.data.apply(lambda x: x if x <= 0 else np.sqrt(x))
    return (transformed, "square_root")
standard_scaler()

Normalize a pandas Series using StandardScaler.

Source code in gensor/processing/transform.py
def standard_scaler(self) -> tuple[Series, Any]:
    """Normalize a pandas Series using StandardScaler."""
    scaler = StandardScaler()
    scaled_values = scaler.fit_transform(
        self.data.to_numpy().reshape(-1, 1)
    ).flatten()
    scaled_series = Series(scaled_values, index=self.data.index)
    return scaled_series, scaler
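
The fitted scaler is returned precisely so results can be mapped back to original units later; a sketch using sklearn's inverse_transform:

import pandas as pd

from gensor.processing.transform import Transformation

s = pd.Series([12.1, 11.8, 12.4, 12.9])
scaled, scaler = Transformation(s, method="standard_scaler").get_transformation()

# inverse_transform undoes the normalisation, recovering the original units.
restored = scaler.inverse_transform(scaled.to_numpy().reshape(-1, 1)).flatten()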

testdata

Test data for Gensor package:

Attributes:

all (Path): The whole directory of test groundwater sensor data.
baro (Path): Timeseries of barometric pressure measurements.
pb01a (Path): Timeseries of a submerged logger.
pb02a_plain (Path): Timeseries from PB02A with the metadata removed.

all_paths: Traversable = resources.files(__name__) module-attribute

The whole directory of test groundwater sensor data.

baro: Traversable = all_paths / 'Barodiver_220427183008_BY222.csv' module-attribute

Timeseries of barometric pressure measurements.

pb01a: Traversable = all_paths / 'PB01A_moni_AV319_220427183019_AV319.csv' module-attribute

Timeseries of a submerged logger.

pb02a_plain: Traversable = all_paths / 'PB02A_plain.csv' module-attribute

Timeseries from PB02A with the metadata removed.
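
A sketch of accessing the bundled files, assuming the module is importable as gensor.testdata; each attribute is an importlib.resources Traversable:

from gensor import testdata

print(testdata.baro.name)  # Barodiver_220427183008_BY222.csv

# Traversable objects can be opened directly without extracting to disk.
with testdata.pb02a_plain.open() as f:
    first_line = f.readline()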