Commit 3a44b65a authored by Médéric Boquien's avatar Médéric Boquien

Make pdf_analysis parallel. This required important changes. Functions saving...

Make pdf_analysis parallel. This required important changes: the functions saving data have been moved to the utils.py file, and the workers for the parallel processes have been put in workers.py. To make it easy to eliminate models incompatible with the age of the universe, the latter is now indicated in sed.info for easy access; ditto for the redshift.
parent 899b0d44
......@@ -42,9 +42,11 @@ def run(config):
analysis_module = get_analysis_module(config.configuration[
'analysis_method'])
analysis_module_params = config.configuration['analysis_method_params']
cores = config.configuration['cores']
analysis_module.process(data_file, column_list, creation_modules,
creation_modules_params, analysis_module_params)
creation_modules_params, analysis_module_params,
cores)
def main():
......
# -*- coding: utf-8 -*-
# Copyright (C) 2013 Centre de données Astrophysiques de Marseille
# Copyright (C) 2013-2014 Institute of Astronomy
# Copyright (C) 2014 Yannick Roehlly <yannick@iaora.eu>
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
# Author: Yannick Roehlly
# Author: Yannick Roehlly & Médéric Boquien
from astropy.table import Table, Column
import numpy as np
from scipy.stats import gaussian_kde
from scipy.linalg import LinAlgError
from ...warehouse import SedWarehouse
# Directory where the output files are stored
OUT_DIR = "out/"
# Number of points in the PDF
PDF_NB_POINTS = 1000
# Name of the file containing the analysis results
RESULT_FILE = "analysis_results.fits"
# Name of the file containing the best models information
BEST_MODEL_FILE = "best_models.fits"
def gen_pdf(values, probabilities, grid):
......@@ -61,3 +73,158 @@ def gen_pdf(values, probabilities, grid):
result = None
return result
def save_best_sed(obsid, creation_modules, params, norm):
    """Write the best-fitting SED of an object to a VO table.

    The SED is rebuilt from the creation modules and their parameters and
    written to OUT_DIR as "<obsid>_best_model.xml", scaled by the
    normalisation factor.

    Parameters
    ----------
    obsid: string
        Name of the object; used as a prefix for the output file name.
    creation_modules: list
        Modules used to generate the SED.
    params: list
        Parameters of the creation modules.
    norm: float
        Normalisation factor scaling the model to the observations.

    """
    filename = OUT_DIR + "{}_best_model.xml".format(obsid)
    with SedWarehouse(cache_type="memory") as warehouse:
        best_sed = warehouse.get_sed(creation_modules, params)
        best_sed.to_votable(filename, mass=norm)
def save_pdf(obsid, analysed_variables, model_variables, likelihood):
    """Write one PDF FITS file per analysed variable.

    The probability density function (PDF) of each variable is estimated
    with a likelihood-weighted kernel density estimation (see gen_pdf)
    sampled on PDF_NB_POINTS points spanning the range of the model
    values. This part should definitely be improved as we simulate the
    weight by adding as many value as their probability * 100. Each PDF
    is written to OUT_DIR as "<obsid>_<variable>_pdf.fits".

    Parameters
    ----------
    obsid: string
        Name of the object; used as a prefix for the output file names.
    analysed_variables: list
        Names of the analysed variables.
    model_variables: 2D array
        Values of the analysed variables for all models.
    likelihood: array
        Likelihood of all models.

    """
    for index, var_name in enumerate(analysed_variables):
        values = model_variables[:, index]
        grid = np.linspace(values.min(), values.max(), PDF_NB_POINTS)
        probabilities = gen_pdf(values, likelihood, grid)

        if probabilities is None:
            # TODO: use logging
            print("Can not compute PDF for observation <{}> and "
                  "variable <{}>.".format(obsid, var_name))
            continue

        table = Table((Column(grid, name=var_name),
                       Column(probabilities, name="probability density")))
        table.write(OUT_DIR + "{}_{}_pdf.fits".format(obsid, var_name))
def save_chi2(obsid, analysed_variables, model_variables, reduced_chi2):
    """Write, per analysed variable, the reduced χ² of every model.

    One FITS file "<obsid>_<variable>_chi2.fits" is written to OUT_DIR
    for each variable, pairing the variable values with the reduced χ².

    Parameters
    ----------
    obsid: string
        Name of the object; used as a prefix for the output file names.
    analysed_variables: list
        Names of the analysed variables.
    model_variables: 2D array
        Values of the analysed variables for all models.
    reduced_chi2:
        Reduced χ² of all models.

    """
    for index, var_name in enumerate(analysed_variables):
        columns = (Column(model_variables[:, index], name=var_name),
                   Column(reduced_chi2, name="chi2"))
        Table(columns).write(OUT_DIR + "{}_{}_chi2.fits".format(obsid,
                                                                var_name))
def save_table_analysis(obsid, analysed_variables, analysed_averages,
                        analysed_std):
    """Write the PDF-based estimates of the analysed variables.

    A single FITS table (RESULT_FILE in OUT_DIR) is produced with one row
    per object, holding for each analysed variable its estimated value and
    the associated error ("<variable>_err").

    Parameters
    ----------
    obsid: table column
        Names of the objects.
    analysed_variables: list
        Names of the analysed variables.
    analysed_averages: array
        Estimated values of the analysed variables.
    analysed_std: array
        Estimated errors on the analysed variables.

    """
    columns = [Column(obsid.data, name="observation_id")]
    for index, variable in enumerate(analysed_variables):
        columns.append(Column(analysed_averages[:, index], name=variable))
        columns.append(Column(analysed_std[:, index], name=variable+"_err"))
    Table(columns).write(OUT_DIR + RESULT_FILE)
def save_table_best(obsid, chi2, chi2_red, norm,
                    variables, fluxes, filters, sed):
    """Write a FITS table describing the best fit of every object.

    The table (BEST_MODEL_FILE in OUT_DIR) holds, per object, the best χ²
    and reduced χ², the galaxy mass when a star-formation history is
    present, all the sed.info quantities of the best model
    (mass-proportional ones rescaled by the normalisation factor) and the
    model fluxes in every band.

    Parameters
    ----------
    obsid: table column
        Names of the objects.
    chi2: array
        Best χ² for each object.
    chi2_red: array
        Best reduced χ² for each object.
    norm: array
        Normalisation factor for each object.
    variables: list
        All variables corresponding to a SED.
    fluxes: 2D array
        Fluxes in all bands for each object.
    filters: list
        Filters used to compute the fluxes.
    sed: SED object
        Used to identify what to output in the table.

    """
    table = Table()
    table.add_column(Column(obsid.data, name="observation_id"))
    table.add_column(Column(chi2, name="chi_square"))
    table.add_column(Column(chi2_red, name="reduced_chi_square"))

    # The galaxy mass is only meaningful when a star-formation history
    # was used to build the models.
    if sed.sfh is not None:
        table.add_column(Column(norm, name="galaxy_mass", unit="Msun"))

    for index, name in enumerate(sed.info.keys()):
        column = Column([variable[index] for variable in variables],
                        name=name)
        # Mass-proportional quantities are stored unnormalised; rescale
        # them with the best-fit normalisation factor.
        if name in sed.mass_proportional_info:
            column *= norm
        table.add_column(column)

    for index, name in enumerate(filters):
        table.add_column(Column(fluxes[:, index] * norm, name=name,
                                unit='mJy'))

    table.write(OUT_DIR + BEST_MODEL_FILE)
# -*- coding: utf-8 -*-
# Copyright (C) 2013 Centre de données Astrophysiques de Marseille
# Copyright (C) 2013-2014 Institute of Astronomy
# Copyright (C) 2014 Yannick Roehlly <yannick@iaora.eu>
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
# Author: Yannick Roehlly & Médéric Boquien
import numpy as np
from .utils import save_best_sed, save_pdf, save_chi2
# Probability threshold: models with a lower probability are excluded from
# the moments computation.
MIN_PROBABILITY = 1e-20
def sed(warehouse, creation_modules, model_params, analysed_variables,
        filters):
    """Worker process to retrieve a SED and return the relevant data.

    Parameters
    ----------
    warehouse: SedWarehouse object
        Used to retrieve a SED. Ideally a different warehouse should be
        used for each forked process but it does not seem to cause any
        problem so far.
    creation_modules: list
        List of creation modules to build the SED.
    model_params: list
        Parameters of the creation modules.
    analysed_variables: list
        Names of the analysed variables.
    filters: list
        Filters to compute the SED fluxes.

    Returns
    -------
    The model fluxes in each band, the values of the analysed variables,
    the redshift and all the related information. Note, we could retrieve
    the values of the analysed variables afterwards but this would be done
    in the main process. We should benchmark this to see whether it makes
    any significant difference as returning only the sed.info would make
    things cleaner here and no more dirty on the caller's side (as we have
    to unpack the list returned by starmap anyway).

    """
    model = warehouse.get_sed(creation_modules, model_params)

    # Models older than the Universe at their redshift are flagged with
    # -99 values so the analysis can discard them.
    if 'age' in model.info and model.info['age'] > model.info['universe.age']:
        fluxes = -99. * np.ones(len(filters))
        variables = -99. * np.ones(len(analysed_variables))
    else:
        fluxes = np.array([model.compute_fnu(filt.trans_table,
                                             filt.effective_wavelength)
                           for filt in filters.values()])
        variables = np.array([model.info[name]
                              for name in analysed_variables])

    return fluxes, variables, model.info['redshift'], model.info.values()
def analysis(obs, model_fluxes, model_variables, info, filters, sed,
             analysed_variables, creation_modules, creation_modules_params,
             save):
    """Worker process to analyse the PDF and estimate parameters values.

    For a single object, compute the best normalisation of every model to
    the observed fluxes, derive χ² and likelihoods, and estimate each
    analysed variable as a likelihood-weighted average with its standard
    deviation. Optionally save the best SED, the χ² and the PDFs.

    NOTE(review): this function mutates its arguments: `model_variables`
    is scaled in place for mass-proportional variables, and
    `analysed_variables` gets "galaxy_mass" inserted at index 0 when the
    SED has a star-formation history. If the caller reuses these objects
    across workers (e.g. with multiprocessing starmap), copies should be
    passed — verify against the caller.

    Parameters
    ----------
    obs: row
        Input data for an individual object
    model_fluxes: 2D array
        Fluxes for each model and for each filter
    model_variables: 2D array
        Variables values for each model
    info: list
        sed.info for each model
    filters: list
        Filters to compute the fluxes
    sed: SED object
        Used to retrieve which parameters are normalisation dependent
    analysed_variables: list
        Names of analysed variables
    creation_modules: list
        Creation modules named to recreate the best SED
    creation_modules_params: list
        Creation modules parameters to recreate the best SED
    save: set
        Booleans indicating whether to save the best SED, best χ² and PDF

    Returns
    -------
    The analysed parameters (values+errors), best raw and reduced χ², best
    normalisation factor, info of the best SED, fluxes of the best SED
    """
    obs_fluxes = np.array([obs[name] for name in filters])
    obs_errors = np.array([obs[name + "_err"] for name in filters])

    # Some observations may not have flux value in some filters, in
    # that case the user is asked to put -9999 as value. We mask these
    # values. Note, we must mask obs_fluxes after obs_errors because the
    # mask on obs_errors is built from the raw (unmasked) obs_fluxes.
    obs_errors = np.ma.masked_where(obs_fluxes < -9990., obs_errors)
    obs_fluxes = np.ma.masked_less(obs_fluxes, -9990.)

    # Normalisation factor to be applied to a model fluxes to best fit
    # an observation fluxes (least-squares solution of the scaling
    # problem). Normalised flux of the models. χ² and likelihood of the
    # fitting. Reduced χ² (divided by the number of filters to do the
    # fit).
    norm_facts = (
        np.sum(model_fluxes * obs_fluxes / (obs_errors * obs_errors), axis=1) /
        np.sum(model_fluxes * model_fluxes / (obs_errors * obs_errors), axis=1)
    )
    norm_model_fluxes = model_fluxes * norm_facts[:, np.newaxis]

    # χ² of the comparison of each model to each observation.
    chi2_ = np.sum(
        np.square((obs_fluxes - norm_model_fluxes) / obs_errors),
        axis=1)

    # We define the reduced χ² as the χ² divided by the number of
    # fluxes used for the fitting (masked fluxes are not counted).
    chi2_red = chi2_ / obs_fluxes.count()

    # We use the exponential probability associated with the χ² as
    # likelihood function.
    # WARNING: shouldn't this be chi2_red rather?
    likelihood = np.exp(-chi2_/2)
    # For the analysis, we consider that the computed models explain
    # each observation. We normalise the likelihood function to have a
    # total likelihood of 1 for each observation.
    likelihood /= np.sum(likelihood)
    # We don't want to take into account the models with a probability
    # less that the threshold.
    likelihood = np.ma.masked_less(likelihood, MIN_PROBABILITY)
    # We re-normalise the likelihood so the kept models again sum to 1.
    likelihood /= np.sum(likelihood)

    # We take the mass-dependent variable list from the last computed
    # sed. Mass-proportional variables are rescaled in place by the
    # per-model normalisation factor.
    for index, variable in enumerate(analysed_variables):
        if variable in sed.mass_proportional_info:
            model_variables[:, index] *= norm_facts

    # We also add the galaxy mass to the analysed variables if relevant.
    if sed.sfh is not None:
        analysed_variables.insert(0, "galaxy_mass")
        # NOTE(review): np.dstack of a 1D array and a 2D array — verify
        # this yields the intended (models, variables+1) layout;
        # np.column_stack may be what is meant here. TODO confirm.
        model_variables = np.dstack((norm_facts,
                                     model_variables))

    ##################################################################
    #                       Variable analysis                        #
    ##################################################################

    # We compute the weighted average and standard deviation using the
    # likelihood as weight. We first build the weight array by
    # expanding the likelihood along a new axis corresponding to the
    # analysed variable.
    weights = likelihood[:, np.newaxis].repeat(len(analysed_variables), axis=1)

    # Analysed variables average and standard deviation arrays.
    analysed_averages = np.ma.average(model_variables, axis=0,
                                      weights=weights)

    analysed_std = np.ma.sqrt(np.ma.average(
        (model_variables - analysed_averages[np.newaxis, :])**2, axis=0,
        weights=weights))

    # We define the best fitting model for each observation as the one
    # with the least χ².
    best_index = chi2_.argmin()

    # Optional outputs, gated by the three booleans in `save`:
    # best SED, χ² files and PDF files respectively.
    if save[0]:
        save_best_sed(obs['id'], creation_modules,
                      creation_modules_params[best_index],
                      norm_facts[best_index])
    if save[1]:
        save_chi2(obs['id'], analysed_variables, model_variables, chi2_red)
    if save[2]:
        save_pdf(obs['id'], analysed_variables, model_variables, likelihood)

    return (analysed_averages,
            analysed_std,
            chi2_[best_index],
            chi2_red[best_index],
            norm_facts[best_index],
            list(info[best_index]),
            model_fluxes[best_index, :])
......@@ -20,6 +20,7 @@ is changed, this module may need to be adapted.
from collections import OrderedDict
from ..creation_modules import CreationModule
from pcigale.sed.cosmology import cosmology
class Redshifting(CreationModule):
......@@ -38,6 +39,12 @@ class Redshifting(CreationModule):
))
])
def _init_code(self):
"""Compute the age of the Universe at a given redshift
"""
self.redshift = float(self.parameters["redshift"])
self.universe_age = cosmology.age(self.redshift).value * 1000.
def process(self, sed):
"""Redshift the SED
......@@ -46,8 +53,6 @@ class Redshifting(CreationModule):
sed : pcigale.sed.SED object
"""
redshift = float(self.parameters["redshift"])
# If the SED is already redshifted, raise an error.
if 'redshift' in sed.info.keys() > 0:
raise Exception("The SED is already redshifted <z={}>."
......@@ -55,17 +60,18 @@ class Redshifting(CreationModule):
# Raise an error when applying a negative redshift. This module is
# not for blue-shifting.
if redshift < 0:
if self.redshift < 0:
raise Exception("The redshift provided is negative <{}>."
.format(redshift))
.format(self.redshift))
# We redshift directly the SED wavelength grid
sed.wavelength_grid *= 1. + redshift
sed.wavelength_grid *= 1. + self.redshift
# We modify each luminosity contribution to keep energy constant
sed.luminosities /= 1. + redshift
sed.luminosities /= 1. + self.redshift
sed.add_info("redshift", redshift)
sed.add_info("redshift", self.redshift)
sed.add_info("universe.age", self.universe_age)
sed.add_module(self.name, self.parameters)
# CreationModule to be returned by get_module
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment