Source code for tripy.loglikelihood

"""
Likelihood evaluation
"""

from typing import List, Optional, Union

import numpy as np
from scipy.linalg import eigh, eigh_tridiagonal
from scipy.linalg.lapack import dpotrf, dpotrs

from tripy.utils import (
    _cast_scalar_to_array,
    cho_solve_symm_tridiag,
    chol_tridiag,
    inv_cov_vec_1D,
    kron_op,
    symm_tri_block_chol,
    symm_tri_block_solve,
    symm_tri_mvm,
)


[docs]def chol_loglike_1D( y: np.ndarray, x: np.ndarray, l_corr: Union[int, float], std_model: Union[int, float, np.ndarray], std_meas: Optional[Union[int, float, np.ndarray]] = None, y_model: Optional[np.ndarray] = None, ) -> float: """ Efficient Gaussian loglikelihood for 1D problems with exponential correlation. Linear time solution for 1D (e.g. timeseries) observations with additive Gaussian noise and exponential model prediction uncertainty. The model prediction uncertainty can be multiplicative or additive. Args: y: [N, ] Vector of observations or residuals between measurements and model predictions. x: [N, ] vector of coordinates l_corr: Scalar correlation length std_model: [N, ] vector of model prediction uncertainty coefficient of variation in case of multiplicative model prediction uncertainty, or vector of std. devs. for additive in the case of additive model prediction uncertainty. std_meas: [N, ] optional vector of measurement uncertainty std. dev. y_model: [N, ] optional vector of model predictions in the case of multiplicative model prediction uncertainty. Returns: L: Loglikelihood. """ # Initialization Nx = len(x) # Set y_model to vector of ones if None is specified if y_model is None: y_model = np.ones(Nx) if std_meas is None: # Inverse covariance in vector form d0, d1 = inv_cov_vec_1D(x, l_corr, std_model * y_model) # Factorize D, L = chol_tridiag(d0, d1) # Cholesky solve X = symm_tri_mvm(d0, d1, y) ySigmay = np.sum(y * X) # Determinants logdet_Sigma = -2 * np.sum(np.log(D)) # Sum terms loglike = -0.5 * (logdet_Sigma + ySigmay + Nx * np.log(2 * np.pi)) else: # Cast noise std. dev. to array std_meas = _cast_scalar_to_array(std_meas, Nx) # Inverse covariance in vector form d0, d1 = inv_cov_vec_1D(x, l_corr, std_model) # Assemble terms of Eqs 50 - 52 W = 1 / (np.ones(Nx) * std_meas**2) # Inverse noise vector yWy = np.sum(y**2 * (1 / std_meas**2)) # Obtained from Woodbury id GWG = y_model**2 * (1 / std_meas**2) Wyx = W * y_model * y d0_yWy = d0 + GWG # Factorize symmetric tridiagonal matrices D, L = chol_tridiag(d0, d1) Dw, Lw = chol_tridiag(d0_yWy, d1) # Cholesky solve X = cho_solve_symm_tridiag(Dw, Lw, Wyx) ySigmay = yWy - np.sum(Wyx * X) # Calculate determinants logdet_cov = -2 * np.sum(np.log(D)) # Logdet of the inverse covariance matrix logdet_C_yWy = 2 * np.sum(np.log(Dw)) # Logdet of the inverse cov + Y*W*Y logdet_W = np.sum(-np.log(W)) logdet_Sigma = logdet_C_yWy + logdet_W + logdet_cov # Sum terms loglike = -0.5 * (logdet_Sigma + ySigmay + Nx * np.log(2 * np.pi)) return loglike
[docs]def chol_loglike_2D( y: np.ndarray, Cx: Union[np.ndarray, List], Ct: List, std_meas: Union[int, float, np.ndarray], y_model: Optional[np.ndarray] = None, ) -> float: """ Efficient 2D loglikelihood using block Cholesky decomposition. Given the uncertainty parameters and vectors of the x and t positions on a lattice, calculates the loglikelihood of the 2D Gaussian process with multiplicative space and time uncertainties and i.i.d. noise. It is intended that this function is general and works with any combination of spatial and temporal covariance, with only the following restrictions: - Space and time are separable (Kronecker structure) - At least one generalized dimension (space or time) has only one subdimension and exponential correlation Notes: * The tridiagonal inverse of the correlation matrix for exponential correlation can be obtained using `utils.inv_cov_vec_1D`. * Having the spatial correlation matrix as an input would result in a full inversion performed for each call to this function which is potentially unecessary. This is solved by specifying the spatial correlation in terms of the inverse. Warnings: * Directly inverting the correlation matrix to obtain `Cx` in the general case is computationally expensive and numerically unstable. To avoid numerical issues it is suggested to add a small value to the diagonal (jitter) before inverting. Values smaller than 1e-6 are typically sufficient. Args: y: [Nx, Nt] Array of observations or residuals between measurements and model predictions. Cx: List of diagonal and off diagonal vectors of inverse Exponential correlation matrix or full inverse of spatial correlation matrix. Ct: List of iagonal and off-diagonal of inverse of Exponential temporal correlation matrix. std_meas: Std. dev. of the measurement uncertainty. y_model: [Nx, Nt] optional vector of model predictions in the case of multiplicative model prediction uncertainty. Returns: L: Loglikelihood. """ # Check if Cx and Ct are lists or numpy arrays. if isinstance(Cx, list): # TODO: Change dpttrf to chol_tridiag Nx = len(Cx[0]) Lx = chol_tridiag(Cx[0], Cx[1])[0] elif isinstance(Cx, np.ndarray): Nx = np.shape(Cx)[0] Lx = np.diag(dpotrf(Cx, lower=1, clean=1, overwrite_a=0)[0]) else: raise ValueError( "Cx must be a numpy array containing the correlation" "matrix, or a list of the diagonal and off-diagonal" "vectors of the inverse correlation matrix." ) if isinstance(Ct, list): Nt = len(Ct[0]) Lt = chol_tridiag(Ct[0], Ct[1])[0] else: raise ValueError( "Ct must be a list containing the diagonal and " "off-diagonal vectors of a tridiagonal matrix" ) if (Nx < 2) or (Nt < 2): raise ValueError( f"The number of points in each dimension must be > 2" f"but is Nx = {Nx} and Nt = {Nt}." ) # Cast noise to array std_meas = _cast_scalar_to_array(std_meas, (Nx, Nt)) # Set y_model to vector of ones if None is specified if y_model is None: y_model = np.ones((Nx, Nt)) L, C = symm_tri_block_chol(Cx, Ct, std_meas**2, y=y_model) # Get diagonal elements of L Ldiag = np.diagonal(L, axis1=1, axis2=2) # Vectors to be used later Winv_vec = 1 / std_meas**2 yWy = np.sum(y**2 * (1 / std_meas**2)) WGx = Winv_vec * y_model * y # Solve the linear system X = symm_tri_block_solve(L, C, WGx, Nx, Nt) # Vector product xSx = yWy - np.sum(WGx * X) # ======================================================================== # Logdeterminant # ======================================================================== # Logdet of noise matrix logdet_W = np.sum(2 * np.log(std_meas)) # Logdet of the full correlation matrix and Cholesky factors logdet_C = -2 * np.sum(np.log(Lx)) * Nt + -2 * np.sum(np.log(Lt)) * Nx logdet_chol = 2 * np.sum(np.log(Ldiag)) # Sum logdeterminants logdet_tot = logdet_W + logdet_C + logdet_chol return -0.5 * (logdet_tot + xSx + Nx * Nt * np.log(2 * np.pi))
[docs]def kron_loglike_2D_tridiag( y: np.ndarray, x: np.ndarray, t: np.ndarray, l_corr_x: Union[int, float], std_x: Union[int, float], l_corr_t: Union[int, float], std_t: Union[int, float], std_meas: Optional[Union[int, float]] = None, check_finite: Optional[bool] = False, jitter: Optional[Union[int, float]] = 1e-6, ) -> float: """ Efficient 2D loglikelihood using Kronecker properties. Given the uncertainty parameters and vectors of the x and t positions on a lattice, calculates the loglikelihood of the observations for exponential space and time correlation in the additive modeling uncertainty and i.i.d. Gaussian noise. Warnings: * This is special case of kron_loglike_2D where both space and time have exponential correlation. * This is superceded by kron_loglike_ND_tridiag and will be removed. Args: y: [Nx, Nt] Array of observations or residuals between measurements and model predictions. x: [Nx,] Vector of space coordinates. t: [Nt,] Vector of time coordinates. std_meas: Vector of measurement uncertainty std. dev. l_corr_x: Spatial correlation length. std_x: Std. dev. of model prediction uncertainty in space. l_corr_t: Temporal correlation length. std_t: Std. dev. of model prediction uncertainty in time. check_finite: Optional flag of scipy.linalg.eigh_tridiagonal. jitter: Optional small scalar noise value to ensure numerical stability in cases with no noise. Default = 1e-6. Returns: L: Loglikelihood. """ Nx = len(x) Nt = len(t) Cx_0, Cx_1 = inv_cov_vec_1D(x, l_corr_x, std_x) Ct_0, Ct_1 = inv_cov_vec_1D(t, l_corr_t, std_t) # Eigendecomposition using tridiagonality lambda_t, w_t = eigh_tridiagonal(Ct_0, Ct_1, check_finite=check_finite) lambda_x, w_x = eigh_tridiagonal(Cx_0, Cx_1, check_finite=check_finite) # Kronecker prod of eigenvalues. if std_meas is not None: if not isinstance(std_meas, (int, float)): raise ValueError(f"`std_meas` must be {int}, {float} or {None}.") # Kronecker prod of eigenvalues. C_xt = np.kron(1 / lambda_x, 1 / lambda_t) + std_meas**2 else: C_xt = np.kron(1 / lambda_x, 1 / lambda_t) + jitter # Determinant: C_xt is diagonal. We can therefore sum the log terms. # This is the determinant of the inverse. logdet_C_xt = np.sum(np.log(C_xt)) # Rotated data vector. Y = np.ravel(np.matmul(np.matmul(w_x.T, y), w_t)) # Loglikelihood return ( -Nx * Nt / 2 * np.log(2 * np.pi) - 0.5 * logdet_C_xt - 0.5 * np.sum(Y**2 * 1 / C_xt) )
def kron_loglike_2D( y: np.ndarray, Cx: Union[list, np.ndarray], Ct: Union[list, np.ndarray], std_meas: Optional[Union[int, float]] = None, check_finite: Optional[bool] = False, jitter: Optional[Union[int, float]] = 1e-6, ) -> float: """ Efficient loglikelihood for Exponential temporal correlation. Any combination of a single dimension with Markovian covariance and N nonseparable dimensions with non-Markovian covariance can be evaluated using this function. Notes: * The tridiagonal inverse of the correlation matrix for exponential correlation can be obtained using `utils.inv_cov_vec_1D`. Args: y: [Nx, Nt] Array of observations or residuals between measurements and model predictions. std_meas: Std. dev. of measurement uncertainty. Cx: Correlation matrix of the model prediction uncertainty in space, or list with diagonal and off-diagonal of tridiagonal inverse. Ct: Correlation matrix of the model prediction uncertainty in time, or list with diagonal and off-diagonal of tridiagonal inverse. check_finite: Optional flag of scipy.linalg.eigh_tridiagonal. jitter: Optional small scalar noise value to ensure numerical stability in cases with no noise. Default = 1e-6. Returns: L: Loglikelihood. """ # Check if the supplied Cx and Ct are correlation matrices or lists of # the diagonal and off-diagonal vectors of a tridiagonal inverse correlation # matrix. Apply the corresponding eigendecomposition. if isinstance(Cx, list): Nx = len(Cx[0]) lambda_x, w_x = eigh_tridiagonal(Cx[0], Cx[1], check_finite=check_finite) lambda_x = 1 / lambda_x elif isinstance(Cx, np.ndarray): Nx = np.shape(Cx)[0] lambda_x, w_x = eigh(Cx, check_finite=check_finite) else: raise ValueError( "Cx must be a numpy array containing the correlation" "matrix, or a list of the diagonal and off-diagonal" "vectors of the inverse correlation matrix." ) if isinstance(Ct, list): Nt = len(Ct[0]) lambda_t, w_t = eigh_tridiagonal(Ct[0], Ct[1], check_finite=check_finite) lambda_t = 1 / lambda_t elif isinstance(Ct, np.ndarray): Nt = np.shape(Ct)[0] lambda_t, w_t = eigh(Ct, check_finite=check_finite) else: raise ValueError( "Ct must be a numpy array containing the correlation" "matrix, or a list of the diagonal and off-diagonal" "vectors of the inverse correlation matrix." ) if std_meas is not None: if not isinstance(std_meas, (int, float)): raise ValueError(f"`std_meas` must be {int}, {float} or {None}.") # Kronecker prod of eigenvalues. C_xt = np.kron(lambda_x, lambda_t) + std_meas**2 else: C_xt = np.kron(lambda_x, lambda_t) + jitter # Determinant: C_xt is diagonal. We can therefore sum the log terms. This is the # determinant of the inverse. logdet_C_xt = np.sum(np.log(C_xt)) # Rotated data vector. Y = np.ravel(np.matmul(np.matmul(w_x.T, y), w_t)) # Loglikelihood return ( -Nx * Nt / 2 * np.log(2 * np.pi) - 0.5 * logdet_C_xt - 0.5 * np.sum(Y**2 * 1 / C_xt) ) def _kron_loglike_ND_tridiag( y: np.ndarray, x: List, std_model: Union[List, np.ndarray], l_corr_d: Union[List, np.ndarray], std_meas: Optional[Union[int, float]] = None, check_finite: Optional[bool] = False, jitter: Optional[Union[int, float]] = 1e-6, ) -> float: """ Args: y: [N1, N2, ..., Nd] Vector of observations or residuals between measurements and model predictions. x: List of `d` lists, each containing the coordinate vector of a dim. std_meas: Std. dev. of the measurement uncertainty. std_model: List of vectors of the model uncertainty std. dev. per dim. l_corr_d: Correlation lengths for each dimension. check_finite: Optional flag of scipy.linalg.eigh_tridiagonal. jitter: Optional small scalar noise value to ensure numerical stability in cases with no noise. Default = 1e-6. Returns: References: [1] Efficient inference in matrix-variate Gaussian models with i.i.d. observation noise. Stegle et al. (2011) [2] E Gilboa, Y Saatçi, JP Cunningham - Scaling Multidimensional Inference for Structured Gaussian Processes, Pattern Analysis and Machine Intelligence, IEEE, 2015 """ raise NotImplementedError("Disabled until error is fixed") # Initialize arrays Nd = [] lambda_d = [] w_d = [] C = 1 # Loop over dimensions for i, d in enumerate(x): Nd.append(len(d)) # Tridiagonal inverse covariance matrix and eigendecomposition d0, d1 = inv_cov_vec_1D(d, l_corr_d[i], std_model[i]) lambda_i, w_i = eigh_tridiagonal(d0, d1, check_finite=check_finite) # Kronecker product of eigenvalues C = np.kron(1 / lambda_i, C) # Append to list lambda_d.append(lambda_i) w_d.append(w_i) # Add noise and calculate determinant if std_meas is not None: if not isinstance(std_meas, (int, float)): raise ValueError(f"`std_meas` must be {int}, {float} or {None}.") C += std_meas**2 else: C += jitter**2 logdet_C = np.sum(np.log(C)) # Kronecker mvm. Note that eigenvec(A) = eigenvec(A^-1) a = kron_op(w_d, y, transA=True) a = a * 1 / C a = kron_op(w_d, a) ySy = np.sum(y * a) # Loglikelihood return -0.5 * np.prod(Nd) * np.log(2 * np.pi) - 0.5 * logdet_C - 0.5 * ySy def _loglike_normal(y, std): """ Args: y: std: Returns: """ return -0.5 * (y / std) ** 2 - np.log(std * np.sqrt(2 * np.pi)) def _loglike_multivariate_normal(y, cov): """ Calculate the multivariate normal loglikelihood Args: y: cov: Returns: """ N = y.size # Lower Cholesky factor L = np.linalg.cholesky(cov) # Solve factorized system x, info = dpotrs(L, y, lower=True, overwrite_b=False) ySy = np.sum(y * x) # Logdeterminant logdet = np.sum(2 * np.log(np.diag(L))) return -0.5 * (logdet + ySy + N * np.log(2 * np.pi))
[docs]def log_likelihood_linear_normal( y: np.ndarray, K: Optional[Union[int, float, np.ndarray]], std_meas: Optional[Union[int, float, np.ndarray]] = None, y_model: Optional[np.ndarray] = None, jitter: Optional[Union[int, float]] = 1e-6, ) -> np.ndarray: """ Gaussian log-likelihood function for the following models: ``X_model = K1 * f(theta) + E`` ``X_model = K2 + E`` where * ``K1`` ~ MVN(mean=1, covariance=k_cov_mx) * ``K2`` ~ MVN(mean) * ``E`` ~ MVN(mean=0, covariance=e_cov_mx) * ``f`` is a physical model function parametrized by theta Notes: * If K is None, the loglikelihood is calculated as the sum of N independent normally distributed random variables TODO: * The logic in this function works but can probably be improved. Args: y: [N_1, N_2, ... Ni, ... N_d] Array of observations or residuals between measurements and model predictions. K: covariance matrix of `K`, shape [prod(Ni), prod(Ni)]. std_meas: [shape(y)] Std. dev. of the measurement uncertainty with y_model: [shape(y)] optional vector of model predictions in the case of multiplicative model prediction uncertainty. jitter: Optional small scalar noise value to ensure numerical stability in cases with no noise. Default = 1e-6. Returns: log_like: value of the log-likelihood evaluated at theta. """ # Pre-process y = y.ravel() N = y.size # Check that model uncertainty covariance matrix is supplied if `y_model` # is not None. Scale model uncertainty covariance by model prediction. if y_model is not None: if K is not None: y_model_diag = np.diag(y_model.ravel()) K = np.matmul(y_model_diag, np.matmul(K, y_model_diag)) else: raise ValueError( "Model uncertainty scaling vector is provided but `K` is None" ) # If K is not supplied, then the loglikelihood can be evaluated as the # sum of loglikelihoods of independent normally distributed random vars if K is None: if std_meas is not None: std_meas = _cast_scalar_to_array(std_meas, N).ravel() return _loglike_normal(y, std_meas) else: raise ValueError("At least one of `K`, `std_meas` expected to be not None") # Multivariate normal case else: if std_meas is not None: std_meas = _cast_scalar_to_array(std_meas, N).ravel() return _loglike_multivariate_normal(y, K + np.diag(std_meas**2)) else: return _loglike_multivariate_normal(y, K + np.ones(N) * jitter)