Source code for pyriemann.datasets.simulated

import numpy as np
from sklearn.utils.validation import check_random_state

from ..geometry.base import ctranspose, invsqrtm, powm, sqrtm, expm
from ..geometry.distance import distance_riemann
from ..geometry.mean import mean_riemann
from ..transfer import encode_domains
from .sampling import sample_gaussian


def _make_eyes(n_matrices, n_dim):
    """Generate a 3d array of stacked np.eye matrices.

    Parameters
    ----------
    n_matrices : int
        Number of matrices to generate.
    n_dim : int
        Dimension of eye matrices to generate.

    Returns
    -------
    X : ndarray, shape (n_matrices, n_dim, n_dim)
        Set of np.eye matrices.
    """
    return np.repeat(np.eye(n_dim)[np.newaxis, :, :], n_matrices, axis=0)


mat_kinds = [
    "real",
    "comp",
    "inv",
    "orth",
    "sym",
    "cinv",
    "unit",
    "spd",
    "spsd",
    "herm",
    "hpd",
    "hpsd",
]


[docs] def make_matrices(n_matrices, n_dim, kind, rs=None, return_params=False, evals_low=0.5, evals_high=2.0, eigvecs_same=False, eigvecs_mean=0.0, eigvecs_std=1.0): """Generate matrices with specific properties. Parameters ---------- n_matrices : int Number of matrices to generate. n_dim : int | list of int If int, dimension of square matrices to generate. If list, dimensions of "real" or "comp" matrices to generate. kind : {"real", "comp", "inv", "orth, "sym", "spd", "spsd", "cinv", \ "unit", "herm", "hpd", "hpsd"} Kind of matrices to generate: - "real" for real-valued matrices; - "comp" for complex-valued matrices. Kind of square matrices to generate: - "inv" for invertible real-valued matrices; - "orth" for orthogonal matrices; - "sym" for symmetric real-valued matrices; - "spd" for symmetric positive-definite matrices; - "spsd" for symmetric positive semi-definite matrices; - "cinv" for invertible complex-valued matrices; - "unit" for unitary matrices; - "herm" for Hermitian matrices; - "hpd" for Hermitian positive-definite matrices; - "hpsd" for Hermitian positive semi-definite matrices. rs : int | RandomState instance | None, default=None Random state for reproducible output across multiple function calls. return_params : bool, default=False If True, returns evals and evecs for "spd", "spsd", "hpd" and "hpsd". evals_low : float, default=0.5 Lowest value of the uniform distribution to draw eigen values. evals_high : float, default=2.0 Highest value of the uniform distribution to draw eigen values. eigvecs_same : bool, default=False If True, uses the same eigen vectors for all matrices. eigvecs_mean : float, default=0.0 Mean of the normal distribution to draw eigen vectors. .. versionadded:: 0.8 eigvecs_std : float, default=1.0 Standard deviation of the normal distribution to draw eigen vectors. .. versionadded:: 0.8 Returns ------- mats : ndarray, shape (n_matrices, n_dim, n_dim) or (n_matrices, \\*n_dim) Set of generated matrices. evals : ndarray, shape (n_matrices, n_dim) Eigen values used for "spd", "spsd", "hpd" and "hpsd". Only returned if ``return_params=True``. evecs : ndarray, shape (n_matrices, n_dim, n_dim) or (n_dim, n_dim) Eigen vectors used for "spd", "spsd", "hpd" and "hpsd". Only returned if ``return_params=True``. Notes ----- .. versionadded:: 0.5 .. versionchanged:: 0.8 Add support for kinds "sym" and "herm". .. versionchanged:: 0.10 Add support for non-square matrices, and for kinds "inv" and "cinv", "orth", and "unit". """ rs = check_random_state(rs) if isinstance(n_dim, list): if kind not in ["real", "comp"]: raise ValueError(f"Unsupported matrix kind: {kind}") X = rs.randn(n_matrices, *n_dim) if kind == "comp": X = X + 1j * rs.randn(n_matrices, *n_dim) return X if not isinstance(n_dim, int): raise ValueError(f"Unsupported n_dim type: {type(n_dim)}") if kind not in mat_kinds: raise ValueError(f"Unsupported matrix kind: {kind}") if kind in ["inv", "cinv"]: while True: X = rs.randn(n_matrices, n_dim, n_dim) if kind == "cinv": X = X + 1j * rs.randn(n_matrices, n_dim, n_dim) if np.all(np.linalg.det(X) != 0): return X X = eigvecs_std * rs.randn(n_matrices, n_dim, n_dim) + eigvecs_mean if kind == "unit": Y = eigvecs_std * rs.randn(n_matrices, n_dim, n_dim) + eigvecs_mean X = X + 1j * Y if kind in ["orth", "unit"]: return np.linalg.qr(X)[0] if kind == "real": return X if kind == "sym": return X + X.transpose(0, 2, 1) if kind in ["comp", "herm", "hpd", "hpsd"]: Y = eigvecs_std * rs.randn(n_matrices, n_dim, n_dim) + eigvecs_mean if kind == "herm": return X + X.transpose(0, 2, 1) + 1j * (Y - Y.transpose(0, 2, 1)) X = X + 1j * Y if kind == "comp": return X # eigen values if evals_low <= 0.0: raise ValueError( f"Lowest value must be strictly positive (Got {evals_low})." ) if evals_high <= evals_low: raise ValueError( "Highest value must be superior to lowest value " f"(Got {evals_high} and {evals_low})." ) evals = rs.uniform(evals_low, evals_high, size=(n_matrices, n_dim)) if kind in ("spsd", "hpsd"): evals[..., -1] = 1e-10 # last eigen value set to almost zero # eigen vectors if eigvecs_same: X = X[0] evecs = np.linalg.qr(X)[0] # conjugation if eigvecs_same: mats = np.empty((n_matrices, n_dim, n_dim), dtype=X.dtype) for i in range(n_matrices): mats[i] = (evecs * evals[i]) @ ctranspose(evecs) else: mats = (evecs * evals[:, np.newaxis, :]) @ ctranspose(evecs) if return_params: return mats, evals, evecs else: return mats
[docs] def make_masks(n_masks, n_dim0, n_dim1_min, rs=None): """Generate masks defined as semi-orthogonal matrices. Parameters ---------- n_masks : int Number of masks to generate. n_dim0 : int First dimension of masks. n_dim1_min : int Minimal value for second dimension of masks. rs : int | RandomState instance | None, default=None Random state for reproducible output across multiple function calls. Returns ------- masks : list of n_masks ndarray of shape (n_dim0, n_dim1_i), \ with different n_dim1_i, such that n_dim1_min <= n_dim1_i <= n_dim0 Masks. Notes ----- .. versionadded:: 0.3 """ rs = check_random_state(rs) masks = [] for _ in range(n_masks): n_dim1 = rs.randint(n_dim1_min, n_dim0, size=1)[0] mask, _ = np.linalg.qr(rs.randn(n_dim0, n_dim1)) masks.append(mask) return masks
[docs] def make_gaussian_blobs(n_matrices=100, n_dim=2, class_sep=1.0, class_disp=1.0, return_centers=False, center_dataset=False, random_state=None, centers=None, *, n_jobs=1, sampling_method="auto"): """Generate SPD matrices for two classes. Generate a set of SPD matrices drawn from Riemannian Gaussian distributions, one per class. Currently, it supports two classes. The distributions have the same dispersions. Useful for testing classification or clustering methods. Parameters ---------- n_matrices : int, default=100 Number of matrices to generate for each class. n_dim : int, default=2 Dimensionality of the generated SPD matrices. class_sep : float, default=1.0 Distance between the centers of the classes. class_disp : float, default=1.0 Dispersion of the matrices for each class. centers : None | ndarray, shape (2, n_dim, n_dim), default=None Centers for each class. If None, the centers are drawn randomly based on class_sep. return_centers : bool, default=False If True, return the centers of each class. center_dataset : bool, default=False If True, re-center dataset to the Identity. If False, dataset is centered around a random SPD matrix. random_state : int, RandomState instance or None, default=None Pass an int for reproducible output across multiple function calls. n_jobs : int, default=1 The number of jobs to use for the computation. This works by computing each of the class centroid in parallel. If -1 all CPUs are used. sampling_method : {"auto", "slice", "rejection"}, default="auto" Method used to sample eigenvalues: "auto", "slice" or "rejection". If "auto", sampling_method will be equal to "slice" for n_dim != 2 and equal to "rejection" for n_dim = 2. .. versionadded:: 0.4 Returns ------- X : ndarray, shape (2*n_matrices, n_dim, n_dim) Set of SPD matrices, for two classes. y : ndarray, shape (2*n_matrices,) Labels corresponding to each matrix. centers : ndarray, shape (2, n_dim, n_dim) The centers of each class. Only returned if ``return_centers=True``. Notes ----- .. versionadded:: 0.3 """ if not isinstance(class_sep, float): raise ValueError(f"class_sep must be a float (Got {class_sep})") rs = check_random_state(random_state) seeds = rs.randint(100, size=2) if centers is None: C0_in = np.eye(n_dim) # first class mean at Identity at first Pv = rs.randn(n_dim, n_dim) # create random tangent vector Pv = (Pv + Pv.T)/2 # symmetrize Pv = Pv / np.linalg.norm(Pv) # normalize P = expm(Pv) # take it back to the SPD manifold C1_in = powm(P, alpha=class_sep) # control distance to Identity else: C0_in, C1_in = centers # sample matrices from class 0 X0 = sample_gaussian( n_matrices=n_matrices, mean=C0_in, sigma=class_disp, random_state=seeds[0], n_jobs=n_jobs, sampling_method=sampling_method ) y0 = np.zeros(n_matrices) # sample matrices from class 1 X1 = sample_gaussian( n_matrices=n_matrices, mean=C1_in, sigma=class_disp, random_state=seeds[1], n_jobs=n_jobs, sampling_method=sampling_method ) y1 = np.ones(n_matrices) # concatenate the samples X = np.concatenate([X0, X1]) # re-center the dataset to the Identity M = mean_riemann(X) M_invsqrt = invsqrtm(M) X = M_invsqrt @ X @ M_invsqrt if not center_dataset: # center the dataset to a random SPD matrix M = make_matrices(n_matrices=1, n_dim=n_dim, kind="spd", rs=rs)[0] M_sqrt = sqrtm(M) X = M_sqrt @ X @ M_sqrt # concatenate the labels for each class y = np.concatenate([y0, y1]).astype(int) # randomly permute the samples of the dataset idx = rs.permutation(len(X)) X, y = X[idx], y[idx] if return_centers: if centers is None: C0_out = mean_riemann(X[y == 0]) C1_out = mean_riemann(X[y == 1]) else: C0_out = C0_in C1_out = C1_in centers = np.stack([C0_out, C1_out]) return X, y, centers else: return X, y
[docs] def make_outliers(n_matrices, mean, sigma, outlier_coeff=10, random_state=None): """Generate outlier matrices. Generate matrices that are outliers for a given Riemannian Gaussian distribution with fixed mean and dispersion. Parameters ---------- n_matrices : int Number of matrices to generate. mean : ndarray, shape (n_dim, n_dim) Center of the Riemannian Gaussian distribution. sigma : float Dispersion of the Riemannian Gaussian distribution. outlier_coeff: float, default=10 Coefficient determining how to define an outlier, i.e. how many times the sigma parameter its distance to the mean should be. random_state : int | RandomState instance | None, default=None Pass an int for reproducible output across multiple function calls. Returns ------- outliers : ndarray, shape (n_matrices, n_dim, n_dim) Set of generated outlier matrices. Notes ----- .. versionadded:: 0.3 """ n_dim = mean.shape[1] mean_sqrt = sqrtm(mean) outliers = np.zeros((n_matrices, n_dim, n_dim)) for i in range(n_matrices): Oi = make_matrices(1, n_dim=n_dim, kind="spd", rs=random_state)[0] epsilon_num = outlier_coeff * sigma * n_dim epsilon_den = distance_riemann(Oi, np.eye(n_dim), squared=True) epsilon = np.sqrt(epsilon_num / epsilon_den) outliers[i] = mean_sqrt @ powm(Oi, epsilon) @ mean_sqrt return outliers
[docs] def make_classification_transfer( n_matrices, class_sep=3.0, class_disp=1.0, domain_sep=5.0, theta=0.0, stretch=1.0, random_state=None, class_names=[1, 2], domain_names=["source_domain", "target_domain"], ): """Generate 2x2 SPD matrices for two classes in source and target domains. Generate a set of 2x2 SPD matrices drawn from Riemannian Gaussian distributions, one per class and per domain. Currently, it supports two classes and two domains. The distributions have the same dispersions. You can stretch the target domain and control a rotation matrix that maps the source domain to the target domain. Useful for testing classification or clustering methods on transfer learning applications. Parameters ---------- n_matrices : int Number of 2x2 matrices to generate for each class on each domain. class_sep : float, default=3.0 Distance between the centers of the classes. class_disp : float, default=1.0 Dispersion of the matrices for each class. domain_sep : float, default=5.0 Distance between the global means of each source and target domains. theta : float, default=0.0 Angle of the 2x2 rotation matrix from source to target domain. stretch : float, default=1.0 Factor to stretch the matrices in target domain. Note that when it is != 1.0 the class dispersions in target domain will be different than those in source domain (fixed at class_disp). random_state : None | int | RandomState instance, default=None Pass an int for reproducible output across multiple function calls. class_names : list, default=[1, 2] Names of classes. domain_names : list, default=["source_domain", "target_domain"] Names of domains, source and target. .. versionadded:: 0.8 Returns ------- X_enc : ndarray, shape (4*n_matrices, 2, 2) Set of 2x2 SPD matrices, for two classes and two domains. y_enc : ndarray, shape (4*n_matrices,) Extended labels for each matrix. Notes ----- .. versionadded:: 0.4 """ rs = check_random_state(random_state) seeds = rs.randint(100, size=4) n_dim = 2 if len(class_names) != 2: raise ValueError("class_names must contain 2 elements") if len(domain_names) != 2: raise ValueError("domain_names must contain 2 elements") # create a source domain with two classes and global mean at identity M1_source = np.eye(n_dim) # first class mean at Identity at first X1_source = sample_gaussian( n_matrices=n_matrices, mean=M1_source, sigma=class_disp, random_state=seeds[0], ) y1_source = [class_names[0]] * n_matrices Pv = rs.randn(n_dim, n_dim) # create random tangent vector Pv = (Pv + Pv.T)/2 # symmetrize Pv /= np.linalg.norm(Pv) # normalize P = expm(Pv) # take it back to the SPD manifold M2_source = powm(P, alpha=class_sep) # control distance to identity X2_source = sample_gaussian( n_matrices=n_matrices, mean=M2_source, sigma=class_disp, random_state=seeds[1], ) y2_source = [class_names[1]] * n_matrices X_source = np.concatenate([X1_source, X2_source]) M_source = mean_riemann(X_source) M_source_invsqrt = invsqrtm(M_source) # center the domain to Identity X_source = M_source_invsqrt @ X_source @ M_source_invsqrt y_source = np.concatenate([y1_source, y2_source]) # create target domain based on the source domain X1_target = sample_gaussian( n_matrices=n_matrices, mean=M1_source, sigma=class_disp, random_state=seeds[2], ) X2_target = sample_gaussian( n_matrices=n_matrices, mean=M2_source, sigma=class_disp, random_state=seeds[3], ) X_target = np.concatenate([X1_target, X2_target]) M_target = mean_riemann(X_target) M_target_invsqrt = invsqrtm(M_target) # center the domain to Identity X_target = M_target_invsqrt @ X_target @ M_target_invsqrt y_target = np.copy(y_source) # stretch the matrices in target domain if needed if stretch != 1.0: X_target = powm(X_target, alpha=stretch) # move the matrices in X_target with a random matrix A = P * Q # create SPD matrix for the translation between domains Pv = rs.randn(n_dim, n_dim) # create random tangent vector Pv = (Pv + Pv.T)/2 # symmetrize Pv /= np.linalg.norm(Pv) # normalize P = expm(Pv) # take it to the manifold P = powm(P, alpha=domain_sep) # control distance to identity P = sqrtm(P) # transport matrix # create orthogonal matrix for the rotation part Q = np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]]) # transform the matrices from the target domain A = P @ Q X_target = A @ X_target @ A.T # create array specifying the domain for each matrix domains = np.array( len(X_source) * [domain_names[0]] + len(X_target) * [domain_names[1]] ) # encode the labels and domains together X = np.concatenate([X_source, X_target]) y = np.concatenate([y_source, y_target]) X_enc, y_enc = encode_domains(X, y, domains) return X_enc, y_enc