Skip to content

API reference

Dataset

Bases: BaseModel, Generic[T]

Store and operate on a collection of Timeseries.

Attributes:

Name Type Description
timeseries list[Timeseries]

A list of Timeseries objects.

Source code in gensor/core/dataset.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
class Dataset(pyd.BaseModel, Generic[T]):
    """Store and operate on a collection of Timeseries.

    Attributes:
        timeseries (list[Timeseries]): A list of Timeseries objects.
    """

    timeseries: list[T | None] = pyd.Field(default_factory=list)

    def __iter__(self) -> Any:
        """Allows to iterate directly over the dataset."""
        return iter(self.timeseries)

    def __len__(self) -> int:
        """Gives the number of timeseries in the Dataset."""
        return len(self.timeseries)

    def __repr__(self) -> str:
        return f"Dataset({len(self)})"

    def __getitem__(self, key: int | str | list | tuple) -> T | None | Dataset:
        """Retrieve Timeseries by integer index, location name, or (location,
        variable[, unit]) tuple.

        - ``dataset[0]`` returns the Timeseries at that position (a reference).
        - ``dataset["PB01A"]`` returns the Timeseries at that location, or a
          Dataset if the location has several timeseries (e.g. pressure and
          temperature). A list of names (``dataset[["PB01A", "PB02A"]]``) always
          returns a Dataset.
        - ``dataset["PB01A", "pressure"]`` (or ``["PB01A", "pressure", "cmh2o"]``)
          narrows by variable/unit, returning a single Timeseries when one matches.
          For full control use :meth:`filter` / :meth:`one`.

        !!! warning
            Integer indexing returns a reference to the timeseries. Location /
            tuple indexing returns copies (it delegates to ``.filter()``).

        Parameters:
            key (int | str | list | tuple): Position, location name, list of
                names, or a (location, variable[, unit]) tuple.

        Returns:
            Timeseries | Dataset: The matching timeseries or a dataset of them.

        Raises:
            IndexOutOfRangeError: If an integer index is out of range.
            KeyError: If no timeseries matches the given location(s)/filters.
        """
        if isinstance(key, tuple):
            location, variable, unit = (*key, None, None)[:3]
            result = self.filter(location=location, variable=variable, unit=unit)
            if isinstance(result, Dataset) and len(result) == 0:
                message = f"No timeseries found for {key!r}."
                raise KeyError(message)
            return result

        if isinstance(key, str | list):
            result = self.filter(location=key)
            if isinstance(result, Dataset) and len(result) == 0:
                message = f"No timeseries found for location(s) {key!r}."
                raise KeyError(message)
            return result

        try:
            return self.timeseries[key]
        except IndexError:
            raise IndexOutOfRangeError(key, len(self)) from None

    def __contains__(self, location: object) -> bool:
        """Return True if any timeseries in the dataset has the given location."""
        return any(ts is not None and ts.location == location for ts in self.timeseries)

    def get_locations(self) -> list:
        """List all unique locations in the dataset, preserving first-seen order."""
        locations: list = []
        for ts in self.timeseries:
            if ts is not None and ts.location not in locations:
                locations.append(ts.location)
        return locations

    @property
    def loc(self) -> DatasetIndexer:
        """Label-based selection applied to every timeseries in the dataset.

        ``ds.loc[start:end]`` returns a new Dataset where each timeseries is sliced by
        ``.loc[start:end]`` (e.g. a date range), forwarding the key to each series' own
        pandas ``.loc``. Empty slices yield empty timeseries (every series is kept).

        Examples:
            >>> ds.loc["2021-01-01":"2021-12-31"]  # doctest: +SKIP
        """
        return DatasetIndexer(self)

    @property
    def coverage(self) -> Coverage:
        """Coverage summary of the dataset.

        Renders as a per-timeseries table (records and time span per location /
        variable / sensor) and exposes :meth:`Coverage.plot` for a coverage timeline.

        Examples:
            >>> ds.coverage          # the table  # doctest: +SKIP
            >>> ds.coverage.plot()   # the timeline  # doctest: +SKIP
        """
        return Coverage(self)

    @property
    def info(self) -> pd.DataFrame:
        """Per-timeseries metadata summary, rendered as a table.

        One row per timeseries — ``location``, ``variable``, ``sensor``, the number of
        ``records``, and the ``start`` / ``end`` of its time span. A quick look at what
        a Dataset holds before processing it (the default repr only shows the timeseries
        count). See :attr:`coverage` for a plottable version and :func:`gensor.diff` to
        line this up across datasets.

        Examples:
            >>> ds.info  # doctest: +SKIP
        """
        columns = ["location", "variable", "sensor", "records", "start", "end"]
        table = pd.DataFrame(
            [
                {
                    "location": ts.location,
                    "variable": ts.variable,
                    "sensor": getattr(ts, "sensor", None),
                    "records": len(ts.ts),
                    "start": ts.ts.index.min(),
                    "end": ts.ts.index.max(),
                }
                for ts in self.timeseries
                if ts is not None and len(ts.ts) > 0
            ],
            columns=columns,
        )
        if not table.empty:
            table = table.sort_values(["location", "variable", "sensor"]).reset_index(
                drop=True
            )
        return table

    def diff(
        self,
        *others: Dataset,
        labels: list[str] | None = None,
        key: tuple[str, ...] = ("location", "variable"),
    ) -> CoverageDiff:
        """Compare this dataset's coverage with one or more others.

        Convenience wrapper over :func:`gensor.diff`. ``labels`` names this dataset
        and the others (default ``ds0``, ``ds1`` ...).

        Examples:
            >>> raw.diff(trimmed, labels=["raw", "trimmed"]).plot()  # doctest: +SKIP
        """
        datasets = [self, *others]
        if labels is None:
            labels = [f"ds{i}" for i in range(len(datasets))]
        return diff(dict(zip(labels, datasets, strict=True)), key=key)

    def one(self, **filters: Any) -> T:
        """Return exactly one matching Timeseries.

        A convenience over :meth:`filter` for when a single result is expected:
        it always returns a Timeseries (never a Dataset) and raises if zero or
        more than one timeseries match - avoiding the "is it a Timeseries or a
        Dataset?" ambiguity of :meth:`filter` / ``dataset[name]``.

        Parameters:
            **filters: Same keyword filters as :meth:`filter` (location,
                variable, unit, sensor, ...).

        Returns:
            Timeseries: The single matching timeseries.

        Raises:
            ValueError: If zero or more than one timeseries match the filters.
        """
        result = self.filter(**filters)
        if isinstance(result, BaseTimeseries):
            return result

        count = len(result)
        message = f"Expected exactly one timeseries matching {filters}, found {count}."
        raise ValueError(message)

    def add(self, other: T | list[T] | Dataset) -> Dataset:
        """Appends new Timeseries to the Dataset.

        If an equal Timeseries already exists, merge the new data into the existing
        Timeseries, dropping duplicate timestamps.

        Parameters:
            other (Timeseries): The Timeseries object to add.
        """

        # I need to check for BaseTimeseries instance in the add() method, but also
        # type hint VarType T.
        if isinstance(other, list | Dataset):
            for ts in other:
                if isinstance(ts, BaseTimeseries):
                    self._add_single_timeseries(ts)  # type: ignore[arg-type]

        elif isinstance(other, BaseTimeseries):
            self._add_single_timeseries(other)

        return self

    def _add_single_timeseries(self, ts: T) -> None:
        """Adds a single Timeseries to the Dataset or merges if an equal one exists."""
        for i, existing_ts in enumerate(self.timeseries):
            if existing_ts == ts:
                self.timeseries[i] = existing_ts.concatenate(ts)
                return

        self.timeseries.append(ts)

        return

    def filter(
        self,
        *predicates: Where,
        location: str | list | None = None,
        variable: str | list | None = None,
        unit: str | list | None = None,
        **kwargs: str | list,
    ) -> T | Dataset:
        """Return a Timeseries or a new Dataset filtered by station, sensor,
        and/or variable.

        Any of ``location``/``variable``/``unit`` (and the keyword attributes) may be
        a single value or a list of values, matching a timeseries when its attribute
        equals (or is in) the given value(s).

        Prefix a value with ``~`` to *negate* it - drop timeseries with that value
        rather than keep them (e.g. ``location="~PB16D"`` keeps everything except
        PB16D; ``sensor="~AV319"`` drops just that sensor). Positive and negated
        values may be mixed within one attribute and across attributes; for a given
        attribute a timeseries is kept when its value is in the positives (if any are
        given) **and** not in the negatives, and attributes are AND-ed together.

        For conditions the per-attribute keywords can't express - notably a *combined*
        match across attributes - pass one or more :class:`Where` predicates
        positionally. ``filter(~Where(location="PB03B", sensor="AV319"))`` drops only that
        sensor at that location (the whole combination negated as a unit), while
        ``filter(Where(location="PB16A") | Where(location="PB16B"))`` keeps either.
        Predicates are AND-ed with the keyword filters.

        Parameters:
            *predicates (Where): Predicate objects; all must match for a timeseries to
                be kept (combine with ``& | ~``).
            location (str | list, optional): The location name(s); ``~`` negates.
            variable (str | list, optional): The variable(s) being measured; ``~`` negates.
            unit (str | list, optional): Unit(s) of the measurement; ``~`` negates.
            **kwargs (str | list): Attributes of subclassed timeseries used for
                filtering (e.g., sensor, method); ``~`` negates.

        Returns:
            Timeseries | Dataset: A single Timeseries if exactly one match is found,
                                   or a new Dataset if multiple matches are found.
        """
        keep = self._matcher(predicates, location, variable, unit, kwargs)
        matching_timeseries = [ts for ts in self.timeseries if keep(ts)]

        if not matching_timeseries:
            return Dataset()

        if len(matching_timeseries) == 1:
            return matching_timeseries[0].model_copy(deep=True)

        return self.model_copy(update={"timeseries": matching_timeseries})

    def pop(
        self,
        *predicates: Where,
        location: str | list | None = None,
        variable: str | list | None = None,
        unit: str | list | None = None,
        **kwargs: str | list,
    ) -> T | Dataset:
        """Remove and return the matching timeseries, mutating the Dataset in place.

        Selection works exactly like :meth:`filter` (same ``location`` / ``variable`` /
        ``unit`` / keyword filters, ``~`` negation, and :class:`Where` predicates), but
        the matched timeseries are **removed** from this Dataset and returned **by
        reference** (not copied) - so you can alter them and ``add()`` them back in their
        new form::

            ts = ds.pop(location="PB03B", sensor="AV319")   # taken out of ds
            ts.ts = ts.ts - 300                             # edit the live series
            ds.add(ts)                                       # put it back, changed

        Parameters:
            *predicates (Where): Predicate objects; all must match (combine with ``& | ~``).
            location (str | list, optional): The location name(s); ``~`` negates.
            variable (str | list, optional): The variable(s) being measured; ``~`` negates.
            unit (str | list, optional): Unit(s) of the measurement; ``~`` negates.
            **kwargs (str | list): Other timeseries attributes to match (e.g., sensor).

        Returns:
            Timeseries | Dataset: A single Timeseries if exactly one match is removed, a
                new Dataset of them if several match, or an empty Dataset if none match
                (in which case nothing is removed).
        """
        keep = self._matcher(predicates, location, variable, unit, kwargs)

        popped: list[T | None] = []
        remaining: list[T | None] = []
        for ts in self.timeseries:
            (popped if keep(ts) else remaining).append(ts)

        self.timeseries = remaining

        if not popped:
            return Dataset()
        if len(popped) == 1:
            return popped[0]
        return Dataset(timeseries=popped)

    def _matcher(
        self,
        predicates: tuple,
        location: str | list | None,
        variable: str | list | None,
        unit: str | list | None,
        kwargs: dict,
    ) -> Any:
        """Build the ``keep(ts)`` predicate shared by :meth:`filter` and :meth:`pop`.

        A timeseries is kept when it matches every keyword filter (``~`` negation
        included) and every positional :class:`Where` predicate. ``None`` entries never
        match.
        """
        keywords = {"location": location, "variable": variable, "unit": unit, **kwargs}
        tests = [
            Where(**{attr: value})
            for attr, value in keywords.items()
            if value is not None
        ]
        tests.extend(predicates)

        def keep(ts: T | None) -> bool:
            return ts is not None and all(test(ts) for test in tests)

        return keep

    def to_sql(self, db: DatabaseConnection) -> None:
        """Save the entire timeseries to a SQLite database.

        Parameters:
            db (DatabaseConnection): SQLite database connection object.
        """
        for ts in self.timeseries:
            if ts is None:
                continue
            if len(ts.ts) == 0:
                logger.info(
                    f"Skipping empty timeseries (location={ts.location!r}) - "
                    "nothing to write to the database."
                )
                continue
            ts.to_sql(db)
        return

    def plot(
        self,
        facet: str = "variable",
        variable: str | list | None = None,
        ncols: int = 5,
        sharex: bool = False,
        include_outliers: bool = False,
        plot_kwargs: dict[str, Any] | None = None,
        legend_kwargs: dict[str, Any] | None = None,
    ) -> tuple[Figure, list] | dict[str, tuple[Figure, list]]:
        """Plot the dataset's timeseries, in one of two layouts.

        - ``facet="variable"`` (default): one subplot per variable (pressure,
          temperature, ...), every location's series overlaid on that axis. Returns
          ``(fig, axes)`` where ``axes`` is a list (one per variable).
        - ``facet="location"``: a **separate figure per variable**, each a grid with one
          panel per location (``ncols`` wide). Every location gets a panel - left empty
          if it has no (or empty) series for that variable - and unused trailing cells are
          hidden. Multiple sensors at a location are overlaid in the same panel, and a
          legend (labelled by **sensor serial**) is shown only then; single-series panels
          get no legend. Panels are titled by location and carry no x-label (the dates are
          on the shared/rotated ticks). Returns ``{variable: (fig, axes)}``.

        Parameters:
            facet (str): ``"variable"`` or ``"location"``.
            variable (str | list, optional): restrict to these variable(s); default is
                every unique variable in the dataset.
            ncols (int): panels per row for the ``facet="location"`` grid.
            sharex (bool): for ``facet="location"``, share the x-axis across all panels so
                every row and column is aligned to the same (full) time span - the
                longest-running series sets the extent, and empty panels span it too.
            include_outliers (bool): Whether to include outliers in the plot.
            plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot().
            legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend().

        Returns:
            ``(fig, axes)`` for ``facet="variable"``; a ``{variable: (fig, axes)}`` dict
            for ``facet="location"``.
        """
        variables = (
            [variable]
            if isinstance(variable, str)
            else list(variable)
            if variable is not None
            else sorted({ts.variable for ts in self.timeseries if ts is not None})
        )

        if facet == "variable":
            return self._plot_by_variable(
                variables, include_outliers, plot_kwargs, legend_kwargs
            )
        if facet == "location":
            return self._plot_by_location(
                variables, ncols, sharex, include_outliers, plot_kwargs, legend_kwargs
            )

        message = f"facet must be 'variable' or 'location', got {facet!r}."
        raise ValueError(message)

    def _plot_by_variable(
        self,
        variables: list,
        include_outliers: bool,
        plot_kwargs: dict[str, Any] | None,
        legend_kwargs: dict[str, Any] | None,
    ) -> tuple[Figure, list]:
        """One subplot per variable, every location overlaid (see :meth:`plot`)."""
        fig, axs = plt.subplots(
            len(variables),
            1,
            figsize=(10, 5 * len(variables)),
            sharex=True,
            squeeze=False,
        )
        axes = list(axs.ravel())
        for ax, var in zip(axes, variables, strict=False):
            for ts in self.timeseries:
                if ts is not None and ts.variable == var and len(ts.ts) > 0:
                    ts.plot(
                        include_outliers=include_outliers,
                        ax=ax,
                        plot_kwargs=plot_kwargs,
                        legend_kwargs=legend_kwargs,
                    )
            ax.set_title(f"Timeseries for {var.capitalize()}")
            ax.set_xlabel("Time")
        fig.tight_layout()
        return fig, axes

    def _series_at(self, location: str, variable: str) -> list:
        """Non-empty timeseries at a given location and variable."""
        return [
            ts
            for ts in self.timeseries
            if ts is not None
            and ts.location == location
            and ts.variable == variable
            and len(ts.ts) > 0
        ]

    def _draw_location_panel(
        self,
        ax: Axes,
        series: list,
        include_outliers: bool,
        plot_kwargs: dict[str, Any],
        legend_kwargs: dict[str, Any],
    ) -> None:
        """Draw one location panel: overlay its series, style ticks, legend if shared."""
        for ts in series:
            ax.plot(ts.ts.index, ts.ts.to_numpy(), label=ts.sensor, **plot_kwargs)
            if include_outliers and ts.outliers is not None and len(ts.outliers) > 0:
                ax.scatter(ts.outliers.index, ts.outliers, color="red", s=5)
        ax.tick_params(labelsize=6)
        for label in ax.get_xticklabels():
            label.set_rotation(45)
            label.set_horizontalalignment("right")
        if len(series) > 1:  # only label sensors when they share a panel
            ax.legend(**legend_kwargs)

    def _plot_by_location(
        self,
        variables: list,
        ncols: int,
        sharex: bool,
        include_outliers: bool,
        plot_kwargs: dict[str, Any] | None,
        legend_kwargs: dict[str, Any] | None,
    ) -> dict[str, tuple[Figure, list]]:
        """A grid of one panel per location, a figure per variable (see :meth:`plot`)."""
        locations = self.get_locations()
        nrows = (len(locations) + ncols - 1) // ncols if locations else 1
        pkw = {"lw": 0.7, **(plot_kwargs or {})}
        lkw = {"fontsize": 7, **(legend_kwargs or {})}
        results: dict[str, tuple[Figure, list]] = {}
        for var in variables:
            fig, axs = plt.subplots(
                nrows,
                ncols,
                figsize=(4 * ncols, 2.3 * nrows),
                squeeze=False,
                sharex=sharex,
            )
            axes = list(axs.ravel())
            for ax, loc in zip(axes, locations, strict=False):
                ax.set_title(
                    loc, fontsize=8
                )  # every location keeps a panel, even if empty
                self._draw_location_panel(
                    ax, self._series_at(loc, var), include_outliers, pkw, lkw
                )
            for ax in axes[len(locations) :]:
                ax.set_visible(False)  # hide unused trailing cells
            fig.suptitle(f"{var.capitalize()} by location", fontsize=13)
            fig.tight_layout(rect=(0, 0, 1, 0.98))  # leave room for the suptitle
            results[var] = (fig, axes)
        return results

coverage: Coverage property

Coverage summary of the dataset.

Renders as a per-timeseries table (records and time span per location / variable / sensor) and exposes :meth:Coverage.plot for a coverage timeline.

Examples:

>>> ds.coverage          # the table
>>> ds.coverage.plot()   # the timeline

info: pd.DataFrame property

Per-timeseries metadata summary, rendered as a table.

One row per timeseries — location, variable, sensor, the number of records, and the start / end of its time span. A quick look at what a Dataset holds before processing it (the default repr only shows the timeseries count). See :attr:coverage for a plottable version and :func:gensor.diff to line this up across datasets.

Examples:

>>> ds.info

loc: DatasetIndexer property

Label-based selection applied to every timeseries in the dataset.

ds.loc[start:end] returns a new Dataset where each timeseries is sliced by .loc[start:end] (e.g. a date range), forwarding the key to each series' own pandas .loc. Empty slices yield empty timeseries (every series is kept).

Examples:

>>> ds.loc["2021-01-01":"2021-12-31"]

__contains__(location)

Return True if any timeseries in the dataset has the given location.

Source code in gensor/core/dataset.py
190
191
192
def __contains__(self, location: object) -> bool:
    """Return True if any timeseries in the dataset has the given location."""
    return any(ts is not None and ts.location == location for ts in self.timeseries)

__getitem__(key)

Retrieve Timeseries by integer index, location name, or (location, variable[, unit]) tuple.

  • dataset[0] returns the Timeseries at that position (a reference).
  • dataset["PB01A"] returns the Timeseries at that location, or a Dataset if the location has several timeseries (e.g. pressure and temperature). A list of names (dataset[["PB01A", "PB02A"]]) always returns a Dataset.
  • dataset["PB01A", "pressure"] (or ["PB01A", "pressure", "cmh2o"]) narrows by variable/unit, returning a single Timeseries when one matches. For full control use :meth:filter / :meth:one.

Warning

Integer indexing returns a reference to the timeseries. Location / tuple indexing returns copies (it delegates to .filter()).

Parameters:

Name Type Description Default
key int | str | list | tuple

Position, location name, list of names, or a (location, variable[, unit]) tuple.

required

Returns:

Type Description
T | None | Dataset

Timeseries | Dataset: The matching timeseries or a dataset of them.

Raises:

Type Description
IndexOutOfRangeError

If an integer index is out of range.

KeyError

If no timeseries matches the given location(s)/filters.

Source code in gensor/core/dataset.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def __getitem__(self, key: int | str | list | tuple) -> T | None | Dataset:
    """Retrieve Timeseries by integer index, location name, or (location,
    variable[, unit]) tuple.

    - ``dataset[0]`` returns the Timeseries at that position (a reference).
    - ``dataset["PB01A"]`` returns the Timeseries at that location, or a
      Dataset if the location has several timeseries (e.g. pressure and
      temperature). A list of names (``dataset[["PB01A", "PB02A"]]``) always
      returns a Dataset.
    - ``dataset["PB01A", "pressure"]`` (or ``["PB01A", "pressure", "cmh2o"]``)
      narrows by variable/unit, returning a single Timeseries when one matches.
      For full control use :meth:`filter` / :meth:`one`.

    !!! warning
        Integer indexing returns a reference to the timeseries. Location /
        tuple indexing returns copies (it delegates to ``.filter()``).

    Parameters:
        key (int | str | list | tuple): Position, location name, list of
            names, or a (location, variable[, unit]) tuple.

    Returns:
        Timeseries | Dataset: The matching timeseries or a dataset of them.

    Raises:
        IndexOutOfRangeError: If an integer index is out of range.
        KeyError: If no timeseries matches the given location(s)/filters.
    """
    if isinstance(key, tuple):
        location, variable, unit = (*key, None, None)[:3]
        result = self.filter(location=location, variable=variable, unit=unit)
        if isinstance(result, Dataset) and len(result) == 0:
            message = f"No timeseries found for {key!r}."
            raise KeyError(message)
        return result

    if isinstance(key, str | list):
        result = self.filter(location=key)
        if isinstance(result, Dataset) and len(result) == 0:
            message = f"No timeseries found for location(s) {key!r}."
            raise KeyError(message)
        return result

    try:
        return self.timeseries[key]
    except IndexError:
        raise IndexOutOfRangeError(key, len(self)) from None

__iter__()

Allows to iterate directly over the dataset.

Source code in gensor/core/dataset.py
131
132
133
def __iter__(self) -> Any:
    """Allows to iterate directly over the dataset."""
    return iter(self.timeseries)

__len__()

Gives the number of timeseries in the Dataset.

Source code in gensor/core/dataset.py
135
136
137
def __len__(self) -> int:
    """Gives the number of timeseries in the Dataset."""
    return len(self.timeseries)

add(other)

Appends new Timeseries to the Dataset.

If an equal Timeseries already exists, merge the new data into the existing Timeseries, dropping duplicate timestamps.

Parameters:

Name Type Description Default
other Timeseries

The Timeseries object to add.

required
Source code in gensor/core/dataset.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
def add(self, other: T | list[T] | Dataset) -> Dataset:
    """Appends new Timeseries to the Dataset.

    If an equal Timeseries already exists, merge the new data into the existing
    Timeseries, dropping duplicate timestamps.

    Parameters:
        other (Timeseries): The Timeseries object to add.
    """

    # I need to check for BaseTimeseries instance in the add() method, but also
    # type hint VarType T.
    if isinstance(other, list | Dataset):
        for ts in other:
            if isinstance(ts, BaseTimeseries):
                self._add_single_timeseries(ts)  # type: ignore[arg-type]

    elif isinstance(other, BaseTimeseries):
        self._add_single_timeseries(other)

    return self

diff(*others, labels=None, key=('location', 'variable'))

Compare this dataset's coverage with one or more others.

Convenience wrapper over :func:gensor.diff. labels names this dataset and the others (default ds0, ds1 ...).

Examples:

>>> raw.diff(trimmed, labels=["raw", "trimmed"]).plot()
Source code in gensor/core/dataset.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def diff(
    self,
    *others: Dataset,
    labels: list[str] | None = None,
    key: tuple[str, ...] = ("location", "variable"),
) -> CoverageDiff:
    """Compare this dataset's coverage with one or more others.

    Convenience wrapper over :func:`gensor.diff`. ``labels`` names this dataset
    and the others (default ``ds0``, ``ds1`` ...).

    Examples:
        >>> raw.diff(trimmed, labels=["raw", "trimmed"]).plot()  # doctest: +SKIP
    """
    datasets = [self, *others]
    if labels is None:
        labels = [f"ds{i}" for i in range(len(datasets))]
    return diff(dict(zip(labels, datasets, strict=True)), key=key)

filter(*predicates, location=None, variable=None, unit=None, **kwargs)

Return a Timeseries or a new Dataset filtered by station, sensor, and/or variable.

Any of location/variable/unit (and the keyword attributes) may be a single value or a list of values, matching a timeseries when its attribute equals (or is in) the given value(s).

Prefix a value with ~ to negate it - drop timeseries with that value rather than keep them (e.g. location="~PB16D" keeps everything except PB16D; sensor="~AV319" drops just that sensor). Positive and negated values may be mixed within one attribute and across attributes; for a given attribute a timeseries is kept when its value is in the positives (if any are given) and not in the negatives, and attributes are AND-ed together.

For conditions the per-attribute keywords can't express - notably a combined match across attributes - pass one or more :class:Where predicates positionally. filter(~Where(location="PB03B", sensor="AV319")) drops only that sensor at that location (the whole combination negated as a unit), while filter(Where(location="PB16A") | Where(location="PB16B")) keeps either. Predicates are AND-ed with the keyword filters.

Parameters:

Name Type Description Default
*predicates Where

Predicate objects; all must match for a timeseries to be kept (combine with & | ~).

()
location str | list

The location name(s); ~ negates.

None
variable str | list

The variable(s) being measured; ~ negates.

None
unit str | list

Unit(s) of the measurement; ~ negates.

None
**kwargs str | list

Attributes of subclassed timeseries used for filtering (e.g., sensor, method); ~ negates.

{}

Returns:

Type Description
T | Dataset

Timeseries | Dataset: A single Timeseries if exactly one match is found, or a new Dataset if multiple matches are found.

Source code in gensor/core/dataset.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def filter(
    self,
    *predicates: Where,
    location: str | list | None = None,
    variable: str | list | None = None,
    unit: str | list | None = None,
    **kwargs: str | list,
) -> T | Dataset:
    """Return a Timeseries or a new Dataset filtered by station, sensor,
    and/or variable.

    Any of ``location``/``variable``/``unit`` (and the keyword attributes) may be
    a single value or a list of values, matching a timeseries when its attribute
    equals (or is in) the given value(s).

    Prefix a value with ``~`` to *negate* it - drop timeseries with that value
    rather than keep them (e.g. ``location="~PB16D"`` keeps everything except
    PB16D; ``sensor="~AV319"`` drops just that sensor). Positive and negated
    values may be mixed within one attribute and across attributes; for a given
    attribute a timeseries is kept when its value is in the positives (if any are
    given) **and** not in the negatives, and attributes are AND-ed together.

    For conditions the per-attribute keywords can't express - notably a *combined*
    match across attributes - pass one or more :class:`Where` predicates
    positionally. ``filter(~Where(location="PB03B", sensor="AV319"))`` drops only that
    sensor at that location (the whole combination negated as a unit), while
    ``filter(Where(location="PB16A") | Where(location="PB16B"))`` keeps either.
    Predicates are AND-ed with the keyword filters.

    Parameters:
        *predicates (Where): Predicate objects; all must match for a timeseries to
            be kept (combine with ``& | ~``).
        location (str | list, optional): The location name(s); ``~`` negates.
        variable (str | list, optional): The variable(s) being measured; ``~`` negates.
        unit (str | list, optional): Unit(s) of the measurement; ``~`` negates.
        **kwargs (str | list): Attributes of subclassed timeseries used for
            filtering (e.g., sensor, method); ``~`` negates.

    Returns:
        Timeseries | Dataset: A single Timeseries if exactly one match is found,
                               or a new Dataset if multiple matches are found.
    """
    keep = self._matcher(predicates, location, variable, unit, kwargs)
    matching_timeseries = [ts for ts in self.timeseries if keep(ts)]

    if not matching_timeseries:
        return Dataset()

    if len(matching_timeseries) == 1:
        return matching_timeseries[0].model_copy(deep=True)

    return self.model_copy(update={"timeseries": matching_timeseries})

get_locations()

List all unique locations in the dataset, preserving first-seen order.

Source code in gensor/core/dataset.py
194
195
196
197
198
199
200
def get_locations(self) -> list:
    """List all unique locations in the dataset, preserving first-seen order."""
    locations: list = []
    for ts in self.timeseries:
        if ts is not None and ts.location not in locations:
            locations.append(ts.location)
    return locations

one(**filters)

Return exactly one matching Timeseries.

A convenience over :meth:filter for when a single result is expected: it always returns a Timeseries (never a Dataset) and raises if zero or more than one timeseries match - avoiding the "is it a Timeseries or a Dataset?" ambiguity of :meth:filter / dataset[name].

Parameters:

Name Type Description Default
**filters Any

Same keyword filters as :meth:filter (location, variable, unit, sensor, ...).

{}

Returns:

Name Type Description
Timeseries T

The single matching timeseries.

Raises:

Type Description
ValueError

If zero or more than one timeseries match the filters.

Source code in gensor/core/dataset.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def one(self, **filters: Any) -> T:
    """Return exactly one matching Timeseries.

    A convenience over :meth:`filter` for when a single result is expected:
    it always returns a Timeseries (never a Dataset) and raises if zero or
    more than one timeseries match - avoiding the "is it a Timeseries or a
    Dataset?" ambiguity of :meth:`filter` / ``dataset[name]``.

    Parameters:
        **filters: Same keyword filters as :meth:`filter` (location,
            variable, unit, sensor, ...).

    Returns:
        Timeseries: The single matching timeseries.

    Raises:
        ValueError: If zero or more than one timeseries match the filters.
    """
    result = self.filter(**filters)
    if isinstance(result, BaseTimeseries):
        return result

    count = len(result)
    message = f"Expected exactly one timeseries matching {filters}, found {count}."
    raise ValueError(message)

plot(facet='variable', variable=None, ncols=5, sharex=False, include_outliers=False, plot_kwargs=None, legend_kwargs=None)

Plot the dataset's timeseries, in one of two layouts.

  • facet="variable" (default): one subplot per variable (pressure, temperature, ...), every location's series overlaid on that axis. Returns (fig, axes) where axes is a list (one per variable).
  • facet="location": a separate figure per variable, each a grid with one panel per location (ncols wide). Every location gets a panel - left empty if it has no (or empty) series for that variable - and unused trailing cells are hidden. Multiple sensors at a location are overlaid in the same panel, and a legend (labelled by sensor serial) is shown only then; single-series panels get no legend. Panels are titled by location and carry no x-label (the dates are on the shared/rotated ticks). Returns {variable: (fig, axes)}.

Parameters:

Name Type Description Default
facet str

"variable" or "location".

'variable'
variable str | list

restrict to these variable(s); default is every unique variable in the dataset.

None
ncols int

panels per row for the facet="location" grid.

5
sharex bool

for facet="location", share the x-axis across all panels so every row and column is aligned to the same (full) time span - the longest-running series sets the extent, and empty panels span it too.

False
include_outliers bool

Whether to include outliers in the plot.

False
plot_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.plot().

None
legend_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.legend().

None

Returns:

Type Description
tuple[Figure, list] | dict[str, tuple[Figure, list]]

(fig, axes) for facet="variable"; a {variable: (fig, axes)} dict

tuple[Figure, list] | dict[str, tuple[Figure, list]]

for facet="location".

Source code in gensor/core/dataset.py
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
def plot(
    self,
    facet: str = "variable",
    variable: str | list | None = None,
    ncols: int = 5,
    sharex: bool = False,
    include_outliers: bool = False,
    plot_kwargs: dict[str, Any] | None = None,
    legend_kwargs: dict[str, Any] | None = None,
) -> tuple[Figure, list] | dict[str, tuple[Figure, list]]:
    """Plot the dataset's timeseries, in one of two layouts.

    - ``facet="variable"`` (default): one subplot per variable (pressure,
      temperature, ...), every location's series overlaid on that axis. Returns
      ``(fig, axes)`` where ``axes`` is a list (one per variable).
    - ``facet="location"``: a **separate figure per variable**, each a grid with one
      panel per location (``ncols`` wide). Every location gets a panel - left empty
      if it has no (or empty) series for that variable - and unused trailing cells are
      hidden. Multiple sensors at a location are overlaid in the same panel, and a
      legend (labelled by **sensor serial**) is shown only then; single-series panels
      get no legend. Panels are titled by location and carry no x-label (the dates are
      on the shared/rotated ticks). Returns ``{variable: (fig, axes)}``.

    Parameters:
        facet (str): ``"variable"`` or ``"location"``.
        variable (str | list, optional): restrict to these variable(s); default is
            every unique variable in the dataset.
        ncols (int): panels per row for the ``facet="location"`` grid.
        sharex (bool): for ``facet="location"``, share the x-axis across all panels so
            every row and column is aligned to the same (full) time span - the
            longest-running series sets the extent, and empty panels span it too.
        include_outliers (bool): Whether to include outliers in the plot.
        plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot().
        legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend().

    Returns:
        ``(fig, axes)`` for ``facet="variable"``; a ``{variable: (fig, axes)}`` dict
        for ``facet="location"``.
    """
    variables = (
        [variable]
        if isinstance(variable, str)
        else list(variable)
        if variable is not None
        else sorted({ts.variable for ts in self.timeseries if ts is not None})
    )

    if facet == "variable":
        return self._plot_by_variable(
            variables, include_outliers, plot_kwargs, legend_kwargs
        )
    if facet == "location":
        return self._plot_by_location(
            variables, ncols, sharex, include_outliers, plot_kwargs, legend_kwargs
        )

    message = f"facet must be 'variable' or 'location', got {facet!r}."
    raise ValueError(message)

pop(*predicates, location=None, variable=None, unit=None, **kwargs)

Remove and return the matching timeseries, mutating the Dataset in place.

Selection works exactly like :meth:filter (same location / variable / unit / keyword filters, ~ negation, and :class:Where predicates), but the matched timeseries are removed from this Dataset and returned by reference (not copied) - so you can alter them and add() them back in their new form::

ts = ds.pop(location="PB03B", sensor="AV319")   # taken out of ds
ts.ts = ts.ts - 300                             # edit the live series
ds.add(ts)                                       # put it back, changed

Parameters:

Name Type Description Default
*predicates Where

Predicate objects; all must match (combine with & | ~).

()
location str | list

The location name(s); ~ negates.

None
variable str | list

The variable(s) being measured; ~ negates.

None
unit str | list

Unit(s) of the measurement; ~ negates.

None
**kwargs str | list

Other timeseries attributes to match (e.g., sensor).

{}

Returns:

Type Description
T | Dataset

Timeseries | Dataset: A single Timeseries if exactly one match is removed, a new Dataset of them if several match, or an empty Dataset if none match (in which case nothing is removed).

Source code in gensor/core/dataset.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
def pop(
    self,
    *predicates: Where,
    location: str | list | None = None,
    variable: str | list | None = None,
    unit: str | list | None = None,
    **kwargs: str | list,
) -> T | Dataset:
    """Remove and return the matching timeseries, mutating the Dataset in place.

    Selection works exactly like :meth:`filter` (same ``location`` / ``variable`` /
    ``unit`` / keyword filters, ``~`` negation, and :class:`Where` predicates), but
    the matched timeseries are **removed** from this Dataset and returned **by
    reference** (not copied) - so you can alter them and ``add()`` them back in their
    new form::

        ts = ds.pop(location="PB03B", sensor="AV319")   # taken out of ds
        ts.ts = ts.ts - 300                             # edit the live series
        ds.add(ts)                                       # put it back, changed

    Parameters:
        *predicates (Where): Predicate objects; all must match (combine with ``& | ~``).
        location (str | list, optional): The location name(s); ``~`` negates.
        variable (str | list, optional): The variable(s) being measured; ``~`` negates.
        unit (str | list, optional): Unit(s) of the measurement; ``~`` negates.
        **kwargs (str | list): Other timeseries attributes to match (e.g., sensor).

    Returns:
        Timeseries | Dataset: A single Timeseries if exactly one match is removed, a
            new Dataset of them if several match, or an empty Dataset if none match
            (in which case nothing is removed).
    """
    keep = self._matcher(predicates, location, variable, unit, kwargs)

    popped: list[T | None] = []
    remaining: list[T | None] = []
    for ts in self.timeseries:
        (popped if keep(ts) else remaining).append(ts)

    self.timeseries = remaining

    if not popped:
        return Dataset()
    if len(popped) == 1:
        return popped[0]
    return Dataset(timeseries=popped)

to_sql(db)

Save the entire timeseries to a SQLite database.

Parameters:

Name Type Description Default
db DatabaseConnection

SQLite database connection object.

required
Source code in gensor/core/dataset.py
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
def to_sql(self, db: DatabaseConnection) -> None:
    """Save the entire timeseries to a SQLite database.

    Parameters:
        db (DatabaseConnection): SQLite database connection object.
    """
    for ts in self.timeseries:
        if ts is None:
            continue
        if len(ts.ts) == 0:
            logger.info(
                f"Skipping empty timeseries (location={ts.location!r}) - "
                "nothing to write to the database."
            )
            continue
        ts.to_sql(db)
    return

Timeseries

Bases: BaseTimeseries

Timeseries of groundwater sensor data.

Attributes:

Name Type Description
ts Series

The timeseries data.

variable Literal['temperature', 'pressure', 'conductivity', 'flux']

The type of the measurement.

unit Literal['degC', 'mmH2O', 'mS/cm', 'm/s']

The unit of the measurement.

sensor str

The serial number of the sensor.

sensor_alt float

Altitude of the sensor (ncessary to compute groundwater levels).

Source code in gensor/core/timeseries.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class Timeseries(BaseTimeseries):
    """Timeseries of groundwater sensor data.

    Attributes:
        ts (pd.Series): The timeseries data.
        variable (Literal['temperature', 'pressure', 'conductivity', 'flux']):
            The type of the measurement.
        unit (Literal['degC', 'mmH2O', 'mS/cm', 'm/s']): The unit of
            the measurement.
        sensor (str): The serial number of the sensor.
        sensor_alt (float): Altitude of the sensor (ncessary to compute groundwater levels).
    """

    model_config = pyd.ConfigDict(
        arbitrary_types_allowed=True, validate_assignment=True
    )

    sensor: str | None = None
    sensor_alt: float | None = None

    def __eq__(self, other: object) -> bool:
        """Check equality based on location, sensor, variable, unit and sensor_alt."""
        if not isinstance(other, Timeseries):
            return NotImplemented

        if not super().__eq__(other):
            return False

        return self.sensor == other.sensor and self.sensor_alt == other.sensor_alt

    def plot(
        self,
        include_outliers: bool = False,
        ax: Axes | None = None,
        plot_kwargs: dict[str, Any] | None = None,
        legend_kwargs: dict[str, Any] | None = None,
    ) -> tuple[Figure, Axes]:
        """Plots the timeseries data.

        Parameters:
            include_outliers (bool): Whether to include outliers in the plot.
            ax (matplotlib.axes.Axes, optional): Matplotlib axes object to plot on.
                If None, a new figure and axes are created.
            plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
            legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

        Returns:
            (fig, ax): Matplotlib figure and axes to allow further customization.
        """
        fig, ax = super().plot(
            include_outliers=include_outliers,
            ax=ax,
            plot_kwargs=plot_kwargs,
            legend_kwargs=legend_kwargs,
        )

        ax.set_title(f"{self.variable.capitalize()} at {self.location} ({self.sensor})")

        return fig, ax

__eq__(other)

Check equality based on location, sensor, variable, unit and sensor_alt.

Source code in gensor/core/timeseries.py
40
41
42
43
44
45
46
47
48
def __eq__(self, other: object) -> bool:
    """Check equality based on location, sensor, variable, unit and sensor_alt."""
    if not isinstance(other, Timeseries):
        return NotImplemented

    if not super().__eq__(other):
        return False

    return self.sensor == other.sensor and self.sensor_alt == other.sensor_alt

plot(include_outliers=False, ax=None, plot_kwargs=None, legend_kwargs=None)

Plots the timeseries data.

Parameters:

Name Type Description Default
include_outliers bool

Whether to include outliers in the plot.

False
ax Axes

Matplotlib axes object to plot on. If None, a new figure and axes are created.

None
plot_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.

None
legend_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

None

Returns:

Type Description
(fig, ax)

Matplotlib figure and axes to allow further customization.

Source code in gensor/core/timeseries.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def plot(
    self,
    include_outliers: bool = False,
    ax: Axes | None = None,
    plot_kwargs: dict[str, Any] | None = None,
    legend_kwargs: dict[str, Any] | None = None,
) -> tuple[Figure, Axes]:
    """Plots the timeseries data.

    Parameters:
        include_outliers (bool): Whether to include outliers in the plot.
        ax (matplotlib.axes.Axes, optional): Matplotlib axes object to plot on.
            If None, a new figure and axes are created.
        plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
        legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

    Returns:
        (fig, ax): Matplotlib figure and axes to allow further customization.
    """
    fig, ax = super().plot(
        include_outliers=include_outliers,
        ax=ax,
        plot_kwargs=plot_kwargs,
        legend_kwargs=legend_kwargs,
    )

    ax.set_title(f"{self.variable.capitalize()} at {self.location} ({self.sensor})")

    return fig, ax

Where

A composable predicate over a Timeseries' attributes, for Dataset.filter/drop.

A leaf Where(**conditions) matches a Timeseries when every condition holds; each condition matches when the timeseries' attribute equals (or is in, for a list) the given value(s), and a leading ~ on a value negates that single condition. Compose leaves with & (and), | (or) and ~ (not) to express anything the per-attribute keyword filters can't - in particular a combined exclusion::

~Where(location="PB03B", sensor="AV319")            # not (PB03B and AV319)
Where(variable="pressure") & ~Where(location="PB16D")
Where(location="PB16A") | Where(location="PB16B")

Pass instances straight to Dataset.filter (keep matches) or Dataset.drop (remove matches); they are AND-ed with the keyword filters in the same call.

Source code in gensor/core/dataset.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class Where:
    """A composable predicate over a Timeseries' attributes, for ``Dataset.filter``/``drop``.

    A leaf ``Where(**conditions)`` matches a Timeseries when **every** condition holds;
    each condition matches when the timeseries' attribute equals (or is in, for a list)
    the given value(s), and a leading ``~`` on a value negates that single condition.
    Compose leaves with ``&`` (and), ``|`` (or) and ``~`` (not) to express anything the
    per-attribute keyword filters can't - in particular a *combined* exclusion::

        ~Where(location="PB03B", sensor="AV319")            # not (PB03B and AV319)
        Where(variable="pressure") & ~Where(location="PB16D")
        Where(location="PB16A") | Where(location="PB16B")

    Pass instances straight to ``Dataset.filter`` (keep matches) or ``Dataset.drop``
    (remove matches); they are AND-ed with the keyword filters in the same call.
    """

    def __init__(self, _test: Any = None, **conditions: str | list) -> None:
        self._conditions = conditions
        self._test = _test if _test is not None else self._compile(conditions)

    @staticmethod
    def _compile(conditions: dict) -> Any:
        specs = {attr: _split(value) for attr, value in conditions.items()}

        def test(ts: Any) -> bool:
            for attr, (include, exclude) in specs.items():
                if not hasattr(ts, attr):
                    message = (
                        f"'{ts.__class__.__name__}' object has no attribute '{attr}'"
                    )
                    raise AttributeError(message)
                actual = getattr(ts, attr)
                if (include and actual not in include) or actual in exclude:
                    return False
            return True

        return test

    def __call__(self, ts: Any) -> bool:
        return bool(self._test(ts))

    def __invert__(self) -> Where:
        return Where(_test=lambda ts: not self._test(ts))

    def __and__(self, other: Where) -> Where:
        return Where(_test=lambda ts: self._test(ts) and other(ts))

    def __or__(self, other: Where) -> Where:
        return Where(_test=lambda ts: self._test(ts) or other(ts))

    def __repr__(self) -> str:
        body = ", ".join(f"{k}={v!r}" for k, v in self._conditions.items())
        return f"Where({body})"

compensate(raw, barometric, alignment_period='h', threshold_wc=0.025, fieldwork_dates=None, interpolate_method=None)

Compensate raw sensor pressure to groundwater head (m asl).

Computes the water column (see :func:water_column) and adds the sensor altitude.

Parameters:

Name Type Description Default
raw Timeseries | Dataset

Raw sensor timeseries

required
barometric Timeseries | float

Barometric pressure timeseries or a single float value. If a float value is provided, it is assumed to be in cmH2O.

required
alignment_period Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']

The alignment period for the timeseries. Default is 'h'. See pandas offset aliases for definitinos.

'h'
threshold_wc float | None

Lower cutoff (in m) for the water column; records at or below it are dropped. Defaults to 0.025 m (25 mm) and is always applied; lower it to keep shallower columns, or set 0 to drop only negatives. Negative water columns are always dropped regardless, being physically impossible.

0.025
fieldwork_dates Dict[str, list]

Dictionary of location name and a list of fieldwork days. All records on the fieldwork day are set to None.

None
interpolate_method str

String representing the interpolate method as in pd.Series.interpolate() method.

None

Returns:

Type Description
Timeseries | Dataset | None

Timeseries | Dataset | None: head (variable 'head', unit 'm asl').

Source code in gensor/processing/compensation.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
def compensate(
    raw: Timeseries | Dataset,
    barometric: Timeseries | float,
    alignment_period: Literal[
        "D", "ME", "SME", "MS", "YE", "YS", "h", "min", "s"
    ] = "h",
    threshold_wc: float | None = 0.025,
    fieldwork_dates: dict | None = None,
    interpolate_method: str | None = None,
) -> Timeseries | Dataset | None:
    """Compensate raw sensor pressure to groundwater head (m asl).

    Computes the water column (see :func:`water_column`) and adds the sensor altitude.

    Parameters:
        raw (Timeseries | Dataset): Raw sensor timeseries
        barometric (Timeseries | float): Barometric pressure timeseries or a single
            float value. If a float value is provided, it is assumed to be in cmH2O.
        alignment_period (Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']): The alignment period for the timeseries.
            Default is 'h'. See pandas offset aliases for definitinos.
        threshold_wc (float | None): Lower cutoff (in m) for the water column; records at
            or below it are dropped. Defaults to 0.025 m (25 mm) and is always applied;
            lower it to keep shallower columns, or set 0 to drop only negatives. Negative
            water columns are always dropped regardless, being physically impossible.
        fieldwork_dates (Dict[str, list]): Dictionary of location name and a list of
            fieldwork days. All records on the fieldwork day are set to None.
        interpolate_method (str): String representing the interpolate method as in
            pd.Series.interpolate() method.

    Returns:
        Timeseries | Dataset | None: head (variable 'head', unit 'm asl').
    """
    return _apply(
        "compensate",
        raw,
        barometric,
        alignment_period,
        threshold_wc,
        fieldwork_dates,
        interpolate_method,
    )

diff(datasets, key=('location', 'variable'))

Compare the coverage of two or more datasets.

Parameters:

Name Type Description Default
datasets dict[str, Dataset] | list[Dataset]

a mapping {label: Dataset} (preferred - labels name the columns and legend) or a list of datasets (auto-labelled ds0, ds1 ...).

required
key tuple[str, ...]

attributes used to align series across datasets (default ("location", "variable")).

('location', 'variable')

Returns:

Name Type Description
CoverageDiff CoverageDiff

renders as a comparison table; .plot() draws the timeline.

Source code in gensor/core/dataset.py
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
def diff(
    datasets: dict[str, Dataset] | list[Dataset],
    key: tuple[str, ...] = ("location", "variable"),
) -> CoverageDiff:
    """Compare the coverage of two or more datasets.

    Parameters:
        datasets: a mapping ``{label: Dataset}`` (preferred - labels name the columns
            and legend) or a list of datasets (auto-labelled ``ds0``, ``ds1`` ...).
        key: attributes used to align series across datasets (default
            ``("location", "variable")``).

    Returns:
        CoverageDiff: renders as a comparison table; ``.plot()`` draws the timeline.
    """
    if isinstance(datasets, Dataset):
        message = (
            "Pass two or more datasets to diff(), e.g. diff({'a': ds1, 'b': ds2})."
        )
        raise TypeError(message)
    if not isinstance(datasets, dict):
        datasets = {f"ds{i}": d for i, d in enumerate(datasets)}
    return CoverageDiff(datasets, key=key)

read_from_csv(path, file_format='vanessen', **kwargs)

Loads the data from csv files with given file_format and returns a list of Timeseries objects.

Parameters:

Name Type Description Default
path Path

The path to the file or directory containing the files.

required
**kwargs dict

Optional keyword arguments passed to the parsers: * serial_number_pattern (str): The regex pattern to extract the serial number from the file. * location_pattern (str): The regex pattern to extract the station from the file. * col_names (list): The column names for the dataframe. * location (str): Name of the location of the timeseries. * sensor (str): Sensor serial number.

{}
Source code in gensor/io/read.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def read_from_csv(
    path: Path, file_format: Literal["vanessen", "plain"] = "vanessen", **kwargs: Any
) -> Dataset | Timeseries:
    """Loads the data from csv files with given file_format and returns a list of Timeseries objects.

    Parameters:
        path (Path): The path to the file or directory containing the files.
        **kwargs (dict): Optional keyword arguments passed to the parsers:
            * serial_number_pattern (str): The regex pattern to extract the serial number from the file.
            * location_pattern (str): The regex pattern to extract the station from the file.
            * col_names (list): The column names for the dataframe.
            * location (str): Name of the location of the timeseries.
            * sensor (str): Sensor serial number.
    """

    parsers = {
        "vanessen": parse_vanessen_csv,
        "plain": parse_plain,
        # more parser to be implemented
    }

    if not isinstance(path, Path):
        message = "The path argument must be a Path object."
        raise TypeError(message)

    if path.is_dir() and not any(
        file.is_file() and file.suffix.lower() == ".csv" for file in path.iterdir()
    ):
        logger.info("No CSV files found. Operation skipped.")
        return Dataset()

    files = (
        [
            file
            for file in path.iterdir()
            if file.is_file() and file.suffix.lower() == ".csv"
        ]
        if path.is_dir()
        else [path]
        if path.suffix.lower() == ".csv"
        else []
    )

    if not files:
        logger.info("No CSV files found. Operation skipped.")
        return Dataset()

    parser = parsers[file_format]

    ds: Dataset = Dataset()

    for f in files:
        logger.info(f"Loading file: {f}")
        ts_in_file = parser(f, **kwargs)
        ds.add(ts_in_file)

    # If there is only one Timeseries in Dataset (as in the condition), ds[0] will always
    # be a Timeseries; so the line below does not introduce potential None in the return
    return ds[0] if len(ds) == 1 else ds  # type: ignore[return-value]

read_from_sql(db, load_all=True, location=None, variable=None, unit=None, timestamp_start=None, timestamp_stop=None, **kwargs)

Returns the timeseries or a dataset from a SQL database.

Parameters:

Name Type Description Default
db DatabaseConnection

The database connection object.

required
load_all bool

Whether to load all timeseries from the database.

True
location str

The station name.

None
variable str

The measurement type.

None
unit str

The unit of the measurement.

None
timestamp_start Timestamp

Start timestamp filter.

None
timestamp_stop Timestamp

End timestamp filter.

None
**kwargs dict

Any additional filters matching attributes of the particular timeseries.

{}

Returns:

Name Type Description
Dataset Timeseries | Dataset

Dataset with retrieved objects or an empty Dataset.

Source code in gensor/io/read.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def read_from_sql(
    db: DatabaseConnection,
    load_all: bool = True,
    location: str | None = None,
    variable: str | None = None,
    unit: str | None = None,
    timestamp_start: pd.Timestamp | None = None,
    timestamp_stop: pd.Timestamp | None = None,
    **kwargs: dict,
) -> Timeseries | Dataset:
    """Returns the timeseries or a dataset from a SQL database.

    Parameters:
        db (DatabaseConnection): The database connection object.
        load_all (bool): Whether to load all timeseries from the database.
        location (str): The station name.
        variable (str): The measurement type.
        unit (str): The unit of the measurement.
        timestamp_start (pd.Timestamp, optional): Start timestamp filter.
        timestamp_stop (pd.Timestamp, optional): End timestamp filter.
        **kwargs (dict): Any additional filters matching attributes of the particular
            timeseries.

    Returns:
        Dataset: Dataset with retrieved objects or an empty Dataset.
    """

    def _read_data_from_schema(schema_name: str) -> Any:
        """Read data from the table and apply the timestamp filter.

        Parameters:
            schema_name (str): name of the schema in SQLite database.

        Returns:
            pd.Series: results of the query or an empty pd.Series if none are found.
        """
        with db as con:
            schema = db.metadata.tables[schema_name]
            data_query = select(schema)

            if timestamp_start or timestamp_stop:
                if timestamp_start:
                    data_query = data_query.where(schema.c.timestamp >= timestamp_start)
                if timestamp_stop:
                    data_query = data_query.where(schema.c.timestamp <= timestamp_stop)

            ts = pd.read_sql(
                data_query,
                con=con,
                parse_dates={"timestamp": "%Y-%m-%dT%H:%M:%S%z"},
                index_col="timestamp",
            ).squeeze()

        if ts.empty:
            message = f"No data found in table {schema_name}"
            logger.warning(message)

        return ts.sort_index()

    def _create_object(data: pd.Series, metadata: dict) -> Any:
        """Create the appropriate object for timeseries."""

        core_metadata = {
            "location": metadata["location"],
            "variable": metadata["variable"],
            "unit": metadata["unit"],
        }

        extra_metadata = metadata.get("extra", {})

        ts_metadata = {**core_metadata, **extra_metadata}

        cls = metadata["cls"]
        module_name, class_name = cls.rsplit(".", 1)
        module = import_module(module_name)

        TimeseriesClass = getattr(module, class_name)
        ts_object = TimeseriesClass(ts=data, **ts_metadata)

        return ts_object

    metadata_df = (
        db.get_timeseries_metadata(
            location=location, variable=variable, unit=unit, **kwargs
        )
        if not load_all
        else db.get_timeseries_metadata()
    )

    if metadata_df.empty:
        message = "No schemas matched the specified filters."
        raise ValueError(message)

    timeseries_list = []

    for row in metadata_df.to_dict(orient="records"):
        try:
            schema_name = row.pop("table_name")
            data = _read_data_from_schema(schema_name)
            timeseries_obj = _create_object(data, row)
            timeseries_list.append(timeseries_obj)
        except (ValueError, TypeError):
            logger.exception(f"Skipping schema {schema_name} due to error.")

    return Dataset(timeseries=timeseries_list) if timeseries_list else Dataset()

set_log_level(level)

Set the logging level for the package.

Source code in gensor/log.py
4
5
6
7
def set_log_level(level: str) -> None:
    """Set the logging level for the package."""
    logger = logging.getLogger("gensor")
    logger.setLevel(level.upper())

water_column(raw, barometric, alignment_period='h', threshold_wc=0.025, fieldwork_dates=None, interpolate_method=None)

Barometrically compensate raw sensor pressure to the water column above the sensor.

This is the first step of :func:compensate exposed on its own: subtract the barometric pressure, convert to mH2O, mask fieldwork days, and drop out-of-water records (see threshold_wc) - without adding the sensor altitude, so the result is the water column height in metres (variable 'water_column', unit 'm') rather than head.

Parameters:

Name Type Description Default
raw Timeseries | Dataset

Raw sensor timeseries

required
barometric Timeseries | float

Barometric pressure timeseries or a single float value. If a float value is provided, it is assumed to be in cmH2O.

required
alignment_period Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']

The alignment period for the timeseries. Default is 'h'. See pandas offset aliases for definitinos.

'h'
threshold_wc float | None

Lower cutoff (in m) for the water column; records at or below it are dropped. Defaults to 0.025 m (25 mm) and is always applied; lower it to keep shallower columns, or set 0 to drop only negatives. Negative water columns are always dropped regardless, being physically impossible.

0.025
fieldwork_dates Dict[str, list]

Dictionary of location name and a list of fieldwork days. All records on the fieldwork day are set to None.

None
interpolate_method str

String representing the interpolate method as in pd.Series.interpolate() method.

None

Returns:

Type Description
Timeseries | Dataset | None

Timeseries | Dataset | None: the water column height (variable 'water_column', unit 'm').

Source code in gensor/processing/compensation.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def water_column(
    raw: Timeseries | Dataset,
    barometric: Timeseries | float,
    alignment_period: Literal[
        "D", "ME", "SME", "MS", "YE", "YS", "h", "min", "s"
    ] = "h",
    threshold_wc: float | None = 0.025,
    fieldwork_dates: dict | None = None,
    interpolate_method: str | None = None,
) -> Timeseries | Dataset | None:
    """Barometrically compensate raw sensor pressure to the water column above the sensor.

    This is the first step of :func:`compensate` exposed on its own: subtract the
    barometric pressure, convert to mH2O, mask fieldwork days, and drop out-of-water
    records (see ``threshold_wc``) - without adding the sensor altitude, so the result is
    the water column height in metres (variable 'water_column', unit 'm') rather than head.

    Parameters:
        raw (Timeseries | Dataset): Raw sensor timeseries
        barometric (Timeseries | float): Barometric pressure timeseries or a single
            float value. If a float value is provided, it is assumed to be in cmH2O.
        alignment_period (Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']): The alignment period for the timeseries.
            Default is 'h'. See pandas offset aliases for definitinos.
        threshold_wc (float | None): Lower cutoff (in m) for the water column; records at
            or below it are dropped. Defaults to 0.025 m (25 mm) and is always applied;
            lower it to keep shallower columns, or set 0 to drop only negatives. Negative
            water columns are always dropped regardless, being physically impossible.
        fieldwork_dates (Dict[str, list]): Dictionary of location name and a list of
            fieldwork days. All records on the fieldwork day are set to None.
        interpolate_method (str): String representing the interpolate method as in
            pd.Series.interpolate() method.

    Returns:
        Timeseries | Dataset | None: the water column height (variable 'water_column',
            unit 'm').
    """
    return _apply(
        "water_column",
        raw,
        barometric,
        alignment_period,
        threshold_wc,
        fieldwork_dates,
        interpolate_method,
    )

analysis

outliers

OutlierDetection

Detecting outliers in groundwater timeseries data.

Each method in this class returns a pandas.Series containing predicted outliers in the dataset.

Methods:

Name Description
iqr

Use interquartile range (IQR).

zscore

Use the z-score method.

isolation_forest

Using the isolation forest algorithm.

lof

Using the local outlier factor (LOF) method.

Source code in gensor/analysis/outliers.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
class OutlierDetection:
    """Detecting outliers in groundwater timeseries data.

    Each method in this class returns a pandas.Series containing predicted outliers in
    the dataset.

    Methods:
        iqr: Use interquartile range (IQR).
        zscore: Use the z-score method.
        isolation_forest: Using the isolation forest algorithm.
        lof: Using the local outlier factor (LOF) method.
    """

    def __init__(
        self,
        data: Series,
        method: Literal["iqr", "zscore", "isolation_forest", "lof"],
        rolling: bool,
        window: int,
        **kwargs: Any,
    ) -> None:
        """Find outliers in a time series using the specified method, with an option for rolling window."""

        FUNCS: dict[str, Callable] = {
            "iqr": self.iqr,
            "zscore": self.zscore,
            "isolation_forest": self.isolation_forest,
            "lof": self.lof,
        }

        method_func = FUNCS[method]

        if method in ["iqr", "zscore"]:
            # For 'iqr' and 'zscore' methods
            y = (
                kwargs.get("k", 1.5)
                if method == "iqr"
                else kwargs.get("threshold", 3.0)
            )
            if rolling:
                roll = data.rolling(window=window)
                mask = roll.apply(lambda x: method_func(x, y, rolling=True), raw=True)
            else:
                mask = method_func(data.to_numpy(), y, rolling=False)

            bool_mask = mask.astype(bool)
            bool_mask_series = Series(bool_mask, index=data.index)
            self.outliers = data[bool_mask_series]

        else:
            # For 'isolation_forest' and 'lof' methods
            self.outliers = method_func(data, **kwargs)

    @staticmethod
    def iqr(data: np.ndarray, k: float, rolling: bool) -> np.ndarray:
        """Use interquartile range (IQR).

        Parameters:
            data (pandas.Series): The time series data.

        Keyword Args:
            k (float): The multiplier for the IQR to define the range. Defaults to 1.5.

        Returns:
            np.ndarray: Binary mask representing the outliers as 1.
        """

        Q1 = np.percentile(data, 0.25)
        Q3 = np.percentile(data, 0.75)
        IQR = Q3 - Q1

        lower_bound = Q1 - k * IQR
        upper_bound = Q3 + k * IQR

        if rolling:
            return (
                np.array([1])
                if (data[-1] < lower_bound or data[-1] > upper_bound)
                else np.array([0])
            )

        return np.where((data < lower_bound) | (data > upper_bound), 1, 0)

    @staticmethod
    def zscore(data: np.ndarray, threshold: float, rolling: bool) -> np.ndarray:
        """Use the z-score method.

        Parameters:
            data (pandas.Series): The time series data.

        Keyword Args:
            threshold (float): The threshold for the z-score method. Defaults to 3.0.

        Returns:
            pandas.Series: Binary mask representing outliers.
        """

        mean = np.mean(data)
        std_dev = np.std(data)

        z_scores = np.abs((data - mean) / std_dev)

        if rolling:
            return np.array([1]) if z_scores[-1] > threshold else np.array([0])
        return np.where(z_scores > threshold, 1, 0)

    def isolation_forest(self, data: Series, **kwargs: Any) -> Series:
        """Using the isolation forest algorithm.

        Parameters:
            data (pandas.Series): The time series data.

        Keyword Args:
            n_estimators (int): The number of base estimators in the ensemble. Defaults to 100.
            max_samples (int | 'auto' | float): The number of samples to draw from X to train each base estimator. Defaults to 'auto'.
            contamination (float): The proportion of outliers in the data. Defaults to 0.01.
            max_features (int | float): The number of features to draw from X to train each base estimator. Defaults to 1.0.
            bootstrap (bool): Whether to use bootstrapping when sampling the data. Defaults to False.
            n_jobs (int): The number of jobs to run in parallel. Defaults to 1.
            random_state (int | RandomState | None): The random state to use. Defaults to None.
            verbose (int): The verbosity level. Defaults to 0.
            warm_start (bool): Whether to reuse the solution of the previous call to fit and add more estimators to the ensemble. Defaults to False.

        Note:
            For details on kwargs see: sklearn.ensemble.IsolationForest.
        """

        X = data.to_numpy().reshape(-1, 1)

        clf = IsolationForest(**kwargs)
        clf.fit(X)

        is_outlier = clf.predict(X)
        outliers: Series = data[is_outlier == -1]

        return outliers

    def lof(self, data: Series, **kwargs: Any) -> Series:
        """Using the local outlier factor (LOF) method.

        Parameters:
            data (pandas.Series): The time series data.

        Keyword Args:
            n_neighbors (int): The number of neighbors to consider for each sample. Defaults to 20.
            algorithm (str): The algorithm to use. Either 'auto', 'ball_tree', 'kd_tree' or 'brute'. Defaults to 'auto'.
            leaf_size (int): The leaf size of the tree. Defaults to 30.
            metric (str): The distance metric to use. Defaults to 'minkowski'.
            p (int): The power parameter for the Minkowski metric. Defaults to 2.
            contamination (float): The proportion of outliers in the data. Defaults to 0.01.
            novelty (bool): Whether to consider the samples as normal or outliers. Defaults to False.
            n_jobs (int): The number of jobs to run in parallel. Defaults to 1.
        Note:
            For details on kwargs see: sklearn.neighbors.LocalOutlierFactor.
        """

        X = data.to_numpy().reshape(-1, 1)

        clf = LocalOutlierFactor(**kwargs)

        is_outlier = clf.fit_predict(X)
        outliers: Series = data[is_outlier == -1]

        return outliers
__init__(data, method, rolling, window, **kwargs)

Find outliers in a time series using the specified method, with an option for rolling window.

Source code in gensor/analysis/outliers.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __init__(
    self,
    data: Series,
    method: Literal["iqr", "zscore", "isolation_forest", "lof"],
    rolling: bool,
    window: int,
    **kwargs: Any,
) -> None:
    """Find outliers in a time series using the specified method, with an option for rolling window."""

    FUNCS: dict[str, Callable] = {
        "iqr": self.iqr,
        "zscore": self.zscore,
        "isolation_forest": self.isolation_forest,
        "lof": self.lof,
    }

    method_func = FUNCS[method]

    if method in ["iqr", "zscore"]:
        # For 'iqr' and 'zscore' methods
        y = (
            kwargs.get("k", 1.5)
            if method == "iqr"
            else kwargs.get("threshold", 3.0)
        )
        if rolling:
            roll = data.rolling(window=window)
            mask = roll.apply(lambda x: method_func(x, y, rolling=True), raw=True)
        else:
            mask = method_func(data.to_numpy(), y, rolling=False)

        bool_mask = mask.astype(bool)
        bool_mask_series = Series(bool_mask, index=data.index)
        self.outliers = data[bool_mask_series]

    else:
        # For 'isolation_forest' and 'lof' methods
        self.outliers = method_func(data, **kwargs)
iqr(data, k, rolling) staticmethod

Use interquartile range (IQR).

Parameters:

Name Type Description Default
data Series

The time series data.

required

Other Parameters:

Name Type Description
k float

The multiplier for the IQR to define the range. Defaults to 1.5.

Returns:

Type Description
ndarray

np.ndarray: Binary mask representing the outliers as 1.

Source code in gensor/analysis/outliers.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@staticmethod
def iqr(data: np.ndarray, k: float, rolling: bool) -> np.ndarray:
    """Use interquartile range (IQR).

    Parameters:
        data (pandas.Series): The time series data.

    Keyword Args:
        k (float): The multiplier for the IQR to define the range. Defaults to 1.5.

    Returns:
        np.ndarray: Binary mask representing the outliers as 1.
    """

    Q1 = np.percentile(data, 0.25)
    Q3 = np.percentile(data, 0.75)
    IQR = Q3 - Q1

    lower_bound = Q1 - k * IQR
    upper_bound = Q3 + k * IQR

    if rolling:
        return (
            np.array([1])
            if (data[-1] < lower_bound or data[-1] > upper_bound)
            else np.array([0])
        )

    return np.where((data < lower_bound) | (data > upper_bound), 1, 0)
isolation_forest(data, **kwargs)

Using the isolation forest algorithm.

Parameters:

Name Type Description Default
data Series

The time series data.

required

Other Parameters:

Name Type Description
n_estimators int

The number of base estimators in the ensemble. Defaults to 100.

max_samples int | auto | float

The number of samples to draw from X to train each base estimator. Defaults to 'auto'.

contamination float

The proportion of outliers in the data. Defaults to 0.01.

max_features int | float

The number of features to draw from X to train each base estimator. Defaults to 1.0.

bootstrap bool

Whether to use bootstrapping when sampling the data. Defaults to False.

n_jobs int

The number of jobs to run in parallel. Defaults to 1.

random_state int | RandomState | None

The random state to use. Defaults to None.

verbose int

The verbosity level. Defaults to 0.

warm_start bool

Whether to reuse the solution of the previous call to fit and add more estimators to the ensemble. Defaults to False.

Note

For details on kwargs see: sklearn.ensemble.IsolationForest.

Source code in gensor/analysis/outliers.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def isolation_forest(self, data: Series, **kwargs: Any) -> Series:
    """Using the isolation forest algorithm.

    Parameters:
        data (pandas.Series): The time series data.

    Keyword Args:
        n_estimators (int): The number of base estimators in the ensemble. Defaults to 100.
        max_samples (int | 'auto' | float): The number of samples to draw from X to train each base estimator. Defaults to 'auto'.
        contamination (float): The proportion of outliers in the data. Defaults to 0.01.
        max_features (int | float): The number of features to draw from X to train each base estimator. Defaults to 1.0.
        bootstrap (bool): Whether to use bootstrapping when sampling the data. Defaults to False.
        n_jobs (int): The number of jobs to run in parallel. Defaults to 1.
        random_state (int | RandomState | None): The random state to use. Defaults to None.
        verbose (int): The verbosity level. Defaults to 0.
        warm_start (bool): Whether to reuse the solution of the previous call to fit and add more estimators to the ensemble. Defaults to False.

    Note:
        For details on kwargs see: sklearn.ensemble.IsolationForest.
    """

    X = data.to_numpy().reshape(-1, 1)

    clf = IsolationForest(**kwargs)
    clf.fit(X)

    is_outlier = clf.predict(X)
    outliers: Series = data[is_outlier == -1]

    return outliers
lof(data, **kwargs)

Using the local outlier factor (LOF) method.

Parameters:

Name Type Description Default
data Series

The time series data.

required

Other Parameters:

Name Type Description
n_neighbors int

The number of neighbors to consider for each sample. Defaults to 20.

algorithm str

The algorithm to use. Either 'auto', 'ball_tree', 'kd_tree' or 'brute'. Defaults to 'auto'.

leaf_size int

The leaf size of the tree. Defaults to 30.

metric str

The distance metric to use. Defaults to 'minkowski'.

p int

The power parameter for the Minkowski metric. Defaults to 2.

contamination float

The proportion of outliers in the data. Defaults to 0.01.

novelty bool

Whether to consider the samples as normal or outliers. Defaults to False.

n_jobs int

The number of jobs to run in parallel. Defaults to 1.

Note: For details on kwargs see: sklearn.neighbors.LocalOutlierFactor.

Source code in gensor/analysis/outliers.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def lof(self, data: Series, **kwargs: Any) -> Series:
    """Using the local outlier factor (LOF) method.

    Parameters:
        data (pandas.Series): The time series data.

    Keyword Args:
        n_neighbors (int): The number of neighbors to consider for each sample. Defaults to 20.
        algorithm (str): The algorithm to use. Either 'auto', 'ball_tree', 'kd_tree' or 'brute'. Defaults to 'auto'.
        leaf_size (int): The leaf size of the tree. Defaults to 30.
        metric (str): The distance metric to use. Defaults to 'minkowski'.
        p (int): The power parameter for the Minkowski metric. Defaults to 2.
        contamination (float): The proportion of outliers in the data. Defaults to 0.01.
        novelty (bool): Whether to consider the samples as normal or outliers. Defaults to False.
        n_jobs (int): The number of jobs to run in parallel. Defaults to 1.
    Note:
        For details on kwargs see: sklearn.neighbors.LocalOutlierFactor.
    """

    X = data.to_numpy().reshape(-1, 1)

    clf = LocalOutlierFactor(**kwargs)

    is_outlier = clf.fit_predict(X)
    outliers: Series = data[is_outlier == -1]

    return outliers
zscore(data, threshold, rolling) staticmethod

Use the z-score method.

Parameters:

Name Type Description Default
data Series

The time series data.

required

Other Parameters:

Name Type Description
threshold float

The threshold for the z-score method. Defaults to 3.0.

Returns:

Type Description
ndarray

pandas.Series: Binary mask representing outliers.

Source code in gensor/analysis/outliers.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@staticmethod
def zscore(data: np.ndarray, threshold: float, rolling: bool) -> np.ndarray:
    """Use the z-score method.

    Parameters:
        data (pandas.Series): The time series data.

    Keyword Args:
        threshold (float): The threshold for the z-score method. Defaults to 3.0.

    Returns:
        pandas.Series: Binary mask representing outliers.
    """

    mean = np.mean(data)
    std_dev = np.std(data)

    z_scores = np.abs((data - mean) / std_dev)

    if rolling:
        return np.array([1]) if z_scores[-1] > threshold else np.array([0])
    return np.where(z_scores > threshold, 1, 0)

stats

Module to compute timeseries statistics, similar to pastas.stats.signatures module and following Heudorfer et al. 2019

To be implemented:

  • Structure
  • Flashiness
  • Distribution
  • Modality
  • Density
  • Shape
  • Scale
  • Slope

config

Warning

Whenever Timeseries objects are created via read_from_csv and use a parser (e.g., 'vanessen'), the timestamps are localized and converted to UTC. Therefore, if the user creates his own timeseries outside the read_from_csv, they should ensure that the timestamps are in UTC format.

core

base

BaseTimeseries

Bases: BaseModel

Generic base class for timeseries with metadata.

Timeseries is a series of measurements of a single variable, in the same unit, from a single location with unique timestamps.

Attributes:

Name Type Description
ts Series

The timeseries data.

variable Literal['temperature', 'pressure', 'conductivity', 'flux']

The type of the measurement.

unit Literal['degC', 'mmH2O', 'mS/cm', 'm/s']

The unit of the measurement.

outliers Series

Measurements marked as outliers.

transformation Any

Metadata of transformation the timeseries undergone.

Methods:

Name Description
validate_ts

if the pd.Series is not exactly what is required, coerce.

Source code in gensor/core/base.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
class BaseTimeseries(pyd.BaseModel):
    """Generic base class for timeseries with metadata.

    Timeseries is a series of measurements of a single variable, in the same unit, from a
    single location with unique timestamps.

    Attributes:
        ts (pd.Series): The timeseries data.
        variable (Literal['temperature', 'pressure', 'conductivity', 'flux']):
            The type of the measurement.
        unit (Literal['degC', 'mmH2O', 'mS/cm', 'm/s']): The unit of
            the measurement.
        outliers (pd.Series): Measurements marked as outliers.
        transformation (Any): Metadata of transformation the timeseries undergone.

    Methods:
        validate_ts: if the pd.Series is not exactly what is required, coerce.
    """

    model_config = pyd.ConfigDict(
        arbitrary_types_allowed=True, validate_assignment=True
    )

    ts: pd.Series = pyd.Field(repr=False, exclude=True)
    variable: Literal[
        "temperature", "pressure", "conductivity", "flux", "head", "depth"
    ]
    unit: Literal["degc", "cmh2o", "ms/cm", "m/s", "m asl", "m"]
    location: str | None = None
    outliers: pd.Series | None = pyd.Field(default=None, repr=False, exclude=True)
    transformation: Any = pyd.Field(default=None, repr=False, exclude=True)

    @pyd.computed_field()  # type: ignore[prop-decorator]
    @property
    def start(self) -> pd.Timestamp | Any:
        return self.ts.index.min()

    @pyd.computed_field()  # type: ignore[prop-decorator]
    @property
    def end(self) -> pd.Timestamp | Any:
        return self.ts.index.max()

    @pyd.field_serializer("start", "end")
    def serialize_timestamps(self, value: pd.Timestamp | None) -> str | None:
        """Serialize `pd.Timestamp` to ISO format."""
        return value.strftime("%Y%m%d%H%M%S") if value is not None else None

    def __eq__(self, other: object) -> bool:
        """Check equality based on location, sensor, variable, unit and sensor_alt."""
        if not isinstance(other, BaseTimeseries):
            return NotImplemented

        return (
            self.variable == other.variable
            and self.unit == other.unit
            and self.location == other.location
        )

    def __getattr__(self, attr: Any) -> Any:
        """Delegate attribute access to the underlying pandas Series if it exists.

        Special handling is implemented for pandas indexer.
        """
        if attr == "loc":
            return TimeseriesIndexer(self, self.ts.loc)

        if attr == "iloc":
            return TimeseriesIndexer(self, self.ts.iloc)

        error_message = f"'{self.__class__.__name__}' object has no attribute '{attr}'"

        if hasattr(self.ts, attr):
            # Return a function to call on the `ts` if it's a method, otherwise return the attribute
            ts_attr = getattr(self.ts, attr)
            if callable(ts_attr):

                def wrapper(*args: Any, **kwargs: Any) -> Any:
                    result = ts_attr(*args, **kwargs)
                    # If the result is a Series, return a new Timeseries; otherwise, return the result
                    if isinstance(result, pd.Series):
                        return self.model_copy(
                            update={"ts": deepcopy(result)}, deep=True
                        )

                    return result

                return wrapper
            else:
                return ts_attr
        raise AttributeError(error_message)

    @pyd.field_validator("ts")
    def validate_ts(cls, v: pd.Series) -> pd.Series:
        validated_ts = ts_schema.validate(v)

        return validated_ts

    @pyd.field_validator("outliers")
    def validate_outliers(cls, v: pd.Series) -> pd.Series:
        if v is not None:
            return ts_schema.validate(v)
        return v

    def concatenate(self: T, other: T) -> T:
        """Concatenate two Timeseries objects if they are considered equal."""
        if not isinstance(other, type(self)):
            return NotImplemented

        if self == other:
            combined_ts = pd.concat([self.ts, other.ts]).sort_index()
            combined_ts = combined_ts[~combined_ts.index.duplicated(keep="first")]

            return self.model_copy(update={"ts": combined_ts})
        else:
            raise TimeseriesUnequal()

    def resample(
        self: T,
        freq: Any,
        agg_func: Any = pd.Series.mean,
        **resample_kwargs: Any,
    ) -> T:
        """Resample the timeseries to a new frequency with a specified
        aggregation function.

        Parameters:
            freq (Any): The offset string or object representing target conversion
                (e.g., 'D' for daily, 'W' for weekly).
            agg_func (Any): The aggregation function to apply
                after resampling. Defaults to pd.Series.mean.
            **resample_kwargs: Additional keyword arguments passed to the
                pandas.Series.resample method.

        Returns:
            Updated deep copy of the Timeseries object with the
                resampled timeseries data.
        """
        resampled_ts = self.ts.resample(freq, **resample_kwargs).apply(agg_func)

        return self.model_copy(update={"ts": resampled_ts}, deep=True)

    def transform(
        self: T,
        method: Literal[
            "difference",
            "log",
            "square_root",
            "box_cox",
            "standard_scaler",
            "minmax_scaler",
            "robust_scaler",
            "maxabs_scaler",
        ],
        **transformer_kwargs: Any,
    ) -> T:
        """Transforms the timeseries using the specified method.

        Parameters:
            method (str): The method to use for transformation ('minmax',
                'standard', 'robust').
            transformer_kwargs: Additional keyword arguments passed to the
                transformer definition. See gensor.preprocessing.

        Returns:
            Updated deep copy of the Timeseries object with the
                transformed timeseries data.
        """

        data, transformation = Transformation(
            self.ts, method, **transformer_kwargs
        ).get_transformation()

        return self.model_copy(
            update={"ts": data, "transformation": transformation}, deep=True
        )

    def detect_outliers(
        self: T,
        method: Literal["iqr", "zscore", "isolation_forest", "lof"],
        rolling: bool = False,
        window: int = 6,
        remove: bool = True,
        **kwargs: Any,
    ) -> T:
        """Detects outliers in the timeseries using the specified method.

        Parameters:
            method (Literal['iqr', 'zscore', 'isolation_forest', 'lof']): The
                method to use for outlier detection.
            **kwargs: Additional kewword arguments for OutlierDetection.

        Returns:
            Updated deep copy of the Timeseries object with outliers,
            optionally removed from the original timeseries.
        """
        self.outliers = OutlierDetection(
            self.ts, method, rolling, window, **kwargs
        ).outliers

        if remove:
            filtered_ts = self.ts.drop(self.outliers.index)
            return self.model_copy(update={"ts": filtered_ts}, deep=True)

        else:
            return self

    def mask_with(
        self: T, other: T | pd.Series, mode: Literal["keep", "remove"] = "remove"
    ) -> T:
        """
        Removes records not present in 'other' by index.

        Parameters:
            other (Timeseries): Another Timeseries whose indices are used to mask the current one.
            mode (Literal['keep', 'remove']):
                - 'keep': Retains only the overlapping data.
                - 'remove': Removes the overlapping data.

        Returns:
            Timeseries: A new Timeseries object with the filtered data.
        """
        if isinstance(other, pd.Series):
            mask = other
        elif isinstance(other, BaseTimeseries):
            mask = other.ts

        if mode == "keep":
            masked_data = self.ts[self.ts.index.isin(mask.index)]
        elif mode == "remove":
            masked_data = self.ts[~self.ts.index.isin(mask.index)]
        else:
            message = f"Invalid mode: {mode}. Use 'keep' or 'remove'."
            raise ValueError(message)

        return self.model_copy(update={"ts": masked_data}, deep=True)

    def to_sql(self: T, db: DatabaseConnection) -> str:
        """Converts the timeseries to a list of dictionaries and uploads it to the database.

        The Timeseries data is uploaded to the SQL database by using the pandas
        `to_sql` method. Additionally, metadata about the timeseries is stored in the
        'timeseries_metadata' table.

        Parameters:
            db (DatabaseConnection): The database connection object.

        Returns:
            str: A message indicating the number of rows inserted into the database.
        """

        def separate_metadata() -> tuple:
            _core_metadata_fields = {"location", "variable", "unit", "start", "end"}

            core_metadata = self.model_dump(include=_core_metadata_fields)
            core_metadata.update({
                "cls": f"{self.__module__}.{self.__class__.__name__}"
            })

            extra_metadata = self.model_dump(exclude=_core_metadata_fields)

            return core_metadata, extra_metadata

        timestamp_start_fmt = self.start.strftime("%Y%m%d%H%M%S")
        timestamp_end_fmt = self.end.strftime("%Y%m%d%H%M%S")

        # Ensure the index is a pandas DatetimeIndex
        if isinstance(self.ts.index, pd.DatetimeIndex):
            utc_index = (
                self.ts.index.tz_convert("UTC")
                if self.ts.index.tz is not None
                else self.ts.index
            )
        else:
            message = "The index is not a DatetimeIndex and cannot be converted to UTC."
            raise TypeError(message)

        # Records as dicts keyed by column name so the insert can run as an
        # executemany (one row per parameter set) instead of a single multi-row
        # VALUES clause - the latter blows SQLite's bound-variable limit for long
        # series. ``tolist()`` also yields native Python floats/strings.
        series_as_records = [
            {"timestamp": timestamp, self.variable: value}
            for timestamp, value in zip(
                utc_index.strftime("%Y-%m-%dT%H:%M:%S%z").tolist(),
                self.ts.tolist(),
                strict=False,
            )
        ]

        # Extra metadata are attributes additional to BaseTimeseries
        core_metadata, extra_metadata = separate_metadata()

        metadata_entry = {
            **core_metadata,
            "extra": extra_metadata,
        }

        created_table = db.get_timeseries_metadata(
            location=self.location,
            variable=self.variable,
            unit=self.unit,
            **extra_metadata,
        )

        with db as con:
            if created_table.empty:
                schema_name = f"{self.location}_{self.variable}_{self.unit}".lower()
                unique_hash = str(uuid.uuid4())[:5]
                schema_name = schema_name + f"_{unique_hash}"

                # Newly created data schema
                schema = db.create_table(schema_name, self.variable)
            else:
                # Existing data schema
                schema_name = created_table["table_name"].iloc[0]
                schema = db.metadata.tables[schema_name]

            metadata_schema = db.metadata.tables["__timeseries_metadata__"]
            metadata_entry.update({"table_name": schema_name})

            if isinstance(schema, Table):
                if series_as_records:
                    stmt = sqlite_insert(schema).on_conflict_do_nothing(
                        index_elements=["timestamp"]
                    )
                    con.execute(stmt, series_as_records)

                metadata_stmt = sqlite_insert(metadata_schema).values(metadata_entry)
                metadata_stmt = metadata_stmt.on_conflict_do_update(
                    index_elements=["table_name"],
                    set_={
                        "start": timestamp_start_fmt,
                        "end": timestamp_end_fmt,
                    },
                )
                con.execute(metadata_stmt)

            # Commit all changes at once
            con.commit()

        return f"{schema_name} table and metadata updated."

    def plot(
        self: T,
        include_outliers: bool = False,
        ax: Axes | None = None,
        plot_kwargs: dict[str, Any] | None = None,
        legend_kwargs: dict[str, Any] | None = None,
    ) -> tuple[Figure, Axes]:
        """Plots the timeseries data.

        Parameters:
            include_outliers (bool): Whether to include outliers in the plot.
            ax (matplotlib.axes.Axes, optional): Matplotlib axes object to plot on.
                If None, a new figure and axes are created.
            plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
            legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

        Returns:
            (fig, ax): Matplotlib figure and axes to allow further customization.
        """

        plot_kwargs = plot_kwargs or {}
        legend_kwargs = legend_kwargs or {}

        if ax is None:
            fig, ax = plt.subplots(figsize=(10, 5))
        else:
            # mypy complained that the get_figure() can return None, but there is no
            # situation here in which this could be the case.
            fig = ax.get_figure()  # type: ignore [assignment]

        ax.plot(
            self.ts.index,
            self.ts,
            label=f"{self.location}",
            **plot_kwargs,
        )

        if include_outliers and self.outliers is not None:
            ax.scatter(
                self.outliers.index, self.outliers, color="red", label="Outliers"
            )
        for label in ax.get_xticklabels():
            label.set_rotation(45)

        ax.set_xlabel("Time")
        ax.set_ylabel(f"{self.variable} ({self.unit})")
        ax.set_title(f"{self.variable.capitalize()} at {self.location}")

        ax.legend(**legend_kwargs)

        return fig, ax
__eq__(other)

Check equality based on location, sensor, variable, unit and sensor_alt.

Source code in gensor/core/base.py
78
79
80
81
82
83
84
85
86
87
def __eq__(self, other: object) -> bool:
    """Check equality based on location, sensor, variable, unit and sensor_alt."""
    if not isinstance(other, BaseTimeseries):
        return NotImplemented

    return (
        self.variable == other.variable
        and self.unit == other.unit
        and self.location == other.location
    )
__getattr__(attr)

Delegate attribute access to the underlying pandas Series if it exists.

Special handling is implemented for pandas indexer.

Source code in gensor/core/base.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def __getattr__(self, attr: Any) -> Any:
    """Delegate attribute access to the underlying pandas Series if it exists.

    Special handling is implemented for pandas indexer.
    """
    if attr == "loc":
        return TimeseriesIndexer(self, self.ts.loc)

    if attr == "iloc":
        return TimeseriesIndexer(self, self.ts.iloc)

    error_message = f"'{self.__class__.__name__}' object has no attribute '{attr}'"

    if hasattr(self.ts, attr):
        # Return a function to call on the `ts` if it's a method, otherwise return the attribute
        ts_attr = getattr(self.ts, attr)
        if callable(ts_attr):

            def wrapper(*args: Any, **kwargs: Any) -> Any:
                result = ts_attr(*args, **kwargs)
                # If the result is a Series, return a new Timeseries; otherwise, return the result
                if isinstance(result, pd.Series):
                    return self.model_copy(
                        update={"ts": deepcopy(result)}, deep=True
                    )

                return result

            return wrapper
        else:
            return ts_attr
    raise AttributeError(error_message)
concatenate(other)

Concatenate two Timeseries objects if they are considered equal.

Source code in gensor/core/base.py
134
135
136
137
138
139
140
141
142
143
144
145
def concatenate(self: T, other: T) -> T:
    """Concatenate two Timeseries objects if they are considered equal."""
    if not isinstance(other, type(self)):
        return NotImplemented

    if self == other:
        combined_ts = pd.concat([self.ts, other.ts]).sort_index()
        combined_ts = combined_ts[~combined_ts.index.duplicated(keep="first")]

        return self.model_copy(update={"ts": combined_ts})
    else:
        raise TimeseriesUnequal()
detect_outliers(method, rolling=False, window=6, remove=True, **kwargs)

Detects outliers in the timeseries using the specified method.

Parameters:

Name Type Description Default
method Literal['iqr', 'zscore', 'isolation_forest', 'lof']

The method to use for outlier detection.

required
**kwargs Any

Additional kewword arguments for OutlierDetection.

{}

Returns:

Type Description
T

Updated deep copy of the Timeseries object with outliers,

T

optionally removed from the original timeseries.

Source code in gensor/core/base.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def detect_outliers(
    self: T,
    method: Literal["iqr", "zscore", "isolation_forest", "lof"],
    rolling: bool = False,
    window: int = 6,
    remove: bool = True,
    **kwargs: Any,
) -> T:
    """Detects outliers in the timeseries using the specified method.

    Parameters:
        method (Literal['iqr', 'zscore', 'isolation_forest', 'lof']): The
            method to use for outlier detection.
        **kwargs: Additional kewword arguments for OutlierDetection.

    Returns:
        Updated deep copy of the Timeseries object with outliers,
        optionally removed from the original timeseries.
    """
    self.outliers = OutlierDetection(
        self.ts, method, rolling, window, **kwargs
    ).outliers

    if remove:
        filtered_ts = self.ts.drop(self.outliers.index)
        return self.model_copy(update={"ts": filtered_ts}, deep=True)

    else:
        return self
mask_with(other, mode='remove')

Removes records not present in 'other' by index.

Parameters:

Name Type Description Default
other Timeseries

Another Timeseries whose indices are used to mask the current one.

required
mode Literal['keep', 'remove']
  • 'keep': Retains only the overlapping data.
  • 'remove': Removes the overlapping data.
'remove'

Returns:

Name Type Description
Timeseries T

A new Timeseries object with the filtered data.

Source code in gensor/core/base.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def mask_with(
    self: T, other: T | pd.Series, mode: Literal["keep", "remove"] = "remove"
) -> T:
    """
    Removes records not present in 'other' by index.

    Parameters:
        other (Timeseries): Another Timeseries whose indices are used to mask the current one.
        mode (Literal['keep', 'remove']):
            - 'keep': Retains only the overlapping data.
            - 'remove': Removes the overlapping data.

    Returns:
        Timeseries: A new Timeseries object with the filtered data.
    """
    if isinstance(other, pd.Series):
        mask = other
    elif isinstance(other, BaseTimeseries):
        mask = other.ts

    if mode == "keep":
        masked_data = self.ts[self.ts.index.isin(mask.index)]
    elif mode == "remove":
        masked_data = self.ts[~self.ts.index.isin(mask.index)]
    else:
        message = f"Invalid mode: {mode}. Use 'keep' or 'remove'."
        raise ValueError(message)

    return self.model_copy(update={"ts": masked_data}, deep=True)
plot(include_outliers=False, ax=None, plot_kwargs=None, legend_kwargs=None)

Plots the timeseries data.

Parameters:

Name Type Description Default
include_outliers bool

Whether to include outliers in the plot.

False
ax Axes

Matplotlib axes object to plot on. If None, a new figure and axes are created.

None
plot_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.

None
legend_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

None

Returns:

Type Description
(fig, ax)

Matplotlib figure and axes to allow further customization.

Source code in gensor/core/base.py
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
def plot(
    self: T,
    include_outliers: bool = False,
    ax: Axes | None = None,
    plot_kwargs: dict[str, Any] | None = None,
    legend_kwargs: dict[str, Any] | None = None,
) -> tuple[Figure, Axes]:
    """Plots the timeseries data.

    Parameters:
        include_outliers (bool): Whether to include outliers in the plot.
        ax (matplotlib.axes.Axes, optional): Matplotlib axes object to plot on.
            If None, a new figure and axes are created.
        plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
        legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

    Returns:
        (fig, ax): Matplotlib figure and axes to allow further customization.
    """

    plot_kwargs = plot_kwargs or {}
    legend_kwargs = legend_kwargs or {}

    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 5))
    else:
        # mypy complained that the get_figure() can return None, but there is no
        # situation here in which this could be the case.
        fig = ax.get_figure()  # type: ignore [assignment]

    ax.plot(
        self.ts.index,
        self.ts,
        label=f"{self.location}",
        **plot_kwargs,
    )

    if include_outliers and self.outliers is not None:
        ax.scatter(
            self.outliers.index, self.outliers, color="red", label="Outliers"
        )
    for label in ax.get_xticklabels():
        label.set_rotation(45)

    ax.set_xlabel("Time")
    ax.set_ylabel(f"{self.variable} ({self.unit})")
    ax.set_title(f"{self.variable.capitalize()} at {self.location}")

    ax.legend(**legend_kwargs)

    return fig, ax
resample(freq, agg_func=pd.Series.mean, **resample_kwargs)

Resample the timeseries to a new frequency with a specified aggregation function.

Parameters:

Name Type Description Default
freq Any

The offset string or object representing target conversion (e.g., 'D' for daily, 'W' for weekly).

required
agg_func Any

The aggregation function to apply after resampling. Defaults to pd.Series.mean.

mean
**resample_kwargs Any

Additional keyword arguments passed to the pandas.Series.resample method.

{}

Returns:

Type Description
T

Updated deep copy of the Timeseries object with the resampled timeseries data.

Source code in gensor/core/base.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def resample(
    self: T,
    freq: Any,
    agg_func: Any = pd.Series.mean,
    **resample_kwargs: Any,
) -> T:
    """Resample the timeseries to a new frequency with a specified
    aggregation function.

    Parameters:
        freq (Any): The offset string or object representing target conversion
            (e.g., 'D' for daily, 'W' for weekly).
        agg_func (Any): The aggregation function to apply
            after resampling. Defaults to pd.Series.mean.
        **resample_kwargs: Additional keyword arguments passed to the
            pandas.Series.resample method.

    Returns:
        Updated deep copy of the Timeseries object with the
            resampled timeseries data.
    """
    resampled_ts = self.ts.resample(freq, **resample_kwargs).apply(agg_func)

    return self.model_copy(update={"ts": resampled_ts}, deep=True)
serialize_timestamps(value)

Serialize pd.Timestamp to ISO format.

Source code in gensor/core/base.py
73
74
75
76
@pyd.field_serializer("start", "end")
def serialize_timestamps(self, value: pd.Timestamp | None) -> str | None:
    """Serialize `pd.Timestamp` to ISO format."""
    return value.strftime("%Y%m%d%H%M%S") if value is not None else None
to_sql(db)

Converts the timeseries to a list of dictionaries and uploads it to the database.

The Timeseries data is uploaded to the SQL database by using the pandas to_sql method. Additionally, metadata about the timeseries is stored in the 'timeseries_metadata' table.

Parameters:

Name Type Description Default
db DatabaseConnection

The database connection object.

required

Returns:

Name Type Description
str str

A message indicating the number of rows inserted into the database.

Source code in gensor/core/base.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
def to_sql(self: T, db: DatabaseConnection) -> str:
    """Converts the timeseries to a list of dictionaries and uploads it to the database.

    The Timeseries data is uploaded to the SQL database by using the pandas
    `to_sql` method. Additionally, metadata about the timeseries is stored in the
    'timeseries_metadata' table.

    Parameters:
        db (DatabaseConnection): The database connection object.

    Returns:
        str: A message indicating the number of rows inserted into the database.
    """

    def separate_metadata() -> tuple:
        _core_metadata_fields = {"location", "variable", "unit", "start", "end"}

        core_metadata = self.model_dump(include=_core_metadata_fields)
        core_metadata.update({
            "cls": f"{self.__module__}.{self.__class__.__name__}"
        })

        extra_metadata = self.model_dump(exclude=_core_metadata_fields)

        return core_metadata, extra_metadata

    timestamp_start_fmt = self.start.strftime("%Y%m%d%H%M%S")
    timestamp_end_fmt = self.end.strftime("%Y%m%d%H%M%S")

    # Ensure the index is a pandas DatetimeIndex
    if isinstance(self.ts.index, pd.DatetimeIndex):
        utc_index = (
            self.ts.index.tz_convert("UTC")
            if self.ts.index.tz is not None
            else self.ts.index
        )
    else:
        message = "The index is not a DatetimeIndex and cannot be converted to UTC."
        raise TypeError(message)

    # Records as dicts keyed by column name so the insert can run as an
    # executemany (one row per parameter set) instead of a single multi-row
    # VALUES clause - the latter blows SQLite's bound-variable limit for long
    # series. ``tolist()`` also yields native Python floats/strings.
    series_as_records = [
        {"timestamp": timestamp, self.variable: value}
        for timestamp, value in zip(
            utc_index.strftime("%Y-%m-%dT%H:%M:%S%z").tolist(),
            self.ts.tolist(),
            strict=False,
        )
    ]

    # Extra metadata are attributes additional to BaseTimeseries
    core_metadata, extra_metadata = separate_metadata()

    metadata_entry = {
        **core_metadata,
        "extra": extra_metadata,
    }

    created_table = db.get_timeseries_metadata(
        location=self.location,
        variable=self.variable,
        unit=self.unit,
        **extra_metadata,
    )

    with db as con:
        if created_table.empty:
            schema_name = f"{self.location}_{self.variable}_{self.unit}".lower()
            unique_hash = str(uuid.uuid4())[:5]
            schema_name = schema_name + f"_{unique_hash}"

            # Newly created data schema
            schema = db.create_table(schema_name, self.variable)
        else:
            # Existing data schema
            schema_name = created_table["table_name"].iloc[0]
            schema = db.metadata.tables[schema_name]

        metadata_schema = db.metadata.tables["__timeseries_metadata__"]
        metadata_entry.update({"table_name": schema_name})

        if isinstance(schema, Table):
            if series_as_records:
                stmt = sqlite_insert(schema).on_conflict_do_nothing(
                    index_elements=["timestamp"]
                )
                con.execute(stmt, series_as_records)

            metadata_stmt = sqlite_insert(metadata_schema).values(metadata_entry)
            metadata_stmt = metadata_stmt.on_conflict_do_update(
                index_elements=["table_name"],
                set_={
                    "start": timestamp_start_fmt,
                    "end": timestamp_end_fmt,
                },
            )
            con.execute(metadata_stmt)

        # Commit all changes at once
        con.commit()

    return f"{schema_name} table and metadata updated."
transform(method, **transformer_kwargs)

Transforms the timeseries using the specified method.

Parameters:

Name Type Description Default
method str

The method to use for transformation ('minmax', 'standard', 'robust').

required
transformer_kwargs Any

Additional keyword arguments passed to the transformer definition. See gensor.preprocessing.

{}

Returns:

Type Description
T

Updated deep copy of the Timeseries object with the transformed timeseries data.

Source code in gensor/core/base.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def transform(
    self: T,
    method: Literal[
        "difference",
        "log",
        "square_root",
        "box_cox",
        "standard_scaler",
        "minmax_scaler",
        "robust_scaler",
        "maxabs_scaler",
    ],
    **transformer_kwargs: Any,
) -> T:
    """Transforms the timeseries using the specified method.

    Parameters:
        method (str): The method to use for transformation ('minmax',
            'standard', 'robust').
        transformer_kwargs: Additional keyword arguments passed to the
            transformer definition. See gensor.preprocessing.

    Returns:
        Updated deep copy of the Timeseries object with the
            transformed timeseries data.
    """

    data, transformation = Transformation(
        self.ts, method, **transformer_kwargs
    ).get_transformation()

    return self.model_copy(
        update={"ts": data, "transformation": transformation}, deep=True
    )

dataset

Coverage

Coverage summary of a :class:Dataset, returned by Dataset.coverage.

Holds a per-timeseries table (one row per location / variable / sensor with its record count and time span) and renders as that table in a notebook. Call :meth:plot for a coverage timeline (one row per location; bars span contiguous data, breaks mark gaps longer than max_gap).

The table is :attr:Dataset.info with a derived duration column appended, so the per-series summary has a single source.

Source code in gensor/core/dataset.py
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
class Coverage:
    """Coverage summary of a :class:`Dataset`, returned by ``Dataset.coverage``.

    Holds a per-timeseries ``table`` (one row per location / variable / sensor with
    its record count and time span) and renders as that table in a notebook. Call
    :meth:`plot` for a coverage timeline (one row per location; bars span contiguous
    data, breaks mark gaps longer than ``max_gap``).

    The table is :attr:`Dataset.info` with a derived ``duration`` column appended, so
    the per-series summary has a single source.
    """

    columns: ClassVar[list[str]] = [
        "location",
        "variable",
        "sensor",
        "records",
        "start",
        "end",
        "duration",
    ]

    def __init__(self, dataset: Dataset) -> None:
        self._dataset = dataset
        table = dataset.info
        table["duration"] = table["end"] - table["start"]
        self.table = table

    def __repr__(self) -> str:
        return self.table.to_string(index=False)

    def _repr_html_(self) -> str:
        return self.table.to_html(index=False)

    def plot(
        self,
        max_gap: str = "7D",
        ax: Axes | None = None,
        color: str = "#1f4e79",
    ) -> tuple[Figure, Axes]:
        """Plot a coverage timeline: one row per location, with bars spanning
        contiguous data and breaks wherever the gap between consecutive samples
        exceeds ``max_gap``.

        Parameters:
            max_gap (str): pandas timedelta string; a gap longer than this splits a
                bar so within-record holes (e.g. a missing season) stay visible.
            ax (Axes | None): existing axes to draw on; a new figure is created if None.
            color (str): bar colour.

        Returns:
            (fig, ax): Matplotlib figure and axes.
        """
        threshold = pd.Timedelta(max_gap)
        locations = self._dataset.get_locations()

        if ax is None:
            fig, ax = plt.subplots(figsize=(12, 0.35 * len(locations) + 1))
        else:
            fig = ax.figure

        for row, location in enumerate(locations):
            index: pd.DatetimeIndex | None = None
            for ts in self._dataset:
                if ts is None or ts.location != location or len(ts.ts) == 0:
                    continue
                index = ts.ts.index if index is None else index.union(ts.ts.index)
            if index is None or len(index) == 0:
                continue
            ax.broken_barh(
                _coverage_segments(index, threshold), (row - 0.4, 0.8), facecolors=color
            )

        ax.set_yticks(range(len(locations)))
        ax.set_yticklabels(locations, fontsize=8)
        ax.invert_yaxis()
        ax.xaxis_date()
        ax.set_title("Data coverage")
        ax.grid(axis="x", alpha=0.3)
        fig.tight_layout()
        return fig, ax
plot(max_gap='7D', ax=None, color='#1f4e79')

Plot a coverage timeline: one row per location, with bars spanning contiguous data and breaks wherever the gap between consecutive samples exceeds max_gap.

Parameters:

Name Type Description Default
max_gap str

pandas timedelta string; a gap longer than this splits a bar so within-record holes (e.g. a missing season) stay visible.

'7D'
ax Axes | None

existing axes to draw on; a new figure is created if None.

None
color str

bar colour.

'#1f4e79'

Returns:

Type Description
(fig, ax)

Matplotlib figure and axes.

Source code in gensor/core/dataset.py
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
def plot(
    self,
    max_gap: str = "7D",
    ax: Axes | None = None,
    color: str = "#1f4e79",
) -> tuple[Figure, Axes]:
    """Plot a coverage timeline: one row per location, with bars spanning
    contiguous data and breaks wherever the gap between consecutive samples
    exceeds ``max_gap``.

    Parameters:
        max_gap (str): pandas timedelta string; a gap longer than this splits a
            bar so within-record holes (e.g. a missing season) stay visible.
        ax (Axes | None): existing axes to draw on; a new figure is created if None.
        color (str): bar colour.

    Returns:
        (fig, ax): Matplotlib figure and axes.
    """
    threshold = pd.Timedelta(max_gap)
    locations = self._dataset.get_locations()

    if ax is None:
        fig, ax = plt.subplots(figsize=(12, 0.35 * len(locations) + 1))
    else:
        fig = ax.figure

    for row, location in enumerate(locations):
        index: pd.DatetimeIndex | None = None
        for ts in self._dataset:
            if ts is None or ts.location != location or len(ts.ts) == 0:
                continue
            index = ts.ts.index if index is None else index.union(ts.ts.index)
        if index is None or len(index) == 0:
            continue
        ax.broken_barh(
            _coverage_segments(index, threshold), (row - 0.4, 0.8), facecolors=color
        )

    ax.set_yticks(range(len(locations)))
    ax.set_yticklabels(locations, fontsize=8)
    ax.invert_yaxis()
    ax.xaxis_date()
    ax.set_title("Data coverage")
    ax.grid(axis="x", alpha=0.3)
    fig.tight_layout()
    return fig, ax

CoverageDiff

Coverage comparison of two or more datasets, returned by :func:gensor.diff (or Dataset.diff).

Series are aligned across datasets by key (default ("location", "variable")); multiple sensors sharing a key are unioned and the sensor(s) reported. Renders as a wide table (per-dataset record count / start / end, plus present and status summary columns) and exposes :meth:plot for an N-way coverage timeline grouped by timeseries.

Source code in gensor/core/dataset.py
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
class CoverageDiff:
    """Coverage comparison of two or more datasets, returned by :func:`gensor.diff`
    (or ``Dataset.diff``).

    Series are aligned across datasets by ``key`` (default ``("location",
    "variable")``); multiple sensors sharing a key are unioned and the sensor(s)
    reported. Renders as a wide ``table`` (per-dataset record count / start / end,
    plus ``present`` and ``status`` summary columns) and exposes :meth:`plot` for an
    N-way coverage timeline grouped by timeseries.
    """

    def __init__(
        self,
        datasets: dict[str, Dataset],
        key: tuple[str, ...] = ("location", "variable"),
    ) -> None:
        if len(datasets) < 2:
            message = "CoverageDiff needs at least two datasets to compare."
            raise ValueError(message)

        self._datasets = dict(datasets)
        self.key = tuple(key)
        self.labels = list(datasets)

        # per label: key-tuple -> {sensor, records, start, end}, collapsed from the
        # dataset's `.info` table; plus the union DatetimeIndex per key, kept only for
        # the timeline plot.
        self._coverage: dict[str, dict[tuple, dict]] = {}
        self._index: dict[str, dict[tuple, pd.DatetimeIndex]] = {}
        for label, dataset in datasets.items():
            self._coverage[label] = self._summarise(dataset.info)
            index_by_key: dict[tuple, pd.DatetimeIndex] = {}
            for ts in dataset:
                if ts is None or len(ts.ts) == 0:
                    continue
                k = tuple(getattr(ts, attr) for attr in self.key)
                index_by_key[k] = (
                    ts.ts.index
                    if k not in index_by_key
                    else index_by_key[k].union(ts.ts.index)
                )
            self._index[label] = {
                k: idx.sort_values() for k, idx in index_by_key.items()
            }

        self.keys = sorted({k for cov in self._coverage.values() for k in cov})
        self.table = self._build_table()

    def _summarise(self, info: pd.DataFrame) -> dict[tuple, dict]:
        """Collapse a :attr:`Dataset.info` table into one summary row per comparison
        ``key`` (the key columns must be present in ``info``). Timeseries sharing a key
        are merged: sensors joined, records summed, span widened to the outer bounds."""
        summary: dict[tuple, dict] = {}
        for row in info.itertuples(index=False):
            k = tuple(getattr(row, attr) for attr in self.key)
            entry = summary.setdefault(
                k, {"sensors": set(), "records": 0, "start": row.start, "end": row.end}
            )
            entry["sensors"].add(row.sensor)
            entry["records"] += int(row.records)
            entry["start"] = min(entry["start"], row.start)
            entry["end"] = max(entry["end"], row.end)
        return {
            k: {
                "sensor": "+".join(sorted(s for s in v["sensors"] if s)) or None,
                "records": v["records"],
                "start": v["start"],
                "end": v["end"],
            }
            for k, v in summary.items()
        }

    def _status(self, k: tuple) -> str:
        present = [lab for lab in self.labels if k in self._coverage[lab]]
        if len(present) < len(self.labels):
            return "only " + ", ".join(present)
        records = {self._coverage[lab][k]["records"] for lab in self.labels}
        spans = {
            (self._coverage[lab][k]["start"], self._coverage[lab][k]["end"])
            for lab in self.labels
        }
        if len(records) == 1 and len(spans) == 1:
            return "identical"
        return "span differs" if len(spans) > 1 else "records differ"

    def _build_table(self) -> pd.DataFrame:
        defaults = {"sensor": None, "records": 0, "start": pd.NaT, "end": pd.NaT}
        data: dict[tuple, list] = {}
        for label in self.labels:
            cov = self._coverage[label]
            for metric in ("sensor", "records", "start", "end"):
                data[(label, metric)] = [
                    cov.get(k, defaults).get(metric, defaults[metric])
                    for k in self.keys
                ]
        data[("summary", "present")] = [
            sum(k in self._coverage[lab] for lab in self.labels) for k in self.keys
        ]
        data[("summary", "status")] = [self._status(k) for k in self.keys]

        table = pd.DataFrame(
            data,
            index=pd.MultiIndex.from_tuples(self.keys, names=self.key),
        )
        table.columns = pd.MultiIndex.from_tuples(table.columns)
        return table

    def __repr__(self) -> str:
        return self.table.to_string()

    def _repr_html_(self) -> str:
        return self.table.to_html()

    def plot(
        self,
        max_gap: str = "7D",
        ax: Axes | None = None,
        colors: dict[str, Any] | None = None,
    ) -> tuple[Figure, Axes]:
        """Plot an N-way coverage timeline grouped by timeseries.

        One row per ``key`` (e.g. location + variable); within each row a coverage
        sub-bar per dataset (colour-coded, with a legend). Series present in only one
        dataset, or covering different spans, are immediately visible.

        Parameters:
            max_gap (str): pandas timedelta string; gaps longer than this split a bar.
            ax (Axes | None): existing axes to draw on; a new figure is created if None.
            colors (dict | None): optional ``{label: colour}`` mapping.

        Returns:
            (fig, ax): Matplotlib figure and axes.
        """
        from matplotlib.patches import Patch

        threshold = pd.Timedelta(max_gap)
        if colors is None:
            cmap = plt.get_cmap("tab10")
            colors = {lab: cmap(i % 10) for i, lab in enumerate(self.labels)}

        if ax is None:
            fig, ax = plt.subplots(figsize=(13, 0.45 * len(self.keys) + 1.5))
        else:
            fig = ax.figure

        n = len(self.labels)
        sub_h = 0.8 / n
        for row, k in enumerate(self.keys):
            for j, label in enumerate(self.labels):
                index = self._index[label].get(k)
                if index is None or len(index) == 0:
                    continue
                y = row - 0.4 + j * sub_h
                ax.broken_barh(
                    _coverage_segments(index, threshold),
                    (y, sub_h * 0.9),
                    facecolors=colors[label],
                )

        ax.set_yticks(range(len(self.keys)))
        ax.set_yticklabels([" ".join(map(str, k)) for k in self.keys], fontsize=7)
        ax.invert_yaxis()
        ax.xaxis_date()
        ax.set_title("Coverage diff")
        ax.grid(axis="x", alpha=0.3)
        ax.legend(
            handles=[Patch(facecolor=colors[lab], label=lab) for lab in self.labels],
            bbox_to_anchor=(1.01, 1),
            loc="upper left",
            frameon=True,
        )
        fig.tight_layout()
        return fig, ax
plot(max_gap='7D', ax=None, colors=None)

Plot an N-way coverage timeline grouped by timeseries.

One row per key (e.g. location + variable); within each row a coverage sub-bar per dataset (colour-coded, with a legend). Series present in only one dataset, or covering different spans, are immediately visible.

Parameters:

Name Type Description Default
max_gap str

pandas timedelta string; gaps longer than this split a bar.

'7D'
ax Axes | None

existing axes to draw on; a new figure is created if None.

None
colors dict | None

optional {label: colour} mapping.

None

Returns:

Type Description
(fig, ax)

Matplotlib figure and axes.

Source code in gensor/core/dataset.py
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
def plot(
    self,
    max_gap: str = "7D",
    ax: Axes | None = None,
    colors: dict[str, Any] | None = None,
) -> tuple[Figure, Axes]:
    """Plot an N-way coverage timeline grouped by timeseries.

    One row per ``key`` (e.g. location + variable); within each row a coverage
    sub-bar per dataset (colour-coded, with a legend). Series present in only one
    dataset, or covering different spans, are immediately visible.

    Parameters:
        max_gap (str): pandas timedelta string; gaps longer than this split a bar.
        ax (Axes | None): existing axes to draw on; a new figure is created if None.
        colors (dict | None): optional ``{label: colour}`` mapping.

    Returns:
        (fig, ax): Matplotlib figure and axes.
    """
    from matplotlib.patches import Patch

    threshold = pd.Timedelta(max_gap)
    if colors is None:
        cmap = plt.get_cmap("tab10")
        colors = {lab: cmap(i % 10) for i, lab in enumerate(self.labels)}

    if ax is None:
        fig, ax = plt.subplots(figsize=(13, 0.45 * len(self.keys) + 1.5))
    else:
        fig = ax.figure

    n = len(self.labels)
    sub_h = 0.8 / n
    for row, k in enumerate(self.keys):
        for j, label in enumerate(self.labels):
            index = self._index[label].get(k)
            if index is None or len(index) == 0:
                continue
            y = row - 0.4 + j * sub_h
            ax.broken_barh(
                _coverage_segments(index, threshold),
                (y, sub_h * 0.9),
                facecolors=colors[label],
            )

    ax.set_yticks(range(len(self.keys)))
    ax.set_yticklabels([" ".join(map(str, k)) for k in self.keys], fontsize=7)
    ax.invert_yaxis()
    ax.xaxis_date()
    ax.set_title("Coverage diff")
    ax.grid(axis="x", alpha=0.3)
    ax.legend(
        handles=[Patch(facecolor=colors[lab], label=lab) for lab in self.labels],
        bbox_to_anchor=(1.01, 1),
        loc="upper left",
        frameon=True,
    )
    fig.tight_layout()
    return fig, ax

Dataset

Bases: BaseModel, Generic[T]

Store and operate on a collection of Timeseries.

Attributes:

Name Type Description
timeseries list[Timeseries]

A list of Timeseries objects.

Source code in gensor/core/dataset.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
class Dataset(pyd.BaseModel, Generic[T]):
    """Store and operate on a collection of Timeseries.

    Attributes:
        timeseries (list[Timeseries]): A list of Timeseries objects.
    """

    timeseries: list[T | None] = pyd.Field(default_factory=list)

    def __iter__(self) -> Any:
        """Allows to iterate directly over the dataset."""
        return iter(self.timeseries)

    def __len__(self) -> int:
        """Gives the number of timeseries in the Dataset."""
        return len(self.timeseries)

    def __repr__(self) -> str:
        return f"Dataset({len(self)})"

    def __getitem__(self, key: int | str | list | tuple) -> T | None | Dataset:
        """Retrieve Timeseries by integer index, location name, or (location,
        variable[, unit]) tuple.

        - ``dataset[0]`` returns the Timeseries at that position (a reference).
        - ``dataset["PB01A"]`` returns the Timeseries at that location, or a
          Dataset if the location has several timeseries (e.g. pressure and
          temperature). A list of names (``dataset[["PB01A", "PB02A"]]``) always
          returns a Dataset.
        - ``dataset["PB01A", "pressure"]`` (or ``["PB01A", "pressure", "cmh2o"]``)
          narrows by variable/unit, returning a single Timeseries when one matches.
          For full control use :meth:`filter` / :meth:`one`.

        !!! warning
            Integer indexing returns a reference to the timeseries. Location /
            tuple indexing returns copies (it delegates to ``.filter()``).

        Parameters:
            key (int | str | list | tuple): Position, location name, list of
                names, or a (location, variable[, unit]) tuple.

        Returns:
            Timeseries | Dataset: The matching timeseries or a dataset of them.

        Raises:
            IndexOutOfRangeError: If an integer index is out of range.
            KeyError: If no timeseries matches the given location(s)/filters.
        """
        if isinstance(key, tuple):
            location, variable, unit = (*key, None, None)[:3]
            result = self.filter(location=location, variable=variable, unit=unit)
            if isinstance(result, Dataset) and len(result) == 0:
                message = f"No timeseries found for {key!r}."
                raise KeyError(message)
            return result

        if isinstance(key, str | list):
            result = self.filter(location=key)
            if isinstance(result, Dataset) and len(result) == 0:
                message = f"No timeseries found for location(s) {key!r}."
                raise KeyError(message)
            return result

        try:
            return self.timeseries[key]
        except IndexError:
            raise IndexOutOfRangeError(key, len(self)) from None

    def __contains__(self, location: object) -> bool:
        """Return True if any timeseries in the dataset has the given location."""
        return any(ts is not None and ts.location == location for ts in self.timeseries)

    def get_locations(self) -> list:
        """List all unique locations in the dataset, preserving first-seen order."""
        locations: list = []
        for ts in self.timeseries:
            if ts is not None and ts.location not in locations:
                locations.append(ts.location)
        return locations

    @property
    def loc(self) -> DatasetIndexer:
        """Label-based selection applied to every timeseries in the dataset.

        ``ds.loc[start:end]`` returns a new Dataset where each timeseries is sliced by
        ``.loc[start:end]`` (e.g. a date range), forwarding the key to each series' own
        pandas ``.loc``. Empty slices yield empty timeseries (every series is kept).

        Examples:
            >>> ds.loc["2021-01-01":"2021-12-31"]  # doctest: +SKIP
        """
        return DatasetIndexer(self)

    @property
    def coverage(self) -> Coverage:
        """Coverage summary of the dataset.

        Renders as a per-timeseries table (records and time span per location /
        variable / sensor) and exposes :meth:`Coverage.plot` for a coverage timeline.

        Examples:
            >>> ds.coverage          # the table  # doctest: +SKIP
            >>> ds.coverage.plot()   # the timeline  # doctest: +SKIP
        """
        return Coverage(self)

    @property
    def info(self) -> pd.DataFrame:
        """Per-timeseries metadata summary, rendered as a table.

        One row per timeseries — ``location``, ``variable``, ``sensor``, the number of
        ``records``, and the ``start`` / ``end`` of its time span. A quick look at what
        a Dataset holds before processing it (the default repr only shows the timeseries
        count). See :attr:`coverage` for a plottable version and :func:`gensor.diff` to
        line this up across datasets.

        Examples:
            >>> ds.info  # doctest: +SKIP
        """
        columns = ["location", "variable", "sensor", "records", "start", "end"]
        table = pd.DataFrame(
            [
                {
                    "location": ts.location,
                    "variable": ts.variable,
                    "sensor": getattr(ts, "sensor", None),
                    "records": len(ts.ts),
                    "start": ts.ts.index.min(),
                    "end": ts.ts.index.max(),
                }
                for ts in self.timeseries
                if ts is not None and len(ts.ts) > 0
            ],
            columns=columns,
        )
        if not table.empty:
            table = table.sort_values(["location", "variable", "sensor"]).reset_index(
                drop=True
            )
        return table

    def diff(
        self,
        *others: Dataset,
        labels: list[str] | None = None,
        key: tuple[str, ...] = ("location", "variable"),
    ) -> CoverageDiff:
        """Compare this dataset's coverage with one or more others.

        Convenience wrapper over :func:`gensor.diff`. ``labels`` names this dataset
        and the others (default ``ds0``, ``ds1`` ...).

        Examples:
            >>> raw.diff(trimmed, labels=["raw", "trimmed"]).plot()  # doctest: +SKIP
        """
        datasets = [self, *others]
        if labels is None:
            labels = [f"ds{i}" for i in range(len(datasets))]
        return diff(dict(zip(labels, datasets, strict=True)), key=key)

    def one(self, **filters: Any) -> T:
        """Return exactly one matching Timeseries.

        A convenience over :meth:`filter` for when a single result is expected:
        it always returns a Timeseries (never a Dataset) and raises if zero or
        more than one timeseries match - avoiding the "is it a Timeseries or a
        Dataset?" ambiguity of :meth:`filter` / ``dataset[name]``.

        Parameters:
            **filters: Same keyword filters as :meth:`filter` (location,
                variable, unit, sensor, ...).

        Returns:
            Timeseries: The single matching timeseries.

        Raises:
            ValueError: If zero or more than one timeseries match the filters.
        """
        result = self.filter(**filters)
        if isinstance(result, BaseTimeseries):
            return result

        count = len(result)
        message = f"Expected exactly one timeseries matching {filters}, found {count}."
        raise ValueError(message)

    def add(self, other: T | list[T] | Dataset) -> Dataset:
        """Appends new Timeseries to the Dataset.

        If an equal Timeseries already exists, merge the new data into the existing
        Timeseries, dropping duplicate timestamps.

        Parameters:
            other (Timeseries): The Timeseries object to add.
        """

        # I need to check for BaseTimeseries instance in the add() method, but also
        # type hint VarType T.
        if isinstance(other, list | Dataset):
            for ts in other:
                if isinstance(ts, BaseTimeseries):
                    self._add_single_timeseries(ts)  # type: ignore[arg-type]

        elif isinstance(other, BaseTimeseries):
            self._add_single_timeseries(other)

        return self

    def _add_single_timeseries(self, ts: T) -> None:
        """Adds a single Timeseries to the Dataset or merges if an equal one exists."""
        for i, existing_ts in enumerate(self.timeseries):
            if existing_ts == ts:
                self.timeseries[i] = existing_ts.concatenate(ts)
                return

        self.timeseries.append(ts)

        return

    def filter(
        self,
        *predicates: Where,
        location: str | list | None = None,
        variable: str | list | None = None,
        unit: str | list | None = None,
        **kwargs: str | list,
    ) -> T | Dataset:
        """Return a Timeseries or a new Dataset filtered by station, sensor,
        and/or variable.

        Any of ``location``/``variable``/``unit`` (and the keyword attributes) may be
        a single value or a list of values, matching a timeseries when its attribute
        equals (or is in) the given value(s).

        Prefix a value with ``~`` to *negate* it - drop timeseries with that value
        rather than keep them (e.g. ``location="~PB16D"`` keeps everything except
        PB16D; ``sensor="~AV319"`` drops just that sensor). Positive and negated
        values may be mixed within one attribute and across attributes; for a given
        attribute a timeseries is kept when its value is in the positives (if any are
        given) **and** not in the negatives, and attributes are AND-ed together.

        For conditions the per-attribute keywords can't express - notably a *combined*
        match across attributes - pass one or more :class:`Where` predicates
        positionally. ``filter(~Where(location="PB03B", sensor="AV319"))`` drops only that
        sensor at that location (the whole combination negated as a unit), while
        ``filter(Where(location="PB16A") | Where(location="PB16B"))`` keeps either.
        Predicates are AND-ed with the keyword filters.

        Parameters:
            *predicates (Where): Predicate objects; all must match for a timeseries to
                be kept (combine with ``& | ~``).
            location (str | list, optional): The location name(s); ``~`` negates.
            variable (str | list, optional): The variable(s) being measured; ``~`` negates.
            unit (str | list, optional): Unit(s) of the measurement; ``~`` negates.
            **kwargs (str | list): Attributes of subclassed timeseries used for
                filtering (e.g., sensor, method); ``~`` negates.

        Returns:
            Timeseries | Dataset: A single Timeseries if exactly one match is found,
                                   or a new Dataset if multiple matches are found.
        """
        keep = self._matcher(predicates, location, variable, unit, kwargs)
        matching_timeseries = [ts for ts in self.timeseries if keep(ts)]

        if not matching_timeseries:
            return Dataset()

        if len(matching_timeseries) == 1:
            return matching_timeseries[0].model_copy(deep=True)

        return self.model_copy(update={"timeseries": matching_timeseries})

    def pop(
        self,
        *predicates: Where,
        location: str | list | None = None,
        variable: str | list | None = None,
        unit: str | list | None = None,
        **kwargs: str | list,
    ) -> T | Dataset:
        """Remove and return the matching timeseries, mutating the Dataset in place.

        Selection works exactly like :meth:`filter` (same ``location`` / ``variable`` /
        ``unit`` / keyword filters, ``~`` negation, and :class:`Where` predicates), but
        the matched timeseries are **removed** from this Dataset and returned **by
        reference** (not copied) - so you can alter them and ``add()`` them back in their
        new form::

            ts = ds.pop(location="PB03B", sensor="AV319")   # taken out of ds
            ts.ts = ts.ts - 300                             # edit the live series
            ds.add(ts)                                       # put it back, changed

        Parameters:
            *predicates (Where): Predicate objects; all must match (combine with ``& | ~``).
            location (str | list, optional): The location name(s); ``~`` negates.
            variable (str | list, optional): The variable(s) being measured; ``~`` negates.
            unit (str | list, optional): Unit(s) of the measurement; ``~`` negates.
            **kwargs (str | list): Other timeseries attributes to match (e.g., sensor).

        Returns:
            Timeseries | Dataset: A single Timeseries if exactly one match is removed, a
                new Dataset of them if several match, or an empty Dataset if none match
                (in which case nothing is removed).
        """
        keep = self._matcher(predicates, location, variable, unit, kwargs)

        popped: list[T | None] = []
        remaining: list[T | None] = []
        for ts in self.timeseries:
            (popped if keep(ts) else remaining).append(ts)

        self.timeseries = remaining

        if not popped:
            return Dataset()
        if len(popped) == 1:
            return popped[0]
        return Dataset(timeseries=popped)

    def _matcher(
        self,
        predicates: tuple,
        location: str | list | None,
        variable: str | list | None,
        unit: str | list | None,
        kwargs: dict,
    ) -> Any:
        """Build the ``keep(ts)`` predicate shared by :meth:`filter` and :meth:`pop`.

        A timeseries is kept when it matches every keyword filter (``~`` negation
        included) and every positional :class:`Where` predicate. ``None`` entries never
        match.
        """
        keywords = {"location": location, "variable": variable, "unit": unit, **kwargs}
        tests = [
            Where(**{attr: value})
            for attr, value in keywords.items()
            if value is not None
        ]
        tests.extend(predicates)

        def keep(ts: T | None) -> bool:
            return ts is not None and all(test(ts) for test in tests)

        return keep

    def to_sql(self, db: DatabaseConnection) -> None:
        """Save the entire timeseries to a SQLite database.

        Parameters:
            db (DatabaseConnection): SQLite database connection object.
        """
        for ts in self.timeseries:
            if ts is None:
                continue
            if len(ts.ts) == 0:
                logger.info(
                    f"Skipping empty timeseries (location={ts.location!r}) - "
                    "nothing to write to the database."
                )
                continue
            ts.to_sql(db)
        return

    def plot(
        self,
        facet: str = "variable",
        variable: str | list | None = None,
        ncols: int = 5,
        sharex: bool = False,
        include_outliers: bool = False,
        plot_kwargs: dict[str, Any] | None = None,
        legend_kwargs: dict[str, Any] | None = None,
    ) -> tuple[Figure, list] | dict[str, tuple[Figure, list]]:
        """Plot the dataset's timeseries, in one of two layouts.

        - ``facet="variable"`` (default): one subplot per variable (pressure,
          temperature, ...), every location's series overlaid on that axis. Returns
          ``(fig, axes)`` where ``axes`` is a list (one per variable).
        - ``facet="location"``: a **separate figure per variable**, each a grid with one
          panel per location (``ncols`` wide). Every location gets a panel - left empty
          if it has no (or empty) series for that variable - and unused trailing cells are
          hidden. Multiple sensors at a location are overlaid in the same panel, and a
          legend (labelled by **sensor serial**) is shown only then; single-series panels
          get no legend. Panels are titled by location and carry no x-label (the dates are
          on the shared/rotated ticks). Returns ``{variable: (fig, axes)}``.

        Parameters:
            facet (str): ``"variable"`` or ``"location"``.
            variable (str | list, optional): restrict to these variable(s); default is
                every unique variable in the dataset.
            ncols (int): panels per row for the ``facet="location"`` grid.
            sharex (bool): for ``facet="location"``, share the x-axis across all panels so
                every row and column is aligned to the same (full) time span - the
                longest-running series sets the extent, and empty panels span it too.
            include_outliers (bool): Whether to include outliers in the plot.
            plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot().
            legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend().

        Returns:
            ``(fig, axes)`` for ``facet="variable"``; a ``{variable: (fig, axes)}`` dict
            for ``facet="location"``.
        """
        variables = (
            [variable]
            if isinstance(variable, str)
            else list(variable)
            if variable is not None
            else sorted({ts.variable for ts in self.timeseries if ts is not None})
        )

        if facet == "variable":
            return self._plot_by_variable(
                variables, include_outliers, plot_kwargs, legend_kwargs
            )
        if facet == "location":
            return self._plot_by_location(
                variables, ncols, sharex, include_outliers, plot_kwargs, legend_kwargs
            )

        message = f"facet must be 'variable' or 'location', got {facet!r}."
        raise ValueError(message)

    def _plot_by_variable(
        self,
        variables: list,
        include_outliers: bool,
        plot_kwargs: dict[str, Any] | None,
        legend_kwargs: dict[str, Any] | None,
    ) -> tuple[Figure, list]:
        """One subplot per variable, every location overlaid (see :meth:`plot`)."""
        fig, axs = plt.subplots(
            len(variables),
            1,
            figsize=(10, 5 * len(variables)),
            sharex=True,
            squeeze=False,
        )
        axes = list(axs.ravel())
        for ax, var in zip(axes, variables, strict=False):
            for ts in self.timeseries:
                if ts is not None and ts.variable == var and len(ts.ts) > 0:
                    ts.plot(
                        include_outliers=include_outliers,
                        ax=ax,
                        plot_kwargs=plot_kwargs,
                        legend_kwargs=legend_kwargs,
                    )
            ax.set_title(f"Timeseries for {var.capitalize()}")
            ax.set_xlabel("Time")
        fig.tight_layout()
        return fig, axes

    def _series_at(self, location: str, variable: str) -> list:
        """Non-empty timeseries at a given location and variable."""
        return [
            ts
            for ts in self.timeseries
            if ts is not None
            and ts.location == location
            and ts.variable == variable
            and len(ts.ts) > 0
        ]

    def _draw_location_panel(
        self,
        ax: Axes,
        series: list,
        include_outliers: bool,
        plot_kwargs: dict[str, Any],
        legend_kwargs: dict[str, Any],
    ) -> None:
        """Draw one location panel: overlay its series, style ticks, legend if shared."""
        for ts in series:
            ax.plot(ts.ts.index, ts.ts.to_numpy(), label=ts.sensor, **plot_kwargs)
            if include_outliers and ts.outliers is not None and len(ts.outliers) > 0:
                ax.scatter(ts.outliers.index, ts.outliers, color="red", s=5)
        ax.tick_params(labelsize=6)
        for label in ax.get_xticklabels():
            label.set_rotation(45)
            label.set_horizontalalignment("right")
        if len(series) > 1:  # only label sensors when they share a panel
            ax.legend(**legend_kwargs)

    def _plot_by_location(
        self,
        variables: list,
        ncols: int,
        sharex: bool,
        include_outliers: bool,
        plot_kwargs: dict[str, Any] | None,
        legend_kwargs: dict[str, Any] | None,
    ) -> dict[str, tuple[Figure, list]]:
        """A grid of one panel per location, a figure per variable (see :meth:`plot`)."""
        locations = self.get_locations()
        nrows = (len(locations) + ncols - 1) // ncols if locations else 1
        pkw = {"lw": 0.7, **(plot_kwargs or {})}
        lkw = {"fontsize": 7, **(legend_kwargs or {})}
        results: dict[str, tuple[Figure, list]] = {}
        for var in variables:
            fig, axs = plt.subplots(
                nrows,
                ncols,
                figsize=(4 * ncols, 2.3 * nrows),
                squeeze=False,
                sharex=sharex,
            )
            axes = list(axs.ravel())
            for ax, loc in zip(axes, locations, strict=False):
                ax.set_title(
                    loc, fontsize=8
                )  # every location keeps a panel, even if empty
                self._draw_location_panel(
                    ax, self._series_at(loc, var), include_outliers, pkw, lkw
                )
            for ax in axes[len(locations) :]:
                ax.set_visible(False)  # hide unused trailing cells
            fig.suptitle(f"{var.capitalize()} by location", fontsize=13)
            fig.tight_layout(rect=(0, 0, 1, 0.98))  # leave room for the suptitle
            results[var] = (fig, axes)
        return results
coverage: Coverage property

Coverage summary of the dataset.

Renders as a per-timeseries table (records and time span per location / variable / sensor) and exposes :meth:Coverage.plot for a coverage timeline.

Examples:

>>> ds.coverage          # the table
>>> ds.coverage.plot()   # the timeline
info: pd.DataFrame property

Per-timeseries metadata summary, rendered as a table.

One row per timeseries — location, variable, sensor, the number of records, and the start / end of its time span. A quick look at what a Dataset holds before processing it (the default repr only shows the timeseries count). See :attr:coverage for a plottable version and :func:gensor.diff to line this up across datasets.

Examples:

>>> ds.info
loc: DatasetIndexer property

Label-based selection applied to every timeseries in the dataset.

ds.loc[start:end] returns a new Dataset where each timeseries is sliced by .loc[start:end] (e.g. a date range), forwarding the key to each series' own pandas .loc. Empty slices yield empty timeseries (every series is kept).

Examples:

>>> ds.loc["2021-01-01":"2021-12-31"]
__contains__(location)

Return True if any timeseries in the dataset has the given location.

Source code in gensor/core/dataset.py
190
191
192
def __contains__(self, location: object) -> bool:
    """Return True if any timeseries in the dataset has the given location."""
    return any(ts is not None and ts.location == location for ts in self.timeseries)
__getitem__(key)

Retrieve Timeseries by integer index, location name, or (location, variable[, unit]) tuple.

  • dataset[0] returns the Timeseries at that position (a reference).
  • dataset["PB01A"] returns the Timeseries at that location, or a Dataset if the location has several timeseries (e.g. pressure and temperature). A list of names (dataset[["PB01A", "PB02A"]]) always returns a Dataset.
  • dataset["PB01A", "pressure"] (or ["PB01A", "pressure", "cmh2o"]) narrows by variable/unit, returning a single Timeseries when one matches. For full control use :meth:filter / :meth:one.

Warning

Integer indexing returns a reference to the timeseries. Location / tuple indexing returns copies (it delegates to .filter()).

Parameters:

Name Type Description Default
key int | str | list | tuple

Position, location name, list of names, or a (location, variable[, unit]) tuple.

required

Returns:

Type Description
T | None | Dataset

Timeseries | Dataset: The matching timeseries or a dataset of them.

Raises:

Type Description
IndexOutOfRangeError

If an integer index is out of range.

KeyError

If no timeseries matches the given location(s)/filters.

Source code in gensor/core/dataset.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def __getitem__(self, key: int | str | list | tuple) -> T | None | Dataset:
    """Retrieve Timeseries by integer index, location name, or (location,
    variable[, unit]) tuple.

    - ``dataset[0]`` returns the Timeseries at that position (a reference).
    - ``dataset["PB01A"]`` returns the Timeseries at that location, or a
      Dataset if the location has several timeseries (e.g. pressure and
      temperature). A list of names (``dataset[["PB01A", "PB02A"]]``) always
      returns a Dataset.
    - ``dataset["PB01A", "pressure"]`` (or ``["PB01A", "pressure", "cmh2o"]``)
      narrows by variable/unit, returning a single Timeseries when one matches.
      For full control use :meth:`filter` / :meth:`one`.

    !!! warning
        Integer indexing returns a reference to the timeseries. Location /
        tuple indexing returns copies (it delegates to ``.filter()``).

    Parameters:
        key (int | str | list | tuple): Position, location name, list of
            names, or a (location, variable[, unit]) tuple.

    Returns:
        Timeseries | Dataset: The matching timeseries or a dataset of them.

    Raises:
        IndexOutOfRangeError: If an integer index is out of range.
        KeyError: If no timeseries matches the given location(s)/filters.
    """
    if isinstance(key, tuple):
        location, variable, unit = (*key, None, None)[:3]
        result = self.filter(location=location, variable=variable, unit=unit)
        if isinstance(result, Dataset) and len(result) == 0:
            message = f"No timeseries found for {key!r}."
            raise KeyError(message)
        return result

    if isinstance(key, str | list):
        result = self.filter(location=key)
        if isinstance(result, Dataset) and len(result) == 0:
            message = f"No timeseries found for location(s) {key!r}."
            raise KeyError(message)
        return result

    try:
        return self.timeseries[key]
    except IndexError:
        raise IndexOutOfRangeError(key, len(self)) from None
__iter__()

Allows to iterate directly over the dataset.

Source code in gensor/core/dataset.py
131
132
133
def __iter__(self) -> Any:
    """Allows to iterate directly over the dataset."""
    return iter(self.timeseries)
__len__()

Gives the number of timeseries in the Dataset.

Source code in gensor/core/dataset.py
135
136
137
def __len__(self) -> int:
    """Gives the number of timeseries in the Dataset."""
    return len(self.timeseries)
add(other)

Appends new Timeseries to the Dataset.

If an equal Timeseries already exists, merge the new data into the existing Timeseries, dropping duplicate timestamps.

Parameters:

Name Type Description Default
other Timeseries

The Timeseries object to add.

required
Source code in gensor/core/dataset.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
def add(self, other: T | list[T] | Dataset) -> Dataset:
    """Appends new Timeseries to the Dataset.

    If an equal Timeseries already exists, merge the new data into the existing
    Timeseries, dropping duplicate timestamps.

    Parameters:
        other (Timeseries): The Timeseries object to add.
    """

    # I need to check for BaseTimeseries instance in the add() method, but also
    # type hint VarType T.
    if isinstance(other, list | Dataset):
        for ts in other:
            if isinstance(ts, BaseTimeseries):
                self._add_single_timeseries(ts)  # type: ignore[arg-type]

    elif isinstance(other, BaseTimeseries):
        self._add_single_timeseries(other)

    return self
diff(*others, labels=None, key=('location', 'variable'))

Compare this dataset's coverage with one or more others.

Convenience wrapper over :func:gensor.diff. labels names this dataset and the others (default ds0, ds1 ...).

Examples:

>>> raw.diff(trimmed, labels=["raw", "trimmed"]).plot()
Source code in gensor/core/dataset.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def diff(
    self,
    *others: Dataset,
    labels: list[str] | None = None,
    key: tuple[str, ...] = ("location", "variable"),
) -> CoverageDiff:
    """Compare this dataset's coverage with one or more others.

    Convenience wrapper over :func:`gensor.diff`. ``labels`` names this dataset
    and the others (default ``ds0``, ``ds1`` ...).

    Examples:
        >>> raw.diff(trimmed, labels=["raw", "trimmed"]).plot()  # doctest: +SKIP
    """
    datasets = [self, *others]
    if labels is None:
        labels = [f"ds{i}" for i in range(len(datasets))]
    return diff(dict(zip(labels, datasets, strict=True)), key=key)
filter(*predicates, location=None, variable=None, unit=None, **kwargs)

Return a Timeseries or a new Dataset filtered by station, sensor, and/or variable.

Any of location/variable/unit (and the keyword attributes) may be a single value or a list of values, matching a timeseries when its attribute equals (or is in) the given value(s).

Prefix a value with ~ to negate it - drop timeseries with that value rather than keep them (e.g. location="~PB16D" keeps everything except PB16D; sensor="~AV319" drops just that sensor). Positive and negated values may be mixed within one attribute and across attributes; for a given attribute a timeseries is kept when its value is in the positives (if any are given) and not in the negatives, and attributes are AND-ed together.

For conditions the per-attribute keywords can't express - notably a combined match across attributes - pass one or more :class:Where predicates positionally. filter(~Where(location="PB03B", sensor="AV319")) drops only that sensor at that location (the whole combination negated as a unit), while filter(Where(location="PB16A") | Where(location="PB16B")) keeps either. Predicates are AND-ed with the keyword filters.

Parameters:

Name Type Description Default
*predicates Where

Predicate objects; all must match for a timeseries to be kept (combine with & | ~).

()
location str | list

The location name(s); ~ negates.

None
variable str | list

The variable(s) being measured; ~ negates.

None
unit str | list

Unit(s) of the measurement; ~ negates.

None
**kwargs str | list

Attributes of subclassed timeseries used for filtering (e.g., sensor, method); ~ negates.

{}

Returns:

Type Description
T | Dataset

Timeseries | Dataset: A single Timeseries if exactly one match is found, or a new Dataset if multiple matches are found.

Source code in gensor/core/dataset.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def filter(
    self,
    *predicates: Where,
    location: str | list | None = None,
    variable: str | list | None = None,
    unit: str | list | None = None,
    **kwargs: str | list,
) -> T | Dataset:
    """Return a Timeseries or a new Dataset filtered by station, sensor,
    and/or variable.

    Any of ``location``/``variable``/``unit`` (and the keyword attributes) may be
    a single value or a list of values, matching a timeseries when its attribute
    equals (or is in) the given value(s).

    Prefix a value with ``~`` to *negate* it - drop timeseries with that value
    rather than keep them (e.g. ``location="~PB16D"`` keeps everything except
    PB16D; ``sensor="~AV319"`` drops just that sensor). Positive and negated
    values may be mixed within one attribute and across attributes; for a given
    attribute a timeseries is kept when its value is in the positives (if any are
    given) **and** not in the negatives, and attributes are AND-ed together.

    For conditions the per-attribute keywords can't express - notably a *combined*
    match across attributes - pass one or more :class:`Where` predicates
    positionally. ``filter(~Where(location="PB03B", sensor="AV319"))`` drops only that
    sensor at that location (the whole combination negated as a unit), while
    ``filter(Where(location="PB16A") | Where(location="PB16B"))`` keeps either.
    Predicates are AND-ed with the keyword filters.

    Parameters:
        *predicates (Where): Predicate objects; all must match for a timeseries to
            be kept (combine with ``& | ~``).
        location (str | list, optional): The location name(s); ``~`` negates.
        variable (str | list, optional): The variable(s) being measured; ``~`` negates.
        unit (str | list, optional): Unit(s) of the measurement; ``~`` negates.
        **kwargs (str | list): Attributes of subclassed timeseries used for
            filtering (e.g., sensor, method); ``~`` negates.

    Returns:
        Timeseries | Dataset: A single Timeseries if exactly one match is found,
                               or a new Dataset if multiple matches are found.
    """
    keep = self._matcher(predicates, location, variable, unit, kwargs)
    matching_timeseries = [ts for ts in self.timeseries if keep(ts)]

    if not matching_timeseries:
        return Dataset()

    if len(matching_timeseries) == 1:
        return matching_timeseries[0].model_copy(deep=True)

    return self.model_copy(update={"timeseries": matching_timeseries})
get_locations()

List all unique locations in the dataset, preserving first-seen order.

Source code in gensor/core/dataset.py
194
195
196
197
198
199
200
def get_locations(self) -> list:
    """List all unique locations in the dataset, preserving first-seen order."""
    locations: list = []
    for ts in self.timeseries:
        if ts is not None and ts.location not in locations:
            locations.append(ts.location)
    return locations
one(**filters)

Return exactly one matching Timeseries.

A convenience over :meth:filter for when a single result is expected: it always returns a Timeseries (never a Dataset) and raises if zero or more than one timeseries match - avoiding the "is it a Timeseries or a Dataset?" ambiguity of :meth:filter / dataset[name].

Parameters:

Name Type Description Default
**filters Any

Same keyword filters as :meth:filter (location, variable, unit, sensor, ...).

{}

Returns:

Name Type Description
Timeseries T

The single matching timeseries.

Raises:

Type Description
ValueError

If zero or more than one timeseries match the filters.

Source code in gensor/core/dataset.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def one(self, **filters: Any) -> T:
    """Return exactly one matching Timeseries.

    A convenience over :meth:`filter` for when a single result is expected:
    it always returns a Timeseries (never a Dataset) and raises if zero or
    more than one timeseries match - avoiding the "is it a Timeseries or a
    Dataset?" ambiguity of :meth:`filter` / ``dataset[name]``.

    Parameters:
        **filters: Same keyword filters as :meth:`filter` (location,
            variable, unit, sensor, ...).

    Returns:
        Timeseries: The single matching timeseries.

    Raises:
        ValueError: If zero or more than one timeseries match the filters.
    """
    result = self.filter(**filters)
    if isinstance(result, BaseTimeseries):
        return result

    count = len(result)
    message = f"Expected exactly one timeseries matching {filters}, found {count}."
    raise ValueError(message)
plot(facet='variable', variable=None, ncols=5, sharex=False, include_outliers=False, plot_kwargs=None, legend_kwargs=None)

Plot the dataset's timeseries, in one of two layouts.

  • facet="variable" (default): one subplot per variable (pressure, temperature, ...), every location's series overlaid on that axis. Returns (fig, axes) where axes is a list (one per variable).
  • facet="location": a separate figure per variable, each a grid with one panel per location (ncols wide). Every location gets a panel - left empty if it has no (or empty) series for that variable - and unused trailing cells are hidden. Multiple sensors at a location are overlaid in the same panel, and a legend (labelled by sensor serial) is shown only then; single-series panels get no legend. Panels are titled by location and carry no x-label (the dates are on the shared/rotated ticks). Returns {variable: (fig, axes)}.

Parameters:

Name Type Description Default
facet str

"variable" or "location".

'variable'
variable str | list

restrict to these variable(s); default is every unique variable in the dataset.

None
ncols int

panels per row for the facet="location" grid.

5
sharex bool

for facet="location", share the x-axis across all panels so every row and column is aligned to the same (full) time span - the longest-running series sets the extent, and empty panels span it too.

False
include_outliers bool

Whether to include outliers in the plot.

False
plot_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.plot().

None
legend_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.legend().

None

Returns:

Type Description
tuple[Figure, list] | dict[str, tuple[Figure, list]]

(fig, axes) for facet="variable"; a {variable: (fig, axes)} dict

tuple[Figure, list] | dict[str, tuple[Figure, list]]

for facet="location".

Source code in gensor/core/dataset.py
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
def plot(
    self,
    facet: str = "variable",
    variable: str | list | None = None,
    ncols: int = 5,
    sharex: bool = False,
    include_outliers: bool = False,
    plot_kwargs: dict[str, Any] | None = None,
    legend_kwargs: dict[str, Any] | None = None,
) -> tuple[Figure, list] | dict[str, tuple[Figure, list]]:
    """Plot the dataset's timeseries, in one of two layouts.

    - ``facet="variable"`` (default): one subplot per variable (pressure,
      temperature, ...), every location's series overlaid on that axis. Returns
      ``(fig, axes)`` where ``axes`` is a list (one per variable).
    - ``facet="location"``: a **separate figure per variable**, each a grid with one
      panel per location (``ncols`` wide). Every location gets a panel - left empty
      if it has no (or empty) series for that variable - and unused trailing cells are
      hidden. Multiple sensors at a location are overlaid in the same panel, and a
      legend (labelled by **sensor serial**) is shown only then; single-series panels
      get no legend. Panels are titled by location and carry no x-label (the dates are
      on the shared/rotated ticks). Returns ``{variable: (fig, axes)}``.

    Parameters:
        facet (str): ``"variable"`` or ``"location"``.
        variable (str | list, optional): restrict to these variable(s); default is
            every unique variable in the dataset.
        ncols (int): panels per row for the ``facet="location"`` grid.
        sharex (bool): for ``facet="location"``, share the x-axis across all panels so
            every row and column is aligned to the same (full) time span - the
            longest-running series sets the extent, and empty panels span it too.
        include_outliers (bool): Whether to include outliers in the plot.
        plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot().
        legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend().

    Returns:
        ``(fig, axes)`` for ``facet="variable"``; a ``{variable: (fig, axes)}`` dict
        for ``facet="location"``.
    """
    variables = (
        [variable]
        if isinstance(variable, str)
        else list(variable)
        if variable is not None
        else sorted({ts.variable for ts in self.timeseries if ts is not None})
    )

    if facet == "variable":
        return self._plot_by_variable(
            variables, include_outliers, plot_kwargs, legend_kwargs
        )
    if facet == "location":
        return self._plot_by_location(
            variables, ncols, sharex, include_outliers, plot_kwargs, legend_kwargs
        )

    message = f"facet must be 'variable' or 'location', got {facet!r}."
    raise ValueError(message)
pop(*predicates, location=None, variable=None, unit=None, **kwargs)

Remove and return the matching timeseries, mutating the Dataset in place.

Selection works exactly like :meth:filter (same location / variable / unit / keyword filters, ~ negation, and :class:Where predicates), but the matched timeseries are removed from this Dataset and returned by reference (not copied) - so you can alter them and add() them back in their new form::

ts = ds.pop(location="PB03B", sensor="AV319")   # taken out of ds
ts.ts = ts.ts - 300                             # edit the live series
ds.add(ts)                                       # put it back, changed

Parameters:

Name Type Description Default
*predicates Where

Predicate objects; all must match (combine with & | ~).

()
location str | list

The location name(s); ~ negates.

None
variable str | list

The variable(s) being measured; ~ negates.

None
unit str | list

Unit(s) of the measurement; ~ negates.

None
**kwargs str | list

Other timeseries attributes to match (e.g., sensor).

{}

Returns:

Type Description
T | Dataset

Timeseries | Dataset: A single Timeseries if exactly one match is removed, a new Dataset of them if several match, or an empty Dataset if none match (in which case nothing is removed).

Source code in gensor/core/dataset.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
def pop(
    self,
    *predicates: Where,
    location: str | list | None = None,
    variable: str | list | None = None,
    unit: str | list | None = None,
    **kwargs: str | list,
) -> T | Dataset:
    """Remove and return the matching timeseries, mutating the Dataset in place.

    Selection works exactly like :meth:`filter` (same ``location`` / ``variable`` /
    ``unit`` / keyword filters, ``~`` negation, and :class:`Where` predicates), but
    the matched timeseries are **removed** from this Dataset and returned **by
    reference** (not copied) - so you can alter them and ``add()`` them back in their
    new form::

        ts = ds.pop(location="PB03B", sensor="AV319")   # taken out of ds
        ts.ts = ts.ts - 300                             # edit the live series
        ds.add(ts)                                       # put it back, changed

    Parameters:
        *predicates (Where): Predicate objects; all must match (combine with ``& | ~``).
        location (str | list, optional): The location name(s); ``~`` negates.
        variable (str | list, optional): The variable(s) being measured; ``~`` negates.
        unit (str | list, optional): Unit(s) of the measurement; ``~`` negates.
        **kwargs (str | list): Other timeseries attributes to match (e.g., sensor).

    Returns:
        Timeseries | Dataset: A single Timeseries if exactly one match is removed, a
            new Dataset of them if several match, or an empty Dataset if none match
            (in which case nothing is removed).
    """
    keep = self._matcher(predicates, location, variable, unit, kwargs)

    popped: list[T | None] = []
    remaining: list[T | None] = []
    for ts in self.timeseries:
        (popped if keep(ts) else remaining).append(ts)

    self.timeseries = remaining

    if not popped:
        return Dataset()
    if len(popped) == 1:
        return popped[0]
    return Dataset(timeseries=popped)
to_sql(db)

Save the entire timeseries to a SQLite database.

Parameters:

Name Type Description Default
db DatabaseConnection

SQLite database connection object.

required
Source code in gensor/core/dataset.py
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
def to_sql(self, db: DatabaseConnection) -> None:
    """Save the entire timeseries to a SQLite database.

    Parameters:
        db (DatabaseConnection): SQLite database connection object.
    """
    for ts in self.timeseries:
        if ts is None:
            continue
        if len(ts.ts) == 0:
            logger.info(
                f"Skipping empty timeseries (location={ts.location!r}) - "
                "nothing to write to the database."
            )
            continue
        ts.to_sql(db)
    return

DatasetIndexer

Applies a pandas .loc selection to every Timeseries in a Dataset.

Returned by :attr:Dataset.loc. ds.loc[start:end] slices each timeseries by label (e.g. a date range) via its own .loc and returns a new Dataset of the results. Intended for label slices; a key that selects a single scalar from a timeseries (a point lookup) is rejected, since the per-series scalars can't form a Dataset.

Source code in gensor/core/dataset.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class DatasetIndexer:
    """Applies a pandas ``.loc`` selection to every Timeseries in a Dataset.

    Returned by :attr:`Dataset.loc`. ``ds.loc[start:end]`` slices each timeseries by label
    (e.g. a date range) via its own ``.loc`` and returns a new Dataset of the results.
    Intended for label slices; a key that selects a single scalar from a timeseries (a
    point lookup) is rejected, since the per-series scalars can't form a Dataset.
    """

    def __init__(self, parent: Dataset) -> None:
        self.parent = parent

    def __getitem__(self, key: Any) -> Dataset:
        sliced: list = []
        for ts in self.parent.timeseries:
            if ts is None:
                sliced.append(None)
                continue
            result = ts.loc[key]
            if not isinstance(result, BaseTimeseries):
                message = (
                    "Dataset.loc expects a label slice (e.g. ds.loc[start:end]); "
                    f"key {key!r} selected a scalar from a timeseries."
                )
                raise TypeError(message)
            sliced.append(result)
        return self.parent.model_copy(update={"timeseries": sliced}, deep=False)

Where

A composable predicate over a Timeseries' attributes, for Dataset.filter/drop.

A leaf Where(**conditions) matches a Timeseries when every condition holds; each condition matches when the timeseries' attribute equals (or is in, for a list) the given value(s), and a leading ~ on a value negates that single condition. Compose leaves with & (and), | (or) and ~ (not) to express anything the per-attribute keyword filters can't - in particular a combined exclusion::

~Where(location="PB03B", sensor="AV319")            # not (PB03B and AV319)
Where(variable="pressure") & ~Where(location="PB16D")
Where(location="PB16A") | Where(location="PB16B")

Pass instances straight to Dataset.filter (keep matches) or Dataset.drop (remove matches); they are AND-ed with the keyword filters in the same call.

Source code in gensor/core/dataset.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class Where:
    """A composable predicate over a Timeseries' attributes, for ``Dataset.filter``/``drop``.

    A leaf ``Where(**conditions)`` matches a Timeseries when **every** condition holds;
    each condition matches when the timeseries' attribute equals (or is in, for a list)
    the given value(s), and a leading ``~`` on a value negates that single condition.
    Compose leaves with ``&`` (and), ``|`` (or) and ``~`` (not) to express anything the
    per-attribute keyword filters can't - in particular a *combined* exclusion::

        ~Where(location="PB03B", sensor="AV319")            # not (PB03B and AV319)
        Where(variable="pressure") & ~Where(location="PB16D")
        Where(location="PB16A") | Where(location="PB16B")

    Pass instances straight to ``Dataset.filter`` (keep matches) or ``Dataset.drop``
    (remove matches); they are AND-ed with the keyword filters in the same call.
    """

    def __init__(self, _test: Any = None, **conditions: str | list) -> None:
        self._conditions = conditions
        self._test = _test if _test is not None else self._compile(conditions)

    @staticmethod
    def _compile(conditions: dict) -> Any:
        specs = {attr: _split(value) for attr, value in conditions.items()}

        def test(ts: Any) -> bool:
            for attr, (include, exclude) in specs.items():
                if not hasattr(ts, attr):
                    message = (
                        f"'{ts.__class__.__name__}' object has no attribute '{attr}'"
                    )
                    raise AttributeError(message)
                actual = getattr(ts, attr)
                if (include and actual not in include) or actual in exclude:
                    return False
            return True

        return test

    def __call__(self, ts: Any) -> bool:
        return bool(self._test(ts))

    def __invert__(self) -> Where:
        return Where(_test=lambda ts: not self._test(ts))

    def __and__(self, other: Where) -> Where:
        return Where(_test=lambda ts: self._test(ts) and other(ts))

    def __or__(self, other: Where) -> Where:
        return Where(_test=lambda ts: self._test(ts) or other(ts))

    def __repr__(self) -> str:
        body = ", ".join(f"{k}={v!r}" for k, v in self._conditions.items())
        return f"Where({body})"

diff(datasets, key=('location', 'variable'))

Compare the coverage of two or more datasets.

Parameters:

Name Type Description Default
datasets dict[str, Dataset] | list[Dataset]

a mapping {label: Dataset} (preferred - labels name the columns and legend) or a list of datasets (auto-labelled ds0, ds1 ...).

required
key tuple[str, ...]

attributes used to align series across datasets (default ("location", "variable")).

('location', 'variable')

Returns:

Name Type Description
CoverageDiff CoverageDiff

renders as a comparison table; .plot() draws the timeline.

Source code in gensor/core/dataset.py
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
def diff(
    datasets: dict[str, Dataset] | list[Dataset],
    key: tuple[str, ...] = ("location", "variable"),
) -> CoverageDiff:
    """Compare the coverage of two or more datasets.

    Parameters:
        datasets: a mapping ``{label: Dataset}`` (preferred - labels name the columns
            and legend) or a list of datasets (auto-labelled ``ds0``, ``ds1`` ...).
        key: attributes used to align series across datasets (default
            ``("location", "variable")``).

    Returns:
        CoverageDiff: renders as a comparison table; ``.plot()`` draws the timeline.
    """
    if isinstance(datasets, Dataset):
        message = (
            "Pass two or more datasets to diff(), e.g. diff({'a': ds1, 'b': ds2})."
        )
        raise TypeError(message)
    if not isinstance(datasets, dict):
        datasets = {f"ds{i}": d for i, d in enumerate(datasets)}
    return CoverageDiff(datasets, key=key)

indexer

TimeseriesIndexer

A wrapper for the Pandas indexers (e.g., loc, iloc) to return Timeseries objects.

Source code in gensor/core/indexer.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class TimeseriesIndexer:
    """A wrapper for the Pandas indexers (e.g., loc, iloc) to return Timeseries objects."""

    # marked indexer as Any to silence mypy. BaseIndexer is normally not indexable:
    # the same for the `parent`. It should by always type Timeseries, but I don't want
    # to deal with circular imports just for type hints for the devs...

    def __init__(self, parent: Any, indexer: Any):
        self.parent = parent
        self.indexer = indexer

    def __getitem__(self, key: str) -> Any:
        """Allows using the indexer (e.g., loc) and wraps the result in the parent Timeseries."""

        result = self.indexer[key]

        if isinstance(result, pd.Series):
            return self.parent.model_copy(update={"ts": result}, deep=False)

        if isinstance(result, (int | float | str | pd.Timestamp | np.float64)):
            return result

        message = f"Expected pd.Series, but got {type(result)} instead."
        raise TypeError(message)

    def __setitem__(self, key: str, value: Any) -> None:
        """Allows setting values directly using the indexer (e.g., loc, iloc)."""

        self.indexer[key] = value
__getitem__(key)

Allows using the indexer (e.g., loc) and wraps the result in the parent Timeseries.

Source code in gensor/core/indexer.py
20
21
22
23
24
25
26
27
28
29
30
31
32
def __getitem__(self, key: str) -> Any:
    """Allows using the indexer (e.g., loc) and wraps the result in the parent Timeseries."""

    result = self.indexer[key]

    if isinstance(result, pd.Series):
        return self.parent.model_copy(update={"ts": result}, deep=False)

    if isinstance(result, (int | float | str | pd.Timestamp | np.float64)):
        return result

    message = f"Expected pd.Series, but got {type(result)} instead."
    raise TypeError(message)
__setitem__(key, value)

Allows setting values directly using the indexer (e.g., loc, iloc).

Source code in gensor/core/indexer.py
34
35
36
37
def __setitem__(self, key: str, value: Any) -> None:
    """Allows setting values directly using the indexer (e.g., loc, iloc)."""

    self.indexer[key] = value

timeseries

Timeseries

Bases: BaseTimeseries

Timeseries of groundwater sensor data.

Attributes:

Name Type Description
ts Series

The timeseries data.

variable Literal['temperature', 'pressure', 'conductivity', 'flux']

The type of the measurement.

unit Literal['degC', 'mmH2O', 'mS/cm', 'm/s']

The unit of the measurement.

sensor str

The serial number of the sensor.

sensor_alt float

Altitude of the sensor (ncessary to compute groundwater levels).

Source code in gensor/core/timeseries.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class Timeseries(BaseTimeseries):
    """Timeseries of groundwater sensor data.

    Attributes:
        ts (pd.Series): The timeseries data.
        variable (Literal['temperature', 'pressure', 'conductivity', 'flux']):
            The type of the measurement.
        unit (Literal['degC', 'mmH2O', 'mS/cm', 'm/s']): The unit of
            the measurement.
        sensor (str): The serial number of the sensor.
        sensor_alt (float): Altitude of the sensor (ncessary to compute groundwater levels).
    """

    model_config = pyd.ConfigDict(
        arbitrary_types_allowed=True, validate_assignment=True
    )

    sensor: str | None = None
    sensor_alt: float | None = None

    def __eq__(self, other: object) -> bool:
        """Check equality based on location, sensor, variable, unit and sensor_alt."""
        if not isinstance(other, Timeseries):
            return NotImplemented

        if not super().__eq__(other):
            return False

        return self.sensor == other.sensor and self.sensor_alt == other.sensor_alt

    def plot(
        self,
        include_outliers: bool = False,
        ax: Axes | None = None,
        plot_kwargs: dict[str, Any] | None = None,
        legend_kwargs: dict[str, Any] | None = None,
    ) -> tuple[Figure, Axes]:
        """Plots the timeseries data.

        Parameters:
            include_outliers (bool): Whether to include outliers in the plot.
            ax (matplotlib.axes.Axes, optional): Matplotlib axes object to plot on.
                If None, a new figure and axes are created.
            plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
            legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

        Returns:
            (fig, ax): Matplotlib figure and axes to allow further customization.
        """
        fig, ax = super().plot(
            include_outliers=include_outliers,
            ax=ax,
            plot_kwargs=plot_kwargs,
            legend_kwargs=legend_kwargs,
        )

        ax.set_title(f"{self.variable.capitalize()} at {self.location} ({self.sensor})")

        return fig, ax
__eq__(other)

Check equality based on location, sensor, variable, unit and sensor_alt.

Source code in gensor/core/timeseries.py
40
41
42
43
44
45
46
47
48
def __eq__(self, other: object) -> bool:
    """Check equality based on location, sensor, variable, unit and sensor_alt."""
    if not isinstance(other, Timeseries):
        return NotImplemented

    if not super().__eq__(other):
        return False

    return self.sensor == other.sensor and self.sensor_alt == other.sensor_alt
plot(include_outliers=False, ax=None, plot_kwargs=None, legend_kwargs=None)

Plots the timeseries data.

Parameters:

Name Type Description Default
include_outliers bool

Whether to include outliers in the plot.

False
ax Axes

Matplotlib axes object to plot on. If None, a new figure and axes are created.

None
plot_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.

None
legend_kwargs dict[str, Any] | None

kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

None

Returns:

Type Description
(fig, ax)

Matplotlib figure and axes to allow further customization.

Source code in gensor/core/timeseries.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def plot(
    self,
    include_outliers: bool = False,
    ax: Axes | None = None,
    plot_kwargs: dict[str, Any] | None = None,
    legend_kwargs: dict[str, Any] | None = None,
) -> tuple[Figure, Axes]:
    """Plots the timeseries data.

    Parameters:
        include_outliers (bool): Whether to include outliers in the plot.
        ax (matplotlib.axes.Axes, optional): Matplotlib axes object to plot on.
            If None, a new figure and axes are created.
        plot_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.plot() method to customize the plot.
        legend_kwargs (dict[str, Any] | None): kwargs passed to matplotlib.axes.Axes.legend() to customize the legend.

    Returns:
        (fig, ax): Matplotlib figure and axes to allow further customization.
    """
    fig, ax = super().plot(
        include_outliers=include_outliers,
        ax=ax,
        plot_kwargs=plot_kwargs,
        legend_kwargs=legend_kwargs,
    )

    ax.set_title(f"{self.variable.capitalize()} at {self.location} ({self.sensor})")

    return fig, ax

db

DB

Module handling database connection in case saving and loading from SQLite database is used.

Modules:

connection.py

DatabaseConnection

Bases: BaseModel

Database connection object. If no database exists at the specified path, it will be created. If no database is specified, an in-memory database will be used.

Attributes metadata (MetaData): SQLAlchemy metadata object. db_directory (Path): Path to the database to connect to. db_name (str): Name for the database to connect to. engine (Engine | None): SQLAlchemy Engine instance.

Source code in gensor/db/connection.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class DatabaseConnection(pyd.BaseModel):
    """Database connection object.
    If no database exists at the specified path, it will be created.
    If no database is specified, an in-memory database will be used.

    Attributes
        metadata (MetaData): SQLAlchemy metadata object.
        db_directory (Path): Path to the database to connect to.
        db_name (str): Name for the database to connect to.
        engine (Engine | None): SQLAlchemy Engine instance.
    """

    model_config = pyd.ConfigDict(
        arbitrary_types_allowed=True, validate_assignment=True
    )

    metadata: MetaData = MetaData()
    db_directory: Path = Path.cwd()
    db_name: str = "gensor.db"
    engine: Engine | None = None

    def _verify_path(self) -> str:
        """Verify database path."""

        if not self.db_directory.exists():
            raise DatabaseNotFound()
        return f"sqlite:///{self.db_directory}/{self.db_name}"

    def connect(self) -> Connection:
        """Connect to the database and initialize the engine.
        If engine is None > create it with verified path > reflect.
        After connecting, ensure the timeseries_metadata table is present.
        """
        if self.engine is None:
            sqlite_path = self._verify_path()
            self.engine = create_engine(sqlite_path)

        connection = self.engine.connect()

        self.create_metadata()

        return connection

    def dispose(self) -> None:
        """Dispose of the engine, closing all connections."""
        if self.metadata:
            self.metadata.clear()
        if self.engine:
            self.engine.dispose()

    def __enter__(self) -> Connection:
        """Enable usage in a `with` block by returning the engine."""
        con = self.connect()
        if self.engine:
            self.metadata.reflect(bind=self.engine)
        return con

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Dispose of the engine when exiting the `with` block."""
        self.dispose()

    def get_timeseries_metadata(
        self,
        location: str | None = None,
        variable: str | None = None,
        unit: str | None = None,
        **kwargs: dict,
    ) -> pd.DataFrame:
        """
        List timeseries available in the database.

        Parameters:
            location (str): Location attribute to match.
            variable (str): Variable attribute to match.
            unit (str): Unit attribute to match.
            **kwargs: Additional filters. Must match the attributes of the
                Timeseries instance user is trying to retrieve.

        Returns:
            pd.DataFrame: The name of the matching table or None if no table is found.
        """
        with self as con:
            if "__timeseries_metadata__" not in self.metadata.tables:
                logger.info("The metadata table does not exist in this database.")
                return pd.DataFrame()

            metadata_table = self.metadata.tables["__timeseries_metadata__"]

            base_filters = []

            if location is not None:
                base_filters.append(metadata_table.c.location.ilike(location))
            if variable is not None:
                base_filters.append(metadata_table.c.variable.ilike(variable))
            if unit is not None:
                base_filters.append(metadata_table.c.unit.ilike(unit))

            extra_filters = [
                func.json_extract(metadata_table.c.extra, f"$.{k}").ilike(v)
                for k, v in kwargs.items()
                if v is not None
            ]

            # True in and_(True, *arg) fixis FutureWarning of dissallowing empty
            # filters in the future.
            query = metadata_table.select().where(
                and_(True, *base_filters, *extra_filters)
            )

            result = con.execute(query).fetchall()

            return pd.DataFrame(result).set_index("id") if result else pd.DataFrame()

    def create_metadata(self) -> Table | None:
        """Create a metadata table if it doesn't exist yet and store ts metadata."""

        metadata_table = Table(
            "__timeseries_metadata__",
            self.metadata,
            Column("id", Integer, primary_key=True),
            Column("table_name", String, unique=True),
            Column("location", String),
            Column("variable", String),
            Column("unit", String),
            Column("start", String, nullable=True),
            Column("end", String, nullable=True),
            Column("extra", JSON, nullable=True),
            Column("cls", String, nullable=False),
        )

        if self.engine:
            metadata_table.create(self.engine, checkfirst=True)
            self.metadata.reflect(bind=self.engine)
            return metadata_table
        else:
            logger.info("Engine does not exist.")
            return None

    def create_table(self, schema_name: str, column_name: str) -> Table | None:
        """Create a table in the database.

        Schema name is a string representing the location, sensor, variable measured and
        unit of measurement. This is a way of preserving the metadata of the Timeseries.
        The index is always `timestamp` and the column name is dynamicly create from
        the measured variable.
        """

        if schema_name in self.metadata.tables:
            return self.metadata.tables[schema_name]

        ts_table = Table(
            schema_name,
            self.metadata,
            Column("timestamp", String, primary_key=True),
            Column(column_name, Float),
            info={},
        )

        if self.engine:
            ts_table.create(self.engine, checkfirst=True)
            self.metadata.reflect(bind=self.engine)
            return ts_table
        else:
            logger.info("Engine does not exist.")
            return None

__enter__()

Enable usage in a with block by returning the engine.

Source code in gensor/db/connection.py
83
84
85
86
87
88
def __enter__(self) -> Connection:
    """Enable usage in a `with` block by returning the engine."""
    con = self.connect()
    if self.engine:
        self.metadata.reflect(bind=self.engine)
    return con

__exit__(exc_type, exc_val, exc_tb)

Dispose of the engine when exiting the with block.

Source code in gensor/db/connection.py
90
91
92
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Dispose of the engine when exiting the `with` block."""
    self.dispose()

connect()

Connect to the database and initialize the engine. If engine is None > create it with verified path > reflect. After connecting, ensure the timeseries_metadata table is present.

Source code in gensor/db/connection.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def connect(self) -> Connection:
    """Connect to the database and initialize the engine.
    If engine is None > create it with verified path > reflect.
    After connecting, ensure the timeseries_metadata table is present.
    """
    if self.engine is None:
        sqlite_path = self._verify_path()
        self.engine = create_engine(sqlite_path)

    connection = self.engine.connect()

    self.create_metadata()

    return connection

create_metadata()

Create a metadata table if it doesn't exist yet and store ts metadata.

Source code in gensor/db/connection.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def create_metadata(self) -> Table | None:
    """Create a metadata table if it doesn't exist yet and store ts metadata."""

    metadata_table = Table(
        "__timeseries_metadata__",
        self.metadata,
        Column("id", Integer, primary_key=True),
        Column("table_name", String, unique=True),
        Column("location", String),
        Column("variable", String),
        Column("unit", String),
        Column("start", String, nullable=True),
        Column("end", String, nullable=True),
        Column("extra", JSON, nullable=True),
        Column("cls", String, nullable=False),
    )

    if self.engine:
        metadata_table.create(self.engine, checkfirst=True)
        self.metadata.reflect(bind=self.engine)
        return metadata_table
    else:
        logger.info("Engine does not exist.")
        return None

create_table(schema_name, column_name)

Create a table in the database.

Schema name is a string representing the location, sensor, variable measured and unit of measurement. This is a way of preserving the metadata of the Timeseries. The index is always timestamp and the column name is dynamicly create from the measured variable.

Source code in gensor/db/connection.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def create_table(self, schema_name: str, column_name: str) -> Table | None:
    """Create a table in the database.

    Schema name is a string representing the location, sensor, variable measured and
    unit of measurement. This is a way of preserving the metadata of the Timeseries.
    The index is always `timestamp` and the column name is dynamicly create from
    the measured variable.
    """

    if schema_name in self.metadata.tables:
        return self.metadata.tables[schema_name]

    ts_table = Table(
        schema_name,
        self.metadata,
        Column("timestamp", String, primary_key=True),
        Column(column_name, Float),
        info={},
    )

    if self.engine:
        ts_table.create(self.engine, checkfirst=True)
        self.metadata.reflect(bind=self.engine)
        return ts_table
    else:
        logger.info("Engine does not exist.")
        return None

dispose()

Dispose of the engine, closing all connections.

Source code in gensor/db/connection.py
76
77
78
79
80
81
def dispose(self) -> None:
    """Dispose of the engine, closing all connections."""
    if self.metadata:
        self.metadata.clear()
    if self.engine:
        self.engine.dispose()

get_timeseries_metadata(location=None, variable=None, unit=None, **kwargs)

List timeseries available in the database.

Parameters:

Name Type Description Default
location str

Location attribute to match.

None
variable str

Variable attribute to match.

None
unit str

Unit attribute to match.

None
**kwargs dict

Additional filters. Must match the attributes of the Timeseries instance user is trying to retrieve.

{}

Returns:

Type Description
DataFrame

pd.DataFrame: The name of the matching table or None if no table is found.

Source code in gensor/db/connection.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def get_timeseries_metadata(
    self,
    location: str | None = None,
    variable: str | None = None,
    unit: str | None = None,
    **kwargs: dict,
) -> pd.DataFrame:
    """
    List timeseries available in the database.

    Parameters:
        location (str): Location attribute to match.
        variable (str): Variable attribute to match.
        unit (str): Unit attribute to match.
        **kwargs: Additional filters. Must match the attributes of the
            Timeseries instance user is trying to retrieve.

    Returns:
        pd.DataFrame: The name of the matching table or None if no table is found.
    """
    with self as con:
        if "__timeseries_metadata__" not in self.metadata.tables:
            logger.info("The metadata table does not exist in this database.")
            return pd.DataFrame()

        metadata_table = self.metadata.tables["__timeseries_metadata__"]

        base_filters = []

        if location is not None:
            base_filters.append(metadata_table.c.location.ilike(location))
        if variable is not None:
            base_filters.append(metadata_table.c.variable.ilike(variable))
        if unit is not None:
            base_filters.append(metadata_table.c.unit.ilike(unit))

        extra_filters = [
            func.json_extract(metadata_table.c.extra, f"$.{k}").ilike(v)
            for k, v in kwargs.items()
            if v is not None
        ]

        # True in and_(True, *arg) fixis FutureWarning of dissallowing empty
        # filters in the future.
        query = metadata_table.select().where(
            and_(True, *base_filters, *extra_filters)
        )

        result = con.execute(query).fetchall()

        return pd.DataFrame(result).set_index("id") if result else pd.DataFrame()

connection

Module defining database connection object.

Classes:

Name Description
DatabaseConnection

Database connection object

DatabaseConnection

Bases: BaseModel

Database connection object. If no database exists at the specified path, it will be created. If no database is specified, an in-memory database will be used.

Attributes metadata (MetaData): SQLAlchemy metadata object. db_directory (Path): Path to the database to connect to. db_name (str): Name for the database to connect to. engine (Engine | None): SQLAlchemy Engine instance.

Source code in gensor/db/connection.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class DatabaseConnection(pyd.BaseModel):
    """Database connection object.
    If no database exists at the specified path, it will be created.
    If no database is specified, an in-memory database will be used.

    Attributes
        metadata (MetaData): SQLAlchemy metadata object.
        db_directory (Path): Path to the database to connect to.
        db_name (str): Name for the database to connect to.
        engine (Engine | None): SQLAlchemy Engine instance.
    """

    model_config = pyd.ConfigDict(
        arbitrary_types_allowed=True, validate_assignment=True
    )

    metadata: MetaData = MetaData()
    db_directory: Path = Path.cwd()
    db_name: str = "gensor.db"
    engine: Engine | None = None

    def _verify_path(self) -> str:
        """Verify database path."""

        if not self.db_directory.exists():
            raise DatabaseNotFound()
        return f"sqlite:///{self.db_directory}/{self.db_name}"

    def connect(self) -> Connection:
        """Connect to the database and initialize the engine.
        If engine is None > create it with verified path > reflect.
        After connecting, ensure the timeseries_metadata table is present.
        """
        if self.engine is None:
            sqlite_path = self._verify_path()
            self.engine = create_engine(sqlite_path)

        connection = self.engine.connect()

        self.create_metadata()

        return connection

    def dispose(self) -> None:
        """Dispose of the engine, closing all connections."""
        if self.metadata:
            self.metadata.clear()
        if self.engine:
            self.engine.dispose()

    def __enter__(self) -> Connection:
        """Enable usage in a `with` block by returning the engine."""
        con = self.connect()
        if self.engine:
            self.metadata.reflect(bind=self.engine)
        return con

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Dispose of the engine when exiting the `with` block."""
        self.dispose()

    def get_timeseries_metadata(
        self,
        location: str | None = None,
        variable: str | None = None,
        unit: str | None = None,
        **kwargs: dict,
    ) -> pd.DataFrame:
        """
        List timeseries available in the database.

        Parameters:
            location (str): Location attribute to match.
            variable (str): Variable attribute to match.
            unit (str): Unit attribute to match.
            **kwargs: Additional filters. Must match the attributes of the
                Timeseries instance user is trying to retrieve.

        Returns:
            pd.DataFrame: The name of the matching table or None if no table is found.
        """
        with self as con:
            if "__timeseries_metadata__" not in self.metadata.tables:
                logger.info("The metadata table does not exist in this database.")
                return pd.DataFrame()

            metadata_table = self.metadata.tables["__timeseries_metadata__"]

            base_filters = []

            if location is not None:
                base_filters.append(metadata_table.c.location.ilike(location))
            if variable is not None:
                base_filters.append(metadata_table.c.variable.ilike(variable))
            if unit is not None:
                base_filters.append(metadata_table.c.unit.ilike(unit))

            extra_filters = [
                func.json_extract(metadata_table.c.extra, f"$.{k}").ilike(v)
                for k, v in kwargs.items()
                if v is not None
            ]

            # True in and_(True, *arg) fixis FutureWarning of dissallowing empty
            # filters in the future.
            query = metadata_table.select().where(
                and_(True, *base_filters, *extra_filters)
            )

            result = con.execute(query).fetchall()

            return pd.DataFrame(result).set_index("id") if result else pd.DataFrame()

    def create_metadata(self) -> Table | None:
        """Create a metadata table if it doesn't exist yet and store ts metadata."""

        metadata_table = Table(
            "__timeseries_metadata__",
            self.metadata,
            Column("id", Integer, primary_key=True),
            Column("table_name", String, unique=True),
            Column("location", String),
            Column("variable", String),
            Column("unit", String),
            Column("start", String, nullable=True),
            Column("end", String, nullable=True),
            Column("extra", JSON, nullable=True),
            Column("cls", String, nullable=False),
        )

        if self.engine:
            metadata_table.create(self.engine, checkfirst=True)
            self.metadata.reflect(bind=self.engine)
            return metadata_table
        else:
            logger.info("Engine does not exist.")
            return None

    def create_table(self, schema_name: str, column_name: str) -> Table | None:
        """Create a table in the database.

        Schema name is a string representing the location, sensor, variable measured and
        unit of measurement. This is a way of preserving the metadata of the Timeseries.
        The index is always `timestamp` and the column name is dynamicly create from
        the measured variable.
        """

        if schema_name in self.metadata.tables:
            return self.metadata.tables[schema_name]

        ts_table = Table(
            schema_name,
            self.metadata,
            Column("timestamp", String, primary_key=True),
            Column(column_name, Float),
            info={},
        )

        if self.engine:
            ts_table.create(self.engine, checkfirst=True)
            self.metadata.reflect(bind=self.engine)
            return ts_table
        else:
            logger.info("Engine does not exist.")
            return None
__enter__()

Enable usage in a with block by returning the engine.

Source code in gensor/db/connection.py
83
84
85
86
87
88
def __enter__(self) -> Connection:
    """Enable usage in a `with` block by returning the engine."""
    con = self.connect()
    if self.engine:
        self.metadata.reflect(bind=self.engine)
    return con
__exit__(exc_type, exc_val, exc_tb)

Dispose of the engine when exiting the with block.

Source code in gensor/db/connection.py
90
91
92
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Dispose of the engine when exiting the `with` block."""
    self.dispose()
connect()

Connect to the database and initialize the engine. If engine is None > create it with verified path > reflect. After connecting, ensure the timeseries_metadata table is present.

Source code in gensor/db/connection.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def connect(self) -> Connection:
    """Connect to the database and initialize the engine.
    If engine is None > create it with verified path > reflect.
    After connecting, ensure the timeseries_metadata table is present.
    """
    if self.engine is None:
        sqlite_path = self._verify_path()
        self.engine = create_engine(sqlite_path)

    connection = self.engine.connect()

    self.create_metadata()

    return connection
create_metadata()

Create a metadata table if it doesn't exist yet and store ts metadata.

Source code in gensor/db/connection.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def create_metadata(self) -> Table | None:
    """Create a metadata table if it doesn't exist yet and store ts metadata."""

    metadata_table = Table(
        "__timeseries_metadata__",
        self.metadata,
        Column("id", Integer, primary_key=True),
        Column("table_name", String, unique=True),
        Column("location", String),
        Column("variable", String),
        Column("unit", String),
        Column("start", String, nullable=True),
        Column("end", String, nullable=True),
        Column("extra", JSON, nullable=True),
        Column("cls", String, nullable=False),
    )

    if self.engine:
        metadata_table.create(self.engine, checkfirst=True)
        self.metadata.reflect(bind=self.engine)
        return metadata_table
    else:
        logger.info("Engine does not exist.")
        return None
create_table(schema_name, column_name)

Create a table in the database.

Schema name is a string representing the location, sensor, variable measured and unit of measurement. This is a way of preserving the metadata of the Timeseries. The index is always timestamp and the column name is dynamicly create from the measured variable.

Source code in gensor/db/connection.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def create_table(self, schema_name: str, column_name: str) -> Table | None:
    """Create a table in the database.

    Schema name is a string representing the location, sensor, variable measured and
    unit of measurement. This is a way of preserving the metadata of the Timeseries.
    The index is always `timestamp` and the column name is dynamicly create from
    the measured variable.
    """

    if schema_name in self.metadata.tables:
        return self.metadata.tables[schema_name]

    ts_table = Table(
        schema_name,
        self.metadata,
        Column("timestamp", String, primary_key=True),
        Column(column_name, Float),
        info={},
    )

    if self.engine:
        ts_table.create(self.engine, checkfirst=True)
        self.metadata.reflect(bind=self.engine)
        return ts_table
    else:
        logger.info("Engine does not exist.")
        return None
dispose()

Dispose of the engine, closing all connections.

Source code in gensor/db/connection.py
76
77
78
79
80
81
def dispose(self) -> None:
    """Dispose of the engine, closing all connections."""
    if self.metadata:
        self.metadata.clear()
    if self.engine:
        self.engine.dispose()
get_timeseries_metadata(location=None, variable=None, unit=None, **kwargs)

List timeseries available in the database.

Parameters:

Name Type Description Default
location str

Location attribute to match.

None
variable str

Variable attribute to match.

None
unit str

Unit attribute to match.

None
**kwargs dict

Additional filters. Must match the attributes of the Timeseries instance user is trying to retrieve.

{}

Returns:

Type Description
DataFrame

pd.DataFrame: The name of the matching table or None if no table is found.

Source code in gensor/db/connection.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def get_timeseries_metadata(
    self,
    location: str | None = None,
    variable: str | None = None,
    unit: str | None = None,
    **kwargs: dict,
) -> pd.DataFrame:
    """
    List timeseries available in the database.

    Parameters:
        location (str): Location attribute to match.
        variable (str): Variable attribute to match.
        unit (str): Unit attribute to match.
        **kwargs: Additional filters. Must match the attributes of the
            Timeseries instance user is trying to retrieve.

    Returns:
        pd.DataFrame: The name of the matching table or None if no table is found.
    """
    with self as con:
        if "__timeseries_metadata__" not in self.metadata.tables:
            logger.info("The metadata table does not exist in this database.")
            return pd.DataFrame()

        metadata_table = self.metadata.tables["__timeseries_metadata__"]

        base_filters = []

        if location is not None:
            base_filters.append(metadata_table.c.location.ilike(location))
        if variable is not None:
            base_filters.append(metadata_table.c.variable.ilike(variable))
        if unit is not None:
            base_filters.append(metadata_table.c.unit.ilike(unit))

        extra_filters = [
            func.json_extract(metadata_table.c.extra, f"$.{k}").ilike(v)
            for k, v in kwargs.items()
            if v is not None
        ]

        # True in and_(True, *arg) fixis FutureWarning of dissallowing empty
        # filters in the future.
        query = metadata_table.select().where(
            and_(True, *base_filters, *extra_filters)
        )

        result = con.execute(query).fetchall()

        return pd.DataFrame(result).set_index("id") if result else pd.DataFrame()

exceptions

IndexOutOfRangeError

Bases: IndexError

Custom exception raised when an index is out of range in the dataset.

Source code in gensor/exceptions.py
37
38
39
40
41
42
43
class IndexOutOfRangeError(IndexError):
    """Custom exception raised when an index is out of range in the dataset."""

    def __init__(self, index: int, dataset_size: int) -> None:
        super().__init__(
            f"Index {index} is out of range for the dataset with {dataset_size} timeseries."
        )

InvalidMeasurementTypeError

Bases: ValueError

Raised when a timeseries of a wrong measurement type is operated upon.

Source code in gensor/exceptions.py
1
2
3
4
5
6
7
class InvalidMeasurementTypeError(ValueError):
    """Raised when a timeseries of a wrong measurement type is operated upon."""

    def __init__(self, expected_type: str = "pressure") -> None:
        self.expected_type = expected_type
        message = f"Timeseries must be of measurement type '{self.expected_type}'."
        super().__init__(message)

MissingInputError

Bases: ValueError

Raised when a required input is missing.

Source code in gensor/exceptions.py
10
11
12
13
14
15
16
17
class MissingInputError(ValueError):
    """Raised when a required input is missing."""

    def __init__(self, input_name: str, message: str | None = None) -> None:
        self.input_name = input_name
        if message is None:
            message = f"Missing required input: '{self.input_name}'."
        super().__init__(message)

TimeseriesUnequal

Bases: ValueError

Raised when Timeseries objects are compared and are unequal.

Source code in gensor/exceptions.py
26
27
28
29
30
31
32
33
34
class TimeseriesUnequal(ValueError):
    """Raised when Timeseries objects are compared and are unequal."""

    def __init__(self, *args: object, message: str | None = None) -> None:
        message = (
            "Timeseries objects must have the same location, sensor, variable, and \
        unit to be added together."
        )
        super().__init__(message, *args)

io

read

Fetching the data from various sources.

TODO: Fix up the read_from_sql() function to actually work properly.

read_from_api()

Fetch data from the API.

Source code in gensor/io/read.py
190
191
192
def read_from_api() -> Dataset:
    """Fetch data from the API."""
    return NotImplemented

read_from_csv(path, file_format='vanessen', **kwargs)

Loads the data from csv files with given file_format and returns a list of Timeseries objects.

Parameters:

Name Type Description Default
path Path

The path to the file or directory containing the files.

required
**kwargs dict

Optional keyword arguments passed to the parsers: * serial_number_pattern (str): The regex pattern to extract the serial number from the file. * location_pattern (str): The regex pattern to extract the station from the file. * col_names (list): The column names for the dataframe. * location (str): Name of the location of the timeseries. * sensor (str): Sensor serial number.

{}
Source code in gensor/io/read.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def read_from_csv(
    path: Path, file_format: Literal["vanessen", "plain"] = "vanessen", **kwargs: Any
) -> Dataset | Timeseries:
    """Loads the data from csv files with given file_format and returns a list of Timeseries objects.

    Parameters:
        path (Path): The path to the file or directory containing the files.
        **kwargs (dict): Optional keyword arguments passed to the parsers:
            * serial_number_pattern (str): The regex pattern to extract the serial number from the file.
            * location_pattern (str): The regex pattern to extract the station from the file.
            * col_names (list): The column names for the dataframe.
            * location (str): Name of the location of the timeseries.
            * sensor (str): Sensor serial number.
    """

    parsers = {
        "vanessen": parse_vanessen_csv,
        "plain": parse_plain,
        # more parser to be implemented
    }

    if not isinstance(path, Path):
        message = "The path argument must be a Path object."
        raise TypeError(message)

    if path.is_dir() and not any(
        file.is_file() and file.suffix.lower() == ".csv" for file in path.iterdir()
    ):
        logger.info("No CSV files found. Operation skipped.")
        return Dataset()

    files = (
        [
            file
            for file in path.iterdir()
            if file.is_file() and file.suffix.lower() == ".csv"
        ]
        if path.is_dir()
        else [path]
        if path.suffix.lower() == ".csv"
        else []
    )

    if not files:
        logger.info("No CSV files found. Operation skipped.")
        return Dataset()

    parser = parsers[file_format]

    ds: Dataset = Dataset()

    for f in files:
        logger.info(f"Loading file: {f}")
        ts_in_file = parser(f, **kwargs)
        ds.add(ts_in_file)

    # If there is only one Timeseries in Dataset (as in the condition), ds[0] will always
    # be a Timeseries; so the line below does not introduce potential None in the return
    return ds[0] if len(ds) == 1 else ds  # type: ignore[return-value]

read_from_sql(db, load_all=True, location=None, variable=None, unit=None, timestamp_start=None, timestamp_stop=None, **kwargs)

Returns the timeseries or a dataset from a SQL database.

Parameters:

Name Type Description Default
db DatabaseConnection

The database connection object.

required
load_all bool

Whether to load all timeseries from the database.

True
location str

The station name.

None
variable str

The measurement type.

None
unit str

The unit of the measurement.

None
timestamp_start Timestamp

Start timestamp filter.

None
timestamp_stop Timestamp

End timestamp filter.

None
**kwargs dict

Any additional filters matching attributes of the particular timeseries.

{}

Returns:

Name Type Description
Dataset Timeseries | Dataset

Dataset with retrieved objects or an empty Dataset.

Source code in gensor/io/read.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def read_from_sql(
    db: DatabaseConnection,
    load_all: bool = True,
    location: str | None = None,
    variable: str | None = None,
    unit: str | None = None,
    timestamp_start: pd.Timestamp | None = None,
    timestamp_stop: pd.Timestamp | None = None,
    **kwargs: dict,
) -> Timeseries | Dataset:
    """Returns the timeseries or a dataset from a SQL database.

    Parameters:
        db (DatabaseConnection): The database connection object.
        load_all (bool): Whether to load all timeseries from the database.
        location (str): The station name.
        variable (str): The measurement type.
        unit (str): The unit of the measurement.
        timestamp_start (pd.Timestamp, optional): Start timestamp filter.
        timestamp_stop (pd.Timestamp, optional): End timestamp filter.
        **kwargs (dict): Any additional filters matching attributes of the particular
            timeseries.

    Returns:
        Dataset: Dataset with retrieved objects or an empty Dataset.
    """

    def _read_data_from_schema(schema_name: str) -> Any:
        """Read data from the table and apply the timestamp filter.

        Parameters:
            schema_name (str): name of the schema in SQLite database.

        Returns:
            pd.Series: results of the query or an empty pd.Series if none are found.
        """
        with db as con:
            schema = db.metadata.tables[schema_name]
            data_query = select(schema)

            if timestamp_start or timestamp_stop:
                if timestamp_start:
                    data_query = data_query.where(schema.c.timestamp >= timestamp_start)
                if timestamp_stop:
                    data_query = data_query.where(schema.c.timestamp <= timestamp_stop)

            ts = pd.read_sql(
                data_query,
                con=con,
                parse_dates={"timestamp": "%Y-%m-%dT%H:%M:%S%z"},
                index_col="timestamp",
            ).squeeze()

        if ts.empty:
            message = f"No data found in table {schema_name}"
            logger.warning(message)

        return ts.sort_index()

    def _create_object(data: pd.Series, metadata: dict) -> Any:
        """Create the appropriate object for timeseries."""

        core_metadata = {
            "location": metadata["location"],
            "variable": metadata["variable"],
            "unit": metadata["unit"],
        }

        extra_metadata = metadata.get("extra", {})

        ts_metadata = {**core_metadata, **extra_metadata}

        cls = metadata["cls"]
        module_name, class_name = cls.rsplit(".", 1)
        module = import_module(module_name)

        TimeseriesClass = getattr(module, class_name)
        ts_object = TimeseriesClass(ts=data, **ts_metadata)

        return ts_object

    metadata_df = (
        db.get_timeseries_metadata(
            location=location, variable=variable, unit=unit, **kwargs
        )
        if not load_all
        else db.get_timeseries_metadata()
    )

    if metadata_df.empty:
        message = "No schemas matched the specified filters."
        raise ValueError(message)

    timeseries_list = []

    for row in metadata_df.to_dict(orient="records"):
        try:
            schema_name = row.pop("table_name")
            data = _read_data_from_schema(schema_name)
            timeseries_obj = _create_object(data, row)
            timeseries_list.append(timeseries_obj)
        except (ValueError, TypeError):
            logger.exception(f"Skipping schema {schema_name} due to error.")

    return Dataset(timeseries=timeseries_list) if timeseries_list else Dataset()

log

set_log_level(level)

Set the logging level for the package.

Source code in gensor/log.py
4
5
6
7
def set_log_level(level: str) -> None:
    """Set the logging level for the package."""
    logger = logging.getLogger("gensor")
    logger.setLevel(level.upper())

parse

parse_plain(path, **kwargs)

Parse a simple csv without metadata header, just columns with variables

Parameters:

Name Type Description Default
path Path

The path to the file.

required

Returns:

Name Type Description
list list[Timeseries]

A list of Timeseries objects.

Source code in gensor/parse/plain.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def parse_plain(path: Path, **kwargs: Any) -> list[Timeseries]:
    """Parse a simple csv without metadata header, just columns with variables

    Parameters:
        path (Path): The path to the file.

    Returns:
        list: A list of Timeseries objects.
    """

    column_names = kwargs.get("col_names", ["timestamp", "pressure", "temperature"])

    encoding = detect_encoding(path, num_bytes=10_000)

    df = read_csv(
        path,
        encoding=encoding,
        skipfooter=1,
        skip_blank_lines=True,
        header=None,
        skiprows=1,
        index_col="timestamp",
        names=column_names,
        engine="python",
    )

    df = handle_timestamps(df, kwargs.get("timezone", "UTC"))

    ts_list = []

    for col in df.columns:
        if col in VARIABLE_TYPES_AND_UNITS:
            unit = VARIABLE_TYPES_AND_UNITS[col][0]
            ts_list.append(
                Timeseries(
                    ts=df[col],
                    # Validation will be done in Pydantic
                    variable=col,  # type: ignore[arg-type]
                    location=kwargs["location"],
                    sensor=kwargs["sensor"],
                    # Validation will be done in Pydantic
                    unit=unit,  # type: ignore[arg-type]
                )
            )
        else:
            message = (
                "Unsupported variable: {col}. Please provide a valid variable type."
            )
            raise ValueError(message)

    return ts_list

parse_vanessen_csv(path, **kwargs)

Parses a van Essen csv file and returns a list of Timeseries objects. At this point it does not matter whether the file is a barometric or piezometric logger file.

The function will use regex patterns to extract the serial number and station from the file. It is important to use the appropriate regex patterns, particularily for the station. If the default patterns are not working (whihc most likely will be the case), the user should provide their own patterns. The patterns can be provided as keyword arguments to the function and it is possible to use OR (|) in the regex pattern.

Warning

A better check for the variable type and units has to be implemented.

Parameters:

Name Type Description Default
path Path

The path to the file.

required

Other Parameters:

Name Type Description
serial_number_pattern str

The regex pattern to extract the serial number from the file.

location_pattern str

The regex pattern to extract the station from the file.

col_names list

The column names for the dataframe.

Returns:

Name Type Description
list list[Timeseries]

A list of Timeseries objects.

Source code in gensor/parse/vanessen.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def parse_vanessen_csv(path: Path, **kwargs: Any) -> list[Timeseries]:
    """Parses a van Essen csv file and returns a list of Timeseries objects. At this point it
    does not matter whether the file is a barometric or piezometric logger file.

    The function will use regex patterns to extract the serial number and station from the file. It is
    important to use the appropriate regex patterns, particularily for the station. If the default patterns
    are not working (whihc most likely will be the case), the user should provide their own patterns. The patterns
    can be provided as keyword arguments to the function and it is possible to use OR (|) in the regex pattern.

    !!! warning

        A better check for the variable type and units has to be implemented.

    Parameters:
        path (Path): The path to the file.

    Other Parameters:
        serial_number_pattern (str): The regex pattern to extract the serial number from the file.
        location_pattern (str): The regex pattern to extract the station from the file.
        col_names (list): The column names for the dataframe.

    Returns:
        list: A list of Timeseries objects.
    """

    patterns = {
        "sensor": kwargs.get("serial_number_pattern", r"[A-Za-z]{1,2}\d{3,4}"),
        "location": kwargs.get(
            "location_pattern", r"[A-Za-z]{2}\d{2}[A-Za-z]{1}|Barodiver"
        ),
        "timezone": kwargs.get("timezone_pattern", r"UTC[+-]?\d+"),
    }

    column_names = kwargs.get("col_names", ["timestamp", "pressure", "temperature"])

    encoding = detect_encoding(path, num_bytes=10_000)

    with path.open(mode="r", encoding=encoding) as f:
        text = f.read()

        fields = get_header_fields(text)

        def pick(pattern: str, raw: str | None, override: str | None) -> str | None:
            """Resolve a metadata value from a labelled header field.

            An explicit ``location=``/``sensor=`` kwarg always wins. Otherwise the
            pattern is matched against *that field's value only* (so e.g. the serial
            ``AZ066`` is pulled out of ``..00-AZ066  219.`` and the location ``PB16A``
            out of ``pb16a_moni_az066``), falling back to the verbatim field value
            when the pattern does not match (e.g. ``FL1`` / ``barodiver`` locations).
            """
            if override:
                return override
            if not raw:
                return None
            match = re.search(pattern, raw)
            return match.group() if match else raw

        # Read the labelled header fields directly — far more reliable than matching a
        # regex against the whole file, which can grab a stray token from the embedded
        # FILENAME path (e.g. a folder name) instead of the real serial.
        location = pick(
            patterns["location"], fields.get("Location"), kwargs.get("location")
        )
        sensor = pick(
            patterns["sensor"], fields.get("Serial number"), kwargs.get("sensor")
        )
        tz_match = re.search(patterns["timezone"], text)
        timezone = tz_match.group() if tz_match else "UTC"

        if location is None or sensor is None:
            logger.info(
                f"Skipping file {path} due to missing metadata "
                "(pass location=/sensor= to override)."
            )
            return []

        data_start = "Date/time"
        data_end = "END OF DATA FILE"

        df = get_data(text, data_start, data_end, column_names)

        df = handle_timestamps(df, timezone)

        ts_list = []

        for col in df.columns:
            if col in VARIABLE_TYPES_AND_UNITS:
                unit = VARIABLE_TYPES_AND_UNITS[col][0]
                ts_list.append(
                    Timeseries(
                        ts=df[col],
                        # Validation will be done in Pydantic
                        variable=col,  # type: ignore[arg-type]
                        location=location,
                        sensor=sensor,
                        # Validation will be done in Pydantic
                        unit=unit,  # type: ignore[arg-type]
                    )
                )
            else:
                message = f"Unsupported variable: {col}. Please provide a valid variable type."
                raise ValueError(message)

    return ts_list

plain

parse_plain(path, **kwargs)

Parse a simple csv without metadata header, just columns with variables

Parameters:

Name Type Description Default
path Path

The path to the file.

required

Returns:

Name Type Description
list list[Timeseries]

A list of Timeseries objects.

Source code in gensor/parse/plain.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def parse_plain(path: Path, **kwargs: Any) -> list[Timeseries]:
    """Parse a simple csv without metadata header, just columns with variables

    Parameters:
        path (Path): The path to the file.

    Returns:
        list: A list of Timeseries objects.
    """

    column_names = kwargs.get("col_names", ["timestamp", "pressure", "temperature"])

    encoding = detect_encoding(path, num_bytes=10_000)

    df = read_csv(
        path,
        encoding=encoding,
        skipfooter=1,
        skip_blank_lines=True,
        header=None,
        skiprows=1,
        index_col="timestamp",
        names=column_names,
        engine="python",
    )

    df = handle_timestamps(df, kwargs.get("timezone", "UTC"))

    ts_list = []

    for col in df.columns:
        if col in VARIABLE_TYPES_AND_UNITS:
            unit = VARIABLE_TYPES_AND_UNITS[col][0]
            ts_list.append(
                Timeseries(
                    ts=df[col],
                    # Validation will be done in Pydantic
                    variable=col,  # type: ignore[arg-type]
                    location=kwargs["location"],
                    sensor=kwargs["sensor"],
                    # Validation will be done in Pydantic
                    unit=unit,  # type: ignore[arg-type]
                )
            )
        else:
            message = (
                "Unsupported variable: {col}. Please provide a valid variable type."
            )
            raise ValueError(message)

    return ts_list

utils

detect_encoding(path, num_bytes=1024)

Detect the encoding of a file using chardet.

Parameters:

Name Type Description Default
path Path

The path to the file.

required
num_bytes int

Number of bytes to read for encoding detection (default is 1024).

1024

Returns:

Name Type Description
str str

The detected encoding of the file.

Source code in gensor/parse/utils.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def detect_encoding(path: Path, num_bytes: int = 1024) -> str:
    """Detect the encoding of a file using chardet.

    Parameters:
        path (Path): The path to the file.
        num_bytes (int): Number of bytes to read for encoding detection (default is 1024).

    Returns:
        str: The detected encoding of the file.
    """
    with path.open("rb") as f:
        raw_data = f.read(num_bytes)
    result = chardet.detect(raw_data)
    return result["encoding"] or "utf-8"

get_data(text, data_start, data_end, column_names)

Search for data in the file.

Parameters:

Name Type Description Default
text str

string obtained from the CSV file.

required
data_start str

string marking the data header row.

required
data_end str

string marking the end of the data block. When it is not present (some exports omit the trailing marker), the data is read to the end of the file.

required
column_names list

list of expected column names.

required

Returns:

Type Description
DataFrame

pd.DataFrame

Source code in gensor/parse/utils.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def get_data(
    text: str, data_start: str, data_end: str, column_names: list
) -> DataFrame:
    """Search for data in the file.

    Parameters:
        text (str): string obtained from the CSV file.
        data_start (str): string marking the data header row.
        data_end (str): string marking the end of the data block. When it is not
            present (some exports omit the trailing marker), the data is read to
            the end of the file.
        column_names (list): list of expected column names.

    Returns:
        pd.DataFrame
    """

    start = text.find(data_start)
    if start == -1:
        message = f"Could not find the data header {data_start!r} in the file."
        raise ValueError(message)

    end = text.find(data_end, start)
    if end == -1:  # exports without the trailing marker: read to end of file
        end = len(text)

    block = text[start:end]
    sep = _sniff_delimiter(block.splitlines()[0])

    df = read_csv(
        StringIO(block),
        skiprows=1,
        header=None,
        names=column_names,
        index_col="timestamp",
        sep=sep,
    )

    return df

get_header_fields(text)

Parse the key = value lines of a Diver-Office header into a dict.

Diver-Office files carry labelled fields in the header (e.g. Location and Serial number); reading those directly is far more reliable than matching a regex against the whole file, which can pick up stray matches from the embedded FILENAME path. Parsing stops at the data block, section markers ([Logger settings] ...) and lines without a key = value shape are skipped, and the first occurrence of each key wins.

Parameters:

Name Type Description Default
text str

string obtained from the CSV file.

required

Returns:

Name Type Description
dict dict

header field name -> value (both stripped).

Source code in gensor/parse/utils.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def get_header_fields(text: str) -> dict:
    """Parse the ``key = value`` lines of a Diver-Office header into a dict.

    Diver-Office files carry labelled fields in the header (e.g. ``Location`` and
    ``Serial number``); reading those directly is far more reliable than matching a
    regex against the whole file, which can pick up stray matches from the embedded
    ``FILENAME`` path. Parsing stops at the data block, section markers
    (``[Logger settings]`` ...) and lines without a ``key = value`` shape are
    skipped, and the first occurrence of each key wins.

    Parameters:
        text (str): string obtained from the CSV file.

    Returns:
        dict: header field name -> value (both stripped).
    """
    fields: dict[str, str] = {}

    for line in text.splitlines():
        if "Date/time" in line:  # start of the data block
            break
        if not line.strip() or line.lstrip().startswith("["):
            continue
        key, sep, value = line.partition("=")
        key = key.strip()
        if sep and key and key not in fields:
            fields[key] = value.strip()

    return fields

get_metadata(text, patterns)

Search for metadata in the file header with given regex patterns.

Parameters:

Name Type Description Default
text str

string obtained from the CSV file.

required
patterns dict

regex patterns matching the location and sensor information.

required

Returns:

Name Type Description
dict dict

metadata of the timeseries.

Source code in gensor/parse/utils.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def get_metadata(text: str, patterns: dict) -> dict:
    """Search for metadata in the file header with given regex patterns.

    Parameters:
        text (str): string obtained from the CSV file.
        patterns (dict): regex patterns matching the location and sensor information.

    Returns:
        dict: metadata of the timeseries.
    """
    metadata = {}

    for k, v in patterns.items():
        match = re.search(v, text)
        metadata[k] = match.group() if match else None

    return metadata

handle_timestamps(df, tz_string)

Converts timestamps in the dataframe to the specified timezone (e.g., 'UTC+1').

Parameters:

Name Type Description Default
df DataFrame

The dataframe with timestamps.

required
tz_string str

A timezone string like 'UTC+1' or 'UTC-5'.

required

Returns:

Type Description
DataFrame

pd.DataFrame: The dataframe with timestamps converted to UTC.

Source code in gensor/parse/utils.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def handle_timestamps(df: DataFrame, tz_string: str) -> DataFrame:
    """Converts timestamps in the dataframe to the specified timezone (e.g., 'UTC+1').

    Parameters:
        df (pd.DataFrame): The dataframe with timestamps.
        tz_string (str): A timezone string like 'UTC+1' or 'UTC-5'.

    Returns:
        pd.DataFrame: The dataframe with timestamps converted to UTC.
    """
    timezone = tz.gettz(tz_string)

    df.index = to_datetime(df.index).tz_localize(timezone)
    df.index = df.index.tz_convert("UTC")

    return df

vanessen

Logic parsing CSV files from van Essen Instruments Divers.

parse_vanessen_csv(path, **kwargs)

Parses a van Essen csv file and returns a list of Timeseries objects. At this point it does not matter whether the file is a barometric or piezometric logger file.

The function will use regex patterns to extract the serial number and station from the file. It is important to use the appropriate regex patterns, particularily for the station. If the default patterns are not working (whihc most likely will be the case), the user should provide their own patterns. The patterns can be provided as keyword arguments to the function and it is possible to use OR (|) in the regex pattern.

Warning

A better check for the variable type and units has to be implemented.

Parameters:

Name Type Description Default
path Path

The path to the file.

required

Other Parameters:

Name Type Description
serial_number_pattern str

The regex pattern to extract the serial number from the file.

location_pattern str

The regex pattern to extract the station from the file.

col_names list

The column names for the dataframe.

Returns:

Name Type Description
list list[Timeseries]

A list of Timeseries objects.

Source code in gensor/parse/vanessen.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def parse_vanessen_csv(path: Path, **kwargs: Any) -> list[Timeseries]:
    """Parses a van Essen csv file and returns a list of Timeseries objects. At this point it
    does not matter whether the file is a barometric or piezometric logger file.

    The function will use regex patterns to extract the serial number and station from the file. It is
    important to use the appropriate regex patterns, particularily for the station. If the default patterns
    are not working (whihc most likely will be the case), the user should provide their own patterns. The patterns
    can be provided as keyword arguments to the function and it is possible to use OR (|) in the regex pattern.

    !!! warning

        A better check for the variable type and units has to be implemented.

    Parameters:
        path (Path): The path to the file.

    Other Parameters:
        serial_number_pattern (str): The regex pattern to extract the serial number from the file.
        location_pattern (str): The regex pattern to extract the station from the file.
        col_names (list): The column names for the dataframe.

    Returns:
        list: A list of Timeseries objects.
    """

    patterns = {
        "sensor": kwargs.get("serial_number_pattern", r"[A-Za-z]{1,2}\d{3,4}"),
        "location": kwargs.get(
            "location_pattern", r"[A-Za-z]{2}\d{2}[A-Za-z]{1}|Barodiver"
        ),
        "timezone": kwargs.get("timezone_pattern", r"UTC[+-]?\d+"),
    }

    column_names = kwargs.get("col_names", ["timestamp", "pressure", "temperature"])

    encoding = detect_encoding(path, num_bytes=10_000)

    with path.open(mode="r", encoding=encoding) as f:
        text = f.read()

        fields = get_header_fields(text)

        def pick(pattern: str, raw: str | None, override: str | None) -> str | None:
            """Resolve a metadata value from a labelled header field.

            An explicit ``location=``/``sensor=`` kwarg always wins. Otherwise the
            pattern is matched against *that field's value only* (so e.g. the serial
            ``AZ066`` is pulled out of ``..00-AZ066  219.`` and the location ``PB16A``
            out of ``pb16a_moni_az066``), falling back to the verbatim field value
            when the pattern does not match (e.g. ``FL1`` / ``barodiver`` locations).
            """
            if override:
                return override
            if not raw:
                return None
            match = re.search(pattern, raw)
            return match.group() if match else raw

        # Read the labelled header fields directly — far more reliable than matching a
        # regex against the whole file, which can grab a stray token from the embedded
        # FILENAME path (e.g. a folder name) instead of the real serial.
        location = pick(
            patterns["location"], fields.get("Location"), kwargs.get("location")
        )
        sensor = pick(
            patterns["sensor"], fields.get("Serial number"), kwargs.get("sensor")
        )
        tz_match = re.search(patterns["timezone"], text)
        timezone = tz_match.group() if tz_match else "UTC"

        if location is None or sensor is None:
            logger.info(
                f"Skipping file {path} due to missing metadata "
                "(pass location=/sensor= to override)."
            )
            return []

        data_start = "Date/time"
        data_end = "END OF DATA FILE"

        df = get_data(text, data_start, data_end, column_names)

        df = handle_timestamps(df, timezone)

        ts_list = []

        for col in df.columns:
            if col in VARIABLE_TYPES_AND_UNITS:
                unit = VARIABLE_TYPES_AND_UNITS[col][0]
                ts_list.append(
                    Timeseries(
                        ts=df[col],
                        # Validation will be done in Pydantic
                        variable=col,  # type: ignore[arg-type]
                        location=location,
                        sensor=sensor,
                        # Validation will be done in Pydantic
                        unit=unit,  # type: ignore[arg-type]
                    )
                )
            else:
                message = f"Unsupported variable: {col}. Please provide a valid variable type."
                raise ValueError(message)

    return ts_list

processing

compensation

Compensating the raw data from the absolute pressure transducer to the actual water level using the barometric pressure data.

Because van Essen Instrument divers are non-vented pressure transducers, to obtain the pressure resulting from the water column above the logger (i.e. the water level), the barometric pressure must be subtracted from the raw pressure measurements. In the first step the function aligns the two series to the same time step and then subtracts the barometric pressure from the raw pressure measurements. For short time periods (when for instance a slug test is performed) the barometric pressure can be provided as a single float value.

Subsequently the function filters out all records where the water column is less than or equal to the cutoff value, and - always, regardless of the cutoff - every record with a negative water column. The water column above a submerged sensor is physically non-negative, so the near-zero readings taken while the logger is out of the water (which produce erroneous results and spikes in the plots) and any negative values (out-of-water / noise / barometric-alignment artefacts) are all erroneous. The comparison is signed, not on the absolute value, so large negative spikes are dropped rather than kept. The cutoff defaults to 25 mm (threshold_wc=0.025) and is always applied; lower it to keep shallower columns, or set it to 0 to drop only negatives.

Functions:

water_column: Barometrically compensate raw pressure to the water column above the
    sensor (the first step, without adding the sensor altitude).
compensate: Full compensation of raw sensor pressure to groundwater head, using
    ``water_column`` and then adding the sensor altitude.

Compensator

Bases: BaseModel

Compensate raw sensor pressure measurement with barometric pressure.

Attributes:

Name Type Description
ts Timeseries

Raw sensor timeseries

barometric Timeseries | float

Barometric pressure timeseries or a single float value. If a float value is provided, it is assumed to be in cmH2O.

Source code in gensor/processing/compensation.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class Compensator(pyd.BaseModel):
    """Compensate raw sensor pressure measurement with barometric pressure.

    Attributes:
        ts (Timeseries): Raw sensor timeseries
        barometric (Timeseries | float): Barometric pressure timeseries or a single
            float value. If a float value is provided, it is assumed to be in cmH2O.
    """

    ts: Timeseries
    barometric: Timeseries | float

    @pyd.field_validator("ts", "barometric", mode="before")
    def validate_timeseries_type(cls, v: Timeseries) -> Timeseries:
        if isinstance(v, Timeseries) and v.variable != "pressure":
            raise InvalidMeasurementTypeError()
        return v

    @pyd.field_validator("ts")
    def validate_sensor_information(cls, v: Timeseries) -> Timeseries:
        if v.sensor is not None and not v.sensor_alt:
            raise MissingInputError("sensor_alt")
        return v

    def water_column(
        self,
        alignment_period: Literal["D", "ME", "SME", "MS", "YE", "YS", "h", "min", "s"],
        threshold_wc: float | None,
        fieldwork_dates: list | None,
    ) -> Timeseries | None:
        """Compute the barometrically compensated water column above the sensor.

        Aligns the raw and barometric series to ``alignment_period``, subtracts the
        barometric pressure, converts cmH2O to mH2O, masks fieldwork days, and drops the
        out-of-water records (see ``threshold_wc``). This is the first step of
        :meth:`compensate` and can be used on its own to obtain just the water column
        height (it does not require ``sensor_alt``).

        Parameters:
            alignment_period Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']: The alignment period for the timeseries.
                Default is 'h'. See pandas offset aliases for definitinos.
            threshold_wc (float | None): Lower cutoff (in m) for the water column.
                Records at or below it are dropped, along with all negative water columns
                (which are always dropped as physically impossible). ``None`` is treated
                as ``0`` (drop only negatives).
            fieldwork_dates (Optional[list]): List of dates when fieldwork was done. All
                measurement from a fieldwork day will be set to None.

        Returns:
            Timeseries: A new Timeseries of the water column height in metres (variable
                'water_column', unit 'm'); dropped out-of-water records are kept in
                ``.outliers``. ``None`` if the raw and barometric series are the same.
        """

        resample_params = {"freq": alignment_period, "agg_func": pd.Series.mean}
        resampled_ts = self.ts.resample(**resample_params)

        if isinstance(self.barometric, Timeseries):
            if self.ts == self.barometric:
                print("Skipping compensation: both timeseries are the same.")
                return None
            resampled_baro = self.barometric.resample(**resample_params).ts

        elif isinstance(self.barometric, float):
            resampled_baro = pd.Series(
                [self.barometric] * len(resampled_ts.ts), index=resampled_ts.ts.index
            )

        # dividing by 100 to convert water column from cmH2O to mH2O
        watercolumn_ts = resampled_ts.ts.sub(resampled_baro).divide(100).dropna()

        if not isinstance(watercolumn_ts.index, pd.DatetimeIndex):
            watercolumn_ts.index = pd.to_datetime(watercolumn_ts.index)

        if fieldwork_dates:
            fieldwork_timestamps = pd.to_datetime(fieldwork_dates).tz_localize(
                watercolumn_ts.index.tz
            )

            watercolumn_ts.loc[
                watercolumn_ts.index.normalize().isin(fieldwork_timestamps)
            ] = None

        # The water column above a submerged sensor is physically non-negative, so any
        # negative value (logger out of the water / noise / barometric misalignment) is
        # always discarded - regardless of the cutoff. On top of that, the near-zero
        # out-of-water band at or below ``threshold_wc`` (25 mm by default) is removed.
        # This always runs; pass a smaller ``threshold_wc`` to keep shallower columns, or
        # ``0`` to drop only negatives. NaN values (e.g. fieldwork-masked days) are left
        # in place as gaps. A signed comparison is essential: ``.abs() > threshold`` would
        # wrongly retain large-magnitude negatives.
        cutoff = 0.0 if threshold_wc is None else float(threshold_wc)
        invalid = (watercolumn_ts < 0) | (watercolumn_ts <= cutoff)
        watercolumn_ts_filtered = watercolumn_ts[~invalid]
        dropped_outliers = watercolumn_ts[invalid]

        if len(dropped_outliers):
            print(
                f"{len(dropped_outliers)} records dropped "
                f"(negative or <= {cutoff} m water column / out of water)."
            )

        return resampled_ts.model_copy(
            update={
                "ts": watercolumn_ts_filtered,
                "outliers": dropped_outliers,
                "unit": "m",
                "variable": "water_column",
            },
            deep=True,
        )

    def compensate(
        self,
        alignment_period: Literal["D", "ME", "SME", "MS", "YE", "YS", "h", "min", "s"],
        threshold_wc: float | None,
        fieldwork_dates: list | None,
    ) -> Timeseries | None:
        """Perform full compensation to groundwater head (m asl).

        Computes the water column with :meth:`water_column`, then adds the sensor
        altitude (``sensor_alt``) to express it as head above the reference datum.

        Parameters:
            alignment_period Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']: The alignment period for the timeseries.
                Default is 'h'. See pandas offset aliases for definitinos.
            threshold_wc (float | None): Lower cutoff (in m) for the water column; see
                :meth:`water_column`.
            fieldwork_dates (Optional[list]): List of dates when fieldwork was done. All
                measurement from a fieldwork day will be set to None.

        Returns:
            Timeseries: A new Timeseries instance with the compensated data and updated unit and variable. Optionally removed outliers are included.
        """
        watercolumn = self.water_column(
            alignment_period=alignment_period,
            threshold_wc=threshold_wc,
            fieldwork_dates=fieldwork_dates,
        )
        if watercolumn is None:
            return None

        gwl = watercolumn.ts.add(float(watercolumn.sensor_alt or 0))

        return watercolumn.model_copy(
            update={"ts": gwl, "unit": "m asl", "variable": "head"},
            deep=True,
        )
compensate(alignment_period, threshold_wc, fieldwork_dates)

Perform full compensation to groundwater head (m asl).

Computes the water column with :meth:water_column, then adds the sensor altitude (sensor_alt) to express it as head above the reference datum.

Parameters:

Name Type Description Default
alignment_period Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']

The alignment period for the timeseries. Default is 'h'. See pandas offset aliases for definitinos.

required
threshold_wc float | None

Lower cutoff (in m) for the water column; see :meth:water_column.

required
fieldwork_dates Optional[list]

List of dates when fieldwork was done. All measurement from a fieldwork day will be set to None.

required

Returns:

Name Type Description
Timeseries Timeseries | None

A new Timeseries instance with the compensated data and updated unit and variable. Optionally removed outliers are included.

Source code in gensor/processing/compensation.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def compensate(
    self,
    alignment_period: Literal["D", "ME", "SME", "MS", "YE", "YS", "h", "min", "s"],
    threshold_wc: float | None,
    fieldwork_dates: list | None,
) -> Timeseries | None:
    """Perform full compensation to groundwater head (m asl).

    Computes the water column with :meth:`water_column`, then adds the sensor
    altitude (``sensor_alt``) to express it as head above the reference datum.

    Parameters:
        alignment_period Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']: The alignment period for the timeseries.
            Default is 'h'. See pandas offset aliases for definitinos.
        threshold_wc (float | None): Lower cutoff (in m) for the water column; see
            :meth:`water_column`.
        fieldwork_dates (Optional[list]): List of dates when fieldwork was done. All
            measurement from a fieldwork day will be set to None.

    Returns:
        Timeseries: A new Timeseries instance with the compensated data and updated unit and variable. Optionally removed outliers are included.
    """
    watercolumn = self.water_column(
        alignment_period=alignment_period,
        threshold_wc=threshold_wc,
        fieldwork_dates=fieldwork_dates,
    )
    if watercolumn is None:
        return None

    gwl = watercolumn.ts.add(float(watercolumn.sensor_alt or 0))

    return watercolumn.model_copy(
        update={"ts": gwl, "unit": "m asl", "variable": "head"},
        deep=True,
    )
water_column(alignment_period, threshold_wc, fieldwork_dates)

Compute the barometrically compensated water column above the sensor.

Aligns the raw and barometric series to alignment_period, subtracts the barometric pressure, converts cmH2O to mH2O, masks fieldwork days, and drops the out-of-water records (see threshold_wc). This is the first step of :meth:compensate and can be used on its own to obtain just the water column height (it does not require sensor_alt).

Parameters:

Name Type Description Default
alignment_period Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']

The alignment period for the timeseries. Default is 'h'. See pandas offset aliases for definitinos.

required
threshold_wc float | None

Lower cutoff (in m) for the water column. Records at or below it are dropped, along with all negative water columns (which are always dropped as physically impossible). None is treated as 0 (drop only negatives).

required
fieldwork_dates Optional[list]

List of dates when fieldwork was done. All measurement from a fieldwork day will be set to None.

required

Returns:

Name Type Description
Timeseries Timeseries | None

A new Timeseries of the water column height in metres (variable 'water_column', unit 'm'); dropped out-of-water records are kept in .outliers. None if the raw and barometric series are the same.

Source code in gensor/processing/compensation.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def water_column(
    self,
    alignment_period: Literal["D", "ME", "SME", "MS", "YE", "YS", "h", "min", "s"],
    threshold_wc: float | None,
    fieldwork_dates: list | None,
) -> Timeseries | None:
    """Compute the barometrically compensated water column above the sensor.

    Aligns the raw and barometric series to ``alignment_period``, subtracts the
    barometric pressure, converts cmH2O to mH2O, masks fieldwork days, and drops the
    out-of-water records (see ``threshold_wc``). This is the first step of
    :meth:`compensate` and can be used on its own to obtain just the water column
    height (it does not require ``sensor_alt``).

    Parameters:
        alignment_period Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']: The alignment period for the timeseries.
            Default is 'h'. See pandas offset aliases for definitinos.
        threshold_wc (float | None): Lower cutoff (in m) for the water column.
            Records at or below it are dropped, along with all negative water columns
            (which are always dropped as physically impossible). ``None`` is treated
            as ``0`` (drop only negatives).
        fieldwork_dates (Optional[list]): List of dates when fieldwork was done. All
            measurement from a fieldwork day will be set to None.

    Returns:
        Timeseries: A new Timeseries of the water column height in metres (variable
            'water_column', unit 'm'); dropped out-of-water records are kept in
            ``.outliers``. ``None`` if the raw and barometric series are the same.
    """

    resample_params = {"freq": alignment_period, "agg_func": pd.Series.mean}
    resampled_ts = self.ts.resample(**resample_params)

    if isinstance(self.barometric, Timeseries):
        if self.ts == self.barometric:
            print("Skipping compensation: both timeseries are the same.")
            return None
        resampled_baro = self.barometric.resample(**resample_params).ts

    elif isinstance(self.barometric, float):
        resampled_baro = pd.Series(
            [self.barometric] * len(resampled_ts.ts), index=resampled_ts.ts.index
        )

    # dividing by 100 to convert water column from cmH2O to mH2O
    watercolumn_ts = resampled_ts.ts.sub(resampled_baro).divide(100).dropna()

    if not isinstance(watercolumn_ts.index, pd.DatetimeIndex):
        watercolumn_ts.index = pd.to_datetime(watercolumn_ts.index)

    if fieldwork_dates:
        fieldwork_timestamps = pd.to_datetime(fieldwork_dates).tz_localize(
            watercolumn_ts.index.tz
        )

        watercolumn_ts.loc[
            watercolumn_ts.index.normalize().isin(fieldwork_timestamps)
        ] = None

    # The water column above a submerged sensor is physically non-negative, so any
    # negative value (logger out of the water / noise / barometric misalignment) is
    # always discarded - regardless of the cutoff. On top of that, the near-zero
    # out-of-water band at or below ``threshold_wc`` (25 mm by default) is removed.
    # This always runs; pass a smaller ``threshold_wc`` to keep shallower columns, or
    # ``0`` to drop only negatives. NaN values (e.g. fieldwork-masked days) are left
    # in place as gaps. A signed comparison is essential: ``.abs() > threshold`` would
    # wrongly retain large-magnitude negatives.
    cutoff = 0.0 if threshold_wc is None else float(threshold_wc)
    invalid = (watercolumn_ts < 0) | (watercolumn_ts <= cutoff)
    watercolumn_ts_filtered = watercolumn_ts[~invalid]
    dropped_outliers = watercolumn_ts[invalid]

    if len(dropped_outliers):
        print(
            f"{len(dropped_outliers)} records dropped "
            f"(negative or <= {cutoff} m water column / out of water)."
        )

    return resampled_ts.model_copy(
        update={
            "ts": watercolumn_ts_filtered,
            "outliers": dropped_outliers,
            "unit": "m",
            "variable": "water_column",
        },
        deep=True,
    )

compensate(raw, barometric, alignment_period='h', threshold_wc=0.025, fieldwork_dates=None, interpolate_method=None)

Compensate raw sensor pressure to groundwater head (m asl).

Computes the water column (see :func:water_column) and adds the sensor altitude.

Parameters:

Name Type Description Default
raw Timeseries | Dataset

Raw sensor timeseries

required
barometric Timeseries | float

Barometric pressure timeseries or a single float value. If a float value is provided, it is assumed to be in cmH2O.

required
alignment_period Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']

The alignment period for the timeseries. Default is 'h'. See pandas offset aliases for definitinos.

'h'
threshold_wc float | None

Lower cutoff (in m) for the water column; records at or below it are dropped. Defaults to 0.025 m (25 mm) and is always applied; lower it to keep shallower columns, or set 0 to drop only negatives. Negative water columns are always dropped regardless, being physically impossible.

0.025
fieldwork_dates Dict[str, list]

Dictionary of location name and a list of fieldwork days. All records on the fieldwork day are set to None.

None
interpolate_method str

String representing the interpolate method as in pd.Series.interpolate() method.

None

Returns:

Type Description
Timeseries | Dataset | None

Timeseries | Dataset | None: head (variable 'head', unit 'm asl').

Source code in gensor/processing/compensation.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
def compensate(
    raw: Timeseries | Dataset,
    barometric: Timeseries | float,
    alignment_period: Literal[
        "D", "ME", "SME", "MS", "YE", "YS", "h", "min", "s"
    ] = "h",
    threshold_wc: float | None = 0.025,
    fieldwork_dates: dict | None = None,
    interpolate_method: str | None = None,
) -> Timeseries | Dataset | None:
    """Compensate raw sensor pressure to groundwater head (m asl).

    Computes the water column (see :func:`water_column`) and adds the sensor altitude.

    Parameters:
        raw (Timeseries | Dataset): Raw sensor timeseries
        barometric (Timeseries | float): Barometric pressure timeseries or a single
            float value. If a float value is provided, it is assumed to be in cmH2O.
        alignment_period (Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']): The alignment period for the timeseries.
            Default is 'h'. See pandas offset aliases for definitinos.
        threshold_wc (float | None): Lower cutoff (in m) for the water column; records at
            or below it are dropped. Defaults to 0.025 m (25 mm) and is always applied;
            lower it to keep shallower columns, or set 0 to drop only negatives. Negative
            water columns are always dropped regardless, being physically impossible.
        fieldwork_dates (Dict[str, list]): Dictionary of location name and a list of
            fieldwork days. All records on the fieldwork day are set to None.
        interpolate_method (str): String representing the interpolate method as in
            pd.Series.interpolate() method.

    Returns:
        Timeseries | Dataset | None: head (variable 'head', unit 'm asl').
    """
    return _apply(
        "compensate",
        raw,
        barometric,
        alignment_period,
        threshold_wc,
        fieldwork_dates,
        interpolate_method,
    )

water_column(raw, barometric, alignment_period='h', threshold_wc=0.025, fieldwork_dates=None, interpolate_method=None)

Barometrically compensate raw sensor pressure to the water column above the sensor.

This is the first step of :func:compensate exposed on its own: subtract the barometric pressure, convert to mH2O, mask fieldwork days, and drop out-of-water records (see threshold_wc) - without adding the sensor altitude, so the result is the water column height in metres (variable 'water_column', unit 'm') rather than head.

Parameters:

Name Type Description Default
raw Timeseries | Dataset

Raw sensor timeseries

required
barometric Timeseries | float

Barometric pressure timeseries or a single float value. If a float value is provided, it is assumed to be in cmH2O.

required
alignment_period Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']

The alignment period for the timeseries. Default is 'h'. See pandas offset aliases for definitinos.

'h'
threshold_wc float | None

Lower cutoff (in m) for the water column; records at or below it are dropped. Defaults to 0.025 m (25 mm) and is always applied; lower it to keep shallower columns, or set 0 to drop only negatives. Negative water columns are always dropped regardless, being physically impossible.

0.025
fieldwork_dates Dict[str, list]

Dictionary of location name and a list of fieldwork days. All records on the fieldwork day are set to None.

None
interpolate_method str

String representing the interpolate method as in pd.Series.interpolate() method.

None

Returns:

Type Description
Timeseries | Dataset | None

Timeseries | Dataset | None: the water column height (variable 'water_column', unit 'm').

Source code in gensor/processing/compensation.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def water_column(
    raw: Timeseries | Dataset,
    barometric: Timeseries | float,
    alignment_period: Literal[
        "D", "ME", "SME", "MS", "YE", "YS", "h", "min", "s"
    ] = "h",
    threshold_wc: float | None = 0.025,
    fieldwork_dates: dict | None = None,
    interpolate_method: str | None = None,
) -> Timeseries | Dataset | None:
    """Barometrically compensate raw sensor pressure to the water column above the sensor.

    This is the first step of :func:`compensate` exposed on its own: subtract the
    barometric pressure, convert to mH2O, mask fieldwork days, and drop out-of-water
    records (see ``threshold_wc``) - without adding the sensor altitude, so the result is
    the water column height in metres (variable 'water_column', unit 'm') rather than head.

    Parameters:
        raw (Timeseries | Dataset): Raw sensor timeseries
        barometric (Timeseries | float): Barometric pressure timeseries or a single
            float value. If a float value is provided, it is assumed to be in cmH2O.
        alignment_period (Literal['D', 'ME', 'SME', 'MS', 'YE', 'YS', 'h', 'min', 's']): The alignment period for the timeseries.
            Default is 'h'. See pandas offset aliases for definitinos.
        threshold_wc (float | None): Lower cutoff (in m) for the water column; records at
            or below it are dropped. Defaults to 0.025 m (25 mm) and is always applied;
            lower it to keep shallower columns, or set 0 to drop only negatives. Negative
            water columns are always dropped regardless, being physically impossible.
        fieldwork_dates (Dict[str, list]): Dictionary of location name and a list of
            fieldwork days. All records on the fieldwork day are set to None.
        interpolate_method (str): String representing the interpolate method as in
            pd.Series.interpolate() method.

    Returns:
        Timeseries | Dataset | None: the water column height (variable 'water_column',
            unit 'm').
    """
    return _apply(
        "water_column",
        raw,
        barometric,
        alignment_period,
        threshold_wc,
        fieldwork_dates,
        interpolate_method,
    )

smoothing

Tools for smoothing the data.

smooth_data(data, window=5, method='rolling_mean', print_statistics=False, inplace=False, plot=False)

Smooth a time series using a rolling mean or median.

Parameters:

Name Type Description Default
data Series

The time series data.

required
window int

The size of the window for the rolling mean or median. Defaults to 5.

5
method str

The method to use for smoothing. Either 'rolling_mean' or 'rolling_median'. Defaults to 'rolling_mean'.

'rolling_mean'

Returns:

Type Description
Series | None

pandas.Series: The smoothed time series.

Source code in gensor/processing/smoothing.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def smooth_data(
    data: Timeseries,
    window: int = 5,
    method: str = "rolling_mean",
    print_statistics: bool = False,
    inplace: bool = False,
    plot: bool = False,
) -> Series | None:
    """Smooth a time series using a rolling mean or median.

    Args:
        data (pandas.Series): The time series data.
        window (int): The size of the window for the rolling mean or median. Defaults to 5.
        method (str): The method to use for smoothing. Either 'rolling_mean' or 'rolling_median'. Defaults to 'rolling_mean'.

    Returns:
        pandas.Series: The smoothed time series.
    """
    if method == "rolling_mean":
        smoothed_data = data.ts.rolling(window=window, center=True).mean()
    elif method == "rolling_median":
        smoothed_data = data.ts.rolling(window=window, center=True).median()
    else:
        raise NotImplementedError()

    valid_indices = smoothed_data.notna()
    original_data_aligned = data.ts[valid_indices]
    smoothed_data_aligned = smoothed_data[valid_indices]

    if print_statistics:
        mse = root_mean_squared_error(original_data_aligned, smoothed_data_aligned)
        print(f"Mean Squared Error of {method}: {mse:.2f}")

    if plot:
        plt.figure(figsize=(12, 6))
        plt.plot(
            data.timeseries.index, data.timeseries, label="Original Data", color="black"
        )
        plt.plot(
            smoothed_data.index,
            smoothed_data,
            label=f"Moving Average ({method})",
            color="green",
            linestyle="dotted",
        )

        plt.legend()
        plt.title("Groundwater Level with Moving Average")
        plt.xlabel("Date")
        plt.ylabel("Groundwater Level")
        plt.show()

    if inplace:
        data.ts = smoothed_data
        return None
    else:
        return smoothed_data

transform

Transformation

Source code in gensor/processing/transform.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
class Transformation:
    def __init__(
        self,
        data: Series,
        method: Literal[
            "difference",
            "log",
            "square_root",
            "box_cox",
            "standard_scaler",
            "minmax_scaler",
            "robust_scaler",
            "maxabs_scaler",
        ],
        **kwargs: Any,
    ) -> None:
        self.data = data

        if method == "difference":
            self.transformed_data, self.scaler = self.difference(**kwargs)
        elif method == "log":
            self.transformed_data, self.scaler = self.log()
        elif method == "square_root":
            self.transformed_data, self.scaler = self.square_root()
        elif method == "box_cox":
            self.transformed_data, self.scaler = self.box_cox(**kwargs)
        elif method == "standard_scaler":
            self.transformed_data, self.scaler = self.standard_scaler()
        elif method == "minmax_scaler":
            self.transformed_data, self.scaler = self.minmax_scaler()
        elif method == "robust_scaler":
            self.transformed_data, self.scaler = self.robust_scaler()
        elif method == "maxabs_scaler":
            self.transformed_data, self.scaler = self.maxabs_scaler()
        else:
            raise NotImplementedError()

    def get_transformation(self) -> tuple:
        return self.transformed_data, self.scaler

    def difference(self, **kwargs: int) -> tuple[Series, str]:
        """Difference the time series data.

        Keword Arguments:
            periods (int): The number of periods to shift. Defaults to 1.

        Returns:
            pandas.Series: The differenced time series data.
        """
        periods = kwargs.get("periods", 1)
        transformed = self.data.diff(periods=periods).dropna()

        return (transformed, "difference")

    def log(self) -> tuple[Series, str]:
        """Take the natural logarithm of the time series data.

        Returns:
            pandas.Series: The natural logarithm of the time series data.
        """
        transformed = self.data.apply(lambda x: x if x <= 0 else np.log(x))
        return (transformed, "log")

    def square_root(self) -> tuple[Series, str]:
        """Take the square root of the time series data.

        Returns:
            pandas.Series: The square root of the time series data.
        """
        transformed = self.data.apply(lambda x: x if x <= 0 else np.sqrt(x))
        return (transformed, "square_root")

    def box_cox(self, **kwargs: float) -> tuple[Series, str]:
        """Apply the Box-Cox transformation to the time series data. Only works
            for all positive datasets!

        Keyword Arguments:
            lmbda (float): The transformation parameter. If not provided, it is automatically estimated.

        Returns:
            pandas.Series: The Box-Cox transformed time series data.
        """
        lmbda = kwargs.get("lmbda")

        if (self.data <= 0).any():
            message = (
                "Box-Cox transformation requires all values to be strictly positive."
            )
            raise ValueError(message)

        # Box-Cox always returns a tuple: (transformed_data, lmbda)
        if lmbda is not None:
            transformed_data = stats.boxcox(self.data, lmbda=lmbda)
        else:
            transformed_data, lmbda = stats.boxcox(self.data, lmbda=lmbda)

        # Return the transformed series and mark the method used
        transformed_series = Series(transformed_data, index=self.data.index)
        return transformed_series, f"box-cox (lambda={lmbda})"

    def standard_scaler(self) -> tuple[Series, Any]:
        """Normalize a pandas Series using StandardScaler."""
        scaler = StandardScaler()
        scaled_values = scaler.fit_transform(
            self.data.to_numpy().reshape(-1, 1)
        ).flatten()
        scaled_series = Series(scaled_values, index=self.data.index)
        return scaled_series, scaler

    def minmax_scaler(self) -> tuple[Series, Any]:
        """Normalize a pandas Series using MinMaxScaler."""
        scaler = MinMaxScaler()
        scaled_values = scaler.fit_transform(
            self.data.to_numpy().reshape(-1, 1)
        ).flatten()
        scaled_series = Series(scaled_values, index=self.data.index)
        return scaled_series, scaler

    def robust_scaler(self) -> tuple[Series, Any]:
        """Normalize a pandas Series using RobustScaler."""
        scaler = RobustScaler()
        scaled_values = scaler.fit_transform(
            self.data.to_numpy().reshape(-1, 1)
        ).flatten()
        scaled_series = Series(scaled_values, index=self.data.index)
        return scaled_series, scaler

    def maxabs_scaler(self) -> tuple[Series, Any]:
        """Normalize a pandas Series using MaxAbsScaler."""
        scaler = MaxAbsScaler()
        scaled_values = scaler.fit_transform(
            self.data.to_numpy().reshape(-1, 1)
        ).flatten()
        scaled_series = Series(scaled_values, index=self.data.index)
        return scaled_series, scaler
box_cox(**kwargs)

Apply the Box-Cox transformation to the time series data. Only works for all positive datasets!

Other Parameters:

Name Type Description
lmbda float

The transformation parameter. If not provided, it is automatically estimated.

Returns:

Type Description
tuple[Series, str]

pandas.Series: The Box-Cox transformed time series data.

Source code in gensor/processing/transform.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def box_cox(self, **kwargs: float) -> tuple[Series, str]:
    """Apply the Box-Cox transformation to the time series data. Only works
        for all positive datasets!

    Keyword Arguments:
        lmbda (float): The transformation parameter. If not provided, it is automatically estimated.

    Returns:
        pandas.Series: The Box-Cox transformed time series data.
    """
    lmbda = kwargs.get("lmbda")

    if (self.data <= 0).any():
        message = (
            "Box-Cox transformation requires all values to be strictly positive."
        )
        raise ValueError(message)

    # Box-Cox always returns a tuple: (transformed_data, lmbda)
    if lmbda is not None:
        transformed_data = stats.boxcox(self.data, lmbda=lmbda)
    else:
        transformed_data, lmbda = stats.boxcox(self.data, lmbda=lmbda)

    # Return the transformed series and mark the method used
    transformed_series = Series(transformed_data, index=self.data.index)
    return transformed_series, f"box-cox (lambda={lmbda})"
difference(**kwargs)

Difference the time series data.

Keword Arguments

periods (int): The number of periods to shift. Defaults to 1.

Returns:

Type Description
tuple[Series, str]

pandas.Series: The differenced time series data.

Source code in gensor/processing/transform.py
54
55
56
57
58
59
60
61
62
63
64
65
66
def difference(self, **kwargs: int) -> tuple[Series, str]:
    """Difference the time series data.

    Keword Arguments:
        periods (int): The number of periods to shift. Defaults to 1.

    Returns:
        pandas.Series: The differenced time series data.
    """
    periods = kwargs.get("periods", 1)
    transformed = self.data.diff(periods=periods).dropna()

    return (transformed, "difference")
log()

Take the natural logarithm of the time series data.

Returns:

Type Description
tuple[Series, str]

pandas.Series: The natural logarithm of the time series data.

Source code in gensor/processing/transform.py
68
69
70
71
72
73
74
75
def log(self) -> tuple[Series, str]:
    """Take the natural logarithm of the time series data.

    Returns:
        pandas.Series: The natural logarithm of the time series data.
    """
    transformed = self.data.apply(lambda x: x if x <= 0 else np.log(x))
    return (transformed, "log")
maxabs_scaler()

Normalize a pandas Series using MaxAbsScaler.

Source code in gensor/processing/transform.py
141
142
143
144
145
146
147
148
def maxabs_scaler(self) -> tuple[Series, Any]:
    """Normalize a pandas Series using MaxAbsScaler."""
    scaler = MaxAbsScaler()
    scaled_values = scaler.fit_transform(
        self.data.to_numpy().reshape(-1, 1)
    ).flatten()
    scaled_series = Series(scaled_values, index=self.data.index)
    return scaled_series, scaler
minmax_scaler()

Normalize a pandas Series using MinMaxScaler.

Source code in gensor/processing/transform.py
123
124
125
126
127
128
129
130
def minmax_scaler(self) -> tuple[Series, Any]:
    """Normalize a pandas Series using MinMaxScaler."""
    scaler = MinMaxScaler()
    scaled_values = scaler.fit_transform(
        self.data.to_numpy().reshape(-1, 1)
    ).flatten()
    scaled_series = Series(scaled_values, index=self.data.index)
    return scaled_series, scaler
robust_scaler()

Normalize a pandas Series using RobustScaler.

Source code in gensor/processing/transform.py
132
133
134
135
136
137
138
139
def robust_scaler(self) -> tuple[Series, Any]:
    """Normalize a pandas Series using RobustScaler."""
    scaler = RobustScaler()
    scaled_values = scaler.fit_transform(
        self.data.to_numpy().reshape(-1, 1)
    ).flatten()
    scaled_series = Series(scaled_values, index=self.data.index)
    return scaled_series, scaler
square_root()

Take the square root of the time series data.

Returns:

Type Description
tuple[Series, str]

pandas.Series: The square root of the time series data.

Source code in gensor/processing/transform.py
77
78
79
80
81
82
83
84
def square_root(self) -> tuple[Series, str]:
    """Take the square root of the time series data.

    Returns:
        pandas.Series: The square root of the time series data.
    """
    transformed = self.data.apply(lambda x: x if x <= 0 else np.sqrt(x))
    return (transformed, "square_root")
standard_scaler()

Normalize a pandas Series using StandardScaler.

Source code in gensor/processing/transform.py
114
115
116
117
118
119
120
121
def standard_scaler(self) -> tuple[Series, Any]:
    """Normalize a pandas Series using StandardScaler."""
    scaler = StandardScaler()
    scaled_values = scaler.fit_transform(
        self.data.to_numpy().reshape(-1, 1)
    ).flatten()
    scaled_series = Series(scaled_values, index=self.data.index)
    return scaled_series, scaler

testdata

Test data for Gensor package:

Attributes:

all (Path): The whole directory of test groundwater sensor data.
baro (Path): Timeseries of barometric pressure measurements.
pb01a (Path): Timeseries of a submerged logger.
pb02a_plain (Path): Timeseries from PB02A with the metadata removed.

all_paths: Traversable = resources.files(__name__) module-attribute

The whole directory of test groundwater sensor data.

baro: Traversable = all_paths / 'Barodiver_220427183008_BY222.csv' module-attribute

Timeseries of barometric pressure measurements.

pb01a: Traversable = all_paths / 'PB01A_moni_AV319_220427183019_AV319.csv' module-attribute

Timeseries of a submerged logger.

pb02a_plain: Traversable = all_paths / 'PB02A_plain.csv' module-attribute

Timeseries from PB02A with the metadata removed.