workers.py 10.5 KB
Newer Older
1
2
3
# -*- coding: utf-8 -*-
# Copyright (C) 2013 Centre de données Astrophysiques de Marseille
# Copyright (C) 2013-2014 Institute of Astronomy
4
# Copyright (C) 2014 Laboratoire d'Astrophysique de Marseille, AMU
5
6
# Copyright (C) 2014 Yannick Roehlly <yannick@iaora.eu>
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
7
# Author: Yannick Roehlly, Médéric Boquien & Denis Burgarella
8

9
from copy import deepcopy
10
11
import time

12
import numpy as np
13

14
from ..utils import nothread
15
from .utils import save_chi2, compute_corr_dz, compute_chi2, weighted_param
16
from ...warehouse import SedWarehouse
17

Médéric Boquien's avatar
Médéric Boquien committed
18

19
20
def init_sed(models, t0, ncomputed):
    """Initializer of the pool of processes to share variables between workers.
21
22
23

    Parameters
    ----------
24
25
26
    models: ModelsManagers
        Manages the storage of the computed models (fluxes and properties).
    t0: float
27
        Time of the beginning of the computation.
28
29
    ncomputed: Value
        Number of computed models. Shared among workers.
30
31

    """
32
33
    global gbl_previous_idx, gbl_warehouse, gbl_models, gbl_obs
    global gbl_properties, gbl_t0, gbl_ncomputed
34

35
36
37
38
    # Limit the number of threads to 1 if we use MKL in order to limit the
    # oversubscription of the CPU/RAM.
    nothread()

39
    gbl_previous_idx = -1
40
    gbl_warehouse = SedWarehouse()
41

42
43
44
45
46
47
    gbl_models = models
    gbl_obs = models.obs
    gbl_properties = [prop[:-4] if prop.endswith('_log') else prop for prop in
                      models.propertiesnames]
    gbl_t0 = t0
    gbl_ncomputed = ncomputed
Médéric Boquien's avatar
Médéric Boquien committed
48

 Hector Salas's avatar
Hector Salas committed
49

50
51
def init_analysis(models, results, t0, ncomputed):
    """Initializer of the pool of processes to share variables between workers.
52
53
54

    Parameters
    ----------
55
56
57
58
59
    models: ModelsManagers
        Manages the storage of the computed models (fluxes and properties).
    results: ResultsManager
        Contains the estimates and errors on the properties.
    t0: float
60
        Time of the beginning of the computation.
61
    ncomputed: Value
62
63
64
        Number of computed models. Shared among workers.

    """
65
    global gbl_models, gbl_obs, gbl_results, gbl_t0, gbl_ncomputed
66

67
    gbl_models = models
68
    gbl_obs = models.obs
69
70
71
    gbl_results = results
    gbl_t0 = t0
    gbl_ncomputed = ncomputed
72
73


74
75
def init_bestfit(conf, params, observations, results, t0, ncomputed):
    """Initializer of the pool of processes to share variables between workers.
76

77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
    Parameters
    ----------
    conf: dict
        Contains the pcigale.ini configuration.
    params: ParametersManager
        Manages the parameters from a 1D index.
    observations: astropy.Table
        Contains the observations including the filter names.
    ncomputed: Value
        Number of computed models. Shared among workers.
    t0: float
        Time of the beginning of the computation.
    results: ResultsManager
        Contains the estimates and errors on the properties.
    offset: integer
        Offset of the block to retrieve the global model index.
93

94
95
96
    """
    global gbl_previous_idx, gbl_warehouse, gbl_conf, gbl_params, gbl_obs
    global gbl_results, gbl_t0, gbl_ncomputed
97

98
99
    gbl_previous_idx = -1
    gbl_warehouse = SedWarehouse()
100

101
102
103
104
105
106
    gbl_conf = conf
    gbl_params = params
    gbl_obs = observations
    gbl_results = results
    gbl_t0 = t0
    gbl_ncomputed = ncomputed
Médéric Boquien's avatar
Médéric Boquien committed
107

 Hector Salas's avatar
Hector Salas committed
108

109
110
111
def sed(idx, midx):
    """Worker process to retrieve a SED and affect the relevant data to an
    instance of ModelsManager.
112
113
114

    Parameters
    ----------
115
    idx: int
116
117
118
        Index of the model within the current block of models.
    midx: int
        Global index of the model.
119
120

    """
121
122
123
    global gbl_previous_idx
    if gbl_previous_idx > -1:
        gbl_warehouse.partial_clear_cache(
124
125
126
127
            gbl_models.params.index_module_changed(gbl_previous_idx, midx))
    gbl_previous_idx = midx
    sed = gbl_warehouse.get_sed(gbl_models.params.modules,
                                gbl_models.params.from_index(midx))
128

129
    if 'sfh.age' in sed.info and sed.info['sfh.age'] > sed.info['universe.age']:
130
131
        gbl_models.fluxes[:, idx] = np.full(len(gbl_obs.bands), np.nan)
        gbl_models.properties[:, idx] = np.full(len(gbl_properties), np.nan)
132
133
134
        gbl_models.intprops[:, idx] = np.full(len(gbl_obs.intprops), np.nan)
        gbl_models.extprops[:, idx] = np.full(len(gbl_obs.extprops), np.nan)

135
    else:
136
137
138
139
        gbl_models.fluxes[:, idx] = [sed.compute_fnu(filter_)
                                     for filter_ in gbl_obs.bands]
        gbl_models.properties[:, idx] = [sed.info[name]
                                         for name in gbl_properties]
140
141
142
143
        gbl_models.intprops[:, idx] = [sed.info[name]
                                       for name in gbl_obs.intprops]
        gbl_models.extprops[:, idx] = [sed.info[name]
                                       for name in gbl_obs.extprops]
144
145
146
147
148
149
    with gbl_ncomputed.get_lock():
        gbl_ncomputed.value += 1
        ncomputed = gbl_ncomputed.value
    nmodels = len(gbl_models.block)
    if ncomputed % 250 == 0 or ncomputed == nmodels:
        dt = time.time() - gbl_t0
150
        print("{}/{} models computed in {:.1f} seconds ({:.1f} models/s)".
151
152
              format(ncomputed, nmodels, dt, ncomputed/dt),
              end="\n" if ncomputed == nmodels else "\r")
153

 Hector Salas's avatar
Hector Salas committed
154

155
def analysis(idx, obs):
156
157
    """Worker process to analyse the PDF and estimate parameters values and
    store them in an instance of ResultsManager.
158
159
160

    Parameters
    ----------
161
162
    idx: int
        Index of the observation. This is necessary to put the computed values
163
        at the right location in the ResultsManager.
164
165
166
167
    obs: row
        Input data for an individual object

    """
168
169
    np.seterr(invalid='ignore')

170
    if obs.redshift >= 0.:
171
172
        # We pick the the models with the closest redshift using a slice to
        # work on views of the arrays and not on copies to save on RAM.
173
174
175
        z = np.array(
            gbl_models.conf['sed_modules_params']['redshifting']['redshift'])
        wz = slice(np.abs(obs.redshift-z).argmin(), None, z.size)
176
        corr_dz = compute_corr_dz(z[wz.start], obs.distance)
177
178
    else:  # We do not know the redshift so we use the full grid
        wz = slice(0, None, 1)
179
        corr_dz = 1.
180

181
182
    chi2, scaling = compute_chi2(gbl_models.fluxes[:, wz],
                                 gbl_models.intprops[:, wz],
183
                                 gbl_models.extprops[:, wz], obs, corr_dz,
184
                                 gbl_models.conf['analysis_params']['lim_flag'])
185

186
    if np.any(np.isfinite(chi2)):
187
188
        # We use the exponential probability associated with the χ² as
        # likelihood function.
189
        likelihood = np.exp(-chi2 / 2.)
190
        gbl_results.bayes.weights[idx] = np.nansum(likelihood)
191

Médéric Boquien's avatar
Médéric Boquien committed
192
193
        # We compute the weighted average and standard deviation using the
        # likelihood as weight.
194
        for i, variable in enumerate(gbl_results.bayes.propertiesnames):
195
            if variable.endswith('_log'):
196
                variable = variable[:-4]
197
198
199
200
                _ = np.log10
            else:
                _ = lambda x: x

201
202
            if variable in gbl_results.bayes.massproportional:
                values = _(gbl_models.properties[i, wz] * scaling * corr_dz)
203
            else:
204
                values = _(gbl_models.properties[i, wz])
205

206
207
            wlikely = np.where(np.isfinite(likelihood))
            mean, std = weighted_param(values[wlikely], likelihood[wlikely])
208
            gbl_results.bayes.means[idx, i] = mean
209
            gbl_results.bayes.errors[idx, i] = std
210
211
212
213
214
215
216
217
218
            if gbl_models.conf['analysis_params']['save_chi2'] is True:
                save_chi2(obs, variable, gbl_models, chi2, values)
        best_idx_z = np.nanargmin(chi2)
        gbl_results.best.chi2[idx] = chi2[best_idx_z]
        gbl_results.best.index[idx] = (wz.start + best_idx_z*wz.step +
                                       gbl_models.block.start)
    else:
        # It sometimes happens because models are older than the Universe's age
        print("No suitable model found for the object {}. One possible origin "
219
              "is that models are older than the Universe.".format(obs.id))
220
221
222
223
224

    with gbl_ncomputed.get_lock():
        gbl_ncomputed.value += 1
        ncomputed = gbl_ncomputed.value
    dt = time.time() - gbl_t0
225
    print("{}/{} objects analysed in {:.1f} seconds ({:.2f} objects/s)".
226
227
228
          format(ncomputed, len(gbl_models.obs), dt, ncomputed/dt),
          end="\n" if ncomputed == len(gbl_models.obs) else "\r")

 Hector Salas's avatar
Hector Salas committed
229

230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def bestfit(oidx, obs):
    """Worker process to compute and save the best fit.

    Parameters
    ----------
    oidx: int
        Index of the observation. This is necessary to put the computed values
        at the right location in the ResultsManager.
    obs: row
        Input data for an individual object

    """
    np.seterr(invalid='ignore')

    best_index = int(gbl_results.best.index[oidx])
    global gbl_previous_idx
    if gbl_previous_idx > -1:
        gbl_warehouse.partial_clear_cache(
            gbl_params.index_module_changed(gbl_previous_idx, best_index))
    gbl_previous_idx = best_index
250
251
252
253

    # We compute the model at the exact redshift not to have to correct for the
    # difference between the object and the grid redshifts.
    params = deepcopy(gbl_params.from_index(best_index))
254
    if obs.redshift >= 0.:
255
        params[gbl_params.modules.index('redshifting')]['redshift'] = obs.redshift
256
    sed = gbl_warehouse.get_sed(gbl_params.modules, params)
257
258

    fluxes = np.array([sed.compute_fnu(filt) for filt in gbl_obs.bands])
259
260
261
    intprops = np.array([sed.info[prop] for prop in gbl_obs.intprops])
    extprops = np.array([sed.info[prop] for prop in gbl_obs.extprops])

262
    corr_dz = compute_corr_dz(obs.redshift, obs.distance)
263
    _, scaling = compute_chi2(fluxes[:, None], intprops[:, None],
264
                              extprops[:, None],  obs, corr_dz,
265
                              gbl_conf['analysis_params']['lim_flag'])
266

267
268
269
270
271

    gbl_results.best.properties[oidx, :] = [sed.info[k] for k in
                                            gbl_results.best.propertiesnames]
    iprop = [i for i, k in enumerate(gbl_results.best.propertiesnames)
             if k in gbl_results.best.massproportional]
272
    gbl_results.best.properties[oidx, iprop] *= scaling * corr_dz
273
274
275
    gbl_results.best.fluxes[oidx, :] = fluxes * scaling

    if gbl_conf['analysis_params']["save_best_sed"]:
276
        sed.to_fits('out/{}'.format(obs.id), scaling)
277
278
279
280
281
282
283
284

    with gbl_ncomputed.get_lock():
        gbl_ncomputed.value += 1
        ncomputed = gbl_ncomputed.value
    dt = time.time() - gbl_t0
    print("{}/{} best fit spectra computed in {:.1f} seconds ({:.2f} objects/s)".
          format(ncomputed, len(gbl_obs), dt, ncomputed/dt), end="\n" if
          ncomputed == len(gbl_obs) else "\r")