Index: trunk/npstat/swig/npstat_density_estimation.py
===================================================================
--- trunk/npstat/swig/npstat_density_estimation.py (revision 812)
+++ trunk/npstat/swig/npstat_density_estimation.py (revision 813)
@@ -1,816 +1,831 @@
"""
This module provides a number of useful wrapper classes and functions
related to density estimation.
"""
__author__ = "Igor Volobouev (i.volobouev@ttu.edu)"
-__version__ = "1.1"
-__date__ = "Sep 20 2019"
+__version__ = "1.2"
+__date__ = "Aug 10 2022"
import math
import npstat
import numpy as np
from npstat_utils import histoBinType, ArrayND, arrayNDFromNumpy
def normalBandwidthFactor(dim, filterDegree, sampleSize):
"""
This function returns a factor by which the estimate of the sample
covariance matrix should be multiplied in order to produce the
plug-in covariance matrix of the Gaussian kernel. Take a square
root of this if you need a factor for the standard deviation rather
than covariance.
"""
assert filterDegree == 0, "Non-0 filter degrees not implemented yet"
return math.pow(4.0/(dim + 2.0), 2.0/(dim + 4.0))*\
math.pow(sampleSize, -2.0/(dim + 4.0))
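# Illustrative sketch (not part of the original module): turning the
# covariance factor above into a plug-in bandwidth for the standard
# deviation. The sample size and sigma estimate below are hypothetical.
def _exampleNormalBandwidthFactor():
    covFactor = normalBandwidthFactor(1, 0, 1000)
    sigmaFactor = math.sqrt(covFactor)  # factor for the standard deviation
    sampleSigma = 2.0                   # hypothetical sample sigma estimate
    return sigmaFactor*sampleSigma      # plug-in Gaussian kernel bandwidth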
def correctDensityEstimateGHU(arr, binSize):
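    """
    Make a scanned density estimate into a valid density using the
    correction of Glad, Hjort and Ushakov (hence "GHU"): negative values
    are removed and the result is renormalized for the given bin size.
    """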
out = np.empty(np.shape(arr), np.double)
npstat.correctDensityEstimateGHUOrig(arr, out, binSize)
return out
def scanSymmetricDensityAsWeight(kernel, bandwidthSet, prototypeHisto,
partialScan=True):
binWidths = prototypeHisto.binWidths()
return npstat.scanSymmetricDensityAsWeight2(
kernel, prototypeHisto.shape(),
bandwidthSet, binWidths, partialScan)
def scanMultivariateDensityAsWeight(kernel, bandwidthSet, prototypeHisto):
binWidths = prototypeHisto.binWidths()
newshape = list()
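    # Force an odd number of scan points along each axis so that
    # the kernel scan has a well-defined central point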
for n in prototypeHisto.shape():
if n % 2:
newshape.append(n)
else:
newshape.append(n+1)
return npstat.scanMultivariateDensityAsWeight2(
kernel, newshape, bandwidthSet, binWidths)
def histoPluginBandwidth1D(h, sampleSize, deg, symbetaPower):
if h.dim() != 1:
raise ValueError("Expected 1-d histogram as an argument")
assert deg >= 0.0, "Filter degree can not be negative"
if sampleSize <= 0.0:
sampleSize = h.nFillsInRange()
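    # Cumulative standard normal probabilities at -1 and +1 sigma;
    # the corresponding interquantile distance robustly estimates 2*sigma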
qvalues = (0.15865525393145705, 0.84134474606854295)
quantiles = npstat.histoQuantiles(h, 0, qvalues)
sigmaEstimate = (quantiles[1] - quantiles[0])/2.0
pluginBw = npstat.approxAmisePluginBwGauss(deg, sampleSize, sigmaEstimate)
if symbetaPower >= 0.0:
pluginBw *= npstat.approxSymbetaBandwidthRatio(symbetaPower, deg)
return pluginBw
def BandwidthCVLeastSquares1D(typename, *args):
"""
Least squares cross-validation helper class for 1-d datasets
"""
funcs = {'unsigned char' : npstat.UCharBandwidthCVLeastSquares1D,
'int' : npstat.IntBandwidthCVLeastSquares1D,
'long' : npstat.LongBandwidthCVLeastSquares1D,
'float' : npstat.FloatBandwidthCVLeastSquares1D,
'double' : npstat.DoubleBandwidthCVLeastSquares1D}
return funcs[typename](*args)
def BandwidthCVPseudoLogli1D(typename, *args):
"""
Pseudo-likelihood cross-validation helper class for 1-d datasets
"""
funcs = {'unsigned char' : npstat.UCharBandwidthCVPseudoLogli1D,
'int' : npstat.IntBandwidthCVPseudoLogli1D,
'long' : npstat.LongBandwidthCVPseudoLogli1D,
'float' : npstat.FloatBandwidthCVPseudoLogli1D,
'double' : npstat.DoubleBandwidthCVPseudoLogli1D}
return funcs[typename](*args)
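# Usage sketch (not part of the original module): the factories above
# dispatch on the histogram bin type name. Least squares CV needs no
# extra arguments; pseudo-likelihood CV takes a regularisation parameter
# (the value below is hypothetical).
def _exampleCVFactories():
    lscv = BandwidthCVLeastSquares1D('double')
    rlcv = BandwidthCVPseudoLogli1D('double', 0.5)
    return lscv, rlcv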
def BandwidthCVLeastSquaresND(typename, *args):
"""
Least squares cross-validation helper class for multivariate datasets
"""
funcs = {'unsigned char' : npstat.UCharBandwidthCVLeastSquaresND,
'int' : npstat.IntBandwidthCVLeastSquaresND,
'long' : npstat.LongBandwidthCVLeastSquaresND,
'float' : npstat.FloatBandwidthCVLeastSquaresND,
'double' : npstat.DoubleBandwidthCVLeastSquaresND}
return funcs[typename](*args)
def BandwidthCVPseudoLogliND(typename, *args):
"""
Pseudo-likelihood cross-validation helper class for multivariate datasets
"""
funcs = {'unsigned char' : npstat.UCharBandwidthCVPseudoLogliND,
'int' : npstat.IntBandwidthCVPseudoLogliND,
'long' : npstat.LongBandwidthCVPseudoLogliND,
'float' : npstat.FloatBandwidthCVPseudoLogliND,
'double' : npstat.DoubleBandwidthCVPseudoLogliND}
return funcs[typename](*args)
def BandwidthGCVLeastSquares1D(typename, *args):
"""
Least squares grouped cross-validation helper class
for 1-d datasets
"""
funcs = {'unsigned char' : npstat.UCharBandwidthGCVLeastSquares1D,
'int' : npstat.IntBandwidthGCVLeastSquares1D,
'long' : npstat.LongBandwidthGCVLeastSquares1D,
'float' : npstat.FloatBandwidthGCVLeastSquares1D,
'double' : npstat.DoubleBandwidthGCVLeastSquares1D}
return funcs[typename](*args)
def BandwidthGCVPseudoLogli1D(typename, *args):
"""
Pseudo-likelihood grouped cross-validation helper class
for 1-d datasets
"""
funcs = {'unsigned char' : npstat.UCharBandwidthGCVPseudoLogli1D,
'int' : npstat.IntBandwidthGCVPseudoLogli1D,
'long' : npstat.LongBandwidthGCVPseudoLogli1D,
'float' : npstat.FloatBandwidthGCVPseudoLogli1D,
'double' : npstat.DoubleBandwidthGCVPseudoLogli1D}
return funcs[typename](*args)
def BandwidthGCVLeastSquaresND(typename, *args):
"""
Least squares grouped cross-validation helper class
for multivariate datasets
"""
funcs = {'unsigned char' : npstat.UCharBandwidthGCVLeastSquaresND,
'int' : npstat.IntBandwidthGCVLeastSquaresND,
'long' : npstat.LongBandwidthGCVLeastSquaresND,
'float' : npstat.FloatBandwidthGCVLeastSquaresND,
'double' : npstat.DoubleBandwidthGCVLeastSquaresND}
return funcs[typename](*args)
def BandwidthGCVPseudoLogliND(typename, *args):
"""
Pseudo-likelihood grouped cross-validation helper class
for multivariate datasets
"""
funcs = {'unsigned char' : npstat.UCharBandwidthGCVPseudoLogliND,
'int' : npstat.IntBandwidthGCVPseudoLogliND,
'long' : npstat.LongBandwidthGCVPseudoLogliND,
'float' : npstat.FloatBandwidthGCVPseudoLogliND,
'double' : npstat.DoubleBandwidthGCVPseudoLogliND}
return funcs[typename](*args)
def _KDEFilterND(*args):
return npstat.KDEFilterND_10(*args)
def _KDEFilterND_maxdeg():
return npstat.KDEFilterND_10.classMaxDegree()
def _LOrPEFilterND(*args):
return npstat.LocalPolyFilterND_10(*args)
def _LOrPEFilterND_maxdeg():
return npstat.LocalPolyFilterND_10.classMaxDegree()
class _BinnedCVCalc1D:
def __init__(self, effectiveSampleSize, weightedSample):
self._effectiveSampleSize = effectiveSampleSize
self._weightedSample = weightedSample
def __call__(self, h, result, filt):
calc = self._makeCalc(h)
if self._weightedSample is not None:
return calc.cvWeightedSample(h, self._weightedSample, result, filt)
elif self._effectiveSampleSize is not None:
return calc.cvWeighted(h, self._effectiveSampleSize, result, filt)
else:
return calc.cv(h, result, filt)
def _makeCalc(self, h):
raise NotImplementedError
class BinnedRLCVCalc1D(_BinnedCVCalc1D):
def __init__(self, regParameter, effectiveSampleSize=None, weightedSample=None):
assert regParameter >= 0.0, "RLCV regularisation parameter can not be negative"
_BinnedCVCalc1D.__init__(self, effectiveSampleSize, weightedSample)
self._regParameter = regParameter
def _makeCalc(self, h):
return BandwidthCVPseudoLogli1D(histoBinType(h), self._regParameter)
class BinnedLSCVCalc1D(_BinnedCVCalc1D):
def __init__(self, effectiveSampleSize=None, weightedSample=None):
_BinnedCVCalc1D.__init__(self, effectiveSampleSize, weightedSample)
def _makeCalc(self, h):
return BandwidthCVLeastSquares1D(histoBinType(h))
class _BinnedCVCalcND:
def __init__(self, effectiveSampleSize):
self._effectiveSampleSize = effectiveSampleSize
def __call__(self, h, result, filt):
calc = self._makeCalc(h)
if self._effectiveSampleSize is not None:
return calc.cvWeighted(h, self._effectiveSampleSize, result, filt)
else:
return calc.cv(h, result, filt)
def _makeCalc(self, h):
raise NotImplementedError
class BinnedRLCVCalcND(_BinnedCVCalcND):
def __init__(self, regParameter, effectiveSampleSize=None):
assert regParameter >= 0.0, "RLCV regularisation parameter can not be negative"
_BinnedCVCalcND.__init__(self, effectiveSampleSize)
self._regParameter = regParameter
def _makeCalc(self, h):
return BandwidthCVPseudoLogliND(histoBinType(h), self._regParameter)
class BinnedLSCVCalcND(_BinnedCVCalcND):
def __init__(self, effectiveSampleSize=None):
_BinnedCVCalcND.__init__(self, effectiveSampleSize)
def _makeCalc(self, h):
return BandwidthCVLeastSquaresND(histoBinType(h))
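# Sketch (not part of the original module): building the binned CV
# calculator objects that "bestFilterByCV" expects. The RLCV
# regularisation value is hypothetical.
def _exampleBinnedCVCalcs():
    lscv1d = BinnedLSCVCalc1D()
    rlcv1d = BinnedRLCVCalc1D(0.5)
    lscvNd = BinnedLSCVCalcND()
    return lscv1d, rlcv1d, lscvNd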
class _DiscreteDEBase:
"""
Base class for density estimation on regular grids
"""
def __init__(self, prototypeHisto, i_allowNegativeDensities):
self._dim = prototypeHisto.dim()
if self._dim < 1:
raise ValueError("Expected a dimensionful histogram as a prototype")
if not prototypeHisto.isUniformlyBinned():
raise ValueError("Expected a uniformy binned histogram as a prototype")
self._allowNegative = i_allowNegativeDensities
self._shape = prototypeHisto.shape()
self._binWidths = prototypeHisto.binWidths()
self._binVolume = prototypeHisto.binVolume()
self._nBinsTotal = prototypeHisto.nBins()
self._dataIsDensity = None
def __eq__(self, other):
return self._dim == other._dim and \
self._allowNegative == other._allowNegative and \
self._shape == other._shape and \
self._binWidths == other._binWidths and \
self._binVolume == other._binVolume
def __ne__(self, other):
return not self.__eq__(other)
def dim(self):
return self._dim
def isCompatible(self, h):
if h.dim() != self._dim:
return False
if not h.isUniformlyBinned():
return False
return self._shape == h.shape() and self._binWidths == h.binWidths()
def setData(self, h):
if not self.isCompatible(h):
raise ValueError("Incompatible input histogram")
self._setValidatedData(h)
self._dataIsDensity = npstat.arrayIsDensity(h.binContents())
def isDataSet(self):
return self._dataIsDensity is not None
def estimateDensity(self, h, *args):
self.setData(h)
return self.estimateDataDensity(*args)
def bestFilterByCV(self, h, cvCalc):
assert self.nFiltersAdded() > 0, "No filters added"
self.setData(h)
return self._runCV(h, cvCalc)
def estimateDataDensity(self, *args):
assert self.isDataSet(), "Data is not set"
return self._estimateDataDensity(*args)
def _normalize(self, result, guaranteedNonNegative=False):
if self._allowNegative or guaranteedNonNegative:
volume = np.sum(result)*self._binVolume
assert volume > 0.0, "Result can not be normalized. "\
"Was the input histogram filled?"
result /= volume
return result
else:
return correctDensityEstimateGHU(result, self._binVolume)
# Store the validated input histogram or its info
def _setValidatedData(self, h):
raise NotImplementedError
# Run the cross-validation procedure with all previously
# added filters
def _runCV(self, h, cvCalc):
raise NotImplementedError
# Return the density estimate on validated data, possibly
# adding a filter in the process
def _estimateDataDensity(self, *args):
raise NotImplementedError
def addFilter(self, *args):
raise NotImplementedError
def nFiltersAdded(self):
raise NotImplementedError
def clearFilters(self):
raise NotImplementedError
class _MemoizingFilterBase(_DiscreteDEBase):
"""
This class manages a dictionary of discrete filter structures
"""
def __init__(self, prototypeHisto, i_allowNegativeDensities):
_DiscreteDEBase.__init__(self, prototypeHisto, i_allowNegativeDensities)
self._filterTable = dict()
self._extraFilter = None
self._extraKey = None
def __eq__(self, other):
return _DiscreteDEBase.__eq__(self, other) and \
self._filterTable == other._filterTable and \
self._extraFilter == other._extraFilter and \
self._extraKey == other._extraKey
def nFiltersAdded(self):
return len(self._filterTable)
def clearFilters(self):
self._filterTable = dict()
self._extraFilter = None
self._extraKey = None
def _runCV(self, h, cvCalc):
bestCVValue = -1.7e308
bestKey = None
buffer = self._makeCVBuffer()
for key, filt in self._filterTable.items():
selfContrib = self._selfContributionFunctor(filt)
estimate = self._makeEstimate(key, filt, buffer)
cvValue = cvCalc(h, estimate, selfContrib)
if cvValue > bestCVValue:
bestCVValue = cvValue
bestKey = key
assert bestKey is not None, "Cross-validation scan failed"
bestEstimate = self._makeEstimate(bestKey, self._filterTable[bestKey])
return bestCVValue, bestKey, bestEstimate
def addFilter(self, *args):
raise NotImplementedError
def _estimateDataDensity(self, *args):
raise NotImplementedError
def _selfContributionFunctor(self, filt):
raise NotImplementedError
def _makeCVBuffer(self):
raise NotImplementedError
def _setValidatedData(self, h):
raise NotImplementedError
def _makeEstimate(self, key, filt, buffer=None):
raise NotImplementedError
class _MemoizingFilterMaker(_MemoizingFilterBase):
"""
This class is useful in case we are building the filters internally,
from the arguments provided to "estimateDataDensity" or "addFilter"
methods. It is assumed that the arguments can also be used as a key
in a dictionary, for looking up the filters in case we already created
them in the past.
"""
def __init__(self, prototypeHisto, i_allowNegativeDensities):
_MemoizingFilterBase.__init__(self, prototypeHisto, i_allowNegativeDensities)
def addFilter(self, key):
if key not in self._filterTable:
self._filterTable[key] = self._makeFilter(key, False)
return self._filterTable[key]
def _estimateDataDensity(self, key, rememberFilter=True):
filt = None
if key in self._filterTable:
filt = self._filterTable[key]
elif rememberFilter:
filt = self.addFilter(key)
elif self._extraKey == key:
filt = self._extraFilter
if filt is None:
filt = self._makeFilter(key, True)
self._extraFilter = filt
self._extraKey = key
return self._makeEstimate(key, filt)
def _makeFilter(self, key, isExtra):
raise NotImplementedError
class _MemoizingFilterManager(_MemoizingFilterBase):
"""
This class is useful in case we are building the filters from the
weight scans provided by the user
"""
def __init__(self, prototypeHisto, i_allowNegativeDensities):
_MemoizingFilterBase.__init__(self, prototypeHisto, i_allowNegativeDensities)
def addFilter(self, uniqueWeightDescription, filterDegree,
              kernelScan, mirrorScan):
    key = (uniqueWeightDescription, filterDegree)
if key not in self._filterTable:
filt = self._validateAndMakeFilter(filterDegree, kernelScan,
mirrorScan, False)
self._filterTable[key] = filt
return self._filterTable[key]
def _estimateDataDensity(self, uniqueWeightDescription, filterDegree,
                         kernelScan=None, mirrorScan=None, rememberFilter=True):
    filt = None
    key = (uniqueWeightDescription, filterDegree)
if key in self._filterTable:
filt = self._filterTable[key]
elif rememberFilter:
filt = self.addFilter(uniqueWeightDescription, filterDegree, kernelScan, mirrorScan)
elif self._extraKey == key:
filt = self._extraFilter
if filt is None:
filt = self._validateAndMakeFilter(filterDegree, kernelScan,
mirrorScan, True)
self._extraFilter = filt
self._extraKey = key
return self._makeEstimate(key, filt)
def _makeCVBuffer(self):
return np.empty(self._shape, np.double)
def _validateAndMakeFilter(self, filterDegree, kernelScan, mirrorScan, isExtra):
assert filterDegree >= 0.0, "Filter degree can not be negative"
assert filterDegree <= self._maxFilterDegree(), "Filter degree is too high"
assert kernelScan is not None, "This combination of weight/degree has not "\
"appeared yet, so the weight must be specified"
assert mirrorScan is not None, "This combination of weight/degree has not "\
"appeared yet, so the weight mirroring "\
"specification must be provided"
return self._makeFilter(filterDegree, kernelScan, mirrorScan, isExtra)
def _makeFilter(self, filterDegree, kernelScan, mirrorScan, isExtra):
raise NotImplementedError
def _maxFilterDegree(self):
raise NotImplementedError
class FFTKDE1D(_MemoizingFilterMaker):
"""
Fast 1-d KDE on regular grids utilizing FFT for convolutions
"""
def __init__(self, prototypeHisto, allowNegativeResults=True,
useMirroring=True):
_MemoizingFilterMaker.__init__(self, prototypeHisto, allowNegativeResults)
self._mirror = useMirroring
self._engine = npstat.ConvolutionEngine1D(2*self._nBinsTotal)
self._extraId = 2147483647
# Add more constructor parameters if you want to
# change the settings used below
self._bh = npstat.BoundaryHandling("BM_TRUNCATE")
self._gaussKernelNSigmas = 12.0
def clearFilters(self):
_MemoizingFilterMaker.clearFilters(self)
self._engine.clearFilterBank()
def _selfContributionFunctor(self, filt):
id, sc = filt
return npstat.ContribConstant1D(sc, self._nBinsTotal)
def _makeCVBuffer(self):
return None
def _setValidatedData(self, h):
self._engine.setFilter(npstat.doubleHistoBins1D(h, True, self._mirror))
def _makeFilter(self, key, isExtra):
filterDegree, bandwidth, symbetaPower = key
assert filterDegree >= 0.0, "Filter degree can not be negative"
assert bandwidth > 0.0, "Bandwidth must be positive"
if symbetaPower < 0.0:
distro = npstat.TruncatedGauss1D(0.0, bandwidth, self._gaussKernelNSigmas)
else:
distro = npstat.SymmetricBeta1D(0.0, bandwidth, symbetaPower)
builder = npstat.getBoundaryFilter1DBuilder(self._bh, distro, self._binVolume)
taper = npstat.contDegreeTaper(filterDegree)
kernelCenter = self._nBinsTotal - 1
pf = builder.makeFilter(taper, len(taper)-1, kernelCenter, 2*self._nBinsTotal)
selfContribution = pf.valueAt(pf.peakPosition())
tmp = npstat.polyFilter1DToNumpy(pf, kernelCenter, 2*self._nBinsTotal)
if isExtra:
id = self._extraId
else:
id = self.nFiltersAdded()
self._engine.depositFilter(id, tmp, self._nBinsTotal+1)
return id, selfContribution
def _makeEstimate(self, key, filt, buffer=None):
filterDegree, bandwidth, symbetaPower = key
id, sc = filt
guaranteedNonNegative = self._dataIsDensity and filterDegree == 0
arr = self._engine.convolveFilterAndDeposit(id, guaranteedNonNegative)
result = np.resize(arr, self._nBinsTotal)
return self._normalize(result, guaranteedNonNegative)
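# Usage sketch (not part of the original module): a filter key for this
# class is the triple (filterDegree, bandwidth, symbetaPower), with a
# negative power selecting the truncated Gaussian kernel. "h" is assumed
# to be a filled, uniformly binned 1-d histogram.
def _exampleFFTKDE1D(h):
    kde = FFTKDE1D(h)
    bw = histoPluginBandwidth1D(h, 0, 0.0, -1.0)  # plug-in bandwidth guess
    return kde.estimateDensity(h, (0.0, bw, -1.0))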
class LOrPE1D(_MemoizingFilterMaker):
"""
Density estimation by LOrPE on regular 1-d grids
"""
def __init__(self, prototypeHisto, allowNegativeResults=True, bh = None):
_MemoizingFilterMaker.__init__(self, prototypeHisto, allowNegativeResults)
self._bh = bh
if bh is None:
self._bh = npstat.BoundaryHandling("BM_FOLD")
self._data = None
def _makeCVBuffer(self):
return np.empty((self._nBinsTotal,), dtype=np.double)
def _selfContributionFunctor(self, filt):
return filt
def _setValidatedData(self, h):
self._data = npstat.doubleHistoBins1D(h)
def _makeFilter(self, key, isExtra):
filterDegree, bandwidth, symbetaPower = key
assert filterDegree >= 0.0, "Filter degree can not be negative"
assert bandwidth > 0.0, "Bandwidth must be positive"
return npstat.symbetaLOrPEFilter1D(
symbetaPower, bandwidth, filterDegree,
self._nBinsTotal, self._binVolume, self._bh)
def _makeEstimate(self, key, filt, buffer=None):
filterDegree, bandwidth, symbetaPower = key
guaranteedNonNegative = self._dataIsDensity and filterDegree == 0
if buffer is None:
result = filt.filter(self._data, guaranteedNonNegative)
else:
filt.filterIntoBuffer(self._data, buffer, guaranteedNonNegative)
result = buffer
return self._normalize(result, guaranteedNonNegative)
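# Cross-validation sketch (not part of the original module): add a few
# candidate filters around the plug-in bandwidth, then select the best
# one by least squares CV. The degree and bandwidth grid are hypothetical.
def _exampleLOrPE1DCV(h):
    lorpe = LOrPE1D(h)
    bw0 = histoPluginBandwidth1D(h, 0, 2.0, 4.0)
    for frac in (0.5, 0.75, 1.0, 1.25, 1.5):
        lorpe.addFilter((2.0, frac*bw0, 4.0))
    cvValue, bestKey, estimate = lorpe.bestFilterByCV(h, BinnedLSCVCalc1D())
    return bestKey, estimate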
class DiscreteOSDE1D(_MemoizingFilterMaker):
"""
OSDE with Legendre polynomials on regular 1-d grids
"""
def __init__(self, prototypeHisto, allowNegativeResults=True, maxdeg=50,
weightFunction=None):
_MemoizingFilterMaker.__init__(self, prototypeHisto, allowNegativeResults)
assert maxdeg >= 0, "Filter degree can not be negative"
self._maxdeg = min(self._nBinsTotal-1, maxdeg)
if weightFunction is None:
wF = np.ones(self._nBinsTotal)
self._legendre = True
else:
wF = weightFunction
self._legendre = False
self._poly = npstat.OrthoPoly1D(self._maxdeg, wF, self._binVolume)
self._data = None
def _makeCVBuffer(self):
return np.empty((self._nBinsTotal,), dtype=np.double)
def _setValidatedData(self, h):
self._data = np.empty((self._maxdeg+1,), dtype=np.double)
self._poly.densityCoeffs(npstat.doubleHistoBins1D(h), self._data)
def _makeFilter(self, filterDegree, isExtra):
assert filterDegree >= 0.0, "Filter degree can not be negative"
taper = npstat.contDegreeTaper(filterDegree)
if len(taper) > self._maxdeg+1:
raise ValueError("Filter degree is too large")
coeffs = np.array(taper, dtype=np.double)
response = np.empty((self._nBinsTotal,), dtype=np.double)
self._poly.globalFilterDiag(coeffs, response)
contrib = npstat.ContribVector1D(response)
return coeffs, contrib
def _makeEstimate(self, filterDegree, filt, buffer=None):
assert filterDegree >= 0.0, "Filter degree can not be negative"
taper, contrib = filt
coeffs = [a*b for a,b in zip(taper, self._data)]
if buffer is None:
result = np.empty((self._nBinsTotal,), dtype=np.double)
else:
result = buffer
self._poly.allSeries(coeffs, result)
return self._normalize(result)
def _selfContributionFunctor(self, filt):
taper, contrib = filt
return contrib
def maxDegree(self):
return self._maxdeg
def optimalDegreeHart(self, h, k=2, effectiveSampleSize=None):
self.setData(h)
sampleSize = effectiveSampleSize
if sampleSize is None:
sampleSize = h.nFillsInRange()
vars = np.empty((self._maxdeg+1,), dtype=np.double)
self._poly.densityCoeffsVariance(npstat.doubleHistoBins1D(h),
vars, sampleSize)
cumsum = np.cumsum(k*vars - self._data*self._data)
return max(np.argmin(cumsum).item(), 1)
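# Sketch (not part of the original module): choose the OSDE degree by
# Hart's criterion, then estimate the density with that degree as the key.
def _exampleDiscreteOSDE1D(h):
    osde = DiscreteOSDE1D(h)
    deg = osde.optimalDegreeHart(h)
    return osde.estimateDensity(h, float(deg))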
class FFTKDEND(_MemoizingFilterManager):
"""
Fast multivariate KDE on regular grids utilizing FFT for convolutions
"""
def __init__(self, prototypeHisto, allowNegativeResults=True,
useDataMirroring=True):
_MemoizingFilterManager.__init__(self, prototypeHisto, allowNegativeResults)
self._mirror = useDataMirroring
engineShape = npstat.doubleShape(self._shape)
self._engine = npstat.ConvolutionEngineND(engineShape)
self._workBuf = ArrayND("double", engineShape)
self._extraId = 2147483647
def _maxFilterDegree(self):
return _KDEFilterND_maxdeg()
def _setValidatedData(self, h):
self._engine.setFilter(npstat.doubleHistoBinsND(h, True, self._mirror))
def _makeFilter(self, filterDegree, kernelScan, mirrorScan, isExtra):
if isExtra:
id = self._extraId
else:
id = self.nFiltersAdded()
isDensity = npstat.arrayIsDensity(kernelScan)
if filterDegree > 0.0:
assert isDensity, "Filter weight must be a density"
taper = npstat.contDegreeTaper(filterDegree)
filt = _KDEFilterND(taper, kernelScan, self._engine, id,
self._workBuf, self._mirror, mirrorScan, False)
return filt, id, isDensity
def clearFilters(self):
_MemoizingFilterManager.clearFilters(self)
self._engine.clearFilterBank()
def _selfContributionFunctor(self, filtSpec):
filt, id, isDensity = filtSpec
return filt
def _makeEstimate(self, key, filterSpec, buffer=None):
name, filterDegree = key
filt, id, isDensity = filterSpec
guaranteedNonNegative = self._dataIsDensity and \
filterDegree == 0 and isDensity
arr = self._engine.convolveFilterAndDeposit(id, guaranteedNonNegative)
# We need just one quadrant of the result. Note that
# np.resize will not do the right thing here
if buffer is None:
result = np.empty(self._shape, np.double)
else:
result = buffer
arrWrap = arrayNDFromNumpy(arr, False)
rWrap = arrayNDFromNumpy(result, False)
rWrap.importSubrange([0]*self._dim, arrWrap)
return self._normalize(result, guaranteedNonNegative)
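# Rough sketch (not part of the original module, assumptions flagged):
# "kernel" stands for a symmetric multivariate npstat density, "mirror"
# for the per-axis mirroring specification accepted by the KDE filter,
# and "bandwidths" for one bandwidth value per dimension; all three are
# supplied by the caller. The weight label just needs to be hashable.
def _exampleFFTKDEND(h, kernel, bandwidths, mirror):
    kde = FFTKDEND(h)
    scan = scanSymmetricDensityAsWeight(kernel, bandwidths, h)
    return kde.estimateDensity(h, "myKernelLabel", 0.0, scan, mirror)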
class LOrPEND(_MemoizingFilterManager):
"""
Density estimation with LOrPE on regular multivariate grids
"""
def __init__(self, prototypeHisto, allowNegativeResults=True):
_MemoizingFilterManager.__init__(self, prototypeHisto, allowNegativeResults)
self._histo = None
def _maxFilterDegree(self):
return _LOrPEFilterND_maxdeg()
def _setValidatedData(self, h):
self._histo = h
def _selfContributionFunctor(self, filterSpec):
filt, isDensity = filterSpec
return filt
def _makeFilter(self, filterDegree, kernelScan, mirrorScan, isExtra):
taper = npstat.contDegreeTaper(filterDegree)
isDensity = npstat.arrayIsDensity(kernelScan)
filt = _LOrPEFilterND(taper, kernelScan, mirrorScan, self._shape)
return filt, isDensity
def _makeEstimate(self, key, filterSpec, buffer=None):
name, filterDegree = key
filt, isDensity = filterSpec
guaranteedNonNegative = self._dataIsDensity and \
filterDegree == 0 and isDensity
if buffer is None:
result = np.empty(self._shape, np.double)
else:
result = buffer
filt.filter(self._histo.binContents(), arrayNDFromNumpy(result, False))
return self._normalize(result, guaranteedNonNegative)
class SequentialLOrPE(_MemoizingFilterMaker):
"""
Multivariate LOrPE utilizing tensor product of 1-d LOrPE
"""
def __init__(self, prototypeHisto, allowNegativeResults=True, bhSet=None):
_MemoizingFilterMaker.__init__(self, prototypeHisto, allowNegativeResults)
if bhSet is None:
self._bhSet = [npstat.BoundaryHandling("BM_FOLD")] * self._dim
elif type(bhSet).__name__ == 'BoundaryHandling':
self._bhSet = [bhSet] * self._dim
elif len(bhSet) != self._dim:
raise ValueError("Incompatible number of boundary handling specifications")
else:
self._bhSet = bhSet
self._histo = None
def _selfContributionFunctor(self, filt):
return filt
def _makeCVBuffer(self):
return np.empty(self._shape, np.double)
def _setValidatedData(self, h):
self._histo = h
def _makeFilter(self, sequence, isExtra):
if len(sequence) != self._dim:
raise ValueError("Incompatible filter specification length")
arg = list()
for spec1d, nbins, bw, bh in zip(sequence, self._shape,
self._binWidths, self._bhSet):
filterDegree, bandwidth, symbetaPower = spec1d
# Validity of bandwidth, filterDegree, nbins, and bw
# will be checked in the AllSymbetaParams1D constructor
arg.append(npstat.AllSymbetaParams1D(symbetaPower, bandwidth,
filterDegree, nbins, bw, bh))
return npstat.SequentialPolyFilterND(arg)
def _makeEstimate(self, key, filt, buffer=None):
if buffer is None:
result = np.empty(self._shape, np.double)
else:
result = buffer
filt.filter(self._histo.binContents(), arrayNDFromNumpy(result, False))
return self._normalize(result)
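# Sketch (not part of the original module): the filter key here is a
# sequence of per-axis (filterDegree, bandwidth, symbetaPower) triples,
# one per histogram dimension; a negative power selects the Gaussian
# kernel, as in the 1-d classes above. The numbers are hypothetical.
def _exampleSequentialLOrPE(h2d):
    slorpe = SequentialLOrPE(h2d)
    spec = ((2.0, 0.1, -1.0),
            (2.0, 0.2, 4.0))
    return slorpe.estimateDensity(h2d, spec)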
def bernsteinCopulaSmootherDegrees(dim, mindeg, maxdeg):
if mindeg < 0:
raise ValueError("Minimum degree can not be negative")
if mindeg > maxdeg:
raise ValueError("Minimum degree can not be larger than maximum")
if mindeg < maxdeg:
m = npstat.IntMatrix(maxdeg - mindeg, dim)
for i, j in enumerate(range(mindeg, maxdeg)):
for col in range(dim):
m.set(i, col, j)
else:
m = npstat.IntMatrix()
return m
def bernsteinCopulaSmootherBwToDeg(bandwidth):
return int(1.0/bandwidth - 0.5)
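# Example of the mapping above: bandwidth 0.1 corresponds to
# degree int(1.0/0.1 - 0.5) = 9.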
+
+
+def pluginBandwidthScalingTable(symbetaPower, sampleSize, stepsPerDegree=100):
+ assert isinstance(symbetaPower, int)
+ assert isinstance(stepsPerDegree, int)
+ assert stepsPerDegree > 0
+ # Maximum filter degree permitted by NPStat in "approxSymbetaBandwidthRatio"
+ maxFilterDegree = 42
+ ncoords = maxFilterDegree*stepsPerDegree + 1
+ degrees = npstat.EquidistantInLinearSpace(0.0, maxFilterDegree*1.0, ncoords)
+ factors = np.empty(ncoords, np.double)
+ for i, deg in enumerate(degrees):
+ bwGauss = npstat.approxAmisePluginBwGauss(deg, sampleSize, 1.0)
+ factors[i] = bwGauss*npstat.approxSymbetaBandwidthRatio(symbetaPower, deg)
+ return npstat.LinInterpolatedTable1D(factors, 0.0, maxFilterDegree, False, True)
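# Usage sketch (not part of the original module): evaluating the scaling
# table at a given filter degree. That the interpolated table is callable
# from Python is an assumption based on the C++ operator(); the kernel
# power and sample size below are hypothetical.
def _examplePluginBandwidthScalingTable():
    table = pluginBandwidthScalingTable(4, 1000)
    return table(2.0)  # bandwidth scaling factor at filter degree 2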
Index: trunk/npstat/stat/LOrPE1DFixedDegreeCVRunner.hh
===================================================================
--- trunk/npstat/stat/LOrPE1DFixedDegreeCVRunner.hh (revision 812)
+++ trunk/npstat/stat/LOrPE1DFixedDegreeCVRunner.hh (revision 813)
@@ -1,77 +1,85 @@
#ifndef NPSTAT_LORPE1DFIXEDDEGREECVRUNNER_HH_
#define NPSTAT_LORPE1DFIXEDDEGREECVRUNNER_HH_
#include <cmath>
#include <cassert>
#include "npstat/nm/goldenSectionSearch.hh"
#include "npstat/nm/MathUtils.hh"
#include "npstat/nm/Triple.hh"
namespace npstat {
class LOrPE1DFixedDegreeCVRunner
{
public:
inline LOrPE1DFixedDegreeCVRunner(const double i_fixedDegree,
const double i_bwFactorGuess,
const double i_tol)
: fixedDegree_(i_fixedDegree),
bwFactorGuess_(i_bwFactorGuess),
tol_(i_tol)
{
assert(fixedDegree_ >= 0.0);
assert(bwFactorGuess_ > 0.0);
assert(tol_ > 0.0);
}
inline double fixedDegree() const {return fixedDegree_;}
inline double bwFactorGuess() const {return bwFactorGuess_;}
inline double tol() const {return tol_;}
// The elements of the returned triple are:
// first: optimal filter degree
// second: optimal bandwidth
// third: maximum value of the cross-validation function
template<class Lorpe>
inline Triple<double,double,double> crossValidate(Lorpe& lorpe) const
{
const double oldDeg = lorpe.boundFilterDegree();
lorpe.bindFilterDegree(fixedDegree_);
- double bwFactor = 0.0, bestFcn;
+ const double sqr2 = 1.414213562373095;
+ double bwFactor = 0.0, bestFcn, divider = 1.0;
+ bool bestBwFound = false;
auto fcn = MultiplyByConst(lorpe, -1.0);
- const bool bestBwFound = goldenSectionSearchInLogSpace(
- fcn, bwFactorGuess_, tol_, &bwFactor, &bestFcn);
+ const unsigned maxtries = 11; // divider can be increased up to
+ // the factor 2^((maxtries-1)/2)
+ for (unsigned itry=0; itry<maxtries && !bestBwFound; ++itry)
+ {
+ bestBwFound = goldenSectionSearchInLogSpace(
+ fcn, bwFactorGuess_/divider, tol_, &bwFactor, &bestFcn);
+ divider *= sqr2;
+ }
assert(bestBwFound);
assert(bwFactor > 0.0);
// Construct parabolic approximation to the minimum
const double fstep = 1.0 + tol_;
const double bwMin = bwFactor/fstep;
const double bwMax = bwFactor*fstep;
double extremumCoordinate, extremumValue;
const bool isMinimum = parabolicExtremum(
std::log(bwMin), fcn(bwMin),
std::log(bwFactor), bestFcn,
std::log(bwMax), fcn(bwMax),
&extremumCoordinate, &extremumValue);
assert(isMinimum);
if (oldDeg < 0.0)
lorpe.unbindFilterDegree();
else
lorpe.bindFilterDegree(oldDeg);
return Triple<double,double,double>(
fixedDegree_, std::exp(extremumCoordinate), -extremumValue);
}
private:
double fixedDegree_;
double bwFactorGuess_;
double tol_;
};
}
#endif // NPSTAT_LORPE1DFIXEDDEGREECVRUNNER_HH_