Defining Actors and Abilities#
Actors and abilities are an abstraction layer between the Action stub calls in the Workflow and the Action implementation in the ActionModule in the Adapter. Conceptually, Actors represent the various systems that can perform useful work in the lab, and Abilities represent the various types of work that can be performed. Abilities are defined with a capacity, which is the number of times that ability can be performed concurrently. This is useful for defining the maximum number of samples that can be processed at once, for example.
Actors and Abilities are defined within configs/[environment]/config.yaml
in the adapter
section.
...
adapter:
name: MyAdapter
actors:
- id: 'liquid_handler'
abilities:
run_protocol: 1
- id: 's3_uploader'
abilities:
upload_file: 3
Abilities and Actions#
Actions called from a workflow must be associated with an Ability to help the Scheduler and Executor know which Adapter can handle running the Action.
There are two ways to map an Action to an Ability:
Manually specify the Ability when calling the Action in the Workflow using the
action-ability-name
annotation.from artificial.workflows.decorators import workflow from stubs.stubs_actions import run_protocol @workflow('Run Protocol', 'run_protocol_wf', 'lab_0c47f8f2-4a3d-452f-b566-9fffb3b68201') async def run_protocol_wf() -> None: '''action-ability-name: run_protocol''' await run_protocol()
Note
The Ability name is not required to match the action name, it just happens to in this case.
Add the
@ability_name
decorator to the ActionModule class. This will automatically map all calls to all Actions in the ActionModule to the given Ability.import asyncio from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action # Mark all calls to all Actions in this ActionModule with the 'run_protocol' Ability @ability_name('run_protocol') class LiquidHandler(ActionModule): @action('Run Protocol') async def run_protocol(self, actx: ActionExecutionContext): await asyncio.sleep(30)
(as of
adapter_common
v0.18.4) Pass anability_name
into the@action
decorator.import asyncio from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action class LiquidHandler(ActionModule): @action('Run Protocol', ability_name='run_protocol') async def run_protocol(self, actx: ActionExecutionContext): await asyncio.sleep(30)
Note
This is useful when you want to specify the Ability name for a single Action, but not for all Actions in the ActionModule.
Capacity and Concurrent Execution#
The capacity of an Ability is the number of times that Ability can be run concurrently. This is useful for limiting concurrent execution on equipment than can only run one method at a time, for example. If an Ability has a capacity of 1, then the Adapter can only perform Actions associated with that Ability one-at-a-time. If the Executor receives a request to run an Action associated with that Abiltiy a second time, it will wait until the first run has completed.
Example: Single Concurrency#
With this Actor configuration, ActionModule, and Workflow…
...
adapter:
name: MyAdapter
actors:
- id: 'liquid_handler'
abilities:
run_protocol: 1
import asyncio
from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action
@ability_name('run_protocol')
class LiquidHandler(ActionModule):
@action('Run Protocol')
async def run_protocol(self, actx: ActionExecutionContext):
await asyncio.sleep(30)
from artificial.workflows.decorators import workflow
from stubs.stubs_actions import run_protocol
@workflow('Run Protocol', 'run_protocol_wf', 'lab_0c47f8f2-4a3d-452f-b566-9fffb3b68201')
async def run_protocol_wf() -> None:
await run_protocol()
…running the Workflow multiple times in a row yields the following behavior:
Note that the Executor waits for the first run of the Workflow to complete before starting the second run of the Workflow.
Example: Multi-Concurrency#
With this Actor configuration, ActionModule, and Workflow…
...
adapter:
name: MyAdapter
actors:
- id: 's3_uploader'
abilities:
upload_file: 3
import asyncio
from artificial.adapter_common import ActionExecutionContext, ActionModule, ability_name, action
@ability_name('upload_file')
class S3Uploader(ActionModule):
@action('Upload File')
async def upload_file(self, actx: ActionExecutionContext):
await asyncio.sleep(30)
from artificial.workflows.decorators import workflow
from stubs.stubs_actions import upload_file
@workflow('Upload File', 'upload_file_wf', 'lab_0c47f8f2-4a3d-452f-b566-9fffb3b68201')
async def upload_file_wf() -> None:
await upload_file()
…running the Workflow multiple times in a row yields the following behavior:
Note that the Executor ensures that no more than three Workflow runs are running concurrently.