Commit 9b165143 authored by LUSTIG Peter's avatar LUSTIG Peter

multiprocess parallelisation

parent ec3c682c
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Mar 18 11:08:28 2018
@author: peter
"""
from mpi4py import MPI
import matplotlib.pyplot as plt
import numpy as np
import astropy.units as u
......@@ -14,25 +8,34 @@ from nikamap import NikaMap
#from completness import *
import myfunctions as mfct
from time import clock
import argparse
from multiprocess import Pool
from functools import partial
import sys
#import sys
import completness as cp
import os
import matplotlib.pyplot as plt
plt.close('all')
flux_bins = 2
flux_range = [0.1, 10]
threshold_bins = 5
threshold_range = [2, 5]
threshold_range = [3, 5]
threshold_range = [2, 10]
#threshold_range = [3, 5]
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
plt.ion()
nsim = 6
flux = 10 * u.mJy
noisefile = "../../N2CLS/ForMarseille/HLS091828_common_mode_kids_out/map_JK.fits"
method="unknown"
outfile = None
outdir = "results/"
ncore = 3
'''
import argparse
desc = "Place false sources with a defined flux on Jackknife maps, detect sources and calculate the purity and completness of the detection."
parser = argparse.ArgumentParser(description=desc)
#parser.add_argument("-n", "--noisefile", action="store",
......@@ -69,11 +72,13 @@ outdir = args.outdir
#flux range
#threshold bins
#threshold range
'''
############### Begin Calculation #############
sim_per_core = mfct.Distribute_Njobs(size, nsim)
sim_per_core = mfct.Distribute_Njobs(ncore, nsim)
data = NikaMap.read(noisefile)
......@@ -93,56 +98,73 @@ wcs_threshold = wcs_4D.sub([3])
assert np.all(np.abs(wcs_threshold.all_pix2world(np.arange(threshold_bins+1)-0.5, 0) - threshold_edges) < 1e-15)
wcs_flux = wcs_4D.sub([4])
assert np.all(np.abs(wcs_flux.all_pix2world(np.arange(flux_bins+1)-0.5, 0) - fluxes_edges.value) < 1e-13)
comm.Barrier()
if rank == 0:
T0 = clock()
comm.Barrier()
T0 = clock()
comp, comp_n, pur, pur_n = cp.completness_purity_2(flux, nsim=sim_per_core[rank],
print('------------')
#data.plot()
#plt.show(block=True)
helpfunc = partial(cp.completness_purity_2, flux, **{"nsources":8**2, "within":(0, 1),
"shape":shape_4D[0:3],
"wcs":wcs_4D.sub([1, 2, 3]), "jk_map":data})
'''
comp, comp_n, pur, pur_n = cp.completness_purity_2(flux, nsim=sim_per_core,
nsources=8**2, within=(0, 1),
shape=shape_4D[0:3],
wcs=wcs_4D.sub([1, 2, 3]), jk_map=data)
'''
comm.Barrier()
'''
all_comp = comm.gather(comp, root=0)
all_comp_n = comm.gather(comp_n, root=0)
all_pur = comm.gather(pur, root=0)
all_pur_n = comm.gather(pur_n, root=0)
'''
#print('-----before-------')
#a, b, c, d = helpfunc(nsim=3)
#print('-----after-------')
#sys.exit()
pool = Pool(ncore)
print(sim_per_core)
tmpres = pool.map(helpfunc, sim_per_core)
tmpres = list(zip(*tmpres))
all_comp = np.sum(tmpres[0], axis=0)
all_comp_n = np.sum(tmpres[1], axis=0)
all_pur = np.sum(tmpres[2], axis=0)
all_pur_n = np.sum(tmpres[3], axis=0)
print(all_comp.shape)
print(all_comp_n.shape)
with np.errstate(divide='ignore', invalid='ignore'):
completness = all_comp / all_comp_n[..., np.newaxis]
purity = all_pur / all_pur_n
#print(len(tmpres))
#print(np.array(comp).shape)
if rank == 0:
if (method == "unknown" and "/" in noisefile):
method = noisefile.split("/")[-2]
if outfile is None:
outfile = "completness-" + method + "-flux_{:04.0f}-nsim_{:03}.fits".format(flux.value, nsim)
if outdir != "":
if outdir[-1] != "/":
outdir = outdir + "/"
if not os.path.exists(outdir):
os.makedirs(outdir)
outfile = outdir + outfile
all_comp = np.sum(all_comp, axis=0)
all_comp_n = np.sum(all_comp_n, axis=0)
all_pur = np.sum(all_pur, axis=0)
all_pur_n = np.sum(all_pur_n, axis=0)
if (method == "unknown" and "/" in noisefile):
method = noisefile.split("/")[-2]
with np.errstate(divide='ignore', invalid='ignore'):
completness = all_comp / all_comp_n[..., np.newaxis]
purity = all_pur / all_pur_n
if outfile is None:
outfile = "completness-" + method + "-flux_{:04.0f}-nsim_{:03}.fits".format(flux.value, nsim)
if outdir != "":
if outdir[-1] != "/":
outdir = outdir + "/"
cp.WriteResult(outfile, threshold, completness, purity, nsim, wcs_4D, method, flux)
cp.Plot_CompPur(completness, purity, threshold, nsim=nsim, savename=outfile[0:-5]+"-plot.png")
if not os.path.exists(outdir):
os.makedirs(outdir)
T = clock()-T0
print('Total calculation time: {:.0f}'.format(T))
outfile = outdir + outfile
cp.WriteResult(outfile, threshold, completness, purity, nsim, wcs_4D, method, flux)
cp.Plot_CompPur(completness, purity, threshold, nsim=nsim, savename=outfile[0:-5]+"-plot.png")
comm.Barrier()
T = clock()-T0
print('Total calculation time: {:.0f}s'.format(T))
#Branch test
from __future__ import absolute_import, division, print_function
from pathlib import Path
......@@ -19,7 +18,6 @@ from scipy.optimize import curve_fit
from nikamap import NikaMap, Jackknife
from nikamap.utils import pos_uniform
from mpi4py import MPI
from IPython import get_ipython
ipython = get_ipython()
......@@ -34,11 +32,6 @@ if '__IPYTHON__' in globals():
ipython.magic('autoreload 2')
ipython.magic('matplotlib tk')
DATA_DIR = "/home/peter/Dokumente/Uni/Paris/Stage/N2CLS/ForMarseille/HLS091828_common_mode_one_block/"
def add_axis(name, range, bins, unit=None, i_axe=3, log=False):
"""Define a dictionnary for additionnal wcs axes (linear or log)"""
......@@ -231,8 +224,10 @@ def completness_purity(flux, wcs=None, shape=None, nsources=8**2, within=(1 / 4,
norm_pur = np.zeros(shape, dtype=np.float)
jk_iter = Jackknife(jk_filenames, n=nsim)
print("Begin loop")
for img in ProgressBar(jk_iter):
print("In loop")
# %load_ext snakeviz
# %snakeviz the following line.... all is spend in the find_peaks / fit_2d_gaussian
# TODO: Change the find_peaks routine, or maybe just the fit_2d_gaussian to be FAST ! (Maybe look into gcntrd.pro routine or photutils.centroid.centroid_1dg maybe ?)
......@@ -264,6 +259,7 @@ def completness_purity(flux, wcs=None, shape=None, nsources=8**2, within=(1 / 4,
def completness_purity_2(flux, nsim=2, wcs=None, shape=None, nsources=8**2, within=(1 / 4, 3 / 4), jk_map=None):
"""Compute completness map for a given flux"""
#print(flux)
# wcs_celestial = wcs.celestial
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment