from math import exp
from numpy import asarray, atleast_2d, dot, log, maximum, ndarray, zeros
from numpy.linalg import multi_dot, slogdet
from optimix import Function, Scalar
from .._util import (
SVD,
cached_property,
economic_qs_zeros,
log2pi,
nice_inv,
numbers,
rsolve,
)
from ._b import B
from ._lmm_scan import FastScanner
class LMM(Function):
    """
    Fast Linear Mixed Models inference via maximum likelihood.

    Examples
    --------
    .. doctest::

        >>> from numpy import array
        >>> from numpy_sugar.linalg import economic_qs_linear
        >>> from glimix_core.lmm import LMM
        >>>
        >>> X = array([[1, 2], [3, -1]], float)
        >>> QS = economic_qs_linear(X)
        >>> covariates = array([[1], [1]])
        >>> y = array([-1, 2], float)
        >>> lmm = LMM(y, covariates, QS)
        >>> lmm.fit(verbose=False)
        >>> print('%.3f' % lmm.lml())
        -3.649

    One can also specify which parameters should be fitted:

    .. doctest::

        >>> from numpy import array
        >>> from numpy_sugar.linalg import economic_qs_linear
        >>> from glimix_core.lmm import LMM
        >>>
        >>> X = array([[1, 2], [3, -1]], float)
        >>> QS = economic_qs_linear(X)
        >>> covariates = array([[1], [1]])
        >>> y = array([-1, 2], float)
        >>> lmm = LMM(y, covariates, QS)
        >>> lmm.fix('delta')
        >>> lmm.fix('scale')
        >>> lmm.delta = 0.5
        >>> lmm.scale = 1
        >>> lmm.fit(verbose=False)
        >>> print('%.3f' % lmm.lml())
        -3.832
        >>> lmm.unfix('delta')
        >>> lmm.fit(verbose=False)
        >>> print('%.3f' % lmm.lml())
        -3.713

    Notes
    -----
    The LMM model can be equivalently written as ::

        y ~ N(X*beta, s * ((1-delta)*K + delta*I)),

    and we thus have v0 = s * (1 - delta) and v1 = s * delta. Consider the
    economic eigendecomposition of K ::

        K = [Q0 Q1] [S0 0] [Q0.T]
                    [0  0] [Q1.T]

    and let ::

        D = [(1-delta)*S0 + delta*I0          0]
            [0                         delta*I1].

    In order to eliminate the need of Q1, note that Q @ Q.T = I implies
    that ::

        Q1 @ Q1.T = I - Q0 @ Q0.T.

    We will need to solve ((1-delta)*K + delta*I) x = u for x. Let
    D0 = (1-delta)*S0 + delta*I0 and let us define ::

        B = Q0 @ inv(D0) @ Q0.T                                if delta = 0
        B = Q0 @ inv(D0) @ Q0.T + (1/delta)*(I - Q0 @ Q0.T)    if delta > 0.

    We therefore have ::

        x = B @ u.

    .. NOTE(review): the unicode math in the original docstring arrived
       mojibake-garbled; the formulas above were reconstructed from the
       implementation below — confirm against upstream documentation.
    """
[docs] def __init__(self, y, X, QS=None, restricted=False):
"""
Constructor.
Parameters
----------
y : array_like
Outcome.
X : array_like
Covariates as a two-dimensional array.
QS : tuple
Economic eigendecompositon in form of ``((Q0, ), S0)`` of a
covariance matrix ``K``.
restricted : bool
``True`` for restricted maximum likelihood optimization; ``False``
otherwise. Defaults to ``False``.
"""
from numpy_sugar import is_all_finite
logistic = Scalar(0.0)
logistic.listen(self._delta_update)
logistic.bounds = (-numbers.logmax, +numbers.logmax)
Function.__init__(self, "LMM", logistic=logistic)
self._logistic = logistic
y = asarray(y, float).ravel()
if not is_all_finite(y):
raise ValueError("There are non-finite values in the outcome.")
if len(y) == 0:
raise ValueError("The outcome array is empty.")
X = atleast_2d(asarray(X, float).T).T
if not is_all_finite(X):
raise ValueError("There are non-finite values in the covariates matrix.")
self._optimal = {"beta": False, "scale": False}
if QS is None:
QS = economic_qs_zeros(len(y))
self._B = B(QS[0][0], QS[1], 0.0, 1.0)
self.delta = 1.0
logistic.fix()
else:
self._B = B(QS[0][0], QS[1], 0.5, 0.5)
self.delta = 0.5
if QS[0][0].shape[0] != len(y):
msg = "Sample size differs between outcome and covariance decomposition."
raise ValueError(msg)
if y.shape[0] != X.shape[0]:
msg = "Sample size differs between outcome and covariates."
raise ValueError(msg)
self._y = y
self._Q0 = QS[0][0]
self._S0 = QS[1]
self._Xsvd = SVD(X)
self._tbeta = zeros(self._Xsvd.rank)
self._scale = 1.0
self._fix = {"beta": False, "scale": False}
self._restricted = restricted
    @property
    def beta(self) -> ndarray:
        """
        Fixed-effect sizes.

        Returns
        -------
        effect-sizes
            Optimal fixed-effect sizes.

        Notes
        -----
        Setting the derivative of log(p(y)) over effect sizes equal to zero
        leads to solutions beta from equation ::

            (X.T @ B @ X) @ beta = X.T @ B @ y.
        """
        # mean() is X @ beta in the original basis; map it back through the
        # SVD factors (X = US @ Vt) to recover beta itself.
        return rsolve(self._Xsvd.Vt, rsolve(self._Xsvd.US, self.mean()))

    @beta.setter
    def beta(self, beta):
        beta = asarray(beta, float).ravel()
        # Store the rotated coefficients tbeta = Vt @ beta.
        self._tbeta[:] = self._Xsvd.Vt @ beta
        # Manually set coefficients are, in general, no longer optimal.
        self._optimal["beta"] = False
        self._optimal["scale"] = False
    @property
    def beta_covariance(self):
        """
        Estimates the covariance-matrix of the optimal beta.

        Returns
        -------
        beta-covariance : ndarray
            s * (X.T @ B @ X)^-1.

        References
        ----------
        .. Rencher, A. C., & Schaalje, G. B. (2008). Linear models in statistics. John
        Wiley & Sons.
        """
        # Invert in the SVD-rotated basis, then map both sides back through
        # Vt to express the covariance in the original covariate basis.
        A = nice_inv(self._tXTBtX)
        VT = self._Xsvd.Vt
        H = rsolve(VT, A)
        return rsolve(VT, H.T) * self.scale
def fix(self, param: str):
"""
Disable parameter optimization.
Parameters
----------
param
Possible values are ``"delta"``, ``"beta"``, and ``"scale"``.
"""
if param == "delta":
super()._fix("logistic")
else:
self._fix[param] = True
def unfix(self, param: str):
"""
Enable parameter optimization.
Parameters
----------
param
Possible values are ``"delta"``, ``"beta"``, and ``"scale"``.
"""
if param == "delta":
self._unfix("logistic")
else:
self._fix[param] = False
@property
def v0(self):
"""
First variance.
Returns
-------
v0 : float
s(1 - ๐ฟ).
"""
return self.scale * (1 - self.delta)
@property
def v1(self) -> float:
"""
Second variance.
Returns
-------
v1
s๐ฟ.
"""
return self.scale * self.delta
    def fit(self, verbose=True):
        """
        Maximise the marginal likelihood.

        Parameters
        ----------
        verbose : bool, optional
            ``True`` for progress output; ``False`` otherwise.
            Defaults to ``True``.
        """
        # delta is found numerically (via its logistic transform) unless the
        # user fixed it; beta and scale have closed-form updates.
        if not self._isfixed("logistic"):
            self._maximize_scalar(desc="LMM", rtol=1e-6, atol=1e-6, verbose=verbose)
        # beta must be updated before scale: the scale update takes a
        # shortcut when beta is known to be optimal.
        if not self._fix["beta"]:
            self._update_beta()
        if not self._fix["scale"]:
            self._update_scale()
    def get_fast_scanner(self) -> FastScanner:
        """
        Return :class:`.FastScanner` for association scan.

        Returns
        -------
        fast-scanner
            Instance of a class designed to perform very fast association scan.
        """
        v0 = self.v0
        v1 = self.v1
        # Freeze the current variances: v0 is folded into the eigenvalues
        # and v1 is passed separately as the residual variance.
        QS = ((self._Q0,), v0 * self._S0)
        return FastScanner(self._y, self.X, QS, v1)
def value(self) -> float:
"""
Internal use only.
"""
if not self._fix["beta"]:
self._update_beta()
if not self._fix["scale"]:
self._update_scale()
return self.lml()
def gradient(self):
"""
Not implemented.
"""
raise NotImplementedError
@property
def nsamples(self) -> int:
"""
Number of samples.
"""
return len(self._y)
@property
def ncovariates(self) -> int:
"""
Number of covariates.
"""
return self._Xsvd.A.shape[1]
    def lml(self) -> float:
        """
        Log of the marginal likelihood.

        Returns
        -------
        lml
            Log of the marginal likelihood.

        Notes
        -----
        For arbitrary beta and s, the log of the marginal likelihood is ::

            2*log(p(y)) = -n*log(2*pi) - n*log(s) - log|D|
                          - (y - X@beta).T @ B @ (y - X@beta) / s.

        For optimal beta and s it simplifies to ::

            2*log(p(y; beta, s)) = -n*log(2*pi) - n*log(s) - log|D| - n.

        .. NOTE(review): the derivation in the original docstring arrived
           mojibake-garbled; the expressions above follow the implementations
           of ``_lml_arbitrary_scale`` and ``_lml_optimal_scale``.
        """
        # REML correction; zero in unrestricted mode (both terms return 0).
        reml = (self._logdetXX - self._logdetH()) / 2
        if self._optimal["scale"]:
            lml = self._lml_optimal_scale()
        else:
            lml = self._lml_arbitrary_scale()
        return lml + reml
@property
def X(self):
"""
Covariates matrix.
Returns
-------
X : ndarray
Covariates.
"""
return self._Xsvd.A
    @property
    def delta(self) -> float:
        """
        Variance ratio between ``K`` and ``I``.
        """
        from numpy_sugar import epsilon

        v = float(self._logistic.value)
        # Numerically stable logistic: only ever exponentiate a non-positive
        # number, so exp() cannot overflow.
        if v > 0.0:
            v = 1 / (1 + exp(-v))
        else:
            v = exp(v)
            v = v / (v + 1.0)
        # Keep delta strictly inside (0, 1) to avoid divisions by zero in
        # downstream formulas that use delta and 1 - delta.
        return min(max(v, epsilon.tiny), 1 - epsilon.tiny)

    @delta.setter
    def delta(self, delta):
        from numpy_sugar import epsilon

        # Clamp before the logit transform, which is undefined at 0 and 1.
        delta = min(max(delta, epsilon.tiny), 1 - epsilon.tiny)
        self._logistic.value = log(delta / (1 - delta))
        # Changing delta invalidates the closed-form beta and scale.
        self._optimal["beta"] = False
        self._optimal["scale"] = False
    @property
    def scale(self) -> float:
        """
        Scaling factor.

        Returns
        -------
        scale : float
            Scaling factor.

        Notes
        -----
        Setting the derivative of log(p(y; beta)), for which beta is optimal,
        over scale equal to zero leads to the maximum ::

            s = (1/n) * (y - X@beta).T @ B @ (y - X@beta).

        In the case of restricted marginal likelihood ::

            s = (1/(n-c)) * (y - X@beta).T @ B @ (y - X@beta),

        where c is the number of covariates.
        """
        return self._scale

    @scale.setter
    def scale(self, scale):
        self._scale = scale
        # A manually set scale is, in general, not the optimum.
        self._optimal["scale"] = False
    def mean(self):
        """
        Mean of the prior.

        Formally, m = X @ beta.

        Returns
        -------
        mean : ndarray
            Mean of the prior.
        """
        # X = US @ Vt and tbeta = Vt @ beta, hence US @ tbeta = X @ beta.
        return self._Xsvd.US @ self._tbeta

    def covariance(self):
        """
        Covariance of the prior.

        Returns
        -------
        covariance : ndarray
            v0*K + v1*I.
        """
        from numpy_sugar.linalg import ddot, sum2diag

        # v0 * Q0 @ diag(S0) @ Q0.T, with v1 added along the diagonal.
        Q0 = self._Q0
        S0 = self._S0
        return sum2diag(dot(ddot(Q0, self.v0 * S0), Q0.T), self.v1)
    def _delta_update(self):
        # Listener invoked whenever the logistic parameter moves: the
        # closed-form beta and scale are no longer optimal and the B operator
        # must be refreshed with the new variance split.
        self._optimal["beta"] = False
        self._optimal["scale"] = False
        delta = self.delta
        self._B.set_variances(1 - delta, delta)
@cached_property
def _logdetXX(self):
"""
log(๏ฝXแตX๏ฝ).
"""
if not self._restricted:
return 0.0
ldet = slogdet(self._Xsvd.US.T @ self._Xsvd.US)
if ldet[0] != 1.0:
raise ValueError("The determinant of XแตX should be positive.")
return ldet[1]
def _logdetH(self):
"""
log(๏ฝH๏ฝ) for H = sโปยนXแตQDโปยนQแตX.
"""
if not self._restricted:
return 0.0
ldet = slogdet(self._tXTBtX / self.scale)
if ldet[0] != 1.0:
raise ValueError("The determinant of H should be positive.")
return ldet[1]
    def _lml_optimal_scale(self):
        """
        Log of the marginal likelihood for optimal scale.

        Uses the simplified form -(df*log(2*pi) + df + n*log(s) + log|D|)/2,
        which is valid only when scale holds its optimal value.

        Returns
        -------
        lml : float
            Log of the marginal likelihood.
        """
        assert self._optimal["scale"]
        n = len(self._y)
        lml = -self._df * log2pi - self._df - n * log(self.scale)
        lml -= self._logdetD
        return lml / 2

    def _lml_arbitrary_scale(self):
        """
        Log of the marginal likelihood for arbitrary scale.

        Returns
        -------
        lml : float
            Log of the marginal likelihood.
        """
        s = self.scale
        n = len(self._y)
        lml = -self._df * log2pi - n * log(s)
        lml -= self._logdetD
        # Quadratic form of the residual under the precision operator B.
        my = self.mean() - self._y
        lml -= my.T @ self._B.dot(my) / s
        return lml / 2
    @property
    def _df(self):
        """
        Degrees of freedom: n, or n - rank(X) in restricted mode.
        """
        if not self._restricted:
            return self.nsamples
        return self.nsamples - self._Xsvd.rank

    def _optimal_scale_using_optimal_beta(self):
        # When beta is optimal the quadratic form collapses to
        # y.T@B@y - (y.T@B@US)@tbeta; floor at epsilon.small so the scale
        # never degenerates to zero (e.g. when y = X@beta exactly).
        from numpy_sugar import epsilon

        assert self._optimal["beta"]
        s = self._yTBy - self._yTBtX @ self._tbeta
        return maximum(s / self._df, epsilon.small)
    def _update_beta(self):
        # Solve (US.T@B@US) tbeta = US.T@B@y in the SVD-rotated basis.
        assert not self._fix["beta"]
        if self._optimal["beta"]:
            return
        self._tbeta[:] = rsolve(self._tXTBtX, self._yTBtX)
        self._optimal["beta"] = True
        # The optimal scale depends on beta, so it must be recomputed.
        self._optimal["scale"] = False

    def _update_scale(self):
        from numpy_sugar import epsilon

        if self._optimal["beta"]:
            # Cheaper closed form available when beta is optimal.
            self._scale = self._optimal_scale_using_optimal_beta()
        else:
            # Full expansion of (y - X@beta).T @ B @ (y - X@beta) / df.
            p0 = self._yTBy - 2 * self._yTBtX @ self._tbeta
            p1 = multi_dot((self._tbeta, self._tXTBtX, self._tbeta))
            self._scale = maximum((p0 + p1) / self._df, epsilon.small)
        self._optimal["scale"] = True
    @property
    def _logdetD(self):
        # log|D| for D = diag((1-delta)*S0 + delta, ..., delta, ..., delta):
        # the first `rank` entries come from the eigenvalues S0; the
        # remaining n - rank entries are plain delta.
        v = 0.0
        d = self.delta
        rank = self._S0.size
        if rank > 0:
            v += log((1 - d) * self._S0 + d).sum()
        rankdef = self._y.shape[0] - rank
        v += rankdef * log(d)
        return v

    @property
    def _yTBy(self):
        # Scalar y.T @ B @ y.
        return self._y.T @ self._B.dot(self._y)

    @property
    def _yTBtX(self):
        # Row vector y.T @ B @ US (covariates in the SVD-rotated basis).
        return self._y.T @ self._B.dot(self._Xsvd.US)

    @property
    def _tXTBtX(self):
        # Matrix US.T @ B @ US in the SVD-rotated basis.
        return self._Xsvd.US.T @ self._B.dot(self._Xsvd.US)