diff --git a/src/agents/voice/result.py b/src/agents/voice/result.py index c41f3684dc..bac58dc129 100644 --- a/src/agents/voice/result.py +++ b/src/agents/voice/result.py @@ -68,6 +68,12 @@ def __init__( self._completed_session = False self._stored_exception: BaseException | None = None self._tracing_span: Span[SpeechGroupSpanData] | None = None + # Count of synthetic turn_ended events whose lifecycle was already + # finalized synchronously (see `_turn_done`). The dispatcher should not + # call `_finish_turn` again for these, because doing so could + # prematurely finish a later turn's tracing span that started after + # them. + self._pending_synthetic_turn_ends: int = 0 async def _start_turn(self): if self._started_processing_turn: @@ -221,9 +227,29 @@ async def _turn_done(self): ) self._text_buffer = "" elif self._started_processing_turn: + # Turn was started (turn_started emitted) but produced no + # synthesizable text. Finish turn state synchronously so a + # follow-up `_add_text` for the next turn does not race the + # dispatcher and skip its `turn_started`. The synthesized + # `turn_ended` flows through the dispatcher to stay ordered with + # any in-flight audio segments; a counter tells the dispatcher + # this event's `_finish_turn` already ran so it doesn't clobber + # a span that belongs to a later turn started in the meantime. + self._finish_turn() + self._pending_synthetic_turn_ends += 1 local_queue = asyncio.Queue() self._ordered_tasks.append(local_queue) await local_queue.put(VoiceStreamEventLifecycle(event="turn_ended")) + self._done_processing = True + if self._dispatcher_task is None: + self._dispatcher_task = asyncio.create_task(self._dispatch_audio()) + # Wait for the dispatcher to drain this synthesized `turn_ended` + # so the next turn's `turn_started` cannot precede it on + # `self._queue`. + await asyncio.gather(*self._tasks) + while self._pending_synthetic_turn_ends > 0: + await asyncio.sleep(0) + return self._done_processing = True if self._dispatcher_task is None: self._dispatcher_task = asyncio.create_task(self._dispatch_audio()) @@ -262,7 +288,13 @@ async def _dispatch_audio(self): if isinstance(chunk, VoiceStreamEventLifecycle): local_queue.task_done() if chunk.event == "turn_ended": - self._finish_turn() + if self._pending_synthetic_turn_ends > 0: + # `_turn_done` already ran `_finish_turn` synchronously + # for this event; skip to avoid finishing a span that + # belongs to a later turn started in the meantime. + self._pending_synthetic_turn_ends -= 1 + else: + self._finish_turn() break await self._queue.put(VoiceStreamEventLifecycle(event="session_ended")) diff --git a/tests/voice/test_pipeline.py b/tests/voice/test_pipeline.py index f92a857dba..2ac1d75d8a 100644 --- a/tests/voice/test_pipeline.py +++ b/tests/voice/test_pipeline.py @@ -372,3 +372,41 @@ async def test_voicepipeline_multi_turn_on_start_exception_does_not_abort() -> N assert events[-1] == "session_ended" assert "error" not in events + + +@pytest.mark.asyncio +async def test_voicepipeline_empty_workflow_yield_emits_turn_ended() -> None: + # Workflow yields only an empty string (e.g. an LLM keepalive delta). + # `turn_started` must still be balanced by `turn_ended` even though no + # audio is synthesized. + fake_stt = FakeSTT(["first"]) + workflow = FakeWorkflow([[""]]) + fake_tts = FakeTTS() + config = VoicePipelineConfig(tts_settings=TTSModelSettings(buffer_size=1)) + pipeline = VoicePipeline( + workflow=workflow, stt_model=fake_stt, tts_model=fake_tts, config=config + ) + audio_input = AudioInput(buffer=np.zeros(2, dtype=np.int16)) + result = await pipeline.run(audio_input) + events, audio_chunks = await asyncio.wait_for(extract_events(result), timeout=2.0) + assert events == ["turn_started", "turn_ended", "session_ended"] + assert audio_chunks == [] + + +@pytest.mark.asyncio +async def test_voicepipeline_empty_turn_followed_by_real_turn_balances_lifecycle() -> None: + # An empty workflow yield followed by a non-empty turn must still + # produce `turn_started` for the second turn; otherwise the dispatcher's + # stale `turn_ended` could finish a span that belongs to the next turn. + fake_stt = FakeSTT(["first", "second"]) + workflow = FakeWorkflow([[""], ["hello"]]) + fake_tts = FakeTTS() + config = VoicePipelineConfig(tts_settings=TTSModelSettings(buffer_size=1)) + pipeline = VoicePipeline( + workflow=workflow, stt_model=fake_stt, tts_model=fake_tts, config=config + ) + streamed_audio_input = await FakeStreamedAudioInput.get(count=2) + result = await pipeline.run(streamed_audio_input) + events, _ = await asyncio.wait_for(extract_events(result), timeout=2.0) + lifecycle = [e for e in events if e in {"turn_started", "turn_ended"}] + assert lifecycle == ["turn_started", "turn_ended", "turn_started", "turn_ended"]