Use a foreach node to create a nested agentic workflow (a subflow) that repeats one or more nodes for each item in its input. To configure a foreach node in an agentic workflow, call the foreach() method, which returns the nested agentic workflow. The method accepts the following parameters:
Parameter | Type | Required | Description
item_schema | BaseModel | Yes | The schema that defines each item to iterate over.
input_schema | BaseModel | No | The input schema for the nested agentic workflow executed by the foreach node.
output_schema | BaseModel | No | The output schema for the nested agentic workflow executed by the foreach node.
Example
foreach_flow: Flow = aflow.foreach(item_schema=CustomerRecord, output_schema=Invitations)
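The schemas passed to foreach() are ordinary Pydantic models. CustomerRecord is imported from a module that this page does not show, so the sketch below is only a guess at its shape: the field names `name` and `email` are assumptions made for illustration. Invitations matches the definition used in the full example.

```python
from typing import List

from pydantic import BaseModel, Field


# Hypothetical item schema. The real CustomerRecord is defined in
# get_emails_from_customer.py, which is not shown here; these fields are assumed.
class CustomerRecord(BaseModel):
    name: str = Field(description="The name of the customer")
    email: str = Field(description="The email address of the customer")


# Output schema for the foreach flow, as in the full example.
class Invitations(BaseModel):
    invitations: List[str]


# Build one item and one output value to check that the schemas validate.
record = CustomerRecord(name="Ada", email="ada@example.com")
result = Invitations(invitations=[f"Invitation sent to {record.email}"])
print(result.invitations)
```

Each item that the foreach node iterates over would then be one CustomerRecord, and the tool you add to the subflow would need to accept that shape.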
Next, add the nodes that you want to repeat to the foreach flow. You can include more than one node in the same foreach flow.
Example
node2 = foreach_flow.tool(send_invitation_email)
foreach_flow.sequence(START, node2, END)
The example below shows how to configure a foreach node:
Python
"""
In this example, the user is retrieving a set of email addresses from a contact list, and 
for each email address, sending out an invitation.
"""

from typing import List

from pydantic import BaseModel, Field

from ibm_watsonx_orchestrate.agent_builder.tools import tool, ToolPermission
from ibm_watsonx_orchestrate.flow_builder.flows import Flow, flow, START, END

from .send_invitation_email import send_invitation_email
from .get_emails_from_customer import get_emails_from_customer, CustomerRecord


class CustomerName(BaseModel):
    name: str = Field(description="The name of the customer")

class Invitations(BaseModel):
    invitations: List[str]

@flow(
    name="send_invitation_to_customer",
    input_schema=CustomerName,
    output_schema=None
)
def build_send_invitation_to_customer_flow(aflow: Flow) -> Flow:
    """Iterate through a list of customers and send an invitation email to each one."""
    
    get_customer_list_node = aflow.tool(get_emails_from_customer)

    # Calling foreach() creates a subflow; you can add more nodes to the subflow.
    foreach_flow: Flow = aflow.foreach(item_schema=CustomerRecord, output_schema=Invitations)
    
    node2 = foreach_flow.tool(send_invitation_email)
    foreach_flow.sequence(START, node2, END)

    # add the foreach flow to the main flow
    aflow.edge(START, get_customer_list_node)
    aflow.edge(get_customer_list_node, foreach_flow)
    aflow.edge(foreach_flow, END)

    return aflow
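To make the data flow in the example above easier to picture, here is a plain-Python sketch of what a foreach node does conceptually: it takes a list of items, runs the subflow once per item, and collects the outputs. This is not how the library implements it. The names `run_foreach` and `fake_send_invitation_email`, and the fields of the stand-in CustomerRecord, are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class CustomerRecord:
    # Stand-in for the real schema; these fields are assumed for illustration.
    name: str
    email: str


def fake_send_invitation_email(record: CustomerRecord) -> str:
    # Stand-in for the real tool: returns a status string instead of sending mail.
    return f"Invitation sent to {record.email}"


def run_foreach(
    items: List[CustomerRecord],
    subflow: Callable[[CustomerRecord], str],
) -> List[str]:
    # Run the subflow once per item and collect the outputs in input order.
    return [subflow(item) for item in items]


customers = [
    CustomerRecord(name="Ada", email="ada@example.com"),
    CustomerRecord(name="Lin", email="lin@example.com"),
]
invitations = run_foreach(customers, fake_send_invitation_email)
print(invitations)
```

In the real flow, the node before the foreach node plays the role of `customers`, and the nodes inside the subflow play the role of `fake_send_invitation_email`.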

Foreach processing method

You can configure how your foreach node processes input items. There are two processing methods:
  • Sequential: A sequential foreach processes items one after another. The next item starts only after the previous one finishes. Use this method when the order of item processing affects the result.
  • Parallel: A parallel foreach processes multiple items at the same time. In most cases, this method runs faster than the sequential approach.
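The difference between the two methods can be illustrated with the Python standard library. The sketch below is a conceptual analogy only and does not use or reproduce the foreach node's internals; `process`, `run_sequential`, and `run_parallel` are hypothetical names. Whether the library keeps outputs in input order under parallel processing is not stated here, so treat the ordering shown as a property of this sketch alone.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def process(email: str) -> str:
    # Stand-in for the repeated node.
    return f"Invitation sent to {email}"


def run_sequential(items: List[str], step: Callable[[str], str]) -> List[str]:
    # Each item starts only after the previous one has finished.
    return [step(item) for item in items]


def run_parallel(
    items: List[str], step: Callable[[str], str], max_workers: int = 4
) -> List[str]:
    # Items may run concurrently; Executor.map still yields results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(step, items))


emails = ["ada@example.com", "lin@example.com", "sam@example.com"]
sequential_results = run_sequential(emails, process)
parallel_results = run_parallel(emails, process)
print(sequential_results == parallel_results)
```

When each step is independent, as here, both approaches produce the same results. If a step depends on the outcome of an earlier item, only the sequential form guarantees that the earlier item has finished.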
To set the processing method, call the policy() method on your foreach node. This method accepts the following parameter:
Parameter | Type | Required | Description
kind | ForeachPolicy | Yes | Defines the processing method. Use ForeachPolicy.SEQUENTIAL for sequential processing or ForeachPolicy.PARALLEL for parallel processing.
Python
"""
In this example, the user is retrieving a set of email addresses from a contact list, and 
for each email address, sending out an invitation.
"""

from typing import List

from pydantic import BaseModel, Field

from ibm_watsonx_orchestrate.agent_builder.tools import tool, ToolPermission
from ibm_watsonx_orchestrate.flow_builder.flows import Flow, flow, START, END
from ibm_watsonx_orchestrate.flow_builder.types import ForeachPolicy

from .send_invitation_email import send_invitation_email
from .get_emails_from_customer import get_emails_from_customer, CustomerRecord


class CustomerName(BaseModel):
    name: str = Field(description="The name of the customer")

class Invitations(BaseModel):
    invitations: List[str]

@flow(
    name="send_invitation_to_customer",
    input_schema=CustomerName,
    output_schema=None
)
def build_send_invitation_to_customer_flow(aflow: Flow) -> Flow:
    """Iterate through a list of customers and send an invitation email to each one."""
    
    get_customer_list_node = aflow.tool(get_emails_from_customer)

    # Calling foreach() creates a subflow; you can add more nodes to the subflow.
    # To process the items in parallel, pass ForeachPolicy.PARALLEL instead.
    foreach_flow: Flow = aflow.foreach(
        item_schema=CustomerRecord, output_schema=Invitations
    ).policy(kind=ForeachPolicy.SEQUENTIAL)
    
    node2 = foreach_flow.tool(send_invitation_email)
    foreach_flow.sequence(START, node2, END)

    # add the foreach flow to the main flow
    aflow.edge(START, get_customer_list_node)
    aflow.edge(get_customer_list_node, foreach_flow)
    aflow.edge(foreach_flow, END)

    return aflow