Data source customization
To implement a custom data source, extend gordo_core.data_providers.base.GordoBaseDataProvider.
Override the gordo_core.data_providers.base.GordoBaseDataProvider.load_series() method so that it returns data from
the data source in the correct format.
As a reference, here is the CSV reader from the gordo_core.data_providers.contrib module:
import pandas as pd
from pathlib import Path
from datetime import datetime
from gordo_core.data_providers.base import GordoBaseDataProvider
from gordo_core.utils import capture_args
from gordo_core.sensor_tag import unique_tag_names, Tag
from typing import Optional, Union, Iterable, Tuple
class CSVDataProvider(GordoBaseDataProvider):
    """Data provider that serves sensor series out of a single CSV file."""

    @capture_args  # required for proper data provider JSON serialization
    def __init__(
        self, file_path: Union[str, Path], timestamp_column: str, sep: str = ","
    ):
        """
        Parameters
        ----------
        file_path
            Location of the CSV file the data is read from.
        timestamp_column
            Name of the CSV column holding the timestamp of each row.
        sep
            Field delimiter used in the file.
        """
        self.file_path = file_path
        self.timestamp_column = timestamp_column
        self.sep = sep

    def load_series(
        self,
        train_start_date: datetime,
        train_end_date: datetime,
        tag_list: list[Tag],
        dry_run: Optional[bool] = False,
        **kwargs,
    ) -> Iterable[Tuple[pd.Series, Tag]]:
        """
        Read the CSV file and yield one series per requested tag.

        Only rows whose timestamp falls in the half-open interval
        ``[train_start_date, train_end_date)`` are kept. Timestamps are
        parsed as UTC, and each yielded series is indexed by them.
        ``dry_run`` and ``**kwargs`` are accepted but not used here.

        Yields
        ------
        Tuple[pd.Series, Tag]
            The column's values paired with the tag it was requested under.
        """
        # mapping of sensor tag name -> Tag
        tags_by_name = unique_tag_names(tag_list)

        # Always read the timestamp column, but never list it twice.
        columns = (
            [] if self.timestamp_column in tags_by_name else [self.timestamp_column]
        )
        columns.extend(tags_by_name.keys())

        frame = pd.read_csv(self.file_path, sep=self.sep, usecols=columns)
        frame[self.timestamp_column] = pd.to_datetime(
            frame[self.timestamp_column], utc=True
        )

        timestamps = frame[self.timestamp_column]
        not_before_start = timestamps >= train_start_date
        before_end = timestamps < train_end_date
        indexed = frame[not_before_start & before_end].set_index(
            self.timestamp_column
        )

        for name in indexed.columns:
            yield indexed[name], tags_by_name[name]
Then use this data provider with gordo_core.time_series.TimeSeriesDataset to load a CSV file:
In [1]: from gordo_core.time_series import TimeSeriesDataset
In [2]: from gordo_core.sensor_tag import SensorTag
In [3]: from gordo_core.data_providers.contrib.csv_provider import CSVDataProvider
In [4]: data_provider=CSVDataProvider("../examples/turbine_sensors.csv", "index")
In [5]: dataset = TimeSeriesDataset(
...: train_start_date='2023-01-29 00:00:00+00:00',
...: train_end_date='2023-01-31 00:00:00+00:00',
...: tag_list=[SensorTag('Pressure'), SensorTag('RPM'), 'Temperature'],
...: data_provider=data_provider,
...: row_filter="`RPM` > 0",
...: )
...:
In [6]: X, y = dataset.get_data()
In [7]: X
Out[7]:
Pressure RPM Temperature
2023-01-29 00:00:00+00:00 4969.0 9609.0 550.0
2023-01-29 00:10:00+00:00 4850.0 9850.0 551.0
2023-01-29 00:20:00+00:00 4521.0 9814.0 554.0
2023-01-29 00:30:00+00:00 4458.0 9806.0 555.0
2023-01-29 00:40:00+00:00 4197.0 9785.0 556.0
... ... ... ...
2023-01-30 23:20:00+00:00 4567.0 9853.0 552.0
2023-01-30 23:30:00+00:00 4283.0 9829.0 557.0
2023-01-30 23:40:00+00:00 4910.0 9547.0 558.0
2023-01-30 23:50:00+00:00 4437.0 9776.0 566.0
2023-01-31 00:00:00+00:00 4437.0 9776.0 566.0
[289 rows x 3 columns]
Each entry of tag_list can be specified either as a gordo_core.sensor_tag.SensorTag object with additional metadata or as a string.
The str to gordo_core.sensor_tag.SensorTag conversion can be customized by overriding the
gordo_core.data_providers.base.GordoBaseDataProvider.tag_normalizer() method.