Commit cd09ea57 authored by PERNOT Guillaume's avatar PERNOT Guillaume

Initial commit

parents
NISP test campaign automated acquisitions
=========================================
# Installation
Needs python3
```
pip install -r pip-requirements.txt
```
# Run
Start server with :
```
python3 server.py
```
def do_connect(args):
print('!CONNECT "Atik 420", serial 2060247612 opened successfully')
def do_cool(args):
print('!COOL 1 set point=-10.0C, power=0.0%')
def do_time(args):
print('!TIME {args}')
def do_sint(args):
print('!SINT saved file "output.fits"')
def do_quit(args):
print('QUIT')
def repl_run():
commands = {'@connect': do_connect,
'@cool': do_cool,
'@time': do_time,
'@sint': do_sint,
'@quit': do_quit
}
while True:
cmd = input('AtikCtl>').split(maxsplit=1)
if len(cmd) == 1:
args = None
else:
args = cmd[1]
if cmd[0] in commands:
commands[cmd[0]](args)
if __name__ == '__main__':
repl_run()
import cmd
import time
class ATS_Shell(cmd.Cmd, object):
intro = """ ____ ________ __ _____
/ __ \/_ __/ / / / / ___/____ ____ ________
/ / / / / / / / / / \__ \/ __ \/ __ `/ ___/ _ \\
/ /_/ / / / / /_/ / ___/ / /_/ / /_/ / /__/ __/
/_____/ /_/ \____/ /____/ .___/\__,_/\___/\___/
/_/ """
intro += '\nWelcome to the ATS shell Rev.2. Type help or ? to list commands.\n'
prompt = 'ATS>'
file = None
def __init__(self, config=None):
# Initialize the superclass
super().__init__()
def do_stop(self, arg):
""" Immediately stops any movement """
print('STOP')
def do_get_enc_position(self, arg):
""" Returns the current encoder position corrected for origin offset """
print('-1.1,2.2,-3.3,4.4,-5.5')
def do_wait_move(self, arg):
time.sleep(2)
if __name__ == "__main__":
# TODO: Make option for loading a configuration file
shell = ATS_Shell()
shell.cmdloop()
import pexpect
import logging
import shutil
import time
import socket
import threading
from astropy.io import fits
BIND_IP = '0.0.0.0'
BIND_PORT = 9999
ATSCTL_PATH = 'python3 atstest.py'
ATSCTL_PROMPT = 'ATS>'
ATIK_PATH = 'python3 atiktest.py'
ATIK_PROMPT = 'AtikCtl>'
logging.basicConfig(filename='atsctl.log', level=logging.DEBUG)
def cmd(proc, msg, expect, error):
proc.sendline(msg)
i = proc.expect([expect, error], timeout=4)
if i == 1:
logging.error(proc.after)
raise Exception(proc.after)
else:
logging.info(proc.after)
return proc.after
def move_and_acquire(timestamp, atsctl, atik, rel_pos):
cmd(atsctl, 'move_rel_pos_ats {}'.format(rel_pos), ATSCTL_PROMPT, 'ERROR')
cmd(atsctl, 'wait_move', ATSCTL_PROMPT, 'ERROR')
#float_re = '([+-]?\ *\d+\.?\d*)'
cmd(atsctl, 'get_enc_position',
r'([+-]?\ *\d+\.?\d*),([+-]?\ *\d+\.?\d*),([+-]?\ *\d+\.?\d*),([+-]?\ *\d+\.?\d*),([+-]?\ *\d+\.?\d*)',
'ERROR')
x, y, z, theta, phi = atsctl.match.groups()
logging.info('New position : {} {} {} {} {}'.format(x, y, z, theta, phi))
cmd(atik, '@sint', '!SINT', '!ERROR')
# output.fits has been created
dst = 'outputs/{}.fits'.format(timestamp)
try:
shutil.copyfile('output.fits', dst)
except Exception as e:
logging.error('An error occured while moving output.fits to {} : {}'.format(dst, e))
raise
# add timestamp header
hdul = fits.open(dst)
logging.info('{} file written'.format(dst))
def handle_client_connection(client_socket, atsctl, atik):
timestamp = int(time.time())
request = client_socket.recv(1024)
logging.debug('net command: {}'.format(request))
cmd = request.split()
error = None
if cmd[0] == b'maa':
# Move And Aquire command
try:
move_and_acquire(timestamp, atsctl, atik, cmd[1])
except Exception as e:
error = str(e)
else:
error = 'Unhandled command {}'.format(cmd[0])
if error:
client_socket.send(bytes(error, 'utf-8'))
else:
client_socket.send(b'ok')
client_socket.close()
def tcp_server(atsctl, atik):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((BIND_IP, BIND_PORT))
server.listen(5) # max backlog of connections
while True:
client_sock, address = server.accept()
logging.info('Accepted connection from {}:{}'.format(address[0], address[1]))
client_handler = threading.Thread(
target=handle_client_connection,
args=(client_sock, atsctl, atik)
)
client_handler.start()
if __name__ == '__main__':
atsctl = pexpect.spawn(ATSCTL_PATH)
atsctl.expect(ATSCTL_PROMPT)
atik = pexpect.spawn(ATIK_PATH)
atik.expect(ATIK_PROMPT)
cmd(atik, '@connect', '!CONNECT', '!ERROR')
cmd(atik, '@cool on', '!COOL', '!ERROR')
cmd(atik, '@time 0.005', '!TIME', '!ERROR')
tcp_server(atsctl, atik)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment