Data source customization

To implement a custom data source, extend gordo_core.data_providers.base.GordoBaseDataProvider and override its gordo_core.data_providers.base.GordoBaseDataProvider.load_series() method, which should return the data from the data source in the correct format.

As a reference, we can use the CSV reader from the gordo_core.data_providers.contrib module:

import pandas as pd

from pathlib import Path
from datetime import datetime

from gordo_core.data_providers.base import GordoBaseDataProvider
from gordo_core.utils import capture_args
from gordo_core.sensor_tag import unique_tag_names, Tag

from typing import Optional, Union, Iterable, Tuple


class CSVDataProvider(GordoBaseDataProvider):
    """Data provider that serves sensor series read from a single CSV file."""

    @capture_args  # required for proper data provider JSON serialization
    def __init__(
        self, file_path: Union[str, Path], timestamp_column: str, sep: str = ","
    ):
        """
        Store the CSV location and the parsing options.

        Parameters
        ----------
        file_path
            Location of the CSV file the data is read from.
        timestamp_column
            Name of the CSV column holding the timestamp of each row.
        sep
            Field delimiter passed on to ``pandas.read_csv``.
        """
        self.file_path = file_path
        self.timestamp_column = timestamp_column
        self.sep = sep

    def load_series(
        self,
        train_start_date: datetime,
        train_end_date: datetime,
        tag_list: list[Tag],
        dry_run: Optional[bool] = False,
        **kwargs,
    ) -> Iterable[Tuple[pd.Series, Tag]]:
        """
        Yield one ``(series, tag)`` pair per loaded CSV column.

        Only the timestamp column and the columns named after the requested
        tags are read. Timestamps are parsed as UTC, and rows are kept when
        ``train_start_date <= timestamp < train_end_date``. Each yielded
        series is indexed by the timestamp column.

        Parameters
        ----------
        train_start_date
            Inclusive lower bound of the time range.
        train_end_date
            Exclusive upper bound of the time range.
        tag_list
            Tags to load; their names select the CSV columns.
        dry_run
            Accepted as part of the signature; not used by this provider.
        **kwargs
            Accepted as part of the signature; not used by this provider.
        """
        ts_column = self.timestamp_column
        # mapping of sensor tag name -> Tag
        tags = unique_tag_names(tag_list)

        # The timestamp column is always read; list it only once if a tag
        # happens to share its name.
        wanted_columns = [] if ts_column in tags else [ts_column]
        wanted_columns.extend(tags.keys())

        frame = pd.read_csv(self.file_path, sep=self.sep, usecols=wanted_columns)
        frame[ts_column] = pd.to_datetime(frame[ts_column], utc=True)

        timestamps = frame[ts_column]
        in_range = (timestamps >= train_start_date) & (timestamps < train_end_date)
        indexed = frame[in_range].set_index(ts_column)

        for name in indexed:
            yield indexed[name], tags[name]

Then use this data provider with gordo_core.time_series.TimeSeriesDataset to load a CSV file:

In [1]: from gordo_core.time_series import TimeSeriesDataset

In [2]: from gordo_core.sensor_tag import SensorTag

In [3]: from gordo_core.data_providers.contrib.csv_provider import CSVDataProvider

In [4]: data_provider=CSVDataProvider("../examples/turbine_sensors.csv", "index")

In [5]: dataset = TimeSeriesDataset(
   ...:     train_start_date='2023-01-29 00:00:00+00:00',
   ...:     train_end_date='2023-01-31 00:00:00+00:00',
   ...:     tag_list=[SensorTag('Pressure'), SensorTag('RPM'), 'Temperature'],
   ...:     data_provider=data_provider,
   ...:     row_filter="`RPM` > 0",
   ...: )
   ...: 

In [6]: X, y = dataset.get_data()

In [7]: X
Out[7]: 
                           Pressure     RPM  Temperature
2023-01-29 00:00:00+00:00    4969.0  9609.0        550.0
2023-01-29 00:10:00+00:00    4850.0  9850.0        551.0
2023-01-29 00:20:00+00:00    4521.0  9814.0        554.0
2023-01-29 00:30:00+00:00    4458.0  9806.0        555.0
2023-01-29 00:40:00+00:00    4197.0  9785.0        556.0
...                             ...     ...          ...
2023-01-30 23:20:00+00:00    4567.0  9853.0        552.0
2023-01-30 23:30:00+00:00    4283.0  9829.0        557.0
2023-01-30 23:40:00+00:00    4910.0  9547.0        558.0
2023-01-30 23:50:00+00:00    4437.0  9776.0        566.0
2023-01-31 00:00:00+00:00    4437.0  9776.0        566.0

[289 rows x 3 columns]

Each item of tag_list can be specified either as a gordo_core.sensor_tag.SensorTag object with additional metadata or as a string. The str to gordo_core.sensor_tag.SensorTag conversion can be customized by overriding the gordo_core.data_providers.base.GordoBaseDataProvider.tag_normalizer() method.