import pymc3 as pm
import numpy as np
import time
from .models import IntFunc
[docs]def BetaModelPM(x, beta, rc, norm, bkg):
"""
"""
n2 = 10. ** norm
c2 = 10. ** bkg
out = n2 * (1. + (x / rc) ** 2) ** (-3. * beta + 0.5) + c2
return out
[docs]def DoubleBetaPM(x, beta, rc1, rc2, ratio, norm, bkg):
"""
"""
comp1 = (1. + (x / rc1) ** 2) ** (-3. * beta + 0.5)
comp2 = (1. + (x / rc2) ** 2) ** (-3. * beta + 0.5)
n2 = 10. ** norm
c2 = 10. ** bkg
out = n2 * (comp1 + ratio * comp2) + c2
return out
[docs]def PowerLawPM(x, alpha, norm, pivot, bkg):
"""
"""
n2 = 10. ** norm
c2 = 10. ** bkg
out = n2 * (x / pivot) ** (-alpha) + c2
return out
[docs]def ConstPM(x, bkg):
"""
"""
out = 10. ** bkg
return out
[docs]def VikhlininPM(x,beta,rc,alpha,rs,epsilon,gamma,norm,bkg):
"""
"""
term1 = (x/rc) ** (-alpha) * (1. + (x/rc) ** 2) ** (-3 * beta + alpha/2)
term2 = (1. + (x / rs) ** gamma) ** (-epsilon / gamma)
n2 = 10. ** norm
c2 = 10. ** bkg
return n2 * term1 * term2 + c2
[docs]def IntFuncPM(omega,rf,alpha,xmin,xmax):
"""
"""
nb = 100
logmin = pm.math.log(xmin) / pm.math.log(10.)
logmax = pm.math.log(xmax) / pm.math.log(10.)
intot = 0.
for i in range(nb):
basex_low = (logmin + i / nb * (logmax - logmin))
basex_high = (logmin + (i+1) / nb * (logmax - logmin))
z = (10 ** basex_low + 10 ** basex_high) / 2.
width = 10 ** basex_high - 10 ** basex_low
term1 = (omega**2 + z**2) / rf**2
term2 = term1 ** (-alpha)
intot = intot + term2 * width
return intot
[docs]def BknPowPM(x, alpha1, alpha2, norm, jump, bkg, rf=3.0):
"""
Broken power law 3D model projected along the line of sight for discontinuity modeling
.. math::
I(r) = I_0 \\int F(\\omega)^2 d\\ell + B
with :math:`\\omega^2 = r^2 + \ell^2` and
.. math::
F(\\omega) = \left\{ \\begin{array}{ll} \omega^{-\\alpha_1}, & \omega<r_f \\\\ \\frac{1}{C}\omega ^{-\\alpha_2}, & \omega\\geq r_f
\end{array} \\right.
:param x: Radius in arcmin
:type x: numpy.ndarray
:param alpha1: :math:`\\alpha_1` parameter
:type alpha1: class:`theano.tensor`
:param alpha2: :math:`\\alpha_2` parameter
:type alpha2: class:`theano.tensor`
:param rf: rf parameter
:type rf: float
:param norm: log of I0 parameter
:type norm: class:`theano.tensor`
:param jump: C parameter
:type jump: class:`theano.tensor`
:param bkg: log of B parameter
:type bkg: class:`theano.tensor`
:return: Calculated model
:rtype: :class:`numpy.ndarray`
"""
A1 = 10. ** norm
A2 = A1 / (jump ** 2)
inreg = np.where(x < rf)
term1 = IntFunc(x[inreg], rf, alpha1, 0.01 * np.ones(len(x[inreg])), np.sqrt(rf ** 2 - x[inreg] ** 2))
term2 = IntFunc(x[inreg], rf, alpha2, np.sqrt(rf ** 2 - x[inreg] ** 2), 1e3 * np.ones(len(x[inreg])))
inside = A1 * term1 + A2 * term2
outreg = np.where(x >= rf)
term = IntFunc(x[outreg], rf, alpha2, 0.01 * np.ones(len(x[outreg])), 1e3 * np.ones(len(x[outreg])))
outside = A2 * term
out = pm.math.stack([inside, outside])
c2 = 10. ** bkg
return out + c2
[docs]def fit_profile_pymc3(hmcmod, prof, nmcmc=1000, tune=500, find_map=True, fitlow=0., fithigh=1e10):
if hmcmod.start is None or hmcmod.sd is None:
print('Missing prior definition, cannot continue; please provide both "start" and "sd" parameters')
return
npar = hmcmod.npar
reg = np.where(np.logical_and(prof.bins>=fitlow, prof.bins<=fithigh))
sb = prof.profile[reg]
esb = prof.eprof[reg]
rad = prof.bins[reg]
erad = prof.ebins[reg]
counts = prof.counts[reg]
area = prof.area[reg]
exposure = prof.effexp[reg]
bkgcounts = prof.bkgcounts[reg]
if prof.psfmat is not None:
psfmat = np.transpose(prof.psfmat)
else:
psfmat = np.eye(prof.nbin)
hmc_model = pm.Model()
with hmc_model:
# Set up model parameters
allpmod = []
numrf = None
for i in range(npar):
name = hmcmod.parnames[i]
if not hmcmod.fix[i]:
print('Setting Gaussian prior for parameter %s with center %g and standard deviation %g' % (name, hmcmod.start[i], hmcmod.sd[i]))
if hmcmod.limits is not None:
lim = hmcmod.limits[i]
modpar = pm.TruncatedNormal(name, mu=hmcmod.start[i], sd=hmcmod.sd[i], lower=lim[0], upper=lim[1])
else:
modpar = pm.Normal(name, mu=hmcmod.start[i], sd=hmcmod.sd[i])
else:
print('Parameter %s is fixed' % (name))
modpar = pm.ConstantDist(name, hmcmod.start[i])
if name == 'rf':
numrf = modpar.random()
else:
allpmod.append(modpar)
pmod = pm.math.stack(allpmod, axis=0)
if numrf is not None:
modcounts = hmcmod.model(rad, *pmod, rf=numrf) * area * exposure
else:
modcounts = hmcmod.model(rad, *pmod) * area * exposure
pred = pm.math.dot(psfmat, modcounts) + bkgcounts
count_obs = pm.Poisson('counts', mu=pred, observed=counts) # counts likelihood
tinit = time.time()
print('Running MCMC...')
with hmc_model:
if find_map:
start = pm.find_MAP()
trace = pm.sample(nmcmc, start=start, tune=tune)
else:
trace = pm.sample(nmcmc, tune=tune)
print('Done.')
tend = time.time()
print(' Total computing time is: ', (tend - tinit) / 60., ' minutes')
hmcmod.trace = trace
[docs]class HMCModel(object):
"""
Class containing pyproffit model structure for HMC optimization
:param model: Function to be used as surface brightness model
:type model: function
:param vals: Array containing initial values for the parameters (optional)
:type vals: :class:`numpy.ndarray`
"""
def __init__(self,model, start=None, sd=None, limits=None, fix=None):
"""
Constructor of class HMCModel
"""
self.model = model
npar = model.__code__.co_argcount - 1
self.npar = npar
self.parnames = model.__code__.co_varnames[1:npar + 1]
if start is not None:
if len(start) != self.npar:
print(
'Wrong number of parameters in input parameter vector, the provided function requires %d but the vector contains %d. Ignoring.' % (
self.npar, len(start)))
self.start = None
else:
self.start = start
else:
self.start = None
if sd is not None:
if len(sd) != self.npar:
print(
'Wrong number of parameters in prior standard deviation vector, the provided function requires %d but the vector contains %d. Ignoring.' % (
self.npar, len(sd)))
self.sd = None
else:
self.sd = sd
else:
self.sd = None
if limits is not None:
try:
assert (limits.shape == (self.npar, 2))
except AssertionError:
print('Wrong number of parameters in parameter limits, the vector should have shape (%d , 2). Ignoring.' % (
self.npar))
return
self.limits = limits
else:
self.limits = None
if fix is not None:
if len(fix) != self.npar:
print(
'Wrong number of parameters in input vector, the provided function requires %d but the vector contains %d. Ignoring.' % (
self.npar, len(fix)))
self.fix = np.zeros(self.npar, dtype=bool)
else:
self.fix = fix
else:
self.fix = np.zeros(self.npar, dtype=bool)
[docs] def SetPriors(self, start=None, sd=None, limits=None, fix=None):
"""
Set prior definition for the function parameters
:param start:
:param sd:
:param limits:
:param fix:
"""
if start is not None:
if len(start) != self.npar:
print(
'Wrong number of parameters in input parameter vector, the provided function requires %d but the vector contains %d. Ignoring.' % (
self.npar, len(start)))
self.start = None
else:
self.start = start
else:
self.start = None
if sd is not None:
if len(sd) != self.npar:
print(
'Wrong number of parameters in prior standard deviation vector, the provided function requires %d but the vector contains %d. Ignoring.' % (
self.npar, len(sd)))
self.sd = None
else:
self.sd = sd
else:
self.sd = None
if limits is not None:
try:
assert (limits.shape == (self.npar, 2))
except AssertionError:
print('Wrong number of parameters in parameter limits, the vector should have shape (%d , 2). Ignoring.' % (
self.npar))
return
self.limits = limits
else:
self.limits = None
if fix is not None:
if len(fix) != self.npar:
print(
'Wrong number of parameters in input vector, the provided function requires %d but the vector contains %d. Ignoring.' % (
self.npar, len(fix)))
self.fix = np.zeros(self.npar, dtype=bool)
else:
self.fix = fix
else:
self.fix = np.zeros(self.npar, dtype=bool)