Source code for interpreTS.core.feature_extractor
import pandas as pd
import numpy as np
from pandas.tseries.frequencies import to_offset
from ..utils.feature_loader import Features
from ..utils.data_manager import load_metadata, load_feature_functions, load_validation_requirements
from ..utils.task_manager import TaskManager
[docs]
class FeatureExtractor:
DEFAULT_FEATURES_SMALL = [
Features.LENGTH, Features.MEAN, Features.VARIANCE, Features.STABILITY,
Features.ENTROPY, Features.SPIKENESS, Features.SEASONALITY_STRENGTH
]
DEFAULT_FEATURES_BIG = [
Features.ABSOLUTE_ENERGY, Features.BINARIZE_MEAN,
Features.CHANGE_IN_VARIANCE, Features.CROSSING_POINTS, Features.DISTANCE_TO_LAST_TREND_CHANGE, Features.DOMINANT,
Features.ENTROPY, Features.FLAT_SPOTS, Features.HETEROGENEITY,
Features.LINEARITY, Features.LENGTH, Features.MEAN, Features.MISSING_POINTS,
Features.PEAK, Features.SIGNIFICANT_CHANGES, Features.SPIKENESS, Features.STABILITY,
Features.STD_1ST_DER, Features.TROUGH, Features.VARIANCE, Features.MEAN_CHANGE,
Features.SEASONALITY_STRENGTH, Features.TREND_STRENGTH, Features.CHANGE_IN_VARIANCE
]
FEATURES_ALL = [
Features.ABOVE_9TH_DECILE, Features.BELOW_1ST_DECILE, Features.ABSOLUTE_ENERGY, Features.BINARIZE_MEAN,
Features.CHANGE_IN_VARIANCE, Features.CROSSING_POINTS, Features.DISTANCE_TO_LAST_TREND_CHANGE, Features.DOMINANT,
Features.ENTROPY, Features.FLAT_SPOTS, Features.HETEROGENEITY,
Features.LINEARITY, Features.LENGTH, Features.MEAN, Features.MISSING_POINTS, Features.OUTLIERS_IQR, Features.OUTLIERS_STD,
Features.PEAK, Features.SIGNIFICANT_CHANGES, Features.SPIKENESS, Features.STABILITY,
Features.STD_1ST_DER, Features.TROUGH, Features.VARIANCE, Features.MEAN_CHANGE,
Features.SEASONALITY_STRENGTH, Features.TREND_STRENGTH, Features.VARIABILITY_IN_SUB_PERIODS, Features.CHANGE_IN_VARIANCE
]
FOR_ML = [
Features.ABSOLUTE_ENERGY,Features.BINARIZE_MEAN,Features.DOMINANT,Features.ENTROPY,Features.FLAT_SPOTS,
Features.HETEROGENEITY,Features.LINEARITY,Features.LENGTH,Features.MEAN,Features.MISSING_POINTS,Features.PEAK,
Features.SIGNIFICANT_CHANGES,Features.SPIKENESS,Features.STABILITY,Features.STD_1ST_DER,Features.TROUGH,Features.VARIANCE,
Features.SEASONALITY_STRENGTH, Features.TREND_STRENGTH
]
CAN_USE_NAN = [
Features.MISSING_POINTS, Features.PEAK, Features.SPIKENESS, Features.TROUGH, Features.SEASONALITY_STRENGTH
]
def __init__(self, features=None, feature_params=None, window_size=np.nan, stride=1, id_column=None, sort_column=None, feature_column=None, group_by=None):
"""
Initialize the FeatureExtractor with a list of features to calculate and optional parameters for each feature.
Parameters
----------
features : list of Features constants or str, optional
A list of features to calculate, or a keyword ('small', 'big', 'all', 'for-ML', 'can-use-nan', 'empty').
Default is None, which calculates the small default feature set.
feature_params : dict, optional
Parameters for specific features, where keys are feature names and values are dicts of parameters.
window_size : int or str, optional
The size of the window for feature extraction.
- np.nan (entire series is used as a single window),
- int (number of samples in the window),
- str (time-based format).
Default is np.nan.
If a string is provided, it must follow a fixed frequency format.
Supported formats include:
- '1s' for 1 second
- '5min' for 5 minutes
- '0.5h' for 30 minutes
- '1h' for 1 hour
- '1d' for 1 day
- A combination of time units, e.g., '1h15min' for 1 hour and 15 minutes.
Only fixed frequencies are allowed. The window size cannot be a non-fixed frequency, such as business days ('1B') or other irregular intervals.
stride : int or str, optional
The step size for moving the window. Can be:
- int (number of samples to shift),
- str (time-based format).
Default is 1.
If a string is provided, it must follow the same fixed frequency format as the `window_size`.
id_column : str, optional
The name of the column used to identify different time series (optional).
sort_column : str, optional
The column to sort by before feature extraction (optional).
feature_column : str or None, optional
The column containing feature data. If None, features are calculated for all columns except ID and sort columns.
group_by : str or None, optional
Column name to group by. If None, no grouping is performed.
Raises
-------
ValueError
If any parameter is invalid.
"""
self.group_by = group_by
if isinstance(features, str):
if features.lower() == 'default-small':
self.features = self.DEFAULT_FEATURES_SMALL
elif features.lower() == 'default-big':
self.features = self.DEFAULT_FEATURES_BIG
elif features.lower() == 'all':
self.features = self.FEATURES_ALL
elif features.lower() == 'for-ml':
self.features = self.FOR_ML
elif features.lower() == 'can-use-nan':
self.features = self.FOR_ML
elif features.lower() == 'empty':
self.features = []
else:
raise ValueError(f"Invalid feature keyword '{features}'. Accepted values are: 'default-small', 'default-big', 'all', 'for-ML', 'can-use-nan', 'empty'.")
else:
self.features = features if features is not None else self.DEFAULT_FEATURES_SMALL
self.feature_params = feature_params if feature_params is not None else {}
for feature_name, params in self.feature_params.items():
if "window_size" in params:
print(f"Warning: 'window_size' parameter in feature_params for feature '{feature_name}' will be ignored.")
del params["window_size"]
self.window_size = window_size
self.stride = stride
self.id_column = id_column
self.sort_column = sort_column
self.feature_column = feature_column
self.feature_functions = load_feature_functions()
self.validation_requirements = load_validation_requirements()
self.task_manager = TaskManager(
self.feature_functions, self.window_size, self.features, self.stride,
self.feature_params, self.validation_requirements
)
self.task_manager._validate_parameters(self.features, self.feature_params, self.window_size, self.stride, self.id_column, self.sort_column)
self.feature_metadata = load_metadata()
[docs]
def validate_data_frequency(self, grouped_data):
"""
Validate that data has a consistent and defined frequency if window_size or stride are time-based.
Parameters
----------
data : pd.DataFrame
The time series data to validate.
Raises
------
ValueError
If data frequency is not defined or inconsistent.
"""
if isinstance(self.window_size, str) or isinstance(self.stride, str):
for _, group in grouped_data:
if not isinstance(group.index, pd.DatetimeIndex):
raise ValueError(
"Time-based window_size and stride require a time-indexed DataFrame with regular frequency."
)
if group.index.freq is None:
if len(group) < 3:
inferred_freq = None
else:
inferred_freq = pd.infer_freq(group.index)
if inferred_freq is None:
raise ValueError(
"Data index does not have a defined frequency. Use `.resample()` to align your data."
)
group.index.freq = inferred_freq
[docs]
def head(self, features_df, n=5):
"""
Returns the first n rows of the resulting DataFrame from the extract_features function.
Parameters
----------
features_df : pd.DataFrame
The resulting DataFrame from the extract_features function.
n : int, optional (default 5)
The number of rows to return. If n is negative, returns all rows except the last |n| rows.
Returns
-------
pd.DataFrame
The first n rows of the DataFrame.
"""
if not isinstance(features_df, pd.DataFrame):
raise ValueError("Input must be a DataFrame.")
if len(features_df) < n:
print(f"Warning: Only {len(features_df)} rows available in DataFrame.")
return features_df.head(n)
[docs]
def extract_features(self, data, progress_callback=None, mode='sequential', n_jobs=-1):
"""
Extract features from a time series dataset.
Parameters
----------
data : pd.DataFrame or pd.Series
The time series data for which features are to be extracted.
progress_callback : function, optional
A function to report progress, which takes a single argument: progress percentage (0-100).
mode : str, optional
The mode of processing. Can be 'parallel' for multi-threaded processing
or 'sequential' for single-threaded processing with real-time progress reporting.
n_jobs : int, optional
The number of jobs (processes) to run in parallel. Default is -1 (use all available CPUs).
Returns
-------
pd.DataFrame
A DataFrame containing calculated features for each window.
"""
if mode not in ['parallel', 'sequential', 'dask']:
raise ValueError(f"Invalid mode '{mode}'. Accepted values are: ['parallel', 'sequential']")
if isinstance(data, pd.Series):
data = data.to_frame(name='value')
if self.feature_column is None:
self.feature_column = 'value'
if isinstance(data.index, pd.MultiIndex):
data = data.reset_index()
if data.empty:
print("Warning: Input data is empty. Returning an empty DataFrame.")
return pd.DataFrame()
if self.sort_column:
data = data.sort_values(by=self.sort_column)
feature_columns = [self.feature_column] if self.feature_column else [col for col in data.columns if col not in {self.id_column, self.sort_column}]
grouped_data = data.groupby(self.id_column) if self.id_column else [(None, data)]
grouped_data = data.groupby(self.id_column) if self.id_column else [(None, data)]
# TODO
# grouped_data = self.group_data(data)
self.validate_data_frequency(grouped_data)
if mode == 'dask':
return self.task_manager._execute_dask(grouped_data, feature_columns)
tasks = self.task_manager._generate_tasks(grouped_data, feature_columns)
total_steps = len(tasks)
if mode == 'parallel':
results = self.task_manager._execute_parallel(tasks, n_jobs, progress_callback, total_steps)
else:
results = self.task_manager._execute_sequential(tasks, progress_callback, total_steps)
return pd.DataFrame(results)
[docs]
def group_data(self, data):
"""
Group data based on the group_by column.
Parameters
----------
data : pd.DataFrame
Input data.
Returns
-------
iterable
Grouped data.
"""
if self.group_by:
return data.groupby(self.group_by)
return [(None, data)]
[docs]
def extract_features_stream(self, data_stream, progress_callback=None):
"""
Extract features from a stream of time series data.
Parameters
----------
data_stream : iterable
An iterable that yields incoming data points as dictionaries with keys corresponding to column names.
progress_callback : function, optional
A function to report progress, which takes a single argument: the total number of processed points.
Yields
------
dict
A dictionary containing the calculated features for the current window.
"""
if not self.feature_column or not self.id_column:
raise ValueError("Feature column and ID column must be specified for streaming mode.")
buffers = {}
total_points = 0
time_based_window = isinstance(self.window_size, str)
if time_based_window:
if self.sort_column is None:
raise ValueError("A 'sort_column' must be specified when using a time-based window.")
data_stream = iter(data_stream)
try:
# Check if sort_column is datetime-based
sample_point = next(data_stream) # Get the first data point to check format
if not pd.to_datetime(sample_point[self.sort_column], errors='coerce'):
raise ValueError(f"Column '{self.sort_column}' does not contain valid datetime values.")
# Put the sample back into the generator stream
data_stream = iter([sample_point] + list(data_stream))
except Exception as e:
raise ValueError(f"Error in validating time-based column: {e}")
try:
window_offset = to_offset(self.window_size)
except ValueError:
raise ValueError(f"Invalid time-based window_size format: {self.window_size}. Supported formats are for example: '1s', '5min', '1h'.")
for new_point in data_stream:
total_points += 1
series_id = new_point[self.id_column]
if series_id not in buffers:
buffers[series_id] = []
buffers[series_id].append(new_point)
# Handle time-based windows
if time_based_window:
# Convert buffer to a DataFrame and check time range
buffer_df = pd.DataFrame(buffers[series_id])
if len(buffer_df) > 1: # Ensure at least two points to calculate a range
start_time = pd.to_datetime(buffer_df[self.sort_column].iloc[0])
end_time = pd.to_datetime(buffer_df[self.sort_column].iloc[-1])
if (end_time - start_time) >= window_offset:
# Extract features for the current buffer
feature_columns = [self.feature_column]
features = self.task_manager._process_window(buffer_df, feature_columns)
features[self.id_column] = series_id
yield features
buffers[series_id] = buffers[series_id][1:] # Remove oldest point
# Handle numeric windows
else:
if len(buffers[series_id]) > self.window_size:
buffers[series_id].pop(0)
if len(buffers[series_id]) == self.window_size:
buffer_df = pd.DataFrame(buffers[series_id])
feature_columns = [self.feature_column]
features = self.task_manager._process_window(buffer_df, feature_columns)
features[self.id_column] = series_id
yield features
if progress_callback:
progress_callback(total_points)
[docs]
def group_features_by_interpretability(self):
"""
Group features by their interpretability levels.
Returns
-------
dict
A dictionary where keys are interpretability levels ('easy', 'moderate', 'advanced'),
and values are lists of feature names.
"""
groups = {'easy': [], 'moderate': [], 'advanced': []}
for feature_name, metadata in self.feature_metadata.items():
level = metadata['level']
groups[level].append(feature_name)
return groups
[docs]
def add_custom_feature(self, name, function, metadata=None, params=None):
"""
Add a custom feature to the FeatureExtractor with optional parameters.
Parameters
----------
name : str
The name of the custom feature.
function : callable
A function that computes the feature. It should accept a Pandas Series and optional parameters as input.
metadata : dict, optional
A dictionary containing metadata about the feature, such as its interpretability level and description.
- level (str): Interpretability level ('easy', 'moderate', 'advanced').
- description (str): Description of the feature.
params : dict, optional
A dictionary of parameters to be passed to the feature function when it is executed.
Example:
{
'level': 'easy' | 'moderate' | 'advanced',
'description': 'Description of the feature.'
}
Raises
------
ValueError
If the feature name already exists or the function is not callable.
"""
if not hasattr(self, '_local_feature_functions'):
self.feature_functions = self.feature_functions.copy()
self.features = list(self.features)
self.feature_metadata = self.feature_metadata.copy()
self.feature_params = self.feature_params.copy()
if name in self.feature_functions:
raise ValueError(f"Feature '{name}' already exists.")
if not callable(function):
raise ValueError("The provided function is not callable.")
self.feature_functions[name] = function
self.features.append(name)
self.feature_params[name] = params or {}
if metadata:
if 'level' not in metadata or 'description' not in metadata:
raise ValueError("Metadata must include 'level' and 'description'.")
self.feature_metadata[name] = metadata
print(f"Custom feature '{name}' added successfully.")