From 9440c95957a6c555fe0f3ab2c9890245fdf12f95 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?M=C3=A9d=C3=A9ric=20Boquien?=
Date: Mon, 27 Mar 2017 11:46:48 -0300
Subject: [PATCH] Implement the analysis of the observations by blocks of models.

---
 CHANGELOG.md | 3 +
 .../analysis_modules/pdf_analysis/__init__.py | 243 +++++------
 .../analysis_modules/pdf_analysis/utils.py | 267 ++----------
 .../analysis_modules/pdf_analysis/workers.py | 407 ++++++++----------
 pcigale/analysis_modules/utils.py | 38 --
 pcigale_plots/__init__.py | 37 +-
 6 files changed, 340 insertions(+), 655 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 65866022..fc1016cc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,9 +6,12 @@
 - An option `redshift\_decimals` is now provided in `pdf\_analysis` to indicate the number of decimals to round the observed redshifts to compute the grid of models. By default the model redshifts are rounded to two decimals but this can be insufficient at low z and/or when using narrow-band filters for instance. This only applies to the grid. The physical properties are still computed for the redshift at full precision. (Médéric Boquien)
 - Bands with negative fluxes are now considered valid and are fitted as any other band. (Médéric Boquien)
 - Allow the models to be computed by blocks in `savefluxes`. This can be useful when computing a very large grid and/or to split the results file into various smaller files as large files can be difficult to handle. The number of blocks is set with the `blocks` parameters in the pcigale.ini. (Médéric Boquien)
+- Allow the observations to be analysed by blocks of models in `pdf\_analysis`. This is useful when computing a very large grid of models that would not fit in memory. The number of blocks is set with the `blocks` parameter in the pcigale.ini. (Médéric Boquien)

### Changed
 - Make the timestamp more readable when moving the out/ directory. (Médéric Boquien)
+- To accommodate the analysis of the observations by blocks, all models are now included in the estimation of the physical properties and no cut in chi² is done anymore. (Médéric Boquien)
+- To accommodate the analysis of the observations by blocks, the `save_pdf` option has been eliminated. To plot PDFs one needs to set `save_chi2` to True and then run `pcigale-plots pdf`. (Médéric Boquien)

### Fixed
 - Corrected a typo that prevented `restframe\_parameters` from being listed among the available modules. (Médéric Boquien)

diff --git a/pcigale/analysis_modules/pdf_analysis/__init__.py b/pcigale/analysis_modules/pdf_analysis/__init__.py
index 4d9b72e6..a1e421d5 100644
--- a/pcigale/analysis_modules/pdf_analysis/__init__.py
+++ b/pcigale/analysis_modules/pdf_analysis/__init__.py
@@ -28,19 +28,20 @@ reduced χ²) is given for each observation.
 from collections import OrderedDict
 import ctypes
 import multiprocessing as mp
-from multiprocessing.sharedctypes import RawArray
 import time

 import numpy as np

 from ...utils import read_table
 from ..
import AnalysisModule -from .utils import save_results, analyse_chi2 -from ...warehouse import SedWarehouse from .workers import sed as worker_sed from .workers import init_sed as init_worker_sed from .workers import init_analysis as init_worker_analysis +from .workers import init_bestfit as init_worker_bestfit from .workers import analysis as worker_analysis +from .workers import bestfit as worker_bestfit +from ...managers.results import ResultsManager +from ...managers.models import ModelsManager from ...managers.observations import ObservationsManager from ...managers.parameters import ParametersManager @@ -63,14 +64,8 @@ class PdfAnalysis(AnalysisModule): )), ("save_chi2", ( "boolean()", - "If true, for each observation and each analysed variable save " - "the reduced chi2.", - False - )), - ("save_pdf", ( - "boolean()", - "If true, for each observation and each analysed variable save " - "the probability density function.", + "If true, for each observation and each analysed property, save " + "the raw chi2. It occupies ~15 MB/million models/variable.", False )), ("lim_flag", ( @@ -92,9 +87,80 @@ class PdfAnalysis(AnalysisModule): "compute the grid of models. To disable rounding give a negative " "value. Do not round if you use narrow-band filters.", 2 + )), + ("blocks", ( + "integer(min=1)", + "Number of blocks to compute the models and analyse the " + "observations. If there is enough memory, we strongly recommend " + "this to be set to 1.", + 1 )) ]) + + def _compute_models(self, conf, obs, params, iblock): + models = ModelsManager(conf, obs, params, iblock) + + initargs = (models, time.time(), mp.Value('i', 0)) + self._parallel_job(worker_sed, params.blocks[iblock], initargs, + init_worker_sed, conf['cores']) + + return models + + def _compute_bayes(self, conf, obs, models): + results = ResultsManager(models) + + initargs = (models, results, time.time(), mp.Value('i', 0)) + self._parallel_job(worker_analysis, obs, initargs, + init_worker_analysis, conf['cores']) + + return results + + def _compute_best(self, conf, obs, params, results): + initargs = (conf, params, obs, results, time.time(), + mp.Value('i', 0)) + self._parallel_job(worker_bestfit, obs, initargs, + init_worker_bestfit, conf['cores']) + + def _parallel_job(self, worker, items, initargs, initializer, ncores): + if ncores == 1: # Do not create a new process + initializer(*initargs) + for idx, item in enumerate(items): + worker(idx, item) + else: # run in parallel + with mp.Pool(processes=ncores, initializer=initializer, + initargs=initargs) as pool: + pool.starmap(worker, enumerate(items)) + + def _compute(self, conf, obs, params): + results = [] + nblocks = len(params.blocks) + for iblock in range(nblocks): + print('\nProcessing block {}/{}...'.format(iblock + 1, nblocks)) + # We keep the models if there is only one block. 
This allows us to
+            # avoid recomputing the models when we do a mock analysis
+            if not hasattr(self, '_models'):
+                print("\nComputing models ...")
+                models = self._compute_models(conf, obs, params, iblock)
+                if nblocks == 1:
+                    self._models = models
+            else:
+                print("\nLoading precomputed models")
+                models = self._models
+
+            print("\nEstimating the physical properties ...")
+            result = self._compute_bayes(conf, obs, models)
+            results.append(result)
+            print("\nBlock processed.")
+
+        print("\nEstimating physical properties on all blocks")
+        results = ResultsManager.merge(results)
+
+        print("\nComputing the best fit spectra")
+        self._compute_best(conf, obs, params, results)
+
+        return results
+
     def process(self, conf):
         """Process with the psum analysis.

@@ -117,146 +183,37 @@ class PdfAnalysis(AnalysisModule):
         # Rename the output directory if it exists
         self.prepare_dirs()

-        # Initialise variables from input arguments.
-        variables = conf['analysis_params']["variables"]
-        variables_nolog = [variable[:-4] if variable.endswith('_log') else
-                           variable for variable in variables]
-        n_variables = len(variables)
-        save = {key: conf['analysis_params']["save_{}".format(key)] for key in
-                ["best_sed", "chi2", "pdf"]}
-        lim_flag = conf['analysis_params']["lim_flag"]
-
-        filters = [name for name in conf['bands'] if not
-                   name.endswith('_err')]
-        n_filters = len(filters)
-
-        # Read the observation table and complete it by adding error where
-        # none is provided and by adding the systematic deviation.
+        # Store the observations in a manager which sanitises the data, checks
+        # all the required fluxes are present, adding errors if needed,
+        # discarding invalid fluxes, etc.
         obs = ObservationsManager(conf)
-        n_obs = len(obs.table)
+        obs.save('observations')

-        z = np.array(conf['sed_modules_params']['redshifting']['redshift'])
-
-        # The parameters manager allows us to retrieve the models parameters
-        # from a 1D index. This is useful in that we do not have to create
-        # a list of parameters as they are computed on-the-fly. It also has
-        # nice goodies such as finding the index of the first parameter to
-        # have changed between two indices or the number of models.
+        # Store the grid of parameters in a manager to facilitate the
+        # computation of the models
         params = ParametersManager(conf)
-        n_params = params.size
-
-        # Retrieve an arbitrary SED to obtain the list of output parameters
-        warehouse = SedWarehouse()
-        sed = warehouse.get_sed(conf['sed_modules'], params.from_index(0))
-        info = list(sed.info.keys())
-        info.sort()
-        n_info = len(info)
-        del warehouse, sed
-
-        print("Computing the models fluxes...")
-
-        # Arrays where we store the data related to the models. For memory
-        # efficiency reasons, we use RawArrays that will be passed in argument
-        # to the pool. Each worker will fill a part of the RawArrays. It is
-        # important that there is no conflict and that two different workers do
-        # not write on the same section.
-        # We put the shape in a tuple along with the RawArray because workers
-        # need to know the shape to create the numpy array from the RawArray.
-        model_fluxes = (RawArray(ctypes.c_double, n_params * n_filters),
-                        (n_filters, n_params))
-        model_variables = (RawArray(ctypes.c_double, n_params * n_variables),
-                           (n_variables, n_params))
-
-        initargs = (params, filters, variables_nolog, model_fluxes,
-                    model_variables, time.time(), mp.Value('i', 0))
-        if conf['cores'] == 1:  # Do not create a new process
-            init_worker_sed(*initargs)
-            for idx in range(n_params):
-                worker_sed(idx)
-        else:  # Compute the models in parallel
-            with mp.Pool(processes=conf['cores'], initializer=init_worker_sed,
-                         initargs=initargs) as pool:
-                pool.map(worker_sed, range(n_params))
-
-        print("\nAnalysing models...")
-
-        # We use RawArrays for the same reason as previously
-        analysed_averages = (RawArray(ctypes.c_double, n_obs * n_variables),
-                             (n_obs, n_variables))
-        analysed_std = (RawArray(ctypes.c_double, n_obs * n_variables),
-                        (n_obs, n_variables))
-        best_fluxes = (RawArray(ctypes.c_double, n_obs * n_filters),
-                       (n_obs, n_filters))
-        best_parameters = (RawArray(ctypes.c_double, n_obs * n_info),
-                           (n_obs, n_info))
-        best_chi2 = (RawArray(ctypes.c_double, n_obs), (n_obs))
-        best_chi2_red = (RawArray(ctypes.c_double, n_obs), (n_obs))
-
-        initargs = (params, filters, variables, z, model_fluxes,
-                    model_variables, time.time(), mp.Value('i', 0),
-                    analysed_averages, analysed_std, best_fluxes,
-                    best_parameters, best_chi2, best_chi2_red, save, lim_flag,
-                    n_obs)
-        if conf['cores'] == 1:  # Do not create a new process
-            init_worker_analysis(*initargs)
-            for idx, obs in enumerate(obs.table):
-                worker_analysis(idx, obs)
-        else:  # Analyse observations in parallel
-            with mp.Pool(processes=conf['cores'],
-                         initializer=init_worker_analysis,
-                         initargs=initargs) as pool:
-                pool.starmap(worker_analysis, enumerate(obs.table))
-
-        analyse_chi2(best_chi2_red)
-        print("\nSaving results...")
+        results = self._compute(conf, obs, params)
+        results.best.analyse_chi2()

-        save_results("results", obs.table['id'], variables, analysed_averages,
-                     analysed_std, best_chi2, best_chi2_red, best_parameters,
-                     best_fluxes, filters, info)
+        print("\nSaving the analysis results...")
+        results.save("results")

         if conf['analysis_params']['mock_flag'] is True:
+            print("\nAnalysing the mock observations...")
+
+            # For the mock analysis we do not save the ancillary files.
+            for k in ['best_sed', 'chi2', 'pdf']:
+                conf['analysis_params']["save_{}".format(k)] = False
+
+            # We replace the observations with a mock catalogue.
+            obs.generate_mock(results)
+            obs.save('mock_observations')
+
+            results = self._compute(conf, obs, params)

-            print("\nMock analysis...")
-
-            # For the mock analysis we do not save the ancillary files
-            for k in save:
-                save[k] = False
-
-            obs_fluxes = np.array([obs.table[name] for name in filters]).T
-            obs_errors = np.array([obs.table[name + "_err"] for name in
-                                   filters]).T
-            mock_fluxes = obs_fluxes.copy()
-            bestmod_fluxes = np.ctypeslib.as_array(best_fluxes[0])
-            bestmod_fluxes = bestmod_fluxes.reshape(best_fluxes[1])
-            wdata = np.where((obs_fluxes > 0.)
& (obs_errors > 0.))
-            mock_fluxes[wdata] = np.random.normal(bestmod_fluxes[wdata],
-                                                  obs_errors[wdata])
-
-            for idx, name in enumerate(filters):
-                obs.table[name] = mock_fluxes[:, idx]
-
-            initargs = (params, filters, variables, z, model_fluxes,
-                        model_variables, time.time(), mp.Value('i', 0),
-                        analysed_averages, analysed_std, best_fluxes,
-                        best_parameters, best_chi2, best_chi2_red, save,
-                        lim_flag, n_obs)
-            if conf['cores'] == 1:  # Do not create a new process
-                init_worker_analysis(*initargs)
-                for idx, mock in enumerate(obs.table):
-                    worker_analysis(idx, mock)
-            else:  # Analyse observations in parallel
-                with mp.Pool(processes=conf['cores'],
-                             initializer=init_worker_analysis,
-                             initargs=initargs) as pool:
-                    pool.starmap(worker_analysis, enumerate(obs.table))
-
-            print("\nSaving results...")
-
-            save_results("results_mock", obs.table['id'], variables,
-                         analysed_averages, analysed_std, best_chi2,
-                         best_chi2_red, best_parameters, best_fluxes, filters,
-                         info)
+            print("\nSaving the mock analysis results...")
+            results.save("mock_results")

         print("Run completed!")

diff --git a/pcigale/analysis_modules/pdf_analysis/utils.py b/pcigale/analysis_modules/pdf_analysis/utils.py
index 738d7ed7..493fcea2 100644
--- a/pcigale/analysis_modules/pdf_analysis/utils.py
+++ b/pcigale/analysis_modules/pdf_analysis/utils.py
@@ -5,7 +5,11 @@
 # Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
 # Author: Yannick Roehlly & Médéric Boquien

+from functools import lru_cache
+import os
+
 from astropy import log
+from astropy.cosmology import WMAP7 as cosmo
 from astropy.table import Table, Column
 import numpy as np
 from scipy import optimize
@@ -13,228 +17,45 @@ from scipy.special import erf

 log.setLevel('ERROR')

-
-def save_best_sed(obsid, sed, norm):
-    """Save the best SED to a VO table.
-
-    Parameters
-    ----------
-    obsid: string
-        Name of the object. Used to prepend the output file name
-    sed: SED object
-        Best SED
-    norm: float
-        Normalisation factor to scale the scale to the observations
-
-    """
-    sed.to_fits("out/{}".format(obsid), mass=norm)
-
-
-def _save_pdf(obsid, name, model_variable, likelihood):
-    """Compute and save the PDF to a FITS file
-
-    We estimate the probability density functions (PDF) of the parameter from
-    a likelihood-weighted histogram.
-
-    Parameters
-    ----------
-    obsid: string
-        Name of the object. Used to prepend the output file name
-    name: string
-        Analysed variable name
-    model_variable: array
-        Values of the model variable
-    likelihood: 1D array
-        Likelihood of the "likely" models
+def save_chi2(obs, variable, models, chi2, values):
+    """Save the chi² and the associated physical properties

     """
-
-    # We check how many unique parameter values are analysed and if
-    # less than Npdf (= 100), the PDF is initally built assuming a
-    # number of bins equal to the number of unique values for a given
-    # parameter
-    Npdf = 100
-    min_hist = np.min(model_variable)
-    max_hist = np.max(model_variable)
-    Nhist = min(Npdf, len(np.unique(model_variable)))
-
-    if min_hist == max_hist:
-        pdf_grid = np.array([min_hist, max_hist])
-        pdf_prob = np.array([1., 1.])
+    fname = 'out/{}_{}_chi2.npy'.format(obs['id'], variable)
+    if os.path.exists(fname):
+        data = np.memmap(fname, dtype=np.float64, mode='r+',
+                         shape=(2, models.params.size))
     else:
-        pdf_prob, pdf_grid = np.histogram(model_variable, Nhist,
-                                          (min_hist, max_hist),
-                                          weights=likelihood, density=True)
-        pdf_x = (pdf_grid[1:]+pdf_grid[:-1]) / 2.
- - pdf_grid = np.linspace(min_hist, max_hist, Npdf) - pdf_prob = np.interp(pdf_grid, pdf_x, pdf_prob) - - if pdf_prob is None: - print("Can not compute PDF for observation <{}> and variable <{}>." - "".format(obsid, name)) - else: - table = Table(( - Column(pdf_grid, name=name), - Column(pdf_prob, name="probability density") - )) - table.write("out/{}_{}_pdf.fits".format(obsid, name)) - - -def save_pdf(obsid, names, mass_proportional, model_variables, scaling, - likelihood, wlikely): - """Compute and save the PDF of analysed variables - - Parameters - ---------- - obsid: string - Name of the object. Used to prepend the output file name - names: list of strings - Analysed variables names - model_variables: array - Values of the model variables - likelihood: 1D array - Likelihood of the "likely" models - - """ - for i, name in enumerate(names): - if name.endswith('_log'): - if name[:-4] in mass_proportional: - model_variable = np.log10(model_variables[i, :][wlikely] * - scaling[wlikely]) - else: - model_variable = np.log10(model_variables[i, :][wlikely]) - else: - if name in mass_proportional: - model_variable = (model_variables[i, :][wlikely] * - scaling[wlikely]) - else: - model_variable = model_variables[i, :][wlikely] - - _save_pdf(obsid, name, model_variable, likelihood) - - -def _save_chi2(obsid, name, model_variable, chi2): - """Save the best reduced χ² versus an analysed variable + data = np.memmap(fname, dtype=np.float64, mode='w+', + shape=(2, models.params.size)) + data[0, models.block] = chi2 + data[1, models.block] = values + + +@lru_cache(maxsize=None) +def compute_corr_dz(model_z, obs_z): + """The mass-dependent physical properties are computed assuming the + redshift of the model. However because we round the observed redshifts to + two decimals, there can be a difference of 0.005 in redshift between the + models and the actual observation. At low redshift, this can cause a + discrepancy in the mass-dependent physical properties: ~0.35 dex at z=0.010 + vs 0.015 for instance. Therefore we correct these physical quantities by + multiplying them by corr_dz. Parameters ---------- - obsid: string - Name of the object. Used to prepend the output file name - name: string - Analysed variable name - model_variable: array - Values of the model variable - chi2: - Reduced χ² + model_z: float + Redshift of the model. + obs_z: float + Redshift of the observed object. """ - table = Table((Column(model_variable, name=name), - Column(chi2, name="chi2"))) - table.write("out/{}_{}_chi2.fits".format(obsid, name)) - - -def save_chi2(obsid, names, mass_proportional, model_variables, scaling, chi2): - """Save the best reduced χ² versus analysed variables - - Parameters - ---------- - obsid: string - Name of the object. 
Used to prepend the output file name - name: list of strings - Analysed variables names - model_variables: array - Values of the model variables - scaling: array - Scaling factors of the models - chi2: - Reduced χ² - """ - for i, name in enumerate(names): - if name.endswith('_log'): - if name[:-4] in mass_proportional: - model_variable = np.log10(model_variables[i, :] * scaling) - else: - model_variable = np.log10(model_variables[i, :]) - else: - if name in mass_proportional: - model_variable = model_variables[i, :] * scaling - else: - model_variable = model_variables[i, :] - - _save_chi2(obsid, name, model_variable, chi2) - - -def save_results(filename, obsid, bayes_variables, bayes_mean, bayes_std, chi2, - chi2_red, best, fluxes, filters, info_keys): - """Save the estimated values derived from the analysis of the PDF and the - parameters associated with the best fit. An simple text file and a FITS - file are generated. - - Parameters - ---------- - filename: string - Name of the output file without the extension - obsid: table column - Names of the objects - bayes_variables: list - Analysed variable names - bayes_mean: RawArray - Analysed variables values estimates - bayes_std: RawArray - Analysed variables errors estimates - chi2: RawArray - Best χ² for each object - chi2_red: RawArray - Best reduced χ² for each object - best: RawArray - All variables corresponding to the best SED - fluxes: RawArray - Fluxes in all bands for the best SED - filters: list - Filters used to compute the fluxes - info_keys: list - Parameters names - - """ - np_bayes_mean = np.ctypeslib.as_array(bayes_mean[0]) - np_bayes_mean = np_bayes_mean.reshape(bayes_mean[1]) - - np_bayes_std = np.ctypeslib.as_array(bayes_std[0]) - np_bayes_std = np_bayes_std.reshape(bayes_std[1]) - - np_fluxes = np.ctypeslib.as_array(fluxes[0]) - np_fluxes = np_fluxes.reshape(fluxes[1]) - - np_best = np.ctypeslib.as_array(best[0]) - np_best = np_best.reshape(best[1]) - - np_chi2 = np.ctypeslib.as_array(chi2[0]) - - np_chi2_red = np.ctypeslib.as_array(chi2_red[0]) - - table = Table() - - table.add_column(Column(obsid.data, name="id")) - - for idx, name in enumerate(bayes_variables): - table.add_column(Column(np_bayes_mean[:, idx], name="bayes."+name)) - table.add_column(Column(np_bayes_std[:, idx], - name="bayes."+name+"_err")) - - table.add_column(Column(np_chi2, name="best.chi_square")) - table.add_column(Column(np_chi2_red, name="best.reduced_chi_square")) - - for idx, name in enumerate(info_keys): - table.add_column(Column(np_best[:, idx], name="best."+name)) - - for idx, name in enumerate(filters): - table.add_column(Column(np_fluxes[:, idx], name="best."+name, - unit='mJy')) - - table.write('out/'+filename+".txt", format='ascii.fixed_width', - delimiter=None) - table.write('out/'+filename+".fits", format='fits') + if model_z == obs_z: + return 1. + if model_z > 0.: + return (cosmo.luminosity_distance(obs_z).value / + cosmo.luminosity_distance(model_z).value)**2. + return (cosmo.luminosity_distance(obs_z).value * 1e5)**2. def dchi2_over_ds2(s, obs_fluxes, obs_errors, mod_fluxes): @@ -304,24 +125,6 @@ def dchi2_over_ds2(s, obs_fluxes, obs_errors, mod_fluxes): return func -def analyse_chi2(chi2): - """Function to analyse the best chi^2 and find out whether what fraction of - objects seem to be overconstrainted. 
-
-    Parameters
-    ----------
-    chi2: RawArray
-        Contains the reduced chi^2
-
-    """
-    chi2_red = np.ctypeslib.as_array(chi2[0])
-    # If low values of reduced chi^2, it means that the data are overfitted
-    # Errors might be under-estimated or not enough valid data.
-    print("\n{}% of the objects have chi^2_red~0 and {}% chi^2_red<0.5"
-          .format(np.round((chi2_red < 1e-12).sum()/chi2_red.size, 1),
-                  np.round((chi2_red < 0.5).sum()/chi2_red.size, 1)))
-
-
 def _compute_scaling(model_fluxes, obs_fluxes, obs_errors):
     """Compute the scaling factor to be applied to the model fluxes to best fit
     the observations. Note that we look over the bands to avoid the creation of
diff --git a/pcigale/analysis_modules/pdf_analysis/workers.py b/pcigale/analysis_modules/pdf_analysis/workers.py
index 74aed11f..cd7308f1 100644
--- a/pcigale/analysis_modules/pdf_analysis/workers.py
+++ b/pcigale/analysis_modules/pdf_analysis/workers.py
@@ -8,278 +8,179 @@

 import time

-from astropy.cosmology import WMAP7 as cosmology
 import numpy as np
-import scipy.stats as st

 from ..utils import nothread
-from .utils import (save_best_sed, save_pdf, save_chi2, compute_chi2,
-                    weighted_param)
+from .utils import save_chi2, compute_corr_dz, compute_chi2, weighted_param
 from ...warehouse import SedWarehouse


-def init_sed(params, filters, analysed, fluxes, variables, t_begin, n_computed):
-    """Initializer of the pool of processes. It is mostly used to convert
-    RawArrays into numpy arrays. The latter are defined as global variables to
-    be accessible from the workers.
+def init_sed(models, t0, ncomputed):
+    """Initializer of the pool of processes to share variables between workers.

     Parameters
     ----------
-    params: ParametersManager
-        Manages the parameters from a 1D index.
-    filters: List
-        Contains the names of the filters to compute the fluxes.
-    analysed: list
-        Variable names to be analysed.
-    fluxes: RawArray and tuple containing the shape
-        Fluxes of individual models. Shared among workers.
-    variables: RawArray and tuple containing the shape
-        Values of the analysed variables. Shared among workers.
-    n_computed: Value
-        Number of computed models. Shared among workers.
-    t_begin: float
+    models: ModelsManager
+        Manages the storage of the computed models (fluxes and properties).
+    t0: float
         Time of the beginning of the computation.
+    ncomputed: Value
+        Number of computed models. Shared among workers.

     """
-    global gbl_model_fluxes, gbl_model_variables, gbl_n_computed, gbl_t_begin
-    global gbl_params, gbl_previous_idx, gbl_filters, gbl_analysed_variables
-    global gbl_warehouse
+    global gbl_previous_idx, gbl_warehouse, gbl_models, gbl_obs
+    global gbl_properties, gbl_t0, gbl_ncomputed

     # Limit the number of threads to 1 if we use MKL in order to limit the
     # oversubscription of the CPU/RAM.
nothread()

-    gbl_model_fluxes = np.ctypeslib.as_array(fluxes[0])
-    gbl_model_fluxes = gbl_model_fluxes.reshape(fluxes[1])
-
-    gbl_model_variables = np.ctypeslib.as_array(variables[0])
-    gbl_model_variables = gbl_model_variables.reshape(variables[1])
-
-    gbl_n_computed = n_computed
-    gbl_t_begin = t_begin
-
-    gbl_params = params
-
     gbl_previous_idx = -1
-
-    gbl_filters = filters
-    gbl_analysed_variables = analysed
-
     gbl_warehouse = SedWarehouse()
+    gbl_models = models
+    gbl_obs = models.obs
+    gbl_properties = [prop[:-4] if prop.endswith('_log') else prop for prop in
+                      models.propertiesnames]
+    gbl_t0 = t0
+    gbl_ncomputed = ncomputed


-def init_analysis(params, filters, analysed, z, fluxes, variables,
-                  t_begin, n_computed, analysed_averages, analysed_std,
-                  best_fluxes, best_parameters, best_chi2, best_chi2_red, save,
-                  lim_flag, n_obs):
-    """Initializer of the pool of processes. It is mostly used to convert
-    RawArrays into numpy arrays. The latter are defined as global variables to
-    be accessible from the workers.
+def init_analysis(models, results, t0, ncomputed):
+    """Initializer of the pool of processes to share variables between workers.

     Parameters
     ----------
-    params: ParametersManager
-        Manages the parameters from a 1D index.
-    filters: list
-        Contains filters to compute the fluxes.
-    analysed: list
-        Variable names to be analysed
-    z: RawArray and tuple containing the shape.
-        Redshifts of individual models. Shared among workers.
-    fluxes: RawArray
-        Fluxes of individual models. Shared among workers.
-    variables: RawArray and tuple containing the shape
-        Values of the analysed variables. Shared among workers.
-    t_begin: float
+    models: ModelsManager
+        Manages the storage of the computed models (fluxes and properties).
+    results: ResultsManager
+        Contains the estimates and errors on the properties.
+    t0: float
         Time of the beginning of the computation.
-    n_computed: Value
+    ncomputed: Value
         Number of computed models. Shared among workers.
-    analysed_averages: RawArray
-        Analysed values for each observation.
-    analysed_std: RawArray
-        Standard deviation values for each observation.
-    best_fluxes: RawArray
-        Best fluxes for each observation.
-    best_parameters: RawArray
-        Best parameters for each observation.
-    best_chi2: RawArray
-        Best χ² for each observation.
-    best_chi2_red: RawArray
-        Best reduced χ² for each observation.
-    save: dictionary
-        Contains booleans indicating whether we need to save some data related
-        to given models.
-    n_obs: int
-        Number of observations.
""" - init_sed(params, filters, analysed, fluxes, variables, t_begin, n_computed) - global gbl_z, gbl_analysed_averages, gbl_analysed_std - global gbl_best_fluxes, gbl_best_parameters, gbl_best_chi2 - global gbl_best_chi2_red, gbl_save, gbl_n_obs, gbl_lim_flag, gbl_keys - - gbl_analysed_averages = np.ctypeslib.as_array(analysed_averages[0]) - gbl_analysed_averages = gbl_analysed_averages.reshape(analysed_averages[1]) - - gbl_analysed_std = np.ctypeslib.as_array(analysed_std[0]) - gbl_analysed_std = gbl_analysed_std.reshape(analysed_std[1]) + global gbl_models, gbl_results, gbl_t0, gbl_ncomputed - gbl_best_fluxes = np.ctypeslib.as_array(best_fluxes[0]) - gbl_best_fluxes = gbl_best_fluxes.reshape(best_fluxes[1]) + gbl_models = models + gbl_results = results + gbl_t0 = t0 + gbl_ncomputed = ncomputed - gbl_best_parameters = np.ctypeslib.as_array(best_parameters[0]) - gbl_best_parameters = gbl_best_parameters.reshape(best_parameters[1]) - gbl_best_chi2 = np.ctypeslib.as_array(best_chi2[0]) +def init_bestfit(conf, params, observations, results, t0, ncomputed): + """Initializer of the pool of processes to share variables between workers. - gbl_best_chi2_red = np.ctypeslib.as_array(best_chi2_red[0]) - - gbl_z = z + Parameters + ---------- + conf: dict + Contains the pcigale.ini configuration. + params: ParametersManager + Manages the parameters from a 1D index. + observations: astropy.Table + Contains the observations including the filter names. + ncomputed: Value + Number of computed models. Shared among workers. + t0: float + Time of the beginning of the computation. + results: ResultsManager + Contains the estimates and errors on the properties. + offset: integer + Offset of the block to retrieve the global model index. - gbl_save = save - gbl_lim_flag = lim_flag + """ + global gbl_previous_idx, gbl_warehouse, gbl_conf, gbl_params, gbl_obs + global gbl_results, gbl_t0, gbl_ncomputed - gbl_n_obs = n_obs - gbl_keys = None + gbl_previous_idx = -1 + gbl_warehouse = SedWarehouse() + gbl_conf = conf + gbl_params = params + gbl_obs = observations + gbl_results = results + gbl_t0 = t0 + gbl_ncomputed = ncomputed -def sed(idx): - """Worker process to retrieve a SED and affect the relevant data to shared - RawArrays. +def sed(idx, midx): + """Worker process to retrieve a SED and affect the relevant data to an + instance of ModelsManager. Parameters ---------- idx: int - Index of the model to retrieve its parameters from the global variable + Index of the model within the current block of models. + midx: int + Global index of the model. 
""" global gbl_previous_idx if gbl_previous_idx > -1: gbl_warehouse.partial_clear_cache( - gbl_params.index_module_changed(gbl_previous_idx, idx)) - gbl_previous_idx = idx - - sed = gbl_warehouse.get_sed(gbl_params.modules, - gbl_params.from_index(idx)) + gbl_models.params.index_module_changed(gbl_previous_idx, midx)) + gbl_previous_idx = midx + sed = gbl_warehouse.get_sed(gbl_models.params.modules, + gbl_models.params.from_index(midx)) if 'sfh.age' in sed.info and sed.info['sfh.age'] > sed.info['universe.age']: - gbl_model_fluxes[:, idx] = np.full(len(gbl_filters), np.nan) - gbl_model_variables[:, idx] = np.full(len(gbl_analysed_variables), - np.nan) + gbl_models.fluxes[:, idx] = np.full(len(gbl_obs.bands), np.nan) + gbl_models.properties[:, idx] = np.full(len(gbl_properties), np.nan) else: - gbl_model_fluxes[:, idx] = np.array([sed.compute_fnu(filter_) for - filter_ in gbl_filters]) - gbl_model_variables[:, idx] = np.array([sed.info[name] - for name in - gbl_analysed_variables]) - - with gbl_n_computed.get_lock(): - gbl_n_computed.value += 1 - n_computed = gbl_n_computed.value - if n_computed % 250 == 0 or n_computed == gbl_params.size: - t_elapsed = time.time() - gbl_t_begin + gbl_models.fluxes[:, idx] = [sed.compute_fnu(filter_) + for filter_ in gbl_obs.bands] + gbl_models.properties[:, idx] = [sed.info[name] + for name in gbl_properties] + + with gbl_ncomputed.get_lock(): + gbl_ncomputed.value += 1 + ncomputed = gbl_ncomputed.value + nmodels = len(gbl_models.block) + if ncomputed % 250 == 0 or ncomputed == nmodels: + dt = time.time() - gbl_t0 print("{}/{} models computed in {:.1f} seconds ({:.1f} models/s)". - format(n_computed, gbl_params.size, t_elapsed, - n_computed/t_elapsed), - end="\n" if n_computed == gbl_params.size else "\r") - + format(ncomputed, nmodels, dt, ncomputed/dt), + end="\n" if ncomputed == nmodels else "\r") def analysis(idx, obs): - """Worker process to analyse the PDF and estimate parameters values + """Worker process to analyse the PDF and estimate parameters values and + store them in an instance of ResultsManager. Parameters ---------- idx: int Index of the observation. This is necessary to put the computed values - at the right location in RawArrays + at the right location in the ResultsManager. obs: row Input data for an individual object """ np.seterr(invalid='ignore') - obs_fluxes = np.array([obs[name] for name in gbl_filters]) - obs_errors = np.array([obs[name + "_err"] for name in gbl_filters]) - obs_z = obs['redshift'] - nobs = np.where(np.isfinite(obs_fluxes))[0].size - - if obs_z >= 0.: + if obs['redshift'] >= 0.: # We pick the the models with the closest redshift using a slice to # work on views of the arrays and not on copies to save on RAM. - idx_z = np.abs(obs_z - gbl_z).argmin() - model_z = gbl_z[idx_z] - wz = slice(idx_z, None, gbl_z.size) - - # The mass-dependent physical properties are computed assuming the - # redshift of the model. However because we round the observed redshifts - # to two decimals, there can be a difference of 0.005 in redshift - # between the models and the actual observation. At low redshift, this - # can cause a discrepancy in the mass-dependent physical properties: - # ~0.35 dex at z=0.010 vs 0.015 for instance. Therefore we correct these - # physical quantities by multiplying them by corr_dz. - if model_z == obs_z: - corr_dz = 1. - else: - if model_z > 0.: - corr_dz = (cosmology.luminosity_distance(obs_z).value / - cosmology.luminosity_distance(model_z).value)**2. 
-            else:
-                corr_dz = (cosmology.luminosity_distance(obs_z).value * 1e5)**2.
+        z = np.array(gbl_models.conf['sed_modules_params']['redshifting']['redshift'])
+        wz = slice(np.abs(obs['redshift']-z).argmin(), None, z.size)
+        corr_dz = compute_corr_dz(z[wz.start], obs['redshift'])
     else:  # We do not know the redshift so we use the full grid
         wz = slice(0, None, 1)
         corr_dz = 1.

-    chi2, scaling = compute_chi2(gbl_model_fluxes[:, wz], obs_fluxes,
-                                 obs_errors, gbl_lim_flag)
-
-    ##################################################################
-    #                       Variable analysis                        #
-    ##################################################################
+    obs_fluxes = np.array([obs[name] for name in gbl_models.obs.bands])
+    obs_errors = np.array([obs[name + "_err"] for name in
+                           gbl_models.obs.bands])
+    chi2, scaling = compute_chi2(gbl_models.fluxes[:, wz], obs_fluxes,
+                                 obs_errors, gbl_models.conf['analysis_params']['lim_flag'])

-    # We select only models that have at least 0.1% of the probability of
-    # the best model to reproduce the observations. It helps eliminating
-    # very bad models.
-    maxchi2 = st.chi2.isf(st.chi2.sf(np.nanmin(chi2), nobs-1) * 1e-3, nobs-1)
-    wlikely = np.where(chi2 < maxchi2)
-
-    if wlikely[0].size == 0:
-        # It sometimes happen because models are older than the Universe's age
-        print("No suitable model found for the object {}. One possible origin "
-              "is that models are older than the Universe.".format(obs['id']))
-        gbl_analysed_averages[idx, :] = np.nan
-        gbl_analysed_std[idx, :] = np.nan
-        gbl_best_fluxes[idx, :] = np.nan
-        gbl_best_parameters[idx, :] = np.nan
-        gbl_best_chi2[idx] = np.nan
-        gbl_best_chi2_red[idx] = np.nan
-    else:
+    if np.any(np.isfinite(chi2)):
         # We use the exponential probability associated with the χ² as
         # likelihood function.
-        likelihood = np.exp(-chi2[wlikely]/2)
-
-        # We define the best fitting model for each observation as the one
-        # with the least χ².
-        best_index_z = np.nanargmin(chi2)  # index for models at given z
-        best_index = wz.start + best_index_z * wz.step  # index for all models
-
-        # We compute once again the best sed to obtain its info
-        global gbl_previous_idx
-        if gbl_previous_idx > -1:
-            gbl_warehouse.partial_clear_cache(
-                gbl_params.index_module_changed(gbl_previous_idx,
-                                                best_index))
-        gbl_previous_idx = best_index
-
-        sed = gbl_warehouse.get_sed(gbl_params.modules,
-                                    gbl_params.from_index(best_index))
-
-        # We correct the mass-dependent parameters
-        for key in sed.mass_proportional_info:
-            sed.info[key] *= scaling[best_index_z] * corr_dz
+        likelihood = np.exp(-chi2 / 2.)
+        gbl_results.bayes.weights[idx] = np.sum(likelihood)

         # We compute the weighted average and standard deviation using the
         # likelihood as weight.
- for i, variable in enumerate(gbl_analysed_variables): + for i, variable in enumerate(gbl_results.bayes.propertiesnames): if variable.endswith('_log'): variable = variable[:-4] _ = np.log10 @@ -288,43 +189,77 @@ def analysis(idx, obs): _ = lambda x: x maxstd = lambda mean, std: max(0.05 * mean, std) - if variable in sed.mass_proportional_info: - mean, std = weighted_param(_(gbl_model_variables[i, wz][wlikely] - * scaling[wlikely] * corr_dz), - likelihood) + if variable in gbl_results.bayes.massproportional: + values = _(gbl_models.properties[i, wz] * scaling * corr_dz) else: - mean, std = weighted_param(_(gbl_model_variables[i, wz][wlikely]), - likelihood) - - gbl_analysed_averages[idx, i] = mean - gbl_analysed_std[idx, i] = maxstd(mean, std) - - gbl_best_fluxes[idx, :] = gbl_model_fluxes[:, best_index] \ - * scaling[best_index_z] - - global gbl_keys - if gbl_keys is None: - gbl_keys = list(sed.info.keys()) - gbl_keys.sort() - gbl_best_parameters[idx, :] = np.array([sed.info[k] for k in gbl_keys]) - gbl_best_chi2[idx] = chi2[best_index_z] - gbl_best_chi2_red[idx] = chi2[best_index_z] / (nobs - 1) - - if gbl_save['best_sed']: - save_best_sed(obs['id'], sed, scaling[best_index_z]) - if gbl_save['chi2']: - save_chi2(obs['id'], gbl_analysed_variables, - sed.mass_proportional_info, gbl_model_variables[:, wz], - scaling * corr_dz, chi2 / (nobs - 1)) - if gbl_save['pdf']: - save_pdf(obs['id'], gbl_analysed_variables, - sed.mass_proportional_info, gbl_model_variables[:, wz], - scaling * corr_dz, likelihood, wlikely) - - with gbl_n_computed.get_lock(): - gbl_n_computed.value += 1 - n_computed = gbl_n_computed.value - t_elapsed = time.time() - gbl_t_begin + values = _(gbl_models.properties[i, wz] * corr_dz) + + mean, std = weighted_param(values, likelihood) + gbl_results.bayes.means[idx, i] = mean + gbl_results.bayes.errors[idx, i] = maxstd(mean, std) + if gbl_models.conf['analysis_params']['save_chi2'] is True: + save_chi2(obs, variable, gbl_models, chi2, values) + best_idx_z = np.nanargmin(chi2) + gbl_results.best.chi2[idx] = chi2[best_idx_z] + gbl_results.best.index[idx] = (wz.start + best_idx_z*wz.step + + gbl_models.block.start) + else: + # It sometimes happens because models are older than the Universe's age + print("No suitable model found for the object {}. One possible origin " + "is that models are older than the Universe.".format(obs['id'])) + + with gbl_ncomputed.get_lock(): + gbl_ncomputed.value += 1 + ncomputed = gbl_ncomputed.value + dt = time.time() - gbl_t0 print("{}/{} objects analysed in {:.1f} seconds ({:.2f} objects/s)". - format(n_computed, gbl_n_obs, t_elapsed, n_computed/t_elapsed), - end="\n" if n_computed == gbl_n_obs else "\r") + format(ncomputed, len(gbl_models.obs), dt, ncomputed/dt), + end="\n" if ncomputed == len(gbl_models.obs) else "\r") + +def bestfit(oidx, obs): + """Worker process to compute and save the best fit. + + Parameters + ---------- + oidx: int + Index of the observation. This is necessary to put the computed values + at the right location in the ResultsManager. 
+ obs: row + Input data for an individual object + + """ + np.seterr(invalid='ignore') + + best_index = int(gbl_results.best.index[oidx]) + global gbl_previous_idx + if gbl_previous_idx > -1: + gbl_warehouse.partial_clear_cache( + gbl_params.index_module_changed(gbl_previous_idx, best_index)) + gbl_previous_idx = best_index + sed = gbl_warehouse.get_sed(gbl_params.modules, + gbl_params.from_index(best_index)) + + fluxes = np.array([sed.compute_fnu(filt) for filt in gbl_obs.bands]) + obs_fluxes = np.array([obs[name] for name in gbl_obs.bands]) + obs_errors = np.array([obs[name + '_err'] for name in gbl_obs.bands]) + _, scaling = compute_chi2(fluxes[:, None], obs_fluxes, obs_errors, + gbl_conf['analysis_params']['lim_flag']) + + gbl_results.best.properties[oidx, :] = [sed.info[k] for k in + gbl_results.best.propertiesnames] + iprop = [i for i, k in enumerate(gbl_results.best.propertiesnames) + if k in gbl_results.best.massproportional] + corr_dz = compute_corr_dz(sed.info['universe.redshift'], obs['redshift']) + gbl_results.best.properties[oidx, iprop] *= scaling * corr_dz + gbl_results.best.fluxes[oidx, :] = fluxes * scaling + + if gbl_conf['analysis_params']["save_best_sed"]: + sed.to_fits('out/{}'.format(obs['id']), scaling) + + with gbl_ncomputed.get_lock(): + gbl_ncomputed.value += 1 + ncomputed = gbl_ncomputed.value + dt = time.time() - gbl_t0 + print("{}/{} best fit spectra computed in {:.1f} seconds ({:.2f} objects/s)". + format(ncomputed, len(gbl_obs), dt, ncomputed/dt), end="\n" if + ncomputed == len(gbl_obs) else "\r") diff --git a/pcigale/analysis_modules/utils.py b/pcigale/analysis_modules/utils.py index d37fe59a..110e1951 100644 --- a/pcigale/analysis_modules/utils.py +++ b/pcigale/analysis_modules/utils.py @@ -7,44 +7,6 @@ Various utility functions for pcigale analysis modules """ -import numpy as np -from astropy import log -from astropy.table import Table, Column - -log.setLevel('ERROR') - -def save_fluxes(model_fluxes, model_parameters, filters, names): - """Save fluxes and associated parameters into a table. - - Parameters - ---------- - model_fluxes: RawArray - Contains the fluxes of each model. - model_parameters: RawArray - Contains the parameters associated to each model. - filters: list - Contains the filter names. - names: List - Contains the parameters names. - - """ - out_fluxes = np.ctypeslib.as_array(model_fluxes[0]) - out_fluxes = out_fluxes.reshape(model_fluxes[1]) - - out_params = np.ctypeslib.as_array(model_parameters[0]) - out_params = out_params.reshape(model_parameters[1]) - - out_table = Table(np.hstack((out_fluxes, out_params)), - names=filters + list(names)) - - out_table.add_column(Column(np.arange(model_fluxes[1][0]), name='id'), - index=0) - - out_table.write("out/computed_fluxes.fits") - out_table.write("out/computed_fluxes.txt", format='ascii.fixed_width', - delimiter=None) - - def nothread(): """Some libraries such as Intel's MKL have automatic threading. This is good when having only one process. However we already do our own diff --git a/pcigale_plots/__init__.py b/pcigale_plots/__init__.py index 759f9e87..b2695ff0 100644 --- a/pcigale_plots/__init__.py +++ b/pcigale_plots/__init__.py @@ -49,11 +49,13 @@ def _chi2_worker(obj_name, var_name): Name of the analysed variable.. 
""" - if os.path.isfile("out/{}_{}_chi2.fits".format(obj_name, var_name)): - chi2 = Table.read("out/{}_{}_chi2.fits".format(obj_name, var_name)) + fname = "out/{}_{}_chi2.npy".format(obj_name, var_name) + if os.path.isfile(fname): + data = np.memmap(fname, dtype=np.float64) + data = np.memmap(fname, dtype=np.float64, shape=(2, data.size // 2)) figure = plt.figure() ax = figure.add_subplot(111) - ax.scatter(chi2[var_name], chi2['chi2'], color='k', s=.1) + ax.scatter(data[1, :], data[0, :], color='k', s=.1) ax.set_xlabel(var_name) ax.set_ylabel("Reduced $\chi^2$") ax.set_ylim(0., ) @@ -77,11 +79,34 @@ def _pdf_worker(obj_name, var_name): Name of the analysed variable.. """ - if os.path.isfile("out/{}_{}_pdf.fits".format(obj_name, var_name)): - pdf = Table.read("out/{}_{}_pdf.fits".format(obj_name, var_name)) + fname = "out/{}_{}_chi2.npy".format(obj_name, var_name) + if os.path.isfile(fname): + data = np.memmap(fname, dtype=np.float64) + data = np.memmap(fname, dtype=np.float64, shape=(2, data.size // 2)) + + likelihood = np.exp(-data[0, :] / 2.) + model_variable = data[1, :] + + Npdf = 100 + min_hist = np.min(model_variable) + max_hist = np.max(model_variable) + Nhist = min(Npdf, len(np.unique(model_variable))) + + if min_hist == max_hist: + pdf_grid = np.array([min_hist, max_hist]) + pdf_prob = np.array([1., 1.]) + else: + pdf_prob, pdf_grid = np.histogram(model_variable, Nhist, + (min_hist, max_hist), + weights=likelihood, density=True) + pdf_x = (pdf_grid[1:]+pdf_grid[:-1]) / 2. + + pdf_grid = np.linspace(min_hist, max_hist, Npdf) + pdf_prob = np.interp(pdf_grid, pdf_x, pdf_prob) + figure = plt.figure() ax = figure.add_subplot(111) - ax.plot(pdf[var_name], pdf['probability density'], color='k') + ax.plot(pdf_grid, pdf_prob, color='k') ax.set_xlabel(var_name) ax.set_ylabel("Probability density") ax.minorticks_on() -- GitLab