---
title: Microsoft Agent Framework Workflows - Checkpoints
description: In-depth look at Checkpoints in Microsoft Agent Framework Workflows.
zone_pivot_groups: programming-languages
author: TaoChenOSU
ms.topic: tutorial
ms.author: taochen
ms.date: 03/11/2026
ms.service: agent-framework
---

# Microsoft Agent Framework Workflows - Checkpoints

This page provides an overview of Checkpoints in the Microsoft Agent Framework Workflow system.

## Overview

Checkpoints allow you to save the state of a workflow at specific points during its execution, and resume from those points later. This feature is particularly useful for the following scenarios:

  • Long-running workflows where you want to avoid losing progress in case of failures.
  • Long-running workflows where you want to pause and resume execution at a later time.
  • Workflows that require periodic state saving for auditing or compliance purposes.
  • Workflows that need to be migrated across different environments or instances.

## When Are Checkpoints Created?

Remember that workflows are executed in supersteps, as documented in the core concepts. Checkpoints are created at the end of each superstep, after all executors in that superstep have completed their execution. A checkpoint captures the entire state of the workflow, including:

  • The current state of all executors
  • All pending messages in the workflow for the next superstep
  • Pending requests and responses
  • Shared states

## Capturing Checkpoints

::: zone pivot="programming-language-csharp"

To enable checkpointing, provide a `CheckpointManager` when running the workflow. A checkpoint can then be accessed via a `SuperStepCompletedEvent`, or through the `Checkpoints` property on the run.

```csharp
using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);

await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint created for this superstep
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;
```

::: zone-end

::: zone pivot="programming-language-python"

To enable checkpointing, provide a `CheckpointStorage` when creating a workflow. Checkpoints can then be accessed via the storage. Agent Framework ships three built-in implementations; pick the one that matches your durability and deployment needs:

| Provider | Package | Durability | Best for |
|---|---|---|---|
| `InMemoryCheckpointStorage` | `agent-framework` | In-process only | Tests, demos, short-lived workflows |
| `FileCheckpointStorage` | `agent-framework` | Local disk | Single-machine workflows, local development |
| `CosmosCheckpointStorage` | `agent-framework-azure-cosmos` | Azure Cosmos DB | Production, distributed, cross-process workflows |

All three implement the same `CheckpointStorage` protocol, so you can swap providers without changing workflow or executor code.

`InMemoryCheckpointStorage` keeps checkpoints in process memory. Best for tests, demos, and short-lived workflows where you do not need durability across restarts.

```python
from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
```

`FileCheckpointStorage` persists checkpoints to a local directory on disk. Best for single-machine workflows that need to survive process restarts, and for local development.

```python
from agent_framework import (
    FileCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage backed by a directory on disk.
# storage_path is required; there is no default directory.
checkpoint_storage = FileCheckpointStorage("/var/lib/agent-framework/checkpoints")

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
```

See the Security Considerations section for guidance on restricting which Python types can be deserialized via the `allowed_checkpoint_types` parameter.

`CosmosCheckpointStorage` persists checkpoints to Azure Cosmos DB NoSQL. Best for production and distributed workflows that need durable, cross-process checkpointing. Install the optional provider package:

```bash
pip install agent-framework-azure-cosmos --pre
```

The database and container are created automatically on first use, with `/workflow_name` as the partition key for efficient per-workflow queries. The recommended authentication mode is managed identity / RBAC via an Azure `TokenCredential` such as `DefaultAzureCredential`:

```python
from azure.identity.aio import DefaultAzureCredential

from agent_framework import WorkflowBuilder
from agent_framework_azure_cosmos import CosmosCheckpointStorage

# CosmosCheckpointStorage is an async context manager: it closes the underlying
# Cosmos client on exit when it created the client itself.
async with (
    DefaultAzureCredential() as credential,
    CosmosCheckpointStorage(
        endpoint="https://<account>.documents.azure.com:443/",
        credential=credential,
        database_name="agent-framework",
        container_name="workflow-checkpoints",
    ) as checkpoint_storage,
):
    # Build a workflow with checkpointing enabled
    builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
    builder.add_edge(start_executor, executor_b)
    builder.add_edge(executor_b, executor_c)
    builder.add_edge(executor_b, end_executor)
    workflow = builder.build()

    # Run the workflow
    async for event in workflow.run(input, stream=True):
        ...

    # Access checkpoints from the storage
    checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
```

Account key authentication is also supported by passing the key directly as the `credential` argument:

```python
from agent_framework_azure_cosmos import CosmosCheckpointStorage

checkpoint_storage = CosmosCheckpointStorage(
    endpoint="https://<account>.documents.azure.com:443/",
    credential="<your-account-key>",
    database_name="agent-framework",
    container_name="workflow-checkpoints",
)
```

Connection details can also be supplied entirely through environment variables:

| Variable | Description |
|---|---|
| `AZURE_COSMOS_ENDPOINT` | Cosmos DB account endpoint |
| `AZURE_COSMOS_DATABASE_NAME` | Database name |
| `AZURE_COSMOS_CONTAINER_NAME` | Container name |
| `AZURE_COSMOS_KEY` | Account key (optional if using Azure credentials) |

`CosmosCheckpointStorage` also accepts a pre-created `CosmosClient` (via `cosmos_client=`) or `ContainerProxy` (via `container_client=`) if your application already manages the Cosmos client lifecycle.


::: zone-end

## Resuming from Checkpoints

::: zone pivot="programming-language-csharp"

You can resume a workflow from a specific checkpoint directly on the same run.

```csharp
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];

// Restore the state directly on the same run instance
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);

await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}
```

::: zone-end

::: zone pivot="programming-language-python"

You can resume a workflow from a specific checkpoint directly on the same workflow instance.

```python
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...
```

::: zone-end

## Rehydrating from Checkpoints

::: zone pivot="programming-language-csharp"

Alternatively, you can rehydrate a workflow from a checkpoint into a new run instance.

```csharp
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];

StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);

await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}
```

::: zone-end

::: zone pivot="programming-language-python"

Alternatively, you can rehydrate a new workflow instance from a checkpoint.

```python
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...
```

::: zone-end

## Save Executor States

::: zone pivot="programming-language-csharp"

To ensure that the state of an executor is captured in a checkpoint, the executor must override the `OnCheckpointingAsync` method and save its state to the workflow context.

```csharp
using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}
```

Also, to ensure the state is correctly restored when resuming from a checkpoint, the executor must override the `OnCheckpointRestoredAsync` method and load its state from the workflow context.

```csharp
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    // Fall back to an empty list if no state was saved for this key
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false) ?? new();
}
```

::: zone-end

::: zone pivot="programming-language-python"

To ensure that the state of an executor is captured in a checkpoint, the executor must override the `on_checkpoint_save` method and return its state as a dictionary.

```python
from typing import Any

from agent_framework import Executor, WorkflowContext, handler


class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}
```

Also, to ensure the state is correctly restored when resuming from a checkpoint, the executor must override the `on_checkpoint_restore` method and restore its state from the provided state dictionary.

```python
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])
```
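
The save/restore pair above must round-trip: whatever `on_checkpoint_save` returns is exactly what `on_checkpoint_restore` later receives. A standalone sketch of that contract, in plain Python without the `Executor` base class or `handler` decorator so no framework types are required:

```python
# Standalone sketch of the save/restore round-trip contract.
# Only the two checkpoint hooks from the example above are modeled.
import asyncio
from typing import Any


class SketchExecutor:
    def __init__(self) -> None:
        self._messages: list[str] = []

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        self._messages = state.get("messages", [])


async def main() -> list[str]:
    original = SketchExecutor()
    original._messages = ["hello", "world"]

    # Capture state, as the framework would at the end of a superstep.
    state = await original.on_checkpoint_save()

    # Restore into a fresh instance, as when resuming from a checkpoint.
    restored = SketchExecutor()
    await restored.on_checkpoint_restore(state)
    return restored._messages


print(asyncio.run(main()))  # → ['hello', 'world']
```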

::: zone-end

## Security Considerations

> [!IMPORTANT]
> Checkpoint storage is a trust boundary. Whether you use the built-in storage implementations or a custom one, the storage backend must be treated as trusted, private infrastructure. Never load checkpoints from untrusted or potentially tampered sources.

::: zone pivot="programming-language-csharp"

Ensure that the storage location used for checkpoints is secured appropriately. Only authorized services and users should have read or write access to checkpoint data.

::: zone-end

::: zone pivot="programming-language-python"

### Pickle serialization

Both `FileCheckpointStorage` and `CosmosCheckpointStorage` use Python's `pickle` module to serialize non-JSON-native state such as dataclasses, datetimes, and custom objects. To mitigate the risks of arbitrary code execution during deserialization, both providers use a restricted unpickler by default. Only a built-in set of safe Python types (primitives, `datetime`, `uuid`, `Decimal`, common collections, etc.) and all `agent_framework` internal types are permitted during deserialization. Any other type encountered in a checkpoint causes deserialization to fail with a `WorkflowCheckpointException`.

To allow additional application-specific types, pass them via the `allowed_checkpoint_types` parameter using `"module:qualname"` format:

```python
from agent_framework import FileCheckpointStorage

storage = FileCheckpointStorage(
    "/tmp/checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)
```

`CosmosCheckpointStorage` accepts the same parameter:

```python
from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage

storage = CosmosCheckpointStorage(
    endpoint="https://my-account.documents.azure.com:443/",
    credential=DefaultAzureCredential(),
    database_name="agent-db",
    container_name="checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)
```

If your threat model does not permit pickle-based serialization at all, use `InMemoryCheckpointStorage` or implement a custom `CheckpointStorage` with an alternative serialization strategy.

### Storage location responsibility

`FileCheckpointStorage` requires an explicit `storage_path` parameter; there is no default directory. While the framework validates against path traversal attacks, securing the storage directory itself (file permissions, encryption at rest, access controls) is the developer's responsibility. Only authorized processes should have read or write access to the checkpoint directory.
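
For example, on POSIX systems you can create the directory with owner-only permissions before handing it to `FileCheckpointStorage`. The helper below is a hypothetical sketch using only the standard library; the `FileCheckpointStorage` call is shown commented out, as in the examples above.

```python
# Hypothetical helper: ensure the checkpoint directory exists and is
# readable/writable only by the owning user (POSIX permissions).
import os
import stat


def prepare_checkpoint_dir(path: str) -> str:
    """Create (or reuse) a checkpoint directory accessible only by the owner."""
    os.makedirs(path, mode=0o700, exist_ok=True)
    # Tighten permissions even if the directory already existed with a wider mode.
    os.chmod(path, stat.S_IRWXU)
    return path


# checkpoint_storage = FileCheckpointStorage(
#     prepare_checkpoint_dir("/var/lib/agent-framework/checkpoints")
# )
```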

`CosmosCheckpointStorage` relies on Azure Cosmos DB for storage. Use managed identity / RBAC where possible, scope the database and container to the workflow service, and rotate account keys if you use key-based auth. As with file storage, only authorized principals should have read or write access to the Cosmos DB container that holds checkpoint documents.

::: zone-end

## Next Steps