Source code for camacq.plugins.api

"""Microscope API specific modules."""

from __future__ import annotations

import asyncio
import json
import logging
from typing import TYPE_CHECKING, Any, ClassVar

import voluptuous as vol

from camacq.const import (
    COMMAND_EVENT,
    IMAGE_EVENT,
    START_COMMAND_EVENT,
    STOP_COMMAND_EVENT,
)
from camacq.event import Event
from camacq.helper import BASE_ACTION_SCHEMA

if TYPE_CHECKING:
    from camacq.control import Center

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Validator applied to a single command. Two alternatives are given to
# vol.Any: a list schema built from the tuple (str, str), and a fallback that
# coerces the value with str().
# NOTE(review): presumably vol.Any returns the result of the first alternative
# that validates, so any value that str() accepts passes via the fallback --
# confirm against the voluptuous documentation.
COMMAND_VALIDATOR = vol.Any([(str, str)], vol.Coerce(str))


def validate_commands(value: Any) -> list[Any]:
    """Validate the commands value of an action.

    A string is treated as a JSON document: it is decoded and the decoded
    object is returned as is, without further validation. Any other value
    is validated as a list where each item must pass ``COMMAND_VALIDATOR``.

    Parameters
    ----------
    value : Any
        Either a JSON string or an already structured value.

    Returns
    -------
    list
        The decoded JSON value or the validated list of commands.

    Raises
    ------
    voluptuous.Invalid
        If ``value`` is a string that cannot be decoded as JSON, or if a
        non-string value does not pass the list schema.
    """
    if not isinstance(value, str):
        # Already structured data: let voluptuous check every item.
        return vol.Schema([COMMAND_VALIDATOR])(value)

    try:
        commands = json.loads(value)
    except ValueError as exc:
        # json.JSONDecodeError is a ValueError subclass.
        raise vol.Invalid(f"Invalid commands: {value}") from exc
    return commands
# Ids of the actions that setup_module registers under the "command" type.
ACTION_SEND = "send"
ACTION_SEND_MANY = "send_many"
ACTION_START_IMAGING = "start_imaging"
ACTION_STOP_IMAGING = "stop_imaging"

# NOTE(review): presumably the configuration section name -- not used in the
# visible part of this module.
CONF_API = "api"
# Key in center.data under which registered Api instances are stored by name.
DATA_API = "api"

# Schema of the "send" action: optional target api plus a single command.
SEND_ACTION_SCHEMA = BASE_ACTION_SCHEMA.extend(
    {
        "api_name": vol.Coerce(str),
        "command": COMMAND_VALIDATOR,
    }
)
# Schema of the "send_many" action: a list of commands or a JSON string.
SEND_MANY_ACTION_SCHEMA = BASE_ACTION_SCHEMA.extend({"commands": validate_commands})
# Starting and stopping imaging take no extra options; both names refer to
# the very same base schema object.
START_IMAGING_ACTION_SCHEMA = BASE_ACTION_SCHEMA
STOP_IMAGING_ACTION_SCHEMA = BASE_ACTION_SCHEMA

# Map each action id to the name of the Api method that handles it and to the
# schema used when the action is registered.
ACTION_TO_METHOD: dict[str, dict[str, Any]] = {
    ACTION_SEND: {
        "method": "send",
        "schema": SEND_ACTION_SCHEMA,
    },
    ACTION_SEND_MANY: {
        "method": "send_many",
        "schema": SEND_MANY_ACTION_SCHEMA,
    },
    ACTION_START_IMAGING: {
        "method": "start_imaging",
        "schema": START_IMAGING_ACTION_SCHEMA,
    },
    ACTION_STOP_IMAGING: {
        "method": "stop_imaging",
        "schema": STOP_IMAGING_ACTION_SCHEMA,
    },
}
def register_api(center: Center, api: Api) -> None:
    """Register an api on the center.

    The api is stored in ``center.data[DATA_API]`` under ``api.name``. The
    store is created on first use, and an api that is already stored under
    the same name is replaced.

    Parameters
    ----------
    center : Center instance
        The Center instance that holds the api store.
    api : Api instance
        The api to register.
    """
    center.data.setdefault(DATA_API, {})[api.name] = api
async def setup_module(center: Center, config: dict[str, Any]) -> None:
    """Set up the microscope API package.

    Registers one action of type "command" for every entry in
    ``ACTION_TO_METHOD``. All of them share the same handler, which forwards
    the call to the matching method of the targeted api(s).

    Parameters
    ----------
    center : Center instance
        The Center instance.
    config : dict
        The config dict. It is not read by this function.
    """
    # The store is looked up once, but it is the same dict object that
    # register_api fills, so apis registered later are visible to the handler.
    api_store: dict[str, Api] = center.data.setdefault(DATA_API, {})

    async def handle_action(**kwargs: Any) -> None:
        """Handle action call to send a command to an api.

        The call is dispatched to the api named by ``api_name`` or, when no
        name is given, to every registered api. The api methods run as
        concurrent tasks and the handler returns once all of them are done.
        A failing api method is logged and does not raise from the handler.

        Parameters
        ----------
        **kwargs
            Arbitrary keyword arguments. ``action_id`` and ``api_name`` are
            consumed here, the rest will be passed to the api method when
            an action is called.

        Raises
        ------
        KeyError
            If ``api_name`` is given but no api is registered under it.
        """
        action_id = kwargs.pop("action_id")
        method = ACTION_TO_METHOD[action_id]["method"]
        api_name = kwargs.pop("api_name", None)
        if api_name:
            apis = [api_store[api_name]]
        else:
            apis = list(api_store.values())

        calls: list[tuple[Api, asyncio.Task[Any]]] = []
        for api in apis:
            _LOGGER.debug("Handle API %s action %s: %s", api.name, action_id, kwargs)
            calls.append((api, center.create_task(getattr(api, method)(**kwargs))))

        if not calls:
            # asyncio.wait rejects an empty collection of tasks.
            return

        await asyncio.wait([task for _, task in calls])

        # asyncio.wait never re-raises what the tasks raised. Retrieve the
        # exceptions explicitly so that a failing api call is reported
        # instead of being dropped silently.
        for api, task in calls:
            if task.cancelled():
                continue
            exc = task.exception()
            if exc is not None:
                _LOGGER.error(
                    "API %s action %s failed: %r",
                    api.name,
                    action_id,
                    exc,
                    exc_info=exc,
                )

    for action_id, options in ACTION_TO_METHOD.items():
        schema = options["schema"]
        center.actions.register("command", action_id, handle_action, schema)
class Api:
    """Represent the microscope API.

    This is a base class. ``name``, ``send``, ``start_imaging`` and
    ``stop_imaging`` raise ``NotImplementedError`` here; only ``send_many``
    has an implementation, built on top of ``send``.
    """

    @property
    def name(self) -> str:
        """Return the name of the API."""
        raise NotImplementedError

    async def send(self, command: Any, **kwargs: Any) -> Any:
        """Send a command to the microscope API.

        Parameters
        ----------
        command : str
            The command to send.
        **kwargs
            Additional keyword arguments.
        """
        raise NotImplementedError

    async def send_many(self, commands: list[Any], **kwargs: Any) -> None:
        """Send multiple commands to the microscope API.

        The commands are sent one at a time, in the given order, each with
        the same keyword arguments.

        Parameters
        ----------
        commands : list
            A list of commands to send.
        **kwargs
            Additional keyword arguments, passed on to every ``send`` call.
        """
        for command in commands:
            # Sequential on purpose: every send must have completed before
            # the next one is started.
            await self.send(command, **kwargs)

    async def start_imaging(self) -> None:
        """Send a command to the microscope to start the imaging."""
        raise NotImplementedError

    async def stop_imaging(self) -> None:
        """Send a command to the microscope to stop the imaging."""
        raise NotImplementedError
class CommandEvent(Event):
    """An event received from the API.

    Notify with this event when a command is received via API.

    The ``command`` property returns ``None`` in this class.
    """

    # No per-instance attributes are added on top of the base class.
    __slots__ = ()

    event_type: ClassVar[str] = COMMAND_EVENT

    @property
    def command(self) -> str | None:
        """:str: Return the command string."""
        return None
class StartCommandEvent(CommandEvent):
    """An event received from the API.

    Notify with this event when imaging starts via API.
    """

    # No per-instance attributes are added on top of the base class.
    __slots__ = ()

    event_type: ClassVar[str] = START_COMMAND_EVENT
class StopCommandEvent(CommandEvent):
    """An event received from the API.

    Notify with this event when imaging stops via API.
    """

    # No per-instance attributes are added on top of the base class.
    __slots__ = ()

    event_type: ClassVar[str] = STOP_COMMAND_EVENT
class ImageEvent(Event):
    """An event received from the API.

    Notify with this event when an image is saved via API.

    Every property reads its value from the ``data`` mapping of the event.
    All of them except ``path`` return ``None`` when the key is missing.
    """

    # No per-instance attributes are added on top of the base class.
    __slots__ = ()

    event_type: ClassVar[str] = IMAGE_EVENT

    @property
    def path(self) -> str:
        """:str: Return absolute path to the image.

        Raises
        ------
        NotImplementedError
            If the event data holds no ``path`` value.
        """
        path = self.data.get("path")
        if path is None:
            raise NotImplementedError()
        return path

    @property
    def well_x(self) -> int | None:
        """:int: Return x coordinate of the well of the image."""
        return self.data.get("well_x")

    @property
    def well_y(self) -> int | None:
        """:int: Return y coordinate of the well of the image."""
        return self.data.get("well_y")

    @property
    def field_x(self) -> int | None:
        """:int: Return x coordinate of the field of the image."""
        return self.data.get("field_x")

    @property
    def field_y(self) -> int | None:
        """:int: Return y coordinate of the field of the image."""
        return self.data.get("field_y")

    @property
    def z_slice_id(self) -> int | None:
        """:int: Return z index of the image."""
        return self.data.get("z_slice_id")

    @property
    def channel_id(self) -> int | None:
        """:int: Return channel id of the image."""
        return self.data.get("channel_id")

    @property
    def plate_name(self) -> str | None:
        """:str: Return plate name of the image."""
        return self.data.get("plate_name")

    def __repr__(self) -> str:
        """Return the representation.

        The representation lists the plate, well, field, z slice and channel
        values. The image path is not part of it.
        """
        data = {
            "plate_name": self.plate_name,
            "well_x": self.well_x,
            "well_y": self.well_y,
            "field_x": self.field_x,
            "field_y": self.field_y,
            "z_slice_id": self.z_slice_id,
            "channel_id": self.channel_id,
        }
        return f"{type(self).__name__}(data={data})"