diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 3223ff913594e1..2452a2dde3c681 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -19,6 +19,7 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ExprToSqlVisitor; +import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.ToSqlParams; import org.apache.doris.analysis.UserIdentity; @@ -468,6 +469,27 @@ protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) { } } + public void validateTargetTable(Database db, OlapTable targetTable) throws UserException { + if (isMultiTable) { + throw new AnalysisException("ALTER ROUTINE LOAD target table change only supports single-table job"); + } + List columnsInfo = null; + if (columnDescs != null && !columnDescs.descs.isEmpty()) { + columnsInfo = new ArrayList<>(columnDescs.descs); + } + checkMeta(targetTable, new RoutineLoadDesc(columnSeparator, lineDelimiter, columnsInfo, precedingFilter, + whereExpr, partitionNamesInfo, deleteCondition, mergeType, sequenceCol)); + + targetTable.readLock(); + try { + NereidsStreamLoadPlanner planner = new NereidsStreamLoadPlanner(db, targetTable, + toNereidsRoutineLoadTaskInfo()); + planner.plan(new TUniqueId(0, 0)); + } finally { + targetTable.readUnlock(); + } + } + @Override public long getId() { return id; @@ -2066,7 +2088,9 @@ protected void modifyCommonJobProperties(Map jobProperties) thro jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY)); } - if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) { + boolean hasExplicitUniqueKeyUpdateMode = jobProperties.containsKey( + CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE); + if (hasExplicitUniqueKeyUpdateMode) { String modeStr = jobProperties.remove(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE); TUniqueKeyUpdateMode newMode = CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode(modeStr); // Validate flexible partial update constraints when changing to UPDATE_FLEXIBLE_COLUMNS @@ -2077,6 +2101,7 @@ protected void modifyCommonJobProperties(Map jobProperties) thro this.isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS); this.jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name()); this.jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate)); + jobProperties.remove(CreateRoutineLoadInfo.PARTIAL_COLUMNS); } if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java index dcb683e9e227dd..f45a2295b1c0b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java @@ -73,7 +73,6 @@ import com.google.gson.annotations.SerializedName; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang3.BooleanUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -757,9 +756,12 @@ public void modifyProperties(AlterRoutineLoadCommand command) throws UserExcepti } modifyPropertiesInternal(jobProperties, dataSourceProperties); + if (command.hasTargetTable()) { + this.tableId = command.getTargetTableId(); + } AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id, - jobProperties, dataSourceProperties); + jobProperties, dataSourceProperties, command.getTargetTableId()); Env.getCurrentEnv().getEditLog().logAlterRoutineLoadJob(log); } finally { writeUnlock(); @@ -844,9 +846,6 @@ private void modifyPropertiesInternal(Map jobProperties, Map copiedJobProperties = Maps.newHashMap(jobProperties); modifyCommonJobProperties(copiedJobProperties); this.jobProperties.putAll(copiedJobProperties); - if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) { - this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS)); - } if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) { String policy = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY); if ("ERROR".equalsIgnoreCase(policy)) { @@ -883,6 +882,9 @@ private void resetCloudProgress(Cloud.ResetRLProgressRequest.Builder builder) th public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) { try { modifyPropertiesInternal(log.getJobProperties(), (KafkaDataSourceProperties) log.getDataSourceProperties()); + if (log.getTargetTableId() != 0) { + this.tableId = log.getTargetTableId(); + } } catch (UserException e) { // should not happen LOG.error("failed to replay modify kafka routine load job: {}", id, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java index 0c05889929924e..4f97857ff7a0a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java @@ -65,7 +65,6 @@ import com.google.gson.annotations.SerializedName; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang3.BooleanUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -689,9 +688,12 @@ public void modifyProperties(AlterRoutineLoadCommand command) throws UserExcepti } modifyPropertiesInternal(jobProperties, dataSourceProperties); + if (command.hasTargetTable()) { + this.tableId = command.getTargetTableId(); + } AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id, - jobProperties, dataSourceProperties); + jobProperties, dataSourceProperties, command.getTargetTableId()); Env.getCurrentEnv().getEditLog().logAlterRoutineLoadJob(log); } finally { writeUnlock(); @@ -764,9 +766,6 @@ private void modifyPropertiesInternal(Map jobProperties, Map copiedJobProperties = Maps.newHashMap(jobProperties); modifyCommonJobProperties(copiedJobProperties); this.jobProperties.putAll(copiedJobProperties); - if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) { - this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS)); - } if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) { String policy = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY); if ("ERROR".equalsIgnoreCase(policy)) { @@ -785,6 +784,9 @@ public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) { try { modifyPropertiesInternal(log.getJobProperties(), (KinesisDataSourceProperties) log.getDataSourceProperties()); + if (log.getTargetTableId() != 0) { + this.tableId = log.getTargetTableId(); + } } catch (UserException e) { LOG.error("failed to replay modify kinesis routine load job: {}", id, e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index c470bc789d9e78..0b6f5f32c8a564 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -9284,6 +9284,12 @@ public LogicalPlan visitAlterRoutineLoad(DorisParser.AlterRoutineLoadContext ctx } LabelNameInfo labelNameInfo = new LabelNameInfo(dbName, jobName); + // TODO: Phase 1 only supports altering the target table. Phase 2 will allow altering + // the target table and routine load properties in the same statement. + if (ctx.table != null) { + return new AlterRoutineLoadCommand(labelNameInfo, ctx.table.getText()); + } + Map properties = new HashMap<>(); if (ctx.properties != null) { properties.putAll(visitPropertyClause(ctx.properties)); @@ -9309,7 +9315,8 @@ public LogicalPlan visitAlterRoutineLoad(DorisParser.AlterRoutineLoadContext ctx } } - return new AlterRoutineLoadCommand(labelNameInfo, loadPropertyMap, properties, dataSourceMapProperties); + return new AlterRoutineLoadCommand(labelNameInfo, null, + loadPropertyMap, properties, dataSourceMapProperties); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java index 367480c5d934d9..6fad5299a477ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java @@ -21,18 +21,23 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties; import org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties; import org.apache.doris.load.RoutineLoadDesc; import org.apache.doris.load.routineload.AbstractDataSourceProperties; import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory; import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo; @@ -40,6 +45,7 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; @@ -85,10 +91,12 @@ public class AlterRoutineLoadCommand extends AlterCommand { .build(); private final LabelNameInfo labelNameInfo; + private final String targetTableName; private final Map loadPropertyMap; private RoutineLoadDesc routineLoadDesc; private final Map jobProperties; private final Map dataSourceMapProperties; + private long targetTableId; private boolean isPartialUpdate; // save analyzed job properties. @@ -100,6 +108,7 @@ public class AlterRoutineLoadCommand extends AlterCommand { * AlterRoutineLoadCommand */ public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, + String targetTableName, Map loadPropertyMap, Map jobProperties, Map dataSourceMapProperties) { @@ -108,6 +117,7 @@ public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, Objects.requireNonNull(jobProperties, "jobProperties is null"); Objects.requireNonNull(dataSourceMapProperties, "dataSourceMapProperties is null"); this.labelNameInfo = labelNameInfo; + this.targetTableName = targetTableName; this.loadPropertyMap = loadPropertyMap == null ? Maps.newHashMap() : loadPropertyMap; this.jobProperties = jobProperties; this.dataSourceMapProperties = dataSourceMapProperties; @@ -118,7 +128,11 @@ public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, Map jobProperties, Map dataSourceMapProperties) { - this(labelNameInfo, Maps.newHashMap(), jobProperties, dataSourceMapProperties); + this(labelNameInfo, null, Maps.newHashMap(), jobProperties, dataSourceMapProperties); + } + + public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, String targetTableName) { + this(labelNameInfo, targetTableName, Maps.newHashMap(), Maps.newHashMap(), Maps.newHashMap()); } public String getDbName() { @@ -133,6 +147,18 @@ public Map getAnalyzedJobProperties() { return analyzedJobProperties; } + public boolean hasTargetTable() { + return !StringUtil.isEmpty(targetTableName); + } + + public String getTargetTableName() { + return targetTableName; + } + + public long getTargetTableId() { + return targetTableId; + } + public boolean hasDataSourceProperty() { return MapUtils.isNotEmpty(dataSourceMapProperties); } @@ -161,18 +187,27 @@ public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { public void validate(ConnectContext ctx) throws UserException { labelNameInfo.validate(ctx); FeNameFormat.checkCommonName(NAME_TYPE, labelNameInfo.getLabel()); + if (hasTargetTable() && (MapUtils.isNotEmpty(loadPropertyMap) || MapUtils.isNotEmpty(jobProperties) + || MapUtils.isNotEmpty(dataSourceMapProperties))) { + throw new AnalysisException("ALTER ROUTINE LOAD target table change does not support other properties"); + } // check routine load job properties include desired concurrent number etc. checkJobProperties(); // check load properties RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager() .getJob(getDbName(), getJobName()); - this.routineLoadDesc = CreateRoutineLoadInfo.checkLoadProperties(ctx, loadPropertyMap, - job.getDbFullName(), job.getTableName(), job.isMultiTable(), job.getMergeType()); + if (MapUtils.isNotEmpty(loadPropertyMap)) { + this.routineLoadDesc = CreateRoutineLoadInfo.checkLoadProperties(ctx, loadPropertyMap, + job.getDbFullName(), job.getTableName(), job.isMultiTable(), job.getMergeType()); + } // check data source properties checkDataSourceProperties(); + if (hasTargetTable()) { + validateTargetTable(ctx, job); + } checkPartialUpdate(); if (analyzedJobProperties.isEmpty() && MapUtils.isEmpty(dataSourceMapProperties) - && routineLoadDesc == null) { + && MapUtils.isEmpty(loadPropertyMap) && !hasTargetTable()) { throw new AnalysisException("No properties are specified"); } } @@ -329,21 +364,62 @@ private void checkDataSourceProperties() throws UserException { } private void checkPartialUpdate() throws UserException { - if (!isPartialUpdate) { + RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager() + .getJob(getDbName(), getJobName()); + TUniqueKeyUpdateMode uniqueKeyUpdateMode = getEffectiveUniqueKeyUpdateMode(job); + if (uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { return; } - RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager() - .getJob(getDbName(), getDbName()); if (job.isMultiTable()) { throw new AnalysisException("load by PARTIAL_COLUMNS is not supported in multi-table load."); } Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName()); - Table table = db.getTableOrAnalysisException(job.getTableName()); - if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) { + String tableName = hasTargetTable() ? targetTableName : job.getTableName(); + Table table = db.getTableOrAnalysisException(tableName); + if (!((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) { throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW"); } } + private TUniqueKeyUpdateMode getEffectiveUniqueKeyUpdateMode(RoutineLoadJob job) { + if (analyzedJobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) { + return TUniqueKeyUpdateMode.valueOf( + analyzedJobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)); + } + if (analyzedJobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS) && isPartialUpdate + && job.getUniqueKeyUpdateMode() == TUniqueKeyUpdateMode.UPSERT) { + return TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS; + } + return job.getUniqueKeyUpdateMode(); + } + + private void validateTargetTable(ConnectContext ctx, RoutineLoadJob job) throws UserException { + if (job.isMultiTable()) { + throw new AnalysisException("ALTER ROUTINE LOAD target table change only supports single-table job"); + } + Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName()); + Table table = db.getTableOrAnalysisException(targetTableName); + if (!(table instanceof OlapTable)) { + throw new AnalysisException("ALTER ROUTINE LOAD target table only supports OLAP table"); + } + OlapTable olapTable = (OlapTable) table; + if (olapTable.isTemporary()) { + throw new AnalysisException("Do not support load for temporary table " + olapTable.getDisplayName()); + } + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, InternalCatalog.INTERNAL_CATALOG_NAME, + job.getDbFullName(), targetTableName, PrivPredicate.LOAD)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", + ctx.getQualifiedUser(), ctx.getRemoteIP(), job.getDbFullName() + ": " + targetTableName); + } + if (job.isLoadToSingleTablet() + && !(olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) { + throw new AnalysisException( + "if load_to_single_tablet set to true, the olap table must be with random distribution"); + } + job.validateTargetTable(db, olapTable); + this.targetTableId = olapTable.getId(); + } + @Override public R accept(PlanVisitor visitor, C context) { return visitor.visitAlterRoutineLoadCommand(this, context); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java index 4729882f7927fb..ae7c8ad214de7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java @@ -37,12 +37,20 @@ public class AlterRoutineLoadJobOperationLog implements Writable { private Map jobProperties; @SerializedName(value = "dataSourceProperties") private AbstractDataSourceProperties dataSourceProperties; + @SerializedName(value = "targetTableId") + private long targetTableId; public AlterRoutineLoadJobOperationLog(long jobId, Map jobProperties, AbstractDataSourceProperties dataSourceProperties) { + this(jobId, jobProperties, dataSourceProperties, 0L); + } + + public AlterRoutineLoadJobOperationLog(long jobId, Map jobProperties, + AbstractDataSourceProperties dataSourceProperties, long targetTableId) { this.jobId = jobId; this.jobProperties = jobProperties; this.dataSourceProperties = dataSourceProperties; + this.targetTableId = targetTableId; } public long getJobId() { @@ -57,6 +65,10 @@ public AbstractDataSourceProperties getDataSourceProperties() { return dataSourceProperties; } + public long getTargetTableId() { + return targetTableId; + } + public static AlterRoutineLoadJobOperationLog read(DataInput in) throws IOException { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, AlterRoutineLoadJobOperationLog.class); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 8b442012ff2c04..069f57199550c7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -44,8 +44,10 @@ import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo; import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty; import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator; +import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TResourceInfo; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.base.Joiner; import com.google.common.collect.Lists; @@ -197,6 +199,24 @@ public void testUpdateLagRefreshesLatestOffsetCache() throws UserException { } } + @Test + public void testModifyPropertiesHonorsExplicitUniqueKeyUpdateModePrecedence() throws Exception { + KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L, + 1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN); + Deencapsulation.setField(routineLoadJob, "uniqueKeyUpdateMode", TUniqueKeyUpdateMode.UPSERT); + Deencapsulation.setField(routineLoadJob, "isPartialUpdate", false); + + Map jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPSERT"); + jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, "true"); + + Deencapsulation.invoke(routineLoadJob, "modifyPropertiesInternal", jobProperties, + new KafkaDataSourceProperties(Maps.newHashMap())); + + Assert.assertEquals(TUniqueKeyUpdateMode.UPSERT, routineLoadJob.getUniqueKeyUpdateMode()); + Assert.assertFalse(routineLoadJob.isFixedPartialUpdate()); + } + @Test public void testUpdateLagRebuildsConvertedPropertiesAfterReplay() throws UserException { KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L, @@ -415,6 +435,24 @@ public void testFromCreateStmt() throws UserException { } } + @Test + public void testReplayModifyPropertiesSwitchesTargetTableWithoutResettingProgress() { + KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L, + 101L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN); + Map partitionToOffset = Maps.newHashMap(); + partitionToOffset.put(1, 123L); + KafkaProgress progress = new KafkaProgress(partitionToOffset); + Deencapsulation.setField(routineLoadJob, "progress", progress); + + AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(1L, + Maps.newHashMap(), null, 202L); + routineLoadJob.replayModifyProperties(log); + + Assert.assertEquals(202L, routineLoadJob.getTableId()); + Assert.assertSame(progress, routineLoadJob.getProgress()); + Assert.assertEquals(Long.valueOf(123L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1)); + } + private CreateRoutineLoadInfo initCreateRoutineLoadInfo() { Map properties = Maps.newHashMap(); properties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KinesisRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KinesisRoutineLoadJobTest.java index aa1dc052605f2b..67688ab2e790e7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KinesisRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KinesisRoutineLoadJobTest.java @@ -25,6 +25,8 @@ import org.apache.doris.load.routineload.kinesis.KinesisProgress; import org.apache.doris.load.routineload.kinesis.KinesisRoutineLoadJob; import org.apache.doris.load.routineload.kinesis.KinesisTaskInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -101,6 +103,25 @@ public void testGetStatisticContainsKinesisFields() { Assert.assertEquals(100L, ((Number) statistic.get("maxMillisBehindLatest")).longValue()); } + @Test + public void testModifyPropertiesHonorsExplicitUniqueKeyUpdateModePrecedence() throws Exception { + KinesisRoutineLoadJob routineLoadJob = + new KinesisRoutineLoadJob(1L, "kinesis_routine_load_job", 1L, + 1L, "ap-southeast-1", "stream-1", UserIdentity.ADMIN); + Deencapsulation.setField(routineLoadJob, "uniqueKeyUpdateMode", TUniqueKeyUpdateMode.UPSERT); + Deencapsulation.setField(routineLoadJob, "isPartialUpdate", false); + + Map jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPSERT"); + jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, "true"); + + Deencapsulation.invoke(routineLoadJob, "modifyPropertiesInternal", jobProperties, + new KinesisDataSourceProperties(Maps.newHashMap())); + + Assert.assertEquals(TUniqueKeyUpdateMode.UPSERT, routineLoadJob.getUniqueKeyUpdateMode()); + Assert.assertFalse(routineLoadJob.isFixedPartialUpdate()); + } + @Test public void testHasMoreDataToConsumeShouldKeepPollingWhenLagCacheIsZero() throws Exception { KinesisRoutineLoadJob routineLoadJob = diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java index 0989bc127b046b..f7fd80622ec1eb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java @@ -19,16 +19,23 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.load.routineload.RoutineLoadManager; +import org.apache.doris.mysql.privilege.AccessControllerManager; +import org.apache.doris.nereids.exceptions.ParseException; +import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.collect.Maps; import org.junit.jupiter.api.AfterEach; @@ -41,8 +48,16 @@ import java.util.Map; public class AlterRoutineLoadCommandTest { + private static final NereidsParser PARSER = new NereidsParser(); + private Env env; private ConnectContext connectContext; + private AccessControllerManager accessManager; + private InternalCatalog catalog; + private Database db; + private OlapTable currentTable; + private RoutineLoadManager routineLoadManager; + private RoutineLoadJob routineLoadJob; private MockedStatic envMockedStatic; private MockedStatic ctxMockedStatic; @@ -50,26 +65,38 @@ public class AlterRoutineLoadCommandTest { public void setUp() throws Exception { env = Mockito.mock(Env.class); connectContext = Mockito.mock(ConnectContext.class); + accessManager = Mockito.mock(AccessControllerManager.class); + catalog = Mockito.mock(InternalCatalog.class); + db = Mockito.mock(Database.class); + currentTable = Mockito.mock(OlapTable.class); + routineLoadManager = Mockito.mock(RoutineLoadManager.class); + routineLoadJob = Mockito.mock(RoutineLoadJob.class); envMockedStatic = Mockito.mockStatic(Env.class); ctxMockedStatic = Mockito.mockStatic(ConnectContext.class); envMockedStatic.when(Env::getCurrentEnv).thenReturn(env); ctxMockedStatic.when(ConnectContext::get).thenReturn(connectContext); Mockito.when(connectContext.getSessionVariable()).thenReturn(new SessionVariable()); Mockito.when(connectContext.getState()).thenReturn(new QueryState()); - InternalCatalog catalog = Mockito.mock(InternalCatalog.class); - Database db = Mockito.mock(Database.class); - Table tbl = Mockito.mock(Table.class); + Mockito.when(connectContext.getQualifiedUser()).thenReturn("testUser"); + Mockito.when(connectContext.getRemoteIP()).thenReturn("127.0.0.1"); envMockedStatic.when(Env::getCurrentInternalCatalog).thenReturn(catalog); Mockito.doReturn(db).when(catalog).getDbOrAnalysisException(Mockito.anyString()); - Mockito.doReturn(tbl).when(db).getTableOrAnalysisException(Mockito.anyString()); - Mockito.when(env.getRoutineLoadManager()).thenReturn(Mockito.mock(RoutineLoadManager.class)); - RoutineLoadManager rlm = env.getRoutineLoadManager(); - RoutineLoadJob rlJob = Mockito.mock(RoutineLoadJob.class); - Mockito.when(rlm.getJob(Mockito.anyString(), Mockito.anyString())).thenReturn(rlJob); - Mockito.when(rlJob.getDbFullName()).thenReturn("testDb"); - Mockito.when(rlJob.getTableName()).thenReturn("testTable"); - Mockito.when(rlJob.isMultiTable()).thenReturn(false); - Mockito.when(rlJob.getMergeType()).thenReturn(LoadTask.MergeType.APPEND); + Mockito.doReturn(currentTable).when(db).getTableOrAnalysisException(Mockito.anyString()); + Mockito.when(env.getRoutineLoadManager()).thenReturn(routineLoadManager); + Mockito.when(env.getAccessManager()).thenReturn(accessManager); + Mockito.when(accessManager.checkTblPriv(Mockito.any(ConnectContext.class), Mockito.anyString(), + Mockito.anyString(), Mockito.anyString(), Mockito.any())).thenReturn(true); + Mockito.when(routineLoadManager.getJob(Mockito.anyString(), Mockito.anyString())).thenReturn(routineLoadJob); + Mockito.when(routineLoadJob.getDbFullName()).thenReturn("testDb"); + Mockito.when(routineLoadJob.getTableName()).thenReturn("testTable"); + Mockito.when(routineLoadJob.getDbId()).thenReturn(1000L); + Mockito.when(routineLoadJob.getTableId()).thenReturn(2000L); + Mockito.when(routineLoadJob.isMultiTable()).thenReturn(false); + Mockito.when(routineLoadJob.getMergeType()).thenReturn(LoadTask.MergeType.APPEND); + Mockito.when(routineLoadJob.isLoadToSingleTablet()).thenReturn(false); + Mockito.when(routineLoadJob.getUniqueKeyUpdateMode()).thenReturn(TUniqueKeyUpdateMode.UPSERT); + Mockito.when(currentTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(currentTable.isTemporary()).thenReturn(false); } @AfterEach @@ -86,6 +113,14 @@ private void runBefore() { Mockito.when(connectContext.isSkipAuth()).thenReturn(true); } + private void mockTargetTable(Table table) { + try { + Mockito.doReturn(table).when(db).getTableOrAnalysisException("testTable2"); + } catch (AnalysisException e) { + throw new RuntimeException(e); + } + } + @Test public void testValidate() { runBefore(); @@ -115,4 +150,188 @@ public void testValidate() { Assertions.assertTrue(command.getAnalyzedJobProperties().containsKey(CreateRoutineLoadInfo.STRICT_MODE)); Assertions.assertTrue(command.getAnalyzedJobProperties().containsKey(CreateRoutineLoadInfo.TIMEZONE)); } + + @Test + public void testParseAlterRoutineLoadOnTargetTable() { + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + Assertions.assertEquals("testDb", command.getDbName()); + Assertions.assertEquals("label1", command.getJobName()); + Assertions.assertTrue(command.hasTargetTable()); + Assertions.assertEquals("testTable2", command.getTargetTableName()); + } + + @Test + public void testParseAlterRoutineLoadOnTargetTableRejectMixedProperties() { + Assertions.assertThrows(ParseException.class, () -> PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2 PROPERTIES(\"max_error_number\"=\"1\")")); + Assertions.assertThrows(ParseException.class, () -> PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2 FROM KAFKA(\"kafka_offsets\"=\"100\")")); + Assertions.assertThrows(ParseException.class, () -> PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2 COLUMNS(k1)")); + } + + @Test + public void testValidateTargetTableOnlyDoesNotRequireOtherProperties() throws Exception { + runBefore(); + mockTargetTable(currentTable); + + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + Assertions.assertEquals("testTable2", command.getTargetTableName()); + } + + @Test + public void testValidateTargetTableRejectsMultiTableJob() throws Exception { + runBefore(); + Mockito.when(routineLoadJob.isMultiTable()).thenReturn(true); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("single-table")); + } + + @Test + public void testValidateTargetTableRejectsWithoutLoadPrivilege() throws Exception { + runBefore(); + mockTargetTable(currentTable); + Mockito.when(accessManager.checkTblPriv(Mockito.any(ConnectContext.class), Mockito.anyString(), + Mockito.eq("testDb"), Mockito.eq("testTable2"), Mockito.any())).thenReturn(false); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("LOAD")); + } + + @Test + public void testValidateTargetTableRejectsTemporaryTable() throws Exception { + runBefore(); + OlapTable tempTable = Mockito.mock(OlapTable.class); + Mockito.when(tempTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(tempTable.isTemporary()).thenReturn(true); + mockTargetTable(tempTable); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("temporary table")); + } + + @Test + public void testValidateTargetTableRejectsLoadToSingleTabletWithoutRandomDistribution() throws Exception { + runBefore(); + OlapTable newTable = Mockito.mock(OlapTable.class); + Mockito.when(newTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(newTable.isTemporary()).thenReturn(false); + Mockito.when(newTable.getDefaultDistributionInfo()).thenReturn(null); + mockTargetTable(newTable); + Mockito.when(routineLoadJob.isLoadToSingleTablet()).thenReturn(true); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("load_to_single_tablet")); + } + + @Test + public void testValidateTargetTableAllowsLoadToSingleTabletWithRandomDistribution() throws Exception { + runBefore(); + OlapTable newTable = Mockito.mock(OlapTable.class); + RandomDistributionInfo distributionInfo = Mockito.mock(RandomDistributionInfo.class); + Mockito.when(newTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(newTable.isTemporary()).thenReturn(false); + Mockito.when(newTable.getDefaultDistributionInfo()).thenReturn(distributionInfo); + mockTargetTable(newTable); + Mockito.when(routineLoadJob.isLoadToSingleTablet()).thenReturn(true); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + } + + @Test + public void testValidateTargetTablePassesTargetTableToJobValidation() throws Exception { + runBefore(); + OlapTable targetTable = Mockito.mock(OlapTable.class); + Mockito.when(targetTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(targetTable.isTemporary()).thenReturn(false); + Mockito.doReturn(targetTable).when(db).getTableOrAnalysisException("testTable2"); + Mockito.when(routineLoadJob.isLoadToSingleTablet()).thenReturn(false); + Mockito.doAnswer(invocation -> { + Assertions.assertSame(db, invocation.getArgument(0)); + Assertions.assertSame(targetTable, invocation.getArgument(1)); + return null; + }).when(routineLoadJob).validateTargetTable(Mockito.any(Database.class), Mockito.any(OlapTable.class)); + + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + } + + @Test + public void testValidateTargetTableRejectsExistingFlexiblePartialUpdateOnNonMowTable() throws Exception { + runBefore(); + OlapTable targetTable = Mockito.mock(OlapTable.class); + Mockito.when(targetTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(targetTable.isTemporary()).thenReturn(false); + Mockito.when(targetTable.getEnableUniqueKeyMergeOnWrite()).thenReturn(false); + Mockito.doReturn(targetTable).when(db).getTableOrAnalysisException("testTable2"); + Mockito.when(routineLoadJob.getUniqueKeyUpdateMode()).thenReturn(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS); + Mockito.doThrow(new AnalysisException("Only unique key merge on write support partial update")) + .when(routineLoadJob).validateTargetTable(Mockito.any(Database.class), Mockito.any(OlapTable.class)); + + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("partial update")); + } + + @Test + public void testValidateRejectsUniqueKeyUpdateModeOnNonMowTable() { + runBefore(); + Mockito.when(routineLoadJob.getUniqueKeyUpdateMode()).thenReturn(TUniqueKeyUpdateMode.UPSERT); + Mockito.when(currentTable.getEnableUniqueKeyMergeOnWrite()).thenReturn(false); + Map jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPDATE_FIXED_COLUMNS"); + + AlterRoutineLoadCommand command = new AlterRoutineLoadCommand( + new LabelNameInfo("testDb", "label1"), jobProperties, Maps.newHashMap()); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("PARTIAL_COLUMNS")); + } + + @Test + public void testValidateAllowsExplicitUpsertToOverridePartialColumnsOnNonMowTable() { + runBefore(); + Mockito.when(routineLoadJob.getUniqueKeyUpdateMode()).thenReturn(TUniqueKeyUpdateMode.UPSERT); + Mockito.when(currentTable.getEnableUniqueKeyMergeOnWrite()).thenReturn(false); + Map jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPSERT"); + jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, "true"); + + AlterRoutineLoadCommand command = new AlterRoutineLoadCommand( + new LabelNameInfo("testDb", "label1"), jobProperties, Maps.newHashMap()); + + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + } + + @Test + public void testValidateAllowsFlexibleAlterToReachFlexibleValidation() { + runBefore(); + Mockito.when(routineLoadJob.getUniqueKeyUpdateMode()).thenReturn(TUniqueKeyUpdateMode.UPSERT); + Mockito.when(currentTable.getEnableUniqueKeyMergeOnWrite()).thenReturn(false); + Map jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPDATE_FLEXIBLE_COLUMNS"); + + AlterRoutineLoadCommand command = new AlterRoutineLoadCommand( + new LabelNameInfo("testDb", "label1"), jobProperties, Maps.newHashMap()); + + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java index 8a1550d48f5d13..5aaf6423ad22b7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java @@ -18,15 +18,19 @@ package org.apache.doris.persist; import org.apache.doris.common.UserException; +import org.apache.doris.common.io.Text; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.routineload.kafka.KafkaConfiguration; import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; +import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; @@ -86,4 +90,63 @@ public void testSerializeAlterRoutineLoadOperationLog() throws IOException, User } + @Test + public void testSerializeAlterRoutineLoadOperationLogWithTargetTableId() throws Exception { + long jobId = 1000; + long targetTableId = 2001; + Map jobProperties = Maps.newHashMap(); + Map dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put("property.group.id", "mygroup"); + KafkaDataSourceProperties routineLoadDataSourceProperties = new KafkaDataSourceProperties( + dataSourceProperties); + routineLoadDataSourceProperties.setAlter(true); + routineLoadDataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + routineLoadDataSourceProperties.analyze(); + + AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(jobId, jobProperties, + routineLoadDataSourceProperties, targetTableId); + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (DataOutputStream out = new DataOutputStream(byteArrayOutputStream)) { + log.write(out); + } + + AlterRoutineLoadJobOperationLog readLog; + try (DataInputStream in = new DataInputStream( + new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))) { + readLog = AlterRoutineLoadJobOperationLog.read(in); + } + + Assert.assertEquals(targetTableId, readLog.getTargetTableId()); + } + + @Test + public void testDeserializeAlterRoutineLoadOperationLogWithoutTargetTableId() throws Exception { + long jobId = 1000; + Map jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, "5"); + Map dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put("property.group.id", "mygroup"); + KafkaDataSourceProperties routineLoadDataSourceProperties = new KafkaDataSourceProperties( + dataSourceProperties); + routineLoadDataSourceProperties.setAlter(true); + routineLoadDataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + routineLoadDataSourceProperties.analyze(); + AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(jobId, + jobProperties, routineLoadDataSourceProperties, 0L); + String legacyJson = GsonUtils.GSON.toJson(log).replace(",\"targetTableId\":0", ""); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (DataOutputStream out = new DataOutputStream(byteArrayOutputStream)) { + Text.writeString(out, legacyJson); + } + + AlterRoutineLoadJobOperationLog readLog; + try (DataInputStream in = new DataInputStream( + new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))) { + readLog = AlterRoutineLoadJobOperationLog.read(in); + } + + Assert.assertEquals(0L, readLog.getTargetTableId()); + Assert.assertEquals("5", readLog.getJobProperties().get(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY)); + } } diff --git a/fe/fe-sql-parser/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-sql-parser/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index cd2391b1775f1e..d16c184be0a814 100644 --- a/fe/fe-sql-parser/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-sql-parser/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -334,10 +334,13 @@ supportedAlterStatement | ALTER SYSTEM RENAME COMPUTE GROUP name=identifier newName=identifier #alterSystemRenameComputeGroup | ALTER RESOURCE name=identifierOrText properties=propertyClause? #alterResource | ALTER REPOSITORY name=identifier properties=propertyClause? #alterRepository - | ALTER ROUTINE LOAD FOR name=multipartIdentifier - (loadProperty (COMMA loadProperty)*)? - properties=propertyClause? - (FROM type=identifier LEFT_PAREN propertyItemList RIGHT_PAREN)? #alterRoutineLoad + | ALTER ROUTINE LOAD FOR name=multipartIdentifier + ( + ON table=identifier + | (loadProperty (COMMA loadProperty)*)? + properties=propertyClause? + (FROM type=identifier LEFT_PAREN propertyItemList RIGHT_PAREN)? + ) #alterRoutineLoad | ALTER COLOCATE GROUP name=multipartIdentifier SET LEFT_PAREN propertyItemList RIGHT_PAREN #alterColocateGroup | ALTER USER (IF EXISTS)? grantUserIdentify diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_alter.out b/regression-test/data/load_p0/routine_load/test_routine_load_alter.out index 427cdb2439463d..0764212aa7ae7d 100644 --- a/regression-test/data/load_p0/routine_load/test_routine_load_alter.out +++ b/regression-test/data/load_p0/routine_load/test_routine_load_alter.out @@ -10,3 +10,12 @@ 3 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi" 3 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi" +-- !sql_alter_target_src -- +1 eab +2 eab +3 eab + +-- !sql_alter_target_dst -- +4 eab +5 eab +6 eab diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy index 32571b5e29abd8..a27035e42f2cf3 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy @@ -16,13 +16,16 @@ // under the License. import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.DeleteTopicsOptions +import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.ProducerConfig suite("test_routine_load_alter","p0") { + def kafkaCsvDataFile = "test_routine_load_alter" def kafkaCsvTpoics = [ - "test_routine_load_alter", + "test_routine_load_alter_${System.currentTimeMillis()}".toString(), ] String enabled = context.config.otherConfigs.get("enableKafkaTest") String kafka_port = context.config.otherConfigs.get("kafka_port") @@ -38,6 +41,17 @@ suite("test_routine_load_alter","p0") { props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000") props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000") + def adminProps = new Properties() + adminProps.put("bootstrap.servers", "${kafka_broker}".toString()) + def mainTopicAdmin = AdminClient.create(adminProps) + try { + kafkaCsvTpoics.each { topic -> + mainTopicAdmin.createTopics([new NewTopic(topic.toString(), 1, (short) 1)]).all().get() + } + } finally { + mainTopicAdmin.close() + } + // check conenction def verifyKafkaConnection = { prod -> try { @@ -63,7 +77,7 @@ suite("test_routine_load_alter","p0") { logger.info("Kafka connect success") for (String kafkaCsvTopic in kafkaCsvTpoics) { - def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text + def txt = new File("""${context.file.parent}/data/${kafkaCsvDataFile}.csv""").text def lines = txt.readLines() lines.each { line -> logger.info("=====${line}========") @@ -112,7 +126,7 @@ suite("test_routine_load_alter","p0") { """ sql "sync" - def count = 0 + def visibleCount = 0 while (true) { def res = sql "select count(*) from ${tableName}" def state = sql "show routine load for ${jobName}" @@ -122,13 +136,13 @@ suite("test_routine_load_alter","p0") { if (res[0][0] > 0) { break } - if (count >= 120) { + if (visibleCount >= 120) { log.error("routine load can not visible for long time") assertEquals(20, res[0][0]) break } sleep(5000) - count++ + visibleCount++ } qt_sql_before "select * from ${tableName} order by k1" @@ -145,7 +159,7 @@ suite("test_routine_load_alter","p0") { def producer = new KafkaProducer<>(props) for (String kafkaCsvTopic in kafkaCsvTpoics) { - def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text + def txt = new File("""${context.file.parent}/data/${kafkaCsvDataFile}.csv""").text def lines = txt.readLines() lines.each { line -> logger.info("=====${line}========") @@ -154,7 +168,7 @@ suite("test_routine_load_alter","p0") { } } - count = 0 + def offsetVisibleCount = 0 while (true) { def res = sql "select count(*) from ${tableName}" log.info("count: ${res[0][0]}".toString()) @@ -165,13 +179,13 @@ suite("test_routine_load_alter","p0") { if (res[0][0] >= 6) { break } - if (count >= 120) { + if (offsetVisibleCount >= 120) { log.error("routine load can not visible for long time") assertEquals(20, res[0][0]) break } sleep(1000) - count++ + offsetVisibleCount++ } qt_sql_alter_after "select * from ${tableName} order by k1" } finally { @@ -198,7 +212,7 @@ suite("test_routine_load_alter","p0") { sql "ALTER ROUTINE LOAD FOR ${jobName} COLUMNS(k1, k2, v1, v2, v3, v4);" sql "ALTER ROUTINE LOAD FOR ${jobName} COLUMNS TERMINATED BY ',';" sql "resume routine load for ${jobName}" - def count = 0 + def columnVisibleCount = 0 while (true) { def res = sql "select count(*) from ${tableName}" log.info("count: ${res[0][0]}".toString()) @@ -210,13 +224,13 @@ suite("test_routine_load_alter","p0") { if (res[0][0] > 0) { break } - if (count >= 120) { + if (columnVisibleCount >= 120) { log.error("routine load can not visible for long time") assertEquals(20, res[0][0]) break } sleep(1000) - count++ + columnVisibleCount++ } def res = sql "select * from ${tableName} order by k1" log.info("res: ${res.size()}".toString()) @@ -293,5 +307,199 @@ suite("test_routine_load_alter","p0") { sql "stop routine load for ${jobName}" sql "truncate table ${tableName}" } + + def mainTopicCleanupProps = new Properties() + mainTopicCleanupProps.put("bootstrap.servers", "${kafka_broker}".toString()) + def mainTopicCleanupAdmin = AdminClient.create(mainTopicCleanupProps) + try { + mainTopicCleanupAdmin.deleteTopics(kafkaCsvTpoics.collect { it.toString() }, + new DeleteTopicsOptions().timeoutMs(10000)).all().get() + } catch (Exception e) { + logger.warn("failed to delete kafka topics ${kafkaCsvTpoics}: ${e.message}".toString()) + } finally { + mainTopicCleanupAdmin.close() + } + + // test alter target table + def srcTableName = "test_routine_load_alter_src" + def dstTableName = "test_routine_load_alter_dst" + def alterTargetTopic = "test_routine_load_alter_target_table_${System.currentTimeMillis()}" + def alterTargetJob = "test_alter_target_table_${System.currentTimeMillis()}" + def alterTopicProducer = null + def alterTopicAdmin = null + try { + sql """ DROP TABLE IF EXISTS ${srcTableName} """ + sql """ DROP TABLE IF EXISTS ${dstTableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${srcTableName} ( + `k1` int(20) NULL, + `k2` string NULL, + `v1` date NULL, + `v2` string NULL, + `v3` datetime NULL, + `v4` string NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + sql """ + CREATE TABLE IF NOT EXISTS ${dstTableName} ( + `k1` int(20) NULL, + `k2` string NULL, + `v1` date NULL, + `v2` string NULL, + `v3` datetime NULL, + `v4` string NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + sql "sync" + + def topicProps = new Properties() + topicProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + alterTopicAdmin = AdminClient.create(topicProps) + alterTopicAdmin.createTopics([new NewTopic(alterTargetTopic, 1, (short) 1)]).all().get() + + def producerProps = new Properties() + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer") + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer") + producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000") + producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000") + alterTopicProducer = new KafkaProducer<>(producerProps) + + def firstBatch = new File("""${context.file.parent}/data/${kafkaCsvDataFile}.csv""").readLines() + firstBatch.each { line -> + alterTopicProducer.send(new ProducerRecord<>(alterTargetTopic, null, line)).get() + } + alterTopicProducer.flush() + + sql """ + CREATE ROUTINE LOAD ${alterTargetJob} ON ${srcTableName} + COLUMNS TERMINATED BY "," + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${alterTargetTopic}", + "kafka_partitions" = "0", + "kafka_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + + def targetVisibleCount = 0 + while (true) { + def res = sql "select count(*) from ${srcTableName}" + def state = sql "show routine load for ${alterTargetJob}" + log.info("routine load state: ${state[0][8].toString()}".toString()) + log.info("routine load statistic: ${state[0][14].toString()}".toString()) + log.info("reason of state changed: ${state[0][17].toString()}".toString()) + if (res[0][0] >= 3) { + break + } + if (targetVisibleCount >= 120) { + log.error("routine load can not visible for long time") + assertEquals(3, res[0][0]) + break + } + sleep(1000) + targetVisibleCount++ + } + + sql "pause routine load for ${alterTargetJob}" + def showBeforeAlter = sql "show routine load for ${alterTargetJob}" + assertEquals(srcTableName, showBeforeAlter[0][6].toString()) + def progressBeforeAlter = showBeforeAlter[0][15].toString() + + sql "ALTER ROUTINE LOAD FOR ${alterTargetJob} ON ${dstTableName}" + + def showAfterAlter = sql "show routine load for ${alterTargetJob}" + assertEquals(dstTableName, showAfterAlter[0][6].toString()) + assertEquals(progressBeforeAlter, showAfterAlter[0][15].toString()) + + def secondBatch = [ + "4,eab,2023-07-16,def,2023-07-21:05:48:31,ghi", + "5,eab,2023-07-17,def,2023-07-22:05:48:31,ghi", + "6,eab,2023-07-18,def,2023-07-23:05:48:31,ghi" + ] + secondBatch.each { line -> + alterTopicProducer.send(new ProducerRecord<>(alterTargetTopic, null, line)).get() + } + alterTopicProducer.flush() + + sql "resume routine load for ${alterTargetJob}" + + def targetAlterVisibleCount = 0 + def stableCount = 0 + while (true) { + def srcCount = sql "select count(*) from ${srcTableName}" + def dstCount = sql "select count(*) from ${dstTableName}" + long srcCountValue = (srcCount[0][0] as Number).longValue() + long dstCountValue = (dstCount[0][0] as Number).longValue() + log.info("src count: ${srcCountValue}".toString()) + log.info("dst count: ${dstCountValue}".toString()) + if (srcCountValue == 3 && dstCountValue == 3) { + stableCount++ + if (stableCount >= 5) { + sleep(2000) + def finalSrcCount = sql "select count(*) from ${srcTableName}" + def finalDstCount = sql "select count(*) from ${dstTableName}" + assertEquals(3L, (finalSrcCount[0][0] as Number).longValue()) + assertEquals(3L, (finalDstCount[0][0] as Number).longValue()) + break + } + } else { + stableCount = 0 + if (srcCountValue > 3 || dstCountValue > 3) { + assertEquals(3L, srcCountValue) + assertEquals(3L, dstCountValue) + } + } + if (targetAlterVisibleCount >= 120) { + log.error("routine load target table alter can not visible for long time") + assertEquals(3L, srcCountValue) + assertEquals(3L, dstCountValue) + break + } + sleep(1000) + targetAlterVisibleCount++ + } + + qt_sql_alter_target_src "select k1, k2 from ${srcTableName} order by k1" + qt_sql_alter_target_dst "select k1, k2 from ${dstTableName} order by k1" + } finally { + try { + sql "stop routine load for ${alterTargetJob}" + } catch (Exception e) { + logger.warn("failed to stop alter target routine load: ${e.message}".toString()) + } + if (alterTopicProducer != null) { + alterTopicProducer.close() + } + if (alterTopicAdmin != null) { + try { + alterTopicAdmin.deleteTopics([alterTargetTopic.toString()], + new DeleteTopicsOptions().timeoutMs(10000)).all().get() + } catch (Exception e) { + logger.warn("failed to delete kafka topic ${alterTargetTopic}: ${e.message}".toString()) + } + alterTopicAdmin.close() + } + sql "truncate table ${srcTableName}" + sql "truncate table ${dstTableName}" + } } }