From fb5a55bd0dba94541d407b508fd74da3130dceef Mon Sep 17 00:00:00 2001 From: Refrain Date: Fri, 26 Jun 2026 11:51:32 +0800 Subject: [PATCH 1/7] [feature](fe) Add routine load target-table alter support ### What problem does this PR solve? Issue Number: None Related PR: None Problem Summary: Allow paused single-table Routine Load jobs to switch their target table with ALTER ROUTINE LOAD FOR [db.]job ON , while preserving existing progress and replaying the new table binding from edit log. ### Release note Support ALTER ROUTINE LOAD ... ON
to switch the target table for paused single-table routine load jobs. ### Check List (For Author) - Test: FE unit test - "/data/data3/huangruixin/include/src-master/apache-maven-3.9.9/bin/mvn -pl fe-core -am -DskipITs -Dcheckstyle.skip=true -DfailIfNoTests=false -Dtest=org.apache.doris.nereids.trees.plans.commands.AlterRoutineLoadCommandTest,org.apache.doris.persist.AlterRoutineLoadOperationLogTest,org.apache.doris.load.routineload.KafkaRoutineLoadJobTest test" - Kinesis unit tests skipped per user request - Behavior changed: Yes (new ALTER ROUTINE LOAD target-table switch behavior) - Does this need documentation: Yes (documented in /data/data3/huangruixin/docs/routine-load-alter-table-design.html) --- .../load/routineload/RoutineLoadJob.java | 25 +++ .../kafka/KafkaRoutineLoadJob.java | 8 +- .../kinesis/KinesisRoutineLoadJob.java | 8 +- .../nereids/parser/LogicalPlanBuilder.java | 7 +- .../commands/AlterRoutineLoadCommand.java | 73 ++++++- .../AlterRoutineLoadJobOperationLog.java | 12 ++ .../routineload/KafkaRoutineLoadJobTest.java | 19 ++ .../commands/AlterRoutineLoadCommandTest.java | 180 +++++++++++++++-- .../AlterRoutineLoadOperationLogTest.java | 63 ++++++ .../org/apache/doris/nereids/DorisParser.g4 | 11 +- .../test_routine_load_alter.groovy | 191 ++++++++++++++++++ 11 files changed, 572 insertions(+), 25 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 3223ff913594e1..e7c69da5a77488 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -468,6 +468,31 @@ protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) { } } + protected RoutineLoadDesc buildRoutineLoadDescSnapshot() { + List columnsInfo = null; + if (columnDescs != null && !columnDescs.descs.isEmpty()) { + columnsInfo = new ArrayList<>(columnDescs.descs); + } + return new RoutineLoadDesc(columnSeparator, lineDelimiter, columnsInfo, precedingFilter, whereExpr, + partitionNamesInfo, deleteCondition, mergeType, sequenceCol); + } + + public void validateTargetTable(Database db, OlapTable targetTable) throws UserException { + if (isMultiTable) { + throw new AnalysisException("ALTER ROUTINE LOAD target table change only supports single-table job"); + } + checkMeta(targetTable, buildRoutineLoadDescSnapshot()); + + targetTable.readLock(); + try { + NereidsStreamLoadPlanner planner = new NereidsStreamLoadPlanner(db, targetTable, + toNereidsRoutineLoadTaskInfo()); + planner.plan(new TUniqueId(0, 0)); + } finally { + targetTable.readUnlock(); + } + } + @Override public long getId() { return id; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java index dcb683e9e227dd..c66f5e2f1a5e70 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java @@ -757,9 +757,12 @@ public void modifyProperties(AlterRoutineLoadCommand command) throws UserExcepti } modifyPropertiesInternal(jobProperties, dataSourceProperties); + if (command.hasTargetTable()) { + this.tableId = command.getTargetTableId(); + } AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id, - jobProperties, dataSourceProperties); + jobProperties, dataSourceProperties, command.getTargetTableId()); Env.getCurrentEnv().getEditLog().logAlterRoutineLoadJob(log); } finally { writeUnlock(); @@ -883,6 +886,9 @@ private void resetCloudProgress(Cloud.ResetRLProgressRequest.Builder builder) th public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) { try { modifyPropertiesInternal(log.getJobProperties(), (KafkaDataSourceProperties) log.getDataSourceProperties()); + if (log.getTargetTableId() != 0) { + this.tableId = log.getTargetTableId(); + } } catch (UserException e) { // should not happen LOG.error("failed to replay modify kafka routine load job: {}", id, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java index 0c05889929924e..a60f1b1f309d76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java @@ -689,9 +689,12 @@ public void modifyProperties(AlterRoutineLoadCommand command) throws UserExcepti } modifyPropertiesInternal(jobProperties, dataSourceProperties); + if (command.hasTargetTable()) { + this.tableId = command.getTargetTableId(); + } AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id, - jobProperties, dataSourceProperties); + jobProperties, dataSourceProperties, command.getTargetTableId()); Env.getCurrentEnv().getEditLog().logAlterRoutineLoadJob(log); } finally { writeUnlock(); @@ -785,6 +788,9 @@ public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) { try { modifyPropertiesInternal(log.getJobProperties(), (KinesisDataSourceProperties) log.getDataSourceProperties()); + if (log.getTargetTableId() != 0) { + this.tableId = log.getTargetTableId(); + } } catch (UserException e) { LOG.error("failed to replay modify kinesis routine load job: {}", id, e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index c470bc789d9e78..4116eab04c844a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -9284,6 +9284,10 @@ public LogicalPlan visitAlterRoutineLoad(DorisParser.AlterRoutineLoadContext ctx } LabelNameInfo labelNameInfo = new LabelNameInfo(dbName, jobName); + if (ctx.table != null) { + return new AlterRoutineLoadCommand(labelNameInfo, ctx.table.getText()); + } + Map properties = new HashMap<>(); if (ctx.properties != null) { properties.putAll(visitPropertyClause(ctx.properties)); @@ -9309,7 +9313,8 @@ public LogicalPlan visitAlterRoutineLoad(DorisParser.AlterRoutineLoadContext ctx } } - return new AlterRoutineLoadCommand(labelNameInfo, loadPropertyMap, properties, dataSourceMapProperties); + return new AlterRoutineLoadCommand(labelNameInfo, null, + loadPropertyMap, properties, dataSourceMapProperties); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java index 367480c5d934d9..dce80149607c60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java @@ -21,12 +21,16 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties; import org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties; import org.apache.doris.load.RoutineLoadDesc; @@ -40,6 +44,7 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.mysql.privilege.PrivPredicate; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; @@ -85,10 +90,12 @@ public class AlterRoutineLoadCommand extends AlterCommand { .build(); private final LabelNameInfo labelNameInfo; + private final String targetTableName; private final Map loadPropertyMap; private RoutineLoadDesc routineLoadDesc; private final Map jobProperties; private final Map dataSourceMapProperties; + private long targetTableId; private boolean isPartialUpdate; // save analyzed job properties. @@ -100,6 +107,7 @@ public class AlterRoutineLoadCommand extends AlterCommand { * AlterRoutineLoadCommand */ public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, + String targetTableName, Map loadPropertyMap, Map jobProperties, Map dataSourceMapProperties) { @@ -108,6 +116,7 @@ public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, Objects.requireNonNull(jobProperties, "jobProperties is null"); Objects.requireNonNull(dataSourceMapProperties, "dataSourceMapProperties is null"); this.labelNameInfo = labelNameInfo; + this.targetTableName = targetTableName; this.loadPropertyMap = loadPropertyMap == null ? Maps.newHashMap() : loadPropertyMap; this.jobProperties = jobProperties; this.dataSourceMapProperties = dataSourceMapProperties; @@ -118,7 +127,11 @@ public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, Map jobProperties, Map dataSourceMapProperties) { - this(labelNameInfo, Maps.newHashMap(), jobProperties, dataSourceMapProperties); + this(labelNameInfo, null, Maps.newHashMap(), jobProperties, dataSourceMapProperties); + } + + public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, String targetTableName) { + this(labelNameInfo, targetTableName, Maps.newHashMap(), Maps.newHashMap(), Maps.newHashMap()); } public String getDbName() { @@ -133,6 +146,18 @@ public Map getAnalyzedJobProperties() { return analyzedJobProperties; } + public boolean hasTargetTable() { + return !StringUtil.isEmpty(targetTableName); + } + + public String getTargetTableName() { + return targetTableName; + } + + public long getTargetTableId() { + return targetTableId; + } + public boolean hasDataSourceProperty() { return MapUtils.isNotEmpty(dataSourceMapProperties); } @@ -161,18 +186,27 @@ public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception { public void validate(ConnectContext ctx) throws UserException { labelNameInfo.validate(ctx); FeNameFormat.checkCommonName(NAME_TYPE, labelNameInfo.getLabel()); + if (hasTargetTable() && (MapUtils.isNotEmpty(loadPropertyMap) || MapUtils.isNotEmpty(jobProperties) + || MapUtils.isNotEmpty(dataSourceMapProperties))) { + throw new AnalysisException("ALTER ROUTINE LOAD target table change does not support other properties"); + } // check routine load job properties include desired concurrent number etc. checkJobProperties(); // check load properties RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager() .getJob(getDbName(), getJobName()); - this.routineLoadDesc = CreateRoutineLoadInfo.checkLoadProperties(ctx, loadPropertyMap, - job.getDbFullName(), job.getTableName(), job.isMultiTable(), job.getMergeType()); + if (MapUtils.isNotEmpty(loadPropertyMap)) { + this.routineLoadDesc = CreateRoutineLoadInfo.checkLoadProperties(ctx, loadPropertyMap, + job.getDbFullName(), job.getTableName(), job.isMultiTable(), job.getMergeType()); + } // check data source properties checkDataSourceProperties(); + if (hasTargetTable()) { + validateTargetTable(ctx, job); + } checkPartialUpdate(); if (analyzedJobProperties.isEmpty() && MapUtils.isEmpty(dataSourceMapProperties) - && routineLoadDesc == null) { + && MapUtils.isEmpty(loadPropertyMap) && !hasTargetTable()) { throw new AnalysisException("No properties are specified"); } } @@ -333,17 +367,44 @@ private void checkPartialUpdate() throws UserException { return; } RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager() - .getJob(getDbName(), getDbName()); + .getJob(getDbName(), getJobName()); if (job.isMultiTable()) { throw new AnalysisException("load by PARTIAL_COLUMNS is not supported in multi-table load."); } Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName()); - Table table = db.getTableOrAnalysisException(job.getTableName()); + String tableName = hasTargetTable() ? targetTableName : job.getTableName(); + Table table = db.getTableOrAnalysisException(tableName); if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) { throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW"); } } + private void validateTargetTable(ConnectContext ctx, RoutineLoadJob job) throws UserException { + if (job.isMultiTable()) { + throw new AnalysisException("ALTER ROUTINE LOAD target table change only supports single-table job"); + } + Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName()); + Table table = db.getTableOrAnalysisException(targetTableName); + if (!(table instanceof OlapTable)) { + throw new AnalysisException("ALTER ROUTINE LOAD target table only supports OLAP table"); + } + OlapTable olapTable = (OlapTable) table; + if (olapTable.isTemporary()) { + throw new AnalysisException("Do not support load for temporary table " + olapTable.getDisplayName()); + } + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, InternalCatalog.INTERNAL_CATALOG_NAME, + job.getDbFullName(), targetTableName, PrivPredicate.LOAD)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", + ctx.getQualifiedUser(), ctx.getRemoteIP(), job.getDbFullName() + ": " + targetTableName); + } + if (job.isLoadToSingleTablet() + && !(olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) { + throw new AnalysisException("if load_to_single_tablet set to true, the olap table must be with random distribution"); + } + job.validateTargetTable(db, olapTable); + this.targetTableId = olapTable.getId(); + } + @Override public R accept(PlanVisitor visitor, C context) { return visitor.visitAlterRoutineLoadCommand(this, context); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java index 4729882f7927fb..ae7c8ad214de7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java @@ -37,12 +37,20 @@ public class AlterRoutineLoadJobOperationLog implements Writable { private Map jobProperties; @SerializedName(value = "dataSourceProperties") private AbstractDataSourceProperties dataSourceProperties; + @SerializedName(value = "targetTableId") + private long targetTableId; public AlterRoutineLoadJobOperationLog(long jobId, Map jobProperties, AbstractDataSourceProperties dataSourceProperties) { + this(jobId, jobProperties, dataSourceProperties, 0L); + } + + public AlterRoutineLoadJobOperationLog(long jobId, Map jobProperties, + AbstractDataSourceProperties dataSourceProperties, long targetTableId) { this.jobId = jobId; this.jobProperties = jobProperties; this.dataSourceProperties = dataSourceProperties; + this.targetTableId = targetTableId; } public long getJobId() { @@ -57,6 +65,10 @@ public AbstractDataSourceProperties getDataSourceProperties() { return dataSourceProperties; } + public long getTargetTableId() { + return targetTableId; + } + public static AlterRoutineLoadJobOperationLog read(DataInput in) throws IOException { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, AlterRoutineLoadJobOperationLog.class); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 8b442012ff2c04..7a1d11513256c8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -44,6 +44,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo; import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty; import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator; +import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TResourceInfo; @@ -415,6 +416,24 @@ public void testFromCreateStmt() throws UserException { } } + @Test + public void testReplayModifyPropertiesSwitchesTargetTableWithoutResettingProgress() { + KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L, + 101L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN); + Map partitionToOffset = Maps.newHashMap(); + partitionToOffset.put(1, 123L); + KafkaProgress progress = new KafkaProgress(partitionToOffset); + Deencapsulation.setField(routineLoadJob, "progress", progress); + + AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(1L, + Maps.newHashMap(), null, 202L); + routineLoadJob.replayModifyProperties(log); + + Assert.assertEquals(202L, routineLoadJob.getTableId()); + Assert.assertSame(progress, routineLoadJob.getProgress()); + Assert.assertEquals(Long.valueOf(123L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1)); + } + private CreateRoutineLoadInfo initCreateRoutineLoadInfo() { Map properties = Maps.newHashMap(); properties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java index 0989bc127b046b..f52b269bfb79e6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java @@ -19,16 +19,23 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.load.routineload.RoutineLoadManager; +import org.apache.doris.mysql.privilege.AccessControllerManager; +import org.apache.doris.nereids.exceptions.ParseException; +import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.collect.Maps; import org.junit.jupiter.api.AfterEach; @@ -41,8 +48,16 @@ import java.util.Map; public class AlterRoutineLoadCommandTest { + private static final NereidsParser PARSER = new NereidsParser(); + private Env env; private ConnectContext connectContext; + private AccessControllerManager accessManager; + private InternalCatalog catalog; + private Database db; + private OlapTable currentTable; + private RoutineLoadManager routineLoadManager; + private RoutineLoadJob routineLoadJob; private MockedStatic envMockedStatic; private MockedStatic ctxMockedStatic; @@ -50,26 +65,38 @@ public class AlterRoutineLoadCommandTest { public void setUp() throws Exception { env = Mockito.mock(Env.class); connectContext = Mockito.mock(ConnectContext.class); + accessManager = Mockito.mock(AccessControllerManager.class); + catalog = Mockito.mock(InternalCatalog.class); + db = Mockito.mock(Database.class); + currentTable = Mockito.mock(OlapTable.class); + routineLoadManager = Mockito.mock(RoutineLoadManager.class); + routineLoadJob = Mockito.mock(RoutineLoadJob.class); envMockedStatic = Mockito.mockStatic(Env.class); ctxMockedStatic = Mockito.mockStatic(ConnectContext.class); envMockedStatic.when(Env::getCurrentEnv).thenReturn(env); ctxMockedStatic.when(ConnectContext::get).thenReturn(connectContext); Mockito.when(connectContext.getSessionVariable()).thenReturn(new SessionVariable()); Mockito.when(connectContext.getState()).thenReturn(new QueryState()); - InternalCatalog catalog = Mockito.mock(InternalCatalog.class); - Database db = Mockito.mock(Database.class); - Table tbl = Mockito.mock(Table.class); + Mockito.when(connectContext.getQualifiedUser()).thenReturn("testUser"); + Mockito.when(connectContext.getRemoteIP()).thenReturn("127.0.0.1"); envMockedStatic.when(Env::getCurrentInternalCatalog).thenReturn(catalog); Mockito.doReturn(db).when(catalog).getDbOrAnalysisException(Mockito.anyString()); - Mockito.doReturn(tbl).when(db).getTableOrAnalysisException(Mockito.anyString()); - Mockito.when(env.getRoutineLoadManager()).thenReturn(Mockito.mock(RoutineLoadManager.class)); - RoutineLoadManager rlm = env.getRoutineLoadManager(); - RoutineLoadJob rlJob = Mockito.mock(RoutineLoadJob.class); - Mockito.when(rlm.getJob(Mockito.anyString(), Mockito.anyString())).thenReturn(rlJob); - Mockito.when(rlJob.getDbFullName()).thenReturn("testDb"); - Mockito.when(rlJob.getTableName()).thenReturn("testTable"); - Mockito.when(rlJob.isMultiTable()).thenReturn(false); - Mockito.when(rlJob.getMergeType()).thenReturn(LoadTask.MergeType.APPEND); + Mockito.doReturn(currentTable).when(db).getTableOrAnalysisException(Mockito.anyString()); + Mockito.when(env.getRoutineLoadManager()).thenReturn(routineLoadManager); + Mockito.when(env.getAccessManager()).thenReturn(accessManager); + Mockito.when(accessManager.checkTblPriv(Mockito.any(ConnectContext.class), Mockito.anyString(), + Mockito.anyString(), Mockito.anyString(), Mockito.any())).thenReturn(true); + Mockito.when(routineLoadManager.getJob(Mockito.anyString(), Mockito.anyString())).thenReturn(routineLoadJob); + Mockito.when(routineLoadJob.getDbFullName()).thenReturn("testDb"); + Mockito.when(routineLoadJob.getTableName()).thenReturn("testTable"); + Mockito.when(routineLoadJob.getDbId()).thenReturn(1000L); + Mockito.when(routineLoadJob.getTableId()).thenReturn(2000L); + Mockito.when(routineLoadJob.isMultiTable()).thenReturn(false); + Mockito.when(routineLoadJob.getMergeType()).thenReturn(LoadTask.MergeType.APPEND); + Mockito.when(routineLoadJob.isLoadToSingleTablet()).thenReturn(false); + Mockito.when(routineLoadJob.getUniqueKeyUpdateMode()).thenReturn(TUniqueKeyUpdateMode.UPSERT); + Mockito.when(currentTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(currentTable.isTemporary()).thenReturn(false); } @AfterEach @@ -86,6 +113,14 @@ private void runBefore() { Mockito.when(connectContext.isSkipAuth()).thenReturn(true); } + private void mockTargetTable(Table table) { + try { + Mockito.doReturn(table).when(db).getTableOrAnalysisException("testTable2"); + } catch (AnalysisException e) { + throw new RuntimeException(e); + } + } + @Test public void testValidate() { runBefore(); @@ -115,4 +150,125 @@ public void testValidate() { Assertions.assertTrue(command.getAnalyzedJobProperties().containsKey(CreateRoutineLoadInfo.STRICT_MODE)); Assertions.assertTrue(command.getAnalyzedJobProperties().containsKey(CreateRoutineLoadInfo.TIMEZONE)); } + + @Test + public void testParseAlterRoutineLoadOnTargetTable() { + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + Assertions.assertEquals("testDb", command.getDbName()); + Assertions.assertEquals("label1", command.getJobName()); + Assertions.assertTrue(command.hasTargetTable()); + Assertions.assertEquals("testTable2", command.getTargetTableName()); + } + + @Test + public void testParseAlterRoutineLoadOnTargetTableRejectMixedProperties() { + Assertions.assertThrows(ParseException.class, () -> PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2 PROPERTIES(\"max_error_number\"=\"1\")")); + Assertions.assertThrows(ParseException.class, () -> PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2 FROM KAFKA(\"kafka_offsets\"=\"100\")")); + Assertions.assertThrows(ParseException.class, () -> PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2 COLUMNS(k1)")); + } + + @Test + public void testValidateTargetTableOnlyDoesNotRequireOtherProperties() throws Exception { + runBefore(); + mockTargetTable(currentTable); + + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + Assertions.assertEquals("testTable2", command.getTargetTableName()); + } + + @Test + public void testValidateTargetTableRejectsMultiTableJob() throws Exception { + runBefore(); + Mockito.when(routineLoadJob.isMultiTable()).thenReturn(true); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("single-table")); + } + + @Test + public void testValidateTargetTableRejectsWithoutLoadPrivilege() throws Exception { + runBefore(); + mockTargetTable(currentTable); + Mockito.when(accessManager.checkTblPriv(Mockito.any(ConnectContext.class), Mockito.anyString(), + Mockito.eq("testDb"), Mockito.eq("testTable2"), Mockito.any())).thenReturn(false); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("LOAD")); + } + + @Test + public void testValidateTargetTableRejectsTemporaryTable() throws Exception { + runBefore(); + OlapTable tempTable = Mockito.mock(OlapTable.class); + Mockito.when(tempTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(tempTable.isTemporary()).thenReturn(true); + mockTargetTable(tempTable); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("temporary table")); + } + + @Test + public void testValidateTargetTableRejectsLoadToSingleTabletWithoutRandomDistribution() throws Exception { + runBefore(); + OlapTable newTable = Mockito.mock(OlapTable.class); + Mockito.when(newTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(newTable.isTemporary()).thenReturn(false); + Mockito.when(newTable.getDefaultDistributionInfo()).thenReturn(null); + mockTargetTable(newTable); + Mockito.when(routineLoadJob.isLoadToSingleTablet()).thenReturn(true); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("load_to_single_tablet")); + } + + @Test + public void testValidateTargetTableAllowsLoadToSingleTabletWithRandomDistribution() throws Exception { + runBefore(); + OlapTable newTable = Mockito.mock(OlapTable.class); + RandomDistributionInfo distributionInfo = Mockito.mock(RandomDistributionInfo.class); + Mockito.when(newTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(newTable.isTemporary()).thenReturn(false); + Mockito.when(newTable.getDefaultDistributionInfo()).thenReturn(distributionInfo); + mockTargetTable(newTable); + Mockito.when(routineLoadJob.isLoadToSingleTablet()).thenReturn(true); + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + } + + @Test + public void testValidateTargetTablePassesTargetTableToJobValidation() throws Exception { + runBefore(); + OlapTable targetTable = Mockito.mock(OlapTable.class); + Mockito.when(targetTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(targetTable.isTemporary()).thenReturn(false); + Mockito.doReturn(targetTable).when(db).getTableOrAnalysisException("testTable2"); + Mockito.when(routineLoadJob.isLoadToSingleTablet()).thenReturn(false); + Mockito.doAnswer(invocation -> { + Assertions.assertSame(db, invocation.getArgument(0)); + Assertions.assertSame(targetTable, invocation.getArgument(1)); + return null; + }).when(routineLoadJob).validateTargetTable(Mockito.any(Database.class), Mockito.any(OlapTable.class)); + + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java index 8a1550d48f5d13..063a6067581c17 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java @@ -19,14 +19,18 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.common.io.Text; import org.apache.doris.load.routineload.kafka.KafkaConfiguration; import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; +import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; @@ -86,4 +90,63 @@ public void testSerializeAlterRoutineLoadOperationLog() throws IOException, User } + @Test + public void testSerializeAlterRoutineLoadOperationLogWithTargetTableId() throws Exception { + long jobId = 1000; + long targetTableId = 2001; + Map jobProperties = Maps.newHashMap(); + Map dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put("property.group.id", "mygroup"); + KafkaDataSourceProperties routineLoadDataSourceProperties = new KafkaDataSourceProperties( + dataSourceProperties); + routineLoadDataSourceProperties.setAlter(true); + routineLoadDataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + routineLoadDataSourceProperties.analyze(); + + AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(jobId, jobProperties, + routineLoadDataSourceProperties, targetTableId); + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (DataOutputStream out = new DataOutputStream(byteArrayOutputStream)) { + log.write(out); + } + + AlterRoutineLoadJobOperationLog readLog; + try (DataInputStream in = new DataInputStream( + new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))) { + readLog = AlterRoutineLoadJobOperationLog.read(in); + } + + Assert.assertEquals(targetTableId, readLog.getTargetTableId()); + } + + @Test + public void testDeserializeAlterRoutineLoadOperationLogWithoutTargetTableId() throws Exception { + long jobId = 1000; + Map jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, "5"); + Map dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put("property.group.id", "mygroup"); + KafkaDataSourceProperties routineLoadDataSourceProperties = new KafkaDataSourceProperties( + dataSourceProperties); + routineLoadDataSourceProperties.setAlter(true); + routineLoadDataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + routineLoadDataSourceProperties.analyze(); + AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(jobId, + jobProperties, routineLoadDataSourceProperties, 0L); + String legacyJson = GsonUtils.GSON.toJson(log).replace(",\"targetTableId\":0", ""); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (DataOutputStream out = new DataOutputStream(byteArrayOutputStream)) { + Text.writeString(out, legacyJson); + } + + AlterRoutineLoadJobOperationLog readLog; + try (DataInputStream in = new DataInputStream( + new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))) { + readLog = AlterRoutineLoadJobOperationLog.read(in); + } + + Assert.assertEquals(0L, readLog.getTargetTableId()); + Assert.assertEquals("5", readLog.getJobProperties().get(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY)); + } } diff --git a/fe/fe-sql-parser/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-sql-parser/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index cd2391b1775f1e..d16c184be0a814 100644 --- a/fe/fe-sql-parser/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-sql-parser/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -334,10 +334,13 @@ supportedAlterStatement | ALTER SYSTEM RENAME COMPUTE GROUP name=identifier newName=identifier #alterSystemRenameComputeGroup | ALTER RESOURCE name=identifierOrText properties=propertyClause? #alterResource | ALTER REPOSITORY name=identifier properties=propertyClause? #alterRepository - | ALTER ROUTINE LOAD FOR name=multipartIdentifier - (loadProperty (COMMA loadProperty)*)? - properties=propertyClause? - (FROM type=identifier LEFT_PAREN propertyItemList RIGHT_PAREN)? #alterRoutineLoad + | ALTER ROUTINE LOAD FOR name=multipartIdentifier + ( + ON table=identifier + | (loadProperty (COMMA loadProperty)*)? + properties=propertyClause? + (FROM type=identifier LEFT_PAREN propertyItemList RIGHT_PAREN)? + ) #alterRoutineLoad | ALTER COLOCATE GROUP name=multipartIdentifier SET LEFT_PAREN propertyItemList RIGHT_PAREN #alterColocateGroup | ALTER USER (IF EXISTS)? grantUserIdentify diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy index 32571b5e29abd8..f19ac5bb729904 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy @@ -16,6 +16,7 @@ // under the License. import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.ProducerConfig @@ -293,5 +294,195 @@ suite("test_routine_load_alter","p0") { sql "stop routine load for ${jobName}" sql "truncate table ${tableName}" } + + // test alter target table + def srcTableName = "test_routine_load_alter_src" + def dstTableName = "test_routine_load_alter_dst" + def alterTargetTopic = "test_routine_load_alter_target_table_${System.currentTimeMillis()}" + def alterTargetJob = "test_alter_target_table_${System.currentTimeMillis()}" + def alterTopicProducer = null + def alterTopicAdmin = null + try { + sql """ DROP TABLE IF EXISTS ${srcTableName} """ + sql """ DROP TABLE IF EXISTS ${dstTableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${srcTableName} ( + `k1` int(20) NULL, + `k2` string NULL, + `v1` date NULL, + `v2` string NULL, + `v3` datetime NULL, + `v4` string NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + sql """ + CREATE TABLE IF NOT EXISTS ${dstTableName} ( + `k1` int(20) NULL, + `k2` string NULL, + `v1` date NULL, + `v2` string NULL, + `v3` datetime NULL, + `v4` string NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + sql "sync" + + def topicProps = new Properties() + topicProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + alterTopicAdmin = AdminClient.create(topicProps) + alterTopicAdmin.createTopics([new NewTopic(alterTargetTopic, 1, (short) 1)]).all().get() + + def producerProps = new Properties() + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer") + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer") + producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000") + producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000") + alterTopicProducer = new KafkaProducer<>(producerProps) + + def firstBatch = new File("""${context.file.parent}/data/${kafkaCsvTpoics[0]}.csv""").readLines() + firstBatch.each { line -> + alterTopicProducer.send(new ProducerRecord<>(alterTargetTopic, null, line)).get() + } + alterTopicProducer.flush() + + sql """ + CREATE ROUTINE LOAD ${alterTargetJob} ON ${srcTableName} + COLUMNS TERMINATED BY "," + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${alterTargetTopic}", + "kafka_partitions" = "0", + "kafka_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + + count = 0 + while (true) { + def res = sql "select count(*) from ${srcTableName}" + def state = sql "show routine load for ${alterTargetJob}" + log.info("routine load state: ${state[0][8].toString()}".toString()) + log.info("routine load statistic: ${state[0][14].toString()}".toString()) + log.info("reason of state changed: ${state[0][17].toString()}".toString()) + if (res[0][0] >= 3) { + break + } + if (count >= 120) { + log.error("routine load can not visible for long time") + assertEquals(3, res[0][0]) + break + } + sleep(1000) + count++ + } + + sql "pause routine load for ${alterTargetJob}" + def showBeforeAlter = sql "show routine load for ${alterTargetJob}" + assertEquals(srcTableName, showBeforeAlter[0][6].toString()) + def progressBeforeAlter = showBeforeAlter[0][15].toString() + + sql "ALTER ROUTINE LOAD FOR ${alterTargetJob} ON ${dstTableName}" + + def showAfterAlter = sql "show routine load for ${alterTargetJob}" + assertEquals(dstTableName, showAfterAlter[0][6].toString()) + assertEquals(progressBeforeAlter, showAfterAlter[0][15].toString()) + + def secondBatch = [ + "4,eab,2023-07-16,def,2023-07-21:05:48:31,ghi", + "5,eab,2023-07-17,def,2023-07-22:05:48:31,ghi", + "6,eab,2023-07-18,def,2023-07-23:05:48:31,ghi" + ] + secondBatch.each { line -> + alterTopicProducer.send(new ProducerRecord<>(alterTargetTopic, null, line)).get() + } + alterTopicProducer.flush() + + sql "resume routine load for ${alterTargetJob}" + + count = 0 + def stableCount = 0 + while (true) { + def srcCount = sql "select count(*) from ${srcTableName}" + def dstCount = sql "select count(*) from ${dstTableName}" + long srcCountValue = (srcCount[0][0] as Number).longValue() + long dstCountValue = (dstCount[0][0] as Number).longValue() + log.info("src count: ${srcCountValue}".toString()) + log.info("dst count: ${dstCountValue}".toString()) + if (srcCountValue == 3 && dstCountValue == 3) { + stableCount++ + if (stableCount >= 5) { + sleep(2000) + def finalSrcCount = sql "select count(*) from ${srcTableName}" + def finalDstCount = sql "select count(*) from ${dstTableName}" + assertEquals(3L, (finalSrcCount[0][0] as Number).longValue()) + assertEquals(3L, (finalDstCount[0][0] as Number).longValue()) + break + } + } else { + stableCount = 0 + if (srcCountValue > 3 || dstCountValue > 3) { + assertEquals(3L, srcCountValue) + assertEquals(3L, dstCountValue) + } + } + if (count >= 120) { + log.error("routine load target table alter can not visible for long time") + assertEquals(3L, srcCountValue) + assertEquals(3L, dstCountValue) + break + } + sleep(1000) + count++ + } + + def srcRows = sql "select k1, k2 from ${srcTableName} order by k1" + def dstRows = sql "select k1, k2 from ${dstTableName} order by k1" + assertEquals(3, srcRows.size()) + assertEquals(3, dstRows.size()) + assertEquals(1, srcRows[0][0]) + assertEquals("eab", srcRows[0][1]) + assertEquals(2, srcRows[1][0]) + assertEquals("eab", srcRows[1][1]) + assertEquals(3, srcRows[2][0]) + assertEquals("eab", srcRows[2][1]) + assertEquals(4, dstRows[0][0]) + assertEquals("eab", dstRows[0][1]) + assertEquals(5, dstRows[1][0]) + assertEquals("eab", dstRows[1][1]) + assertEquals(6, dstRows[2][0]) + assertEquals("eab", dstRows[2][1]) + } finally { + try { + sql "stop routine load for ${alterTargetJob}" + } catch (Exception e) { + logger.warn("failed to stop alter target routine load: ${e.message}".toString()) + } + if (alterTopicProducer != null) { + alterTopicProducer.close() + } + if (alterTopicAdmin != null) { + alterTopicAdmin.close() + } + sql "truncate table ${srcTableName}" + sql "truncate table ${dstTableName}" + } } } From 03cf6269e260ad9bfbb2ce4fa13b26076317f650 Mon Sep 17 00:00:00 2001 From: Refrain Date: Fri, 26 Jun 2026 14:39:11 +0800 Subject: [PATCH 2/7] [fix](fe) Tighten routine load alter validation ### What problem does this PR solve? Issue Number: None Related PR: #64878 Problem Summary: Follow-up review found two issues in the new routine load target-table alter support. First, `AlterRoutineLoadCommand` had an import order regression that could fail FE checkstyle. Second, alter validation only rechecked `PARTIAL_COLUMNS=true` from the current command, which left the effective partial-update state under-validated when the existing job or a `unique_key_update_mode` change required merge-on-write semantics. This change restores import ordering, validates the effective unique key update mode against the destination table, and adds focused FE unit coverage for those cases. ### Release note Routine Load alter now rejects target-table or unique-key-update changes that are incompatible with partial update requirements. ### Check List (For Author) - Test: Unit Test - Behavior changed: Yes - Does this need documentation: No --- .../load/routineload/RoutineLoadJob.java | 3 +- .../commands/AlterRoutineLoadCommand.java | 24 ++++++++++--- .../commands/AlterRoutineLoadCommandTest.java | 34 +++++++++++++++++++ 3 files changed, 55 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index e7c69da5a77488..407ce7ecd86fba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -19,6 +19,7 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ExprToSqlVisitor; +import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.ToSqlParams; import org.apache.doris.analysis.UserIdentity; @@ -469,7 +470,7 @@ protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) { } protected RoutineLoadDesc buildRoutineLoadDescSnapshot() { - List columnsInfo = null; + List columnsInfo = null; if (columnDescs != null && !columnDescs.descs.isEmpty()) { columnsInfo = new ArrayList<>(columnDescs.descs); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java index dce80149607c60..7d392e5d6d6fea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java @@ -37,6 +37,7 @@ import org.apache.doris.load.routineload.AbstractDataSourceProperties; import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory; import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo; @@ -44,7 +45,7 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; @@ -363,22 +364,35 @@ private void checkDataSourceProperties() throws UserException { } private void checkPartialUpdate() throws UserException { - if (!isPartialUpdate) { - return; - } RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager() .getJob(getDbName(), getJobName()); + TUniqueKeyUpdateMode uniqueKeyUpdateMode = getEffectiveUniqueKeyUpdateMode(job); + if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) { + return; + } if (job.isMultiTable()) { throw new AnalysisException("load by PARTIAL_COLUMNS is not supported in multi-table load."); } Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName()); String tableName = hasTargetTable() ? targetTableName : job.getTableName(); Table table = db.getTableOrAnalysisException(tableName); - if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) { + if (!((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) { throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW"); } } + private TUniqueKeyUpdateMode getEffectiveUniqueKeyUpdateMode(RoutineLoadJob job) { + if (analyzedJobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) { + return TUniqueKeyUpdateMode.valueOf( + analyzedJobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)); + } + if (analyzedJobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS) && isPartialUpdate + && job.getUniqueKeyUpdateMode() == TUniqueKeyUpdateMode.UPSERT) { + return TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS; + } + return job.getUniqueKeyUpdateMode(); + } + private void validateTargetTable(ConnectContext ctx, RoutineLoadJob job) throws UserException { if (job.isMultiTable()) { throw new AnalysisException("ALTER ROUTINE LOAD target table change only supports single-table job"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java index f52b269bfb79e6..93aa02e72b179f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java @@ -271,4 +271,38 @@ public void testValidateTargetTablePassesTargetTableToJobValidation() throws Exc Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); } + + @Test + public void testValidateTargetTableRejectsExistingFlexiblePartialUpdateOnNonMowTable() throws Exception { + runBefore(); + OlapTable targetTable = Mockito.mock(OlapTable.class); + Mockito.when(targetTable.getType()).thenReturn(Table.TableType.OLAP); + Mockito.when(targetTable.isTemporary()).thenReturn(false); + Mockito.when(targetTable.getEnableUniqueKeyMergeOnWrite()).thenReturn(false); + Mockito.doReturn(targetTable).when(db).getTableOrAnalysisException("testTable2"); + Mockito.when(routineLoadJob.getUniqueKeyUpdateMode()).thenReturn(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS); + Mockito.doThrow(new AnalysisException("Only unique key merge on write support partial update")) + .when(routineLoadJob).validateTargetTable(Mockito.any(Database.class), Mockito.any(OlapTable.class)); + + AlterRoutineLoadCommand command = (AlterRoutineLoadCommand) PARSER.parseSingle( + "ALTER ROUTINE LOAD FOR testDb.label1 ON testTable2"); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("partial update")); + } + + @Test + public void testValidateRejectsUniqueKeyUpdateModeOnNonMowTable() { + runBefore(); + Mockito.when(routineLoadJob.getUniqueKeyUpdateMode()).thenReturn(TUniqueKeyUpdateMode.UPSERT); + Mockito.when(currentTable.getEnableUniqueKeyMergeOnWrite()).thenReturn(false); + Map jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPDATE_FIXED_COLUMNS"); + + AlterRoutineLoadCommand command = new AlterRoutineLoadCommand( + new LabelNameInfo("testDb", "label1"), jobProperties, Maps.newHashMap()); + + Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) + .getMessage().contains("PARTIAL_COLUMNS")); + } } From 9b2cf366a50563ec48449548b75778605826517a Mon Sep 17 00:00:00 2001 From: Refrain Date: Mon, 29 Jun 2026 09:40:17 +0800 Subject: [PATCH 3/7] [fix](fe) Fix routine load alter checkstyle issues ### What problem does this PR solve? Issue Number: None Related PR: #64878 Problem Summary: The routine load target-table alter change hit FE checkstyle in CI because one validation error message exceeded the line-length limit and a unit-test import order did not match the FE custom import ordering rule. This commit makes the minimal formatting-only fixes so the branch aligns with FE style checks. ### Release note None ### Check List (For Author) - Test: No need to test (formatting-only fix requested by reviewer; no local build or test run) - Behavior changed: No - Does this need documentation: No --- .../nereids/trees/plans/commands/AlterRoutineLoadCommand.java | 3 ++- .../apache/doris/persist/AlterRoutineLoadOperationLogTest.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java index 7d392e5d6d6fea..cd6e95f8c3c2a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java @@ -413,7 +413,8 @@ private void validateTargetTable(ConnectContext ctx, RoutineLoadJob job) throws } if (job.isLoadToSingleTablet() && !(olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) { - throw new AnalysisException("if load_to_single_tablet set to true, the olap table must be with random distribution"); + throw new AnalysisException( + "if load_to_single_tablet set to true, the olap table must be with random distribution"); } job.validateTargetTable(db, olapTable); this.targetTableId = olapTable.getId(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java index 063a6067581c17..5aaf6423ad22b7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java @@ -18,8 +18,8 @@ package org.apache.doris.persist; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.io.Text; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.routineload.kafka.KafkaConfiguration; import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; From 760890d5d7eef15765b1fc92f16783d7301870c2 Mon Sep 17 00:00:00 2001 From: Refrain Date: Mon, 29 Jun 2026 10:56:33 +0800 Subject: [PATCH 4/7] [fix](fe) Refine routine load alter target-table follow-up ### What problem does this PR solve? Issue Number: None Related PR: #64878 Problem Summary: The routine load target-table alter implementation had a single-use helper for constructing the validation load descriptor snapshot, and the parser branch for table-only alter did not mark the intended phase-one scope. This commit inlines the one-off snapshot construction at the validation call site and documents that the current parser branch only supports target table alteration before future support for combining target-table and property changes. ### Release note None ### Check List (For Author) - Test: No need to test (review follow-up only; no local build or test run) - Behavior changed: No - Does this need documentation: No --- .../doris/load/routineload/RoutineLoadJob.java | 16 ++++++---------- .../doris/nereids/parser/LogicalPlanBuilder.java | 2 ++ 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 407ce7ecd86fba..0b8ca3c5b52f30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -469,20 +469,16 @@ protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) { } } - protected RoutineLoadDesc buildRoutineLoadDescSnapshot() { - List columnsInfo = null; - if (columnDescs != null && !columnDescs.descs.isEmpty()) { - columnsInfo = new ArrayList<>(columnDescs.descs); - } - return new RoutineLoadDesc(columnSeparator, lineDelimiter, columnsInfo, precedingFilter, whereExpr, - partitionNamesInfo, deleteCondition, mergeType, sequenceCol); - } - public void validateTargetTable(Database db, OlapTable targetTable) throws UserException { if (isMultiTable) { throw new AnalysisException("ALTER ROUTINE LOAD target table change only supports single-table job"); } - checkMeta(targetTable, buildRoutineLoadDescSnapshot()); + List columnsInfo = null; + if (columnDescs != null && !columnDescs.descs.isEmpty()) { + columnsInfo = new ArrayList<>(columnDescs.descs); + } + checkMeta(targetTable, new RoutineLoadDesc(columnSeparator, lineDelimiter, columnsInfo, precedingFilter, + whereExpr, partitionNamesInfo, deleteCondition, mergeType, sequenceCol)); targetTable.readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 4116eab04c844a..0b6f5f32c8a564 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -9284,6 +9284,8 @@ public LogicalPlan visitAlterRoutineLoad(DorisParser.AlterRoutineLoadContext ctx } LabelNameInfo labelNameInfo = new LabelNameInfo(dbName, jobName); + // TODO: Phase 1 only supports altering the target table. Phase 2 will allow altering + // the target table and routine load properties in the same statement. if (ctx.table != null) { return new AlterRoutineLoadCommand(labelNameInfo, ctx.table.getText()); } From 931b71726a52611851f489bdd831a78fb06e1ece Mon Sep 17 00:00:00 2001 From: Refrain Date: Mon, 29 Jun 2026 14:28:24 +0800 Subject: [PATCH 5/7] [fix](fe) Align routine load alter partial update mode ### What problem does this PR solve? Issue Number: None Related PR: #64878 Problem Summary: ALTER ROUTINE LOAD validation handled unique_key_update_mode and legacy partial_columns with different precedence from the mutation path. An ALTER containing unique_key_update_mode=UPSERT and partial_columns=true could pass validation on a non-MoW table, then be applied as UPDATE_FIXED_COLUMNS. Flexible partial-update ALTERs on non-MoW tables were also rejected by the generic PARTIAL_COLUMNS validation before reaching the flexible partial-update validation path. This change makes generic partial-column validation apply only to fixed partial update mode and makes the mutation path ignore legacy partial_columns when an explicit unique_key_update_mode is present. The routine load alter regression test also declares its polling counters as local variables to satisfy regression framework script checks. ### Release note None ### Check List (For Author) - Test: Unit Test - ./run-fe-ut.sh --run org.apache.doris.nereids.trees.plans.commands.AlterRoutineLoadCommandTest,org.apache.doris.load.routineload.KafkaRoutineLoadJobTest,org.apache.doris.load.routineload.KinesisRoutineLoadJobTest - Behavior changed: No - Does this need documentation: No --- .../load/routineload/RoutineLoadJob.java | 4 ++- .../kafka/KafkaRoutineLoadJob.java | 4 --- .../kinesis/KinesisRoutineLoadJob.java | 4 --- .../commands/AlterRoutineLoadCommand.java | 2 +- .../routineload/KafkaRoutineLoadJobTest.java | 19 ++++++++++++ .../KinesisRoutineLoadJobTest.java | 21 ++++++++++++++ .../commands/AlterRoutineLoadCommandTest.java | 29 +++++++++++++++++++ .../test_routine_load_alter.groovy | 6 ++-- 8 files changed, 76 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 0b8ca3c5b52f30..2dd001b29e9eee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -2088,7 +2088,8 @@ protected void modifyCommonJobProperties(Map jobProperties) thro jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY)); } - if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) { + boolean hasExplicitUniqueKeyUpdateMode = jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE); + if (hasExplicitUniqueKeyUpdateMode) { String modeStr = jobProperties.remove(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE); TUniqueKeyUpdateMode newMode = CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode(modeStr); // Validate flexible partial update constraints when changing to UPDATE_FLEXIBLE_COLUMNS @@ -2099,6 +2100,7 @@ protected void modifyCommonJobProperties(Map jobProperties) thro this.isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS); this.jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name()); this.jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate)); + jobProperties.remove(CreateRoutineLoadInfo.PARTIAL_COLUMNS); } if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java index c66f5e2f1a5e70..f45a2295b1c0b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java @@ -73,7 +73,6 @@ import com.google.gson.annotations.SerializedName; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang3.BooleanUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -847,9 +846,6 @@ private void modifyPropertiesInternal(Map jobProperties, Map copiedJobProperties = Maps.newHashMap(jobProperties); modifyCommonJobProperties(copiedJobProperties); this.jobProperties.putAll(copiedJobProperties); - if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) { - this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS)); - } if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) { String policy = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY); if ("ERROR".equalsIgnoreCase(policy)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java index a60f1b1f309d76..4f97857ff7a0a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kinesis/KinesisRoutineLoadJob.java @@ -65,7 +65,6 @@ import com.google.gson.annotations.SerializedName; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang3.BooleanUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -767,9 +766,6 @@ private void modifyPropertiesInternal(Map jobProperties, Map copiedJobProperties = Maps.newHashMap(jobProperties); modifyCommonJobProperties(copiedJobProperties); this.jobProperties.putAll(copiedJobProperties); - if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) { - this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS)); - } if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) { String policy = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY); if ("ERROR".equalsIgnoreCase(policy)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java index cd6e95f8c3c2a6..6fad5299a477ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java @@ -367,7 +367,7 @@ private void checkPartialUpdate() throws UserException { RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager() .getJob(getDbName(), getJobName()); TUniqueKeyUpdateMode uniqueKeyUpdateMode = getEffectiveUniqueKeyUpdateMode(job); - if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) { + if (uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { return; } if (job.isMultiTable()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 7a1d11513256c8..069f57199550c7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -47,6 +47,7 @@ import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TResourceInfo; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.base.Joiner; import com.google.common.collect.Lists; @@ -198,6 +199,24 @@ public void testUpdateLagRefreshesLatestOffsetCache() throws UserException { } } + @Test + public void testModifyPropertiesHonorsExplicitUniqueKeyUpdateModePrecedence() throws Exception { + KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L, + 1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN); + Deencapsulation.setField(routineLoadJob, "uniqueKeyUpdateMode", TUniqueKeyUpdateMode.UPSERT); + Deencapsulation.setField(routineLoadJob, "isPartialUpdate", false); + + Map jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPSERT"); + jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, "true"); + + Deencapsulation.invoke(routineLoadJob, "modifyPropertiesInternal", jobProperties, + new KafkaDataSourceProperties(Maps.newHashMap())); + + Assert.assertEquals(TUniqueKeyUpdateMode.UPSERT, routineLoadJob.getUniqueKeyUpdateMode()); + Assert.assertFalse(routineLoadJob.isFixedPartialUpdate()); + } + @Test public void testUpdateLagRebuildsConvertedPropertiesAfterReplay() throws UserException { KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L, diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KinesisRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KinesisRoutineLoadJobTest.java index aa1dc052605f2b..67688ab2e790e7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KinesisRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KinesisRoutineLoadJobTest.java @@ -25,6 +25,8 @@ import org.apache.doris.load.routineload.kinesis.KinesisProgress; import org.apache.doris.load.routineload.kinesis.KinesisRoutineLoadJob; import org.apache.doris.load.routineload.kinesis.KinesisTaskInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -101,6 +103,25 @@ public void testGetStatisticContainsKinesisFields() { Assert.assertEquals(100L, ((Number) statistic.get("maxMillisBehindLatest")).longValue()); } + @Test + public void testModifyPropertiesHonorsExplicitUniqueKeyUpdateModePrecedence() throws Exception { + KinesisRoutineLoadJob routineLoadJob = + new KinesisRoutineLoadJob(1L, "kinesis_routine_load_job", 1L, + 1L, "ap-southeast-1", "stream-1", UserIdentity.ADMIN); + Deencapsulation.setField(routineLoadJob, "uniqueKeyUpdateMode", TUniqueKeyUpdateMode.UPSERT); + Deencapsulation.setField(routineLoadJob, "isPartialUpdate", false); + + Map jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPSERT"); + jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, "true"); + + Deencapsulation.invoke(routineLoadJob, "modifyPropertiesInternal", jobProperties, + new KinesisDataSourceProperties(Maps.newHashMap())); + + Assert.assertEquals(TUniqueKeyUpdateMode.UPSERT, routineLoadJob.getUniqueKeyUpdateMode()); + Assert.assertFalse(routineLoadJob.isFixedPartialUpdate()); + } + @Test public void testHasMoreDataToConsumeShouldKeepPollingWhenLagCacheIsZero() throws Exception { KinesisRoutineLoadJob routineLoadJob = diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java index 93aa02e72b179f..f7fd80622ec1eb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommandTest.java @@ -305,4 +305,33 @@ public void testValidateRejectsUniqueKeyUpdateModeOnNonMowTable() { Assertions.assertTrue(Assertions.assertThrows(Exception.class, () -> command.validate(connectContext)) .getMessage().contains("PARTIAL_COLUMNS")); } + + @Test + public void testValidateAllowsExplicitUpsertToOverridePartialColumnsOnNonMowTable() { + runBefore(); + Mockito.when(routineLoadJob.getUniqueKeyUpdateMode()).thenReturn(TUniqueKeyUpdateMode.UPSERT); + Mockito.when(currentTable.getEnableUniqueKeyMergeOnWrite()).thenReturn(false); + Map jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPSERT"); + jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, "true"); + + AlterRoutineLoadCommand command = new AlterRoutineLoadCommand( + new LabelNameInfo("testDb", "label1"), jobProperties, Maps.newHashMap()); + + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + } + + @Test + public void testValidateAllowsFlexibleAlterToReachFlexibleValidation() { + runBefore(); + Mockito.when(routineLoadJob.getUniqueKeyUpdateMode()).thenReturn(TUniqueKeyUpdateMode.UPSERT); + Mockito.when(currentTable.getEnableUniqueKeyMergeOnWrite()).thenReturn(false); + Map jobProperties = Maps.newHashMap(); + jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPDATE_FLEXIBLE_COLUMNS"); + + AlterRoutineLoadCommand command = new AlterRoutineLoadCommand( + new LabelNameInfo("testDb", "label1"), jobProperties, Maps.newHashMap()); + + Assertions.assertDoesNotThrow(() -> command.validate(connectContext)); + } } diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy index f19ac5bb729904..81817a60090e93 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy @@ -155,7 +155,7 @@ suite("test_routine_load_alter","p0") { } } - count = 0 + def count = 0 while (true) { def res = sql "select count(*) from ${tableName}" log.info("count: ${res[0][0]}".toString()) @@ -375,7 +375,7 @@ suite("test_routine_load_alter","p0") { """ sql "sync" - count = 0 + def count = 0 while (true) { def res = sql "select count(*) from ${srcTableName}" def state = sql "show routine load for ${alterTargetJob}" @@ -417,7 +417,7 @@ suite("test_routine_load_alter","p0") { sql "resume routine load for ${alterTargetJob}" - count = 0 + def count = 0 def stableCount = 0 while (true) { def srcCount = sql "select count(*) from ${srcTableName}" From 48f1b8c98a9525dcaf78df75897067e5faa65bd0 Mon Sep 17 00:00:00 2001 From: Refrain Date: Mon, 29 Jun 2026 15:54:22 +0800 Subject: [PATCH 6/7] [fix](fe) Fix routine load alter checkstyle line length ### What problem does this PR solve? Issue Number: close #xxx Related PR: #64878 Problem Summary: The PR pipeline failed in FE checkstyle because a RoutineLoadJob line added for altering unique key update mode exceeded the 120-character limit. Wrap the containsKey call without changing behavior so FE checkstyle can pass. ### Release note None ### Check List (For Author) - Test: Manual test - ./build.sh --fe -j60 - git diff --check -- fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java - Behavior changed: No - Does this need documentation: No --- .../java/org/apache/doris/load/routineload/RoutineLoadJob.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 2dd001b29e9eee..2452a2dde3c681 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -2088,7 +2088,8 @@ protected void modifyCommonJobProperties(Map jobProperties) thro jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY)); } - boolean hasExplicitUniqueKeyUpdateMode = jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE); + boolean hasExplicitUniqueKeyUpdateMode = jobProperties.containsKey( + CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE); if (hasExplicitUniqueKeyUpdateMode) { String modeStr = jobProperties.remove(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE); TUniqueKeyUpdateMode newMode = CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode(modeStr); From 0f1be9e1aa029625b1f257ceac9fe5f6776bc2fc Mon Sep 17 00:00:00 2001 From: Refrain Date: Mon, 29 Jun 2026 19:52:53 +0800 Subject: [PATCH 7/7] [fix](regression) Fix routine load alter test output checks ### What problem does this PR solve? Issue Number: None Related PR: #64878 Problem Summary: The routine load alter regression test checked deterministic target-table switch rows with direct Groovy assertions and reused local variable names in the same suite scope. CI reported the case as invalid, and the local framework rejected the duplicate variables during Groovy compilation. This records the deterministic source and destination row checks through qt_sql output checks, uses distinct polling counter names, and keeps Kafka topics isolated so repeated local runs do not consume retained messages. ### Release note None ### Check List (For Author) - Test: Regression test - ./run-regression-test.sh --run -d load_p0/routine_load -s test_routine_load_alter -c 'jdbc:mysql://127.0.0.1:49030/?useLocalSessionState=true&allowLoadLocalInfile=true&zeroDateTimeBehavior=round' -ha 127.0.0.1:48030 -conf enableKafkaTest=true -conf kafka_port=19194 -conf externalEnvIp=127.0.0.1 - Behavior changed: No - Does this need documentation: No --- .../routine_load/test_routine_load_alter.out | 9 ++ .../test_routine_load_alter.groovy | 87 +++++++++++-------- 2 files changed, 61 insertions(+), 35 deletions(-) diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_alter.out b/regression-test/data/load_p0/routine_load/test_routine_load_alter.out index 427cdb2439463d..0764212aa7ae7d 100644 --- a/regression-test/data/load_p0/routine_load/test_routine_load_alter.out +++ b/regression-test/data/load_p0/routine_load/test_routine_load_alter.out @@ -10,3 +10,12 @@ 3 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi" 3 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi" +-- !sql_alter_target_src -- +1 eab +2 eab +3 eab + +-- !sql_alter_target_dst -- +4 eab +5 eab +6 eab diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy index 81817a60090e93..a27035e42f2cf3 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_alter.groovy @@ -16,14 +16,16 @@ // under the License. import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.DeleteTopicsOptions import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.ProducerConfig suite("test_routine_load_alter","p0") { + def kafkaCsvDataFile = "test_routine_load_alter" def kafkaCsvTpoics = [ - "test_routine_load_alter", + "test_routine_load_alter_${System.currentTimeMillis()}".toString(), ] String enabled = context.config.otherConfigs.get("enableKafkaTest") String kafka_port = context.config.otherConfigs.get("kafka_port") @@ -39,6 +41,17 @@ suite("test_routine_load_alter","p0") { props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000") props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000") + def adminProps = new Properties() + adminProps.put("bootstrap.servers", "${kafka_broker}".toString()) + def mainTopicAdmin = AdminClient.create(adminProps) + try { + kafkaCsvTpoics.each { topic -> + mainTopicAdmin.createTopics([new NewTopic(topic.toString(), 1, (short) 1)]).all().get() + } + } finally { + mainTopicAdmin.close() + } + // check conenction def verifyKafkaConnection = { prod -> try { @@ -64,7 +77,7 @@ suite("test_routine_load_alter","p0") { logger.info("Kafka connect success") for (String kafkaCsvTopic in kafkaCsvTpoics) { - def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text + def txt = new File("""${context.file.parent}/data/${kafkaCsvDataFile}.csv""").text def lines = txt.readLines() lines.each { line -> logger.info("=====${line}========") @@ -113,7 +126,7 @@ suite("test_routine_load_alter","p0") { """ sql "sync" - def count = 0 + def visibleCount = 0 while (true) { def res = sql "select count(*) from ${tableName}" def state = sql "show routine load for ${jobName}" @@ -123,13 +136,13 @@ suite("test_routine_load_alter","p0") { if (res[0][0] > 0) { break } - if (count >= 120) { + if (visibleCount >= 120) { log.error("routine load can not visible for long time") assertEquals(20, res[0][0]) break } sleep(5000) - count++ + visibleCount++ } qt_sql_before "select * from ${tableName} order by k1" @@ -146,7 +159,7 @@ suite("test_routine_load_alter","p0") { def producer = new KafkaProducer<>(props) for (String kafkaCsvTopic in kafkaCsvTpoics) { - def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text + def txt = new File("""${context.file.parent}/data/${kafkaCsvDataFile}.csv""").text def lines = txt.readLines() lines.each { line -> logger.info("=====${line}========") @@ -155,7 +168,7 @@ suite("test_routine_load_alter","p0") { } } - def count = 0 + def offsetVisibleCount = 0 while (true) { def res = sql "select count(*) from ${tableName}" log.info("count: ${res[0][0]}".toString()) @@ -166,13 +179,13 @@ suite("test_routine_load_alter","p0") { if (res[0][0] >= 6) { break } - if (count >= 120) { + if (offsetVisibleCount >= 120) { log.error("routine load can not visible for long time") assertEquals(20, res[0][0]) break } sleep(1000) - count++ + offsetVisibleCount++ } qt_sql_alter_after "select * from ${tableName} order by k1" } finally { @@ -199,7 +212,7 @@ suite("test_routine_load_alter","p0") { sql "ALTER ROUTINE LOAD FOR ${jobName} COLUMNS(k1, k2, v1, v2, v3, v4);" sql "ALTER ROUTINE LOAD FOR ${jobName} COLUMNS TERMINATED BY ',';" sql "resume routine load for ${jobName}" - def count = 0 + def columnVisibleCount = 0 while (true) { def res = sql "select count(*) from ${tableName}" log.info("count: ${res[0][0]}".toString()) @@ -211,13 +224,13 @@ suite("test_routine_load_alter","p0") { if (res[0][0] > 0) { break } - if (count >= 120) { + if (columnVisibleCount >= 120) { log.error("routine load can not visible for long time") assertEquals(20, res[0][0]) break } sleep(1000) - count++ + columnVisibleCount++ } def res = sql "select * from ${tableName} order by k1" log.info("res: ${res.size()}".toString()) @@ -295,6 +308,18 @@ suite("test_routine_load_alter","p0") { sql "truncate table ${tableName}" } + def mainTopicCleanupProps = new Properties() + mainTopicCleanupProps.put("bootstrap.servers", "${kafka_broker}".toString()) + def mainTopicCleanupAdmin = AdminClient.create(mainTopicCleanupProps) + try { + mainTopicCleanupAdmin.deleteTopics(kafkaCsvTpoics.collect { it.toString() }, + new DeleteTopicsOptions().timeoutMs(10000)).all().get() + } catch (Exception e) { + logger.warn("failed to delete kafka topics ${kafkaCsvTpoics}: ${e.message}".toString()) + } finally { + mainTopicCleanupAdmin.close() + } + // test alter target table def srcTableName = "test_routine_load_alter_src" def dstTableName = "test_routine_load_alter_dst" @@ -350,7 +375,7 @@ suite("test_routine_load_alter","p0") { producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000") alterTopicProducer = new KafkaProducer<>(producerProps) - def firstBatch = new File("""${context.file.parent}/data/${kafkaCsvTpoics[0]}.csv""").readLines() + def firstBatch = new File("""${context.file.parent}/data/${kafkaCsvDataFile}.csv""").readLines() firstBatch.each { line -> alterTopicProducer.send(new ProducerRecord<>(alterTargetTopic, null, line)).get() } @@ -375,7 +400,7 @@ suite("test_routine_load_alter","p0") { """ sql "sync" - def count = 0 + def targetVisibleCount = 0 while (true) { def res = sql "select count(*) from ${srcTableName}" def state = sql "show routine load for ${alterTargetJob}" @@ -385,13 +410,13 @@ suite("test_routine_load_alter","p0") { if (res[0][0] >= 3) { break } - if (count >= 120) { + if (targetVisibleCount >= 120) { log.error("routine load can not visible for long time") assertEquals(3, res[0][0]) break } sleep(1000) - count++ + targetVisibleCount++ } sql "pause routine load for ${alterTargetJob}" @@ -417,7 +442,7 @@ suite("test_routine_load_alter","p0") { sql "resume routine load for ${alterTargetJob}" - def count = 0 + def targetAlterVisibleCount = 0 def stableCount = 0 while (true) { def srcCount = sql "select count(*) from ${srcTableName}" @@ -443,32 +468,18 @@ suite("test_routine_load_alter","p0") { assertEquals(3L, dstCountValue) } } - if (count >= 120) { + if (targetAlterVisibleCount >= 120) { log.error("routine load target table alter can not visible for long time") assertEquals(3L, srcCountValue) assertEquals(3L, dstCountValue) break } sleep(1000) - count++ + targetAlterVisibleCount++ } - def srcRows = sql "select k1, k2 from ${srcTableName} order by k1" - def dstRows = sql "select k1, k2 from ${dstTableName} order by k1" - assertEquals(3, srcRows.size()) - assertEquals(3, dstRows.size()) - assertEquals(1, srcRows[0][0]) - assertEquals("eab", srcRows[0][1]) - assertEquals(2, srcRows[1][0]) - assertEquals("eab", srcRows[1][1]) - assertEquals(3, srcRows[2][0]) - assertEquals("eab", srcRows[2][1]) - assertEquals(4, dstRows[0][0]) - assertEquals("eab", dstRows[0][1]) - assertEquals(5, dstRows[1][0]) - assertEquals("eab", dstRows[1][1]) - assertEquals(6, dstRows[2][0]) - assertEquals("eab", dstRows[2][1]) + qt_sql_alter_target_src "select k1, k2 from ${srcTableName} order by k1" + qt_sql_alter_target_dst "select k1, k2 from ${dstTableName} order by k1" } finally { try { sql "stop routine load for ${alterTargetJob}" @@ -479,6 +490,12 @@ suite("test_routine_load_alter","p0") { alterTopicProducer.close() } if (alterTopicAdmin != null) { + try { + alterTopicAdmin.deleteTopics([alterTargetTopic.toString()], + new DeleteTopicsOptions().timeoutMs(10000)).all().get() + } catch (Exception e) { + logger.warn("failed to delete kafka topic ${alterTargetTopic}: ${e.message}".toString()) + } alterTopicAdmin.close() } sql "truncate table ${srcTableName}"