Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ public static class ValueHider {

static {
KEYS.add("ssl.trust-store-pwd");
KEYS.add("scp.password");
KEYS.add("password");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class PipeSinkSubtaskManager {
private final Map<String, List<PipeSinkSubtaskLifeCycle>>
attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();

private final Map<String, String> attributeSortedString2DisplayString = new HashMap<>();

public synchronized String register(
final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
final PipeParameters pipeSinkParameters,
Expand Down Expand Up @@ -91,6 +93,7 @@ public synchronized String register(
final int sinkNum;
boolean realTimeFirst = false;
String attributeSortedString = generateAttributeSortedString(pipeSinkParameters);
final String attributeDisplayString = generateAttributeDisplayString(pipeSinkParameters);
if (isDataRegionSink) {
sinkNum =
pipeSinkParameters.getIntOrDefault(
Expand Down Expand Up @@ -119,7 +122,9 @@ public synchronized String register(
sinkNum = 1;
attributeSortedString = "schema_" + attributeSortedString;
}
environment.setAttributeSortedString(attributeSortedString);
final String attributeDisplayStringWithPrefix =
isDataRegionSink ? "data_" + attributeDisplayString : "schema_" + attributeDisplayString;
environment.setAttributeSortedString(attributeDisplayStringWithPrefix);

if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
final PipeSinkSubtaskExecutor executor = executorSupplier.get();
Expand All @@ -137,6 +142,12 @@ public synchronized String register(
}

for (int sinkIndex = 0; sinkIndex < sinkNum; sinkIndex++) {
final String taskID =
String.format(
"%s_%s_%s",
attributeDisplayStringWithPrefix, environment.getCreationTime(), sinkIndex);
environment.setSinkTaskId(taskID);

final PipeConnector pipeSink =
isDataRegionSink
? PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters)
Expand Down Expand Up @@ -167,10 +178,9 @@ public synchronized String register(
// 2. Construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle
final PipeSinkSubtask pipeSinkSubtask =
new PipeSinkSubtask(
String.format(
"%s_%s_%s", attributeSortedString, environment.getCreationTime(), sinkIndex),
taskID,
environment.getCreationTime(),
attributeSortedString,
attributeDisplayStringWithPrefix,
sinkIndex,
pendingQueue,
pipeSink);
Expand All @@ -181,11 +191,13 @@ public synchronized String register(

LOGGER.info(
DataNodePipeMessages.PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED,
attributeSortedString,
attributeDisplayStringWithPrefix,
executor.getWorkingThreadName(),
executor.getCallbackThreadName());
attributeSortedString2SubtaskLifeCycleMap.put(
attributeSortedString, pipeSinkSubtaskLifeCycleList);
attributeSortedString2DisplayString.put(
attributeSortedString, attributeDisplayStringWithPrefix);
}

for (final PipeSinkSubtaskLifeCycle lifeCycle :
Expand All @@ -202,7 +214,7 @@ public synchronized void deregister(
final int regionId,
final String attributeSortedString) {
if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString);
throwNoSuchSubtaskException(attributeSortedString);
}

final List<PipeSinkSubtaskLifeCycle> lifeCycles =
Expand All @@ -215,6 +227,7 @@ public synchronized void deregister(

if (lifeCycles.isEmpty()) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
attributeSortedString2DisplayString.remove(attributeSortedString);
executor.shutdown();
LOGGER.info(
DataNodePipeMessages.THE_EXECUTOR_AND_HAS_BEEN_SUCCESSFULLY_SHUTDOWN,
Expand All @@ -230,7 +243,7 @@ public synchronized void deregister(

public synchronized void start(final String attributeSortedString) {
if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString);
throwNoSuchSubtaskException(attributeSortedString);
}

for (final PipeSinkSubtaskLifeCycle lifeCycle :
Expand All @@ -241,7 +254,7 @@ public synchronized void start(final String attributeSortedString) {

public synchronized void stop(final String attributeSortedString) {
if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString);
throwNoSuchSubtaskException(attributeSortedString);
}

for (final PipeSinkSubtaskLifeCycle lifeCycle :
Expand All @@ -254,7 +267,8 @@ public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue(
final String attributeSortedString) {
if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
throw new PipeException(
DataNodePipeMessages.FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK + attributeSortedString);
DataNodePipeMessages.FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK
+ getDisplayStringForException(attributeSortedString));
}

// All subtasks share the same pending queue
Expand All @@ -264,13 +278,33 @@ public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue(
.getPendingQueue();
}

private String generateAttributeSortedString(final PipeParameters pipeConnectorParameters) {
private static String generateAttributeSortedString(
final PipeParameters pipeConnectorParameters) {
final TreeMap<String, String> sortedStringSourceMap =
new TreeMap<>(pipeConnectorParameters.getAttribute());
sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
return sortedStringSourceMap.toString();
}

/** Masked attribute string for logs, metrics and exception messages. */
private static String generateAttributeDisplayString(
final PipeParameters pipeConnectorParameters) {
final TreeMap<String, String> filteredAttributes =
new TreeMap<>(pipeConnectorParameters.getAttribute());
filteredAttributes.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
return new PipeParameters(filteredAttributes).toString();
}

private void throwNoSuchSubtaskException(final String attributeSortedString) {
throw new PipeException(
FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE
+ getDisplayStringForException(attributeSortedString));
}

private String getDisplayStringForException(final String attributeSortedString) {
return attributeSortedString2DisplayString.getOrDefault(attributeSortedString, "unknown");
}

///////////////////////// Singleton Instance Holder /////////////////////////

private PipeSinkSubtaskManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ private void createRate(final String taskID) {

private void createTimer(final String taskID) {
final PipeSinkSubtask sink = sinkMap.get(taskID);
compressionTimerMap.putIfAbsent(
sink.getAttributeSortedString(),
compressionTimerMap.put(
taskID,
metricService.getOrCreateTimer(
Metric.PIPE_COMPRESSION_TIME.toString(),
MetricLevel.IMPORTANT,
Expand Down Expand Up @@ -394,7 +394,7 @@ private void removeTimer(final String taskID) {
sink.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
String.valueOf(sink.getCreationTime()));
compressionTimerMap.remove(sink.getAttributeSortedString());
compressionTimerMap.remove(taskID);
}

private void removeHistogram(final String taskID) {
Expand Down Expand Up @@ -492,8 +492,8 @@ public void markPipeHeartbeatEvent(final String taskID) {
rate.mark();
}

public Timer getCompressionTimer(final String attributeSortedString) {
return Objects.isNull(metricService) ? null : compressionTimerMap.get(attributeSortedString);
public Timer getCompressionTimer(final String taskID) {
return Objects.isNull(metricService) ? null : compressionTimerMap.get(taskID);
}

//////////////////////////// singleton ////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,9 +603,8 @@ protected byte[] getTransferMultiFilePieceBytes(

@Override
protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException {
if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) {
compressionTimer =
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString);
if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) {
compressionTimer = PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId);
}
return super.compressIfNeeded(reqInBytes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,9 +510,8 @@ private void transferBatchedEventsIfNecessary() throws IOException, WriteProcess

@Override
public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException {
if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) {
compressionTimer =
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString);
if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) {
compressionTimer = PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId);
}
return super.compressIfNeeded(req);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,9 +594,8 @@ private void doTransfer(

@Override
public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException {
if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) {
compressionTimer =
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString);
if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) {
compressionTimer = PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId);
}
return super.compressIfNeeded(req);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

public class PipeTaskSinkRuntimeEnvironment extends PipeTaskRuntimeEnvironment {
private String attributeSortedString;
private String sinkTaskId;

public PipeTaskSinkRuntimeEnvironment(
final String pipeName, final long creationTime, final int regionId) {
Expand All @@ -34,4 +35,12 @@ public String getAttributeSortedString() {
public void setAttributeSortedString(String attributeSortedString) {
this.attributeSortedString = attributeSortedString;
}

public String getSinkTaskId() {
return sinkTaskId;
}

public void setSinkTaskId(final String sinkTaskId) {
this.sinkTaskId = sinkTaskId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEvent
private final AtomicLong totalUncompressedSize = new AtomicLong(0);
private final AtomicLong totalCompressedSize = new AtomicLong(0);
protected String attributeSortedString;
protected String sinkTaskId;
protected Timer compressionTimer;
protected boolean isRealtimeFirst;

Expand Down Expand Up @@ -391,8 +392,10 @@ public void customize(
throws Exception {
final PipeRuntimeEnvironment environment = configuration.getRuntimeEnvironment();
if (environment instanceof PipeTaskSinkRuntimeEnvironment) {
attributeSortedString =
((PipeTaskSinkRuntimeEnvironment) environment).getAttributeSortedString();
final PipeTaskSinkRuntimeEnvironment sinkEnvironment =
(PipeTaskSinkRuntimeEnvironment) environment;
attributeSortedString = sinkEnvironment.getAttributeSortedString();
sinkTaskId = sinkEnvironment.getSinkTaskId();
}

nodeUrls.clear();
Expand Down
Loading