Commit 1e53c834 authored by  Hector Salas's avatar Hector Salas

Modify results.py and models.py to use SharedArray class from utils

parent 34a2d276
...@@ -9,13 +9,10 @@ compute them, such as the configuration, the observations, and the parameters ...@@ -9,13 +9,10 @@ compute them, such as the configuration, the observations, and the parameters
of the models. of the models.
""" """
import ctypes
from multiprocessing.sharedctypes import RawArray
from astropy.table import Table, Column from astropy.table import Table, Column
import numpy as np import numpy as np
from ..warehouse import SedWarehouse from .utils import SharedArray, get_info
class ModelsManager(object): class ModelsManager(object):
...@@ -36,30 +33,29 @@ class ModelsManager(object): ...@@ -36,30 +33,29 @@ class ModelsManager(object):
self.propertiesnames = conf['analysis_params']['variables'] self.propertiesnames = conf['analysis_params']['variables']
self.allpropertiesnames, self.massproportional = self._get_info() self.allpropertiesnames, self.massproportional = self._get_info()
self._fluxes_shape = (len(obs.bands), len(self.block)) self._fluxes = SharedArray((len(self.obs.bands), len(self.block)))
self._props_shape = (len(self.propertiesnames), len(self.block)) self._properties = SharedArray((len(self.propertiesnames),
len(self.block)))
# Arrays where we store the data related to the models. For memory if conf['analysis_method'] == 'pdf_analysis':
# efficiency reasons, we use RawArrays that will be passed in argument self._intprops = SharedArray((len(self.obs.intprops),
# to the pool. Each worker will fill a part of the RawArrays. It is len(self.block)))
# important that there is no conflict and that two different workers do self._extprops = SharedArray((len(self.obs.extprops),
# not write on the same section. len(self.block)))
self._fluxes = self._shared_array(self._fluxes_shape)
self._properties = self._shared_array(self._props_shape)
@property @property
def fluxes(self): def fluxes(self):
"""Returns a shared array containing the fluxes of the models. """Returns a shared array containing the fluxes of the models.
""" """
return np.ctypeslib.as_array(self._fluxes).reshape(self._fluxes_shape) return self._fluxes.data
@property @property
def properties(self): def properties(self):
"""Returns a shared array containing the properties of the models. """Returns a shared array containing the properties of the models.
""" """
return np.ctypeslib.as_array(self._properties).reshape(self._props_shape) return self._properties.data
def _get_info(self): def _get_info(self):
warehouse = SedWarehouse() warehouse = SedWarehouse()
...@@ -87,7 +83,3 @@ class ModelsManager(object): ...@@ -87,7 +83,3 @@ class ModelsManager(object):
table.write("out/{}.fits".format(filename)) table.write("out/{}.fits".format(filename))
table.write("out/{}.txt".format(filename), format='ascii.fixed_width', table.write("out/{}.txt".format(filename), format='ascii.fixed_width',
delimiter=None) delimiter=None)
@staticmethod
def _shared_array(shape):
return RawArray(ctypes.c_double, int(np.product(shape)))
...@@ -11,17 +11,10 @@ etc. Each of these classes contain a merge() method that allows to combine ...@@ -11,17 +11,10 @@ etc. Each of these classes contain a merge() method that allows to combine
results of the analysis with different blocks of models. results of the analysis with different blocks of models.
""" """
import ctypes
from multiprocessing.sharedctypes import RawArray
from astropy.table import Table, Column from astropy.table import Table, Column
import numpy as np import numpy as np
from .utils import SharedArray
def shared_array(shape):
"""Create a shared array that can be read/written by parallel processes
"""
return RawArray(ctypes.c_double, int(np.product(shape)))
class BayesResultsManager(object): class BayesResultsManager(object):
...@@ -46,9 +39,9 @@ class BayesResultsManager(object): ...@@ -46,9 +39,9 @@ class BayesResultsManager(object):
# to the pool. Each worker will fill a part of the RawArrays. It is # to the pool. Each worker will fill a part of the RawArrays. It is
# important that there is no conflict and that two different workers do # important that there is no conflict and that two different workers do
# not write on the same section. # not write on the same section.
self._means = shared_array((self.nobs, self.nproperties)) self._means = SharedArray((self.nobs, self.nproperties))
self._errors = shared_array((self.nobs, self.nproperties)) self._errors = SharedArray((self.nobs, self.nproperties))
self._weights = shared_array((self.nobs)) self._weights = SharedArray((self.nobs))
@staticmethod @staticmethod
def merge(results): def merge(results):
...@@ -71,8 +64,8 @@ class BayesResultsManager(object): ...@@ -71,8 +64,8 @@ class BayesResultsManager(object):
weights = np.array([result.weights for result in results])[..., None] weights = np.array([result.weights for result in results])[..., None]
merged = results[0] merged = results[0]
merged._means = shared_array((merged.nobs, merged.nproperties)) merged._means = SharedArray((merged.nobs, merged.nproperties))
merged._errors = shared_array((merged.nobs, merged.nproperties)) merged._errors = SharedArray((merged.nobs, merged.nproperties))
merged._weights = None merged._weights = None
sumweights = np.sum(weights, axis=0) sumweights = np.sum(weights, axis=0)
...@@ -102,8 +95,7 @@ class BayesResultsManager(object): ...@@ -102,8 +95,7 @@ class BayesResultsManager(object):
physical property and each observation. physical property and each observation.
""" """
return np.ctypeslib.as_array(self._means).reshape((self.nobs, return self._means.data
self.nproperties))
@property @property
def errors(self): def errors(self):
...@@ -111,8 +103,7 @@ class BayesResultsManager(object): ...@@ -111,8 +103,7 @@ class BayesResultsManager(object):
for each physical property and each observation. for each physical property and each observation.
""" """
return np.ctypeslib.as_array(self._errors).reshape((self.nobs, return self._errors.data
self.nproperties))
@property @property
def weights(self): def weights(self):
...@@ -120,7 +111,7 @@ class BayesResultsManager(object): ...@@ -120,7 +111,7 @@ class BayesResultsManager(object):
each observation. each observation.
""" """
return np.ctypeslib.as_array(self._weights) return self._weights.data
class BestResultsManager(object): class BestResultsManager(object):
...@@ -148,11 +139,11 @@ class BestResultsManager(object): ...@@ -148,11 +139,11 @@ class BestResultsManager(object):
# to the pool. Each worker will fill a part of the RawArrays. It is # to the pool. Each worker will fill a part of the RawArrays. It is
# important that there is no conflict and that two different workers do # important that there is no conflict and that two different workers do
# not write on the same section. # not write on the same section.
self._fluxes = shared_array(self._fluxes_shape) self._fluxes = SharedArray(self._fluxes_shape)
self._properties = shared_array(self._properties_shape) self._properties = SharedArray(self._properties_shape)
self._chi2 = shared_array(self.nobs) self._chi2 = SharedArray(self.nobs)
# We store the index as a float to work around python issue #10746 # We store the index as a float to work around python issue #10746
self._index = shared_array(self.nobs) self._index = SharedArray(self.nobs)
@property @property
def fluxes(self): def fluxes(self):
...@@ -160,7 +151,7 @@ class BestResultsManager(object): ...@@ -160,7 +151,7 @@ class BestResultsManager(object):
each observation. each observation.
""" """
return np.ctypeslib.as_array(self._fluxes).reshape(self._fluxes_shape) return self._fluxes.data
@property @property
def properties(self): def properties(self):
...@@ -168,8 +159,7 @@ class BestResultsManager(object): ...@@ -168,8 +159,7 @@ class BestResultsManager(object):
best fit for each observation. best fit for each observation.
""" """
return np.ctypeslib.as_array(self._properties)\ return self._properties.data
.reshape(self._properties_shape)
@property @property
def chi2(self): def chi2(self):
...@@ -177,7 +167,7 @@ class BestResultsManager(object): ...@@ -177,7 +167,7 @@ class BestResultsManager(object):
each observation. each observation.
""" """
return np.ctypeslib.as_array(self._chi2) return self._chi2.data
@property @property
def index(self): def index(self):
...@@ -185,7 +175,7 @@ class BestResultsManager(object): ...@@ -185,7 +175,7 @@ class BestResultsManager(object):
observation. observation.
""" """
return np.ctypeslib.as_array(self._index) return self._index.data
@staticmethod @staticmethod
def merge(results): def merge(results):
...@@ -212,11 +202,11 @@ class BestResultsManager(object): ...@@ -212,11 +202,11 @@ class BestResultsManager(object):
index = np.array([result.index for result in results]) index = np.array([result.index for result in results])
merged = results[0] merged = results[0]
merged._fluxes = shared_array((merged.nobs, merged.nbands)) merged._fluxes = SharedArray((merged.nobs, merged.nbands))
merged._properties = shared_array((merged.nobs, merged.nproperties)) merged._properties = SharedArray((merged.nobs, merged.nproperties))
merged._chi2 = shared_array(merged.nobs) merged._chi2 = SharedArray(merged.nobs)
# We store the index as a float to work around python issue #10746 # We store the index as a float to work around python issue #10746
merged._index = shared_array(merged.nobs) merged._index = SharedArray(merged.nobs)
for iobs, bestidx in enumerate(np.argmin(chi2, axis=0)): for iobs, bestidx in enumerate(np.argmin(chi2, axis=0)):
merged.fluxes[iobs, :] = fluxes[bestidx, iobs, :] merged.fluxes[iobs, :] = fluxes[bestidx, iobs, :]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment