Source code for ecgan.utils.transformation

"""Implementation of various normalizers for time series data."""
from abc import ABC, abstractmethod
from logging import getLogger
from typing import Dict, Union

import torch
from numpy import ndarray
from torch import Tensor, from_numpy

from ecgan.utils.custom_types import Transformation
from ecgan.utils.miscellaneous import to_torch

logger = getLogger(__name__)


class DataTransformation(ABC):
    """A base class for transformations to inherit from."""

    def __init__(self):
        self.params = None

    def fit(self, data: Union[ndarray, Tensor]) -> None:
        """Fit a transformation on a NumPy array or Tensor of data points."""
        data_: Tensor = to_torch(data)

        if data_.dim() == 2:
            return self._fit_2d(data_)
        if data_.dim() == 3:
            return self._fit_3d(data_)

        raise ValueError(
            'Array with shape {0} and {1} dimensions is not valid. '
            'Please provide a 2D or 3D array.'.format(data_.shape, data_.dim())
        )

    @abstractmethod
    def _fit_2d(self, data: Tensor) -> None:
        raise NotImplementedError("The selected DataTransformation needs to implement the `_fit_2d` method.")

    @abstractmethod
    def _fit_3d(self, data: Tensor) -> None:
        raise NotImplementedError("The selected DataTransformation needs to implement the `_fit_3d` method.")

    def transform(self, data: Union[ndarray, Tensor]) -> Tensor:
        """
        Apply a transformation on a NumPy array or Tensor of data points.

        Requires an already fitted transformation.

        Returns:
            Transformed data.
        """
        data_: Tensor = data if isinstance(data, Tensor) else from_numpy(data)

        if data_.dim() == 2:
            return self._transform_2d(data_)
        if data_.dim() == 3:
            return self._transform_3d(data_)

        raise ValueError(
            'Array with shape {0} and {1} dimensions is not valid. '
            'Please provide a 2D or 3D array.'.format(data_.shape, data_.dim())
        )

    @abstractmethod
    def _transform_2d(self, data: Tensor) -> Tensor:
        raise NotImplementedError("The selected DataTransformation needs to implement the `_transform_2d` method.")

    @abstractmethod
    def _transform_3d(self, data: Tensor) -> Tensor:
        raise NotImplementedError("The selected DataTransformation needs to implement the `_transform_3d` method.")

    def fit_transform(self, data: Union[ndarray, Tensor]) -> Tensor:
        """
        First fit the transformation, then apply it to the given data.

        Both the 2D and the 3D case are transformed along the columns. In 2D this is
        useful for a typical feature matrix but rarely for time series data, where one
        might want to transform along the rows or across the whole dataset. To transform
        time series data, one option is to use the 3D transformation with shape
        (samples x sequence_length x 1).
        """
        self.fit(data)
        return self.transform(data)

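# Illustrative sketch (not part of the original module): reshaping a batch of
# univariate time series to the 3D layout described above before transforming it.
# `MinMaxTransformation` (defined below) stands in for any concrete subclass.
#
#     series = torch.rand(16, 128)          # (samples x sequence_length)
#     series_3d = series.unsqueeze(-1)      # (samples x sequence_length x 1)
#     normalized = MinMaxTransformation().fit_transform(series_3d)
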
class MinMaxTransformation(DataTransformation):
    """Min-max normalizer: scales the input to [0, 1]."""

    def _fit_2d(self, data: Tensor) -> None:
        # torch.min/torch.max with `dim` return a (values, indices) tuple.
        self.params = {'min': torch.min(data, dim=0), 'max': torch.max(data, dim=0)}

    def _transform_2d(self, data: Tensor) -> Tensor:
        if self.params is None:
            raise ValueError(
                'Min and max are not set during transform. '
                'Please fit your normalizer before transforming the data.'
            )
        _, columns = data.shape
        normalized_data = torch.zeros(data.shape)

        for column in range(columns):
            # Index 0 selects the values from the (values, indices) tuple.
            normalized_data[:, column] = (data[:, column] - self.params['min'][0][column]) / (
                self.params['max'][0][column] - self.params['min'][0][column]
            )
        return normalized_data

    def _fit_3d(self, data: Tensor) -> None:
        mins = []
        maxs = []
        _, _, columns = data.shape

        for column in range(columns):
            mins.append(torch.min(data[:, :, column]))
            maxs.append(torch.max(data[:, :, column]))

        self.params = {'min': mins, 'max': maxs}

    def _transform_3d(self, data: Tensor) -> Tensor:
        if self.params is None:
            raise ValueError(
                'Min and max are not set during transform. '
                'Please fit your normalizer before transforming the data.'
            )
        _, _, columns = data.shape
        normalized_data = torch.zeros(data.shape)

        for column in range(columns):
            normalized_data[:, :, column] = (data[:, :, column] - self.params['min'][column]) / (
                self.params['max'][column] - self.params['min'][column]
            )
        return normalized_data

    def get_params(self) -> Dict:
        """Retrieve normalization parameters."""
        return self.params  # type: ignore

    def set_params(self, params: Dict) -> None:
        """Set existing normalization parameters."""
        self.params = params

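# Illustrative sketch (not part of the original module): fitting the min-max
# normalizer on training data, reusing the fitted parameters on test data, and
# persisting them via get_params/set_params.
#
#     normalizer = MinMaxTransformation()
#     train = torch.randn(64, 100, 2)            # (samples x seq_length x channels)
#     test = torch.randn(16, 100, 2)
#     train_scaled = normalizer.fit_transform(train)  # all values lie in [0, 1]
#     test_scaled = normalizer.transform(test)   # may exceed [0, 1] for unseen extremes
#
#     params = normalizer.get_params()           # e.g. to save alongside a model
#     restored = MinMaxTransformation()
#     restored.set_params(params)                # transform later without refitting
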
class StandardizationTransformation(DataTransformation):
    """Standardize the data such that it is distributed according to N(0, 1)."""

    def _fit_2d(self, data: Tensor) -> None:
        self.params = {
            'mean': torch.mean(data, dim=0),
            'std': torch.std(data, dim=0),
        }

    def _transform_2d(self, data: Tensor) -> Tensor:
        if self.params is None:
            raise ValueError(
                'Mean and std are not set during transform. '
                'Please fit your normalizer before transforming the data.'
            )
        _, columns = data.shape
        normalized_data = torch.zeros(data.shape)

        for column in range(columns):
            normalized_data[:, column] = (data[:, column] - self.params['mean'][column]) / self.params['std'][column]
        return normalized_data

    def _fit_3d(self, data: Tensor) -> None:
        means = []
        stds = []
        _, _, columns = data.shape

        for column in range(columns):
            means.append(torch.mean(data[:, :, column]))
            stds.append(torch.std(data[:, :, column]))

        self.params = {'mean': means, 'std': stds}

    def _transform_3d(self, data: Tensor) -> Tensor:
        if self.params is None:
            raise ValueError(
                'Mean and std are not set during transform. '
                'Please fit your normalizer before transforming the data.'
            )
        _, _, columns = data.shape
        normalized_data = torch.zeros(data.shape)

        for column in range(columns):
            normalized_data[:, :, column] = (data[:, :, column] - self.params['mean'][column]) / self.params['std'][
                column
            ]
        return normalized_data

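# Illustrative sketch (not part of the original module): after fitting, each
# channel of the transformed training data has approximately zero mean and unit
# standard deviation.
#
#     standardizer = StandardizationTransformation()
#     data = 5 * torch.randn(64, 100, 2) + 3
#     standardized = standardizer.fit_transform(data)
#     # standardized[:, :, c].mean() ~ 0 and standardized[:, :, c].std() ~ 1
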
class WhiteningTransformation(DataTransformation):
    """
    Apply a whitening transformation to data.

    The whitening transformation returns decorrelated data, i.e. data with a unit
    covariance matrix.
    """

    def __init__(self, fudge=1e-16):
        super().__init__()
        # Small constant added to the eigenvalues to avoid division by zero.
        self.fudge = fudge

    def _compute_whitening_matrix(self, column) -> Tensor:
        eigenvalues, eigenvectors = (
            self.params['eigenvalues'][column],
            self.params['eigenvectors'][column],
        )
        # ZCA-style whitening matrix: W = V diag(1/sqrt(lambda)) V^T.
        diag = torch.diag(1 / torch.sqrt(torch.abs(eigenvalues) + self.fudge))
        whitening = torch.mm(torch.mm(eigenvectors, diag), eigenvectors.t())
        return whitening

    def _fit_2d(self, data: Tensor) -> None:
        _, columns = data.shape
        mean = torch.mean(data, dim=0)
        eigenvalues, eigenvectors = [], []

        for column in range(columns):
            mean_ = mean[column]
            data_column = data[:, column]
            shifted_data = data_column - mean_
            cov = torch.dot(shifted_data.t(), shifted_data)
            # Note: torch.symeig was removed in recent PyTorch releases;
            # torch.linalg.eigh is the modern equivalent.
            eigenvalue, eigenvector = torch.symeig(cov.view(1, 1), eigenvectors=True)
            eigenvalues.append(eigenvalue)
            eigenvectors.append(eigenvector)

        self.params = {'eigenvalues': eigenvalues, 'eigenvectors': eigenvectors}

    def _fit_3d(self, data: Tensor) -> None:
        _, _, columns = data.shape
        eigenvalues, eigenvectors = [], []

        for column in range(columns):
            data_column = data[:, :, column].t()
            mean = torch.mean(data_column, dim=0)
            shifted_data = data_column - mean
            cov = torch.mm(shifted_data.t(), shifted_data)
            eigenvalue, eigenvector = torch.symeig(cov, eigenvectors=True)
            eigenvalues.append(eigenvalue)
            eigenvectors.append(eigenvector)

        self.params = {'eigenvalues': eigenvalues, 'eigenvectors': eigenvectors}

    def _transform_2d(self, data: Tensor) -> Tensor:
        if self.params is None:
            raise ValueError(
                'Eigenvalues and eigenvectors are not set during transform. '
                'Please fit your normalizer before transforming the data.'
            )
        rows, columns = data.shape
        normalized_data = torch.zeros(data.shape)

        for column in range(columns):
            whitening = self._compute_whitening_matrix(column)
            data_column = data[:, column].view(1, rows)
            normalized_data[:, column] = torch.mm(data_column.t(), whitening)[:, 0]
        return normalized_data

    def _transform_3d(self, data: Tensor) -> Tensor:
        if self.params is None:
            raise ValueError(
                'Eigenvalues and eigenvectors are not set during transform. '
                'Please fit your normalizer before transforming the data.'
            )
        _, _, columns = data.shape
        normalized_data = torch.zeros(data.shape)

        for column in range(columns):
            whitening = self._compute_whitening_matrix(column)
            data_column = data[:, :, column].t()
            normalized_data[:, :, column] = torch.mm(data_column, whitening).t()
        return normalized_data

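# Illustrative sketch (not part of the original module, and assuming a PyTorch
# version that still provides torch.symeig): whitening is fitted and applied per
# channel; the `fudge` term guards against vanishing eigenvalues.
#
#     whitener = WhiteningTransformation(fudge=1e-16)
#     data = torch.randn(32, 100, 2)
#     whitened = whitener.fit_transform(data)    # same shape as the input
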
class FFTTransformation(DataTransformation):
    """Compute the 2D or 3D discrete Fourier transform using the PyTorch FFT implementation."""

    def _fit_2d(self, data: Tensor) -> None:
        return

    def _fit_3d(self, data: Tensor) -> None:
        return

    def _transform_2d(self, data: Tensor) -> Tensor:
        """
        Return the 2D data transformed via FFT.

        The FFT maps each value in the data tensor to a complex number in the
        frequency domain. The method separates the real and imaginary components
        into separate channels, alternating real and imaginary components of the
        values in the frequency domain. Thus a tensor of size
        (batch_size, seq_length) is transformed into a tensor of size
        (batch_size, seq_length, 2) with floating point entries.
        """
        fourier_coeffs = torch.fft.fft(data, norm="ortho")
        # Pair each row of real parts with the corresponding row of imaginary parts.
        zipped = list(zip(fourier_coeffs.real.tolist(), fourier_coeffs.imag.tolist()))
        zipped_tensor: Tensor = torch.tensor(zipped)
        return zipped_tensor.permute(0, 2, 1)

    def _transform_3d(self, data: Tensor) -> Tensor:
        """
        Return the 3D data transformed via FFT.

        The FFT maps each value in the data tensor to a complex number in the
        frequency domain. The method separates the real and imaginary components
        into separate channels, alternating real and imaginary components of the
        values in the frequency domain. Thus a tensor of size
        (batch_size, seq_length, num_channels) is transformed into a tensor of size
        (batch_size, seq_length, 2 * num_channels) with floating point entries.
        """
        batch_size, seq_length, num_channels = data.shape
        fourier_coeffs = torch.fft.fftn(data, norm="ortho")
        zipped = list(
            zip(
                fourier_coeffs.real.flatten().tolist(),
                fourier_coeffs.imag.flatten().tolist(),
            )
        )
        zipped_tensor: Tensor = torch.tensor(zipped)
        return zipped_tensor.reshape(batch_size, seq_length, 2 * num_channels)

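# Illustrative sketch (not part of the original module): the FFT transformation
# doubles the channel dimension, interleaving real and imaginary coefficients.
#
#     fft = FFTTransformation()
#     data = torch.randn(8, 64, 2)               # (batch_size, seq_length, num_channels)
#     coeffs = fft.fit_transform(data)           # shape: (8, 64, 4)
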
class SamplewiseMinmaxTransformation(DataTransformation):
    """
    Scale each sample to the [0, 1] range.

    MinMaxTransformation scales in the same way, but per channel instead of per sample.
    """

    def _fit_2d(self, data: Tensor) -> None:
        pass

    def _transform_2d(self, data: Tensor) -> Tensor:
        raise NotImplementedError("2D samplewise minmax scaling is not yet supported.")

    def _fit_3d(self, data: Tensor) -> None:
        pass

    @staticmethod
    def transform_1d(sample):
        """Scale an individual series to the [0, 1] range."""
        normalized_data = (sample - torch.min(sample)) / (torch.max(sample) - torch.min(sample))
        return normalized_data

    def _transform_3d(self, data: Tensor) -> Tensor:
        # Move channels forward so each (sample, channel) series can be scaled individually.
        data = data.permute(0, 2, 1)
        normalized_data = torch.zeros(data.shape)
        batch, columns, _ = normalized_data.shape

        for sample in range(batch):
            for col in range(columns):
                normalized_data[sample][col] = self.transform_1d(data[sample][col][:])

        normalized_data = normalized_data.permute(0, 2, 1)
        return normalized_data

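# Illustrative sketch (not part of the original module): in contrast to
# MinMaxTransformation, every (sample, channel) series is scaled independently,
# so each one spans the full [0, 1] range after transforming.
#
#     samplewise = SamplewiseMinmaxTransformation()
#     data = torch.randn(8, 100, 2)
#     scaled = samplewise.fit_transform(data)
#     # scaled[i, :, c].min() == 0 and scaled[i, :, c].max() == 1 for every i, c
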
class NoTransformation(DataTransformation):
    """Apply no transformation."""

    def _fit_2d(self, data: Tensor) -> None:
        pass

    def _transform_2d(self, data: Tensor) -> Tensor:
        return data

    def _fit_3d(self, data: Tensor) -> None:
        pass

    def _transform_3d(self, data: Tensor) -> Tensor:
        return data

def get_transformation(
    transformation: Transformation,
) -> DataTransformation:
    """Retrieve the requested transformation. The output range depends on the normalizer chosen."""
    transformations = {
        Transformation.MINMAX: MinMaxTransformation(),
        Transformation.WHITENING: WhiteningTransformation(),
        Transformation.STANDARDIZE: StandardizationTransformation(),
        Transformation.FOURIER: FFTTransformation(),
        Transformation.INDIVIDUAL: SamplewiseMinmaxTransformation(),
        Transformation.NONE: NoTransformation(),
    }
    try:
        return transformations[transformation]
    except KeyError:
        logger.warning(
            'No known transformation with name {0}. Defaulting to no transformation.'.format(transformation)
        )
        return NoTransformation()
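
# Illustrative sketch (not part of the original module): selecting a normalizer
# through the factory using the Transformation enum from ecgan.utils.custom_types.
#
#     normalizer = get_transformation(Transformation.MINMAX)
#     normalized = normalizer.fit_transform(torch.randn(8, 100, 2))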