# Source code for wavy.models

from __future__ import annotations

import warnings

import numpy as np
import pandas as pd
import tensorflow as tf
from keras import Sequential
from keras.layers import Conv1D, Dense, Flatten, Reshape
from sklearn.base import is_classifier
from sklearn.metrics import auc, roc_curve

from .panel import Panel, set_training_split


class _ConstantKerasModel(tf.keras.Model):
    """Identity Keras model: forwards its inputs unchanged as outputs."""

    def call(self, inputs):
        # No trainable transformation — the prediction is the input itself.
        return inputs

class BaseModel:
    """Base class for panel models."""

    # Default compile settings per model type, applied when the caller does
    # not pass loss / optimizer / metrics / last_activation explicitly.
    _PARAMS = {
        "regression": {
            "loss": "MSE",
            "optimizer": "adam",
            "metrics": ["mae"],
            "last_activation": "linear",
        },
        "classification": {
            "loss": "binary_crossentropy",
            "optimizer": "adam",
            "metrics": ["AUC", "accuracy"],
            "last_activation": "sigmoid",
        },
        "multi_classification": {
            "loss": "categorical_crossentropy",
            "optimizer": "adam",
            "metrics": ["AUC", "accuracy"],
            "last_activation": "softmax",
        },
    }

    def __init__(
        self,
        x: Panel,
        y: Panel,
        model_type: str | None = None,
        loss: str | None = None,
        optimizer: str | None = None,
        metrics: list[str] | None = None,
        last_activation: str | None = None,
    ):
        """
        Base model class.

        Args:
            x (``Panel``): Panel of input data.
            y (``Panel``): Panel of output data.
            model_type (``str``): Type of model
                (regression, classification, multi_classification).
                Inferred from ``y`` when omitted.
            loss (``str``): Loss function.
            optimizer (``str``): Optimizer.
            metrics (``list[str]``): Metrics.
            last_activation (``str``): Last activation.

        Raises:
            ValueError: If either panel contains NaNs or a non-numeric column.
        """
        # Refuse panels with NaN values up front.
        if x.findna_frames().any():
            raise ValueError("Panel x has NaN values.")
        if y.findna_frames().any():
            raise ValueError("Panel y has NaN values.")

        # Convert boolean columns in x and y to int so they count as numeric.
        for col in x.columns:
            if x[col].dtype == bool:
                x[col] = x[col].astype(int)
        for col in y.columns:
            if y[col].dtype == bool:
                y[col] = y[col].astype(int)

        # Infer the model type from the number of distinct target values.
        # NOTE(review): the `< 20` multi-class cutoff is a heuristic.
        if not model_type:
            n_unique = y.unique().shape[0]
            if n_unique == 2:
                model_type = "classification"
            elif n_unique < 20:
                model_type = "multi_classification"
            else:
                model_type = "regression"

        # Ensure both panels carry a train/val/test split.
        if not getattr(x, "train_size", None) or not getattr(y, "train_size", None):
            warnings.warn("Running set_training_split with default parameters")
            set_training_split(x, y)

        # Every remaining column must be numeric.
        for sample in [x, y]:
            for col in sample.columns:
                if sample[col].dtype not in [np.float64, np.int64]:
                    raise ValueError(f"Column {col} is not numeric.")

        self.x = x
        self.y = y
        self.model_type = model_type
        defaults = self._PARAMS[model_type]
        self.loss = loss or defaults["loss"]
        self.optimizer = optimizer or defaults["optimizer"]
        self.metrics = metrics or defaults["metrics"]
        self.last_activation = last_activation or defaults["last_activation"]

        self.set_arrays()
        self.build()
        self.compile()
        # Name the Keras model after the concrete subclass.
        self.model._name = self.__class__.__name__
[docs] def set_arrays(self) -> None: """Set the arrays.""" self.x_train = self.x.train.values_panel self.x_val = self.x.val.values_panel self.x_test = self.x.test.values_panel self.y_train = self.y.train.values_panel.squeeze(axis=2) self.y_val = self.y.val.values_panel.squeeze(axis=2) self.y_test = self.y.test.values_panel.squeeze(axis=2)
[docs] def get_auc(self) -> float: """Get the AUC score.""" y = self.y_test.squeeze() prediction = self.model.predict(self.x_test).squeeze() fpr, tpr, _ = roc_curve(y, prediction) fpr, tpr, _ = roc_curve(y, prediction) return auc(fpr, tpr)
[docs] def fit(self, **kwargs) -> None: """ Fit the model. Args: **kwargs: Additional arguments to pass to the fit method. """ self.model.fit( self.x_train, self.y_train, validation_data=(self.x_val, self.y_val), **kwargs, )
[docs] def compile(self, **kwargs) -> None: """Compile the model. Args: **kwargs: Additional arguments to pass to the compile method. """ self.model.compile( loss=self.loss, optimizer=self.optimizer, metrics=self.metrics, **kwargs )
[docs] def build(self) -> None: """Build the model.""" pass
    def predict_proba(self, data: Panel | None = None, **kwargs) -> Panel:
        """Predict probabilities.

        Args:
            data (``Panel``): Panel of data to predict.
            **kwargs: Additional arguments to pass to the predict method.

        Returns:
            Panel of predicted probabilities.
        """
        if data is not None:
            x = data.values_panel
            # Single-level index: timestamps only; otherwise (id, timestamp).
            if data.index.nlevels == 1:
                index = pd.Index(data.get_timesteps(0))
            else:
                index = pd.MultiIndex.from_arrays([data.ids, data.get_timesteps(0)])
        else:
            # No data given: predict over train + val + test, in that order,
            # rebuilding the matching index from the y panel's splits.
            x = np.concatenate([self.x_train, self.x_val, self.x_test], axis=0)
            if self.y.index.nlevels == 1:
                index = pd.Index(
                    np.concatenate(
                        [self.y.train.index, self.y.val.index, self.y.test.index]
                    ),
                    name=self.y.index.name,
                )
            else:
                index = pd.MultiIndex.from_tuples(
                    np.concatenate(
                        [self.y.train.index, self.y.val.index, self.y.test.index]
                    ),
                    names=self.y.index.names,
                )
        return Panel(
            self.model.predict(x),
            columns=self.y.columns,
            index=index,
        )
    def predict(self, data: Panel | None = None, **kwargs) -> Panel:
        """Predict.

        Args:
            data (``Panel``): Panel of data to predict.
            **kwargs: Additional arguments to pass to the predict method.

        Returns:
            ``Panel`` of predicted values.
        """
        # NOTE(review): for classification the test-set AUC is reused as the
        # probability cut-off. AUC is a ranking score, not a threshold —
        # confirm this is intentional (ShallowModel.predict does the same).
        threshold = self.get_auc() if self.model_type == "classification" else None
        panel = self.predict_proba(data=data, **kwargs)
        # `(x > threshold) + 0` casts the boolean mask to int labels.
        return (
            panel if threshold is None else panel.apply(lambda x: (x > threshold) + 0)
        )
[docs] def score(self, on: list[str] | str = None, **kwargs) -> pd.DataFrame: """Score the model. Args: on (``list[str]`` or ``str``): Columns to score on. **kwargs: Additional arguments to pass to the score method. Returns: Panel of scores. """ on = [on] if on else ["train", "val", "test"] dic = {} if "train" in on: dic["train"] = self.model.evaluate( self.x_train, self.y_train, verbose=0, **kwargs ) if "test" in on: dic["test"] = self.model.evaluate( self.x_test, self.y_test, verbose=0, **kwargs ) if "val" in on: dic["val"] = self.model.evaluate( self.x_val, self.y_val, verbose=0, **kwargs ) indexes = [ self.model.metrics_names.index(metric.lower()) for metric in self.metrics ] return pd.DataFrame( {key: [value[index] for index in indexes] for key, value in dic.items()}, index=self.metrics, )
[docs] def residuals(self) -> Panel: """Residuals. Returns: ``Panel`` of residuals. """ return self.predict() - self.y
class _Baseline(BaseModel):
    """Common base for baseline models: the Keras model is the identity."""

    def __init__(
        self,
        x,
        y,
        model_type: str = None,
        loss: str = None,
        metrics: list[str] = None,
    ):
        super().__init__(
            x=x,
            y=y,
            model_type=model_type,
            loss=loss,
            metrics=metrics,
        )

    def build(self) -> None:
        """Use a pass-through model: predictions equal the inputs."""
        self.model = _ConstantKerasModel()
class BaselineShift(_Baseline):
    """Baseline shift model."""

    # ! Maybe shift should be y.horizon by default, to avoid leakage
    # TODO test with different gap and horizon values

    def __init__(
        self,
        x,
        y,
        model_type: str = None,
        loss: str = None,
        metrics: list[str] = None,
        fillna=0,
        shift=1,
    ):
        """Baseline that predicts y as a shifted copy of itself.

        Args:
            x (``Panel``): Panel of input data.
            y (``Panel``): Panel of output data.
            model_type (``str``): Type of model.
            loss (``str``): Loss function.
            metrics (``list[str]``): Metrics.
            fillna: Value used to fill the positions emptied by the shift.
            shift: Number of windows to shift by (scaled by x.num_timesteps
                in set_arrays).
        """
        self.fillna = fillna
        self.shift = shift
        super().__init__(x=x, y=y, model_type=model_type, loss=loss, metrics=metrics)
[docs] def set_arrays(self): """Set the arrays.""" self.x_train = ( self.y.train.shift(self.shift * self.x.num_timesteps) .fillna(self.fillna) .values ) self.x_val = ( self.y.val.shift(self.shift * self.x.num_timesteps) .fillna(self.fillna) .values ) self.x_test = ( self.y.test.shift(self.shift * self.x.num_timesteps) .fillna(self.fillna) .values ) self.y_train = self.y.train.values self.y_val = self.y.val.values self.y_test = self.y.test.values
[docs] def build(self) -> None: """Build the model.""" self.model = _ConstantKerasModel()
class BaselineConstant(_Baseline):
    """Baseline constant model."""

    # TODO BUG: Not working when model_type="classification"

    def __init__(
        self,
        x,
        y,
        model_type: str = None,
        loss: str = None,
        metrics: list[str] = None,
        constant: float = 0,
    ):
        """Baseline that always predicts a constant value.

        Args:
            x (``Panel``): Panel of input data.
            y (``Panel``): Panel of output data.
            model_type (``str``): Type of model.
            loss (``str``): Loss function.
            metrics (``list[str]``): Metrics.
            constant (``float``): Constant value to predict.
        """
        # Keep the float for regression, cast to int otherwise.
        # NOTE(review): when model_type is None here (it is only inferred
        # later, in BaseModel), the int cast is taken — confirm intended.
        self.constant = constant if model_type == "regression" else int(constant)
        super().__init__(x=x, y=y, model_type=model_type, loss=loss, metrics=metrics)
[docs] def set_arrays(self) -> None: """Set the arrays.""" self.x_train = np.full(self.y.train.shape, self.constant) self.x_val = np.full(self.y.val.shape, self.constant) self.x_test = np.full(self.y.test.shape, self.constant) self.y_train = self.y.train.values self.y_val = self.y.val.values self.y_test = self.y.test.values
class DenseModel(BaseModel):
    """Dense model."""

    def __init__(
        self,
        x,
        y,
        model_type: str = None,
        dense_layers: int = 1,
        dense_units: int = 32,
        activation: str = "relu",
        loss: str = None,
        optimizer: str = None,
        metrics: list[str] = None,
        last_activation: str = None,
    ):
        """
        Dense Model.

        Args:
            x (``Panel``): Panel with x data
            y (``Panel``): Panel with y data
            model_type (``str``): Model type (regression, classification, multi_classification)
            dense_layers (``int``): Number of dense layers
            dense_units (``int``): Number of neurons in each dense layer
            activation (``str``): Activation type of each dense layer
            loss (``str``): Loss name
            optimizer (``str``): Optimizer name
            metrics (``list[str]``): Metrics list
            last_activation (``str``): Activation type of the last layer

        Returns:
            ``DenseModel``: Constructed DenseModel
        """
        self.dense_layers = dense_layers
        self.dense_units = dense_units
        self.activation = activation
        super().__init__(
            x=x,
            y=y,
            model_type=model_type,
            loss=loss,
            optimizer=optimizer,
            metrics=metrics,
            last_activation=last_activation,
        )
[docs] def build(self) -> None: """Build the model.""" dense = Dense(units=self.dense_units, activation=self.activation) layers = [Flatten()] # (time, features) => (time*features) layers += [dense for _ in range(self.dense_layers)] # layers += [drop] layers += [ Dense( units=self.y.num_timesteps * self.y.num_columns, activation=self.last_activation, ), Reshape((self.y.num_columns,)), ] self.model = Sequential(layers)
class ConvModel(BaseModel):
    """Convolutional model."""

    def __init__(
        self,
        x: Panel,
        y: Panel,
        model_type: str = None,
        conv_layers: int = 1,
        conv_filters: int = 32,
        kernel_size: int = 3,
        dense_layers: int = 1,
        dense_units: int = 32,
        activation: str = "relu",
        loss: str = None,
        optimizer: str = None,
        metrics: list[str] = None,
        last_activation: str = None,
    ):
        """
        Convolution Model.

        Args:
            x (``Panel``): Panel with x data
            y (``Panel``): Panel with y data
            model_type (``str``): Model type (regression, classification, multi_classification)
            conv_layers (``int``): Number of convolution layers
            conv_filters (``int``): Number of convolution filters
            kernel_size (``int``): Kernel size of convolution layer
            dense_layers (``int``): Number of dense layers
            dense_units (``int``): Number of neurons in each dense layer
            activation (``str``): Activation type of each dense layer
            loss (``str``): Loss name
            optimizer (``str``): Optimizer name
            metrics (``list[str]``): Metrics list
            last_activation (``str``): Activation type of the last layer

        Returns:
            ``ConvModel``: Constructed ConvModel

        Raises:
            ValueError: If the lookback window is shorter than the kernel.
        """
        # A Conv1D kernel cannot be wider than the time dimension it slides over.
        if x.shape_panel[1] < kernel_size:
            raise ValueError(
                f"Lookback ({x.shape_panel[1]}) must be greater or equal to kernel_size ({kernel_size})"
            )
        self.conv_layers = conv_layers
        self.conv_filters = conv_filters
        self.kernel_size = kernel_size
        self.dense_layers = dense_layers
        self.dense_units = dense_units
        self.activation = activation
        super().__init__(
            x=x,
            y=y,
            model_type=model_type,
            loss=loss,
            optimizer=optimizer,
            metrics=metrics,
            last_activation=last_activation,
        )
[docs] def build(self) -> None: """Build the model.""" if self.x.num_timesteps % self.kernel_size != 0: warnings.warn("Kernel size is not a divisor of lookback.") conv = Conv1D( filters=self.conv_filters, kernel_size=self.kernel_size, activation=self.activation, ) dense = Dense(units=self.dense_units, activation=self.activation) layers = [conv for _ in range(self.conv_layers)] layers += [Flatten()] layers += [conv for _ in range(self.conv_layers)] layers += [dense for _ in range(self.dense_layers)] layers += [ Dense( units=self.y.num_timesteps * self.y.num_columns, activation=self.last_activation, ), Reshape((self.y.num_columns,)), ] self.model = Sequential(layers)
class LinearRegression(DenseModel):
    """Linear regression model."""

    def __init__(self, x, y, **kwargs):
        # With zero hidden layers the dense model reduces to a linear map.
        super().__init__(
            x=x,
            y=y,
            model_type="regression",
            dense_layers=0,
            **kwargs,
        )
class LogisticRegression(DenseModel):
    """Logistic regression model."""

    def __init__(self, x, y, **kwargs):
        # Zero hidden layers + sigmoid output = logistic regression.
        super().__init__(
            x=x,
            y=y,
            model_type="classification",
            dense_layers=0,
            **kwargs,
        )
class ShallowModel:
    """Shallow model."""

    def __init__(self, x: Panel, y: Panel, model: type, metrics: list, **kwargs):
        """Shallow Model.

        Args:
            x (``Panel``): Panel with x data
            y (``Panel``): Panel with y data
            model (``type``): scikit-learn estimator class — it is
                instantiated below with ``**kwargs``, not passed as a string
            metrics (``list``): List of metric callables ``f(y_true, y_pred)``
                (``score`` reads each one's ``__name__`` and calls it)
            **kwargs: Additional arguments forwarded to the estimator

        Returns:
            ``ShallowModel``: Constructed ShallowModel

        Raises:
            ValueError: If y has more than one column.
        """
        self.x = x
        self.y = y
        # Only single-output targets are supported.
        if len(self.y.columns) > 1:
            raise ValueError("ShallowModel can only be used for single-output models.")
        # `model` is a class: build the estimator instance here.
        self.model = model(**kwargs)
        self.metrics = metrics
        self.set_arrays()
[docs] def set_arrays(self) -> None: """ Sets arrays for training, testing, and validation. """ self.x_train = self.x.train.flatten_panel().values self.y_train = self.y.train.values self.x_val = self.x.val.flatten_panel().values self.y_val = self.y.val.values self.x_test = self.x.test.flatten_panel().values self.y_test = self.y.test.values
[docs] def fit(self, **kwargs): """Fit the model. Args: **kwargs: Keyword arguments for the fit method of the model. """ self.model.fit(X=self.x_train, y=self.y_train, **kwargs)
[docs] def get_auc(self) -> float: """Get the AUC score.""" y = self.y_test.squeeze() prediction = self.model.predict(self.x_test).squeeze() fpr, tpr, _ = roc_curve(y, prediction) return auc(fpr, tpr)
    def predict_proba(self, data: Panel | None = None) -> Panel:
        """Predict probabilities.

        Args:
            data (``Panel``): Panel with data

        Returns:
            ``Panel``: The predicted probabilities — one column with y's name
            for binary problems, one ``{i}_prob`` column per class otherwise.
        """
        if data is not None:
            x = data.flatten_panel().values
            # Single-level index: timestamps only; otherwise (id, timestamp).
            if data.index.nlevels == 1:
                index = pd.Index(data.get_timesteps(0))
            else:
                index = pd.MultiIndex.from_arrays([data.ids, data.get_timesteps(0)])
        else:
            # No data given: predict over train + val + test, in that order.
            x = np.concatenate([self.x_train, self.x_val, self.x_test], axis=0)
            if self.y.index.nlevels == 1:
                index = pd.Index(
                    np.concatenate(
                        [self.y.train.index, self.y.val.index, self.y.test.index]
                    ),
                    name=self.y.index.name,
                )
            else:
                index = pd.MultiIndex.from_tuples(
                    np.concatenate(
                        [self.y.train.index, self.y.val.index, self.y.test.index]
                    ),
                    names=self.y.index.names,
                )
        output = self.model.predict_proba(x)
        # Binary case: keep only the positive-class column.
        if output.shape[1] == 2:
            output = output[:, 1]
            return Panel(
                output,
                columns=self.y.columns,
                index=index,
            )
        # Multiclass case: one probability column per class.
        return Panel(
            output,
            columns=[f"{i}_prob" for i in range(output.shape[1])],
            index=index,
        )
    def predict(self, data: Panel | None = None) -> Panel:
        """Predict on data.

        Args:
            data (``Panel``, optional): Data to predict on. Defaults to None.

        Returns:
            ``Panel``: Predicted data
        """
        if is_classifier(self.model):
            # NOTE(review): the test-set AUC is reused as the probability
            # cut-off. AUC is a ranking score, not a threshold — confirm this
            # is intentional (BaseModel.predict does the same).
            threshold = self.get_auc()
            panel = self.predict_proba(data)
            # `(x > threshold) + 0` casts the boolean mask to int labels.
            return panel.apply(lambda x: (x > threshold) + 0)
        else:
            if data is not None:
                x = data.flatten_panel().values
                # NOTE(review): unconditionally builds a MultiIndex here,
                # unlike predict_proba which checks index.nlevels — verify
                # behavior for single-level panels.
                index = pd.MultiIndex.from_arrays([data.ids, data.first_timestamp])
            else:
                # Predict over train + val + test, in that order.
                x = np.concatenate([self.x_train, self.x_val, self.x_test], axis=0)
                index = pd.MultiIndex.from_tuples(
                    np.concatenate(
                        [self.y.train.index, self.y.val.index, self.y.test.index]
                    ),
                    names=self.y.index.names,
                )
            return Panel(
                self.model.predict(x),
                columns=self.y.columns,
                index=index,
            )
[docs] def score(self, on: list[str] | str = None) -> pd.DataFrame: """Score the model. Args: on (``list[str]`` or ``str``): Data to use for scoring Returns: ``pd.Series``: Score """ on = [on] if on else ["train", "val", "test"] dic = {} if "train" in on: metrics_dict = { a.__name__: a( self.y.train.values.squeeze(), self.predict(self.x.train).values.squeeze(), ) for a in self.metrics } dic["train"] = metrics_dict if "test" in on: metrics_dict = { a.__name__: a( self.y.test.values.squeeze(), self.predict(self.x.test).values.squeeze(), ) for a in self.metrics } dic["test"] = metrics_dict if "val" in on: metrics_dict = { a.__name__: a( self.y.val.values.squeeze(), self.predict(self.x.val).values.squeeze(), ) for a in self.metrics } dic["val"] = metrics_dict return pd.DataFrame(dic, index=[a.__name__ for a in self.metrics])
[docs] def residuals(self) -> Panel: """Residuals. Returns: ``Panel``: Residuals """ return self.predict() - self.y
def compute_score_per_model(*models, on="val"):  # BUG
    """
    Compute score per model

    Args:
        *models: Models to score
        on (``str``, optional): Data to use for scoring. Defaults to "val".

    Returns:
        pd.DataFrame: Scores
    """
    # NOTE(review): each model.score(on=on) returns a DataFrame, so this
    # wraps DataFrames inside a DataFrame; `model.model.name` also assumes
    # the wrapped model exposes `.name` (Keras models do, sklearn estimators
    # do not). Likely the reason for the BUG marker above — confirm.
    return pd.DataFrame(
        [model.score(on=on) for model in models],
        index=[model.model.name for model in models],
    )
def compute_default_scores(x, y, model_type, epochs=10, verbose=0, **kwargs):  # BUG
    """
    Compute default scores for a model.

    Args:
        x (``Panel``): X data.
        y (``Panel``): Y data.
        model_type (``str``): Model type.
        epochs (``int``, optional): Number of epochs. Defaults to 10.
        verbose (``int``, optional): Verbosity. Defaults to 0.
        **kwargs: Keyword arguments for the model.

    Returns:
        pd.DataFrame: Scores
    """
    # Instantiate and train each default model, then score them together.
    fitted = []
    for model_class in (BaselineConstant, BaselineShift, DenseModel):
        instance = model_class(x=x, y=y, model_type=model_type)
        instance.fit(epochs=epochs, verbose=verbose, **kwargs)
        fitted.append(instance)
    return compute_score_per_model(*fitted)