"""Microscope API specific modules."""
from __future__ import annotations
import asyncio
import json
import logging
from typing import TYPE_CHECKING, Any, ClassVar
import voluptuous as vol
from camacq.const import (
COMMAND_EVENT,
IMAGE_EVENT,
START_COMMAND_EVENT,
STOP_COMMAND_EVENT,
)
from camacq.event import Event
from camacq.helper import BASE_ACTION_SCHEMA
if TYPE_CHECKING:
from camacq.control import Center
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# voluptuous validator for a single command. The alternatives are tried in
# order: first a list of tuples of strings, then a fallback that coerces the
# value to str.
COMMAND_VALIDATOR = vol.Any([(str, str)], vol.Coerce(str))
[docs]
def validate_commands(value: Any) -> list[Any]:
    """Validate commands given as a JSON string or as a parsed value.

    Parameters
    ----------
    value : Any
        Either a JSON encoded string (for example a rendered template) or
        an already parsed collection of commands.

    Returns
    -------
    list
        For a string, the decoded JSON document. Note that the decoded
        document is returned as is, it is not checked against
        ``COMMAND_VALIDATOR``. For any other value, the result of
        validating it as a list where each item passes
        ``COMMAND_VALIDATOR``.

    Raises
    ------
    voluptuous.Invalid
        If a string value cannot be decoded as JSON. Schema validation of
        a non-string value reports its failures through voluptuous too.
    """
    if not isinstance(value, str):
        # Already parsed input: validate every item as a single command.
        return vol.Schema([COMMAND_VALIDATOR])(value)

    try:
        decoded = json.loads(value)
    except ValueError as exc:
        # json.JSONDecodeError is a ValueError subclass.
        raise vol.Invalid(f"Invalid commands: {value}") from exc
    return decoded
# Action ids. Each one is a key in ACTION_TO_METHOD and is registered as a
# "command" action by setup_module.
ACTION_SEND = "send"
ACTION_SEND_MANY = "send_many"
ACTION_START_IMAGING = "start_imaging"
ACTION_STOP_IMAGING = "stop_imaging"
# NOTE(review): CONF_API is not referenced in the visible part of this
# module; presumably it is the config section name - confirm against callers.
CONF_API = "api"
# Key in ``center.data`` under which the name -> Api mapping is stored.
DATA_API = "api"
# Schema for the "send" action: the base action schema extended with an
# api name (coerced to str) and a single command.
SEND_ACTION_SCHEMA = BASE_ACTION_SCHEMA.extend(
    {
        "api_name": vol.Coerce(str),
        "command": COMMAND_VALIDATOR,
    }
)

# Schema for the "send_many" action: the base action schema extended with
# a collection of commands checked by validate_commands.
SEND_MANY_ACTION_SCHEMA = BASE_ACTION_SCHEMA.extend(
    {"commands": validate_commands},
)

# The start and stop imaging actions add nothing to the base schema, so both
# names refer to the very same schema object.
START_IMAGING_ACTION_SCHEMA = BASE_ACTION_SCHEMA
STOP_IMAGING_ACTION_SCHEMA = BASE_ACTION_SCHEMA
# Maps each action id to the name of the Api method that handles it and to
# the schema the action is registered with.
ACTION_TO_METHOD: dict[str, dict[str, Any]] = {
    ACTION_SEND: {
        "method": "send",
        "schema": SEND_ACTION_SCHEMA,
    },
    ACTION_SEND_MANY: {
        "method": "send_many",
        "schema": SEND_MANY_ACTION_SCHEMA,
    },
    ACTION_START_IMAGING: {
        "method": "start_imaging",
        "schema": START_IMAGING_ACTION_SCHEMA,
    },
    ACTION_STOP_IMAGING: {
        "method": "stop_imaging",
        "schema": STOP_IMAGING_ACTION_SCHEMA,
    },
}
[docs]
def register_api(center: Center, api: Api) -> None:
    """Store an API instance on the center under the API's name.

    Parameters
    ----------
    center : Center instance
        The Center instance whose ``data`` mapping holds the API registry.
    api : Api instance
        The API to register. An already registered API with the same
        ``name`` is replaced.
    """
    # Create the registry on first use, then add or overwrite the entry.
    center.data.setdefault(DATA_API, {})[api.name] = api
[docs]
async def setup_module(center: Center, config: dict[str, Any]) -> None:
    """Set up the microscope API package.

    Makes sure the API registry exists in ``center.data`` and registers one
    ``command`` action for every entry in ``ACTION_TO_METHOD``.

    Parameters
    ----------
    center : Center instance
        The Center instance.
    config : dict
        The config dict. It is not read by this function.
    """
    registry: dict[str, Api] = center.data.setdefault(DATA_API, {})

    async def handle_action(**kwargs: Any) -> None:
        """Dispatch an action call to one or all registered APIs.

        Parameters
        ----------
        **kwargs
            Arbitrary keyword arguments. ``action_id`` (required) selects
            the api method and ``api_name`` (optional) selects a single
            api. Both are removed, the rest is passed to the api method.
        """
        action_id = kwargs.pop("action_id")
        method_name = ACTION_TO_METHOD[action_id]["method"]
        api_name = kwargs.pop("api_name", None)

        # A falsy api name means that the action targets every known api.
        targets = [registry[api_name]] if api_name else list(registry.values())

        pending: list[asyncio.Task[Any]] = []
        for target in targets:
            _LOGGER.debug(
                "Handle API %s action %s: %s", target.name, action_id, kwargs
            )
            call = getattr(target, method_name)(**kwargs)
            pending.append(center.create_task(call))

        if not pending:
            # asyncio.wait must not be called with an empty collection.
            return
        await asyncio.wait(pending)

    for action_id, options in ACTION_TO_METHOD.items():
        center.actions.register(
            "command", action_id, handle_action, options["schema"]
        )
[docs]
class Api:
    """Represent the microscope API.

    This is the base interface. Apart from ``send_many``, every member
    raises ``NotImplementedError`` and has to be provided by a subclass.
    """

    @property
    def name(self) -> str:
        """Return the name of the API."""
        raise NotImplementedError()

    async def send(self, command: Any, **kwargs: Any) -> Any:
        """Send a command to the microscope API.

        Parameters
        ----------
        command : str
            The command to send.
        **kwargs
            Arbitrary keyword arguments for the concrete implementation.

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError()

    async def send_many(self, commands: list[Any], **kwargs: Any) -> None:
        """Send multiple commands to the microscope API, one at a time.

        Parameters
        ----------
        commands : list
            A list of commands to send.
        **kwargs
            Arbitrary keyword arguments, passed along with every command.
        """
        for command in commands:
            # Strictly sequential: a command must be done before the next
            # one is started.
            await self.send(command, **kwargs)

    async def start_imaging(self) -> None:
        """Send a command to the microscope to start the imaging.

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError()

    async def stop_imaging(self) -> None:
        """Send a command to the microscope to stop the imaging.

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError()
[docs]
class CommandEvent(Event):
    """An event received from the API.

    Notify with this event when a command is received via API.
    """

    # No instance attributes are added on top of the base class.
    __slots__ = ()

    event_type: ClassVar[str] = COMMAND_EVENT

    @property
    def command(self) -> str | None:
        """:str: Return the command string.

        This base implementation has no command and always returns None.
        """
        return None
[docs]
class StartCommandEvent(CommandEvent):
    """A command event received from the API.

    Notify with this event when imaging starts via API.
    """

    # No instance attributes are added on top of the base class.
    __slots__ = ()

    event_type: ClassVar[str] = START_COMMAND_EVENT
[docs]
class StopCommandEvent(CommandEvent):
    """A command event received from the API.

    Notify with this event when imaging stops via API.
    """

    # No instance attributes are added on top of the base class.
    __slots__ = ()

    event_type: ClassVar[str] = STOP_COMMAND_EVENT
[docs]
class ImageEvent(Event):
    """An event received from the API.

    Notify with this event when an image is saved via API. All properties
    read their value from the ``data`` mapping of the event.
    """

    # No instance attributes are added on top of the base class.
    __slots__ = ()

    event_type: ClassVar[str] = IMAGE_EVENT

    @property
    def path(self) -> str:
        """:str: Return absolute path to the image.

        Raises
        ------
        NotImplementedError
            If the event data has no ``path`` entry, or if it is None.
        """
        image_path = self.data.get("path")
        if image_path is not None:
            return image_path
        raise NotImplementedError()

    @property
    def well_x(self) -> int | None:
        """:int: Return x coordinate of the well of the image."""
        return self.data.get("well_x")

    @property
    def well_y(self) -> int | None:
        """:int: Return y coordinate of the well of the image."""
        return self.data.get("well_y")

    @property
    def field_x(self) -> int | None:
        """:int: Return x coordinate of the field of the image."""
        return self.data.get("field_x")

    @property
    def field_y(self) -> int | None:
        """:int: Return y coordinate of the field of the image."""
        return self.data.get("field_y")

    @property
    def z_slice_id(self) -> int | None:
        """:int: Return z index of the image."""
        return self.data.get("z_slice_id")

    @property
    def channel_id(self) -> int | None:
        """:int: Return channel id of the image."""
        return self.data.get("channel_id")

    @property
    def plate_name(self) -> str | None:
        """:str: Return plate name of the image."""
        return self.data.get("plate_name")

    def __repr__(self) -> str:
        """Return the representation.

        The image path is not part of the representation.
        """
        shown = (
            "plate_name",
            "well_x",
            "well_y",
            "field_x",
            "field_y",
            "z_slice_id",
            "channel_id",
        )
        # Go through the properties so that subclass overrides are honored.
        data = {attr: getattr(self, attr) for attr in shown}
        return f"{type(self).__name__}(data={data})"