from typing import Union, List
import pandas as pd
[docs]
def fillna(df: pd.DataFrame, variable: str, fillna_window_size: int) -> pd.DataFrame:
"""
Fills NaN values in the time series data using a moving window average.
Parameters:
-----------
df: pd.DataFrame
The DataFrame containing the time series data to be filled indexed by datetime index.
variable: str
The variable/column in the DataFrame to be filled.
fillna_window_size: int
The size of the moving window [days] for filling NaN values. It is recommended to be an odd number.
Returns:
--------
pd.DataFrame
The DataFrame containing the filled time series data.
"""
df[variable] = df[variable].fillna(
df[variable]
.rolling(window=fillna_window_size, center=True, min_periods=1)
.mean()
)
return df
[docs]
def smooth(df: pd.DataFrame, variable: str, window_size: int) -> pd.DataFrame:
"""
Smooths the time series data using a moving window average.
Parameters:
-----------
df: pd.DataFrame
The DataFrame containing the time series data to be smoothed indexed by datetime index.
variable: str
The variable/column in the DataFrame to be smoothed.
window_size: int
The size of the moving window [days] for smoothing(. It is recommended to be an odd number.
Returns:
--------
pd.DataFrame
The DataFrame containing the smoothed time series data.
"""
df[variable] = (
df[variable].rolling(window=window_size, center=True, min_periods=1).mean()
)
return df
[docs]
def filter_df(
df: pd.DataFrame = None,
year: Union[int, None] = None,
month: Union[int, None] = None,
dekad: Union[int, None] = None,
bimonth: Union[int, None] = None,
day: Union[int, None] = None,
week: Union[int, None] = None,
start_date: Union[str, None] = None,
end_date: Union[str, None] = None,
) -> pd.DataFrame:
"""
Filters the DataFrame based on specified time/date conditions.
Parameters:
-----------
df: pd.DataFrame, optional
The DataFrame to be filtered. It should be indexed by a datetime index.
year: int or None, optional
The year to filter the DataFrame.
month: int or None, optional
The month to filter the DataFrame.
bimonth: int or None, optional
The bimonth to filter the DataFrame.
dekad: int or None, optional
The dekad to filter the DataFrame.
week: int or None, optional
The week to filter the DataFrame.
day: int or None, optional
The day to filter the DataFrame.
start_date: str or None, optional
The start date for filtering.
end_date: str or None, optional
The end date for filtering.
Returns:
--------
pd.DataFrame
The filtered DataFrame.
"""
if df is None:
print("No dataframe provided")
return pd.DataFrame()
if start_date and end_date:
df = df.truncate(before=start_date, after=end_date)
if year:
df = df[df.index.year == year]
if month:
df = df[df.index.month == month]
if bimonth:
if bimonth not in [1, 2]:
raise ValueError("Invalid bimonth value. Supported values: 1, 2")
if "bimonth" not in df.columns:
df["bimonth"] = df.index.map(lambda x: 1 if x.day <= 15 else 2)
df = df[df["bimonth"] == bimonth]
if dekad:
if dekad not in [1, 2, 3]:
raise ValueError("Invalid dekad value. Supported values: 1, 2, 3")
if "dekad" not in df.columns:
df["dekad"] = df.index.map(
lambda x: 1 if x.day <= 10 else 2 if x.day <= 20 else 3
)
df = df[df["dekad"] == dekad]
if week:
df = df[df.index.isocalendar().week == week]
if day:
df = df[df.index.day == day]
return df
[docs]
def monthly_agg(df: pd.DataFrame, variable: str, agg_metric="mean") -> pd.DataFrame:
"""
Aggregates the time series data based on month-based time step.
Parameters:
-----------
df: pd.DataFrame
The DataFrame containing the time series data to be aggregated indexed by datetime index.
variable: str
The variable/column in the DataFrame to be aggregated.
agg_metric: str
The aggregation metric to be used. Supported values: 'mean', 'median', 'min', 'max', etc.
Returns:
--------
pd.DataFrame
The DataFrame containing the aggregated data.
"""
return df.resample("ME")[variable].agg(agg_metric)
[docs]
def dekadal_agg(df: pd.DataFrame, variable: str, agg_metric="mean") -> pd.DataFrame:
"""
Aggregates the time series data based on dekad-based time step.
Parameters:
-----------
df: pd.DataFrame
The DataFrame containing the time series data to be aggregated indexed by datetime index.
variable: str
The variable/column in the DataFrame to be aggregated.
agg_metric: str
The aggregation metric to be used. Supported values: 'mean', 'median', 'min', 'max', etc.
Returns:
--------
pd.DataFrame
The DataFrame containing the aggregated data.
"""
df["dekad"] = df.index.map(lambda x: 1 if x.day <= 10 else 2 if x.day <= 20 else 3)
return (
df.groupby([df.index.year, df.index.month, "dekad"])[variable]
.transform(agg_metric)
.drop_duplicates()
)
[docs]
def weekly_agg(df: pd.DataFrame, variable: str, agg_metric="mean") -> pd.DataFrame:
"""
Aggregates the time series data based on week-based time step.
Parameters:
-----------
df: pd.DataFrame
The DataFrame containing the time series data to be aggregated indexed by datetime index.
variable: str
The variable/column in the DataFrame to be aggregated.
agg_metric: str
The aggregation metric to be used. Supported values: 'mean', 'median', 'min', 'max', etc.
Returns:
--------
pd.DataFrame
The DataFrame containing the aggregated data.
"""
return (
df.groupby([df.index.year, df.index.isocalendar().week])[variable]
.transform(agg_metric)
.drop_duplicates()
)
[docs]
def bimonthly_agg(df: pd.DataFrame, variable: str, agg_metric="mean") -> pd.DataFrame:
"""
Aggregates the time series data based on bimonth-based time step.
Parameters:
-----------
df: pd.DataFrame
The DataFrame containing the time series data to be aggregated indexed by datetime index.
variable: str
The variable/column in the DataFrame to be aggregated.
agg_metric: str
The aggregation metric to be used. Supported values: 'mean', 'median', 'min', 'max', etc.
Returns:
--------
pd.DataFrame
The DataFrame containing the aggregated data.
"""
if "bimonth" not in df.columns:
df["bimonth"] = df.index.map(lambda x: 1 if x.day <= 15 else 2)
return (
df.groupby([df.index.year, df.index.month, df["bimonth"]])[variable]
.transform(agg_metric)
.drop_duplicates()
)
[docs]
def clim_groupping(df: pd.DataFrame, time_step: str) -> list:
"""
Groups the DataFrame based on the provided time step for climatology computation.
parameters:
-----------
df: pd.DataFrame
The DataFrame to be grouped.
returns:
--------
list
The list of date parameters to be used for grouping.
"""
if time_step == "month":
return [df.index.month]
elif time_step == "week":
return [df.index.isocalendar().week]
elif time_step == "dekad":
if "dekad" not in df.columns:
df["dekad"] = df.index.map(
lambda x: 1 if x.day <= 10 else 2 if x.day <= 20 else 3
)
return [df["dekad"], df.index.month]
elif time_step == "bimonth":
if "bimonth" not in df.columns:
df["bimonth"] = df.index.map(lambda x: 1 if x.day <= 15 else 2)
return [df["bimonth"], df.index.month]
elif time_step == "day":
return [df.index.day, df.index.month]
[docs]
def compute_clim(
df: pd.DataFrame, time_step: str, variable: str, metrics: List[str]
) -> pd.DataFrame:
"""
Computes the climatology of the time series data based on the provided time step.
Parameters:
-----------
df: pd.DataFrame
The DataFrame containing the time series data to be aggregated indexed by datetime index.
time_step: str
The time step to be used for computing the climatology. Supported values: 'month', 'week', 'dekad', 'bimonth', 'day'
variable: str
The variable/column in the DataFrame to be aggregated.
metrics: List[str]
The metrics to be computed. Supported values: 'mean', 'median', 'min', 'max', etc.
Returns:
--------
pd.DataFrame
The DataFrame containing the climatology data.
"""
for metric in metrics:
df["norm-" + metric] = df.groupby(clim_groupping(df, time_step))[
variable
].transform(metric)
return df