__init__.py 10.8 KB
Newer Older
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014 Laboratoire d'Astrophysique de Marseille, AMU
3
# Copyright (C) 2013 Centre de données Astrophysiques de Marseille
4 5
# Copyright (C) 2013-2014 Institute of Astronomy
# Copyright (C) 2013-2014 Yannick Roehlly <yannick@iaora.eu>
6
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
7
# Author: Yannick Roehlly, Médéric Boquien & Denis Burgarella
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27

"""
Probability Density Function analysis module
============================================

This module builds the probability density functions (PDF) of the SED
parameters to compute their moments.

The models corresponding to all possible combinations of parameters are
computed and their fluxes in the same filters as the observations are
integrated. These fluxes are compared to the observed ones to compute the
χ² value of the fitting. This χ² give a probability that is associated with
the model values for the parameters.

At the end, for each parameter, the probability-weighted mean and standard
deviation are computed and the best fitting model (the one with the least
reduced χ²) is given for each observation.

"""

28
from collections import OrderedDict
29
import ctypes
30
import multiprocessing as mp
31 32 33 34 35
from multiprocessing.sharedctypes import RawArray
import time

import numpy as np

36 37
from ...utils import read_table
from .. import AnalysisModule, complete_obs_table
38
from .utils import save_results, analyse_chi2
39
from ...warehouse import SedWarehouse
40
from .workers import sed as worker_sed
41 42
from .workers import init_sed as init_worker_sed
from .workers import init_analysis as init_worker_analysis
43
from .workers import analysis as worker_analysis
44
from ...managers.parameters import ParametersManager
45

46

47
# Tolerance threshold under which any flux or error is considered as 0.
48
TOLERANCE = 1e-12
49 50 51 52 53


class PdfAnalysis(AnalysisModule):
    """PDF analysis module"""

54
    parameter_list = OrderedDict([
55
        ("variables", (
56
            "cigale_string_list()",
57 58 59
            "List of the physical properties to estimate. Leave empty to "
            "analyse all the physical properties (not recommended when there "
            "are many models).",
60
            ["sfh.sfr", "sfh.sfr10Myrs", "sfh.sfr100Myrs"]
61 62
        )),
        ("save_best_sed", (
63
            "boolean()",
64 65 66
            "If true, save the best SED for each observation to a file.",
            False
        )),
67
        ("save_chi2", (
68
            "boolean()",
69
            "If true, for each observation and each analysed variable save "
70
            "the reduced chi2.",
71 72 73
            False
        )),
        ("save_pdf", (
74
            "boolean()",
75 76
            "If true, for each observation and each analysed variable save "
            "the probability density function.",
77 78
            False
        )),
79
        ("lim_flag", (
80
            "boolean()",
81 82 83
            "If true, for each object check whether upper limits are present "
            "and analyse them.",
            False
84 85
        )),
        ("mock_flag", (
86
            "boolean()",
87 88 89
            "If true, for each object we create a mock object "
            "and analyse them.",
            False
90 91 92
        ))
    ])

93
    def process(self, conf):
94 95
        """Process with the psum analysis.

96 97 98 99 100
        The analysis is done in two steps which can both run on multiple
        processors to run faster. The first step is to compute all the fluxes
        associated with each model as well as ancillary data such as the SED
        information. The second step is to carry out the analysis of each
        object, considering all models at once.
101 102 103

        Parameters
        ----------
104 105
        conf: dictionary
            Contents of pcigale.ini in the form of a dictionary
106 107

        """
108
        np.seterr(invalid='ignore')
109

110 111
        print("Initialising the analysis module... ")

112
        # Rename the output directory if it exists
113
        self.prepare_dirs()
114

115
        # Initalise variables from input arguments.
116 117 118 119
        variables = conf['analysis_params']["variables"]
        variables_nolog = [variable[:-4] if variable.endswith('_log') else
                           variable for variable in variables]
        n_variables = len(variables)
120 121 122
        save = {key: conf['analysis_params']["save_{}".format(key)] for key in
                ["best_sed", "chi2", "pdf"]}
        lim_flag = conf['analysis_params']["lim_flag"]
123

124
        filters = [name for name in conf['bands'] if not
125
                   name.endswith('_err')]
126
        n_filters = len(filters)
127 128 129

        # Read the observation table and complete it by adding error where
        # none is provided and by adding the systematic deviation.
130
        obs_table = complete_obs_table(read_table(conf['data_file']),
131
                                       conf['bands'], filters, TOLERANCE,
132
                                       lim_flag)
133
        n_obs = len(obs_table)
134

135
        z = np.array(conf['sed_modules_params']['redshifting']['redshift'])
136

137
        # The parameters manager allows us to retrieve the models parameters
138 139 140 141
        # from a 1D index. This is useful in that we do not have to create
        # a list of parameters as they are computed on-the-fly. It also has
        # nice goodies such as finding the index of the first parameter to
        # have changed between two indices or the number of models.
142
        params = ParametersManager(conf)
143 144
        n_params = params.size

145
        # Retrieve an arbitrary SED to obtain the list of output parameters
146
        warehouse = SedWarehouse()
147
        sed = warehouse.get_sed(conf['sed_modules'], params.from_index(0))
148 149 150
        info = list(sed.info.keys())
        info.sort()
        n_info = len(info)
151 152
        del warehouse, sed

153 154
        print("Computing the models fluxes...")

155 156 157 158 159 160 161
        # Arrays where we store the data related to the models. For memory
        # efficiency reasons, we use RawArrays that will be passed in argument
        # to the pool. Each worker will fill a part of the RawArrays. It is
        # important that there is no conflict and that two different workers do
        # not write on the same section.
        # We put the shape in a tuple along with the RawArray because workers
        # need to know the shape to create the numpy array from the RawArray.
162
        model_fluxes = (RawArray(ctypes.c_double, n_params * n_filters),
163
                        (n_filters, n_params))
164
        model_variables = (RawArray(ctypes.c_double, n_params * n_variables),
165
                           (n_variables, n_params))
166

167
        initargs = (params, filters, variables_nolog, model_fluxes,
168
                    model_variables, time.time(), mp.Value('i', 0))
169
        if conf['cores'] == 1:  # Do not create a new process
170
            init_worker_sed(*initargs)
171 172
            for idx in range(n_params):
                worker_sed(idx)
173
        else:  # Compute the models in parallel
174
            with mp.Pool(processes=conf['cores'], initializer=init_worker_sed,
175
                         initargs=initargs) as pool:
176
                pool.map(worker_sed, range(n_params))
177

178
        print("\nAnalysing models...")
179

180 181
        # We use RawArrays for the same reason as previously
        analysed_averages = (RawArray(ctypes.c_double, n_obs * n_variables),
Médéric Boquien's avatar
Médéric Boquien committed
182
                             (n_obs, n_variables))
183
        analysed_std = (RawArray(ctypes.c_double, n_obs * n_variables),
Médéric Boquien's avatar
Médéric Boquien committed
184
                        (n_obs, n_variables))
185
        best_fluxes = (RawArray(ctypes.c_double, n_obs * n_filters),
Médéric Boquien's avatar
Médéric Boquien committed
186
                       (n_obs, n_filters))
187 188 189 190 191
        best_parameters = (RawArray(ctypes.c_double, n_obs * n_info),
                           (n_obs, n_info))
        best_chi2 = (RawArray(ctypes.c_double, n_obs), (n_obs))
        best_chi2_red = (RawArray(ctypes.c_double, n_obs), (n_obs))

192
        initargs = (params, filters, variables, z, model_fluxes,
193 194 195 196
                    model_variables, time.time(), mp.Value('i', 0),
                    analysed_averages, analysed_std, best_fluxes,
                    best_parameters, best_chi2, best_chi2_red, save, lim_flag,
                    n_obs)
197
        if conf['cores'] == 1:  # Do not create a new process
198
            init_worker_analysis(*initargs)
199 200
            for idx, obs in enumerate(obs_table):
                worker_analysis(idx, obs)
201
        else:  # Analyse observations in parallel
202 203
            with mp.Pool(processes=conf['cores'],
                         initializer=init_worker_analysis,
204
                         initargs=initargs) as pool:
205
                pool.starmap(worker_analysis, enumerate(obs_table))
206

207
        analyse_chi2(best_chi2_red)
208

209 210
        print("\nSaving results...")

211 212 213
        save_results("results", obs_table['id'], variables, analysed_averages,
                     analysed_std, best_chi2, best_chi2_red, best_parameters,
                     best_fluxes, filters, info)
214

215
        if conf['analysis_params']['mock_flag'] is True:
216 217 218

            print("\nMock analysis...")

219 220 221 222
            # For the mock analysis we do not save the ancillary files
            for k in save:
                save[k] = False

223
            obs_fluxes = np.array([obs_table[name] for name in filters]).T
Médéric Boquien's avatar
Médéric Boquien committed
224 225
            obs_errors = np.array([obs_table[name + "_err"] for name in
                                   filters]).T
226
            mock_fluxes = obs_fluxes.copy()
227 228
            bestmod_fluxes = np.ctypeslib.as_array(best_fluxes[0])
            bestmod_fluxes = bestmod_fluxes.reshape(best_fluxes[1])
229 230 231 232
            wdata = np.where((obs_fluxes > TOLERANCE) &
                             (obs_errors > TOLERANCE))
            mock_fluxes[wdata] = np.random.normal(bestmod_fluxes[wdata],
                                                  obs_errors[wdata])
Médéric Boquien's avatar
Médéric Boquien committed
233

234
            mock_table = obs_table.copy()
235 236
            for idx, name in enumerate(filters):
                mock_table[name] = mock_fluxes[:, idx]
Médéric Boquien's avatar
Médéric Boquien committed
237

238
            initargs = (params, filters, variables, z, model_fluxes,
239 240 241 242
                        model_variables, time.time(), mp.Value('i', 0),
                        analysed_averages, analysed_std, best_fluxes,
                        best_parameters, best_chi2, best_chi2_red, save,
                        lim_flag, n_obs)
243
            if conf['cores'] == 1:  # Do not create a new process
244 245 246 247
                init_worker_analysis(*initargs)
                for idx, mock in enumerate(mock_table):
                    worker_analysis(idx, mock)
            else:  # Analyse observations in parallel
248 249
                with mp.Pool(processes=conf['cores'],
                             initializer=init_worker_analysis,
250 251 252
                             initargs=initargs) as pool:
                    pool.starmap(worker_analysis, enumerate(mock_table))

253
            print("\nSaving results...")
254

255
            save_results("results_mock", mock_table['id'], variables,
256 257 258
                         analysed_averages, analysed_std, best_chi2,
                         best_chi2_red, best_parameters, best_fluxes, filters,
                         info)
259 260

        print("Run completed!")
261

262

263 264
# AnalysisModule to be returned by get_module
Module = PdfAnalysis