# Source code for smadi.climatology

"""
A module for calculating climatology (climate normals) at different time steps (day, week, dekad, bimonth, month) from time series data.
"""

from typing import List
import pandas as pd
import matplotlib.pyplot as plt

from smadi.plot import get_plot_options, plot_colmns, plot_figure
from smadi.preprocess import (
    fillna,
    smooth,
    filter_df,
    monthly_agg,
    dekadal_agg,
    weekly_agg,
    bimonthly_agg,
    compute_clim,
)


[docs] class Aggregator: """ Base class for aggregation Attributes: ----------- df : pd.DataFrame The DataFrame containing the data to be aggregated. variable : str The variable/column in the DataFrame to be aggregated. fillna : bool Fill NaN values in the time series data using a moving window average. fillna_window_size : int The size of the moving window for filling NaN values. It is recommended to be an odd number. smoothing : bool Smooth the time series data using a moving window average. smooth_window_size : int The size of the moving window for smoothing(n-days). It is recommended to be an odd number. timespan : list[str, str] optional The start and end dates for a timespan to be aggregated. Format: ['YYYY-MM-DD', 'YYYY-MM-DD'] agg_metric : str The aggregation metric to be used. Supported values: 'mean', 'median', 'min', 'max', 'std', etc. resulted_df : pd.DataFrame The resulting DataFrame after aggregation. Methods: -------- _fillna: Fills NaN values in the time series data using a moving window average. _smooth: Smooths the time series data using a moving window average. _set_up_mode(): Filters the DataFrame based on the parameters provided to perform aggregation on a subset or all of the data. _filter_df: Filters the DataFrame based on specified time/date conditions. _validate_df_index: Validates the input DataFrame type and index. _validate_variable: Validates the variable to be aggregated. _validate_input: Validates the input parameters. aggregate: Aggregates the data based on the specified time step. """ def __init__( self, df: pd.DataFrame, variable: str, fillna: bool = False, fillna_window_size: int = None, smoothing=False, smooth_window_size=None, timespan: List[str] = None, agg_metric: str = "mean", ): """ Initializes the Aggregation class. 
""" self.original_df = df self.var = variable self.fillna = fillna self.fillna_window_size = fillna_window_size self.smoothing = smoothing self.smooth_window_size = smooth_window_size self.timespan = timespan self.agg_metric = agg_metric self._validate_input() self.resulted_df = pd.DataFrame() @property def df(self): """ Prepares the DataFrame for aggregation. """ # Resample the data to daily frequency _df = pd.DataFrame(self.original_df[self.var]).resample("D").mean() # Truncate the data based on the timespan provided _df = ( _df.truncate(before=self.timespan[0], after=self.timespan[1]) if self.timespan else _df ) # Validate the input parameters self._validate_input() _df = self._fillna(_df) _df = self._smooth(_df) _df.dropna(inplace=True) return _df def _fillna(self, df): """ Fills NaN values in the time series data using a moving window average. """ if self.fillna: df[self.var] = fillna(df, self.var, self.fillna_window_size) return df def _smooth(self, df): """ Smooths the time series data using a moving window average. """ if self.smoothing: df[self.var] = smooth(df, self.var, self.smooth_window_size) return df def _validate_df_index(self): """ Validates the input DataFrame type and index. Raises: ------- TypeError: If the input DataFrame is not a pandas DataFrame. ValueError: If the input DataFrame index is not a datetime index. """ if not isinstance(self.original_df, pd.DataFrame): raise TypeError("df must be a pandas DataFrame") if not isinstance(self.original_df.index, pd.DatetimeIndex): raise ValueError("df index must be a datetime index") def _validate_variable(self): """ Validates the variable to be aggregated. Raises: ------- ValueError: If the variable is not found in the input DataFrame columns. """ if self.var not in self.original_df.columns: raise ValueError( f"Variable '{self.var}' not found in the input DataFrame columns." ) def _validate_fillna_smoothing(self): """ Validates the smoothing parameters. 
Raises: ------- ValueError: - If the window size is not provided when smoothing is enabled. TypeError: - If the smoothing parameter is not a boolean value. - If the window size parameter is not an integer value when smoothing is enabled. """ if any( [ self.fillna and self.fillna_window_size is None, self.smoothing and self.smooth_window_size is None, ] ): raise ValueError( "window size must be provided when 'fillna' or 'smoothing' is enabled" ) def _validate_input(self): """ Validates the input parameters. """ self._validate_df_index() self._validate_variable() self._validate_fillna_smoothing()
[docs] def aggregate(self, **kwargs): """ Aggregates the data based on the specified . """ return filter_df(self.df, **kwargs)
class MonthlyAggregator(Aggregator):
    """
    Aggregates the time series data based on month-based time step.

    Unlike the base class, 'fillna_window_size' defaults to 3 here.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        variable: str,
        fillna: bool = False,
        fillna_window_size: int = 3,
        smoothing=False,
        smooth_window_size=None,
        timespan: List[str] = None,
        agg_metric: str = "mean",
    ):
        super().__init__(
            df=df,
            variable=variable,
            fillna=fillna,
            fillna_window_size=fillna_window_size,
            smoothing=smoothing,
            smooth_window_size=smooth_window_size,
            timespan=timespan,
            agg_metric=agg_metric,
        )

    def aggregate(self, **kwargs):
        """
        Stores the monthly aggregate in the '<variable>-<agg_metric>' column of
        resulted_df and returns that frame filtered by kwargs (via filter_df).
        """
        column = f"{self.var}-{self.agg_metric}"
        monthly = monthly_agg(self.df, self.var, self.agg_metric)
        self.resulted_df[column] = monthly
        return filter_df(self.resulted_df, **kwargs)
class DekadalAggregator(Aggregator):
    """
    Aggregates the data based on dekad-based time step.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        variable: str,
        fillna: bool = False,
        fillna_window_size: int = None,
        smoothing=False,
        smooth_window_size=None,
        timespan: List[str] = None,
        agg_metric: str = "mean",
    ):
        super().__init__(
            df=df,
            variable=variable,
            fillna=fillna,
            fillna_window_size=fillna_window_size,
            smoothing=smoothing,
            smooth_window_size=smooth_window_size,
            timespan=timespan,
            agg_metric=agg_metric,
        )

    def aggregate(self, **kwargs):
        """
        Stores the dekadal aggregate in the '<variable>-<agg_metric>' column of
        resulted_df and returns that frame filtered by kwargs (via filter_df).
        """
        column = f"{self.var}-{self.agg_metric}"
        # NOTE(review): agg_metric names the column but is not passed to
        # dekadal_agg (monthly_agg does receive it) - confirm this is intended.
        dekadal = dekadal_agg(self.df, self.var)
        self.resulted_df[column] = dekadal
        return filter_df(self.resulted_df, **kwargs)
class WeeklyAggregator(Aggregator):
    """
    Aggregates the time series data based on week-based time step.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        variable: str,
        fillna: bool = False,
        fillna_window_size: int = None,
        smoothing=False,
        smooth_window_size=None,
        timespan: List[str] = None,
        agg_metric: str = "mean",
    ):
        super().__init__(
            df=df,
            variable=variable,
            fillna=fillna,
            fillna_window_size=fillna_window_size,
            smoothing=smoothing,
            smooth_window_size=smooth_window_size,
            timespan=timespan,
            agg_metric=agg_metric,
        )

    def aggregate(self, **kwargs):
        """
        Stores the weekly aggregate in the '<variable>-<agg_metric>' column of
        resulted_df and returns that frame filtered by kwargs (via filter_df).
        """
        column = f"{self.var}-{self.agg_metric}"
        # NOTE(review): agg_metric names the column but is not passed to
        # weekly_agg (monthly_agg does receive it) - confirm this is intended.
        weekly = weekly_agg(self.df, self.var)
        self.resulted_df[column] = weekly
        return filter_df(self.resulted_df, **kwargs)
class BimonthlyAggregator(Aggregator):
    """
    Aggregates the time series data based on bimonthly (twice a month) time step.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        variable: str,
        fillna: bool = False,
        fillna_window_size: int = None,
        smoothing=False,
        smooth_window_size=None,
        timespan: List[str] = None,
        agg_metric: str = "mean",
    ):
        super().__init__(
            df=df,
            variable=variable,
            fillna=fillna,
            fillna_window_size=fillna_window_size,
            smoothing=smoothing,
            smooth_window_size=smooth_window_size,
            timespan=timespan,
            agg_metric=agg_metric,
        )

    def aggregate(self, **kwargs):
        """
        Stores the bimonthly aggregate in the '<variable>-<agg_metric>' column of
        resulted_df and returns that frame filtered by kwargs (via filter_df).
        """
        column = f"{self.var}-{self.agg_metric}"
        # NOTE(review): agg_metric names the column but is not passed to
        # bimonthly_agg (monthly_agg does receive it) - confirm this is intended.
        bimonthly = bimonthly_agg(self.df, self.var)
        self.resulted_df[column] = bimonthly
        return filter_df(self.resulted_df, **kwargs)
class DailyAggregator(Aggregator):
    """
    Aggregates the time series data based on daily time step.

    The preprocessed frame is already at daily frequency, so the "aggregate"
    is the preprocessed variable itself, stored under the
    '<variable>-<agg_metric>' column name.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        variable: str,
        fillna: bool = False,
        fillna_window_size: int = None,
        smoothing=False,
        smooth_window_size=None,
        timespan: List[str] = None,
        agg_metric: str = "mean",
    ):
        super().__init__(
            df,
            variable,
            fillna,
            fillna_window_size,
            smoothing,
            smooth_window_size,
            timespan,
            agg_metric,
        )

    def aggregate(self, **kwargs):
        """
        Returns the daily values, filtered by the time/date conditions in kwargs.

        Parameters:
        -----------
        kwargs:
            Time/date filtering parameters forwarded to filter_df.

        Returns:
        --------
        The output of filter_df applied to the daily result frame.
        """
        self.resulted_df[f"{self.var}-{self.agg_metric}"] = self.df[self.var]

        # De-duplicate on the timestamp, not on the values: a value-based
        # DataFrame.drop_duplicates() would silently discard every day whose
        # measurement happens to equal that of an earlier day.
        repeated_dates = self.resulted_df.index.duplicated(keep="first")
        daily = self.resulted_df[~repeated_dates]

        return filter_df(daily, **kwargs)
class Climatology(Aggregator):
    """
    A class for calculating climatology (climate normal) for time series data.

    Attributes:
    -----------
    original_df: pd.DataFrame
        The original input DataFrame before resampling and removing NaN values.

    df: pd.DataFrame
        The preprocessed data (property inherited from Aggregator).

    var: str
        The variable/column in the DataFrame to be aggregated.

    fillna: bool
        Fill NaN values in the time series data using a moving window average.

    fillna_window_size: int
        The size of the moving window for filling NaN values.
        It is recommended to be an odd number.

    smoothing: bool
        Smooth the time series data using a moving window average.

    smooth_window_size: int
        The size of the moving window for smoothing (n-days).
        It is recommended to be an odd number.

    timespan: list[str, str] optional
        The start and end dates for a timespan to be aggregated.
        Format: ['YYYY-MM-DD', 'YYYY-MM-DD']

    time_step: str
        The time step for aggregation.
        Supported values: 'day', 'week', 'dekad', 'bimonth', 'month'.

    agg_metric: str
        The aggregation metric to be used.
        Supported values: 'mean', 'median', 'min', 'max', 'std', etc.

    normal_metrics: List[str]
        The metrics to be used in the climatology computation.
        Supported values: 'mean', 'median', 'min', 'max', 'std'.
        Defaults to ['mean'] when not provided.

    clim_df: pd.DataFrame
        The DataFrame containing climatology information.

    Methods:
    --------
    aggregate:
        Aggregates the data based on the time step and metrics provided.

    _validate_time_step:
        Validates the time step.

    _validate_metrics:
        Validates the metrics to be used in the climatology computation.

    compute_normals:
        Calculates climatology based on the aggregated data.

    plot_ts:
        Plots the climatology (or a provided DataFrame) as a time series.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        variable: str,
        fillna: bool = False,
        fillna_window_size: int = None,
        smoothing=False,
        smooth_window_size=None,
        timespan: List[str] = None,
        time_step: str = "month",
        normal_metrics: List[str] = None,
        agg_metric: str = "mean",
    ):
        """
        Initializes the Climatology class.
        """
        # These attributes must exist before super().__init__ runs, because
        # the base constructor calls the overridden _validate_input().
        self.time_step = time_step
        # A None default avoids sharing one mutable list between instances.
        self.normal_metrics = ["mean"] if normal_metrics is None else normal_metrics
        self.valid_time_steps = ["month", "dekad", "week", "day", "bimonth"]
        self.valid_metrics = ["mean", "median", "min", "max", "std"]

        super().__init__(
            df,
            variable,
            fillna,
            fillna_window_size,
            smoothing,
            smooth_window_size,
            timespan,
            agg_metric,
        )
        self.clim_df = pd.DataFrame()

    def _validate_time_step(
        self,
    ) -> None:
        """
        Validates the time step.

        Raises:
        -------
        ValueError:
            If the time step is not one of the supported values.
        """
        if self.time_step not in self.valid_time_steps:
            raise ValueError(
                f"Invalid time step '{self.time_step}'. Supported values: {self.valid_time_steps}."
            )

    def _validate_metrics(self):
        """
        Validates the metrics to be used in the climatology computation.

        Raises:
        -------
        ValueError:
            If the metric is not one of the supported values.
        """
        for metric in self.normal_metrics:
            if metric not in self.valid_metrics:
                raise ValueError(
                    f"Invalid metric '{metric}'. Supported values: {self.valid_metrics}."
                )

    def _validate_input(self):
        """
        Runs the base validations plus the time step and metric checks.
        """
        super()._validate_input()
        self._validate_time_step()
        self._validate_metrics()

    def aggregate(self):
        """
        Aggregates the data based on the time step and metrics provided.

        Returns:
        --------
        The result of the aggregate() method of the aggregator class that
        matches 'time_step'.

        Raises:
        -------
        TypeError / ValueError:
            If the current configuration is invalid (see _validate_input).
        """
        self._validate_input()

        aggregators = {
            "month": MonthlyAggregator,
            "week": WeeklyAggregator,
            "dekad": DekadalAggregator,
            "bimonth": BimonthlyAggregator,
            "day": DailyAggregator,
        }
        aggregator_cls = aggregators[self.time_step]

        # Hand over the raw input: the delegate performs the resampling,
        # truncation, gap filling and smoothing itself. Passing the already
        # preprocessed 'self.df' would apply fillna/smoothing twice.
        return aggregator_cls(
            df=self.original_df,
            variable=self.var,
            fillna=self.fillna,
            fillna_window_size=self.fillna_window_size,
            smoothing=self.smoothing,
            smooth_window_size=self.smooth_window_size,
            timespan=self.timespan,
            agg_metric=self.agg_metric,
        ).aggregate()

    def compute_normals(self, **kwargs) -> pd.DataFrame:
        """
        Calculates climatology based on the aggregated data.

        Parameters:
        -----------
        kwargs:
            Additional time/date filtering parameters (forwarded to filter_df).

        Returns:
        --------
        pd.DataFrame
            The DataFrame containing climatology information.
        """
        self.clim_df = compute_clim(
            self.aggregate(),
            self.time_step,
            f"{self.var}-{self.agg_metric}",
            self.normal_metrics,
        )

        return filter_df(self.clim_df, **kwargs)

    def plot_ts(
        self,
        df=None,
        x_axis=None,
        colmns_kwargs=None,
        plot_raw=False,
        raw_resample="D",
        raw_kwargs=None,
        plot_style="ggplot",
        **kwargs,
    ):
        """
        Plot the time series data for the provided dataframe.

        Parameters:
        -----------
        df: pd.DataFrame
            The dataframe containing the data to plot.
            If None, the output of compute_normals() is used.

        x_axis: list
            The x-axis values for the plot. If None, the index of df is used.

        colmns_kwargs: dict
            The dictionary containing the column names and their respective
            matplotlib plot options. If None, the '<variable>-<agg_metric>'
            column is plotted with a label of the same name.

        plot_raw: bool
            Whether to plot the raw data on the plot as background.

        raw_resample: str
            The resample frequency for the raw data.
            Supported values: 'D', 'W', 'M', etc.
            A falsy value plots the raw data without resampling.

        raw_kwargs: dict
            The dictionary containing the matplotlib plot options for the raw
            data. If empty or None, {"alpha": 0.5, "label": "Raw Data"} is used.

        plot_style: str
            The matplotlib style passed to plt.style.use.

        kwargs: dict
            The keyword arguments for the matplotlib plot for the figure such
            as title, xlabel, ylabel, legend, figsize, and grid.
        """
        plt.style.use(plot_style)

        # Fall back to the object's own climatology and defaults
        if df is None:
            df = self.compute_normals()
        if x_axis is None:
            x_axis = df.index
        if colmns_kwargs is None:
            series_name = f"{self.var}-{self.agg_metric}"
            colmns_kwargs = {series_name: {"label": series_name}}

        plot_params = get_plot_options(**kwargs)

        if plot_params["figsize"] is not None:
            plt.figure(figsize=plot_params["figsize"])

        if plot_raw:
            # Only the plotted variable is needed; averaging the whole frame
            # can fail when other columns are not numeric.
            raw_df = self.original_df[[self.var]]
            if raw_resample:
                raw_df = raw_df.resample(raw_resample).mean()

            raw_options = raw_kwargs if raw_kwargs else {"alpha": 0.5, "label": "Raw Data"}
            plt.plot(raw_df.index, raw_df[self.var], **raw_options)

        plot_colmns(df, x_axis, colmns_kwargs)
        plot_figure(plot_params)