Commit 83e13558 authored by Médéric Boquien


Improve memory management. Several techniques are used. First, we use arrays in shared memory when computing the SEDs (this should also be done for the analysis later). This avoids using temporary objects that have to be sent back into a list from pool.starmap. Second, we do not save the sed.info data until the best SED has been computed. Only then do we recompute the SED to get the info. Third, the list of changed parameters for cache cleaning was too big: it contained the key and the value of the changed parameter to identify the models to be discarded. Now we simply return an array of integers giving the index of the module to be discarded. Much simpler, much smaller. There are also some small improvements here and there that I have probably forgotten. The main culprit now is by far the list of parameters, which gets horribly big: 3.5 GB for 30M models. Anyway, pcigale now runs with 30M models.
parent bec83c55
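The shared-array technique from the first point looks roughly like this (a minimal standalone sketch, not pcigale code; all names here are hypothetical). A RawArray is allocated once, handed to the pool initializer, and each worker writes to its own slice, so nothing has to be sent back through starmap:

import ctypes
import multiprocessing as mp
from multiprocessing.sharedctypes import RawArray

import numpy as np

def init_worker(raw, shape):
    # Runs once per worker; wrap the shared buffer in a numpy array so
    # workers can write to it directly.
    global model_fluxes
    model_fluxes = np.ctypeslib.as_array(raw).reshape(shape)

def compute(idx):
    # Each task fills a distinct row, so no locking is needed.
    model_fluxes[idx, :] = idx  # stand-in for the real SED computation

if __name__ == '__main__':
    n_models, n_filters = 8, 3
    raw = RawArray(ctypes.c_double, n_models * n_filters)
    with mp.Pool(2, initializer=init_worker,
                 initargs=(raw, (n_models, n_filters))) as pool:
        pool.map(compute, range(n_models))
    print(np.ctypeslib.as_array(raw).reshape(n_models, n_filters))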
@@ -24,19 +24,25 @@ reduced χ²) is given for each observation.
"""
import numpy as np
from collections import OrderedDict
import ctypes
import multiprocessing as mp
from multiprocessing.sharedctypes import RawArray
import time
import numpy as np
from ...utils import read_table
from .. import AnalysisModule, complete_obs_table
from .utils import save_table_analysis, save_table_best, backup_dir
from ...warehouse import SedWarehouse
from ...data import Database
from .workers import sed as worker_sed
from .workers import init_sed as init_worker_sed
from .workers import init_analysis as init_worker_analysis
from .workers import analysis as worker_analysis
from ..utils import find_changed_parameters
import pcigale.analysis_modules.myglobals as gbl
import time
# Tolerance threshold under which any flux or error is considered as 0.
TOLERANCE = 1e-12
@@ -123,6 +129,7 @@ class PdfAnalysis(AnalysisModule):
gbl.save_chi2 = parameters["save_chi2"].lower() == "true"
gbl.save_pdf = parameters["save_pdf"].lower() == "true"
gbl.n_models = len(creation_modules_params)
gbl.n_variables = len(gbl.analysed_variables)
# Get the needed filters in the pcigale database. We use an ordered
# dictionary because we need the keys to always be returned in the
@@ -132,6 +139,7 @@ class PdfAnalysis(AnalysisModule):
gbl.filters = OrderedDict([(name, base.get_filter(name))
for name in column_list
if not name.endswith('_err')])
gbl.n_filters = len(gbl.filters)
# Read the observation table and complete it by adding error where
# none is provided and by adding the systematic deviation.
@@ -147,25 +155,30 @@ class PdfAnalysis(AnalysisModule):
# The SED warehouse is used to retrieve SED corresponding to some
# modules and parameters.
gbl.warehouse = SedWarehouse(cache_type=parameters["storage_type"])
sed = gbl.warehouse.get_sed(gbl.creation_modules,
gbl.creation_modules_params[0])
creation_modules_params[0])
gbl.info_keys = list(sed.info.keys())
gbl.n_keys = len(gbl.info_keys)
gbl.mass_proportional_info = sed.mass_proportional_info
gbl.has_sfh = sed.sfh is not None
# Then, for all possible theoretical models (one for each parameter set
# in creation_modules_params), we compute 1) the fluxes in all the
# filters, 2) the values of the model variables, 3) the redshift of the
# models (this is important for the analysis to easily identify which
# models correspond to a given redshift), and 4) the sed.info values
# (without the keys to save space; this is possible because we are
# using an OrderedDict, so it is easy to retrieve the value for a known
# key using gbl.info_keys). Note that we could compute the last two
# elements in the main process, but we would lose time doing so, though
# not necessarily much. TODO: benchmark this.
# Arrays where we store the data related to the models. For memory
# efficiency reasons, we use RawArrays that will be passed as arguments
# to the pool. Each worker will fill a part of the RawArrays. It is
# important that there is no conflict, i.e. that two different workers
# never write to the same section.
# We put the shape in a tuple along with the RawArray because workers
# need to know the shape to create the numpy array from the RawArray.
model_redshifts = (RawArray(ctypes.c_double, gbl.n_models),
(gbl.n_models,))
model_fluxes = (RawArray(ctypes.c_double,
gbl.n_models * gbl.n_filters),
(gbl.n_models, gbl.n_filters))
model_variables = (RawArray(ctypes.c_double,
gbl.n_models * gbl.n_variables),
(gbl.n_models, gbl.n_variables))
# In order not to clog the warehouse memory with SEDs that will not be
# used again during the SED generation, we identify which parameter has
@@ -173,60 +186,28 @@ class PdfAnalysis(AnalysisModule):
changed_pars = find_changed_parameters(creation_modules_params)
# Compute the SED fluxes and ancillary data in parallel
gbl.n_computed = mp.Value('i', 0)
gbl.t_begin = time.time()
with mp.Pool(processes=cores) as pool:
items = pool.starmap(worker_sed, zip(creation_modules_params,
changed_pars))
# We create the arrays to store the model fluxes and ancillary data.
# They are stored in a shared module so they are easily accessible by
# the processes that will carry out the analysis, without requiring any
# data transfer that will grind pcigale to a halt. We use a numpy
# masked array to mask the fluxes of models that would be older than
# the age of the Universe at the considered redshift.
gbl.model_fluxes = np.ma.empty((len(creation_modules_params),
len(gbl.filters)))
gbl.model_variables = np.ma.empty((len(creation_modules_params),
len(gbl.analysed_variables)))
gbl.model_redshifts = np.empty(len(creation_modules_params))
gbl.model_info = [None] * len(creation_modules_params)
# Unpack the computed data into their respective arrays.
for idx_item, item in enumerate(items):
gbl.model_fluxes[idx_item, :] = item[0]
gbl.model_variables[idx_item, :] = item[1]
gbl.model_redshifts[idx_item] = item[2]
gbl.model_info[idx_item] = item[3]
initargs = (model_redshifts, model_fluxes, model_variables,
mp.Value('i', 0), time.time())
with mp.Pool(processes=cores, initializer=init_worker_sed,
initargs=initargs) as pool:
pool.starmap(worker_sed, zip(changed_pars,
range(gbl.n_models)))
print('\nAnalysing models...')
# Mask the invalid fluxes
gbl.model_fluxes = np.ma.masked_less(gbl.model_fluxes, -90)
# To make it easy for subprocesses to identify which parts of the
# various arrays correspond to a given redshift, we 1) make a list of
# model redshifts, and 2) build a dictionary indicating the indices
# corresponding to a given redshift. These two objects are assigned to
# the shared module.
gbl.redshifts = np.unique(gbl.model_redshifts)
gbl.w_redshifts = {redshift: gbl.model_redshifts == redshift
for redshift in gbl.redshifts}
# Analysis of each object in parallel. All the model data are
# transmitted through a shared module to avoid memory copies that would
# grind pcigale to a halt.
gbl.n_computed = mp.Value('i', 0)
gbl.t_begin = time.time()
with mp.Pool(processes=cores) as pool:
# Analysis of each object in parallel.
initargs = (model_redshifts, model_fluxes, model_variables,
mp.Value('i', 0), time.time())
with mp.Pool(processes=cores, initializer=init_worker_analysis,
initargs=initargs) as pool:
items = pool.starmap(worker_analysis, zip(obs_table))
# Local arrays where to unpack the results of the analysis
analysed_averages = np.empty((len(items), len(gbl.analysed_variables)))
analysed_averages = np.empty((gbl.n_obs, gbl.n_variables))
analysed_std = np.empty_like(analysed_averages)
best_fluxes = np.empty((len(items), len(gbl.filters)))
best_parameters = [None] * len(items)
best_normalisation_factors = np.empty(len(obs_table))
best_fluxes = np.empty((gbl.n_obs, gbl.n_filters))
best_parameters = [None] * gbl.n_obs
best_normalisation_factors = np.empty(gbl.n_obs)
best_chi2 = np.empty_like(best_normalisation_factors)
best_chi2_red = np.empty_like(best_normalisation_factors)
...
@@ -78,24 +78,20 @@ def gen_pdf(values, probabilities, grid):
return result
def save_best_sed(obsid, creation_modules, params, norm):
def save_best_sed(obsid, sed, norm):
"""Save the best SED to a VO table.
Parameters
----------
obsid: string
Name of the object. Used to prepend the output file name
creation_modules: list
List of modules used to generate the SED
params: list
List of parameters to generate the SED
sed: SED object
Best SED
norm: float
Normalisation factor to scale the SED to the observations
"""
with SedWarehouse(cache_type="memory") as sed_warehouse:
sed = sed_warehouse.get_sed(creation_modules, params)
sed.to_votable(OUT_DIR + "{}_best_model.xml".format(obsid), mass=norm)
sed.to_votable(OUT_DIR + "{}_best_model.xml".format(obsid), mass=norm)
def save_pdf(obsid, analysed_variables, model_variables, likelihood):
...
@@ -5,24 +5,89 @@
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
# Author: Yannick Roehlly & Médéric Boquien
import time
import numpy as np
from .utils import save_best_sed, save_pdf, save_chi2
import pcigale.analysis_modules.myglobals as gbl
import time
# Probability threshold: models with a lower probability are excluded from
# the moments computation.
MIN_PROBABILITY = 1e-20
def sed(model_params, changed):
def init_sed(redshifts, fluxes, variables, n_computed, t_begin):
"""Initializer of the pool of processes. It is mostly used to convert
RawArrays into numpy arrays. The latter are defined as global variables to
be accessible from the workers.
Parameters
----------
redshifts: RawArray and tuple containing the shape
Redshifts of individual models. Shared among workers.
fluxes: RawArray and tuple containing the shape
Fluxes of individual models. Shared among workers.
variables: RawArray and tuple containing the shape
Values of the analysed variables. Shared among workers.
n_computed: Value
Number of computed models. Shared among workers.
t_begin: float
Time of the beginning of the computation.
"""
global gbl_model_redshifts, gbl_model_fluxes, gbl_model_variables
global gbl_n_computed, gbl_t_begin
gbl_model_redshifts = np.ctypeslib.as_array(redshifts[0])
gbl_model_fluxes = np.ctypeslib.as_array(fluxes[0])
gbl_model_fluxes = gbl_model_fluxes.reshape(fluxes[1])
gbl_model_variables = np.ctypeslib.as_array(variables[0])
gbl_model_variables = gbl_model_variables.reshape(variables[1])
gbl_n_computed = n_computed
gbl_t_begin = t_begin
def init_analysis(redshifts, fluxes, variables, n_computed, t_begin):
"""Initializer of the pool of processes. It is mostly used to convert
RawArrays into numpy arrays. The latter are defined as global variables to
be accessible from the workers.
Parameters
----------
redshifts: RawArray and tuple containing the shape
Redshifts of individual models. Shared among workers.
fluxes: RawArray and tuple containing the shape
Fluxes of individual models. Shared among workers.
variables: RawArray and tuple containing the shape
Values of the analysed variables. Shared among workers.
n_computed: Value
Number of computed models. Shared among workers.
t_begin: float
Time of the beginning of the computation.
"""
init_sed(redshifts, fluxes, variables, n_computed, t_begin)
global gbl_redshifts, gbl_w_redshifts
gbl_redshifts = np.unique(gbl_model_redshifts)
gbl_w_redshifts = {redshift: gbl_model_redshifts == redshift
for redshift in gbl_redshifts}
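Design note: handing the RawArrays to the workers through the pool initializer, rather than as starmap arguments, means the shared buffers cross the process boundary only once, at pool creation; each subsequent task carries nothing but an index or an observation row. And since init_analysis calls init_sed, both pools wrap the very same buffers, so the analysis workers read the fluxes the SED workers wrote without any copy.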
def sed(changed, idx):
"""Worker process to retrieve a SED and return the relevant data
Parameters
----------
model_params: list
Parameters of the creation modules
changed: int
Index of the module whose parameter has changed. This is necessary
for cache cleaning.
idx: int
Index of the model, used to retrieve its parameters from the global
variable.
Returns
-------
@@ -35,7 +100,8 @@ def sed(model_params, changed):
the list returned by starmap anyway.
"""
sed = gbl.warehouse.get_sed(gbl.creation_modules, model_params)
sed = gbl.warehouse.get_sed(gbl.creation_modules,
gbl.creation_modules_params[idx])
gbl.warehouse.partial_clear_cache(changed)
if 'age' in sed.info and sed.info['age'] > sed.info['universe.age']:
@@ -47,22 +113,22 @@ def sed(model_params, changed):
for filter_ in gbl.filters.values()])
model_variables = np.array([sed.info[name]
for name in gbl.analysed_variables])
redshift = sed.info['redshift']
info = list(sed.info.values())  # Prevents returning a view
with gbl.n_computed.get_lock():
gbl.n_computed.value += 1
n_computed = gbl.n_computed.value
gbl_model_redshifts[idx] = sed.info['redshift']
gbl_model_fluxes[idx, :] = model_fluxes
gbl_model_variables[idx, :] = model_variables
with gbl_n_computed.get_lock():
gbl_n_computed.value += 1
n_computed = gbl_n_computed.value
if n_computed % 100 == 0 or n_computed == gbl.n_models:
t_elapsed = time.time() - gbl.t_begin
t_elapsed = time.time() - gbl_t_begin
print("{}/{} models computed in {} seconds ({} models/s)".
format(gbl.n_computed.value, gbl.n_models,
format(n_computed, gbl.n_models,
np.around(t_elapsed, decimals=1),
np.around(n_computed/t_elapsed, decimals=1)),
end="\r")
return model_fluxes, model_variables, redshift, info
def analysis(obs):
"""Worker process to analyse the PDF and estimate parameters values
@@ -78,11 +144,11 @@ def analysis(obs):
normalisation factor, info of the best SED, fluxes of the best SED
"""
w = np.where(gbl.w_redshifts[gbl.redshifts[np.abs(obs['redshift'] -
gbl.redshifts).argmin()]])
w = np.where(gbl_w_redshifts[gbl_redshifts[np.abs(obs['redshift'] -
gbl_redshifts).argmin()]])
model_fluxes = gbl.model_fluxes[w[0], :]
model_variables = gbl.model_variables[w[0], :]
model_fluxes = np.ma.masked_less(gbl_model_fluxes[w[0], :], -90.)
model_variables = gbl_model_variables[w[0], :]
obs_fluxes = np.array([obs[name] for name in gbl.filters])
obs_errors = np.array([obs[name + "_err"] for name in gbl.filters])
@@ -108,13 +174,8 @@ def analysis(obs):
np.square((obs_fluxes - norm_model_fluxes) / obs_errors),
axis=1)
# We define the reduced χ² as the χ² divided by the number of
# fluxes used for the fitting.
chi2_red = chi2_ / obs_fluxes.count()
# We use the exponential probability associated with the χ² as
# likelihood function.
# WARNING: shouldn't this be chi2_red rather?
likelihood = np.exp(-chi2_/2)
# For the analysis, we consider that the computed models explain
# each observation. We normalise the likelihood function to have a
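As a toy numeric check of the χ² and likelihood definitions above (made-up fluxes, not pcigale output):

import numpy as np

obs_fluxes = np.ma.array([1.0, 2.0, 0.5])
obs_errors = np.array([0.1, 0.2, 0.1])
norm_model_fluxes = np.array([1.1, 1.8, 0.45])
chi2 = np.sum(np.square((obs_fluxes - norm_model_fluxes) / obs_errors))
chi2_red = chi2 / obs_fluxes.count()  # reduced χ²: χ² per fitted flux
likelihood = np.exp(-chi2 / 2.)       # exponential likelihood from the χ²
print(chi2, chi2_red, likelihood)     # 2.25 0.75 ~0.3247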
@@ -155,31 +216,33 @@ def analysis(obs):
# with the least χ².
best_index = chi2_.argmin()
# We compute once again the best sed to obtain its info
sed = gbl.warehouse.get_sed(gbl.creation_modules,
gbl.creation_modules_params[w[0][best_index]])
if gbl.save_best_sed:
save_best_sed(obs['id'], gbl.creation_modules,
gbl.creation_modules_params[w[0][best_index]],
norm_facts[best_index])
save_best_sed(obs['id'], sed, norm_facts[best_index])
if gbl.save_chi2:
save_chi2(obs['id'], gbl.analysed_variables, model_variables, chi2_red)
save_chi2(obs['id'], gbl.analysed_variables, model_variables, chi2_ /
obs_fluxes.count())
if gbl.save_pdf:
save_pdf(obs['id'], gbl.analysed_variables, model_variables,
likelihood)
with gbl.n_computed.get_lock():
gbl.n_computed.value += 1
n_computed = gbl.n_computed.value
with gbl_n_computed.get_lock():
gbl_n_computed.value += 1
n_computed = gbl_n_computed.value
if n_computed % 100 == 0 or n_computed == gbl.n_obs:
t_elapsed = time.time() - gbl.t_begin
t_elapsed = time.time() - gbl_t_begin
print("{}/{} objects analysed in {} seconds ({} objects/s)".
format(gbl.n_computed.value, gbl.n_obs,
np.around(t_elapsed, decimals=1),
format(n_computed, gbl.n_obs, np.around(t_elapsed, decimals=1),
np.around(n_computed/t_elapsed, decimals=1)),
end="\r")
return (analysed_averages,
analysed_std,
np.array(model_fluxes[best_index, :]), # do NOT remove np.array()
list(gbl.model_info[w[0][best_index]]),
np.array(model_fluxes[best_index, :], copy=True),
np.array(list(sed.info.values()), copy=True),
norm_facts[best_index],
chi2_[best_index],
chi2_red[best_index])
chi2_[best_index] / obs_fluxes.count())
@@ -7,6 +7,8 @@
Various utility functions for pcigale analysis modules
"""
import numpy as np
def find_changed_parameters(list_parameters):
"""
@@ -27,20 +29,24 @@ def find_changed_parameters(list_parameters):
Return
------
A list of tuples with the same size as the input list. Each tuple contains
the parameter that has changed and its value. When several parameters have
changed, it selects only the one that would discard the most models.
An array of integers containing the index of the module that has to be
discarded. When several parameters have changed, we return the lowest index.
The cache cleaning routine can then just discard all SEDs with at least as
many modules. This will keep the cache small if used consistently.
"""
changed = [None] * len(list_parameters)
for i in range(len(list_parameters)-1):
for par, par_next in zip(list_parameters[i], list_parameters[i+1]):
for idx, (par, par_next) in enumerate(zip(list_parameters[i],
list_parameters[i+1])):
for k in par.keys():
if par[k] != par_next[k]:
if changed[i] is not None:
print('Warning! Something went wrong in the cache cleaning')
changed[i] = (k, par[k])
changed[i] = idx
break
if changed[i] is not None:
break
# TODO: handle the special case of the last element
return changed
changed[-1] = 0
return np.array(changed, dtype=int)
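A small worked example of the new return value (hypothetical two-module parameter lists):

list_parameters = [[{'age': 1}, {'Av': 0.1}],
                   [{'age': 1}, {'Av': 0.2}],  # module 1 changed
                   [{'age': 2}, {'Av': 0.2}]]  # module 0 changed
print(find_changed_parameters(list_parameters))  # [1 0 0]
# After set 0 only SEDs built with more than one module are stale; after
# set 1 everything is; the last entry is always 0 so the cache is flushed
# after the final model.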
@@ -67,32 +67,29 @@ class SedWarehouse(object):
return module
def partial_clear_cache(self, flagged_param):
def partial_clear_cache(self, n_modules_max):
"""Clear the cache of SEDs that are not relevant anymore
To do partial clearing of the cache, we go through the entire cache and
delete the SEDs that correspond to a given parameter key/value.
delete the SEDs that were built with more than a given number of modules.
The caller passes the index of the first module whose parameter has
changed; any SED built with more modules than that index includes the
changed module, is not needed anymore to compute new models, and can be
discarded. Passing 0 as an argument empties the cache completely.
Parameters
----------
flagged_param: tuple
Tuple of 2 elements containing the parameter name and its value
n_modules_max: int
Index of the first changed module. All SEDs built with more than
this number of modules are discarded.
"""
if flagged_param is not None:
if n_modules_max > -1:
decoder = JSONDecoder()
# Going through all SEDs
for k in list(self.storage.dictionary.keys()):
list_params = decoder.decode(k)[1]
# Going through all parameters of a given SED. We start with
# the last module because it is more likely that the parameter
# we look for belongs to one of the last modules.
list_params.reverse()
for params in list_params:
if (flagged_param[0] in params.keys() and
params[flagged_param[0]] == flagged_param[1]):
self.storage.delete(k)
break
list_modules = decoder.decode(k)[0]
if len(list_modules) > n_modules_max:
self.storage.delete(k)
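For illustration, here is the pruning rule applied to a toy cache (a sketch assuming the same JSON-encoded (modules, parameters) key layout that the warehouse uses):

from json import JSONDecoder, JSONEncoder

encoder, decoder = JSONEncoder(), JSONDecoder()
cache = {encoder.encode((mods, pars)): 'sed'
         for mods, pars in [(['sfh'], [{'age': 1}]),
                            (['sfh', 'dust'], [{'age': 1}, {'Av': 0.1}])]}
n_modules_max = 1  # the module at index 1 ('dust') has a changed parameter
for k in list(cache):
    if len(decoder.decode(k)[0]) > n_modules_max:
        del cache[k]
print(list(cache))  # only the single-module partial SED survives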
def get_sed(self, module_list, parameter_list):
"""Get the SED corresponding to the module and parameter lists
...