Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@

public enum OtelInstrumentType {
// same order as io.opentelemetry.sdk.metrics.InstrumentType
COUNTER,
UP_DOWN_COUNTER,
HISTOGRAM,
OBSERVABLE_COUNTER,
OBSERVABLE_UP_DOWN_COUNTER,
OBSERVABLE_GAUGE,
GAUGE,
COUNTER(false),
UP_DOWN_COUNTER(false),
HISTOGRAM(false),
OBSERVABLE_COUNTER(true),
OBSERVABLE_UP_DOWN_COUNTER(true),
OBSERVABLE_GAUGE(true),
GAUGE(false);

private final boolean observable;

OtelInstrumentType(boolean observable) {
this.observable = observable;
}

public boolean isObservable() {
return observable;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package datadog.trace.bootstrap.otel.metrics.data;

import datadog.trace.bootstrap.otlp.metrics.OtlpDataPoint;
import datadog.trace.bootstrap.otlp.metrics.OtlpDoublePoint;

/** Reports the delta value since the last reset. */
final class OtelDoubleDelta extends OtelAggregator {
private volatile double value;
private double lastValue;

@Override
void doRecordDouble(double value) {
this.value = value;
}

@Override
OtlpDataPoint doCollect(boolean reset) {
double collectedValue = value;
double delta = collectedValue - lastValue;
if (reset) {
lastValue = collectedValue;
}
return new OtlpDoublePoint(delta);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import datadog.trace.bootstrap.otlp.metrics.OtlpDoublePoint;
import java.util.concurrent.atomic.DoubleAdder;

/** Reports the sum of values since the last reset. */
final class OtelDoubleSum extends OtelAggregator {
private final DoubleAdder total = new DoubleAdder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datadog.trace.bootstrap.otlp.metrics.OtlpDataPoint;
import datadog.trace.bootstrap.otlp.metrics.OtlpDoublePoint;

/** Always reports the latest value. */
final class OtelDoubleValue extends OtelAggregator {
private volatile double value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import datadog.trace.bootstrap.otlp.metrics.OtlpHistogramPoint;
import java.util.List;

/** Reports the histogram of values since the last reset. */
final class OtelHistogramSketch extends OtelAggregator {
private final HistogramWithSum histogram;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package datadog.trace.bootstrap.otel.metrics.data;

import datadog.trace.bootstrap.otlp.metrics.OtlpDataPoint;
import datadog.trace.bootstrap.otlp.metrics.OtlpLongPoint;

/** Reports the delta value since the last reset. */
final class OtelLongDelta extends OtelAggregator {
private volatile long value;
private long lastValue;

@Override
void doRecordLong(long value) {
this.value = value;
}

@Override
OtlpDataPoint doCollect(boolean reset) {
long collectedValue = value;
long delta = collectedValue - lastValue;
if (reset) {
lastValue = collectedValue;
}
return new OtlpLongPoint(delta);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import datadog.trace.bootstrap.otlp.metrics.OtlpLongPoint;
import java.util.concurrent.atomic.LongAdder;

/** Reports the sum of values since the last reset. */
final class OtelLongSum extends OtelAggregator {
private final LongAdder total = new LongAdder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datadog.trace.bootstrap.otlp.metrics.OtlpDataPoint;
import datadog.trace.bootstrap.otlp.metrics.OtlpLongPoint;

/** Always reports the latest value. */
final class OtelLongValue extends OtelAggregator {
private volatile long value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import datadog.trace.bootstrap.otel.metrics.OtelInstrumentDescriptor;
import datadog.trace.bootstrap.otel.metrics.OtelInstrumentType;
import datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor;
import datadog.trace.bootstrap.otlp.metrics.OtlpDataPoint;
import datadog.trace.bootstrap.otlp.metrics.OtlpMetricVisitor;
import io.opentelemetry.api.common.Attributes;
import java.util.Collections;
Expand Down Expand Up @@ -46,6 +47,7 @@ public final class OtelMetricStorage {

private final OtelInstrumentDescriptor descriptor;
private final boolean resetOnCollect;
private final boolean toggleRecordings;
private final Function<Object, OtelAggregator> aggregatorSupplier;
private volatile Recording currentRecording;

Expand All @@ -56,9 +58,12 @@ private OtelMetricStorage(
OtelInstrumentDescriptor descriptor, Supplier<OtelAggregator> aggregatorSupplier) {
this.descriptor = descriptor;
this.resetOnCollect = shouldResetOnCollect(descriptor.getType());
// no need to toggle if not resetting on collect, or if it's an observable instrument
// (observables are always invoked within the collect cycle, so no concurrent writers)
this.toggleRecordings = resetOnCollect && !descriptor.getType().isObservable();
this.aggregatorSupplier = unused -> aggregatorSupplier.get();
this.currentRecording = new Recording();
if (resetOnCollect) {
if (toggleRecordings) {
this.previousRecording = new Recording();
}
}
Expand Down Expand Up @@ -86,6 +91,10 @@ public static OtelMetricStorage newDoubleValueStorage(OtelInstrumentDescriptor d
return new OtelMetricStorage(descriptor, OtelDoubleValue::new);
}

public static OtelMetricStorage newDoubleDeltaStorage(OtelInstrumentDescriptor descriptor) {
return new OtelMetricStorage(descriptor, OtelDoubleDelta::new);
}

public static OtelMetricStorage newLongSumStorage(OtelInstrumentDescriptor descriptor) {
return new OtelMetricStorage(descriptor, OtelLongSum::new);
}
Expand All @@ -94,6 +103,10 @@ public static OtelMetricStorage newLongValueStorage(OtelInstrumentDescriptor des
return new OtelMetricStorage(descriptor, OtelLongValue::new);
}

public static OtelMetricStorage newLongDeltaStorage(OtelInstrumentDescriptor descriptor) {
return new OtelMetricStorage(descriptor, OtelLongDelta::new);
}

public static OtelMetricStorage newHistogramStorage(
OtelInstrumentDescriptor descriptor, List<Double> bucketBoundaries) {
return new OtelMetricStorage(descriptor, () -> new OtelHistogramSketch(bucketBoundaries));
Expand All @@ -108,7 +121,7 @@ public OtelInstrumentDescriptor getDescriptor() {
}

public void recordLong(long value, Object attributes) {
if (resetOnCollect) {
if (toggleRecordings) {
Recording recording = acquireRecordingForWrite();
try {
aggregator(recording.aggregators, attributes).recordLong(value);
Expand All @@ -129,7 +142,7 @@ public void recordDouble(double value, Object attributes) {
attributes);
return;
}
if (resetOnCollect) {
if (toggleRecordings) {
Recording recording = acquireRecordingForWrite();
try {
aggregator(recording.aggregators, attributes).recordDouble(value);
Expand Down Expand Up @@ -173,26 +186,8 @@ public static void registerAttributeReader(

/** Collect data for CUMULATIVE temporality, keeping aggregators for future writes. */
private void doCollect(OtlpMetricVisitor visitor) {
BiConsumer<Object, OtlpAttributeVisitor> attributesReader = null;
ClassLoader attributesClassLoader = null;

// no need to hold writers back if we are not resetting metrics on collect
for (Map.Entry<Object, OtelAggregator> entry : currentRecording.aggregators.entrySet()) {
OtelAggregator aggregator = entry.getValue();
if (!aggregator.isEmpty()) {
Object attributes = entry.getKey();
ClassLoader cl = attributes.getClass().getClassLoader();
// avoid repeated lookups when attribute class-loader is same for all records
if (attributesReader == null || cl != attributesClassLoader) {
attributesReader = ATTRIBUTE_READERS.get(cl);
attributesClassLoader = cl;
}
if (attributesReader != null) {
attributesReader.accept(attributes, visitor);
}
visitor.visitDataPoint(aggregator.collect());
}
}
collectDataPoints(currentRecording.aggregators, visitor, OtelAggregator::collect);
}

/**
Expand All @@ -205,13 +200,15 @@ private void doCollectAndReset(OtlpMetricVisitor visitor) {
// capture _current_ recording for collection, its aggregators will be reset at the end
final Recording recording = currentRecording;

// publish fresh recording for new writers, using aggregators from _previous_ recording
currentRecording = new Recording(previousRecording);
if (toggleRecordings) {
// publish fresh recording for new writers, using aggregators from _previous_ recording
currentRecording = new Recording(previousRecording);

// notify writers that the captured recording is about to be reset
ACTIVITY.addAndGet(recording, RESET_PENDING);
while (recording.activity > 1) {
Thread.yield(); // other threads are still writing to this recording
// notify writers that the captured recording is about to be reset
ACTIVITY.addAndGet(recording, RESET_PENDING);
while (recording.activity > 1) {
Thread.yield(); // other threads are still writing to this recording
}
}

Map<Object, OtelAggregator> aggregators = recording.aggregators;
Expand All @@ -221,6 +218,17 @@ private void doCollectAndReset(OtlpMetricVisitor visitor) {
aggregators.values().removeIf(OtelAggregator::isEmpty);
}

collectDataPoints(aggregators, visitor, OtelAggregator::collectAndReset);

if (toggleRecordings) {
previousRecording = recording;
}
}

private void collectDataPoints(
Map<Object, OtelAggregator> aggregators,
OtlpMetricVisitor visitor,
Function<OtelAggregator, OtlpDataPoint> collect) {
BiConsumer<Object, OtlpAttributeVisitor> attributesReader = null;
ClassLoader attributesClassLoader = null;

Expand All @@ -237,11 +245,9 @@ private void doCollectAndReset(OtlpMetricVisitor visitor) {
if (attributesReader != null) {
attributesReader.accept(attributes, visitor);
}
visitor.visitDataPoint(aggregator.collectAndReset());
visitor.visitDataPoint(collect.apply(aggregator));
}
}

previousRecording = recording;
}

private Recording acquireRecordingForWrite() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public DoubleCounter build() {

@Override
public ObservableDoubleMeasurement buildObserver() {
return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleSumStorage);
return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleDeltaStorage);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public DoubleUpDownCounter build() {

@Override
public ObservableDoubleMeasurement buildObserver() {
return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleSumStorage);
return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleDeltaStorage);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public LongCounter build() {

@Override
public ObservableLongMeasurement buildObserver() {
return meter.registerObservableStorage(builder, OtelMetricStorage::newLongSumStorage);
return meter.registerObservableStorage(builder, OtelMetricStorage::newLongDeltaStorage);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public LongUpDownCounter build() {

@Override
public ObservableLongMeasurement buildObserver() {
return meter.registerObservableStorage(builder, OtelMetricStorage::newLongSumStorage);
return meter.registerObservableStorage(builder, OtelMetricStorage::newLongDeltaStorage);
}

@Override
Expand Down
Loading