__init__.py 8.09 KB
Newer Older
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2014 Laboratoire d'Astrophysique de Marseille, AMU
3
# Copyright (C) 2013 Centre de données Astrophysiques de Marseille
4 5
# Copyright (C) 2013-2014 Institute of Astronomy
# Copyright (C) 2013-2014 Yannick Roehlly <yannick@iaora.eu>
6
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
7
# Author: Yannick Roehlly, Médéric Boquien & Denis Burgarella
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27

"""
Probability Density Function analysis module
============================================

This module builds the probability density functions (PDF) of the SED
parameters to compute their moments.

The models corresponding to all possible combinations of parameters are
computed and their fluxes in the same filters as the observations are
integrated. These fluxes are compared to the observed ones to compute the
χ² value of the fitting. This χ² give a probability that is associated with
the model values for the parameters.

At the end, for each parameter, the probability-weighted mean and standard
deviation are computed and the best fitting model (the one with the least
reduced χ²) is given for each observation.

"""

28
from collections import OrderedDict
29
import multiprocessing as mp
30 31 32

import numpy as np

33
from .. import AnalysisModule
34
from ..utils import Counter
35
from .workers import sed as worker_sed
36 37
from .workers import init_sed as init_worker_sed
from .workers import init_analysis as init_worker_analysis
38
from .workers import init_bestfit as init_worker_bestfit
39
from .workers import analysis as worker_analysis
40 41 42
from .workers import bestfit as worker_bestfit
from ...managers.results import ResultsManager
from ...managers.models import ModelsManager
43
from ...managers.observations import ObservationsManager
44
from ...managers.parameters import ParametersManager
45

46

47 48 49
class PdfAnalysis(AnalysisModule):
    """PDF analysis module"""

50
    parameter_list = OrderedDict([
51
        ("variables", (
52
            "cigale_string_list()",
53 54 55
            "List of the physical properties to estimate. Leave empty to "
            "analyse all the physical properties (not recommended when there "
            "are many models).",
56
            ["sfh.sfr", "sfh.sfr10Myrs", "sfh.sfr100Myrs"]
57 58
        )),
        ("save_best_sed", (
59
            "boolean()",
60 61 62
            "If true, save the best SED for each observation to a file.",
            False
        )),
63
        ("save_chi2", (
64
            "boolean()",
65 66
            "If true, for each observation and each analysed property, save "
            "the raw chi2. It occupies ~15 MB/million models/variable.",
67 68
            False
        )),
69
        ("lim_flag", (
70
            "boolean()",
71 72 73
            "If true, for each object check whether upper limits are present "
            "and analyse them.",
            False
74 75
        )),
        ("mock_flag", (
76
            "boolean()",
77 78 79
            "If true, for each object we create a mock object "
            "and analyse them.",
            False
80 81 82 83 84 85 86 87
        )),
        ("redshift_decimals", (
            "integer()",
            "When redshifts are not given explicitly in the redshifting "
            "module, number of decimals to round the observed redshifts to "
            "compute the grid of models. To disable rounding give a negative "
            "value. Do not round if you use narrow-band filters.",
            2
88 89 90 91 92 93 94
        )),
        ("blocks", (
            "integer(min=1)",
            "Number of blocks to compute the models and analyse the "
            "observations. If there is enough memory, we strongly recommend "
            "this to be set to 1.",
            1
95 96 97
        ))
    ])

98 99
    def _compute_models(self, conf, obs, params, iblock):
        models = ModelsManager(conf, obs, params, iblock)
100 101
        counter = Counter(len(params.blocks[iblock]), 50, 250)
        initargs = (models, counter)
102 103 104 105

        self._parallel_job(worker_sed, params.blocks[iblock], initargs,
                           init_worker_sed, conf['cores'])

106 107 108
        # Print the final value as it may not otherwise be printed
        counter.pprint(len(params.blocks[iblock]))

109 110 111 112 113
        return models

    def _compute_bayes(self, conf, obs, models):
        results = ResultsManager(models)

114
        initargs = (models, results, Counter(len(obs)))
115 116 117 118 119 120
        self._parallel_job(worker_analysis, obs, initargs,
                           init_worker_analysis, conf['cores'])

        return results

    def _compute_best(self, conf, obs, params, results):
121
        initargs = (conf, params, obs, results, Counter(len(obs)))
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
        self._parallel_job(worker_bestfit, obs, initargs,
                           init_worker_bestfit, conf['cores'])

    def _parallel_job(self, worker, items, initargs, initializer, ncores):
        if ncores == 1:  # Do not create a new process
            initializer(*initargs)
            for idx, item in enumerate(items):
                worker(idx, item)
        else:  # run in parallel
            with mp.Pool(processes=ncores, initializer=initializer,
                         initargs=initargs) as pool:
                pool.starmap(worker, enumerate(items))

    def _compute(self, conf, obs, params):
        results = []
        nblocks = len(params.blocks)
        for iblock in range(nblocks):
            print('\nProcessing block {}/{}...'.format(iblock + 1, nblocks))
            # We keep the models if there is only one block. This allows to
            # avoid recomputing the models when we do a mock analysis
            if not hasattr(self, '_models'):
                print("\nComputing models ...")
                models = self._compute_models(conf, obs, params, iblock)
                if nblocks == 1:
                    self._models = models
            else:
                print("\nLoading precomputed models")
                models = self._models

            print("\nEstimating the physical properties ...")
            result = self._compute_bayes(conf, obs, models)
            results.append(result)
            print("\nBlock processed.")

        print("\nEstimating physical properties on all blocks")
        results = ResultsManager.merge(results)

        print("\nComputing the best fit spectra")
        self._compute_best(conf, obs, params, results)

        return results

164
    def process(self, conf):
165 166
        """Process with the psum analysis.

167 168 169 170 171
        The analysis is done in two steps which can both run on multiple
        processors to run faster. The first step is to compute all the fluxes
        associated with each model as well as ancillary data such as the SED
        information. The second step is to carry out the analysis of each
        object, considering all models at once.
172 173 174

        Parameters
        ----------
175 176
        conf: dictionary
            Contents of pcigale.ini in the form of a dictionary
177 178

        """
179
        np.seterr(invalid='ignore')
180

181 182
        print("Initialising the analysis module... ")

183
        # Rename the output directory if it exists
184
        self.prepare_dirs()
185

186 187 188
        # Store the grid of parameters in a manager to facilitate the
        # computation of the models
        params = ParametersManager(conf)
Médéric Boquien's avatar
Médéric Boquien committed
189

190 191 192
        # Store the observations in a manager which sanitises the data, checks
        # all the required fluxes are present, adding errors if needed,
        # discarding invalid fluxes, etc.
193
        obs = ObservationsManager(conf, params)
194
        obs.save('observations')
195

196 197
        results = self._compute(conf, obs, params)
        results.best.analyse_chi2()
198

199 200
        print("\nSaving the analysis results...")
        results.save("results")
201

202
        if conf['analysis_params']['mock_flag'] is True:
203 204 205
            print("\nAnalysing the mock observations...")

            # For the mock analysis we do not save the ancillary files.
206
            for k in ['best_sed', 'chi2']:
207 208 209 210 211 212 213
                conf['analysis_params']["save_{}".format(k)] = False

            # We replace the observations with a mock catalogue..
            obs.generate_mock(results)
            obs.save('mock_observations')

            results = self._compute(conf, obs, params)
214

215
            print("\nSaving the mock analysis results...")
216
            results.save("results_mock")
217 218

        print("Run completed!")
219

220

221 222
# AnalysisModule to be returned by get_module
Module = PdfAnalysis