Commit 86300034 authored by Médéric Boquien's avatar Médéric Boquien

Add a results manager to store the results of the analysis (bayesian and best...

Add a results manager to store the results of the analysis (bayesian and best fit estimates, best chi², etc.). Several such managers can be merged to combine appropriately the analysis of the observations in different blocks. This is the base class. The implementation will come in subsequent patches.
parent 2b63e5eb
# -*- coding: utf-8 -*-
# Copyright (C) 2017 Universidad de Antofagasta
# Licensed under the CeCILL-v2 licence - see Licence_CeCILL_V2-en.txt
# Author: Médéric Boquien
"""These classes manage the results from the analysis. The main class
ResultsManager contains instances of BayesResultsManager and BestResultsManager
that contain the bayesian and best-fit estimates of the physical properties
along with the names of the parameters, which are proportional to the mass,
etc. Each of these classes contain a merge() method that allows to combine
results of the analysis with different blocks of models.
"""
import ctypes
from multiprocessing.sharedctypes import RawArray
from astropy.table import Table, Column
import numpy as np
def shared_array(shape):
"""Create a shared array that can be read/written by parallel processes
"""
return RawArray(ctypes.c_double, int(np.product(shape)))
class BayesResultsManager(object):
"""This class contains the results of the bayesian estimates of the
physical properties of the analysed objects. It is constructed from a
ModelsManager instance, which provides the required information on the
shape of the arrays. Because it can contain the partial results for only
one block of models, we also store the sum of the model weights (that is
the likelihood) so we can merge different instances to compute the combined
estimates of the physical properties.
"""
def __init__(self, models):
self.nobs = len(models.obs)
self.propertiesnames = models.propertiesnames
self.massproportional = models.massproportional.\
intersection(models.propertiesnames)
self.nproperties = len(models.propertiesnames)
# Arrays where we store the data related to the models. For memory
# efficiency reasons, we use RawArrays that will be passed in argument
# to the pool. Each worker will fill a part of the RawArrays. It is
# important that there is no conflict and that two different workers do
# not write on the same section.
self._means = shared_array((self.nobs, self.nproperties))
self._errors = shared_array((self.nobs, self.nproperties))
self._weights = shared_array((self.nobs))
@staticmethod
def merge(results):
"""Merge a list of partial results computed on individual blocks of
models.
Parameters
----------
results: list of BayesResultsManager instances
List of the partial results to be merged
"""
if not (isinstance(results, list) and
all(isinstance(result, BayesResultsManager)
for result in results)):
raise TypeError("A list of BayesResultsManager is required.")
if len(results) == 1:
results[0]._weights = None
return results[0]
means = np.array([result.means for result in results])
errors = np.array([result.errors for result in results])
weights = np.array([result.weights for result in results])
merged = results[0]
merged._means = shared_array((merged.nobs, merged.nproperties))
merged._errors = shared_array((merged.nobs, merged.nproperties))
merged._weights = None
sumweights = np.sum(weights, axis=0)
merged.means[:] = np.sum(means*weights[:, :, None], axis=0) / \
sumweights[:, None]
# We compute the merged standard deviation by combining the standard
# deviations for each block. This is done using a parallel algorithm.
# See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
# where the number of datapoints has been substituted with the weights.
merged.errors[:] = np.sqrt((np.sum(errors**2.*weights[:, :, None], axis=0) +
np.sum((means[1:, :, :]-means[:1, :, :])**2 *
(weights[1:, :]*weights[:1, :] / sumweights)[:, :, None],
axis=0)) / sumweights[:, None])
return merged
@property
def means(self):
"""Returns a shared array containing the weighted means for each
physical property and each observation.
"""
return np.ctypeslib.as_array(self._means).reshape((self.nobs,
self.nproperties))
@property
def errors(self):
"""Returns a shared array containing the weighted standard deviation
for each physical property and each observation.
"""
return np.ctypeslib.as_array(self._errors).reshape((self.nobs,
self.nproperties))
@property
def weights(self):
"""Returns a shared array containing the some of the weights for each
each observation.
"""
return np.ctypeslib.as_array(self._weights)
class BestResultsManager(object):
"""This class contains the physical properties of the best fit of the
analysed objects. It is constructed from a ModelsManager instance, which
provides the required information on the shape of the arrays. Because it
can contain the partial results for only one block of models, we also store
the index so we can merge different instances to compute the best fit.
"""
def __init__(self, models):
self.obs = models.obs
self.nbands = len(models.obs.bands)
self.nobs = len(models.obs)
self.propertiesnames = models.allpropertiesnames
self.massproportional = models.massproportional
self.nproperties = len(models.allpropertiesnames)
self._fluxes_shape = (self.nobs, self.nbands)
self._properties_shape = (self.nobs, self.nproperties)
# Arrays where we store the data related to the models. For memory
# efficiency reasons, we use RawArrays that will be passed in argument
# to the pool. Each worker will fill a part of the RawArrays. It is
# important that there is no conflict and that two different workers do
# not write on the same section.
self._fluxes = shared_array(self._fluxes_shape)
self._properties = shared_array(self._properties_shape)
self._chi2 = shared_array(self.nobs)
# We store the index as a float to work around python issue #10746
self._index = shared_array(self.nobs)
@property
def fluxes(self):
"""Returns a shared array containing the fluxes of the best fit for
each observation.
"""
return np.ctypeslib.as_array(self._fluxes).reshape(self._fluxes_shape)
@property
def properties(self):
"""Returns a shared array containing the physical properties of the
best fit for each observation.
"""
return np.ctypeslib.as_array(self._properties)\
.reshape(self._properties_shape)
@property
def chi2(self):
"""Returns a shared array containing the raw chi² of the best fit for
each observation.
"""
return np.ctypeslib.as_array(self._chi2)
@property
def index(self):
"""Returns a shared array containing the index of the best fit for each
observation.
"""
return np.ctypeslib.as_array(self._index)
@staticmethod
def merge(results):
"""Merge a list of partial results computed on individual blocks of
models.
Parameters
----------
results: list of BestResultsManager instances
List of the partial results to be merged
"""
if not (isinstance(results, list) and
all(isinstance(result, BestResultsManager)
for result in results)):
raise TypeError("A list of BestResultsManager is required.")
if len(results) == 1:
return results[0]
fluxes = np.array([result.fluxes for result in results])
properties = np.array([result.properties for result in results])
chi2 = np.array([result.chi2 for result in results])
index = np.array([result.index for result in results])
merged = results[0]
merged._fluxes = shared_array((merged.nobs, merged.nbands))
merged._properties = shared_array((merged.nobs, merged.nproperties))
merged._chi2 = shared_array(merged.nobs)
# We store the index as a float to work around python issue #10746
merged._index = shared_array(merged.nobs)
for iobs, bestidx in enumerate(np.argmin(chi2, axis=0)):
merged.fluxes[iobs, :] = fluxes[bestidx, iobs, :]
merged.properties[iobs, :] = properties[bestidx, iobs, :]
merged.chi2[iobs] = chi2[bestidx, iobs]
merged.index[iobs] = index[bestidx, iobs]
return merged
def analyse_chi2(self):
"""Function to analyse the best chi^2 and find out what fraction of
objects seems to be overconstrainted.
"""
obs = [self.obs.table[obs].data for obs in self.obs.bands]
nobs = np.count_nonzero(np.isfinite(obs), axis=0)
chi2_red = self.chi2 / (nobs - 1)
# If low values of reduced chi^2, it means that the data are overfitted
# Errors might be under-estimated or not enough valid data.
print("\n{}% of the objects have chi^2_red~0 and {}% chi^2_red<0.5"
.format(np.round((chi2_red < 1e-12).sum() / chi2_red.size, 1),
np.round((chi2_red < 0.5).sum() / chi2_red.size, 1)))
class ResultsManager(object):
"""This class contains the physical properties (best fit and bayesian) of
the analysed objects. It is constructed from a ModelsManager instance,
which provides the required information to initialise the instances of
BestResultsManager and BayesResultsManager that store the results.
"""
def __init__(self, models):
self.conf = models.conf
self.obs = models.obs
self.params = models.params
self.bayes = BayesResultsManager(models)
self.best = BestResultsManager(models)
@staticmethod
def merge(results):
"""Merge a list of partial results computed on individual blocks of
models.
Parameters
----------
results: list of ResultsManager instances
List of the partial results to be merged
"""
merged = results[0]
merged.bayes = BayesResultsManager.merge([result.bayes
for result in results])
merged.best = BestResultsManager.merge([result.best
for result in results])
return merged
def save(self, filename):
"""Save the estimated values derived from the analysis of the PDF and
the parameters associated with the best fit. A simple text file and a
FITS file are generated.
Parameters
----------
filename:
"""
table = Table()
table.add_column(Column(self.obs.table['id'], name="id"))
for idx, name in enumerate(self.bayes.propertiesnames):
table.add_column(Column(self.bayes.means[:, idx],
name="bayes."+name))
table.add_column(Column(self.bayes.errors[:, idx],
name="bayes."+name+"_err"))
table.add_column(Column(self.best.chi2, name="best.chi_square"))
obs = [self.obs.table[obs].data for obs in self.obs.bands]
nobs = np.count_nonzero(np.isfinite(obs), axis=0)
table.add_column(Column(self.best.chi2 / (nobs - 1),
name="best.reduced_chi_square"))
for idx, name in enumerate(self.best.propertiesnames):
table.add_column(Column(self.best.properties[:, idx],
name="best."+name))
for idx, name in enumerate(self.obs.bands):
table.add_column(Column(self.best.fluxes[:, idx],
name="best."+name, unit='mJy'))
table.write("out/{}.txt".format(filename), format='ascii.fixed_width',
delimiter=None)
table.write("out/{}.fits".format(filename), format='fits')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment