workers.py 3.54 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
# -*- coding: utf-8 -*-
# Copyright (C) 2013 Centre de données Astrophysiques de Marseille
# Copyright (C) 2013-2014 Institute of Astronomy
# Copyright (C) 2014 Yannick Roehlly <yannick@iaora.eu>
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
# Author: Yannick Roehlly & Médéric Boquien

import time

import numpy as np

12
from ...warehouse import SedWarehouse
13
from ..utils import OUT_DIR
14

Médéric Boquien's avatar
Médéric Boquien committed
15

16 17
def init_fluxes(params, filters, save_sed, variables, fluxes, info, t_begin,
                n_computed):
    """Set up the per-process global state used by the workers.

    Meant to be used as the initializer of the pool of processes. The shared
    buffers are wrapped into numpy arrays and, together with the other
    arguments, stored in module-level globals so that the worker function can
    reach them.

    Parameters
    ----------
    params: ParametersHandler
        Handles the parameters from a 1D index.
    filters: list
        Names of the filters for which the fluxes are computed.
    save_sed: boolean
        Indicates whether the SED should be saved.
    variables: list
        Names of the variables to be computed.
    fluxes: RawArray and tuple containing the shape
        Fluxes of individual models. Shared among workers.
    info: same layout as `fluxes` (buffer and tuple containing the shape)
        Values of the variables for individual models. Shared among workers.
    t_begin: float
        Time of the beginning of the computation.
    n_computed: Value
        Number of computed models. Shared among workers.

    """
    global gbl_model_fluxes, gbl_model_info, gbl_n_computed, gbl_t_begin
    global gbl_params, gbl_previous_idx, gbl_filters, gbl_save_sed
    global gbl_warehouse, gbl_variables

    # Element 0 is the shared buffer, element 1 the shape to view it with.
    gbl_model_fluxes = np.ctypeslib.as_array(fluxes[0]).reshape(fluxes[1])
    gbl_model_info = np.ctypeslib.as_array(info[0]).reshape(info[1])

    # Progress bookkeeping.
    gbl_n_computed = n_computed
    gbl_t_begin = t_begin

    # Description of the models to compute and of what to extract from them.
    gbl_params = params
    gbl_filters = filters
    gbl_variables = variables
    gbl_save_sed = save_sed

    # -1 means that this process has not computed any model yet.
    gbl_previous_idx = -1

    gbl_warehouse = SedWarehouse()
64 65 66 67 68 69 70 71 72 73 74 75 76


def fluxes(idx):
    """Worker function: retrieve the SED of one model and store the relevant
    data in the shared arrays.

    The fluxes in each filter and the values of the requested variables are
    written to row `idx` of the shared arrays. When the SED info contains
    'sfh.age' and it exceeds 'universe.age', the row is filled with NaN
    instead. The shared counter of computed models is then incremented and a
    progress line is printed every 250 models and at the last one.

    Parameters
    ----------
    idx: int
        Index of the model to retrieve its parameters from the parameters
        handler.

    """
    global gbl_previous_idx

    # Nothing to clear for the first model computed by this process.
    if gbl_previous_idx > -1:
        changed = gbl_params.index_module_changed(gbl_previous_idx, idx)
        # NOTE(review): presumably drops the cached results made stale by the
        # change of parameters -- confirm against SedWarehouse.
        gbl_warehouse.partial_clear_cache(changed)
    gbl_previous_idx = idx

    model_parameters = gbl_params.from_index(idx)
    sed = gbl_warehouse.get_sed(gbl_params.modules, model_parameters)

    too_old = ('sfh.age' in sed.info and
               sed.info['sfh.age'] > sed.info['universe.age'])
    if too_old:
        model_fluxes = np.full(len(gbl_filters), np.nan)
        model_info = np.full(len(gbl_variables), np.nan)
        gbl_model_fluxes[idx, :] = model_fluxes
        gbl_model_info[idx, :] = model_info
    else:
        model_fluxes = np.array([sed.compute_fnu(filter_)
                                 for filter_ in gbl_filters])
        gbl_model_fluxes[idx, :] = model_fluxes
        model_info = np.array([sed.info[name] for name in gbl_variables])
        gbl_model_info[idx, :] = model_info

    if gbl_save_sed is True:
        sed.to_fits(OUT_DIR + "{}".format(idx))

    # The counter is shared among workers: update and read it under its lock.
    with gbl_n_computed.get_lock():
        gbl_n_computed.value += 1
        n_computed = gbl_n_computed.value

    total = gbl_params.size
    done = n_computed == total
    if done or n_computed % 250 == 0:
        t_elapsed = time.time() - gbl_t_begin
        elapsed = np.around(t_elapsed, decimals=1)
        rate = np.around(n_computed/t_elapsed, decimals=1)
        # "\r" lets the next report overwrite this one; only the final report
        # ends with a newline.
        print("{}/{} models computed in {} seconds ({} models/s)".
              format(n_computed, total, elapsed, rate),
              end="\n" if done else "\r")