Skip to content

Commit a995f9f

Browse files
committed
fix: emit cancel:requested/cancel:completed events and call trigger_callbacks()
The kernel defines CANCEL_REQUESTED and CANCEL_COMPLETED event constants and CancellationToken.trigger_callbacks(), but no orchestrator emitted those events or called the callback mechanism — making them dead code. Changes: - Import CANCEL_REQUESTED and CANCEL_COMPLETED from amplifier_core.events - Add _cancel_requested_emitted instance flag (reset per execution) to ensure cancel:requested is only emitted once per turn, regardless of how many cancellation check points are hit - At all four cancellation exit paths in execute(): 1. Top-of-loop is_cancelled check 2. Post-provider is_immediate check (immediate cancel after LLM call) 3. asyncio.CancelledError block (immediate cancel during tool gather) 4. Post-tool-gather is_immediate check (graceful cancel path) Sequence at each path (first detection only for cancel:requested): 1. Emit cancel:requested with orchestrator, state, and turn_count 2. Call trigger_callbacks() to run registered cleanup callbacks (errors in callbacks are caught and logged; they don't block exit) 3. Emit cancel:completed with was_immediate flag This wires up the cancel event infrastructure so hooks can react to cancellation for cleanup (e.g. closing containers, releasing resources).
1 parent 0ef3ef1 commit a995f9f

File tree

2 files changed

+103
-227
lines changed

2 files changed

+103
-227
lines changed

amplifier_module_loop_basic/__init__.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
from amplifier_core import HookRegistry
1313
from amplifier_core import HookResult
1414
from amplifier_core import ModuleCoordinator
15+
from amplifier_core.events import CANCEL_COMPLETED
16+
from amplifier_core.events import CANCEL_REQUESTED
1517
from amplifier_core.events import CONTENT_BLOCK_END
1618
from amplifier_core.events import CONTENT_BLOCK_START
1719
from amplifier_core.events import ORCHESTRATOR_COMPLETE
@@ -49,6 +51,8 @@ def __init__(self, config: dict[str, Any]) -> None:
4951
self.extended_thinking = config.get("extended_thinking", False)
5052
# Store ephemeral injections from tool:post hooks for next iteration
5153
self._pending_ephemeral_injections: list[dict[str, Any]] = []
54+
# Track whether cancel:requested has been emitted for the current execution
55+
self._cancel_requested_emitted: bool = False
5256

5357
async def execute(
5458
self,
@@ -59,6 +63,9 @@ async def execute(
5963
hooks: HookRegistry,
6064
coordinator: ModuleCoordinator | None = None,
6165
) -> str:
66+
# Reset cancellation event tracking for this execution
67+
self._cancel_requested_emitted = False
68+
6269
# Emit and process prompt submit (allows hooks to inject context on session start)
6370
result = await hooks.emit(PROMPT_SUBMIT, {"prompt": prompt})
6471
if coordinator:
@@ -89,6 +96,30 @@ async def execute(
8996
while self.max_iterations == -1 or iteration < self.max_iterations:
9097
# Check for cancellation at iteration start
9198
if coordinator and coordinator.cancellation.is_cancelled:
99+
# Emit cancel:requested on first detection and trigger cleanup callbacks
100+
if not self._cancel_requested_emitted:
101+
self._cancel_requested_emitted = True
102+
await hooks.emit(
103+
CANCEL_REQUESTED,
104+
{
105+
"orchestrator": "loop-basic",
106+
"state": coordinator.cancellation.state.value,
107+
"turn_count": iteration,
108+
},
109+
)
110+
try:
111+
await coordinator.cancellation.trigger_callbacks()
112+
except Exception as e:
113+
logger.warning(f"Error in cancellation callbacks: {e}")
114+
# Emit cancel:completed — orchestrator is exiting due to cancellation
115+
await hooks.emit(
116+
CANCEL_COMPLETED,
117+
{
118+
"orchestrator": "loop-basic",
119+
"was_immediate": coordinator.cancellation.is_immediate,
120+
"turn_count": iteration,
121+
},
122+
)
92123
# Emit orchestrator complete with cancelled status
93124
await hooks.emit(
94125
ORCHESTRATOR_COMPLETE,
@@ -247,6 +278,30 @@ async def execute(
247278
# This allows force-cancel to take effect as soon as the blocking
248279
# provider call completes, before processing the response
249280
if coordinator and coordinator.cancellation.is_immediate:
281+
# Emit cancel:requested on first detection and trigger cleanup callbacks
282+
if not self._cancel_requested_emitted:
283+
self._cancel_requested_emitted = True
284+
await hooks.emit(
285+
CANCEL_REQUESTED,
286+
{
287+
"orchestrator": "loop-basic",
288+
"state": coordinator.cancellation.state.value,
289+
"turn_count": iteration,
290+
},
291+
)
292+
try:
293+
await coordinator.cancellation.trigger_callbacks()
294+
except Exception as e:
295+
logger.warning(f"Error in cancellation callbacks: {e}")
296+
# Emit cancel:completed — orchestrator is exiting due to cancellation
297+
await hooks.emit(
298+
CANCEL_COMPLETED,
299+
{
300+
"orchestrator": "loop-basic",
301+
"was_immediate": coordinator.cancellation.is_immediate,
302+
"turn_count": iteration,
303+
},
304+
)
250305
# Emit cancelled status and exit
251306
await hooks.emit(
252307
ORCHESTRATOR_COMPLETE,
@@ -537,6 +592,30 @@ async def execute_single_tool(
537592
"content": f'{{"error": "Tool execution was cancelled by user", "cancelled": true, "tool": "{tc.name}"}}',
538593
}
539594
)
595+
# Emit cancel events before re-raising so hooks receive them
596+
if coordinator and not self._cancel_requested_emitted:
597+
self._cancel_requested_emitted = True
598+
await hooks.emit(
599+
CANCEL_REQUESTED,
600+
{
601+
"orchestrator": "loop-basic",
602+
"state": coordinator.cancellation.state.value,
603+
"turn_count": iteration,
604+
},
605+
)
606+
try:
607+
await coordinator.cancellation.trigger_callbacks()
608+
except Exception as e:
609+
logger.warning(f"Error in cancellation callbacks: {e}")
610+
if coordinator:
611+
await hooks.emit(
612+
CANCEL_COMPLETED,
613+
{
614+
"orchestrator": "loop-basic",
615+
"was_immediate": coordinator.cancellation.is_immediate,
616+
"turn_count": iteration,
617+
},
618+
)
540619
# Re-raise to let the cancellation propagate
541620
raise
542621

@@ -554,6 +633,30 @@ async def execute_single_tool(
554633
"content": content,
555634
}
556635
)
636+
# Emit cancel:requested on first detection and trigger cleanup callbacks
637+
if not self._cancel_requested_emitted:
638+
self._cancel_requested_emitted = True
639+
await hooks.emit(
640+
CANCEL_REQUESTED,
641+
{
642+
"orchestrator": "loop-basic",
643+
"state": coordinator.cancellation.state.value,
644+
"turn_count": iteration,
645+
},
646+
)
647+
try:
648+
await coordinator.cancellation.trigger_callbacks()
649+
except Exception as e:
650+
logger.warning(f"Error in cancellation callbacks: {e}")
651+
# Emit cancel:completed — orchestrator is exiting due to cancellation
652+
await hooks.emit(
653+
CANCEL_COMPLETED,
654+
{
655+
"orchestrator": "loop-basic",
656+
"was_immediate": coordinator.cancellation.is_immediate,
657+
"turn_count": iteration,
658+
},
659+
)
557660
await hooks.emit(
558661
ORCHESTRATOR_COMPLETE,
559662
{

tests/test_hook_modify.py

Lines changed: 0 additions & 227 deletions
This file was deleted.

0 commit comments

Comments
 (0)