Commit 3caa5296 authored by Médéric Boquien's avatar Médéric Boquien
Browse files

Finally have the analysis module return the computation results through shared arrays.

parent a54af0de
......@@ -147,6 +147,13 @@ class PdfAnalysis(AnalysisModule):
filters, TOLERANCE)
n_obs = len(obs_table)
# Retrieve an arbitrary SED to obtain the list of output parameters
warehouse = SedWarehouse(cache_type=config["storage_type"])
sed = warehouse.get_sed(creation_modules, params.from_index(0))
info = sed.info
n_info = len(sed.info)
del warehouse, sed
print("Computing the models fluxes...")
# Arrays where we store the data related to the models. For memory
......@@ -179,45 +186,38 @@ class PdfAnalysis(AnalysisModule):
print('\nAnalysing models...')
# We use RawArrays for the same reason as previously
analysed_averages = (RawArray(ctypes.c_double, n_obs * n_variables),
(n_obs, n_variables))
analysed_std = (RawArray(ctypes.c_double, n_obs * n_variables),
(n_obs, n_variables))
best_fluxes = (RawArray(ctypes.c_double, n_obs * n_filters),
(n_obs, n_filters))
best_parameters = (RawArray(ctypes.c_double, n_obs * n_info),
(n_obs, n_info))
best_chi2 = (RawArray(ctypes.c_double, n_obs), (n_obs))
best_chi2_red = (RawArray(ctypes.c_double, n_obs), (n_obs))
initargs = (params, filters, analysed_variables, model_redshifts,
model_fluxes, model_variables, time.time(),
mp.Value('i', 0), save, n_obs)
mp.Value('i', 0), analysed_averages, analysed_std,
best_fluxes, best_parameters, best_chi2, best_chi2_red,
save, n_obs)
if cores == 1: # Do not create a new process
init_worker_analysis(*initargs)
items = []
for obs in obs_table:
items.append(worker_analysis(obs))
for idx, obs in enumerate(obs_table):
worker_analysis(idx, obs)
else: # Analyse observations in parallel
with mp.Pool(processes=cores, initializer=init_worker_analysis,
initargs=initargs) as pool:
items = pool.starmap(worker_analysis, zip(obs_table))
# Local arrays where to unpack the results of the analysis
analysed_averages = np.empty((n_obs, n_variables))
analysed_std = np.empty_like(analysed_averages)
best_fluxes = np.empty((n_obs, n_filters))
best_parameters = [None] * n_obs
best_chi2 = np.empty(n_obs)
best_chi2_red = np.empty_like(best_chi2)
for item_idx, item in enumerate(items):
analysed_averages[item_idx, :] = item[0]
analysed_std[item_idx, :] = item[1]
best_fluxes[item_idx, :] = item[2]
best_parameters[item_idx] = item[3]
best_chi2[item_idx] = item[4]
best_chi2_red[item_idx] = item[5]
pool.starmap(worker_analysis, enumerate(obs_table))
print("\nSaving results...")
save_table_analysis(obs_table['id'], analysed_variables,
analysed_averages, analysed_std)
# Retrieve an arbitrary SED to obtain the list of output parameters
warehouse = SedWarehouse(cache_type=config["storage_type"])
sed = warehouse.get_sed(creation_modules, params.from_index(0))
save_table_best(obs_table['id'], best_chi2, best_chi2_red,
best_parameters, best_fluxes, filters, sed.info)
best_parameters, best_fluxes, filters, info)
print("Run completed!")
......
......@@ -162,56 +162,75 @@ def save_table_analysis(obsid, analysed_variables, analysed_averages,
Names of the objects
analysed_variables: list
Analysed variable names
analysed_averages: array
analysed_averages: RawArray
Analysed variables values estimates
analysed_std: array
analysed_std: RawArray
Analysed variables errors estimates
"""
np_analysed_averages = np.ctypeslib.as_array(analysed_averages[0])
np_analysed_averages = np_analysed_averages.reshape(analysed_averages[1])
np_analysed_std = np.ctypeslib.as_array(analysed_std[0])
np_analysed_std = np_analysed_std.reshape(analysed_std[1])
result_table = Table()
result_table.add_column(Column(obsid.data, name="observation_id"))
for index, variable in enumerate(analysed_variables):
result_table.add_column(Column(
analysed_averages[:, index],
np_analysed_averages[:, index],
name=variable
))
result_table.add_column(Column(
analysed_std[:, index],
np_analysed_std[:, index],
name=variable+"_err"
))
result_table.write(OUT_DIR + RESULT_FILE)
def save_table_best(obsid, chi2, chi2_red, variables, fluxes, filters, info_keys):
def save_table_best(obsid, chi2, chi2_red, variables, fluxes, filters,
info_keys):
"""Save the values corresponding to the best fit
Parameters
----------
obsid: table column
Names of the objects
chi2: array
Best Dz for each object
chi2_red: array
Best reduced Dz for each object
variables: list
chi2: RawArray
Best χ² for each object
chi2_red: RawArray
Best reduced χ² for each object
variables: RawArray
All variables corresponding to a SED
fluxes: 2D array
fluxes: RawArray
Fluxes in all bands for each object
filters: list
filters: OrderedDict
Filters used to compute the fluxes
info_keys: list
Parameters names
"""
np_fluxes = np.ctypeslib.as_array(fluxes[0])
np_fluxes = np_fluxes.reshape(fluxes[1])
np_variables = np.ctypeslib.as_array(variables[0])
np_variables = np_variables.reshape(variables[1])
np_chi2 = np.ctypeslib.as_array(chi2[0])
np_chi2_red = np.ctypeslib.as_array(chi2_red[0])
best_model_table = Table()
best_model_table.add_column(Column(obsid.data, name="observation_id"))
best_model_table.add_column(Column(chi2, name="chi_square"))
best_model_table.add_column(Column(chi2_red, name="reduced_chi_square"))
best_model_table.add_column(Column(np_chi2, name="chi_square"))
best_model_table.add_column(Column(np_chi2_red, name="reduced_chi_square"))
for index, name in enumerate(info_keys):
column = Column([variable[index] for variable in variables], name=name)
column = Column(np_variables[:, index], name=name)
best_model_table.add_column(column)
for index, name in enumerate(filters):
column = Column(fluxes[:, index], name=name, unit='mJy')
column = Column(np_fluxes[:, index], name=name, unit='mJy')
best_model_table.add_column(column)
best_model_table.write(OUT_DIR + BEST_MODEL_FILE)
......
......@@ -68,7 +68,9 @@ def init_sed(params, filters, analysed, redshifts, fluxes, variables,
gbl_warehouse = SedWarehouse(cache_type="memory")
def init_analysis(params, filters, analysed, redshifts, fluxes, variables,
t_begin, n_computed, save, n_obs):
t_begin, n_computed, analysed_averages, analysed_std,
best_fluxes, best_parameters, best_chi2, best_chi2_red, save,
n_obs):
"""Initializer of the pool of processes. It is mostly used to convert
RawArrays into numpy arrays. The latter are defined as global variables to
be accessible from the workers.
......@@ -91,6 +93,18 @@ def init_analysis(params, filters, analysed, redshifts, fluxes, variables,
Time of the beginning of the computation.
n_computed: Value
Number of computed models. Shared among workers.
analysed_averages: RawArray
Analysed values for each observation.
analysed_std: RawArray
Standard devriation values for each observation.
best_fluxes: RawArray
Best fluxes for each observation.
best_parameters: RawArray
Best parameters for each observation.
best_chi2: RawArray
Best χ² for each observation.
best_chi2_red: RawArray
Best reduced χ² for each observation.
save: dictionary
Contains booleans indicating whether we need to save some data related
to given models.
......@@ -100,7 +114,25 @@ def init_analysis(params, filters, analysed, redshifts, fluxes, variables,
"""
init_sed(params, filters, analysed, redshifts, fluxes, variables,
t_begin, n_computed)
global gbl_redshifts, gbl_w_redshifts, gbl_save, gbl_n_obs
global gbl_redshifts, gbl_w_redshifts, gbl_analysed_averages
global gbl_analysed_std, gbl_best_fluxes, gbl_best_parameters
global gbl_best_chi2, gbl_best_chi2_red, gbl_save, gbl_n_obs
gbl_analysed_averages = np.ctypeslib.as_array(analysed_averages[0])
gbl_analysed_averages = gbl_analysed_averages.reshape(analysed_averages[1])
gbl_analysed_std = np.ctypeslib.as_array(analysed_std[0])
gbl_analysed_std = gbl_analysed_std.reshape(analysed_std[1])
gbl_best_fluxes = np.ctypeslib.as_array(best_fluxes[0])
gbl_best_fluxes = gbl_best_fluxes.reshape(best_fluxes[1])
gbl_best_parameters = np.ctypeslib.as_array(best_parameters[0])
gbl_best_parameters = gbl_best_parameters.reshape(best_parameters[1])
gbl_best_chi2 = np.ctypeslib.as_array(best_chi2[0])
gbl_best_chi2_red = np.ctypeslib.as_array(best_chi2_red[0])
gbl_redshifts = np.unique(gbl_model_redshifts)
gbl_w_redshifts = {redshift: gbl_model_redshifts == redshift
......@@ -154,18 +186,17 @@ def sed(idx):
end="\r")
def analysis(obs):
def analysis(idx, obs):
"""Worker process to analyse the PDF and estimate parameters values
Parameters
----------
idx: int
Index of the observation. This is necessary to put the computed values
at the right location in RawArrays
obs: row
Input data for an individual object
Returns
-------
The analysed parameters (values+errors), best raw and reduced χ², best
normalisation factor, info of the best SED, fluxes of the best SED
"""
w = np.where(gbl_w_redshifts[gbl_redshifts[np.abs(obs['redshift'] -
......@@ -251,6 +282,15 @@ def analysis(obs):
(model_variables - analysed_averages[np.newaxis, :])**2, axis=0,
weights=weights))
# TODO Merge with above computation after checking it is fine with a MA.
gbl_analysed_averages[idx, :] = analysed_averages
gbl_analysed_std[idx, :] = analysed_std
gbl_best_fluxes[idx, :] = norm_model_fluxes[best_index, :]
gbl_best_parameters[idx, :] = list(sed.info.values())
gbl_best_chi2[idx] = chi2_[best_index]
gbl_best_chi2_red[idx] = chi2_[best_index] / obs_fluxes.count()
if gbl_save['best_sed']:
save_best_sed(obs['id'], sed, norm_facts[best_index])
if gbl_save['chi2']:
......@@ -269,10 +309,3 @@ def analysis(obs):
format(n_computed, gbl_n_obs, np.around(t_elapsed, decimals=1),
np.around(n_computed/t_elapsed, decimals=1)),
end="\r")
return (analysed_averages,
analysed_std,
np.array(norm_model_fluxes[best_index, :], copy=True),
np.array(list(sed.info.values()), copy=True),
chi2_[best_index],
chi2_[best_index] / obs_fluxes.count())
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment