Skip to content

Commit

Permalink
fix error in call in atcalsys, start documenting base_calsys
Browse files Browse the repository at this point in the history
  • Loading branch information
weatherhead99 authored and dsanmartim committed Feb 13, 2024
1 parent 69635d7 commit c18e689
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 21 deletions.
2 changes: 1 addition & 1 deletion python/lsst/ts/observatory/control/auxtel/atcalsys.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async def setup_for_wavelength(
wavelength=wavelen,
)

elect_fut = self._sal_cmd("electrometer", "performZeroCalib")
elect_fut = self._sal_cmd(self.Electrometer, "performZeroCalib")
elect_fut2 = self._sal_cmd(
self.Electrometer,
"setDigitalFilter",
Expand Down
166 changes: 146 additions & 20 deletions python/lsst/ts/observatory/control/base_calsys.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,39 @@
# This file is part of ts_observatory_control.
#
# Developed for the Vera Rubin Observatory Telescope and Site Systems.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License

__all__ = ["CalsysScriptIntention", "CalsysThroughputCalculationMixin", "BaseCalsys"]


import asyncio
import csv
import enum
import logging
from abc import ABCMeta, abstractmethod
from collections.abc import AsyncGenerator, Callable, Coroutine
from dataclasses import dataclass
from datetime import datetime
from functools import reduce
from importlib.resources import files
from itertools import count
from typing import (
Any,
Awaitable,
Iterable,
Mapping,
Optional,
Sequence,
Expand Down Expand Up @@ -44,14 +65,9 @@ def _calsys_get_parameter(
return factory_callable(*factory_args, **factory_kwargs)


@dataclass
class CalibrationSequenceStepBase:
wavelength: float
n_exp: int
exp_time: float


class CalsysScriptIntention(enum.IntEnum):
"""Enum which indicates what a SAL script will be using the calsys for."""

TURN_ON = 0
TURN_OFF = 1
QUICK_CALIBRATION_RUN = 2
Expand All @@ -62,7 +78,7 @@ class CalsysScriptIntention(enum.IntEnum):


class CalsysThroughputCalculationMixin:
"""mixin class to allow pluggable source for calculation of throughputs"""
"""Mixin class to allow pluggable source for calculation of throughputs."""

POWER_LINE_FREQUENCY = 60 / (un.s)

Expand Down Expand Up @@ -113,12 +129,14 @@ def end_to_end_throughput(self, wavelen: float, calsys_power: float) -> float:
def total_radiometer_exposure_time(
self, rad_exposure_time: Quantity[un.physical.time], nplc: float
) -> Quantity[un.physical.time]:
# Note comm from Parker: "valid nplc values are from 0.01 to 10 (seconds)
# Note comm from Parker:
# "valid nplc values are from 0.01 to 10 (seconds)
if not (0.01 <= nplc <= 10.0):
raise ValueError(
f"supplied valud for nplc: {nplc} is not within allowed values 0.01 <= nplc <= 10"
)
# Note: magic numbers from communication with Parker F. To be added to electrometer CSC docs
# Note: magic numbers from communication with Parker F.
# To be added to electrometer CSC docs
rad_int_time = nplc / self.POWER_LINE_FREQUENCY
time_sep = (rad_int_time * 3.07) + 0.00254
# NOTE: is 0.00254 a metric -> imperial conversion?
Expand All @@ -135,13 +153,11 @@ def total_radiometer_exposure_time(


class ButlerCalsysThroughput(CalsysThroughputCalculationMixin):
"""Mixin class for calculating throughput of the calibration system backed by measurements stored
in a DM butler"""
"""Calculating throughput of the calibration system backed by measurements stored in a DM butler"""


class HardcodeCalsysThroughput(CalsysThroughputCalculationMixin):
"""Mixin class for calculating throughput of the calibration system with hardcoded values,
i.e. which can be directly imported from python code"""
"""Calculating throughput of the calibration system with hardcoded values, i.e. which can be directly imported from python code"""

BASERES: str = "lsst.ts.observatory.control.cal_curves"
RADIOMETER_CALFILE: str = "hamamatsu_responsivity.csv"
Expand All @@ -157,7 +173,6 @@ def load_calibration_csv(cls, fname: str) -> Mapping[str, Sequence[float]]:
rdr = csv.DictReader(f)
if rdr.fieldnames is None:
raise ValueError("calibration curve has no fieldnames!")

out: dict[str, list[float]] = {k: [] for k in rdr.fieldnames}
for row in rdr:
for k, v in row.items():
Expand All @@ -166,9 +181,36 @@ def load_calibration_csv(cls, fname: str) -> Mapping[str, Sequence[float]]:
return out

def _ensure_itp(
self, itpname: str, fname: str, xaxis: str, yaxis: str, **itpargs
) -> InterpolatedUnivariateSpline:
self, itpname: str, fname: Optional[str] = None, xaxis: Optional[str] = None,
yaxis: Optional[str] = None, **itpargs) -> InterpolatedUnivariateSpline:
"""Obtain an interpolated spline from a calibration curve.
If the calibration curve has already been loaded, returns the existing spline interpolation object.
Otherwise, will load the calibration data from the filename supplied
Parameters
----------
itpname : Optional[str]
The name from which to lookup the interpolation object.
When object is first loaded, will lookup an interpolation object with this name
fname : Optional[str]
The file name from which to load the calibration data.
xaxis : Optional[str]
The column name of the x-axis interpolator
yaxis : Optional[str]
The column name of the y-axis interpolator
**itpargs : 7
Arguments which will be passed through to scipy InterpolatedUnivariateSpline
Returns
-------
a scipy InterpolatedUnivariateSpline object which is callable and can be used to
obtain interpolated calibration values.
"""

if itpname not in self._itps:
if any(_ is None for _ in [fname, xaxis, yaxis]):
raise ValueError("missing required value to load calibration data")
calres = self.load_calibration_csv(fname)
itp = InterpolatedUnivariateSpline(calres[xaxis], calres[yaxis], **itpargs)
self._itps[itpname] = itp
Expand Down Expand Up @@ -198,7 +240,8 @@ def maintel_throughput(


class BaseCalsys(RemoteGroup, metaclass=ABCMeta):
"""Base class for calibration systems"""
"""Base class for calibration systems operation
"""

CMD_TIMEOUT: Quantity[un.physical.time] = 30 << un.s
EVT_TIMEOUT: Quantity[un.physical.time] = 30 << un.s
Expand All @@ -213,6 +256,24 @@ def __init__(
cmd_timeout: Optional[int] = 10,
log: Optional[logging.Logger] = None,
):
"""Construct a new BaseCalsys object
NOTE: should not generally be constructed directly by a user either interactively or in a SAL script
Parameters
----------
intention : CalsysScriptIntention
Configures the general behaviour of calls according to what the script is going to do
components : list[str]
SAL components to initialize, should be overridden in daughter class
domain : Optional[salobj.domain.Domain]
DDS OpenSplice domain to use
cmd_timeout : Optional[int]
timeout (measured in seconds) for DDS OpenSplice commands
log : Optional[logging.Logger]
existing logging object
"""
super().__init__(
components,
domain,
Expand All @@ -226,6 +287,46 @@ def __init__(
def _sal_cmd(
self, obj: salobj.Remote, cmdname: str, run_immediate: bool = True, **setargs
) -> TaskOrCoro:
"""Helper function that Runs a command on a remote SAL object
This function is mainly here to avoid a lot of boilerplate that otherwise
accumulates in the daughter classes
Parameters
----------
obj : salobj.Remote
SAL remote object to call the command on
cmdname : str
the name of the command to call (look up in the generated XML
definitions for the respective CSC), excluding the conventional 'cmd_' prefix
run_immediate : bool
chooses whether to return an event-loop posted future (which is done using
asyncio.create_task, or whether to return an un-posted coroutine function
if True, returns the future, if False, returns the coroutine
**setargs
extra arguments that are passed to the SAL remote object set function
Returns
-------
TaskOrCoro
the packaged future or coroutine (which type depends on the value of run_immediate)
Examples
--------
# some function calls that get us the relevant objects, in this case an Electrometer
calsys: BaseCalsys = get_calsys_object()
salobj: sal.Remote = get_electrometer_object()
assert type(salobj) is Electrometer
# cal the "cmd_performZeroCalib" operation on the electrometer, returning a future,
with the task having already been posted to the running event loop
fut = calsys._sal_cmd(salobj, salobj, "performZeroCalib")
"""
timeout = self.CMD_TIMEOUT.to(un.s).value
cmdfun: salobj.topics.RemoteCommand = getattr(obj, f"cmd_{cmdname}")
pkgtask = cmdfun.set_start(**setargs, timeout=timeout)
Expand All @@ -241,6 +342,31 @@ def _sal_waitevent(
flush: bool = True,
**evtargs,
) -> TaskOrCoro:
"""A helper function which waits for an event on a SAL remote object
Parameters
----------
obj : salobj.Remote
SAL remote object to wait for the telemetry or event on
evtname : str
name of the event to wait for (excluding the conventional prefix 'evt_')
run_immediate : bool
Whether to return a posted future or an un-posted coroutine,
see documentation for BaseCalsys._sal_cmd
flush : bool
Whether to flush the remote SALobject DDS event queue for this
particular event
(i.e. whether to definitely force wait for a new event rather than popping
from a queue) - see documentation for salobj.Remote
**evtargs : 7
extra arguments that get passed to the .next() method of the SAL object
Returns
-------
TaskOrCoro
posted future or unposted coro, according to value of run_immediate,
as per BaseCalsys._sal_cmd
"""
timeout = self.EVT_TIMEOUT.to(un.s).value
cmdfun: salobj.topics.RemoteEvent = getattr(obj, f"evt_{evtname}")
pkgtask = cmdfun.next(timeout=timeout, flush=flush)
Expand Down

0 comments on commit c18e689

Please sign in to comment.