Source code for camacq.plugins.automations

"""Handle automations."""

# Copyright 2013-2017 The Home Assistant Authors
# https://github.com/home-assistant/home-assistant/blob/master/LICENSE.md
# This file was modified by The Camacq Authors.
from __future__ import annotations

from collections import deque
from collections.abc import Callable, Generator
from functools import partial
import logging
from typing import TYPE_CHECKING, Any

import voluptuous as vol

from camacq.const import CAMACQ_STOP_EVENT, CONF_DATA, CONF_ID
from camacq.exceptions import TemplateError
from camacq.helper import BASE_ACTION_SCHEMA, get_module, has_at_least_one_key
from camacq.helper.template import make_template, render_template

if TYPE_CHECKING:
    from jinja2 import Template

    from camacq.control import Center
    from camacq.event import Event

_LOGGER = logging.getLogger(__name__)

CONF_AUTOMATIONS = "automations"
CONF_ACTION = "action"
CONF_CONDITION = "condition"
CONF_CONDITIONS = "conditions"
CONF_NAME = "name"
CONF_TRIGGER = "trigger"
CONF_TYPE = "type"
ENABLED = "enabled"
NAME = "name"
ACTION_DELAY = "delay"
ACTION_TOGGLE = "toggle"
DATA_AUTOMATIONS = "automations"

TRIGGER_ACTION_SCHEMA = vol.Schema(
    [
        {
            vol.Required(CONF_TYPE): vol.Coerce(str),
            vol.Required(CONF_ID): vol.Coerce(str),
            vol.Optional(CONF_DATA, default={}): dict,
        }
    ],
)

CONDITION_SCHEMA: Any = vol.All(
    has_at_least_one_key(CONF_TYPE, CONF_CONDITION),
    {
        vol.Inclusive(CONF_TYPE, "condition"): vol.All(
            vol.Upper, vol.In(["AND", "OR"])
        ),
        vol.Inclusive(CONF_CONDITIONS, "condition"): [
            lambda value: CONDITION_SCHEMA(value)
        ],
        vol.Exclusive(CONF_CONDITION, "condition"): vol.Coerce(str),
    },
)

CONFIG_SCHEMA = vol.Schema(
    [
        {
            vol.Required(CONF_NAME): vol.Coerce(str),
            vol.Required(CONF_TRIGGER): TRIGGER_ACTION_SCHEMA,
            vol.Required(CONF_ACTION): TRIGGER_ACTION_SCHEMA,
            vol.Optional(
                CONF_CONDITION, default={CONF_CONDITION: "true"}
            ): CONDITION_SCHEMA,
        }
    ]
)


[docs] async def setup_module(center: Center, config: dict[str, Any]) -> None: """Set up automations package. Parameters ---------- center : Center instance The Center instance. config : dict The config dict. """ _process_automations(center, config) automations: dict[str, Automation] = center.data[DATA_AUTOMATIONS] async def handle_action(**kwargs: Any) -> None: """Enable or disable an automation.""" name: str = kwargs[NAME] automation = automations[name] enabled: bool = kwargs.get(ENABLED, not automation.enabled) if enabled: automation.enable() else: automation.disable() toggle_action_schema = BASE_ACTION_SCHEMA.extend( { vol.Required(NAME): vol.All(vol.Coerce(str), vol.In(automations)), ENABLED: vol.Boolean(), } ) # register action to enable/disable automation center.actions.register( "automations", ACTION_TOGGLE, handle_action, toggle_action_schema )
def _process_automations(center: Center, config: dict[str, Any]) -> None: """Process automations from config.""" automations: dict[str, Automation] = center.data.setdefault(DATA_AUTOMATIONS, {}) conf: list[dict[str, Any]] = config[CONF_AUTOMATIONS] for block in conf: name: str = block[CONF_NAME] _LOGGER.debug("Setting up automation %s", name) action_sequence = _get_actions(center, block[CONF_ACTION]) cond_func = _process_condition(center, block[CONF_CONDITION]) # use partial to get a function with args to call later attach_triggers = partial(_process_trigger, center, block[CONF_TRIGGER]) automations[name] = Automation( center, name, attach_triggers, cond_func, action_sequence ) def _get_actions(center: Center, config_block: list[dict[str, Any]]) -> ActionSequence: """Return actions.""" actions: Generator[TemplateAction, None, None] = ( TemplateAction(center, action_conf) for action_conf in config_block ) return ActionSequence(center, actions) def _process_condition( center: Center, config_block: dict[str, Any] ) -> Callable[[dict[str, Any]], str | bool]: """Return a function that parses the condition.""" if CONF_TYPE in config_block: checks: list[Callable[[dict[str, Any]], str | bool]] = [] condition_type: str = config_block[CONF_TYPE] conditions: list[dict[str, Any]] = config_block[CONF_CONDITIONS] for cond in conditions: check = _process_condition(center, cond) checks.append(check) return make_checker(condition_type, checks) data: str = config_block[CONF_CONDITION] template: Template = make_template(center, data) return partial(render_template, template)
[docs] def make_checker( condition_type: str, checks: list[Callable[[dict[str, Any]], str | bool]] ) -> Callable[[dict[str, Any]], bool]: """Return a function to check condition.""" def check_condition(variables: dict[str, Any]) -> bool: """Return True if all or any condition(s) pass.""" if condition_type.lower() == "and": return all(template_check(check(variables)) for check in checks) if condition_type.lower() == "or": return any(template_check(check(variables)) for check in checks) return False return check_condition
[docs] def template_check(value: str | bool) -> bool: """Check if a rendered template string equals true. If value is not a string, return value as is. """ if isinstance(value, str): return value.lower() == "true" return value
def _process_trigger( center: Center, config_block: list[dict[str, Any]], trigger: Callable[[dict[str, Any]], Any], ) -> Callable[[], None] | None: """Process triggers for an automation.""" remove_funcs: list[Callable[[], None]] = [] for conf in config_block: trigger_id: str = conf[CONF_ID] trigger_type: str = conf[CONF_TYPE] trigger_mod = get_module(__name__, trigger_type) if not trigger_mod: continue _LOGGER.debug("Setting up trigger %s", trigger_id) remove = trigger_mod.handle_trigger(center, conf, trigger) if not remove: _LOGGER.error("Setting up trigger %s failed", trigger_id) continue remove_funcs.append(remove) if not remove_funcs: return None def remove_triggers() -> None: """Remove attached triggers.""" for remove in remove_funcs: remove() return remove_triggers
[docs] class Automation: """Automation class.""" def __init__( self, center: Center, name: str, attach_triggers: Callable[ [Callable[[dict[str, Any]], Any]], Callable[[], None] | None ], cond_func: Callable[[dict[str, Any]], str | bool], action_sequence: ActionSequence, enabled: bool = True, ) -> None: """Set up instance.""" self._center = center self.name = name self.enabled = False self._action_sequence = action_sequence self._attach_triggers = attach_triggers self._detach_triggers: Callable[[], None] | None = None self._cond_func = cond_func if enabled: self.enable() def __repr__(self) -> str: """Return the representation.""" return ( f"Automation(center={self._center}, name={self.name}, " f"attach_triggers={self._attach_triggers}, cond_func={self._cond_func}, " f"action_sequence={self._action_sequence}, enabled={self.enabled})" )
[docs] def enable(self) -> None: """Enable automation.""" if self.enabled: return self._detach_triggers = self._attach_triggers(self.trigger) self.enabled = True
[docs] def disable(self) -> None: """Disable automation.""" if not self.enabled: return if self._detach_triggers is not None: self._detach_triggers() self._detach_triggers = None self.enabled = False
[docs] async def trigger(self, variables: dict[str, Any]) -> None: """Run actions of this automation.""" variables["samples"] = self._center.samples _LOGGER.debug("Triggered automation %s", self.name) try: cond = self._cond_func(variables) except TemplateError as exc: _LOGGER.error("Failed to render condition for %s: %s", self.name, exc) return if cond: _LOGGER.debug("Condition passed for %s", self.name) await self._action_sequence(variables)
[docs] class ActionSequence: """Represent a sequence of actions.""" def __init__( self, center: Center, actions: Generator[TemplateAction, None, None] | list[TemplateAction], ) -> None: """Set up instance.""" self._center = center self.actions: list[TemplateAction] = list( actions ) # copy to list to make sure it's a list async def __call__(self, variables: dict[str, Any]) -> None: """Start action sequence.""" waiting: deque[TemplateAction] = deque(self.actions) while waiting: action = waiting.popleft() if action.action_type == "automations" and action.action_id == ACTION_DELAY: rendered_kwargs = action.render(variables) seconds = rendered_kwargs.get("seconds") if seconds is not None: self.delay(float(seconds), variables, waiting) else: _LOGGER.debug( "Calling action %s.%s", action.action_type, action.action_id ) await action(variables)
[docs] def delay( self, seconds: float, variables: dict[str, Any], waiting: deque[TemplateAction], ) -> None: """Delay action sequence. Parameters ---------- seconds : float A time interval to delay the pending action sequence. variables : dict A dict of template variables. """ sequence = ActionSequence(self._center, list(waiting)) callback = partial(self._center.create_task, sequence(variables)) waiting.clear() _LOGGER.info("Action delay for %s seconds", seconds) handle = self._center.loop.call_later(seconds, callback) async def cancel_pending_actions(center: Center, event: Event) -> None: """Cancel pending actions.""" handle.cancel() self._center.bus.register(CAMACQ_STOP_EVENT, cancel_pending_actions)
[docs] class TemplateAction: """Representation of an action with template data.""" def __init__(self, center: Center, action_conf: dict[str, Any]) -> None: """Set up instance.""" self._center = center self.action_id: str = action_conf[CONF_ID] self.action_type: str = action_conf[CONF_TYPE] action_data: dict[str, Any] = action_conf[CONF_DATA] self.template: Template = make_template(center, action_data) async def __call__(self, variables: dict[str, Any] | None = None) -> None: """Execute action with optional template variables.""" try: rendered = self.render(variables) except TemplateError: return await self._center.actions.call(self.action_type, self.action_id, **rendered)
[docs] def render(self, variables: dict[str, Any] | None) -> dict[str, Any]: """Render the template with the kwargs for the action.""" variables = variables or {} try: rendered: dict[str, Any] = render_template(self.template, variables) except TemplateError as exc: _LOGGER.error( "Failed to render variables for %s.%s: %s", self.action_type, self.action_id, exc, ) raise return rendered