Skip to content

Commit

Permalink
working on configuration parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
weatherhead99 committed Nov 22, 2023
1 parent 0d8ecbc commit f4a1dad
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 57 deletions.
90 changes: 43 additions & 47 deletions python/lsst/ts/observatory/control/auxtel/atcalsys.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
class ATSpectrographSlits(NamedTuple):
FRONTENTRANCE: float
FRONTEXIT: float

@dataclass
class ATCalibrationSequenceStep(CalibrationSequenceStepBase):
grating: ATMonochromator.Grating
Expand Down Expand Up @@ -116,44 +116,6 @@ async def _setup_electrometer(self, int_time: float):
pass



async def verify_chiller_operation(self):
chiller_temps = await self._sal_readvalue_helper(self.ATWhiteLight, "chillerTemperatures")

self.log.debug(f"Chiller supply temperature: {chiller_temps.supplyTemperature:0.1f} C"
f"Chiller return temperature: {chiller_temps.returnTemperature:0.1f} C"
f"Chiller set temperature: {chiller_temps.setTemperature:0.1f} C"
f"Chiller ambient temperature: {chiller_temps.ambientTemperature:0.1f} C")



async def turn_on_light(self, lamp_power: Quantity["power"]) -> None:
#check lamp state first
lamp_state = await self._sal_readvalue_helper(self.ATWhiteLight, "lampState")
if lamp_state == ATWhiteLight.LampBasicState.On:
#nothing to do
return

#check the shutter state
shutter_state = await self._sal_waitevent(self.ATWhiteLight, "shutterState")
if shutter_state in [ATWhiteLight.ShutterState.Unknown, ATWhiteLight.ShutterState.Open]:
await self._sal_cmd(self.ATWhiteLight, "pcloseShutter")

power_watts = int(lamp_power.to(un.W).value)
#turn on lamp and let it warm up
await self._sal_cmd(self.ATWhiteLight, "turnLampOn", power=power_watts)

async def wait_ready(self) -> None:
#in the case of auxtel, need to wait for lamp to have warmed up
#check that lamp state
lamp_state = await self._sal_waitevent(self.ATWhiteLight, "lampState", run_immediate=False)
if lamp_state == ATWhiteLight.LampBasicState.On:
return

if lamp_state not in {ATWhiteLight.LampBasicState.Warmup, ATWhiteLight.LampBasicState.TurningOn}:
raise RuntimeError("unexpected lamp state when waiting for readiness!")


async def _electrometer_expose(self, exp_time: float) -> Awaitable[str]:
await self._sal_cmd(self.Electrometer, "startScanDt", scanDuration=exp_time,
run_immediate=False)
Expand All @@ -175,51 +137,85 @@ def script_time_estimate_s(self) -> float:
For now just returns a default long time"""

match(self._intention):
case(CalsysScriptIntention.POWER_ON):
case CalsysScriptIntention.POWER_ON | CalsysScriptIntention.POWER_OFF:
#for now just use fixed values from previous script
#start out with chiller time maximum
total_time: Quantity[un.physical.time] = self.CHILLER_COOLDOWN_TIMEOUT
#add on the lamp warmup timeout
total_time += self.WHITELIGHT_LAMP_WARMUP_TIMEOUT
total_time += self.SHUTTER_OPEN_TIMEOUT
return total_time.to(un.s).value
case(_):
case _:
raise NotImplementedError("don't know how to handle this script intention")

async def power_sequence_run(self, scriptobj: salobj.BaseScript):
match(self._intention):
case(CalsysScriptIntention.POWER_ON):
case CalsysScriptIntention.POWER_ON:
await self._chiller_power(True)
await scriptobj.checkpoint("Chiller started")
chiller_start, chiller_end = await self._chiller_settle(True)
self.log_event_timings(self.log, "chiller cooldown time", chiller_start, chiller_end,
self.CHILLER_COOLDOWN_TIMEOUT)

await scriptobj.checkpoint("Chiller setpoint temperature reached")
shutter_wait_fut = asyncio.create_task(self._lamp_power(True), "lamp_start_shutter_open")
lamp_settle_fut = asyncio.create_task(self._lamp_settle(True), "lamp_power_settle")
shutter_start, shutter_end = await shutter_wait_fut
self.log_event_timings(self.log, "shutter open time", shutter_start, shutter_end,
self.SHUTTER_OPEN_TIMEOUT)

await scriptobj.checkpoint("shutter open and lamp started")
lamp_settle_start, lamp_settle_end = await lamp_settle_fut
self.log_event_timings(self.log, "lamp warm up", lamp_settle_start, lamp_settle_end,
self.WHITELIGHT_LAMP_WARMUP_TIMEOUT)
self.log.info("lamp is warmed up, ATCalsys is powered on and ready")

case(CalsysScriptIntention.POWER_OFF):
case CalsysScriptIntention.POWER_OFF:
await self._lamp_power(False)
await scriptobj.checkpoint("lamp commanded off and shutter commanded closed")
shutter_wait_fut = asyncio.create_task(self._lamp_power(False), "lamp stop shutter close")
lamp_settle_fut = asyncio.create_task(self._lamp_settle(False), "lamp power settle")

shutter_start, shutter_end = await shutter_wait_fut
self.log_event_timings(self.log, "shutter close", shutter_start, shutter_end,
self.SHUTTER_OPEN_TIMEOUT)
await scriptobj.checkpoint("shutter closed annd lamp turned off")
lamp_settle_start, lamp_settle_end = await lamp_settle_fut

self.log_event_timings(self.log, "lamp cooldown", lamp_settle_start, lamp_settle_end,
self.WHITELIGHT_LAMP_WARMUP_TIMEOUT)
await scriptobj.checkpoint("lamp has cooled down")
await self._chiller_power(False)
self.log.info("chiller has been turned off, ATCalsys is powered down"!)


case(_):
case _:
raise NotImplementedError("don't know how to handle this script intention")

#TODO: log the start and end times
async def validate_hardware_status_for_acquisition(self) -> Awaitable[float]:
shutter_fut = self._sal_waitevent(self.ATWhiteLight, "shutterState")
lamp_fut = self._sal_waitevent(self.ATWhiteLight, "lampState")

shutter_state = await shutter_fut
if shutter_state.commandedState != ATWhiteLight.ShutterState.OPEN:
errmsg = f"shutter has not been commanded to open, likely a programming error. Commanded state is {repr(shutter_state.commandedState)}"
self.log.error(errmsg)
raise RuntimeError(errmsg)

if shutter_state.actualState != ATWhiteLight.ShutterState.OPEN:
errmsg = f"shutter is not open, its state is reported as {repr(shutter_state.actualState)}")
self.log.error(errmsg)
raise RuntimeError(errmsg)

lamp_state = await lamp_fut
if lamp_state.basicState != ATWhiteLight.LampBasicState.ON:
errmsg = f"lamp state is not on, its state is reported as {repr(lamp_state.basicState)}"
self.log.error(errmsg)
raise RuntimeError(errmsg)

if !lamp_state.lightDetected:
self.log.warning(f"all states seem fine, but lamp is not reporting light detected!")

lamp_power: float = lamp_state.setPower
return lamp_power

def _chiller_temp_check(self, temps) -> bool:
self.log.debug(f"Chiller supply temperature: {temps.supplyTemperature:0.1f} C "
Expand Down
27 changes: 17 additions & 10 deletions python/lsst/ts/observatory/control/base_calsys.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from astropy.units import ampere, watt, nm, Quantity
import astropy.units as un
import enum
from datetime import datetime

Responsivity: TypeAlias = Quantity[ampere / watt]

Expand Down Expand Up @@ -257,7 +258,18 @@ async def _long_wait_err_handle(self, gen: AsyncGenerator, timeout_seconds,
self.log.error(f"waited {wait_time} seconds but {name_of_wait} did not succeed")
raise err



@classmethod
def log_event_timings(cls, logger, time_evt_name: str,
start_time: datetime, end_time: datetime,
expd_duration: Quantity[un.physical.time]) -> None:
logstr = f"event: {time_evt_name} started at {start_time} and finished at {end_time}"
logger.info(logstr)
duration = (start_time - end_time).total_seconds() << un.s
logstr2 = f"the duration was: {duration}, and our timeout allowance was: {expt_duration}"
logger.info(logstr2)



async def take_electrometer_exposures(
self, electrobj, exp_time_s: float, n: int
Expand Down Expand Up @@ -300,18 +312,13 @@ def pd_exposure_time_for_nelectrons(self, nelec: float) -> float:


@abstractmethod
async def power_sequence_run(self, scriptobj, **kwargs):
async def validate_hardware_status_for_acquisition(self) -> Awaitable:
pass

@abstractmethod
async def turn_on_light(self, **kwargs) -> None:
"""awaitable command which turns on the calibration light, having
already set up the appropriate wavelength and (if applicable) time delays for stabilization etc
"""
async def power_sequence_run(self, scriptobj, **kwargs):
pass

@abstractmethod
async def turn_off_light(self) -> None:
"""awaitable which turns off the calibration light"""

@abstractmethod
async def setup_for_wavelength(self, wavelen: float, **extra_params) -> None:
Expand Down

0 comments on commit f4a1dad

Please sign in to comment.