Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ._core.middleware import Middleware, MiddlewareResult, resolve_handler_source
from ._core.pipetools import ContextCommandPipeline as _ContextCommandPipeline
from ._core.pump import Pump
from ._core.queues.abc import Consumer, Producer, Queue
from ._core.queues.abc import Consumer, Delivery, Producer, Queue
from ._core.queues.memory import MemoryQueue
from ._core.related_events import AnyIORelatedEvents, RelatedEvents

Expand All @@ -30,6 +30,7 @@
"Consumer",
"ContextCommandPipeline",
"ContextPipeline",
"Delivery",
"DIAdapter",
"Dispatcher",
"Event",
Expand Down
19 changes: 10 additions & 9 deletions cq/_core/dispatchers/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from collections.abc import Awaitable, Callable
from typing import Protocol, Self, runtime_checkable

from cq._core.middleware import Middleware, MiddlewareGroup, deliver_message
from cq._core.middleware import Middleware, MiddlewareGroup


@runtime_checkable
Expand All @@ -29,16 +29,17 @@ def add_middlewares(self, *middlewares: Middleware[[I], O]) -> Self:
self.__middleware_group.add(*middlewares)
return self

async def _deliver(
async def _invoke(
self,
message: I,
handler: Callable[[I], Awaitable[O]],
message: I,
/,
fail_silently: bool = False,
) -> O:
return await deliver_message(
message,
handler,
self.__middleware_group,
fail_silently,
)
try:
return await self.__middleware_group.invoke(handler, message)
except Exception:
if fail_silently:
return NotImplemented

raise
6 changes: 3 additions & 3 deletions cq/_core/dispatchers/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async def dispatch(self, message: I, /) -> O:
self._trigger_listeners(message, task_group)

for handler in self._handlers_from(type(message)):
return await self._deliver(message, handler, handler.fail_silently)
return await self._invoke(handler, message, handler.fail_silently)

return NotImplemented

Expand All @@ -100,8 +100,8 @@ async def dispatch(self, message: I, /) -> None:

for handler in self._handlers_from(type(message)):
task_group.start_soon(
self._deliver,
message,
self._invoke,
handler,
message,
handler.fail_silently,
)
2 changes: 1 addition & 1 deletion cq/_core/dispatchers/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def add_static_step[T](
return self

async def dispatch(self, message: I, /) -> O:
return await self._deliver(message, self.__steps.execute)
return await self._invoke(self.__steps.execute, message)


class ContextPipeline[I]:
Expand Down
15 changes: 0 additions & 15 deletions cq/_core/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,6 @@ async def __call__(
return value


async def deliver_message[I, O](
message: I,
handler: Callable[[I], Awaitable[O]],
middleware_group: MiddlewareGroup[[I], O],
fail_silently: bool = False,
) -> O:
try:
return await middleware_group.invoke(handler, message)
except Exception:
if fail_silently:
return NotImplemented

raise


def _is_gen_middleware[**P, T](
middleware: Middleware[P, T],
) -> TypeGuard[GeneratorMiddleware[P, T]]:
Expand Down
16 changes: 8 additions & 8 deletions cq/_core/pump.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import anyio

from cq._core.middleware import Middleware, MiddlewareGroup, deliver_message
from cq._core.middleware import Middleware, MiddlewareGroup
from cq._core.queues.abc import Consumer


Expand All @@ -24,13 +24,13 @@ def add_middlewares(self, *middlewares: Middleware[[T], Any]) -> Self:
return self

async def drain(self) -> None:
async for message in self.consumer:
await deliver_message(
message,
self.dispatcher,
self.__middleware_group,
self.fail_silently,
)
async for delivery in self.consumer:
try:
async with delivery as message:
await self.__middleware_group.invoke(self.dispatcher, message)
except Exception:
if not self.fail_silently:
raise

@asynccontextmanager
async def draining(
Expand Down
13 changes: 7 additions & 6 deletions cq/_core/queues/abc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import abstractmethod
from collections.abc import AsyncIterator
from typing import Protocol, runtime_checkable
from collections.abc import AsyncIterable
from typing import AsyncContextManager, Protocol, runtime_checkable


@runtime_checkable
Expand All @@ -16,12 +16,13 @@ async def send(self, message: T, /) -> None:


@runtime_checkable
class Consumer[T](Protocol):
class Delivery[T](AsyncContextManager[T], Protocol):
__slots__ = ()

@abstractmethod
def __aiter__(self) -> AsyncIterator[T]:
raise NotImplementedError

@runtime_checkable
class Consumer[T](AsyncIterable[Delivery[T]], Protocol):
__slots__ = ()


@runtime_checkable
Expand Down
9 changes: 5 additions & 4 deletions cq/_core/queues/memory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager, nullcontext
from types import TracebackType
from typing import Any, Self

Expand All @@ -8,7 +8,7 @@

from cq._core.middleware import Middleware
from cq._core.pump import Pump
from cq._core.queues.abc import Queue
from cq._core.queues.abc import Delivery, Queue


class MemoryQueue[T](Queue[T]):
Expand All @@ -31,8 +31,9 @@ async def __aexit__(
) -> None:
await self.close()

def __aiter__(self) -> AsyncIterator[T]:
return aiter(self.__consumer)
async def __aiter__(self) -> AsyncIterator[Delivery[T]]:
async for message in self.__consumer:
yield nullcontext(message)

async def close(self) -> None:
await self.__producer.aclose()
Expand Down
4 changes: 3 additions & 1 deletion docs/guides/queues.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ Some workloads benefit from putting a queue between the producer of a message an
`Queue` is the combination of two protocols, kept separate so you can type producer-side and consumer-side ends independently:

* `Producer[T]` exposes `send(message)` and is callable.
* `Consumer[T]` exposes `__aiter__` and yields messages as they arrive.
* `Consumer[T]` is async iterable and yields a `Delivery[T]` for each incoming message.

Each `Delivery[T]` is an async context manager around a single message: entering it exposes the message, leaving it marks the end of processing to the queue, with or without an exception. This is where a transport-specific implementation can plug acknowledgement or requeue logic. `Pump` already handles the lifecycle, so most users never interact with `Delivery` directly.

Any object that satisfies these protocols can act as a queue. The library provides `MemoryQueue` as the default in-process implementation, and you are free to write your own without changing the rest of the API.

Expand Down
Loading