From 958f7f802615ce93bbcc1a8b4cfa88ea4ddd39cb Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Wed, 3 Jun 2026 15:31:23 +0200 Subject: [PATCH 1/2] feat(python): add agent-framework-hosting-a2a channel Add a hosting channel that exposes the host target (agent or workflow) as a peer agent over the Agent-to-Agent (A2A) protocol (JSON-RPC plus a served agent card). Requests are handled by a host-routed HostAgentExecutor that drives the host pipeline (ChannelContext.run/ run_stream) instead of wrapping the target directly, so sessions, linking, and run/response hooks apply. Maps the A2A conversation/context id to a ChannelSession isolation key and the caller to a ChannelIdentity; streaming emits incremental task artifacts. Includes tests, README, and workspace registration. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/packages/hosting-a2a/LICENSE | 21 ++ python/packages/hosting-a2a/README.md | 35 +++ .../agent_framework_hosting_a2a/__init__.py | 24 ++ .../agent_framework_hosting_a2a/_channel.py | 142 ++++++++++ .../agent_framework_hosting_a2a/_executor.py | 195 ++++++++++++++ python/packages/hosting-a2a/pyproject.toml | 102 +++++++ .../tests/hosting_a2a/test_channel.py | 251 ++++++++++++++++++ python/pyproject.toml | 2 + python/uv.lock | 23 ++ 9 files changed, 795 insertions(+) create mode 100644 python/packages/hosting-a2a/LICENSE create mode 100644 python/packages/hosting-a2a/README.md create mode 100644 python/packages/hosting-a2a/agent_framework_hosting_a2a/__init__.py create mode 100644 python/packages/hosting-a2a/agent_framework_hosting_a2a/_channel.py create mode 100644 python/packages/hosting-a2a/agent_framework_hosting_a2a/_executor.py create mode 100644 python/packages/hosting-a2a/pyproject.toml create mode 100644 python/packages/hosting-a2a/tests/hosting_a2a/test_channel.py diff --git a/python/packages/hosting-a2a/LICENSE b/python/packages/hosting-a2a/LICENSE new file mode 100644 index 00000000000..9e841e7a26e --- /dev/null +++ b/python/packages/hosting-a2a/LICENSE @@ -0,0 +1,21 @@ + MIT License + + Copyright (c) Microsoft Corporation. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE diff --git a/python/packages/hosting-a2a/README.md b/python/packages/hosting-a2a/README.md new file mode 100644 index 00000000000..f77626cc23d --- /dev/null +++ b/python/packages/hosting-a2a/README.md @@ -0,0 +1,35 @@ +# agent-framework-hosting-a2a + +Agent-to-Agent (A2A) protocol channel for `agent-framework-hosting`. + +Exposes the hosted target (an `Agent` or a `Workflow`) as an A2A peer agent: it +publishes an agent card and JSON-RPC routes and drives every request through the +host pipeline, so host sessions, request metadata, and run/response hooks all +apply. + +```python +from agent_framework.openai import OpenAIChatClient +from agent_framework_hosting import AgentFrameworkHost +from agent_framework_hosting_a2a import A2AChannel + +agent = OpenAIChatClient().as_agent(name="Assistant") + +host = AgentFrameworkHost( + target=agent, + channels=[A2AChannel(url="https://my-host.example.com/")], +) +host.serve(port=8000) +``` + +By default the channel mounts at the app root so the well-known agent card is +reachable at `/.well-known/agent-card.json`, with the JSON-RPC endpoint at `/`. +The A2A `context_id` maps onto the host session (caller-supplied session family). +A default agent card is derived from the target's name and description; pass a +fully-specified `agent_card` to override it. + +> **Note:** Task state is held in an in-memory A2A task store for this version; it +> is independent of the host's session storage and is not persisted across +> restarts. + +The base host plumbing lives in +[`agent-framework-hosting`](https://pypi.org/project/agent-framework-hosting/). diff --git a/python/packages/hosting-a2a/agent_framework_hosting_a2a/__init__.py b/python/packages/hosting-a2a/agent_framework_hosting_a2a/__init__.py new file mode 100644 index 00000000000..c2cfab8cad5 --- /dev/null +++ b/python/packages/hosting-a2a/agent_framework_hosting_a2a/__init__.py @@ -0,0 +1,24 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""A2A (Agent-to-Agent) channel for :mod:`agent_framework_hosting`. + +Exposes the hosted target (an ``Agent`` or a ``Workflow``) as an A2A peer agent +— publishing an agent card and JSON-RPC routes — while routing every request +through the host pipeline so sessions, request metadata, and hooks apply. +""" + +import importlib.metadata + +from ._channel import A2AChannel +from ._executor import HostAgentExecutor + +try: + __version__ = importlib.metadata.version(__name__) +except importlib.metadata.PackageNotFoundError: + __version__ = "0.0.0" + +__all__ = [ + "A2AChannel", + "HostAgentExecutor", + "__version__", +] diff --git a/python/packages/hosting-a2a/agent_framework_hosting_a2a/_channel.py b/python/packages/hosting-a2a/agent_framework_hosting_a2a/_channel.py new file mode 100644 index 00000000000..71d9a74b1df --- /dev/null +++ b/python/packages/hosting-a2a/agent_framework_hosting_a2a/_channel.py @@ -0,0 +1,142 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""A2A (Agent-to-Agent) channel for :mod:`agent_framework_hosting`. + +Exposes the hosted target as an A2A peer agent: it publishes an agent card and +JSON-RPC routes, and drives every request through the host pipeline via +:class:`HostAgentExecutor`. +""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import Any + +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.tasks import InMemoryTaskStore +from a2a.types import AgentCapabilities, AgentCard, AgentInterface, AgentSkill +from agent_framework_hosting import ( + ChannelContext, + ChannelContribution, + ChannelResponseHook, + ChannelRunHook, +) + +from ._executor import HostAgentExecutor + +try: + from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes +except ImportError as exc: # pragma: no cover - guards against incompatible a2a-sdk layout + raise ImportError( + "agent-framework-hosting-a2a requires a2a-sdk route helpers (create_agent_card_routes, create_jsonrpc_routes)." + ) from exc + + +class A2AChannel: + """Channel that exposes the hosted target over the A2A protocol. + + The A2A ``context_id`` maps onto the host session (caller-supplied session + family) and each request is routed through :class:`ChannelContext`, so host + session resolution and hooks apply. + + Note: + Task state is held in an in-memory A2A task store for this version; it + is independent of the host's session storage and is not persisted. + """ + + name: str = "a2a" + + def __init__( + self, + *, + name: str | None = None, + path: str = "", + url: str = "/", + agent_name: str | None = None, + agent_description: str | None = None, + agent_version: str = "1.0.0", + agent_card: AgentCard | None = None, + skills: Sequence[AgentSkill] | None = None, + streaming: bool = True, + rpc_url: str = "/", + card_url: str = "/.well-known/agent-card.json", + run_hook: ChannelRunHook | None = None, + response_hook: ChannelResponseHook | None = None, + ) -> None: + """Configure the A2A channel. + + Keyword Args: + name: Override the channel name (defaults to ``"a2a"``). + path: Sub-path to mount the channel under; empty string (default) + mounts the agent-card and JSON-RPC routes at the app root so + the well-known card path is reachable. + url: Public URL advertised in the agent card's interface (the base + URL clients use to reach the JSON-RPC endpoint). + agent_name: Name advertised in the default agent card. Defaults to + the hosted target's name. + agent_description: Description advertised in the default agent card. + Defaults to the hosted target's description. + agent_version: Version advertised in the default agent card. + agent_card: A fully-specified agent card; when provided it takes + precedence over the ``agent_*``/``url``/``skills`` fields. + skills: Skills advertised in the default agent card. + streaming: Consume the target via streaming and publish incremental + A2A task artifacts (default ``True``). + rpc_url: Path for the JSON-RPC endpoint (relative to ``path``). + card_url: Path for the agent-card endpoint (relative to ``path``). + run_hook: Optional run hook applied to each request. + response_hook: Optional response hook applied to originating replies. + """ + if name is not None: + self.name = name + self.path = path + self._url = url + self._agent_name = agent_name + self._agent_description = agent_description + self._agent_version = agent_version + self._agent_card = agent_card + self._skills = list(skills) if skills is not None else [] + self._streaming = streaming + self._rpc_url = rpc_url + self._card_url = card_url + self._run_hook = run_hook + self._response_hook = response_hook + + def _build_agent_card(self, context: ChannelContext) -> AgentCard: + """Derive a default agent card from the hosted target, if not supplied.""" + if self._agent_card is not None: + return self._agent_card + target: Any = context.target + name = self._agent_name or getattr(target, "name", None) or self.name + description = self._agent_description or getattr(target, "description", None) or f"{name} (A2A)" + return AgentCard( + name=name, + description=description, + version=self._agent_version, + default_input_modes=["text"], + default_output_modes=["text"], + capabilities=AgentCapabilities(streaming=self._streaming), + supported_interfaces=[AgentInterface(url=self._url, protocol_binding="JSONRPC")], + skills=self._skills, + ) + + def contribute(self, context: ChannelContext) -> ChannelContribution: + """Build the A2A request handler and contribute its routes.""" + agent_card = self._build_agent_card(context) + executor = HostAgentExecutor( + context, + channel_name=self.name, + streaming=self._streaming, + run_hook=self._run_hook, + response_hook=self._response_hook, + ) + handler = DefaultRequestHandler( + agent_executor=executor, + task_store=InMemoryTaskStore(), + agent_card=agent_card, + ) + routes = [ + *create_agent_card_routes(agent_card, card_url=self._card_url), + *create_jsonrpc_routes(handler, self._rpc_url), + ] + return ChannelContribution(routes=routes) diff --git a/python/packages/hosting-a2a/agent_framework_hosting_a2a/_executor.py b/python/packages/hosting-a2a/agent_framework_hosting_a2a/_executor.py new file mode 100644 index 00000000000..495209442e1 --- /dev/null +++ b/python/packages/hosting-a2a/agent_framework_hosting_a2a/_executor.py @@ -0,0 +1,195 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Host-routed A2A :class:`AgentExecutor`. + +Unlike ``agent_framework_a2a.A2AExecutor`` (which calls ``agent.run`` directly +and manages its own session), :class:`HostAgentExecutor` routes every incoming +A2A request through the host pipeline via :class:`ChannelContext` — so host +session resolution, request metadata, and run/response hooks all apply. The A2A +``context_id`` maps onto :class:`ChannelSession` (caller-supplied session +family). +""" + +from __future__ import annotations + +import base64 +import re +from asyncio import CancelledError +from typing import Any, cast + +from a2a.server.agent_execution import AgentExecutor, RequestContext +from a2a.server.events import EventQueue +from a2a.server.tasks import TaskUpdater +from a2a.types import Part, Task, TaskState +from agent_framework import Content +from agent_framework_hosting import ( + ChannelContext, + ChannelIdentity, + ChannelRequest, + ChannelResponseHook, + ChannelRunHook, + ChannelSession, + logger, +) + +try: + from a2a.helpers import new_task_from_user_message +except ImportError: # pragma: no cover - older a2a-sdk layout + from a2a.utils import new_task_from_user_message # type: ignore[no-redef, attr-defined, import-not-found] + +_DATA_URI_PATTERN = re.compile(r"^data:(?P[^;]+);base64,(?P[A-Za-z0-9+/=]+)$") + + +def _contents_to_parts(contents: list[Content]) -> list[Part]: + """Convert Agent Framework contents into A2A parts (text, uri, inline data).""" + parts: list[Part] = [] + for content in contents: + if content.type == "text" and content.text: + parts.append(Part(text=content.text)) + elif content.type == "uri" and content.uri: + parts.append(Part(url=content.uri, media_type=content.media_type or "")) + elif content.type == "data" and content.uri: + match = _DATA_URI_PATTERN.match(content.uri) + if match is None: + logger.warning("A2AChannel could not parse data URI; omitted.") + continue + parts.append(Part(raw=base64.b64decode(match.group("data")), media_type=content.media_type or "")) + else: + logger.warning("A2AChannel does not support content type: %s. Omitted.", content.type) + return parts + + +class HostAgentExecutor(AgentExecutor): + """A2A executor that drives the hosted target through :class:`ChannelContext`.""" + + def __init__( + self, + context: ChannelContext, + *, + channel_name: str, + streaming: bool = True, + run_hook: ChannelRunHook | None = None, + response_hook: ChannelResponseHook | None = None, + ) -> None: + """Bind the executor to the host context. + + Args: + context: The host-supplied :class:`ChannelContext`. + + Keyword Args: + channel_name: The owning channel's name (stamped on requests). + streaming: When ``True`` (default) the target is consumed via + :meth:`ChannelContext.run_stream` and incremental updates are + published as A2A task artifacts; otherwise the full reply is + published as a single working-state message. + run_hook: Optional :data:`ChannelRunHook` applied to the request. + response_hook: Optional :data:`ChannelResponseHook` applied to the + originating final response. + """ + super().__init__() + self._ctx = context + self._channel_name = channel_name + self._streaming = streaming + self._run_hook = run_hook + self._response_hook = response_hook + + async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: + """Publish a cancellation event for the in-flight task.""" + if context.context_id is None: + raise ValueError("Context ID must be provided in the RequestContext") + updater = TaskUpdater(event_queue, context.task_id or "", context.context_id) + await updater.cancel() + + async def execute(self, context: RequestContext, event_queue: EventQueue) -> None: + """Route an A2A request through the host and publish task events.""" + if context.context_id is None: + raise ValueError("Context ID must be provided in the RequestContext") + if context.message is None: + raise ValueError("Message must be provided in the RequestContext") + + query = context.get_user_input() + task: Task | None = context.current_task + if not task: + task = cast(Task, new_task_from_user_message(context.message)) # type: ignore[redundant-cast] + await event_queue.enqueue_event(task) + + task_id: str = task.id + updater = TaskUpdater(event_queue, task_id, context.context_id) + await updater.submit() + + try: + await updater.start_work() + request = self._build_request(query, context, task_id) + if request.stream: + await self._run_stream(request, updater, protocol_request=context.message) + else: + await self._run(request, updater, protocol_request=context.message) + await updater.complete() + except CancelledError: + await updater.update_status(state=TaskState.TASK_STATE_CANCELED) + except Exception as exc: + logger.exception("A2AChannel encountered an error during execution.") + await updater.update_status( + state=TaskState.TASK_STATE_FAILED, + message=updater.new_agent_message([Part(text=str(exc))]), + ) + + def _build_request(self, query: Any, context: RequestContext, task_id: str) -> ChannelRequest: + """Build the channel-neutral request from the A2A request context.""" + context_id = cast(str, context.context_id) + return ChannelRequest( + channel=self._channel_name, + operation="message.create", + input=query if isinstance(query, str) else str(query), + session=ChannelSession(isolation_key=context_id), + stream=self._streaming, + identity=ChannelIdentity(channel=self._channel_name, native_id=context_id), + attributes={"task_id": task_id}, + ) + + async def _run(self, request: ChannelRequest, updater: TaskUpdater, *, protocol_request: Any) -> None: + """Non-streaming: run the target and publish the reply as task messages.""" + result = await self._ctx.run( + request, + run_hook=self._run_hook, + protocol_request=protocol_request, + response_hook=self._response_hook, + channel_name=self._channel_name, + ) + response: Any = result.result + messages: list[Any] = list(getattr(response, "messages", None) or []) + for message in messages: + if getattr(message, "role", None) == "user": + continue + contents: list[Content] = list(getattr(message, "contents", None) or []) + parts = _contents_to_parts(contents) + if parts: + await updater.update_status( + state=TaskState.TASK_STATE_WORKING, + message=updater.new_agent_message(parts=parts), + ) + + async def _run_stream(self, request: ChannelRequest, updater: TaskUpdater, *, protocol_request: Any) -> None: + """Streaming: publish incremental updates as task artifacts.""" + streamed_ids: set[str] = set() + stream = await self._ctx.run_stream( + request, + run_hook=self._run_hook, + protocol_request=protocol_request, + response_hook=self._response_hook, + channel_name=self._channel_name, + ) + async for update in stream: + contents: list[Content] = list(getattr(update, "contents", None) or []) + parts = _contents_to_parts(contents) + if not parts: + continue + message_id: str | None = getattr(update, "message_id", None) + await updater.add_artifact( + parts=parts, + artifact_id=message_id, + append=True if message_id is not None and message_id in streamed_ids else None, + ) + if message_id is not None: + streamed_ids.add(message_id) + await stream.get_final_response() diff --git a/python/packages/hosting-a2a/pyproject.toml b/python/packages/hosting-a2a/pyproject.toml new file mode 100644 index 00000000000..ee72982537a --- /dev/null +++ b/python/packages/hosting-a2a/pyproject.toml @@ -0,0 +1,102 @@ +[project] +name = "agent-framework-hosting-a2a" +description = "Agent-to-Agent (A2A) protocol channel for agent-framework-hosting." +authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}] +readme = "README.md" +requires-python = ">=3.10" +version = "1.0.0a260424" +license-files = ["LICENSE"] +urls.homepage = "https://aka.ms/agent-framework" +urls.source = "https://github.com/microsoft/agent-framework/tree/main/python" +urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true" +urls.issues = "https://github.com/microsoft/agent-framework/issues" +classifiers = [ + "License :: OSI Approved :: MIT License", + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", + "Typing :: Typed", +] +dependencies = [ + "agent-framework-core>=1.2.0,<2", + "agent-framework-hosting>=1.0.0a260424,<2", + "a2a-sdk>=1.0.0,<2", + "starlette>=0.37", +] + +[tool.uv] +prerelease = "if-necessary-or-explicit" +environments = [ + "sys_platform == 'darwin'", + "sys_platform == 'linux'", + "sys_platform == 'win32'" +] + +[tool.uv-dynamic-versioning] +fallback-version = "0.0.0" + +[tool.pytest.ini_options] +testpaths = 'tests' +addopts = "-ra -q -r fEX" +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" +filterwarnings = [] +timeout = 120 +markers = [ + "integration: marks tests as integration tests that require external services", +] + +[tool.ruff] +extend = "../../pyproject.toml" + +[tool.coverage.run] +omit = [ + "**/__init__.py" +] + +[tool.pyright] +extends = "../../pyproject.toml" +include = ["agent_framework_hosting_a2a"] +exclude = ['tests'] + +[tool.mypy] +plugins = ['pydantic.mypy'] +strict = true +python_version = "3.10" +ignore_missing_imports = true +disallow_untyped_defs = true +no_implicit_optional = true +check_untyped_defs = true +warn_return_any = true +show_error_codes = true +warn_unused_ignores = false +disallow_incomplete_defs = true +disallow_untyped_decorators = true + +[tool.bandit] +targets = ["agent_framework_hosting_a2a"] +exclude_dirs = ["tests"] + +[tool.poe] +executor.type = "uv" +include = "../../shared_tasks.toml" + +[tool.poe.tasks.mypy] +help = "Run MyPy for this package." +cmd = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_hosting_a2a" + +[tool.poe.tasks.test] +help = "Run the default unit test suite for this package." +cmd = 'pytest -m "not integration" --cov=agent_framework_hosting_a2a --cov-report=term-missing:skip-covered tests' + +[build-system] +requires = ["flit-core >= 3.11,<4.0"] +build-backend = "flit_core.buildapi" + +[dependency-groups] +dev = [] diff --git a/python/packages/hosting-a2a/tests/hosting_a2a/test_channel.py b/python/packages/hosting-a2a/tests/hosting_a2a/test_channel.py new file mode 100644 index 00000000000..7cac15d0d81 --- /dev/null +++ b/python/packages/hosting-a2a/tests/hosting_a2a/test_channel.py @@ -0,0 +1,251 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Unit tests for :class:`A2AChannel` and :class:`HostAgentExecutor`.""" + +from __future__ import annotations + +from collections.abc import AsyncIterator, Awaitable +from dataclasses import dataclass, field +from typing import Any + +import pytest +from a2a.server.events import EventQueue +from a2a.types import AgentCard, Message, Part, Role, Task, TaskState +from agent_framework import Content +from agent_framework_hosting import ChannelContribution, ChannelRequest, HostedRunResult + +from agent_framework_hosting_a2a import A2AChannel, HostAgentExecutor + +# --------------------------------------------------------------------------- # +# Fakes # +# --------------------------------------------------------------------------- # + + +@dataclass +class _FakeResp: + text: str + messages: list[Message] = field(default_factory=list) + + +@dataclass +class _FakeUpdate: + text: str + contents: list[Content] = field(default_factory=list) + message_id: str | None = None + + +class _FakeStream: + def __init__(self, chunks: list[str]) -> None: + self._chunks = chunks + self._final = _FakeResp(text="".join(chunks)) + + def __aiter__(self) -> AsyncIterator[_FakeUpdate]: + async def _gen() -> AsyncIterator[_FakeUpdate]: + for i, c in enumerate(self._chunks): + yield _FakeUpdate(text=c, contents=[Content.from_text(text=c)], message_id=f"m{i}") + + return _gen() + + async def get_final_response(self) -> _FakeResp: + return self._final + + +@dataclass +class _FakeTarget: + name: str = "Assistant" + description: str = "A helpful assistant." + + +class _FakeContext: + def __init__( + self, + *, + reply: str = "hello", + chunks: list[str] | None = None, + ) -> None: + self.target = _FakeTarget() + self._reply = reply + self._chunks = chunks or [reply] + self.requests: list[ChannelRequest] = [] + + async def run( + self, + request: ChannelRequest, + *, + run_hook: Any | None = None, + protocol_request: Any | None = None, + response_hook: Any | None = None, + channel_name: str | None = None, + ) -> HostedRunResult[Any]: + if run_hook is not None: + maybe_request = run_hook(request, target=self.target, protocol_request=protocol_request) + if isinstance(maybe_request, Awaitable): + request = await maybe_request + else: + request = maybe_request + self.requests.append(request) + msg = Message(role=Role.ROLE_AGENT, parts=[Part(text=self._reply)]) + result = HostedRunResult(_FakeResp(text=self._reply, messages=[msg])) + if response_hook is not None: + maybe_result = response_hook(result, request=request, channel_name=channel_name or request.channel) + if isinstance(maybe_result, Awaitable): + return await maybe_result + return maybe_result + return result + + async def run_stream( + self, + request: ChannelRequest, + *, + run_hook: Any | None = None, + protocol_request: Any | None = None, + stream_update_hook: Any | None = None, + response_hook: Any | None = None, + channel_name: str | None = None, + ) -> _FakeStream: + if run_hook is not None: + maybe_request = run_hook(request, target=self.target, protocol_request=protocol_request) + if isinstance(maybe_request, Awaitable): + request = await maybe_request + else: + request = maybe_request + self.requests.append(request) + return _FakeStream(self._chunks) + + +class _RecordingEventQueue(EventQueue): + def __init__(self) -> None: + super().__init__() + self.events: list[Any] = [] + + async def enqueue_event(self, event: Any) -> None: + self.events.append(event) + await super().enqueue_event(event) + + +class _FakeRequestContext: + def __init__(self, *, context_id: str, text: str, current_task: Task | None = None) -> None: + self.context_id = context_id + self.task_id: str | None = None + self.message = Message( + message_id="msg-1", + context_id=context_id, + role=Role.ROLE_USER, + parts=[Part(text=text)], + ) + self.current_task = current_task + self._text = text + + def get_user_input(self) -> str: + return self._text + + +def _status_states(events: list[Any]) -> list[int]: + states: list[int] = [] + for event in events: + status = getattr(event, "status", None) + if status is not None and getattr(status, "state", None): + states.append(status.state) + return states + + +# --------------------------------------------------------------------------- # +# A2AChannel tests # +# --------------------------------------------------------------------------- # + + +def test_default_name_and_root_path() -> None: + channel = A2AChannel() + assert channel.name == "a2a" + assert channel.path == "" + + +def test_build_agent_card_defaults_from_target() -> None: + channel = A2AChannel(url="https://example.com/") + card = channel._build_agent_card(_FakeContext()) # type: ignore[arg-type] + assert card.name == "Assistant" + assert card.description == "A helpful assistant." + assert card.capabilities.streaming is True + assert card.supported_interfaces[0].url == "https://example.com/" + + +def test_build_agent_card_override_wins() -> None: + custom = AgentCard(name="Custom", description="custom card", version="9.9.9") + channel = A2AChannel(agent_card=custom) + card = channel._build_agent_card(_FakeContext()) # type: ignore[arg-type] + assert card.name == "Custom" + assert card.version == "9.9.9" + + +def test_contribute_returns_card_and_jsonrpc_routes() -> None: + channel = A2AChannel(url="https://example.com/") + contribution = channel.contribute(_FakeContext()) # type: ignore[arg-type] + assert isinstance(contribution, ChannelContribution) + paths = {getattr(r, "path", None) for r in contribution.routes} + assert "/.well-known/agent-card.json" in paths + assert any(p == "/" for p in paths) + + +# --------------------------------------------------------------------------- # +# HostAgentExecutor tests # +# --------------------------------------------------------------------------- # + + +async def test_execute_routes_through_host_and_completes() -> None: + ctx = _FakeContext(reply="hi back") + executor = HostAgentExecutor(ctx, channel_name="a2a", streaming=False) # type: ignore[arg-type] + queue = _RecordingEventQueue() + request_context = _FakeRequestContext(context_id="conv-1", text="hello") + + await executor.execute(request_context, queue) # type: ignore[arg-type] + + # Routed through the host with the context id mapped onto the session. + assert len(ctx.requests) == 1 + request = ctx.requests[0] + assert request.channel == "a2a" + assert request.input == "hello" + assert request.session is not None + assert request.session.isolation_key == "conv-1" + assert request.identity is not None + assert request.identity.native_id == "conv-1" + # Task progressed to a completed state. + assert TaskState.TASK_STATE_COMPLETED in _status_states(queue.events) + + +async def test_execute_streaming_emits_artifacts() -> None: + ctx = _FakeContext(chunks=["foo", "bar"]) + executor = HostAgentExecutor(ctx, channel_name="a2a", streaming=True) # type: ignore[arg-type] + queue = _RecordingEventQueue() + request_context = _FakeRequestContext(context_id="conv-2", text="hello") + + await executor.execute(request_context, queue) # type: ignore[arg-type] + + artifact_events = [e for e in queue.events if getattr(e, "artifact", None)] + assert artifact_events, "expected at least one artifact update event" + assert ctx.requests[0].stream is True + assert TaskState.TASK_STATE_COMPLETED in _status_states(queue.events) + + +async def test_execute_requires_context_id() -> None: + ctx = _FakeContext() + executor = HostAgentExecutor(ctx, channel_name="a2a") # type: ignore[arg-type] + queue = _RecordingEventQueue() + request_context = _FakeRequestContext(context_id="x", text="hello") + request_context.context_id = None # type: ignore[assignment] + + with pytest.raises(ValueError, match="Context ID"): + await executor.execute(request_context, queue) # type: ignore[arg-type] + + +def test_contents_to_parts_conversion() -> None: + from agent_framework_hosting_a2a._executor import _contents_to_parts + + contents = [ + Content.from_text(text="hello"), + Content.from_uri(uri="https://x/y.png", media_type="image/png"), + Content.from_data(data=b"AAAA", media_type="image/png"), + ] + parts = _contents_to_parts(contents) + assert parts[0].text == "hello" + assert parts[1].url == "https://x/y.png" + assert parts[2].raw == b"AAAA" diff --git a/python/pyproject.toml b/python/pyproject.toml index 4804fff87c0..ea9bcd3dd07 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -90,6 +90,7 @@ agent-framework-hosting-invocations = { workspace = true } agent-framework-hosting-telegram = { workspace = true } agent-framework-hosting-activity-protocol = { workspace = true } agent-framework-hosting-discord = { workspace = true } +agent-framework-hosting-a2a = { workspace = true } agent-framework-hyperlight = { workspace = true } agent-framework-lab = { workspace = true } agent-framework-mem0 = { workspace = true } @@ -216,6 +217,7 @@ executionEnvironments = [ { root = "packages/hosting-invocations/tests", reportPrivateUsage = "none" }, { root = "packages/hosting-telegram/tests", reportPrivateUsage = "none" }, { root = "packages/hosting-activity-protocol/tests", reportPrivateUsage = "none" }, + { root = "packages/hosting-a2a/tests", reportPrivateUsage = "none" }, { root = "packages/lab/gaia/tests", reportPrivateUsage = "none" }, { root = "packages/lab/lightning/tests", reportPrivateUsage = "none" }, { root = "packages/lab/tau2/tests", reportPrivateUsage = "none" }, diff --git a/python/uv.lock b/python/uv.lock index 3040c3ac75f..0a0becc285a 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -48,6 +48,7 @@ members = [ "agent-framework-gemini", "agent-framework-github-copilot", "agent-framework-hosting", + "agent-framework-hosting-a2a", "agent-framework-hosting-activity-protocol", "agent-framework-hosting-discord", "agent-framework-hosting-invocations", @@ -647,6 +648,28 @@ provides-extras = ["serve", "disk"] [package.metadata.requires-dev] dev = [{ name = "httpx", specifier = ">=0.28.1" }] +[[package]] +name = "agent-framework-hosting-a2a" +version = "1.0.0a260424" +source = { editable = "packages/hosting-a2a" } +dependencies = [ + { name = "a2a-sdk", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "agent-framework-core", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "agent-framework-hosting", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "starlette", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] + +[package.metadata] +requires-dist = [ + { name = "a2a-sdk", specifier = ">=1.0.0,<2" }, + { name = "agent-framework-core", editable = "packages/core" }, + { name = "agent-framework-hosting", editable = "packages/hosting" }, + { name = "starlette", specifier = ">=0.37" }, +] + +[package.metadata.requires-dev] +dev = [] + [[package]] name = "agent-framework-hosting-activity-protocol" version = "1.0.0a260424" From 52816030a0d3744c3a4487fca5713e11fd9669b4 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Fri, 12 Jun 2026 09:26:14 +0200 Subject: [PATCH 2/2] Address A2A hosting channel review feedback Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/packages/hosting-a2a/README.md | 3 +- .../agent_framework_hosting_a2a/_channel.py | 15 ++--- .../tests/hosting_a2a/test_channel.py | 64 ++++++++++++++++++- 3 files changed, 70 insertions(+), 12 deletions(-) diff --git a/python/packages/hosting-a2a/README.md b/python/packages/hosting-a2a/README.md index f77626cc23d..4beab43952e 100644 --- a/python/packages/hosting-a2a/README.md +++ b/python/packages/hosting-a2a/README.md @@ -25,7 +25,8 @@ By default the channel mounts at the app root so the well-known agent card is reachable at `/.well-known/agent-card.json`, with the JSON-RPC endpoint at `/`. The A2A `context_id` maps onto the host session (caller-supplied session family). A default agent card is derived from the target's name and description; pass a -fully-specified `agent_card` to override it. +fully-specified `agent_card` to override it. To advertise additional protocol +bindings in the generated card, pass `supported_interfaces`. > **Note:** Task state is held in an in-memory A2A task store for this version; it > is independent of the host's session storage and is not persisted across diff --git a/python/packages/hosting-a2a/agent_framework_hosting_a2a/_channel.py b/python/packages/hosting-a2a/agent_framework_hosting_a2a/_channel.py index 71d9a74b1df..585725ac636 100644 --- a/python/packages/hosting-a2a/agent_framework_hosting_a2a/_channel.py +++ b/python/packages/hosting-a2a/agent_framework_hosting_a2a/_channel.py @@ -13,6 +13,7 @@ from typing import Any from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes from a2a.server.tasks import InMemoryTaskStore from a2a.types import AgentCapabilities, AgentCard, AgentInterface, AgentSkill from agent_framework_hosting import ( @@ -24,13 +25,6 @@ from ._executor import HostAgentExecutor -try: - from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes -except ImportError as exc: # pragma: no cover - guards against incompatible a2a-sdk layout - raise ImportError( - "agent-framework-hosting-a2a requires a2a-sdk route helpers (create_agent_card_routes, create_jsonrpc_routes)." - ) from exc - class A2AChannel: """Channel that exposes the hosted target over the A2A protocol. @@ -57,6 +51,7 @@ def __init__( agent_version: str = "1.0.0", agent_card: AgentCard | None = None, skills: Sequence[AgentSkill] | None = None, + supported_interfaces: Sequence[AgentInterface] | None = None, streaming: bool = True, rpc_url: str = "/", card_url: str = "/.well-known/agent-card.json", @@ -80,6 +75,8 @@ def __init__( agent_card: A fully-specified agent card; when provided it takes precedence over the ``agent_*``/``url``/``skills`` fields. skills: Skills advertised in the default agent card. + supported_interfaces: Interfaces advertised in the default agent card. + Defaults to one JSON-RPC interface using ``url``. streaming: Consume the target via streaming and publish incremental A2A task artifacts (default ``True``). rpc_url: Path for the JSON-RPC endpoint (relative to ``path``). @@ -96,6 +93,7 @@ def __init__( self._agent_version = agent_version self._agent_card = agent_card self._skills = list(skills) if skills is not None else [] + self._supported_interfaces = list(supported_interfaces) if supported_interfaces is not None else None self._streaming = streaming self._rpc_url = rpc_url self._card_url = card_url @@ -116,7 +114,8 @@ def _build_agent_card(self, context: ChannelContext) -> AgentCard: default_input_modes=["text"], default_output_modes=["text"], capabilities=AgentCapabilities(streaming=self._streaming), - supported_interfaces=[AgentInterface(url=self._url, protocol_binding="JSONRPC")], + supported_interfaces=self._supported_interfaces + or [AgentInterface(url=self._url, protocol_binding="JSONRPC")], skills=self._skills, ) diff --git a/python/packages/hosting-a2a/tests/hosting_a2a/test_channel.py b/python/packages/hosting-a2a/tests/hosting_a2a/test_channel.py index 7cac15d0d81..467c67d431d 100644 --- a/python/packages/hosting-a2a/tests/hosting_a2a/test_channel.py +++ b/python/packages/hosting-a2a/tests/hosting_a2a/test_channel.py @@ -4,15 +4,21 @@ from __future__ import annotations +import asyncio from collections.abc import AsyncIterator, Awaitable +from contextlib import asynccontextmanager from dataclasses import dataclass, field from typing import Any import pytest +import uvicorn from a2a.server.events import EventQueue -from a2a.types import AgentCard, Message, Part, Role, Task, TaskState -from agent_framework import Content -from agent_framework_hosting import ChannelContribution, ChannelRequest, HostedRunResult +from a2a.types import AgentCard, AgentInterface, Message, Part, Role, Task, TaskState +from agent_framework import AgentResponse, Content +from agent_framework import Message as AFMessage +from agent_framework_a2a import A2AAgent +from agent_framework_hosting import AgentFrameworkHost, ChannelContribution, ChannelRequest, HostedRunResult +from starlette.types import ASGIApp from agent_framework_hosting_a2a import A2AChannel, HostAgentExecutor @@ -140,6 +146,33 @@ def get_user_input(self) -> str: return self._text +class _HostedAgent: + name = "HostedAssistant" + description = "A hosted test assistant." + + async def run(self, messages: Any = None, *, stream: bool = False, **_kwargs: Any) -> AgentResponse[Any]: + text = messages.text if isinstance(messages, AFMessage) else str(messages) + return AgentResponse(messages=[AFMessage(role="assistant", contents=[Content.from_text(text=f"host: {text}")])]) + + +@asynccontextmanager +async def _serve_app(app: ASGIApp, *, port: int) -> AsyncIterator[str]: + config = uvicorn.Config(app, host="127.0.0.1", port=port, log_level="warning", lifespan="on") + server = uvicorn.Server(config) + task = asyncio.create_task(server.serve()) + try: + for _ in range(100): + if server.started: + break + await asyncio.sleep(0.01) + else: + raise RuntimeError("Test A2A server did not start") + yield f"http://127.0.0.1:{port}" + finally: + server.should_exit = True + await task + + def _status_states(events: list[Any]) -> list[int]: states: list[int] = [] for event in events: @@ -169,6 +202,16 @@ def test_build_agent_card_defaults_from_target() -> None: assert card.supported_interfaces[0].url == "https://example.com/" +def test_build_agent_card_accepts_supported_interfaces() -> None: + interfaces = [ + AgentInterface(url="https://example.com/jsonrpc", protocol_binding="JSONRPC"), + AgentInterface(url="https://example.com/grpc", protocol_binding="GRPC"), + ] + channel = A2AChannel(supported_interfaces=interfaces) + card = channel._build_agent_card(_FakeContext()) # type: ignore[arg-type] + assert card.supported_interfaces == interfaces + + def test_build_agent_card_override_wins() -> None: custom = AgentCard(name="Custom", description="custom card", version="9.9.9") channel = A2AChannel(agent_card=custom) @@ -237,6 +280,21 @@ async def test_execute_requires_context_id() -> None: await executor.execute(request_context, queue) # type: ignore[arg-type] +async def test_a2a_agent_can_call_hosted_channel(unused_tcp_port: int) -> None: + host = AgentFrameworkHost(target=_HostedAgent(), channels=[A2AChannel(streaming=False)]) + + async with ( + _serve_app(host.app, port=unused_tcp_port) as base_url, + A2AAgent( + url=base_url, + timeout=5.0, + ) as agent, + ): + response = await agent.run("hello") + + assert response.messages[0].text == "host: hello" + + def test_contents_to_parts_conversion() -> None: from agent_framework_hosting_a2a._executor import _contents_to_parts