Source code for magica.core.magic_adjuster

"""
Statistical distribution fitting and adjustment for wind data
"""

import numpy as np
from scipy import stats
from scipy.interpolate import UnivariateSpline
from scipy.signal import savgol_filter
from typing import Union, Dict, Any, Optional, Tuple, List
import warnings
from tqdm import tqdm, trange

from .data_processor import DataProcessor


def get_available_distributions():
    """
    Get all available distribution names and their scipy.stats objects.
    
    Returns
    -------
    dict
        Dictionary mapping distribution names to scipy.stats objects
    """
    return {
        # Complete and validated mapping of scipy.stats continuous distributions
        'alpha': stats.alpha,
        'anglit': stats.anglit,
        'arcsine': stats.arcsine,
        'argus': stats.argus,
        'beta': stats.beta,
        'betaprime': stats.betaprime,
        'bradford': stats.bradford,
        'burr': stats.burr,
        'burr12': stats.burr12,
        'cauchy': stats.cauchy,
        'chi': stats.chi,
        'chi2': stats.chi2,
        'cosine': stats.cosine,
        'crystalball': stats.crystalball,
        'dgamma': stats.dgamma,
        # 'dpareto_lognorm': stats.dpareto_lognorm,  # Not available in SciPy 1.14
        'dweibull': stats.dweibull,
        'erlang': stats.erlang,
        'expon': stats.expon,
        'exponnorm': stats.exponnorm,
        'exponpow': stats.exponpow,
        'exponweib': stats.exponweib,
        'f': stats.f,
        'fatiguelife': stats.fatiguelife,
        'fisk': stats.fisk,
        'foldcauchy': stats.foldcauchy,
        'foldnorm': stats.foldnorm,
        'gamma': stats.gamma,
        'gausshyper': stats.gausshyper,
        'genexpon': stats.genexpon,
        'genextreme': stats.genextreme,
        'gengamma': stats.gengamma,
        'genhalflogistic': stats.genhalflogistic,
        'genhyperbolic': stats.genhyperbolic,
        'geninvgauss': stats.geninvgauss,
        'genlogistic': stats.genlogistic,
        'gennorm': stats.gennorm,
        'genpareto': stats.genpareto,
        'gibrat': stats.gibrat,
        'gompertz': stats.gompertz,
        'gumbel_l': stats.gumbel_l,
        'gumbel_r': stats.gumbel_r,
        'halfcauchy': stats.halfcauchy,
        'halfgennorm': stats.halfgennorm,
        'halflogistic': stats.halflogistic,
        'halfnorm': stats.halfnorm,
        'hypsecant': stats.hypsecant,
        'invgamma': stats.invgamma,
        'invgauss': stats.invgauss,
        'invweibull': stats.invweibull,
        'irwinhall': stats.irwinhall,
        'jf_skew_t': stats.jf_skew_t,
        'johnsonsb': stats.johnsonsb,
        'johnsonsu': stats.johnsonsu,
        'kappa3': stats.kappa3,
        'kappa4': stats.kappa4,
        'ksone': stats.ksone,
        'kstwo': stats.kstwo,
        'kstwobign': stats.kstwobign,
        # 'landau': stats.landau,  # Not available in SciPy 1.14
        'laplace': stats.laplace,
        'laplace_asymmetric': stats.laplace_asymmetric,
        'levy': stats.levy,
        'levy_l': stats.levy_l,
        #'levy_stable': stats.levy_stable,
        'loggamma': stats.loggamma,
        'logistic': stats.logistic,
        'loglaplace': stats.loglaplace,
        'lognorm': stats.lognorm,
        'loguniform': stats.loguniform,
        'lomax': stats.lomax,
        'maxwell': stats.maxwell,
        'mielke': stats.mielke,
        'moyal': stats.moyal,
        'nakagami': stats.nakagami,
        'ncf': stats.ncf,
        'nct': stats.nct,
        'ncx2': stats.ncx2,
        'norm': stats.norm,
        'norminvgauss': stats.norminvgauss,
        'pareto': stats.pareto,
        'pearson3': stats.pearson3,
        'powerlaw': stats.powerlaw,
        'powerlognorm': stats.powerlognorm,
        'powernorm': stats.powernorm,
        'rayleigh': stats.rayleigh,
        'rdist': stats.rdist,
        'recipinvgauss': stats.recipinvgauss,
        'reciprocal': stats.reciprocal,
        'rel_breitwigner': stats.rel_breitwigner,
        'rice': stats.rice,
        'semicircular': stats.semicircular,
        'skewcauchy': stats.skewcauchy,
        'skewnorm': stats.skewnorm,
        #'studentized_range': stats.studentized_range,
        'students_t': stats.t,  # More descriptive name for t-distribution
        't': stats.t,  # Also support direct 't' name
        'trapezoid': stats.trapezoid,
        'trapz': stats.trapezoid,  # Common alias
        'triang': stats.triang,
        'truncexpon': stats.truncexpon,
        'truncnorm': stats.truncnorm,
        'truncpareto': stats.truncpareto,
        'truncweibull_min': stats.truncweibull_min,
        'tukeylambda': stats.tukeylambda,
        'uniform': stats.uniform,
        'vonmises': stats.vonmises,
        'vonmises_line': stats.vonmises_line,
        'wald': stats.wald,
        'weibull': stats.weibull_min,  # Common alias for Weibull
        'weibull_max': stats.weibull_max,
        'weibull_min': stats.weibull_min,
        'wrapcauchy': stats.wrapcauchy
    }


[docs] class MagicAdjuster: """ Simple class for statistical distribution fitting using SciPy. This class takes processed data and fits statistical distributions, designed to be extended with goodness-of-fit tests and advanced features. """
[docs] def __init__(self, data_processor: DataProcessor): """ Initialize the adjuster with processed data. Parameters ---------- data_processor : DataProcessor Processor instance with loaded data """ if data_processor.data is None: raise ValueError("DataProcessor must have data loaded.") self.data_processor = data_processor self.data = data_processor.get_data_array() self.fitted_distribution = None self.fitted_params = None self.distribution_name = None
[docs] def fit_distribution(self, distribution: Union[str, object], **kwargs) -> 'MagicAdjuster': """ Fit a statistical distribution to the data. Parameters ---------- distribution : str or scipy.stats distribution Distribution to fit. Can be: - String: 'weibull', 'gamma', 'lognorm', 'norm', etc. - SciPy distribution object: stats.weibull_min, stats.gamma, etc. **kwargs : dict Additional arguments passed to the distribution's fit method Returns ------- MagicAdjuster Adjuster instance with fitted distribution """ # Handle string distribution names if isinstance(distribution, str): distribution_map = get_available_distributions() if distribution.lower() not in distribution_map: available = list(distribution_map.keys()) raise ValueError(f"Unknown distribution '{distribution}'. Available: {available}") self.fitted_distribution = distribution_map[distribution.lower()] self.distribution_name = distribution.lower() else: # Assume it's a SciPy distribution object self.fitted_distribution = distribution self.distribution_name = getattr(distribution, 'name', str(distribution)) # Fit the distribution try: self.fitted_params = self.fitted_distribution.fit(self.data, **kwargs) except Exception as e: raise RuntimeError(f"Failed to fit {self.distribution_name} distribution: {e}") return self
[docs] def __getattr__(self, name): """ Delegate method calls to the fitted scipy distribution with smart defaults. This allows direct access to all scipy.stats methods like: cdf, pdf, ppf, sf, isf, rvs, stats, etc. For methods that commonly evaluate distributions at data points (pdf, cdf, sf, logpdf, logcdf, logsf), if no input is provided, the original data will be used automatically. """ if self.fitted_distribution is None or self.fitted_params is None: raise ValueError("No distribution has been fitted yet. Call fit_distribution() first.") # Check if the method exists in the distribution class (not frozen) if not hasattr(self.fitted_distribution, name): raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") # Methods that commonly use the original data as default input data_aware_methods = { 'pdf', 'cdf', 'sf', 'logpdf', 'logcdf', 'logsf', 'ppf', 'isf', 'interval' } # Get the original method from the distribution class original_method = getattr(self.fitted_distribution, name) if name in data_aware_methods: def smart_wrapper(*args, **kwargs): """ Smart wrapper that uses original data as default for common methods. If the first argument is not provided for evaluation methods, use the original data points. If parameters are not provided, use the fitted parameters. """ # For methods that evaluate at data points, use smart defaults if len(args) == 0 and name in ['pdf', 'cdf', 'sf', 'logpdf', 'logcdf', 'logsf']: return original_method(self.data, *self.fitted_params, **kwargs) # If only data provided (1 arg), add fitted parameters elif len(args) == 1 and name in ['pdf', 'cdf', 'sf', 'logpdf', 'logcdf', 'logsf']: return original_method(args[0], *self.fitted_params, **kwargs) # For ppf, isf, interval - these need explicit input, use frozen distribution elif name in ['ppf', 'isf', 'interval']: frozen_dist = self.fitted_distribution(*self.fitted_params) return getattr(frozen_dist, name)(*args, **kwargs) else: # Normal call with all provided arguments return original_method(*args, **kwargs) # Copy metadata from original method smart_wrapper.__name__ = getattr(original_method, '__name__', name) smart_wrapper.__doc__ = getattr(original_method, '__doc__', None) return smart_wrapper else: # For other methods, use frozen distribution for convenience frozen_dist = self.fitted_distribution(*self.fitted_params) return getattr(frozen_dist, name)
[docs] def get_fitted_params(self) -> Tuple: """ Get the fitted distribution parameters. Returns ------- tuple Fitted parameters of the distribution """ if self.fitted_params is None: raise ValueError("No distribution has been fitted yet.") return self.fitted_params
[docs] def get_distribution_info(self) -> Dict[str, Any]: """ Get information about the fitted distribution. Returns ------- dict Dictionary with distribution information """ if self.fitted_distribution is None: raise ValueError("No distribution has been fitted yet.") return { 'name': self.distribution_name, 'parameters': self.fitted_params, 'num_params': len(self.fitted_params), 'data_size': len(self.data) }
[docs] def __repr__(self) -> str: """String representation of the object.""" if self.fitted_distribution is None: return f"MagicAdjuster(data_size={len(self.data)}, no distribution fitted)" return f"MagicAdjuster(data_size={len(self.data)}, distribution='{self.distribution_name}')"
[docs] def get_bin_number_sturges(self): """ Calculate the optimal number of bins using Sturges' rule. Sturges' rule is a simple heuristic for determining the number of bins in a histogram. It assumes that the data follows a normal distribution and is best suited for smaller datasets. Returns ------- int Number of bins calculated using Sturges' rule """ N = len(self.data) return int(1 + np.log2(N))
[docs] def get_bin_number_rice(self): """ Calculate the optimal number of bins using Rice's rule. Rice's rule suggests a bin count that scales with the cube root of the dataset size. It is a simple alternative to Sturges' rule and works well for larger datasets. Returns ------- int Number of bins calculated using Rice's rule """ N = len(self.data) return int(2 * N**(1/3))
[docs] def get_bin_number_freedman_diaconis(self): """ Calculate the number of bins based on the Freedman-Diaconis rule. This rule uses the interquartile range (IQR) to calculate bin width and is robust for skewed distributions. Returns ------- int Number of bins calculated using Freedman-Diaconis rule """ iqr = np.percentile(self.data, 75) - np.percentile(self.data, 25) bin_width = 2 * iqr / len(self.data)**(1/3) return max(1, int((max(self.data) - min(self.data)) / bin_width))
[docs] def get_bin_number_scott(self): """ Calculate the bin width based on Scott's rule. Scott's rule minimizes the integrated mean squared error for normal distributions, but can also work for large datasets. Returns ------- int Number of bins calculated using Scott's rule """ bin_width = 3.5 * np.std(self.data) / len(self.data)**(1/3) return max(1, int((max(self.data) - min(self.data)) / bin_width))
[docs] def get_bin_number_doane(self): """ Calculate the optimal number of bins using Doane's formula. Doane's formula is an extension of Sturges' rule that accounts for the skewness of the data distribution. This method is particularly useful when dealing with non-normal data distributions, as it adjusts the bin count based on the sample skewness. Returns ------- int Number of bins calculated using Doane's rule """ N = len(self.data) g1 = stats.skew(self.data) sigma_g1 = np.sqrt((6 * (N - 2)) / ((N + 1) * (N + 3))) return int(1 + np.log2(N) + np.log2(1 + abs(g1) / sigma_g1))
[docs] def get_num_bins(self, bins='doane'): """ Determines the number of bins for histogram plotting based on a chosen method. Parameters ---------- bins : int or str, optional The binning method to use. Default is 'doane'. - Integer (e.g., 30): A fixed number of bins. - 'sturges': Sturges' rule (log-based, best for normal distributions). - 'freedman-diaconis': Uses Freedman-Diaconis rule (best for skewed distributions). - 'rice': Uses Rice’s rule (scales with cube root of dataset size). - 'scott': Uses Scott’s rule (minimizes IMSE for normal distributions). - 'doane': Uses Doane's rule (extension of Sturges' rule that accounts for the skewness of the data distribution). Returns ------- int The computed number of bins. Raises ------ ValueError If an unsupported binning method is provided. """ if bins == 'sturges': num_bins = self.get_bin_number_sturges() elif bins == 'rice': return self.get_bin_number_rice() elif bins == 'scott': num_bins = self.get_bin_number_scott() elif bins == 'freedman-diaconis': num_bins = self.get_bin_number_freedman_diaconis() elif bins == 'doane': num_bins = self.get_bin_number_doane() else: num_bins = bins # If a specific number of bins is provided return num_bins
[docs] def goodness_of_fit(self, method: str, bins: Union[str, int] = 'doane', warn_on_normalization: bool = True): """ Perform goodness-of-fit test on the fitted distribution. Parameters ---------- method : str Test method: 'chi2', 'ks', 'rmse', 'aic', or 'bic' bins : int or str, default='doane' Binning method for chi2 and rmse tests warn_on_normalization : bool, default=True Whether to warn about frequency normalization in chi2 test Returns ------- dict or float Test results (dict for statistical tests, float for RMSE/AIC/BIC) """ if self.fitted_distribution is None or self.fitted_params is None: raise ValueError("No distribution has been fitted yet. Call fit_distribution() first.") if method.lower() in ['chisquared', 'chi2']: n_bins = self.get_num_bins(bins) params = self.fitted_params # Empirical frequencies observed_freq, bin_edges = np.histogram(self.data, bins=n_bins) # Compute theoretical probabilities for each bin bin_probs = np.diff(self.fitted_distribution.cdf(bin_edges, *params)) # Expected frequencies = probabilities × total sample size expected_freq = bin_probs * len(self.data) # Check if normalization is needed discrepancy = abs(expected_freq.sum() - observed_freq.sum()) if discrepancy > 1e-6 and warn_on_normalization: warnings.warn(f"Normalizing expected frequencies. Original sum: {expected_freq.sum():.6f}, " f"Target sum: {observed_freq.sum()}") # Normalize only if necessary if discrepancy > 1e-10: expected_freq *= observed_freq.sum() / expected_freq.sum() # Chi-square test chi_stats = stats.chisquare(observed_freq, f_exp=expected_freq) return { 'chi2_statistic': chi_stats.statistic, 'p_value': chi_stats.pvalue, 'n_bins': n_bins, 'observed_freq': observed_freq, 'expected_freq': expected_freq } elif method.lower() in ['kolmogorov-smirnov', 'ks']: # KS test ks_stats = stats.kstest(self.data, self.fitted_distribution.cdf, args=self.fitted_params) return { 'ks_statistic': ks_stats.statistic, 'p_value': ks_stats.pvalue } elif method.lower() in ['root-mean-square-error', 'rmse']: # RMSE between empirical and theoretical CDF sorted_data = np.sort(self.data) empirical_cdf = np.arange(1, len(sorted_data) + 1) / len(sorted_data) theoretical_cdf = self.fitted_distribution.cdf(sorted_data, *self.fitted_params) rmse = np.sqrt(np.mean((empirical_cdf - theoretical_cdf) ** 2)) return rmse elif method.lower() == 'aic': # Akaike Information Criterion # AIC = 2k - 2ln(L) where k = number of parameters, L = likelihood log_likelihood = np.sum(self.fitted_distribution.logpdf(self.data, *self.fitted_params)) k = len(self.fitted_params) aic = 2 * k - 2 * log_likelihood return aic elif method.lower() == 'bic': # Bayesian Information Criterion # BIC = k*ln(n) - 2ln(L) where k = params, n = sample size, L = likelihood log_likelihood = np.sum(self.fitted_distribution.logpdf(self.data, *self.fitted_params)) k = len(self.fitted_params) n = len(self.data) bic = k * np.log(n) - 2 * log_likelihood return bic else: raise ValueError(f"Unknown goodness-of-fit method: {method}. " f"Available: 'chi2', 'ks', 'rmse', 'aic', 'bic'")
def _generate_subsample_indices( self, size: int, n_repeats: int, sampling: str = 'random', seed: Optional[int] = None, ) -> List[np.ndarray]: """ Generate index arrays for subsamples according to a chosen strategy. Parameters ---------- size : int Number of elements in each subsample. n_repeats : int Number of subsamples (repeats) to generate per size. sampling : {'random','bootstrap','disjoint'}, optional Sampling strategy. Defaults to 'random'. seed : int, optional Seed for reproducible RNG. If None, RNG will be random. Returns ------- List[numpy.ndarray] A list with `n_subsamples` index arrays (dtype int) pointing into ``self.data``. Notes ----- - This function returns index arrays (views) and does not copy the data itself. - 'random' = sampling without replacement (size must be <= len(data)). - 'bootstrap' = sampling with replacement (allows size > len(data)). - 'disjoint' = non-overlapping partitions; will raise if size > len(data). """ # local import to keep top-level imports minimal rng = np.random.default_rng(seed) N = len(self.data) if sampling not in {'random', 'bootstrap', 'disjoint'}: raise ValueError(f"Unknown sampling strategy: {sampling}") if sampling == 'random' and size > N: raise ValueError("For 'random' sampling size must be <= len(data). Use 'bootstrap' to allow size > N.") indices: List[np.ndarray] = [] if sampling == 'bootstrap': # with replacement for _ in range(n_repeats): idx = rng.integers(0, N, size=size) indices.append(idx) elif sampling == 'random': # without replacement for _ in range(n_repeats): idx = rng.choice(N, size=size, replace=False) indices.append(idx) else: # disjoint if size <= 0: raise ValueError("size must be > 0 for disjoint sampling") per_pass = N // size if per_pass == 0: raise ValueError("disjoint sampling not possible: size > len(data)") subsamples_needed = n_repeats while subsamples_needed > 0: perm = rng.permutation(N) for i in range(per_pass): if subsamples_needed == 0: break start = i * size idx = perm[start:start + size] indices.append(idx) subsamples_needed -= 1 return indices
[docs] def monte_carlo_fit( self, sizes: Optional[List[int]] = None, n_repeats: int = 20, tests: List[str] = ['ks'], stability_method: str = 'kneedle', fig_output_path: Optional[str] = None, plot_type: str = 'series', sampling: str = 'random', seed: Optional[int] = None, min_size: int = 50, max_size: Optional[int] = None, n_sizes: int = 10, distribution_params: Optional[Tuple] = None, **kwargs ): """ Perform Monte Carlo stability analysis for distribution fitting. This method evaluates how stable distribution parameters and goodness-of-fit statistics are across different sample sizes, helping determine the minimum sample size needed for reliable parameter estimation. Parameters ---------- sizes : list[int], optional Explicit list of sample sizes to test. If None, a grid of sizes between min_size and max_size is created. n_repeats : int, default=20 Number of subsamples (repetitions) for each size. tests : list[str], default=['ks'] Goodness-of-fit tests to perform. Options: 'ks', 'chi2', 'rmse'. **Recommendation**: Always include 'rmse' for best stability detection. stability_method : str, default='cv' Method for detecting stability points. Options: - 'cv': Coefficient of variation approach (default, robust) - 'kneedle': Kneedle algorithm on median curves (best for RMSE) - 'plateau': Relative gain heuristic (good for converging parameters) - 'aggregate': Median-based detection with tolerance windows (legacy) - 'detect': Alias for 'cv' (backward compatibility) - None or 'none': Skip stability detection fig_output_path : str, optional When provided, a 2x3 summary figure is generated and saved to this path. The Dataset attribute 'figure_path' will contain the path. If None (default) no figure is created (saves time in large runs). plot_type : {'series','boxplots'}, default='series' Visual style per panel. 'series' plots medians with IQR shading; 'boxplots' draws boxplots per size. sampling : {'random','bootstrap','disjoint'}, default='random' Sampling strategy for subsamples. seed : int, optional Random seed for reproducibility (propagated to subsamples). min_size : int, default=50 Minimum sample size when auto-generating sizes. max_size : int, optional Maximum sample size. Defaults to full dataset if None. n_sizes : int, default=10 Number of sizes in automatically generated grid. distribution_params : tuple, optional If provided, these fixed parameters are used for all subsamples (no refitting). Use only for scenarios evaluating GOF with known parameters. If None (default), each subsample is refitted. **kwargs Additional parameters: - bins : int or str, default='doane' Binning for chi-square test. - fit_kwargs : dict Passed to fit_distribution for parameter constraints (e.g., fit_kwargs={'floc': 0}). Stability detection parameters (method-specific): For 'cv' method: - window_size : int, default=len(sizes)//4 Window size for CV consistency check - cv_threshold : float, default=0.1 Maximum acceptable coefficient of variation (10%) For 'kneedle' method: - smooth : bool, default=True Whether to smooth curves before detection - smoothing_method : str, default='savgol' Smoothing method: 'savgol' or 'spline' For 'plateau' method: - consecutive_points : int, default=3 Number of consecutive points for plateau (L parameter) - relative_tolerance : float, default=0.01 Maximum relative change threshold (Δ parameter, 1%) Returns ------- xarray.Dataset Monte Carlo results with dimensions: - **sizes** : Sample sizes tested - **repeats** : Repetition index for each size Data variables include: - **param_0, param_1, ...** : Fitted distribution parameters - **ks_statistic, ks_pvalue** : Kolmogorov-Smirnov test results - **chi2_statistic, chi2_pvalue** : Chi-square test results - **rmse** : Root mean square error values Attributes include: - **distribution** : Distribution name - **original_data_size** : Size of original dataset - **sampling_method** : Sampling strategy used - **bins_method** : Binning method for chi-square test - **stability_method** : Detection method used - **stability_points** : Detected stability points per variable - **recommended_size** : Recommended sample size (from primary metric) - **primary_metric** : Metric used for recommendation (typically 'rmse') - **stable_pvalue_ks** : KS p-value at stable size (if applicable) - **stable_pvalue_chi2** : Chi-square p-value at stable size (if applicable) - **stable_rmse** : RMSE at stable size (if applicable) - **figure_path** : Path to saved 2x3 summary figure (only if generated) - **created_by** : Method identifier Examples -------- >>> # Basic usage with recommended settings (includes RMSE!) >>> results = adjuster.monte_carlo_fit( ... n_repeats=50, ... tests=['ks', 'chi2', 'rmse'], ... stability_method='kneedle' # Best for RMSE ... ) >>> # Access stability information >>> stability = results.attrs['stability_points'] >>> recommended_size = results.attrs['recommended_size'] >>> print(f"RMSE stabilizes at n={stability['rmse']['size']}") >>> # Use fit constraints (e.g., fix location for Weibull) >>> results = adjuster.monte_carlo_fit( ... n_repeats=100, ... tests=['rmse', 'ks'], ... stability_method='plateau', ... fit_kwargs={'floc': 0} ... ) >>> # Use pre-calculated parameters (bypass fitting entirely) >>> known_params = (2.0, 0.0, 1.0) # shape, loc, scale >>> results = adjuster.monte_carlo_fit( ... distribution_params=known_params, ... n_repeats=50, ... tests=['chi2', 'ks'] ... ) >>> # Generate figure with automatic stability detection >>> results = adjuster.monte_carlo_fit( ... tests=['ks', 'rmse'], ... stability_method='kneedle', ... fig_output_path='mc_analysis.png' ... ) Notes ----- **Fitting Strategy:** By default, each subsample gets an independent fit to evaluate how parameter estimates change with sample size. This is the correct approach for stability analysis since parameter estimation quality depends on sample size. **Stability Detection Methods:** 1. **CV (Coefficient of Variation)**: Default method, robust and works for all variables. Looks for when std/mean becomes consistently small. 2. **Kneedle**: Best for RMSE and converging parameters. Finds the "elbow" point where improvement rate changes. Requires curve smoothing. 3. **Plateau**: Good for metrics with clear convergence. Uses relative gain heuristic (Δᵢ = |y(i-1) - y(i)| / |y(i-1)|). **Choosing the Right Method:** - Use **'kneedle'** with **RMSE** for clearest stability detection - Use **'cv'** for general-purpose analysis - Use **'plateau'** when you expect clear convergence behavior **Parameter Constraints:** Use `fit_kwargs` to impose constraints during fitting (e.g., `floc=0` for Weibull distributions). These constraints apply to all subsample fits. **Pre-calculated Parameters:** Use `distribution_params` only when you want to evaluate goodness-of-fit with known/fixed parameters across all sample sizes, bypassing fitting entirely. """ # Extract kwargs bins = kwargs.get('bins', 'doane') fit_kwargs = kwargs.get('fit_kwargs', {}) N = len(self.data) if N == 0: raise ValueError("No data available for monte_carlo_fit.") # Determine sizes if sizes is None: if max_size is None: max_size = N sizes = np.unique(np.linspace(min_size, max_size, n_sizes, dtype=int)) sizes = sizes[sizes > 0] if sizes.size == 0: raise ValueError("Generated empty sizes grid; check min_size and n_sizes values.") else: sizes = np.unique(np.asarray(sizes, dtype=int)) if sizes.size == 0: raise ValueError("`sizes` provided is empty or invalid") # Determine repeats per size if n_repeats is None: smallest = int(sizes[0]) n_repeats = max(10, min(100, N // smallest)) test_list = [t.lower() for t in tests] if tests is not None else ['ks'] # Determine max number of parameters if distribution_params is not None: # Use pre-calculated parameters to determine count max_params = len(distribution_params) else: # Fit a small sample to determine parameter count sample_idx = np.random.choice(len(self.data), min(100, len(self.data)), replace=False) sample_data = self.data[sample_idx] temp_dp = DataProcessor(sample_data) temp_adj = temp_dp._get_adjuster() temp_adj.fit_distribution(self.fitted_distribution, **fit_kwargs) max_params = len(temp_adj.get_fitted_params()) # Initialize data arrays n_sizes = len(sizes) param_arrays = {} for i in range(max_params): param_arrays[f'param_{i}'] = np.full((n_sizes, n_repeats), np.nan) # Initialize test result arrays test_arrays = {} for test in test_list: if test == 'ks': test_arrays['ks_statistic'] = np.full((n_sizes, n_repeats), np.nan) test_arrays['ks_pvalue'] = np.full((n_sizes, n_repeats), np.nan) elif test == 'chi2': test_arrays['chi2_statistic'] = np.full((n_sizes, n_repeats), np.nan) test_arrays['chi2_pvalue'] = np.full((n_sizes, n_repeats), np.nan) elif test == 'rmse': test_arrays['rmse'] = np.full((n_sizes, n_repeats), np.nan) # Setup random seeds if seed is None: child_seq = [None] * n_sizes else: ss = np.random.SeedSequence(seed) child_seq = ss.spawn(n_sizes) # Prepare storage for aggregate-style results if requested aggregate_results = {'results': {int(s): [] for s in sizes}} if stability_method == 'aggregate' else None # Main Monte Carlo loop sizes_iter = tqdm(list(enumerate(sizes)), desc='Monte Carlo sizes') for i, size in sizes_iter: child_seed = child_seq[i] idx_list = self._generate_subsample_indices( size=int(size), n_repeats=int(n_repeats), sampling=sampling, seed=child_seed ) # Inner progress bar for repeats rep_iter = trange(len(idx_list), desc=f'size={size}', leave=False) for rep_j in rep_iter: try: sub_idx = idx_list[rep_j] subdata = self.data[sub_idx] # Create temporary adjuster dp = DataProcessor(subdata) adj = dp._get_adjuster() if self.fitted_distribution is None: raise ValueError("No fitted distribution available.") # Always fit new parameters for each subsample # Use distribution_params only if explicitly provided (for pre-calculated scenarios) if distribution_params is not None: # Use pre-calculated parameters (bypass fitting) adj.fitted_distribution = self.fitted_distribution adj.fitted_params = distribution_params params = distribution_params else: # Fit new parameters with optional constraints from fit_kwargs adj.fit_distribution(self.fitted_distribution, **fit_kwargs) params = adj.get_fitted_params() # Store parameters for param_idx, param_val in enumerate(params): if param_idx < max_params: param_arrays[f'param_{param_idx}'][i, rep_j] = param_val # If aggregate collection requested, store per-rep entry if aggregate_results is not None: rep_entry = {'params': params, 'gof': {}} # Run goodness-of-fit tests for test in test_list: try: if test == 'ks': ks_res = adj.goodness_of_fit('ks') test_arrays['ks_statistic'][i, rep_j] = ks_res.get('statistic', ks_res.get('ks_statistic', np.nan)) test_arrays['ks_pvalue'][i, rep_j] = ks_res.get('p_value', np.nan) if aggregate_results is not None: rep_entry['gof']['ks'] = ks_res elif test == 'chi2': chi_res = adj.goodness_of_fit('chi2', bins=bins, warn_on_normalization=False) test_arrays['chi2_statistic'][i, rep_j] = chi_res.get('statistic', chi_res.get('chi2_statistic', np.nan)) test_arrays['chi2_pvalue'][i, rep_j] = chi_res.get('p_value', np.nan) if aggregate_results is not None: rep_entry['gof']['chi2'] = chi_res elif test == 'rmse': test_arrays['rmse'][i, rep_j] = adj.goodness_of_fit('rmse') if aggregate_results is not None: rep_entry['gof']['rmse'] = test_arrays['rmse'][i, rep_j] except Exception: # Values remain NaN for failed tests pass if aggregate_results is not None: aggregate_results['results'][int(size)].append(rep_entry) except Exception: # Values remain NaN for failed fits pass # Create xarray Dataset data_vars = {} data_vars.update(param_arrays) data_vars.update(test_arrays) coords = { 'sizes': sizes, 'repeats': np.arange(n_repeats) } # Detect stability points using selected method stability_points = None agg_summary = None # Normalize stability_method name if stability_method is None or stability_method.lower() == 'none': # Skip stability detection stability_points = {k: {'size': None, 'index': None, 'cv_at_stability': None, 'smoothed_curve': None, 'method': 'none'} for k in list(param_arrays.keys()) + list(test_arrays.keys())} elif stability_method.lower() == 'detect': # Backward compatibility: 'detect' maps to 'cv' stability_points = self._detect_stability_unified(data_vars, sizes, method='cv', **kwargs) elif stability_method.lower() in ['cv', 'kneedle', 'plateau']: # Use unified detection with specified method stability_points = self._detect_stability_unified(data_vars, sizes, method=stability_method.lower(), **kwargs) elif stability_method.lower() == 'aggregate' and aggregate_results is not None: # Legacy aggregate method agg_summary, _, _ = self._aggregate_and_detect_stability(aggregate_results, sizes, test_list, max_params) # Convert agg_summary to stability_points format used by plotting stability_points = {} # Tests for test in agg_summary.get('tests', {}): inf_idx = agg_summary['tests'][test].get('inflection_index') stability_points[test] = { 'size': int(sizes[inf_idx]) if inf_idx is not None else None, 'index': int(inf_idx) if inf_idx is not None else None, 'cv_at_stability': None, 'smoothed_curve': None, 'method': 'aggregate' } # Params (use params inflection) p_inf_idx = agg_summary['params'].get('inflection_index') stability_points['param_0'] = { 'size': int(sizes[p_inf_idx]) if p_inf_idx is not None else None, 'index': int(p_inf_idx) if p_inf_idx is not None else None, 'cv_at_stability': None, 'smoothed_curve': None, 'method': 'aggregate' } else: raise ValueError(f"Unknown stability_method: {stability_method}. " f"Options: 'cv', 'kneedle', 'plateau', 'aggregate', 'detect', 'none'") # Determine recommended size and primary metric # Priority: RMSE > KS > Chi2 > param_0 recommended_size = None primary_metric = None for metric in ['rmse', 'ks_pvalue', 'chi2_pvalue', 'param_0']: if metric in stability_points and stability_points[metric]['size'] is not None: recommended_size = stability_points[metric]['size'] primary_metric = metric break # If still no recommendation, use largest size if recommended_size is None: recommended_size = int(sizes[-1]) primary_metric = 'max_size' # Get p-values and metrics at stable size for reporting stable_idx = None for metric_name, info in stability_points.items(): if info['size'] == recommended_size: stable_idx = info['index'] break stable_pvalue_ks = None stable_pvalue_chi2 = None stable_rmse = None if stable_idx is not None: if 'ks_pvalue' in data_vars: ks_at_stable = data_vars['ks_pvalue'][stable_idx, :] stable_pvalue_ks = float(np.nanmedian(ks_at_stable)) if 'chi2_pvalue' in data_vars: chi2_at_stable = data_vars['chi2_pvalue'][stable_idx, :] stable_pvalue_chi2 = float(np.nanmedian(chi2_at_stable)) if 'rmse' in data_vars: rmse_at_stable = data_vars['rmse'][stable_idx, :] stable_rmse = float(np.nanmedian(rmse_at_stable)) # Optionally create & save figure figure_path = None if fig_output_path is not None: try: fig = self._create_monte_carlo_figure( data_vars, sizes, plot_type, stability_points, distribution_name=self.distribution_name or str(self.fitted_distribution), max_size=int(sizes[-1]), recommended_size=recommended_size, primary_metric=primary_metric, stable_pvalue_ks=stable_pvalue_ks, stable_pvalue_chi2=stable_pvalue_chi2, stable_rmse=stable_rmse, sampling_method=sampling, stability_method=stability_method ) if fig is not None: fig.savefig(fig_output_path, dpi=150, bbox_inches='tight') figure_path = fig_output_path except Exception as e: warnings.warn(f"Failed to create figure: {e}") figure_path = None import xarray as xr # lazy import — only needed when returning Dataset # Create Dataset (all numeric data only) ds = xr.Dataset( data_vars={name: (['sizes', 'repeats'], array) for name, array in data_vars.items()}, coords=coords, attrs={ 'distribution': self.distribution_name or str(self.fitted_distribution), 'original_data_size': N, 'sampling_method': sampling, 'bins_method': bins, 'stability_method': stability_method, 'stability_points': stability_points, 'recommended_size': recommended_size, 'primary_metric': primary_metric, 'stable_pvalue_ks': stable_pvalue_ks, 'stable_pvalue_chi2': stable_pvalue_chi2, 'stable_rmse': stable_rmse, 'n_repeats': n_repeats, 'min_size': int(sizes[0]), 'max_size': int(sizes[-1]), 'n_sizes': len(sizes), 'figure_path': figure_path, 'created_by': 'MagicAdjuster.monte_carlo_fit' } ) return ds
def _smooth_curve(self, x: np.ndarray, y: np.ndarray, method: str = 'savgol') -> np.ndarray: """ Smooth a curve for stability detection. Parameters ---------- x : np.ndarray X values (sample sizes) y : np.ndarray Y values (metric to smooth) method : str, default='savgol' Smoothing method: 'savgol' (Savitzky-Golay) or 'spline' Returns ------- np.ndarray Smoothed y values Notes ----- Savitzky-Golay filter is preferred for preserving features while smoothing noise. Spline interpolation provides smoother curves but may over-smooth important features. """ # Remove NaN values valid_mask = ~(np.isnan(x) | np.isnan(y)) if np.sum(valid_mask) < 4: return y # Not enough points to smooth x_valid = x[valid_mask] y_valid = y[valid_mask] if method == 'savgol': # Savitzky-Golay filter - preserves features window_length = min(len(x_valid), max(5, len(x_valid) // 3)) if window_length % 2 == 0: window_length -= 1 # Must be odd poly_order = min(3, window_length - 1) try: y_smooth = savgol_filter(y_valid, window_length, poly_order) # Map back to original x positions result = np.full_like(y, np.nan) result[valid_mask] = y_smooth return result except: return y elif method == 'spline': # Spline interpolation - smoother but may over-smooth try: # Use spline with automatic smoothing spl = UnivariateSpline(x_valid, y_valid, s=len(x_valid) * 0.1, k=3) result = np.full_like(y, np.nan) result[valid_mask] = spl(x_valid) return result except: return y else: return y def _kneedle_detection( self, x: np.ndarray, y: np.ndarray, smooth: bool = True, smoothing_method: str = 'savgol' ) -> Tuple[Optional[int], Optional[np.ndarray]]: """ Detect elbow/knee point using the Kneedle algorithm. Based on Satopää et al. (2011): "Finding a 'Kneedle' in a Haystack: Detecting Knee Points in System Behavior" Parameters ---------- x : np.ndarray Sample sizes (must be monotonic) y : np.ndarray Metric values (e.g., RMSE, median parameter) smooth : bool, default=True Whether to smooth the curve before detection smoothing_method : str, default='savgol' Method for smoothing: 'savgol' or 'spline' Returns ------- knee_idx : int or None Index of detected knee point, or None if not found y_smooth : np.ndarray or None Smoothed curve if smoothing was applied, else None Notes ----- The algorithm: 1. Normalizes x and y to [0,1] 2. Computes difference between curve and reference line 3. Finds maximum difference (the "knee") Works best for decreasing metrics (RMSE) or converging parameters. """ # Remove NaN values valid_mask = ~(np.isnan(x) | np.isnan(y)) if np.sum(valid_mask) < 3: return None, None x_valid = x[valid_mask] y_valid = y[valid_mask] # Smooth if requested y_smooth = None if smooth: y_work = self._smooth_curve(x, y, method=smoothing_method) y_work_valid = y_work[valid_mask] y_smooth = y_work else: y_work_valid = y_valid # Normalize to [0, 1] x_norm = (x_valid - x_valid.min()) / (x_valid.max() - x_valid.min() + 1e-10) y_norm = (y_work_valid - y_work_valid.min()) / (y_work_valid.max() - y_work_valid.min() + 1e-10) # Calculate reference line (straight line from start to end) y_ref = np.linspace(y_norm[0], y_norm[-1], len(y_norm)) # Calculate differences (for decreasing curves, use y_ref - y_norm) # For increasing curves, use y_norm - y_ref # Auto-detect curve direction if y_norm[-1] < y_norm[0]: # Decreasing curve (e.g., RMSE) differences = y_ref - y_norm else: # Increasing curve (e.g., p-values sometimes) differences = y_norm - y_ref # Find maximum difference (the knee) if len(differences) == 0: return None, y_smooth knee_idx_local = np.argmax(differences) # Map back to original indices valid_indices = np.where(valid_mask)[0] knee_idx = valid_indices[knee_idx_local] return int(knee_idx), y_smooth def _plateau_detection( self, x: np.ndarray, y: np.ndarray, consecutive_points: int = 3, relative_tolerance: float = 0.01 ) -> Optional[int]: """ Detect plateau using relative gain heuristic. Based on the concept that stability is reached when the relative improvement between consecutive sample sizes becomes negligible. Parameters ---------- x : np.ndarray Sample sizes y : np.ndarray Metric values (medians across repeats) consecutive_points : int, default=3 Number of consecutive points that must satisfy the criterion (L parameter in the heuristic formula) relative_tolerance : float, default=0.01 Maximum acceptable relative change (Δ parameter, default 1%) Returns ------- plateau_idx : int or None Index where plateau starts, or None if not found Notes ----- The algorithm computes relative gain: Δᵢ = |RMSE(nᵢ₋₁) - RMSE(nᵢ)| / RMSE(nᵢ₋₁) Plateau is detected when Δᵢ < threshold for L consecutive points. Works best for decreasing metrics (RMSE) or parameter convergence. """ # Remove NaN values valid_mask = ~(np.isnan(x) | np.isnan(y)) if np.sum(valid_mask) < consecutive_points + 1: return None x_valid = x[valid_mask] y_valid = y[valid_mask] # Calculate relative changes rel_changes = [] for i in range(1, len(y_valid)): if y_valid[i-1] != 0: rel_change = abs(y_valid[i] - y_valid[i-1]) / abs(y_valid[i-1]) rel_changes.append(rel_change) else: rel_changes.append(np.inf) # Find first point where relative changes stay below tolerance # for consecutive_points consecutive measurements for i in range(len(rel_changes) - consecutive_points + 1): window = rel_changes[i:i + consecutive_points] if all(rc < relative_tolerance for rc in window if not np.isinf(rc)): # Map back to original indices valid_indices = np.where(valid_mask)[0] # Return index after which the plateau starts (i+1 because we skip first point) plateau_idx = valid_indices[i + 1] return int(plateau_idx) return None def _detect_stability_unified( self, data_vars: Dict[str, np.ndarray], sizes: np.ndarray, method: str = 'cv', **kwargs ) -> Dict[str, Dict[str, Any]]: """ Unified stability detection supporting multiple methods. Parameters ---------- data_vars : dict Dictionary of variable names to (sizes x repeats) arrays sizes : np.ndarray Sample sizes tested method : str, default='cv' Detection method: - 'cv': Coefficient of variation (original method) - 'kneedle': Kneedle algorithm on median curve - 'plateau': Relative gain heuristic on median curve **kwargs Method-specific parameters: - For 'cv': window_size, cv_threshold - For 'kneedle': smooth, smoothing_method - For 'plateau': consecutive_points, relative_tolerance Returns ------- dict Stability points for each variable with keys: - 'size': Sample size where stability detected (or None) - 'index': Index in sizes array (or None) - 'cv_at_stability': CV value if method='cv' (or None) - 'smoothed_curve': Smoothed median curve if applicable - 'method': Detection method used Notes ----- Different methods work better for different metrics: - 'cv': General purpose, works for all variables - 'kneedle': Best for decreasing metrics (RMSE) or converging parameters - 'plateau': Best for metrics with clear convergence plateaus """ stability_points = {} # Extract kwargs for each method # CV method window_size = kwargs.get('window_size', max(2, len(sizes) // 4)) cv_threshold = kwargs.get('cv_threshold', 0.1) # Kneedle method smooth = kwargs.get('smooth', True) smoothing_method = kwargs.get('smoothing_method', 'savgol') # Plateau method consecutive_points = kwargs.get('consecutive_points', 3) relative_tolerance = kwargs.get('relative_tolerance', 0.01) for var_name, data_array in data_vars.items(): if np.all(np.isnan(data_array)): stability_points[var_name] = { 'size': None, 'index': None, 'cv_at_stability': None, 'smoothed_curve': None, 'method': method } continue # Calculate medians for methods that need them medians = np.array([np.nanmedian(data_array[i, :]) for i in range(len(sizes))]) stable_idx = None smoothed_curve = None cv_at_stability = None if method == 'cv': # Original CV-based method cv_values = [] for i in range(len(sizes)): size_data = data_array[i, :] valid_data = size_data[~np.isnan(size_data)] if len(valid_data) > 1: mean_val = np.mean(valid_data) std_val = np.std(valid_data) cv = std_val / abs(mean_val) if mean_val != 0 else np.inf cv_values.append(cv) else: cv_values.append(np.inf) # Find first point where CV stays below threshold for i in range(len(cv_values) - window_size + 1): window_cvs = cv_values[i:i + window_size] if all(cv < cv_threshold for cv in window_cvs if not np.isinf(cv)): stable_idx = i cv_at_stability = cv_values[i] break elif method == 'kneedle': # Kneedle algorithm on median curve stable_idx, smoothed_curve = self._kneedle_detection( sizes, medians, smooth=smooth, smoothing_method=smoothing_method ) elif method == 'plateau': # Plateau detection on median curve stable_idx = self._plateau_detection( sizes, medians, consecutive_points=consecutive_points, relative_tolerance=relative_tolerance ) else: raise ValueError(f"Unknown stability detection method: {method}") # Store results if stable_idx is not None: stability_points[var_name] = { 'size': int(sizes[stable_idx]), 'index': int(stable_idx), 'cv_at_stability': cv_at_stability, 'smoothed_curve': smoothed_curve, 'method': method } else: stability_points[var_name] = { 'size': None, 'index': None, 'cv_at_stability': None, 'smoothed_curve': smoothed_curve, 'method': method } return stability_points def _detect_stability_points(self, data_vars: Dict[str, np.ndarray], sizes: np.ndarray) -> Dict[str, Dict[str, Any]]: """ Detect stability points for each variable using coefficient of variation. Legacy method kept for backward compatibility. Use _detect_stability_unified instead. A variable is considered stable when its coefficient of variation (std/mean) across repeats becomes consistently low. """ return self._detect_stability_unified(data_vars, sizes, method='cv') def _aggregate_and_detect_stability( self, results: Dict[str, Any], sizes: np.ndarray, test_list: Optional[List[str]] = None, max_params: Optional[int] = None, ) -> Tuple[Dict[str, Any], Dict[int, List[List[float]]], Dict[int, List[float]]]: """ Aggregate monte carlo `results` and detect stability (CPS) for tests and parameters. Returns (summary, param_values_per_size, param_medians) """ summary: Dict[str, Any] = {'sizes': sizes.tolist(), 'tests': {}, 'params': {}} # If test_list not provided, infer from first non-empty rep if test_list is None: inferred = set() for size in sizes: reps = results.get('results', {}).get(int(size), []) if len(reps) > 0: for rep in reps: gof = rep.get('gof', {}) inferred.update(gof.keys()) break test_list = sorted(list(inferred)) if inferred else [] # If max_params not provided, infer from first available params if max_params is None: inferred_max = 0 for size in sizes: reps = results.get('results', {}).get(int(size), []) for rep in reps: params = rep.get('params') if params is not None: inferred_max = max(inferred_max, len(params)) if inferred_max > 0: break max_params = int(inferred_max) # For each test, compute list of p-values (or metric) per size for test in test_list: values_per_size = [] for size in sizes: reps = results['results'].get(int(size), []) vals = [] for rep in reps: gof = rep.get('gof', {}) entry = gof.get(test) if entry is None: continue # chi2 and ks return dict with 'p_value', rmse returns scalar if isinstance(entry, dict): p = entry.get('p_value') if 'p_value' in entry else None if p is not None: vals.append(float(p)) else: stat = entry.get('chi2_statistic') or entry.get('ks_statistic') if stat is not None: vals.append(float(stat)) else: try: vals.append(float(entry)) except Exception: continue values_per_size.append(vals) # median per size medians = np.array([np.median(v) if len(v) > 0 else np.nan for v in values_per_size]) # detect inflection / stability: look for first index where moving window is stable tol = 0.1 window = 4 inflection_idx = None if medians.size >= window: for j in range(0, len(medians) - window + 1): window_vals = medians[j:j+window] if np.all(np.isfinite(window_vals)) and (np.nanmax(window_vals) - np.nanmin(window_vals) <= tol): inflection_idx = j + (window - 1) break inflection_size = int(sizes[inflection_idx]) if inflection_idx is not None else None summary['tests'][test] = { 'values_per_size': values_per_size, 'medians': medians.tolist(), 'inflection_index': int(inflection_idx) if inflection_idx is not None else None, 'inflection_size': inflection_size, } # Aggregate parameters: collect per-parameter lists per size and compute medians param_medians = {p: [] for p in range(max_params)} param_values_per_size = {p: [] for p in range(max_params)} for size in sizes: reps = results['results'].get(int(size), []) cols = {p: [] for p in range(max_params)} for rep in reps: params = rep.get('params') if params is None: continue for p in range(max_params): if p < len(params): try: cols[p].append(float(params[p])) except Exception: pass for p in range(max_params): param_values_per_size[p].append(cols[p]) param_medians[p].append(np.median(cols[p]) if len(cols[p]) > 0 else np.nan) summary['params']['values_per_size'] = param_values_per_size summary['params']['medians'] = {p: np.array(v).tolist() for p, v in param_medians.items()} # Detect inflection for parameters (use first parameter as representative) if max_params > 0: p0_meds = np.array(param_medians[0]) tol_p = 1e-3 * (np.nanmax(p0_meds) - np.nanmin(p0_meds) if np.nanmax(p0_meds) != np.nanmin(p0_meds) else 1.0) inflection_idx_p = None if p0_meds.size >= window: for j in range(0, len(p0_meds) - window + 1): window_vals = p0_meds[j:j+window] if np.all(np.isfinite(window_vals)) and (np.nanmax(window_vals) - np.nanmin(window_vals) <= tol_p): inflection_idx_p = j + (window - 1) break summary['params']['inflection_index'] = int(inflection_idx_p) if inflection_idx_p is not None else None summary['params']['inflection_size'] = int(sizes[inflection_idx_p]) if inflection_idx_p is not None else None else: summary['params']['inflection_index'] = None summary['params']['inflection_size'] = None return summary, param_values_per_size, param_medians def _create_monte_carlo_figure( self, data_vars: Dict[str, np.ndarray], sizes: np.ndarray, plot_type: str, stability_points: Optional[Dict[str, Dict[str, Any]]] = None, distribution_name: Optional[str] = None, max_size: Optional[int] = None, recommended_size: Optional[int] = None, primary_metric: Optional[str] = None, stable_pvalue_ks: Optional[float] = None, stable_pvalue_chi2: Optional[float] = None, stable_rmse: Optional[float] = None, sampling_method: Optional[str] = None, stability_method: Optional[str] = None, ): """ Create 2x3 summary figure with enhanced information. Parameters ---------- data_vars : dict Dictionary of variable names to (sizes x repeats) arrays sizes : np.ndarray Sample sizes tested plot_type : str 'series' or 'boxplots' stability_points : dict, optional Stability information for each variable distribution_name : str, optional Name of fitted distribution max_size : int, optional Maximum sample size tested recommended_size : int, optional Recommended sample size from stability analysis primary_metric : str, optional Primary metric used for recommendation stable_pvalue_ks : float, optional KS p-value at stable size stable_pvalue_chi2 : float, optional Chi-square p-value at stable size stable_rmse : float, optional RMSE at stable size Returns ------- matplotlib.figure.Figure or None Generated figure """ try: import matplotlib.pyplot as plt except ImportError: # pragma: no cover return None # Select up to first 3 parameter variables param_names = sorted([k for k in data_vars if k.startswith('param_')])[:3] if not param_names: return None # Nothing meaningful to show # Preferred order for test panels (only keep those present) preferred_tests = ["ks_pvalue", "chi2_pvalue", "rmse"] test_names = [t for t in preferred_tests if t in data_vars][:3] # Always build a 2x3 grid; hide unused axes fig, axes = plt.subplots(2, 3, figsize=(16, 8), sharex=True) axes_flat = axes.ravel() def _panel(ax, data: np.ndarray, title: str, var_name: str): """Create individual panel with data and stability markers.""" # Plot main data if plot_type == 'boxplots': box_data = [data[i, ~np.isnan(data[i, :])] for i in range(len(sizes))] if len(sizes) > 1: width = max(1, (sizes[1] - sizes[0]) * 0.6) else: width = 5 bp = ax.boxplot(box_data, positions=sizes, widths=width, patch_artist=True) for patch in bp['boxes']: patch.set_facecolor('lightblue') patch.set_alpha(0.7) else: # series med = np.nanmedian(data, axis=1) q25 = np.nanpercentile(data, 25, axis=1) q75 = np.nanpercentile(data, 75, axis=1) ax.plot(sizes, med, 'o-', lw=2, label='Median', color='steelblue', markersize=5) ax.fill_between(sizes, q25, q75, alpha=0.3, label='IQR (25-75%)', color='steelblue') # Add smoothed curve if available if stability_points and var_name in stability_points: sp = stability_points[var_name] if sp.get('smoothed_curve') is not None: smoothed = sp['smoothed_curve'] valid_mask = ~np.isnan(smoothed) if np.sum(valid_mask) > 0: ax.plot(sizes[valid_mask], smoothed[valid_mask], '--', lw=1.5, label='Smoothed', color='orange', alpha=0.8) ax.set_title(title, fontsize=11, fontweight='bold') ax.grid(True, alpha=0.3, linestyle=':', linewidth=0.5) # Stability vertical lines legend_items = [] # Red line: primary metric stability point (shown in all panels) if recommended_size is not None: try: is_primary = (var_name == primary_metric) label = f"Stable: n={recommended_size}" if is_primary else f"RMSE stable: n={recommended_size}" vline_red = ax.axvline(recommended_size, color='red', linestyle='--', linewidth=2, alpha=0.7, zorder=10) legend_items.append((vline_red, label)) except Exception: pass # Gray line: this metric's own stability point (if different from primary) if stability_points and var_name in stability_points and var_name != primary_metric: sp = stability_points[var_name] if sp and sp.get('size') is not None and sp['size'] != recommended_size: try: vline_gray = ax.axvline(sp['size'], color='gray', linestyle=':', linewidth=1.5, alpha=0.6, zorder=9) legend_items.append((vline_gray, f"{var_name} stable: n={sp['size']}")) except Exception: pass # Draw horizontal alpha threshold for p-value panels (KS / Chi2) if var_name in ('ks_pvalue', 'chi2_pvalue'): try: hline = ax.axhline(0.05, color='darkred', linestyle=':', linewidth=1.5, alpha=0.7, zorder=9) legend_items.append((hline, 'α = 0.05')) # Add p-value at stable point if available if var_name == 'ks_pvalue' and stable_pvalue_ks is not None: ax.text(0.98, 0.98, f'p-value @ stable: {stable_pvalue_ks:.3f}', transform=ax.transAxes, fontsize=9, verticalalignment='top', horizontalalignment='right', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5)) elif var_name == 'chi2_pvalue' and stable_pvalue_chi2 is not None: ax.text(0.98, 0.98, f'p-value @ stable: {stable_pvalue_chi2:.3f}', transform=ax.transAxes, fontsize=9, verticalalignment='top', horizontalalignment='right', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5)) except Exception: pass # Add RMSE value at stable point if var_name == 'rmse' and stable_rmse is not None: try: ax.text(0.98, 0.98, f'RMSE @ stable: {stable_rmse:.4f}', transform=ax.transAxes, fontsize=9, verticalalignment='top', horizontalalignment='right', bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.5)) except Exception: pass # Create legend if plot_type == 'series' or legend_items: handles, labels = ax.get_legend_handles_labels() for item_handle, item_label in legend_items: handles.append(item_handle) labels.append(item_label) if handles: ax.legend(handles, labels, frameon=True, fontsize=8, loc='best', framealpha=0.9) # Parameter panels (row 0) for col, pname in enumerate(param_names): ax = axes[0, col] _panel(ax, data_vars[pname], pname.replace('_', ' ').title(), pname) ax.set_ylabel('Parameter Value', fontsize=10) # Hide unused param axes in row 0 for col in range(len(param_names), 3): axes[0, col].axis('off') # Test panels (row 1) for col, tname in enumerate(test_names): ax = axes[1, col] label = tname.replace('_', ' ').title() _panel(ax, data_vars[tname], label, tname) ax.set_xlabel('Sample Size (n)', fontsize=10) if col == 0: ax.set_ylabel('Test Value', fontsize=10) # Ensure p-value panels show full range [0, 1] if 'pvalue' in tname: try: ax.set_ylim(-0.05, 1.05) except Exception: pass # RMSE should start at 0 elif tname == 'rmse': try: ax.set_ylim(bottom=0) except Exception: pass for col in range(len(test_names), 3): axes[1, col].axis('off') # Shared formatting for ax in axes_flat: if ax.has_data(): ax.tick_params(axis='both', labelsize=9) # Create informative title title_parts = ['Monte Carlo Stability Analysis'] if distribution_name: title_parts.append(f'Distribution: {distribution_name}') if max_size: title_parts.append(f'Max n: {max_size}') if sampling_method: title_parts.append(f'Sampling: {sampling_method}') if stability_method: method_display = stability_method.capitalize() title_parts.append(f'Method: {method_display}') if recommended_size and primary_metric: metric_display = primary_metric.replace('_', ' ').upper() title_parts.append(f'Stable @ n={recommended_size} ({metric_display})') title = ' | '.join(title_parts) fig.suptitle(title, fontsize=13, fontweight='bold') fig.tight_layout(rect=(0, 0, 1, 0.96)) return fig
## TO-DO: # When set a new distribution to an already used variable, it will override the distribution form previous variable # For example: # fitted_data_weibull = data.fit_distribution('weibull') # fitted_data_norm = data.fit_distribution('norm') # # The distribution from "fitted_data_weibull" will return the norm dist.