__init__.py 12 KB
Newer Older
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014 Laboratoire d'Astrophysique de Marseille, AMU
3
# Copyright (C) 2013 Centre de données Astrophysiques de Marseille
4 5
# Copyright (C) 2013-2014 Institute of Astronomy
# Copyright (C) 2013-2014 Yannick Roehlly <yannick@iaora.eu>
6
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
7
# Author: Yannick Roehlly, Médéric Boquien & Denis Burgarella
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27

"""
Probability Density Function analysis module
============================================

This module builds the probability density functions (PDF) of the SED
parameters to compute their moments.

The models corresponding to all possible combinations of parameters are
computed and their fluxes in the same filters as the observations are
integrated. These fluxes are compared to the observed ones to compute the
χ² value of the fitting. This χ² give a probability that is associated with
the model values for the parameters.

At the end, for each parameter, the probability-weighted mean and standard
deviation are computed and the best fitting model (the one with the least
reduced χ²) is given for each observation.

"""

28
import ctypes
29
import multiprocessing as mp
30 31 32 33 34
from multiprocessing.sharedctypes import RawArray
import time

import numpy as np

35 36
from ...utils import read_table
from .. import AnalysisModule, complete_obs_table
37
from .utils import save_table_analysis, save_table_best, analyse_chi2
38
from ...warehouse import SedWarehouse
39
from .workers import sed as worker_sed
40 41
from .workers import init_sed as init_worker_sed
from .workers import init_analysis as init_worker_analysis
42
from .workers import analysis as worker_analysis
43
from ..utils import ParametersHandler, backup_dir
44

45

46
# Tolerance threshold under which any flux or error is considered as 0.
47
TOLERANCE = 1e-12
48
# Limit the redshift to this number of decimals
49
REDSHIFT_DECIMALS = 2
50 51 52 53 54


class PdfAnalysis(AnalysisModule):
    """PDF analysis module"""

55
    parameter_list = dict([
56 57 58 59
        ("analysed_variables", (
            "array of strings",
            "List of the variables (in the SEDs info dictionaries) for which "
            "the statistical analysis will be done.",
60
            ["sfh.sfr", "sfh.sfr10Myrs", "sfh.sfr100Myrs"]
61 62 63 64 65 66
        )),
        ("save_best_sed", (
            "boolean",
            "If true, save the best SED for each observation to a file.",
            False
        )),
67
        ("save_chi2", (
68
            "boolean",
69
            "If true, for each observation and each analysed variable save "
70
            "the reduced chi2.",
71 72 73 74
            False
        )),
        ("save_pdf", (
            "boolean",
75 76
            "If true, for each observation and each analysed variable save "
            "the probability density function.",
77 78
            False
        )),
79 80 81 82 83
        ("lim_flag", (
            "boolean",
            "If true, for each object check whether upper limits are present "
            "and analyse them.",
            False
84 85 86 87 88 89
        )),
        ("mock_flag", (
            "boolean",
            "If true, for each object we create a mock object "
            "and analyse them.",
            False
90 91 92 93
        ))
    ])

    def process(self, data_file, column_list, creation_modules,
94
                creation_modules_params, config, cores):
95 96
        """Process with the psum analysis.

97 98 99 100 101
        The analysis is done in two steps which can both run on multiple
        processors to run faster. The first step is to compute all the fluxes
        associated with each model as well as ancillary data such as the SED
        information. The second step is to carry out the analysis of each
        object, considering all models at once.
102 103 104 105 106 107 108 109 110 111 112 113

        Parameters
        ----------
        data_file: string
            Name of the file containing the observations to fit.
        column_list: list of strings
            Name of the columns from the data file to use for the analysis.
        creation_modules: list of strings
            List of the module names (in the right order) to use for creating
            the SEDs.
        creation_modules_params: list of dictionaries
            List of the parameter dictionaries for each module.
114 115
        config: dictionary
            Dictionary containing the configuration.
116 117
        core: integer
            Number of cores to run the analysis on
118 119

        """
120
        np.seterr(invalid='ignore')
121

122 123
        print("Initialising the analysis module... ")

124
        # Rename the output directory if it exists
125
        backup_dir()
126

127 128
        # Initalise variables from input arguments.
        analysed_variables = config["analysed_variables"]
129 130
        analysed_variables_nolog = [variable.rstrip('_log') for variable in
                                    analysed_variables]
131
        n_variables = len(analysed_variables)
Médéric Boquien's avatar
Médéric Boquien committed
132
        save = {key: config["save_{}".format(key)].lower() == "true"
133
                for key in ["best_sed", "chi2", "pdf"]}
134
        lim_flag = config["lim_flag"].lower() == "true"
135
        mock_flag = config["mock_flag"].lower() == "true"
136

137
        filters = [name for name in column_list if not name.endswith('_err')]
138
        n_filters = len(filters)
139 140 141

        # Read the observation table and complete it by adding error where
        # none is provided and by adding the systematic deviation.
142
        obs_table = complete_obs_table(read_table(data_file), column_list,
143
                                       filters, TOLERANCE, lim_flag)
144
        n_obs = len(obs_table)
145

146
        w_redshifting = creation_modules.index('redshifting')
147
        if list(creation_modules_params[w_redshifting]['redshift']) == ['']:
148 149 150
            z = np.unique(np.around(obs_table['redshift'],
                                    decimals=REDSHIFT_DECIMALS))
            creation_modules_params[w_redshifting]['redshift'] = z
151 152
        else:
            z = np.array(creation_modules_params[w_redshifting]['redshift'])
153 154 155 156 157 158 159 160 161

        # The parameters handler allows us to retrieve the models parameters
        # from a 1D index. This is useful in that we do not have to create
        # a list of parameters as they are computed on-the-fly. It also has
        # nice goodies such as finding the index of the first parameter to
        # have changed between two indices or the number of models.
        params = ParametersHandler(creation_modules, creation_modules_params)
        n_params = params.size

162
        # Retrieve an arbitrary SED to obtain the list of output parameters
163
        warehouse = SedWarehouse()
164
        sed = warehouse.get_sed(creation_modules, params.from_index(0))
165 166 167
        info = list(sed.info.keys())
        info.sort()
        n_info = len(info)
168 169
        del warehouse, sed

170 171
        print("Computing the models fluxes...")

172 173 174 175 176 177 178 179
        # Arrays where we store the data related to the models. For memory
        # efficiency reasons, we use RawArrays that will be passed in argument
        # to the pool. Each worker will fill a part of the RawArrays. It is
        # important that there is no conflict and that two different workers do
        # not write on the same section.
        # We put the shape in a tuple along with the RawArray because workers
        # need to know the shape to create the numpy array from the RawArray.
        model_fluxes = (RawArray(ctypes.c_double,
180 181
                                 n_params * n_filters),
                        (n_params, n_filters))
182
        model_variables = (RawArray(ctypes.c_double,
183 184
                                    n_params * n_variables),
                           (n_params, n_variables))
185

186 187
        initargs = (params, filters, analysed_variables_nolog, model_fluxes,
                    model_variables, time.time(), mp.Value('i', 0))
188 189
        if cores == 1:  # Do not create a new process
            init_worker_sed(*initargs)
190 191
            for idx in range(n_params):
                worker_sed(idx)
192
        else:  # Compute the models in parallel
193 194
            with mp.Pool(processes=cores, initializer=init_worker_sed,
                         initargs=initargs) as pool:
195
                pool.map(worker_sed, range(n_params))
196

197
        print("\nAnalysing models...")
198

199 200
        # We use RawArrays for the same reason as previously
        analysed_averages = (RawArray(ctypes.c_double, n_obs * n_variables),
Médéric Boquien's avatar
Médéric Boquien committed
201
                             (n_obs, n_variables))
202
        analysed_std = (RawArray(ctypes.c_double, n_obs * n_variables),
Médéric Boquien's avatar
Médéric Boquien committed
203
                        (n_obs, n_variables))
204
        best_fluxes = (RawArray(ctypes.c_double, n_obs * n_filters),
Médéric Boquien's avatar
Médéric Boquien committed
205
                       (n_obs, n_filters))
206 207 208 209 210
        best_parameters = (RawArray(ctypes.c_double, n_obs * n_info),
                           (n_obs, n_info))
        best_chi2 = (RawArray(ctypes.c_double, n_obs), (n_obs))
        best_chi2_red = (RawArray(ctypes.c_double, n_obs), (n_obs))

211 212 213 214 215
        initargs = (params, filters, analysed_variables, z, model_fluxes,
                    model_variables, time.time(), mp.Value('i', 0),
                    analysed_averages, analysed_std, best_fluxes,
                    best_parameters, best_chi2, best_chi2_red, save, lim_flag,
                    n_obs)
216 217
        if cores == 1:  # Do not create a new process
            init_worker_analysis(*initargs)
218 219
            for idx, obs in enumerate(obs_table):
                worker_analysis(idx, obs)
220
        else:  # Analyse observations in parallel
221 222
            with mp.Pool(processes=cores, initializer=init_worker_analysis,
                         initargs=initargs) as pool:
223
                pool.starmap(worker_analysis, enumerate(obs_table))
224

225
        analyse_chi2(best_chi2_red)
226

227 228
        print("\nSaving results...")

Médéric Boquien's avatar
Médéric Boquien committed
229 230 231 232 233
        save_table_analysis('analysis_results.txt', obs_table['id'],
                            analysed_variables, analysed_averages,
                            analysed_std)
        save_table_best('best_models.txt', obs_table['id'], best_chi2,
                        best_chi2_red, best_parameters, best_fluxes, filters,
234
                        info)
235 236 237 238 239

        if mock_flag is True:

            print("\nMock analysis...")

240 241 242 243
            # For the mock analysis we do not save the ancillary files
            for k in save:
                save[k] = False

244
            obs_fluxes = np.array([obs_table[name] for name in filters]).T
Médéric Boquien's avatar
Médéric Boquien committed
245 246
            obs_errors = np.array([obs_table[name + "_err"] for name in
                                   filters]).T
247
            mock_fluxes = obs_fluxes.copy()
248 249
            bestmod_fluxes = np.ctypeslib.as_array(best_fluxes[0])
            bestmod_fluxes = bestmod_fluxes.reshape(best_fluxes[1])
250 251 252 253
            wdata = np.where((obs_fluxes > TOLERANCE) &
                             (obs_errors > TOLERANCE))
            mock_fluxes[wdata] = np.random.normal(bestmod_fluxes[wdata],
                                                  obs_errors[wdata])
Médéric Boquien's avatar
Médéric Boquien committed
254

255
            mock_table = obs_table.copy()
256 257
            for idx, name in enumerate(filters):
                mock_table[name] = mock_fluxes[:, idx]
Médéric Boquien's avatar
Médéric Boquien committed
258

259 260 261 262 263
            initargs = (params, filters, analysed_variables, z, model_fluxes,
                        model_variables, time.time(), mp.Value('i', 0),
                        analysed_averages, analysed_std, best_fluxes,
                        best_parameters, best_chi2, best_chi2_red, save,
                        lim_flag, n_obs)
264 265 266 267 268 269 270 271 272
            if cores == 1:  # Do not create a new process
                init_worker_analysis(*initargs)
                for idx, mock in enumerate(mock_table):
                    worker_analysis(idx, mock)
            else:  # Analyse observations in parallel
                with mp.Pool(processes=cores, initializer=init_worker_analysis,
                             initargs=initargs) as pool:
                    pool.starmap(worker_analysis, enumerate(mock_table))

273
            print("\nSaving results...")
274

Médéric Boquien's avatar
Médéric Boquien committed
275 276 277 278 279
            save_table_analysis('analysis_mock_results.txt', mock_table['id'],
                                analysed_variables, analysed_averages,
                                analysed_std)
            save_table_best('best_mock_models.txt', mock_table['id'],
                            best_chi2, best_chi2_red, best_parameters,
280
                            best_fluxes, filters, info)
281 282

        print("Run completed!")
283

284 285
# AnalysisModule to be returned by get_module
Module = PdfAnalysis