workers.py 10.7 KB
Newer Older
1 2 3
# -*- coding: utf-8 -*-
# Copyright (C) 2013 Centre de données Astrophysiques de Marseille
# Copyright (C) 2013-2014 Institute of Astronomy
4
# Copyright (C) 2014 Laboratoire d'Astrophysique de Marseille, AMU
5 6
# Copyright (C) 2014 Yannick Roehlly <yannick@iaora.eu>
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
7
# Author: Yannick Roehlly, Médéric Boquien & Denis Burgarella
8

9
from copy import deepcopy
10 11
import time

12
import numpy as np
13

14
from ..utils import nothread
15
from .utils import save_chi2, compute_corr_dz, compute_chi2, weighted_param
16
from ...warehouse import SedWarehouse
17

Médéric Boquien's avatar
Médéric Boquien committed
18

19 20
def init_sed(models, t0, ncomputed):
    """Initializer of the pool of processes to share variables between workers.
21 22 23

    Parameters
    ----------
24 25 26
    models: ModelsManagers
        Manages the storage of the computed models (fluxes and properties).
    t0: float
27
        Time of the beginning of the computation.
28 29
    ncomputed: Value
        Number of computed models. Shared among workers.
30 31

    """
32 33
    global gbl_previous_idx, gbl_warehouse, gbl_models, gbl_obs
    global gbl_properties, gbl_t0, gbl_ncomputed
34

35 36 37 38
    # Limit the number of threads to 1 if we use MKL in order to limit the
    # oversubscription of the CPU/RAM.
    nothread()

39
    gbl_previous_idx = -1
40
    gbl_warehouse = SedWarehouse()
41

42 43 44 45 46 47
    gbl_models = models
    gbl_obs = models.obs
    gbl_properties = [prop[:-4] if prop.endswith('_log') else prop for prop in
                      models.propertiesnames]
    gbl_t0 = t0
    gbl_ncomputed = ncomputed
Médéric Boquien's avatar
Médéric Boquien committed
48

 Hector Salas's avatar
Hector Salas committed
49

50 51
def init_analysis(models, results, t0, ncomputed):
    """Initializer of the pool of processes to share variables between workers.
52 53 54

    Parameters
    ----------
55 56 57 58 59
    models: ModelsManagers
        Manages the storage of the computed models (fluxes and properties).
    results: ResultsManager
        Contains the estimates and errors on the properties.
    t0: float
60
        Time of the beginning of the computation.
61
    ncomputed: Value
62 63 64
        Number of computed models. Shared among workers.

    """
65
    global gbl_models, gbl_obs, gbl_results, gbl_t0, gbl_ncomputed
66

67
    gbl_models = models
68
    gbl_obs = models.obs
69 70 71
    gbl_results = results
    gbl_t0 = t0
    gbl_ncomputed = ncomputed
72 73


74 75
def init_bestfit(conf, params, observations, results, t0, ncomputed):
    """Initializer of the pool of processes to share variables between workers.
76

77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
    Parameters
    ----------
    conf: dict
        Contains the pcigale.ini configuration.
    params: ParametersManager
        Manages the parameters from a 1D index.
    observations: astropy.Table
        Contains the observations including the filter names.
    ncomputed: Value
        Number of computed models. Shared among workers.
    t0: float
        Time of the beginning of the computation.
    results: ResultsManager
        Contains the estimates and errors on the properties.
    offset: integer
        Offset of the block to retrieve the global model index.
93

94 95 96
    """
    global gbl_previous_idx, gbl_warehouse, gbl_conf, gbl_params, gbl_obs
    global gbl_results, gbl_t0, gbl_ncomputed
97

98 99
    gbl_previous_idx = -1
    gbl_warehouse = SedWarehouse()
100

101 102 103 104 105 106
    gbl_conf = conf
    gbl_params = params
    gbl_obs = observations
    gbl_results = results
    gbl_t0 = t0
    gbl_ncomputed = ncomputed
Médéric Boquien's avatar
Médéric Boquien committed
107

 Hector Salas's avatar
Hector Salas committed
108

109 110 111
def sed(idx, midx):
    """Worker process to retrieve a SED and affect the relevant data to an
    instance of ModelsManager.
112 113 114

    Parameters
    ----------
115
    idx: int
116 117 118
        Index of the model within the current block of models.
    midx: int
        Global index of the model.
119 120

    """
121 122 123
    global gbl_previous_idx
    if gbl_previous_idx > -1:
        gbl_warehouse.partial_clear_cache(
124 125 126 127
            gbl_models.params.index_module_changed(gbl_previous_idx, midx))
    gbl_previous_idx = midx
    sed = gbl_warehouse.get_sed(gbl_models.params.modules,
                                gbl_models.params.from_index(midx))
128

129
    if 'sfh.age' in sed.info and sed.info['sfh.age'] > sed.info['universe.age']:
130 131
        gbl_models.fluxes[:, idx] = np.full(len(gbl_obs.bands), np.nan)
        gbl_models.properties[:, idx] = np.full(len(gbl_properties), np.nan)
132 133 134
        gbl_models.intprops[:, idx] = np.full(len(gbl_obs.intprops), np.nan)
        gbl_models.extprops[:, idx] = np.full(len(gbl_obs.extprops), np.nan)

135
    else:
136 137 138 139
        gbl_models.fluxes[:, idx] = [sed.compute_fnu(filter_)
                                     for filter_ in gbl_obs.bands]
        gbl_models.properties[:, idx] = [sed.info[name]
                                         for name in gbl_properties]
140 141 142 143
        gbl_models.intprops[:, idx] = [sed.info[name]
                                       for name in gbl_obs.intprops]
        gbl_models.extprops[:, idx] = [sed.info[name]
                                       for name in gbl_obs.extprops]
144 145 146 147 148 149
    with gbl_ncomputed.get_lock():
        gbl_ncomputed.value += 1
        ncomputed = gbl_ncomputed.value
    nmodels = len(gbl_models.block)
    if ncomputed % 250 == 0 or ncomputed == nmodels:
        dt = time.time() - gbl_t0
150
        print("{}/{} models computed in {:.1f} seconds ({:.1f} models/s)".
151 152
              format(ncomputed, nmodels, dt, ncomputed/dt),
              end="\n" if ncomputed == nmodels else "\r")
153

 Hector Salas's avatar
Hector Salas committed
154

155
def analysis(idx, obs):
156 157
    """Worker process to analyse the PDF and estimate parameters values and
    store them in an instance of ResultsManager.
158 159 160

    Parameters
    ----------
161 162
    idx: int
        Index of the observation. This is necessary to put the computed values
163
        at the right location in the ResultsManager.
164 165 166 167
    obs: row
        Input data for an individual object

    """
168 169
    np.seterr(invalid='ignore')

170
    if obs.redshift >= 0.:
171 172
        # We pick the the models with the closest redshift using a slice to
        # work on views of the arrays and not on copies to save on RAM.
173 174 175
        z = np.array(
            gbl_models.conf['sed_modules_params']['redshifting']['redshift'])
        wz = slice(np.abs(obs.redshift-z).argmin(), None, z.size)
176
        corr_dz = compute_corr_dz(z[wz.start], obs.distance)
177 178
    else:  # We do not know the redshift so we use the full grid
        wz = slice(0, None, 1)
179
        corr_dz = 1.
180

181 182
    chi2, scaling = compute_chi2(gbl_models.fluxes[:, wz],
                                 gbl_models.intprops[:, wz],
183
                                 gbl_models.extprops[:, wz], obs, corr_dz,
184
                                 gbl_models.conf['analysis_params']['lim_flag'])
185

186
    if np.any(np.isfinite(chi2)):
187 188
        # We use the exponential probability associated with the χ² as
        # likelihood function.
189
        likelihood = np.exp(-chi2 / 2.)
190 191 192 193
        wlikely = np.where(np.isfinite(likelihood))
        # If all the models are valid, it is much more efficient to use a slice
        if likelihood.size == wlikely[0].size:
            wlikely = slice(None, None)
194
        gbl_results.bayes.weights[idx] = np.nansum(likelihood)
195

Médéric Boquien's avatar
Médéric Boquien committed
196 197
        # We compute the weighted average and standard deviation using the
        # likelihood as weight.
198
        for i, variable in enumerate(gbl_results.bayes.propertiesnames):
199
            if variable.endswith('_log'):
200
                variable = variable[:-4]
201 202 203 204
                _ = np.log10
            else:
                _ = lambda x: x

205 206
            if variable in gbl_results.bayes.massproportional:
                values = _(gbl_models.properties[i, wz] * scaling * corr_dz)
207
            else:
208
                values = _(gbl_models.properties[i, wz])
209

210
            mean, std = weighted_param(values[wlikely], likelihood[wlikely])
211
            gbl_results.bayes.means[idx, i] = mean
212
            gbl_results.bayes.errors[idx, i] = std
213 214 215 216 217 218 219 220 221
            if gbl_models.conf['analysis_params']['save_chi2'] is True:
                save_chi2(obs, variable, gbl_models, chi2, values)
        best_idx_z = np.nanargmin(chi2)
        gbl_results.best.chi2[idx] = chi2[best_idx_z]
        gbl_results.best.index[idx] = (wz.start + best_idx_z*wz.step +
                                       gbl_models.block.start)
    else:
        # It sometimes happens because models are older than the Universe's age
        print("No suitable model found for the object {}. One possible origin "
222
              "is that models are older than the Universe.".format(obs.id))
223 224 225 226 227

    with gbl_ncomputed.get_lock():
        gbl_ncomputed.value += 1
        ncomputed = gbl_ncomputed.value
    dt = time.time() - gbl_t0
228
    print("{}/{} objects analysed in {:.1f} seconds ({:.2f} objects/s)".
229 230 231
          format(ncomputed, len(gbl_models.obs), dt, ncomputed/dt),
          end="\n" if ncomputed == len(gbl_models.obs) else "\r")

 Hector Salas's avatar
Hector Salas committed
232

233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
def bestfit(oidx, obs):
    """Worker process to compute and save the best fit.

    Parameters
    ----------
    oidx: int
        Index of the observation. This is necessary to put the computed values
        at the right location in the ResultsManager.
    obs: row
        Input data for an individual object

    """
    np.seterr(invalid='ignore')

    best_index = int(gbl_results.best.index[oidx])
    global gbl_previous_idx
    if gbl_previous_idx > -1:
        gbl_warehouse.partial_clear_cache(
            gbl_params.index_module_changed(gbl_previous_idx, best_index))
    gbl_previous_idx = best_index
253 254 255 256

    # We compute the model at the exact redshift not to have to correct for the
    # difference between the object and the grid redshifts.
    params = deepcopy(gbl_params.from_index(best_index))
257
    if obs.redshift >= 0.:
258
        params[gbl_params.modules.index('redshifting')]['redshift'] = obs.redshift
259
    sed = gbl_warehouse.get_sed(gbl_params.modules, params)
260 261

    fluxes = np.array([sed.compute_fnu(filt) for filt in gbl_obs.bands])
262 263 264
    intprops = np.array([sed.info[prop] for prop in gbl_obs.intprops])
    extprops = np.array([sed.info[prop] for prop in gbl_obs.extprops])

265
    corr_dz = compute_corr_dz(obs.redshift, obs.distance)
266
    _, scaling = compute_chi2(fluxes[:, None], intprops[:, None],
267
                              extprops[:, None],  obs, corr_dz,
268
                              gbl_conf['analysis_params']['lim_flag'])
269

270 271 272 273
    gbl_results.best.properties[oidx, :] = [sed.info[k] for k in
                                            gbl_results.best.propertiesnames]
    iprop = [i for i, k in enumerate(gbl_results.best.propertiesnames)
             if k in gbl_results.best.massproportional]
274
    gbl_results.best.properties[oidx, iprop] *= scaling * corr_dz
275 276 277
    gbl_results.best.fluxes[oidx, :] = fluxes * scaling

    if gbl_conf['analysis_params']["save_best_sed"]:
278
        sed.to_fits('out/{}'.format(obs.id), scaling)
279 280 281 282 283 284 285 286

    with gbl_ncomputed.get_lock():
        gbl_ncomputed.value += 1
        ncomputed = gbl_ncomputed.value
    dt = time.time() - gbl_t0
    print("{}/{} best fit spectra computed in {:.1f} seconds ({:.2f} objects/s)".
          format(ncomputed, len(gbl_obs), dt, ncomputed/dt), end="\n" if
          ncomputed == len(gbl_obs) else "\r")