# -*- coding: utf-8 -*-
## Copyright(c) 2021 Yoann Robin
##
## This file is part of SBCK.
##
## SBCK is free software: you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## SBCK is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with SBCK. If not, see <https://www.gnu.org/licenses/>.
###############
## Libraries ##
###############
import numpy as np
import scipy.stats as sc
import scipy.interpolate as sci
#############
## Classes ##
#############
[docs]
class MonotoneInverse: ##{{{
[docs]
def __init__(self, xminmax, yminmax, transform): ##{{{
self.xmin = xminmax[0]
self.xmax = xminmax[1]
self.ymin = yminmax[0]
self.ymax = yminmax[1]
delta = 0.05 * (self.xmax - self.xmin)
nstepmin, nstepmax = 0, 0
while transform(self.xmin) > self.ymin:
self.xmin -= delta
nstepmin += 1
while transform(self.xmax) < self.ymax:
self.xmax += delta
nstepmax += 1
self.nstep = 100 + max(nstepmin, nstepmax)
x = np.linspace(self.xmin, self.xmax, self.nstep)
y = transform(x)
self._inverse = sci.interp1d(y, x)
##}}}
def __call__(self, y): ##{{{
return self._inverse(y)
##}}}
##}}}
# class rv_histogram(sc.rv_histogram):##{{{
# """
# SBCK.tools.rv_histogram
# =======================
# Wrapper on scipy.stats.rv_histogram adding a fit method.
# """
# def __init__( self , *args , **kwargs ):##{{{
# sc.rv_histogram.__init__( self , *args , **kwargs )
# ##}}}
#
# def fit( X , bins = 100 ):##{{{
# return (np.histogram( X , bins = bins ),)
# ##}}}
#
###}}}
[docs]
class rv_histogram: ##{{{
"""
SBCK.tools.rv_histogram
=======================
Empirical histogram class. The difference with scipy.stats.rv_histogram
is the way to infer the cdf and the icdf. Here:
>>> X ## Input
>>> Xs = np.sort(X)
>>> Xr = sc.rankdata(Xs,method="max")
>>> p = np.unique(Xr) / X.size
>>> q = Xs[np.unique(Xr)-1]
>>> p[0] = 0
>>>
>>> icdf = scipy.interpolate.interp1d( p , q )
>>> cdf = scipy.interpolate.interp1d( q , p )
"""
[docs]
def __init__(self, cdf=None, icdf=None, pdf=None, *args, X=None, **kwargs):
self._cdf = None
self._icdf = None
self._pdf = None
if cdf is not None and icdf is not None and pdf is not None:
self._cdf = cdf
self._icdf = icdf
self._pdf = pdf
elif X is not None:
cdf, icdf, pdf = rv_histogram.fit(X)
self._cdf = cdf
self._icdf = icdf
self._pdf = pdf
[docs]
def fit(X, *args, **kwargs):
Xs = np.sort(X.squeeze())
Xr = sc.rankdata(Xs, method="max")
p = np.unique(Xr) / X.size
q = Xs[np.unique(Xr) - 1]
p[0] = 0
# p = np.hstack( (0,p) )
# q = np.hstack( (X.min(),q) )
# if q[0] == q[1]:
# eps = np.sqrt(np.finfo(float).resolution)
# q[1] = (1-eps) * q[0] + eps * q[2]
icdf = sci.interp1d(p, q, bounds_error=False, fill_value=(q[0], q[-1]))
cdf = sci.interp1d(q, p, bounds_error=False, fill_value=(0, 1))
h, c = np.histogram(X, int(0.1 * X.size), density=True)
c = (c[1:] + c[:-1]) / 2
pdf = sci.interp1d(c, h, bounds_error=False, fill_value=(0, 0))
return (cdf, icdf, pdf)
[docs]
def rvs(self, size):
return self._icdf(np.random.uniform(size=size))
[docs]
def cdf(self, q):
return self._cdf(q)
[docs]
def icdf(self, p):
return self._icdf(p)
[docs]
def sf(self, q):
return 1 - self._cdf(q)
[docs]
def isf(self, p):
return self._icdf(1 - p)
[docs]
def ppf(self, p):
return self.icdf(p)
[docs]
def pdf(self, x):
return self._pdf(x)
##}}}
class _Dist:
def __init__(self, dist, kwargs):
self.dist = dist if dist is not None else rv_histogram
self.kwargs = kwargs if kwargs is not None else {}
self.law = []
def set_features(self, n_features):
if type(self.dist) is not list:
self.dist = [self.dist for _ in range(n_features)]
def is_frozen(self, i):
return isinstance(self.dist[i], sc._distn_infrastructure.rv_frozen)
def is_parametric(self, i):
ispar = self.is_frozen(i)
if len(self.law) >= i:
ispar = ispar or isinstance(self.law[i], sc._distn_infrastructure.rv_frozen)
return ispar
def fit(self, X, i):
if self.is_frozen(i):
self.law.append(self.dist[i])
else:
self.law.append(self.dist[i](*self.dist[i].fit(X.squeeze()), **self.kwargs))
[docs]
class QM:
"""
SBCK.QM
=======
Description
-----------
Quantile Mapping bias corrector, see e.g. [1,2,3]. The implementation proposed here is generic, and can use
scipy.stats to fit a parametric distribution, or can use a frozen distribution.
Example
-------
```
## Start with a reference / biased dataset, noted Y,X, from normal distribution:
X = np.random.normal( loc = 0 , scale = 2 , size = 1000 )
Y = np.random.normal( loc = 5 , scale = 0.5 , size = 1000 )
## Generally, we do not know the distribution of X and Y, and we use the empirical quantile mapping:
qm_empiric = QM( distY0 = SBCK.tools.rv_histogram , distX0 = SBCK.tools.rv_histogram ) ## = QM(), default
qm_empiric.fit(Y,X)
Z_empiric = qm_empiric.predict(X) ## Z is the correction in a non parametric way
## But we can know that X and Y follow a Normal distribution, without knowing the parameters:
qm_normal = QM( distY0 = scipy.stats.norm , distX0 = scipy.stats.norm )
qm_normal.fit(Y,X)
Z_normal = qm_normal.predict(X)
## And finally, we can know the law of Y, and it is usefull to freeze the distribution:
qm_freeze = QM( distY0 = scipy.stats.norm( loc = 5 , scale = 0.5 ) , distX0 = scipy.stats.norm )
qm_freeze.fit(Y,X) ## = qm_freeze.fit(None,X) because Y is not used
Z_freeze = qm_freeze.predict(X)
```
References
----------
[1] Panofsky, H. A. and Brier, G. W.: Some applications of statistics to meteorology, Mineral Industries Extension Services, College of Mineral Industries, Pennsylvania State University, 103 pp., 1958.
[2] Wood, A. W., Leung, L. R., Sridhar, V., and Lettenmaier, D. P.: Hydrologic Implications of Dynamical and Statistical Approaches to Downscaling Climate Model Outputs, Clim. Change, 62, 189–216, https://doi.org/10.1023/B:CLIM.0000013685.99609.9e, 2004.
[3] Déqué, M.: Frequency of precipitation and temperature extremes over France in an anthropogenic scenario: Model results and statistical correction according to observed values, Global Planet. Change, 57, 16–26, https://doi.org/10.1016/j.gloplacha.2006.11.030, 2007.
"""
[docs]
def __init__(self, **kwargs): ##{{{
"""
Initialisation of Quantile Mapping bias corrector. All arguments must be named.
Parameters
----------
distY0 : A statistical distribution from scipy.stats or SBCK.tools.rv_*
The distribution of references.
distX0 : A statistical distribution from scipy.stats or SBCK.tools.rv_*
The distribution of biased dataset.
kwargsY0 : dict
Arguments passed to distY0
kwargsX0 : dict
Arguments passed to distX0
n_features: None or integer
Numbers of features, optional because it is determined during fit if X0 and Y0 are not None.
tol : float
Numerical tolerance, default 1e-3
"""
self.n_features = kwargs.get("n_features")
self._tol = kwargs.get("tol") if kwargs.get("tol") is not None else 1e-3
self._distY0 = _Dist(dist=kwargs.get("distY0"), kwargs=kwargs.get("kwargsY0"))
self._distX0 = _Dist(dist=kwargs.get("distX0"), kwargs=kwargs.get("kwargsX0"))
##}}}
[docs]
def fit(self, Y0, X0): ##{{{
"""
Fit the QM model
Parameters
----------
Y0 : np.array[ shape = (n_samples,n_features) ]
Reference dataset
X0 : np.array[ shape = (n_samples,n_features) ]
Biased dataset
"""
## Reshape data in form [n_samples,n_features]
if Y0 is not None and Y0.ndim == 1:
Y0 = Y0.reshape(-1, 1)
if X0 is not None and X0.ndim == 1:
X0 = X0.reshape(-1, 1)
if self.n_features is None:
if Y0 is None and X0 is None:
print("n_features must be set during initialization if Y0 = X0 = None")
elif Y0 is not None:
self.n_features = Y0.shape[1]
else:
self.n_features = X0.shape[1]
##
self._distY0.set_features(self.n_features)
self._distX0.set_features(self.n_features)
## Fit
for i in range(self.n_features):
if Y0 is not None:
self._distY0.fit(Y0[:, i], i)
else:
self._distY0.fit(None, i)
if X0 is not None:
self._distX0.fit(X0[:, i], i)
else:
self._distX0.fit(None, i)
##}}}
[docs]
def predict(self, X0): ##{{{
"""
Perform the bias correction
Parameters
----------
X0 : np.array[ shape = (n_samples,n_features) ]
Array of values to be corrected
Returns
-------
Z0 : np.array[ shape = (n_samples,n_features) ]
Return an array of correction
"""
if X0.ndim == 1:
X0 = X0.reshape(-1, 1)
Z0 = np.zeros_like(X0)
for i in range(self.n_features):
cdf = self._distX0.law[i].cdf(X0[:, i])
cdf[np.logical_not(cdf < 1)] = 1 - self._tol
cdf[np.logical_not(cdf > 0)] = self._tol
Z0[:, i] = self._distY0.law[i].ppf(cdf)
return Z0
##}}}