Skip to content

Commit 6ab8439

Browse files
authored
Merge pull request #754 from MicrosoftDocs/main
Updated based on consolidated workflow APIs (#753)
2 parents eb120a8 + 771cadc commit 6ab8439

4 files changed

Lines changed: 226 additions & 85 deletions

File tree

agent-framework/migration-guide/from-autogen/index.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1521,8 +1521,8 @@ async def checkpoint_resume_example():
15211521

15221522
# Create new workflow instance and resume
15231523
new_workflow = create_workflow(checkpoint_storage)
1524-
async for event in new_workflow.run_stream_from_checkpoint(
1525-
chosen_checkpoint_id,
1524+
async for event in new_workflow.run_stream(
1525+
checkpoint_id=chosen_checkpoint_id,
15261526
checkpoint_storage=checkpoint_storage
15271527
):
15281528
print(f"Resumed event: {event}")
@@ -1539,8 +1539,8 @@ Checkpointing works seamlessly with human-in-the-loop workflows, allowing workfl
15391539
async def resume_with_pending_requests_example():
15401540
# Resume from checkpoint - pending requests will be re-emitted
15411541
request_info_events = []
1542-
async for event in workflow.run_stream_from_checkpoint(
1543-
checkpoint_id,
1542+
async for event in workflow.run_stream(
1543+
checkpoint_id=checkpoint_id,
15441544
checkpoint_storage=checkpoint_storage
15451545
):
15461546
if isinstance(event, RequestInfoEvent):

agent-framework/support/upgrade/requests-and-responses-upgrade-guide-python.md

Lines changed: 212 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,214 @@
11
---
2-
title: Upgrade Guide - Requests and Responses in Python Workflows from Version 1.0.0b251028
3-
description: Guide on upgrading Python workflows to support Requests and Responses in Microsoft Agent Framework.
2+
title: Upgrade Guide - Workflow APIs and Request-Response System in Python
3+
description: Guide on upgrading to consolidated workflow APIs and the new request-response system in Microsoft Agent Framework.
44
author: TaoChenOSU
55
ms.topic: tutorial
66
ms.author: taochen
7-
ms.date: 10/30/2025
7+
ms.date: 11/06/2025
88
ms.service: agent-framework
99
---
1010

11-
# Upgrade Guide: Requests and Responses in Python Workflows
11+
# Upgrade Guide: Workflow APIs and Request-Response System
1212

13-
This guide helps you upgrade your Python workflows from the previous `RequestInfoExecutor` pattern to the new integrated request-response API introduced in version [1.0.0b251104](https://github.com/microsoft/agent-framework/releases/tag/python-1.0.0b251104).
13+
This guide helps you upgrade your Python workflows to the latest API changes introduced in version [1.0.0b251104](https://github.com/microsoft/agent-framework/releases/tag/python-1.0.0b251104).
1414

1515
## Overview of Changes
1616

17-
The request-response system has been significantly simplified:
17+
This release includes two major improvements to the workflow system:
18+
19+
### 1. Consolidated Workflow Execution APIs
20+
21+
The workflow execution methods have been unified for simplicity:
22+
23+
- **Unified `run_stream()` and `run()` methods**: Replace separate checkpoint-specific methods (`run_stream_from_checkpoint()`, `run_from_checkpoint()`)
24+
- **Single interface**: Use `checkpoint_id` parameter to resume from checkpoints instead of separate methods
25+
- **Flexible checkpointing**: Configure checkpoint storage at build time or override at runtime
26+
- **Clearer semantics**: Mutually exclusive `message` (new run) and `checkpoint_id` (resume) parameters
27+
28+
### 2. Simplified Request-Response System
29+
30+
The request-response system has been streamlined:
1831

1932
- **No more `RequestInfoExecutor`**: Executors can now send requests directly
2033
- **New `@response_handler` decorator**: Replace `RequestResponse` message handlers
2134
- **Simplified request types**: No inheritance from `RequestInfoMessage` required
2235
- **Built-in capabilities**: All executors automatically support request-response functionality
2336
- **Cleaner workflow graphs**: Remove `RequestInfoExecutor` nodes from your workflows
2437

25-
## Migration Steps
38+
## Part 1: Unified Workflow Execution APIs
39+
40+
We recommend migrating to the consolidated workflow APIs first, as this forms the foundation for all workflow execution patterns.
41+
42+
### Resuming from Checkpoints
43+
44+
**Before (Old API):**
45+
46+
```python
47+
# OLD: Separate method for checkpoint resume
48+
async for event in workflow.run_stream_from_checkpoint(
49+
checkpoint_id="checkpoint-id",
50+
checkpoint_storage=checkpoint_storage
51+
):
52+
print(f"Event: {event}")
53+
```
54+
55+
**After (New API):**
56+
57+
```python
58+
# NEW: Unified method with checkpoint_id parameter
59+
async for event in workflow.run_stream(
60+
checkpoint_id="checkpoint-id",
61+
checkpoint_storage=checkpoint_storage # Optional if configured at build time
62+
):
63+
print(f"Event: {event}")
64+
```
65+
66+
**Key differences:**
67+
68+
- Use `checkpoint_id` parameter instead of separate method
69+
- Cannot provide both `message` and `checkpoint_id` (mutually exclusive)
70+
- Must provide either `message` (new run) or `checkpoint_id` (resume)
71+
- `checkpoint_storage` is optional if checkpointing was configured at build time
72+
73+
### Non-Streaming API
74+
75+
The non-streaming `run()` method follows the same pattern:
76+
77+
**Old:**
78+
79+
```python
80+
result = await workflow.run_from_checkpoint(
81+
checkpoint_id="checkpoint-id",
82+
checkpoint_storage=checkpoint_storage
83+
)
84+
```
85+
86+
**New:**
87+
88+
```python
89+
result = await workflow.run(
90+
checkpoint_id="checkpoint-id",
91+
checkpoint_storage=checkpoint_storage # Optional if configured at build time
92+
)
93+
```
94+
95+
### Checkpoint Resume with Pending Requests
96+
97+
**Important Breaking Change**: When resuming from a checkpoint that has pending `RequestInfoEvent` objects, the new API re-emits these events automatically. You must capture and respond to them.
98+
99+
**Before (Old Behavior):**
100+
101+
```python
102+
# OLD: Could provide responses directly during resume
103+
responses = {
104+
"request-id-1": "user response data",
105+
"request-id-2": "another response"
106+
}
107+
108+
async for event in workflow.run_stream_from_checkpoint(
109+
checkpoint_id="checkpoint-id",
110+
checkpoint_storage=checkpoint_storage,
111+
responses=responses # No longer supported
112+
):
113+
print(f"Event: {event}")
114+
```
115+
116+
**After (New Behavior):**
117+
118+
```python
119+
# NEW: Capture re-emitted pending requests
120+
requests: dict[str, Any] = {}
121+
122+
async for event in workflow.run_stream(checkpoint_id="checkpoint-id"):
123+
if isinstance(event, RequestInfoEvent):
124+
# Pending requests are automatically re-emitted
125+
print(f"Pending request re-emitted: {event.request_id}")
126+
requests[event.request_id] = event.data
127+
128+
# Collect user responses
129+
responses: dict[str, Any] = {}
130+
for request_id, request_data in requests.items():
131+
response = handle_request(request_data) # Your logic here
132+
responses[request_id] = response
133+
134+
# Send responses back to workflow
135+
async for event in workflow.send_responses_streaming(responses):
136+
if isinstance(event, WorkflowOutputEvent):
137+
print(f"Workflow output: {event.data}")
138+
```
139+
140+
### Complete Human-in-the-Loop Example
141+
142+
Here's a complete example showing checkpoint resume with pending human approval:
143+
144+
```python
145+
from agent_framework import (
146+
Executor,
147+
FileCheckpointStorage,
148+
RequestInfoEvent,
149+
WorkflowBuilder,
150+
WorkflowOutputEvent,
151+
WorkflowStatusEvent,
152+
handler,
153+
response_handler,
154+
)
155+
156+
# ... (Executor definitions omitted for brevity)
157+
158+
async def run_interactive_session(
159+
workflow: Workflow,
160+
initial_message: str | None = None,
161+
checkpoint_id: str | None = None,
162+
) -> str:
163+
"""Run workflow until completion, handling human input interactively."""
164+
165+
requests: dict[str, HumanApprovalRequest] = {}
166+
responses: dict[str, str] | None = None
167+
completed_output: str | None = None
168+
169+
while True:
170+
# Determine which API to call
171+
if responses:
172+
# Send responses from previous iteration
173+
event_stream = workflow.send_responses_streaming(responses)
174+
requests.clear()
175+
responses = None
176+
else:
177+
# Start new run or resume from checkpoint
178+
if initial_message:
179+
event_stream = workflow.run_stream(initial_message)
180+
elif checkpoint_id:
181+
event_stream = workflow.run_stream(checkpoint_id=checkpoint_id)
182+
else:
183+
raise ValueError("Either initial_message or checkpoint_id required")
184+
185+
# Process events
186+
async for event in event_stream:
187+
if isinstance(event, WorkflowStatusEvent):
188+
print(event)
189+
if isinstance(event, WorkflowOutputEvent):
190+
completed_output = event.data
191+
if isinstance(event, RequestInfoEvent):
192+
if isinstance(event.data, HumanApprovalRequest):
193+
requests[event.request_id] = event.data
194+
195+
# Check completion
196+
if completed_output:
197+
break
198+
199+
# Prompt for user input if we have pending requests
200+
if requests:
201+
responses = prompt_for_responses(requests)
202+
continue
203+
204+
raise RuntimeError("Workflow stopped without completing or requesting input")
205+
206+
return completed_output
207+
```
208+
209+
## Part 2: Simplified Request-Response System
210+
211+
After migrating to the unified workflow APIs, update your request-response patterns to use the new integrated system.
26212

27213
### 1. Update Imports
28214

@@ -168,81 +354,36 @@ class ApprovalRequiredExecutor(Executor):
168354
await ctx.yield_output("Rejected!")
169355
```
170356

171-
## Key Benefits of the New Pattern
357+
## Summary of Benefits
358+
359+
### Unified Workflow APIs
360+
361+
1. **Simplified Interface**: Single method for initial runs and checkpoint resume
362+
2. **Clearer Semantics**: Mutually exclusive parameters make intent explicit
363+
3. **Flexible Checkpointing**: Configure at build time or override at runtime
364+
4. **Reduced Cognitive Load**: Fewer methods to remember and maintain
365+
366+
### Request-Response System
172367

173368
1. **Simplified Architecture**: No need for separate `RequestInfoExecutor` components
174369
2. **Type Safety**: Direct type specification in `request_info()` calls
175370
3. **Cleaner Code**: Fewer imports and simpler workflow graphs
176371
4. **Better Performance**: Reduced message routing overhead
177372
5. **Enhanced Debugging**: Clearer execution flow and error handling
178373

179-
## Important: Changes to Checkpoint Resume Behavior
180-
181-
### Checkpoint Resume with Pending Requests
182-
183-
**Breaking Change**: The way workflows resume from checkpoints with pending requests has also changed.
184-
185-
**Before (Old Behavior):**
186-
187-
```python
188-
# OLD: Could provide responses directly during resume
189-
responses = {
190-
"request-id-1": "user response data",
191-
"request-id-2": "another response"
192-
}
193-
194-
async for event in workflow.run_stream_from_checkpoint(
195-
checkpoint_id="checkpoint-id",
196-
checkpoint_storage=checkpoint_storage,
197-
responses=responses # This is no longer supported
198-
):
199-
print(f"Event: {event}")
200-
```
201-
202-
**After (New Behavior):**
203-
204-
```python
205-
# NEW: Pending requests are re-emitted, must be captured and responded to
206-
requests_info_events = []
207-
async for event in workflow.run_stream_from_checkpoint(
208-
checkpoint_id="checkpoint-id",
209-
checkpoint_storage=checkpoint_storage
210-
):
211-
if isinstance(event, RequestInfoEvent):
212-
# Capture re-emitted pending requests
213-
print(f"Pending request re-emitted: {event.request_id}")
214-
requests_info_events.append(event)
215-
216-
# Handle the request and provide response
217-
# If responses are already provided, no need to handle them again
218-
responses = {}
219-
for event in requests_info_events:
220-
response = handle_request(event.data)
221-
responses[event.request_id] = response
222-
223-
# Send response back to workflow using standard mechanism
224-
async for event in workflow.send_responses_streaming(responses):
225-
if isinstance(event, WorkflowOutputEvent):
226-
print(f"Workflow completed: {event.data}")
227-
```
228-
229-
**What Changed:**
230-
231-
- You can no longer supply responses as a parameter to `run_stream_from_checkpoint()` or `run_from_checkpoint()`
232-
- When resuming from a checkpoint with pending requests, those requests will be re-emitted as `RequestInfoEvent` objects
233-
- You must capture these re-emitted events and respond using the standard `send_responses_streaming()` method or equivalent `send_response()` calls
234-
- If resuming from a checkpoint with pending requests that have already been responded to, you still need to call `run_stream_from_checkpoint()` to continue the workflow followed by `send_responses_streaming()` with the pre-supplied responses
374+
## Testing Your Migration
235375

236-
**Migration Steps for Checkpoint Resume:**
376+
### Part 1 Checklist: Workflow APIs
237377

238-
1. Remove any `responses` parameter from `run_stream_from_checkpoint()` calls
239-
2. Add event handling logic to capture re-emitted `RequestInfoEvent` objects
240-
3. Use `send_responses_streaming()` to provide responses to re-emitted requests
241-
4. Test the resume flow to ensure proper request handling
378+
1. **Update API Calls**: Replace `run_stream_from_checkpoint()` with `run_stream(checkpoint_id=...)`
379+
2. **Update API Calls**: Replace `run_from_checkpoint()` with `run(checkpoint_id=...)`
380+
3. **Remove `responses` parameter**: Delete any `responses` arguments from checkpoint resume calls
381+
4. **Add event capture**: Implement logic to capture re-emitted `RequestInfoEvent` objects
382+
5. **Test checkpoint resume**: Verify pending requests are re-emitted and handled correctly
242383

243-
### Testing Your Migration
384+
### Part 2 Checklist: Request-Response System
244385

245-
1. **Verify Imports**: Ensure no old imports remain
386+
1. **Verify Imports**: Ensure no old imports remain (`RequestInfoExecutor`, `RequestInfoMessage`, `RequestResponse`)
246387
2. **Check Request Types**: Confirm removal of `RequestInfoMessage` inheritance
247388
3. **Test Workflow Graph**: Verify removal of `RequestInfoExecutor` nodes
248389
4. **Validate Handlers**: Ensure `@response_handler` decorators are applied

agent-framework/tutorials/workflows/checkpointing-and-resuming.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ Resume execution and stream events in real-time:
522522

523523
```python
524524
# Resume from a specific checkpoint
525-
async for event in workflow.run_stream_from_checkpoint(
525+
async for event in workflow.run_stream(
526526
checkpoint_id="checkpoint-id",
527527
checkpoint_storage=checkpoint_storage
528528
):
@@ -539,7 +539,7 @@ Resume and get all results at once:
539539

540540
```python
541541
# Resume and wait for completion
542-
result = await workflow.run_from_checkpoint(
542+
result = await workflow.run(
543543
checkpoint_id="checkpoint-id",
544544
checkpoint_storage=checkpoint_storage
545545
)
@@ -556,7 +556,7 @@ When resuming from a checkpoint that contains pending requests, the workflow wil
556556
```python
557557
request_info_events = []
558558
# Resume from checkpoint - pending requests will be re-emitted
559-
async for event in workflow.run_stream_from_checkpoint(
559+
async for event in workflow.run_stream(
560560
checkpoint_id="checkpoint-id",
561561
checkpoint_storage=checkpoint_storage
562562
):
@@ -578,7 +578,7 @@ async for event in workflow.send_responses_streaming(responses):
578578
print(f"Workflow completed: {event.data}")
579579
```
580580

581-
If resuming from a checkpoint with pending requests that have already been responded to, you still need to call `run_stream_from_checkpoint()` to continue the workflow followed by `send_responses_streaming()` with the pre-supplied responses.
581+
If resuming from a checkpoint with pending requests that have already been responded to, you still need to call `run_stream()` to continue the workflow followed by `send_responses_streaming()` with the pre-supplied responses.
582582

583583
## Interactive Checkpoint Selection
584584

@@ -606,7 +606,7 @@ async def select_and_resume_checkpoint(workflow, storage):
606606

607607
# Resume from selected checkpoint
608608
print(f"Resuming from checkpoint: {selected.checkpoint_id}")
609-
async for event in workflow.run_stream_from_checkpoint(
609+
async for event in workflow.run_stream(
610610
selected.checkpoint_id,
611611
checkpoint_storage=storage
612612
):
@@ -662,7 +662,7 @@ async def main():
662662
latest = max(checkpoints, key=lambda cp: cp.timestamp)
663663
print(f"Resuming from: {latest.checkpoint_id}")
664664

665-
async for event in workflow.run_stream_from_checkpoint(latest.checkpoint_id):
665+
async for event in workflow.run_stream(latest.checkpoint_id):
666666
print(f"Resumed: {event}")
667667

668668
if __name__ == "__main__":

0 commit comments

Comments
 (0)