Commit 3160d9d4 authored by Médéric Boquien's avatar Médéric Boquien

Get rid of the parameters array. Rather, we generate parameters on the fly. This...

Get rid of the parameters array. Rather, we generate parameters on the fly. This is done using a new class, ParametersHandler, which handles the generation of parameters from a given 1D index. While at it, also do away with global variables. This allows processes to be spawned rather than forked. Even if spawning takes a little time, it solves some deadlocks with libraries that do not support being used on both sides of a forked process. Overall these modifications make startup faster (no need to compute the cache-cleaning parameters beforehand), and model generation is also a few percent faster when the number of models is large enough to compensate for the relatively slow spawning time.
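For readers skimming the diff below, here is a minimal, hypothetical sketch of what such a parameters handler can look like. It is not the actual pcigale.analysis_modules.utils.ParametersHandler; it only illustrates the interface the new code relies on (modules, size, from_index, index_module_changed), and its internals are assumptions.

import itertools

import numpy as np


class ParametersHandler(object):
    """Generate module parameters on the fly from a flat 1D model index
    (illustrative sketch only)."""

    def __init__(self, modules, params):
        # modules: list of module names; params: list of dictionaries, one
        # per module, mapping each parameter name to its list of values.
        self.modules = modules
        # Expand each module's {name: [values]} dictionary into the list of
        # all its single-valued combinations.
        self.parameters = [self._expand(dictionary) for dictionary in params]
        self.shape = tuple(len(block) for block in self.parameters)
        self.size = int(np.prod(self.shape))

    @staticmethod
    def _expand(dictionary):
        keys = list(dictionary.keys())
        values = [dictionary[key] if isinstance(dictionary[key], list)
                  else [dictionary[key]] for key in keys]
        return [dict(zip(keys, combination))
                for combination in itertools.product(*values)]

    def from_index(self, index):
        # Parameter dictionaries (one per module) of the model at `index`.
        indices = np.unravel_index(index, self.shape)
        return [block[i] for block, i in zip(self.parameters, indices)]

    def index_module_changed(self, index1, index2):
        # Index of the first module whose parameters differ between the two
        # models, so the SED cache can be cleared from that module onwards;
        # returns the number of modules if nothing changed.
        indices1 = np.unravel_index(index1, self.shape)
        indices2 = np.unravel_index(index2, self.shape)
        for module_idx, (i, j) in enumerate(zip(indices1, indices2)):
            if i != j:
                return module_idx
        return len(self.modules)

Built this way, memory usage no longer grows with the number of models: a worker is addressed by a single integer and reconstructs its parameter set on demand, which is what removes the need for the precomputed parameters array.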
parent e50ff0e1
......@@ -6,8 +6,12 @@
__version__ = "0.1-alpha"
import argparse
import multiprocessing as mp
import sys
from .session.configuration import Configuration
from .analysis_modules import get_module as get_analysis_module
from .analysis_modules.utils import ParametersHandler
def init(config):
......@@ -30,7 +34,10 @@ def check(config):
# TODO: Check if all the parameters that don't have default values are
# given for each module.
print("With this configuration, pcigale must compute {} "
"SEDs.".format(len(config.creation_modules_conf_array)))
"SEDs.".format(ParametersHandler(
config.configuration['creation_modules'],
config.configuration['creation_modules_params']
).size))
def run(config):
......@@ -38,7 +45,7 @@ def run(config):
data_file = config.configuration['data_file']
column_list = config.configuration['column_list']
creation_modules = config.configuration['creation_modules']
creation_modules_params = config.creation_modules_conf_array
creation_modules_params = config.configuration['creation_modules_params']
analysis_module = get_analysis_module(config.configuration[
'analysis_method'])
analysis_module_params = config.configuration['analysis_method_params']
......@@ -50,6 +57,16 @@ def run(config):
def main():
# We set the sub processes start method to spawn because it solves
# deadlocks when a library cannot handle being used on two sides of a
# forked process. This happens on modern Macs with the Accelerate library
# for instance. Unfortunately this only comes with python≥3.4. People using
# older versions should upgrade if they encounter deadlocks.
if sys.version_info[:2] >= (3, 4):
mp.set_start_method('spawn')
else:
print("Could not set the multiprocessing start method to spawn. If "
"you encounter a deadlock, please upgrade to Python≥3.4.")
parser = argparse.ArgumentParser()
......
# This is a dummy module to store the data that need to be shared between all
# processes. Note that they are read-only in subprocesses in the sense that the
# changes will not be propagated outside of the child processes.
......@@ -41,8 +41,7 @@ from .workers import sed as worker_sed
from .workers import init_sed as init_worker_sed
from .workers import init_analysis as init_worker_analysis
from .workers import analysis as worker_analysis
from ..utils import find_changed_parameters
import pcigale.analysis_modules.myglobals as gbl
from ..utils import ParametersHandler
# Tolerance threshold under which any flux or error is considered as 0.
TOLERANCE = 1e-12
......@@ -87,7 +86,7 @@ class PdfAnalysis(AnalysisModule):
])
def process(self, data_file, column_list, creation_modules,
creation_modules_params, parameters, cores):
creation_modules_params, config, cores):
"""Process with the psum analysis.
The analysis is done in two steps which can both run on multiple
......@@ -107,8 +106,8 @@ class PdfAnalysis(AnalysisModule):
the SEDs.
creation_modules_params: list of dictionaries
List of the parameter dictionaries for each module.
parameters: dictionary
Dictionary containing the parameters.
config: dictionary
Dictionary containing the configuration.
cores: integer
Number of cores to run the analysis on
......@@ -119,51 +118,37 @@ class PdfAnalysis(AnalysisModule):
# Rename the output directory if it exists
backup_dir(OUT_DIR)
# Get the parameters. They are stored in a shared module so that this
# can be accessed by subprocesses during the model generation and
# analysis. This avoids transmitting the exact same data all over again
gbl.analysed_variables = parameters["analysed_variables"]
gbl.creation_modules = creation_modules
gbl.creation_modules_params = creation_modules_params
gbl.save_best_sed = parameters["save_best_sed"].lower() == "true"
gbl.save_chi2 = parameters["save_chi2"].lower() == "true"
gbl.save_pdf = parameters["save_pdf"].lower() == "true"
gbl.n_models = len(creation_modules_params)
gbl.n_variables = len(gbl.analysed_variables)
# Initialise variables from input arguments.
analysed_variables = config["analysed_variables"]
n_variables = len(analysed_variables)
save = {key: config["save_{}".format(key)].lower() == "true"
for key in ["best_sed", "chi2", "pdf"]}
# The parameters handler allows us to retrieve the models parameters
# from a 1D index. This is useful in that we do not have to create
# a list of parameters as they are computed on-the-fly. It also has
# nice goodies such as finding the index of the first parameter to
# have changed between two indices or the number of models.
params = ParametersHandler(creation_modules, creation_modules_params)
n_params = params.size
# Get the needed filters in the pcigale database. We use an ordered
# dictionary because we need the keys to always be returned in the
# same order. We also put the filters in the shared modules as they
# are needed to compute the fluxes during the models generation.
with Database() as base:
gbl.filters = OrderedDict([(name, base.get_filter(name))
for name in column_list
if not name.endswith('_err')])
gbl.n_filters = len(gbl.filters)
filters = OrderedDict([(name, base.get_filter(name))
for name in column_list
if not name.endswith('_err')])
n_filters = len(filters)
# Read the observation table and complete it by adding error where
# none is provided and by adding the systematic deviation.
obs_table = complete_obs_table(read_table(data_file), column_list,
gbl.filters, TOLERANCE)
gbl.n_obs = len(obs_table)
filters, TOLERANCE)
n_obs = len(obs_table)
print("Computing the models fluxes...")
# First we get a dummy sed to obtain some information on the
# parameters, such as which ones are dependent on the normalisation
# factor of the fit.
# The SED warehouse is used to retrieve SED corresponding to some
# modules and parameters.
gbl.warehouse = SedWarehouse(cache_type=parameters["storage_type"])
sed = gbl.warehouse.get_sed(gbl.creation_modules,
creation_modules_params[0])
gbl.info_keys = list(sed.info.keys())
gbl.n_keys = len(gbl.info_keys)
gbl.mass_proportional_info = sed.mass_proportional_info
gbl.has_sfh = sed.sfh is not None
# Arrays where we store the data related to the models. For memory
# efficiency reasons, we use RawArrays that will be passed in argument
# to the pool. Each worker will fill a part of the RawArrays. It is
......@@ -171,73 +156,68 @@ class PdfAnalysis(AnalysisModule):
# not write on the same section.
# We put the shape in a tuple along with the RawArray because workers
# need to know the shape to create the numpy array from the RawArray.
model_redshifts = (RawArray(ctypes.c_double, gbl.n_models),
(gbl.n_models))
model_redshifts = (RawArray(ctypes.c_double, n_params),
(n_params))
model_fluxes = (RawArray(ctypes.c_double,
gbl.n_models * gbl.n_filters),
(gbl.n_models, gbl.n_filters))
n_params * n_filters),
(n_params, n_filters))
model_variables = (RawArray(ctypes.c_double,
gbl.n_models * gbl.n_variables),
(gbl.n_models, gbl.n_variables))
n_params * n_variables),
(n_params, n_variables))
# In order not to clog the warehouse memory with SEDs that will not be
# used again during the SED generation, we identify which parameter has
# changed, invalidating models with this parameter
changed_pars = find_changed_parameters(creation_modules_params)
# Compute the SED fluxes and ancillary data in parallel
initargs = (model_redshifts, model_fluxes, model_variables,
mp.Value('i', 0), time.time())
initargs = (params, filters, analysed_variables, model_redshifts,
model_fluxes, model_variables, time.time(),
mp.Value('i', 0))
if cores == 1: # Do not create a new process
init_worker_sed(*initargs)
for changed_par, idx in zip(changed_pars, range(gbl.n_models)):
worker_sed(changed_par, idx)
else:
for idx in range(n_params):
worker_sed(idx)
else: # Compute the models in parallel
with mp.Pool(processes=cores, initializer=init_worker_sed,
initargs=initargs) as pool:
pool.starmap(worker_sed, zip(changed_pars,
range(gbl.n_models)))
pool.map(worker_sed, range(n_params))
print('\nAnalysing models...')
# Analysis of each object in parallel.
initargs = (model_redshifts, model_fluxes, model_variables,
mp.Value('i', 0), time.time())
initargs = (params, filters, analysed_variables, model_redshifts,
model_fluxes, model_variables, time.time(),
mp.Value('i', 0), save, n_obs)
if cores == 1: # Do not create a new process
init_worker_analysis(*initargs)
items = []
for obs in obs_table:
items.append(worker_analysis(obs))
else:
else: # Analyse observations in parallel
with mp.Pool(processes=cores, initializer=init_worker_analysis,
initargs=initargs) as pool:
items = pool.starmap(worker_analysis, zip(obs_table))
# Local arrays in which to unpack the results of the analysis
analysed_averages = np.empty((gbl.n_obs, gbl.n_variables))
analysed_averages = np.empty((n_obs, n_variables))
analysed_std = np.empty_like(analysed_averages)
best_fluxes = np.empty((gbl.n_obs, gbl.n_filters))
best_parameters = [None] * gbl.n_obs
best_normalisation_factors = np.empty(gbl.n_obs)
best_chi2 = np.empty_like(best_normalisation_factors)
best_chi2_red = np.empty_like(best_normalisation_factors)
best_fluxes = np.empty((n_obs, n_filters))
best_parameters = [None] * n_obs
best_chi2 = np.empty(n_obs)
best_chi2_red = np.empty_like(best_chi2)
for item_idx, item in enumerate(items):
analysed_averages[item_idx, :] = item[0]
analysed_std[item_idx, :] = item[1]
best_fluxes[item_idx, :] = item[2]
best_parameters[item_idx] = item[3]
best_normalisation_factors[item_idx] = item[4]
best_chi2[item_idx] = item[5]
best_chi2_red[item_idx] = item[6]
best_chi2[item_idx] = item[4]
best_chi2_red[item_idx] = item[5]
print("\nSaving results...")
save_table_analysis(obs_table['id'], gbl.analysed_variables,
save_table_analysis(obs_table['id'], analysed_variables,
analysed_averages, analysed_std)
# Retrieve an arbitrary SED to obtain the list of output parameters
warehouse = SedWarehouse(cache_type=config["storage_type"])
sed = warehouse.get_sed(creation_modules, params.from_index(0))
save_table_best(obs_table['id'], best_chi2, best_chi2_red,
best_normalisation_factors, best_parameters,
best_fluxes, gbl.filters)
best_parameters, best_fluxes, filters, sed.info)
print("Run completed!")
......
......@@ -11,8 +11,6 @@ from astropy.table import Table, Column
import numpy as np
from scipy.stats import gaussian_kde
from scipy.linalg import LinAlgError
from ...warehouse import SedWarehouse
import pcigale.analysis_modules.myglobals as gbl
# Directory where the output files are stored
OUT_DIR = "out/"
......@@ -132,7 +130,7 @@ def save_pdf(obsid, analysed_variables, model_variables, likelihood):
def save_chi2(obsid, analysed_variables, model_variables, reduced_chi2):
"""Save the best reduced χ² versus the analysed variables
"""Save the best reduced Dz versus the analysed variables
Parameters
----------
......@@ -143,7 +141,7 @@ def save_chi2(obsid, analysed_variables, model_variables, reduced_chi2):
model_variables: 2D array
Analysed variables values for all models
reduced_chi2:
Reduced χ²
"""
for var_index, var_name in enumerate(analysed_variables):
......@@ -184,7 +182,7 @@ def save_table_analysis(obsid, analysed_variables, analysed_averages,
result_table.write(OUT_DIR + RESULT_FILE)
def save_table_best(obsid, chi2, chi2_red, norm, variables, fluxes, filters):
def save_table_best(obsid, chi2, chi2_red, variables, fluxes, filters, info_keys):
"""Save the values corresponding to the best fit
Parameters
......@@ -192,11 +190,9 @@ def save_table_best(obsid, chi2, chi2_red, norm, variables, fluxes, filters):
obsid: table column
Names of the objects
chi2: array
Best χ² for each object
chi2_red: array
Best reduced χ² for each object
norm: array
Normalisation factor for each object
Best reduced χ² for each object
variables: list
All variables corresponding to a SED
fluxes: 2D array
......@@ -210,14 +206,12 @@ def save_table_best(obsid, chi2, chi2_red, norm, variables, fluxes, filters):
best_model_table.add_column(Column(chi2, name="chi_square"))
best_model_table.add_column(Column(chi2_red, name="reduced_chi_square"))
for index, name in enumerate(gbl.info_keys):
for index, name in enumerate(info_keys):
column = Column([variable[index] for variable in variables], name=name)
if name in gbl.mass_proportional_info:
column *= norm
best_model_table.add_column(column)
for index, name in enumerate(filters):
column = Column(fluxes[:, index] * norm, name=name, unit='mJy')
column = Column(fluxes[:, index], name=name, unit='mJy')
best_model_table.add_column(column)
best_model_table.write(OUT_DIR + BEST_MODEL_FILE)
......
......@@ -10,20 +10,27 @@ import time
import numpy as np
from .utils import save_best_sed, save_pdf, save_chi2
import pcigale.analysis_modules.myglobals as gbl
from ...warehouse import SedWarehouse
# Probability threshold: models with a lower probability are excluded from
# the moments computation.
MIN_PROBABILITY = 1e-20
def init_sed(redshifts, fluxes, variables, n_computed, t_begin):
def init_sed(params, filters, analysed, redshifts, fluxes, variables,
t_begin, n_computed):
"""Initializer of the pool of processes. It is mostly used to convert
RawArrays into numpy arrays. The latter are defined as global variables to
be accessible from the workers.
Parameters
----------
params: ParametersHandler
Handles the parameters from a 1D index.
filters: OrderedDict
Contains filters to compute the fluxes.
analysed: list
Variable names to be analysed.
redshifts: RawArray and tuple containing the shape
Redshifts of individual models. Shared among workers.
fluxes: RawArray and tuple containing the shape
......@@ -37,7 +44,8 @@ def init_sed(redshifts, fluxes, variables, n_computed, t_begin):
"""
global gbl_model_redshifts, gbl_model_fluxes, gbl_model_variables
global gbl_n_computed, gbl_t_begin
global gbl_n_computed, gbl_t_begin, gbl_params, gbl_previous_idx
global gbl_filters, gbl_analysed_variables, gbl_warehouse
gbl_model_redshifts = np.ctypeslib.as_array(redshifts[0])
......@@ -50,69 +58,85 @@ def init_sed(redshifts, fluxes, variables, n_computed, t_begin):
gbl_n_computed = n_computed
gbl_t_begin = t_begin
gbl_params = params
def init_analysis(redshifts, fluxes, variables, n_computed, t_begin):
gbl_previous_idx = -1
gbl_filters = filters
gbl_analysed_variables = analysed
gbl_warehouse = SedWarehouse(cache_type="memory")
def init_analysis(params, filters, analysed, redshifts, fluxes, variables,
t_begin, n_computed, save, n_obs):
"""Initializer of the pool of processes. It is mostly used to convert
RawArrays into numpy arrays. The latter are defined as global variables to
be accessible from the workers.
Parameters
----------
redshifts: RawArray and tuple containing the shape
params: ParametersHandler
Handles the parameters from a 1D index.
filters: OrderedDict
Contains filters to compute the fluxes.
analysed: list
Variable names to be analysed
redshifts: RawArray and tuple containing the shape.
Redshifts of individual models. Shared among workers.
fluxes: RawArray
Fluxes of individual models. Shared among workers.
variables: RawArray and tuple containing the shape
Values of the analysed variables. Shared among workers.
t_begin: float
Time of the beginning of the computation.
n_computed: Value
Number of computed models. Shared among workers.
t_begin: float
Time of the beginning of the computation
save: dictionary
Contains booleans indicating whether we need to save some data related
to given models.
n_obs: int
Number of observations.
"""
init_sed(redshifts, fluxes, variables, n_computed, t_begin)
global gbl_redshifts, gbl_w_redshifts
init_sed(params, filters, analysed, redshifts, fluxes, variables,
t_begin, n_computed)
global gbl_redshifts, gbl_w_redshifts, gbl_save, gbl_n_obs
gbl_redshifts = np.unique(gbl_model_redshifts)
gbl_w_redshifts = {redshift: gbl_model_redshifts == redshift
for redshift in gbl_redshifts}
gbl_save = save
gbl_n_obs = n_obs
def sed(changed, idx):
"""Worker process to retrieve a SED and return the relevant data
def sed(idx):
"""Worker process to retrieve a SED and affect the relevant data to shared
RawArrays.
Parameters
----------
changed: array
Index of the module whose parameter has changed. This is necessary for
cache cleaning
idx: int
Index of the model to retrieve its parameters from the global variable
Returns
-------
The model fluxes in each band, the values of the analysed variables, the
redshift and all the related information. Note that we could retrieve the
values of the analysed variables afterwards, but this would be done in the
main process. We should benchmark this to see whether it makes any
significant difference, as returning only the sed.info would make things
cleaner here and no dirtier on the caller's side (we have to unpack the
list returned by starmap anyway).
"""
sed = gbl.warehouse.get_sed(gbl.creation_modules,
gbl.creation_modules_params[idx])
gbl.warehouse.partial_clear_cache(changed)
global gbl_previous_idx
if gbl_previous_idx > -1:
gbl_warehouse.partial_clear_cache(
gbl_params.index_module_changed(gbl_previous_idx, idx))
gbl_previous_idx = idx
sed = gbl_warehouse.get_sed(gbl_params.modules,
gbl_params.from_index(idx))
if 'age' in sed.info and sed.info['age'] > sed.info['universe.age']:
model_fluxes = -99. * np.ones(len(gbl.filters))
model_variables = -99. * np.ones(len(gbl.analysed_variables))
model_fluxes = -99. * np.ones(len(gbl_filters))
model_variables = -99. * np.ones(len(gbl_analysed_variables))
else:
model_fluxes = np.array([sed.compute_fnu(filter_.trans_table,
filter_.effective_wavelength)
for filter_ in gbl.filters.values()])
for filter_ in gbl_filters.values()])
model_variables = np.array([sed.info[name]
for name in gbl.analysed_variables])
for name in gbl_analysed_variables])
gbl_model_redshifts[idx] = sed.info['redshift']
gbl_model_fluxes[idx, :] = model_fluxes
......@@ -121,10 +145,10 @@ def sed(changed, idx):
with gbl_n_computed.get_lock():
gbl_n_computed.value += 1
n_computed = gbl_n_computed.value
if n_computed % 100 == 0 or n_computed == gbl.n_models:
if n_computed % 100 == 0 or n_computed == gbl_params.size:
t_elapsed = time.time() - gbl_t_begin
print("{}/{} models computed in {} seconds ({} models/s)".
format(n_computed, gbl.n_models,
format(n_computed, gbl_params.size,
np.around(t_elapsed, decimals=1),
np.around(n_computed/t_elapsed, decimals=1)),
end="\r")
......@@ -150,8 +174,8 @@ def analysis(obs):
model_fluxes = np.ma.masked_less(gbl_model_fluxes[w[0], :], -90.)
model_variables = gbl_model_variables[w[0], :]
obs_fluxes = np.array([obs[name] for name in gbl.filters])
obs_errors = np.array([obs[name + "_err"] for name in gbl.filters])
obs_fluxes = np.array([obs[name] for name in gbl_filters])
obs_errors = np.array([obs[name + "_err"] for name in gbl_filters])
# Some observations may not have flux value in some filters, in
# that case the user is asked to put -9999 as value. We mask these
......@@ -187,62 +211,71 @@ def analysis(obs):
# We re-normalise the likelihood.
likelihood /= np.sum(likelihood)
# We take the mass-dependent variable list from the last computed
# sed.
for index, variable in enumerate(gbl.analysed_variables):
if variable in gbl.mass_proportional_info:
model_variables[:, index] *= norm_facts
##################################################################
# Variable analysis #
##################################################################
# We define the best fitting model for each observation as the one
# with the least χ².
best_index = chi2_.argmin()
# We compute once again the best sed to obtain its info
global gbl_previous_idx
if gbl_previous_idx > -1:
gbl_warehouse.partial_clear_cache(
gbl_params.index_module_changed(gbl_previous_idx,
w[0][best_index]))
gbl_previous_idx = w[0][best_index]
sed = gbl_warehouse.get_sed(gbl_params.modules,
gbl_params.from_index(w[0][best_index]))
# We correct the mass-dependent parameters
for key in sed.mass_proportional_info:
sed.info[key] *= norm_facts[best_index]
# We compute the weighted average and standard deviation using the
# likelihood as weight. We first build the weight array by
# expanding the likelihood along a new axis corresponding to the
# analysed variable.
weights = likelihood[:, np.newaxis].repeat(len(gbl.analysed_variables),
weights = likelihood[:, np.newaxis].repeat(len(gbl_analysed_variables),
axis=1)
# Analysed variables average and standard deviation arrays.
analysed_averages = np.ma.average(model_variables, axis=0,
weights=weights)
analysed_std = np.ma.sqrt(np.ma.average(
(model_variables - analysed_averages[np.newaxis, :])**2, axis=0,
weights=weights))
# We define the best fitting model for each observation as the one
# with the least χ².
best_index = chi2_.argmin()
# We compute once again the best sed to obtain its info
sed = gbl.warehouse.get_sed(gbl.creation_modules,
gbl.creation_modules_params[w[0][best_index]])
# We correct the mass-dependent parameters
for index, variable in enumerate(gbl_analysed_variables):
if variable in sed.mass_proportional_info:
analysed_averages[index] *= norm_facts[best_index]
analysed_std[index] *= norm_facts[best_index]
if gbl.save_best_sed:
if gbl_save['best_sed']:
save_best_sed(obs['id'], sed, norm_facts[best_index])
if gbl.save_chi2:
save_chi2(obs['id'], gbl.analysed_variables, model_variables, chi2_ /
if gbl_save['chi2']:
save_chi2(obs['id'], gbl_analysed_variables, model_variables, chi2_ /
obs_fluxes.count())
if gbl.save_pdf:
save_pdf(obs['id'], gbl.analysed_variables, model_variables,
if gbl_save['pdf']:
save_pdf(obs['id'], gbl_analysed_variables, model_variables,
likelihood)
with gbl_n_computed.get_lock():
gbl_n_computed.value += 1
n_computed = gbl_n_computed.value
if n_computed % 100 == 0 or n_computed == gbl.n_obs:
if n_computed % 100 == 0 or n_computed == gbl_n_obs:
t_elapsed = time.time() - gbl_t_begin
print("{}/{} objects analysed in {} seconds ({} objects/s)".
format(n_computed, gbl.n_obs, np.around(t_elapsed, decimals=1),
format(n_computed, gbl_n_obs, np.around(t_elapsed, decimals=1),
np.around(n_computed/t_elapsed, decimals=1)),
end="\r")
return (analysed_averages,
analysed_std,
np.array(model_fluxes[best_index, :], copy=True),
np.array(norm_model_fluxes[best_index, :], copy=True),
np.array(list(sed.info.values()), copy=True),
norm_facts[best_index],
chi2_[best_index],
chi2_[best_index] / obs_fluxes.count())
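A closing note on the sharing pattern used throughout this commit: the model arrays are allocated as RawArrays paired with their shape, handed to the pool initializer, and each spawned worker wraps them into a numpy view before filling its own rows. The following self-contained sketch shows that pattern under stated assumptions; init_worker, compute and the array sizes are illustrative names, not pcigale identifiers.

import ctypes
import multiprocessing as mp

import numpy as np


def init_worker(fluxes):
    # `fluxes` is a (RawArray, shape) tuple; build a numpy view on the shared
    # buffer so each worker can fill its slice without copying data back.
    global gbl_fluxes
    gbl_fluxes = np.ctypeslib.as_array(fluxes[0]).reshape(fluxes[1])


def compute(idx):
    # Each worker writes a distinct row, so no lock is needed.
    gbl_fluxes[idx, :] = idx


if __name__ == '__main__':
    mp.set_start_method('spawn')  # as in pcigale's main() on Python >= 3.4
    n_models, n_filters = 8, 3    # illustrative sizes
    fluxes = (mp.RawArray(ctypes.c_double, n_models * n_filters),
              (n_models, n_filters))
    with mp.Pool(processes=2, initializer=init_worker,
                 initargs=(fluxes,)) as pool:
        pool.map(compute, range(n_models))
    # The parent process sees the rows written by the spawned workers.
    print(np.ctypeslib.as_array(fluxes[0]).reshape(fluxes[1]))

Passing the buffers through the initializer rather than through module-level globals is what makes the spawn start method viable: a spawned worker does not inherit the parent's globals, so everything it needs has to arrive via initargs or the mapped arguments.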