Commit 31c30672 authored by Médéric Boquien

The initial implementation of the parallel analysis ended up grinding pcigale to a halt. This was due to the numerous array transfers between the main process and the subprocesses. To solve this problem, we now share arrays through a module. This has the advantage
that, thanks to copy-on-write, we almost never need to actually copy the arrays. Now all the subprocesses run at full speed. Quite a few changes for a patch that only started with the activation of cache clearing.
parent 2ea03335
# This is a dummy module to store the data that need to be shared between all
# processes. In practice, once set, these data should never change. Note that we
# cannot initialise those variables in subprocesses.
# Variables known at init
warehouse = []
creation_modules = []
creation_modules_params = []
analysed_variables = []
save_best_sed = []
save_chi2 = []
save_pdf = []
filters = []
# Variables that need to be initialised after having computed the models
model_fluxes = []
model_variables = []
model_redshifts = []
model_info = []
mass_proportional_info = []
has_sfh = []
info_keys = []
redshifts = []
w_redshifts = []
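As an aside, here is a minimal, self-contained sketch of the sharing pattern this module enables (illustrative only, not part of the commit, and assuming the POSIX fork start method): data bound to module-level names in the parent before the pool is created are inherited by the forked workers through copy-on-write, so nothing is pickled or transferred as long as the workers only read.

import multiprocessing as mp
import numpy as np

# Module-level "shared" storage, playing the role of myglobals.
model_fluxes = None

def flux_sum(idx):
    # Read-only access: the pages holding the array are never copied.
    return model_fluxes[idx, :].sum()

if __name__ == "__main__":
    model_fluxes = np.random.rand(1000, 10)   # set once, in the parent
    with mp.Pool(processes=4) as pool:        # forked workers inherit the array
        totals = pool.map(flux_sum, range(1000))
    print(len(totals))

Writing to such an array from a worker would copy the touched pages, which is why the header above stresses that, once set, these data should never change.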
@@ -24,19 +24,18 @@ reduced χ²) is given for each observation.
"""
import os
import numpy as np
from collections import OrderedDict
from datetime import datetime
from itertools import repeat
import multiprocessing as mp
from ...utils import read_table
from .. import AnalysisModule, complete_obs_table
from .utils import save_table_analysis, save_table_best
from .utils import save_table_analysis, save_table_best, backup_dir
from ...warehouse import SedWarehouse
from ...data import Database
from .workers import sed as worker_sed
from .workers import analysis as worker_analysis
from ..utils import find_changed_parameters
import pcigale.analysis_modules.myglobals as gbl
# Tolerance threshold under which any flux or error is considered as 0.
TOLERANCE = 1e-12
@@ -84,9 +83,11 @@ class PdfAnalysis(AnalysisModule):
creation_modules_params, parameters, cores):
"""Process with the psum analysis.
The analysis is done in two nested loops: over each observation and
over each theoretical SED. We first loop over the SEDs to limit the
number of times the SEDs are created.
The analysis is done in two steps, both of which can run on multiple
processors. The first step is to compute all the fluxes
associated with each model as well as ancillary data such as the SED
information. The second step is to carry out the analysis of each
object, considering all models at once.
Parameters
----------
@@ -106,176 +107,144 @@ class PdfAnalysis(AnalysisModule):
"""
print("Initialising the analysis module... ")
# Rename the output directory if it exists
if os.path.exists(OUT_DIR):
new_name = datetime.now().strftime("%Y%m%d%H%M") + "_" + OUT_DIR
os.rename(OUT_DIR, new_name)
print("The existing {} directory was renamed to {}".format(
OUT_DIR,
new_name
))
os.mkdir(OUT_DIR)
# Get the parameters
analysed_variables = parameters["analysed_variables"]
save = (parameters["save_best_sed"].lower() == "true",
parameters["save_chi2"].lower() == "true",
parameters["save_pdf"].lower() == "true")
backup_dir(OUT_DIR)
# Get the parameters. They are stored in a shared module so that they
# can be accessed by subprocesses during the model generation and the
# analysis. This avoids transmitting the exact same data over and over.
gbl.analysed_variables = parameters["analysed_variables"]
gbl.creation_modules = creation_modules
gbl.creation_modules_params = creation_modules_params
gbl.save_best_sed = parameters["save_best_sed"].lower() == "true"
gbl.save_chi2 = parameters["save_chi2"].lower() == "true"
gbl.save_pdf = parameters["save_pdf"].lower() == "true"
# Get the needed filters in the pcigale database. We use an ordered
# dictionary because we need the keys to always be returned in the
# same order.
# same order. We also put the filters in the shared module as they
# are needed to compute the fluxes during the model generation.
with Database() as base:
filters = OrderedDict([(name, base.get_filter(name))
for name in column_list
if not name.endswith('_err')])
gbl.filters = OrderedDict([(name, base.get_filter(name))
for name in column_list
if not name.endswith('_err')])
# Read the observation table and complete it by adding error where
# none is provided and by adding the systematic deviation.
obs_table = complete_obs_table(
read_table(data_file),
column_list,
filters,
TOLERANCE
)
##################################################################
# Model computation #
##################################################################
obs_table = complete_obs_table(read_table(data_file), column_list,
gbl.filters, TOLERANCE)
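complete_obs_table is defined elsewhere in the package and its code is not part of this diff. The sketch below only illustrates the kind of completion the comment describes (a default error where none is given and a tolerance floor) and is not the actual implementation; the helper name, the default error fraction and the handling of the systematic deviation are assumptions.

import numpy as np
from astropy.table import Column

def complete_obs_table_sketch(obs_table, column_list, filters, tolerance,
                              default_error=0.1):
    # Hypothetical stand-in for the real complete_obs_table.
    for name in filters:
        # Fluxes below the tolerance threshold are treated as zero.
        obs_table[name][np.abs(obs_table[name]) < tolerance] = 0.
        err_name = name + "_err"
        if err_name not in column_list:
            # No error column provided: assume a fixed relative uncertainty.
            obs_table.add_column(Column(default_error * obs_table[name],
                                        name=err_name))
    # The real function also folds a systematic deviation into the errors.
    return obs_table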
print("Computing the models fluxes...")
# First, we compute for all the possible theoretical models (one for
# each parameter set in sed_module_parameters) the fluxes in all the
# filters. These fluxes are stored in:
# model_fluxes:
# - axis 0: model index
# - axis 1: filter index
# We use a numpy masked array to mask the fluxes of models that would
# be older than the age of the Universe at the considered redshift.
# The values for the analysed variables are stored in:
# model_variables:
# - axis 0: the model index in sed_module_params
# - axis 1: the variable index in analysed_variables
# For convenience, the redshift of each model is stored in
# model_redshift.
model_fluxes = np.ma.empty((len(creation_modules_params),
len(filters)))
model_variables = np.ma.empty((len(creation_modules_params),
len(analysed_variables)))
model_redshift = np.empty(len(creation_modules_params))
# We keep the information (i.e. the content of the sed.info
# dictionary) for each model.
model_info = [None] * len(creation_modules_params)
# First we get a dummy sed to obtain some information on the
# parameters, such as which ones are dependent on the normalisation
# factor of the fit.
# The SED warehouse is used to retrieve SED corresponding to some
# modules and parameters.
with SedWarehouse(cache_type=parameters["storage_type"]) as warehouse,\
mp.Pool(processes=cores) as pool:
# First we get a dummy sed to obtain some information on the
# parameters, such as which ones are dependent on the normalisation
# factor of the fit.
sed = warehouse.get_sed(creation_modules,
creation_modules_params[0])
items = pool.starmap(worker_sed, zip(repeat(warehouse),
repeat(creation_modules),
creation_modules_params,
repeat(analysed_variables),
repeat(filters)))
pool.close()
pool.join()
for idx_item, item in enumerate(items):
model_fluxes[idx_item, :] = item[0]
model_variables[idx_item, :] = item[1]
model_redshift[idx_item] = item[2]
model_info[idx_item] = item[3]
del items
# Mask the invalid fluxes
model_fluxes = np.ma.masked_less(model_fluxes, -90)
##################################################################
# Observations to models comparison #
##################################################################
# We compute the χ² only for models with the closest redshift. We
# extract model fluxes and information into arrays dedicated to a
# given observation.
# This is a tricky part here. The data arrays we have computed are for
# all redshifts. However, for memory efficiency concerns, we want to
# transmit to the worker arrays containing only data corresponding to
# the redshift of the observed object. The idea here is that we will
# construct these arrays thanks to generators. The basic algorithm is
# the following:
# 1) We compute the set of unique redshifts (as we do not have access
# to the configuration file)
# 2) We build a dictionary containing the indices of the models at a
# given redshift
# 3) We find for each observation, which is the closest redshift
# 4) We build the generators slicing the input arrays with the indices
# corresponding to the observation redshift
redshifts = np.unique(model_redshift)
w_redshifts = {redshift: model_redshift == redshift
for redshift in redshifts}
closest_redshifts = [redshifts[np.abs(obs_redshift -
redshifts).argmin()]
for obs_redshift in obs_table['redshift']]
model_fluxes_obs = (model_fluxes[w_redshifts[redshift], :]
for redshift in closest_redshifts)
model_info_obs = (np.array(model_info)[w_redshifts[redshift]]
for redshift in closest_redshifts)
model_variables_obs = (model_variables[w_redshifts[redshift], :]
for redshift in closest_redshifts)
gbl.warehouse = SedWarehouse(cache_type=parameters["storage_type"])
sed = gbl.warehouse.get_sed(gbl.creation_modules,
gbl.creation_modules_params[0])
gbl.info_keys = list(sed.info.keys())
gbl.mass_proportional_info = sed.mass_proportional_info
gbl.has_sfh = sed.sfh is not None
# Then, for all possible theoretical models (one for each parameter set
# in creation_module_parameters), we compute 1) the fluxes in all the
# filters, 2) the values of the model variables, 3) the redshift of the
# models (this is important for the analysis to easily identify which
# models correspond to a given redshift), and 4) the sed.info values
# (without the keys to save space; this is possible because we are
# using an OrderedDict so it is easy to retrieve the value for a known
# key using gbl.info_keys). Note that we could compute the last two
# elements in the main process but we would lose time doing so. Not
# necessarily much though. TODO: benchmark this.
# In order not to clog the warehouse memory with SEDs that will not be
# used again during the SED generation, we identify which parameter has
# changed, invalidating the cached models built with this parameter.
changed_pars = find_changed_parameters(creation_modules_params)
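find_changed_parameters comes from ..utils and its code is not shown in this diff. A plausible sketch of the idea only (an assumption, not the actual implementation): each parameter set is compared with the following one, so a worker knows which per-module parameters will not be reused and whose cached partial SEDs can be dropped from the warehouse.

def find_changed_parameters_sketch(params_list):
    # Hypothetical stand-in: for each model, list the per-module parameter
    # dictionaries that differ from those of the next model.
    changed = []
    for current, following in zip(params_list, params_list[1:]):
        changed.append([par for par, nxt in zip(current, following)
                        if par != nxt])
    # After the last model nothing will be reused any more.
    changed.append(list(params_list[-1]))
    return changed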
# Compute the SED fluxes and ancillary data in parallel
with mp.Pool(processes=cores) as pool:
items = pool.starmap(worker_sed, zip(creation_modules_params,
changed_pars))
# We create the arrays to store the model fluxes and ancillary data.
# They are stored in a shared module so they are easily accessible by
# the processes that will carry out the analysis, without requiring any
# data transfer that would grind pcigale to a halt. We use a numpy
# masked array to mask the fluxes of models that would be older than
# the age of the Universe at the considered redshift.
gbl.model_fluxes = np.ma.empty((len(creation_modules_params),
len(gbl.filters)))
gbl.model_variables = np.ma.empty((len(creation_modules_params),
len(gbl.analysed_variables)))
gbl.model_redshifts = np.empty(len(creation_modules_params))
gbl.model_info = [None] * len(creation_modules_params)
# Unpack the computed data into their respective arrays.
for idx_item, item in enumerate(items):
gbl.model_fluxes[idx_item, :] = item[0]
gbl.model_variables[idx_item, :] = item[1]
gbl.model_redshifts[idx_item] = item[2]
gbl.model_info[idx_item] = item[3]
# We transform model_info into an array as it makes it easier to
# select only the models at the right redshift
gbl.model_info = np.array(gbl.model_info)
print('Analysing models...')
# Mask the invalid fluxes
gbl.model_fluxes = np.ma.masked_less(gbl.model_fluxes, -90)
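For reference, a one-line illustration of the sentinel masking used above (made-up values): np.ma.masked_less masks every element strictly below the threshold, so the -99 placeholders set in the sed worker for models older than the Universe are excluded from the fits.

import numpy as np
fluxes = np.array([1.3, -99., 2.1])
masked = np.ma.masked_less(fluxes, -90)   # masked_array([1.3, --, 2.1])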
# To make it easy for subprocesses to identify which parts of the
# various arrays correspond to a given redshift, we 1) make a list of
# unique model redshifts, and 2) build a dictionary indicating the indices
# corresponding to a given redshift. These two objects are assigned to
# the shared module.
gbl.redshifts = np.unique(gbl.model_redshifts)
gbl.w_redshifts = {redshift: gbl.model_redshifts == redshift
for redshift in gbl.redshifts}
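A small, made-up illustration of how these two shared objects are meant to be used by the analysis workers:

import numpy as np

model_redshifts = np.array([0., 0., 0.5, 0.5, 1.])
redshifts = np.unique(model_redshifts)                           # [0. , 0.5, 1. ]
w_redshifts = {z: model_redshifts == z for z in redshifts}

# For one observed object: pick the closest model redshift and the matching rows.
obs_redshift = 0.48
closest = redshifts[np.abs(obs_redshift - redshifts).argmin()]   # 0.5
indices = np.where(w_redshifts[closest])[0]                      # [2, 3]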
# Analysis of each object in parallel. All the model data are
# transmitted through a shared module to avoid memory copies that would
# grind pcigale to a halt.
with mp.Pool(processes=cores) as pool:
items = pool.starmap(worker_analysis,
zip(obs_table,
model_fluxes_obs,
model_variables_obs,
model_info_obs,
repeat(filters),
repeat(sed),
repeat(analysed_variables),
repeat(creation_modules),
repeat(creation_modules_params),
repeat(save)))
pool.close()
pool.join()
analysed_averages = np.empty((len(items), len(analysed_variables)))
items = pool.starmap(worker_analysis, zip(obs_table))
# Local arrays in which to unpack the results of the analysis
analysed_averages = np.empty((len(items), len(gbl.analysed_variables)))
analysed_std = np.empty_like(analysed_averages)
chi2_ = np.empty(len(obs_table))
chi2_red = np.empty_like(chi2_)
normalisation_factors = np.empty_like(chi2_)
variables = [None] * len(items)
fluxes = np.empty((len(items), len(filters)))
best_fluxes = np.empty((len(items), len(gbl.filters)))
best_parameters = [None] * len(items)
best_normalisation_factors = np.empty(len(obs_table))
best_chi2 = np.empty_like(best_normalisation_factors)
best_chi2_red = np.empty_like(best_normalisation_factors)
for item_idx, item in enumerate(items):
analysed_averages[item_idx, :] = item[0]
analysed_std[item_idx, :] = item[1]
chi2_[item_idx] = item[2]
chi2_red[item_idx] = item[3]
normalisation_factors[item_idx] = item[4]
variables[item_idx] = item[5]
fluxes[item_idx, :] = item[6]
best_fluxes[item_idx, :] = item[2]
best_parameters[item_idx] = item[3]
best_normalisation_factors[item_idx] = item[4]
best_chi2[item_idx] = item[5]
best_chi2_red[item_idx] = item[6]
del items
print("Saving results...")
save_table_analysis(obs_table['id'], analysed_variables,
save_table_analysis(obs_table['id'], gbl.analysed_variables,
analysed_averages, analysed_std)
save_table_best(obs_table['id'], chi2_, chi2_red,
normalisation_factors, variables, fluxes, filters, sed)
save_table_best(obs_table['id'], best_chi2, best_chi2_red,
best_normalisation_factors, best_parameters,
best_fluxes, gbl.filters)
print("Run completed!")
# AnalysisModule to be returned by get_module
Module = PdfAnalysis
@@ -5,11 +5,14 @@
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
# Author: Yannick Roehlly & Médéric Boquien
from datetime import datetime
import os
from astropy.table import Table, Column
import numpy as np
from scipy.stats import gaussian_kde
from scipy.linalg import LinAlgError
from ...warehouse import SedWarehouse
import pcigale.analysis_modules.myglobals as gbl
# Directory where the output files are stored
OUT_DIR = "out/"
@@ -185,8 +188,7 @@ def save_table_analysis(obsid, analysed_variables, analysed_averages,
result_table.write(OUT_DIR + RESULT_FILE)
def save_table_best(obsid, chi2, chi2_red, norm,
variables, fluxes, filters, sed):
def save_table_best(obsid, chi2, chi2_red, norm, variables, fluxes, filters):
"""Save the values corresponding to the best fit
Parameters
@@ -205,21 +207,16 @@ def save_table_best(obsid, chi2, chi2_red, norm,
Fluxes in all bands for each object
filters: list
Filters used to compute the fluxes
sed: SED object
Used to identify what to output in the table
"""
best_model_table = Table()
best_model_table.add_column(Column(obsid.data, name="observation_id"))
best_model_table.add_column(Column(chi2, name="chi_square"))
best_model_table.add_column(Column(chi2_red, name="reduced_chi_square"))
if sed.sfh is not None:
best_model_table.add_column(Column(norm, name="galaxy_mass",
unit="Msun"))
for index, name in enumerate(sed.info.keys()):
for index, name in enumerate(gbl.info_keys):
column = Column([variable[index] for variable in variables], name=name)
if name in sed.mass_proportional_info:
if name in gbl.mass_proportional_info:
column *= norm
best_model_table.add_column(column)
@@ -228,3 +225,14 @@ def save_table_best(obsid, chi2, chi2_red, norm,
best_model_table.add_column(column)
best_model_table.write(OUT_DIR + BEST_MODEL_FILE)
def backup_dir(directory):
"""Rename any pre-existing directory with a timestamp prefix and create a fresh, empty one."""
if os.path.exists(directory):
new_name = datetime.now().strftime("%Y%m%d%H%M") + "_" + directory
os.rename(directory, new_name)
print("The existing {} directory was renamed to {}".format(
directory,
new_name
))
os.mkdir(directory)
@@ -7,30 +7,21 @@
import numpy as np
from .utils import save_best_sed, save_pdf, save_chi2
import pcigale.analysis_modules.myglobals as gbl
# Probability threshold: models with a lower probability are excluded from
# the moments computation.
MIN_PROBABILITY = 1e-20
def sed(warehouse, creation_modules, model_params, analysed_variables,
filters):
def sed(model_params, changed):
"""Worker process to retrieve a SED and return the relevant data
Parameters
----------
warehouse: SedWarehouse object
Used to retrieve a SED. Ideally a different warehouse should be used
for each forked process but it does not seem to cause any problem so
far
creation_modules: list
List of creation modules to build the SED
model_params: list
Parameters of the creation modules
analysed_variables: list
Names of the analysed variables
filters: list
Filters to compute the SED fluxes
Returns
-------
@@ -43,57 +34,46 @@ def sed(warehouse, creation_modules, model_params, analysed_variables,
the list returned by starmap anyway.
"""
sed = warehouse.get_sed(creation_modules, model_params)
sed = gbl.warehouse.get_sed(gbl.creation_modules, model_params)
gbl.warehouse.partial_clear_cache(changed)
if 'age' in sed.info and sed.info['age'] > sed.info['universe.age']:
model_fluxes = -99. * np.ones(len(filters))
model_variables = -99. * np.ones(len(analysed_variables))
model_fluxes = -99. * np.ones(len(gbl.filters))
model_variables = -99. * np.ones(len(gbl.analysed_variables))
else:
model_fluxes = np.array([sed.compute_fnu(filter_.trans_table,
filter_.effective_wavelength)
for filter_ in filters.values()])
for filter_ in gbl.filters.values()])
model_variables = np.array([sed.info[name]
for name in analysed_variables])
for name in gbl.analysed_variables])
redshift = sed.info['redshift']
info = sed.info.values()
return model_fluxes, model_variables, redshift, info
def analysis(obs, model_fluxes, model_variables, info, filters, sed,
analysed_variables, creation_modules, creation_modules_params,
save):
def analysis(obs):
"""Worker process to analyse the PDF and estimate parameters values
Parameters
----------
obs: row
Input data for an individual object
model_fluxes: 2D array
Fluxes for each model and for each filter
model_variables: 2D array
Variables values for each model
info: list
sed.info for each model
filters: list
Filters to compute the fluxes
sed: SED object
Used to retrieve which parameters are normalisation dependent
analysed_variables: list
Names of analysed variables
creation_modules: list
Creation modules named to recreate the best SED
creation_modules_params: list
Creation modules parameters to recreate the best SED
save: set
Booleans indicating whether to save the best SED, best χ² and PDF
Returns
-------
The analysed parameters (values+errors), best raw and reduced χ², best
normalisation factor, info of the best SED, fluxes of the best SED
"""
obs_fluxes = np.array([obs[name] for name in filters])
obs_errors = np.array([obs[name + "_err"] for name in filters])
w = np.where(gbl.w_redshifts[gbl.redshifts[np.abs(obs['redshift'] -
gbl.redshifts).argmin()]])
model_fluxes = gbl.model_fluxes[w[0], :]
model_variables = gbl.model_variables[w[0], :]
obs_fluxes = np.array([obs[name] for name in gbl.filters])
obs_errors = np.array([obs[name + "_err"] for name in gbl.filters])
# Some observations may not have a flux value in some filters; in
# that case the user is asked to put -9999 as the value. We mask these
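The χ² minimisation and the likelihood computation fall outside the hunks displayed here. As a reminder of the usual approach with a free linear normalisation factor (a generic sketch, not necessarily the exact code of this module), the factor minimising χ² has a closed form and the likelihood later used as a PDF weight is exp(-χ²/2):

import numpy as np

def chi2_sketch(obs_fluxes, obs_errors, model_fluxes):
    # Generic illustration: obs_* have shape (n_filters,), model_fluxes has
    # shape (n_models, n_filters); the degrees-of-freedom convention is assumed.
    norm_facts = (np.sum(obs_fluxes * model_fluxes / obs_errors ** 2, axis=1) /
                  np.sum(model_fluxes ** 2 / obs_errors ** 2, axis=1))
    chi2_ = np.sum(((obs_fluxes - norm_facts[:, np.newaxis] * model_fluxes) /
                    obs_errors) ** 2, axis=1)
    chi2_red = chi2_ / (obs_fluxes.size - 1)
    likelihood = np.exp(-chi2_ / 2.)
    return norm_facts, chi2_, chi2_red, likelihood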
@@ -136,16 +116,10 @@ def analysis(obs, model_fluxes, model_variables, info, filters, sed,
# We take the mass-dependent variable list from the last computed
# sed.
for index, variable in enumerate(analysed_variables):
if variable in sed.mass_proportional_info:
for index, variable in enumerate(gbl.analysed_variables):
if variable in gbl.mass_proportional_info:
model_variables[:, index] *= norm_facts
# We also add the galaxy mass to the analysed variables if relevant
if sed.sfh is not None:
analysed_variables.insert(0, "galaxy_mass")
model_variables = np.dstack((norm_facts,
model_variables))
##################################################################
# Variable analysis #
##################################################################
@@ -154,7 +128,8 @@ def analysis(obs, model_fluxes, model_variables, info, filters, sed,
# likelihood as weight. We first build the weight array by
# expanding the likelihood along a new axis corresponding to the
# analysed variable.
weights = likelihood[:, np.newaxis].repeat(len(analysed_variables), axis=1)
weights = likelihood[:, np.newaxis].repeat(len(gbl.analysed_variables),
axis=1)
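A tiny, made-up example of this expansion: the per-model likelihood is repeated once per analysed variable so that it can be passed directly as the weights argument of np.ma.average.

import numpy as np

likelihood = np.array([0.1, 0.7, 0.2])                       # one weight per model
model_vars = np.ma.array([[1., 10.], [2., 20.], [3., 30.]])
weights = likelihood[:, np.newaxis].repeat(model_vars.shape[1], axis=1)
means = np.ma.average(model_vars, axis=0, weights=weights)   # [2.1, 21.0]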
# Analysed variables average and standard deviation arrays.
analysed_averages = np.ma.average(model_variables, axis=0,
@@ -168,19 +143,20 @@ def analysis(obs, model_fluxes, model_variables, info, filters, sed,
# with the least χ².
best_index = chi2_.argmin()
if save[0]:
save_best_sed(obs['id'], creation_modules,
creation_modules_params[best_index],
if gbl.save_best_sed:
save_best_sed(obs['id'], gbl.creation_modules,
gbl.creation_modules_params[best_index],
norm_facts[best_index])
if save[1]:
save_chi2(obs['id'], analysed_variables, model_variables, chi2_red)
if save[2]:
save_pdf(obs['id'], analysed_variables, model_variables, likelihood)
if gbl.save_chi2:
save_chi2(obs['id'], gbl.analysed_variables, model_variables, chi2_red)
if gbl.save_pdf:
save_pdf(obs['id'], gbl.analysed_variables, model_variables,
likelihood)
return (analysed_averages,
analysed_std,
chi2_[best_index],
chi2_red[best_index],
np.array(model_fluxes[best_index, :]), # do NOT remove np.array()
list(gbl.model_info[w][best_index]),
norm_facts[best_index],
list(info[best_index]),
model_fluxes[best_index, :])
chi2_[best_index],
chi2_red[best_index])
@@ -24,9 +24,9 @@ from astropy.table import Table
from . import AnalysisModule
from ..warehouse import SedWarehouse
from ..data import Database
from .utils import find_changed_parameters
def _worker_sed(warehouse, filters, modules, parameters):
def _worker_sed(warehouse, filters, modules, parameters, changed):
"""Internal function to parallelize the computation of fluxes.
Parameters
@@ -45,6 +45,7 @@ def _worker_sed(warehouse, filters, modules, parameters):
"""
sed = warehouse.get_sed(modules, parameters)
warehouse.partial_clear_cache(changed)
row = []
@@ -147,11 +148,13 @@ class SaveFluxes(AnalysisModule):
# Parallel computation of the fluxes
with SedWarehouse(cache_type=parameters["storage_type"]) as warehouse,\
mp.Pool(processes=cores) as pool:
changed_pars = find_changed_parameters(creation_modules_params)
out_rows = pool.starmap(_worker_sed,
zip(repeat(warehouse),
repeat(filter_list),
repeat(creation_modules),
creation_modules_params))