Skip to main content
After importing a agentic workflow tool, you can test it using a main Python script before add it to an agent. The following steps guide you through testing a flow, including defining, compiling, and running the flow:
1

Create a test script

Agentic workflow are asynchronous, and so are the functions used to run them.Start by creating a script that defines a function to invoke your agentic workflow. In the example below, the script hello_message_flow.py contains a function build_hello_message_flow, decorated with @flow, which returns the Flow object you want to test.
Python
import asyncio

from .hello_message_flow import build_hello_message_flow

async def main():

    my_flow_definition = build_hello_message_flow()

if __name__ == "__main__":
    asyncio.run(main())  
2

Compile the agentic workflow

Next, compile the agentic workflow. This step generates the agentic workflow model and deploys it to the engine. It returns a CompiledFlow instance, which you can use to start a agentic workflow run. Use compile_deploy() to compile and deploy the agentic workflow.
Python
compiled_flow = await my_flow_definition.compile_deploy()
If you only want to generate the JSON model without deploying it, use compile() instead.
3

Start a run

Agentic workflows are asynchronous, and the engine communicates with your client using events. You can start a run using methods on the CompiledFlow object. Depending on your needs, you can choose from the following approaches:
The simplest way to run a agentic workflows is to invoke it and wait for the result. Use CompiledFlow.invoke() to start the agentic workflow. You can optionally pass input data.
Python
flow_run = await compiled_flow.invoke({"first_name": "John", "last_name": "Doe"})
To monitor the agentic workflow and retrieve the output:
Python
from ibm_watsonx_orchestrate.experimental.flow_builder.flows.flow import FlowRunStatus

# Wait for the flow to complete
while flow_run.status != FlowRunStatus.COMPLETED and flow_run.status != FlowRunStatus.FAILED:
    await asyncio.sleep(1)  # sleep 1 second between checks
    
print(f"Flow ended with status: {flow_run.status}")
print(f"Flow output: {flow_run.output}" if flow_run.status == FlowRunStatus.COMPLETED else f"Flow error: {flow_run.error}")
Note: Always use asynchronous waits to avoid blocking the event loop.
You can provide custom event handlers to invoke() for more control over agentic workflow completion and error handling.
Python
flow_run = None

def on_flow_end(result):
    """
    Callback function to be called when the agentic workflow is completed.
    """
    print(f"Custom Handler: flow `{flow_run.name}` completed with result: {result}")

def on_flow_error(error):
    """
    Callback function to be called when the agentic workflow fails.
    """
    print(f"Custom Handler: flow `{flow_run.name}` failed: {error}")

async def main():

    my_flow_definition = build_hello_message_flow()
    compiled_flow = await my_flow_definition.compile_deploy()
    
    global flow_run
    flow_run = await compiled_flow.invoke({"first_name": "John", "last_name": "Doe"}, on_flow_end_handler=on_flow_end, on_flow_error_handler=on_flow_error)
To handle all emitted events, use invoke_events(), which returns an asynchronous iterator yielding (FlowEvent, FlowRun) tuples.
Python
from ibm_watsonx_orchestrate.experimental.flow_builder.types import FlowEventType

async for event, run in compiled_flow.invoke_events({"first_name": "John", "last_name": "Doe"}):
    if not event:
        continue
    
    if event.kind == FlowEventType.ON_FLOW_END:
        on_flow_end(event.context.data.output)
        print(f"Flow `{run.name}` completed with result: {run.output}")
        break

    elif event.kind == FlowEventType.ON_FLOW_ERROR:
        print(f"Flow `{run.name}` failed with error: {run.error}")
        break 
You can listen for task and agentic workflow events, as defined by FlowEventType and TaskEventType in ibm_watsonx_orchestrate.experimental.flow_builder.types.