Source code for pytesmo.interpolate.dctpls

"""
The following python implementation of the DCT-PLS algorithm (Garcia 2010)
for python 3 and is based on:

- Original MATLAB implementation: https://www.biomecardio.com/matlab/smoothn.m
- Python 2 implementation: https://github.com/profLewis/geogg122

References:
-----------
Garcia, D. (2010) 'Robust smoothing of gridded data in one and higher
dimensions with missing values',
Computational Statistics & Data Analysis, 54(4), pp. 1167–1178.
Available at: https://doi.org/10.1016/j.csda.2009.09.020

TODO:
 - [] : average leverage for h from matlab?
 - [] : add field for input uncertainties? Propagate through interpolation
"""

import sys
import warnings
import scipy.optimize as opt
import numpy as np
from typing import Literal, Optional, Union, Tuple, Annotated
from dataclasses import dataclass
from scipy.fftpack import dct, idct
from scipy.ndimage import distance_transform_edt
from collections import OrderedDict
from collections.abc import Iterable
import logging

from pytesmo.utils import pytesmolog, loglevel


[docs]@dataclass class ValueRange: min: float max: float
logger = pytesmolog streamHandler = logging.StreamHandler(sys.stdout) formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter)
[docs]def calc_init_guess( y: np.ndarray, mask: Optional[np.ndarray] = None, coeff: Optional[float] = 0.1, sampling: Optional[Union[float, Iterable[float]]] = None, return_distances: Optional[bool] = True ) -> Tuple[np.ndarray, Union[np.ndarray, None]]: """ Initial smoothing guess. This also fills any gaps in the data using a a nearest neighbour search. Later on, the initial results will be improved using the DCT-PLS. The sampling parameter allows a different unit for each dimension. Parameters ---------- y: np.ndarray Data (with gaps) to interpolate mask: np.ndarray, optional (default: None) Boolean array indicating missing values coeff: float, optional (default: 0.1) Percent relative (0-1) of DCT coefficients to use to generate the initial guess. By default, the first 10% are used. sampling : float | Iterable[float] | None, optional (default: None) Spacing of elements along each dimension. If a sequence, must be of length equal to the input rank; if a single number, this is used for all axes. If not specified, a grid spacing of unity is implied. ------ The default sampling does not prefer any axis (i.e., 1 for all). We can e.g. pass (0.5, 1, 1) for 3D data, to prefer the temporal neighbours over spatial neighbours (assuming that dim=0 refers to different time stamps). return_distances: bool, optional (default: True) Return Euclidean distance to the nearest valid data point in addition to initial interpolation and smoothing result. Returns ------- z: np.ndarray Initial guess of smoothened and interpolated data dist: np.ndarray or None Euclidean distance to the nearest observation (only if mask is passed). Only if `return_distances=True` (otherwise this is None) """ z = y.copy() dist = None if mask is not None: if mask.shape != z.shape: raise ValueError("Data and missing values have different shapes.") if np.any(mask): # For the initial guess, fill gaps using a NN interpolation ret = distance_transform_edt( mask.astype(int), return_distances=return_distances, return_indices=True, sampling=np.full(z.ndim, 1) if sampling is None else sampling) if return_distances is True: dist, nn_inds = ret dist = dist.astype(np.float32) else: nn_inds = ret z[mask] = z[tuple(ind[mask] for ind in nn_inds)] # coarse fast initial smoothing z = dctNd(z, inverse=False) z_coarse = np.full(z.shape, 0.) if coeff > 1 or coeff <= 0: raise ValueError("Choose `coeff` as 0 < coeff <= 1") shape_reduced = np.ceil(np.array(z.shape) * coeff).astype(int) loc = tuple([slice(None, s) for s in shape_reduced]) z_coarse[loc] = z[loc] z = dctNd(z_coarse, inverse=True) return z.astype(y.dtype), dist
[docs]def dctNd(data: np.ndarray, type: Optional[int] = 2, norm: Optional[Union[str, None]] = 'ortho', inverse: Optional[bool] = False) -> np.ndarray: """ Applies discrete cosine transform function to up to 3 dimensions of data. Nans in data are ignored Parameters: ---------- data: np.ndarray 1, 2, or 3-dimensional data to calculate DCT for. type: int, optional (default: 2) DCT Type to use (see scipy DCT docs) norm: str or None, optional (default: 'ortho') Normalization mode for DCT (see scipy DCT docs) inverse: bool, optional (default: False) Use the inverse function. """ func = idct if inverse else dct nd = len(data.shape) kwargs = dict(norm=norm, type=type) if nd == 1: dctNd = func(data, axis=0, **kwargs) elif nd == 2: dctNd = func(func(data, **kwargs, axis=0), **kwargs, axis=1) elif nd == 3: dctNd = func( func(func(data, axis=0, **kwargs), axis=1, **kwargs), axis=2, **kwargs) else: raise ValueError( f"Expected 1, 2, or 3 dimensional dat, but got {nd} dims.") return dctNd
[docs]def gcv(p: float, Lambda: np.ndarray, DCTy: np.ndarray, y: np.ndarray, smoothOrder: Optional[int] = 2, Wtot: Optional[np.ndarray] = None, score_only: Optional[bool] = False): """ Find the best smoothing parameter. I.e. the parameter that minimizes the generalised cross validation (GCV) score. =================================================== smooth = argmin(GCV) GCV(s) = (wRSS / (n - n_{miss})) / (1 - Tr(H) / n)² =================================================== Parameters: ---------- p: float Defines the order of smoothing for parameter as 10 ** p Lambda: np.ndarray Diagonal Eigenvalue matrix of D defined by Yueh (2005) with: \\lambda_i = -2 + 2*cos( (i-1) * \\pi / n) DCTy: np.ndarray Output array of dctNd y: np.ndarray Input data array smoothOrder: int, optional (default: 2) Exponential of lambda used for smoothing. Either 1 or 2 Wtot: np.ndarray, optional (default: None) Weights for each y value to use (same shape as y). Elements in Wtot for which the counterpart in y is NaN are ignored. When None are passed, then the unweighted implementation, which is much faster, will be used. score_only: bool, optional (default: False) Only return the score for p, not Gamma and TrH (for optimisation) This is required for the bounded minimization which requires a single return value. Returns: -------- score: np.ndarray or None GCV score. 0, 1 or 2 dimensional ... dim=Lambda.ndim-1 smooth: float or None 10**p, the smoothing parameter Gamma: np.ndarray or None Gamma when applying s TrH: np.ndarray or None Track of Hat when applying s """ if smoothOrder not in [1, 2]: raise ValueError(f"SmoothOrder must be either 1 or 2 but " f"{smoothOrder} was passed.") smooth = 10 ** p # todo: Use Eq 12 for large n? Gamma = (1. / (1 + smooth * Lambda ** smoothOrder)) mask_finite = np.array(np.isfinite(y)).astype(bool) nfin = np.count_nonzero(mask_finite) nmiss = mask_finite.size - nfin # Residual sum-of-squares calculation if Wtot is None: # very much faster: does not require any inverse DCT RSS = np.linalg.norm(DCTy * (Gamma - 1.)) ** 2 else: # take account of the weights to calculate RSS: yhat = dctNd(Gamma * DCTy, inverse=True) RSS = np.linalg.norm( np.sqrt(Wtot[mask_finite]) * (y[mask_finite] - yhat[mask_finite])) ** 2 if not (yhat.ndim == y.ndim): raise ValueError("`yhat` and `y` must have the same shape") TrH = np.sum(Gamma) score = RSS / float(nfin) / (1. - TrH / float(nmiss + nfin)) ** 2 score = score.astype(np.float32) if score_only: return score.flatten() else: return score.flatten(), smooth, Gamma, TrH
[docs]def sumnd(x): """ Apply numpy sum over all available dimensions Parameters ---------- x: np.ndarray 1, 2 or 3-dimensional data array Returns ------- sum: float Sum over all available dimensions """ if x.ndim == 1: return x elif x.ndim == 2: return np.sum(x, axis=1) elif x.ndim == 3: return np.sum(x, axis=1) else: raise NotImplementedError("sumnd is only available for 1,2,3 dim data")
[docs]def RobustWeights(residuals, h, mask=None, method: Literal["cauchy", "talworth", "bisquare"] = 'cauchy', simple=True): """ Compute weights from residuals. Parameters ---------- residuals: np.ndarray Residuals between previous and current smoothing run. Larger residuals with result in larger weights. NaNs will get weight 0. h: float todo mask: np.ndarray, optional (default: None) Boolean array of same shape as residuals. True indicates residuals to ignore when creating new weights. By default, all residuals are used. method: str, optional (default: 'cauchy') A method to compute the weights. One of: "cauchy", "talworth", "bisquare" todo: Note that at the moment, only "cauchy" is implemented. simple: bool, optional (default: True) Use numpy absolute to compute the residuals. Returns ------- weights: np.ndarray Weights based on passed residuals and the chosen method. Nans have weight 0. """ _methods = ['cauchy', 'talworth', 'bisquare'] def sabs(x): return np.sqrt(sumnd(np.abs(x) ** 2)) if simple: f = np.abs else: f = sabs if mask is None: mask = np.full(residuals.shape, True) AD = sabs(residuals[~mask] - np.median(residuals[~mask])) if residuals.ndim == 3: res = residuals.flatten() MAD = np.median(AD) # median absolute deviation u = f(res / (1.4826 * MAD)) / np.sqrt(1 - h) # studentized residuals u = u.reshape(residuals.shape) else: MAD = np.median(AD) # median absolute deviation u = f(residuals / (1.4826 * MAD)) / np.sqrt(1 - h) # studentized residuals if method not in _methods: raise ValueError(f"{method} is not a supported method: {_methods}") if method.lower() == 'cauchy': c = 2.385 weights = 1. / (1 + (u / c) ** 2) else: raise NotImplementedError("Only 'cauchy' weights are implemented at " "at the moment") weights[np.isnan(weights)] = 0 return weights
[docs]def smoothn( data: np.ndarray, smooth: Optional[float] = None, axis: Optional[Union[int, Tuple[int, ...]]] = None, data_weights: Optional[np.ndarray] = None, smoothOrder: Optional[int] = 2, init_guess: Optional[np.ndarray] = None, isrobust: Optional[bool] = True, MaxIter: Optional[int] = 100, TolZ: Annotated[float, ValueRange(0.0, 1.0)] = 0.001, gap_value: Optional[Union[float, int]] = np.nan, exclusion_mask: Optional[np.ndarray] = None, data_sampling: Optional[Union[float, Tuple[float, ...]]] = 1., debug_mode: Optional[bool] = False, return_stats: Optional[Tuple[str, ...]] = None, ) -> (np.ndarray, float, int, dict): """ Robust spline smoothing for 1-D to 3-D data. A fast, automatized and robust discretised smoothing spline for data of any dimension. When using this algorithm, refer to: Garcia, D. (2010) 'Robust smoothing of gridded data in one and higher dimensions with missing values', Computational Statistics & Data Analysis, 54(4), pp. 1167–1178. Available at: https://doi.org/10.1016/j.csda.2009.09.020 Parameters --------- data: np.ndarray 1,2 or 3-dimensional array of values to smoothen. data gaps to fill should have the value given as `gap_value`. smooth: float, optional (default: None) Smoothing parameter. If given, smooth must be a real positive scalar. The larger smooth is, the smoother the output will be. If None is passed smooth is automatically determined from data by minimising the generalized cross-validation (GCV) score. axis: int or tuple[int,...], optional (default: None) Axes along which the smoothing is applied. If not given, smoothing is applied along all axes of dat. data_weights: np.ndarray, optional (default: None) Weight (normally between 0 and 1) for each value in dat. Weights must be given as array of same shape as dat. Weight 0 would mean that a value is ignored when fitting the model. NaNs in data will be assigned the weight 0. smoothOrder: int, optional (default: 2) Exponential of lambda and gamma used for smoothing. 1 would be a linear interpolation init_guess: np.ndarray, optional (default: None) First guess for the interpolated data, resp. initial value for the iterative process. Must have the same shape as data if passed. If z0 / initial guess is not passed, it will be generated from the passed data. isrobust: bool, optional (default: False) Apply robust weights (not affected by outliers) MaxIter: Maximum number of iterations allowed (default = 100) TolZ: Termination tolerance on Z (default = 1e-3) TolZ must be in ]0,1[ exclusion_mask: np.ndarray, optional (default: None) A boolean mask of elements in data that are excluded from calculating the smoothing function. Must have the same shape as data. Elements in data where the corresponding exclusion mask is True will still be considered in the interpolation (so that the original distances and neighbourhoods are retained), but in the final output they will be removed again and replaced with NaNs. data_sampling: int or tuple, optional (default: 1) Sampling unit for each dimension of ax. If a tuple is given, it must be the same length as the number of dimensions of dat. If a single number is given, it is applied to all dimensions. debug_mode: bool, optional (default: False) All debug messages are logged by the pytesmo logger (logging.getLogger('pytesmo')). If this setting is activated, we add a handler to log to stdout for the DCTPLS function to print the debug messages. return_stats: tuple, optional (default: None) Select which side products should be kept and returned. Removing some from the list will reduce memory usage. - 'initial_guess': Return the initial guess for smoothing - 'euclidean_distance': Return measure for the gap size (only when exclusion mask is set). - 'final_weights': How much weight was assigned to each data element in the end. - gcv_score: Score of the GCV Returns ------- z: np.ndarray Smoothed and interpolated version of input data smooth: float Final parameter that was used exit_flag: int 1: ok 2: less than 2 data samples 3: Inner loop did not converge stats: dict Side products that were selected in `return_stats` """ # Make a copy of the data to avoid changing the original data data = np.array(data, copy=True) def _enter_debug_mode(): logger.setLevel('DEBUG') logger.addHandler(streamHandler) def _exit_debug_mode(): logger.setLevel(loglevel) logger.handlers.remove(streamHandler) if debug_mode: _enter_debug_mode() exit_flag = 1 # ok stats = OrderedDict([ ('initial_guess', None), ('euclidean_distance', None), ('final_weights', None), ('gcv_score', None), ]) if return_stats is None: return_stats = [] logger.debug(f"Data has shape: {data.shape}") # setting up the weights arrays if data_weights is not None: weights = data_weights.copy() if weights.shape != data.shape: raise ValueError(f"Data and weights must have the same shape: " f"{data.shape} vs. {weights.shape}") weights = (weights / np.max(weights)) # normalise logger.debug(f"Weights are found with shape {weights.shape} and " f"normalised with factor {weights.max()}") else: weights = np.ones(data.shape) logger.debug(f"No weights are found. " f"Using 1s with shape {weights.shape}") weights = weights.astype(np.float32) n_all = data.size # Handle case of insufficient data _thres = 2 if n_all < _thres: logger.warning(f"Less than {_thres} values to interpolate passed.") z = None exit_flag = 2 if debug_mode: _exit_debug_mode() return z, smooth, exit_flag, stats logger.debug(f"Number of elements: {n_all}, which is larger than " f"threshold {_thres}") # set data to 0 where we don't want to use it isfinite: np.ndarray = \ (np.isfinite(data) if not np.isfinite(gap_value) else data != gap_value).astype(bool) & \ (True if exclusion_mask is None else ~exclusion_mask) # arbitrary values for missing y-data data[~isfinite] = 0 weights[~isfinite] = 0 # also set the weights to 0 if np.any(weights < 0): raise ValueError('Weights must all be >=0') logger.debug( f"{len(isfinite)} elements in data are NaN and set arbitrarily. " f"Weights for those elements set to 0 accordingly") isweighted = np.any(weights != 1) logger.debug(f"isweighted status to be applied is: {isweighted}") autosmooth = True if smooth is None else False logger.debug(f"autosmooth status to be applied is: {autosmooth}") # Dimensions that the algorithm is applied over axis = np.arange(data.ndim) if axis is None else axis logger.debug(f"Dimension of data are: {data.ndim}") di = 1 # for equally spaced data Lambda = np.full(data.shape, 0.).astype(data.dtype) for i, ax in enumerate(axis): logger.debug( f"Lambda (shape {Lambda.shape}) MEAN after {i} " f"iterations: {Lambda.mean()}" ) n = int(data.shape[ax]) _lambda = 2 - 2 * np.cos(np.pi * (np.arange(1, n + 1) - 1.) / n) _lambda = _lambda / (di ** 2) _shp = [n if a == ax else 1 for a in axis] Lambda += _lambda.astype(data.dtype).reshape(_shp) del _lambda logger.debug(f"Lambda MEAN after scaling: {Lambda.mean()}") hMin, hMax = 1e-6, 0.99 ndim = data.ndim # np.linalg.matrix_rank(data) if smoothOrder == 1: sMinBnd = (1 / hMax ** (2 / ndim) - 1) / 4 sMaxBnd = (1 / hMin ** (2 / ndim) - 1) / 4 else: sMinBnd = (((1 + np.sqrt(1 + 8 * hMax ** (2 / ndim))) / 4 / hMax ** (2 / ndim)) ** 2 - 1) / 16 sMaxBnd = (((1 + np.sqrt(1 + 8 * hMin ** (2 / ndim))) / 4 / hMin ** (2 / ndim)) ** 2 - 1) / 16 logger.debug( f"sBands start with min={sMinBnd}, max={sMaxBnd} " f"for data with ndim: {ndim}" ) if init_guess is not None: # initial guess z0 = init_guess.copy() logger.debug( f"z from custom passed initial guess init_guess={z0.shape}") else: if isweighted: s = data_sampling if isinstance( data_sampling, Iterable) else tuple( np.full(data.ndim, data_sampling)) z0, dist = calc_init_guess(data, mask=~isfinite, sampling=s) logger.debug(f"z from from initial guess init_guess={z0.shape}") else: z0 = np.full(data.shape, 0).astype(data.dtype) logger.debug(f"z from from zeros init_guess={z0.shape}") if 'initial_guess' in return_stats: stats['initial_guess'] = z0.copy() z = z0.copy() tol = 1. errp = 0.1 # Error on p. Smoothness parameter smooth = 10^p # Relaxation factor RF: to speedup convergence RF = 1 + 0.75 * bool(isweighted) # Main iterative process p_best = None Wtot = weights.copy() score = None for i in range(3): # RobustIterativeProcess, 3 times logger.debug( f"RobustIterativeProcess {i} -> tol={tol}, errp={errp}, RF={RF}") j = 0 while (tol > TolZ) and j < MaxIter: logger.debug( f"RobustIterativeProcess {i} -> Inner Loop {j}: tol={tol}") assert data.shape == z.shape, "data and Z have differnt shapes" logger.debug( f"RobustIterativeProcess {i} -> " f"data - z={np.sum(np.abs(data - z))}" ) DCTdat = dctNd(Wtot * (data - z) + z, inverse=False) logger.debug( f"RobustIterativeProcess {i} -> " f"Inner Loop {j}: DCTdat mean ={np.nanmean(DCTdat)}" ) # Wtot = None if not weighted else Wtot # find the optimal smoothing order for the gcv score w = None if (Wtot.sum() / n_all) >= 1. else Wtot args = (Lambda, DCTdat, data, smoothOrder, w) if autosmooth: # to reduce the number of iterations if (j == 0) or np.log2(j).is_integer(): p_best, fval, ierr, numfunc = \ opt.fminbound( gcv, full_output=True, xtol=errp, x1=np.log10(sMinBnd), x2=np.log10(sMaxBnd), args=(*args, True)) p_best = p_best.astype(data.dtype) logger.debug( f"RobustIterativeProcess {i} -> " f"Inner Loop {j}: FminBound found best p={p_best}" ) logger.debug( f"RobustIterativeProcess {i} -> " f"Inner Loop {j}: Optimise function mid value: {fval}" ) logger.debug( f"RobustIterativeProcess {i} -> " f"Inner Loop {j}: Optimise function error flag: {ierr}" ) logger.debug( f"RobustIterativeProcess {i} -> " f"Inner Loop {j}: Optimise function calls: {numfunc}" ) logger.debug( f"RobustIterativeProcess {i} -> " f"Inner Loop {j}: Autosmooth found best p={p_best}" ) else: p_best = np.log(smooth) / np.log(10) logger.debug( f"autosmooth OFF with p_best={p_best} as initial guess") # calculate smooth, Gamma and TrH for p_best score, smooth, Gamma, TrH = gcv(p_best, *args, score_only=False) logger.debug( f"RobustIterativeProcess {i} -> Current smooth={smooth}") z = RF * dctNd(Gamma * DCTdat, inverse=True) + (1 - RF) * z logger.debug( f"RobustIterativeProcess {i} -> " f"Gamma shape/mean: {Gamma.shape}/{Gamma.mean()}" ) logger.debug( f"RobustIterativeProcess {i} -> " f"z (shape/mean)={z.shape}/{z.mean()}" ) # if no weighted/missing data => tol=0 (no iteration) dz = z[isfinite] - z0[isfinite] tol = isweighted * np.linalg.norm(dz) / np.linalg.norm(z[isfinite]) del dz logger.debug(f"RobustIterativeProcess {i} -> tol={tol}") z0 = z.copy() # re-initialization logger.debug(f"RobustIterativeProcess {i} -> Reinitialise with z") j += 1 logger.debug(f"--------------------------------------------") logger.debug(f"RobustIterativeProcess finished after i={i} " f"tol={tol}") if i >= MaxIter and (tol > TolZ): exit_flag = 3 warnings.warn(f"Inner loop DID NOT CONVERGE after {i} iterations") if isrobust: # -- Robust Smoothing: iteratively re-weighted process if smoothOrder == 1: h = 1 / np.sqrt(1 + 4 * smooth / di ** (2 ** smoothOrder)) else: h = np.sqrt(1 + 16 * smooth / di ** (2 ** smoothOrder)) h = np.sqrt(1 + h) / np.sqrt(2) / h logger.debug(f"Robust smoothing with h={h} (rank={ndim})") # --- take robust weights into account Wtot = weights * RobustWeights( residuals=data - z, h=h, mask=~isfinite, method="cauchy", simple=True) logger.debug(f"Wtot={Wtot.shape}") # --- re-initialize for another iterative weighted process isweighted = True tol = 1 # --- else: logger.debug(f"Not Robust process, escape loop.") break if 'final_weights' in return_stats: stats['final_weights'] = Wtot if 'gcv_score' in return_stats: stats['gcv_score'] = score # Warning messages # --- if autosmooth: if abs(np.log10(smooth) - np.log10(sMinBnd)) < errp: warnings.warn( f'{smooth}: the LOWER bound for smooth has been reached. ' 'Put smooth as an input variable if required.') elif abs(np.log10(smooth) - np.log10(sMaxBnd)) < errp: warnings.warn( f'{smooth}: the UPPER bound for smooth has been reached. ' 'Put smooth as an input variable if required.') # apply the fill values back onto the interpolated data for excluded points if exclusion_mask is not None: z[exclusion_mask] = gap_value if debug_mode: _exit_debug_mode() return z, smooth, exit_flag, stats