Commit 29934196 authored by Médéric Boquien's avatar Médéric Boquien

Store the model fluxes and properties in dictionaries of SharedArray rather...

Store the model fluxes and properties in dictionaries of SharedArray rather than one large SharedArray, and adapt the rest of the code. It may be slightly slower, but it makes memory allocation easier and it makes retrieving the values of the fluxes/properties much more convenient, as we do not have to retain the index.
parent 457e3ed2
......@@ -120,7 +120,7 @@ def dchi2_over_ds2(s, obs_values, obs_errors, mod_values):
return func
def _compute_scaling(model_fluxes, model_propsmass, obs):
def _compute_scaling(models, obs, wz):
"""Compute the scaling factor to be applied to the model fluxes to best fit
    the observations. Note that we loop over the bands to avoid the creation of
an array of the same size as the model_fluxes array. Because we loop on the
......@@ -128,41 +128,47 @@ def _compute_scaling(model_fluxes, model_propsmass, obs):
Parameters
----------
model_fluxes: array
Fluxes of the models
model_propsmass: array
Extensive properties of the models to be fitted
models: ModelsManagers class instance
Contains the models (fluxes, intensive, and extensive properties).
obs: Observation class instance
Contains the fluxes, intensive properties, extensive properties and
        their errors, for a single observation.
wz: slice
Selection of the models at the redshift of the observation or all the
redshifts in photometric-redshift mode.
Returns
-------
scaling: array
Scaling factors minimising the χ²
"""
num = np.zeros(model_fluxes.shape[1])
denom = np.zeros(model_fluxes.shape[1])
for i in range(obs.fluxes.size):
_ = list(models.flux.keys())[0]
num = np.zeros_like(models.flux[_].array[wz])
denom = np.zeros_like(models.flux[_].array[wz])
for band, flux in obs.flux.items():
# Multiplications are faster than division, so we directly use the
# inverse error
inv_err2 = 1. / obs.fluxes_err[i] ** 2.
if np.isfinite(obs.fluxes[i]) and obs.fluxes_err[i] > 0.:
num += model_fluxes[i, :] * (obs.fluxes[i] * inv_err2)
denom += np.square(model_fluxes[i, :] * (1./obs.fluxes_err[i]))
for i in range(obs.extprops.size):
inv_err2 = 1. / obs.flux_err[band] ** 2.
if np.isfinite(flux) and obs.flux_err[band] > 0.:
model = models.flux[band].array[wz]
num += model * (flux * inv_err2)
denom += model**2. * inv_err2
for name, prop in obs.extprop.items():
# Multiplications are faster than division, so we directly use the
# inverse error
inv_err2 = 1. / obs.extprops_err[i] ** 2.
if np.isfinite(obs.extprops[i]) and obs.extprops_err[i] > 0.:
num += model_propsmass[i, :] * (obs.extprops[i] * inv_err2)
denom += model_propsmass[i, :] ** 2. * inv_err2
inv_err2 = 1. / obs.extprop_err[name] ** 2.
if np.isfinite(prop) and obs.extprop_err[name] > 0.:
model = models.extprop[name].array[wz]
num += model * (prop * inv_err2)
denom += model ** 2. * inv_err2
return num/denom
def compute_chi2(model_fluxes, model_props, model_propsmass, obs,
corr_dz, lim_flag):
def compute_chi2(models, obs, corr_dz, wz, lim_flag):
"""Compute the χ² of observed fluxes with respect to the grid of models. We
    take into account upper limits if need be. Note that we loop over the bands
to avoid the creation of an array of the same size as the model_fluxes
......@@ -171,17 +177,17 @@ def compute_chi2(model_fluxes, model_props, model_propsmass, obs,
Parameters
----------
model_fluxes: array
2D grid containing the fluxes of the models
model_props: array
2D grid containing the intensive properties of the models
model_propsmass: array
2D grid containing the extensive properties of the models
models: ModelsManagers class instance
Contains the models (fluxes, intensive, and extensive properties).
obs: Observation class instance
Contains the fluxes, intensive properties, extensive properties and
        their errors, for a single observation.
corr_dz: correction factor to scale the extensive properties to the right
corr_dz: float
Correction factor to scale the extensive properties to the right
distance
wz: slice
Selection of the models at the redshift of the observation or all the
redshifts in photometric-redshift mode.
lim_flag: boolean
Boolean indicating whether upper limits should be treated (True) or
discarded (False)
......@@ -195,7 +201,7 @@ def compute_chi2(model_fluxes, model_props, model_propsmass, obs,
"""
limits = lim_flag and (np.any(obs.fluxes_err <= 0.)
or np.any(obs.extprops_err <= 0.))
scaling = _compute_scaling(model_fluxes, model_propsmass, obs)
scaling = _compute_scaling(models, obs, wz)
# Some observations may not have flux values in some filter(s), but
# they can have upper limit(s).
......@@ -212,32 +218,37 @@ def compute_chi2(model_fluxes, model_props, model_propsmass, obs,
chi2 = np.zeros_like(scaling)
# Computation of the χ² from fluxes
for i in range(obs.fluxes.size):
if np.isfinite(obs.fluxes[i]) and obs.fluxes_err[i] > 0.:
chi2 += np.square((obs.fluxes[i] - model_fluxes[i, :] * scaling) *
(1./obs.fluxes_err[i]))
for band, flux in obs.flux.items():
# Multiplications are faster than division, so we directly use the
# inverse error
inv_flux_err = 1. / obs.flux_err[band]
if np.isfinite(flux) and inv_flux_err > 0.:
model = models.flux[band].array[wz]
chi2 += ((flux - model * scaling) * inv_flux_err) ** 2.
# Computation of the χ² from intensive properties
for i in range(obs.extprops.size):
if np.isfinite(obs.extprops[i]):
chi2 += np.square((obs.extprops[i] - corr_dz * (scaling *
model_propsmass[i, :])) *
(1./obs.extprops_err[i]))
for name, prop in obs.intprop.items():
if np.isfinite(prop):
model = models.intprop[name].array[wz]
chi2 += ((prop - model) * (1. / obs.intprops_err[name])) ** 2.
# Computation of the χ² from extensive properties
for i in range(obs.intprops.size):
if np.isfinite(obs.intprops[i]):
chi2 += np.square((obs.intprops[i] - model_props[i, :]) *
(1./obs.intprops_err[i]))
for name, prop in obs.extprop.items():
# Multiplications are faster than division, so we directly use the
# inverse error
inv_prop_err = 1. / obs.extprop_err[name]
if np.isfinite(prop) and inv_prop_err > 0.:
model = models.extprop[name].array[wz]
chi2 += ((prop - (scaling * model) * corr_dz) * inv_prop_err) ** 2.
# Finally take the presence of upper limits into account
if limits == True:
for i, obs_error in enumerate(obs.fluxes_err):
for band, obs_error in obs.flux_err.items():
if obs_error < 0.:
chi2 -= 2. * np.log(.5 *
(1. + erf(((obs.fluxes[i] -
model_fluxes[i, :] * scaling) /
(-np.sqrt(2.)*obs.fluxes_err[i])))))
(1. + erf(((obs.fluxes[band] -
model_flux[band].array[wz] * scaling) /
(-np.sqrt(2.)*obs_error)))))
return chi2, scaling
......
......@@ -29,8 +29,7 @@ def init_sed(models, t0, ncomputed):
Number of computed models. Shared among workers.
"""
global gbl_previous_idx, gbl_warehouse, gbl_models, gbl_obs
global gbl_properties, gbl_t0, gbl_ncomputed
global gbl_previous_idx, gbl_warehouse, gbl_models, gbl_t0, gbl_ncomputed
# Limit the number of threads to 1 if we use MKL in order to limit the
# oversubscription of the CPU/RAM.
......@@ -40,9 +39,6 @@ def init_sed(models, t0, ncomputed):
gbl_warehouse = SedWarehouse()
gbl_models = models
gbl_obs = models.obs
gbl_properties = [prop[:-4] if prop.endswith('_log') else prop for prop in
models.propertiesnames]
gbl_t0 = t0
gbl_ncomputed = ncomputed
......@@ -127,20 +123,21 @@ def sed(idx, midx):
gbl_models.params.from_index(midx))
if 'sfh.age' in sed.info and sed.info['sfh.age'] > sed.info['universe.age']:
gbl_models.fluxes[:, idx] = np.full(len(gbl_obs.bands), np.nan)
gbl_models.properties[:, idx] = np.full(len(gbl_properties), np.nan)
gbl_models.intprops[:, idx] = np.full(len(gbl_obs.intprops), np.nan)
gbl_models.extprops[:, idx] = np.full(len(gbl_obs.extprops), np.nan)
for band in gbl_models.flux:
gbl_models.flux[band].array[idx] = np.nan
for prop in gbl_models.extprop:
gbl_models.extprop[prop].array[idx] = np.nan
for prop in gbl_models.intprop:
gbl_models.intprop[prop].array[idx] = np.nan
else:
gbl_models.fluxes[:, idx] = [sed.compute_fnu(filter_)
for filter_ in gbl_obs.bands]
gbl_models.properties[:, idx] = [sed.info[name]
for name in gbl_properties]
gbl_models.intprops[:, idx] = [sed.info[name]
for name in gbl_obs.intprops]
gbl_models.extprops[:, idx] = [sed.info[name]
for name in gbl_obs.extprops]
for band in gbl_models.flux.keys():
gbl_models.flux[band].array[idx] = sed.compute_fnu(band)
for prop in gbl_models.extprop.keys():
gbl_models.extprop[prop].array[idx] = sed.info[prop]
for prop in gbl_models.intprop.keys():
gbl_models.intprop[prop].array[idx] = sed.info[prop]
with gbl_ncomputed.get_lock():
gbl_ncomputed.value += 1
ncomputed = gbl_ncomputed.value
......@@ -178,9 +175,7 @@ def analysis(idx, obs):
wz = slice(0, None, 1)
corr_dz = 1.
chi2, scaling = compute_chi2(gbl_models.fluxes[:, wz],
gbl_models.intprops[:, wz],
gbl_models.extprops[:, wz], obs, corr_dz,
chi2, scaling = compute_chi2(gbl_models, obs, corr_dz, wz,
gbl_models.conf['analysis_params']['lim_flag'])
if np.any(np.isfinite(chi2)):
......@@ -196,13 +191,12 @@ def analysis(idx, obs):
# We compute the weighted average and standard deviation using the
# likelihood as weight.
for prop in gbl_results.bayes.intmean:
i = gbl_models.conf['analysis_params']['variables'].index(prop)
if prop.endswith('_log'):
prop = prop[:-4]
_ = np.log10
else:
_ = lambda x: x
values = _(gbl_models.properties[i, wz])
values = _(gbl_models.intprop[prop].array[wz])
mean, std = weighted_param(values[wlikely], likelihood[wlikely])
gbl_results.bayes.intmean[prop].array[idx] = mean
gbl_results.bayes.interror[prop].array[idx] = std
......@@ -210,13 +204,12 @@ def analysis(idx, obs):
save_chi2(obs, prop, gbl_models, chi2, values)
for prop in gbl_results.bayes.extmean:
i = gbl_models.conf['analysis_params']['variables'].index(prop)
if prop.endswith('_log'):
prop = prop[:-4]
_ = np.log10
else:
_ = lambda x: x
values = _(gbl_models.properties[i, wz])
values = _(gbl_models.extprop[prop].array[wz])
mean, std = weighted_param(values[wlikely] * scaling * corr_dz,
likelihood[wlikely])
gbl_results.bayes.extmean[prop].array[idx] = mean
......@@ -224,7 +217,6 @@ def analysis(idx, obs):
if gbl_models.conf['analysis_params']['save_chi2'] is True:
save_chi2(obs, prop, gbl_models, chi2, values)
mean, std = weighted_param(values[wlikely], likelihood[wlikely])
best_idx_z = np.nanargmin(chi2)
gbl_results.best.chi2[idx] = chi2[best_idx_z]
gbl_results.best.scaling[idx] = scaling[best_idx_z]
......
......@@ -28,8 +28,9 @@ def init_fluxes(models, t0, ncomputed):
Number of computed models. Shared among workers.
"""
global gbl_previous_idx, gbl_warehouse, gbl_models, gbl_obs
global gbl_properties, gbl_save, gbl_t0, gbl_ncomputed
global gbl_previous_idx, gbl_warehouse, gbl_models, gbl_obs, gbl_save
global gbl_t0, gbl_ncomputed
# Limit the number of threads to 1 if we use MKL in order to limit the
# oversubscription of the CPU/RAM.
......@@ -40,8 +41,6 @@ def init_fluxes(models, t0, ncomputed):
gbl_models = models
gbl_obs = models.obs
gbl_properties = [prop[:-4] if prop.endswith('_log') else prop for prop in
models.propertiesnames]
gbl_save = models.conf['analysis_params']['save_sed']
gbl_t0 = t0
gbl_ncomputed = ncomputed
......@@ -67,13 +66,19 @@ def fluxes(idx, midx):
gbl_models.params.from_index(midx))
if 'sfh.age' in sed.info and sed.info['sfh.age'] > sed.info['universe.age']:
gbl_models.fluxes[:, idx] = np.full(len(gbl_obs.bands), np.nan)
gbl_models.properties[:, idx] = np.full(len(gbl_properties), np.nan)
for band in gbl_models.flux:
gbl_models.flux[band].raw[idx] = np.nan
for prop in gbl_models.extprop:
gbl_models.extprop[prop].raw[idx] = np.nan
for prop in gbl_models.intprop:
gbl_models.intprop[prop].raw[idx] = np.nan
else:
gbl_models.fluxes[:, idx] = [sed.compute_fnu(filter_)
for filter_ in gbl_obs.bands]
gbl_models.properties[:, idx] = [sed.info[name]
for name in gbl_properties]
for band in gbl_models.flux.keys():
gbl_models.flux[band].array[idx] = sed.compute_fnu(band)
for prop in gbl_models.extprop.keys():
gbl_models.extprop[prop].array[idx] = sed.info[prop]
for prop in gbl_models.intprop.keys():
gbl_models.intprop[prop].array[idx] = sed.info[prop]
if gbl_save is True:
sed.to_fits("out/{}".format(midx))
......
......@@ -26,47 +26,50 @@ class ModelsManager(object):
self.conf = conf
self.obs = obs
self.params = params
self.iblock = iblock
self.block = params.blocks[iblock]
self.allpropnames, self.allextpropnames = get_info(self)
self.allintpropnames = set(self.allpropnames) - self.allextpropnames
self.propertiesnames = conf['analysis_params']['variables']
self.allpropertiesnames, self.massproportional = get_info(self)
self.intpropnames = (self.allintpropnames & set(obs.intprops) |
self.allintpropnames & set(conf['analysis_params']['variables']))
self.extpropnames = (self.allextpropnames & set(obs.extprops) |
self.allextpropnames & set(conf['analysis_params']['variables']))
size = len(params.blocks[iblock])
self._fluxes = SharedArray((len(self.obs.bands), len(self.block)))
self._properties = SharedArray((len(self.propertiesnames),
len(self.block)))
if conf['analysis_method'] == 'pdf_analysis':
self._intprops = SharedArray((len(self.obs.intprops),
len(self.block)))
self._extprops = SharedArray((len(self.obs.extprops),
len(self.block)))
self.flux = {band: SharedArray(size) for band in obs.bands}
self.intprop = {prop: SharedArray(size) for prop in self.intpropnames}
self.extprop = {prop: SharedArray(size) for prop in self.extpropnames}
@property
def fluxes(self):
def flux(self):
"""Returns a shared array containing the fluxes of the models.
"""
return self._fluxes.array
@property
def properties(self):
"""Returns a shared array containing the properties of the models.
return self._flux
"""
return self._properties.array
@flux.setter
def flux(self, flux):
self._flux = flux
@property
def intprops(self):
def intprop(self):
"""Returns a shared array containing the intensive properties to fit.
"""
return self._intprops.array
return self._intprop
@intprop.setter
def intprop(self, intprop):
self._intprop = intprop
@property
def extprops(self):
def extprop(self):
"""Returns a shared array containing the extensive properties to fit.
"""
return self._extprops.array
return self._extprop
@extprop.setter
def extprop(self, extprop):
self._extprop = extprop
def save(self, filename):
"""Save the fluxes and properties of all the models into a table.
......@@ -77,10 +80,15 @@ class ModelsManager(object):
Root of the filename where to save the data.
"""
table = Table(np.vstack((self.fluxes, self.properties)).T,
names=self.obs.bands + self.propertiesnames)
table.add_column(Column(self.block, name='id'), index=0)
table = Table()
table.add_column(Column(self.block, name='id'))
for band in sorted(self.flux.keys()):
table.add_column(Column(self.flux[band].array, name=band,
unit='mJy'))
for prop in sorted(self.extprop.keys()):
table.add_column(Column(self.extprop[prop].array, name=prop))
for prop in sorted(self.intprop.keys()):
table.add_column(Column(self.intprop[prop].array, name=prop))
table.write("out/{}.fits".format(filename))
table.write("out/{}.txt".format(filename), format='ascii.fixed_width',
......
......@@ -279,6 +279,8 @@ class ObservationsManagerVirtual(object):
# this situation
self.bands_err = None
self.table = None
self.intprops = set()
self.extprops = set()
def __len__(self):
"""As there is no observed object the length is naturally 0.
......@@ -304,11 +306,9 @@ class Observation(object):
self.distance = cosmo.luminosity_distance(self.redshift).value
else:
self.distance = np.nan
self.fluxes = np.array([row[band] for band in cls.bands])
self.fluxes_err = np.array([row[band + '_err'] for band in cls.bands])
self.intprops = np.array([row[prop] for prop in cls.intprops])
self.intprops_err = np.array([row[prop + '_err'] for prop in
cls.intprops])
self.extprops = np.array([row[prop] for prop in cls.extprops])
self.extprops_err = np.array([row[prop + '_err'] for prop in
cls.extprops])
self.flux = {band: row[band] for band in cls.bands}
self.flux_err = {band: row[band + '_err'] for band in cls.bands}
self.intprop = {prop: row[prop] for prop in cls.intprops}
self.intprop_err = {prop: row[prop + '_err'] for prop in cls.intprops}
self.extprop = {prop: row[prop] for prop in cls.extprops}
self.extprop_err = {prop: row[prop + '_err'] for prop in cls.extprops}
......@@ -29,11 +29,10 @@ class BayesResultsManager(object):
"""
def __init__(self, models):
nobs = len(models.obs)
self.propertiesnames = models.propertiesnames
extpropnames = models.massproportional.\
intersection(models.propertiesnames)
intpropnames = set(models.propertiesnames) - extpropnames
self.nproperties = len(models.propertiesnames)
self.propertiesnames = models.allpropnames
extpropnames = models.extpropnames
intpropnames = models.intpropnames
self.nproperties = len(intpropnames) + len(extpropnames)
# Arrays where we store the data related to the models. For memory
# efficiency reasons, we use RawArrays that will be passed in argument
......@@ -157,8 +156,8 @@ class BestResultsManager(object):
# important that there is no conflict and that two different workers do
# not write on the same section.
self.flux = {band: SharedArray(nobs) for band in models.obs.bands}
allintpropnames = set(models.allpropertiesnames) - models.massproportional
allextpropnames = set(models.allpropertiesnames) - allintpropnames
allintpropnames = models.allintpropnames
allextpropnames = models.allextpropnames
self.intprop = {prop: SharedArray(nobs)
for prop in allintpropnames}
self.extprop = {prop: SharedArray(nobs)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment