======================= Writing Adapter Actions ======================= Adapter Actions are the fundamental building blocks for interacting with software and hardware systems in your lab. Actions are written as python methods on an ActionModule and must be decorated with an :deco:`~artificial.adapter_common.action` decorator. Actions are executed by the Executor when a user runs a Workflow that calls the Action. .. code-block:: python from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action @ability_name('incubate') class Incubator(ActionModule): @action('Incubate') async def incubate(self, actx: ActionExecutionContext) -> None: # Tell hardware to incubate pass A Workflow that calls this Incubate Action will include an entry like this in its timeline: .. image:: _static/bare_incubate_action.png :width: 200px .. note:: Actions must be associated with an Ability to be run from a Workflow. See :doc:`adapter_actors_abilities` for more information. Action Stubs ------------ Since Actions exist in the Adapter (which is run on the IPC in your lab) and the Workflow runs in the Artificial Cloud, the Workflow needs a way to know what Actions are available in the Adapter. This is done via Action Stubs. An Action Stub is a simple python function that is generated by the Artificial Workflow Development VSCode Extension and captures relevant information about the Action's name, parameters, associated ability, and location within the Adapter—everything that the Executor needs to make sure the right Action gets called at runtime. If an :ref:`Action takes or returns dataclasses `, those dataclasses are also generated as stubs. Actions Stubs are generated in the :file:`workflow/stubs/stubs_actions.py` file in your repository. Feel free to take a look at them, but **don't edit the file directly!** Any changes will be overwritten the next time you generate the Stubs. To generate Action Stubs, click the :guilabel:`Generate Action Stubs` button in the Artificial Workflow Authoring Extension, or run the ``Generate Action Stubs`` command from the Command Palette in VSCode. .. image:: _static/generate_action_stubs.png Writing an Adapter Action is done in three parts: 1. Write the ActionModule and Action 2. Register the ActionModule with the Adapter Plugin 3. Configure the Actor and Ability for the ActionModule Action Parameters ----------------- Actions can take parameters, which are passed in from the Workflow. Parameters are defined as arguments to the Action method. Parameters can be of any json-serializable type, including complex dataclasses. .. code-block:: python :caption: :file:`adapter/main/actions.py` from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action @ability_name('incubate') class Incubator(ActionModule): @action('Incubate') async def incubate(self, actx: ActionExecutionContext, temperature: float, duration: int) -> None: # Tell hardware to incubate at the given temperature for the given duration pass Here, the Incubate Action takes two parameters: temperature and duration. When the Action is called from a Workflow, the Workflow must provide values for these parameters: .. code-block:: python :caption: :file:`workflow/run_experiment_wf.py` from artificial.workflows.decorators import workflow from stubs.stubs_actions import incubate @workflow('Run Experiment', 'run_experiment_wf', 'lab_0c47f8f2-4a3d-452f-b566-9fffb3b68201') async def run_experiment_wf() -> None: await incubate(temperature=37.2, duration=30) Action parameters get captured as part of the Job Record and show up in the job's timeline in LabOps: .. video:: _static/action_parameters.mp4 :autoplay: :muted: :loop: Return Values ============= Actions can also return values which are sent back to the Workflow. Return values can be of any json-serializable type, including tuples and complex dataclasses. .. code-block:: python :caption: :file:`adapter/main/actions.py` from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action @ability_name('incubate') class Incubator(ActionModule): @action('Incubate') async def incubate(self, actx: ActionExecutionContext, temperature: float, duration: int) -> bool: # Tell hardware to incubate at the given temperature for the given duration return True Here, the Incubate Action returns a boolean value. .. code-block:: python :caption: :file:`workflow/run_experiment_wf.py` @workflow('Run Experiment', 'run_experiment_wf', 'lab_0c47f8f2-4a3d-452f-b566-9fffb3b68201') async def run_experiment_wf() -> None: was_successful = await incubate(temperature=37.2, duration=30) Return values are captured as part of the Job Record and show up in the job's timeline in LabOps: .. video:: _static/action_output_parameters.mp4 :autoplay: :muted: :loop: Parameters with Complex Types ============================= Parameters can be of any type, including complex dataclasses. .. code-block:: python :caption: :file:`adapter/main/actions.py` from dataclasses import dataclass from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action @dataclass class IncubationParameters: temperature: float duration: int @dataclass class IncubationResults: was_successful: bool @ability_name('incubate') class Incubator(ActionModule): @action('Incubate') async def incubate(self, actx: ActionExecutionContext, params: IncubationParameters) -> IncubationResults: # Tell hardware to incubate at params.temperature for params.duration return IncubationResults(was_successful=True) Dataclasses that are used as Action Parameter Types will also be generated in the :ref:`Action Stubs`. .. code-block:: python :caption: :file:`workflow/stubs/stubs_actions.py` ... @dataclass class IncubationParameters: temperature: float duration: int @dataclass class IncubationResults: was_successful: bool ... Dataclass stubs can then be used in the Workflow to define the parameter values. .. code-block:: python :caption: :file:`workflow/run_experiment_wf.py` from stubs.stubs_actions import IncubationParameters, incubate @workflow('Run Experiment', 'run_experiment_wf', 'lab_0c47f8f2-4a3d-452f-b566-9fffb3b68201') async def run_experiment_wf() -> None: results = await incubate(IncubationParameters(temperature=37.2, duration=30)) # assert results.was_successful == True Complex parameters are captured as part of the Job Record and show up in the job's timeline in LabOps: .. video:: _static/action_complex_params.mp4 :autoplay: :muted: :loop: Cancelable Actions ------------------ .. note:: Jobs can always be canceled in between actions, or during Assistants. This technique is specifically for when you want to be able to cancel an Action while it's still in progress. For long-running Actions, it can be useful to allow an operator to cancel the Action mid-execution. To support this, use :meth:`~artificial.ActionExecutionContext.set_abort_handler`. This method takes a callback function that will be called if the Action is canceled. To mark the Action as canceled, raise an :exc:`~artificial.ActionCanceledException` from the main Action body (not the abort handler). .. code-block:: python :caption: :file:`adapter/main/actions.py` from artificial.adapter_common import ActionCanceledException, ActionExecutionContext, ActionModule, ability_name, action @ability_name('incubate') class Incubator(ActionModule): @action('Cancelable Incubation') async def cancelable_incubate(self, actx: ActionExecutionContext, duration: int) -> None: # Track action status ('running' or 'canceled') status = 'running' # Track how much time we've spent 'running' elapsed_time = 0 # Create a local function that will be called when an operator cancelsv the Job while this Action is in progress. async def handle_abort() -> None: # Use the 'nonlocal' keyword to be able to modify the 'status' variable from the outer scope # See https://www.w3schools.com/python/ref_keyword_nonlocal.asp nonlocal status status = 'canceled' # Register the handle_abort function as the abort handler actx.set_abort_handler(handle_abort) while True: if status is 'canceled': # TODO - Clean up any running processes or resources that need to be cleaned up # Mark the action as canceled raise ActionCanceledException else: # Wait until enough 'running' time has elapsed, then break out of the loop await asyncio.sleep(1) elapsed_time += 1 if elapsed_time > duration: break Canceling this Action mid-execution will yield the following Job timeline: .. image:: _static/action_canceled.png :width: 400px Pausable Actions ---------------- .. note:: Jobs can always be paused/resumed in between actions, or during Assistants. This technique is specifically for when you want to be able to pause/resume an Action while it's still in progress. For long-running Actions, it can be useful to allow an operator to pause and resume the Action mid-execution. To support this, use :meth:`~artificial.ActionExecutionContext.set_pause_handler`. .. code-block:: python :caption: :file:`adapter/main/actions.py` from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action @ability_name('incubate') class Incubator(ActionModule): @action('Pausable Incubate') async def pausable_incubate(self, actx: ActionExecutionContext, duration: int) -> None: # Track action status ('running' or 'paused') status = 'running' # Track how much time we've spent 'running' elapsed_time = 0 # Create a local function that will be called when an operator pauses or resumes the Job while this Action is running. async def handle_pause(should_pause: bool) -> None: # Use the 'nonlocal' keyword to be able to modify the 'status' variable from the outer scope # See https://www.w3schools.com/python/ref_keyword_nonlocal.asp nonlocal status if should_pause: status = 'paused' # Report that the action has been paused await actx.report_paused(True) else: status = 'running' # Report that the action has been resumed await actx.report_paused(False) # Register the handle_pause function as the pause handler actx.set_pause_handler(handle_pause) while True: await asyncio.sleep(1) if status == 'running': # Wait until enough 'running' time has elapsed, then break out of the loop elapsed_time += 1 if elapsed_time > duration: break .. warning:: Pausable Actions should be able to handle Adapter/IPC restarts while paused. See :ref:`Persisting Action Status` for more information. Persisting Action Status ------------------------ Periodically, the IPC or Adapter may restart (e.g. if the IPC is rebooted or the Adapter is updated). When the Adapter starts up again, it will call any in-progress Actions again from the top with the same parameters, but any local variables from the previous execution context will be lost. This many be undesirable if some progress has already been made in running the Action, or if the Action had been paused or canceled before the restart. To persist the state of an Action across restarts, use :meth:`~artificial.adapter_common.ActionExecutionContext.persist_restore_data` and :meth:`~artificial.adapter_common.ActionExecutionContext.get_restore_data`. Calling :meth:`~artificial.adapter_common.ActionExecutionContext.persist_restore_data` will also automatically persist the Action's status (paused or canceled) across restarts as :attr:`~artificial.adapter_common.ActionExecutionContext.paused` and :attr:`~artificial.adapter_common.ActionExecutionContext.canceled`. .. note:: As of ``artificial.adapter_common`` version ``0.18.5``, you can call :meth:`~artificial.adapter_common.ActionExecutionContext.restorable` to persist Action status in :attr:`~artificial.adapter_common.ActionExecutionContext.paused` and :attr:`~artificial.adapter_common.ActionExecutionContext.canceled`. .. code-block:: python :caption: :file:`adapter/main/actions.py` from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action @ability_name('incubate') class Incubator(ActionModule): @action('Persistent Incubation') async def persistent_incubate(self, actx: ActionExecutionContext, duration: int) -> None: # Restore action status if actx.canceled: status = 'canceled' elif actx.paused: status = 'paused' else: status = 'running' # Restore elapsed time (or start from 0 if this is the first run) elapsed_time = int(actx.get_restore_data() or '0') # Save elapsed_time in persistent storage and mark action as restorable await actx.persist_restore_data(str(elapsed_time)) async def handle_abort() -> None: nonlocal status status = 'canceled' async def handle_pause(should_pause: bool) -> None: nonlocal status if should_pause: status = 'paused' await actx.report_paused(True) else: status = 'running' await actx.report_paused(False) actx.set_abort_handler(handle_abort) actx.set_pause_handler(handle_pause) while True: if status == 'canceled': raise ActionCanceledException await asyncio.sleep(1) if status == 'running': elapsed_time += 1 # Save elapsed_time in persistent storage every time it's updated await actx.persist_restore_data(str(elapsed_time)) if elapsed_time > duration: break