After importing an agentic workflow tool, you can test it with a standalone Python script before adding it to an agent. The following steps guide you through testing a flow, including defining, compiling, and running it:
1

Create a test script

Agentic workflows are asynchronous, and so are the functions used to run them. Start by creating a script that defines an asynchronous function to invoke your agentic workflow. In the example below, the script hello_message_flow.py contains a function build_hello_message_flow, decorated with @flow, which returns the Flow object you want to test.
Python
import asyncio

from .hello_message_flow import build_hello_message_flow

async def main():
    # Build the Flow object returned by the @flow-decorated function
    my_flow_definition = build_hello_message_flow()

if __name__ == "__main__":
    asyncio.run(main())
2

Compile the agentic workflow

Next, compile the agentic workflow. This step generates the agentic workflow model and deploys it to the engine. It returns a CompiledFlow instance, which you can use to start an agentic workflow run. Use compile_deploy() to compile and deploy the agentic workflow.
Python
compiled_flow = await my_flow_definition.compile_deploy()
If you only want to generate the JSON model without deploying it, use compile() instead.
3

Start a run

Agentic workflows are asynchronous, and the engine communicates with your client using events. You can start a run using methods on the CompiledFlow object. Depending on your needs, you can choose from the following approaches:
The simplest way to run an agentic workflow is to invoke it and wait for the result. Use CompiledFlow.invoke() to start the agentic workflow. You can optionally pass input data.
Python
flow_run = await compiled_flow.invoke({"first_name": "John", "last_name": "Doe"})
To monitor the agentic workflow and retrieve the output:
Python
from ibm_watsonx_orchestrate.experimental.flow_builder.flows.flow import FlowRunStatus

# Wait for the flow to complete
while flow_run.status != FlowRunStatus.COMPLETED and flow_run.status != FlowRunStatus.FAILED:
    await asyncio.sleep(1)  # sleep 1 second between checks
    
print(f"Flow ended with status: {flow_run.status}")
print(f"Flow output: {flow_run.output}" if flow_run.status == FlowRunStatus.COMPLETED else f"Flow error: {flow_run.error}")
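Note: Always use asynchronous waits such as await asyncio.sleep() rather than blocking calls such as time.sleep(), so that the event loop stays free to process events from the engine.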
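The polling loop above waits indefinitely if the run never reaches a terminal status. As a minimal sketch, the same loop can be bounded with asyncio.wait_for(). The Status enum, FakeRun class, and wait_until_done helper are hypothetical stand-ins introduced here so the example runs without the engine; they are not part of the library. With a real run, you would pass the FlowRun object and compare against FlowRunStatus instead.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    # Hypothetical stand-in for FlowRunStatus (assumption, not the library enum).
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class FakeRun:
    # Hypothetical stand-in for a FlowRun; exposes only the polled attribute.
    status: Status = Status.IN_PROGRESS


TERMINAL = {Status.COMPLETED, Status.FAILED}


async def wait_until_done(run, poll_interval=1.0, timeout=30.0):
    """Poll run.status until it is terminal; raise asyncio.TimeoutError otherwise."""

    async def _poll():
        while run.status not in TERMINAL:
            # Non-blocking wait: other coroutines keep running meanwhile.
            await asyncio.sleep(poll_interval)
        return run.status

    return await asyncio.wait_for(_poll(), timeout=timeout)


async def demo():
    run = FakeRun()
    loop = asyncio.get_running_loop()
    # Simulate the engine completing the run after 50 ms.
    loop.call_later(0.05, setattr, run, "status", Status.COMPLETED)
    return await wait_until_done(run, poll_interval=0.01, timeout=1.0)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The timeout value is a judgment call: choose one comfortably longer than the slowest run you expect, and treat asyncio.TimeoutError as a test failure.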
You can provide custom event handlers to invoke() for more control over agentic workflow completion and error handling.
Python
flow_run = None

def on_flow_end(result):
    """
    Callback function to be called when the agentic workflow is completed.
    """
    print(f"Custom Handler: flow `{flow_run.name}` completed with result: {result}")

def on_flow_error(error):
    """
    Callback function to be called when the agentic workflow fails.
    """
    print(f"Custom Handler: flow `{flow_run.name}` failed: {error}")

async def main():
    my_flow_definition = build_hello_message_flow()
    compiled_flow = await my_flow_definition.compile_deploy()

    # The handlers above read flow_run, so assign the module-level variable
    global flow_run
    flow_run = await compiled_flow.invoke(
        {"first_name": "John", "last_name": "Doe"},
        on_flow_end_handler=on_flow_end,
        on_flow_error_handler=on_flow_error,
    )
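In the handler example, main() returns as soon as invoke() does, so a short test script may exit before either handler fires. One way to keep the script alive is to have the handlers resolve an asyncio future that main() awaits. The sketch below illustrates the idea. fake_invoke is a hypothetical stand-in for CompiledFlow.invoke() that accepts the same two handler keywords, and run_and_wait is a helper name invented here. It assumes the handlers are called on the same event loop thread; if the library calls them from another thread, the future would need to be resolved through loop.call_soon_threadsafe().

```python
import asyncio


async def fake_invoke(payload, on_flow_end_handler, on_flow_error_handler):
    """Hypothetical stand-in for CompiledFlow.invoke().

    Returns immediately and calls the end handler 50 ms later on the same loop.
    """
    loop = asyncio.get_running_loop()
    greeting = f"Hello {payload['first_name']} {payload['last_name']}"
    loop.call_later(0.05, on_flow_end_handler, greeting)
    return object()  # placeholder for the returned FlowRun


async def run_and_wait(invoke, payload, timeout=5.0):
    """Start a run and block (asynchronously) until a handler reports back."""
    loop = asyncio.get_running_loop()
    done = loop.create_future()

    def on_end(result):
        if not done.done():
            done.set_result(result)

    def on_error(error):
        if not done.done():
            done.set_exception(RuntimeError(str(error)))

    await invoke(
        payload,
        on_flow_end_handler=on_end,
        on_flow_error_handler=on_error,
    )
    # Raises asyncio.TimeoutError if neither handler fires in time.
    return await asyncio.wait_for(done, timeout=timeout)


if __name__ == "__main__":
    result = asyncio.run(
        run_and_wait(fake_invoke, {"first_name": "John", "last_name": "Doe"})
    )
    print(result)
```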
To handle all emitted events, use invoke_events(), which returns an asynchronous iterator yielding (FlowEvent, FlowRun) tuples.
Python
from ibm_watsonx_orchestrate.experimental.flow_builder.types import FlowEventType

async for event, run in compiled_flow.invoke_events({"first_name": "John", "last_name": "Doe"}):
    if not event:
        continue
    
    if event.kind == FlowEventType.ON_FLOW_END:
        on_flow_end(event.context.data.output)
        print(f"Flow `{run.name}` completed with result: {run.output}")
        break

    elif event.kind == FlowEventType.ON_FLOW_ERROR:
        print(f"Flow `{run.name}` failed with error: {run.error}")
        break 
You can listen for task and agentic workflow events, as defined by FlowEventType and TaskEventType in ibm_watsonx_orchestrate.experimental.flow_builder.types.
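To rehearse event-handling logic without a deployed flow, the consuming loop can be factored into a function and fed by a fake event stream. The following is a sketch under stated assumptions: Kind stands in for FlowEventType, ON_OTHER is a made-up placeholder for any non-terminal event, and fake_invoke_events, consume, and the run name are hypothetical. Only the (event, run) tuple shape and the ON_FLOW_END and ON_FLOW_ERROR kinds come from the text above.

```python
import asyncio
from enum import Enum
from types import SimpleNamespace


class Kind(Enum):
    # Hypothetical stand-in for FlowEventType (assumption, not the library enum).
    ON_OTHER = "on_other"  # placeholder for any non-terminal event
    ON_FLOW_END = "on_flow_end"
    ON_FLOW_ERROR = "on_flow_error"


async def fake_invoke_events(payload):
    """Hypothetical stand-in for invoke_events(): yields (event, run) tuples."""
    run = SimpleNamespace(name="hello_message_flow", output=None, error=None)
    yield None, run  # empty events are skipped by the consumer
    yield SimpleNamespace(kind=Kind.ON_OTHER), run
    run.output = f"Hello {payload['first_name']} {payload['last_name']}"
    yield SimpleNamespace(kind=Kind.ON_FLOW_END), run


async def consume(events):
    """Record every event kind and return the output of the finished run."""
    seen = []
    async for event, run in events:
        if not event:
            continue
        seen.append(event.kind)
        if event.kind is Kind.ON_FLOW_END:
            return seen, run.output
        if event.kind is Kind.ON_FLOW_ERROR:
            raise RuntimeError(run.error)
    raise RuntimeError("event stream ended without a terminal flow event")


if __name__ == "__main__":
    stream = fake_invoke_events({"first_name": "John", "last_name": "Doe"})
    print(asyncio.run(consume(stream)))
```

Raising when the stream ends without a terminal event makes a silently dropped run show up as a test failure rather than a normal exit.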