diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index c13f87ae5794f..f9ef2a64a8346 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -429,6 +429,7 @@ public static class ValueHider { static { KEYS.add("ssl.trust-store-pwd"); + KEYS.add("scp.password"); KEYS.add("password"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index 367b92104062d..ca2d5820ec956 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -63,6 +63,8 @@ public class PipeSinkSubtaskManager { private final Map> attributeSortedString2SubtaskLifeCycleMap = new HashMap<>(); + private final Map attributeSortedString2DisplayString = new HashMap<>(); + public synchronized String register( final Supplier executorSupplier, final PipeParameters pipeSinkParameters, @@ -91,6 +93,7 @@ public synchronized String register( final int sinkNum; boolean realTimeFirst = false; String attributeSortedString = generateAttributeSortedString(pipeSinkParameters); + final String attributeDisplayString = generateAttributeDisplayString(pipeSinkParameters); if (isDataRegionSink) { sinkNum = pipeSinkParameters.getIntOrDefault( @@ -119,7 +122,9 @@ public synchronized String register( sinkNum = 1; attributeSortedString = "schema_" + attributeSortedString; } - environment.setAttributeSortedString(attributeSortedString); + final String attributeDisplayStringWithPrefix = + isDataRegionSink ? "data_" + attributeDisplayString : "schema_" + attributeDisplayString; + environment.setAttributeSortedString(attributeDisplayStringWithPrefix); if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { final PipeSinkSubtaskExecutor executor = executorSupplier.get(); @@ -137,6 +142,12 @@ public synchronized String register( } for (int sinkIndex = 0; sinkIndex < sinkNum; sinkIndex++) { + final String taskID = + String.format( + "%s_%s_%s", + attributeDisplayStringWithPrefix, environment.getCreationTime(), sinkIndex); + environment.setSinkTaskId(taskID); + final PipeConnector pipeSink = isDataRegionSink ? PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters) @@ -167,10 +178,9 @@ public synchronized String register( // 2. Construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle final PipeSinkSubtask pipeSinkSubtask = new PipeSinkSubtask( - String.format( - "%s_%s_%s", attributeSortedString, environment.getCreationTime(), sinkIndex), + taskID, environment.getCreationTime(), - attributeSortedString, + attributeDisplayStringWithPrefix, sinkIndex, pendingQueue, pipeSink); @@ -181,11 +191,13 @@ public synchronized String register( LOGGER.info( DataNodePipeMessages.PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED, - attributeSortedString, + attributeDisplayStringWithPrefix, executor.getWorkingThreadName(), executor.getCallbackThreadName()); attributeSortedString2SubtaskLifeCycleMap.put( attributeSortedString, pipeSinkSubtaskLifeCycleList); + attributeSortedString2DisplayString.put( + attributeSortedString, attributeDisplayStringWithPrefix); } for (final PipeSinkSubtaskLifeCycle lifeCycle : @@ -202,7 +214,7 @@ public synchronized void deregister( final int regionId, final String attributeSortedString) { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { - throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString); + throwNoSuchSubtaskException(attributeSortedString); } final List lifeCycles = @@ -215,6 +227,7 @@ public synchronized void deregister( if (lifeCycles.isEmpty()) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); + attributeSortedString2DisplayString.remove(attributeSortedString); executor.shutdown(); LOGGER.info( DataNodePipeMessages.THE_EXECUTOR_AND_HAS_BEEN_SUCCESSFULLY_SHUTDOWN, @@ -230,7 +243,7 @@ public synchronized void deregister( public synchronized void start(final String attributeSortedString) { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { - throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString); + throwNoSuchSubtaskException(attributeSortedString); } for (final PipeSinkSubtaskLifeCycle lifeCycle : @@ -241,7 +254,7 @@ public synchronized void start(final String attributeSortedString) { public synchronized void stop(final String attributeSortedString) { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { - throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString); + throwNoSuchSubtaskException(attributeSortedString); } for (final PipeSinkSubtaskLifeCycle lifeCycle : @@ -254,7 +267,8 @@ public UnboundedBlockingPendingQueue getPipeSinkPendingQueue( final String attributeSortedString) { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { throw new PipeException( - DataNodePipeMessages.FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK + attributeSortedString); + DataNodePipeMessages.FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK + + getDisplayStringForException(attributeSortedString)); } // All subtasks share the same pending queue @@ -264,13 +278,33 @@ public UnboundedBlockingPendingQueue getPipeSinkPendingQueue( .getPendingQueue(); } - private String generateAttributeSortedString(final PipeParameters pipeConnectorParameters) { + private static String generateAttributeSortedString( + final PipeParameters pipeConnectorParameters) { final TreeMap sortedStringSourceMap = new TreeMap<>(pipeConnectorParameters.getAttribute()); sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY); return sortedStringSourceMap.toString(); } + /** Masked attribute string for logs, metrics and exception messages. */ + private static String generateAttributeDisplayString( + final PipeParameters pipeConnectorParameters) { + final TreeMap filteredAttributes = + new TreeMap<>(pipeConnectorParameters.getAttribute()); + filteredAttributes.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY); + return new PipeParameters(filteredAttributes).toString(); + } + + private void throwNoSuchSubtaskException(final String attributeSortedString) { + throw new PipeException( + FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + + getDisplayStringForException(attributeSortedString)); + } + + private String getDisplayStringForException(final String attributeSortedString) { + return attributeSortedString2DisplayString.getOrDefault(attributeSortedString, "unknown"); + } + ///////////////////////// Singleton Instance Holder ///////////////////////// private PipeSinkSubtaskManager() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java index dd7707d1b9685..9b2f876ff2d20 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java @@ -199,8 +199,8 @@ private void createRate(final String taskID) { private void createTimer(final String taskID) { final PipeSinkSubtask sink = sinkMap.get(taskID); - compressionTimerMap.putIfAbsent( - sink.getAttributeSortedString(), + compressionTimerMap.put( + taskID, metricService.getOrCreateTimer( Metric.PIPE_COMPRESSION_TIME.toString(), MetricLevel.IMPORTANT, @@ -394,7 +394,7 @@ private void removeTimer(final String taskID) { sink.getAttributeSortedString(), Tag.CREATION_TIME.toString(), String.valueOf(sink.getCreationTime())); - compressionTimerMap.remove(sink.getAttributeSortedString()); + compressionTimerMap.remove(taskID); } private void removeHistogram(final String taskID) { @@ -492,8 +492,8 @@ public void markPipeHeartbeatEvent(final String taskID) { rate.mark(); } - public Timer getCompressionTimer(final String attributeSortedString) { - return Objects.isNull(metricService) ? null : compressionTimerMap.get(attributeSortedString); + public Timer getCompressionTimer(final String taskID) { + return Objects.isNull(metricService) ? null : compressionTimerMap.get(taskID); } //////////////////////////// singleton //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java index 649ef35c4ce0c..d8c6624f75501 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java @@ -603,9 +603,8 @@ protected byte[] getTransferMultiFilePieceBytes( @Override protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException { - if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) { - compressionTimer = - PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString); + if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) { + compressionTimer = PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId); } return super.compressIfNeeded(reqInBytes); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 9adbcf6cf16d1..c35320338ff9f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -510,9 +510,8 @@ private void transferBatchedEventsIfNecessary() throws IOException, WriteProcess @Override public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException { - if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) { - compressionTimer = - PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString); + if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) { + compressionTimer = PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId); } return super.compressIfNeeded(req); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index 5e6297d843851..8352f987d7996 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -594,9 +594,8 @@ private void doTransfer( @Override public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException { - if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) { - compressionTimer = - PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString); + if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) { + compressionTimer = PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId); } return super.compressIfNeeded(req); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java index b838289134882..26081d9c78a6a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java @@ -21,6 +21,7 @@ public class PipeTaskSinkRuntimeEnvironment extends PipeTaskRuntimeEnvironment { private String attributeSortedString; + private String sinkTaskId; public PipeTaskSinkRuntimeEnvironment( final String pipeName, final long creationTime, final int regionId) { @@ -34,4 +35,12 @@ public String getAttributeSortedString() { public void setAttributeSortedString(String attributeSortedString) { this.attributeSortedString = attributeSortedString; } + + public String getSinkTaskId() { + return sinkTaskId; + } + + public void setSinkTaskId(final String sinkTaskId) { + this.sinkTaskId = sinkTaskId; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index a52779650f82a..b5662aeec2ce9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -189,6 +189,7 @@ public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEvent private final AtomicLong totalUncompressedSize = new AtomicLong(0); private final AtomicLong totalCompressedSize = new AtomicLong(0); protected String attributeSortedString; + protected String sinkTaskId; protected Timer compressionTimer; protected boolean isRealtimeFirst; @@ -391,8 +392,10 @@ public void customize( throws Exception { final PipeRuntimeEnvironment environment = configuration.getRuntimeEnvironment(); if (environment instanceof PipeTaskSinkRuntimeEnvironment) { - attributeSortedString = - ((PipeTaskSinkRuntimeEnvironment) environment).getAttributeSortedString(); + final PipeTaskSinkRuntimeEnvironment sinkEnvironment = + (PipeTaskSinkRuntimeEnvironment) environment; + attributeSortedString = sinkEnvironment.getAttributeSortedString(); + sinkTaskId = sinkEnvironment.getSinkTaskId(); } nodeUrls.clear();