"""Aproximate joint diagonalization algorithms."""
import warnings
from array_api_compat import array_namespace as get_namespace, device as xpd
from ._backend import _add_to_diagonal
from ._check import check_function, check_init, check_weights
def _arange(*args):
return list(range(*args))
def _reshape_input(X, xp):
return xp.asarray(xp.reshape(X, (-1, X.shape[-1])).mT, copy=True)
def _reshape_output(X, n1, n2, xp):
return xp.permute_dims(xp.reshape(X, (n1, n2, n1)), (1, 0, 2))
[docs]
def rjd(X, *, init=None, eps=1e-8, n_iter_max=100):
"""Approximate joint diagonalization based on JADE.
This is an implementation of the orthogonal AJD algorithm [1]_: joint
approximate diagonalization of eigen-matrices (JADE), based on Jacobi
angles.
The code is a translation of the Matlab code provided on the author's
website.
Parameters
----------
X : ndarray, shape (n_matrices, n, n)
Set of symmetric matrices to diagonalize.
init : None | ndarray, shape (n, n), default=None
Initialization for the diagonalizer.
eps : float, default=1e-8
Tolerance for stopping criterion.
n_iter_max : int, default=100
The maximum number of iterations to reach convergence.
Returns
-------
V : ndarray, shape (n, n)
The diagonalizer, an orthogonal matrix.
D : ndarray, shape (n_matrices, n, n)
Set of quasi diagonal matrices, D = V^T X V.
Notes
-----
.. versionadded:: 0.2.4
.. versionchanged:: 0.12
Add support for NumPy and PyTorch.
See Also
--------
ajd
References
----------
.. [1] `Jacobi angles for simultaneous diagonalization
<https://epubs.siam.org/doi/abs/10.1137/S0895479893259546>`_
J.-F. Cardoso and A. Souloumiac, SIAM Journal on Matrix Analysis and
Applications, 17(1), pp. 161-164, 1996.
"""
xp = get_namespace(X, init)
n_matrices, _, _ = X.shape
A = _reshape_input(X, xp)
n, n_matrices_x_n = A.shape
# init variables
if init is None:
V = xp.eye(n, dtype=X.dtype, device=xpd(X))
else:
V = check_init(init, n, like=X)
for _ in range(n_iter_max):
crit = False
for p in range(n):
for q in range(p + 1, n):
Ip = _arange(p, n_matrices_x_n, n)
Iq = _arange(q, n_matrices_x_n, n)
# computation of Givens rotations
g = xp.stack([A[p, Ip] - A[q, Iq], A[p, Iq] + A[q, Ip]])
gg = g @ g.mT
ton = gg[0, 0] - gg[1, 1]
toff = gg[0, 1] + gg[1, 0]
theta = 0.5 * xp.atan2(toff, ton + xp.sqrt(ton**2 + toff**2))
c = xp.cos(theta)
s = xp.sin(theta)
crit = crit | (xp.abs(s) > eps)
# update of A and V matrices
if xp.abs(s) > eps:
tmp = xp.asarray(A[:, Ip], copy=True)
A[:, Ip] = c * A[:, Ip] + s * A[:, Iq]
A[:, Iq] = c * A[:, Iq] - s * tmp
tmp = xp.asarray(A[p, :], copy=True)
A[p, :] = c * A[p, :] + s * A[q, :]
A[q, :] = c * A[q, :] - s * tmp
tmp = xp.asarray(V[:, p], copy=True)
V[:, p] = c * V[:, p] + s * V[:, q]
V[:, q] = c * V[:, q] - s * tmp
if not crit:
break
else:
warnings.warn("Convergence not reached")
D = _reshape_output(A, n, n_matrices, xp)
return V, D
[docs]
def ajd_pham(X, *, init=None, eps=1e-6, n_iter_max=20, sample_weight=None):
"""Approximate joint diagonalization based on Pham's algorithm.
This is a direct implementation of the AJD algorithm [1]_, optimizing a
log-likelihood criterion based on the Kullback-Leibler divergence.
Parameters
----------
X : ndarray, shape (n_matrices, n, n)
Set of SPD/HPD matrices to diagonalize.
init : None | ndarray, shape (n, n), default=None
Initialization for the diagonalizer.
.. versionadded:: 0.4
eps : float, default=1e-6
Tolerance for stoping criterion.
n_iter_max : int, default=20
The maximum number of iterations to reach convergence.
sample_weight : None | ndarray, shape (n_matrices,), default=None
Weights for each matrix, strictly positive.
If None, it uses equal weights.
.. versionadded:: 0.2.7
Returns
-------
V : ndarray, shape (n, n)
The diagonalizer, an invertible matrix.
D : ndarray, shape (n_matrices, n, n)
Set of quasi diagonal matrices, D = V X V^H.
Notes
-----
.. versionadded:: 0.2.4
.. versionchanged:: 0.7
Add support for HPD matrices.
.. versionchanged:: 0.12
Add support for NumPy and PyTorch.
See Also
--------
ajd
References
----------
.. [1] `Joint approximate diagonalization of positive definite
Hermitian matrices
<https://epubs.siam.org/doi/10.1137/S089547980035689X>`_
D.-T. Pham. SIAM Journal on Matrix Analysis and Applications, 22(4),
pp. 1136-1152, 2000.
"""
xp = get_namespace(X)
n_matrices, _, _ = X.shape
normalized_weight = check_weights(
sample_weight,
n_matrices,
check_positivity=True,
like=X,
) # sum = 1
A = _reshape_input(X, xp)
n, n_matrices_x_n = A.shape
# init variables
if init is None:
V = xp.eye(n, dtype=X.dtype, device=xpd(X))
else:
V = check_init(init, n, like=X)
epsilon = n * (n - 1) * eps
is_real = xp.isdtype(X.dtype, "real floating")
for _ in range(n_iter_max):
crit = 0
for ii in range(1, n):
for jj in range(ii):
Ii = _arange(ii, n_matrices_x_n, n)
Ij = _arange(jj, n_matrices_x_n, n)
c1 = A[ii, Ii]
c2 = A[jj, Ij]
g12 = xp.sum(normalized_weight * (A[ii, Ij] / c1))
g21 = xp.sum(normalized_weight * (A[ii, Ij] / c2))
omega21 = xp.sum(normalized_weight * (c1 / c2))
omega12 = xp.sum(normalized_weight * (c2 / c1))
omega = xp.sqrt(omega12 * omega21)
tmp = xp.sqrt(omega21 / omega12)
tmp1 = (tmp * g12 + g21) / (omega + 1)
if is_real:
omega = max(omega - 1, 1e-9)
tmp2 = (tmp * g12 - g21) / omega
h12 = tmp1 + tmp2
h21 = xp.conj((tmp1 - tmp2) / tmp)
crit += n_matrices * xp.real(g12 * xp.conj(h12) + g21 * h21) \
/ 2.0
if is_real:
tmp = 1 + xp.sqrt(1 - h12 * h21)
else:
tmp = 1 + 0.5j * xp.imag(h12 * h21)
tmp = tmp + xp.sqrt(tmp ** 2 - h12 * h21)
tau = xp.eye(2, dtype=X.dtype, device=xpd(X))
tau[0, 1] = xp.conj(-h12 / tmp)
tau[1, 0] = xp.conj(-h21 / tmp)
A[[ii, jj], :] = xp.conj(tau) @ A[[ii, jj], :]
tmp = xp.stack((A[:, Ii], A[:, Ij]), axis=-1)
tmp = tmp @ tau.mT
A[:, Ii] = tmp[..., 0]
A[:, Ij] = tmp[..., 1]
V[[ii, jj], :] = tau @ V[[ii, jj], :]
if crit < epsilon:
break
else:
warnings.warn("Convergence not reached")
D = xp.conj(_reshape_output(A, n, n_matrices, xp))
return V, D
[docs]
def uwedge(X, *, init=None, eps=1e-7, n_iter_max=100):
"""Approximate joint diagonalization based on UWEDGE.
This is an implementation of the AJD algorithm [1]_ [2]_: uniformly
weighted exhaustive diagonalization using Gauss iterations (U-WEDGE).
It is a translation from the Matlab code provided by the authors.
Parameters
----------
X : ndarray, shape (n_matrices, n, n)
Set of symmetric matrices to diagonalize.
init : None | ndarray, shape (n, n), default=None
Initialization for the diagonalizer.
eps : float, default=1e-7
Tolerance for stoping criterion.
n_iter_max : int, default=100
The maximum number of iterations to reach convergence.
Returns
-------
V : ndarray, shape (n, n)
The diagonalizer.
D : ndarray, shape (n_matrices, n, n)
Set of quasi diagonal matrices, D = V X V^T.
Notes
-----
.. versionadded:: 0.2.4
.. versionchanged:: 0.12
Add support for NumPy and PyTorch.
See Also
--------
ajd
References
----------
.. [1] `A Fast Approximate Joint Diagonalization Algorithm Using a
Criterion with a Block Diagonal Weight Matrix
<https://ieeexplore.ieee.org/abstract/document/4518361>`_
P. Tichavsky, A. Yeredor and J. Nielsen. 2008 IEEE International
Conference on Acoustics, Speech and Signal Processing ICASSP.
.. [2] `Fast Approximate Joint Diagonalization Incorporating Weight
Matrices
<https://ieeexplore.ieee.org/document/4671095>`_
P. Tichavsky and A. Yeredor. IEEE Trans Signal Process, 57(3), pp.
878 - 891, 2009.
"""
xp = get_namespace(X, init)
n_matrices, _, _ = X.shape
M = _reshape_input(X, xp)
n, n_matrices_x_n = M.shape
# init variables
if init is None:
E, H = xp.linalg.eig(M[:, 0:n])
if xp.isdtype(X.dtype, "real floating"):
H = xp.real(H)
V = H.mT / xp.sqrt(xp.abs(E))[:, None]
else:
V = check_init(init, n, like=X)
Ms = xp.asarray(M, copy=True)
Rs = xp.zeros((n, n_matrices), dtype=X.dtype, device=xpd(X))
eye_n = xp.eye(n, dtype=X.dtype, device=xpd(X))
for k in range(n_matrices):
ini = k * n
Il = _arange(ini, ini + n)
M[:, Il] = 0.5 * (M[:, Il] + M[:, Il].mT)
Ms[:, Il] = V @ M[:, Il] @ V.mT
Rs[:, k] = xp.linalg.diagonal(Ms[:, Il])
crit = xp.sum(Ms ** 2) - xp.sum(Rs ** 2)
for _ in range(n_iter_max):
B = Rs @ Rs.mT
C1 = xp.zeros((n, n), dtype=X.dtype, device=xpd(X))
for i in range(n):
C1[:, i] = xp.sum(Ms[:, i:n_matrices_x_n:n] * Rs, axis=1)
Bdiag = xp.linalg.diagonal(B)
D0 = B * B.mT - xp.linalg.outer(Bdiag, Bdiag)
A0 = (C1 * B - Bdiag[:, None] * C1.mT) / (D0 + eye_n)
_add_to_diagonal(A0, 1, xp)
V = xp.linalg.solve(A0, V)
Raux = V @ M[:, 0:n] @ V.mT
aux = 1. / xp.sqrt(xp.abs(xp.linalg.diagonal(Raux)))
V = aux[:, None] * V
for k in range(n_matrices):
ini = k * n
Il = _arange(ini, ini + n)
Ms[:, Il] = V @ M[:, Il] @ V.mT
Rs[:, k] = xp.linalg.diagonal(Ms[:, Il])
crit_new = xp.sum(Ms ** 2) - xp.sum(Rs ** 2)
if xp.abs(crit_new - crit) < eps:
break
crit = crit_new
else:
warnings.warn("Convergence not reached")
D = _reshape_output(Ms, n, n_matrices, xp)
return V, D
###############################################################################
ajd_functions = {
"ajd_pham": ajd_pham,
"rjd": rjd,
"uwedge": uwedge,
}
[docs]
def ajd(X, method="ajd_pham", init=None, eps=1e-6, n_iter_max=100, **kwargs):
"""Aproximate joint diagonalization (AJD) according to a method.
Compute the AJD of a set of matrices according to a method [1]_, estimating
the joint diagonalizer matrix, diagonalizing the set as much as possible.
Parameters
----------
X : ndarray, shape (n_matrices, n, n)
Set of symmetric matrices to diagonalize.
method : string | callable, default="ajd_pham"
Method for AJD, can be: "ajd_pham", "rjd", "uwedge", or a callable
function.
init : None | ndarray, shape (n, n), default=None
Initialization for the diagonalizer.
eps : float, default=1e-6
Tolerance for stopping criterion.
n_iter_max : int, default=100
The maximum number of iterations to reach convergence.
kwargs : dict
The keyword arguments passed to the sub function.
Returns
-------
V : ndarray, shape (n, n)
The diagonalizer.
D : ndarray, shape (n_matrices, n, n)
Set of quasi diagonal matrices.
Notes
-----
.. versionadded:: 0.6
See Also
--------
ajd_pham
rjd
uwedge
References
----------
.. [1] `Joint Matrices Decompositions and Blind Source Separation: A survey
of methods, identification, and applications
<http://library.utia.cas.cz/separaty/2014/SI/tichavsky-0427607.pdf>`_
G. Chabriel, M. Kleinsteuber, E. Moreau, H. Shen; P. Tichavsky and A.
Yeredor. IEEE Signal Process Mag, 31(3), pp. 34-43, 2014.
"""
ajd_function = check_function(method, ajd_functions)
V, D = ajd_function(
X,
init=init,
eps=eps,
n_iter_max=n_iter_max,
**kwargs,
)
return V, D