Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprToSqlVisitor;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.ToSqlParams;
import org.apache.doris.analysis.UserIdentity;
Expand Down Expand Up @@ -468,6 +469,27 @@ protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
}
}

public void validateTargetTable(Database db, OlapTable targetTable) throws UserException {
if (isMultiTable) {
throw new AnalysisException("ALTER ROUTINE LOAD target table change only supports single-table job");
}
List<ImportColumnDesc> columnsInfo = null;
if (columnDescs != null && !columnDescs.descs.isEmpty()) {
columnsInfo = new ArrayList<>(columnDescs.descs);
}
checkMeta(targetTable, new RoutineLoadDesc(columnSeparator, lineDelimiter, columnsInfo, precedingFilter,
whereExpr, partitionNamesInfo, deleteCondition, mergeType, sequenceCol));

targetTable.readLock();
try {
NereidsStreamLoadPlanner planner = new NereidsStreamLoadPlanner(db, targetTable,
toNereidsRoutineLoadTaskInfo());
planner.plan(new TUniqueId(0, 0));
} finally {
targetTable.readUnlock();
}
}

@Override
public long getId() {
return id;
Expand Down Expand Up @@ -2066,7 +2088,9 @@ protected void modifyCommonJobProperties(Map<String, String> jobProperties) thro
jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY));
}

if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
boolean hasExplicitUniqueKeyUpdateMode = jobProperties.containsKey(
CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
if (hasExplicitUniqueKeyUpdateMode) {
String modeStr = jobProperties.remove(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
TUniqueKeyUpdateMode newMode = CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode(modeStr);
// Validate flexible partial update constraints when changing to UPDATE_FLEXIBLE_COLUMNS
Expand All @@ -2077,6 +2101,7 @@ protected void modifyCommonJobProperties(Map<String, String> jobProperties) thro
this.isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
this.jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name());
this.jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
jobProperties.remove(CreateRoutineLoadInfo.PARTIAL_COLUMNS);
}

if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -757,9 +756,12 @@ public void modifyProperties(AlterRoutineLoadCommand command) throws UserExcepti
}

modifyPropertiesInternal(jobProperties, dataSourceProperties);
if (command.hasTargetTable()) {
this.tableId = command.getTargetTableId();
}

AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id,
jobProperties, dataSourceProperties);
jobProperties, dataSourceProperties, command.getTargetTableId());
Env.getCurrentEnv().getEditLog().logAlterRoutineLoadJob(log);
} finally {
writeUnlock();
Expand Down Expand Up @@ -844,9 +846,6 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties);
modifyCommonJobProperties(copiedJobProperties);
this.jobProperties.putAll(copiedJobProperties);
if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
}
if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
String policy = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY);
if ("ERROR".equalsIgnoreCase(policy)) {
Expand Down Expand Up @@ -883,6 +882,9 @@ private void resetCloudProgress(Cloud.ResetRLProgressRequest.Builder builder) th
public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
try {
modifyPropertiesInternal(log.getJobProperties(), (KafkaDataSourceProperties) log.getDataSourceProperties());
if (log.getTargetTableId() != 0) {
this.tableId = log.getTargetTableId();
}
} catch (UserException e) {
// should not happen
LOG.error("failed to replay modify kafka routine load job: {}", id, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -689,9 +688,12 @@ public void modifyProperties(AlterRoutineLoadCommand command) throws UserExcepti
}

modifyPropertiesInternal(jobProperties, dataSourceProperties);
if (command.hasTargetTable()) {
this.tableId = command.getTargetTableId();
}

AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id,
jobProperties, dataSourceProperties);
jobProperties, dataSourceProperties, command.getTargetTableId());
Env.getCurrentEnv().getEditLog().logAlterRoutineLoadJob(log);
} finally {
writeUnlock();
Expand Down Expand Up @@ -764,9 +766,6 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties);
modifyCommonJobProperties(copiedJobProperties);
this.jobProperties.putAll(copiedJobProperties);
if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
}
if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
String policy = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY);
if ("ERROR".equalsIgnoreCase(policy)) {
Expand All @@ -785,6 +784,9 @@ public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
try {
modifyPropertiesInternal(log.getJobProperties(),
(KinesisDataSourceProperties) log.getDataSourceProperties());
if (log.getTargetTableId() != 0) {
this.tableId = log.getTargetTableId();
}
} catch (UserException e) {
LOG.error("failed to replay modify kinesis routine load job: {}", id, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9284,6 +9284,12 @@ public LogicalPlan visitAlterRoutineLoad(DorisParser.AlterRoutineLoadContext ctx
}
LabelNameInfo labelNameInfo = new LabelNameInfo(dbName, jobName);

// TODO: Phase 1 only supports altering the target table. Phase 2 will allow altering
// the target table and routine load properties in the same statement.
if (ctx.table != null) {
return new AlterRoutineLoadCommand(labelNameInfo, ctx.table.getText());
}

Map<String, String> properties = new HashMap<>();
if (ctx.properties != null) {
properties.putAll(visitPropertyClause(ctx.properties));
Expand All @@ -9309,7 +9315,8 @@ public LogicalPlan visitAlterRoutineLoad(DorisParser.AlterRoutineLoadContext ctx
}
}

return new AlterRoutineLoadCommand(labelNameInfo, loadPropertyMap, properties, dataSourceMapProperties);
return new AlterRoutineLoadCommand(labelNameInfo, null,
loadPropertyMap, properties, dataSourceMapProperties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,31 @@
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
import org.apache.doris.datasource.property.fileformat.JsonFileFormatProperties;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.routineload.AbstractDataSourceProperties;
import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo;
import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TUniqueKeyUpdateMode;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -85,10 +91,12 @@ public class AlterRoutineLoadCommand extends AlterCommand {
.build();

private final LabelNameInfo labelNameInfo;
private final String targetTableName;
private final Map<String, LoadProperty> loadPropertyMap;
private RoutineLoadDesc routineLoadDesc;
private final Map<String, String> jobProperties;
private final Map<String, String> dataSourceMapProperties;
private long targetTableId;
private boolean isPartialUpdate;

// save analyzed job properties.
Expand All @@ -100,6 +108,7 @@ public class AlterRoutineLoadCommand extends AlterCommand {
* AlterRoutineLoadCommand
*/
public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo,
String targetTableName,
Map<String, LoadProperty> loadPropertyMap,
Map<String, String> jobProperties,
Map<String, String> dataSourceMapProperties) {
Expand All @@ -108,6 +117,7 @@ public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo,
Objects.requireNonNull(jobProperties, "jobProperties is null");
Objects.requireNonNull(dataSourceMapProperties, "dataSourceMapProperties is null");
this.labelNameInfo = labelNameInfo;
this.targetTableName = targetTableName;
this.loadPropertyMap = loadPropertyMap == null ? Maps.newHashMap() : loadPropertyMap;
this.jobProperties = jobProperties;
this.dataSourceMapProperties = dataSourceMapProperties;
Expand All @@ -118,7 +128,11 @@ public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo,
public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo,
Map<String, String> jobProperties,
Map<String, String> dataSourceMapProperties) {
this(labelNameInfo, Maps.newHashMap(), jobProperties, dataSourceMapProperties);
this(labelNameInfo, null, Maps.newHashMap(), jobProperties, dataSourceMapProperties);
}

public AlterRoutineLoadCommand(LabelNameInfo labelNameInfo, String targetTableName) {
this(labelNameInfo, targetTableName, Maps.newHashMap(), Maps.newHashMap(), Maps.newHashMap());
}

public String getDbName() {
Expand All @@ -133,6 +147,18 @@ public Map<String, String> getAnalyzedJobProperties() {
return analyzedJobProperties;
}

public boolean hasTargetTable() {
return !StringUtil.isEmpty(targetTableName);
}

public String getTargetTableName() {
return targetTableName;
}

public long getTargetTableId() {
return targetTableId;
}

public boolean hasDataSourceProperty() {
return MapUtils.isNotEmpty(dataSourceMapProperties);
}
Expand Down Expand Up @@ -161,18 +187,27 @@ public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
public void validate(ConnectContext ctx) throws UserException {
labelNameInfo.validate(ctx);
FeNameFormat.checkCommonName(NAME_TYPE, labelNameInfo.getLabel());
if (hasTargetTable() && (MapUtils.isNotEmpty(loadPropertyMap) || MapUtils.isNotEmpty(jobProperties)
|| MapUtils.isNotEmpty(dataSourceMapProperties))) {
throw new AnalysisException("ALTER ROUTINE LOAD target table change does not support other properties");
}
// check routine load job properties include desired concurrent number etc.
checkJobProperties();
// check load properties
RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager()
.getJob(getDbName(), getJobName());
this.routineLoadDesc = CreateRoutineLoadInfo.checkLoadProperties(ctx, loadPropertyMap,
job.getDbFullName(), job.getTableName(), job.isMultiTable(), job.getMergeType());
if (MapUtils.isNotEmpty(loadPropertyMap)) {
this.routineLoadDesc = CreateRoutineLoadInfo.checkLoadProperties(ctx, loadPropertyMap,
job.getDbFullName(), job.getTableName(), job.isMultiTable(), job.getMergeType());
}
// check data source properties
checkDataSourceProperties();
if (hasTargetTable()) {
validateTargetTable(ctx, job);
}
checkPartialUpdate();
if (analyzedJobProperties.isEmpty() && MapUtils.isEmpty(dataSourceMapProperties)
&& routineLoadDesc == null) {
&& MapUtils.isEmpty(loadPropertyMap) && !hasTargetTable()) {
throw new AnalysisException("No properties are specified");
}
}
Expand Down Expand Up @@ -329,21 +364,62 @@ private void checkDataSourceProperties() throws UserException {
}

private void checkPartialUpdate() throws UserException {
if (!isPartialUpdate) {
RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager()
.getJob(getDbName(), getJobName());
TUniqueKeyUpdateMode uniqueKeyUpdateMode = getEffectiveUniqueKeyUpdateMode(job);
if (uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
return;
}
RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager()
.getJob(getDbName(), getDbName());
if (job.isMultiTable()) {
throw new AnalysisException("load by PARTIAL_COLUMNS is not supported in multi-table load.");
}
Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName());
Table table = db.getTableOrAnalysisException(job.getTableName());
if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) {
String tableName = hasTargetTable() ? targetTableName : job.getTableName();
Table table = db.getTableOrAnalysisException(tableName);
if (!((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW");
}
}

private TUniqueKeyUpdateMode getEffectiveUniqueKeyUpdateMode(RoutineLoadJob job) {
if (analyzedJobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
Comment thread
0AyanamiRei marked this conversation as resolved.
return TUniqueKeyUpdateMode.valueOf(
analyzedJobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE));
}
if (analyzedJobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS) && isPartialUpdate
&& job.getUniqueKeyUpdateMode() == TUniqueKeyUpdateMode.UPSERT) {
return TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
}
return job.getUniqueKeyUpdateMode();
}

private void validateTargetTable(ConnectContext ctx, RoutineLoadJob job) throws UserException {
if (job.isMultiTable()) {
throw new AnalysisException("ALTER ROUTINE LOAD target table change only supports single-table job");
}
Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName());
Table table = db.getTableOrAnalysisException(targetTableName);
if (!(table instanceof OlapTable)) {
throw new AnalysisException("ALTER ROUTINE LOAD target table only supports OLAP table");
}
OlapTable olapTable = (OlapTable) table;
if (olapTable.isTemporary()) {
throw new AnalysisException("Do not support load for temporary table " + olapTable.getDisplayName());
}
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, InternalCatalog.INTERNAL_CATALOG_NAME,
job.getDbFullName(), targetTableName, PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ctx.getQualifiedUser(), ctx.getRemoteIP(), job.getDbFullName() + ": " + targetTableName);
}
if (job.isLoadToSingleTablet()
&& !(olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) {
throw new AnalysisException(
"if load_to_single_tablet set to true, the olap table must be with random distribution");
}
job.validateTargetTable(db, olapTable);
this.targetTableId = olapTable.getId();
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitAlterRoutineLoadCommand(this, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,20 @@ public class AlterRoutineLoadJobOperationLog implements Writable {
private Map<String, String> jobProperties;
@SerializedName(value = "dataSourceProperties")
private AbstractDataSourceProperties dataSourceProperties;
@SerializedName(value = "targetTableId")
private long targetTableId;

public AlterRoutineLoadJobOperationLog(long jobId, Map<String, String> jobProperties,
AbstractDataSourceProperties dataSourceProperties) {
this(jobId, jobProperties, dataSourceProperties, 0L);
}

public AlterRoutineLoadJobOperationLog(long jobId, Map<String, String> jobProperties,
AbstractDataSourceProperties dataSourceProperties, long targetTableId) {
this.jobId = jobId;
this.jobProperties = jobProperties;
this.dataSourceProperties = dataSourceProperties;
this.targetTableId = targetTableId;
}

public long getJobId() {
Expand All @@ -57,6 +65,10 @@ public AbstractDataSourceProperties getDataSourceProperties() {
return dataSourceProperties;
}

public long getTargetTableId() {
return targetTableId;
}

public static AlterRoutineLoadJobOperationLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, AlterRoutineLoadJobOperationLog.class);
Expand Down
Loading
Loading