import sys, uuid, datetime, logging
from multiprocessing import Manager, Process
from threading import Thread
import numpy as np
import astropy.units as u
from astropy.coordinates import Angle, SkyCoord
from clu import AMQPClient
from cluplus.proxy import Proxy, invoke
from lvmtipo.site import Site
from lvmtipo.siderostat import Siderostat
from lvmtipo.target import Target
from sdsstools import get_logger
import yaml
from lvmagp.actor.internalfunc import (
GuideImage,
cal_pa,
check_target,
define_visb_limit,
findfocus,
print_debug,
print_error,
print_info,
)
from lvmagp.actor.user_parameters import temp_vs_focus, usrpars
from lvmagp.exceptions import (
LvmagpAcquisitionFailed,
LvmagpActorMissing,
LvmagpInterlockEngaged,
LvmagpTargetOverTheLimit,
)
# import time
[docs]class LVMFocuser:
"""
Class for the focusers
Parameters
----------
tel
Telescope unit to which the focuser belongs
amqpc
AMQP client to be used for actor communication
"""
def __init__(self, tel, amqpc):
self.amqpc = amqpc
self._foc = None
try:
self._foc = Proxy(self.amqpc, "lvm." + tel + ".foc")
self._foc.start()
except Exception as e:
print_error(self.amqpc, e)
raise
[docs] def focusoffset(self, delta_f=None):
"""
Apply a relative telescope focus offset.
Parameters:
delta_f (float): focus offset value in steps
"""
try:
print_debug(self.amqpc, f"Move focus by {delta_f}")
self._foc.moveRelative(delta_f, "STEPS")
except Exception as e:
print_error(self.amqpc, e)
raise
print_debug(self.amqpc, "Move focus done")
return True
[docs] def focus(self, value=None, temperature=None):
"""
Move focus to a particular value or a first guess from a given temperature
Parameters:
value (float): if provided, focus stage moves to this step value
temperature (float): if 'value' is not provided or 'value=None',
focus stage moves to best guess based on focus vs temperature model
Returns:
None
"""
try:
if value is not None:
print_debug(self.amqpc, f"Move focus to {value}")
self._foc.moveAbsolute(value, "STEPS")
else:
temp_value = temp_vs_focus(temperature=temperature)
print_debug(self.amqpc, f"{datetime.datetime.now()} | Move focus to {value} (estimated)")
self._foc.moveAbsolute(temp_value, "STEPS")
except Exception as e:
print_error(self.amqpc, e)
raise
print_debug(self.amqpc, "Move focus done")
[docs] def getfocusposition(self, unit="STEPS"):
"""
Get current position of device in given unit.
Parameters
----------
unit
Unit of position
"""
try:
position = self._foc.getPosition(unit)["Position"]
except Exception as e:
print_error(self.amqpc, e)
raise
print_info(self.amqpc, f"Current focus position: {position}")
return position
[docs]class LVMKMirror:
"""
Class for the K-mirros
Parameters
----------
tel
Telescope unit to which the focuser belongs
"""
def __init__(self, tel, amqpc):
self.amqpc = amqpc
self._km = None
try:
self._km = Proxy(self.amqpc, "lvm." + tel + ".km")
self._km.start()
except Exception as e:
print_error(self.amqpc, e)
raise
[docs] def derotate(self, pa):
"""
Derotate the field to correct the given field angle.
Parameters
----------
pa
Target field angle to be corrected.
The direction where pa = field angle will head up after the derotation.
"""
try:
print_debug(self.amqpc, f"Rotate Kmirror to {pa}")
self._km.moveAbsolute(pa, "DEG")
except Exception as e:
print_error(self.amqpc, e)
raise
print_debug(self.amqpc, "Rotating Kmirror done")
def _kmoffset(self, delta_pa):
"""
Rotate K-mirror by given offset angle in degree.
Parameters
----------
delta_pa
Offset angle for K-mirror to be rotated.
"""
try:
print_debug(self.amqpc, f"Rotate Kmirror by {delta_pa}")
self._km.moveRelative(delta_pa, "DEG")
except Exception as e:
print_error(self.amqpc, e)
raise
print_debug(self.amqpc, "Rotating Kmirror done")
[docs]class LVMFiberselector:
"""
Class for the fiber selectors
Parameters
----------
tel
Telescope unit to which the focuser belongs
"""
def __init__(self, tel, amqpc):
self.amqpc = amqpc
self._fibsel = None
try:
self._fibsel = Proxy(self.amqpc, "lvm." + tel + ".fibsel")
self._fibsel.start()
except Exception as e:
print_error(self.amqpc, e)
raise
[docs]class LVMTelescope:
"""
Class for the telescopes (Planewave mounts)
Parameters
----------
tel
The name of the telescope
"""
def __init__(self, tel, amqpc, sitename="LCO"):
self.amqpc = amqpc
self.lvmpwi = "lvm." + tel + ".pwi"
self._pwi = None
try:
self._pwi = Proxy(self.amqpc, self.lvmpwi)
self._pwi.start()
except Exception as e:
print_error(self.amqpc, e)
raise
def _slew_radec2000(self, target_ra_h, target_dec_d):
"""
Slew the telescope to given equatorial coordinates whose epoch is J2000.
Parameters
----------
target_ra_h
Target right ascension in hours in J2000 epoch
target_dec_d
Target declination in degrees in J2000 epoch
"""
try:
print_debug(self.amqpc, f"Start to slew telescope to RA {target_ra_h}, Dec {target_dec_d}.") # noqa: E501
self._pwi.gotoRaDecJ2000(target_ra_h, target_dec_d)
except Exception as e:
print_error(self.amqpc, e)
raise
print_debug(self.amqpc, "Slew completed.")
def _slew_altaz(self, target_alt_d, target_az_d):
"""
Slew the telescope to given horizontal coordinates.
Parameters
----------
target_alt_d
Target altitude in degree
target_az_d
Target azimuth in degree
"""
try:
print_debug(self.amqpc, f"Start to slew telescope to Alt {target_alt_d}, Az {target_az_d}.") # noqa: E501
self._pwi.gotoAltAzJ2000(target_alt_d, target_az_d)
except Exception as e:
print_error(self.amqpc, e)
raise
print_debug(self.amqpc, "Slew completed.")
def _reset_offset_radec(self):
"""
Reset the offset of both axes to zero.
"""
try:
print_debug(self.amqpc, "Set telescope offsets to zero.")
self._pwi.offset(ra_reset=0, dec_reset=0)
except Exception as e:
print_error(self.amqpc, e)
raise
print_debug(self.amqpc, "Zero offset setting completed.")
def _offset_radec(self, ra_arcsec, dec_arcsec):
"""
Give some offset to the mount
Parameters
----------
ra_arcsec
Distance to move along right ascension axis in arcseconds
dec_arcsec
Distance to move along declination axis in arcseconds
"""
print_debug(self.amqpc, f"Set telescope offsets ra={ra_arcsec}, dec={dec_arcsec}") # noqa: E501
try:
self._pwi.offset(ra_add_arcsec=ra_arcsec, dec_add_arcsec=dec_arcsec)
except Exception as e:
print_error(self.amqpc, e)
raise
print_debug(self.amqpc, "Set offset done")
def _get_dec2000_deg(self):
"""
Return the declination (J2000) of current position in degrees
"""
try:
status = self._pwi.status()
except Exception as e:
print_error(self.amqpc, e)
raise
return status["dec_j2000_degs"]
def _set_tracking(self, enable):
"""
Turn on or off the mount tracking
Parameters
----------
enable
If enable is True, tracking will be started. Otherwise, the tracking will be stopped.
"""
s = "ON" if enable else "OFF"
print_debug(self.amqpc, f"Set tracking {s}")
try:
self._pwi.setTracking(enable)
except Exception as e:
print_error(self.amqpc, e)
raise
[docs]class LVMCamera:
"""
Class for the FLIR guide cameras
"""
def __init__(self):
self.lvmcam = "lvmcam" # actor name
self.camname = "" # cam name
self.lvmcampath = ""
self.amqpc = None
self._cam = None
[docs] def single_exposure(self, exptime):
"""
Take a single exposure
Parameters
----------
exptime
Exposure time
"""
print_debug(self.amqpc, f"Guider {self.camname} exposure started")
try:
path = self._cam.expose(exptime, 1, self.camname)[
"PATH"
]
except Exception as e:
print_error(self.amqpc, e)
raise
print_debug(self.amqpc, f"Guider {self.camname} exposure done")
return path['0']
[docs] def test_exposure(self, exptime):
"""
Take a test exposure using ``-t`` option of lvmcam.
The image file (Temp.fits) will be overwritten whenever this command is executed.
Parameters
----------
exptime
Exposure time
"""
print_debug(self.amqpc, f"Guider {self.camname} exposure started")
try:
path = self._cam.expose(
exptime, 1, self.camname, testshot=""
)["PATH"]
except Exception as e:
print_error(self.amqpc, e)
raise
print_debug(self.amqpc, f"Guider {self.camname} exposure done")
return path["0"]
[docs] def bias_exposure(self, repeat):
"""
Take a test exposure using ``-t`` option of lvmcam.
The image file (Temp.fits) will be overwritten whenever this command is executed.
Parameters
----------
repeat
The number of bias images
"""
print_debug(self.amqpc, f"Guider {self.camname} exposure started")
try:
path = self._cam.expose(0, repeat, self.camname)["PATH"]
except Exception as e:
print_error(self.amqpc, e)
raise
print_debug(self.amqpc, f"Guider {self.camname} exposure done")
return path.values()
[docs] def status(self):
'''
Return status of autoguide camera (i.e. power status, exposing/reading out/idle ).
'''
try:
status = self._cam.status()
except Exception as e:
print_error(self.amqpc, e)
raise
print_info(self.amqpc, status)
return status
[docs]class LVMEastCamera(LVMCamera):
"""
Class for the FLIR guide cameras installed in east side
Parameters
----------
tel
Telescope to which the camera belongs
"""
def __init__(self, tel, amqpc):
super().__init__()
self.amqpc = amqpc
self.lvmcam = "lvm." + tel + ".age"
self.camname = tel + ".age"
try:
self._cam = Proxy(self.amqpc, self.lvmcam)
self._cam.start()
try:
self._cam.disconnect()
self._cam.connect(name=self.camname)
except Exception as e:
self._cam.connect(name=self.camname)
except Exception as e:
print_error(self.amqpc, e)
raise
[docs]class LVMWestCamera(LVMCamera):
"""
Class for the FLIR guide cameras installed in west side
Parameters
----------
tel
Telescope to which the camera belongs
"""
def __init__(self, tel, amqpc):
super().__init__()
self.amqpc = amqpc
self.lvmcam = "lvm." + tel + ".agw"
self.camname = tel + ".agw"
try:
self._cam = Proxy(self.amqpc, self.lvmcam)
self._cam.start()
try:
self._cam.disconnect()
self._cam.connect(name=self.camname)
except Exception as e:
self._cam.connect(name=self.camname)
except Exception as e:
print_error(self.amqpc, e)
raise
[docs]class LVMTelescopeUnit(
LVMTelescope, LVMFocuser, LVMWestCamera, LVMEastCamera, LVMFiberselector, LVMKMirror
):
def __init__(
self,
tel,
sitename = "LCO",
enable_pwi=True,
enable_foc=True,
enable_agw=True,
enable_age=False,
enable_fibsel=False,
enable_km=True,
):
self.name = tel
self.log = get_logger(f"sdss-lvmagp-{tel}")
self.log.sh.setLevel(logging.DEBUG)
self.amqpc = AMQPClient(
name=f"{sys.argv[0]}.proxy-{uuid.uuid4().hex[:8]}",
log_dir="/home/hojae/Desktop/lvmagp/logs",
log=self.log,
)
self.site = Site(name=sitename)
self.latitude = self.site.lat
self.longitude = self.site.long
self.ag_task = None
# self.ag_on = False
self.ag_break = False
self.siderostat = Siderostat()
self.target = None
# log_dir => How to fix??
self.__connect_actors(
enable_pwi, enable_foc, enable_agw, enable_age, enable_fibsel, enable_km
)
self.__read_parameters()
def __connect_actors(
self, enable_pwi, enable_foc, enable_agw, enable_age, enable_fibsel, enable_km
):
try:
if enable_pwi is True:
LVMTelescope.__init__(self, self.name, self.amqpc)
if enable_foc is True:
LVMFocuser.__init__(self, self.name, self.amqpc)
if enable_agw is True:
self.agw = LVMWestCamera(self.name, self.amqpc)
if enable_age is True:
self.age = LVMEastCamera(self.name, self.amqpc)
else:
self.age = self.agw
if enable_fibsel is True:
LVMFiberselector.__init__(self, self.name, self.amqpc)
if enable_km is True:
LVMKMirror.__init__(self, self.name, self.amqpc)
except Exception:
raise LvmagpActorMissing
print_debug(self.amqpc, "All instrument actors are connected.")
def __read_parameters(self):
#with open('../etc/lvmagp.yml') as f:
with open('../../etc/lvmagp.yml') as f:
conf = yaml.load(f, Loader=yaml.FullLoader)
data = conf[self.name]['agw']
self.offset_x = data['offset_x']
self.offset_y = data['offset_y']
self.pixelscale = data['pixelscale']
self.rotationangle = data['rotationangle']
############# Autofocus functions #########################
[docs] def coarse_autofocus(self):
"""
Find the focus coarsely by scanning whole reachable position.
"""
pass
[docs] def fine_autofocus(self):
"""
Find the optimal focus position which is near the current position.
"""
position, FWHM = [], []
incremental = usrpars.af_incremental
repeat = usrpars.af_repeat
print_info(self.amqpc, "(lvmagp) Fine autofocus start")
# get current pos of focus stage
currentposition = self.getfocusposition(unit="STEPS")
# Move focus
targetposition = currentposition - (incremental * (repeat - 1)) / 2.0
self.focus(value=targetposition)
currentposition = targetposition
position.append(currentposition)
# Take picture & picture analysis
FWHM_tmp = self.__get_fwhm()
print_info(self.amqpc, f"Focus at {currentposition}: {FWHM_tmp}")
FWHM.append(FWHM_tmp)
for iteration in range(repeat - 1):
targetposition = currentposition + incremental
self.focus(value=targetposition)
currentposition = targetposition
position.append(currentposition)
FWHM_tmp = self.__get_fwhm()
FWHM.append(FWHM_tmp)
# Fitting
bestposition, bestfocus = findfocus(position, FWHM)
self.focus(value=bestposition)
print_info(self.amqpc, f"(lvmagp) Fine autofocus done: pos={bestposition}, focus={bestfocus}") # noqa: E501
return bestposition, bestfocus
def __get_fwhm(self):
"""
Take a testshot using both guide image camera and return FWHM of the image.
"""
exptime = usrpars.af_exptime
imgcmd_w, imgcmd_e = None, None
try:
if self.name != "phot":
imgcmd_w, imgcmd_e = invoke(
self.agw._cam.async_expose(usrpars.aqu_exptime, 1, self.agw.camname),
self.age._cam.async_expose(usrpars.aqu_exptime, 1, self.age.camname),
)
imgcmd_w, imgcmd_e = imgcmd_w["PATH"]["0"], imgcmd_e["PATH"]["0"]
else:
imgcmd_w = self.agw.single_exposure(exptime=exptime)
imgcmd_e = imgcmd_w
except Exception as e:
print_error(self.amqpc, e)
raise
guideimg_w = GuideImage(imgcmd_w)
guideimg_w.findstars()
guideimg_e = GuideImage(imgcmd_e)
guideimg_e.findstars()
FWHM = 0.5 * (guideimg_w.FWHM + guideimg_e.FWHM)
return FWHM
############# Slew functions #########################
[docs] def goto_eq(
self,
target_ra_h,
target_dec_d,
target_pa_d=0,
ra_d=False,
):
"""
Point telescope to position using RA and dec (J2000).
Comments and desired actions:
Check dome safety interlock: if interlock is engaged, do not move the telescope.
Return error message (assume people are inside dome)
run status (LVMI_interface) to check instrument status: if not idle (i.e. carrying out any observations or calibrations),
repeat status command regularly until it returns idle,
then move telescope. Regularly give updates
(i.e. "waiting for instrument to become idle before moving telescope")
Parameters
----------
target_ra_h
The right ascension (J2000) of the target in hours
target_dec_d
The declination (J2000) of the target in degrees
target_pa_d
Desired position angle of the image
ra_d
If ``True``, the RA of target should be given in degrees.
"""
if self.check_safety_interlock():
print_debug(self.amqpc, "(lvmagp) safety interlock engaged")
raise LvmagpInterlockEngaged
# Check status to confirm the system is in idle state?
# Check the target is in reachable area
long_d = self.longitude
lat_d = self.latitude
if ra_d:
target_ra_h /= 15
self.target = Target(SkyCoord(ra=target_ra_h*u.hourangle, dec=target_dec_d*u.degree))
if not check_target(target_ra_h, target_dec_d, long_d, lat_d):
print_debug(self.amqpc, "(lvmagp) Target is over the limit")
raise LvmagpTargetOverTheLimit
self._reset_offset_radec()
# Calculate position angle and rotate K-mirror :: should be changed to traj method.
target_pa_d0 = cal_pa(target_ra_h, target_dec_d, long_d, lat_d)
# self.derotate(target_pa_d0 + target_pa_d)
# self._slew_radec2000(target_ra_h=target_ra_h, target_dec_d=target_dec_d),
pa = target_pa_d0 + target_pa_d
print_debug(self.amqpc, f"Rotate Kmirror to {pa}")
print_debug(self.amqpc, f"Start to slew telescope to RA {target_ra_h}, Dec {target_dec_d}.") # noqa: E501
invoke(
self._km.async_moveAbsolute(pa , "DEG"),
self._pwi.async_gotoRaDecJ2000(target_ra_h, target_dec_d),
)
print_debug(self.amqpc, "Rotating Kmirror done")
print_debug(self.amqpc, "Slew completed.")
print_debug(self.amqpc, "(lvmagp) Initial slew completed")
for iter in range(usrpars.aqu_max_iter + 1):
# take an image for astrometry
print_debug(self.amqpc, "(lvmagp) Astrometry...")
print_debug(self.amqpc, f"{self.agw.camname} exposure started")
print_debug(self.amqpc, f"{self.age.camname} exposure started")
guideimgpath = invoke(
self.agw._cam.async_expose(usrpars.aqu_exptime, 1, self.agw.camname, testshot=True),
self.age._cam.async_expose(usrpars.aqu_exptime, 1, self.age.camname, testshot=True)
)
guideimgpath = [guideimgpath[0]["PATH"]["0"], guideimgpath[1]["PATH"]["0"]]
print_debug(self.amqpc, f"{self.agw.camname} exposure done")
print_debug(self.amqpc, f"{self.age.camname} exposure done")
westguideimg = GuideImage(guideimgpath[0])
eastguideimg = GuideImage(guideimgpath[1])
# astrometry
manager = Manager()
posinfo1 = manager.dict()
posinfo2 = manager.dict()
processes = []
processes.append(
Process(
target=westguideimg.astrometry,
args=(target_ra_h, target_dec_d, posinfo1),
)
)
processes.append(
Process(
target=eastguideimg.astrometry,
args=(target_ra_h, target_dec_d, posinfo2),
)
)
for p in processes:
p.start()
for p in processes:
p.join()
westguideimg.ra2000 = posinfo1["ra"]
westguideimg.dec2000 = posinfo1["dec"]
westguideimg.pa = posinfo1["pa"]
eastguideimg.ra2000 = posinfo1["ra"]
eastguideimg.dec2000 = posinfo1["dec"]
eastguideimg.pa = posinfo1["pa"]
ra2000_d = 0.5 * (eastguideimg.ra2000 + westguideimg.ra2000)
dec2000_d = 0.5 * (eastguideimg.dec2000 + westguideimg.dec2000)
pa_d = 0.5 * (eastguideimg.pa + westguideimg.pa)
self.agw.rotationangle = westguideimg.pa
self.age.rotationangle = eastguideimg.pa
ra2000 = Angle(ra2000_d, u.degree)
dec2000 = Angle(dec2000_d, u.degree)
target_ra = Angle(target_ra_h, u.hour)
target_dec = Angle(target_dec_d, u.degree)
comp_ra_arcsec = (target_ra - ra2000).arcsecond
comp_dec_arcsec = (target_dec - dec2000).arcsecond
print_debug(self.amqpc, "(lvmagp) Astrometry result")
print_info(self.amqpc, f"Img_ra2000={ra2000.to_string(unit=u.hour)}")
print_info(self.amqpc, f"Img_dec2000={dec2000.to_string(unit=u.degree)}")
print_info(self.amqpc, f"Img_pa={pa_d:.3f} deg")
print_info(self.amqpc, f"offset_ra={comp_ra_arcsec:.3f} arcsec")
print_info(self.amqpc, f"offset_dec={comp_dec_arcsec:.3f} arcsec")
# Compensation // Compensation for K-mirror based on astrometry result?
# may be by offset method..
if (
np.sqrt(comp_ra_arcsec ** 2 + comp_dec_arcsec ** 2) >
usrpars.aqu_tolerance_arcsec
):
if iter >= usrpars.aqu_max_iter:
print_debug(self.amqpc, "(lvmagp) Acquisition compensation failed")
raise LvmagpAcquisitionFailed
else:
print_debug(self.amqpc, f"(lvmagp) Compensate offset: #{iter}")
pa = target_pa_d - pa_d
print_debug(self.amqpc, f"Rotate Kmirror to {pa}")
print_debug(self.amqpc, f"Set telescope offsets ra={comp_ra_arcsec}, dec={comp_dec_arcsec}")
invoke(
self._km.async_moveAbsolute(pa, "DEG"),
self._pwi.async_offset(ra_add_arcsec=comp_ra_arcsec, dec_add_arcsec=comp_dec_arcsec)
)
print_debug(self.amqpc, "Rotating Kmirror done")
print_debug(self.amqpc, "Set offset done")
# # invoke(
# self.derotate(target_pa_d - pa_d)
# self._offset_radec(ra_arcsec=comp_ra_arcsec, dec_arcsec=comp_dec_arcsec)
# # )
else:
break
[docs] def goto_aa(
self, target_alt_d, target_az_d, target_pa_d=0, target="optical_axis", deg=True
):
"""
Point telescope to position using alt/az (i.e. point to screen or manually park)
It does not run additional compensation based on astrometry.
Comments and desired actions:
Check dome safety interlock: if interlock is engaged, do not move the telescope.
Return error message (assume people are inside dome)
run status (LVMI_interface) to check instrument status: if not idle (i.e. carrying out any observations or calibrations),
repeat status command regularly until it returns idle,
then move telescope. Regularly give updates
(i.e. "waiting for instrument to become idle before moving telescope")
Parameters
----------
target_alt_d
Target altitude in degree
target_az_d
Target azimuth in degree
target_pa_d
Target position angle in degree
target
???
deg
???
"""
if self.check_safety_interlock():
print_debug(self.amqpc, "(lvmagp) safety interlock engaged")
raise LvmagpInterlockEngaged
# Check status to confirm the system is in idle state?
# Check the target is in reachable area
alt_low, alt_high = define_visb_limit(target_az_d)
if not (alt_low < target_alt_d) or not (target_alt_d < alt_high):
print_debug(self.amqpc, "(lvmagp) Target is over the limit")
raise LvmagpTargetOverTheLimit
self._reset_offset_radec()
# Calculate position angle and rotate K-mirror :: should be changed to traj method.
target_pa_d0 = (
0 # I dont know ... # cal_pa(target_ra_h, target_dec_d, long_d, lat_d)
)
# invoke(
self.derotate(target_pa_d0 + target_pa_d)
self._slew_altaz(target_alt_d=target_alt_d, target_az_d=target_az_d),
# )
print_debug(self.amqpc, "(lvmagp) Initial slew completed")
[docs] def goto_screen(self):
"""
Point telescope to screen for dome flats
Comments and desired actions:
Check dome safety interlock: if interlock is engaged, do not move the telescope.
Return error message (assume people are inside dome)
run status (LVMI_interface) to check instrument status: if not idle (i.e. carrying out any observations or calibrations),
repeat status command regularly until it returns idle,
then move telescope. Regularly give updates
(i.e. "waiting for instrument to become idle before moving telescope")
"""
self.goto_aa(target_alt_d=usrpars.screen_alt_d, target_az_d=usrpars.screen_az_d)
[docs] def goto_park(self):
"""
Move telescope to safe park position.
Comments and desired actions:
Check dome safety interlock: if interlock is engaged, do not move the telescope. Return error message
(assume people are inside dome)
run status (LVMI_interface) to check instrument status: if not idle (i.e. carrying out any observations or calibrations),
abort observation (halt in LVMI_interface) and park telescope
(in this case, we assume user wants to park due to an emergency)
"""
self.goto_aa(target_alt_d=usrpars.park_alt_d, target_az_d=usrpars.park_az_d)
[docs] def goto_zenith(self):
"""
Point telescope to zenith
Comments and desired actions:
Check dome safety interlock: if interlock is engaged, do not move the telescope.
Return error message (assume people are inside dome)
run status (LVMI_interface) to check instrument status: if not idle (i.e. carrying out any observations or calibrations),
repeat status command regularly until it returns idle,
then move telescope. Regularly give updates
(i.e. "waiting for instrument to become idle before moving telescope")
"""
self.goto_aa(target_alt_d=89.9999, target_az_d=180.)
[docs] def track_on(self, derotator=True):
'''
Turn on tracking, optionally track in rotation, optionally
supply non-sidereal track rates.
Parameters
----------
derotator
If True, derotator will work to compensate the field rotation.
'''
self._set_tracking(enable=True)
if derotator:
pass # do something to derotate the field
[docs] def track_off(self):
'''
Turn off mount tracking and field derotating.
'''
self._set_tracking(enable=False)
# do something to stop the derotator
############# Autoguide functions #########################
[docs] def offset(
self, delta_ra=None, delta_dec=None, delta_x=None, delta_y=None, delta_pa=None
):
"""
Normal offset (for guiding): do NOT change track rates
It returns applied offset values.
Parameters
----------
delta_ra
Offset in ra direction in arcsecond (+/- = E/W)
delta_dec
Offset in dec direction in arcsecond(+/- = N/S)
delta_x
Offset in x direction in guide frame in pixel(+/- = R/L)
delta_y
Offset in y direction in guide frame in pixel (+/- = U/D)
delta_pa
Offset in position angle in degree (+/- = CW/CCW)
"""
if (delta_ra is not None) or (delta_dec is not None):
self._offset_radec(ra_arcsec=delta_ra, dec_arcsec=delta_dec)
returnval = ['radec', delta_ra, delta_dec]
if (delta_x is not None) or (delta_y is not None):
P = np.array([1, -1]) # inversion matrix
# (x-axis by right-hand coordinate to left-hand, y-axis by siderostat mirror)
offset_xy = np.array([delta_x, delta_y])
if self.name == 'sci': # 20220401 'phot':
#20220401
theta = -self.siderostat.fieldAngle(self.site, self.target, ambi=None)
print_debug(self.amqpc, f"field angle={np.rad2deg(theta)}")
# theta = np.deg2rad(self.rotationangle)
c, s = np.cos(theta), np.sin(theta)
R = np.array([[c, -s], [s, c]]) # rotation matrix
else:
R = np.array([[1,0],[0,1]])
offset_radec = np.dot(R, offset_xy) * self.pixelscale
decj2000_deg = self._get_dec2000_deg()
offset_radec[0] /= np.cos(np.deg2rad(decj2000_deg))
offset_radec *= P
self._offset_radec(ra_arcsec=offset_radec[0], dec_arcsec=offset_radec[1])
returnval = ['radec', offset_radec[0], offset_radec[1]]
if (delta_pa is not None):
self._kmoffset(delta_pa)
returnval = ['pa', delta_pa]
return returnval
[docs] def dither(self, delta_x=None, delta_y=None, delta_pa=None):
"""
move the guider set points (guide boxes) by specified amount; used to move accurate
offsets by letting the guider do the work. used, e.g., for dithering.
Parameters
----------
delta_x
Offset in x direction in guide frame in pixel(+/- = R/L)
delta_y
Offset in y direction in guide frame in pixel (+/- = U/D)
delta_pa
Offset in position angle in degree (+/- = CW/CCW)
"""
pass
[docs] def guide_on(self, guide_parameters=None, ra_h=None, dec_d=None):
'''
Start guiding, or modify parameters of running guide loop. <--- modify????
guide_parameters is a dictionary containing additional parameters for
the guiders, e.g. exposure times, cadence, PID parameters, readout or window modes, ...
Parameters
----------
useteldata
If ``useteldata`` is flagged, the sequence will use the pixel scale and
rotation angle from LVMTelescope.
Otherwise, the sequence will get pixel scale from LVMCamera, and
it assumes that the camera is north-oriented.
guide_parameters
exposure times, cadence, PID parameters, readout or window modes, ... ???
'''
if ra_h is not None and dec_d is not None:
self.target = Target(SkyCoord(ra=ra_h*u.hourangle, dec=dec_d*u.degree))
# if self.ag_on:
# pass #raise lvmagpguidealreadyrunning ?????
self.ag_task = True
print_debug(self.amqpc, "(lvmagp) Autoguide Start")
try:
t = Thread(target=self.autoguide_supervisor)
# t.setDaemon(True)
t.start()
except Exception as e:
print_error(self.amqpc, e)
raise
# finally:
# self.ag_task = None
# return self.amqpc.log.debug(f"{datetime.datetime.now()} | Guide stopped")
[docs] def guide_off(self):
'''
Turn off guiding, revert to tracking (track_on).
'''
if self.ag_task is not None:
self.ag_break = True
self.ag_task = None
return
else:
return print_error(self.amqpc, f"There is no autoguiding loop for telescope {self.name}")
'''
def calibration(self):
"""
Run calibration sequence to calculate the transformation
from the equatorial coordinates to the xy coordinates of the image.
"""
offset_per_step = usrpars.ag_cal_offset_per_step
num_step = usrpars.ag_cal_num_step
decj2000_deg = self._get_dec2000_deg()
xpositions, ypositions = [], []
initposition, initflux = self.find_guide_stars()
xpositions.append(initposition[:, 0])
ypositions.append(initposition[:, 1])
time.sleep(3)
# dec axis calibration
for step in range(1, num_step + 1):
self._offset_radec(0, offset_per_step)
position, flux = self.find_guide_stars(positionguess=initposition)
xpositions.append(position[:, 0])
ypositions.append(position[:, 1])
self._offset_radec(0, -num_step * offset_per_step)
xoffsets = np.array(xpositions) - xpositions[0]
yoffsets = np.array(ypositions) - ypositions[0]
print(xpositions)
print(xoffsets)
print(ypositions)
print(yoffsets)
xscale_dec = (
np.average(xoffsets[1:] / np.array([[i] * 3 for i in range(1, num_step + 1)])) /
offset_per_step
) # displacement along x-axis by ra offset in pixel per arcsec. exclude the first index (0,0)
yscale_dec = (
np.average(yoffsets[1:] / np.array([[i] * 3 for i in range(1, num_step + 1)])) /
offset_per_step
) # exclude the first index (0,0)
# ra axis calibration
xpositions = [initposition[:, 0]]
ypositions = [initposition[:, 1]]
for step in range(1, num_step + 1):
self._offset_radec(
offset_per_step / np.cos(np.deg2rad(decj2000_deg)), 0
)
position, flux = self.find_guide_stars(positionguess=initposition)
xpositions.append(position[:, 0])
ypositions.append(position[:, 1])
self._offset_radec(
-num_step * offset_per_step / np.cos(np.deg2rad(decj2000_deg)), 0
)
xoffsets = np.array(xpositions) - xpositions[0]
yoffsets = np.array(ypositions) - ypositions[0]
print(xpositions)
print(xoffsets)
print(ypositions)
print(yoffsets)
xscale_ra = (
np.sum(xoffsets[1:] / np.array([[i] * 3 for i in range(1, num_step + 1)]))
/ offset_per_step
) # exclude the first index (0,0)
yscale_ra = (
np.sum(yoffsets[1:] / np.array([[i] * 3 for i in range(1, num_step + 1)]))
/ offset_per_step
) # exclude the first index (0,0)
self.scale_matrix = np.linalg.inv(
np.array([[xscale_ra, xscale_dec], [yscale_ra, yscale_dec]])
) # inverse matrix.. linear system of equations..
return self.amqpc.log.debug(
f"{datetime.datetime.now()} |"+
f'xscale_ra={xscale_ra} pixel/arcsec'+
f'yscale_ra={yscale_ra} pixel/arcsec'+
f'xscale_dec={xscale_dec} pixel/arcsec'+
f'yscale_dec={yscale_dec} pixel/arcsec'
)
'''
[docs] def autoguide_supervisor(self):
"""
Manage the autoguide sequence.
It starts real autoguide loop and keeps it until the break signal comes.
Parameters
----------
tel
Telescope to autoguide
"""
initposition, initflux = self.find_guide_stars()
while 1:
print_debug(self.amqpc, "autoguiding loop Start")
self.autoguiding(
initposition,
initflux
)
print_debug(self.amqpc, "autoguiding loop Done")
if self.ag_break:
self.ag_break = False
break
return True
[docs] def find_guide_stars(self, positionguess=None):
"""
Expose an image, and find three guide stars from the image.
Also calculate the center coordinates and fluxes of found stars.
Parameters
----------
positionguess
Initial guess of guidestar position.
It should be given in np.ndarray as [[x1, y1], [x2, y2], ...]
If ``positionguess`` is not None, ``find_guide_stars`` only conduct center finding
based on ``positionguess`` without finding new stars.
"""
# take an image for astrometry
print_debug(self.amqpc, "Taking image...")
try:
# guideimgpath = [
# self.agw.single_exposure(usrpars.ag_exptime),
# self.age.single_exposure(usrpars.ag_exptime)
# ]
print_debug(self.amqpc, f"{self.agw.camname} exposure started")
print_debug(self.amqpc, f"{self.age.camname} exposure started")
guideimgpath = invoke(
self.agw._cam.async_expose(usrpars.aqu_exptime, 1, self.agw.camname),
self.age._cam.async_expose(usrpars.aqu_exptime, 1, self.age.camname)
)
guideimgpath = [guideimgpath[0]["PATH"]["0"], guideimgpath[1]["PATH"]["0"]]
print_debug(self.amqpc, f"{self.agw.camname} exposure done")
print_debug(self.amqpc, f"{self.age.camname} exposure done")
except Exception as e:
print_error(self.amqpc, e)
raise
westguideimg = GuideImage(guideimgpath[0])
eastguideimg = GuideImage(guideimgpath[1])
if positionguess is None:
starposition = westguideimg.findstars()
else:
westguideimg.guidestarposition = positionguess
westguideimg.update_guidestar_properties()
starposition = westguideimg.guidestarposition
starflux = westguideimg.guidestarflux
return starposition, starflux
[docs] def autoguiding(self, initposition,
initflux,
):
"""
Expose an image, and calculate offset from the image and initial values.
Compensate the offset.
Parameters
----------
tel
Telescope to autoguide
initposition
Position of guide stars when the autoguide is started
initflux
Flux of guide stars when the autoguide is started
"""
#self._pwi.offset(ra_add_arcsec=3, dec_add_arcsec=-2)
#time.sleep(1)
starposition, starflux = self.find_guide_stars(
positionguess=initposition,
)
print_debug(self.amqpc, f'initposition = {initposition} | initflux = {initflux}')
print_debug(self.amqpc, f'starposition = {starposition} | starflux = {starflux}')
if (
np.abs(
np.average(starflux / initflux - 1, weights=2.5 * np.log10(initflux * 10))
)
> usrpars.ag_flux_tolerance
):
return print_error(self.amqpc,
"Star flux variation %.3f is too large."
% np.abs(
np.average(
starflux / initflux - 1, weights=2.5 * np.log10(initflux * 10)
)
)
)
offset = np.mean(starposition - initposition, axis=0) # in x,y [pixel]
if (np.sqrt(offset[0] ** 2 + offset[1] ** 2)) > usrpars.ag_min_offset:
correction = self.offset(delta_x=-offset[0], delta_y=-offset[1])
print_debug(self.amqpc,
"compensate signal: ra %.2f arcsec dec %.2f arcsec x %.2f pixel y %.2f pixel"
% (correction[1], correction[2], -offset[0], -offset[1])
)
return correction[1:3]
else:
return [0.0, 0.0]
[docs] def zero_coordinates(self):
'''
Zero-out mount model once pointing is verified.
'''
pass #what is this?
[docs] def check_safety_interlock(self):
pass