__init__.py 13 KB
Newer Older
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014 Laboratoire d'Astrophysique de Marseille, AMU
3
# Copyright (C) 2013 Centre de données Astrophysiques de Marseille
4 5
# Copyright (C) 2013-2014 Institute of Astronomy
# Copyright (C) 2013-2014 Yannick Roehlly <yannick@iaora.eu>
6
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
7
# Author: Yannick Roehlly, Médéric Boquien & Denis Burgarella
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28

"""
Probability Density Function analysis module
============================================

This module builds the probability density functions (PDF) of the SED
parameters to compute their moments.

The models corresponding to all possible combinations of parameters are
computed and their fluxes in the same filters as the observations are
integrated. These fluxes are compared to the observed ones to compute the
χ² value of the fitting. This χ² give a probability that is associated with
the model values for the parameters.

At the end, for each parameter, the probability-weighted mean and standard
deviation are computed and the best fitting model (the one with the least
reduced χ²) is given for each observation.

"""

from collections import OrderedDict
29
import ctypes
30
import multiprocessing as mp
31 32 33 34 35
from multiprocessing.sharedctypes import RawArray
import time

import numpy as np

36 37
from ...utils import read_table
from .. import AnalysisModule, complete_obs_table
38
from .utils import save_table_analysis, save_table_best, analyse_chi2
39 40
from ...warehouse import SedWarehouse
from ...data import Database
41
from .workers import sed as worker_sed
42 43
from .workers import init_sed as init_worker_sed
from .workers import init_analysis as init_worker_analysis
44
from .workers import analysis as worker_analysis
45
from ..utils import ParametersHandler, backup_dir
46

47 48
from ..utils import OUT_DIR

49
# Tolerance threshold under which any flux or error is considered as 0.
50
TOLERANCE = 1e-12
51
# Limit the redshift to this number of decimals
52
REDSHIFT_DECIMALS = 2
53 54 55 56 57 58 59 60 61 62


class PdfAnalysis(AnalysisModule):
    """PDF analysis module"""

    parameter_list = OrderedDict([
        ("analysed_variables", (
            "array of strings",
            "List of the variables (in the SEDs info dictionaries) for which "
            "the statistical analysis will be done.",
63
            ["sfr", "sfr10Myrs", "sfr100Myrs"]
64 65 66 67 68 69
        )),
        ("save_best_sed", (
            "boolean",
            "If true, save the best SED for each observation to a file.",
            False
        )),
70
        ("save_chi2", (
71
            "boolean",
72
            "If true, for each observation and each analysed variable save "
73
            "the reduced chi2.",
74 75 76 77
            False
        )),
        ("save_pdf", (
            "boolean",
78 79
            "If true, for each observation and each analysed variable save "
            "the probability density function.",
80 81
            False
        )),
82 83 84 85 86
        ("lim_flag", (
            "boolean",
            "If true, for each object check whether upper limits are present "
            "and analyse them.",
            False
87 88 89 90 91 92
        )),
        ("mock_flag", (
            "boolean",
            "If true, for each object we create a mock object "
            "and analyse them.",
            False
93 94 95 96
        ))
    ])

    def process(self, data_file, column_list, creation_modules,
97
                creation_modules_params, config, cores):
98 99
        """Process with the psum analysis.

100 101 102 103 104
        The analysis is done in two steps which can both run on multiple
        processors to run faster. The first step is to compute all the fluxes
        associated with each model as well as ancillary data such as the SED
        information. The second step is to carry out the analysis of each
        object, considering all models at once.
105 106 107 108 109 110 111 112 113 114 115 116

        Parameters
        ----------
        data_file: string
            Name of the file containing the observations to fit.
        column_list: list of strings
            Name of the columns from the data file to use for the analysis.
        creation_modules: list of strings
            List of the module names (in the right order) to use for creating
            the SEDs.
        creation_modules_params: list of dictionaries
            List of the parameter dictionaries for each module.
117 118
        config: dictionary
            Dictionary containing the configuration.
119 120
        core: integer
            Number of cores to run the analysis on
121 122 123

        """

124 125
        print("Initialising the analysis module... ")

126
        # Rename the output directory if it exists
127
        backup_dir()
128

129 130 131
        # Initalise variables from input arguments.
        analysed_variables = config["analysed_variables"]
        n_variables = len(analysed_variables)
Médéric Boquien's avatar
Médéric Boquien committed
132
        save = {key: config["save_{}".format(key)].lower() == "true"
133
                for key in ["best_sed", "chi2", "pdf"]}
134
        lim_flag = config["lim_flag"].lower() == "true"
135
        mock_flag = config["mock_flag"].lower() == "true"
136

137 138
        # Get the needed filters in the pcigale database. We use an ordered
        # dictionary because we need the keys to always be returned in the
139 140
        # same order. We also put the filters in the shared modules as they
        # are needed to compute the fluxes during the models generation.
141
        with Database() as base:
142 143 144 145
            filters = OrderedDict([(name, base.get_filter(name))
                                   for name in column_list
                                   if not name.endswith('_err')])
        n_filters = len(filters)
146 147 148

        # Read the observation table and complete it by adding error where
        # none is provided and by adding the systematic deviation.
149
        obs_table = complete_obs_table(read_table(data_file), column_list,
150
                                       filters, TOLERANCE, lim_flag)
151
        n_obs = len(obs_table)
152

153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
        w_redshifting = creation_modules.index('redshifting')
        if creation_modules_params[w_redshifting]['redshift'] == ['']:
            z = np.unique(np.around(obs_table['redshift'],
                                    decimals=REDSHIFT_DECIMALS))
            creation_modules_params[w_redshifting]['redshift'] = z
            del z

        # The parameters handler allows us to retrieve the models parameters
        # from a 1D index. This is useful in that we do not have to create
        # a list of parameters as they are computed on-the-fly. It also has
        # nice goodies such as finding the index of the first parameter to
        # have changed between two indices or the number of models.
        params = ParametersHandler(creation_modules, creation_modules_params)
        n_params = params.size

168
        # Retrieve an arbitrary SED to obtain the list of output parameters
169
        warehouse = SedWarehouse()
170 171 172 173 174
        sed = warehouse.get_sed(creation_modules, params.from_index(0))
        info = sed.info
        n_info = len(sed.info)
        del warehouse, sed

175 176
        print("Computing the models fluxes...")

177 178 179 180 181 182 183
        # Arrays where we store the data related to the models. For memory
        # efficiency reasons, we use RawArrays that will be passed in argument
        # to the pool. Each worker will fill a part of the RawArrays. It is
        # important that there is no conflict and that two different workers do
        # not write on the same section.
        # We put the shape in a tuple along with the RawArray because workers
        # need to know the shape to create the numpy array from the RawArray.
184 185
        model_redshifts = (RawArray(ctypes.c_double, n_params),
                           (n_params))
186
        model_fluxes = (RawArray(ctypes.c_double,
187 188
                                 n_params * n_filters),
                        (n_params, n_filters))
189
        model_variables = (RawArray(ctypes.c_double,
190 191
                                    n_params * n_variables),
                           (n_params, n_variables))
192

193 194 195
        initargs = (params, filters, analysed_variables, model_redshifts,
                    model_fluxes, model_variables, time.time(),
                    mp.Value('i', 0))
196 197
        if cores == 1:  # Do not create a new process
            init_worker_sed(*initargs)
198 199 200
            for idx in range(n_params):
                worker_sed(idx)
        else:  # Analyse observations in parallel
201 202
            with mp.Pool(processes=cores, initializer=init_worker_sed,
                         initargs=initargs) as pool:
203
                pool.map(worker_sed, range(n_params))
204

205
        print("\nAnalysing models...")
206

207 208
        # We use RawArrays for the same reason as previously
        analysed_averages = (RawArray(ctypes.c_double, n_obs * n_variables),
Médéric Boquien's avatar
Médéric Boquien committed
209
                             (n_obs, n_variables))
210
        analysed_std = (RawArray(ctypes.c_double, n_obs * n_variables),
Médéric Boquien's avatar
Médéric Boquien committed
211
                        (n_obs, n_variables))
212
        best_fluxes = (RawArray(ctypes.c_double, n_obs * n_filters),
Médéric Boquien's avatar
Médéric Boquien committed
213
                       (n_obs, n_filters))
214 215 216 217 218
        best_parameters = (RawArray(ctypes.c_double, n_obs * n_info),
                           (n_obs, n_info))
        best_chi2 = (RawArray(ctypes.c_double, n_obs), (n_obs))
        best_chi2_red = (RawArray(ctypes.c_double, n_obs), (n_obs))

219
        phase = 1
220 221
        initargs = (params, filters, analysed_variables, model_redshifts,
                    model_fluxes, model_variables, time.time(),
222 223
                    mp.Value('i', 0), analysed_averages, analysed_std,
                    best_fluxes, best_parameters, best_chi2, best_chi2_red,
224
                    save, lim_flag, n_obs, phase)
225 226
        if cores == 1:  # Do not create a new process
            init_worker_analysis(*initargs)
227 228
            for idx, obs in enumerate(obs_table):
                worker_analysis(idx, obs)
229
        else:  # Analyse observations in parallel
230 231
            with mp.Pool(processes=cores, initializer=init_worker_analysis,
                         initargs=initargs) as pool:
232
                pool.starmap(worker_analysis, enumerate(obs_table))
233

234
        analyse_chi2(best_chi2_red)
235

236 237
        print("\nSaving results...")

238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
        save_table_analysis('analysis_results.txt', obs_table['id'], 
                            analysed_variables, analysed_averages, analysed_std)
        save_table_best('best_models.txt', obs_table['id'], best_chi2, 
                     best_chi2_red, best_parameters, best_fluxes, filters, info)

        if mock_flag is True:

            print("\nMock analysis...")

            obs_fluxes = np.array([obs_table[name] for name in filters]).T
            obs_errors = np.array([obs_table[name + "_err"] for name in filters]).T
            mock_fluxes = np.empty_like(obs_fluxes)
            mock_errors = np.empty_like(obs_errors)
            bestmod_fluxes = np.ctypeslib.as_array(best_fluxes[0])
            bestmod_fluxes = bestmod_fluxes.reshape(best_fluxes[1])

            wdata =   np.where((obs_fluxes > 0.)&(obs_errors > 0.))
            wlim =    np.where((obs_fluxes > 0.)&
                               (obs_errors >= -9990.)&(obs_errors < 0.))
            wnodata = np.where((obs_fluxes <= -9990.)&(obs_errors <= -9990.))
            
            mock_fluxes[wdata] = np.random.normal(
                           bestmod_fluxes[wdata], 
                           obs_errors[wdata], 
                           len(bestmod_fluxes[wdata]))
            mock_errors[wdata] = obs_errors[wdata]
                
            mock_fluxes[wlim] = obs_fluxes[wlim]
            mock_errors[wlim] = obs_errors[wlim]

            mock_fluxes[wnodata] = obs_fluxes[wnodata]
            mock_errors[wnodata] = obs_errors[wnodata]
                
            mock_table = obs_table.copy()
            mock_table['id'] = obs_table['id']
            mock_table['redshift'] = obs_table['redshift']
            
            indx = 0
            for name in filters:
                mock_table[name] = mock_fluxes[:, indx]
                mock_table[name + "_err"] = mock_errors[:, indx]
                indx += 1
                                       
281
            phase = 2
282 283
            initargs = (params, filters, analysed_variables, model_redshifts,
                    model_fluxes, model_variables, time.time(),
284 285 286
                    mp.Value('i', 0), analysed_averages, analysed_std,
                    best_fluxes, best_parameters, best_chi2,
                    best_chi2_red, save, lim_flag, n_obs, phase)
287 288 289 290 291 292 293 294 295
            if cores == 1:  # Do not create a new process
                init_worker_analysis(*initargs)
                for idx, mock in enumerate(mock_table):
                    worker_analysis(idx, mock)
            else:  # Analyse observations in parallel
                with mp.Pool(processes=cores, initializer=init_worker_analysis,
                             initargs=initargs) as pool:
                    pool.starmap(worker_analysis, enumerate(mock_table))

296
            print("\nSaving results...")
297 298

            save_table_analysis('analysis_mock_results.txt', mock_table['id'], 
299
                  analysed_variables, analysed_averages, analysed_std)
300
            save_table_best('best_mock_models.txt', mock_table['id'], 
301 302
                        best_chi2, best_chi2_red,
                        best_parameters, best_fluxes, filters, info)
303 304

        print("Run completed!")
305

306 307
# AnalysisModule to be returned by get_module
Module = PdfAnalysis