# -*- coding: utf-8 -*-
# Copyright (C) 2013 Centre de données Astrophysiques de Marseille
# Copyright (C) 2013-2014 Institute of Astronomy
# Copyright (C) 2014 Laboratoire d'Astrophysique de Marseille, AMU
# Copyright (C) 2014 Yannick Roehlly <yannick@iaora.eu>
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
# Author: Yannick Roehlly, Médéric Boquien & Denis Burgarella

import time

from astropy.cosmology import WMAP7 as cosmology
import numpy as np
import scipy.stats as st

from .utils import (save_best_sed, save_pdf, save_chi2, compute_chi2,
                    weighted_param)
from ...warehouse import SedWarehouse


def init_sed(params, filters, analysed, fluxes, variables, t_begin, n_computed):
    """Initializer of the pool of processes. It is mostly used to convert
    RawArrays into numpy arrays. The latter are defined as global variables to
    be accessible from the workers.

    Parameters
    ----------
    params: ParametersHandler
        Handles the parameters from a 1D index.
    filters: list
        Contains the names of the filters for which to compute the fluxes.
    analysed: list
        Variable names to be analysed.
    fluxes: RawArray and tuple containing the shape
        Fluxes of individual models. Shared among workers.
    variables: RawArray and tuple containing the shape
        Values of the analysed variables. Shared among workers.
    t_begin: float
        Time of the beginning of the computation.
    n_computed: Value
        Number of computed models. Shared among workers.

    """
    global gbl_model_fluxes, gbl_model_variables, gbl_n_computed, gbl_t_begin
    global gbl_params, gbl_previous_idx, gbl_filters, gbl_analysed_variables
    global gbl_warehouse

    gbl_model_fluxes = np.ctypeslib.as_array(fluxes[0])
    gbl_model_fluxes = gbl_model_fluxes.reshape(fluxes[1])

    gbl_model_variables = np.ctypeslib.as_array(variables[0])
    gbl_model_variables = gbl_model_variables.reshape(variables[1])

    gbl_n_computed = n_computed
    gbl_t_begin = t_begin

    gbl_params = params

    gbl_previous_idx = -1

    gbl_filters = filters
    gbl_analysed_variables = analysed

    gbl_warehouse = SedWarehouse()

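# How these workers are typically wired together (a sketch, not the actual
# calling code, which lives in the analysis module): the result buffers are
# created as multiprocessing RawArrays together with their shape and passed
# through the pool initializer, so that every worker process can map them back
# to numpy arrays and write its results without copying data between
# processes. Roughly:
#
#     import multiprocessing as mp
#     from multiprocessing.sharedctypes import RawArray
#
#     fluxes = (RawArray('d', n_models * n_filters), (n_models, n_filters))
#     variables = (RawArray('d', n_models * n_vars), (n_models, n_vars))
#     n_computed = mp.Value('i', 0)
#     with mp.Pool(processes=ncores, initializer=init_sed,
#                  initargs=(params, filters, analysed, fluxes, variables,
#                            time.time(), n_computed)) as pool:
#         pool.map(sed, range(params.size))
#
# The names n_models, n_filters, n_vars and ncores are placeholders used only
# for this illustration.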

def init_analysis(params, filters, analysed, z, fluxes, variables,
                  t_begin, n_computed, analysed_averages, analysed_std,
                  best_fluxes, best_parameters, best_chi2, best_chi2_red, save,
                  lim_flag, n_obs):
    """Initializer of the pool of processes. It is mostly used to convert
    RawArrays into numpy arrays. The latter are defined as global variables to
    be accessible from the workers.

    Parameters
    ----------
    params: ParametersHandler
        Handles the parameters from a 1D index.
    filters: list
        Contains the names of the filters for which to compute the fluxes.
    analysed: list
        Variable names to be analysed.
    z: RawArray and tuple containing the shape
        Redshifts of individual models. Shared among workers.
    fluxes: RawArray and tuple containing the shape
        Fluxes of individual models. Shared among workers.
    variables: RawArray and tuple containing the shape
        Values of the analysed variables. Shared among workers.
    t_begin: float
        Time of the beginning of the computation.
    n_computed: Value
        Number of computed models. Shared among workers.
    analysed_averages: RawArray
        Analysed values for each observation.
    analysed_std: RawArray
        Standard deviation values for each observation.
    best_fluxes: RawArray
        Best fluxes for each observation.
    best_parameters: RawArray
        Best parameters for each observation.
    best_chi2: RawArray
        Best χ² for each observation.
    best_chi2_red: RawArray
        Best reduced χ² for each observation.
    save: dictionary
        Contains booleans indicating whether we need to save some data related
        to given models.
    lim_flag: boolean
        Whether to take flux upper limits into account when fitting.
    n_obs: int
        Number of observations.

    """
    init_sed(params, filters, analysed, fluxes, variables, t_begin, n_computed)
    global gbl_z, gbl_analysed_averages, gbl_analysed_std
    global gbl_best_fluxes, gbl_best_parameters, gbl_best_chi2
    global gbl_best_chi2_red, gbl_save, gbl_n_obs, gbl_lim_flag, gbl_keys

    gbl_analysed_averages = np.ctypeslib.as_array(analysed_averages[0])
    gbl_analysed_averages = gbl_analysed_averages.reshape(analysed_averages[1])

    gbl_analysed_std = np.ctypeslib.as_array(analysed_std[0])
    gbl_analysed_std = gbl_analysed_std.reshape(analysed_std[1])

    gbl_best_fluxes = np.ctypeslib.as_array(best_fluxes[0])
    gbl_best_fluxes = gbl_best_fluxes.reshape(best_fluxes[1])

    gbl_best_parameters = np.ctypeslib.as_array(best_parameters[0])
    gbl_best_parameters = gbl_best_parameters.reshape(best_parameters[1])

    gbl_best_chi2 = np.ctypeslib.as_array(best_chi2[0])

    gbl_best_chi2_red = np.ctypeslib.as_array(best_chi2_red[0])

    gbl_z = z

    gbl_save = save
    gbl_lim_flag = lim_flag

    gbl_n_obs = n_obs
    gbl_keys = None


def sed(idx):
    """Worker process to retrieve a SED and store the relevant data in the
    shared RawArrays.

    Parameters
    ----------
    idx: int
        Index of the model whose parameters are retrieved from the global
        ParametersHandler.

    """
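    # The warehouse caches partial SED computations. Before building a new
    # model we drop the cached results invalidated by the modules whose
    # parameters changed between the previously computed index and this one,
    # which keeps memory bounded while still reusing shared computations.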
    global gbl_previous_idx
    if gbl_previous_idx > -1:
        gbl_warehouse.partial_clear_cache(
            gbl_params.index_module_changed(gbl_previous_idx, idx))
    gbl_previous_idx = idx

    sed = gbl_warehouse.get_sed(gbl_params.modules,
                                gbl_params.from_index(idx))

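    # Models whose star formation history is older than the age of the
    # Universe at their redshift are unphysical: their fluxes and analysed
    # properties are set to NaN so that they are ignored by the analysis.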
    if 'sfh.age' in sed.info and sed.info['sfh.age'] > sed.info['universe.age']:
        gbl_model_fluxes[idx, :] = np.full(len(gbl_filters), np.nan)
        gbl_model_variables[idx, :] = np.full(len(gbl_analysed_variables),
                                              np.nan)
    else:
        gbl_model_fluxes[idx, :] = np.array([sed.compute_fnu(filter_) for
                                             filter_ in gbl_filters])
        gbl_model_variables[idx, :] = np.array([sed.info[name]
                                                for name in
                                                gbl_analysed_variables])

    with gbl_n_computed.get_lock():
        gbl_n_computed.value += 1
        n_computed = gbl_n_computed.value
    if n_computed % 250 == 0 or n_computed == gbl_params.size:
        t_elapsed = time.time() - gbl_t_begin
        print("{}/{} models computed in {} seconds ({} models/s)".
              format(n_computed, gbl_params.size,
                     np.around(t_elapsed, decimals=1),
                     np.around(n_computed/t_elapsed, decimals=1)),
              end="\n" if n_computed == gbl_params.size else "\r")


def analysis(idx, obs):
    """Worker process to analyse the PDF and estimate parameter values.

    Parameters
    ----------
    idx: int
        Index of the observation. This is necessary to put the computed values
        at the right location in the RawArrays.
    obs: row
        Input data for an individual object.

    """
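    # Outline: the χ² between each (scaled) model and the observed fluxes is
    # computed first; exp(-χ²/2) is then used as a weight to build the
    # likelihood-weighted mean and standard deviation of every analysed
    # property, while the model with the lowest χ² provides the best-fit
    # values and SED.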
    np.seterr(invalid='ignore')

    obs_fluxes = np.array([obs[name] for name in gbl_filters])
    obs_errors = np.array([obs[name + "_err"] for name in gbl_filters])
    obs_z = obs['redshift']
    nobs = np.where(np.isfinite(obs_fluxes))[0].size

    if obs_z >= 0.:
        # We pick the models with the closest redshift using a slice to
        # work on views of the arrays and not on copies to save RAM.
        idx_z = np.abs(obs_z - gbl_z).argmin()
        model_z = gbl_z[idx_z]
        wz = slice(idx_z, None, gbl_z.size)
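        # Note: this slice relies on the model grid being ordered with
        # redshift as the fastest-varying parameter, so that every
        # gbl_z.size-th model starting from idx_z has the selected redshift.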

        # The mass-dependent physical properties are computed assuming the
        # redshift of the model. However, because we round the observed redshifts
        # to two decimals, there can be a difference of 0.005 in redshift
        # between the models and the actual observation. At low redshift, this
        # can cause a discrepancy in the mass-dependent physical properties:
        # ~0.35 dex at z=0.010 vs 0.015 for instance. Therefore we correct these
        # physical quantities by multiplying them by corr_dz.
        corr_dz = (cosmology.luminosity_distance(obs_z).value /
                   cosmology.luminosity_distance(model_z).value)**2.
    else:  # We do not know the redshift so we use the full grid
        wz = slice(0, None, 1)
        corr_dz = 1.

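    # compute_chi2 returns, for every model in the redshift slice, the χ² and
    # the multiplicative factor that best scales the model fluxes onto the
    # observed ones; this scaling is reused below for the mass-proportional
    # properties.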
    chi2, scaling = compute_chi2(gbl_model_fluxes[wz, :], obs_fluxes,
                                 obs_errors, gbl_lim_flag)

    ##################################################################
    # Variable analysis                                              #
    ##################################################################

    # We select only models that have at least 0.1% of the best model's
    # probability of reproducing the observations. This helps to eliminate
    # very bad models.
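    # st.chi2.sf gives the probability of exceeding the best χ² for nobs-1
    # degrees of freedom; multiplying it by 1e-3 and inverting with isf yields
    # the χ² threshold whose probability is 0.1% of that of the best model.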
    maxchi2 = st.chi2.isf(st.chi2.sf(np.nanmin(chi2), nobs-1) * 1e-3, nobs-1)
    wlikely = np.where(chi2 < maxchi2)

    if wlikely[0].size == 0:
        # It sometimes happens because models are older than the Universe's age
        print("No suitable model found for the object {}. One possible origin "
              "is that models are older than the Universe.".format(obs['id']))
        gbl_analysed_averages[idx, :] = np.nan
        gbl_analysed_std[idx, :] = np.nan
        gbl_best_fluxes[idx, :] = np.nan
        gbl_best_parameters[idx, :] = np.nan
        gbl_best_chi2[idx] = np.nan
        gbl_best_chi2_red[idx] = np.nan
    else:
        # We use the exponential probability associated with the χ² as
        # likelihood function.
        likelihood = np.exp(-chi2[wlikely]/2)

        # We define the best fitting model for each observation as the one
        # with the least χ².
        best_index_z = np.nanargmin(chi2)  # index for models at given z
        best_index = wz.start + best_index_z * wz.step  # index for all models

        # We compute the best sed once again to obtain its info
        global gbl_previous_idx
        if gbl_previous_idx > -1:
            gbl_warehouse.partial_clear_cache(
                gbl_params.index_module_changed(gbl_previous_idx,
                                                best_index))
        gbl_previous_idx = best_index

        sed = gbl_warehouse.get_sed(gbl_params.modules,
                                    gbl_params.from_index(best_index))

        # We correct the mass-dependent parameters
        for key in sed.mass_proportional_info:
            sed.info[key] *= scaling[best_index_z] * corr_dz

        # We compute the weighted average and standard deviation using the
        # likelihood as weight.
        for i, variable in enumerate(gbl_analysed_variables):
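            # Properties whose name ends in '_log' are analysed in log space.
            # maxstd puts a floor on the estimated uncertainty: 0.02 dex for
            # log-space properties and 5% of the mean for linear ones.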
            if variable.endswith('_log'):
                variable = variable[:-4]
                _ = np.log10
                maxstd = lambda mean, std: max(0.02, std)
            else:
                _ = lambda x: x
                maxstd = lambda mean, std: max(0.05 * mean, std)

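            # Mass-proportional properties scale with the model normalisation,
            # so they are multiplied by the per-model scaling factor (and by
            # corr_dz) before the weighted average; the other properties are
            # averaged as they are.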
            if variable in sed.mass_proportional_info:
                mean, std = weighted_param(_(gbl_model_variables[wz, i][wlikely]
                                           * scaling[wlikely] * corr_dz),
                                           likelihood)
            else:
                mean, std = weighted_param(_(gbl_model_variables[wz, i][wlikely]),
                                           likelihood)

            gbl_analysed_averages[idx, i] = mean
            gbl_analysed_std[idx, i] = maxstd(mean, std)

        gbl_best_fluxes[idx, :] = gbl_model_fluxes[best_index, :] \
            * scaling[best_index_z]

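        # The keys of sed.info are sorted once and cached in gbl_keys so that
        # the columns of best_parameters follow the same, deterministic order
        # for every observation.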
        global gbl_keys
        if gbl_keys is None:
            gbl_keys = list(sed.info.keys())
            gbl_keys.sort()
        gbl_best_parameters[idx, :] = np.array([sed.info[k] for k in gbl_keys])
        gbl_best_chi2[idx] = chi2[best_index_z]
        gbl_best_chi2_red[idx] = chi2[best_index_z] / (nobs - 1)

        if gbl_save['best_sed']:
            save_best_sed(obs['id'], sed, scaling[best_index_z])
        if gbl_save['chi2']:
            save_chi2(obs['id'], gbl_analysed_variables,
                      sed.mass_proportional_info, gbl_model_variables[wz, :],
                      scaling * corr_dz, chi2 / (nobs - 1))
        if gbl_save['pdf']:
            save_pdf(obs['id'], gbl_analysed_variables,
                     sed.mass_proportional_info, gbl_model_variables[wz, :],
                     scaling * corr_dz, likelihood, wlikely)

    with gbl_n_computed.get_lock():
        gbl_n_computed.value += 1
        n_computed = gbl_n_computed.value
    t_elapsed = time.time() - gbl_t_begin
    print("{}/{} objects analysed in {} seconds ({} objects/s)".
          format(n_computed, gbl_n_obs, np.around(t_elapsed, decimals=1),
                 np.around(n_computed/t_elapsed, decimals=2)),
          end="\n" if n_computed == gbl_n_obs else "\r")