Use a parallel branch node to run multiple branches of your agentic workflow at the same time. When conditions match, the node runs every matching path so you can complete independent work in parallel.
Parallel branch nodes differ from conditions branch nodes in these ways:
Feature Conditions branch node (conditions()) Parallel branch node (parallel_conditions()) Processing First matching condition only ALL matching conditions Semantics Sequential evaluation such as if-else logic Concurrent evaluation Use case Exclusive paths Concurrent execution Merging Single path continues Waits for all paths to complete
Configuring unconditional parallel branch
To configure an unconditional parallel branch node, call the parallel() method in your agentic workflow. Use this node when you want every branch to run every time. Define the following parameters in this method:
Use unconditional parallel when you need to:
Run multiple teams at the same time
Send work to multiple services at the same time
Process data through multiple pipelines at the same time
Set to None for unconditional execution. All branches run.
Name for the parallel node. watsonx Orchestrate generates this value if you do not provide it.
Display name for the parallel node.
from ibm_watsonx_orchestrate.flow_builder.flows import flow, Flow, START , END
@flow ( name = "multi_squad_development" )
def build_flow ( aflow : Flow) -> Flow:
# Create unconditional parallel agentic workflow
parallel_dev = aflow.parallel(
evaluator = None , # No evaluator = all paths run
name = "parallel_development" ,
display_name = "Development Phase"
)
# Define work for each squad (all run in parallel)
squad1 = parallel_dev.script(
name = "squad1_work" ,
script = "print('Squad 1 working...')"
)
squad2 = parallel_dev.script(
name = "squad2_work" ,
script = "print('Squad 2 working...')"
)
squad3 = parallel_dev.script(
name = "squad3_work" ,
script = "print('Squad 3 working...')"
)
# Connect each squad from START to END within parallel agentic workflow
parallel_dev.sequence( START , squad1, END )
parallel_dev.sequence( START , squad2, END )
parallel_dev.sequence( START , squad3, END )
# Continue after all squads complete
completion = aflow.script(
name = "all_complete" ,
script = "print('All squads completed!')"
)
aflow.sequence( START , parallel_dev, completion, END )
return aflow
See all 40 lines
Configuring conditional parallel branch
To configure a conditional parallel node, call the parallel_conditions() method in your agentic workflow. Use this node when you want to run every branch whose condition matches. Define the following parameters in this method:
Use conditional parallel when you need to:
Process work based on multiple criteria
Route work to multiple handlers based on attributes
Fan out work only when conditions match
Name for the parallel node. watsonx Orchestrate generates this value if you leave it empty.
Display name for the parallel node. The default value is the node name.
Adding conditions to parallel branch
To add conditions to the parallel branch node, call the condition() method. The node runs all matching conditions in parallel. Define the following parameters in this method:
Node to run when the condition matches.
Python expression to evaluate. Required unless default=True.
When set to True, this parameter defines the default case.
from pydantic import BaseModel
from ibm_watsonx_orchestrate.flow_builder.flows import flow, Flow, START , END
class TaskInput ( BaseModel ):
priority: str
category: str
value: float
@flow (
name = "conditional_parallel_processing" ,
input_schema = TaskInput
)
def build_flow ( aflow : Flow) -> Flow:
# Create conditional parallel agentic workflow
parallel_processing = aflow.parallel_conditions(
name = "task_processing" ,
display_name = "Task Processing"
)
# Define handlers within the parallel subflow
high_priority = parallel_processing.script(
name = "high_priority_handler" ,
script = "print('Processing high priority task')"
)
billing = parallel_processing.script(
name = "billing_handler" ,
script = "print('Processing billing task')"
)
large_value = parallel_processing.script(
name = "large_value_handler" ,
script = "print('Processing large value task')"
)
default_handler = parallel_processing.script(
name = "default_handler" ,
script = "print('Default processing')"
)
# Add conditions - all matching conditions run in parallel
parallel_processing.condition(
expression = "flow.input.priority == 'high'" ,
to_node = high_priority
).condition(
expression = "flow.input.category == 'billing'" ,
to_node = billing
).condition(
expression = "flow.input.value > 1000" ,
to_node = large_value
).condition(
default = True , # Default case (like 'else')
to_node = default_handler
)
# Connect handlers to END within parallel agentic workflow
parallel_processing.sequence(high_priority, END )
parallel_processing.sequence(billing, END )
parallel_processing.sequence(large_value, END )
parallel_processing.sequence(default_handler, END )
# Continue after all matching paths complete
aflow.sequence( START , parallel_processing, END )
return aflow
See all 64 lines
Expression syntax
Conditions in parallel agentic workflows use Python expressions that can access:
flow.input.* - Input data
flow.private.* - Private state
flow.output.* - Output data from upstream nodes
The following examples show common expression patterns you can use:
# Simple comparison
expression = "flow.input.priority == 'high'"
# Multiple conditions with 'and'
expression = "flow.input.priority == 'high' and flow.input.value > 1000"
# Multiple conditions with 'or'
expression = "flow.input.category == 'billing' or flow.input.category == 'finance'"
# Checking boolean flags
expression = "flow.private.needs_approval is True"
# Numeric comparisons
expression = "flow.input.amount > 5000"
# String operations
expression = "'urgent' in flow.input.tags"
Common patterns
The following examples show common ways to use parallel nodes in your agentic workflows:
Use this pattern to process data through multiple independent handlers: parallel = flow.parallel( evaluator = None , name = "fan_out" )
# Create multiple processors
processor1 = parallel.tool( name = "process_type_a" , tool = process_a)
processor2 = parallel.tool( name = "process_type_b" , tool = process_b)
processor3 = parallel.tool( name = "process_type_c" , tool = process_c)
# All processors run in parallel
parallel.sequence( START , processor1, END )
parallel.sequence( START , processor2, END )
parallel.sequence( START , processor3, END )
Use this pattern to route work to multiple handlers based on attributes: parallel = flow.parallel_conditions( name = "multi_handler" )
handler1 = parallel.tool( name = "handler1" , tool = handle_priority)
handler2 = parallel.tool( name = "handler2" , tool = handle_category)
handler3 = parallel.tool( name = "handler3" , tool = handle_value)
parallel.condition(
expression = "flow.input.is_priority" ,
to_node = handler1
).condition(
expression = "flow.input.category == 'special'" ,
to_node = handler2
).condition(
expression = "flow.input.value > 1000" ,
to_node = handler3
)
Use this pattern to run parallel tasks and aggregate results: # Parallel processing
parallel = flow.parallel( evaluator = None , name = "parallel_tasks" )
task1 = parallel.script( name = "task1" , script = "..." )
task2 = parallel.script( name = "task2" , script = "..." )
task3 = parallel.script( name = "task3" , script = "..." )
parallel.sequence( START , task1, END )
parallel.sequence( START , task2, END )
parallel.sequence( START , task3, END )
# Aggregate results after all tasks complete
aggregate = flow.script(
name = "aggregate_results" ,
script = """
# All parallel tasks have completed
# Aggregate their results here
flow.output.combined_results = [...]
"""
)
flow.sequence( START , parallel, aggregate, END )
See all 22 lines
Best practices
Follow these best practices when you work with parallel nodes:
Choose the Right Node Type
Use parallel() when every branch must run
Use parallel_conditions() when branch execution depends on conditions
Use conditions() (branch) when only one path must run. For more information, see the Conditions branch node
Important: Do not create parallel paths that loop back to previous nodes. This pattern creates unlimited parallel threads and can cause runtime issues.
The following example shows an incorrect pattern: # DON'T: This creates infinite parallel threads
parallel_flow = flow.parallel_conditions()
node1 = parallel_flow.script( name = "node1" , script = "..." )
node2 = parallel_flow.script( name = "node2" , script = "..." )
parallel_flow.condition( expression = "True" , to_node = node1)
parallel_flow.edge(node1, node2)
parallel_flow.edge(node2, node1) # ❌ Loop back - creates infinite threads!
Parallel agentic workflows merge at the END node inside the parallel subagentic workflow. The parent agentic workflow continues only after every parallel branch completes. parallel_agentic_workflow = flow.parallel_conditions( name = "processing" )
# Define nodes
node1 = parallel_agentic_workflow.script( name = "task1" , script = "..." )
node2 = parallel_agentic_workflow.script( name = "task2" , script = "..." )
# Add conditions
parallel_agentic_workflow.condition( expression = "..." , to_node = node1)
parallel_agentic_workflow.condition( expression = "..." , to_node = node2)
# Connect to END - automatic merge point
parallel_agentic_workflow.sequence(node1, END )
parallel_agentic_workflow.sequence(node2, END )
# This node runs only after ALL parallel paths complete
next_node = flow.script( name = "after_parallel" , script = "..." )
flow.edge(parallel_agentic_workflow, next_node)
See all 17 lines
Handle Errors in Parallel Branches
Each parallel branch can fail independently. Add error handling so you can manage failures: parallel_agentic_workflow = flow.parallel_conditions( name = "processing" )
task1 = parallel_agentic_workflow.script(
name = "task1" ,
script = "..." ,
error_handler = NodeErrorHandlerConfig(
on_error = "continue" , # Continue even if this branch fails
max_retries = 3
)
)
Example: Feature delivery agentic workflow
This example shows how you can use both conditional and unconditional parallel processing to complete feature delivery work.
from typing import List
from pydantic import BaseModel, Field
from ibm_watsonx_orchestrate.flow_builder.flows import (
Flow, flow, ScriptNode, START , END
)
class FlowInput ( BaseModel ):
feature_name: str = Field( description = "Name of the feature to deliver" )
design_needed: bool = Field( default = True , description = "Whether design work is needed" )
arch_needed: bool = Field( default = True , description = "Whether architecture work is needed" )
class FlowOutput ( BaseModel ):
status: str = Field( description = "Final status of the feature delivery" )
phases_completed: List[ str ] = Field( description = "List of completed phases" )
class PrivateState ( BaseModel ):
design_needed: bool = Field( default = False )
arch_needed: bool = Field( default = False )
phases_completed: List[ str ] = Field( default_factory = list )
@flow (
name = "feature_delivery_workflow" ,
display_name = "Feature Delivery Agentic Workflow" ,
description = "An agentic workflow demonstrating parallel execution in multiple phases" ,
input_schema = FlowInput,
output_schema = FlowOutput,
private_schema = PrivateState
)
def build_feature_delivery_flow ( aflow : Flow) -> Flow:
# Initialize state
init_state = aflow.script(
name = "init_state" ,
script = """
flow.private.design_needed = flow.input.design_needed
flow.private.arch_needed = flow.input.arch_needed
flow.private.phases_completed = []
"""
)
# PHASE 1: Conditional Parallel (Design & Architecture)
phase1_parallel = aflow.parallel_conditions(
name = "parallel_phase1" ,
display_name = "Phase 1 - Design & Architecture"
)
design_work = phase1_parallel.script(
name = "design_work" ,
script = """
print("Starting design work...")
flow.private.phases_completed.append("Design")
print("Design work completed")
"""
)
architecture_work = phase1_parallel.script(
name = "architecture_work" ,
script = """
print("Starting architecture work...")
flow.private.phases_completed.append("Architecture")
print("Architecture work completed")
"""
)
phase1_skip = phase1_parallel.script(
name = "phase1_skip" ,
script = "print('Phase 1: Skipping...')"
)
# Add conditions
phase1_parallel.condition(
expression = "flow.private.design_needed is True" ,
to_node = design_work
).condition(
expression = "flow.private.arch_needed is True" ,
to_node = architecture_work
).condition(
default = True ,
to_node = phase1_skip
)
phase1_parallel.sequence(design_work, END )
phase1_parallel.sequence(architecture_work, END )
phase1_parallel.sequence(phase1_skip, END )
# PHASE 2: Unconditional Parallel (Development)
phase2_parallel = aflow.parallel(
evaluator = None ,
name = "parallel_phase2" ,
display_name = "Phase 2 - Development"
)
squad1_work = phase2_parallel.script(
name = "squad1_work" ,
script = "print('Squad 1: Development completed')"
)
squad2_work = phase2_parallel.script(
name = "squad2_work" ,
script = "print('Squad 2: Development completed')"
)
squad3_work = phase2_parallel.script(
name = "squad3_work" ,
script = "print('Squad 3: Development completed')"
)
phase2_parallel.sequence( START , squad1_work, END )
phase2_parallel.sequence( START , squad2_work, END )
phase2_parallel.sequence( START , squad3_work, END )
# PHASE 3: Sequential (QA)
qa_work = aflow.script(
name = "qa_work" ,
script = """
print("Starting QA testing...")
flow.private.phases_completed.append("QA")
"""
)
finalize = aflow.script(
name = "finalize" ,
script = """
flow.output.status = "Feature delivery completed successfully"
flow.output.phases_completed = flow.private.phases_completed
"""
)
# Connect all phases
aflow.edge( START , init_state)
aflow.edge(init_state, phase1_parallel)
aflow.edge(phase1_parallel, phase2_parallel)
aflow.edge(phase2_parallel, qa_work)
aflow.edge(qa_work, finalize)
aflow.edge(finalize, END )
return aflow
See all 136 lines