Commit ce50e5a9 authored by EPINAT Benoit's avatar EPINAT Benoit
Browse files

Revert "Merge branch 'dev_adhoc_to_fits' into 'master'"

This reverts merge request !2
parent 0ae6ceb6
# Created by .ignore support plugin (hsz.mobi)
# IntelliJ project files
.idea
*.iml
out
gen
.idea/
\.cache/v/cache/
*.pyc
# Output test files
tests/data/*.tex
tests/data/*.txt
*.me
......@@ -634,16 +634,16 @@ class MainWindow(QMainWindow, Ui_PyQubeVis):
elif ext == '.fits.gz':
hdu = fits.open(gzip.open(fname[0]))
elif ext == '.ad2':
dtu = pad.read_ad2(fname[0])
dtu = pad.readad2(fname[0])
hdu = dtu.tohdu()
elif ext == '.ad2.gz':
dtu = pad.read_ad2(gzip.open(fname[0]))
dtu = pad.readad2(gzip.open(fname[0]))
hdu = dtu.tohdu()
elif ext == '.ad3':
dtu = pad.read_ad3(fname[0])
dtu = pad.readad3(fname[0])
hdu = dtu.tohdu()
elif ext == '.ad3.gz':
dtu = pad.read_ad3(gzip.open(fname[0]))
dtu = pad.readad3(gzip.open(fname[0]))
hdu = dtu.tohdu()
else:
return
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
AdHoc to Fits
Script created to convert .ad2 and .ad3 files to .fits files. During the
conversion, the script can get the header from a text file or from another
fits file.
by Bruno C. Quint,
based on code from Benoit Epinat
Feb 2018
"""
import argparse
import os
import logging
import numpy as np
import sys
import threading
from astropy.io import fits as pyfits
# Trailer structure provided by Benoit Epinat
TRAILER_DATA_TYPE_AD2 = [
('nbdim', np.int32),
('id1', np.int32),
('id2', np.int32),
('lx', np.int32),
('ly', np.int32),
('lz', np.int32),
('scale', np.float32),
('ix0', np.int32),
('iy0', np.int32),
('zoom', np.float32),
('modevis', np.int32),
('thrshld', np.float32),
('step', np.float32),
('nbiso', np.int32),
('pal', np.int32),
('cdelt1', np.float64),
('cdelt2', np.float64),
('crval1', np.float64),
('crval2', np.float64),
('crpix1', np.float32),
('crpix2', np.float32),
('crota2', np.float32),
('equinox', np.float32),
('x_mirror', np.int8),
('y_mirror', np.int8),
('was_compressed', np.int8),
('none2', np.int8, 1),
('none', np.int32, 4),
('comment', np.int8, 128)
]
# Trailer Structure provided by Philippe Amram.
TRAILER_DATA_TYPE_AD3 = [
("nbdim", np.int32),
("id1", np.int32),
("id2", np.int32),
("lx", np.int32),
("ly", np.int32),
("lz", np.int32),
("scale", np.float32),
("ix0", np.int32),
("iy0", np.int32),
("zoom", np.int32),
("xl1", np.float32), # 10 -- lambda of the 1st channel
("xil", np.float32), # 11 -- interfringe in Angstroems
("vr0", np.float32), # 12 -- mean radial velocity of the object
("corrvadh", np.float32), # 13 -- heliocentric RV correction (km/s)
("p0", np.float32), # 14 -- FP interference order
("xlp", np.float32), # 15 -- reference lambda for p0 (in Ansgtroems)
("xl0", np.float32), # 16 -- observed zero-velocity lambda (in Angstroems)
("vr1", np.float32), # 17 -- radial velocity of the 1st channel (in km/s)
("xik", np.float32), # 18 -- interfringe in km/s
("cdelt1", np.float64), # 19 -- unused
("cdelt2", np.float64), # 20 -- unused
("crval1", np.float64), # 21 -- unused
("crval2", np.float64), # 22 -- unused
("crpix1", np.float32), # 23 -- unused
("crpix2", np.float32), # 24 -- unused
("crota2", np.float32), # 25 -- unused
("equinox", np.float32), # 26 -- unused
("x_mirror", np.int8), # 27 -- unused
("y_mirror", np.int8), # 28 -- unused
("was_compressed", np.int8), # 29 -- unused
("none2", np.int8), # 30 -- unused
("comment", np.float64) # 31 -- unused
]
LOG_FORMAT = ' %(message)s'
parser = argparse.ArgumentParser(
description="Converts .ad2/.ad3 files to .fits.")
parser.add_argument('-d', '--debug', action='store_true',
help="Run debug mode.")
parser.add_argument('--header', type=str, default=None,
help="ASCII (.hdr or .txt) or FITS (.fits) file that "
"contains the header to be copied.")
parser.add_argument('-o', '--output', type=str, default=None,
help="Name of the output cube.")
parser.add_argument('-q', '--quiet', action='store_true',
help="Run quietly.")
parser.add_argument('input_file', type=str,
help="Input .ad2/.ad3 filename.")
parsed_args = parser.parse_args()
log_formatter = logging.Formatter(LOG_FORMAT)
log_handler = logging.StreamHandler()
log_handler.setFormatter(log_formatter)
if parsed_args.debug:
log_handler.setLevel('DEBUG')
elif not parsed_args.quiet:
log_handler.setLevel('INFO')
else:
log_handler.setLevel('WARNING')
log = logging.Logger('ad_to_fits')
log.addHandler(log_handler)
log.info("")
log.info("AdH to Fits Conversion")
log.info("by Bruno Quint (bquint@ctio.noao.edu)")
log.info("Starting program.")
log.info("")
input_file = parsed_args.input_file
log.info("Input File: {input_file:s}".format(**locals()))
output_file = parsed_args.output
if output_file is None:
if os.path.splitext(input_file)[-1].lower() in ['.ad2']:
output_file = input_file.replace('.ad2', '.fits')
if os.path.splitext(input_file)[-1].lower() in ['.ad3']:
output_file = input_file.replace('.ad3', '.fits')
log.info("Output File: {output_file:s}".format(**locals()))
header_file = parsed_args.header
if header_file is not None:
log.info("Header File: {header_file:s}".format(**locals()))
else:
log.info("No header file.")
log.info("Checking input file: {input_file:s}".format(**locals()))
if not os.path.exists(input_file):
log.error('Input file {input_file:s} does not exists.'
' Leaving now.'.format(**locals()))
sys.exit(1)
if os.path.splitext(input_file)[-1].lower() not in ['.ad2', '.ad3']:
log.error('Input file {input_file:s} is not .ad2 or .ad3. '
'Leaving now.'.format(**locals()))
sys.exit(1)
log.info("Ok.")
log.info("Checking output file: {output_file:s}".format(**locals()))
if os.path.exists(output_file):
log.error('Output file {output_file:s} already exists. '
'Delete it and run again.'.format(**locals()))
sys.exit(1)
log.info("Ok.")
if header_file is not None:
log.info("Checking header file: {header_file:s}".format(**locals()))
if not os.path.exists(header_file):
log.error("Header file {header_file:s} does not exist. "
"Leaving now.".format(**locals()))
sys.exit(1)
log.info("Ok.")
else:
log.info("No header file provided.")
log.info("Reading {input_file:s}".format(**locals()))
if os.path.splitext(input_file)[-1].lower() == '.ad2':
data = open(input_file, 'rb')
data.seek(0, 2)
# size of the data array
sz = int((data.tell() - 256) / 4)
data.close()
dt = np.dtype([
('data', np.float32, sz),
('trailer', TRAILER_DATA_TYPE_AD2)
])
ad2 = np.fromfile(input_file, dtype=dt)[0]
ndim = ad2['trailer']['nbdim']
naxis1 = ad2['trailer']['lx']
naxis2 = ad2['trailer']['ly']
naxis3 = ad2['trailer']['lz']
nxyz = naxis1 * naxis2 * naxis3
data = ad2['data']
if ndim != 2:
log.error('{input_file:s} is not a valid .ad2 file - Number of'
' dimensions provided by the trailer is not'
' "2".'.format(**locals()))
sys.exit(1)
if naxis1 * naxis2 * naxis3 != data.size:
log.error('{input_file:s} is not a valid .ad2 file - Dimensions in the'
' trailer and data size mismatch.'.format(**locals()))
sys.exit(1)
data = data.reshape(naxis2, naxis1)
data[np.where(data == -3.1E38)] = np.nan
elif os.path.splitext(input_file)[-1].lower() == '.ad3':
data = open(input_file, 'rb')
data.seek(0, 2)
sz = int((data.tell() - 256) / 4)
data.close()
dt = np.dtype([
('data', np.float32, sz),
('trailer', TRAILER_DATA_TYPE_AD3)
])
ad3 = np.fromfile(input_file, dtype=dt)[0]
ndim = ad3['trailer']['nbdim']
naxis1 = ad3['trailer']['lx']
naxis2 = ad3['trailer']['ly']
naxis3 = ad3['trailer']['lz']
nxyz = naxis1 * naxis2 * naxis3
data = ad3['data']
if ndim != 3:
log.error('{input_file:s} is not a valid .ad3 file - Number of '
'dimensions provided by the trailer is not '
'"3".'.format(**locals()))
sys.exit(1)
if naxis1 * naxis2 * naxis3 != data.size:
log.error('{input_file:s} is not a valid .ad3 file - Dimensions in the '
'trailer and data size mismatch.'.format(**locals()))
sys.exit(1)
data = np.reshape(data, (naxis2, naxis1, naxis3))
data = np.transpose(data, (2, 0, 1))
data = np.flip(data, 1)
data[np.where(data == -3.1E38)] = np.nan
else:
log.error('Input file is not an .ad2 or .ad3 file.')
sys.exit(1)
if header_file is not None:
log.info("Reading {header_file:s}.".format(**locals()))
if os.path.splitext(header_file)[-1].lower() in ['.fits']:
header = pyfits.getheader(header_file)
elif os.path.splitext(header_file)[-1].lower() in ['.txt', '.hdr']:
header = pyfits.Header.fromtextfile(header_file)
else:
log.error("Invalid header format. Leaving now.")
sys.exit(1)
else:
header_file = ''
header = pyfits.Header()
if data.ndim == 2:
header.set('NAXIS', data.ndim)
header.set('NAXIS1', data.shape[0])
header.set('NAXIS2', data.shape[1])
elif data.ndim == 3.:
header.set('NAXIS', data.ndim)
header.set('NAXIS1', data.shape[0])
header.set('NAXIS2', data.shape[1])
header.set('NAXIS3', data.shape[2])
else:
log.error('Data does not have 2 nor 3 dimensions.')
sys.exit(1)
header.append(card=('AD2FITSH', header_file, "Original Header File."), end=True)
header.append(card=('AD2FITSF', input_file, "Original AD2/AD3 File."), end=True)
log.info("Writing file: {output_file:s}".format(**locals()))
pyfits.writeto(output_file, data, header)
log.info("All done!")
log.info("")
This diff is collapsed.
This diff is collapsed.
SIMPLE = T / Written by IDL: Fri Jan 26 13:55:58 2018
BITPIX = -32 / Number of bits per data pixel
NAXIS = 3 / Number of data axes
NAXIS1 = 512 /
NAXIS2 = 512 /
NAXIS3 = 24 /
DATE = '2018-01-26' / Creation UTC (CCCC-MM-DD) date of FITS header
COMMENT FITS (Flexible Image Transport System) format is defined in 'Astronomy
COMMENT and Astrophysics', volume 376, page 359; bibcode 2001A&A...376..359H
FP_XYZ = 1 /0 if fits is in zxy order, 1 if fits is in xyz o
CTYPE3 = 'VELOCITY' /
CUNIT3 = 'KM/S ' /
CRPIX3 = 1.00000000000 /
CRVAL3 = 617.201372921 /
CDELT3 = 15.7694322583 /
RADESYS = 'FK5 ' /
VELREF = 3 /
BSCALE = 1.00000 /
BZERO = 0.00000 /
CTYPE1 = 'RA---ARC' /X-axis type
CTYPE2 = 'DEC--ARC' /Y-axix type
CRVAL1 = 190.935102779 /Reference pixel value
CRVAL2 = 16.3971060178 /Reference pixel value
CRPIX1 = 257.000 /Reference pixel
CRPIX2 = 257.000 /Reference pixel
CROTA2 = 0.797687 /Rotation in degrees
CDELT1 = -0.000188258237483 /Degres/pixel
CDELT2 = 0.000188258237483 /Degres/pixel
EQUINOX = 2000.00 /Equinox of coordinates
CD1_1 = -0.000188239992805 /
CD1_2 = -2.62089473023E-06 /
CD2_1 = -2.62089473023E-06 /
CD2_2 = 0.000188239992805 /
FP_ID = ' ' /ADHOC identification
FP_ORDER= 793.000 /PF interference order
FP_L_REF= 6562.78 /Reference lambda for fp_order (Angstrom)
FP_B_L = 6576.24 /Lambda of the first channel (Angstrom)
FP_B_R = 0.00000 /Radial velocity of the first channel
FP_I_A = 8.31909 /Interfringe (Angstrom)
FP_I_V = 379.016 /Interfringe (km/s)
FP_L_RE = 6562.78 /Lmabda rest: observed zero-velocity lambda (Angs
FP_M_RV = 804.904 /Mean RV of the object (km/s)
FP_H_CO = 2.96392 /Heliocentric correction for the object (km/s)
\ No newline at end of file
import os
from astropy.io import fits
from unittest import TestCase, skip, main
import pyadhoc
import numpy as np
def test_foo(caplog):
caplog.set_level(pyadhoc.logging.DEBUG, logger='pyadhoc')
pass
class TestAdxCreation(TestCase):
def test_new_tdu_errors(self):
try:
os.mkdir('./my_dir')
except FileExistsError:
pass
self.assertRaises(TypeError, pyadhoc.TDU, data='')
self.assertRaises(ValueError, pyadhoc.TDU, data=np.ones(1))
self.assertRaises(TypeError, pyadhoc.TDU, trailer='')
self.assertRaises(TypeError, pyadhoc.TDU, filename=0)
self.assertRaises(FileNotFoundError, pyadhoc.TDU, filename='foo')
self.assertRaises(IsADirectoryError, pyadhoc.TDU, filename='./my_dir')
os.rmdir('./my_dir')
class TestAd2Io(TestCase):
test_data_path = "tests/data"
filename_ad2 = os.path.join(test_data_path, 'ad2_data.ad2')
filename_da2 = os.path.join(test_data_path, 'da2_data.da2')
filename_fits = os.path.join(test_data_path, 'fits_image.fits')
def test_ad2_to_ad2(self):
tdu = pyadhoc.read_ad2(self.filename_ad2)
output_filename = os.path.join(self.test_data_path, 'temp.ad2')
pyadhoc.write_ad2(tdu, output_filename, overwrite=True)
new_tdu = pyadhoc.read_ad2(output_filename)
np.testing.assert_equal(tdu.trailer.size, new_tdu.trailer.size)
np.testing.assert_equal(tdu.trailer, new_tdu.trailer)
np.testing.assert_equal(tdu.data.shape, new_tdu.data.shape)
np.testing.assert_array_almost_equal(tdu.data, new_tdu.data)
os.remove(output_filename)
def test_ad2_vs_fits(self):
tdu = pyadhoc.read_ad2(self.filename_ad2)
hdu = fits.open(self.filename_fits)[0]
np.testing.assert_array_equal(tdu.data, hdu.data)
def test_ad2_to_fits(self):
hdu_reference = fits.open(self.filename_fits)[0]
input_file = self.filename_ad2
output_file = self.filename_ad2.replace('.ad2', '_ad2.fits')
pyadhoc.ad2_to_fits(input_file, output_file, overwrite=True)
hdu = fits.open(output_file)[0]
np.testing.assert_array_equal(hdu_reference.data.data, hdu.data)
os.remove(output_file)
def test_fits_to_ad2(self):
input_fits = self.filename_fits
output_ad2 = self.filename_fits.replace('.fits', '_fits.ad2')
pyadhoc.fits_to_ad2(input_fits, output_ad2, overwrite=True)
tdu_ref = pyadhoc.read_ad2(self.filename_ad2)
tdu_new = pyadhoc.read_ad2(output_ad2)
np.testing.assert_array_equal(tdu_ref.data, tdu_new.data)
os.remove(output_ad2)
def test_header_to_ad2_trailer(self):
header = fits.getheader(self.filename_fits)
trailer = pyadhoc.fits_header_to_ad2trailer(header)
tdu = pyadhoc.read_ad2(self.filename_ad2)
ref_trailer = tdu.trailer
np.testing.assert_equal(trailer.dtype, ref_trailer.dtype)
for t, r in zip(trailer, ref_trailer):
np.testing.assert_array_almost_equal(t, r)
def test_nparray_to_ad2(self):
nx = 50
ny = 60
xy_grid = np.mgrid[0:ny,0:nx]
data = np.array(xy_grid[0], dtype=np.float32)
tdu = pyadhoc.TDU(data=data)
np.testing.assert_array_equal(data, tdu.data)
self.assertEqual(data.ndim, tdu.ndim)
self.assertEqual(data.shape, tdu.shape)
dt = np.dtype(pyadhoc.TRAILER_AD2)
self.assertEqual(np.dtype(tdu.trailer.dtype), dt)
self.assertEqual(data.ndim, tdu.trailer['nbdim'])
self.assertEqual(data.shape[1], tdu.trailer['lx'])
self.assertEqual(data.shape[0], tdu.trailer['ly'])
self.assertEqual(1, tdu.trailer['lz'])
for key in pyadhoc.DEFAULTS_TRAILER_AD2.keys():
np.testing.assert_array_equal(
pyadhoc.DEFAULTS_TRAILER_AD2[key], tdu.trailer[key],
err_msg='Inconsistent value for parameter {:s}'.format(key)
)
tdu.writeto(
os.path.join(self.test_data_path, 'ndarray.ad2'),
overwrite=True
)
class TestAd3Io(TestCase):
test_data_path = "tests/data"
filename_ad3 = os.path.join(test_data_path, 'ad3_data.ad3')
filename_da3 = os.path.join(test_data_path, 'da3_data.da3')
filename_fits = os.path.join(test_data_path, 'fits_cube.fits')
def test_ad3_to_ad3(self):
tdu = pyadhoc.read_ad3(self.filename_ad3)
output_filename = os.path.join(self.test_data_path, 'temp.ad3')
pyadhoc.write_ad3(tdu, output_filename, overwrite=True)
new_tdu = pyadhoc.read_ad3(output_filename)
np.testing.assert_equal(tdu.trailer.size, new_tdu.trailer.size)
np.testing.assert_equal(tdu.trailer, new_tdu.trailer)
np.testing.assert_equal(tdu.data.shape, new_tdu.data.shape)
np.testing.assert_array_almost_equal(tdu.data, new_tdu.data)
os.remove(output_filename)
def test_ad3_vs_fits(self):
tdu = pyadhoc.read_ad3(self.filename_ad3)
hdu = fits.open(self.filename_fits)[0]
np.testing.assert_equal(tdu.data.shape, hdu.data.shape)
np.testing.assert_equal(tdu.data, hdu.data)
def test_ad3_to_fits(self):
hdu_reference = fits.open(self.filename_fits)[0]
input_file = self.filename_ad3
output_file = self.filename_ad3.replace('.ad3', '_ad3.fits')
header_file = self.filename_fits.replace('.fits', '.hdr')
pyadhoc.ad3_to_fits(input_file, output_file, overwrite=True)
pyadhoc.ad3_to_fits(input_file, output_file, overwrite=True,
header_from=self.filename_fits)
pyadhoc.ad3_to_fits(input_file, output_file, overwrite=True,
header_from=header_file)
hdu = fits.open(output_file)[0]
np.testing.assert_equal(hdu_reference.data.shape, hdu.data.shape)
np.testing.assert_equal(hdu_reference.data, hdu.data)
os.remove(output_file)