# Parallel branch node

Use a parallel branch node to run multiple branches of your agentic workflow at the same time. The node runs either every branch (unconditional) or every branch whose condition matches (conditional), and the workflow continues only after all of those branches complete.

Parallel branch nodes differ from conditions branch nodes in these ways:

| Feature        | Conditions branch node (`conditions()`)     | Parallel branch node (`parallel_conditions()`) |
| -------------- | ------------------------------------------- | ---------------------------------------------- |
| **Processing** | First matching condition only               | ALL matching conditions                        |
| **Semantics**  | Sequential evaluation such as if-else logic | Concurrent evaluation                          |
| **Use case**   | Exclusive paths                             | Concurrent execution                           |
| **Merging**    | Single path continues                       | Waits for all paths to complete                |
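
The difference in the **Processing** row can be sketched in plain Python. This is an illustration only and does not use the watsonx Orchestrate API: `first_match` and `all_matches` are hypothetical helpers, and conditions are modeled as ordinary predicates over a dictionary.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical model: a branch is a (name, predicate) pair.
Branch = Tuple[str, Callable[[Dict], bool]]

BRANCHES: List[Branch] = [
    ("high_priority", lambda task: task["priority"] == "high"),
    ("billing", lambda task: task["category"] == "billing"),
    ("large_value", lambda task: task["value"] > 1000),
]


def first_match(task: Dict, branches: List[Branch]) -> List[str]:
    """Model of conditions(): stop at the first condition that matches."""
    for name, predicate in branches:
        if predicate(task):
            return [name]
    return []


def all_matches(task: Dict, branches: List[Branch]) -> List[str]:
    """Model of parallel_conditions(): select every condition that matches."""
    return [name for name, predicate in branches if predicate(task)]


task = {"priority": "high", "category": "billing", "value": 50.0}
exclusive = first_match(task, BRANCHES)
concurrent = all_matches(task, BRANCHES)
print(exclusive)   # ['high_priority']
print(concurrent)  # ['high_priority', 'billing']
```

For the same input, the first-match model selects one branch while the all-match model selects two, which is the behavior the table describes.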

## Configuring unconditional parallel branch

To configure an unconditional parallel branch node, call the `parallel()` method in your agentic workflow. Use this node when you want every branch to run every time.

Use unconditional parallel when you need to:

* Run multiple teams at the same time
* Send work to multiple services at the same time
* Process data through multiple pipelines at the same time

Define the following parameters in this method:

<ParamField path="evaluator" type="Union[Conditions, None]">
  Set to `None` for unconditional execution. All branches run.
</ParamField>

<ParamField path="name" type="string">
  Name for the parallel node. watsonx Orchestrate generates this value if you do not provide it.
</ParamField>

<ParamField path="display_name" type="string">
  Display name for the parallel node.
</ParamField>

```python Example [expandable] theme={null}
from ibm_watsonx_orchestrate.flow_builder.flows import flow, Flow, START, END

@flow(name="multi_squad_development")
def build_flow(aflow: Flow) -> Flow:
    # Create unconditional parallel agentic workflow
    parallel_dev = aflow.parallel(
        evaluator=None,  # No evaluator = all paths run
        name="parallel_development",
        display_name="Development Phase"
    )
    
    # Define work for each squad (all run in parallel)
    squad1 = parallel_dev.script(
        name="squad1_work",
        script="print('Squad 1 working...')"
    )
    
    squad2 = parallel_dev.script(
        name="squad2_work",
        script="print('Squad 2 working...')"
    )
    
    squad3 = parallel_dev.script(
        name="squad3_work",
        script="print('Squad 3 working...')"
    )
    
    # Connect each squad from START to END within parallel agentic workflow
    parallel_dev.sequence(START, squad1, END)
    parallel_dev.sequence(START, squad2, END)
    parallel_dev.sequence(START, squad3, END)
    
    # Continue after all squads complete
    completion = aflow.script(
        name="all_complete",
        script="print('All squads completed!')"
    )
    
    aflow.sequence(START, parallel_dev, completion, END)
    return aflow
```
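
The fan-out and join behavior of the example above can be modeled with `asyncio` from the Python standard library. This is a conceptual sketch, not how watsonx Orchestrate schedules branches: the `squad` coroutine, its delays, and `run_parallel` are all hypothetical.

```python
import asyncio
from typing import List


async def squad(name: str, delay: float, log: List[str]) -> str:
    """Hypothetical branch: sleep to simulate work, then record completion."""
    await asyncio.sleep(delay)
    log.append(name)
    return f"{name} done"


async def run_parallel(log: List[str]) -> List[str]:
    # Start all branches at once and wait until every one has finished.
    results = await asyncio.gather(
        squad("squad1", 0.03, log),
        squad("squad2", 0.01, log),
        squad("squad3", 0.02, log),
    )
    # Code here plays the role of the node that follows the parallel node.
    log.append("all_complete")
    return list(results)


completion_log: List[str] = []
results = asyncio.run(run_parallel(completion_log))
print(results)
print(completion_log[-1])
```

The branches may finish in any order, but the step that follows the join always runs last, which mirrors how `completion` runs only after all squads complete.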

## Configuring conditional parallel branch

To configure a conditional parallel node, call the `parallel_conditions()` method in your agentic workflow. Use this node when you want to run every branch whose condition matches.

Use conditional parallel when you need to:

* Process work based on multiple criteria
* Route work to multiple handlers based on attributes
* Fan out work only when conditions match

Define the following parameters in this method:

<ParamField path="name" type="string">
  Name for the parallel node. watsonx Orchestrate generates this value if you leave it empty.
</ParamField>

<ParamField path="display_name" type="string">
  Display name for the parallel node. The default value is the node name.
</ParamField>

### Adding conditions to parallel branch

To add conditions to the parallel branch node, call the `condition()` method. The node runs all matching conditions in parallel. Define the following parameters in this method:

<ParamField path="to_node" type="Node" required>
  Node to run when the condition matches.
</ParamField>

<ParamField path="expression" type="string">
  Python expression to evaluate. Required unless `default=True`.
</ParamField>

<ParamField path="default" type="bool">
  When set to `True`, this parameter defines the default case.
</ParamField>
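
The rule that `expression` is required unless `default=True` can be expressed as a small check. The sketch below is a stand-alone illustration: `ConditionSpec` is a hypothetical class, not part of the ADK, and the ADK's own validation may behave differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConditionSpec:
    """Hypothetical stand-in for the arguments passed to condition()."""
    to_node: str
    expression: Optional[str] = None
    default: bool = False

    def __post_init__(self) -> None:
        # Mirror the documented rule: an expression is needed
        # unless this entry is the default case.
        if not self.default and not self.expression:
            raise ValueError("expression is required unless default=True")


ok_expr = ConditionSpec(to_node="billing", expression="flow.input.category == 'billing'")
ok_default = ConditionSpec(to_node="default_handler", default=True)

try:
    ConditionSpec(to_node="orphan")
    rejected = False
except ValueError:
    rejected = True

print(rejected)
```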

```python Example [expandable] theme={null}
from pydantic import BaseModel
from ibm_watsonx_orchestrate.flow_builder.flows import flow, Flow, START, END

class TaskInput(BaseModel):
    priority: str
    category: str
    value: float

@flow(
    name="conditional_parallel_processing",
    input_schema=TaskInput
)
def build_flow(aflow: Flow) -> Flow:
    # Create conditional parallel agentic workflow
    parallel_processing = aflow.parallel_conditions(
        name="task_processing",
        display_name="Task Processing"
    )
    
    # Define handlers within the parallel subflow
    high_priority = parallel_processing.script(
        name="high_priority_handler",
        script="print('Processing high priority task')"
    )
    
    billing = parallel_processing.script(
        name="billing_handler",
        script="print('Processing billing task')"
    )
    
    large_value = parallel_processing.script(
        name="large_value_handler",
        script="print('Processing large value task')"
    )
    
    default_handler = parallel_processing.script(
        name="default_handler",
        script="print('Default processing')"
    )
    
    # Add conditions - all matching conditions run in parallel
    parallel_processing.condition(
        expression="flow.input.priority == 'high'",
        to_node=high_priority
    ).condition(
        expression="flow.input.category == 'billing'",
        to_node=billing
    ).condition(
        expression="flow.input.value > 1000",
        to_node=large_value
    ).condition(
        default=True,  # Default case (like 'else')
        to_node=default_handler
    )
    
    # Connect handlers to END within parallel agentic workflow
    parallel_processing.sequence(high_priority, END)
    parallel_processing.sequence(billing, END)
    parallel_processing.sequence(large_value, END)
    parallel_processing.sequence(default_handler, END)
    
    # Continue after all matching paths complete
    aflow.sequence(START, parallel_processing, END)
    return aflow
```

## Expression syntax

Conditions in parallel agentic workflows use Python expressions that can access:

* `flow.input.*` - Input data
* `flow.private.*` - Private state
* `flow.output.*` - Output data from upstream nodes

The following examples show common expression patterns you can use:

```python theme={null}
# Simple comparison
expression="flow.input.priority == 'high'"

# Multiple conditions with 'and'
expression="flow.input.priority == 'high' and flow.input.value > 1000"

# Multiple conditions with 'or'
expression="flow.input.category == 'billing' or flow.input.category == 'finance'"

# Checking boolean flags
expression="flow.private.needs_approval is True"

# Numeric comparisons
expression="flow.input.amount > 5000"

# Membership test (list item or substring)
expression="'urgent' in flow.input.tags"
```
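
To see how these expressions evaluate as ordinary Python, you can try them against sample data. The sketch below builds a stand-in `flow` object with `SimpleNamespace` and uses `eval()` purely for illustration; the sample values are hypothetical, and this is not a description of how the watsonx Orchestrate runtime evaluates expressions.

```python
from types import SimpleNamespace

# Hypothetical sample state; the field names mirror the expressions above.
flow = SimpleNamespace(
    input=SimpleNamespace(
        priority="high",
        value=250.0,
        category="finance",
        amount=7500,
        tags=["urgent", "emea"],
    ),
    private=SimpleNamespace(needs_approval=False),
)

expressions = [
    "flow.input.priority == 'high'",
    "flow.input.priority == 'high' and flow.input.value > 1000",
    "flow.input.category == 'billing' or flow.input.category == 'finance'",
    "flow.private.needs_approval is True",
    "flow.input.amount > 5000",
    "'urgent' in flow.input.tags",
]

# eval() is used here only to illustrate Python semantics on trusted strings.
results = {
    expr: bool(eval(expr, {"__builtins__": {}}, {"flow": flow}))
    for expr in expressions
}

for expr, matched in results.items():
    print(f"{matched!s:5}  {expr}")
```

With this sample data, four of the six expressions match, so a conditional parallel node using them would select four branches.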

## Common patterns

The following examples show common ways to use parallel nodes in your agentic workflows:

<Tabs>
  <Tab title="Fan-Out Processing">
    Use this pattern to process data through multiple independent handlers:

    ```python theme={null}
    # aflow is the Flow instance passed to your @flow builder function
    parallel = aflow.parallel(evaluator=None, name="fan_out")

    # Create multiple processors
    processor1 = parallel.tool(name="process_type_a", tool=process_a)
    processor2 = parallel.tool(name="process_type_b", tool=process_b)
    processor3 = parallel.tool(name="process_type_c", tool=process_c)

    # All processors run in parallel
    parallel.sequence(START, processor1, END)
    parallel.sequence(START, processor2, END)
    parallel.sequence(START, processor3, END)
    ```
  </Tab>

  <Tab title="Conditional Multi-Handler">
    Use this pattern to route work to multiple handlers based on attributes:

    ```python theme={null}
    # aflow is the Flow instance passed to your @flow builder function
    parallel = aflow.parallel_conditions(name="multi_handler")

    handler1 = parallel.tool(name="handler1", tool=handle_priority)
    handler2 = parallel.tool(name="handler2", tool=handle_category)
    handler3 = parallel.tool(name="handler3", tool=handle_value)

    parallel.condition(
        expression="flow.input.is_priority",
        to_node=handler1
    ).condition(
        expression="flow.input.category == 'special'",
        to_node=handler2
    ).condition(
        expression="flow.input.value > 1000",
        to_node=handler3
    )
    ```
  </Tab>

  <Tab title="Parallel with Aggregation">
    Use this pattern to run parallel tasks and aggregate results:

    ```python Example [expandable] theme={null}
    # Parallel processing (aflow is the Flow instance passed to your @flow builder function)
    parallel = aflow.parallel(evaluator=None, name="parallel_tasks")

    task1 = parallel.script(name="task1", script="...")
    task2 = parallel.script(name="task2", script="...")
    task3 = parallel.script(name="task3", script="...")

    parallel.sequence(START, task1, END)
    parallel.sequence(START, task2, END)
    parallel.sequence(START, task3, END)

    # Aggregate results after all tasks complete
    aggregate = aflow.script(
        name="aggregate_results",
        script="""
    # All parallel tasks have completed
    # Aggregate their results here
    flow.output.combined_results = [...]
        """
    )

    aflow.sequence(START, parallel, aggregate, END)
    ```
  </Tab>
</Tabs>

## Best practices

Follow these best practices when you work with parallel nodes:

<AccordionGroup>
  <Accordion title="Choose the Right Node Type">
    * **Use `parallel()`** when every branch must run
    * **Use `parallel_conditions()`** when branch execution depends on conditions
    * **Use `conditions()` (branch)** when only one path must run. For more information, see the [Conditions branch node](./branch_node).
  </Accordion>

  <Accordion title="Avoid Infinite Loops">
    <Warning>
      **Important:**

      Do not create parallel paths that loop back to previous nodes. This pattern creates unlimited parallel threads and can cause runtime issues.
    </Warning>

    The following example shows an **incorrect** pattern:

    ```python theme={null}
    # DON'T: This creates infinite parallel threads
    # (aflow is the Flow instance passed to your @flow builder function)
    parallel_flow = aflow.parallel_conditions()
    node1 = parallel_flow.script(name="node1", script="...")
    node2 = parallel_flow.script(name="node2", script="...")

    parallel_flow.condition(expression="True", to_node=node1)
    parallel_flow.edge(node1, node2)
    parallel_flow.edge(node2, node1)  # ❌ Loop back - creates infinite threads!
    ```
  </Accordion>

  <Accordion title="Merge Parallel Paths">
    Parallel branches merge at the `END` node inside the parallel subflow. The parent agentic workflow continues only after every parallel branch completes.

    ```python Example [expandable] theme={null}
    # aflow is the Flow instance passed to your @flow builder function
    parallel_agentic_workflow = aflow.parallel_conditions(name="processing")

    # Define nodes
    node1 = parallel_agentic_workflow.script(name="task1", script="...")
    node2 = parallel_agentic_workflow.script(name="task2", script="...")

    # Add conditions
    parallel_agentic_workflow.condition(expression="...", to_node=node1)
    parallel_agentic_workflow.condition(expression="...", to_node=node2)

    # Connect to END - automatic merge point
    parallel_agentic_workflow.sequence(node1, END)
    parallel_agentic_workflow.sequence(node2, END)

    # This node runs only after ALL parallel paths complete
    next_node = aflow.script(name="after_parallel", script="...")
    aflow.edge(parallel_agentic_workflow, next_node)
    ```
  </Accordion>

  <Accordion title="Handle Errors in Parallel Branches">
    Each parallel branch can fail independently. Add error handling so you can manage failures:

    ```python theme={null}
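    # aflow is the Flow instance passed to your @flow builder function.
    # NodeErrorHandlerConfig must be imported before use (import not shown here).
    parallel_agentic_workflow = aflow.parallel_conditions(name="processing")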

    task1 = parallel_agentic_workflow.script(
        name="task1",
        script="...",
        error_handler=NodeErrorHandlerConfig(
            on_error="continue",  # Continue even if this branch fails
            max_retries=3
        )
    )
    ```
  </Accordion>
</AccordionGroup>
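
One way to catch the loop described under "Avoid Infinite Loops" before you build the workflow is to check your planned edges for cycles. The sketch below is plain Python and does not call the ADK; `has_cycle` and the edge lists are hypothetical.

```python
from typing import Dict, List, Tuple


def has_cycle(edges: List[Tuple[str, str]]) -> bool:
    """Return True if the directed edge list contains a cycle (DFS coloring)."""
    graph: Dict[str, List[str]] = {}
    for src, dst in edges:
        graph.setdefault(src, []).append(dst)
        graph.setdefault(dst, [])

    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    color = {node: WHITE for node in graph}

    def visit(node: str) -> bool:
        color[node] = GRAY
        for nxt in graph[node]:
            if color[nxt] == GRAY:
                return True  # back edge: nxt is already on the current path
            if color[nxt] == WHITE and visit(nxt):
                return True
        color[node] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in graph)


looping = [("START", "node1"), ("node1", "node2"), ("node2", "node1")]
acyclic = [("START", "node1"), ("node1", "node2"), ("node2", "END")]
print(has_cycle(looping))
print(has_cycle(acyclic))
```

The first edge list mirrors the incorrect pattern, where `node2` points back to `node1`; the second routes `node2` to `END` instead.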

## Example: Feature delivery agentic workflow

This example shows how you can use both conditional and unconditional parallel processing to complete feature delivery work.

```python Example [expandable] theme={null}
from typing import List
from pydantic import BaseModel, Field
from ibm_watsonx_orchestrate.flow_builder.flows import (
    Flow, flow, START, END
)

class FlowInput(BaseModel):
    feature_name: str = Field(description="Name of the feature to deliver")
    design_needed: bool = Field(default=True, description="Whether design work is needed")
    arch_needed: bool = Field(default=True, description="Whether architecture work is needed")

class FlowOutput(BaseModel):
    status: str = Field(description="Final status of the feature delivery")
    phases_completed: List[str] = Field(description="List of completed phases")

class PrivateState(BaseModel):
    design_needed: bool = Field(default=False)
    arch_needed: bool = Field(default=False)
    phases_completed: List[str] = Field(default_factory=list)

@flow(
    name="feature_delivery_workflow",
    display_name="Feature Delivery Agentic Workflow",
    description="An agentic workflow demonstrating parallel execution in multiple phases",
    input_schema=FlowInput,
    output_schema=FlowOutput,
    private_schema=PrivateState
)
def build_feature_delivery_flow(aflow: Flow) -> Flow:
    # Initialize state
    init_state = aflow.script(
        name="init_state",
        script="""
flow.private.design_needed = flow.input.design_needed
flow.private.arch_needed = flow.input.arch_needed
flow.private.phases_completed = []
        """
    )
    
    # PHASE 1: Conditional Parallel (Design & Architecture)
    phase1_parallel = aflow.parallel_conditions(
        name="parallel_phase1",
        display_name="Phase 1 - Design & Architecture"
    )
    
    design_work = phase1_parallel.script(
        name="design_work",
        script="""
print("Starting design work...")
flow.private.phases_completed.append("Design")
print("Design work completed")
        """
    )
    
    architecture_work = phase1_parallel.script(
        name="architecture_work",
        script="""
print("Starting architecture work...")
flow.private.phases_completed.append("Architecture")
print("Architecture work completed")
        """
    )
    
    phase1_skip = phase1_parallel.script(
        name="phase1_skip",
        script="print('Phase 1: Skipping...')"
    )
    
    # Add conditions
    phase1_parallel.condition(
        expression="flow.private.design_needed is True",
        to_node=design_work
    ).condition(
        expression="flow.private.arch_needed is True",
        to_node=architecture_work
    ).condition(
        default=True,
        to_node=phase1_skip
    )
    
    phase1_parallel.sequence(design_work, END)
    phase1_parallel.sequence(architecture_work, END)
    phase1_parallel.sequence(phase1_skip, END)
    
    # PHASE 2: Unconditional Parallel (Development)
    phase2_parallel = aflow.parallel(
        evaluator=None,
        name="parallel_phase2",
        display_name="Phase 2 - Development"
    )
    
    squad1_work = phase2_parallel.script(
        name="squad1_work",
        script="print('Squad 1: Development completed')"
    )
    
    squad2_work = phase2_parallel.script(
        name="squad2_work",
        script="print('Squad 2: Development completed')"
    )
    
    squad3_work = phase2_parallel.script(
        name="squad3_work",
        script="print('Squad 3: Development completed')"
    )
    
    phase2_parallel.sequence(START, squad1_work, END)
    phase2_parallel.sequence(START, squad2_work, END)
    phase2_parallel.sequence(START, squad3_work, END)
    
    # PHASE 3: Sequential (QA)
    qa_work = aflow.script(
        name="qa_work",
        script="""
print("Starting QA testing...")
flow.private.phases_completed.append("QA")
        """
    )
    
    finalize = aflow.script(
        name="finalize",
        script="""
flow.output.status = "Feature delivery completed successfully"
flow.output.phases_completed = flow.private.phases_completed
        """
    )
    
    # Connect all phases
    aflow.edge(START, init_state)
    aflow.edge(init_state, phase1_parallel)
    aflow.edge(phase1_parallel, phase2_parallel)
    aflow.edge(phase2_parallel, qa_work)
    aflow.edge(qa_work, finalize)
    aflow.edge(finalize, END)
    
    return aflow
```
