Commit 73fe7323 authored by LUSTIG Peter's avatar LUSTIG Peter
Browse files

further work...

parent aab3851c
import numpy as np
from astropy.io import fits
from astropy.table import Table
import astropy.units as u
import matplotlib.pyplot as plt
from astropy.wcs import WCS
import warnings
def Completness1D(sources, fake_sources, threshold_range, nbins,
fakesourcelocmask=None):
if fakesourcelocmask is None:
fakesourcelocmask = np.zeros(len(fake_sources), dtype=bool)
plt.close('all')
def GetLocationMask(sources, mask, addmask=None):
if addmask is None:
addmask = np.zeros(len(sources), dtype=bool)
if sources is not None:
pos = np.array([sources['ymean'], sources['xmean']]).T
pos[addmask] = np.zeros(2)
locmask = pos_in_mask(pos, mask=mask, retindex=True)
locmask = locmask | addmask
else:
locmask = None
return locmask
min_threshold, max_threshold = threshold_range
def GetInsideMask(sources, shape, within=(0, 1)):
pos = np.array((sources['ymean'], sources['xmean'])).T
limits = shape * np.asarray(within)[:, np.newaxis]
inside = np.sum((pos >= limits[0]) & (pos <= limits[1] - 1), 1) == 2
outside = ~inside
nout = np.sum(outside)
if nout != 0:
warnings.warn("{} sources detected outside shape".format(nout),
UserWarning)
return outside
def pos_in_mask(pos, mask=None, nsources=1, extra=None, retindex=False):
"""Check if pos is in mask, issue warning with less than nsources
STOLEN FROM NIKAMAP AND MODIFIED TO RETURN INDEX
Parameters
----------
pos : array_like (N, 2)
pixel indexes (y, x) to be checked in mask
mask : 2D boolean array_like
corresponding mask
nsources : int
the requested number of sources
Returns
-------
:class:`numpy.ndarray`
the pixel indexes within the mask
"""
pos = np.asarray(pos)
inside = np.ones(pos.shape[0], dtype=bool)
if mask is not None:
pos_idx = np.floor(pos + 0.5).astype(int)
inside = ~mask[pos_idx[:, 0], pos_idx[:, 1]]
if retindex:
return ~inside
pos = pos[inside]
if pos.shape[0] < nsources:
warnings.warn("Only {} positions".format(pos.shape[0]), UserWarning)
if extra is None:
return pos
else:
return pos, extra[inside]
def AddXYCoordinatesTo(sources, wcs):
if sources is not None:
pos = np.array(wcs.wcs_world2pix(sources['ra'],
sources['dec'], 0)).T
sources['xmean'], sources['ymean'] = pos[:, 0], pos[:, 1]
return sources
class BasicEvaluation:
def __init__(self, sources, fake_sources, shape, wcs, mask=None):
self.sources = sources
self.fake_sources = fake_sources
self.shape = shape
self.wcs = wcs
self.ChangeMask(mask)
def ChangeMask(self, mask):
fake_sourceslocmask = np.zeros(len(fake_sources), dtype=bool)
sourceslocmask = np.zeros(len(sources), dtype=bool)
if mask is not None:
shape = self.shape
#
fake_sourceslocmask = GetLocationMask(fake_sources, mask)
# should check if corresponding fake_source is in mask or not
# sourceslocmask = fakesourceslocmask[sources['fake_sources']] ?
sourceslocmask = GetInsideMask(sources, shape)
sourceslocmask = GetLocationMask(sources, mask,
addmask=sourceslocmask)
self.sourceslocmask = sourceslocmask
self.fake_sourceslocmask = fake_sourceslocmask
class PCAreaEvaluation(BasicEvaluation):
def __init__(self, sources, fake_sources=None,
shape=None, wcs=None, threshold_edges=None, flux_edges=None,
flux_centers=None,
mask=None):
sources = AddXYCoordinatesTo(sources, wcs)
fake_sources = AddXYCoordinatesTo(fake_sources, wcs)
self.sources = sources
self.fake_sources = fake_sources
self.shape = shape
self.ChangeMask(mask)
self.threshold_edges = threshold_edges
self.flux_edges = flux_edges
self.flux_centers = flux_centers,
self.Completness()
def ChangeMask(self, mask):
fake_sourceslocmask = np.zeros(len(fake_sources), dtype=bool)
sourceslocmask = np.zeros(len(sources), dtype=bool)
if mask is not None:
shape = self.shape
fake_sourceslocmask = GetLocationMask(fake_sources, mask)
# should check if corresponding fake_source is in mask or not
# sourceslocmask = fakesourceslocmask[sources['fake_sources']] ?
sourceslocmask = GetInsideMask(sources, shape)
sourceslocmask = GetLocationMask(sources, mask,
addmask=sourceslocmask)
self.mask = mask
self.sourceslocmask = sourceslocmask
self.fake_sourceslocmask = fake_sourceslocmask
def Completness(self):
threshold_edges = self.threshold_edges
flux_edges = self.flux_edges
sources = self.sources
fake_sources = self.fake_sources
fake_sourceslocmask = self.fake_sourceslocmask
completness, norm_completness = Completness2D(
sources, fake_sources, threshold_edges, flux_edges,
fake_sourceslocmask)
self.completness = completness / norm_completness[:, np.newaxis]
def PlotCompletness1D(self, idx, constant='flux', linestyles=None,
labels=None, ax=None, **kwargs):
if type(idx) is int:
idx = np.array([idx])
nplots = len(idx)
if constant == 'thresh':
completnesses = self.completness[:, idx]
x = self.flux_centers
xlabel = 'Flux'
labelval = self.threshold_edges[idx]
elif constant == 'flux':
completnesses = self.completness[idx]
x = self.threshold_edges[:-1]
xlabel = 'Detection Threshold'
labelval = self.flux_centers[idx]
else:
raise UserWarning('you must chose \'flux\' or thresh for constant')
if linestyles is None:
linestyles = ['-'] * nplots
if labels is None:
labels = np.repeat(labels, nplots)
if ax is None:
fig = plt.figure(figsize=(7, 6))
fig.subplots_adjust(left=0.16, right=0.97, bottom=0.15, top=0.9)
ax = fig.add_subplot(111)
for i, (completness, linestyle, label) in enumerate(
zip(completnesses, linestyles,
labels)):
ax.plot(x, completness, linestyle=linestyle, label=label,
**kwargs)
ax.set_title('Completness', fontsize=30, y=1.02)
ax.set_xlabel(xlabel, fontsize=25)
ax.set_ylabel('Completness', fontsize=25)
ax.tick_params(axis='both', labelsize=20)
return ax
def Plot2D(self, ax=None, nfluxlabels=5, nthreshlabels=5):
completness = self.completness
flux = self.flux_centers
thresh = self.threshold_edges
if ax is None:
fig = plt.figure(figsize=(7,7))
ax = fig.add_subplot(111)
nflux, nthresh = completness.shape
fluxlabelidx = numpy.rint(np.linspace(0, nflux, nfluxlabels))
threshlabelidx = numpy.rint(np.linspace(0, nthresh, nthreshlabels))
threshlabel = np.array2string(np.round(
thresh[threshlabelidx], decimals=2),
separator=',')[1:-1].split(',')
ax.imshow(completness, origin='lower', aspect='auto')
class CountsAreaEvaluation(BasicEvaluation):
def __init__(self, sources,
shape=None, wcs=None, threshold_edges=None,
no_sim=1,
mask=None):
sources = AddXYCoordinatesTo(sources, wcs)
self.sources = sources
self.shape = shape
self.mask = mask
self.sourceslocmask = self.ChangeMask(mask)
self.threshold_edges = threshold_edges
self.no_sim = no_sim
self.counts = self.Counts()
def ChangeMask(self, mask):
sources = self.sources
sourceslocmask = np.zeros(len(sources), dtype=bool)
if mask is not None:
shape = self.shape
# should check if corresponding fake_source is in mask or not
# sourceslocmask = fakesourceslocmask[sources['fake_sources']] ?
sourceslocmask = GetInsideMask(sources, shape)
sourceslocmask = GetLocationMask(sources, mask,
addmask=sourceslocmask)
self.mask = mask
self.sourceslocmask = sourceslocmask
return sourceslocmask
def Counts(self):
false_counts = AbsoluteCounts(self.sources, self.threshold_edges,
self.sourceslocmask)
false_counts /= self.no_sim
self.counts = false_counts
return false_counts
def PlotCounts(self, ax=None, label=None, **kwargs):
data = self.counts
thresholds = self.threshold_edges[0:-1]
if label is None:
label = '{} Simulations'.format(self.no_sim)
ax, plot = Plot1D(
thresholds, data, title='Absolute Counts',
xlabel='Detection Threshold',
ylabel='Counts', label=label,
ax=ax, **kwargs)
fig = ax.get_figure()
fig.add_axes(ax)
# ax.set_xscale('log')
ax.set_yscale('log')
ax.legend(fontsize=20)
fig.subplots_adjust(left=0.16, right=0.97, bottom=0.15, top=0.9)
return ax
def Plot1D(x, y, title='', xlabel='', ylabel='', ax=None, **kwargs):
if ax is None:
fig = plt.figure(figsize=(7, 6))
fig.subplots_adjust(left=0.16, right=0.97, bottom=0.15, top=0.9)
ax = fig.add_subplot(111)
plot = ax.plot(x, y, **kwargs)
if title != '':
ax.set_title(title, fontsize=30, y=1.02)
if xlabel != '':
ax.set_xlabel(xlabel, fontsize=25)
if ylabel != '':
ax.set_ylabel(ylabel, fontsize=25)
ax.tick_params(axis='both', labelsize=20)
return ax, plot
def Completness2D(sources, fake_sources, thresholdbins, fluxbins,
fakesourceslocmask=None):
if fakesourceslocmask is None:
fakesourceslocmask = np.zeros(len(fake_sources), dtype=bool)
# get SNR for each fake source
fake_snr = np.ma.array(sources[fake_sources['find_peak']
.filled(0)]['SNR'],
mask=fake_sources['find_peak'].mask)
# SNR higher than max treshold is also detected...
fake_snr[fake_snr > max_threshold] = max_threshold
fake_snr[fake_snr > thresholdbins[-1]] = thresholdbins[-1]
bin_edges = np.linspace(min_threshold, max_threshold, nbins)
# add right edge to last bin (width not important)
bin_edges = np.append(bin_edges, bin_edges[-1]+1)
fake_flux = np.ma.array(fake_sources['amplitude'],
mask=fake_sources['find_peak'].mask)
# Number of sources that were detected and that are in region
# Sources that were detected and that are in region
compweights = (~np.array(fake_sources['find_peak'].mask
| fakesourcelocmask)).astype(int)
completness, _ = np.histogram(
fake_snr, bins=bin_edges,
weights=compweights)
completness = np.cumsum(completness[::-1], dtype=float)[::-1]
| fakesourceslocmask)).astype(int)
completness = np.histogram2d(fake_flux, fake_snr,
bins=(fluxbins, thresholdbins),
weights=compweights)[0]
completness = np.cumsum(completness[:, ::-1], dtype=float, axis=1)[:, ::-1]
# just take number of sources in area for normalisation
norm_comp = float(len(fake_sources[~fakesourcelocmask]))
norm_comp, _ = np.histogram(fake_sources['amplitude'], bins=fluxbins,
weights=(~fakesourceslocmask).astype(int))
return completness, norm_comp
def AbsoluteCounts(sources, threshold_bins, sourceslocmask=None):
if sourceslocmask is None:
sourceslocmask = np.zeros(len(sources), dtype=bool)
max_threshold = threshold_bins[-1]
sources_snr = sources['SNR']
sources_snr[sources_snr > max_threshold] = max_threshold
counts, _ = np.histogram(sources_snr, bins=threshold_bins,
weights=(~sourceslocmask).astype(int))
counts = np.cumsum(counts[::-1], dtype=float)[::-1]
return counts
def LogBinEdges(LogBinCenters):
centers = np.sort(np.unique(LogBinCenters))
nedges = len(centers) + 1
exponents = np.log10(centers)
step = np.mean(exponents[1:] - exponents[:-1])
edges = np.logspace(exponents[0] - .5 * step,
exponents[-1] + .5 * step, nedges)
return edges
class Purity:
def __init__(self, absolute_counts, false_counts, thresholds):
pur = (abs_counts - false_counts) / abs_counts
self.thresholds = thresholds
self.purity = pur
def Plot(self, ax=None, **kwargs):
purity = self.purity
thresholds = self.thresholds
title = 'Purity'
xlabel = 'Detection Threshold [SNR]'
ylabel = (r'$\frac{N_{\mathrm{abs}}-N_{\mathrm{false}}}'
'{N_{\mathrm{abs}}}$')
ax, _ = Plot1D(thresholds, purity, title=title, xlabel=xlabel,
ylabel=ylabel, ax=ax)
return ax
%matplotlib tk
showcompletness = True
showfalsecounts = False
showrealcounts = False
maskfilename = ('/home/peter/Dokumente/Uni/Paris/Stage/FirstSteps/'
'Completness/masks.fits')
with fits.open(maskfilename) as hdul:
hitmask = hdul['hitmask'].data.astype(bool)
hlsmask = hdul['hls3fwhm'].data.astype(bool)
wcs = WCS(hdul['hitmask'].header)
totmask = hlsmask | hitmask
shape = totmask.shape
threshbins = np.arange(2.5, 7, .5)
threshbins = np.linspace(2.5, 10, 100)
if showcompletness:
filename = ('/home/peter/Dokumente/Uni/Paris/Stage/FirstSteps/'
'Completness/DetectionNewrealisation.fits')
with fits.open(filename) as hdul:
sources = Table.read(hdul, 'DETECTED_SOURCES')
fake_sources = Table.read(hdul, 'FAKE_SOURCES')
fluxedges = LogBinEdges(fake_sources['amplitude']) * u.Jy
fluxcenters = np.sort(np.unique(fake_sources['amplitude']))
# %matplotlib tk
obj = PCAreaEvaluation(sources, fake_sources, shape=(501, 501), wcs=wcs,
threshold_edges=threshbins, flux_edges=fluxedges,
mask=totmask, flux_centers=fluxcenters)
obj.completness
obj.sourceslocmask
obj.fake_sourceslocmask
# obj.PlotCompletness1D(np.array([1, 25, 49]), constant='flux')
obj.Plot2D()
if showfalsecounts:
emptycountsfile = ('/home/peter/Dokumente/Uni/Paris/Stage/FirstSteps/'
'Completness/montecarlo_results/Newrealisation/'
'FalseDetections/180513_123146_NewRealisation_'
'flux0.00mJy_thresh2.5_nsources0_nsim1000_photFalse'
'_detectTrue.fits')
emptysources = Table.read(emptycountsfile)
threshbins = np.linspace(2.5, 10, 100)
fc = CountsAreaEvaluation(
emptysources,
shape=shape, wcs=wcs, threshold_edges=threshbins,
mask=totmask, no_sim=1000)
false_counts = fc.Counts()
#%matplotlib tk
nax = fc.PlotCounts(label='False\nDetections')
if showrealcounts:
realcountsfile = ('/home/peter/Dokumente/Uni/Paris/Stage/FirstSteps/'
'Completness/montecarlo_results/Newrealisation/'
'Realmap_dthresh2.5_photTrue.fits')
realsources = Table.read(realcountsfile)
rc = CountsAreaEvaluation(
realsources,
shape=shape, wcs=wcs, threshold_edges=threshbins,
mask=totmask, no_sim=1)
abs_counts = rc.Counts()
rc.PlotCounts(ax=nax, label='Absolute\nDetections')
pur = Purity(abs_counts, false_counts, threshbins[0:-1])
pur.Plot()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment