Commit e16ed33a authored by Alexandre Beelen's avatar Alexandre Beelen
Browse files

fix: multiprocessing now works, main function created

parent f3bf73e1
......@@ -2,6 +2,7 @@ from __future__ import absolute_import, division, print_function
from pathlib import Path
import os
import uuid
import numpy as np
import matplotlib.pyplot as plt
......@@ -19,7 +20,7 @@ from scipy.optimize import curve_fit
from nikamap import NikaMap, Jackknife
from nikamap.utils import pos_uniform
from astropy.io import fits
from time import clock
from time import process_time
import argparse
import sys
import datetime
......@@ -27,44 +28,32 @@ from astropy.io.fits import ImageHDU
from copy import deepcopy, copy
from utils import CombineMeasurements
import binascii
#import dill as pickle
plt.close('all')
# import dill as pickle
'''
plt.close("all")
"""
%load_ext autoreload
%autoreload 2
%matplotlib tk
'''
"""
plt.ion()
def get_size(obj, seen=None):
    """Return the total size of *obj* in bytes, following references recursively.

    Dict keys/values, instance ``__dict__`` attributes and iterable contents
    are included.  Each object is counted at most once: *seen* accumulates the
    ids of visited objects, so shared and self-referential structures
    terminate and are not double-counted.

    Parameters
    ----------
    obj : object
        the object to measure
    seen : set of int, optional
        ids already visited (used internally by the recursion)

    Returns
    -------
    int
        cumulative ``sys.getsizeof`` over the object graph
    """
    if seen is None:
        seen = set()
    marker = id(obj)
    if marker in seen:
        # Already counted through another reference.
        return 0
    # Mark *before* recursing so self-referential objects are handled gracefully.
    seen.add(marker)
    total = sys.getsizeof(obj)
    if isinstance(obj, dict):
        for key, value in obj.items():
            total += get_size(key, seen)
            total += get_size(value, seen)
    elif hasattr(obj, "__dict__"):
        total += get_size(obj.__dict__, seen)
    elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes, bytearray)):
        for item in obj:
            total += get_size(item, seen)
    return total
def fake_worker(img, min_threshold=2, nsources=8**2, flux=1*u.Jy,
within=(0, 1), cat_gen=pos_uniform,
retmask=True, detection=True, photometry=False, **kwargs):
def worker(
img,
min_threshold=2,
nsources=8 ** 2,
flux=1 * u.Jy,
within=(0, 1),
cat_gen=pos_uniform,
retmask=True,
detection=True,
photometry=False,
**kwargs,
):
"""The completness purity worker, create a fake dataset from an jackknifed
image and return catalogs
......@@ -78,8 +67,7 @@ def fake_worker(img, min_threshold=2, nsources=8**2, flux=1*u.Jy,
arguments for source injection
"""
# print('using Jackknife object', jkiter)
assert detection or photometry, ('you have to chose either detection or '
'photometry or both')
assert detection or photometry, "you have to chose either detection or " "photometry or both"
# np.random.seed(int(os.urandom(4).hex(), 16))
np.random.seed(int(binascii.hexlify(os.urandom(4)), 16))
......@@ -91,8 +79,7 @@ def fake_worker(img, min_threshold=2, nsources=8**2, flux=1*u.Jy,
# Actually rather slow... maybe check the code ?
# print(flux)
if flux is not None and (nsources != 0):
img.add_gaussian_sources(nsources=nsources, within=within,
peak_flux=flux, cat_gen=cat_gen, **kwargs)
img.add_gaussian_sources(nsources=nsources, within=within, peak_flux=flux, cat_gen=cat_gen, **kwargs)
if detection:
# ... match filter it ...
......@@ -137,202 +124,201 @@ def CreateParallelJackkifes(jkiter, parity_threshold):
jk = jkiter(parity_threshold=parity_threshold)
return jk
##############################################################
def main(
    data_dir,
    output_dir=None,
    pattern="**/*.fits",
    label=None,
    min_detection_threshold=4,
    photometry=True,
    detection=True,
    band="1mm",
):
    """Process all jackknife maps in a given directory with a Monte Carlo
    source-injection loop, writing one FITS result file per injected flux.

    Parameters
    ----------
    data_dir : Path or str
        the input path for the jackknife maps
    output_dir : Path or str, optional
        the output directory, by default '.'
    pattern : str, optional
        the pattern to look for maps, by default '**/*.fits', all fits files
        in all sub directories
    label : str, optional
        the output label for this jackknife; a random uuid when None
    min_detection_threshold : float
        minimum detection threshold, by default 4
    photometry : bool
        do the photometry of the sources, by default True
    detection : bool
        do the detection of the sources, by default True
    band : str, optional
        the band to read (for IDL maps), by default '1mm'
    """
    if label is None:
        label = str(uuid.uuid4())
    if output_dir is None:
        output_dir = Path(".")
    output_dir = Path(output_dir) / label
    if not output_dir.exists():
        print("creating directory {}".format(output_dir))
        output_dir.mkdir()

    # TODO: Shall we pass these as arguments also ?
    fluxes = np.geomspace(0.1, 10, 50) * u.mJy  # np.geomspace(0.1, 10, 50) for 1mm
    nsim = 1000
    nsources = 50
    parity_threshold = 0  # incomplete map when using 1 as threshold

    filenamecomment = "NewRealisation_"

    message = """\
###############################################################
# Running Monte Carlo Tests                                   #
#                                                             #
# No Simulations per flux:    {:5d}                           #
# Number of CPUs used:        {:5d}                           #
# Minimum Detection Threshold:{:5.1f}                         #
# No Different Fluxes:        {:5d}                           #
# Number of Injected Sources: {:5d}                           #
# Photometry: {}                                              #
# Detection: {}                                               #
# Band: {}                                                    #
###############################################################
""".format(
        nsim, cpu_count(), min_detection_threshold, len(fluxes), nsources, photometry, detection, band
    )
    print(message, "\n")

    timeprefix = "{date:%y%m%d_%H%M%S}_".format(date=datetime.datetime.now())
    timeprefix += filenamecomment

    print("Creating Jackkife Object")
    t0 = process_time()
    # NOTE(review): the `pattern` argument is not used here; the glob is
    # hard-coded to '*/map.fits' — confirm intent.
    jk_filenames = Path(data_dir).glob("*/map.fits")
    jk_iter = Jackknife(jk_filenames, band=band, n=nsim, parity_threshold=parity_threshold)
    print("Done in {:.2f}s".format(process_time() - t0))

    print("Begin Monte Carlo")
    if nsources == 0:
        # no injection requested: run a single pass with a dummy zero flux
        fluxes = np.array([0]) * u.mJy

    for flux in fluxes:
        print("Creating {} Jackkife Maps".format(nsim))
        t0 = process_time()
        # TODO: This takes a LOT of time and could be done in parallel in the
        # worker, as needed; need to see how to pass the Jackknife object to
        # the worker...
        jackknifes = []
        jk_std = []
        for i in ProgressBar(range(nsim)):
            jackknifes.append(jk_iter())
            jk_std.append(jackknifes[i].data)
        # per-pixel standard deviation over all jackknife realisations
        jk_std = np.std(jk_std, axis=0)
        print("Done in {:.2f}s".format(process_time() - t0))

        # keep injected sources at least 2 FWHM apart
        min_source_dist = 2 * jackknifes[0].beam.fwhm_pix.value
        helpfunc = partial(
            worker,
            min_threshold=min_detection_threshold,
            nsources=nsources,
            flux=flux,
            within=(0, 1),
            cat_gen=pos_uniform,
            dist_threshold=min_source_dist,
            photometry=photometry,
            detection=detection,
        )

        if nsources != 0:
            print("Simulation with {:.2f}...".format(flux))
        else:
            print("simulation without source injection")

        with Pool(cpu_count()) as p:
            res = p.map(helpfunc, jackknifes)
        # Should work, but don't... investigate :
        # res = ProgressBar.map(helpfunc, jackknifes, multiprocess=cpu_count())

        # transpose [(det, fake, im, match), ...] into four per-kind lists
        res = list(zip(*res))
        (DETECTED_SOURCES, FAKE_SOURCES, immask, matchmask) = res

        # merge catalogues to one:
        sources, fake_sources = CombineMeasurements(DETECTED_SOURCES, FAKE_SOURCES)

        # create global masks:
        immask = np.sum(immask, axis=0)
        matchmask = np.sum(matchmask, axis=0)

        fname = "flux{:.3f}mJy_thresh{}_nsources{}_nsim{}_phot{}_detect{}.fits".format(
            flux.to_value(unit=u.mJy), min_detection_threshold, nsources, nsim, photometry, detection
        )
        fname = timeprefix + fname
        outfile = output_dir / fname
        stdfile = output_dir / "stddev.fits"

        phdu = fits.PrimaryHDU(header=jackknifes[0].wcs.to_header())
        phdu.header["influx"] = "{}".format(flux)
        phdu.header["nsim"] = nsim
        phdu.header["sourcespersim"] = nsources
        phdu.header["dthresh"] = min_detection_threshold
        hdul = [phdu]
        if len(sources) > 0:
            hdul.append(fits.BinTableHDU(data=sources, name="Detected_Sources"))
        if len(fake_sources) > 0:
            hdul.append(fits.BinTableHDU(data=fake_sources, name="Fake_Sources"))
        hdul.append(ImageHDU(data=immask, name="imagemask"))
        hdul.append(ImageHDU(data=matchmask, name="matchmask"))
        hdul = fits.HDUList(hdul)
        hdul.writeto(outfile, overwrite=False)

        # Write the jackknife noise map once only: writing it on every flux
        # iteration with overwrite=False would raise on the second pass.
        if not stdfile.exists():
            stdhdu = [fits.PrimaryHDU()]
            stdhdu.append(
                ImageHDU(data=jk_std, name="stddev_from_jackknife", header=jackknifes[0].wcs.to_header())
            )
            fits.HDUList(stdhdu).writeto(stdfile, overwrite=False)

        print("results written to {}".format(outfile))
if __name__ == "__main__":
    ##############################################################
    # Historical data locations, kept for reference:
    # DATA_DIR_SERVER = Path("/data/NIKA/Reduced/"
    #                        "HLS091828_common_mode_one_block/v_1")
    # DATA_DIR_SERVER = Path("/data/NIKA/Reduced/May2018/"
    #                        "HLS091828_common_mode_one_block/v_1/")
    # DATA_DIR_SERVER = Path("/data/NIKA/Reduced/May2018/HLS091828_atm_and"
    #                        "_all_box_iter_IterativeMM_CosSinElevOffset/v_1")
    # DATA_DIR_MYPC = Path("/home/peter/Dokumente/Uni/Paris/Stage/data/v_1")
    band = "1mm"
    # reduction method identifier, also used as the output label
    method = 211
    DATA_DIR_SERVER = Path(f"/data/PHOTOM/Reduced/23515M/GOODSNORTH/{method}/iter15/v_1")
    DATA_DIR_MYPC = Path("/home/lbing/data/purity_completeness")
    outdir = DATA_DIR_MYPC
    main(DATA_DIR_SERVER, output_dir=outdir, label=str(method), band=band)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment