Commit 2b63e5eb authored by Médéric Boquien

Implement the possibility of computing models by blocks in savefluxes

parent d2191a8a
......@@ -5,6 +5,7 @@
- Provide the possibility not to store a given module in cache. This can be useful on computers with a limited amount of memory. The downside is that, without caching, the model generation will be slower. (Médéric Boquien)
- An option `redshift\_decimals` is now provided in `pdf\_analysis` to set the number of decimals to which the observed redshifts are rounded when computing the grid of models. By default the model redshifts are rounded to two decimals, but this can be insufficient at low z and/or when using narrow-band filters, for instance. This only applies to the grid. The physical properties are still computed for the redshift at full precision. (Médéric Boquien)
- Bands with negative fluxes are now considered valid and are fitted as any other band. (Médéric Boquien)
- Allow the models to be computed by blocks in `savefluxes`. This can be useful when computing a very large grid and/or to split the results file into several smaller files, as large files can be difficult to handle. The number of blocks is set with the `blocks` parameter in pcigale.ini. (Médéric Boquien)
### Changed
- Make the timestamp more readable when moving the out/ directory. (Médéric Boquien)
......
......@@ -48,25 +48,42 @@ class SaveFluxes(AnalysisModule):
"boolean()",
"If True, save the generated spectrum for each model.",
False
)),
("blocks", (
"integer(min=1)",
"Number of blocks to compute the models. Having a number of blocks"
" larger than 1 can be useful when computing a very large number "
"of models or to split the result file into smaller files.",
1
))
])
@staticmethod
def _compute_models(conf, observations, params):
models = ModelsManager(conf, observations, params)
initargs = (models, time.time(), mp.Value('i', 0))
if conf['cores'] == 1: # Do not create a new process
init_worker_fluxes(*initargs)
for idx in range(len(params)):
worker_fluxes(idx)
else: # Analyse observations in parallel
with mp.Pool(processes=conf['cores'],
initializer=init_worker_fluxes,
def _parallel_job(worker, items, initargs, initializer, ncores):
if ncores == 1: # Do not create a new process
initializer(*initargs)
for idx, item in enumerate(items):
worker(idx, item)
else: # run in parallel
with mp.Pool(processes=ncores, initializer=initializer,
initargs=initargs) as pool:
pool.map(worker_fluxes, range(len(params)))
pool.starmap(worker, enumerate(items))
def _compute_models(self, conf, obs, params):
nblocks = len(params.blocks)
for iblock in range(nblocks):
print('Computing models for block {}/{}...'.format(iblock + 1,
nblocks))
models = ModelsManager(conf, obs, params, iblock)
initargs = (models, time.time(), mp.Value('i', 0))
self._parallel_job(worker_fluxes, params.blocks[iblock], initargs,
init_worker_fluxes, conf['cores'])
print("Saving the models ....")
models.save('models-block-{}'.format(iblock))
return models
def process(self, conf):
"""Process with the savedfluxes analysis.
......@@ -95,11 +112,7 @@ class SaveFluxes(AnalysisModule):
# have changed between two indices or the number of models.
params = ParametersManager(conf)
print("Computing the models ...")
models = self._compute_models(conf, observations, params)
print("Saving the models ...")
models.save('models')
self._compute_models(conf, observations, params)
# AnalysisModule to be returned by get_module
......
......@@ -47,7 +47,7 @@ def init_fluxes(models, t0, ncomputed):
gbl_ncomputed = ncomputed
def fluxes(idx):
def fluxes(idx, midx):
"""Worker process to retrieve a SED and affect the relevant data to shared
RawArrays.
......@@ -60,11 +60,11 @@ def fluxes(idx):
global gbl_previous_idx
if gbl_previous_idx > -1:
gbl_warehouse.partial_clear_cache(
gbl_models.params.index_module_changed(gbl_previous_idx, idx))
gbl_previous_idx = idx
gbl_models.params.index_module_changed(gbl_previous_idx, midx))
gbl_previous_idx = midx
sed = gbl_warehouse.get_sed(gbl_models.params.modules,
gbl_models.params.from_index(idx))
gbl_models.params.from_index(midx))
if 'sfh.age' in sed.info and sed.info['sfh.age'] > sed.info['universe.age']:
gbl_models.fluxes[:, idx] = np.full(len(gbl_obs.bands), np.nan)
......@@ -76,7 +76,7 @@ def fluxes(idx):
for name in gbl_properties]
if gbl_save is True:
sed.to_fits("out/{}".format(idx))
sed.to_fits("out/{}".format(midx))
with gbl_ncomputed.get_lock():
gbl_ncomputed.value += 1
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment