import numpy as np
from astropy.io import fits
from astropy import wcs
from scipy.ndimage.filters import gaussian_filter
from scipy.interpolate import griddata
[docs]def get_extnum(fitsfile):
"""
Find the extension number of the first IMAGE extension in an input FITS file
:param fitsfile: Input FITS file to be read
:type fitsfile: str
:return: extension number
:rtype: int
"""
next = 0
if fitsfile[0].header['NAXIS'] == 2:
return 0
else:
print('Primary HDU is not an image, moving on')
nhdu = len(fitsfile)
if nhdu == 1:
print('Error: No IMAGE extension found in input file')
return -1
cont = 1
next = 1
while (cont and next < nhdu):
extension = fitsfile[next].header['XTENSION']
if extension == 'IMAGE':
print('IMAGE HDU found in extension ', next)
cont = 0
else:
next = next + 1
if cont == 1:
print('Error: No IMAGE extension found in input file')
return -1
return next
[docs]class Data(object):
'''Class containing the data to be loaded and used by other pyproffit routines
:param imglink: Path to input image
:type imglink: str
:param explink: Path to exposure map. If none, assume a flat exposure of 1s or an input error map provided through rmsmap
:type explink: str , optional
:param bkglink: Path to background map. If none, assume zero background
:type bkglink: str , optional
:param voronoi: Define whether the input image is a Voronoi image or not (default=False)
:type voronoi: bool , optional
:param rmsmap: Path to error map if the data is not Poisson distributed
:type rmsmap: str , optional
'''
def __init__(self, imglink, explink=None, bkglink=None, voronoi=False, rmsmap=None):
'''
Constructor of class Data
'''
if imglink is None:
print('Error: Image file not provided')
return
fimg = fits.open(imglink)
next = get_extnum(fimg)
self.img = fimg[next].data.astype(float)
self.imglink = imglink
self.explink = explink
self.bkglink = bkglink
self.voronoi = voronoi
self.rmsmap = rmsmap
head = fimg[next].header
self.header = head
self.wcs_inp = wcs.WCS(head, relax=False)
if 'CDELT2' in head:
self.pixsize = head['CDELT2'] * 60. # arcmin
elif 'CD2_2' in head:
self.pixsize = head['CD2_2'] * 60. # arcmin
else:
print('No pixel size could be found in header, will assume a pixel size of 2.5 arcsec')
self.pixsize = 2.5 / 60.
self.axes = self.img.shape
if voronoi:
self.errmap = fimg[1].data.astype(float)
fimg.close()
if explink is None:
self.exposure = np.ones(self.axes)
else:
fexp = fits.open(explink)
next = get_extnum(fexp)
expo = fexp[next].data.astype(float)
if expo.shape != self.axes:
print('Error: Image and exposure map sizes do not match')
return
self.exposure = expo
self.defaultexpo = np.copy(expo)
fexp.close()
if bkglink is None:
self.bkg = np.zeros(self.axes)
else:
fbkg = fits.open(bkglink)
next = get_extnum(fbkg)
bkg = fbkg[next].data.astype(float)
if bkg.shape != self.axes:
print('Error: Image and background map sizes do not match')
return
self.bkg = bkg
fbkg.close()
if rmsmap is not None:
frms = fits.open(rmsmap)
next = get_extnum(frms)
rms = frms[next].data.astype(float)
if rms.shape != self.axes:
print('Error: Image and RMS map sizes do not match')
return
self.rmsmap = rms
frms.close()
else:
self.rmsmap = None
self.filth = None
[docs] def region(self, regfile):
'''
Filter out regions provided in an input DS9 region file
:param regfile: Path to region file. Accepted region file formats are fk5 and image.
:type regfile: str
'''
freg = open(regfile)
lreg = freg.readlines()
freg.close()
nsrc = 0
nreg = len(lreg)
if self.exposure is None:
print('No exposure given')
return
expo = np.copy(self.exposure)
y, x = np.indices(self.axes)
regtype = None
for i in range(nreg):
if 'fk5' in lreg[i]:
regtype = 'fk5'
elif 'image' in lreg[i]:
regtype = 'image'
if regtype is None:
print('Error: invalid format')
return
for i in range(nreg):
if 'circle' in lreg[i]:
vals = lreg[i].split('(')[1].split(')')[0]
if regtype == 'fk5':
xsrc = float(vals.split(',')[0])
ysrc = float(vals.split(',')[1])
rad = vals.split(',')[2]
if '"' in rad:
rad = float(rad.split('"')[0]) / self.pixsize / 60.
elif '\'' in rad:
rad = float(rad.split('\'')[0]) / self.pixsize
else:
rad = float(rad) / self.pixsize * 60.
wc = np.array([[xsrc, ysrc]])
pixcrd = self.wcs_inp.wcs_world2pix(wc, 1)
xsrc = pixcrd[0][0] - 1.
ysrc = pixcrd[0][1] - 1.
else:
xsrc = float(vals.split(',')[0])
ysrc = float(vals.split(',')[1])
rad = float(vals.split(',')[2])
# Define box around source to spped up calculation
boxsize = np.round(rad + 0.5).astype(int)
intcx = np.round(xsrc).astype(int)
intcy = np.round(ysrc).astype(int)
xmin = np.max([intcx-boxsize, 0])
xmax = np.min([intcx+boxsize + 1, self.axes[1]])
ymin = np.max([intcy-boxsize, 0])
ymax = np.min([intcy+boxsize + 1, self.axes[0]])
rbox = np.hypot(x[ymin:ymax,xmin:xmax] - xsrc,y[ymin:ymax,xmin:xmax] - ysrc)
# Mask source
src = np.where(rbox < rad)
expo[ymin:ymax,xmin:xmax][src] = 0.0
nsrc = nsrc + 1
elif 'ellipse' in lreg[i]:
vals = lreg[i].split('(')[1].split(')')[0]
if regtype == 'fk5':
xsrc = float(vals.split(',')[0])
ysrc = float(vals.split(',')[1])
rad1 = vals.split(',')[2]
rad2 = vals.split(',')[3]
angle = float(vals.split(',')[4])
if '"' in rad1:
rad1 = float(rad1.split('"')[0]) / self.pixsize / 60.
rad2 = float(rad2.split('"')[0]) / self.pixsize / 60.
elif '\'' in rad1:
rad1 = float(rad1.split('\'')[0]) / self.pixsize
rad2 = float(rad2.split('\'')[0]) / self.pixsize
else:
rad1 = float(rad1) / self.pixsize * 60.
rad2 = float(rad2) / self.pixsize * 60.
wc = np.array([[xsrc, ysrc]])
pixcrd = self.wcs_inp.wcs_world2pix(wc, 1)
xsrc = pixcrd[0][0] - 1.
ysrc = pixcrd[0][1] - 1.
else:
xsrc = float(vals.split(',')[0])
ysrc = float(vals.split(',')[1])
rad1 = float(vals.split(',')[2])
rad2 = float(vals.split(',')[3])
angle = float(vals.split(',')[2])
ellang = angle * np.pi / 180. + np.pi / 2.
aoverb = rad1/rad2
# Define box around source to spped up calculation
boxsize = np.round(np.max([rad1, rad2]) + 0.5).astype(int)
intcx = np.round(xsrc).astype(int)
intcy = np.round(ysrc).astype(int)
xmin = np.max([intcx-boxsize, 0])
xmax = np.min([intcx+boxsize + 1, self.axes[1]])
ymin = np.max([intcy-boxsize, 0])
ymax = np.min([intcy+boxsize + 1, self.axes[0]])
xtil = np.cos(ellang)*(x[ymin:ymax,xmin:xmax]-xsrc) + np.sin(ellang)*(y[ymin:ymax,xmin:xmax]-ysrc)
ytil = -np.sin(ellang)*(x[ymin:ymax,xmin:xmax]-xsrc) + np.cos(ellang)*(y[ymin:ymax,xmin:xmax]-ysrc)
rbox = aoverb * np.hypot(xtil, ytil / aoverb)
# Mask source
src = np.where(rbox < rad1)
expo[ymin:ymax,xmin:xmax][src] = 0.0
nsrc = nsrc + 1
print('Excluded %d sources' % (nsrc))
self.exposure = expo
[docs] def reset_exposure(self):
"""
Revert to the original exposure map and ignore the current region file
"""
self.exposure = self.defaultexpo
[docs] def dmfilth(self, outfile=None, smoothing_scale=8):
'''
Mask the regions provided in a region file and fill in the holes by interpolating the smoothed image into the gaps and generating a Poisson realization
:param outfile: If outfile is not None, file name to output the dmfilth image into a FITS file
:type outfile: str , optional
:param smoothing_scale: Size of smoothing scale (in pixel) to estimate the surface brightness distribution outside of the masked areas
:type smoothing_scale: int
'''
if self.img is None:
print('No data given')
return
# Apply source mask on image
chimg = np.where(self.exposure == 0.0)
imgc = np.copy(self.img)
imgc[chimg] = 0.0
# High-pass filter
print('Applying high-pass filter')
gsb = gaussian_filter(imgc, smoothing_scale)
gsexp = gaussian_filter(self.exposure, smoothing_scale)
#img_smoothed = np.nan_to_num(np.divide(gsb, gsexp)) * self.exposure
img_smoothed = np.nan_to_num(np.divide(gsb, gsexp))
img_smoothed[chimg] = 0.
# Interpolate
print('Interpolating in the masked regions')
y, x = np.indices(self.axes)
nonz = np.where(img_smoothed > 0.)
p_ok = np.array([x[nonz], y[nonz]]).T
vals = img_smoothed[nonz]
int_vals = np.nan_to_num(griddata(p_ok, vals, (x, y), method='cubic'))
# Fill holes
print('Filling holes')
area_to_fill = np.where(np.logical_and(int_vals > 0., self.exposure == 0))
dmfilth = np.copy(self.img)
dmfilth[area_to_fill] = np.random.poisson(int_vals[area_to_fill] * self.defaultexpo[area_to_fill])
self.filth = dmfilth
if outfile is not None:
hdu = fits.PrimaryHDU(dmfilth)
hdu.header = self.header
hdu.writeto(outfile, overwrite=True)
print('Dmfilth image written to file '+outfile)