Commit 9440c959 authored by Médéric Boquien's avatar Médéric Boquien
Browse files

Implement the analysis of the observations by blocks of models.

parent 86300034
......@@ -6,9 +6,12 @@
- An option `redshift\_decimals` is now provided in `pdf\_analysis` to indicate the number of decimals to round the observed redshifts to compute the grid of models. By default the model redshifts are rounded to two decimals but this can be insufficient at low z and/or when using narrow-band filters for instance. This only applies to the grid. The physical properties are still computed for the redshift at full precision. (Médéric Boquien)
- Bands with negative fluxes are now considered valid and are fitted as any other band. (Médéric Boquien)
- Allow the models to be computed by blocks in `savefluxes`. This can be useful when computing a very large grid and/or to split the results file into various smaller files as large files can be difficult to handle. The number of blocks is set with the `blocks` parameter in the pcigale.ini. (Médéric Boquien)
- Allow the observations to be analysed by blocks of models in `pdf\_analysis`. This is useful when computing a very large grid of models that would not fit in memory. The number of blocks is set with the `blocks` parameter in the pcigale.ini. (Médéric Boquien)
### Changed
- Make the timestamp more readable when moving the out/ directory. (Médéric Boquien)
- To accommodate the analysis of the observations by blocks, all models are now included in the estimation of the physical properties and no cut in chi² is done anymore. (Médéric Boquien)
- To accommodate the analysis of the observations by blocks, the `save_pdf` option has been eliminated. To plot PDF one needs to set `save_chi2` to True and then run `pcigale-plots pdf`. (Médéric Boquien)
### Fixed
- Corrected a typo that prevented `restframe\_parameters` from being listed among the available modules. (Médéric Boquien)
......
......@@ -28,19 +28,20 @@ reduced χ²) is given for each observation.
from collections import OrderedDict
import ctypes
import multiprocessing as mp
from multiprocessing.sharedctypes import RawArray
import time
import numpy as np
from ...utils import read_table
from .. import AnalysisModule
from .utils import save_results, analyse_chi2
from ...warehouse import SedWarehouse
from .workers import sed as worker_sed
from .workers import init_sed as init_worker_sed
from .workers import init_analysis as init_worker_analysis
from .workers import init_bestfit as init_worker_bestfit
from .workers import analysis as worker_analysis
from .workers import bestfit as worker_bestfit
from ...managers.results import ResultsManager
from ...managers.models import ModelsManager
from ...managers.observations import ObservationsManager
from ...managers.parameters import ParametersManager
......@@ -63,14 +64,8 @@ class PdfAnalysis(AnalysisModule):
)),
("save_chi2", (
"boolean()",
"If true, for each observation and each analysed variable save "
"the reduced chi2.",
False
)),
("save_pdf", (
"boolean()",
"If true, for each observation and each analysed variable save "
"the probability density function.",
"If true, for each observation and each analysed property, save "
"the raw chi2. It occupies ~15 MB/million models/variable.",
False
)),
("lim_flag", (
......@@ -92,9 +87,80 @@ class PdfAnalysis(AnalysisModule):
"compute the grid of models. To disable rounding give a negative "
"value. Do not round if you use narrow-band filters.",
2
)),
("blocks", (
"integer(min=1)",
"Number of blocks to compute the models and analyse the "
"observations. If there is enough memory, we strongly recommend "
"this to be set to 1.",
1
))
])
def _compute_models(self, conf, obs, params, iblock):
    """Compute the fluxes and properties of the models of one block.

    A ModelsManager is created for block number `iblock` and filled by
    running the SED worker over every model index in that block, either
    serially or in parallel depending on conf['cores'].

    Returns
    -------
    models: ModelsManager
        Manager holding the computed models for this block.
    """
    models = ModelsManager(conf, obs, params, iblock)
    # Shared counter + start time let the workers report their progress.
    progress = (models, time.time(), mp.Value('i', 0))
    self._parallel_job(worker_sed, params.blocks[iblock], progress,
                       init_worker_sed, conf['cores'])

    return models
def _compute_bayes(self, conf, obs, models):
    """Estimate the physical properties of the observations.

    Each observation is analysed against the models of the current block
    by the analysis worker; the estimates are accumulated in a
    ResultsManager.

    Returns
    -------
    results: ResultsManager
        Manager holding the Bayesian estimates for this block.
    """
    results = ResultsManager(models)
    # Shared counter + start time let the workers report their progress.
    progress = (models, results, time.time(), mp.Value('i', 0))
    self._parallel_job(worker_analysis, obs, progress,
                       init_worker_analysis, conf['cores'])

    return results
def _compute_best(self, conf, obs, params, results):
    """Compute the best-fit model for each observation.

    Runs the best-fit worker over all observations; the outcome is
    stored directly into `results` by the workers.
    """
    # Shared counter + start time let the workers report their progress.
    progress = (conf, params, obs, results, time.time(),
                mp.Value('i', 0))
    self._parallel_job(worker_bestfit, obs, progress,
                       init_worker_bestfit, conf['cores'])
def _parallel_job(self, worker, items, initargs, initializer, ncores):
    """Apply `worker` to every element of `items`, serially or in parallel.

    The worker receives (index, item) pairs. With ncores == 1 everything
    runs in the current process (easier to debug); otherwise a pool of
    `ncores` processes is spawned, each initialised with
    initializer(*initargs).
    """
    if ncores == 1:
        # Serial path: no subprocess is created.
        initializer(*initargs)
        for pair in enumerate(items):
            worker(*pair)
        return

    # Parallel path: the pool initialiser shares the data with workers.
    with mp.Pool(processes=ncores, initializer=initializer,
                 initargs=initargs) as pool:
        pool.starmap(worker, enumerate(items))
def _compute(self, conf, obs, params):
    """Analyse the observations block by block and merge the results.

    For each block of the parameter grid the models are computed (or
    reloaded from cache) and the observations are analysed against
    them. The per-block estimates are then merged and the best-fit
    models are computed on the full grid.

    Returns
    -------
    merged: ResultsManager
        Manager holding the merged estimates over all blocks.
    """
    block_results = []
    nblocks = len(params.blocks)

    for iblock in range(nblocks):
        print('\nProcessing block {}/{}...'.format(iblock + 1, nblocks))
        if hasattr(self, '_models'):
            # A previous call (e.g., for the mock analysis) already
            # computed the models; reuse them instead of recomputing.
            print("\nLoading precomputed models")
            models = self._models
        else:
            print("\nComputing models ...")
            models = self._compute_models(conf, obs, params, iblock)
            # Cache the models only when the grid fits in one block.
            if nblocks == 1:
                self._models = models

        print("\nEstimating the physical properties ...")
        block_results.append(self._compute_bayes(conf, obs, models))
        print("\nBlock processed.")

    print("\nEstimating physical properties on all blocks")
    merged = ResultsManager.merge(block_results)

    print("\nComputing the best fit spectra")
    self._compute_best(conf, obs, params, merged)

    return merged
def process(self, conf):
"""Process with the psum analysis.
......@@ -117,146 +183,37 @@ class PdfAnalysis(AnalysisModule):
# Rename the output directory if it exists
self.prepare_dirs()
# Initalise variables from input arguments.
variables = conf['analysis_params']["variables"]
variables_nolog = [variable[:-4] if variable.endswith('_log') else
variable for variable in variables]
n_variables = len(variables)
save = {key: conf['analysis_params']["save_{}".format(key)] for key in
["best_sed", "chi2", "pdf"]}
lim_flag = conf['analysis_params']["lim_flag"]
filters = [name for name in conf['bands'] if not
name.endswith('_err')]
n_filters = len(filters)
# Read the observation table and complete it by adding error where
# none is provided and by adding the systematic deviation.
# Store the observations in a manager which sanitises the data, checks
# all the required fluxes are present, adding errors if needed,
# discarding invalid fluxes, etc.
obs = ObservationsManager(conf)
n_obs = len(obs.table)
obs.save('observations')
z = np.array(conf['sed_modules_params']['redshifting']['redshift'])
# The parameters manager allows us to retrieve the models parameters
# from a 1D index. This is useful in that we do not have to create
# a list of parameters as they are computed on-the-fly. It also has
# nice goodies such as finding the index of the first parameter to
# have changed between two indices or the number of models.
# Store the grid of parameters in a manager to facilitate the
# computation of the models
params = ParametersManager(conf)
n_params = params.size
# Retrieve an arbitrary SED to obtain the list of output parameters
warehouse = SedWarehouse()
sed = warehouse.get_sed(conf['sed_modules'], params.from_index(0))
info = list(sed.info.keys())
info.sort()
n_info = len(info)
del warehouse, sed
print("Computing the models fluxes...")
# Arrays where we store the data related to the models. For memory
# efficiency reasons, we use RawArrays that will be passed in argument
# to the pool. Each worker will fill a part of the RawArrays. It is
# important that there is no conflict and that two different workers do
# not write on the same section.
# We put the shape in a tuple along with the RawArray because workers
# need to know the shape to create the numpy array from the RawArray.
model_fluxes = (RawArray(ctypes.c_double, n_params * n_filters),
(n_filters, n_params))
model_variables = (RawArray(ctypes.c_double, n_params * n_variables),
(n_variables, n_params))
initargs = (params, filters, variables_nolog, model_fluxes,
model_variables, time.time(), mp.Value('i', 0))
if conf['cores'] == 1: # Do not create a new process
init_worker_sed(*initargs)
for idx in range(n_params):
worker_sed(idx)
else: # Compute the models in parallel
with mp.Pool(processes=conf['cores'], initializer=init_worker_sed,
initargs=initargs) as pool:
pool.map(worker_sed, range(n_params))
print("\nAnalysing models...")
# We use RawArrays for the same reason as previously
analysed_averages = (RawArray(ctypes.c_double, n_obs * n_variables),
(n_obs, n_variables))
analysed_std = (RawArray(ctypes.c_double, n_obs * n_variables),
(n_obs, n_variables))
best_fluxes = (RawArray(ctypes.c_double, n_obs * n_filters),
(n_obs, n_filters))
best_parameters = (RawArray(ctypes.c_double, n_obs * n_info),
(n_obs, n_info))
best_chi2 = (RawArray(ctypes.c_double, n_obs), (n_obs))
best_chi2_red = (RawArray(ctypes.c_double, n_obs), (n_obs))
initargs = (params, filters, variables, z, model_fluxes,
model_variables, time.time(), mp.Value('i', 0),
analysed_averages, analysed_std, best_fluxes,
best_parameters, best_chi2, best_chi2_red, save, lim_flag,
n_obs)
if conf['cores'] == 1: # Do not create a new process
init_worker_analysis(*initargs)
for idx, obs in enumerate(obs.table):
worker_analysis(idx, obs)
else: # Analyse observations in parallel
with mp.Pool(processes=conf['cores'],
initializer=init_worker_analysis,
initargs=initargs) as pool:
pool.starmap(worker_analysis, enumerate(obs.table))
analyse_chi2(best_chi2_red)
print("\nSaving results...")
results = self._compute(conf, obs, params)
results.best.analyse_chi2()
save_results("results", obs.table['id'], variables, analysed_averages,
analysed_std, best_chi2, best_chi2_red, best_parameters,
best_fluxes, filters, info)
print("\nSaving the analysis results...")
results.save("results")
if conf['analysis_params']['mock_flag'] is True:
print("\nAnalysing the mock observations...")
# For the mock analysis we do not save the ancillary files.
for k in ['best_sed', 'chi2', 'pdf']:
conf['analysis_params']["save_{}".format(k)] = False
# We replace the observations with a mock catalogue..
obs.generate_mock(results)
obs.save('mock_observations')
results = self._compute(conf, obs, params)
print("\nMock analysis...")
# For the mock analysis we do not save the ancillary files
for k in save:
save[k] = False
obs_fluxes = np.array([obs.table[name] for name in filters]).T
obs_errors = np.array([obs.table[name + "_err"] for name in
filters]).T
mock_fluxes = obs_fluxes.copy()
bestmod_fluxes = np.ctypeslib.as_array(best_fluxes[0])
bestmod_fluxes = bestmod_fluxes.reshape(best_fluxes[1])
wdata = np.where((obs_fluxes > 0.) & (obs_errors > 0.))
mock_fluxes[wdata] = np.random.normal(bestmod_fluxes[wdata],
obs_errors[wdata])
for idx, name in enumerate(filters):
obs.table[name] = mock_fluxes[:, idx]
initargs = (params, filters, variables, z, model_fluxes,
model_variables, time.time(), mp.Value('i', 0),
analysed_averages, analysed_std, best_fluxes,
best_parameters, best_chi2, best_chi2_red, save,
lim_flag, n_obs)
if conf['cores'] == 1: # Do not create a new process
init_worker_analysis(*initargs)
for idx, mock in enumerate(obs.table):
worker_analysis(idx, mock)
else: # Analyse observations in parallel
with mp.Pool(processes=conf['cores'],
initializer=init_worker_analysis,
initargs=initargs) as pool:
pool.starmap(worker_analysis, enumerate(obs.table))
print("\nSaving results...")
save_results("results_mock", obs.table['id'], variables,
analysed_averages, analysed_std, best_chi2,
best_chi2_red, best_parameters, best_fluxes, filters,
info)
print("\nSaving the mock analysis results...")
results.save("mock_results")
print("Run completed!")
......
......@@ -5,7 +5,11 @@
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
# Author: Yannick Roehlly & Médéric Boquien
from functools import lru_cache
import os
from astropy import log
from astropy.cosmology import WMAP7 as cosmo
from astropy.table import Table, Column
import numpy as np
from scipy import optimize
......@@ -13,228 +17,45 @@ from scipy.special import erf
log.setLevel('ERROR')
def save_best_sed(obsid, sed, norm):
    """Write the best-fitting SED of an object to a VO table.

    Parameters
    ----------
    obsid: string
        Identifier of the object; used as prefix of the output file name.
    sed: SED object
        Best-fitting SED to save.
    norm: float
        Normalisation factor scaling the model to the observations.

    """
    sed.to_fits("out/{}".format(obsid), mass=norm)
def _save_pdf(obsid, name, model_variable, likelihood):
"""Compute and save the PDF to a FITS file
We estimate the probability density functions (PDF) of the parameter from
a likelihood-weighted histogram.
Parameters
----------
obsid: string
Name of the object. Used to prepend the output file name
name: string
Analysed variable name
model_variable: array
Values of the model variable
likelihood: 1D array
Likelihood of the "likely" models
def save_chi2(obs, variable, models, chi2, values):
"""Save the chi² and the associated physocal properties
"""
# We check how many unique parameter values are analysed and if
# less than Npdf (= 100), the PDF is initally built assuming a
# number of bins equal to the number of unique values for a given
# parameter
Npdf = 100
min_hist = np.min(model_variable)
max_hist = np.max(model_variable)
Nhist = min(Npdf, len(np.unique(model_variable)))
if min_hist == max_hist:
pdf_grid = np.array([min_hist, max_hist])
pdf_prob = np.array([1., 1.])
fname = 'out/{}_{}_chi2.npy'.format(obs['id'], variable)
if os.path.exists(fname):
data = np.memmap(fname, dtype=np.float64, mode='r+',
shape=(2, models.params.size))
else:
pdf_prob, pdf_grid = np.histogram(model_variable, Nhist,
(min_hist, max_hist),
weights=likelihood, density=True)
pdf_x = (pdf_grid[1:]+pdf_grid[:-1]) / 2.
pdf_grid = np.linspace(min_hist, max_hist, Npdf)
pdf_prob = np.interp(pdf_grid, pdf_x, pdf_prob)
if pdf_prob is None:
print("Can not compute PDF for observation <{}> and variable <{}>."
"".format(obsid, name))
else:
table = Table((
Column(pdf_grid, name=name),
Column(pdf_prob, name="probability density")
))
table.write("out/{}_{}_pdf.fits".format(obsid, name))
def save_pdf(obsid, names, mass_proportional, model_variables, scaling,
likelihood, wlikely):
"""Compute and save the PDF of analysed variables
Parameters
----------
obsid: string
Name of the object. Used to prepend the output file name
names: list of strings
Analysed variables names
model_variables: array
Values of the model variables
likelihood: 1D array
Likelihood of the "likely" models
"""
for i, name in enumerate(names):
if name.endswith('_log'):
if name[:-4] in mass_proportional:
model_variable = np.log10(model_variables[i, :][wlikely] *
scaling[wlikely])
else:
model_variable = np.log10(model_variables[i, :][wlikely])
else:
if name in mass_proportional:
model_variable = (model_variables[i, :][wlikely] *
scaling[wlikely])
else:
model_variable = model_variables[i, :][wlikely]
_save_pdf(obsid, name, model_variable, likelihood)
def _save_chi2(obsid, name, model_variable, chi2):
"""Save the best reduced χ² versus an analysed variable
data = np.memmap(fname, dtype=np.float64, mode='w+',
shape=(2, models.params.size))
data[0, models.block] = chi2
data[1, models.block] = values
@lru_cache(maxsize=None)
def compute_corr_dz(model_z, obs_z):
"""The mass-dependent physical properties are computed assuming the
redshift of the model. However because we round the observed redshifts to
two decimals, there can be a difference of 0.005 in redshift between the
models and the actual observation. At low redshift, this can cause a
discrepancy in the mass-dependent physical properties: ~0.35 dex at z=0.010
vs 0.015 for instance. Therefore we correct these physical quantities by
multiplying them by corr_dz.
Parameters
----------
obsid: string
Name of the object. Used to prepend the output file name
name: string
Analysed variable name
model_variable: array
Values of the model variable
chi2:
Reduced χ²
model_z: float
Redshift of the model.
obs_z: float
Redshift of the observed object.
"""
table = Table((Column(model_variable, name=name),
Column(chi2, name="chi2")))
table.write("out/{}_{}_chi2.fits".format(obsid, name))
def save_chi2(obsid, names, mass_proportional, model_variables, scaling, chi2):
"""Save the best reduced χ² versus analysed variables
Parameters
----------
obsid: string
Name of the object. Used to prepend the output file name
name: list of strings
Analysed variables names
model_variables: array
Values of the model variables
scaling: array
Scaling factors of the models
chi2:
Reduced χ²
"""
for i, name in enumerate(names):
if name.endswith('_log'):
if name[:-4] in mass_proportional:
model_variable = np.log10(model_variables[i, :] * scaling)
else:
model_variable = np.log10(model_variables[i, :])
else:
if name in mass_proportional:
model_variable = model_variables[i, :] * scaling
else:
model_variable = model_variables[i, :]
_save_chi2(obsid, name, model_variable, chi2)
def save_results(filename, obsid, bayes_variables, bayes_mean, bayes_std, chi2,
chi2_red, best, fluxes, filters, info_keys):
"""Save the estimated values derived from the analysis of the PDF and the
parameters associated with the best fit. An simple text file and a FITS
file are generated.
Parameters
----------
filename: string
Name of the output file without the extension
obsid: table column
Names of the objects
bayes_variables: list
Analysed variable names
bayes_mean: RawArray
Analysed variables values estimates
bayes_std: RawArray
Analysed variables errors estimates
chi2: RawArray
Best χ² for each object
chi2_red: RawArray
Best reduced χ² for each object
best: RawArray
All variables corresponding to the best SED
fluxes: RawArray
Fluxes in all bands for the best SED
filters: list
Filters used to compute the fluxes
info_keys: list
Parameters names
"""
np_bayes_mean = np.ctypeslib.as_array(bayes_mean[0])
np_bayes_mean = np_bayes_mean.reshape(bayes_mean[1])
np_bayes_std = np.ctypeslib.as_array(bayes_std[0])
np_bayes_std = np_bayes_std.reshape(bayes_std[1])
np_fluxes = np.ctypeslib.as_array(fluxes[0])
np_fluxes = np_fluxes.reshape(fluxes[1])
np_best = np.ctypeslib.as_array(best[0])
np_best = np_best.reshape(best[1])
np_chi2 = np.ctypeslib.as_array(chi2[0])
np_chi2_red = np.ctypeslib.as_array(chi2_red[0])
table = Table()
table.add_column(Column(obsid.data, name="id"))
for idx, name in enumerate(bayes_variables):
table.add_column(Column(np_bayes_mean[:, idx], name="bayes."+name))
table.add_column(Column(np_bayes_std[:, idx],
name="bayes."+name+"_err"))
table.add_column(Column(np_chi2, name="best.chi_square"))
table.add_column(Column(np_chi2_red, name="best.reduced_chi_square"))
for idx, name in enumerate(info_keys):
table.add_column(Column(np_best[:, idx], name="best."+name))
for idx, name in enumerate(filters):
table.add_column(Column(np_fluxes[:, idx], name="best."+name,
unit='mJy'))
table.write('out/'+filename+".txt", format='ascii.fixed_width',
delimiter=None)
table.write('out/'+filename+".fits", format='fits')
if model_z == obs_z:
return 1.
if model_z > 0.:
return (cosmo.luminosity_distance(obs_z).value /
cosmo.luminosity_distance(model_z).value)**2.
return (cosmo.luminosity_distance(obs_z).value * 1e5)**2.
def dchi2_over_ds2(s, obs_fluxes, obs_errors, mod_fluxes):
......@@ -304,24 +125,6 @@ def dchi2_over_ds2(s, obs_fluxes, obs_errors, mod_fluxes):
return func
def analyse_chi2(chi2):
"""Function to analyse the best chi^2 and find out whether what fraction of
objects seem to be overconstrainted.
Parameters
----------
chi2: RawArray
Contains the reduced chi^2
"""
chi2_red = np.ctypeslib.as_array(chi2[0])
# If low values of reduced chi^2, it means that the data are overfitted
# Errors might be under-estimated or not enough valid data.
print("\n{}% of the objects have chi^2_red~0 and {}% chi^2_red<0.5"
.format(np.round((chi2_red < 1e-12).sum()/chi2_red.size, 1),