# Foreach node

Use a foreach node to create a nested agentic workflow that runs its nodes once for each item in the input list.

To configure a foreach node in an agentic workflow, call the `foreach()` method, which defines and returns the nested agentic workflow. The method accepts the following parameters:

| Parameter      | Type      | Required | Description                                                                     |
| -------------- | --------- | -------- | ------------------------------------------------------------------------------- |
| item\_schema   | BaseModel | Yes      | The schema that defines each item to iterate over.                              |
| input\_schema  | BaseModel | No       | The input schema for the nested agentic workflow executed by the foreach node.  |
| output\_schema | BaseModel | No       | The output schema for the nested agentic workflow executed by the foreach node. |

```py Example theme={null}
foreach_flow: Flow = aflow.foreach(item_schema=CustomerRecord, output_schema=Invitations)
```

Next, add the nodes that you want to repeat to the foreach flow and connect them. The same foreach flow can contain more than one node.

```py Example theme={null}
node2 = foreach_flow.tool(send_invitation_email)
foreach_flow.sequence(START, node2, END)
```

The example below shows how to configure a foreach node:

```py Python [expandable] theme={null}
"""
In this example, the user is retrieving a set of email addresses from a contact list, and 
for each email address, sending out an invitation.
"""

from typing import List

from pydantic import BaseModel, Field

from ibm_watsonx_orchestrate.agent_builder.tools import tool, ToolPermission
from ibm_watsonx_orchestrate.flow_builder.flows import Flow, flow, START, END

from .send_invitation_email import send_invitation_email
from .get_emails_from_customer import get_emails_from_customer, CustomerRecord


class CustomerName(BaseModel):
    name: str = Field(description="The name of the customer")

class Invitations(BaseModel):
    invitations: List[str]

@flow(
    name="send_invitation_to_customer",
    input_schema=CustomerName,
    output_schema=None
)
def build_send_invitation_to_customer_flow(aflow: Flow) -> Flow:
    """Retrieve a list of customer records, then send an invitation email to each one."""
    
    get_customer_list_node = aflow.tool(get_emails_from_customer)

    # Calling foreach() creates a subflow; you can add more nodes to the subflow.
    foreach_flow: Flow = aflow.foreach(item_schema=CustomerRecord, output_schema=Invitations)
    
    node2 = foreach_flow.tool(send_invitation_email)
    foreach_flow.sequence(START, node2, END)

    # add the foreach flow to the main flow
    aflow.edge(START, get_customer_list_node)
    aflow.edge(get_customer_list_node, foreach_flow)
    aflow.edge(foreach_flow, END)

    return aflow

```
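
The example above imports `CustomerRecord`, `get_emails_from_customer`, and `send_invitation_email` from modules that are not shown. As a rough mental model only, the following plain-Python sketch mimics the data flow: an upstream step produces a list, and the foreach body runs once per item. It does not use the watsonx Orchestrate runtime. The `CustomerRecord` fields, the function bodies, and the `run_foreach` helper are hypothetical stand-ins, and how the real runtime maps per-item results into `output_schema` is not covered here.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class CustomerRecord:
    # Hypothetical fields; the real CustomerRecord lives in a module not shown here.
    name: str
    email: str


def get_emails_from_customer(name: str) -> List[CustomerRecord]:
    """Stand-in for the upstream tool: returns the items to iterate over."""
    return [
        CustomerRecord(name="Ada", email="ada@example.com"),
        CustomerRecord(name="Alan", email="alan@example.com"),
    ]


def send_invitation_email(record: CustomerRecord) -> str:
    """Stand-in for the tool that runs once per item inside the foreach flow."""
    return f"Invitation sent to {record.email}"


def run_foreach(items: List[CustomerRecord]) -> List[str]:
    """Hypothetical helper: conceptually START -> send_invitation_email -> END per item."""
    return [send_invitation_email(item) for item in items]


invitations = run_foreach(get_emails_from_customer("Acme"))
```

In this sketch, `invitations` holds one result string per customer record, in the same order as the input list.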

### Foreach processing method

You can configure how your foreach node processes input items. There are two processing methods:

* **Sequential**

  A sequential foreach processes items one after another. The next item starts only after the previous one finishes. Use this method when the order of item processing affects the result.

* **Parallel**

  A parallel foreach processes multiple items at the same time. In most cases, this method finishes sooner than the sequential approach. Use it when items can be processed independently of one another.
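
To make the difference between the two methods concrete, here is a minimal sketch that uses only the Python standard library. It is an illustration of the general idea, not how watsonx Orchestrate schedules foreach items. The `send_invitation`, `run_sequential`, and `run_parallel` names are hypothetical, and the `sleep` call simulates a slow per-item step such as a network request.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def send_invitation(email: str) -> str:
    """Hypothetical per-item step; the sleep simulates network latency."""
    time.sleep(0.1)
    return f"sent:{email}"


def run_sequential(step: Callable[[str], str], items: List[str]) -> List[str]:
    """Process items one after another; each item waits for the previous one."""
    return [step(item) for item in items]


def run_parallel(step: Callable[[str], str], items: List[str]) -> List[str]:
    """Process items concurrently in a thread pool."""
    with ThreadPoolExecutor(max_workers=max(1, len(items))) as pool:
        # Executor.map returns results in input order, even if items finish out of order.
        return list(pool.map(step, items))


emails = ["a@example.com", "b@example.com", "c@example.com", "d@example.com"]

start = time.perf_counter()
seq_results = run_sequential(send_invitation, emails)
seq_elapsed = time.perf_counter() - start

start = time.perf_counter()
par_results = run_parallel(send_invitation, emails)
par_elapsed = time.perf_counter() - start
```

Both runs produce the same results here, but the parallel run overlaps the waiting time of the four items. Whether the foreach node preserves item order in parallel mode is not stated on this page, so treat the ordering in this sketch as a property of `ThreadPoolExecutor.map` only.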

To set the processing method, call the `policy()` method on your foreach flow. This method accepts the following parameter:

| Parameter | Type          | Required | Description                                                                                                                                  |
| --------- | ------------- | -------- | -------------------------------------------------------------------------------------------------------------------------------------------- |
| kind      | ForeachPolicy | Yes      | Defines the processing method. Use `ForeachPolicy.SEQUENTIAL` for sequential processing or `ForeachPolicy.PARALLEL` for parallel processing. |

```py Python [expandable] {34-37} theme={null}
"""
In this example, the user is retrieving a set of email addresses from a contact list, and 
for each email address, sending out an invitation.
"""

from typing import List

from pydantic import BaseModel, Field

from ibm_watsonx_orchestrate.agent_builder.tools import tool, ToolPermission
from ibm_watsonx_orchestrate.flow_builder.flows import Flow, flow, START, END
from ibm_watsonx_orchestrate.flow_builder.types import ForeachPolicy

from .send_invitation_email import send_invitation_email
from .get_emails_from_customer import get_emails_from_customer, CustomerRecord


class CustomerName(BaseModel):
    name: str = Field(description="The name of the customer")

class Invitations(BaseModel):
    invitations: List[str]

@flow(
    name="send_invitation_to_customer",
    input_schema=CustomerName,
    output_schema=None
)
def build_send_invitation_to_customer_flow(aflow: Flow) -> Flow:
    """Retrieve a list of customer records, then send an invitation email to each one."""
    
    get_customer_list_node = aflow.tool(get_emails_from_customer)

    # Calling foreach() creates a subflow; you can add more nodes to the subflow.
    foreach_flow: Flow = aflow.foreach(item_schema=CustomerRecord, output_schema=Invitations) \
        .policy(kind=ForeachPolicy.SEQUENTIAL)
        # To process items in parallel, use .policy(kind=ForeachPolicy.PARALLEL) instead.
    
    node2 = foreach_flow.tool(send_invitation_email)
    foreach_flow.sequence(START, node2, END)

    # add the foreach flow to the main flow
    aflow.edge(START, get_customer_list_node)
    aflow.edge(get_customer_list_node, foreach_flow)
    aflow.edge(foreach_flow, END)

    return aflow
```
