Commit 8b764157 authored by Médéric Boquien's avatar Médéric Boquien

Switch savefluxes to using a models manager and make all the necessary adjustments with the workers.

Switch savefluxes to using a models manager and make all the necessary adjustments with the workers.
parent beade2fa
...@@ -18,13 +18,13 @@ The data file is used only to get the list of fluxes to be computed. ...@@ -18,13 +18,13 @@ The data file is used only to get the list of fluxes to be computed.
from collections import OrderedDict from collections import OrderedDict
import ctypes import ctypes
import multiprocessing as mp import multiprocessing as mp
from multiprocessing.sharedctypes import RawArray
import time import time
from .. import AnalysisModule from .. import AnalysisModule
from ..utils import save_fluxes
from .workers import init_fluxes as init_worker_fluxes from .workers import init_fluxes as init_worker_fluxes
from .workers import fluxes as worker_fluxes from .workers import fluxes as worker_fluxes
from ...managers.models import ModelsManager
from ...managers.observations import ObservationsManager
from ...managers.parameters import ParametersManager from ...managers.parameters import ParametersManager
...@@ -51,6 +51,23 @@ class SaveFluxes(AnalysisModule): ...@@ -51,6 +51,23 @@ class SaveFluxes(AnalysisModule):
)) ))
]) ])
@staticmethod
def _compute_models(conf, observations, params):
models = ModelsManager(conf, observations, params)
initargs = (models, time.time(), mp.Value('i', 0))
if conf['cores'] == 1: # Do not create a new process
init_worker_fluxes(*initargs)
for idx in range(len(params)):
worker_fluxes(idx)
else: # Analyse observations in parallel
with mp.Pool(processes=conf['cores'],
initializer=init_worker_fluxes,
initargs=initargs) as pool:
pool.map(worker_fluxes, range(len(params)))
return models
def process(self, conf): def process(self, conf):
"""Process with the savedfluxes analysis. """Process with the savedfluxes analysis.
...@@ -66,11 +83,10 @@ class SaveFluxes(AnalysisModule): ...@@ -66,11 +83,10 @@ class SaveFluxes(AnalysisModule):
# Rename the output directory if it exists # Rename the output directory if it exists
self.prepare_dirs() self.prepare_dirs()
save_sed = conf['analysis_params']['save_sed']
filters = [name for name in conf['bands'] if not # Read the observations information in order to retrieve the list of
name.endswith('_err')] # bands to compute the fluxes.
n_filters = len(filters) observations = ObservationsManager(conf)
# The parameters manager allows us to retrieve the models parameters # The parameters manager allows us to retrieve the models parameters
# from a 1D index. This is useful in that we do not have to create # from a 1D index. This is useful in that we do not have to create
...@@ -78,28 +94,12 @@ class SaveFluxes(AnalysisModule): ...@@ -78,28 +94,12 @@ class SaveFluxes(AnalysisModule):
# nice goodies such as finding the index of the first parameter to # nice goodies such as finding the index of the first parameter to
# have changed between two indices or the number of models. # have changed between two indices or the number of models.
params = ParametersManager(conf) params = ParametersManager(conf)
n_params = params.size
info = conf['analysis_params']['variables']
n_info = len(info)
model_fluxes = (RawArray(ctypes.c_double, n_params * n_filters),
(n_params, n_filters))
model_parameters = (RawArray(ctypes.c_double, n_params * n_info),
(n_params, n_info))
initargs = (params, filters, save_sed, info, model_fluxes, print("Computing the models ...")
model_parameters, time.time(), mp.Value('i', 0)) models = self._compute_models(conf, observations, params)
if conf['cores'] == 1: # Do not create a new process
init_worker_fluxes(*initargs)
for idx in range(n_params):
worker_fluxes(idx)
else: # Analyse observations in parallel
with mp.Pool(processes=conf['cores'],
initializer=init_worker_fluxes,
initargs=initargs) as pool:
pool.map(worker_fluxes, range(n_params))
save_fluxes(model_fluxes, model_parameters, filters, info) print("Saving the models ...")
models.save('models')
# AnalysisModule to be returned by get_module # AnalysisModule to be returned by get_module
......
...@@ -13,59 +13,39 @@ from ...warehouse import SedWarehouse ...@@ -13,59 +13,39 @@ from ...warehouse import SedWarehouse
from ..utils import nothread from ..utils import nothread
def init_fluxes(models, t0, ncomputed):
    """Initializer of the pool of worker processes.

    Publishes the shared state as module-level globals so that each call to
    fluxes() can reach it without the arguments being passed (and pickled)
    again for every model.

    Parameters
    ----------
    models: ModelsManager
        Manages the storage of the computed models (fluxes and properties).
    t0: float
        Time of the beginning of the computation.
    ncomputed: Value
        Number of computed models. Shared among workers.

    """
    global gbl_previous_idx, gbl_warehouse, gbl_models, gbl_obs
    global gbl_properties, gbl_save, gbl_t0, gbl_ncomputed

    # Limit the number of threads to 1 if we use MKL in order to limit the
    # oversubscription of the CPU/RAM.
    nothread()

    # No model has been computed yet, so there is no cache entry to clear on
    # the first call of the worker.
    gbl_previous_idx = -1
    gbl_warehouse = SedWarehouse()

    gbl_models = models
    gbl_obs = models.obs
    # Strip a trailing '_log' — presumably the SED info keys use the plain
    # (non-log) property names; TODO confirm against ModelsManager.
    gbl_properties = [name[:-4] if name.endswith('_log') else name
                      for name in models.propertiesnames]
    gbl_save = models.conf['analysis_params']['save_sed']
    gbl_t0 = t0
    gbl_ncomputed = ncomputed
def fluxes(idx):
    """Worker process computing the fluxes and properties of one model.

    Retrieves the SED for the model at the given index and stores its band
    fluxes and physical properties into the shared models manager.

    Parameters
    ----------
    idx: int
        Index of the model within the current block of models.

    """
    global gbl_previous_idx

    # Clear from the warehouse cache the partial SEDs of the modules that
    # changed between the previously computed model and this one.
    if gbl_previous_idx > -1:
        changed = gbl_models.params.index_module_changed(gbl_previous_idx, idx)
        gbl_warehouse.partial_clear_cache(changed)
    gbl_previous_idx = idx

    sed = gbl_warehouse.get_sed(gbl_models.params.modules,
                                gbl_models.params.from_index(idx))

    # A star-formation history older than the universe is unphysical: flag
    # the model with NaNs rather than storing its values.
    if 'sfh.age' in sed.info and sed.info['sfh.age'] > sed.info['universe.age']:
        gbl_models.fluxes[:, idx] = np.full(len(gbl_obs.bands), np.nan)
        gbl_models.properties[:, idx] = np.full(len(gbl_properties), np.nan)
    else:
        gbl_models.fluxes[:, idx] = [sed.compute_fnu(band)
                                     for band in gbl_obs.bands]
        gbl_models.properties[:, idx] = [sed.info[name]
                                         for name in gbl_properties]
        if gbl_save is True:
            sed.to_fits("out/{}".format(idx))

    # Progress accounting; the lock protects the shared counter across
    # worker processes.
    with gbl_ncomputed.get_lock():
        gbl_ncomputed.value += 1
        ncomputed = gbl_ncomputed.value
    nmodels = len(gbl_models.block)
    if ncomputed % 250 == 0 or ncomputed == nmodels:
        dt = time.time() - gbl_t0
        print("{}/{} models computed in {:.1f} seconds ({:.1f} models/s)".
              format(ncomputed, nmodels, dt, ncomputed/dt),
              end="\n" if ncomputed == nmodels else "\r")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment