From ff844fef31d1fb81875e6e2a9d11ec64da4410c9 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 12 May 2026 14:48:26 +0200 Subject: [PATCH 01/12] feat: mark app root spans in SDK --- langfuse/_client/attributes.py | 1 + langfuse/_client/client.py | 74 ++++--- langfuse/_client/propagation.py | 35 ++++ langfuse/_client/span_processor.py | 203 +++++++++++++++---- tests/unit/test_app_root_detection.py | 247 ++++++++++++++++++++++++ tests/unit/test_propagate_attributes.py | 10 +- 6 files changed, 508 insertions(+), 62 deletions(-) create mode 100644 tests/unit/test_app_root_detection.py diff --git a/langfuse/_client/attributes.py b/langfuse/_client/attributes.py index d34e8e403..4660b50f0 100644 --- a/langfuse/_client/attributes.py +++ b/langfuse/_client/attributes.py @@ -59,6 +59,7 @@ class LangfuseOtelSpanAttributes: # Internal AS_ROOT = "langfuse.internal.as_root" + IS_APP_ROOT = "langfuse.internal.is_app_root" # Experiments EXPERIMENT_ID = "langfuse.experiment.id" diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 5f7c0f288..2f1c8d783 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -27,6 +27,7 @@ import backoff import httpx +from opentelemetry import context as otel_context_api from opentelemetry import trace as otel_trace_api from opentelemetry.sdk.trace import ReadableSpan, TracerProvider from opentelemetry.sdk.trace.export import SpanExporter @@ -66,7 +67,9 @@ ) from langfuse._client.propagation import ( PropagatedExperimentAttributes, + _detach_context_token_safely, _propagate_attributes, + _set_langfuse_trace_id_in_baggage, ) from langfuse._client.resource_manager import LangfuseResourceManager from langfuse._client.span import ( @@ -1178,39 +1181,54 @@ def _start_as_current_otel_span_with_processed_media( name=name, end_on_exit=end_on_exit if end_on_exit is not None else True, ) as otel_span: + baggage_token = None + + if otel_span.is_recording(): + 
context_with_app_root_claim = _set_langfuse_trace_id_in_baggage( + trace_id=self._get_otel_trace_id(otel_span), + context=otel_context_api.get_current(), + ) + baggage_token = otel_context_api.attach(context_with_app_root_claim) + span_class = self._get_span_class( as_type or "generation" ) # default was "generation" - common_args = { - "otel_span": otel_span, - "langfuse_client": self, - "environment": self._environment, - "release": self._release, - "input": input, - "output": output, - "metadata": metadata, - "version": version, - "level": level, - "status_message": status_message, - } - if span_class in [ - LangfuseGeneration, - LangfuseEmbedding, - ]: - common_args.update( - { - "completion_start_time": completion_start_time, - "model": model, - "model_parameters": model_parameters, - "usage_details": usage_details, - "cost_details": cost_details, - "prompt": prompt, - } - ) - # For span-like types (span, agent, tool, chain, retriever, evaluator, guardrail), no generation properties needed + try: + common_args = { + "otel_span": otel_span, + "langfuse_client": self, + "environment": self._environment, + "release": self._release, + "input": input, + "output": output, + "metadata": metadata, + "version": version, + "level": level, + "status_message": status_message, + } + + if span_class in [ + LangfuseGeneration, + LangfuseEmbedding, + ]: + common_args.update( + { + "completion_start_time": completion_start_time, + "model": model, + "model_parameters": model_parameters, + "usage_details": usage_details, + "cost_details": cost_details, + "prompt": prompt, + } + ) + # For span-like types (span, agent, tool, chain, retriever, evaluator, guardrail), no generation properties needed + + yield span_class(**common_args) # type: ignore[arg-type] - yield span_class(**common_args) # type: ignore[arg-type] + finally: + if baggage_token is not None: + _detach_context_token_safely(baggage_token) def _get_current_otel_span(self) -> Optional[otel_trace_api.Span]: current_span 
= otel_trace_api.get_current_span() diff --git a/langfuse/_client/propagation.py b/langfuse/_client/propagation.py index 988f1f26e..597d8126e 100644 --- a/langfuse/_client/propagation.py +++ b/langfuse/_client/propagation.py @@ -316,6 +316,9 @@ def _get_propagated_attributes_from_context( # Handle baggage baggage_entries = baggage.get_all(context=context) for baggage_key, baggage_value in baggage_entries.items(): + if baggage_key == LANGFUSE_TRACE_ID_BAGGAGE_KEY: + continue + if baggage_key.startswith(LANGFUSE_BAGGAGE_PREFIX): span_key = _get_span_key_from_baggage_key(baggage_key) @@ -471,12 +474,44 @@ def _get_propagated_context_key(key: str) -> str: LANGFUSE_BAGGAGE_PREFIX = "langfuse_" +LANGFUSE_TRACE_ID_BAGGAGE_KEY = "langfuse_trace_id" def _get_propagated_baggage_key(key: str) -> str: return f"{LANGFUSE_BAGGAGE_PREFIX}{key}" +def _get_langfuse_trace_id_from_baggage( + context: otel_context_api.Context, +) -> Optional[str]: + value = otel_baggage_api.get_baggage( + name=LANGFUSE_TRACE_ID_BAGGAGE_KEY, + context=context, + ) + + if value is None: + return None + + return str(value).lower() + + +def _set_langfuse_trace_id_in_baggage( + *, + trace_id: str, + context: otel_context_api.Context, +) -> otel_context_api.Context: + normalized_trace_id = trace_id.lower() + + if _get_langfuse_trace_id_from_baggage(context) == normalized_trace_id: + return context + + return otel_baggage_api.set_baggage( + name=LANGFUSE_TRACE_ID_BAGGAGE_KEY, + value=normalized_trace_id, + context=context, + ) + + def _get_span_key_from_baggage_key(key: str) -> Optional[str]: if not key.startswith(LANGFUSE_BAGGAGE_PREFIX): return None diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index a684a8813..51199a9a0 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -13,27 +13,45 @@ import base64 import os -from typing import Callable, Dict, List, Optional +import threading +from dataclasses import dataclass +from typing 
import Callable, Dict, List, Optional, cast from opentelemetry import context as context_api from opentelemetry.context import Context from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.trace import ReadableSpan, Span from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter -from opentelemetry.trace import format_span_id +from opentelemetry.trace import format_span_id, format_trace_id +from langfuse._client.attributes import LangfuseOtelSpanAttributes from langfuse._client.environment_variables import ( LANGFUSE_FLUSH_AT, LANGFUSE_FLUSH_INTERVAL, LANGFUSE_OTEL_TRACES_EXPORT_PATH, ) -from langfuse._client.propagation import _get_propagated_attributes_from_context +from langfuse._client.propagation import ( + _get_langfuse_trace_id_from_baggage, + _get_propagated_attributes_from_context, +) from langfuse._client.span_filter import is_default_export_span, is_langfuse_span from langfuse._client.utils import span_formatter from langfuse._version import __version__ as langfuse_version from langfuse.logger import langfuse_logger +@dataclass +class _AppRootSpanState: + expected_exported_at_start: bool + ended: bool = False + + +@dataclass +class _AppRootTraceState: + active_count: int + spans: Dict[str, _AppRootSpanState] + + class LangfuseSpanProcessor(BatchSpanProcessor): """OpenTelemetry span processor that exports spans to the Langfuse API. 
@@ -72,6 +90,7 @@ def __init__( else [] ) self._should_export_span = should_export_span or is_default_export_span + self._initialize_app_root_state() env_flush_at = os.environ.get(LANGFUSE_FLUSH_AT, None) flush_at = flush_at or int(env_flush_at) if env_flush_at is not None else None @@ -122,6 +141,10 @@ def __init__( else None, ) + def _initialize_app_root_state(self) -> None: + self._app_root_lock = threading.Lock() + self._app_root_traces: Dict[str, _AppRootTraceState] = {} + def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: context = parent_context or context_api.get_current() propagated_attributes = _get_propagated_attributes_from_context(context) @@ -133,51 +156,165 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None f"Propagated {len(propagated_attributes)} attributes to span '{format_span_id(span.context.span_id)}': {propagated_attributes}" ) + try: + self._mark_app_root_candidate(span=span, parent_context=context) + except Exception as error: + langfuse_logger.debug( + "Trace: app-root start-time check failed. Span will not be marked as app root | " + f"span_name='{getattr(span, 'name', '')}' | " + f"Error: {error}" + ) + return super().on_start(span, parent_context) def on_end(self, span: ReadableSpan) -> None: - # Only export spans that belong to the scoped project - # This is important to not send spans to wrong project in multi-project setups - if is_langfuse_span(span) and not self._is_langfuse_project_span(span): + try: + # Only export spans that belong to the scoped project + # This is important to not send spans to wrong project in multi-project setups + if is_langfuse_span(span) and not self._is_langfuse_project_span(span): + langfuse_logger.debug( + f"Security: Span rejected - belongs to project '{span.instrumentation_scope.attributes.get('public_key') if span.instrumentation_scope and span.instrumentation_scope.attributes else None}' but processor is for '{self.public_key}'. 
" + f"This prevents cross-project data leakage in multi-project environments." + ) + return + + # Do not export spans from blocked instrumentation scopes + if self._is_blocked_instrumentation_scope(span): + langfuse_logger.debug( + "Trace: Dropping span due to blocked instrumentation scope | " + f"span_name='{span.name}' | " + f"instrumentation_scope='{self._get_scope_name(span)}'" + ) + return + + # Apply custom or default span filter + try: + should_export = self._should_export_span(span) + except Exception as error: + langfuse_logger.error( + "Trace: should_export_span callback raised an error. " + f"Dropping span name='{span.name}' scope='{self._get_scope_name(span)}'. " + f"Error: {error}" + ) + return + + if not should_export: + langfuse_logger.debug( + "Trace: Dropping span due to should_export_span filter | " + f"span_name='{span.name}' | " + f"instrumentation_scope='{self._get_scope_name(span)}'" + ) + return + langfuse_logger.debug( - f"Security: Span rejected - belongs to project '{span.instrumentation_scope.attributes.get('public_key') if span.instrumentation_scope and span.instrumentation_scope.attributes else None}' but processor is for '{self.public_key}'. " - f"This prevents cross-project data leakage in multi-project environments." 
+ f"Trace: Processing span name='{span._name}' | Full details:\n{span_formatter(span)}" ) - return - # Do not export spans from blocked instrumentation scopes - if self._is_blocked_instrumentation_scope(span): - langfuse_logger.debug( - "Trace: Dropping span due to blocked instrumentation scope | " - f"span_name='{span.name}' | " - f"instrumentation_scope='{self._get_scope_name(span)}'" + super().on_end(span) + finally: + self._cleanup_app_root_state(span) + + def _mark_app_root_candidate(self, *, span: Span, parent_context: Context) -> None: + self._ensure_app_root_state() + + trace_id = format_trace_id(span.context.trace_id) + span_id = format_span_id(span.context.span_id) + parent_span_id = format_span_id(span.parent.span_id) if span.parent else None + expected_exported = self._is_expected_exported_at_start(span) + propagated_trace_id = _get_langfuse_trace_id_from_baggage(parent_context) + + with self._app_root_lock: + trace_state = self._app_root_traces.get(trace_id) + + if trace_state is None: + trace_state = _AppRootTraceState(active_count=0, spans={}) + self._app_root_traces[trace_id] = trace_state + + parent_state = ( + trace_state.spans.get(parent_span_id) + if parent_span_id is not None + else None + ) + parent_expected_exported = ( + parent_state.expected_exported_at_start is True + if parent_state is not None + else False + ) + suppressed_by_parent_claim = ( + propagated_trace_id == trace_id and parent_state is None + ) + + mark_app_root = ( + expected_exported + and not parent_expected_exported + and not suppressed_by_parent_claim + ) + + trace_state.spans[span_id] = _AppRootSpanState( + expected_exported_at_start=expected_exported, ) - return + trace_state.active_count += 1 + + if mark_app_root: + span.set_attribute(LangfuseOtelSpanAttributes.IS_APP_ROOT, True) + + def _cleanup_app_root_state(self, span: ReadableSpan) -> None: + self._ensure_app_root_state() + + trace_id = format_trace_id(span.context.trace_id) + span_id = 
format_span_id(span.context.span_id) + + with self._app_root_lock: + trace_state = self._app_root_traces.get(trace_id) + + if trace_state is None: + return + + span_state = trace_state.spans.get(span_id) + + if span_state is not None and not span_state.ended: + span_state.ended = True + trace_state.active_count -= 1 + + if trace_state.active_count <= 0: + self._app_root_traces.pop(trace_id, None) + + def _ensure_app_root_state(self) -> None: + if not hasattr(self, "_app_root_lock"): + self._initialize_app_root_state() + + def _is_expected_exported_at_start(self, span: Span) -> bool: + readable_span = self._get_readable_span(span) + + if is_langfuse_span(readable_span) and not self._is_langfuse_project_span( + readable_span + ): + return False + + if self._is_blocked_instrumentation_scope(readable_span): + return False - # Apply custom or default span filter try: - should_export = self._should_export_span(span) + return bool(self._should_export_span(readable_span)) except Exception as error: - langfuse_logger.error( - "Trace: should_export_span callback raised an error. " - f"Dropping span name='{span.name}' scope='{self._get_scope_name(span)}'. " + langfuse_logger.debug( + "Trace: should_export_span callback raised during app-root " + f"start-time check. 
Span will not be marked as app root | " + f"span_name='{readable_span.name}' | " + f"instrumentation_scope='{self._get_scope_name(readable_span)}' | " f"Error: {error}" ) - return - if not should_export: - langfuse_logger.debug( - "Trace: Dropping span due to should_export_span filter | " - f"span_name='{span.name}' | " - f"instrumentation_scope='{self._get_scope_name(span)}'" - ) - return + return False - langfuse_logger.debug( - f"Trace: Processing span name='{span._name}' | Full details:\n{span_formatter(span)}" - ) + @staticmethod + def _get_readable_span(span: Span) -> ReadableSpan: + readable_span = getattr(span, "_readable_span", None) + + if not callable(readable_span): + raise TypeError("Span does not expose a readable span snapshot.") - super().on_end(span) + return cast(ReadableSpan, readable_span()) def _is_blocked_instrumentation_scope(self, span: ReadableSpan) -> bool: return ( diff --git a/tests/unit/test_app_root_detection.py b/tests/unit/test_app_root_detection.py new file mode 100644 index 000000000..6447394a5 --- /dev/null +++ b/tests/unit/test_app_root_detection.py @@ -0,0 +1,247 @@ +from opentelemetry import baggage +from opentelemetry import context as otel_context_api +from opentelemetry import trace as trace_api +from opentelemetry.sdk.trace import ReadableSpan, TracerProvider +from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags + +from langfuse._client.attributes import LangfuseOtelSpanAttributes +from langfuse._client.constants import LANGFUSE_TRACER_NAME +from langfuse._client.propagation import LANGFUSE_TRACE_ID_BAGGAGE_KEY +from langfuse._client.span_processor import LangfuseSpanProcessor + +PUBLIC_KEY = "test-public-key" +SECRET_KEY = "test-secret-key" + + +def _create_processor(memory_exporter, **kwargs): + tracer_provider = TracerProvider() + processor = LangfuseSpanProcessor( + public_key=PUBLIC_KEY, + secret_key=SECRET_KEY, + base_url="http://test-host", + span_exporter=memory_exporter, + **kwargs, + ) + 
tracer_provider.add_span_processor(processor) + + return tracer_provider, processor + + +def _get_spans_by_name(memory_exporter): + return {span.name: span for span in memory_exporter.get_finished_spans()} + + +def _langfuse_tracer(tracer_provider): + return tracer_provider.get_tracer( + LANGFUSE_TRACER_NAME, + "test", + attributes={"public_key": PUBLIC_KEY}, + ) + + +def test_filtered_parent_marks_exported_children_as_app_roots(memory_exporter): + tracer_provider, processor = _create_processor(memory_exporter) + filtered_tracer = tracer_provider.get_tracer("requests") + langfuse_tracer = _langfuse_tracer(tracer_provider) + + with filtered_tracer.start_as_current_span("filtered-parent"): + with langfuse_tracer.start_as_current_span("child-a"): + pass + + with langfuse_tracer.start_as_current_span("child-b"): + pass + + processor.force_flush() + + spans = _get_spans_by_name(memory_exporter) + + assert "filtered-parent" not in spans + assert spans["child-a"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True + assert spans["child-b"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True + assert processor._app_root_traces == {} + + +def test_exported_parent_suppresses_exported_child_app_root(memory_exporter): + tracer_provider, processor = _create_processor(memory_exporter) + langfuse_tracer = _langfuse_tracer(tracer_provider) + + with langfuse_tracer.start_as_current_span("parent"): + with langfuse_tracer.start_as_current_span("child"): + pass + + processor.force_flush() + + spans = _get_spans_by_name(memory_exporter) + + assert spans["parent"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True + assert LangfuseOtelSpanAttributes.IS_APP_ROOT not in spans["child"].attributes + assert processor._app_root_traces == {} + + +def test_filtered_direct_parent_marks_child_even_when_grandparent_exports( + memory_exporter, +): + tracer_provider, processor = _create_processor(memory_exporter) + filtered_tracer = tracer_provider.get_tracer("requests") + 
langfuse_tracer = _langfuse_tracer(tracer_provider) + + with langfuse_tracer.start_as_current_span("grandparent"): + with filtered_tracer.start_as_current_span("filtered-parent"): + with langfuse_tracer.start_as_current_span("child"): + pass + + processor.force_flush() + + spans = _get_spans_by_name(memory_exporter) + + assert "filtered-parent" not in spans + assert ( + spans["grandparent"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True + ) + assert spans["child"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True + assert processor._app_root_traces == {} + + +def test_same_trace_baggage_claim_suppresses_local_app_root(memory_exporter): + tracer_provider, processor = _create_processor(memory_exporter) + langfuse_tracer = _langfuse_tracer(tracer_provider) + trace_id = int("1" * 32, 16) + parent_context = _remote_parent_context(trace_id=trace_id) + parent_context = baggage.set_baggage( + name=LANGFUSE_TRACE_ID_BAGGAGE_KEY, + value=format(trace_id, "032x"), + context=parent_context, + ) + + span = langfuse_tracer.start_span("downstream-root", context=parent_context) + span.end() + processor.force_flush() + + spans = _get_spans_by_name(memory_exporter) + + assert ( + LangfuseOtelSpanAttributes.IS_APP_ROOT + not in spans["downstream-root"].attributes + ) + assert processor._app_root_traces == {} + + +def test_different_trace_baggage_claim_does_not_suppress_local_app_root( + memory_exporter, +): + tracer_provider, processor = _create_processor(memory_exporter) + langfuse_tracer = _langfuse_tracer(tracer_provider) + trace_id = int("1" * 32, 16) + parent_context = _remote_parent_context(trace_id=trace_id) + parent_context = baggage.set_baggage( + name=LANGFUSE_TRACE_ID_BAGGAGE_KEY, + value="2" * 32, + context=parent_context, + ) + + span = langfuse_tracer.start_span("downstream-root", context=parent_context) + span.end() + processor.force_flush() + + spans = _get_spans_by_name(memory_exporter) + + assert ( + 
spans["downstream-root"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] + is True + ) + assert processor._app_root_traces == {} + + +def test_local_baggage_claim_does_not_suppress_child_of_filtered_parent( + memory_exporter, +): + def should_export_span(span: ReadableSpan) -> bool: + return span.name != "parent" + + tracer_provider, processor = _create_processor( + memory_exporter, + should_export_span=should_export_span, + ) + langfuse_tracer = _langfuse_tracer(tracer_provider) + + with langfuse_tracer.start_as_current_span("parent") as parent: + context_with_claim = baggage.set_baggage( + name=LANGFUSE_TRACE_ID_BAGGAGE_KEY, + value=format(parent.context.trace_id, "032x"), + context=otel_context_api.get_current(), + ) + token = otel_context_api.attach(context_with_claim) + + try: + with langfuse_tracer.start_as_current_span("child"): + pass + finally: + otel_context_api.detach(token) + + processor.force_flush() + + spans = _get_spans_by_name(memory_exporter) + + assert "parent" not in spans + assert spans["child"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True + assert processor._app_root_traces == {} + + +def test_start_time_false_positive_can_leave_exported_child_without_app_root( + memory_exporter, +): + def should_export_span(span: ReadableSpan) -> bool: + if span.name == "parent": + return span.end_time is None + + return True + + tracer_provider, processor = _create_processor( + memory_exporter, + should_export_span=should_export_span, + ) + langfuse_tracer = _langfuse_tracer(tracer_provider) + + with langfuse_tracer.start_as_current_span("parent"): + with langfuse_tracer.start_as_current_span("child"): + pass + + processor.force_flush() + + spans = _get_spans_by_name(memory_exporter) + + assert "parent" not in spans + assert LangfuseOtelSpanAttributes.IS_APP_ROOT not in spans["child"].attributes + assert processor._app_root_traces == {} + + +def test_active_langfuse_scope_sets_baggage_after_root_start( + langfuse_memory_client, + 
memory_exporter, +): + with langfuse_memory_client.start_as_current_observation(name="root") as root: + baggage_entries = baggage.get_all(context=otel_context_api.get_current()) + + assert baggage_entries[LANGFUSE_TRACE_ID_BAGGAGE_KEY] == root.trace_id + + with langfuse_memory_client.start_as_current_observation(name="child"): + pass + + langfuse_memory_client.flush() + + spans = _get_spans_by_name(memory_exporter) + + assert spans["root"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True + assert LangfuseOtelSpanAttributes.IS_APP_ROOT not in spans["child"].attributes + assert "langfuse.trace.metadata.trace_id" not in spans["child"].attributes + + +def _remote_parent_context(*, trace_id: int): + span_context = SpanContext( + trace_id=trace_id, + span_id=int("a" * 16, 16), + is_remote=True, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + ) + + return trace_api.set_span_in_context(NonRecordingSpan(span_context)) diff --git a/tests/unit/test_propagate_attributes.py b/tests/unit/test_propagate_attributes.py index e1085b879..c783e65dd 100644 --- a/tests/unit/test_propagate_attributes.py +++ b/tests/unit/test_propagate_attributes.py @@ -1638,6 +1638,8 @@ def test_baggage_disabled_by_default(self, langfuse_client): from opentelemetry import baggage from opentelemetry import context as otel_context + from langfuse._client.propagation import LANGFUSE_TRACE_ID_BAGGAGE_KEY + with langfuse_client.start_as_current_observation(name="parent"): with propagate_attributes( user_id="user_123", @@ -1646,7 +1648,13 @@ def test_baggage_disabled_by_default(self, langfuse_client): # Get current context and inspect baggage current_context = otel_context.get_current() baggage_entries = baggage.get_all(context=current_context) - assert len(baggage_entries) == 0 + user_baggage_entries = { + key: value + for key, value in baggage_entries.items() + if key != LANGFUSE_TRACE_ID_BAGGAGE_KEY + } + + assert user_baggage_entries == {} def 
test_metadata_key_with_user_id_substring_doesnt_collide( self, langfuse_client, memory_exporter From ba553a13dbb287f5f75d8394d94b765689a5bf61 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 12 May 2026 15:18:25 +0200 Subject: [PATCH 02/12] fix: avoid private otel span API --- langfuse/_client/span_processor.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index 51199a9a0..e09bbc1de 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -284,7 +284,7 @@ def _ensure_app_root_state(self) -> None: self._initialize_app_root_state() def _is_expected_exported_at_start(self, span: Span) -> bool: - readable_span = self._get_readable_span(span) + readable_span = cast(ReadableSpan, span) if is_langfuse_span(readable_span) and not self._is_langfuse_project_span( readable_span @@ -307,15 +307,6 @@ def _is_expected_exported_at_start(self, span: Span) -> bool: return False - @staticmethod - def _get_readable_span(span: Span) -> ReadableSpan: - readable_span = getattr(span, "_readable_span", None) - - if not callable(readable_span): - raise TypeError("Span does not expose a readable span snapshot.") - - return cast(ReadableSpan, readable_span()) - def _is_blocked_instrumentation_scope(self, span: ReadableSpan) -> bool: return ( span.instrumentation_scope is not None From d972db4da9a6cc187db90bf874cb048a1c0c5a1e Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Wed, 13 May 2026 14:42:04 +0200 Subject: [PATCH 03/12] chore: add CODEOWNERS for GitHub config (#1652) chore: add codeowners for github config Co-authored-by: Codex Opus 4.6 (1M context) --- .github/CODEOWNERS | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index b3e7ece6e..c7993b0b4 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,2 +1,5 @@ # Currently inactive # * 
@langfuse/maintainers + +# Require maintainer review for GitHub configuration changes +.github/ @langfuse/maintainers From 609669a5c02fc48c9e2d7c6de7fcd1f827b9ed17 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 15 May 2026 09:28:28 +0200 Subject: [PATCH 04/12] chore(deps): bump the github-actions group across 1 directory with 5 updates (#1645) * chore(deps): bump the github-actions group across 1 directory with 5 updates Bumps the github-actions group with 5 updates in the / directory: | Package | From | To | | --- | --- | --- | | [astral-sh/setup-uv](https://github.com/astral-sh/setup-uv) | `8.0.0` | `8.1.0` | | [actions/cache](https://github.com/actions/cache) | `5.0.4` | `5.0.5` | | [github/codeql-action](https://github.com/github/codeql-action) | `4.35.1` | `4.35.4` | | [dependabot/fetch-metadata](https://github.com/dependabot/fetch-metadata) | `3.0.0` | `3.1.0` | | [slackapi/slack-github-action](https://github.com/slackapi/slack-github-action) | `3.0.1` | `3.0.3` | Updates `astral-sh/setup-uv` from 8.0.0 to 8.1.0 - [Release notes](https://github.com/astral-sh/setup-uv/releases) - [Commits](https://github.com/astral-sh/setup-uv/compare/cec208311dfd045dd5311c1add060b2062131d57...08807647e7069bb48b6ef5acd8ec9567f424441b) Updates `actions/cache` from 5.0.4 to 5.0.5 - [Release notes](https://github.com/actions/cache/releases) - [Changelog](https://github.com/actions/cache/blob/main/RELEASES.md) - [Commits](https://github.com/actions/cache/compare/668228422ae6a00e4ad889ee87cd7109ec5666a7...27d5ce7f107fe9357f9df03efb73ab90386fccae) Updates `github/codeql-action` from 4.35.1 to 4.35.4 - [Release notes](https://github.com/github/codeql-action/releases) - [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md) - [Commits](https://github.com/github/codeql-action/compare/c10b8064de6f491fea524254123dbe5e09572f13...68bde559dea0fdcac2102bfdf6230c5f70eb485e) Updates 
`dependabot/fetch-metadata` from 3.0.0 to 3.1.0 - [Release notes](https://github.com/dependabot/fetch-metadata/releases) - [Commits](https://github.com/dependabot/fetch-metadata/compare/ffa630c65fa7e0ecfa0625b5ceda64399aea1b36...25dd0e34f4fe68f24cc83900b1fe3fe149efef98) Updates `slackapi/slack-github-action` from 3.0.1 to 3.0.3 - [Release notes](https://github.com/slackapi/slack-github-action/releases) - [Changelog](https://github.com/slackapi/slack-github-action/blob/main/CHANGELOG.md) - [Commits](https://github.com/slackapi/slack-github-action/compare/af78098f536edbc4de71162a307590698245be95...45a88b9581bfab2566dc881e2cd66d334e621e2c) --- updated-dependencies: - dependency-name: actions/cache dependency-version: 5.0.5 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: github-actions - dependency-name: astral-sh/setup-uv dependency-version: 8.1.0 dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-actions - dependency-name: dependabot/fetch-metadata dependency-version: 3.1.0 dependency-type: direct:production update-type: version-update:semver-minor dependency-group: github-actions - dependency-name: github/codeql-action dependency-version: 4.35.4 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: github-actions - dependency-name: slackapi/slack-github-action dependency-version: 3.0.3 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: github-actions ... 
Signed-off-by: dependabot[bot] * ci: remove stale cache action version comment --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Tobias Wochinger --- .github/workflows/ci.yml | 10 +++++----- .github/workflows/codeql.yml | 4 ++-- .github/workflows/dependabot-merge.yml | 2 +- .github/workflows/release.yml | 6 +++--- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9b820059c..c4fca874d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,7 +24,7 @@ jobs: with: persist-credentials: false - name: Install uv and set Python version - uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0 + uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0 with: version: "0.11.2" python-version: "3.13" @@ -41,12 +41,12 @@ jobs: with: persist-credentials: false - name: Install uv and set Python version - uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0 + uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0 with: version: "0.11.2" python-version: "3.13" enable-cache: true # zizmor: ignore[cache-poisoning] CI-only, no artifacts published - - uses: actions/cache@668228422ae6a00e4ad889ee87cd7109ec5666a7 # v5.0.4 # zizmor: ignore[cache-poisoning] + - uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # zizmor: ignore[cache-poisoning] name: Cache mypy cache with: path: ./.mypy_cache @@ -82,7 +82,7 @@ jobs: with: persist-credentials: false - name: Install uv and set Python version - uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0 + uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0 with: version: "0.11.2" python-version: ${{ matrix.python-version }} @@ -145,7 +145,7 @@ jobs: with: persist-credentials: false - name: Install uv and set Python version - uses: 
astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0 + uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0 with: version: "0.11.2" python-version: "3.13" diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 290bfee87..7081242de 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -61,7 +61,7 @@ jobs: # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL - uses: github/codeql-action/init@c10b8064de6f491fea524254123dbe5e09572f13 # v4.35.1 + uses: github/codeql-action/init@68bde559dea0fdcac2102bfdf6230c5f70eb485e # v4.35.4 with: languages: ${{ matrix.language }} build-mode: ${{ matrix.build-mode }} @@ -89,6 +89,6 @@ jobs: exit 1 - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@c10b8064de6f491fea524254123dbe5e09572f13 # v4.35.1 + uses: github/codeql-action/analyze@68bde559dea0fdcac2102bfdf6230c5f70eb485e # v4.35.4 with: category: "/language:${{matrix.language}}" diff --git a/.github/workflows/dependabot-merge.yml b/.github/workflows/dependabot-merge.yml index 8eddf89f8..8e5040e77 100644 --- a/.github/workflows/dependabot-merge.yml +++ b/.github/workflows/dependabot-merge.yml @@ -15,7 +15,7 @@ jobs: steps: - name: Dependabot metadata id: metadata - uses: dependabot/fetch-metadata@ffa630c65fa7e0ecfa0625b5ceda64399aea1b36 # v3.0.0 + uses: dependabot/fetch-metadata@25dd0e34f4fe68f24cc83900b1fe3fe149efef98 # v3.1.0 with: github-token: "${{ secrets.GITHUB_TOKEN }}" - name: Enable auto-merge for Dependabot PRs diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 80dd2d4d4..3e764bd94 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -97,7 +97,7 @@ jobs: persist-credentials: false - name: Install uv and set Python version - uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0 + uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0 with: version: 
"0.11.2" python-version: "3.12" @@ -347,7 +347,7 @@ jobs: - name: Notify Slack on success if: success() - uses: slackapi/slack-github-action@af78098f536edbc4de71162a307590698245be95 # v3.0.1 + uses: slackapi/slack-github-action@45a88b9581bfab2566dc881e2cd66d334e621e2c # v3.0.3 with: webhook: ${{ secrets.SLACK_WEBHOOK_RELEASES }} webhook-type: incoming-webhook @@ -431,7 +431,7 @@ jobs: - name: Notify Slack on failure if: failure() - uses: slackapi/slack-github-action@af78098f536edbc4de71162a307590698245be95 # v3.0.1 + uses: slackapi/slack-github-action@45a88b9581bfab2566dc881e2cd66d334e621e2c # v3.0.3 with: webhook: ${{ secrets.SLACK_WEBHOOK_ENGINEERING }} webhook-type: incoming-webhook From 767c5eeebed7cd49480d54e5ce1d8f737d17df83 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 18 May 2026 14:50:38 +0200 Subject: [PATCH 05/12] chore(deps): bump langsmith from 0.7.22 to 0.8.0 (#1653) Bumps [langsmith](https://github.com/langchain-ai/langsmith-sdk) from 0.7.22 to 0.8.0. - [Release notes](https://github.com/langchain-ai/langsmith-sdk/releases) - [Commits](https://github.com/langchain-ai/langsmith-sdk/compare/v0.7.22...v0.8.0) --- updated-dependencies: - dependency-name: langsmith dependency-version: 0.8.0 dependency-type: indirect ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- uv.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/uv.lock b/uv.lock index 29f567c44..00695f204 100644 --- a/uv.lock +++ b/uv.lock @@ -3,7 +3,7 @@ revision = 3 requires-python = ">=3.10, <4.0" [options] -exclude-newer = "2026-05-01T14:08:03.903098393Z" +exclude-newer = "0001-01-01T00:00:00Z" # This has no effect and is included for backwards compatibility when using relative exclude-newer values. 
exclude-newer-span = "P7D" [[package]] @@ -679,7 +679,7 @@ wheels = [ [[package]] name = "langsmith" -version = "0.7.22" +version = "0.8.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "httpx" }, @@ -692,9 +692,9 @@ dependencies = [ { name = "xxhash" }, { name = "zstandard" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/be/2a/2d5e6c67396fd228670af278c4da7bd6db2b8d11deaf6f108490b6d3f561/langsmith-0.7.22.tar.gz", hash = "sha256:35bfe795d648b069958280760564632fd28ebc9921c04f3e209c0db6a6c7dc04", size = 1134923, upload-time = "2026-03-19T22:45:23.492Z" } +sdist = { url = "https://files.pythonhosted.org/packages/a8/64/95f1f013531395f4e8ed73caeee780f65c7c58fe028cb543f8937b45611b/langsmith-0.8.0.tar.gz", hash = "sha256:59fe5b2a56bbbe14a08aa76691f84b49e8675dd21e11b57d80c6db8c08bac2e3", size = 4432996, upload-time = "2026-04-30T22:13:07.341Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/1a/94/1f5d72655ab6534129540843776c40eff757387b88e798d8b3bf7e313fd4/langsmith-0.7.22-py3-none-any.whl", hash = "sha256:6e9d5148314d74e86748cb9d3898632cad0320c9323d95f70f969e5bc078eee4", size = 359927, upload-time = "2026-03-19T22:45:21.603Z" }, + { url = "https://files.pythonhosted.org/packages/f3/e1/a4be2e696c9473bb53298df398237da5674704d781d4b748ed35aeef592a/langsmith-0.8.0-py3-none-any.whl", hash = "sha256:12cc4bc5622b835a6d841964d6034df3617bdb912dae0c1381fd0a68a9b3a3ef", size = 393268, upload-time = "2026-04-30T22:13:05.56Z" }, ] [[package]] From 8280330cd23d5817bed5754ce2cd4c3d2b3dfb19 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 19 May 2026 14:26:46 +0200 Subject: [PATCH 06/12] clean up processor --- langfuse/_client/span_processor.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index e09bbc1de..a687cf384 100644 --- a/langfuse/_client/span_processor.py +++ 
b/langfuse/_client/span_processor.py @@ -90,7 +90,9 @@ def __init__( else [] ) self._should_export_span = should_export_span or is_default_export_span - self._initialize_app_root_state() + + self._app_root_lock = threading.Lock() + self._app_root_traces: Dict[str, _AppRootTraceState] = {} env_flush_at = os.environ.get(LANGFUSE_FLUSH_AT, None) flush_at = flush_at or int(env_flush_at) if env_flush_at is not None else None @@ -141,10 +143,6 @@ def __init__( else None, ) - def _initialize_app_root_state(self) -> None: - self._app_root_lock = threading.Lock() - self._app_root_traces: Dict[str, _AppRootTraceState] = {} - def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: context = parent_context or context_api.get_current() propagated_attributes = _get_propagated_attributes_from_context(context) @@ -215,8 +213,6 @@ def on_end(self, span: ReadableSpan) -> None: self._cleanup_app_root_state(span) def _mark_app_root_candidate(self, *, span: Span, parent_context: Context) -> None: - self._ensure_app_root_state() - trace_id = format_trace_id(span.context.trace_id) span_id = format_span_id(span.context.span_id) parent_span_id = format_span_id(span.parent.span_id) if span.parent else None @@ -259,8 +255,6 @@ def _mark_app_root_candidate(self, *, span: Span, parent_context: Context) -> No span.set_attribute(LangfuseOtelSpanAttributes.IS_APP_ROOT, True) def _cleanup_app_root_state(self, span: ReadableSpan) -> None: - self._ensure_app_root_state() - trace_id = format_trace_id(span.context.trace_id) span_id = format_span_id(span.context.span_id) @@ -279,10 +273,6 @@ def _cleanup_app_root_state(self, span: ReadableSpan) -> None: if trace_state.active_count <= 0: self._app_root_traces.pop(trace_id, None) - def _ensure_app_root_state(self) -> None: - if not hasattr(self, "_app_root_lock"): - self._initialize_app_root_state() - def _is_expected_exported_at_start(self, span: Span) -> bool: readable_span = cast(ReadableSpan, span) From 
ffa09347b27c48b3322d7a43c54bb25c53877859 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 19 May 2026 14:42:34 +0200 Subject: [PATCH 07/12] push --- tests/conftest.py | 4 + tests/unit/test_app_root_detection.py | 228 +++++++++++++++++++++++++- 2 files changed, 231 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 1aea59889..ff97ddbd6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -95,6 +95,8 @@ def langfuse_memory_client( tracer_provider = TracerProvider(resource=Resource.create({"service.name": "test"})) def mock_init(self: Any, **kwargs: Any) -> None: + import threading + from opentelemetry.sdk.trace.export import BatchSpanProcessor from langfuse._client.span_filter import is_default_export_span @@ -107,6 +109,8 @@ def mock_init(self: Any, **kwargs: Any) -> None: self._should_export_span = ( kwargs.get("should_export_span") or is_default_export_span ) + self._app_root_lock = threading.Lock() + self._app_root_traces = {} BatchSpanProcessor.__init__( self, span_exporter=memory_exporter, diff --git a/tests/unit/test_app_root_detection.py b/tests/unit/test_app_root_detection.py index 6447394a5..8bf76d491 100644 --- a/tests/unit/test_app_root_detection.py +++ b/tests/unit/test_app_root_detection.py @@ -1,3 +1,6 @@ +import threading +from concurrent.futures import ThreadPoolExecutor + from opentelemetry import baggage from opentelemetry import context as otel_context_api from opentelemetry import trace as trace_api @@ -6,7 +9,11 @@ from langfuse._client.attributes import LangfuseOtelSpanAttributes from langfuse._client.constants import LANGFUSE_TRACER_NAME -from langfuse._client.propagation import LANGFUSE_TRACE_ID_BAGGAGE_KEY +from langfuse._client.propagation import ( + LANGFUSE_TRACE_ID_BAGGAGE_KEY, + _get_langfuse_trace_id_from_baggage, + _set_langfuse_trace_id_in_baggage, +) from langfuse._client.span_processor import LangfuseSpanProcessor PUBLIC_KEY = 
"test-public-key" @@ -236,6 +243,225 @@ def test_active_langfuse_scope_sets_baggage_after_root_start( assert "langfuse.trace.metadata.trace_id" not in spans["child"].attributes +def test_blocked_instrumentation_scope_parent_marks_child_as_app_root( + memory_exporter, +): + tracer_provider, processor = _create_processor( + memory_exporter, + blocked_instrumentation_scopes=["blocked.scope"], + ) + blocked_tracer = tracer_provider.get_tracer("blocked.scope") + langfuse_tracer = _langfuse_tracer(tracer_provider) + + with blocked_tracer.start_as_current_span("blocked-parent"): + with langfuse_tracer.start_as_current_span("child"): + pass + + processor.force_flush() + + spans = _get_spans_by_name(memory_exporter) + + assert "blocked-parent" not in spans + assert spans["child"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True + assert processor._app_root_traces == {} + + +def test_foreign_project_langfuse_parent_marks_child_as_app_root(memory_exporter): + tracer_provider, processor = _create_processor(memory_exporter) + foreign_tracer = tracer_provider.get_tracer( + LANGFUSE_TRACER_NAME, + "test", + attributes={"public_key": "different-public-key"}, + ) + langfuse_tracer = _langfuse_tracer(tracer_provider) + + with foreign_tracer.start_as_current_span("foreign-parent"): + with langfuse_tracer.start_as_current_span("child"): + pass + + processor.force_flush() + + spans = _get_spans_by_name(memory_exporter) + + assert "foreign-parent" not in spans + assert spans["child"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True + assert processor._app_root_traces == {} + + +def test_should_export_span_raising_does_not_mark_app_root(memory_exporter): + def should_export_span(span: ReadableSpan) -> bool: + raise RuntimeError("boom") + + tracer_provider, processor = _create_processor( + memory_exporter, + should_export_span=should_export_span, + ) + langfuse_tracer = _langfuse_tracer(tracer_provider) + + with langfuse_tracer.start_as_current_span("root"): + pass + + 
processor.force_flush() + + spans = _get_spans_by_name(memory_exporter) + + assert "root" not in spans + assert processor._app_root_traces == {} + + +def test_mark_app_root_candidate_exception_is_swallowed(memory_exporter, monkeypatch): + tracer_provider, processor = _create_processor(memory_exporter) + langfuse_tracer = _langfuse_tracer(tracer_provider) + + def raise_boom(*args, **kwargs): + raise RuntimeError("boom") + + monkeypatch.setattr(processor, "_mark_app_root_candidate", raise_boom) + + with langfuse_tracer.start_as_current_span("root"): + pass + + processor.force_flush() + + spans = _get_spans_by_name(memory_exporter) + + assert "root" in spans + assert LangfuseOtelSpanAttributes.IS_APP_ROOT not in spans["root"].attributes + + +def test_concurrent_traces_keep_state_consistent(memory_exporter): + tracer_provider, processor = _create_processor(memory_exporter) + langfuse_tracer = _langfuse_tracer(tracer_provider) + + thread_count = 16 + spans_per_thread = 25 + barrier = threading.Barrier(thread_count) + + def worker(worker_id: int) -> None: + barrier.wait() + for span_index in range(spans_per_thread): + with langfuse_tracer.start_as_current_span( + f"root-{worker_id}-{span_index}" + ): + with langfuse_tracer.start_as_current_span( + f"child-{worker_id}-{span_index}" + ): + pass + + with ThreadPoolExecutor(max_workers=thread_count) as pool: + list(pool.map(worker, range(thread_count))) + + processor.force_flush() + + spans = _get_spans_by_name(memory_exporter) + + expected_total = thread_count * spans_per_thread + root_spans = [span for name, span in spans.items() if name.startswith("root-")] + child_spans = [span for name, span in spans.items() if name.startswith("child-")] + + assert len(root_spans) == expected_total + assert len(child_spans) == expected_total + assert all( + span.attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True + for span in root_spans + ) + assert all( + LangfuseOtelSpanAttributes.IS_APP_ROOT not in span.attributes + for span 
in child_spans + ) + assert processor._app_root_traces == {} + + +def test_multiple_interleaved_traces_track_state_independently(memory_exporter): + tracer_provider, processor = _create_processor(memory_exporter) + langfuse_tracer = _langfuse_tracer(tracer_provider) + + trace_a_root = langfuse_tracer.start_span("trace-a-root") + trace_a_ctx = trace_api.set_span_in_context(trace_a_root) + trace_b_root = langfuse_tracer.start_span("trace-b-root") + trace_b_ctx = trace_api.set_span_in_context(trace_b_root) + + assert len(processor._app_root_traces) == 2 + + trace_a_child = langfuse_tracer.start_span("trace-a-child", context=trace_a_ctx) + trace_b_child = langfuse_tracer.start_span("trace-b-child", context=trace_b_ctx) + + trace_b_child.end() + trace_b_root.end() + + assert len(processor._app_root_traces) == 1 + + trace_a_child.end() + trace_a_root.end() + + processor.force_flush() + + spans = _get_spans_by_name(memory_exporter) + + assert ( + spans["trace-a-root"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True + ) + assert ( + spans["trace-b-root"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True + ) + assert ( + LangfuseOtelSpanAttributes.IS_APP_ROOT not in spans["trace-a-child"].attributes + ) + assert ( + LangfuseOtelSpanAttributes.IS_APP_ROOT not in spans["trace-b-child"].attributes + ) + assert processor._app_root_traces == {} + + +def test_set_langfuse_trace_id_in_baggage_sets_value(): + trace_id = "a" * 32 + context = _set_langfuse_trace_id_in_baggage( + trace_id=trace_id, + context=otel_context_api.Context(), + ) + + assert _get_langfuse_trace_id_from_baggage(context) == trace_id + + +def test_set_langfuse_trace_id_in_baggage_normalizes_case(): + context = _set_langfuse_trace_id_in_baggage( + trace_id="ABCDEF" + "0" * 26, + context=otel_context_api.Context(), + ) + + assert _get_langfuse_trace_id_from_baggage(context) == "abcdef" + "0" * 26 + + +def test_set_langfuse_trace_id_in_baggage_is_idempotent_for_same_trace(): + trace_id = "a" * 
32 + context = _set_langfuse_trace_id_in_baggage( + trace_id=trace_id, + context=otel_context_api.Context(), + ) + + same_context = _set_langfuse_trace_id_in_baggage( + trace_id=trace_id, + context=context, + ) + + assert same_context is context + + +def test_set_langfuse_trace_id_in_baggage_overwrites_for_different_trace(): + first = _set_langfuse_trace_id_in_baggage( + trace_id="a" * 32, + context=otel_context_api.Context(), + ) + + second = _set_langfuse_trace_id_in_baggage( + trace_id="b" * 32, + context=first, + ) + + assert second is not first + assert _get_langfuse_trace_id_from_baggage(second) == "b" * 32 + + def _remote_parent_context(*, trace_id: int): span_context = SpanContext( trace_id=trace_id, From 201b09b9f0e223e1ea4fd84db39281ead9297068 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 19 May 2026 15:04:38 +0200 Subject: [PATCH 08/12] push --- langfuse/_client/span_processor.py | 4 +--- tests/unit/test_app_root_detection.py | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/langfuse/_client/span_processor.py b/langfuse/_client/span_processor.py index a687cf384..371b09566 100644 --- a/langfuse/_client/span_processor.py +++ b/langfuse/_client/span_processor.py @@ -236,9 +236,7 @@ def _mark_app_root_candidate(self, *, span: Span, parent_context: Context) -> No if parent_state is not None else False ) - suppressed_by_parent_claim = ( - propagated_trace_id == trace_id and parent_state is None - ) + suppressed_by_parent_claim = propagated_trace_id == trace_id mark_app_root = ( expected_exported diff --git a/tests/unit/test_app_root_detection.py b/tests/unit/test_app_root_detection.py index 8bf76d491..4b67254f9 100644 --- a/tests/unit/test_app_root_detection.py +++ b/tests/unit/test_app_root_detection.py @@ -159,7 +159,7 @@ def test_different_trace_baggage_claim_does_not_suppress_local_app_root( assert processor._app_root_traces == {} -def 
test_local_baggage_claim_does_not_suppress_child_of_filtered_parent( +def test_local_baggage_claim_suppresses_child_even_when_parent_is_filtered( memory_exporter, ): def should_export_span(span: ReadableSpan) -> bool: @@ -190,7 +190,7 @@ def should_export_span(span: ReadableSpan) -> bool: spans = _get_spans_by_name(memory_exporter) assert "parent" not in spans - assert spans["child"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True + assert LangfuseOtelSpanAttributes.IS_APP_ROOT not in spans["child"].attributes assert processor._app_root_traces == {} From d4adb3865aefb17c0db6cfb91ab5c735596cada1 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 19 May 2026 15:11:31 +0200 Subject: [PATCH 09/12] push --- tests/unit/test_app_root_detection.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/tests/unit/test_app_root_detection.py b/tests/unit/test_app_root_detection.py index 4b67254f9..a320807ed 100644 --- a/tests/unit/test_app_root_detection.py +++ b/tests/unit/test_app_root_detection.py @@ -85,17 +85,26 @@ def test_exported_parent_suppresses_exported_child_app_root(memory_exporter): assert processor._app_root_traces == {} -def test_filtered_direct_parent_marks_child_even_when_grandparent_exports( +def test_grandparent_baggage_claim_suppresses_child_through_filtered_parent( memory_exporter, ): tracer_provider, processor = _create_processor(memory_exporter) filtered_tracer = tracer_provider.get_tracer("requests") langfuse_tracer = _langfuse_tracer(tracer_provider) - with langfuse_tracer.start_as_current_span("grandparent"): - with filtered_tracer.start_as_current_span("filtered-parent"): - with langfuse_tracer.start_as_current_span("child"): - pass + with langfuse_tracer.start_as_current_span("grandparent") as grandparent: + context_with_claim = baggage.set_baggage( + name=LANGFUSE_TRACE_ID_BAGGAGE_KEY, + value=format(grandparent.context.trace_id, "032x"), + 
context=otel_context_api.get_current(), + ) + token = otel_context_api.attach(context_with_claim) + try: + with filtered_tracer.start_as_current_span("filtered-parent"): + with langfuse_tracer.start_as_current_span("child"): + pass + finally: + otel_context_api.detach(token) processor.force_flush() @@ -105,7 +114,7 @@ def test_filtered_direct_parent_marks_child_even_when_grandparent_exports( assert ( spans["grandparent"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True ) - assert spans["child"].attributes[LangfuseOtelSpanAttributes.IS_APP_ROOT] is True + assert LangfuseOtelSpanAttributes.IS_APP_ROOT not in spans["child"].attributes assert processor._app_root_traces == {} From 5571a0bac0edb80ccf8b3a5ea018c2453fcdfc3f Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 19 May 2026 15:30:22 +0200 Subject: [PATCH 10/12] push --- tests/live_provider/test_openai.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/live_provider/test_openai.py b/tests/live_provider/test_openai.py index 3cc05c9c6..ce5da3438 100644 --- a/tests/live_provider/test_openai.py +++ b/tests/live_provider/test_openai.py @@ -1240,7 +1240,7 @@ def test_audio_input_and_output(openai): client.chat.completions.create( name=generation_name, - model="gpt-4o-audio-preview", + model="gpt-4o-audio-2025-06-03", modalities=["text", "audio"], audio={"voice": "alloy", "format": "wav"}, messages=[ @@ -1274,7 +1274,7 @@ def test_audio_input_and_output(openai): in generation.data[0].input[0]["content"][1]["input_audio"]["data"] ) assert generation.data[0].type == "GENERATION" - assert "gpt-4o-audio-preview" in generation.data[0].model + assert "gpt-4o-audio-2025-06-03" in generation.data[0].model assert generation.data[0].start_time is not None assert generation.data[0].end_time is not None assert generation.data[0].start_time < generation.data[0].end_time From 77ae490beafe82ed9446024d7a97158257deb349 Mon Sep 17 00:00:00 2001 
From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 19 May 2026 15:35:47 +0200 Subject: [PATCH 11/12] push --- tests/unit/test_otel.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/unit/test_otel.py b/tests/unit/test_otel.py index e7eb74280..92576815f 100644 --- a/tests/unit/test_otel.py +++ b/tests/unit/test_otel.py @@ -88,6 +88,8 @@ def mock_processor_init(self, monkeypatch, memory_exporter): """Mock the LangfuseSpanProcessor initialization to avoid HTTP traffic.""" def mock_init(self, **kwargs): + import threading + from opentelemetry.sdk.trace.export import BatchSpanProcessor from langfuse._client.span_filter import is_default_export_span @@ -100,6 +102,8 @@ def mock_init(self, **kwargs): self._should_export_span = ( kwargs.get("should_export_span") or is_default_export_span ) + self._app_root_lock = threading.Lock() + self._app_root_traces = {} BatchSpanProcessor.__init__( self, span_exporter=memory_exporter, @@ -1990,6 +1994,8 @@ def multi_project_setup(self, monkeypatch): # Setup tracers with appropriate project-specific span exporting def mock_processor_init(self, **kwargs): + import threading + from opentelemetry.sdk.trace.export import BatchSpanProcessor from langfuse._client.span_filter import is_default_export_span @@ -1998,6 +2004,8 @@ def mock_processor_init(self, **kwargs): self._should_export_span = ( kwargs.get("should_export_span") or is_default_export_span ) + self._app_root_lock = threading.Lock() + self._app_root_traces = {} # Use the appropriate exporter based on the project key if self.public_key == project1_key: exporter = exporter_project1 @@ -2365,6 +2373,8 @@ def instrumentation_filtering_setup(self, monkeypatch): # Mock the LangfuseSpanProcessor to use our test exporters def mock_processor_init(self, **kwargs): + import threading + from opentelemetry.sdk.trace.export import BatchSpanProcessor from langfuse._client.span_filter import is_default_export_span @@ -2377,6 +2387,8 @@ def 
mock_processor_init(self, **kwargs): self._should_export_span = ( kwargs.get("should_export_span") or is_default_export_span ) + self._app_root_lock = threading.Lock() + self._app_root_traces = {} # For testing, use the appropriate exporter based on setup exporter = kwargs.get("_test_exporter", blocked_exporter) From 6a8b169995569560b3bd98380633a1205047822d Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 19 May 2026 15:46:33 +0200 Subject: [PATCH 12/12] push --- tests/live_provider/test_openai.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/live_provider/test_openai.py b/tests/live_provider/test_openai.py index ce5da3438..3575cfbb4 100644 --- a/tests/live_provider/test_openai.py +++ b/tests/live_provider/test_openai.py @@ -1237,10 +1237,11 @@ def test_audio_input_and_output(openai): content_path = "static/joke_prompt.wav" base64_string = encode_file_to_base64(content_path) + model = "gpt-audio-2025-08-28" client.chat.completions.create( name=generation_name, - model="gpt-4o-audio-2025-06-03", + model=model, modalities=["text", "audio"], audio={"voice": "alloy", "format": "wav"}, messages=[ @@ -1274,7 +1275,7 @@ def test_audio_input_and_output(openai): in generation.data[0].input[0]["content"][1]["input_audio"]["data"] ) assert generation.data[0].type == "GENERATION" - assert "gpt-4o-audio-2025-06-03" in generation.data[0].model + assert generation.data[0].model == model assert generation.data[0].start_time is not None assert generation.data[0].end_time is not None assert generation.data[0].start_time < generation.data[0].end_time