"""
Extreme values analysis for statistical data
This module provides tools for analyzing extreme values in time series data,
including return period and return value analysis.
"""
import numpy as np
import pandas as pd
import warnings
from typing import Union, Optional, Dict, Any, Tuple
from scipy import stats
from .data_processor import DataProcessor
[docs]
class ExtremesAnalyzer:
"""
Extreme values analysis with return period and return value calculations.
This class analyzes extreme values in time series data, calculating return
periods and return values using statistical distributions fitted to the data.
The analyzer supports multiple input formats:
- Pandas Series with datetime index
- Pandas DataFrame with time column
- Paired numpy arrays (times, values)
- Simple numpy array (assumes uniform time spacing)
Parameters
----------
data_processor : DataProcessor
Processor instance with loaded data
times : array-like, optional
Time values corresponding to data points. Can be:
- datetime array/Series
- numeric array (e.g., years, days)
- None if data is pandas Series with datetime index
time_unit : str, default='years'
Time unit for return period calculations ('years', 'days', 'hours', 'months')
Examples
--------
>>> import numpy as np
>>> import pandas as pd
>>> import magica as ma
>>>
>>> # Using pandas Series with datetime index
>>> dates = pd.date_range('2000-01-01', periods=1000, freq='D')
>>> values = np.random.weibull(2, 1000) * 10
>>> series = pd.Series(values, index=dates)
>>>
>>> processor = ma.read_data(series)
>>> extremes = processor.get_extremes_analyzer()
>>> extremes.fit_distribution('genextreme')
>>>
>>> # Calculate 100-year return value
>>> rv_100 = extremes.return_value(100)
>>> print(f"100-year return value: {rv_100:.2f}")
"""
[docs]
def __init__(
self,
data_processor: DataProcessor,
times: Optional[Union[np.ndarray, pd.Series, pd.DatetimeIndex]] = None,
time_unit: str = 'years'
):
"""
Initialize ExtremesAnalyzer with data and time information.
Parameters
----------
data_processor : DataProcessor
The data processor containing values to analyze
times : array-like, optional
Time values or datetime index
time_unit : str, default='years'
Unit for return period calculations
"""
if data_processor.data is None:
raise ValueError("DataProcessor must contain data before extreme analysis")
self.data_processor = data_processor
self.data = data_processor.get_data_array()
self.time_unit = time_unit
# Handle different time input formats
self._process_times(times)
# Internal adjuster for distribution fitting
self._adjuster = None
self.distribution_name = None
self.fitted_params = None
def _process_times(self, times: Optional[Union[np.ndarray, pd.Series, pd.DatetimeIndex]]):
"""
Process time information from various input formats.
Parameters
----------
times : array-like or None
Time information in various formats
"""
if times is None:
# Check if original data was pandas Series with datetime index
if hasattr(self.data_processor, '_original_data'):
original = self.data_processor._original_data
if isinstance(original, pd.Series) and isinstance(original.index, pd.DatetimeIndex):
times = original.index
if times is None:
# No time information - assume uniform spacing
warnings.warn(
"No time information provided. Assuming uniform time spacing. "
"Return periods will be in units of observation count."
)
self.times = np.arange(len(self.data))
self.has_datetime = False
self.time_span = len(self.data)
elif isinstance(times, pd.DatetimeIndex):
self.times = times
self.has_datetime = True
self.time_span = self._calculate_time_span(times)
elif isinstance(times, pd.Series):
if pd.api.types.is_datetime64_any_dtype(times):
self.times = pd.DatetimeIndex(times)
self.has_datetime = True
self.time_span = self._calculate_time_span(self.times)
else:
self.times = times.values
self.has_datetime = False
self.time_span = np.ptp(self.times) # max - min
else:
# Numpy array or list
times_array = np.array(times)
if np.issubdtype(times_array.dtype, np.datetime64):
self.times = pd.DatetimeIndex(times_array)
self.has_datetime = True
self.time_span = self._calculate_time_span(self.times)
else:
self.times = times_array
self.has_datetime = False
self.time_span = np.ptp(self.times) if len(times_array) > 1 else len(times_array)
def _calculate_time_span(self, times: pd.DatetimeIndex) -> float:
"""
Calculate time span in specified units from datetime index.
Parameters
----------
times : pd.DatetimeIndex
Datetime index
Returns
-------
float
Time span in specified units
"""
delta = times[-1] - times[0]
if self.time_unit == 'years':
return delta.total_seconds() / (365.25 * 24 * 3600)
elif self.time_unit == 'days':
return delta.total_seconds() / (24 * 3600)
elif self.time_unit == 'hours':
return delta.total_seconds() / 3600
elif self.time_unit == 'months':
return delta.total_seconds() / (30.44 * 24 * 3600) # Average month
else:
raise ValueError(f"Unknown time unit: {self.time_unit}")
def _get_adjuster(self):
"""Get or create the internal adjuster."""
if self._adjuster is None:
from .magic_adjuster import MagicAdjuster
self._adjuster = MagicAdjuster(self.data_processor)
return self._adjuster
[docs]
def fit_distribution(self, distribution: Union[str, object], **kwargs) -> 'ExtremesAnalyzer':
"""
Fit a statistical distribution for extreme value analysis.
This method can be called directly on ExtremesAnalyzer to fit
distributions to the extreme data, without requiring a prior
fit on the underlying DataProcessor.
Common distributions for extremes:
- 'genextreme' - Generalized Extreme Value (GEV)
- 'gumbel_r' - Gumbel distribution (Type I extreme)
- 'gumbel_l' - Gumbel left (minimum extremes)
- 'weibull_min' - Weibull (minimum extremes)
- 'weibull_max' - Weibull (maximum extremes)
- 'genpareto' - Generalized Pareto Distribution (GPD, for POT)
Parameters
----------
distribution : str or scipy.stats distribution
Distribution to fit
**kwargs : dict
Additional arguments passed to fit method
Returns
-------
ExtremesAnalyzer
Self for method chaining
Examples
--------
>>> # Direct fitting on extremes analyzer
>>> extremes = processor.get_extremes_analyzer()
>>> annual_max, times = extremes.extract_block_maxima('Y')
>>>
>>> # Create new processor with maxima and fit
>>> maxima_proc = ma.read_data(annual_max)
>>> maxima_extremes = maxima_proc.get_extremes_analyzer()
>>> maxima_extremes.fit_distribution('genextreme')
>>> rv_100 = maxima_extremes.return_value(100)
"""
adjuster = self._get_adjuster()
adjuster.fit_distribution(distribution, **kwargs)
self.distribution_name = adjuster.distribution_name
self.fitted_params = adjuster.fitted_params
return self
[docs]
def return_value(self, return_period: Union[float, np.ndarray]) -> Union[float, np.ndarray]:
"""
Calculate return value for given return period(s).
The return value is the value expected to be exceeded once every
T time units on average, where T is the return period.
Parameters
----------
return_period : float or array-like
Return period(s) in time_unit units (e.g., years)
Returns
-------
float or ndarray
Return value(s) corresponding to the return period(s)
Examples
--------
>>> # Single return value
>>> rv_100 = extremes.return_value(100) # 100-year return value
>>>
>>> # Multiple return values
>>> periods = [10, 50, 100, 500]
>>> rv = extremes.return_value(periods)
"""
if self._adjuster is None or self.fitted_params is None:
raise ValueError(
"Must fit a distribution before calculating return values. "
"Use extremes.fit_distribution('genextreme') or similar first."
)
# Convert return period to exceedance probability
# P(X > x) = 1/T => P(X <= x) = 1 - 1/T
return_period = np.asarray(return_period)
exceedance_prob = 1.0 / return_period
non_exceedance_prob = 1.0 - exceedance_prob
# Calculate quantile (return value) for non-exceedance probability
return_values = self._adjuster.ppf(non_exceedance_prob)
return return_values
[docs]
def return_period(self, value: Union[float, np.ndarray]) -> Union[float, np.ndarray]:
"""
Calculate return period for given value(s).
The return period is the average time interval between exceedances
of the given value.
Parameters
----------
value : float or array-like
Value(s) for which to calculate return period
Returns
-------
float or ndarray
Return period(s) in time_unit units
Examples
--------
>>> # Return period for specific value
>>> rp = extremes.return_period(25.0)
>>> print(f"A value of 25 has a return period of {rp:.1f} years")
>>>
>>> # Return periods for multiple values
>>> values = [20, 25, 30, 35]
>>> rp = extremes.return_period(values)
"""
if self._adjuster is None or self.fitted_params is None:
raise ValueError(
"Must fit a distribution before calculating return periods. "
"Use extremes.fit_distribution('genextreme') or similar first."
)
# Calculate exceedance probability: P(X > value)
value = np.asarray(value)
exceedance_prob = 1.0 - self._adjuster.cdf(value)
# Return period T = 1 / P(exceedance)
# Avoid division by zero
return_periods = np.where(
exceedance_prob > 0,
1.0 / exceedance_prob,
np.inf
)
return return_periods
[docs]
def ppf(self, q: Union[float, np.ndarray]) -> Union[float, np.ndarray]:
"""
Percent point function (inverse of CDF) of the fitted distribution.
Parameters
----------
q : float or array-like
Probability value(s) between 0 and 1
Returns
-------
float or ndarray
Quantile(s) corresponding to the probability value(s)
"""
if self._adjuster is None or self.fitted_params is None:
raise ValueError(
"Must fit a distribution before using ppf. "
"Use extremes.fit_distribution('genextreme') or similar first."
)
return self._adjuster.ppf(q)
[docs]
def cdf(self, x: Union[float, np.ndarray]) -> Union[float, np.ndarray]:
"""
Cumulative distribution function of the fitted distribution.
Parameters
----------
x : float or array-like
Value(s) at which to evaluate the CDF
Returns
-------
float or ndarray
Probability value(s) corresponding to x
"""
if self._adjuster is None or self.fitted_params is None:
raise ValueError(
"Must fit a distribution before using cdf. "
"Use extremes.fit_distribution('genextreme') or similar first."
)
return self._adjuster.cdf(x)
[docs]
def pdf(self, x: Union[float, np.ndarray]) -> Union[float, np.ndarray]:
"""
Probability density function of the fitted distribution.
Parameters
----------
x : float or array-like
Value(s) at which to evaluate the PDF
Returns
-------
float or ndarray
Probability density value(s) corresponding to x
"""
if self._adjuster is None or self.fitted_params is None:
raise ValueError(
"Must fit a distribution before using pdf. "
"Use extremes.fit_distribution('genextreme') or similar first."
)
return self._adjuster.pdf(x)
[docs]
def goodness_of_fit(self, method: str, **kwargs):
"""
Perform goodness-of-fit test for the fitted distribution.
Parameters
----------
method : str
Test method: 'ks' (Kolmogorov-Smirnov), 'chi2' (Chi-square),
'ad' (Anderson-Darling), or 'rmse' (Root Mean Square Error)
**kwargs : dict
Additional arguments passed to the goodness-of-fit method
Returns
-------
dict or float
Test results (dict for statistical tests, float for RMSE)
Examples
--------
>>> extremes.fit_distribution('genextreme')
>>> ks_test = extremes.goodness_of_fit('ks')
>>> print(f"KS statistic: {ks_test['ks_statistic']:.4f}")
>>> print(f"P-value: {ks_test['p_value']:.4f}")
"""
if self._adjuster is None or self.fitted_params is None:
raise ValueError(
"Must fit a distribution before performing goodness-of-fit test. "
"Use extremes.fit_distribution('genextreme') or similar first."
)
return self._adjuster.goodness_of_fit(method, **kwargs)
[docs]
def peaks_over_threshold(
self,
threshold: float,
min_separation: Optional[Union[str, pd.Timedelta, int, float]] = None,
event_wise: bool = False
) -> Tuple[np.ndarray, Optional[Union[pd.DatetimeIndex, np.ndarray]]]:
"""
Extract peaks over threshold (POT) from time series.
This method is used for GPD (Generalized Pareto Distribution) analysis.
Parameters
----------
threshold : float
Threshold value for peak detection
min_separation : str, Timedelta, int, or float, optional
Minimum time separation between peaks to avoid clustering.
Can be:
- String: '1D', '12H', etc. (pandas time string)
- pd.Timedelta: pd.Timedelta(days=1)
- Numeric: interpreted as days (e.g., 3 means 3 days)
- 'event': Use event-wise declustering (requires event_wise=True)
If None, all exceedances are returned.
event_wise : bool, default=False
If True, identifies consecutive periods above threshold as single events
and returns only the maximum value from each event. This is useful for
identifying storm events where multiple consecutive days exceed the threshold.
When True, min_separation is ignored.
Returns
-------
exceedances : ndarray
Values exceeding the threshold
times : DatetimeIndex or ndarray or None
Times of exceedances
Examples
--------
>>> # Extract all peaks over threshold
>>> peaks, times = extremes.peaks_over_threshold(threshold=20.0)
>>>
>>> # Extract peaks with minimum 1-day separation (numeric)
>>> peaks, times = extremes.peaks_over_threshold(threshold=20.0, min_separation=1)
>>>
>>> # Extract peaks with minimum 1-day separation (string)
>>> peaks, times = extremes.peaks_over_threshold(
... threshold=20.0,
... min_separation='1D'
... )
>>>
>>> # Extract one peak per consecutive event (storm-wise)
>>> peaks, times = extremes.peaks_over_threshold(threshold=20.0, event_wise=True)
"""
# Find values exceeding threshold
exceed_mask = self.data > threshold
if not self.has_datetime:
exceedances = self.data[exceed_mask]
exceed_times = None
if min_separation is not None or event_wise:
warnings.warn(
"min_separation and event_wise require datetime information. "
"Returning all exceedances without declustering."
)
return exceedances, exceed_times
# Event-wise declustering: identify consecutive periods as single events
if event_wise:
# Create pandas Series for easier manipulation
series = pd.Series(self.data, index=self.times)
# Identify groups of consecutive exceedances
# Create a group ID that increments when there's a gap
exceed_series = series[exceed_mask]
if len(exceed_series) == 0:
return np.array([]), pd.DatetimeIndex([])
# Calculate time differences between consecutive exceedances
time_diffs = exceed_series.index.to_series().diff()
# Expected time step (most common difference in original series)
original_diffs = pd.Series(self.times).diff().dropna()
expected_step = original_diffs.mode()[0] if len(original_diffs) > 0 else pd.Timedelta(days=1)
# A new event starts when the gap is larger than expected step
# (i.e., there's at least one day below threshold)
is_new_event = time_diffs > expected_step
event_ids = is_new_event.cumsum()
# For each event, get the maximum value and its time
event_maxima = []
event_times = []
for event_id in event_ids.unique():
event_data = exceed_series[event_ids == event_id]
max_idx = event_data.idxmax()
event_maxima.append(event_data[max_idx])
event_times.append(max_idx)
return np.array(event_maxima), pd.DatetimeIndex(event_times)
# Regular declustering with min_separation
exceedances = self.data[exceed_mask]
exceed_times = self.times[exceed_mask]
if min_separation is None:
return exceedances, exceed_times
# Convert min_separation to Timedelta
if isinstance(min_separation, str):
min_separation = pd.Timedelta(min_separation)
elif isinstance(min_separation, (int, float)):
# Interpret numeric values as days
min_separation = pd.Timedelta(days=min_separation)
# Decluster algorithm
keep_indices = [0] # Always keep first peak
for i in range(1, len(exceed_times)):
time_diff = exceed_times[i] - exceed_times[keep_indices[-1]]
if time_diff >= min_separation:
keep_indices.append(i)
declustered_values = exceedances[keep_indices]
declustered_times = exceed_times[keep_indices]
return declustered_values, declustered_times
[docs]
def get_summary_statistics(self) -> Dict[str, Any]:
"""
Get summary statistics for extreme value analysis.
Returns
-------
dict
Dictionary with summary statistics including:
- n_observations: Number of data points
- time_span: Total time span in time_unit units
- time_unit: Time unit used for calculations
- max: Maximum value in dataset
- min: Minimum value in dataset
- mean: Mean value
- std: Standard deviation
- percentile_95: 95th percentile
- percentile_99: 99th percentile
- has_datetime: Whether datetime information is available
- distribution_name: Fitted distribution name (if fitted)
- distribution_params: Fitted parameters (if fitted)
"""
summary = {
'n_observations': len(self.data),
'time_span': self.time_span,
'time_unit': self.time_unit,
'max': float(np.max(self.data)),
'min': float(np.min(self.data)),
'mean': float(np.mean(self.data)),
'std': float(np.std(self.data)),
'percentile_95': float(np.percentile(self.data, 95)),
'percentile_99': float(np.percentile(self.data, 99)),
'has_datetime': self.has_datetime,
'distribution_name': self.distribution_name
}
if self.has_datetime:
summary['start_date'] = str(self.times[0])
summary['end_date'] = str(self.times[-1])
if self.fitted_params is not None:
summary['distribution_params'] = self.fitted_params
return summary
[docs]
def plot_return_levels(
self,
ax=None,
return_periods: Optional[np.ndarray] = None,
empirical: bool = True,
confidence_level: Optional[float] = None,
title: Optional[str] = None
):
"""
Plot return level plot (return value vs return period).
Parameters
----------
ax : matplotlib.axes.Axes, optional
Axes object to plot on. If None, creates a new figure and axes.
return_periods : array-like, optional
Return periods to plot. If None, uses logarithmic spacing.
empirical : bool, default=True
Whether to include empirical return values
confidence_level : float, optional
Confidence level for confidence intervals (e.g., 0.95)
title : str, optional
Plot title. If None, uses default title.
Returns
-------
fig : matplotlib.figure.Figure
Figure object (None if ax was provided)
ax : matplotlib.axes.Axes
Axes object
Examples
--------
>>> # Create standalone plot
>>> fig, ax = extremes.plot_return_levels()
>>>
>>> # Plot on existing axes
>>> fig, ax = plt.subplots()
>>> extremes.plot_return_levels(ax=ax, title='Custom Title')
"""
import matplotlib.pyplot as plt
if self._adjuster is None or self.fitted_params is None:
raise ValueError("Must fit a distribution before plotting return levels")
# Create figure if ax not provided
if ax is None:
fig, ax = plt.subplots(figsize=(10, 6))
else:
fig = None
# Default return periods
if return_periods is None:
return_periods = np.logspace(0, 3, 50) # 1 to 1000
# Calculate theoretical return values
theoretical_rv = self.return_value(return_periods)
# Plot theoretical curve
ax.plot(return_periods, theoretical_rv, 'r-', linewidth=2,
label=f'Theoretical ({self.distribution_name})')
# Plot empirical points if requested
if empirical:
# Sort data and calculate empirical return periods
sorted_data = np.sort(self.data)[::-1] # Descending order
n = len(sorted_data)
empirical_rp = (n + 1) / np.arange(1, n + 1)
ax.plot(empirical_rp, sorted_data, 'bo', alpha=0.6,
markersize=4, label='Empirical')
ax.set_xlabel(f'Return Period ({self.time_unit})')
ax.set_ylabel('Return Value')
ax.set_title(title if title else 'Return Level Plot')
ax.set_xscale('log')
ax.grid(True, alpha=0.3, which='both')
ax.legend()
if fig is not None:
plt.tight_layout()
return fig, ax
[docs]
def find_optimal_pot_threshold(
self,
min_samples: int = 50,
percentile_min: float = 90,
percentile_max: float = 99,
percentile_step: float = 1.0,
min_separation_hours: float = 48,
max_separation_hours: float = 120,
separation_step_hours: float = 24,
vary_first: str = 'percentile',
max_iterations: int = 200,
verbose: bool = False
) -> Dict[str, Any]:
"""
Find optimal POT threshold ensuring sufficient independent samples.
This function systematically searches for a threshold that provides
at least `min_samples` independent exceedances after declustering.
The search can prioritize varying percentiles or time separation first.
Parameters
----------
min_samples : int, default=50
Minimum number of independent exceedances required
percentile_min : float, default=90
Minimum percentile to test (lower bound)
percentile_max : float, default=99
Maximum percentile to test (upper bound, starting point)
percentile_step : float, default=1.0
Step size for decreasing percentile
min_separation_hours : float, default=48
Minimum time separation between peaks in hours (starting point)
max_separation_hours : float, default=120
Maximum time separation to test in hours
separation_step_hours : float, default=24
Step size for increasing separation in hours
vary_first : str, default='percentile'
Which parameter to vary first: 'percentile' or 'separation'
- 'percentile': Varies percentile from max to min, then increases separation
- 'separation': Varies separation from min to max, then decreases percentile
max_iterations : int, default=200
Maximum total search iterations to prevent infinite loops
verbose : bool, default=False
Print progress information during search
Returns
-------
dict
Dictionary with keys:
- 'threshold': Selected threshold value (float)
- 'percentile': Percentile of threshold (float)
- 'separation_hours': Time separation used in hours (float)
- 'n_raw_exceedances': Total exceedances before declustering (int)
- 'n_independent': Independent exceedances after declustering (int)
- 'exceedances': Array of independent exceedance values (ndarray)
- 'exceedance_times': Times of independent exceedances (DatetimeIndex or ndarray)
- 'success': Whether minimum samples achieved (bool)
- 'iterations': Number of iterations performed (int)
- 'warning': Warning message if any (str, optional)
Examples
--------
>>> # Default search (vary percentile first, 99->90, then increase separation)
>>> result = extremes.find_optimal_pot_threshold(min_samples=50)
>>> print(f"Threshold: {result['threshold']:.2f}")
>>> print(f"Independent samples: {result['n_independent']}")
>>>
>>> # Vary separation first, then percentile
>>> result = extremes.find_optimal_pot_threshold(
... min_samples=50,
... vary_first='separation'
... )
>>>
>>> # Custom percentile and separation ranges
>>> result = extremes.find_optimal_pot_threshold(
... min_samples=30,
... percentile_min=85,
... percentile_max=98,
... min_separation_hours=24,
... max_separation_hours=96,
... verbose=True
... )
Notes
-----
**Search Strategy:**
When `vary_first='percentile'` (default):
1. Start with min_separation_hours and percentile_max
2. Decrease percentile by percentile_step until percentile_min
3. If not enough samples, increase separation by separation_step_hours
4. Repeat percentile search with new separation
5. Continue until max_separation_hours or min_samples achieved
When `vary_first='separation'`:
1. Start with percentile_max and min_separation_hours
2. Increase separation by separation_step_hours until max_separation_hours
3. If not enough samples, decrease percentile by percentile_step
4. Repeat separation search with new percentile
5. Continue until percentile_min or min_samples achieved
**Best Practices:**
- For synoptic events (storms): min_separation_hours >= 48
- For sub-daily events: min_separation_hours >= 12
- Typical percentile range: 90-99 for extremes
- Ensure min_samples >= 30 for reliable fitting (50+ recommended)
"""
if not self.has_datetime:
raise ValueError(
"POT threshold search requires datetime information. "
"Provide times when creating ExtremesAnalyzer."
)
if vary_first not in ['percentile', 'separation']:
raise ValueError("vary_first must be 'percentile' or 'separation'")
# Validate ranges
if percentile_min >= percentile_max:
raise ValueError("percentile_min must be < percentile_max")
if min_separation_hours >= max_separation_hours:
raise ValueError("min_separation_hours must be < max_separation_hours")
# Initialize search variables
iteration = 0
best_result = None
if vary_first == 'percentile':
# Strategy: Vary percentile first, then increase separation
separations = np.arange(min_separation_hours, max_separation_hours + 1,
separation_step_hours)
for separation_hours in separations:
if verbose:
print(f"\n--- Testing separation: {separation_hours}h ---")
# Test percentiles from max to min
current_percentile = percentile_max
while current_percentile >= percentile_min:
iteration += 1
if iteration > max_iterations:
break
# Calculate threshold
threshold = np.percentile(self.data, current_percentile)
# Count raw exceedances
n_raw = (self.data > threshold).sum()
if n_raw == 0:
current_percentile -= percentile_step
continue
try:
# Extract POT with current separation
sep_days = separation_hours / 24
exceedances, exc_times = self.peaks_over_threshold(
threshold=threshold,
min_separation=sep_days
)
n_independent = len(exceedances)
# Store result
result = {
'threshold': threshold,
'percentile': current_percentile,
'separation_hours': separation_hours,
'n_raw_exceedances': n_raw,
'n_independent': n_independent,
'exceedances': exceedances,
'exceedance_times': exc_times,
'success': n_independent >= min_samples,
'iterations': iteration
}
# Update best result
if best_result is None or n_independent > best_result['n_independent']:
best_result = result.copy()
if verbose:
status = "✓" if result['success'] else "•"
print(f"{status} p={current_percentile:.1f}, "
f"thresh={threshold:.2f}, n={n_independent}")
# Check if we found enough samples
if n_independent >= min_samples:
return result
except Exception as e:
if verbose:
print(f"✗ p={current_percentile:.1f}: {str(e)}")
# Decrease percentile
current_percentile -= percentile_step
if iteration > max_iterations:
break
else: # vary_first == 'separation'
# Strategy: Vary separation first, then decrease percentile
percentiles = np.arange(percentile_max, percentile_min - percentile_step,
-percentile_step)
for percentile in percentiles:
if verbose:
print(f"\n--- Testing percentile: {percentile:.1f} ---")
# Test separations from min to max
current_separation = min_separation_hours
while current_separation <= max_separation_hours:
iteration += 1
if iteration > max_iterations:
break
# Calculate threshold
threshold = np.percentile(self.data, percentile)
# Count raw exceedances
n_raw = (self.data > threshold).sum()
if n_raw == 0:
break # No exceedances at this percentile
try:
# Extract POT with current separation
sep_days = current_separation / 24
exceedances, exc_times = self.peaks_over_threshold(
threshold=threshold,
min_separation=sep_days
)
n_independent = len(exceedances)
# Store result
result = {
'threshold': threshold,
'percentile': percentile,
'separation_hours': current_separation,
'n_raw_exceedances': n_raw,
'n_independent': n_independent,
'exceedances': exceedances,
'exceedance_times': exc_times,
'success': n_independent >= min_samples,
'iterations': iteration
}
# Update best result
if best_result is None or n_independent > best_result['n_independent']:
best_result = result.copy()
if verbose:
status = "✓" if result['success'] else "•"
print(f"{status} sep={current_separation}h, "
f"thresh={threshold:.2f}, n={n_independent}")
# Check if we found enough samples
if n_independent >= min_samples:
return result
except Exception as e:
if verbose:
print(f"✗ sep={current_separation}h: {str(e)}")
# Increase separation
current_separation += separation_step_hours
if iteration > max_iterations:
break
# No solution found meeting min_samples requirement
if best_result is None:
# Return empty result
return {
'threshold': np.nan,
'percentile': np.nan,
'separation_hours': np.nan,
'n_raw_exceedances': 0,
'n_independent': 0,
'exceedances': np.array([]),
'exceedance_times': pd.DatetimeIndex([]) if self.has_datetime else None,
'success': False,
'iterations': iteration,
'warning': 'No exceedances found in search range'
}
# Return best effort result
best_result['warning'] = (
f"Could not find threshold with {min_samples} samples. "
f"Best result: {best_result['n_independent']} samples. "
f"Consider relaxing constraints."
)
if verbose:
print(f"\n⚠️ {best_result['warning']}")
return best_result
[docs]
@staticmethod
def plot_directional_return_values(
directional_results: Dict[str, Dict[str, Any]],
return_periods: list = None,
colors: list = None,
overlay: bool = False,
show_values: bool = True,
figsize: tuple = None,
title: str = None
) -> tuple:
"""
Create polar plots of return values by direction.
This method creates polar plots showing how return values vary by wind
direction for different return periods. Can create separate subplots for
each return period or overlay all periods in a single plot.
Parameters
----------
directional_results : dict
Dictionary with sector names as keys and result dictionaries as values.
Each result dictionary must contain:
- 'center_deg': Center angle of sector in degrees
- 'return_values': Dictionary mapping return periods to values
- 'success': Boolean indicating if fit was successful
return_periods : list of int, optional
Return periods to plot (max 4). If None, uses [10, 20, 50, 100].
Examples: [10, 50], [1, 5, 10, 20], [100]
colors : list of str, optional
Colors for each return period. If None, uses default palette.
Must have same length as return_periods.
overlay : bool, default=False
If True, plot all return periods on single polar plot.
If False, create separate subplot for each return period.
show_values : bool, default=True
If True, display numeric values on the plot.
figsize : tuple, optional
Figure size (width, height). If None, automatically determined.
title : str, optional
Main figure title. If None, uses default title.
Returns
-------
fig : matplotlib.figure.Figure
The figure object
axes : matplotlib.axes.Axes or array of Axes
The axes object(s)
Raises
------
ValueError
If return_periods has more than 4 values
If colors length doesn't match return_periods length
If directional_results is empty or invalid
Examples
--------
>>> # Assuming you have fitted distributions for each sector
>>> directional_results = {}
>>> for sector in sectors:
... # ... fit distribution and calculate return values ...
... directional_results[sector] = {
... 'center_deg': center_angle,
... 'return_values': {10: rv10, 50: rv50, 100: rv100},
... 'success': True
... }
>>>
>>> # Create separate subplots for each return period
>>> fig, axes = extremes.plot_directional_return_values(
... directional_results,
... return_periods=[10, 50, 100]
... )
>>>
>>> # Create single overlay plot
>>> fig, ax = extremes.plot_directional_return_values(
... directional_results,
... return_periods=[10, 50, 100],
... overlay=True
... )
>>>
>>> # Customize colors and appearance
>>> fig, axes = extremes.plot_directional_return_values(
... directional_results,
... return_periods=[20, 100],
... colors=['#FF6B6B', '#4ECDC4'],
... figsize=(12, 6)
... )
>>> plt.show()
Notes
-----
- Maximum 4 return periods can be plotted
- If return_periods <= 2 and not overlay, creates single row of subplots
- If return_periods == 1 and not overlay, creates single subplot
- In overlay mode, all periods shown on one polar plot with legend
- Automatically handles missing/failed sectors
"""
import matplotlib.pyplot as plt
# Validate inputs
if not directional_results:
raise ValueError("directional_results cannot be empty")
# Default return periods
if return_periods is None:
return_periods = [10, 20, 50, 100]
# Validate return periods
if len(return_periods) > 4:
raise ValueError("Maximum 4 return periods can be plotted")
if len(return_periods) == 0:
raise ValueError("At least one return period must be specified")
# Default colors
if colors is None:
default_colors = ['#2E86AB', '#A23B72', '#F18F01', '#C73E1D']
colors = default_colors[:len(return_periods)]
elif len(colors) != len(return_periods):
raise ValueError("Number of colors must match number of return periods")
# Extract sector information and sort by angle (not alphabetically)
# This ensures the polar plot forms a proper circle
sector_data = []
for sector_name, result in directional_results.items():
if 'center_deg' in result:
sector_data.append((sector_name, result['center_deg']))
# Sort by angle to get proper circular order
sector_data.sort(key=lambda x: x[1])
sector_names = [name for name, _ in sector_data]
if not sector_names:
raise ValueError("No sectors found in directional_results")
# Determine subplot layout
n_periods = len(return_periods)
if overlay:
# Single plot with all periods overlaid
if figsize is None:
figsize = (10, 10)
fig = plt.figure(figsize=figsize)
ax = fig.add_subplot(111, projection='polar')
axes = ax
# Plot each return period
for i, rp in enumerate(return_periods):
angles = []
values = []
for sector_name in sector_names:
result = directional_results[sector_name]
if result.get('success', False) and 'return_values' in result:
rv_dict = result['return_values']
if rp in rv_dict and not np.isnan(rv_dict[rp]):
angle_center = result['center_deg']
angles.append(np.deg2rad(angle_center))
values.append(rv_dict[rp])
if angles:
# Close the plot
angles.append(angles[0])
values.append(values[0])
# Plot line
ax.plot(angles, values, 'o-', linewidth=2.5, markersize=8,
color=colors[i], label=f'{rp}-year', alpha=0.8)
ax.fill(angles, values, alpha=0.15, color=colors[i])
# Format
ax.set_theta_zero_location('N')
ax.set_theta_direction(-1)
if title is None:
title = 'Return Values by Direction'
ax.set_title(title, fontsize=14, fontweight='bold', pad=20)
ax.legend(loc='upper right', bbox_to_anchor=(1.3, 1.0), fontsize=10)
ax.grid(True, alpha=0.3)
# Add value labels if requested
if show_values:
for i, rp in enumerate(return_periods):
angles = []
values = []
for sector_name in sector_names:
result = directional_results[sector_name]
if result.get('success', False) and 'return_values' in result:
rv_dict = result['return_values']
if rp in rv_dict and not np.isnan(rv_dict[rp]):
angle_center = result['center_deg']
angles.append(np.deg2rad(angle_center))
values.append(rv_dict[rp])
if angles and i == 0: # Only show values for first period to avoid clutter
# Create text box per sector with all values
for j, (angle, _) in enumerate(zip(angles, values)):
sector_name = sector_names[j]
result = directional_results[sector_name]
rv_dict = result.get('return_values', {})
# Build text with all return periods
text_lines = []
for rp_val in return_periods:
if rp_val in rv_dict and not np.isnan(rv_dict[rp_val]):
text_lines.append(f'{rp_val}yr: {rv_dict[rp_val]:.1f}')
if text_lines:
text = '\n'.join(text_lines)
# Position text slightly outside the plot
max_val = max([v for v in values if not np.isnan(v)])
r_pos = max_val * 1.15
ax.text(angle, r_pos, text, ha='center', va='center',
fontsize=7, bbox=dict(boxstyle='round,pad=0.3',
facecolor='white', alpha=0.8, edgecolor='gray'))
else:
# Separate subplots for each return period
if n_periods == 1:
# Single subplot
if figsize is None:
figsize = (8, 8)
fig = plt.figure(figsize=figsize)
axes = [fig.add_subplot(111, projection='polar')]
nrows, ncols = 1, 1
elif n_periods == 2:
# Single row
if figsize is None:
figsize = (14, 6)
fig, axes = plt.subplots(1, 2, figsize=figsize,
subplot_kw=dict(projection='polar'))
axes = axes.flatten()
else:
# Grid layout (2 rows)
if figsize is None:
figsize = (14, 10)
nrows = 2
ncols = 2
fig, axes = plt.subplots(nrows, ncols, figsize=figsize,
subplot_kw=dict(projection='polar'))
axes = axes.flatten()
# Plot each return period
for i, rp in enumerate(return_periods):
ax = axes[i]
# Get return values for this period
angles = []
values = []
for sector_name in sector_names:
result = directional_results[sector_name]
if result.get('success', False) and 'return_values' in result:
rv_dict = result['return_values']
if rp in rv_dict and not np.isnan(rv_dict[rp]):
angle_center = result['center_deg']
angles.append(np.deg2rad(angle_center))
values.append(rv_dict[rp])
if not angles:
ax.text(0.5, 0.5, 'No data available',
ha='center', va='center', transform=ax.transAxes)
continue
# Close the plot
angles.append(angles[0])
values.append(values[0])
# Plot
ax.plot(angles, values, 'o-', linewidth=2.5, markersize=8,
color=colors[i], alpha=0.8)
ax.fill(angles, values, alpha=0.25, color=colors[i])
# Format
ax.set_theta_zero_location('N')
ax.set_theta_direction(-1)
ax.set_title(f'{rp}-Year Return Value', fontsize=12,
fontweight='bold', pad=20)
ax.set_xticks(np.deg2rad(np.arange(0, 360, 45)))
ax.set_xticklabels(['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW'])
ax.grid(True, alpha=0.3)
# Add value labels if requested
if show_values:
for angle, value in zip(angles[:-1], values[:-1]):
if not np.isnan(value):
ax.text(angle, value * 1.1, f'{value:.1f}',
ha='center', va='center', fontsize=8,
bbox=dict(boxstyle='round,pad=0.3',
facecolor='white', alpha=0.8))
# Set main title
if title is None:
title = 'Return Values by Direction (m/s)'
fig.suptitle(title, fontsize=14, fontweight='bold', y=0.98)
plt.tight_layout()
return fig, axes
[docs]
def __repr__(self) -> str:
"""String representation of the analyzer."""
dist_info = f", distribution={self.distribution_name}" if self.distribution_name else ""
time_info = f", time_span={self.time_span:.1f} {self.time_unit}"
return f"ExtremesAnalyzer(n_points={len(self.data)}{time_info}{dist_info})"