Commit 1962d770 authored by Médéric Boquien's avatar Médéric Boquien

The counter can become a bottleneck when many parallel processes are running...

The counter can become a bottleneck when many parallel processes are running concurrently because of the lock to update it. To address this we implement a Counter class that only updates the counter periodically rather than at each iteration. The number of models actually computed at any moment is kept in a local counter.
parent a9c4a21f
......@@ -18,6 +18,7 @@
### Optimised
- The estimation of the physical properties is made a bit faster when all the models are valid. (Médéric Boquien)
- The access to the SED and module caches has been made faster and simpler. This results in a speedup of ~6% in the computation of the models. (Médéric Boquien)
- The models counter was a bottleneck when using many cores as updating it could stall other parallel processes. Now the internal counter is updated much less frequently. The speedup goes from negligible (few cores) up to a factor of a few (many cores). The downside is that the updates on the screen may be a bit irregular. (Médéric Boquien)
## 0.12.1 (2018-02-27)
### Fixed
......
......@@ -27,11 +27,11 @@ reduced χ²) is given for each observation.
from collections import OrderedDict
import multiprocessing as mp
import time
import numpy as np
from .. import AnalysisModule
from ..utils import Counter
from .workers import sed as worker_sed
from .workers import init_sed as init_worker_sed
from .workers import init_analysis as init_worker_analysis
......@@ -97,25 +97,28 @@ class PdfAnalysis(AnalysisModule):
def _compute_models(self, conf, obs, params, iblock):
models = ModelsManager(conf, obs, params, iblock)
counter = Counter(len(params.blocks[iblock]), 50, 250)
initargs = (models, counter)
initargs = (models, time.time(), mp.Value('i', 0))
self._parallel_job(worker_sed, params.blocks[iblock], initargs,
init_worker_sed, conf['cores'])
# Print the final value as it may not otherwise be printed
counter.pprint(len(params.blocks[iblock]))
return models
def _compute_bayes(self, conf, obs, models):
results = ResultsManager(models)
initargs = (models, results, time.time(), mp.Value('i', 0))
initargs = (models, results, Counter(len(obs)))
self._parallel_job(worker_analysis, obs, initargs,
init_worker_analysis, conf['cores'])
return results
def _compute_best(self, conf, obs, params, results):
initargs = (conf, params, obs, results, time.time(),
mp.Value('i', 0))
initargs = (conf, params, obs, results, Counter(len(obs)))
self._parallel_job(worker_bestfit, obs, initargs,
init_worker_bestfit, conf['cores'])
......
......@@ -16,20 +16,18 @@ from .utils import save_chi2, compute_corr_dz, compute_chi2, weighted_param
from ...warehouse import SedWarehouse
def init_sed(models, t0, ncomputed):
def init_sed(models, counter):
"""Initializer of the pool of processes to share variables between workers.
Parameters
----------
models: ModelsManagers
Manages the storage of the computed models (fluxes and properties).
t0: float
Time of the beginning of the computation.
ncomputed: Value
Number of computed models. Shared among workers.
counter: Counter class object
Counter for the number of models computed
"""
global gbl_previous_idx, gbl_warehouse, gbl_models, gbl_t0, gbl_ncomputed
global gbl_previous_idx, gbl_warehouse, gbl_models, gbl_counter
# Limit the number of threads to 1 if we use MKL in order to limit the
# oversubscription of the CPU/RAM.
......@@ -39,11 +37,10 @@ def init_sed(models, t0, ncomputed):
gbl_warehouse = SedWarehouse()
gbl_models = models
gbl_t0 = t0
gbl_ncomputed = ncomputed
gbl_counter = counter
def init_analysis(models, results, t0, ncomputed):
def init_analysis(models, results, counter):
"""Initializer of the pool of processes to share variables between workers.
Parameters
......@@ -52,22 +49,19 @@ def init_analysis(models, results, t0, ncomputed):
Manages the storage of the computed models (fluxes and properties).
results: ResultsManager
Contains the estimates and errors on the properties.
t0: float
Time of the beginning of the computation.
ncomputed: Value
Number of computed models. Shared among workers.
counter: Counter class object
Counter for the number of objects analysed
"""
global gbl_models, gbl_obs, gbl_results, gbl_t0, gbl_ncomputed
global gbl_models, gbl_obs, gbl_results, gbl_counter
gbl_models = models
gbl_obs = models.obs
gbl_results = results
gbl_t0 = t0
gbl_ncomputed = ncomputed
gbl_counter = counter
def init_bestfit(conf, params, observations, results, t0, ncomputed):
def init_bestfit(conf, params, observations, results, counter):
"""Initializer of the pool of processes to share variables between workers.
Parameters
......@@ -78,18 +72,14 @@ def init_bestfit(conf, params, observations, results, t0, ncomputed):
Manages the parameters from a 1D index.
observations: astropy.Table
Contains the observations including the filter names.
ncomputed: Value
Number of computed models. Shared among workers.
t0: float
Time of the beginning of the computation.
results: ResultsManager
Contains the estimates and errors on the properties.
offset: integer
Offset of the block to retrieve the global model index.
counter: Counter class object
Counter for the number of objects analysed
"""
global gbl_previous_idx, gbl_warehouse, gbl_conf, gbl_params, gbl_obs
global gbl_results, gbl_t0, gbl_ncomputed
global gbl_results, gbl_counter
gbl_previous_idx = -1
gbl_warehouse = SedWarehouse()
......@@ -98,8 +88,7 @@ def init_bestfit(conf, params, observations, results, t0, ncomputed):
gbl_params = params
gbl_obs = observations
gbl_results = results
gbl_t0 = t0
gbl_ncomputed = ncomputed
gbl_counter = counter
def sed(idx, midx):
......@@ -138,15 +127,7 @@ def sed(idx, midx):
for prop in gbl_models.intprop.keys():
gbl_models.intprop[prop][idx] = sed.info[prop]
with gbl_ncomputed.get_lock():
gbl_ncomputed.value += 1
ncomputed = gbl_ncomputed.value
nmodels = len(gbl_models.block)
if ncomputed % 250 == 0 or ncomputed == nmodels:
dt = time.time() - gbl_t0
print("{}/{} models computed in {:.1f} seconds ({:.1f} models/s)".
format(ncomputed, nmodels, dt, ncomputed/dt),
end="\n" if ncomputed == nmodels else "\r")
gbl_counter.inc()
def analysis(idx, obs):
......@@ -231,14 +212,7 @@ def analysis(idx, obs):
"models are older than the Universe or that your chi² are very "
"large.".format(obs.id))
with gbl_ncomputed.get_lock():
gbl_ncomputed.value += 1
ncomputed = gbl_ncomputed.value
dt = time.time() - gbl_t0
print("{}/{} objects analysed in {:.1f} seconds ({:.2f} objects/s)".
format(ncomputed, len(gbl_models.obs), dt, ncomputed/dt),
end="\n" if ncomputed == len(gbl_models.obs) else "\r")
gbl_counter.inc()
def bestfit(oidx, obs):
"""Worker process to compute and save the best fit.
......@@ -284,10 +258,4 @@ def bestfit(oidx, obs):
if gbl_conf['analysis_params']["save_best_sed"]:
sed.to_fits('out/{}'.format(obs.id), scaling)
with gbl_ncomputed.get_lock():
gbl_ncomputed.value += 1
ncomputed = gbl_ncomputed.value
dt = time.time() - gbl_t0
print("{}/{} best fit spectra computed in {:.1f} seconds ({:.2f} objects/s)".
format(ncomputed, len(gbl_obs), dt, ncomputed/dt), end="\n" if
ncomputed == len(gbl_obs) else "\r")
gbl_counter.inc()
......@@ -17,9 +17,9 @@ The data file is used only to get the list of fluxes to be computed.
from collections import OrderedDict
import multiprocessing as mp
import time
from .. import AnalysisModule
from ..utils import Counter
from .workers import init_fluxes as init_worker_fluxes
from .workers import fluxes as worker_fluxes
from ...managers.models import ModelsManager
......@@ -75,11 +75,15 @@ class SaveFluxes(AnalysisModule):
nblocks))
models = ModelsManager(conf, obs, params, iblock)
counter = Counter(len(params.blocks[iblock]), 50, 250)
initargs = (models, time.time(), mp.Value('i', 0))
initargs = (models, counter)
self._parallel_job(worker_fluxes, params.blocks[iblock], initargs,
init_worker_fluxes, conf['cores'])
# Print the final value as it may not otherwise be printed
counter.pprint(len(params.blocks[iblock]))
print("Saving the models ....")
models.save('models-block-{}'.format(iblock))
......
......@@ -13,7 +13,7 @@ from ...warehouse import SedWarehouse
from ..utils import nothread
def init_fluxes(models, t0, ncomputed):
def init_fluxes(models, counter):
"""Initializer of the pool of processes. It is mostly used to convert
RawArrays into numpy arrays. The latter are defined as global variables to
be accessible from the workers.
......@@ -22,14 +22,12 @@ def init_fluxes(models, t0, ncomputed):
----------
models: ModelsManagers
Manages the storage of the computed models (fluxes and properties).
t0: float
Time of the beginning of the computation.
ncomputed: Value
Number of computed models. Shared among workers.
counter: Counter class object
Counter for the number of models computed
"""
global gbl_previous_idx, gbl_warehouse, gbl_models, gbl_obs, gbl_save
global gbl_t0, gbl_ncomputed
global gbl_counter
# Limit the number of threads to 1 if we use MKL in order to limit the
......@@ -42,8 +40,7 @@ def init_fluxes(models, t0, ncomputed):
gbl_models = models
gbl_obs = models.obs
gbl_save = models.conf['analysis_params']['save_sed']
gbl_t0 = t0
gbl_ncomputed = ncomputed
gbl_counter = counter
def fluxes(idx, midx):
......@@ -83,12 +80,4 @@ def fluxes(idx, midx):
if gbl_save is True:
sed.to_fits("out/{}".format(midx))
with gbl_ncomputed.get_lock():
gbl_ncomputed.value += 1
ncomputed = gbl_ncomputed.value
nmodels = len(gbl_models.block)
if ncomputed % 250 == 0 or ncomputed == nmodels:
dt = time.time() - gbl_t0
print("{}/{} models computed in {:.1f} seconds ({:.1f} models/s)".
format(ncomputed, nmodels, dt, ncomputed/dt),
end="\n" if ncomputed == nmodels else "\r")
gbl_counter.inc()
......@@ -6,6 +6,9 @@
"""
Various utility functions for pcigale analysis modules
"""
import multiprocessing as mp
import time
def nothread():
"""Some libraries such as Intel's MKL have automatic threading. This is
......@@ -27,3 +30,42 @@ def nothread():
mkl.set_num_threads(1)
except ImportError:
pass
class Counter:
    """Count the number of models computed/objects analysed.

    The class keeps two internal counters. One is local to the process and
    is incremented at each iteration. The other one is global (shared among
    processes) and is only incremented periodically. The fundamental reason
    is that a lock is needed to increment the global value. When using many
    cores this can strongly degrade the performance. Similarly, printing the
    number of iterations achieved and at what speed is only done
    periodically. For practical reasons the printing frequency has to be a
    multiple of the incrementation frequency of the global counter.
    """

    def __init__(self, nmodels, freq_inc=1, freq_print=1):
        """
        Parameters
        ----------
        nmodels: int
            Total number of models/objects to process.
        freq_inc: int
            Number of local iterations between two updates of the shared
            counter.
        freq_print: int
            Number of iterations between two prints on the screen. Must be
            a multiple of freq_inc so the printing condition can be hit.

        Raises
        ------
        ValueError
            If freq_print is not a multiple of freq_inc.
        """
        if freq_print % freq_inc != 0:
            raise ValueError("The printing frequency must be a multiple of "
                             "the increment frequency.")
        self.nmodels = nmodels
        self.freq_inc = freq_inc
        self.freq_print = freq_print
        self.global_counter = mp.Value('i', 0)  # Shared among processes.
        self.proc_counter = 0  # Process-local, no lock needed.
        self.t0 = time.time()

    def inc(self):
        """Increment the local counter and, every freq_inc iterations,
        propagate it to the shared counter, printing the progress every
        freq_print iterations.
        """
        self.proc_counter += 1
        if self.proc_counter % self.freq_inc == 0:
            # Keep the critical section as short as possible: only the
            # increment and the read are done under the lock. Printing is
            # deliberately done after releasing the lock so the stdout I/O
            # does not stall the other processes waiting on the counter.
            with self.global_counter.get_lock():
                self.global_counter.value += self.freq_inc
                n = self.global_counter.value
            if n % self.freq_print == 0:
                self.pprint(n)

    def pprint(self, n):
        """Print the number of iterations done, the elapsed time, and the
        average speed, overwriting the line until the last iteration.

        Parameters
        ----------
        n: int
            Number of iterations completed so far.
        """
        dt = time.time() - self.t0
        print("{}/{} computed in {:.1f} seconds ({:.1f}/s)".
              format(n, self.nmodels, dt, n / dt),
              end="\n" if n == self.nmodels else "\r")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment