# -*- coding: utf-8 -*-
# Copyright (C) 2013 Centre de données Astrophysiques de Marseille
# Copyright (C) 2013-2014 Institute of Astronomy
# Copyright (C) 2013-2014 Yannick Roehlly
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
# Author: Yannick Roehlly & Médéric Boquien
"""
Probability Density Function analysis module
============================================
This module builds the probability density functions (PDF) of the SED
parameters to compute their moments.
The models corresponding to all possible combinations of parameters are
computed and their fluxes in the same filters as the observations are
integrated. These fluxes are compared to the observed ones to compute the
χ² value of the fitting. This χ² give a probability that is associated with
the model values for the parameters.
At the end, for each parameter, the probability-weighted mean and standard
deviation are computed and the best fitting model (the one with the least
reduced χ²) is given for each observation.
"""
import numpy as np
from collections import OrderedDict
import multiprocessing as mp
from ...utils import read_table
from .. import AnalysisModule, complete_obs_table
from .utils import save_table_analysis, save_table_best, backup_dir
from ...warehouse import SedWarehouse
from ...data import Database
from .workers import sed as worker_sed
from .workers import analysis as worker_analysis
from ..utils import find_changed_parameters
import pcigale.analysis_modules.myglobals as gbl
# Tolerance threshold under which any flux or error is considered as 0.
TOLERANCE = 1e-12
# Limit the redshift to this number of decimals
REDSHIFT_DECIMALS = 2
# Directory where the output files are stored
OUT_DIR = "out/"
class PdfAnalysis(AnalysisModule):
"""PDF analysis module"""
parameter_list = OrderedDict([
("analysed_variables", (
"array of strings",
"List of the variables (in the SEDs info dictionaries) for which "
"the statistical analysis will be done.",
["sfr", "average_sfr"]
)),
("save_best_sed", (
"boolean",
"If true, save the best SED for each observation to a file.",
False
)),
("save_chi2", (
"boolean",
"If true, for each observation and each analysed variable save "
"the reduced chi².",
False
)),
("save_pdf", (
"boolean",
"If true, for each observation and each analysed variable save "
"the probability density function.",
False
)),
("storage_type", (
"string",
"Type of storage used to cache the generate SED.",
"memory"
))
])
def process(self, data_file, column_list, creation_modules,
creation_modules_params, parameters, cores):
"""Process with the psum analysis.
The analysis is done in two steps which can both run on multiple
processors to run faster. The first step is to compute all the fluxes
associated with each model as well as ancillary data such as the SED
information. The second step is to carry out the analysis of each
object, considering all models at once.
Parameters
----------
data_file: string
Name of the file containing the observations to fit.
column_list: list of strings
Name of the columns from the data file to use for the analysis.
creation_modules: list of strings
List of the module names (in the right order) to use for creating
the SEDs.
creation_modules_params: list of dictionaries
List of the parameter dictionaries for each module.
parameters: dictionary
Dictionary containing the parameters.
core: integer
Number of cores to run the analysis on
"""
print("Initialising the analysis module... ")
# Rename the output directory if it exists
backup_dir(OUT_DIR)
# Get the parameters. They are stored in a shared module so that this
# can be accessed by subprocesses during the model generation and
# analysis. This avoids transmitting the exact same data all over again
gbl.analysed_variables = parameters["analysed_variables"]
gbl.creation_modules = creation_modules
gbl.creation_modules_params = creation_modules_params
gbl.save_best_sed = parameters["save_best_sed"].lower() == "true"
gbl.save_chi2 = parameters["save_chi2"].lower() == "true"
gbl.save_pdf = parameters["save_pdf"].lower() == "true"
# Get the needed filters in the pcigale database. We use an ordered
# dictionary because we need the keys to always be returned in the
# same order. We also put the filters in the shared modules as they
# are needed to compute the fluxes during the models generation.
with Database() as base:
gbl.filters = OrderedDict([(name, base.get_filter(name))
for name in column_list
if not name.endswith('_err')])
# Read the observation table and complete it by adding error where
# none is provided and by adding the systematic deviation.
obs_table = complete_obs_table(read_table(data_file), column_list,
gbl.filters, TOLERANCE)
print("Computing the models fluxes...")
# First we get a dummy sed to obtain some information on the
# parameters, such as which ones are dependent on the normalisation
# factor of the fit.
# The SED warehouse is used to retrieve SED corresponding to some
# modules and parameters.
gbl.warehouse = SedWarehouse(cache_type=parameters["storage_type"])
sed = gbl.warehouse.get_sed(gbl.creation_modules,
gbl.creation_modules_params[0])
gbl.info_keys = list(sed.info.keys())
gbl.mass_proportional_info = sed.mass_proportional_info
gbl.has_sfh = sed.sfh is not None
# Then, for all possible theoretical models (one for each paremeter set
# in creation_module_parameters), we compute 1) the fluxes in all the
# filters, 2) the values of the model variables, 3) the redshift of the
# models (this is important for the analysis to easily identify which
# models correspond to a given redshift), and 4) the sed.info values
# (without the keys to save space ; this is possible because we are
# using an OrderedDict so it is easy to retrieve the value for a known
# key using gbl.info_keys). Note that we could compute the last two
# elements in the main process but we would lose time doing so. Not
# necessarily much though. TODO: benchmark this.
# In order not to clog the warehouse memory with SEDs that will not be
# used again during the SED generation, we identify which parameter has
# changed, invalidating models with this parameter
changed_pars = find_changed_parameters(creation_modules_params)
# Compute the SED fluxes and ancillary data in parallel
with mp.Pool(processes=cores) as pool:
items = pool.starmap(worker_sed, zip(creation_modules_params,
changed_pars))
# We create the arrays to store the model fluxes and ancillary data.
# They are stored in a shared module so they are easily accessible by
# the processes that will carry out the analysis, without requiring any
# data transfer that will grind pcigale to a halt. We use a numpy
# masked array to mask the fluxes of models that would be older than
# the age of the Universe at the considered redshift.
gbl.model_fluxes = np.ma.empty((len(creation_modules_params),
len(gbl.filters)))
gbl.model_variables = np.ma.empty((len(creation_modules_params),
len(gbl.analysed_variables)))
gbl.model_redshifts = np.empty(len(creation_modules_params))
gbl.model_info = [None] * len(creation_modules_params)
# Unpack the computed data into their respective arrays.
for idx_item, item in enumerate(items):
gbl.model_fluxes[idx_item, :] = item[0]
gbl.model_variables[idx_item, :] = item[1]
gbl.model_redshifts[idx_item] = item[2]
gbl.model_info[idx_item] = item[3]
print('Analysing models...')
# Mask the invalid fluxes
gbl.model_fluxes = np.ma.masked_less(gbl.model_fluxes, -90)
# To make it easy to subprocesses to identify which parts of the
# various arrays correspond to a given redshift we 1) make a list of
# model redshifts, and 2) a dictionary indicatin the indices
# corresponding to a given redshift. These two objects are assigned to
# the shared module.
gbl.redshifts = np.unique(gbl.model_redshifts)
gbl.w_redshifts = {redshift: gbl.model_redshifts == redshift
for redshift in gbl.redshifts}
# Analysis of each object in parallel. All the model data are
# transmitted through a shared module to avoid memory copies that would
# grind pcigale to a halt.
with mp.Pool(processes=cores) as pool:
items = pool.starmap(worker_analysis, zip(obs_table))
# Local arrays where to unpack the results of the analysis
analysed_averages = np.empty((len(items), len(gbl.analysed_variables)))
analysed_std = np.empty_like(analysed_averages)
best_fluxes = np.empty((len(items), len(gbl.filters)))
best_parameters = [None] * len(items)
best_normalisation_factors = np.empty(len(obs_table))
best_chi2 = np.empty_like(best_normalisation_factors)
best_chi2_red = np.empty_like(best_normalisation_factors)
for item_idx, item in enumerate(items):
analysed_averages[item_idx, :] = item[0]
analysed_std[item_idx, :] = item[1]
best_fluxes[item_idx, :] = item[2]
best_parameters[item_idx] = item[3]
best_normalisation_factors[item_idx] = item[4]
best_chi2[item_idx] = item[5]
best_chi2_red[item_idx] = item[6]
print("Saving results...")
save_table_analysis(obs_table['id'], gbl.analysed_variables,
analysed_averages, analysed_std)
save_table_best(obs_table['id'], best_chi2, best_chi2_red,
best_normalisation_factors, best_parameters,
best_fluxes, gbl.filters)
print("Run completed!")
# AnalysisModule to be returned by get_module
Module = PdfAnalysis