# -*- coding: utf-8 -*-
# Copyright (C) 2013 Centre de données Astrophysiques de Marseille
# Copyright (C) 2013-2014 Institute of Astronomy
# Copyright (C) 2014 Laboratoire d'Astrophysique de Marseille, AMU
# Copyright (C) 2014 Yannick Roehlly <yannick@iaora.eu>
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
# Author: Yannick Roehlly, Médéric Boquien & Denis Burgarella

import time

import numpy as np
from scipy import optimize
from scipy.special import erf

from .utils import (save_best_sed, save_pdf, save_chi2, dchi2_over_ds2)
from ...warehouse import SedWarehouse

# Probability threshold: models with a lower probability are excluded from
# the moments computation.
MIN_PROBABILITY = 1e-20

def init_sed(params, filters, analysed, redshifts, fluxes, variables,
             t_begin, n_computed):
    """Initializer of the pool of processes. It is mostly used to convert
    RawArrays into numpy arrays. The latter are defined as global variables to
    be accessible from the workers.

    Parameters
    ----------
    params: ParametersHandler
        Handles the parameters from a 1D index.
    filters: OrderedDict
        Contains filters to compute the fluxes.
    analysed: list
        Variable names to be analysed.
    redshifts: RawArray and tuple containing the shape
        Redshifts of individual models. Shared among workers.
    fluxes: RawArray and tuple containing the shape
        Fluxes of individual models. Shared among workers.
    variables: RawArray and tuple containing the shape
        Values of the analysed variables. Shared among workers.
    t_begin: float
        Time of the beginning of the computation.
    n_computed: Value
        Number of computed models. Shared among workers.

    """
    global gbl_model_redshifts, gbl_model_fluxes, gbl_model_variables
    global gbl_n_computed, gbl_t_begin, gbl_params, gbl_previous_idx
    global gbl_filters, gbl_analysed_variables, gbl_warehouse

    gbl_model_redshifts = np.ctypeslib.as_array(redshifts[0])

    gbl_model_fluxes = np.ctypeslib.as_array(fluxes[0])
    gbl_model_fluxes = gbl_model_fluxes.reshape(fluxes[1])

    gbl_model_variables = np.ctypeslib.as_array(variables[0])
    gbl_model_variables = gbl_model_variables.reshape(variables[1])

    gbl_n_computed = n_computed
    gbl_t_begin = t_begin

    gbl_params = params

    gbl_previous_idx = -1

    gbl_filters = filters
    gbl_analysed_variables = analysed

    gbl_warehouse = SedWarehouse()

def init_analysis(params, filters, analysed, redshifts, fluxes, variables,
                  t_begin, n_computed, analysed_averages, analysed_std,
                  best_fluxes, best_parameters, best_chi2, best_chi2_red, save,
                  lim_flag, n_obs):
    """Initializer of the pool of processes. It is mostly used to convert
    RawArrays into numpy arrays. The latter are defined as global variables to
    be accessible from the workers.

    Parameters
    ----------
    params: ParametersHandler
        Handles the parameters from a 1D index.
    filters: OrderedDict
        Contains filters to compute the fluxes.
    analysed: list
        Variable names to be analysed.
    redshifts: RawArray and tuple containing the shape
        Redshifts of individual models. Shared among workers.
    fluxes: RawArray and tuple containing the shape
        Fluxes of individual models. Shared among workers.
    variables: RawArray and tuple containing the shape
        Values of the analysed variables. Shared among workers.
    t_begin: float
        Time of the beginning of the computation.
    n_computed: Value
        Number of computed models. Shared among workers.
    analysed_averages: RawArray
        Analysed values for each observation.
    analysed_std: RawArray
        Standard deviation values for each observation.
    best_fluxes: RawArray
        Best fluxes for each observation.
    best_parameters: RawArray
        Best parameters for each observation.
    best_chi2: RawArray
        Best χ² for each observation.
    best_chi2_red: RawArray
        Best reduced χ² for each observation.
    save: dictionary
        Contains booleans indicating whether we need to save some data related
        to given models.
    lim_flag: boolean
        Whether to process the upper limits as real upper limits (True) or
        as missing data (False).
    n_obs: int
        Number of observations.

    """
    init_sed(params, filters, analysed, redshifts, fluxes, variables,
             t_begin, n_computed)
    global gbl_redshifts, gbl_w_redshifts, gbl_analysed_averages
    global gbl_analysed_std, gbl_best_fluxes, gbl_best_parameters
    global gbl_best_chi2, gbl_best_chi2_red, gbl_save, gbl_n_obs
    global gbl_lim_flag

    gbl_analysed_averages = np.ctypeslib.as_array(analysed_averages[0])
    gbl_analysed_averages = gbl_analysed_averages.reshape(analysed_averages[1])

    gbl_analysed_std = np.ctypeslib.as_array(analysed_std[0])
    gbl_analysed_std = gbl_analysed_std.reshape(analysed_std[1])

    gbl_best_fluxes = np.ctypeslib.as_array(best_fluxes[0])
    gbl_best_fluxes = gbl_best_fluxes.reshape(best_fluxes[1])

    gbl_best_parameters = np.ctypeslib.as_array(best_parameters[0])
    gbl_best_parameters = gbl_best_parameters.reshape(best_parameters[1])

    gbl_best_chi2 = np.ctypeslib.as_array(best_chi2[0])

    gbl_best_chi2_red = np.ctypeslib.as_array(best_chi2_red[0])

    gbl_redshifts = np.unique(gbl_model_redshifts)
    gbl_w_redshifts = {redshift: gbl_model_redshifts == redshift
                       for redshift in gbl_redshifts}
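
    # For reference: gbl_w_redshifts maps each unique redshift to a boolean
    # mask over the model grid. For example, with model redshifts
    # [0.1, 0.2, 0.1], it holds {0.1: [True, False, True],
    # 0.2: [False, True, False]}, so all the models at a given redshift can
    # be selected in a single indexing step.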

    gbl_save = save
    gbl_lim_flag = lim_flag

    gbl_n_obs = n_obs


def sed(idx):
    """Worker process to retrieve a SED and affect the relevant data to shared
    RawArrays.

    Parameters
    ----------
    idx: int
        Index of the model whose parameters are to be retrieved from the
        global parameters handler.

    """
    global gbl_previous_idx
    if gbl_previous_idx > -1:
        gbl_warehouse.partial_clear_cache(
            gbl_params.index_module_changed(gbl_previous_idx, idx))
    gbl_previous_idx = idx
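
    # Note (an assumption about the warehouse API, inferred from the names):
    # index_module_changed() presumably returns the position of the first
    # module whose parameters differ between the previous and the current
    # index, so partial_clear_cache() only drops the cached partial SEDs
    # that cannot be reused for the new model.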

    sed = gbl_warehouse.get_sed(gbl_params.modules,
                                gbl_params.from_index(idx))

    if 'age' in sed.info and sed.info['age'] > sed.info['universe.age']:
        model_fluxes = -99. * np.ones(len(gbl_filters))
        model_variables = -99. * np.ones(len(gbl_analysed_variables))
    else:
        model_fluxes = np.array([sed.compute_fnu(filter_.trans_table,
                                                 filter_.effective_wavelength)
                                 for filter_ in gbl_filters.values()])
        model_variables = np.array([sed.info[name]
                                    for name in gbl_analysed_variables])

    gbl_model_redshifts[idx] = sed.info['redshift']
    gbl_model_fluxes[idx, :] = model_fluxes
    gbl_model_variables[idx, :] = model_variables

    with gbl_n_computed.get_lock():
        gbl_n_computed.value += 1
        n_computed = gbl_n_computed.value
    if n_computed % 100 == 0 or n_computed == gbl_params.size:
        t_elapsed = time.time() - gbl_t_begin
        print("{}/{} models computed in {} seconds ({} models/s)".
              format(n_computed, gbl_params.size,
                     np.around(t_elapsed, decimals=1),
                     np.around(n_computed/t_elapsed, decimals=1)),
              end="\r")


def analysis(idx, obs):
    """Worker process to analyse the PDF and estimate parameters values

    Parameters
    ----------
    idx: int
        Index of the observation. This is necessary to put the computed values
        at the right location in the RawArrays.
    obs: row
        Input data for an individual object.

    """
    # Tolerance threshold under which any flux or error is considered as 0.
    tolerance = 1e-12

    global gbl_mod_fluxes, gbl_obs_fluxes, gbl_obs_errors

    # We pick the indices of the models with the closest redshift, assuming
    # the number of decimals is limited (usually to 2).
    w = np.where(gbl_w_redshifts[gbl_redshifts[np.abs(obs['redshift'] -
                                               gbl_redshifts).argmin()]])

    # We only keep models with fluxes >= -90. Fluxes lower than this mean
    # missing data, probably because age > age of the universe (see the
    # sed() function above).
    model_fluxes = np.ma.masked_less(gbl_model_fluxes[w[0], :], -90.)
    model_variables = np.ma.masked_less(gbl_model_variables[w[0], :], -90.)

    obs_fluxes = np.array([obs[name] for name in gbl_filters])
    obs_errors = np.array([obs[name + "_err"] for name in gbl_filters])
    # Some observations may not have a flux value in some filters; in that
    # case the user is asked to put -9999. as the value. We mask these
    # values. Note that we must mask obs_fluxes AFTER obs_errors.
    obs_errors = np.ma.masked_where(obs_fluxes < -9990., obs_errors)
    obs_fluxes = np.ma.masked_less(obs_fluxes, -9990.)

    # Some observations may not have flux values in some filter(s), but
    # they can have upper limit(s). To process upper limits, the user is
    # asked to put the upper limit as the flux value and an error value
    # such that -9990. <= obs_errors < 0. Then there are two options:
    # 1) if lim_flag is True, the limits are processed as upper limits
    #    below;
    # 2) if lim_flag is False, the limits are processed as missing data.

    lim_flag = gbl_lim_flag*np.logical_and(obs_errors >= -9990.,
                                           obs_errors < tolerance)
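
    # Illustrative example (values are hypothetical): with gbl_lim_flag set
    # to True, an observation with obs_errors = [-9999., -0.1, 0.05] yields
    # lim_flag = [False, True, False]: the first filter is missing data
    # (already masked above), the second is an upper limit, and the third
    # is a regular measurement.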

    gbl_obs_fluxes = obs_fluxes
    gbl_obs_errors = obs_errors

    # Normalisation factor to be applied to the model fluxes to best fit
    # the observed fluxes, computed analytically by minimising the χ² with
    # respect to the normalisation.
    norm_facts = (
        np.sum(model_fluxes * obs_fluxes / (obs_errors * obs_errors), axis=1) /
        np.sum(model_fluxes * model_fluxes / (obs_errors * obs_errors), axis=1)
    )
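
    # Sketch of where this comes from (standard weighted least squares):
    # with χ²(α) = Σ ((f_obs - α f_mod) / σ)², solving dχ²/dα = 0 gives
    # α = Σ (f_mod f_obs / σ²) / Σ (f_mod² / σ²), the expression above.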

    # When upper limits are present, the analytic normalisation factor is
    # only a starting point: it is refined numerically for each model by
    # finding the root of dchi2_over_ds2.
    if lim_flag.any():
        norm_init = norm_facts.copy()
        for imod in range(len(model_fluxes)):
            gbl_mod_fluxes = model_fluxes[imod, :]
            norm_facts[imod] = optimize.newton(dchi2_over_ds2, norm_init[imod],
                                               tol=1e-16)
    norm_model_fluxes = model_fluxes * norm_facts[:, np.newaxis]

    # χ² of the comparison of each model to the observation.
    mask_data = np.logical_and(obs_fluxes > tolerance, obs_errors > tolerance)
    if lim_flag.any():
        # This mask selects the filter(s) for which measured fluxes are
        # given, i.e., when obs_fluxes >= 0. and obs_errors >= 0., with
        # lim_flag=True.
        mask_data = (obs_errors >= tolerance)
        # This mask selects the filter(s) for which upper limits are given,
        # i.e., when obs_fluxes >= 0. and -9990. <= obs_errors < 0., with
        # lim_flag=True.
        mask_lim = np.logical_and(obs_errors >= -9990., obs_errors < tolerance)
        chi2_data = np.sum(np.square(
            (obs_fluxes[mask_data] - norm_model_fluxes[:, mask_data]) /
            obs_errors[mask_data]), axis=1)

        chi2_lim = -2. * np.sum(
            np.log(
                np.sqrt(np.pi/2.) * (-obs_errors[mask_lim]) * (
                    1. + erf(
                        (obs_fluxes[mask_lim] - norm_model_fluxes[:, mask_lim]) /
                        (np.sqrt(2) * (-obs_errors[mask_lim]))))), axis=1)

        chi2_ = chi2_data + chi2_lim
    else:
        chi2_ = np.sum(np.square(
            (obs_fluxes[mask_data] - norm_model_fluxes[:, mask_data]) /
            obs_errors[mask_data]), axis=1)
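
    # Background for the upper-limit term (a sketch of the usual treatment
    # of non-detections, cf. Sawicki 2012): writing f_lim = obs_fluxes and
    # σ = -obs_errors for the filters in mask_lim, the unnormalised
    # likelihood of a model flux f_mod is the Gaussian integrated up to the
    # limit,
    #   ∫_{-∞}^{f_lim} exp(-(f - f_mod)² / (2σ²)) df
    #     = √(π/2) σ [1 + erf((f_lim - f_mod) / (√2 σ))],
    # and chi2_lim is -2 times the logarithm of that integral.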

    # We use the exponential probability associated with the χ² as
    # likelihood function.
    likelihood = np.exp(-chi2_/2)
    # For the analysis, we consider that the computed models explain
    # each observation. We normalise the likelihood function to have a
    # total likelihood of 1 for each observation.
    likelihood /= np.sum(likelihood)
    # We don't want to take into account the models with a probability
    # less than the threshold.
    likelihood = np.ma.masked_less(likelihood, MIN_PROBABILITY)
    # We re-normalise the likelihood.
    likelihood /= np.sum(likelihood)

    ##################################################################
    # Variable analysis                                              #
    ##################################################################

    # We define the best fitting model for each observation as the one
    # with the least χ².
    best_index = chi2_.argmin()

    # We compute the best SED once again to obtain its info.
    global gbl_previous_idx
    if gbl_previous_idx > -1:
        gbl_warehouse.partial_clear_cache(
            gbl_params.index_module_changed(gbl_previous_idx,
                                            w[0][best_index]))
    gbl_previous_idx = w[0][best_index]

    sed = gbl_warehouse.get_sed(gbl_params.modules,
                                gbl_params.from_index(w[0][best_index]))

    # We correct the mass-dependent parameters
    for key in sed.mass_proportional_info:
        sed.info[key] *= norm_facts[best_index]
    for index, variable in enumerate(gbl_analysed_variables):
        if variable in sed.mass_proportional_info:
            model_variables[:, index] *= norm_facts

    # We compute the weighted average and standard deviation using the
    # likelihood as weight.
    analysed_averages = np.empty(len(gbl_analysed_variables))
    analysed_std = np.empty_like(analysed_averages)
    values = np.ma.masked_where(model_variables == -99., model_variables)

    Npdf = 100
    pdf_binsize = np.empty_like(analysed_averages)
    min_hist = np.empty_like(analysed_averages)
    max_hist = np.empty_like(analysed_averages)
    var = np.empty((Npdf, len(analysed_averages)))
    pdf = np.empty((Npdf, len(analysed_averages)))

    # TODO: could we simplify and/or optimize the two loops below?
    for par in range(len(analysed_averages)):
        min_hist[par] = np.min(values[:, par])
        max_hist[par] = np.max(values[:, par])

    for i in range(len(analysed_averages)):
        if min_hist[i] == max_hist[i]:
            # All the models share a single value for this variable, so the
            # average is that value and the standard deviation is zero.
            analysed_averages[i] = max_hist[i]
            analysed_std[i] = 0.

            # If there is only one value, then the histogram has only one
            # bin; we flag this with a negative bin size.
            pdf_binsize[i] = -1.

            var[:, i] = max_hist[i]
            pdf[:, i] = 1.
        else:
            pdf_binsize[i] = (max_hist[i] - min_hist[i]) / Npdf
            pdf_prob, pdf_grid = np.histogram(model_variables[:, i],
                                              Npdf,
                                              (min_hist[i], max_hist[i]),
                                              weights=likelihood,
                                              density=True)
            pdf_x = (pdf_grid[1:] + pdf_grid[:-1]) / 2
            pdf_y = pdf_x * pdf_prob
            analysed_averages[i] = np.sum(pdf_y) / np.sum(pdf_prob)
            analysed_std[i] = np.sqrt(
                np.sum(np.square(pdf_x - analysed_averages[i]) * pdf_prob) /
                np.sum(pdf_prob))
            analysed_std[i] = max(0.05 * analysed_averages[i],
                                  analysed_std[i])

            var[:, i] = np.linspace(min_hist[i], max_hist[i], Npdf)
            pdf[:, i] = np.interp(var[:, i], pdf_x, pdf_prob)
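
    # Note on the estimates above: with pdf_prob the likelihood-weighted
    # density histogram, the mean is Σ x p(x) / Σ p(x) and the standard
    # deviation is √(Σ (x - mean)² p(x) / Σ p(x)), both evaluated at the
    # bin centres pdf_x; the standard deviation is then floored at 5% of
    # the mean as a minimal uncertainty.
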
    # TODO Merge with above computation after checking it is fine with a MA.
    gbl_analysed_averages[idx, :] = analysed_averages
    gbl_analysed_std[idx, :] = analysed_std

    gbl_best_fluxes[idx, :] = norm_model_fluxes[best_index, :]
    gbl_best_parameters[idx, :] = list(sed.info.values())
    gbl_best_chi2[idx] = chi2_[best_index]
    gbl_best_chi2_red[idx] = chi2_[best_index] / obs_fluxes.count()

    if gbl_save['best_sed']:
        save_best_sed(obs['id'], sed, norm_facts[best_index])
    if gbl_save['chi2']:
        save_chi2(obs['id'], gbl_analysed_variables, model_variables,
                  chi2_ / obs_fluxes.count())
    if gbl_save['pdf']:
        save_pdf(obs['id'], gbl_analysed_variables, var, pdf)

    with gbl_n_computed.get_lock():
        gbl_n_computed.value += 1
        n_computed = gbl_n_computed.value
    t_elapsed = time.time() - gbl_t_begin
    print("{}/{} objects analysed in {} seconds ({} objects/s)".
          format(n_computed, gbl_n_obs, np.around(t_elapsed, decimals=1),
                 np.around(n_computed/t_elapsed, decimals=2)),
          end="\r")