# -*- coding: utf-8 -*-
# Copyright (C) 2014 Laboratoire d'Astrophysique de Marseille, AMU
# Copyright (C) 2013 Centre de données Astrophysiques de Marseille
# Copyright (C) 2013-2014 Institute of Astronomy
# Copyright (C) 2013-2014 Yannick Roehlly <yannick@iaora.eu>
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
# Author: Yannick Roehlly, Médéric Boquien & Denis Burgarella

"""
Probability Density Function analysis module
============================================

This module builds the probability density functions (PDF) of the SED
parameters to compute their moments.

The models corresponding to all possible combinations of parameters are
computed and their fluxes in the same filters as the observations are
integrated. These fluxes are compared to the observed ones to compute the
χ² value of the fitting. This χ² give a probability that is associated with
the model values for the parameters.

At the end, for each parameter, the probability-weighted mean and standard
deviation are computed and the best fitting model (the one with the least
reduced χ²) is given for each observation.

"""

from collections import OrderedDict
import ctypes
import multiprocessing as mp
from multiprocessing.sharedctypes import RawArray
import time

import numpy as np

from ...utils import read_table
from .. import AnalysisModule, complete_obs_table
from .utils import save_results, analyse_chi2
from ...warehouse import SedWarehouse
from .workers import sed as worker_sed
from .workers import init_sed as init_worker_sed
from .workers import init_analysis as init_worker_analysis
from .workers import analysis as worker_analysis
from ..utils import backup_dir
from ...handlers.parameters_handler import ParametersHandler

# Tolerance threshold under which any flux or error is considered as 0.
TOLERANCE = 1e-12


class PdfAnalysis(AnalysisModule):
    """PDF analysis module"""

    # Each entry maps an option name to a (validator spec, help text,
    # default value) tuple used to build/validate pcigale.ini.
    parameter_list = OrderedDict([
        ("analysed_variables", (
            "cigale_string_list()",
            "List of the physical properties to estimate. Leave empty to "
            "analyse all the physical properties (not recommended when there "
            "are many models).",
            ["sfh.sfr", "sfh.sfr10Myrs", "sfh.sfr100Myrs"]
        )),
        ("save_best_sed", (
            "boolean()",
            "If true, save the best SED for each observation to a file.",
            False
        )),
        ("save_chi2", (
            # Fixed: was "boolean{}", which is not a valid validator spec and
            # was inconsistent with the other boolean options.
            "boolean()",
            "If true, for each observation and each analysed variable save "
            "the reduced chi2.",
            False
        )),
        ("save_pdf", (
            # Fixed: was "boolean{}" (same defect as save_chi2).
            "boolean()",
            "If true, for each observation and each analysed variable save "
            "the probability density function.",
            False
        )),
        ("lim_flag", (
            "boolean()",
            "If true, for each object check whether upper limits are present "
            "and analyse them.",
            False
        )),
        ("mock_flag", (
            "boolean()",
            "If true, for each object we create a mock object "
            "and analyse them.",
            False
        ))
    ])

    def process(self, conf):
        """Process with the PDF analysis.

        The analysis is done in two steps which can both run on multiple
        processors to run faster. The first step is to compute all the fluxes
        associated with each model as well as ancillary data such as the SED
        information. The second step is to carry out the analysis of each
        object, considering all models at once.

        Parameters
        ----------
        conf: dictionary
            Contents of pcigale.ini in the form of a dictionary

        """
        np.seterr(invalid='ignore')

        print("Initialising the analysis module... ")

        # Rename the output directory if it exists
        backup_dir()

        # Initalise variables from input arguments. Configuration values
        # arrive as strings, so boolean flags are compared against "true".
        creation_modules = conf['creation_modules']
        creation_modules_params = conf['creation_modules_params']
        analysed_variables = conf['analysis_method_params']["analysed_variables"]
        # A "_log" suffix requests the analysis of the log of a property; the
        # workers are given the bare property names.
        analysed_variables_nolog = [variable[:-4] if variable.endswith('_log')
                                    else variable for variable in
                                    analysed_variables]
        n_variables = len(analysed_variables)
        save = {key: conf['analysis_method_params']["save_{}".format(key)].lower() == "true"
                for key in ["best_sed", "chi2", "pdf"]}
        lim_flag = conf['analysis_method_params']["lim_flag"].lower() == "true"
        mock_flag = conf['analysis_method_params']["mock_flag"].lower() == "true"

        filters = [name for name in conf['column_list'] if not
                   name.endswith('_err')]
        n_filters = len(filters)

        # Read the observation table and complete it by adding error where
        # none is provided and by adding the systematic deviation.
        obs_table = complete_obs_table(read_table(conf['data_file']),
                                       conf['column_list'], filters, TOLERANCE,
                                       lim_flag)
        n_obs = len(obs_table)

        w_redshifting = creation_modules.index('redshifting')
        z = np.array(creation_modules_params[w_redshifting]['redshift'])

        # The parameters handler allows us to retrieve the models parameters
        # from a 1D index. This is useful in that we do not have to create
        # a list of parameters as they are computed on-the-fly. It also has
        # nice goodies such as finding the index of the first parameter to
        # have changed between two indices or the number of models.
        params = ParametersHandler(conf)
        n_params = params.size

        # Retrieve an arbitrary SED to obtain the list of output parameters
        warehouse = SedWarehouse()
        sed = warehouse.get_sed(creation_modules, params.from_index(0))
        info = list(sed.info.keys())
        info.sort()
        n_info = len(info)
        del warehouse, sed

        print("Computing the models fluxes...")

        # Arrays where we store the data related to the models. For memory
        # efficiency reasons, we use RawArrays that will be passed in argument
        # to the pool. Each worker will fill a part of the RawArrays. It is
        # important that there is no conflict and that two different workers do
        # not write on the same section.
        # We put the shape in a tuple along with the RawArray because workers
        # need to know the shape to create the numpy array from the RawArray.
        model_fluxes = (RawArray(ctypes.c_double, n_params * n_filters),
                        (n_params, n_filters))
        model_variables = (RawArray(ctypes.c_double, n_params * n_variables),
                           (n_params, n_variables))

        initargs = (params, filters, analysed_variables_nolog, model_fluxes,
                    model_variables, time.time(), mp.Value('i', 0))
        if conf['cores'] == 1:  # Do not create a new process
            init_worker_sed(*initargs)
            for idx in range(n_params):
                worker_sed(idx)
        else:  # Compute the models in parallel
            with mp.Pool(processes=conf['cores'], initializer=init_worker_sed,
                         initargs=initargs) as pool:
                pool.map(worker_sed, range(n_params))

        print("\nAnalysing models...")

        # We use RawArrays for the same reason as previously
        analysed_averages = (RawArray(ctypes.c_double, n_obs * n_variables),
                             (n_obs, n_variables))
        analysed_std = (RawArray(ctypes.c_double, n_obs * n_variables),
                        (n_obs, n_variables))
        best_fluxes = (RawArray(ctypes.c_double, n_obs * n_filters),
                       (n_obs, n_filters))
        best_parameters = (RawArray(ctypes.c_double, n_obs * n_info),
                           (n_obs, n_info))
        # Fixed: the shapes were written (n_obs), which is just an int; use a
        # proper 1-tuple like the other (RawArray, shape) pairs.
        best_chi2 = (RawArray(ctypes.c_double, n_obs), (n_obs,))
        best_chi2_red = (RawArray(ctypes.c_double, n_obs), (n_obs,))

        initargs = (params, filters, analysed_variables, z, model_fluxes,
                    model_variables, time.time(), mp.Value('i', 0),
                    analysed_averages, analysed_std, best_fluxes,
                    best_parameters, best_chi2, best_chi2_red, save, lim_flag,
                    n_obs)
        if conf['cores'] == 1:  # Do not create a new process
            init_worker_analysis(*initargs)
            for idx, obs in enumerate(obs_table):
                worker_analysis(idx, obs)
        else:  # Analyse observations in parallel
            with mp.Pool(processes=conf['cores'],
                         initializer=init_worker_analysis,
                         initargs=initargs) as pool:
                pool.starmap(worker_analysis, enumerate(obs_table))

        analyse_chi2(best_chi2_red)

        print("\nSaving results...")

        save_results("results", obs_table['id'], analysed_variables,
                     analysed_averages, analysed_std, best_chi2, best_chi2_red,
                     best_parameters, best_fluxes, filters, info)

        if mock_flag:

            print("\nMock analysis...")

            # For the mock analysis we do not save the ancillary files
            for k in save:
                save[k] = False

            obs_fluxes = np.array([obs_table[name] for name in filters]).T
            obs_errors = np.array([obs_table[name + "_err"] for name in
                                   filters]).T
            mock_fluxes = obs_fluxes.copy()
            bestmod_fluxes = np.ctypeslib.as_array(best_fluxes[0])
            bestmod_fluxes = bestmod_fluxes.reshape(best_fluxes[1])
            # Only perturb fluxes that are actual detections (both the flux
            # and its error above the tolerance threshold).
            wdata = np.where((obs_fluxes > TOLERANCE) &
                             (obs_errors > TOLERANCE))
            mock_fluxes[wdata] = np.random.normal(bestmod_fluxes[wdata],
                                                  obs_errors[wdata])

            mock_table = obs_table.copy()
            for idx, name in enumerate(filters):
                mock_table[name] = mock_fluxes[:, idx]

            initargs = (params, filters, analysed_variables, z, model_fluxes,
                        model_variables, time.time(), mp.Value('i', 0),
                        analysed_averages, analysed_std, best_fluxes,
                        best_parameters, best_chi2, best_chi2_red, save,
                        lim_flag, n_obs)
            # Fixed: this branch referenced an undefined name `cores`, which
            # raised a NameError whenever mock_flag was enabled.
            if conf['cores'] == 1:  # Do not create a new process
                init_worker_analysis(*initargs)
                for idx, mock in enumerate(mock_table):
                    worker_analysis(idx, mock)
            else:  # Analyse observations in parallel
                with mp.Pool(processes=conf['cores'],
                             initializer=init_worker_analysis,
                             initargs=initargs) as pool:
                    pool.starmap(worker_analysis, enumerate(mock_table))

            print("\nSaving results...")

            save_results("results_mock", mock_table['id'], analysed_variables,
                         analysed_averages, analysed_std, best_chi2,
                         best_chi2_red, best_parameters, best_fluxes, filters,
                         info)

        print("Run completed!")
# AnalysisModule to be returned by get_module
Module = PdfAnalysis