Writing Adapter Actions#

Adapter Actions are the fundamental building blocks for interacting with software and hardware systems in your lab. Actions are written as Python methods on an ActionModule and must be decorated with the @action decorator. The Executor runs an Action when a user runs a Workflow that calls it.

from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action

@ability_name('incubate')
class Incubator(ActionModule):

    @action('Incubate')
    async def incubate(self, actx: ActionExecutionContext) -> None:
        # Tell hardware to incubate
        pass

A Workflow that calls this Incubate Action will include an entry like this in its timeline:

_images/bare_incubate_action.png

Note

Actions must be associated with an Ability to be run from a Workflow. See Defining Actors and Abilities for more information.

Action Stubs#

Since Actions exist in the Adapter (which runs on the IPC in your lab) and the Workflow runs in the Artificial Cloud, the Workflow needs a way to know which Actions are available in the Adapter. This is done via Action Stubs. An Action Stub is a simple Python function generated by the Artificial Workflow Development VSCode Extension. It captures the Action’s name, parameters, associated ability, and location within the Adapter: everything the Executor needs to make sure the right Action gets called at runtime. If an Action takes or returns dataclasses, those dataclasses are also generated as stubs.

Action Stubs are generated in the workflow/stubs/stubs_actions.py file in your repository. Feel free to take a look at them, but don’t edit the file directly! Any changes will be overwritten the next time you generate the Stubs.

To generate Action Stubs, click the Generate Action Stubs button in the Artificial Workflow Authoring Extension, or run the Generate Action Stubs command from the Command Palette in VSCode.

_images/generate_action_stubs.png

Writing an Adapter Action is done in three parts:

  1. Write the ActionModule and Action

  2. Register the ActionModule with the Adapter Plugin

  3. Configure the Actor and Ability for the ActionModule

Action Parameters#

Actions can take parameters, which are passed in from the Workflow. Parameters are defined as arguments to the Action method. Parameters can be of any JSON-serializable type, including complex dataclasses.

adapter/main/actions.py#
from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action

@ability_name('incubate')
class Incubator(ActionModule):

    @action('Incubate')
    async def incubate(self, actx: ActionExecutionContext, temperature: float, duration: int) -> None:
        # Tell hardware to incubate at the given temperature for the given duration
        pass

Here, the Incubate Action takes two parameters: temperature and duration. When the Action is called from a Workflow, the Workflow must provide values for these parameters:

workflow/run_experiment_wf.py#
from artificial.workflows.decorators import workflow
from stubs.stubs_actions import incubate

@workflow('Run Experiment', 'run_experiment_wf', 'lab_0c47f8f2-4a3d-452f-b566-9fffb3b68201')
async def run_experiment_wf() -> None:
    await incubate(temperature=37.2, duration=30)

Action parameters are captured as part of the Job Record and show up in the job’s timeline in LabOps.

Return Values#

Actions can also return values, which are sent back to the Workflow. Return values can be of any JSON-serializable type, including tuples and complex dataclasses.

adapter/main/actions.py#
from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action

@ability_name('incubate')
class Incubator(ActionModule):

    @action('Incubate')
    async def incubate(self, actx: ActionExecutionContext, temperature: float, duration: int) -> bool:
        # Tell hardware to incubate at the given temperature for the given duration
        return True

Here, the Incubate Action returns a boolean value.

workflow/run_experiment_wf.py#
from artificial.workflows.decorators import workflow
from stubs.stubs_actions import incubate

@workflow('Run Experiment', 'run_experiment_wf', 'lab_0c47f8f2-4a3d-452f-b566-9fffb3b68201')
async def run_experiment_wf() -> None:
    was_successful = await incubate(temperature=37.2, duration=30)

Return values are captured as part of the Job Record and show up in the job’s timeline in LabOps.

Parameters with Complex Types#

Parameters and return values can be of any JSON-serializable type, including complex dataclasses.

adapter/main/actions.py#
from dataclasses import dataclass
from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action

@dataclass
class IncubationParameters:
    temperature: float
    duration: int

@dataclass
class IncubationResults:
    was_successful: bool

@ability_name('incubate')
class Incubator(ActionModule):
    @action('Incubate')
    async def incubate(self, actx: ActionExecutionContext, params: IncubationParameters) -> IncubationResults:
        # Tell hardware to incubate at params.temperature for params.duration
        return IncubationResults(was_successful=True)

Dataclasses that are used as Action parameter or return types are also generated in the Action Stubs.

workflow/stubs/stubs_actions.py#
...
@dataclass
class IncubationParameters:
    temperature: float
    duration: int


@dataclass
class IncubationResults:
    was_successful: bool
...

Dataclass stubs can then be used in the Workflow to define the parameter values.

workflow/run_experiment_wf.py#
from artificial.workflows.decorators import workflow
from stubs.stubs_actions import IncubationParameters, incubate

@workflow('Run Experiment', 'run_experiment_wf', 'lab_0c47f8f2-4a3d-452f-b566-9fffb3b68201')
async def run_experiment_wf() -> None:
    results = await incubate(IncubationParameters(temperature=37.2, duration=30))
    # assert results.was_successful == True

Complex parameters are captured as part of the Job Record and show up in the job’s timeline in LabOps.
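
To make "JSON-serializable" concrete for a dataclass parameter, here is a minimal sketch that uses only the Python standard library. It is an illustration of the round trip a dataclass can survive, not a description of how the Executor or the Action Stubs actually serialize values; that mechanism is not covered here.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class IncubationParameters:
    temperature: float
    duration: int


# Flatten the dataclass into plain JSON types (object, number, number).
params = IncubationParameters(temperature=37.2, duration=30)
payload = json.dumps(asdict(params))

# Rebuild an equivalent dataclass from the decoded JSON object.
restored = IncubationParameters(**json.loads(payload))
```

A dataclass whose fields are themselves numbers, strings, booleans, lists, dicts, or other such dataclasses survives this kind of round trip. Fields holding things like open file handles or device connections do not, which is a reasonable rule of thumb when designing parameter types.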

Cancelable Actions#

Note

Jobs can always be canceled in between actions, or during Assistants. This technique is specifically for when you want to be able to cancel an Action while it’s still in progress.

For long-running Actions, it can be useful to allow an operator to cancel the Action mid-execution. To support this, use set_abort_handler(). This method takes a callback function that will be called if the Action is canceled. To mark the Action as canceled, raise an ActionCanceledException from the main Action body (not the abort handler).

adapter/main/actions.py#
import asyncio

from artificial.adapter_common import ActionCanceledException, ActionExecutionContext, ActionModule, ability_name, action

@ability_name('incubate')
class Incubator(ActionModule):
    @action('Cancelable Incubation')
    async def cancelable_incubate(self, actx: ActionExecutionContext, duration: int) -> None:
        # Track action status ('running' or 'canceled')
        status = 'running'

        # Track how much time we've spent 'running'
        elapsed_time = 0

        # Create a local function that will be called when an operator cancels the Job while this Action is in progress.
        async def handle_abort() -> None:
            # Use the 'nonlocal' keyword to be able to modify the 'status' variable from the outer scope
            # See https://www.w3schools.com/python/ref_keyword_nonlocal.asp
            nonlocal status
            status = 'canceled'

        # Register the handle_abort function as the abort handler
        actx.set_abort_handler(handle_abort)

        while True:
            # Compare strings with '==' ('is' checks object identity, not equality)
            if status == 'canceled':
                # TODO - Clean up any running processes or resources that need to be cleaned up
                # Mark the action as canceled
                raise ActionCanceledException
            else:
                # Wait until enough 'running' time has elapsed, then break out of the loop
                await asyncio.sleep(1)
                elapsed_time += 1
                if elapsed_time > duration:
                    break

Canceling this Action mid-execution will yield the following Job timeline:

_images/action_canceled.png
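
The interplay between the abort handler and the Action's main loop can be tried out without an Adapter. The sketch below is a self-contained simulation: FakeContext and FakeCanceled are hypothetical stand-ins invented for this example (they are not part of artificial.adapter_common), and the cancel() method merely imitates an operator canceling the Job. Only the shape of the pattern (handler flips a flag, main body raises) follows the text above.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class FakeCanceled(Exception):
    """Hypothetical stand-in for ActionCanceledException."""


class FakeContext:
    """Hypothetical stand-in for ActionExecutionContext (abort handling only)."""

    def __init__(self) -> None:
        self._abort_handler: Optional[Callable[[], Awaitable[None]]] = None

    def set_abort_handler(self, handler: Callable[[], Awaitable[None]]) -> None:
        self._abort_handler = handler

    async def cancel(self) -> None:
        # Imitates an operator canceling the Job while the Action is running.
        if self._abort_handler is not None:
            await self._abort_handler()


async def cancelable_incubate(actx: FakeContext, ticks: int, tick_seconds: float = 0.01) -> int:
    status = 'running'
    elapsed = 0

    async def handle_abort() -> None:
        # The handler only records the request; the main body does the raising.
        nonlocal status
        status = 'canceled'

    actx.set_abort_handler(handle_abort)

    while elapsed < ticks:
        if status == 'canceled':
            raise FakeCanceled
        await asyncio.sleep(tick_seconds)
        elapsed += 1
    return elapsed


async def demo() -> str:
    actx = FakeContext()
    task = asyncio.create_task(cancelable_incubate(actx, ticks=1000))
    await asyncio.sleep(0.05)  # let the simulated Action run for a few ticks
    await actx.cancel()        # the "operator" cancels mid-execution
    try:
        await task
    except FakeCanceled:
        return 'canceled'
    return 'completed'


result = asyncio.run(demo())
```

Keeping the handler tiny and raising from the main body means any cleanup code sits in one place, next to the resources it needs to release.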

Pausable Actions#

Note

Jobs can always be paused/resumed in between actions, or during Assistants. This technique is specifically for when you want to be able to pause/resume an Action while it’s still in progress.

For long-running Actions, it can be useful to allow an operator to pause and resume the Action mid-execution. To support this, use set_pause_handler().

adapter/main/actions.py#
import asyncio

from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action

@ability_name('incubate')
class Incubator(ActionModule):
    @action('Pausable Incubate')
    async def pausable_incubate(self, actx: ActionExecutionContext, duration: int) -> None:
        # Track action status ('running' or 'paused')
        status = 'running'

        # Track how much time we've spent 'running'
        elapsed_time = 0

        # Create a local function that will be called when an operator pauses or resumes the Job while this Action is running.
        async def handle_pause(should_pause: bool) -> None:
            # Use the 'nonlocal' keyword to be able to modify the 'status' variable from the outer scope
            # See https://www.w3schools.com/python/ref_keyword_nonlocal.asp
            nonlocal status

            if should_pause:
                status = 'paused'
                # Report that the action has been paused
                await actx.report_paused(True)
            else:
                status = 'running'
                # Report that the action has been resumed
                await actx.report_paused(False)

        # Register the handle_pause function as the pause handler
        actx.set_pause_handler(handle_pause)

        while True:
            await asyncio.sleep(1)
            if status == 'running':
                # Wait until enough 'running' time has elapsed, then break out of the loop
                elapsed_time += 1
                if elapsed_time > duration:
                    break

Warning

Pausable Actions should be able to handle Adapter/IPC restarts while paused. See Persisting Action Status for more information.
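
The effect of the pause handler on the Action's timing can be checked with a small simulation. As before, this is only a sketch: FakeContext is a hypothetical stand-in written for this example (not the real ActionExecutionContext), and request_pause() imitates an operator pausing or resuming the Job. The loop counts every wake-up, but only "running" wake-ups count toward the requested duration.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple


class FakeContext:
    """Hypothetical stand-in for ActionExecutionContext (pause handling only)."""

    def __init__(self) -> None:
        self._pause_handler: Optional[Callable[[bool], Awaitable[None]]] = None
        self.reported: List[bool] = []

    def set_pause_handler(self, handler: Callable[[bool], Awaitable[None]]) -> None:
        self._pause_handler = handler

    async def report_paused(self, paused: bool) -> None:
        # Records what the Action reported, in order.
        self.reported.append(paused)

    async def request_pause(self, should_pause: bool) -> None:
        # Imitates an operator pausing (True) or resuming (False) the Job.
        if self._pause_handler is not None:
            await self._pause_handler(should_pause)


async def pausable_incubate(actx: FakeContext, ticks: int, tick_seconds: float = 0.01) -> int:
    status = 'running'
    elapsed = 0  # ticks spent running
    loops = 0    # all loop iterations, paused or not

    async def handle_pause(should_pause: bool) -> None:
        nonlocal status
        status = 'paused' if should_pause else 'running'
        await actx.report_paused(should_pause)

    actx.set_pause_handler(handle_pause)

    while elapsed < ticks:
        await asyncio.sleep(tick_seconds)
        loops += 1
        if status == 'running':
            elapsed += 1
    return loops


async def demo() -> Tuple[int, List[bool]]:
    actx = FakeContext()
    task = asyncio.create_task(pausable_incubate(actx, ticks=20))
    await asyncio.sleep(0.03)
    await actx.request_pause(True)   # pause mid-execution
    await asyncio.sleep(0.1)
    await actx.request_pause(False)  # resume
    return await task, actx.reported


loops, reported = asyncio.run(demo())
```

Because paused iterations do not advance the running-time counter, the simulated Action takes more loop iterations than the 20 running ticks it was asked for.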

Persisting Action Status#

Periodically, the IPC or Adapter may restart (e.g. if the IPC is rebooted or the Adapter is updated). When the Adapter starts up again, it will call any in-progress Actions again from the top with the same parameters, but any local variables from the previous execution context will be lost. This may be undesirable if some progress has already been made in running the Action, or if the Action had been paused or canceled before the restart.

To persist the state of an Action across restarts, use persist_restore_data() and get_restore_data(). Calling persist_restore_data() also automatically persists the Action’s status across restarts; after a restart, that status is available on the ActionExecutionContext as paused and canceled.

Note

As of artificial.adapter_common version 0.18.5, you can also call restorable() to persist the Action’s paused and canceled status across restarts.

adapter/main/actions.py#
import asyncio

from artificial.adapter_common import ActionCanceledException, ActionExecutionContext, ActionModule, ability_name, action

@ability_name('incubate')
class Incubator(ActionModule):
    @action('Persistent Incubation')
    async def persistent_incubate(self, actx: ActionExecutionContext, duration: int) -> None:
        # Restore action status
        if actx.canceled:
            status = 'canceled'
        elif actx.paused:
            status = 'paused'
        else:
            status = 'running'

        # Restore elapsed time (or start from 0 if this is the first run)
        elapsed_time = int(actx.get_restore_data() or '0')
        # Save elapsed_time in persistent storage and mark action as restorable
        await actx.persist_restore_data(str(elapsed_time))

        async def handle_abort() -> None:
            nonlocal status
            status = 'canceled'

        async def handle_pause(should_pause: bool) -> None:
            nonlocal status

            if should_pause:
                status = 'paused'
                await actx.report_paused(True)
            else:
                status = 'running'
                await actx.report_paused(False)

        actx.set_abort_handler(handle_abort)
        actx.set_pause_handler(handle_pause)

        while True:
            if status == 'canceled':
                raise ActionCanceledException

            await asyncio.sleep(1)
            if status == 'running':
                elapsed_time += 1
                # Save elapsed_time in persistent storage every time it's updated
                await actx.persist_restore_data(str(elapsed_time))
                if elapsed_time > duration:
                    break
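
The restart behavior described in this section can be imitated locally. In the sketch below, FakeContext is a hypothetical stand-in whose get_restore_data() and persist_restore_data() mirror how those methods are used in the example above; a plain dict plays the role of storage that survives a restart, and cancelling the asyncio task imitates the Adapter going down. None of this reflects how the real Adapter stores restore data.

```python
import asyncio
from typing import Dict, Optional, Tuple


class FakeContext:
    """Hypothetical stand-in; `store` imitates storage that survives restarts."""

    def __init__(self, store: Dict[str, str]) -> None:
        self._store = store

    def get_restore_data(self) -> Optional[str]:
        return self._store.get('restore_data')

    async def persist_restore_data(self, data: str) -> None:
        self._store['restore_data'] = data


async def persistent_incubate(actx: FakeContext, ticks: int, tick_seconds: float = 0.01) -> int:
    # Resume from the persisted tick count, or start from 0 on the first run.
    elapsed = int(actx.get_restore_data() or '0')
    started_at = elapsed
    await actx.persist_restore_data(str(elapsed))

    while elapsed < ticks:
        await asyncio.sleep(tick_seconds)
        elapsed += 1
        # Persist progress every time it changes.
        await actx.persist_restore_data(str(elapsed))
    return started_at


async def demo() -> Tuple[int, int, int]:
    store: Dict[str, str] = {}

    # First run: interrupted part-way through, as if the Adapter restarted.
    first = asyncio.create_task(persistent_incubate(FakeContext(store), ticks=20))
    await asyncio.sleep(0.05)
    first.cancel()
    try:
        await first
    except asyncio.CancelledError:
        pass
    saved = int(store['restore_data'])

    # Second run: same parameters, fresh context, same persisted store.
    resumed_from = await persistent_incubate(FakeContext(store), ticks=20)
    return saved, resumed_from, int(store['restore_data'])


saved, resumed_from, final = asyncio.run(demo())
```

The second call picks up at the tick count the first call last persisted rather than starting over, which is the property the persisted restore data is meant to provide.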