Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ env:
jobs:
test-matrix:
runs-on: ubuntu-latest
# Backstop: a hung multiprocessing worker (e.g. during a pickle regression)
# should not block CI longer than this.
timeout-minutes: 30
strategy:
fail-fast: false
matrix:
Expand Down
155 changes: 135 additions & 20 deletions crates/core/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,39 @@ fn strip_wire_header<'a>(
#[derive(Debug)]
pub struct PythonLogicalCodec {
inner: Arc<dyn LogicalExtensionCodec>,
python_udf_inlining: bool,
}

impl PythonLogicalCodec {
pub fn new(inner: Arc<dyn LogicalExtensionCodec>) -> Self {
Self { inner }
Self {
inner,
python_udf_inlining: true,
}
}

pub fn inner(&self) -> &Arc<dyn LogicalExtensionCodec> {
&self.inner
}

/// Toggle inline encoding of Python UDFs. See
/// `SessionContext.with_python_udf_inlining` (Python) for full
/// behavior and use cases.
///
/// Security scope: strict mode (`false`) narrows only the codec
/// layer — it stops `Expr::from_bytes` from invoking
/// `cloudpickle.loads` on the inline `DFPY*` payload. It does
/// **not** make `pickle.loads(untrusted_bytes)` safe; treat every
/// `pickle.loads` on untrusted input as unsafe regardless of this
/// setting.
pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
self.python_udf_inlining = enabled;
self
}

pub fn python_udf_inlining(&self) -> bool {
self.python_udf_inlining
}
}

impl Default for PythonLogicalCodec {
Expand Down Expand Up @@ -301,48 +324,102 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
}

fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
if try_encode_python_scalar_udf(node, buf)? {
if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? {
return Ok(());
}
self.inner.try_encode_udf(node, buf)
}

fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
return Ok(udf);
if self.python_udf_inlining {
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
return Ok(udf);
}
} else {
refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?;
}
self.inner.try_decode_udf(name, buf)
}

fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
if try_encode_python_udaf(node, buf)? {
if self.python_udf_inlining && try_encode_python_udaf(node, buf)? {
return Ok(());
}
self.inner.try_encode_udaf(node, buf)
}

fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
if let Some(udaf) = try_decode_python_udaf(buf)? {
return Ok(udaf);
if self.python_udf_inlining {
if let Some(udaf) = try_decode_python_udaf(buf)? {
return Ok(udaf);
}
} else {
refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?;
}
self.inner.try_decode_udaf(name, buf)
}

fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
if try_encode_python_udwf(node, buf)? {
if self.python_udf_inlining && try_encode_python_udwf(node, buf)? {
return Ok(());
}
self.inner.try_encode_udwf(node, buf)
}

fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
if let Some(udwf) = try_decode_python_udwf(buf)? {
return Ok(udwf);
if self.python_udf_inlining {
if let Some(udwf) = try_decode_python_udwf(buf)? {
return Ok(udwf);
}
} else {
refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?;
}
self.inner.try_decode_udwf(name, buf)
}
}

/// Strict-mode gate: if `buf` is a well-framed inline payload for
/// `family`, return the strict-refusal error; otherwise return
/// `Ok(())` so the caller can delegate to its `inner` codec.
///
/// Routing through [`read_framed_payload`] (rather than a bare
/// `starts_with` probe) means malformed inline bytes — wrong
/// wire-format version, mismatched Python version, truncated header —
/// surface *their* diagnostic instead of the strict-mode message.
/// The strict message implies sender intent ("inlining is disabled"),
/// so it should fire only when the bytes really would have decoded.
///
/// Fast path: short-circuit on the family-magic prefix before
/// acquiring the GIL. Plans with many non-Python UDFs would otherwise
/// pay a GIL acquisition per decode call just to confirm "not a
/// Python UDF". `read_framed_payload` itself rejects buffers that
/// don't start with `family`, so this is purely an optimization.
fn refuse_if_inline(buf: &[u8], family: &[u8], kind: &str, name: &str) -> Result<()> {
if !buf.starts_with(family) {
return Ok(());
}
Python::attach(|py| match read_framed_payload(py, buf, family, kind)? {
Some(_) => Err(refuse_inline_payload(kind, name)),
None => Ok(()),
})
}

/// Build the error returned by a strict codec when it receives an
/// inline Python-UDF payload it has been told not to deserialize.
fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusionError {
// `Execution`, not `Plan`: this is a wire-format decode refusal at
// codec time, not a planner-stage failure. Downstream error
// classification keys off the variant — surfacing this as a planner
// error would mis-route it into "fix your SQL" buckets.
datafusion::error::DataFusionError::Execution(format!(
"Refusing to deserialize inline Python {kind} '{name}': Python UDF \
inlining is disabled on this session. Ask the sender to re-encode \
with inlining disabled (so the UDF travels by name), or register \
'{name}' on this receiver's session and enable inlining on both \
sides — receivers cannot re-encode bytes they did not produce."
Comment on lines +417 to +419
))
}

/// `PhysicalExtensionCodec` mirror of [`PythonLogicalCodec`] parked
/// on the same `SessionContext`. Carries the Python-aware encoding
/// hooks for physical-layer types (`ExecutionPlan`, `PhysicalExpr`)
Expand All @@ -358,16 +435,33 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
#[derive(Debug)]
pub struct PythonPhysicalCodec {
inner: Arc<dyn PhysicalExtensionCodec>,
python_udf_inlining: bool,
}

impl PythonPhysicalCodec {
pub fn new(inner: Arc<dyn PhysicalExtensionCodec>) -> Self {
Self { inner }
Self {
inner,
python_udf_inlining: true,
}
}

pub fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> {
&self.inner
}

/// Toggle inline encoding of Python UDFs on this physical codec.
///
/// Mirrors [`PythonLogicalCodec::with_python_udf_inlining`]; see
/// that method for the full security and portability discussion.
pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
self.python_udf_inlining = enabled;
self
}

pub fn python_udf_inlining(&self) -> bool {
self.python_udf_inlining
}
}

impl Default for PythonPhysicalCodec {
Expand All @@ -391,15 +485,19 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
}

fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
if try_encode_python_scalar_udf(node, buf)? {
if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? {
return Ok(());
}
self.inner.try_encode_udf(node, buf)
}

fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
return Ok(udf);
if self.python_udf_inlining {
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
return Ok(udf);
}
} else {
refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?;
}
self.inner.try_decode_udf(name, buf)
}
Expand All @@ -417,29 +515,37 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
}

fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
if try_encode_python_udaf(node, buf)? {
if self.python_udf_inlining && try_encode_python_udaf(node, buf)? {
return Ok(());
}
self.inner.try_encode_udaf(node, buf)
}

fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
if let Some(udaf) = try_decode_python_udaf(buf)? {
return Ok(udaf);
if self.python_udf_inlining {
if let Some(udaf) = try_decode_python_udaf(buf)? {
return Ok(udaf);
}
} else {
refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?;
}
self.inner.try_decode_udaf(name, buf)
}

fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
if try_encode_python_udwf(node, buf)? {
if self.python_udf_inlining && try_encode_python_udwf(node, buf)? {
return Ok(());
}
self.inner.try_encode_udwf(node, buf)
}

fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
if let Some(udwf) = try_decode_python_udwf(buf)? {
return Ok(udwf);
if self.python_udf_inlining {
if let Some(udwf) = try_decode_python_udwf(buf)? {
return Ok(udwf);
}
} else {
refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?;
}
self.inner.try_decode_udwf(name, buf)
}
Expand Down Expand Up @@ -476,6 +582,9 @@ pub(crate) fn try_encode_python_scalar_udf(node: &ScalarUDF, buf: &mut Vec<u8>)
/// the caller to delegate to its `inner` codec (and eventually the
/// `FunctionRegistry`).
pub(crate) fn try_decode_python_scalar_udf(buf: &[u8]) -> Result<Option<Arc<ScalarUDF>>> {
if !buf.starts_with(PY_SCALAR_UDF_FAMILY) {
return Ok(None);
}
Python::attach(|py| -> Result<Option<Arc<ScalarUDF>>> {
let Some(payload) = read_framed_payload(py, buf, PY_SCALAR_UDF_FAMILY, "scalar UDF")?
else {
Expand Down Expand Up @@ -732,6 +841,9 @@ pub(crate) fn try_encode_python_udwf(node: &WindowUDF, buf: &mut Vec<u8>) -> Res
}

pub(crate) fn try_decode_python_udwf(buf: &[u8]) -> Result<Option<Arc<WindowUDF>>> {
if !buf.starts_with(PY_WINDOW_UDF_FAMILY) {
return Ok(None);
}
Python::attach(|py| -> Result<Option<Arc<WindowUDF>>> {
let Some(payload) = read_framed_payload(py, buf, PY_WINDOW_UDF_FAMILY, "window UDF")?
else {
Expand Down Expand Up @@ -814,6 +926,9 @@ pub(crate) fn try_encode_python_udaf(node: &AggregateUDF, buf: &mut Vec<u8>) ->
}

pub(crate) fn try_decode_python_udaf(buf: &[u8]) -> Result<Option<Arc<AggregateUDF>>> {
if !buf.starts_with(PY_AGG_UDF_FAMILY) {
return Ok(None);
}
Python::attach(|py| -> Result<Option<Arc<AggregateUDF>>> {
let Some(payload) = read_framed_payload(py, buf, PY_AGG_UDF_FAMILY, "aggregate UDF")?
else {
Expand Down
16 changes: 16 additions & 0 deletions crates/core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,22 @@ impl PySessionContext {
physical_codec,
})
}

pub fn with_python_udf_inlining(&self, enabled: bool) -> Self {
let logical_codec = Arc::new(
PythonLogicalCodec::new(Arc::clone(self.logical_codec.inner()))
.with_python_udf_inlining(enabled),
);
let physical_codec = Arc::new(
PythonPhysicalCodec::new(Arc::clone(self.physical_codec.inner()))
.with_python_udf_inlining(enabled),
);
Self {
ctx: Arc::clone(&self.ctx),
logical_codec,
physical_codec,
}
}
}

impl PySessionContext {
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ dev = [
"pyarrow>=19.0.0",
"pygithub==2.5.0",
"pytest-asyncio>=0.23.3",
"pytest-timeout>=2.3.1",
"pytest>=7.4.4",
"pyyaml>=6.0.3",
"ruff>=0.15.1",
Expand Down
45 changes: 45 additions & 0 deletions python/datafusion/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1769,3 +1769,48 @@ def with_physical_extension_codec(self, codec: Any) -> SessionContext:
new = SessionContext.__new__(SessionContext)
new.ctx = new_internal
return new

def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext:
"""Control whether Python UDFs are embedded in serialized expressions.

When ``enabled=True`` (the default), serialized expressions carry
the Python code for any scalar, aggregate, or window UDFs they
reference. The receiver rebuilds the UDFs from those bytes and
does not need to register them first.

When ``enabled=False``, serialized expressions store only the
UDF names. This has two uses:

* **Cross-language portability.** The bytes can be decoded by a
non-Python receiver, which must already have UDFs registered
under matching names.
* **Safer deserialization.** :meth:`Expr.from_bytes` will refuse
to rebuild Python UDFs rather than call ``cloudpickle.loads``
on untrusted input.

The setting affects :meth:`Expr.to_bytes` and
:meth:`Expr.from_bytes` whenever this session is passed as the
``ctx`` argument. :func:`pickle.dumps` and :func:`pickle.loads`
do not pass a context, so to apply the setting through pickle,
register this session with
:func:`datafusion.ipc.set_sender_ctx` on the sender and
:func:`datafusion.ipc.set_worker_ctx` on the receiver.

.. warning:: Security
This setting narrows only :meth:`Expr.from_bytes`. Calling
:func:`pickle.loads` on untrusted bytes remains unsafe
regardless of the toggle.

Returns a new :class:`SessionContext` with the toggle applied;
the original session is unchanged.

Examples:
>>> from datafusion import SessionContext
>>> strict = SessionContext().with_python_udf_inlining(enabled=False)
>>> isinstance(strict, SessionContext)
True
"""
new_internal = self.ctx.with_python_udf_inlining(enabled)
new = SessionContext.__new__(SessionContext)
new.ctx = new_internal
return new
Loading
Loading