Index: trunk/npstat/swig/npstat_density_estimation.py
===================================================================
--- trunk/npstat/swig/npstat_density_estimation.py	(revision 656)
+++ trunk/npstat/swig/npstat_density_estimation.py	(revision 657)
@@ -1,749 +1,797 @@
"""
This module provides a number of useful wrapper classes and functions
related to 1-d density estimation
"""

__author__ = "Igor Volobouev (i.volobouev@ttu.edu)"
__version__ = "1.1"
__date__ = "Sep 20 2019"

import math
import npstat
import numpy as np
from npstat_utils import histoBinType, ArrayND, arrayNDFromNumpy


def normalBandwidthFactor(dim, filterDegree, sampleSize):
    """
    This function returns a factor by which the estimate of the sample
    covariance matrix should be multiplied in order to produce the plug-in
    covariance matrix of the Gaussian kernel. Take a square root of this
    if you need a factor for the standard deviation rather than covariance.
    """
    assert filterDegree == 0, "Non-0 filter degrees not implemented yet"
    return math.pow(4.0/(dim + 2.0), 2.0/(dim + 4.0))*\
           math.pow(sampleSize, -2.0/(dim + 4.0))


def correctDensityEstimateGHU(arr, binSize):
    out = np.empty(np.shape(arr), np.double)
    npstat.correctDensityEstimateGHUOrig(arr, out, binSize)
    return out


def scanSymmetricDensityAsWeight(kernel, bandwidthSet, prototypeHisto,
                                 partialScan=True):
    binWidths = prototypeHisto.binWidths()
    return npstat.scanSymmetricDensityAsWeight2(
        kernel, prototypeHisto.shape(), bandwidthSet, binWidths, partialScan)


def scanMultivariateDensityAsWeight(kernel, bandwidthSet, prototypeHisto):
    binWidths = prototypeHisto.binWidths()
    newshape = list()
    for n in prototypeHisto.shape():
        if n % 2:
            newshape.append(n)
        else:
            newshape.append(n+1)
    return npstat.scanMultivariateDensityAsWeight2(
        kernel, newshape, bandwidthSet, binWidths)


def histoPluginBandwidth1D(h, sampleSize, deg, symbetaPower):
    if h.dim() != 1:
        raise ValueError("Expected 1-d histogram as an argument")
    assert deg >= 0.0, "Filter degree can not be negative"
    if sampleSize <= 0.0:
        sampleSize = h.nFillsInRange()
    qvalues = (0.15865525393145705, 0.84134474606854295)
    quantiles = npstat.histoQuantiles(h, 0, qvalues)
    sigmaEstimate = (quantiles[1] - quantiles[0])/2.0
    pluginBw = npstat.approxAmisePluginBwGauss(deg, sampleSize, sigmaEstimate)
    if symbetaPower >= 0.0:
        pluginBw *= npstat.approxSymbetaBandwidthRatio(symbetaPower, deg)
    return pluginBw


def BandwidthCVLeastSquares1D(typename, *args):
    """
-    Least squares cross-validation helper class
-    for 1-d datasets
+    Least squares cross-validation helper class for 1-d datasets
    """
    funcs = {'unsigned char' : npstat.UCharBandwidthCVLeastSquares1D,
             'int' : npstat.IntBandwidthCVLeastSquares1D,
             'long' : npstat.LongBandwidthCVLeastSquares1D,
             'float' : npstat.FloatBandwidthCVLeastSquares1D,
             'double' : npstat.DoubleBandwidthCVLeastSquares1D}
    return funcs[typename](*args)


def BandwidthCVPseudoLogli1D(typename, *args):
    """
-    Pseudo-likelihood cross-validation helper class
-    for 1-d datasets
+    Pseudo-likelihood cross-validation helper class for 1-d datasets
    """
    funcs = {'unsigned char' : npstat.UCharBandwidthCVPseudoLogli1D,
             'int' : npstat.IntBandwidthCVPseudoLogli1D,
             'long' : npstat.LongBandwidthCVPseudoLogli1D,
             'float' : npstat.FloatBandwidthCVPseudoLogli1D,
             'double' : npstat.DoubleBandwidthCVPseudoLogli1D}
    return funcs[typename](*args)
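# --- Editor's illustration (not part of the revision 657 diff) ---
# A minimal sketch of the plug-in bandwidth helper defined above.
# It assumes "h" is a filled, uniformly binned 1-d npstat histogram;
# degree 0.0 and symbetaPower = -1.0 (Gaussian kernel) are example
# settings, not recommendations.
def _examplePluginBandwidth(h):
    # sampleSize <= 0 makes histoPluginBandwidth1D fall back
    # to h.nFillsInRange()
    return histoPluginBandwidth1D(h, 0, 0.0, -1.0)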
def BandwidthCVLeastSquaresND(typename, *args):
    """
-    Least squares cross-validation helper class
-    for multivariate datasets
+    Least squares cross-validation helper class for multivariate datasets
    """
    funcs = {'unsigned char' : npstat.UCharBandwidthCVLeastSquaresND,
             'int' : npstat.IntBandwidthCVLeastSquaresND,
             'long' : npstat.LongBandwidthCVLeastSquaresND,
             'float' : npstat.FloatBandwidthCVLeastSquaresND,
             'double' : npstat.DoubleBandwidthCVLeastSquaresND}
    return funcs[typename](*args)


def BandwidthCVPseudoLogliND(typename, *args):
    """
-    Pseudo-likelihood cross-validation helper class
-    for multivariate datasets
+    Pseudo-likelihood cross-validation helper class for multivariate datasets
    """
    funcs = {'unsigned char' : npstat.UCharBandwidthCVPseudoLogliND,
             'int' : npstat.IntBandwidthCVPseudoLogliND,
             'long' : npstat.LongBandwidthCVPseudoLogliND,
             'float' : npstat.FloatBandwidthCVPseudoLogliND,
             'double' : npstat.DoubleBandwidthCVPseudoLogliND}
    return funcs[typename](*args)


+def BandwidthGCVLeastSquares1D(typename, *args):
+    """
+    Least squares grouped cross-validation helper class
+    for 1-d datasets
+    """
+    funcs = {'unsigned char' : npstat.UCharBandwidthGCVLeastSquares1D,
+             'int' : npstat.IntBandwidthGCVLeastSquares1D,
+             'long' : npstat.LongBandwidthGCVLeastSquares1D,
+             'float' : npstat.FloatBandwidthGCVLeastSquares1D,
+             'double' : npstat.DoubleBandwidthGCVLeastSquares1D}
+    return funcs[typename](*args)
+
+
+def BandwidthGCVPseudoLogli1D(typename, *args):
+    """
+    Pseudo-likelihood grouped cross-validation helper class
+    for 1-d datasets
+    """
+    funcs = {'unsigned char' : npstat.UCharBandwidthGCVPseudoLogli1D,
+             'int' : npstat.IntBandwidthGCVPseudoLogli1D,
+             'long' : npstat.LongBandwidthGCVPseudoLogli1D,
+             'float' : npstat.FloatBandwidthGCVPseudoLogli1D,
+             'double' : npstat.DoubleBandwidthGCVPseudoLogli1D}
+    return funcs[typename](*args)
+
+
+def BandwidthGCVLeastSquaresND(typename, *args):
+    """
+    Least squares grouped cross-validation helper class
+    for multivariate datasets
+    """
+    funcs = {'unsigned char' : npstat.UCharBandwidthGCVLeastSquaresND,
+             'int' : npstat.IntBandwidthGCVLeastSquaresND,
+             'long' : npstat.LongBandwidthGCVLeastSquaresND,
+             'float' : npstat.FloatBandwidthGCVLeastSquaresND,
+             'double' : npstat.DoubleBandwidthGCVLeastSquaresND}
+    return funcs[typename](*args)
+
+
+def BandwidthGCVPseudoLogliND(typename, *args):
+    """
+    Pseudo-likelihood grouped cross-validation helper class
+    for multivariate datasets
+    """
+    funcs = {'unsigned char' : npstat.UCharBandwidthGCVPseudoLogliND,
+             'int' : npstat.IntBandwidthGCVPseudoLogliND,
+             'long' : npstat.LongBandwidthGCVPseudoLogliND,
+             'float' : npstat.FloatBandwidthGCVPseudoLogliND,
+             'double' : npstat.DoubleBandwidthGCVPseudoLogliND}
+    return funcs[typename](*args)
+
+
def _KDEFilterND(*args):
    return npstat.KDEFilterND_10(*args)


def _KDEFilterND_maxdeg():
    return npstat.KDEFilterND_10.classMaxDegree()


def _LOrPEFilterND(*args):
    return npstat.LocalPolyFilterND_10(*args)


def _LOrPEFilterND_maxdeg():
    return npstat.LocalPolyFilterND_10.classMaxDegree()


class _BinnedCVCalc1D:
    def __init__(self, effectiveSampleSize, weightedSample):
        self._effectiveSampleSize = effectiveSampleSize
        self._weightedSample = weightedSample

    def __call__(self, h, result, filt):
        calc = self._makeCalc(h)
        if self._weightedSample is not None:
            return calc.cvWeightedSample(h, self._weightedSample, result, filt)
        elif self._effectiveSampleSize is not None:
            return calc.cvWeighted(h, self._effectiveSampleSize, result, filt)
        else:
            return calc.cv(h, result, filt)

    def _makeCalc(self, h):
        raise NotImplementedError
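# --- Editor's illustration (not part of the revision 657 diff) ---
# The Bandwidth* wrappers above dispatch on a histogram bin type name
# to the corresponding SWIG-generated npstat class. A hypothetical
# direct use, mirroring what the _makeCalc methods below do with
# histoBinType(h):
def _exampleMakeLSCVCalculator(h):
    # histoBinType(h) returns a type name string such as 'double'
    return BandwidthCVLeastSquares1D(histoBinType(h))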
not be negative" _BinnedCVCalc1D.__init__(self, effectiveSampleSize, weightedSample) self._regParameter = regParameter def _makeCalc(self, h): return BandwidthCVPseudoLogli1D(histoBinType(h), self._regParameter) class BinnedLSCVCalc1D(_BinnedCVCalc1D): def __init__(self, effectiveSampleSize=None, weightedSample=None): _BinnedCVCalc1D.__init__(self, effectiveSampleSize, weightedSample) def _makeCalc(self, h): return BandwidthCVLeastSquares1D(histoBinType(h)) class _BinnedCVCalcND: def __init__(self, effectiveSampleSize): self._effectiveSampleSize = effectiveSampleSize def __call__(self, h, result, filt): calc = self._makeCalc(h) if self._effectiveSampleSize is not None: return calc.cvWeighted(h, self._effectiveSampleSize, result, filt) else: return calc.cv(h, result, filt) def _makeCalc(self, h): raise NotImplementedError class BinnedRLCVCalcND(_BinnedCVCalcND): def __init__(self, regParameter, effectiveSampleSize=None): assert regParameter >= 0.0, "RLCV regularisation parameter can not be negative" _BinnedCVCalcND.__init__(self, effectiveSampleSize) self._regParameter = regParameter def _makeCalc(self, h): return BandwidthCVPseudoLogliND(histoBinType(h), self._regParameter) class BinnedLSCVCalcND(_BinnedCVCalcND): def __init__(self, effectiveSampleSize=None): _BinnedCVCalcND.__init__(self, effectiveSampleSize) def _makeCalc(self, h): return BandwidthCVLeastSquaresND(histoBinType(h)) class _DiscreteDEBase: """ Base class for density estimation on regular grids """ def __init__(self, prototypeHisto, i_allowNegativeDensities): self._dim = prototypeHisto.dim() if self._dim < 1: raise ValueError("Expected a dimensionful histogram as a prototype") if not prototypeHisto.isUniformlyBinned(): raise ValueError("Expected a uniformy binned histogram as a prototype") self._allowNegative = i_allowNegativeDensities self._shape = prototypeHisto.shape() self._binWidths = prototypeHisto.binWidths() self._binVolume = prototypeHisto.binVolume() self._nBinsTotal = prototypeHisto.nBins() self._dataIsDensity = None def __eq__(self, other): return self._dim == other._dim and \ self._allowNegative == other._allowNegative and \ self._shape == other._shape and \ self._binWidths == other._binWidths and \ self._binVolume == other._binVolume def __ne__(self, other): return not self.__eq__(other) def dim(self): return self._dim def isCompatible(self, h): if h.dim() != self._dim: return False if not h.isUniformlyBinned(): return False return self._shape == h.shape() and self._binWidths == h.binWidths() def setData(self, h): if not self.isCompatible(h): raise ValueError("Incompatible input histogram") self._setValidatedData(h) self._dataIsDensity = npstat.arrayIsDensity(h.binContents()) def isDataSet(self): return self._dataIsDensity is not None def estimateDensity(self, h, *args): self.setData(h) return self.estimateDataDensity(*args) def bestFilterByCV(self, h, cvCalc): assert self.nFiltersAdded() > 0, "No filters added" self.setData(h) return self._runCV(h, cvCalc) def estimateDataDensity(self, *args): assert self.isDataSet(), "Data is not set" return self._estimateDataDensity(*args) def _normalize(self, result, guaranteedNonNegative=False): if self._allowNegative or guaranteedNonNegative: volume = np.sum(result)*self._binVolume assert volume > 0.0, "Result can not be normalized. "\ "Was the input histogram filled?" 
class _MemoizingFilterBase(_DiscreteDEBase):
    """
    This class manages a dictionary of discrete filter structures
    """
    def __init__(self, prototypeHisto, i_allowNegativeDensities):
        _DiscreteDEBase.__init__(self, prototypeHisto, i_allowNegativeDensities)
        self._filterTable = dict()
        self._extraFilter = None
        self._extraKey = None

    def __eq__(self, other):
        return _DiscreteDEBase.__eq__(self, other) and \
               self._filterTable == other._filterTable and \
               self._extraFilter == other._extraFilter and \
               self._extraKey == other._extraKey

    def nFiltersAdded(self):
        return len(self._filterTable)

    def clearFilters(self):
        self._filterTable = dict()
        self._extraFilter = None
        self._extraKey = None

    def _runCV(self, h, cvCalc):
        bestCVValue = -1.7e308
        bestKey = None
        buffer = self._makeCVBuffer()
        for key, filt in self._filterTable.items():
            selfContrib = self._selfContributionFunctor(filt)
            estimate = self._makeEstimate(key, filt, buffer)
            cvValue = cvCalc(h, estimate, selfContrib)
            if cvValue > bestCVValue:
                bestCVValue = cvValue
                bestKey = key
        assert bestKey is not None, "Cross-validation scan failed"
        bestEstimate = self._makeEstimate(bestKey, self._filterTable[bestKey])
        return bestCVValue, bestKey, bestEstimate

    def addFilter(self, *args):
        raise NotImplementedError

    def _estimateDataDensity(self, *args):
        raise NotImplementedError

    def _selfContributionFunctor(self, filt):
        raise NotImplementedError

    def _makeCVBuffer(self):
        raise NotImplementedError

    def _setValidatedData(self, h):
        raise NotImplementedError

    def _makeEstimate(self, key, filt, buffer=None):
        raise NotImplementedError
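# --- Editor's illustration (not part of the revision 657 diff) ---
# The table kept by _MemoizingFilterBase means that repeated estimates
# with the same key reuse an already built filter. A minimal sketch,
# assuming "smoother" is an instance of a _MemoizingFilterMaker
# subclass (defined below) and "key" is a valid filter key for it:
def _exampleMemoization(smoother, h, key):
    smoother.setData(h)
    d1 = smoother.estimateDataDensity(key)   # builds and caches the filter
    d2 = smoother.estimateDataDensity(key)   # reuses the cached filter
    return d1, d2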
""" def __init__(self, prototypeHisto, i_allowNegativeDensities): _MemoizingFilterBase.__init__(self, prototypeHisto, i_allowNegativeDensities) def addFilter(self, key): if key not in self._filterTable: self._filterTable[key] = self._makeFilter(key, False) return self._filterTable[key] def _estimateDataDensity(self, key, rememberFilter=True): filt = None if key in self._filterTable: filt = self._filterTable[key] elif rememberFilter: filt = self.addFilter(key) elif self._extraKey == key: filt = self._extraFilter if filt is None: filt = self._makeFilter(key, True) self._extraFilter = filt self._extraKey = key return self._makeEstimate(key, filt) def _makeFilter(self, key, isExtra): raise NotImplementedError class _MemoizingFilterManager(_MemoizingFilterBase): """ This class is useful in case we are building the filters from the weight scans provided by the user """ def __init__(self, prototypeHisto, i_allowNegativeDensities): _MemoizingFilterBase.__init__(self, prototypeHisto, i_allowNegativeDensities) def addFilter(self, iniqueWeightDescription, filterDegree, kernelScan, mirrorScan): key = (iniqueWeightDescription, filterDegree) if key not in self._filterTable: filt = self._validateAndMakeFilter(filterDegree, kernelScan, mirrorScan, False) self._filterTable[key] = filt return self._filterTable[key] def _estimateDataDensity(self, iniqueWeightDescription, filterDegree, kernelScan=None, mirrorScan=None, rememberFilter=True): filt = None key = (iniqueWeightDescription, filterDegree) if key in self._filterTable: filt = self._filterTable[key] elif rememberFilter: filt = self.addFilter(key) elif self._extraKey == key: filt = self._extraFilter if filt is None: filt = self._validateAndMakeFilter(filterDegree, kernelScan, mirrorScan, True) self._extraFilter = filt self._extraKey = key return self._makeEstimate(key, filt) def _makeCVBuffer(self): return np.empty(self._shape, np.double) def _validateAndMakeFilter(self, filterDegree, kernelScan, mirrorScan, isExtra): assert filterDegree >= 0.0, "Filter degree can not be negative" assert filterDegree <= self._maxFilterDegree(), "Filter degree is too high" assert kernelScan is not None, "This combination of weight/degree has not "\ "appeared yet, so the weight must be specified" assert mirrorScan is not None, "This combination of weight/degree has not "\ "appeared yet, so the weight mirroring "\ "specification must be provided" return self._makeFilter(filterDegree, kernelScan, mirrorScan, isExtra) def _makeFilter(self, filterDegree, kernelScan, mirrorScan, isExtra): raise NotImplementedError def _maxFilterDegree(self): raise NotImplementedError class FFTKDE1D(_MemoizingFilterMaker): """ Fast 1-d KDE on regular grids utilizing FFT for convolutions """ def __init__(self, prototypeHisto, allowNegativeResults=True, useMirroring=True): _MemoizingFilterMaker.__init__(self, prototypeHisto, allowNegativeResults) self._mirror = useMirroring self._engine = npstat.ConvolutionEngine1D(2*self._nBinsTotal) self._extraId = 2147483647 # Add more constructor parameters if you want to # change the settings used below self._bh = npstat.BoundaryHandling("BM_TRUNCATE") self._gaussKernelNSigmas = 12.0 def clearFilters(self): _MemoizingFilterMaker.clearFilters(self) self._engine.clearFilterBank(); def _selfContributionFunctor(self, filt): id, sc = filt return npstat.ContribConstant1D(sc, self._nBinsTotal) def _makeCVBuffer(self): return None def _setValidatedData(self, h): self._engine.setFilter(npstat.doubleHistoBins1D(h, True, self._mirror)) def _makeFilter(self, key, 
class FFTKDE1D(_MemoizingFilterMaker):
    """
    Fast 1-d KDE on regular grids utilizing FFT for convolutions
    """
    def __init__(self, prototypeHisto, allowNegativeResults=True,
                 useMirroring=True):
        _MemoizingFilterMaker.__init__(self, prototypeHisto, allowNegativeResults)
        self._mirror = useMirroring
        self._engine = npstat.ConvolutionEngine1D(2*self._nBinsTotal)
        self._extraId = 2147483647
        # Add more constructor parameters if you want to
        # change the settings used below
        self._bh = npstat.BoundaryHandling("BM_TRUNCATE")
        self._gaussKernelNSigmas = 12.0

    def clearFilters(self):
        _MemoizingFilterMaker.clearFilters(self)
        self._engine.clearFilterBank()

    def _selfContributionFunctor(self, filt):
        id, sc = filt
        return npstat.ContribConstant1D(sc, self._nBinsTotal)

    def _makeCVBuffer(self):
        return None

    def _setValidatedData(self, h):
        self._engine.setFilter(npstat.doubleHistoBins1D(h, True, self._mirror))

    def _makeFilter(self, key, isExtra):
        filterDegree, bandwidth, symbetaPower = key
        assert filterDegree >= 0.0, "Filter degree can not be negative"
        assert bandwidth > 0.0, "Bandwidth must be positive"
        if symbetaPower < 0.0:
            distro = npstat.TruncatedGauss1D(0.0, bandwidth,
                                             self._gaussKernelNSigmas)
        else:
            distro = npstat.SymmetricBeta1D(0.0, bandwidth, symbetaPower)
        builder = npstat.getBoundaryFilter1DBuilder(self._bh, distro,
                                                    self._binVolume)
        taper = npstat.contDegreeTaper(filterDegree)
        kernelCenter = self._nBinsTotal - 1
        pf = builder.makeFilter(taper, len(taper)-1, kernelCenter,
                                2*self._nBinsTotal)
        selfContribution = pf.valueAt(pf.peakPosition())
        tmp = npstat.polyFilter1DToNumpy(pf, kernelCenter, 2*self._nBinsTotal)
        if isExtra:
            id = self._extraId
        else:
            id = self.nFiltersAdded()
        self._engine.depositFilter(id, tmp, self._nBinsTotal+1)
        return id, selfContribution

    def _makeEstimate(self, key, filt, buffer=None):
        filterDegree, bandwidth, symbetaPower = key
        id, sc = filt
        guaranteedNonNegative = self._dataIsDensity and filterDegree == 0
        arr = self._engine.convolveFilterAndDeposit(id, guaranteedNonNegative)
        result = np.resize(arr, self._nBinsTotal)
        return self._normalize(result, guaranteedNonNegative)


class LOrPE1D(_MemoizingFilterMaker):
    """
    Density estimation by LOrPE on regular 1-d grids
    """
    def __init__(self, prototypeHisto, allowNegativeResults=True, bh=None):
        _MemoizingFilterMaker.__init__(self, prototypeHisto, allowNegativeResults)
        self._bh = bh
        if bh is None:
            self._bh = npstat.BoundaryHandling("BM_FOLD")
        self._data = None

    def _makeCVBuffer(self):
        return np.empty((self._nBinsTotal,), dtype=np.double)

    def _selfContributionFunctor(self, filt):
        return filt

    def _setValidatedData(self, h):
        self._data = npstat.doubleHistoBins1D(h)

    def _makeFilter(self, key, isExtra):
        filterDegree, bandwidth, symbetaPower = key
        assert filterDegree >= 0.0, "Filter degree can not be negative"
        assert bandwidth > 0.0, "Bandwidth must be positive"
        return npstat.symbetaLOrPEFilter1D(
            symbetaPower, bandwidth, filterDegree, self._nBinsTotal,
            self._binVolume, self._bh)

    def _makeEstimate(self, key, filt, buffer=None):
        filterDegree, bandwidth, symbetaPower = key
        guaranteedNonNegative = self._dataIsDensity and filterDegree == 0
        if buffer is None:
            result = filt.filter(self._data, guaranteedNonNegative)
        else:
            filt.filterIntoBuffer(self._data, buffer, guaranteedNonNegative)
            result = buffer
        return self._normalize(result, guaranteedNonNegative)
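# --- Editor's illustration (not part of the revision 657 diff) ---
# Putting the pieces together for 1-d LOrPE: pick a plug-in bandwidth
# with the helper defined earlier, then estimate the density in one
# call. "h" is assumed to be a filled, uniformly binned 1-d npstat
# histogram; degree 2.0 and a symmetric beta kernel with power 4 are
# example settings.
def _exampleLOrPE1D(h, deg=2.0, symbetaPower=4.0):
    bw = histoPluginBandwidth1D(h, 0, deg, symbetaPower)
    smoother = LOrPE1D(h, allowNegativeResults=False)
    # The filter key is the (degree, bandwidth, symbetaPower) tuple
    return smoother.estimateDensity(h, (deg, bw, symbetaPower))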
class DiscreteOSDE1D(_MemoizingFilterMaker):
    """
    OSDE with Legendre polynomials on regular 1-d grids
    """
    def __init__(self, prototypeHisto, allowNegativeResults=True,
                 maxdeg=50, weightFunction=None):
        _MemoizingFilterMaker.__init__(self, prototypeHisto, allowNegativeResults)
        assert maxdeg >= 0, "Filter degree can not be negative"
        self._maxdeg = min(self._nBinsTotal-1, maxdeg)
        if weightFunction is None:
            wF = np.ones(self._nBinsTotal)
            self._legendre = True
        else:
            wF = weightFunction
            self._legendre = False
        self._poly = npstat.OrthoPoly1D(self._maxdeg, wF, self._binVolume)
        self._data = None

    def _makeCVBuffer(self):
        return np.empty((self._nBinsTotal,), dtype=np.double)

    def _setValidatedData(self, h):
        self._data = np.empty((self._maxdeg+1,), dtype=np.double)
        self._poly.densityCoeffs(npstat.doubleHistoBins1D(h), self._data)

    def _makeFilter(self, filterDegree, isExtra):
        assert filterDegree >= 0.0, "Filter degree can not be negative"
        taper = npstat.contDegreeTaper(filterDegree)
        if len(taper) > self._maxdeg+1:
            raise ValueError("Filter degree is too large")
        coeffs = np.array(taper, dtype=np.double)
        response = np.empty((self._nBinsTotal,), dtype=np.double)
        self._poly.globalFilterDiag(coeffs, response)
        contrib = npstat.ContribVector1D(response)
        return coeffs, contrib

    def _makeEstimate(self, filterDegree, filt, buffer=None):
        assert filterDegree >= 0.0, "Filter degree can not be negative"
        taper, contrib = filt
        coeffs = [a*b for a, b in zip(taper, self._data)]
        if buffer is None:
            result = np.empty((self._nBinsTotal,), dtype=np.double)
        else:
            result = buffer
        self._poly.allSeries(coeffs, result)
        return self._normalize(result)

    def _selfContributionFunctor(self, filt):
        taper, contrib = filt
        return contrib

    def maxDegree(self):
        return self._maxdeg

    def optimalDegreeHart(self, h, k=2, effectiveSampleSize=None):
        self.setData(h)
        sampleSize = effectiveSampleSize
        if sampleSize is None:
            sampleSize = h.nFillsInRange()
        vars = np.empty((self._maxdeg+1,), dtype=np.double)
        self._poly.densityCoeffsVariance(npstat.doubleHistoBins1D(h),
                                         vars, sampleSize)
        cumsum = np.cumsum(k*vars - self._data*self._data)
        return max(np.argmin(cumsum).item(), 1)
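# --- Editor's illustration (not part of the revision 657 diff) ---
# Sketch of DiscreteOSDE1D with Hart's criterion choosing the expansion
# degree; "h" is assumed to be a filled, uniformly binned 1-d npstat
# histogram.
def _exampleOSDE(h):
    osde = DiscreteOSDE1D(h, allowNegativeResults=False)
    deg = osde.optimalDegreeHart(h)
    # The filter key of DiscreteOSDE1D is just the filter degree
    return osde.estimateDensity(h, deg)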
class FFTKDEND(_MemoizingFilterManager):
    """
    Fast multivariate KDE on regular grids utilizing FFT for convolutions
    """
    def __init__(self, prototypeHisto, allowNegativeResults=True,
                 useDataMirroring=True):
        _MemoizingFilterManager.__init__(self, prototypeHisto,
                                         allowNegativeResults)
        self._mirror = useDataMirroring
        engineShape = npstat.doubleShape(self._shape)
        self._engine = npstat.ConvolutionEngineND(engineShape)
        self._workBuf = ArrayND("double", engineShape)
        self._extraId = 2147483647

    def _maxFilterDegree(self):
        return _KDEFilterND_maxdeg()

    def _setValidatedData(self, h):
        self._engine.setFilter(npstat.doubleHistoBinsND(h, True, self._mirror))

    def _makeFilter(self, filterDegree, kernelScan, mirrorScan, isExtra):
        if isExtra:
            id = self._extraId
        else:
            id = self.nFiltersAdded()
        isDensity = npstat.arrayIsDensity(kernelScan)
        if filterDegree > 0.0:
            assert isDensity, "Filter weight must be a density"
        taper = npstat.contDegreeTaper(filterDegree)
        filt = _KDEFilterND(taper, kernelScan, self._engine, id,
                            self._workBuf, self._mirror, mirrorScan, False)
        return filt, id, isDensity

    def clearFilters(self):
        _MemoizingFilterManager.clearFilters(self)
        self._engine.clearFilterBank()

    def _selfContributionFunctor(self, filtSpec):
        filt, id, isDensity = filtSpec
        return filt

    def _makeEstimate(self, key, filterSpec, buffer=None):
        name, filterDegree = key
        filt, id, isDensity = filterSpec
        guaranteedNonNegative = self._dataIsDensity and \
                                filterDegree == 0 and isDensity
        arr = self._engine.convolveFilterAndDeposit(id, guaranteedNonNegative)
        # We need just one quadrant of the result. Note that
        # np.resize will not do the right thing here
        if buffer is None:
            result = np.empty(self._shape, np.double)
        else:
            result = buffer
        arrWrap = arrayNDFromNumpy(arr, False)
        rWrap = arrayNDFromNumpy(result, False)
        rWrap.importSubrange([0]*self._dim, arrWrap)
        return self._normalize(result, guaranteedNonNegative)


class LOrPEND(_MemoizingFilterManager):
    """
    Density estimation with LOrPE on regular multivariate grids
    """
    def __init__(self, prototypeHisto, allowNegativeResults=True):
        _MemoizingFilterManager.__init__(self, prototypeHisto,
                                         allowNegativeResults)
        self._histo = None

    def _maxFilterDegree(self):
        return _LOrPEFilterND_maxdeg()

    def _setValidatedData(self, h):
        self._histo = h

    def _selfContributionFunctor(self, filterSpec):
        filt, isDensity = filterSpec
        return filt

    def _makeFilter(self, filterDegree, kernelScan, mirrorScan, isExtra):
        taper = npstat.contDegreeTaper(filterDegree)
        isDensity = npstat.arrayIsDensity(kernelScan)
        filt = _LOrPEFilterND(taper, kernelScan, mirrorScan, self._shape)
        return filt, isDensity

    def _makeEstimate(self, key, filterSpec, buffer=None):
        name, filterDegree = key
        filt, isDensity = filterSpec
        guaranteedNonNegative = self._dataIsDensity and \
                                filterDegree == 0 and isDensity
        if buffer is None:
            result = np.empty(self._shape, np.double)
        else:
            result = buffer
        filt.filter(self._histo.binContents(), arrayNDFromNumpy(result, False))
        return self._normalize(result, guaranteedNonNegative)


class SequentialLOrPE(_MemoizingFilterMaker):
    """
    Multivariate LOrPE utilizing tensor product of 1-d LOrPE
    """
    def __init__(self, prototypeHisto, allowNegativeResults=True, bhSet=None):
        _MemoizingFilterMaker.__init__(self, prototypeHisto, allowNegativeResults)
        if bhSet is None:
            self._bhSet = [npstat.BoundaryHandling("BM_FOLD")] * self._dim
        elif type(bhSet).__name__ == 'BoundaryHandling':
            self._bhSet = [bhSet] * self._dim
        elif len(bhSet) != self._dim:
            raise ValueError("Incompatible number of boundary handling specifications")
        else:
            self._bhSet = bhSet
        self._histo = None

    def _selfContributionFunctor(self, filt):
        return filt

    def _makeCVBuffer(self):
        return np.empty(self._shape, np.double)

    def _setValidatedData(self, h):
        self._histo = h

    def _makeFilter(self, sequence, isExtra):
        if len(sequence) != self._dim:
            raise ValueError("Incompatible filter specification length")
        arg = list()
        for spec1d, nbins, bw, bh in zip(sequence, self._shape,
                                         self._binWidths, self._bhSet):
            filterDegree, bandwidth, symbetaPower = spec1d
            # Validity of bandwidth, filterDegree, nbins, and bw
            # will be checked in the AllSymbetaParams1D constructor
            arg.append(npstat.AllSymbetaParams1D(symbetaPower, bandwidth,
                                                 filterDegree, nbins, bw, bh))
        return npstat.SequentialPolyFilterND(arg)

    def _makeEstimate(self, key, filt, buffer=None):
        if buffer is None:
            result = np.empty(self._shape, np.double)
        else:
            result = buffer
        filt.filter(self._histo.binContents(), arrayNDFromNumpy(result, False))
        return self._normalize(result)
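# --- Editor's illustration (not part of the revision 657 diff) ---
# Sketch of SequentialLOrPE: its filter key is a sequence with one
# (filterDegree, bandwidth, symbetaPower) tuple per axis, turned into
# a tensor-product filter by _makeFilter above. "h2" is assumed to be
# a filled, uniformly binned 2-d npstat histogram, and the degrees,
# bandwidths, and kernel powers are example values.
def _exampleSequentialLOrPE(h2, bwX, bwY):
    smoother = SequentialLOrPE(h2, allowNegativeResults=False)
    spec = ((2.0, bwX, 4.0), (2.0, bwY, 4.0))
    return smoother.estimateDensity(h2, spec)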