Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions pymongo/asynchronous/command_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@
NoReturn,
Optional,
Sequence,
Union,
)

from bson import CodecOptions, _convert_raw_document_lists_to_streams
from pymongo.asynchronous.cursor_base import _AsyncCursorBase, _ConnectionManager
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
from pymongo.message import _GetMore, _OpMsg, _OpReply, _RawBatchGetMore
from pymongo.message import _GetMore, _OpMsg, _RawBatchGetMore
from pymongo.response import PinnedResponse
from pymongo.typings import _Address, _DocumentOut, _DocumentType

Expand Down Expand Up @@ -145,7 +144,7 @@ async def _maybe_pin_connection(self, conn: AsyncConnection) -> None:

def _unpack_response(
self,
response: Union[_OpReply, _OpMsg],
response: _OpMsg,
cursor_id: Optional[int],
codec_options: CodecOptions[Mapping[str, Any]],
user_fields: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -328,7 +327,7 @@ def __init__(

def _unpack_response( # type: ignore[override]
self,
response: Union[_OpReply, _OpMsg],
response: _OpMsg,
cursor_id: Optional[int],
codec_options: CodecOptions[dict[str, Any]],
user_fields: Optional[Mapping[str, Any]] = None,
Expand Down
5 changes: 2 additions & 3 deletions pymongo/asynchronous/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
from pymongo.message import (
_GetMore,
_OpMsg,
_OpReply,
_Query,
_RawBatchGetMore,
_RawBatchQuery,
Expand Down Expand Up @@ -864,7 +863,7 @@ def collation(self, collation: Optional[_CollationIn]) -> AsyncCursor[_DocumentT

def _unpack_response(
self,
response: Union[_OpReply, _OpMsg],
response: _OpMsg,
cursor_id: Optional[int],
codec_options: CodecOptions, # type: ignore[type-arg]
user_fields: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -1189,7 +1188,7 @@ def __init__(

def _unpack_response(
self,
response: Union[_OpReply, _OpMsg],
response: _OpMsg,
cursor_id: Optional[int],
codec_options: CodecOptions[Mapping[str, Any]],
user_fields: Optional[Mapping[str, Any]] = None,
Expand Down
38 changes: 14 additions & 24 deletions pymongo/asynchronous/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from pymongo import _csot, helpers_shared, message
from pymongo._telemetry import _CommandTelemetry
from pymongo.compression_support import _NO_COMPRESSION
from pymongo.message import _OpMsg, _OpReply
from pymongo.message import _OpMsg
from pymongo.monitoring import _is_speculative_authenticate

if TYPE_CHECKING:
Expand Down Expand Up @@ -75,7 +75,7 @@ async def _network_command_core(
cursor_id: Optional[int] = None,
orig: Optional[MutableMapping[str, Any]] = None,
speculative_hello: bool = False,
) -> tuple[list[_DocumentOut], Optional[Union[_OpReply, _OpMsg]], datetime.timedelta]:
) -> tuple[list[_DocumentOut], Optional[_OpMsg], datetime.timedelta]:
"""Send/receive a command and return (docs, raw_reply, duration).

Handles APM logging, send/receive, unpacking, response processing,
Expand All @@ -84,7 +84,7 @@ async def _network_command_core(
"""
publish = listeners is not None and listeners.enabled_for_commands
name = next(iter(spec))
reply: Optional[Union[_OpReply, _OpMsg]] = None
reply: Optional[_OpMsg] = None
docs: list[_DocumentOut] = []

with _CommandTelemetry(client, conn, spec, dbname, request_id, start) as cmd_telemetry:
Expand Down Expand Up @@ -191,7 +191,7 @@ async def command(
conn: AsyncConnection,
dbname: str,
spec: MutableMapping[str, Any],
is_mongos: bool,
is_mongos: bool, # noqa: ARG001
read_preference: Optional[_ServerMode],
codec_options: CodecOptions[_DocumentType],
session: Optional[AsyncClientSession],
Expand All @@ -205,7 +205,7 @@ async def command(
parse_write_concern_error: bool = False,
collation: Optional[_CollationIn] = None,
compression_ctx: Union[SnappyContext, ZlibContext, ZstdContext, None] = None,
use_op_msg: bool = False,
use_op_msg: bool = False, # noqa: ARG001
unacknowledged: bool = False,
user_fields: Optional[Mapping[str, Any]] = None,
exhaust_allowed: bool = False,
Expand Down Expand Up @@ -239,14 +239,10 @@ async def command(
:param exhaust_allowed: True if we should enable OP_MSG exhaustAllowed.
"""
name = next(iter(spec))
ns = dbname + ".$cmd"
speculative_hello = False

# Publish the original command document, perhaps with lsid and $clusterTime.
orig = spec
if is_mongos and not use_op_msg:
assert read_preference is not None
spec = message._maybe_add_read_preference(spec, read_preference)
if read_concern and not (session and session.in_transaction):
if read_concern.level:
spec["readConcern"] = read_concern.document
Expand All @@ -271,21 +267,15 @@ async def command(
conn.apply_timeout(client, spec)
_csot.apply_write_concern(spec, write_concern)

if use_op_msg:
flags = _OpMsg.MORE_TO_COME if unacknowledged else 0
flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0
request_id, msg, size, max_doc_size = message._op_msg(
flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx
)
# If this is an unacknowledged write then make sure the encoded doc(s)
# are small enough, otherwise rely on the server to return an error.
if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size:
message._raise_document_too_large(name, size, max_bson_size)
else:
request_id, msg, size = message._query(
0, ns, 0, -1, spec, None, codec_options, compression_ctx
)
max_doc_size = 0
flags = _OpMsg.MORE_TO_COME if unacknowledged else 0
flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0
request_id, msg, size, max_doc_size = message._op_msg(
flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx
)
# If this is an unacknowledged write then make sure the encoded doc(s)
# are small enough, otherwise rely on the server to return an error.
if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size:
message._raise_document_too_large(name, size, max_bson_size)

if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD:
message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD)
Expand Down
13 changes: 6 additions & 7 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
ZlibContext,
ZstdContext,
)
from pymongo.message import _OpMsg, _OpReply
from pymongo.message import _OpMsg
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import _ServerMode
from pymongo.typings import _Address, _CollationIn
Expand Down Expand Up @@ -235,13 +235,12 @@ async def unpin(self) -> None:
await self.close_conn(ConnectionClosedReason.STALE)

def hello_cmd(self) -> dict[str, Any]:
# Handshake spec requires us to use OP_MSG+hello command for the
# initial handshake in load balanced or stable API mode.
# As of PYTHON-5713, always use OP_MSG for the handshake since all
# supported servers (MongoDB 4.2+, wire version >= 8) support it.
self.op_msg_enabled = True
if self.opts.server_api or self.hello_ok or self.opts.load_balanced:
self.op_msg_enabled = True
return {HelloCompat.CMD: 1}
else:
return {HelloCompat.LEGACY_CMD: 1, "helloOk": True}
return {HelloCompat.LEGACY_CMD: 1, "helloOk": True}

async def hello(self) -> Hello[dict[str, Any]]:
return await self._hello(None, None)
Expand Down Expand Up @@ -447,7 +446,7 @@ async def send_message(self, message: bytes, max_doc_size: int) -> None:
except BaseException as error:
await self._raise_connection_failure(error)

async def receive_message(self, request_id: Optional[int]) -> Union[_OpReply, _OpMsg]:
async def receive_message(self, request_id: Optional[int]) -> _OpMsg:
"""Receive a raw BSON message or raise ConnectionFailure.

If any exception is raised, the socket is closed.
Expand Down
Loading
Loading