| title | Microsoft Agent Framework Workflows - Executors |
|---|---|
| description | In-depth look at Executors in Microsoft Agent Framework Workflows. |
| zone_pivot_groups | programming-languages |
| author | TaoChenOSU |
| ms.topic | conceptual |
| ms.author | taochen |
| ms.date | 03/24/2026 |
| ms.service | agent-framework |
Executors are the fundamental building blocks that process messages in a workflow. They are autonomous processing units that receive typed messages, perform operations, and can produce output messages or events.
Each executor has a unique identifier and can handle specific message types. Executors can be:
- Custom logic components — process data, call APIs, or transform messages
- AI agents — use LLMs to generate responses (see Agents in Workflows)
::: zone pivot="programming-language-csharp"
Important
The recommended way to define executor message handlers in C# is to use the [MessageHandler] attribute on methods within a partial class that derives from Executor. This uses compile-time source generation for handler registration, providing better performance, compile-time validation, and Native AOT compatibility.
Executors derive from the Executor base class and use the [MessageHandler] attribute to declare handler methods. The class must be marked partial to enable source generation.
using Microsoft.Agents.AI.Workflows;
internal sealed partial class UppercaseExecutor() : Executor("UppercaseExecutor")
{
[MessageHandler]
private ValueTask<string> HandleAsync(string message, IWorkflowContext context)
{
string result = message.ToUpperInvariant();
return ValueTask.FromResult(result); // Return value is automatically sent to connected executors
}
}You can also send messages manually without returning a value:
internal sealed partial class UppercaseExecutor() : Executor("UppercaseExecutor")
{
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
string result = message.ToUpperInvariant();
await context.SendMessageAsync(result); // Manually send messages to connected executors
}
}Tip
Executors can hold mutable state. If a stateful executor is shared across workflow runs, it must implement IResettableExecutor to clear stale state between runs. See Resettable Executors for details.
Handle multiple input types by defining multiple [MessageHandler] methods:
internal sealed partial class SampleExecutor() : Executor("SampleExecutor")
{
[MessageHandler]
private ValueTask<string> HandleStringAsync(string message, IWorkflowContext context)
{
return ValueTask.FromResult(message.ToUpperInvariant());
}
[MessageHandler]
private ValueTask<int> HandleIntAsync(int message, IWorkflowContext context)
{
return ValueTask.FromResult(message * 2);
}
}Create an executor from a function using the BindExecutor extension method:
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindExecutor("UppercaseExecutor");The IWorkflowContext provides methods for interacting with the workflow during execution:
SendMessageAsync— send messages to connected executorsYieldOutputAsync— produce workflow outputs returned/streamed to the caller
internal sealed partial class OutputExecutor() : Executor("OutputExecutor")
{
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
await context.YieldOutputAsync("Hello, World!");
}
}If a handler neither sends messages nor yields outputs, it can simply perform side effects:
internal sealed partial class LogExecutor() : Executor("LogExecutor")
{
[MessageHandler]
private void Handle(string message, IWorkflowContext context)
{
Console.WriteLine("Doing some work...");
}
}::: zone-end
::: zone pivot="programming-language-python"
Executors inherit from the Executor base class. Each executor uses methods decorated with the @handler decorator. Handlers must have proper type annotations to specify the message types they process.
from agent_framework import (
Executor,
WorkflowContext,
handler,
)
class UpperCase(Executor):
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert the input to uppercase and forward it to the next node."""
await ctx.send_message(text.upper())Create an executor from a function using the @executor decorator:
from agent_framework import (
WorkflowContext,
executor,
)
@executor(id="upper_case_executor")
async def upper_case(text: str, ctx: WorkflowContext[str]) -> None:
"""Convert the input to uppercase and forward it to the next node."""
await ctx.send_message(text.upper())Handle multiple input types by defining multiple handlers:
class SampleExecutor(Executor):
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(text.upper())
@handler
async def double_integer(self, number: int, ctx: WorkflowContext[int]) -> None:
await ctx.send_message(number * 2)As an alternative to type annotations, you can specify types explicitly via decorator parameters:
Important
When using explicit type parameters, you must specify all types via the decorator — you cannot mix explicit parameters with type annotations. The input parameter is required; output and workflow_output are optional.
class ExplicitTypesExecutor(Executor):
@handler(input=str, output=str)
async def to_upper_case(self, text, ctx) -> None:
await ctx.send_message(text.upper())
@handler(input=str | int, output=str)
async def handle_mixed(self, message, ctx) -> None:
await ctx.send_message(str(message).upper())
@handler(input=str, output=int, workflow_output=bool)
async def process_with_workflow_output(self, message, ctx) -> None:
await ctx.send_message(len(message))
await ctx.yield_output(True)The WorkflowContext provides methods for interacting with the workflow during execution:
send_message— send messages to connected executorsyield_output— produce workflow outputs returned/streamed to the caller
class OutputExecutor(Executor):
@handler
async def handle(self, message: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output("Hello, World!")If a handler neither sends messages nor yields outputs, no type parameter is needed:
class LogExecutor(Executor):
@handler
async def handle(self, message: str, ctx: WorkflowContext) -> None:
print("Doing some work...")::: zone-end
[!div class="nextstepaction"] Edges