Source code for camacq.plugins.leica

"""Leica microscope API specific modules."""

from __future__ import annotations

import asyncio
from functools import partial
import logging
import tempfile
from typing import TYPE_CHECKING, Any, ClassVar

from leicacam.async_cam import AsyncCAM
from leicacam.cam import bytes_as_dict, check_messages, tuples_as_bytes
from leicaimage import attribute, attribute_as_str
import voluptuous as vol

from camacq.const import CAMACQ_STOP_EVENT
from camacq.helper import ensure_dict
from camacq.plugins.api import (
    Api,
    CommandEvent,
    ImageEvent,
    StartCommandEvent,
    StopCommandEvent,
    register_api,
)

from .command import start, stop
from .helper import find_image_path, get_field, get_imgs
from .sample import setup_module as sample_setup_module

if TYPE_CHECKING:
    from camacq.control import Center
    from camacq.event import Event

_LOGGER = logging.getLogger(__name__)

CONF_HOST = "host"
CONF_IMAGING_DIR = "imaging_dir"
CONF_LEICA = "leica"
CONF_PORT = "port"
JOB_ID = "--E{:02d}"
LEICA_COMMAND_EVENT = "leica_command_event"
LEICA_START_COMMAND_EVENT = "leica_start_command_event"
LEICA_STOP_COMMAND_EVENT = "leica_stop_command_event"
LEICA_IMAGE_EVENT = "leica_image_event"
REL_IMAGE_PATH = "relpath"
SCAN_FINISHED = "scanfinished"
SCAN_STARTED = "scanstart"
START_STOP_DELAY = 2.0

CONFIG_SCHEMA = vol.Schema(
    vol.All(
        ensure_dict,
        {
            vol.Optional(CONF_HOST, default="localhost"): vol.Coerce(str),
            vol.Optional(CONF_PORT, default=8895): vol.Coerce(int),
            vol.Optional(CONF_IMAGING_DIR, default=tempfile.gettempdir()): vol.IsDir(),
        },
    )
)


[docs] async def setup_module(center: Center, config: dict[str, Any]) -> None: """Set up Leica api package. Parameters ---------- center : Center instance The Center instance. config : dict The config dict. """ await sample_setup_module(center, config) conf: dict[str, Any] = config[CONF_LEICA] host: str = conf[CONF_HOST] port: int = conf[CONF_PORT] cam = AsyncCAM(host, port) try: await cam.connect() except OSError as exc: _LOGGER.error("Connecting to server %s failed: %s", host, exc) return api = LeicaApi(center, conf, cam) register_api(center, api) # Start task that calls receive on the socket to the microscope task = center.create_task(api.start_listen()) async def stop_listen(center: Center, event: Event) -> None: """Stop the task that listens to the client socket.""" task.cancel() await task api.client.close() center.bus.register(CAMACQ_STOP_EVENT, stop_listen)
[docs] class LeicaApi(Api): """Represent the Leica API.""" def __init__( self, center: Center, config: dict[str, Any], client: AsyncCAM ) -> None: """Set up the Leica API.""" self.center = center self.client = client self.config = config self._last_image_path: str | None = None @property def name(self) -> str: """Return the name of the API.""" return __name__
[docs] async def start_listen(self) -> None: """Receive from the microscope socket.""" try: while True: reply = await self.client.receive() self.center.create_task(self.receive(reply)) # type: ignore[arg-type] except asyncio.CancelledError: _LOGGER.debug("Stopped listening for messages from CAM")
[docs] async def receive(self, replies: list[dict[str, Any]] | dict[str, Any]) -> None: """Receive replies from CAM server and fire an event per reply. Parameters ---------- replies : list A list of replies from the CAM server. """ # if reply check reply and call correct listener # parse reply and create Event # await event notify in sequential order # reply must be an iterable if not isinstance(replies, list): replies = [replies] for reply in replies: if not reply or not isinstance(reply, dict): continue if REL_IMAGE_PATH in reply: imaging_dir: str = self.config[CONF_IMAGING_DIR] rel_path: str = reply[REL_IMAGE_PATH] if rel_path == self._last_image_path: # guard against duplicate image events from the microscope _LOGGER.debug("Duplicate image reply received: %s", rel_path) continue self._last_image_path = rel_path image_path = find_image_path(rel_path, imaging_dir) field_path = await self.center.add_executor_job(get_field, image_path) image_paths = await self.center.add_executor_job( partial( get_imgs, field_path, search=JOB_ID.format(attribute(image_path, "E")), ) ) for path in image_paths: # await in sequential order await self.center.bus.notify(LeicaImageEvent({"path": path})) elif SCAN_STARTED in list(reply.values()): await self.center.bus.notify(LeicaStartCommandEvent(reply)) elif SCAN_FINISHED in list(reply.values()): await self.center.bus.notify(LeicaStopCommandEvent(reply)) else: await self.center.bus.notify(LeicaCommandEvent(reply))
[docs] async def send( self, command: list[tuple[str, str]] | str, **kwargs: Any, ) -> asyncio.Future[bool] | bool: """Send a command to the Leica API. Parameters ---------- command : list of tuples or string The command to send. """ block: bool = kwargs.get("block", True) if isinstance(command, str): command_dict = bytes_as_dict(command.encode()) command = list(command_dict.items()) cmd, value = command[0] # use the first cmd and value to wait for cmd_sent: asyncio.Future[bool] = self.center.loop.create_future() async def receive_reply(center: Center, event: LeicaCommandEvent) -> None: """Indicate that reply has been received.""" if check_messages([event.data], cmd, value=value): # type: ignore[list-item] if not cmd_sent.done(): cmd_sent.set_result(True) remove = self.center.bus.register( LEICA_COMMAND_EVENT, receive_reply, # type: ignore[arg-type] ) cmd_sent.add_done_callback(lambda x: remove()) await self.client.send(command) if not block: return cmd_sent return await cmd_sent
[docs] async def start_imaging(self) -> None: """Send a command to the microscope to start the imaging.""" await self._start_stop_imaging(start(), LEICA_START_COMMAND_EVENT, SCAN_STARTED) # A delay is needed after starting. await asyncio.sleep(START_STOP_DELAY)
[docs] async def stop_imaging(self) -> None: """Send a command to the microscope to stop the imaging.""" # A delay is needed before and after stopping. await asyncio.sleep(START_STOP_DELAY) await self._start_stop_imaging(stop(), LEICA_STOP_COMMAND_EVENT, SCAN_FINISHED) await asyncio.sleep(START_STOP_DELAY)
async def _start_stop_imaging( self, cmd: list[tuple[str, str]], event: str, ack_cmd: str ) -> None: """Send a command to the microscope to start or stop the imaging.""" cmd_sent: asyncio.Future[bool] = self.center.loop.create_future() async def receive_reply(center: Center, event: Event) -> None: """Indicate that reply has been received.""" if not cmd_sent.done(): cmd_sent.set_result(True) remove = self.center.bus.register(event, receive_reply) cmd_sent.add_done_callback(lambda x: remove()) trigger_cmd_sent = await self.send(cmd, block=False) _LOGGER.info("Waiting for %s message for 10 seconds", ack_cmd) try: async with asyncio.timeout(10.0): await asyncio.wait( # type: ignore[type-var] [cmd_sent, trigger_cmd_sent] ) except TimeoutError: _LOGGER.info("No acknowledgement event received, continuing anyway")
[docs] class LeicaCommandEvent(CommandEvent): """Leica CommandEvent class.""" __slots__ = () event_type: ClassVar[str] = LEICA_COMMAND_EVENT @property def command(self) -> str: """Return the command string.""" return tuples_as_bytes(list(self.data.items())).decode()
[docs] class LeicaStartCommandEvent(StartCommandEvent, LeicaCommandEvent): """Leica StartCommandEvent class.""" __slots__ = () event_type: ClassVar[str] = LEICA_START_COMMAND_EVENT
[docs] class LeicaStopCommandEvent(StopCommandEvent, LeicaCommandEvent): """Leica StopCommandEvent class.""" __slots__ = () event_type: ClassVar[str] = LEICA_STOP_COMMAND_EVENT
[docs] class LeicaImageEvent(ImageEvent): """Leica ImageEvent class.""" __slots__ = () event_type: ClassVar[str] = LEICA_IMAGE_EVENT @property def path(self) -> str: """:str: Return absolute path to the image.""" return self.data.get("path", "") @property def well_x(self) -> int | None: """:int: Return x coordinate of the well of the image.""" return attribute(self.path, "U") @property def well_y(self) -> int | None: """:int: Return y coordinate of the well of the image.""" return attribute(self.path, "V") @property def field_x(self) -> int | None: """:int: Return x coordinate of the well of the image.""" return attribute(self.path, "X") @property def field_y(self) -> int | None: """:int: Return y coordinate of the well of the image.""" return attribute(self.path, "Y") @property def z_slice_id(self) -> int | None: """:int: Return z index of the image.""" return attribute(self.path, "Z") @property def channel_id(self) -> int | None: """:int: Return channel id of the image.""" return attribute(self.path, "C") @property def job_id(self) -> int | None: """:int: Return job id of the image.""" return attribute(self.path, "E") @property def plate_name(self) -> str | None: """:str: Return plate name of the image.""" return attribute_as_str(self.path, "S")