Source code for pyriemann.geometry.covariance

from functools import wraps
import warnings

from array_api_compat import array_namespace as get_namespace, device as xpd
from array_api_extra import expand_dims
from scipy.stats import chi2

from ._backend import (
    apply_xp_cov,
    diag_indices,
    hann_window,
    fast_mcd,
    ledoit_wolf,
    oas,
)
from ._check import check_function, check_init, check_weights
from ._docs import deprecated
from .base import ctranspose, _vectorize_nd
from .distance import distance_mahalanobis
from .test import is_real_type, is_square


def _cov(X, **kwds):
    """Covariance matrix estimator."""
    xp = get_namespace(X)
    return apply_xp_cov(xp.cov, X, **kwds)


def _corr(X, **kwds):
    """Correlation coefficient matrix estimator."""
    xp = get_namespace(X)
    return apply_xp_cov(xp.corrcoef, X, **kwds)


def _complex_estimator(func):
    """Decorator to extend a real-valued covariance estimator to complex data.

    Applied to a real-valued covariance estimator, this decorator allows to
    estimate complex covariance matrices from complex-valued multi-channel
    time-series. See Eq.(4) in [1]_.

    Parameters
    ----------
    func : callable
        Real-valued covariance estimator.

    Returns
    -------
    output : callable
        Complex-valued covariance estimator.

    Notes
    -----
    .. versionadded:: 0.6
    .. versionchanged:: 0.12
        Add support for NumPy and PyTorch.

    References
    ----------
    .. [1] `Enhanced Covariance Matrix Estimators in Adaptive Beamforming
        <https://doi.org/10.1109/ICASSP.2007.366399>`_
        R. Abrahamsson, Y. Selen and P. Stoica. 2007 IEEE International
        Conference on Acoustics, Speech and Signal Processing, Volume 2, 2007.
    """
    @wraps(func)
    def wrapper(X, **kwds):
        xp = get_namespace(X)
        iscomplex = not is_real_type(X)
        if iscomplex:
            n_channels = X.shape[-2]
            X = xp.concat((xp.real(X), xp.imag(X)), axis=-2)
        cov = func(X, **kwds)
        if iscomplex:
            cov = cov[..., :n_channels, :n_channels] \
                + cov[..., n_channels:, n_channels:] \
                + 1j * (cov[..., n_channels:, :n_channels]
                        - cov[..., :n_channels, n_channels:])
        return cov
    return wrapper


@_complex_estimator
def _lwf(X, **kwds):
    """Wrapper for sklearn ledoit wolf covariance estimator"""
    C, _ = ledoit_wolf(X.mT, **kwds)
    return C


@_complex_estimator
def _mcd(X, **kwds):
    """Wrapper for sklearn mcd covariance estimator"""
    _, C, _, _ = fast_mcd(X.mT, **kwds)
    return C


@_complex_estimator
def _oas(X, **kwds):
    """Wrapper for sklearn oas covariance estimator"""
    C, _ = oas(X.mT, **kwds)
    return C


def _hub(X, **kwds):
    """Wrapper for Huber's M-estimator"""
    return covariance_mest(X, "hub", **kwds)


def _stu(X, **kwds):
    """Wrapper for Student-t's M-estimator"""
    return covariance_mest(X, "stu", **kwds)


def _tyl(X, **kwds):
    """Wrapper for Tyler's M-estimator"""
    return covariance_mest(X, "tyl", **kwds)


[docs] def covariance_mest(X, m_estimator, *, init=None, tol=10e-3, n_iter_max=50, assume_centered=False, q=0.9, nu=5, norm="trace"): r"""Robust M-estimators. Robust M-estimator based covariance matrix [1]_, computed by fixed point algorithm. For an input time series :math:`X \in \mathbb{R}^{c \times t}`, composed of :math:`c` channels and :math:`t` time samples, .. math:: C = \frac{1}{t} \sum_i \varphi(X[:,i]^H C^{-1} X[:,i]) X[:,i] X[:,i]^H where :math:`\varphi()` is a function allowing to weight the squared Mahalanobis distance depending on the M-estimator type: Huber, Student-t or Tyler. Parameters ---------- X : ndarray, shape (..., n_channels, n_times) Multi-channel time-series, real or complex-valued. m_estimator : {"hub", "stu", "tyl"} Type of M-estimator: - "hub" for Huber's M-estimator [2]_; - "stu" for Student-t's M-estimator [3]_; - "tyl" for Tyler's M-estimator [4]_. init : None | ndarray, shape (n_channels, n_channels), default=None A matrix used to initialize the algorithm. If None, the sample covariance matrix is used. tol : float, default=10e-3 The tolerance to stop the fixed point estimation. n_iter_max : int, default=50 The maximum number of iterations. assume_centered : bool, default=False If True, data will not be centered before computation. Useful when working with data whose mean is almost, but not exactly zero. If False, data will be centered before computation. q : float, default=0.9 Using Huber's M-estimator, q is the percentage in (0, 1] of inputs deemed uncorrupted, while (1-q) is the percentage of inputs treated as outliers w.r.t a Gaussian distribution. This estimator is a trade-off between Tyler's estimator (q=0) and the sample covariance matrix (q=1). nu : int, default=5 Using Student-t's M-estimator, degree of freedom for t-distribution (strictly positive). This estimator is a trade-off between Tyler's estimator (nu->0) and the sample covariance matrix (nu->inf). norm : {"trace", "determinant"}, default="trace" Using Tyler's M-estimator, the type of normalization: * "trace": trace of covariance matrix is n_channels; * "determinant": determinant of covariance matrix is 1. Returns ------- cov : ndarray, shape (..., n_channels, n_channels) Robust M-estimator based covariance matrix. Notes ----- .. versionadded:: 0.4 .. versionchanged:: 0.12 Add support for NumPy and PyTorch. References ---------- .. [1] `Complex Elliptically Symmetric Distributions: Survey, New Results and Applications <https://www.researchgate.net/profile/H-Vincent-Poor/publication/258658018_Complex_Elliptically_Symmetric_Distributions_Survey_New_Results_and_Applications/links/550480100cf24cee3a0150e2/Complex-Elliptically-Symmetric-Distributions-Survey-New-Results-and-Applications.pdf>`_ E. Ollila, D.E. Tyler, V. Koivunen, H.V. Poor. IEEE Transactions on Signal Processing, 2012. .. [2] `Robust antenna array processing using M-estimators of pseudo-covariance <http://lib.tkk.fi/Diss/2010/isbn9789526030319/article5.pdf>`_ E. Ollila, V. Koivunen. PIMRC, 2003. .. [3] `Influence functions for array covariance matrix estimators <https://ieeexplore.ieee.org/abstract/document/1289447/>`_ E. Ollila, V. Koivunen. IEEE SSP, 2003. .. [4] `A distribution-free M-estimator of multivariate scatter <https://projecteuclid.org/journals/annals-of-statistics/volume-15/issue-1/A-Distribution-Free-M-Estimator-of-Multivariate-Scatter/10.1214/aos/1176350263.full>`_ D.E. Tyler. The Annals of Statistics, 1987. """ # noqa xp = get_namespace(X, init) n_channels, n_times = X.shape[-2], X.shape[-1] if m_estimator == "hub": if not 0 < q <= 1: raise ValueError(f"Value q must be included in (0, 1] (Got {q})") def weight_func(x): # Example 1, Section V-C in [1] c2 = chi2.ppf(q, n_channels) / 2 b = chi2.cdf(2 * c2, n_channels + 1) + c2 * (1 - q) / n_channels return xp.minimum(xp.ones_like(x), c2 / x) / b elif m_estimator == "stu": if nu <= 0: raise ValueError(f"Value nu must be strictly positive (Got {nu})") def weight_func(x): # Eq.(42) in [1] return (2 * n_channels + nu) / (nu + 2 * x) elif m_estimator == "tyl": def weight_func(x): # Example 2, Section V-C in [1] return n_channels / x else: raise ValueError(f"Unsupported m_estimator: {m_estimator}") if not assume_centered: X = X - xp.mean(X, axis=-1, keepdims=True) if init is None: cov = X @ ctranspose(X) / n_times else: cov = check_init(init, n_channels, like=X) for _ in range(n_iter_max): dist2 = distance_mahalanobis(X, cov, squared=True) Xw = xp.sqrt(weight_func(dist2))[None, :] * X cov_new = Xw @ ctranspose(Xw) / n_times norm_delta = xp.linalg.matrix_norm(cov_new - cov) norm_cov = xp.linalg.matrix_norm(cov) cov = cov_new if norm_delta / norm_cov <= tol: break else: warnings.warn("Convergence not reached", stacklevel=2) if m_estimator == "tyl": cov = normalize(cov, norm) if norm == "trace": cov *= n_channels return cov
[docs] @_complex_estimator def covariance_sch(X): r"""Schaefer-Strimmer shrunk covariance estimator. Shrinkage covariance estimator [1]_: .. math:: C = (1 - \gamma) C_\text{scm} + \gamma T where :math:`T` is the diagonal target matrix: .. math:: T[i,j] = \{ C_\text{scm}[i,i] \ \text{if} \ i=j, 0 \ \text{otherwise} \} Note that the optimal :math:`\gamma` is estimated by the authors' method. Parameters ---------- X : ndarray, shape (..., n_channels, n_times) Multi-channel time-series, real or complex-valued. Returns ------- cov : ndarray, shape (..., n_channels, n_channels) Schaefer-Strimmer shrunk covariance matrix. Notes ----- .. versionadded:: 0.3 .. versionchanged:: 0.12 Add support for NumPy and PyTorch. References ---------- .. [1] `A shrinkage approach to large-scale covariance estimation and implications for functional genomics <http://doi.org/10.2202/1544-6115.1175>`_ J. Schafer, and K. Strimmer. Statistical Applications in Genetics and Molecular Biology, Volume 4, Issue 1, 2005. """ xp = get_namespace(X) n_times = X.shape[-1] X_c = X - xp.mean(X, axis=-1, keepdims=True) C_scm = X_c @ X_c.mT / n_times # Compute optimal gamma, the weighting between SCM and shrinkage estimator std = xp.std(X, axis=-1) std_outer = std[..., :, xp.newaxis] * std[..., xp.newaxis, :] R = (n_times / ((n_times - 1.) * std_outer)) * C_scm X_c2 = X_c ** 2 var_R = X_c2 @ X_c2.mT - n_times * C_scm ** 2 var = xp.var(X, axis=-1) var_outer = var[..., :, xp.newaxis] * var[..., xp.newaxis, :] var_R *= n_times / ((n_times - 1) ** 3 * var_outer) # Zero out diagonal diag_idx = diag_indices(R.shape[-1], like=R) R[..., diag_idx[0], diag_idx[1]] = 0 var_R[..., diag_idx[0], diag_idx[1]] = 0 R2_sum = xp.sum(R ** 2, axis=(-2, -1)) gamma = xp.clip( xp.where(R2_sum == 0, 0.0, xp.sum(var_R, axis=(-2, -1)) / R2_sum), 0, 1, ) gamma = expand_dims(gamma, axis=(-2, -1)) sigma = (1. - gamma) * (n_times / (n_times - 1.)) * C_scm # diagonal matrix from C_scm diagonal diag_C = xp.linalg.diagonal(C_scm) shrinkage_diag = xp.zeros_like(C_scm) shrinkage_diag[..., diag_idx[0], diag_idx[1]] = diag_C shrinkage = gamma * (n_times / (n_times - 1.)) * shrinkage_diag return sigma + shrinkage
[docs] def covariance_scm(X, *, assume_centered=False, weights=None): r"""Sample covariance estimator. Sample covariance estimator, re-implementing ``empirical_covariance`` of scikit-learn [1]_, but supporting: - real and complex-valued data, - broadcasting, - weights for time samples. .. math:: \mathbf{C}_\text{scm} = \mathbf{X} \text{diag}(w) \mathbf{X}^H with :math:`w` being the weights which sum to 1. Parameters ---------- X : ndarray, shape (..., n_channels, n_times) Multi-channel time-series, real or complex-valued. assume_centered : bool, default=False If True, data will not be centered before computation. Useful when working with data whose mean is almost, but not exactly zero. If False, data will be centered before computation. weights : None | ndarray, shape (n_times,), default=None Weights for each time sample. If None, it uses equal weights. .. versionadded:: 0.11 Returns ------- cov : ndarray, shape (..., n_channels, n_channels) Sample covariance matrix. Notes ----- .. versionadded:: 0.6 .. versionchanged:: 0.11 Add weights. .. versionchanged:: 0.12 Add support for NumPy and PyTorch. References ---------- .. [1] https://scikit-learn.org/stable/modules/generated/sklearn.covariance.empirical_covariance.html """ # noqa xp = get_namespace(X) weights = check_weights(weights, X.shape[-1], like=X) if not assume_centered: X = X - xp.sum(X * weights, axis=-1, keepdims=True) cov = (X * weights) @ ctranspose(X) return cov
############################################################################### cov_est_functions = { "corr": _corr, "cov": _cov, "hub": _hub, "lwf": _lwf, "mcd": _mcd, "oas": _oas, "sch": covariance_sch, "scm": covariance_scm, "stu": _stu, "tyl": _tyl, } def _check_cov_estimator(estimator): est = check_function(estimator, cov_est_functions) if est not in [covariance_sch, covariance_scm]: est = _vectorize_nd()(est) return est
[docs] def covariances(X, estimator="cov", **kwds): """Estimation of covariance matrices. Estimates covariance matrices from multi-channel time-series according to a covariance estimator. It supports real and complex-valued data. Parameters ---------- X : ndarray, shape (..., n_channels, n_times) Multi-channel time-series, real or complex-valued. estimator : string | callable, default="cov" Covariance matrix estimator [est]_: * "corr" for correlation coefficient matrix [corr]_, * "cov" for NumPy based covariance matrix [cov]_, * "hub" for Huber's M-estimator based covariance matrix [mest]_, * "lwf" for Ledoit-Wolf shrunk covariance matrix [lwf]_, * "mcd" for minimum covariance determinant matrix [mcd]_, * "oas" for oracle approximating shrunk covariance matrix [oas]_, * "sch" for Schaefer-Strimmer shrunk covariance matrix [sch]_, * "scm" for sample covariance matrix [scm]_, * "stu" for Student-t's M-estimator based covariance matrix [mest]_, * "tyl" for Tyler's M-estimator based covariance matrix [mest]_, * or a callable function. For regularization, consider "lwf" or "oas". For robustness, consider "hub", "mcd", "stu" or "tyl". For "lwf", "mcd", "oas" and "sch" estimators, complex covariance matrices are estimated according to [comp]_. **kwds : dict Any further parameters are passed directly to the covariance estimator. Returns ------- covmats : ndarray, shape (..., n_channels, n_channels) Covariance matrices. Notes ----- .. versionchanged:: 0.12 Add support for NumPy and PyTorch. References ---------- .. [est] https://scikit-learn.org/stable/modules/covariance.html .. [corr] https://numpy.org/doc/stable/reference/generated/numpy.corrcoef.html .. [cov] https://numpy.org/doc/stable/reference/generated/numpy.cov.html .. [lwf] https://scikit-learn.org/stable/modules/generated/sklearn.covariance.ledoit_wolf.html .. [mcd] https://scikit-learn.org/stable/modules/generated/sklearn.covariance.MinCovDet.html .. [mest] :func:`pyriemann.geometry.covariance.covariance_mest` .. [oas] https://scikit-learn.org/stable/modules/generated/oas-function.html .. [sch] :func:`pyriemann.geometry.covariance.covariance_sch` .. [scm] :func:`pyriemann.geometry.covariance.covariance_scm` .. [comp] `Enhanced Covariance Matrix Estimators in Adaptive Beamforming <https://doi.org/10.1109/ICASSP.2007.366399>`_ R. Abrahamsson, Y. Selen and P. Stoica. 2007 IEEE International Conference on Acoustics, Speech and Signal Processing, Volume 2, 2007. """ # noqa est = _check_cov_estimator(estimator) return est(X, **kwds)
[docs] def covariances_EP(X, P, estimator="cov", **kwds): """Special form covariance matrix, concatenating a prototype P. Parameters ---------- X : ndarray, shape (..., n_channels, n_times) Multi-channel time-series. P : ndarray, shape (n_channels_proto, n_times) Multi-channel prototype. estimator : string | callable, default="cov" Covariance matrix estimator, see :func:`pyriemann.geometry.covariance.covariances`. **kwds : optional keyword parameters Any further parameters are passed directly to the covariance estimator. Notes ----- .. versionchanged:: 0.12 Add support for NumPy and PyTorch. Returns ------- covmats : ndarray, shape (..., n_channels + n_channels_proto, \ n_channels + n_channels_proto) Covariance matrices. """ est = _check_cov_estimator(estimator) xp = get_namespace(X, P) original_shape = X.shape n_times = original_shape[-1] n_channels_proto, n_times_p = P.shape if n_times_p != n_times: raise ValueError( f"X and P do not have the same n_times: {n_times} and {n_times_p}") P_broadcast = xp.broadcast_to( P, (*original_shape[:-2], n_channels_proto, n_times) ) PX = xp.concat((P_broadcast, X), axis=-2) return est(PX, **kwds)
@deprecated( "covariances_X() is deprecated and will be removed in 0.14.0." ) def covariances_X(X, estimator="cov", alpha=0.2, **kwds): """Special form covariance matrix, embedding input X. Parameters ---------- X : ndarray, shape (..., n_channels, n_times) Multi-channel time-series. estimator : string | callable, default="cov" Covariance matrix estimator, see :func:`pyriemann.geometry.covariance.covariances`. alpha : float, default=0.2 Regularization parameter (strictly positive). **kwds : optional keyword parameters Any further parameters are passed directly to the covariance estimator. Returns ------- covmats : ndarray, shape (..., n_channels + n_times, n_channels + \ n_times) Covariance matrices. Notes ----- .. versionchanged:: 0.12 Add support for NumPy and PyTorch. References ---------- .. [1] `A special form of SPD covariance matrix for interpretation and visualization of data manipulated with Riemannian geometry <https://hal.archives-ouvertes.fr/hal-01103344/>`_ M. Congedo and A. Barachant, MaxEnt - 34th International Workshop on Bayesian Inference and Maximun Entropy Methods in Science and Engineering (MaxEnt'14), Sep 2014, Amboise, France. pp.495 """ if alpha <= 0: raise ValueError( f"Parameter alpha must be strictly positive (Got {alpha})") est = _check_cov_estimator(estimator) xp = get_namespace(X) original_shape = X.shape n_channels, n_times = original_shape[-2], original_shape[-1] dt, dev = X.real.dtype, xpd(X) Hchannels = xp.eye(n_channels, dtype=dt, device=dev) \ - xp.ones((n_channels, n_channels), dtype=dt, device=dev) / n_channels Htimes = xp.eye(n_times, dtype=dt, device=dev) \ - xp.ones((n_times, n_times), dtype=dt, device=dev) / n_times X = Hchannels @ X @ Htimes # Eq(8), double centering batch_shape = original_shape[:-2] I_ch = xp.broadcast_to( alpha * xp.eye(n_channels, dtype=X.dtype, device=xpd(X)), (*batch_shape, n_channels, n_channels), ) I_t = xp.broadcast_to( alpha * xp.eye(n_times, dtype=X.dtype, device=xpd(X)), (*batch_shape, n_times, n_times), ) Xt = X.mT top = xp.concat([X, I_ch], axis=-1) bot = xp.concat([I_t, Xt], axis=-1) Y = xp.concat([top, bot], axis=-2) # Eq(9) return est(Y, **kwds) / (2 * alpha) # Eq(10)
[docs] def block_covariances(X, blocks, estimator="cov", **kwds): """Compute block diagonal covariance. Calculates block diagonal matrices where each block is a covariance matrix of a subset of channels. Block sizes are passed as a list of integers and can vary. The sum of block sizes must equal the number of channels in X. Parameters ---------- X : ndarray, shape (..., n_channels, n_times) Multi-channel time-series. blocks: list of int List of block sizes. estimator : string | callable, default="cov" Covariance matrix estimator, see :func:`pyriemann.geometry.covariance.covariances`. **kwds : optional keyword parameters Any further parameters are passed directly to the covariance estimator. Notes ----- .. versionchanged:: 0.12 Add support for NumPy and PyTorch. Returns ------- covmats : ndarray, shape (..., n_channels, n_channels) Block diagonal covariance matrices. """ est = _check_cov_estimator(estimator) xp = get_namespace(X) original_shape = X.shape n_channels = original_shape[-2] if sum(blocks) != n_channels: raise ValueError("Sum of individual block sizes " "must match number of channels of X.") covmats = xp.zeros( (*original_shape[:-2], n_channels, n_channels), dtype=X.dtype, device=xpd(X), ) idx_start = 0 for j in blocks: block_cov = est(X[..., idx_start:idx_start+j, :], **kwds) if block_cov.ndim < 2: block_cov = xp.reshape(block_cov, (*block_cov.shape, 1, 1)) covmats[..., idx_start:idx_start+j, idx_start:idx_start+j] = \ block_cov idx_start += j return covmats
############################################################################### def eegtocov(sig, window=128, overlapp=0.5, padding=True, estimator="cov"): """Convert EEG signal to covariance using sliding window. Notes ----- .. versionchanged:: 0.12 Add support for NumPy and PyTorch. """ est = check_function(estimator, cov_est_functions) xp = get_namespace(sig) X = [] if padding: padd = xp.zeros( (int(window / 2), sig.shape[1]), dtype=sig.dtype, device=xpd(sig), ) sig = xp.concat((padd, sig, padd), axis=0) n_times, n_channels = sig.shape jump = int(window * overlapp) ix = 0 while (ix + window < n_times): X.append(est(sig[ix:ix + window, :].mT)) ix = ix + jump return xp.stack(X, axis=0) ###############################################################################
[docs] def cross_spectrum(X, window=128, overlap=0.75, fmin=None, fmax=None, fs=None): """Compute the complex cross-spectral matrices of a real signal. Note that co-spectral matrices are the real part of cross-spectra. Parameters ---------- X : ndarray, shape (..., n_channels, n_times) Multi-channel time-series, real-valued. window : int, default=128 Length of the FFT window used for spectral estimation. overlap : float, default=0.75 Percentage of overlap between windows. fmin : float | None, default=None Minimal frequency to be returned. fmax : float | None, default=None Maximal frequency to be returned. fs : float | None, default=None Sampling frequency of the time-series. Returns ------- S : ndarray, shape (..., n_channels, n_channels, n_freqs) Cross-spectral matrices, for each frequency bin. freqs : ndarray, shape (n_freqs,) Frequencies associated to cross-spectra. Notes ----- .. versionadded:: 0.2.7 .. versionchanged:: 0.12 Add support for NumPy and PyTorch. References ---------- .. [1] https://en.wikipedia.org/wiki/Cross-spectrum """ if not is_real_type(X): raise ValueError("Input must be real-valued.") window = int(window) if window < 1: raise ValueError("Value window must be a positive integer") if not 0 < overlap < 1: raise ValueError( f"Value overlap must be included in (0, 1) (Got {overlap})" ) xp, dev = get_namespace(X), xpd(X) n_times = X.shape[-1] n_freqs = int(window / 2) + 1 # X real signal => compute half-spectrum step = int((1.0 - overlap) * window) n_windows = int((n_times - window) / step + 1) win = hann_window(window, like=X) # Sliding window via advanced indexing handles any batch dims # (..., n_channels, n_times) -> (..., n_channels, n_windows, window) starts = xp.arange(n_windows, device=dev) * step offsets = xp.arange(window, device=dev) Xs = X[..., starts[:, None] + offsets[None, :]] # FFT: (..., n_channels, n_windows, n_freqs) fdata = xp.fft.rfft(Xs * win, n=window) # adjust frequency range to specified range if fs is not None: if fmin is None: fmin = 0 if fmax is None: fmax = fs / 2 if fmax <= fmin: raise ValueError("Parameter fmax must be superior to fmin") if 2.0 * fmax > fs: # check Nyquist-Shannon raise ValueError("Parameter fmax must be inferior to fs/2") f = xp.arange(0, n_freqs, dtype=X.dtype, device=dev) \ * float(fs / window) fix = (f >= fmin) & (f <= fmax) fdata = fdata[..., fix] freqs = f[fix] else: if fmin is not None: warnings.warn("Parameter fmin not used because fs is None", stacklevel=2) if fmax is not None: warnings.warn("Parameter fmax not used because fs is None", stacklevel=2) freqs = None # Cross-spectral matrix via einsum over windows # (..., n_channels, n_channels, n_freqs) S = xp.einsum('...cwf,...dwf->...cdf', xp.conj(fdata), fdata) S /= n_windows * xp.linalg.vector_norm(win)**2 # normalization to respect Parseval's theorem with the half-spectrum # excepted DC bin (always), and Nyquist bin (when window is even) if window % 2: S[..., 1:] *= 2 else: S[..., 1:-1] *= 2 return S, freqs
@deprecated( "cospectrum() is deprecated and will be removed in 0.14.0; " "please use cross_spectrum() and take the real part of first output." ) def cospectrum(X, window=128, overlap=0.75, fmin=None, fmax=None, fs=None): """Compute co-spectral matrices, the real part of cross-spectra. Parameters ---------- X : ndarray, shape (..., n_channels, n_times) Multi-channel time-series, real-valued. window : int, default=128 Length of the FFT window used for spectral estimation. overlap : float, default=0.75 Percentage of overlap between windows. fmin : float | None, default=None Minimal frequency to be returned. fmax : float | None, default=None Maximal frequency to be returned. fs : float | None, default=None Sampling frequency of the time-series. Returns ------- S : ndarray, shape (..., n_channels, n_channels, n_freqs) Co-spectral matrices, for each frequency bin. freqs : ndarray, shape (n_freqs,) Frequencies associated to cospectra. """ S, freqs = cross_spectrum( X=X, window=window, overlap=overlap, fmin=fmin, fmax=fmax, fs=fs, ) return S.real, freqs
[docs] def coherence(X, window=128, overlap=0.75, fmin=None, fmax=None, fs=None, coh="ordinary"): """Compute squared coherence. Parameters ---------- X : ndarray, shape (..., n_channels, n_times) Multi-channel time-series, real-valued. window : int, default=128 Length of the FFT window used for spectral estimation. overlap : float, default=0.75 Percentage of overlap between windows. fmin : float | None, default=None Minimal frequency to be returned. fmax : float | None, default=None Maximal frequency to be returned. fs : float | None, default=None Sampling frequency of the time-series. coh : {"ordinary", "instantaneous", "lagged", "imaginary"}, \ default="ordinary" Coherence type: * "ordinary" for the ordinary coherence, defined in Eq.(22) of [1]_; this normalization of cross-spectral matrices captures both in-phase and out-of-phase correlations. However it is inflated by the artificial in-phase (zero-lag) correlation engendered by volume conduction. * "instantaneous" for the instantaneous coherence, Eq.(26) of [1]_, capturing only in-phase correlation. * "lagged" for the lagged-coherence, Eq.(28) of [1]_, capturing only out-of-phase correlation (not defined for DC and Nyquist bins). * "imaginary" for the imaginary coherence [2]_, Eq.(0.16) of [3]_, capturing out-of-phase correlation but still affected by in-phase correlation. Returns ------- C : ndarray, shape (..., n_channels, n_channels, n_freqs) Squared coherence matrices, for each frequency bin. freqs : ndarray, shape (n_freqs,) Frequencies associated to coherence. Notes ----- .. versionadded:: 0.2.4 .. versionchanged:: 0.3 Add support for lagged and imaginary coherences. References ---------- .. [1] `Instantaneous and lagged measurements of linear and nonlinear dependence between groups of multivariate time series: frequency decomposition <https://arxiv.org/ftp/arxiv/papers/0711/0711.1455.pdf>`_ R. Pascual-Marqui. Technical report, 2007. .. [2] `Identifying true brain interaction from EEG data using the imaginary part of coherency <https://doi.org/10.1016/j.clinph.2004.04.029>`_ G. Nolte, O. Bai, L. Wheaton, Z. Mari, S. Vorbach, M. Hallett. Clinical Neurophysioly, Volume 115, Issue 10, October 2004, Pages 2292-2307 .. [3] `Non-Parametric Synchronization Measures used in EEG and MEG <https://hal.archives-ouvertes.fr/hal-01868538v2>`_ M. Congedo. Technical Report, 2018. """ S, freqs = cross_spectrum( X, window=window, overlap=overlap, fmin=fmin, fmax=fmax, fs=fs, ) # S: (..., n_channels, n_channels, n_freqs) xp, dev = get_namespace(S), xpd(S) S2 = xp.abs(S)**2 # squared cross-spectral modulus n_channels = S.shape[-3] C = xp.zeros_like(S2) f_inds = xp.arange(0, C.shape[-1], device=dev) # lagged coh not defined for DC and Nyquist bins, because S is real if coh == "lagged": if freqs is None: f_inds = xp.arange(1, C.shape[-1] - 1, device=dev) warnings.warn("DC and Nyquist bins are not defined for lagged-" "coherence: filled with zeros", stacklevel=2) else: keep = (freqs > 0) & (freqs < fs / 2) f_inds_ = f_inds[keep] if not bool(xp.all(keep)): warnings.warn("DC and Nyquist bins are not defined for lagged-" "coherence: filled with zeros", stacklevel=2) f_inds = f_inds_ # Vectorized PSD: diagonal of S2 across channels diag_idx = xp.arange(n_channels, device=dev) psd = xp.sqrt(S2[..., diag_idx, diag_idx, :]) # (..., n_ch, n_freqs) psd_prod = psd[..., :, None, :] * psd[..., None, :, :] # psd_prod: (..., n_ch, n_ch, n_freqs) if coh == "ordinary": C[..., f_inds] = S2[..., f_inds] / psd_prod[..., f_inds] elif coh == "instantaneous": C[..., f_inds] = xp.real(S[..., f_inds])**2 / psd_prod[..., f_inds] elif coh == "lagged": S_real = xp.asarray(xp.real(S), copy=True) S_real[..., diag_idx, diag_idx, :] = 0.0 # zero diagonal denom = psd_prod[..., f_inds] - S_real[..., f_inds]**2 denom = xp.where(xp.abs(denom) < 1e-10, 1e-10, denom) C[..., f_inds] = xp.imag(S[..., f_inds])**2 / denom elif coh == "imaginary": C[..., f_inds] = xp.imag(S[..., f_inds])**2 / psd_prod[..., f_inds] else: raise ValueError(f"{coh} is not a supported coherence") return C, freqs
###############################################################################
[docs] def normalize(X, norm): """Normalize a set of square matrices, using corr, trace or determinant. Parameters ---------- X : ndarray, shape (..., n, n) Set of square matrices. Matrices must be invertible for determinant-normalization. norm : {"corr", "trace", "determinant"} Type of normalization: * "corr": normalized matrices are correlation matrices, with values in [-1, 1] and diagonal values equal to 1; * "trace": trace of normalized matrices is 1; * "determinant": determinant of normalized matrices is +/- 1. Notes ----- .. versionadded:: 0.2.7 .. versionchanged:: 0.12 Add support for NumPy and PyTorch. Returns ------- Xn : ndarray, shape (..., n, n) Set of normalized matrices, same dimensions as X. """ xp = get_namespace(X) if not is_square(X): raise ValueError("Matrices must be square") if norm == "corr": stddev = xp.sqrt(xp.abs(xp.linalg.diagonal(X))) denom = expand_dims(stddev, axis=-2) * stddev[..., None] elif norm == "trace": denom = xp.linalg.trace(X) elif norm == "determinant": denom = xp.abs(xp.linalg.det(X)) ** (1 / X.shape[-1]) else: raise ValueError(f"{norm} is not a supported normalization") denom = expand_dims(denom, axis=tuple(range(denom.ndim, X.ndim))) Xn = X / denom if norm == "corr": Xn.real = xp.clip(Xn.real, -1, 1) return Xn
[docs] def get_nondiag_weight(X): """Compute non-diagonality weights of a set of square matrices. Compute non-diagonality weights of a set of square matrices, following Eq(B.1) in [1]_. Parameters ---------- X : ndarray, shape (..., n, n) Set of square matrices. Returns ------- weights : ndarray, shape (...,) Non-diagonality weights for matrices. Notes ----- .. versionadded:: 0.2.7 .. versionchanged:: 0.12 Add support for NumPy and PyTorch. References ---------- .. [1] `On the blind source separation of human electroencephalogram by approximate joint diagonalization of second order statistics <https://hal.archives-ouvertes.fr/hal-00343628>`_ M. Congedo, C. Gouy-Pailler, C. Jutten. Clinical Neurophysiology, Elsevier, 2008, 119 (12), pp.2677-2686. """ xp = get_namespace(X) if not is_square(X): raise ValueError("Matrices must be square") X2 = X**2 # sum of squared diagonal elements denom = xp.linalg.trace(X2) # sum of squared off-diagonal elements num = xp.sum(X2, axis=(-2, -1)) - denom weights = (1.0 / (X.shape[-1] - 1)) * (num / denom) return weights