diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c597ab308..2cd792ea9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -29,6 +29,9 @@ env: jobs: test-matrix: runs-on: ubuntu-latest + # Backstop: a hung multiprocessing worker (e.g. during a pickle regression) + # should not block CI longer than this. + timeout-minutes: 30 strategy: fail-fast: false matrix: diff --git a/crates/core/src/codec.rs b/crates/core/src/codec.rs index 363ee82b8..b1b9f99dc 100644 --- a/crates/core/src/codec.rs +++ b/crates/core/src/codec.rs @@ -232,16 +232,39 @@ fn strip_wire_header<'a>( #[derive(Debug)] pub struct PythonLogicalCodec { inner: Arc, + python_udf_inlining: bool, } impl PythonLogicalCodec { pub fn new(inner: Arc) -> Self { - Self { inner } + Self { + inner, + python_udf_inlining: true, + } } pub fn inner(&self) -> &Arc { &self.inner } + + /// Toggle inline encoding of Python UDFs. See + /// `SessionContext.with_python_udf_inlining` (Python) for full + /// behavior and use cases. + /// + /// Security scope: strict mode (`false`) narrows only the codec + /// layer — it stops `Expr::from_bytes` from invoking + /// `cloudpickle.loads` on the inline `DFPY*` payload. It does + /// **not** make `pickle.loads(untrusted_bytes)` safe; treat every + /// `pickle.loads` on untrusted input as unsafe regardless of this + /// setting. + pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self { + self.python_udf_inlining = enabled; + self + } + + pub fn python_udf_inlining(&self) -> bool { + self.python_udf_inlining + } } impl Default for PythonLogicalCodec { @@ -301,48 +324,104 @@ impl LogicalExtensionCodec for PythonLogicalCodec { } fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec) -> Result<()> { - if try_encode_python_scalar_udf(node, buf)? { + if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? { return Ok(()); } self.inner.try_encode_udf(node, buf) } fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result> { - if let Some(udf) = try_decode_python_scalar_udf(buf)? { - return Ok(udf); + if self.python_udf_inlining { + if let Some(udf) = try_decode_python_scalar_udf(buf)? { + return Ok(udf); + } + } else { + refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?; } self.inner.try_decode_udf(name, buf) } fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec) -> Result<()> { - if try_encode_python_udaf(node, buf)? { + if self.python_udf_inlining && try_encode_python_udaf(node, buf)? { return Ok(()); } self.inner.try_encode_udaf(node, buf) } fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result> { - if let Some(udaf) = try_decode_python_udaf(buf)? { - return Ok(udaf); + if self.python_udf_inlining { + if let Some(udaf) = try_decode_python_udaf(buf)? { + return Ok(udaf); + } + } else { + refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?; } self.inner.try_decode_udaf(name, buf) } fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec) -> Result<()> { - if try_encode_python_udwf(node, buf)? { + if self.python_udf_inlining && try_encode_python_udwf(node, buf)? { return Ok(()); } self.inner.try_encode_udwf(node, buf) } fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result> { - if let Some(udwf) = try_decode_python_udwf(buf)? { - return Ok(udwf); + if self.python_udf_inlining { + if let Some(udwf) = try_decode_python_udwf(buf)? { + return Ok(udwf); + } + } else { + refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?; } self.inner.try_decode_udwf(name, buf) } } +/// Strict-mode gate: if `buf` is a well-framed inline payload for +/// `family`, return the strict-refusal error; otherwise return +/// `Ok(())` so the caller can delegate to its `inner` codec. +/// +/// Routing through [`read_framed_payload`] (rather than a bare +/// `starts_with` probe) means malformed inline bytes — wrong +/// wire-format version, mismatched Python version, truncated header — +/// surface *their* diagnostic instead of the strict-mode message. +/// The strict message implies sender intent ("inlining is disabled"), +/// so it should fire only when the bytes really would have decoded. +/// +/// Fast path: short-circuit on the family-magic prefix before +/// acquiring the GIL. Plans with many non-Python UDFs would otherwise +/// pay a GIL acquisition per decode call just to confirm "not a +/// Python UDF". `read_framed_payload` itself rejects buffers that +/// don't start with `family`, so this is purely an optimization. +fn refuse_if_inline(buf: &[u8], family: &[u8], kind: &str, name: &str) -> Result<()> { + if !buf.starts_with(family) { + return Ok(()); + } + Python::attach(|py| match read_framed_payload(py, buf, family, kind)? { + Some(_) => Err(refuse_inline_payload(kind, name)), + None => Ok(()), + }) +} + +/// Build the error returned by a strict codec when it receives an +/// inline Python-UDF payload it has been told not to deserialize. +fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusionError { + // `Execution`, not `Plan`: this is a wire-format decode refusal at + // codec time, not a planner-stage failure. Downstream error + // classification keys off the variant — surfacing this as a planner + // error would mis-route it into "fix your SQL" buckets. + datafusion::error::DataFusionError::Execution(format!( + "Refusing to deserialize inline Python {kind} '{name}': Python UDF \ + inlining is disabled on this session. Two remediations: \ + (1) ask the sender to re-encode with inlining disabled so '{name}' \ + travels by name, and register '{name}' on this receiver; or \ + (2) enable inlining on this receiver (accepts the cloudpickle \ + execution risk on inbound payloads). Receivers cannot re-encode \ + bytes they did not produce." + )) +} + /// `PhysicalExtensionCodec` mirror of [`PythonLogicalCodec`] parked /// on the same `SessionContext`. Carries the Python-aware encoding /// hooks for physical-layer types (`ExecutionPlan`, `PhysicalExpr`) @@ -358,16 +437,33 @@ impl LogicalExtensionCodec for PythonLogicalCodec { #[derive(Debug)] pub struct PythonPhysicalCodec { inner: Arc, + python_udf_inlining: bool, } impl PythonPhysicalCodec { pub fn new(inner: Arc) -> Self { - Self { inner } + Self { + inner, + python_udf_inlining: true, + } } pub fn inner(&self) -> &Arc { &self.inner } + + /// Toggle inline encoding of Python UDFs on this physical codec. + /// + /// Mirrors [`PythonLogicalCodec::with_python_udf_inlining`]; see + /// that method for the full security and portability discussion. + pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self { + self.python_udf_inlining = enabled; + self + } + + pub fn python_udf_inlining(&self) -> bool { + self.python_udf_inlining + } } impl Default for PythonPhysicalCodec { @@ -391,15 +487,19 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec { } fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec) -> Result<()> { - if try_encode_python_scalar_udf(node, buf)? { + if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? { return Ok(()); } self.inner.try_encode_udf(node, buf) } fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result> { - if let Some(udf) = try_decode_python_scalar_udf(buf)? { - return Ok(udf); + if self.python_udf_inlining { + if let Some(udf) = try_decode_python_scalar_udf(buf)? { + return Ok(udf); + } + } else { + refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?; } self.inner.try_decode_udf(name, buf) } @@ -417,29 +517,37 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec { } fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec) -> Result<()> { - if try_encode_python_udaf(node, buf)? { + if self.python_udf_inlining && try_encode_python_udaf(node, buf)? { return Ok(()); } self.inner.try_encode_udaf(node, buf) } fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result> { - if let Some(udaf) = try_decode_python_udaf(buf)? { - return Ok(udaf); + if self.python_udf_inlining { + if let Some(udaf) = try_decode_python_udaf(buf)? { + return Ok(udaf); + } + } else { + refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?; } self.inner.try_decode_udaf(name, buf) } fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec) -> Result<()> { - if try_encode_python_udwf(node, buf)? { + if self.python_udf_inlining && try_encode_python_udwf(node, buf)? { return Ok(()); } self.inner.try_encode_udwf(node, buf) } fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result> { - if let Some(udwf) = try_decode_python_udwf(buf)? { - return Ok(udwf); + if self.python_udf_inlining { + if let Some(udwf) = try_decode_python_udwf(buf)? { + return Ok(udwf); + } + } else { + refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?; } self.inner.try_decode_udwf(name, buf) } @@ -476,6 +584,9 @@ pub(crate) fn try_encode_python_scalar_udf(node: &ScalarUDF, buf: &mut Vec) /// the caller to delegate to its `inner` codec (and eventually the /// `FunctionRegistry`). pub(crate) fn try_decode_python_scalar_udf(buf: &[u8]) -> Result>> { + if !buf.starts_with(PY_SCALAR_UDF_FAMILY) { + return Ok(None); + } Python::attach(|py| -> Result>> { let Some(payload) = read_framed_payload(py, buf, PY_SCALAR_UDF_FAMILY, "scalar UDF")? else { @@ -732,6 +843,9 @@ pub(crate) fn try_encode_python_udwf(node: &WindowUDF, buf: &mut Vec) -> Res } pub(crate) fn try_decode_python_udwf(buf: &[u8]) -> Result>> { + if !buf.starts_with(PY_WINDOW_UDF_FAMILY) { + return Ok(None); + } Python::attach(|py| -> Result>> { let Some(payload) = read_framed_payload(py, buf, PY_WINDOW_UDF_FAMILY, "window UDF")? else { @@ -814,6 +928,9 @@ pub(crate) fn try_encode_python_udaf(node: &AggregateUDF, buf: &mut Vec) -> } pub(crate) fn try_decode_python_udaf(buf: &[u8]) -> Result>> { + if !buf.starts_with(PY_AGG_UDF_FAMILY) { + return Ok(None); + } Python::attach(|py| -> Result>> { let Some(payload) = read_framed_payload(py, buf, PY_AGG_UDF_FAMILY, "aggregate UDF")? else { diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 642afeef7..03221eb68 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -1404,6 +1404,22 @@ impl PySessionContext { physical_codec, }) } + + pub fn with_python_udf_inlining(&self, enabled: bool) -> Self { + let logical_codec = Arc::new( + PythonLogicalCodec::new(Arc::clone(self.logical_codec.inner())) + .with_python_udf_inlining(enabled), + ); + let physical_codec = Arc::new( + PythonPhysicalCodec::new(Arc::clone(self.physical_codec.inner())) + .with_python_udf_inlining(enabled), + ); + Self { + ctx: Arc::clone(&self.ctx), + logical_codec, + physical_codec, + } + } } impl PySessionContext { diff --git a/pyproject.toml b/pyproject.toml index a02f4608a..418640a49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -203,6 +203,7 @@ dev = [ "pyarrow>=19.0.0", "pygithub==2.5.0", "pytest-asyncio>=0.23.3", + "pytest-timeout>=2.3.1", "pytest>=7.4.4", "pyyaml>=6.0.3", "ruff>=0.15.1", diff --git a/python/datafusion/context.py b/python/datafusion/context.py index 5c3501941..c5b1d7858 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -1769,3 +1769,48 @@ def with_physical_extension_codec(self, codec: Any) -> SessionContext: new = SessionContext.__new__(SessionContext) new.ctx = new_internal return new + + def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext: + """Control whether Python UDFs are embedded in serialized expressions. + + When ``enabled=True`` (the default), serialized expressions carry + the Python code for any scalar, aggregate, or window UDFs they + reference. The receiver rebuilds the UDFs from those bytes and + does not need to register them first. + + When ``enabled=False``, serialized expressions store only the + UDF names. This has two uses: + + * **Cross-language portability.** The bytes can be decoded by a + non-Python receiver, which must already have UDFs registered + under matching names. + * **Safer deserialization.** :meth:`Expr.from_bytes` will refuse + to rebuild Python UDFs rather than call ``cloudpickle.loads`` + on untrusted input. + + The setting affects :meth:`Expr.to_bytes` and + :meth:`Expr.from_bytes` whenever this session is passed as the + ``ctx`` argument. :func:`pickle.dumps` and :func:`pickle.loads` + do not pass a context, so to apply the setting through pickle, + register this session with + :func:`datafusion.ipc.set_sender_ctx` on the sender and + :func:`datafusion.ipc.set_worker_ctx` on the receiver. + + .. warning:: Security + This setting narrows only :meth:`Expr.from_bytes`. Calling + :func:`pickle.loads` on untrusted bytes remains unsafe + regardless of the toggle. + + Returns a new :class:`SessionContext` with the toggle applied; + the original session is unchanged. + + Examples: + >>> from datafusion import SessionContext + >>> strict = SessionContext().with_python_udf_inlining(enabled=False) + >>> isinstance(strict, SessionContext) + True + """ + new_internal = self.ctx.with_python_udf_inlining(enabled) + new = SessionContext.__new__(SessionContext) + new.ctx = new_internal + return new diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index 7e95bc127..4fdbdc5d4 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -53,6 +53,7 @@ from ._internal import expr as expr_internal from ._internal import functions as functions_internal +from .ipc import get_sender_ctx if TYPE_CHECKING: from collections.abc import Sequence @@ -446,13 +447,18 @@ def to_bytes(self, ctx: SessionContext | None = None) -> bytes: worker process for distributed evaluation. When ``ctx`` is supplied, encoding routes through that session's - installed :class:`LogicalExtensionCodec`. When ``ctx`` is - ``None``, the default codec is used. - - Built-in functions and Python UDFs (scalar, aggregate, window) - travel inside the returned bytes; the worker does not need to - pre-register them. UDFs imported via the FFI capsule protocol - travel by name only and must be registered on the worker. + installed :class:`LogicalExtensionCodec` (so settings like + :meth:`SessionContext.with_python_udf_inlining` take effect). + When ``ctx`` is ``None``, the default codec is used (Python UDF + inlining on, no user-installed extension codec). + + Built-in functions travel inside the returned bytes. Python UDFs + (scalar, aggregate, window) also inline by default, so the worker + does not need to pre-register them; when the encoding session has + :meth:`SessionContext.with_python_udf_inlining` set to ``False``, + Python UDFs travel by name only and must be registered on the + worker. UDFs imported via the FFI capsule protocol always travel + by name only and must be registered on the worker. .. warning:: Security Bytes returned here may embed a cloudpickled Python @@ -522,7 +528,9 @@ def from_bytes(cls, buf: bytes, ctx: SessionContext | None = None) -> Expr: Accepts output of :meth:`to_bytes` or :func:`pickle.dumps`. ``ctx`` is the :class:`SessionContext` used to resolve any - function references that travel by name (e.g. FFI UDFs). When + function references that travel by name (e.g. FFI UDFs, or + Python UDFs sent with inlining disabled via + :meth:`SessionContext.with_python_udf_inlining`). When ``ctx`` is ``None`` the worker context installed via :func:`datafusion.ipc.set_worker_ctx` is consulted; if no worker context is installed, the global :class:`SessionContext` is used @@ -586,8 +594,15 @@ def __reduce__(self) -> tuple[Callable[[bytes], Expr], tuple[bytes]]: >>> e = col("a") * lit(2) >>> pickle.loads(pickle.dumps(e)).canonical_name() 'a * Int64(2)' + + The encoding side honors a driver-side sender context installed + via :func:`datafusion.ipc.set_sender_ctx` — that is how + :meth:`SessionContext.with_python_udf_inlining` propagates + through ``pickle.dumps``. The sender context is read by + ``__reduce__``, so :func:`copy.copy` and :func:`copy.deepcopy` + — which also go through ``__reduce__`` — pick it up too. """ - return (Expr._reconstruct, (self.to_bytes(),)) + return (Expr._reconstruct, (self.to_bytes(get_sender_ctx()),)) @classmethod def _reconstruct(cls, proto_bytes: bytes) -> Expr: diff --git a/python/datafusion/ipc.py b/python/datafusion/ipc.py index 8dd7fc463..bb876bfb4 100644 --- a/python/datafusion/ipc.py +++ b/python/datafusion/ipc.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -"""Worker-side setup for distributing DataFusion expressions. +"""Driver- and worker-side setup for distributing DataFusion expressions. When a :class:`Expr` is shipped to a worker process (e.g. through :func:`multiprocessing.Pool` or a Ray actor), the worker reconstructs the @@ -53,6 +53,36 @@ def init_worker(): payloads are not portable across Python minor versions. See :meth:`datafusion.Expr.to_bytes` for examples of what travels by value vs. by reference. + +On the driver side, call :func:`set_sender_ctx` to control how +:func:`pickle.dumps` encodes expressions — for example, to apply +:meth:`SessionContext.with_python_udf_inlining` to every pickled +expression on this thread: + +.. code-block:: python + + from datafusion import SessionContext + from datafusion.ipc import clear_sender_ctx, set_sender_ctx + + driver_ctx = SessionContext().with_python_udf_inlining(enabled=False) + set_sender_ctx(driver_ctx) + try: + pickle.dumps(expr) # encoded with inlining disabled + finally: + clear_sender_ctx() + +Without a sender context the default codec is used (Python UDF +inlining on). The sender context only affects pickle / ``to_bytes`` +encoding; explicit ``expr.to_bytes(ctx)`` calls still use the supplied +``ctx``. + +The thread-local holds a strong reference to the installed +:class:`SessionContext` until :func:`clear_sender_ctx` is called or +the thread exits. Long-running driver threads that install a sender +context once and never clear it will retain that session for the +lifetime of the thread; pair :func:`set_sender_ctx` with +:func:`clear_sender_ctx` (e.g. in a ``try``/``finally``) when the +sender context is only needed for a bounded scope. """ from __future__ import annotations @@ -65,8 +95,11 @@ def init_worker(): __all__ = [ + "clear_sender_ctx", "clear_worker_ctx", + "get_sender_ctx", "get_worker_ctx", + "set_sender_ctx", "set_worker_ctx", ] @@ -125,6 +158,67 @@ def get_worker_ctx() -> SessionContext | None: return getattr(_local, "ctx", None) +def set_sender_ctx(ctx: SessionContext) -> None: + """Install this driver's :class:`SessionContext` for outbound pickles. + + Controls how :func:`pickle.dumps` encodes :class:`Expr` instances on + this thread. The most useful application is propagating a session + configured with + :meth:`SessionContext.with_python_udf_inlining` so the toggle takes + effect through pickle (which otherwise calls + :meth:`Expr.to_bytes` with no context and uses the default codec). + + Idempotent: overwrites any previous value. Stored in a thread-local + slot, so worker threads on the driver may install their own contexts. + Does not affect :meth:`Expr.to_bytes` calls that pass an explicit + ``ctx`` — those continue to use the supplied context. + + Examples: + >>> from datafusion import SessionContext + >>> from datafusion.ipc import ( + ... set_sender_ctx, get_sender_ctx, clear_sender_ctx, + ... ) + >>> driver = SessionContext().with_python_udf_inlining(enabled=False) + >>> set_sender_ctx(driver) + >>> get_sender_ctx() is driver + True + >>> clear_sender_ctx() + """ + _local.sender_ctx = ctx + + +def clear_sender_ctx() -> None: + """Remove this driver's installed sender :class:`SessionContext`. + + After clearing, pickled expressions fall back to the default codec + (Python UDF inlining on). + + Examples: + >>> from datafusion import SessionContext + >>> from datafusion.ipc import ( + ... set_sender_ctx, clear_sender_ctx, get_sender_ctx, + ... ) + >>> set_sender_ctx(SessionContext()) + >>> clear_sender_ctx() + >>> get_sender_ctx() is None + True + """ + if hasattr(_local, "sender_ctx"): + del _local.sender_ctx + + +def get_sender_ctx() -> SessionContext | None: + """Return this driver's installed sender :class:`SessionContext`, or ``None``. + + Examples: + >>> from datafusion.ipc import get_sender_ctx, clear_sender_ctx + >>> clear_sender_ctx() + >>> get_sender_ctx() is None + True + """ + return getattr(_local, "sender_ctx", None) + + def _resolve_ctx( explicit_ctx: SessionContext | None = None, ) -> SessionContext: diff --git a/python/tests/_pickle_multiprocessing_helpers.py b/python/tests/_pickle_multiprocessing_helpers.py new file mode 100644 index 000000000..4f04967f2 --- /dev/null +++ b/python/tests/_pickle_multiprocessing_helpers.py @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# The leading underscore is load-bearing: pytest with --import-mode=importlib +# (used in CI) assigns synthetic module names to test modules, which breaks +# subprocess imports during multiprocessing. An underscore-prefixed module is +# not collected as a test module, so it imports under its normal __name__ +# inside worker processes. + +from __future__ import annotations + +import pyarrow as pa +from datafusion import SessionContext, udf +from datafusion.ipc import clear_worker_ctx, set_worker_ctx + + +def make_double_udf(): + """Build the canonical UDF used in the multiprocessing tests.""" + return udf( + lambda arr: pa.array([(v.as_py() or 0) * 2 for v in arr]), + [pa.int64()], + pa.int64(), + volatility="immutable", + name="double", + ) + + +def make_times_seven_udf(): + """Closure-capturing UDF — verifies cloudpickle preserves closed-over state.""" + multiplier = 7 + + def fn(arr): + return pa.array([(v.as_py() or 0) * multiplier for v in arr]) + + return udf( + fn, + [pa.int64()], + pa.int64(), + volatility="immutable", + name="times_seven", + ) + + +def init_worker_empty(): + """Pool initializer: install an empty SessionContext (no UDFs).""" + set_worker_ctx(SessionContext()) + + +def init_worker_clear(): + """Pool initializer: explicitly clear any prior worker context.""" + clear_worker_ctx() + + +def unpickle_and_describe(blob: bytes) -> str: + """Unpickle a proto-bytes blob and return its canonical name.""" + import pickle + + expr = pickle.loads(blob) # noqa: S301 + return expr.canonical_name() + + +def unpickle_and_evaluate(blob: bytes, batch: list[int]) -> list[int]: + """Unpickle an expression and evaluate it against an in-memory batch. + + Returns the result column as a Python list. Used to verify that + cloudpickled UDFs (including closure state) execute correctly in + a fresh worker process. + """ + import pickle + + expr = pickle.loads(blob) # noqa: S301 + ctx = SessionContext() + df = ctx.from_pydict({"a": batch}) + out = df.with_column("result", expr).select("result") + return out.to_pydict()["result"] diff --git a/python/tests/test_pickle_expr.py b/python/tests/test_pickle_expr.py index eb0441c49..588caa21a 100644 --- a/python/tests/test_pickle_expr.py +++ b/python/tests/test_pickle_expr.py @@ -21,27 +21,36 @@ with the pickled expression and do not need worker-side pre-registration. The worker context (:mod:`datafusion.ipc`) is only consulted for UDFs imported via the FFI capsule protocol. + +Cross-process tests live in ``test_pickle_multiprocessing.py``. """ from __future__ import annotations import pickle +import threading import pyarrow as pa import pytest from datafusion import Expr, SessionContext, col, lit, udf from datafusion.ipc import ( + clear_sender_ctx, clear_worker_ctx, + get_sender_ctx, + get_worker_ctx, + set_sender_ctx, set_worker_ctx, ) @pytest.fixture(autouse=True) def _reset_worker_ctx(): - """Ensure every test starts with no worker context installed.""" + """Ensure every test starts with no worker or sender context installed.""" clear_worker_ctx() + clear_sender_ctx() yield clear_worker_ctx() + clear_sender_ctx() def _double_udf(): @@ -124,6 +133,8 @@ def fn(arr): e = u(col("a")) blob = pickle.dumps(e) decoded = pickle.loads(blob) # noqa: S301 + # Round-trip names match; functional verification of captured state + # happens in test_pickle_multiprocessing via an actual UDF call. assert decoded.canonical_name() == e.canonical_name() def test_multi_arg_udf_round_trip(self): @@ -311,3 +322,244 @@ def test_cross_version_error_message(self): Exception, match="not portable across Python minor versions" ): Expr.from_bytes(bytes(tampered)) + + +class TestPythonUdfInliningToggle: + """`SessionContext.with_python_udf_inlining(enabled=False)` opts out of + inline Python UDF encoding for both encode and decode paths.""" + + def _build_double_udf(self): + return udf( + lambda arr: pa.array([(v.as_py() or 0) * 2 for v in arr]), + [pa.int64()], + pa.int64(), + volatility="immutable", + name="double", + ) + + def test_strict_encoder_omits_inline_payload(self): + """Strict mode emits the by-name wire form: no `DFPYUDF` magic + in the blob, no cloudpickled callable. Semantic check is + sharper than a size-ratio heuristic — a renamed UDF or a + smaller-than-expected closure would still flip the magic + bytes, but might not move the size by 4x. + """ + ctx_inline = SessionContext() + ctx_strict = ctx_inline.with_python_udf_inlining(enabled=False) + u = self._build_double_udf() + e = u(col("a")) + + blob_inline = e.to_bytes(ctx_inline) + blob_strict = e.to_bytes(ctx_strict) + + # `DFPYUDF` is the scalar Python-UDF family prefix; see + # `PY_SCALAR_UDF_FAMILY` in crates/core/src/codec.rs. + assert b"DFPYUDF" in blob_inline + assert b"DFPYUDF" not in blob_strict + + def test_toggle_off_then_on_restores_inline_encoding(self): + """`with_python_udf_inlining` is per-call clone semantics: + flipping off and then on must produce a context that emits the + same inline form as a fresh default context, byte-for-byte. + + Guards against a regression where the off→on transition leaves + the codec in a sticky strict state (e.g. by mutating shared + codec state instead of cloning). + """ + u = self._build_double_udf() + e = u(col("a")) + + baseline = SessionContext() + toggled = ( + SessionContext() + .with_python_udf_inlining(enabled=False) + .with_python_udf_inlining(enabled=True) + ) + + blob_baseline = e.to_bytes(baseline) + blob_toggled = e.to_bytes(toggled) + + assert blob_baseline == blob_toggled + + # Sanity check the decoded form against a fresh ctx — the + # toggled-back blob should be self-contained inline, not a + # strict by-name payload that needs registry resolution. + decoded = Expr.from_bytes(blob_toggled, ctx=SessionContext()) + assert "double" in decoded.canonical_name() + + def test_strict_roundtrip_via_registry(self): + """When both sender and receiver disable inlining, the UDF + travels by name only and the receiver resolves it from its + registered functions.""" + strict_sender = SessionContext().with_python_udf_inlining(enabled=False) + u = self._build_double_udf() + blob = u(col("a")).to_bytes(strict_sender) + + receiver = SessionContext().with_python_udf_inlining(enabled=False) + receiver.register_udf(u) + restored = Expr.from_bytes(blob, ctx=receiver) + assert "double" in restored.canonical_name() + + def test_strict_decoder_refuses_inline_payload(self): + """An inline-encoded blob fed to a strict receiver raises with a + clear error rather than silently invoking cloudpickle.loads. + + The receiver is intentionally *not* given a matching + registration: the codec refusal must trip before the registry + is ever consulted, so registering the UDF here would only mask + a regression that moved the check after registry lookup. + """ + sender = SessionContext() + u = self._build_double_udf() + blob = u(col("a")).to_bytes(sender) + + strict_receiver = SessionContext().with_python_udf_inlining(enabled=False) + # `RuntimeError` (not bare `Exception`): the codec refusal is + # surfaced through `parse_expr` → `PyRuntimeError`. Tightening + # the assertion catches a regression that swallows the refusal + # as a different error type. + with pytest.raises(RuntimeError, match="inlining is disabled"): + Expr.from_bytes(blob, ctx=strict_receiver) + + def test_sender_ctx_propagates_through_pickle(self): + """`set_sender_ctx` makes `pickle.dumps` use a strict codec. + + Without a sender context, pickle defaults to the inline codec + and the blob contains the `DFPYUDF` family prefix. With a + strict sender context installed, the callable encodes by name + and the prefix is absent. + """ + u = self._build_double_udf() + e = u(col("a")) + + blob_default = pickle.dumps(e) + + strict_sender = SessionContext().with_python_udf_inlining(enabled=False) + set_sender_ctx(strict_sender) + try: + blob_strict = pickle.dumps(e) + finally: + clear_sender_ctx() + + assert b"DFPYUDF" in blob_default + assert b"DFPYUDF" not in blob_strict + + def test_sender_ctx_strict_roundtrip_via_pickle(self): + """End-to-end pickle round-trip with strict mode on both sides. + + Driver installs a strict sender context. Worker installs a + matching strict context with the UDF registered. The UDF + travels by name through `pickle.dumps` / `pickle.loads`. + """ + u = self._build_double_udf() + e = u(col("a")) + + strict_sender = SessionContext().with_python_udf_inlining(enabled=False) + set_sender_ctx(strict_sender) + try: + blob = pickle.dumps(e) + finally: + clear_sender_ctx() + + worker = SessionContext().with_python_udf_inlining(enabled=False) + worker.register_udf(u) + set_worker_ctx(worker) + try: + decoded = pickle.loads(blob) # noqa: S301 + finally: + clear_worker_ctx() + + assert "double" in decoded.canonical_name() + + def test_sender_ctx_strict_pickle_accepted_by_inline_worker_with_registry(self): + """A strict-encoded blob still decodes fine on an inline worker + because the wire format is the same default-codec by-name form. + Sanity check: cross-config works as long as the receiver can + resolve the name.""" + u = self._build_double_udf() + e = u(col("a")) + + strict_sender = SessionContext().with_python_udf_inlining(enabled=False) + set_sender_ctx(strict_sender) + try: + blob = pickle.dumps(e) + finally: + clear_sender_ctx() + + worker = SessionContext() + worker.register_udf(u) + set_worker_ctx(worker) + try: + decoded = pickle.loads(blob) # noqa: S301 + finally: + clear_worker_ctx() + + assert "double" in decoded.canonical_name() + + +class TestWorkerCtxLifecycle: + def test_set_and_clear(self): + assert get_worker_ctx() is None + ctx = SessionContext() + set_worker_ctx(ctx) + assert get_worker_ctx() is ctx + clear_worker_ctx() + assert get_worker_ctx() is None + + def test_clear_when_unset_is_noop(self): + clear_worker_ctx() # no error + assert get_worker_ctx() is None + + def test_thread_local_isolation(self): + main_ctx = SessionContext() + set_worker_ctx(main_ctx) + + seen_in_thread: list = [] + + def worker(): + seen_in_thread.append(get_worker_ctx()) + set_worker_ctx(SessionContext()) + seen_in_thread.append(get_worker_ctx()) + + t = threading.Thread(target=worker) + t.start() + t.join() + + # Thread saw no ctx initially (thread-local), then its own. + assert seen_in_thread[0] is None + assert seen_in_thread[1] is not main_ctx + # Main thread's ctx is unchanged by the thread's actions. + assert get_worker_ctx() is main_ctx + + +class TestSenderCtxLifecycle: + def test_set_and_clear(self): + assert get_sender_ctx() is None + ctx = SessionContext() + set_sender_ctx(ctx) + assert get_sender_ctx() is ctx + clear_sender_ctx() + assert get_sender_ctx() is None + + def test_clear_when_unset_is_noop(self): + clear_sender_ctx() # no error + assert get_sender_ctx() is None + + def test_thread_local_isolation(self): + main_ctx = SessionContext() + set_sender_ctx(main_ctx) + + seen_in_thread: list = [] + + def worker(): + seen_in_thread.append(get_sender_ctx()) + set_sender_ctx(SessionContext()) + seen_in_thread.append(get_sender_ctx()) + + t = threading.Thread(target=worker) + t.start() + t.join() + + assert seen_in_thread[0] is None + assert seen_in_thread[1] is not main_ctx + assert get_sender_ctx() is main_ctx diff --git a/python/tests/test_pickle_multiprocessing.py b/python/tests/test_pickle_multiprocessing.py new file mode 100644 index 000000000..fcce49d97 --- /dev/null +++ b/python/tests/test_pickle_multiprocessing.py @@ -0,0 +1,145 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Cross-process pickle tests for :class:`Expr`. + +Workers run with each :mod:`multiprocessing` start method (``fork``, +``forkserver``, ``spawn``). Python UDFs (scalar, aggregate, window) travel +with the pickled expression and need no worker-side pre-registration. +Worker-side helpers live in ``_pickle_multiprocessing_helpers`` — the +underscore prefix avoids pytest collection so the module imports under +its real name in worker subprocesses. +""" + +from __future__ import annotations + +import functools +import multiprocessing as mp +import pickle +import sys +from pathlib import Path + +import pytest +from datafusion import col, lit + +from . import _pickle_multiprocessing_helpers as helpers + +# `pytest --import-mode=importlib` (used in CI) does not put the test parent +# directory on `sys.path`; pytest loads `tests` via its own importlib hook. +# multiprocessing forkserver / spawn workers receive only the parent's +# `sys.path` snapshot, not pytest's hook, so they fail to import +# `tests._pickle_multiprocessing_helpers` with `ModuleNotFoundError: No module +# named 'tests'`. Append (not prepend) the parent directory of the `tests` +# package so workers can resolve it the standard way, *without* shadowing the +# installed `datafusion` wheel — the source tree's `python/datafusion/` has +# no `_internal` extension module (that lives in the wheel under +# site-packages), so prepending would break `from datafusion._internal +# import ...`. Fork start method is unaffected (inherits the already-imported +# module object). +_TESTS_PARENT = str(Path(__file__).resolve().parent.parent) +if _TESTS_PARENT not in sys.path: + sys.path.append(_TESTS_PARENT) + + +@functools.cache +def _multiprocessing_available() -> tuple[bool, str]: + """Return (available, reason). Some sandboxed environments deny semaphore + creation; without semaphores, ``multiprocessing.Pool`` cannot start. + + Cached so the probe Pool only spawns once per session, and only when a + test in this module is actually about to run — collection-only runs + (e.g. ``pytest --collect-only`` on the full suite) skip the probe. + """ + try: + ctx = mp.get_context("spawn") + with ctx.Pool(processes=1) as pool: + pool.map(int, [0]) + except (PermissionError, OSError) as exc: + return False, f"multiprocessing.Pool unavailable: {exc}" + return True, "" + + +@pytest.fixture(autouse=True) +def _skip_if_multiprocessing_unavailable(): + available, reason = _multiprocessing_available() + if not available: + pytest.skip(reason) + + +START_METHODS = [ + pytest.param( + "fork", + marks=pytest.mark.skipif( + sys.platform == "darwin", + reason="fork start method is unsafe with PyArrow/tokio on macOS", + ), + ), + "forkserver", + "spawn", +] + + +@pytest.mark.parametrize("start_method", START_METHODS) +@pytest.mark.timeout(120) +def test_builtin_pickle_via_pool(start_method): + """Built-in expressions round-trip in every start method.""" + expr = col("a") + lit(1) + blob = pickle.dumps(expr) + + ctx = mp.get_context(start_method) + with ctx.Pool(processes=2) as pool: + results = pool.map(helpers.unpickle_and_describe, [blob, blob, blob]) + + assert all(r == expr.canonical_name() for r in results) + + +@pytest.mark.parametrize("start_method", START_METHODS) +@pytest.mark.timeout(120) +def test_udf_pickle_self_contained(start_method): + """Scalar UDF travels inside the proto blob — no worker pre-registration. + + Workers start with no UDF registered. The Rust-side ``PythonUDFCodec`` + reconstructs the UDF from bytes embedded in the pickle blob. + """ + udf_obj = helpers.make_double_udf() + expr = udf_obj(col("a")) + blob = pickle.dumps(expr) + + ctx = mp.get_context(start_method) + with ctx.Pool(processes=2) as pool: + results = pool.starmap( + helpers.unpickle_and_evaluate, + [(blob, [1, 2, 3]), (blob, [10, 20, 30])], + ) + + assert results[0] == [2, 4, 6] + assert results[1] == [20, 40, 60] + + +@pytest.mark.parametrize("start_method", START_METHODS) +@pytest.mark.timeout(120) +def test_closure_capturing_udf_via_pool(start_method): + """Cloudpickle preserves closure state across the codec boundary.""" + udf_obj = helpers.make_times_seven_udf() + expr = udf_obj(col("a")) + blob = pickle.dumps(expr) + + ctx = mp.get_context(start_method) + with ctx.Pool(processes=2) as pool: + result = pool.apply(helpers.unpickle_and_evaluate, (blob, [1, 2, 3])) + + assert result == [7, 14, 21] diff --git a/uv.lock b/uv.lock index 3fd3eec4b..26ab8b20e 100644 --- a/uv.lock +++ b/uv.lock @@ -343,6 +343,7 @@ dev = [ { name = "pygithub" }, { name = "pytest" }, { name = "pytest-asyncio" }, + { name = "pytest-timeout" }, { name = "pyyaml" }, { name = "ruff" }, { name = "toml" }, @@ -380,6 +381,7 @@ dev = [ { name = "pygithub", specifier = "==2.5.0" }, { name = "pytest", specifier = ">=7.4.4" }, { name = "pytest-asyncio", specifier = ">=0.23.3" }, + { name = "pytest-timeout", specifier = ">=2.3.1" }, { name = "pyyaml", specifier = ">=6.0.3" }, { name = "ruff", specifier = ">=0.15.1" }, { name = "toml", specifier = ">=0.10.2" }, @@ -628,25 +630,26 @@ wheels = [ [[package]] name = "maturin" -version = "1.8.1" +version = "1.13.3" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "tomli", marker = "python_full_version < '3.11'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/9a/08/ccb0f917722a35ab0d758be9bb5edaf645c3a3d6170061f10d396ecd273f/maturin-1.8.1.tar.gz", hash = "sha256:49cd964aabf59f8b0a6969f9860d2cdf194ac331529caae14c884f5659568857", size = 197397, upload-time = "2024-12-30T14:03:48.109Z" } +sdist = { url = "https://files.pythonhosted.org/packages/9c/1c/612d23d33ec21b9ae7ece7b3f0dd5f9dfd57b4009e9d2938165869ebd6ae/maturin-1.13.3.tar.gz", hash = "sha256:771e1e9e71a278e56db01552e0d1acfd1464259f9575b6e72842f893cd299079", size = 357934, upload-time = "2026-05-11T07:43:39.027Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/4c/00/f34077315f34db8ad2ccf6bfe11b864ca27baab3a1320634da8e3cf89a48/maturin-1.8.1-py3-none-linux_armv6l.whl", hash = "sha256:7e590a23d9076b8a994f2e67bc63dc9a2d1c9a41b1e7b45ac354ba8275254e89", size = 7568415, upload-time = "2024-12-30T14:03:07.939Z" }, - { url = "https://files.pythonhosted.org/packages/5c/07/9219976135ce0cb32d2fa6ea5c6d0ad709013d9a17967312e149b98153a6/maturin-1.8.1-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:8d8251a95682c83ea60988c804b620c181911cd824aa107b4a49ac5333c92968", size = 14527816, upload-time = "2024-12-30T14:03:13.851Z" }, - { url = "https://files.pythonhosted.org/packages/e6/04/fa009a00903acdd1785d58322193140bfe358595347c39f315112dabdf9e/maturin-1.8.1-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b9fc1a4354cac5e32c190410208039812ea88c4a36bd2b6499268ec49ef5de00", size = 7580446, upload-time = "2024-12-30T14:03:17.64Z" }, - { url = "https://files.pythonhosted.org/packages/9b/d4/414b2aab9bbfe88182b734d3aa1b4fef7d7701e50f6be48500378b8c8721/maturin-1.8.1-py3-none-manylinux_2_12_i686.manylinux2010_i686.musllinux_1_1_i686.whl", hash = "sha256:621e171c6b39f95f1d0df69a118416034fbd59c0f89dcaea8c2ea62019deecba", size = 7650535, upload-time = "2024-12-30T14:03:21.115Z" }, - { url = "https://files.pythonhosted.org/packages/f0/64/879418a8a0196013ec1fb19eada0781c04a30e8d6d9227e80f91275a4f5b/maturin-1.8.1-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.musllinux_1_1_x86_64.whl", hash = "sha256:98f638739a5132962347871b85c91f525c9246ef4d99796ae98a2031e3df029f", size = 8006702, upload-time = "2024-12-30T14:03:24.318Z" }, - { url = "https://files.pythonhosted.org/packages/39/c2/605829324f8371294f70303aca130682df75318958efed246873d3d604ab/maturin-1.8.1-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.musllinux_1_1_aarch64.whl", hash = "sha256:f9f5c47521924b6e515cbc652a042fe5f17f8747445be9d931048e5d8ddb50a4", size = 7368164, upload-time = "2024-12-30T14:03:26.582Z" }, - { url = "https://files.pythonhosted.org/packages/be/6c/30e136d397bb146b94b628c0ef7f17708281611b97849e2cf37847025ac7/maturin-1.8.1-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.musllinux_1_1_armv7l.whl", hash = "sha256:0f4407c7353c31bfbb8cdeb82bc2170e474cbfb97b5ba27568f440c9d6c1fdd4", size = 7450889, upload-time = "2024-12-30T14:03:28.893Z" }, - { url = "https://files.pythonhosted.org/packages/1b/50/e1f5023512696d4e56096f702e2f68d6d9a30afe0a4eec82b0e27b8eb4e4/maturin-1.8.1-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.musllinux_1_1_ppc64le.whl", hash = "sha256:ec49cd70cad3c389946c6e2bc0bd50772a7fcb463040dd800720345897eec9bf", size = 9585819, upload-time = "2024-12-30T14:03:31.125Z" }, - { url = "https://files.pythonhosted.org/packages/b7/80/b24b5248d89d2e5982553900237a337ea098ca9297b8369ca2aa95549e0f/maturin-1.8.1-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:c08767d794de8f8a11c5c8b1b47a4ff9fb6ae2d2d97679e27030f2f509c8c2a0", size = 10920801, upload-time = "2024-12-30T14:03:35.127Z" }, - { url = "https://files.pythonhosted.org/packages/6e/f4/8ede7a662fabf93456b44390a5ad22630e25fb5ddaecf787251071b2e143/maturin-1.8.1-py3-none-win32.whl", hash = "sha256:d678407713f3e10df33c5b3d7a343ec0551eb7f14d8ad9ba6febeb96f4e4c75c", size = 6873556, upload-time = "2024-12-30T14:03:37.913Z" }, - { url = "https://files.pythonhosted.org/packages/9c/22/757f093ed0e319e9648155b8c9d716765442bea5bc98ebc58ad4ad5b0524/maturin-1.8.1-py3-none-win_amd64.whl", hash = "sha256:a526f90fe0e5cb59ffb81f4ff547ddc42e823bbdeae4a31012c0893ca6dcaf46", size = 7823153, upload-time = "2024-12-30T14:03:40.33Z" }, - { url = "https://files.pythonhosted.org/packages/a4/f5/051413e04f6da25069db5e76759ecdb8cd2a8ab4a94045b5a3bf548c66fa/maturin-1.8.1-py3-none-win_arm64.whl", hash = "sha256:e95f077fd2ddd2f048182880eed458c308571a534be3eb2add4d3dac55bf57f4", size = 6552131, upload-time = "2024-12-30T14:03:45.203Z" }, + { url = "https://files.pythonhosted.org/packages/71/66/18c2aaac0b2a5dea9f1db5984ce83b905ad205cfc7c02d0091e707c0c2e7/maturin-1.13.3-py3-none-linux_armv6l.whl", hash = "sha256:3cc13929ca82aefa4adbf0f2c35419369796213c6fb0eb24e914945f50ef5d8c", size = 10190971, upload-time = "2026-05-11T07:43:10.431Z" }, + { url = "https://files.pythonhosted.org/packages/bc/71/26a988d092e4fd6a9523d46d44400a46cad7cdf3fd206ce702240c748aee/maturin-1.13.3-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:53b08bd075649ce96513ad9abf241a43cb685ed6e9e7790f8dbc2d66e95d8323", size = 19716714, upload-time = "2026-05-11T07:43:36.911Z" }, + { url = "https://files.pythonhosted.org/packages/82/5c/f3fd0e184255d9fc7e272c62af3dfa84c617b2577ef83af9ce615f5279cc/maturin-1.13.3-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:4cd478e6e4c56251e48ed079b8efd55b30bc5c09cf695a1bdafaeb582ee735a0", size = 10194726, upload-time = "2026-05-11T07:43:07.05Z" }, + { url = "https://files.pythonhosted.org/packages/a9/e1/f4edb69fb647b77c4769a9bfd4d6fb62961e653d164bc277ecdffac3ab61/maturin-1.13.3-py3-none-manylinux_2_12_i686.manylinux2010_i686.musllinux_1_1_i686.whl", hash = "sha256:a2675e25f313034ae6f57388cf14818f87d8961c4a96795287f3e155f59beb11", size = 10172781, upload-time = "2026-05-11T07:43:40.796Z" }, + { url = "https://files.pythonhosted.org/packages/c7/7d/a1be934690cdcc3c6609769ceaad322ab7501c2ee5bafcac1b14d609e403/maturin-1.13.3-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.musllinux_1_1_x86_64.whl", hash = "sha256:4667ef609ab446c1b5e0bfe4f9fb99699ab6d8548433f8d1a684256e0b67217f", size = 10682670, upload-time = "2026-05-11T07:43:13.132Z" }, + { url = "https://files.pythonhosted.org/packages/18/f5/372ae19b72ce8f6e37e5864ae4dc5b252ee9fce0619ccc3aa366aa3a7f97/maturin-1.13.3-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.musllinux_1_1_aarch64.whl", hash = "sha256:3db93337ed97e60ffc878aa8b493cd7ae44d3a5e1a37256db3a4491f57565018", size = 10060363, upload-time = "2026-05-11T07:43:21.107Z" }, + { url = "https://files.pythonhosted.org/packages/cb/5b/c68340cca09368af0df80965dfabed4234205a492a93da00793c7b9aae20/maturin-1.13.3-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.musllinux_1_1_armv7l.whl", hash = "sha256:1cc0a110b224ca90406b668a3e3c1f5a515062e59e26292f6dbaf5fd4909c6f3", size = 10017551, upload-time = "2026-05-11T07:43:33.916Z" }, + { url = "https://files.pythonhosted.org/packages/28/1e/f90fb2b000bad9e6d850cd5afb88b2f1e2a279cfb4de02ea40078484690e/maturin-1.13.3-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.musllinux_1_1_ppc64le.whl", hash = "sha256:c00ea6428dea17bf616fe93770837634454b28c2de1a876e42ef8036c616079a", size = 13301712, upload-time = "2026-05-11T07:43:26.492Z" }, + { url = "https://files.pythonhosted.org/packages/be/58/1670f68a8f04ccd7b90df11047bd9a046585310e84e1967cc9849cd1c5a3/maturin-1.13.3-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:49fd6ab08da28098ccf37afca24cdba72376ba9c1eedf9dd25ff82ed771961ff", size = 10946765, upload-time = "2026-05-11T07:43:16.135Z" }, + { url = "https://files.pythonhosted.org/packages/4b/ac/00c955c2ef134817b1a7bdaa76b0309e9c5291eb17d9ff88069eecd08bc2/maturin-1.13.3-py3-none-manylinux_2_31_riscv64.musllinux_1_1_riscv64.whl", hash = "sha256:b6741d7bf4af97da937528fd1e523c6ab54f53d9a21870fa735d6e67fd88e273", size = 10388661, upload-time = "2026-05-11T07:43:18.727Z" }, + { url = "https://files.pythonhosted.org/packages/97/c6/cbf8a51dde19c19aeba0d9b075095a2effb9b31fd312b1aae3ac79f8aea2/maturin-1.13.3-py3-none-win32.whl", hash = "sha256:0ef257e692cc756c87af5bea95ddfe7d3ac49d3376a7a87f728d63f06e7b6f8b", size = 8901838, upload-time = "2026-05-11T07:43:23.76Z" }, + { url = "https://files.pythonhosted.org/packages/a1/ff/c6a50a59dc8313097d43ac5f4d74df6a500c8cb62b0dc9e054f53e203a48/maturin-1.13.3-py3-none-win_amd64.whl", hash = "sha256:def4a435ea9d2ee93b18ba579dc8c9cf898889a66f312cd379b5e374ec3e3ad6", size = 10340801, upload-time = "2026-05-11T07:43:29.239Z" }, + { url = "https://files.pythonhosted.org/packages/6c/93/e32e79333f0902ba292b996f504f5f06be59587f7d02ab8d5ed1e3066445/maturin-1.13.3-py3-none-win_arm64.whl", hash = "sha256:2389fe92d017cea9d94e521fa0175314a4c52f79a1057b901fbc9f8686ef7d0b", size = 9706562, upload-time = "2026-05-11T07:43:31.743Z" }, ] [[package]] @@ -1238,6 +1241,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/67/17/3493c5624e48fd97156ebaec380dcaafee9506d7e2c46218ceebbb57d7de/pytest_asyncio-0.25.3-py3-none-any.whl", hash = "sha256:9e89518e0f9bd08928f97a3482fdc4e244df17529460bc038291ccaf8f85c7c3", size = 19467, upload-time = "2025-01-28T18:37:56.798Z" }, ] +[[package]] +name = "pytest-timeout" +version = "2.4.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ac/82/4c9ecabab13363e72d880f2fb504c5f750433b2b6f16e99f4ec21ada284c/pytest_timeout-2.4.0.tar.gz", hash = "sha256:7e68e90b01f9eff71332b25001f85c75495fc4e3a836701876183c4bcfd0540a", size = 17973, upload-time = "2025-05-05T19:44:34.99Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fa/b6/3127540ecdf1464a00e5a01ee60a1b09175f6913f0644ac748494d9c4b21/pytest_timeout-2.4.0-py3-none-any.whl", hash = "sha256:c42667e5cdadb151aeb5b26d114aff6bdf5a907f176a007a30b940d3d865b5c2", size = 14382, upload-time = "2025-05-05T19:44:33.502Z" }, +] + [[package]] name = "python-dateutil" version = "2.9.0.post0"