Skip to main content
Use a parallel branch node to run multiple branches of your agentic workflow at the same time. When conditions match, the node runs every matching path so you can complete independent work in parallel. Parallel branch nodes differ from conditions branch nodes in these ways:
FeatureConditions branch node (conditions())Parallel branch node (parallel_conditions())
ProcessingFirst matching condition onlyALL matching conditions
SemanticsSequential evaluation such as if-else logicConcurrent evaluation
Use caseExclusive pathsConcurrent execution
MergingSingle path continuesWaits for all paths to complete

Configuring unconditional parallel branch

To configure an unconditional parallel branch node, call the parallel() method in your agentic workflow. Use this node when you want every branch to run every time. Define the following parameters in this method: Use unconditional parallel when you need to:
  • Run multiple teams at the same time
  • Send work to multiple services at the same time
  • Process data through multiple pipelines at the same time
evaluator
Union[Conditions, None]
Set to None for unconditional execution. All branches run.
name
string
Name for the parallel node. watsonx Orchestrate generates this value if you do not provide it.
display_name
string
Display name for the parallel node.
Example
from ibm_watsonx_orchestrate.flow_builder.flows import flow, Flow, START, END

@flow(name="multi_squad_development")
def build_flow(aflow: Flow) -> Flow:
    # Create unconditional parallel agentic workflow
    parallel_dev = aflow.parallel(
        evaluator=None,  # No evaluator = all paths run
        name="parallel_development",
        display_name="Development Phase"
    )
    
    # Define work for each squad (all run in parallel)
    squad1 = parallel_dev.script(
        name="squad1_work",
        script="print('Squad 1 working...')"
    )
    
    squad2 = parallel_dev.script(
        name="squad2_work",
        script="print('Squad 2 working...')"
    )
    
    squad3 = parallel_dev.script(
        name="squad3_work",
        script="print('Squad 3 working...')"
    )
    
    # Connect each squad from START to END within parallel agentic workflow
    parallel_dev.sequence(START, squad1, END)
    parallel_dev.sequence(START, squad2, END)
    parallel_dev.sequence(START, squad3, END)
    
    # Continue after all squads complete
    completion = aflow.script(
        name="all_complete",
        script="print('All squads completed!')"
    )
    
    aflow.sequence(START, parallel_dev, completion, END)
    return aflow

Configuring conditional parallel branch

To configure a conditional parallel node, call the parallel_conditions() method in your agentic workflow. Use this node when you want to run every branch whose condition matches. Define the following parameters in this method: Use conditional parallel when you need to:
  • Process work based on multiple criteria
  • Route work to multiple handlers based on attributes
  • Fan out work only when conditions match
name
string
required
Name for the parallel node. watsonx Orchestrate generates this value if you leave it empty.
display_name
string
Display name for the parallel node. The default value is the node name.

Adding conditions to parallel branch

To add conditions to the parallel branch node, call the condition() method. The node runs all matching conditions in parallel. Define the following parameters in this method:
to_node
Node
required
Node to run when the condition matches.
expression
string
Python expression to evaluate. Required unless default=True.
default
bool
When set to True, this parameter defines the default case.
Example
from pydantic import BaseModel
from ibm_watsonx_orchestrate.flow_builder.flows import flow, Flow, START, END

class TaskInput(BaseModel):
    priority: str
    category: str
    value: float

@flow(
    name="conditional_parallel_processing",
    input_schema=TaskInput
)
def build_flow(aflow: Flow) -> Flow:
    # Create conditional parallel agentic workflow
    parallel_processing = aflow.parallel_conditions(
        name="task_processing",
        display_name="Task Processing"
    )
    
    # Define handlers within the parallel subflow
    high_priority = parallel_processing.script(
        name="high_priority_handler",
        script="print('Processing high priority task')"
    )
    
    billing = parallel_processing.script(
        name="billing_handler",
        script="print('Processing billing task')"
    )
    
    large_value = parallel_processing.script(
        name="large_value_handler",
        script="print('Processing large value task')"
    )
    
    default_handler = parallel_processing.script(
        name="default_handler",
        script="print('Default processing')"
    )
    
    # Add conditions - all matching conditions run in parallel
    parallel_processing.condition(
        expression="flow.input.priority == 'high'",
        to_node=high_priority
    ).condition(
        expression="flow.input.category == 'billing'",
        to_node=billing
    ).condition(
        expression="flow.input.value > 1000",
        to_node=large_value
    ).condition(
        default=True,  # Default case (like 'else')
        to_node=default_handler
    )
    
    # Connect handlers to END within parallel agentic workflow
    parallel_processing.sequence(high_priority, END)
    parallel_processing.sequence(billing, END)
    parallel_processing.sequence(large_value, END)
    parallel_processing.sequence(default_handler, END)
    
    # Continue after all matching paths complete
    aflow.sequence(START, parallel_processing, END)
    return aflow

Expression syntax

Conditions in parallel agentic workflows use Python expressions that can access:
  • flow.input.* - Input data
  • flow.private.* - Private state
  • flow.output.* - Output data from upstream nodes
The following examples show common expression patterns you can use:
# Simple comparison
expression="flow.input.priority == 'high'"

# Multiple conditions with 'and'
expression="flow.input.priority == 'high' and flow.input.value > 1000"

# Multiple conditions with 'or'
expression="flow.input.category == 'billing' or flow.input.category == 'finance'"

# Checking boolean flags
expression="flow.private.needs_approval is True"

# Numeric comparisons
expression="flow.input.amount > 5000"

# String operations
expression="'urgent' in flow.input.tags"

Common patterns

The following examples show common ways to use parallel nodes in your agentic workflows:
Use this pattern to process data through multiple independent handlers:
parallel = flow.parallel(evaluator=None, name="fan_out")

# Create multiple processors
processor1 = parallel.tool(name="process_type_a", tool=process_a)
processor2 = parallel.tool(name="process_type_b", tool=process_b)
processor3 = parallel.tool(name="process_type_c", tool=process_c)

# All processors run in parallel
parallel.sequence(START, processor1, END)
parallel.sequence(START, processor2, END)
parallel.sequence(START, processor3, END)

Best practices

Follow these best practices when you work with parallel nodes:
  • Use parallel() when every branch must run
  • Use parallel_conditions() when branch execution depends on conditions
  • Use conditions() (branch) when only one path must run. For more information, see the Conditions branch node
Important:Do not create parallel paths that loop back to previous nodes. This pattern creates unlimited parallel threads and can cause runtime issues.
The following example shows an incorrect pattern:
# DON'T: This creates infinite parallel threads
parallel_flow = flow.parallel_conditions()
node1 = parallel_flow.script(name="node1", script="...")
node2 = parallel_flow.script(name="node2", script="...")

parallel_flow.condition(expression="True", to_node=node1)
parallel_flow.edge(node1, node2)
parallel_flow.edge(node2, node1)  # ❌ Loop back - creates infinite threads!
Parallel agentic workflows merge at the END node inside the parallel subagentic workflow. The parent agentic workflow continues only after every parallel branch completes.
Example
parallel_agentic_workflow = flow.parallel_conditions(name="processing")

# Define nodes
node1 = parallel_agentic_workflow.script(name="task1", script="...")
node2 = parallel_agentic_workflow.script(name="task2", script="...")

# Add conditions
parallel_agentic_workflow.condition(expression="...", to_node=node1)
parallel_agentic_workflow.condition(expression="...", to_node=node2)

# Connect to END - automatic merge point
parallel_agentic_workflow.sequence(node1, END)
parallel_agentic_workflow.sequence(node2, END)

# This node runs only after ALL parallel paths complete
next_node = flow.script(name="after_parallel", script="...")
flow.edge(parallel_agentic_workflow, next_node)
Each parallel branch can fail independently. Add error handling so you can manage failures:
parallel_agentic_workflow = flow.parallel_conditions(name="processing")

task1 = parallel_agentic_workflow.script(
    name="task1",
    script="...",
    error_handler=NodeErrorHandlerConfig(
        on_error="continue",  # Continue even if this branch fails
        max_retries=3
    )
)

Example: Feature delivery agentic workflow

This example shows how you can use both conditional and unconditional parallel processing to complete feature delivery work.
Example
from typing import List
from pydantic import BaseModel, Field
from ibm_watsonx_orchestrate.flow_builder.flows import (
    Flow, flow, ScriptNode, START, END
)

class FlowInput(BaseModel):
    feature_name: str = Field(description="Name of the feature to deliver")
    design_needed: bool = Field(default=True, description="Whether design work is needed")
    arch_needed: bool = Field(default=True, description="Whether architecture work is needed")

class FlowOutput(BaseModel):
    status: str = Field(description="Final status of the feature delivery")
    phases_completed: List[str] = Field(description="List of completed phases")

class PrivateState(BaseModel):
    design_needed: bool = Field(default=False)
    arch_needed: bool = Field(default=False)
    phases_completed: List[str] = Field(default_factory=list)

@flow(
    name="feature_delivery_workflow",
    display_name="Feature Delivery Agentic Workflow",
    description="An agentic workflow demonstrating parallel execution in multiple phases",
    input_schema=FlowInput,
    output_schema=FlowOutput,
    private_schema=PrivateState
)
def build_feature_delivery_flow(aflow: Flow) -> Flow:
    # Initialize state
    init_state = aflow.script(
        name="init_state",
        script="""
flow.private.design_needed = flow.input.design_needed
flow.private.arch_needed = flow.input.arch_needed
flow.private.phases_completed = []
        """
    )
    
    # PHASE 1: Conditional Parallel (Design & Architecture)
    phase1_parallel = aflow.parallel_conditions(
        name="parallel_phase1",
        display_name="Phase 1 - Design & Architecture"
    )
    
    design_work = phase1_parallel.script(
        name="design_work",
        script="""
print("Starting design work...")
flow.private.phases_completed.append("Design")
print("Design work completed")
        """
    )
    
    architecture_work = phase1_parallel.script(
        name="architecture_work",
        script="""
print("Starting architecture work...")
flow.private.phases_completed.append("Architecture")
print("Architecture work completed")
        """
    )
    
    phase1_skip = phase1_parallel.script(
        name="phase1_skip",
        script="print('Phase 1: Skipping...')"
    )
    
    # Add conditions
    phase1_parallel.condition(
        expression="flow.private.design_needed is True",
        to_node=design_work
    ).condition(
        expression="flow.private.arch_needed is True",
        to_node=architecture_work
    ).condition(
        default=True,
        to_node=phase1_skip
    )
    
    phase1_parallel.sequence(design_work, END)
    phase1_parallel.sequence(architecture_work, END)
    phase1_parallel.sequence(phase1_skip, END)
    
    # PHASE 2: Unconditional Parallel (Development)
    phase2_parallel = aflow.parallel(
        evaluator=None,
        name="parallel_phase2",
        display_name="Phase 2 - Development"
    )
    
    squad1_work = phase2_parallel.script(
        name="squad1_work",
        script="print('Squad 1: Development completed')"
    )
    
    squad2_work = phase2_parallel.script(
        name="squad2_work",
        script="print('Squad 2: Development completed')"
    )
    
    squad3_work = phase2_parallel.script(
        name="squad3_work",
        script="print('Squad 3: Development completed')"
    )
    
    phase2_parallel.sequence(START, squad1_work, END)
    phase2_parallel.sequence(START, squad2_work, END)
    phase2_parallel.sequence(START, squad3_work, END)
    
    # PHASE 3: Sequential (QA)
    qa_work = aflow.script(
        name="qa_work",
        script="""
print("Starting QA testing...")
flow.private.phases_completed.append("QA")
        """
    )
    
    finalize = aflow.script(
        name="finalize",
        script="""
flow.output.status = "Feature delivery completed successfully"
flow.output.phases_completed = flow.private.phases_completed
        """
    )
    
    # Connect all phases
    aflow.edge(START, init_state)
    aflow.edge(init_state, phase1_parallel)
    aflow.edge(phase1_parallel, phase2_parallel)
    aflow.edge(phase2_parallel, qa_work)
    aflow.edge(qa_work, finalize)
    aflow.edge(finalize, END)
    
    return aflow