diff --git a/paimon-api/src/main/java/org/apache/paimon/resource/ResourceChange.java b/paimon-api/src/main/java/org/apache/paimon/resource/ResourceChange.java new file mode 100644 index 000000000000..cf7cc7ff512e --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/resource/ResourceChange.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.resource; + +import org.apache.paimon.annotation.Public; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; + +/** Resource change. */ +@Public +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = ResourceChange.Actions.FIELD_TYPE) +@JsonSubTypes({ + @JsonSubTypes.Type( + value = ResourceChange.UpdateResourceComment.class, + name = ResourceChange.Actions.UPDATE_COMMENT_ACTION), + @JsonSubTypes.Type( + value = ResourceChange.UpdateResourceUri.class, + name = ResourceChange.Actions.UPDATE_URI_ACTION) +}) +public interface ResourceChange extends Serializable { + + static ResourceChange updateComment(@Nullable String comment) { + return new UpdateResourceComment(comment); + } + + static ResourceChange updateUri(String uri) { + return new UpdateResourceUri(uri); + } + + /** Update comment for resource change. */ + final class UpdateResourceComment implements ResourceChange { + + private static final long serialVersionUID = 1L; + + private static final String FIELD_COMMENT = "comment"; + + @JsonProperty(FIELD_COMMENT) + private final @Nullable String comment; + + @JsonCreator + private UpdateResourceComment(@JsonProperty(FIELD_COMMENT) @Nullable String comment) { + this.comment = comment; + } + + @JsonGetter(FIELD_COMMENT) + public @Nullable String comment() { + return comment; + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + UpdateResourceComment that = (UpdateResourceComment) object; + return Objects.equals(comment, that.comment); + } + + @Override + public int hashCode() { + return Objects.hash(comment); + } + } + + /** Update URI for resource change. */ + final class UpdateResourceUri implements ResourceChange { + + private static final long serialVersionUID = 1L; + + private static final String FIELD_URI = "uri"; + + @JsonProperty(FIELD_URI) + private final String uri; + + @JsonCreator + private UpdateResourceUri(@JsonProperty(FIELD_URI) String uri) { + this.uri = uri; + } + + @JsonGetter(FIELD_URI) + public String uri() { + return uri; + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + UpdateResourceUri that = (UpdateResourceUri) object; + return Objects.equals(uri, that.uri); + } + + @Override + public int hashCode() { + return Objects.hash(uri); + } + } + + /** Actions for resource change. */ + class Actions { + static final String FIELD_TYPE = "action"; + static final String UPDATE_COMMENT_ACTION = "updateComment"; + static final String UPDATE_URI_ACTION = "updateUri"; + + private Actions() {} + } +} diff --git a/paimon-api/src/main/java/org/apache/paimon/resource/ResourceType.java b/paimon-api/src/main/java/org/apache/paimon/resource/ResourceType.java new file mode 100644 index 000000000000..727bb09699d2 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/resource/ResourceType.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.resource; + +import org.apache.paimon.annotation.Public; + +/** + * Enumeration of resource types supported by Paimon. + * + * @since 0.4.0 + */ +@Public +public enum ResourceType { + + /** A general file resource. */ + FILE("file"), + + /** An archive resource (e.g., zip, tar). */ + ARCHIVE("archive"), + + /** A JAR resource. */ + JAR("jar"), + + /** A Python resource. */ + PY("py"); + + private final String value; + + ResourceType(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + /** Parse a string value to {@link ResourceType}, case-insensitive. */ + public static ResourceType fromValue(String value) { + for (ResourceType type : values()) { + if (type.value.equalsIgnoreCase(value)) { + return type; + } + } + throw new IllegalArgumentException("Unknown resource type: " + value); + } + + @Override + public String toString() { + return value; + } +} diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java index 263c4e2c0640..6f3852fc7c4d 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java @@ -28,6 +28,8 @@ import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.PartitionStatistics; +import org.apache.paimon.resource.ResourceChange; +import org.apache.paimon.resource.ResourceType; import org.apache.paimon.rest.auth.AuthProvider; import org.apache.paimon.rest.auth.RESTAuthFunction; import org.apache.paimon.rest.exceptions.AlreadyExistsException; @@ -35,6 +37,7 @@ import org.apache.paimon.rest.exceptions.NoSuchResourceException; import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.AlterFunctionRequest; +import org.apache.paimon.rest.requests.AlterResourceRequest; import org.apache.paimon.rest.requests.AlterTableRequest; import org.apache.paimon.rest.requests.AlterViewRequest; import org.apache.paimon.rest.requests.AuthTableQueryRequest; @@ -42,6 +45,7 @@ import org.apache.paimon.rest.requests.CreateBranchRequest; import org.apache.paimon.rest.requests.CreateDatabaseRequest; import org.apache.paimon.rest.requests.CreateFunctionRequest; +import org.apache.paimon.rest.requests.CreateResourceRequest; import org.apache.paimon.rest.requests.CreateTableRequest; import org.apache.paimon.rest.requests.CreateTagRequest; import org.apache.paimon.rest.requests.CreateViewRequest; @@ -61,6 +65,7 @@ import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; import org.apache.paimon.rest.responses.GetFunctionResponse; +import org.apache.paimon.rest.responses.GetResourceResponse; import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.GetTableSnapshotResponse; import org.apache.paimon.rest.responses.GetTableTokenResponse; @@ -74,6 +79,9 @@ import org.apache.paimon.rest.responses.ListFunctionsGloballyResponse; import org.apache.paimon.rest.responses.ListFunctionsResponse; import org.apache.paimon.rest.responses.ListPartitionsResponse; +import org.apache.paimon.rest.responses.ListResourceDetailsResponse; +import org.apache.paimon.rest.responses.ListResourcesGloballyResponse; +import org.apache.paimon.rest.responses.ListResourcesResponse; import org.apache.paimon.rest.responses.ListSnapshotsResponse; import org.apache.paimon.rest.responses.ListTableDetailsResponse; import org.apache.paimon.rest.responses.ListTablesGloballyResponse; @@ -153,6 +161,7 @@ public class RESTApi { public static final String TABLE_TYPE = "tableType"; public static final String VIEW_NAME_PATTERN = "viewNamePattern"; public static final String FUNCTION_NAME_PATTERN = "functionNamePattern"; + public static final String RESOURCE_NAME_PATTERN = "resourceNamePattern"; public static final String PARTITION_NAME_PATTERN = "partitionNamePattern"; public static final String TAG_NAME_PREFIX = "tagNamePrefix"; @@ -1538,6 +1547,179 @@ public void alterView(Identifier identifier, List viewChanges) { restAuthFunction); } + // ==================== Resources ========================== + + /** + * List resources for database. + * + * @param databaseName database name + * @return a list of resource names + */ + public List listResources(String databaseName) { + return listDataFromPageApi( + queryParams -> + client.get( + resourcePaths.resources(databaseName), + queryParams, + ListResourcesResponse.class, + restAuthFunction)); + } + + /** + * List resources by page. + * + * @param databaseName database name + * @param maxResults Optional maximum number of results. + * @param pageToken Optional next page token. + * @param resourceNamePattern A sql LIKE pattern (%) for resource names. + * @return {@link PagedList}: elements and nextPageToken. + * @throws NoSuchResourceException if the database does not exist + */ + public PagedList listResourcesPaged( + String databaseName, + @Nullable Integer maxResults, + @Nullable String pageToken, + @Nullable String resourceNamePattern) { + ListResourcesResponse response = + client.get( + resourcePaths.resources(databaseName), + buildPagedQueryParams( + maxResults, + pageToken, + Pair.of(RESOURCE_NAME_PATTERN, resourceNamePattern)), + ListResourcesResponse.class, + restAuthFunction); + List resources = response.resources(); + if (resources == null) { + return new PagedList<>(emptyList(), null); + } + return new PagedList<>(resources, response.getNextPageToken()); + } + + /** + * List resource details. + * + * @param databaseName database name + * @param maxResults Optional maximum number of results. + * @param pageToken Optional next page token. + * @param resourceNamePattern A sql LIKE pattern (%) for resource names. + * @return {@link PagedList}: elements and nextPageToken. + * @throws NoSuchResourceException if the database does not exist + */ + public PagedList listResourceDetailsPaged( + String databaseName, + @Nullable Integer maxResults, + @Nullable String pageToken, + @Nullable String resourceNamePattern) { + ListResourceDetailsResponse response = + client.get( + resourcePaths.resourceDetails(databaseName), + buildPagedQueryParams( + maxResults, + pageToken, + Pair.of(RESOURCE_NAME_PATTERN, resourceNamePattern)), + ListResourceDetailsResponse.class, + restAuthFunction); + List resourceDetails = response.data(); + if (resourceDetails == null) { + return new PagedList<>(emptyList(), null); + } + return new PagedList<>(resourceDetails, response.getNextPageToken()); + } + + /** + * Get a resource by identifier. + * + * @param identifier the identifier of the resource to retrieve + * @return the resource response object + * @throws NoSuchResourceException if the resource does not exist + */ + public GetResourceResponse getResource(Identifier identifier) { + return client.get( + resourcePaths.resource(identifier.getDatabaseName(), identifier.getObjectName()), + GetResourceResponse.class, + restAuthFunction); + } + + /** + * Create a resource. + * + * @param identifier database name and resource name. + * @param comment optional comment describing the resource + * @param uri the URI pointing to the resource location + * @param resourceType the type of resource + * @throws AlreadyExistsException if a resource already exists + */ + public void createResource( + Identifier identifier, + @Nullable String comment, + String uri, + ResourceType resourceType) { + client.post( + resourcePaths.resources(identifier.getDatabaseName()), + new CreateResourceRequest( + identifier.getObjectName(), comment, uri, resourceType.getValue()), + restAuthFunction); + } + + /** + * Drop a resource. + * + * @param identifier database name and resource name. + * @throws NoSuchResourceException if the resource does not exist + */ + public void dropResource(Identifier identifier) { + client.delete( + resourcePaths.resource(identifier.getDatabaseName(), identifier.getObjectName()), + restAuthFunction); + } + + /** + * Alter a resource. + * + * @param identifier database name and resource name. + * @param changes list of resource changes to apply + * @throws NoSuchResourceException if the resource does not exist + * @throws ForbiddenException if the user lacks permission to modify the resource + */ + public void alterResource(Identifier identifier, List changes) { + client.post( + resourcePaths.resource(identifier.getDatabaseName(), identifier.getObjectName()), + new AlterResourceRequest(changes), + restAuthFunction); + } + + /** + * List resources for a catalog globally. + * + * @param databaseNamePattern A sql LIKE pattern (%) for database names. + * @param resourceNamePattern A sql LIKE pattern (%) for resource names. + * @param maxResults Optional maximum number of results. + * @param pageToken Optional next page token. + * @return {@link PagedList}: elements and nextPageToken. + */ + public PagedList listResourcesPagedGlobally( + @Nullable String databaseNamePattern, + @Nullable String resourceNamePattern, + @Nullable Integer maxResults, + @Nullable String pageToken) { + ListResourcesGloballyResponse response = + client.get( + resourcePaths.resources(), + buildPagedQueryParams( + maxResults, + pageToken, + Pair.of(DATABASE_NAME_PATTERN, databaseNamePattern), + Pair.of(RESOURCE_NAME_PATTERN, resourceNamePattern)), + ListResourcesGloballyResponse.class, + restAuthFunction); + List resources = response.data(); + if (resources == null) { + return new PagedList<>(emptyList(), null); + } + return new PagedList<>(resources, response.getNextPageToken()); + } + /** * Load token for File System of this table. * diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java index 28f79d040995..cbedd8a16695 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java @@ -42,6 +42,8 @@ public class ResourcePaths { protected static final String REGISTER = "register"; protected static final String FUNCTIONS = "functions"; protected static final String FUNCTION_DETAILS = "function-details"; + protected static final String RESOURCES = "resources"; + protected static final String RESOURCE_DETAILS = "resource-details"; protected static final String ID = "id"; private static final Joiner SLASH = Joiner.on("/").skipNulls(); @@ -361,4 +363,26 @@ public String function(String databaseName, String functionName) { FUNCTIONS, encodeString(functionName)); } + + public String resources(String databaseName) { + return SLASH.join(V1, prefix, DATABASES, encodeString(databaseName), RESOURCES); + } + + public String resources() { + return SLASH.join(V1, prefix, RESOURCES); + } + + public String resourceDetails(String databaseName) { + return SLASH.join(V1, prefix, DATABASES, encodeString(databaseName), RESOURCE_DETAILS); + } + + public String resource(String databaseName, String resourceName) { + return SLASH.join( + V1, + prefix, + DATABASES, + encodeString(databaseName), + RESOURCES, + encodeString(resourceName)); + } } diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/requests/AlterResourceRequest.java b/paimon-api/src/main/java/org/apache/paimon/rest/requests/AlterResourceRequest.java new file mode 100644 index 000000000000..5f4af196cae8 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/requests/AlterResourceRequest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.requests; + +import org.apache.paimon.resource.ResourceChange; +import org.apache.paimon.rest.RESTRequest; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** Request for altering resource. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class AlterResourceRequest implements RESTRequest { + + private static final String FIELD_CHANGES = "changes"; + + @JsonProperty(FIELD_CHANGES) + private final List changes; + + @JsonCreator + public AlterResourceRequest(@JsonProperty(FIELD_CHANGES) List changes) { + this.changes = changes; + } + + @JsonGetter(FIELD_CHANGES) + public List changes() { + return changes; + } +} diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/requests/CreateResourceRequest.java b/paimon-api/src/main/java/org/apache/paimon/rest/requests/CreateResourceRequest.java new file mode 100644 index 000000000000..53241e0ab4d6 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/requests/CreateResourceRequest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.requests; + +import org.apache.paimon.rest.RESTRequest; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** Request for creating resource. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class CreateResourceRequest implements RESTRequest { + + private static final String FIELD_NAME = "name"; + private static final String FIELD_COMMENT = "comment"; + private static final String FIELD_URI = "uri"; + private static final String FIELD_RESOURCE_TYPE = "resourceType"; + + @JsonProperty(FIELD_NAME) + private final String name; + + @JsonProperty(FIELD_COMMENT) + private final String comment; + + @JsonProperty(FIELD_URI) + private final String uri; + + @JsonProperty(FIELD_RESOURCE_TYPE) + private final String resourceType; + + @JsonCreator + public CreateResourceRequest( + @JsonProperty(FIELD_NAME) String name, + @JsonProperty(FIELD_COMMENT) String comment, + @JsonProperty(FIELD_URI) String uri, + @JsonProperty(FIELD_RESOURCE_TYPE) String resourceType) { + this.name = name; + this.comment = comment; + this.uri = uri; + this.resourceType = resourceType; + } + + @JsonGetter(FIELD_NAME) + public String name() { + return name; + } + + @JsonGetter(FIELD_COMMENT) + public String comment() { + return comment; + } + + @JsonGetter(FIELD_URI) + public String uri() { + return uri; + } + + @JsonGetter(FIELD_RESOURCE_TYPE) + public String resourceType() { + return resourceType; + } +} diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/responses/GetResourceResponse.java b/paimon-api/src/main/java/org/apache/paimon/rest/responses/GetResourceResponse.java new file mode 100644 index 000000000000..265536a275c0 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/responses/GetResourceResponse.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.responses; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** Response for getting a resource. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class GetResourceResponse extends AuditRESTResponse { + + private static final String FIELD_NAME = "name"; + private static final String FIELD_COMMENT = "comment"; + private static final String FIELD_URI = "uri"; + private static final String FIELD_SIZE = "size"; + private static final String FIELD_LAST_MODIFIED_TIME = "lastModifiedTime"; + private static final String FIELD_RESOURCE_TYPE = "resourceType"; + + @JsonProperty(FIELD_NAME) + private final String name; + + @JsonProperty(FIELD_COMMENT) + private final String comment; + + @JsonProperty(FIELD_URI) + private final String uri; + + @JsonProperty(FIELD_SIZE) + private final long size; + + @JsonProperty(FIELD_LAST_MODIFIED_TIME) + private final long lastModifiedTime; + + @JsonProperty(FIELD_RESOURCE_TYPE) + private final String resourceType; + + @JsonCreator + public GetResourceResponse( + @JsonProperty(FIELD_NAME) String name, + @JsonProperty(FIELD_COMMENT) String comment, + @JsonProperty(FIELD_URI) String uri, + @JsonProperty(FIELD_SIZE) long size, + @JsonProperty(FIELD_LAST_MODIFIED_TIME) long lastModifiedTime, + @JsonProperty(FIELD_RESOURCE_TYPE) String resourceType, + @JsonProperty(FIELD_OWNER) String owner, + @JsonProperty(FIELD_CREATED_AT) long createdAt, + @JsonProperty(FIELD_CREATED_BY) String createdBy, + @JsonProperty(FIELD_UPDATED_AT) long updatedAt, + @JsonProperty(FIELD_UPDATED_BY) String updatedBy) { + super(owner, createdAt, createdBy, updatedAt, updatedBy); + this.name = name; + this.comment = comment; + this.uri = uri; + this.size = size; + this.lastModifiedTime = lastModifiedTime; + this.resourceType = resourceType; + } + + @JsonGetter(FIELD_NAME) + public String name() { + return name; + } + + @JsonGetter(FIELD_COMMENT) + public String comment() { + return comment; + } + + @JsonGetter(FIELD_URI) + public String uri() { + return uri; + } + + @JsonGetter(FIELD_SIZE) + public long size() { + return size; + } + + @JsonGetter(FIELD_LAST_MODIFIED_TIME) + public long lastModifiedTime() { + return lastModifiedTime; + } + + @JsonGetter(FIELD_RESOURCE_TYPE) + public String resourceType() { + return resourceType; + } +} diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListResourceDetailsResponse.java b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListResourceDetailsResponse.java new file mode 100644 index 000000000000..dd279d4788f2 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListResourceDetailsResponse.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.responses; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** Response for listing resource details. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class ListResourceDetailsResponse implements PagedResponse { + + private static final String FIELD_RESOURCES = "resources"; + private static final String FIELD_NEXT_PAGE_TOKEN = "nextPageToken"; + + @JsonProperty(FIELD_RESOURCES) + private final List resources; + + @JsonProperty(FIELD_NEXT_PAGE_TOKEN) + private final String nextPageToken; + + public ListResourceDetailsResponse( + @JsonProperty(FIELD_RESOURCES) List resources) { + this(resources, null); + } + + @JsonCreator + public ListResourceDetailsResponse( + @JsonProperty(FIELD_RESOURCES) List resources, + @JsonProperty(FIELD_NEXT_PAGE_TOKEN) String nextPageToken) { + this.resources = resources; + this.nextPageToken = nextPageToken; + } + + @JsonGetter(FIELD_RESOURCES) + public List getResources() { + return this.resources; + } + + @JsonGetter(FIELD_NEXT_PAGE_TOKEN) + public String getNextPageToken() { + return this.nextPageToken; + } + + @Override + public List data() { + return getResources(); + } +} diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListResourcesGloballyResponse.java b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListResourcesGloballyResponse.java new file mode 100644 index 000000000000..4e5acaecacba --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListResourcesGloballyResponse.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.responses; + +import org.apache.paimon.catalog.Identifier; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** Response for listing resources globally. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class ListResourcesGloballyResponse implements PagedResponse { + + private static final String FIELD_RESOURCES = "resources"; + private static final String FIELD_NEXT_PAGE_TOKEN = "nextPageToken"; + + @JsonProperty(FIELD_RESOURCES) + private final List resources; + + @JsonProperty(FIELD_NEXT_PAGE_TOKEN) + private final String nextPageToken; + + @JsonCreator + public ListResourcesGloballyResponse( + @JsonProperty(FIELD_RESOURCES) List resources, + @JsonProperty(FIELD_NEXT_PAGE_TOKEN) String nextPageToken) { + this.resources = resources; + this.nextPageToken = nextPageToken; + } + + @JsonGetter(FIELD_RESOURCES) + public List resources() { + return this.resources; + } + + @JsonGetter(FIELD_NEXT_PAGE_TOKEN) + public String getNextPageToken() { + return this.nextPageToken; + } + + @Override + public List data() { + return resources(); + } +} diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListResourcesResponse.java b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListResourcesResponse.java new file mode 100644 index 000000000000..388d3215b649 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListResourcesResponse.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.responses; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** Response for listing resources. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class ListResourcesResponse implements PagedResponse { + + private static final String FIELD_RESOURCES = "resources"; + private static final String FIELD_NEXT_PAGE_TOKEN = "nextPageToken"; + + @JsonProperty(FIELD_RESOURCES) + private final List resources; + + @JsonProperty(FIELD_NEXT_PAGE_TOKEN) + private final String nextPageToken; + + @JsonCreator + public ListResourcesResponse( + @JsonProperty(FIELD_RESOURCES) List resources, + @JsonProperty(FIELD_NEXT_PAGE_TOKEN) String nextPageToken) { + this.resources = resources; + this.nextPageToken = nextPageToken; + } + + @JsonGetter(FIELD_RESOURCES) + public List resources() { + return this.resources; + } + + @JsonGetter(FIELD_NEXT_PAGE_TOKEN) + public String getNextPageToken() { + return this.nextPageToken; + } + + @Override + public List data() { + return resources(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 4a8a8b1b91b6..aeae2f193f68 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -33,6 +33,8 @@ import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.PartitionStatistics; +import org.apache.paimon.resource.Resource; +import org.apache.paimon.resource.ResourceChange; import org.apache.paimon.rest.responses.GetTagResponse; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -750,6 +752,35 @@ public void alterFunction( throw new UnsupportedOperationException(); } + @Override + public List listResources(String databaseName) throws DatabaseNotExistException { + throw new UnsupportedOperationException(); + } + + @Override + public Resource getResource(Identifier identifier) throws ResourceNotExistException { + throw new UnsupportedOperationException(); + } + + @Override + public void createResource(Identifier identifier, Resource resource, boolean ignoreIfExists) + throws ResourceAlreadyExistException, DatabaseNotExistException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropResource(Identifier identifier, boolean ignoreIfNotExists) + throws ResourceNotExistException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterResource( + Identifier identifier, List changes, boolean ignoreIfNotExists) + throws ResourceNotExistException { + throw new UnsupportedOperationException(); + } + /** * Create a {@link FormatTable} identified by the given {@link Identifier}. * diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 57fa040a2acd..e902f8fd6398 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -27,6 +27,8 @@ import org.apache.paimon.function.FunctionChange; import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.PartitionStatistics; +import org.apache.paimon.resource.Resource; +import org.apache.paimon.resource.ResourceChange; import org.apache.paimon.rest.responses.GetTagResponse; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -1200,6 +1202,117 @@ void alterFunction( throws FunctionNotExistException, DefinitionAlreadyExistException, DefinitionNotExistException; + // ==================== Resources ========================== + + /** + * Get the names of all resources in a database. + * + * @param databaseName the database name + * @return a list of the names of all resources + * @throws DatabaseNotExistException if the database does not exist + */ + List listResources(String databaseName) throws DatabaseNotExistException; + + /** + * Get paged list names of resources under this database. + * + * @param databaseName Name of the database to list resources. + * @param maxResults Optional parameter indicating the maximum number of results. + * @param pageToken Optional parameter indicating the next page token. + * @param resourceNamePattern A sql LIKE pattern (%) for resource names. + * @return a list of the names of resources with provided page size and next page token. + * @throws DatabaseNotExistException if the database does not exist + */ + default PagedList listResourcesPaged( + String databaseName, + @Nullable Integer maxResults, + @Nullable String pageToken, + @Nullable String resourceNamePattern) + throws DatabaseNotExistException { + return new PagedList<>(listResources(databaseName), null); + } + + /** + * Gets an array of resource identifiers for a catalog. + * + * @param databaseNamePattern A sql LIKE pattern (%) for database names. + * @param resourceNamePattern A sql LIKE pattern (%) for resource names. + * @param maxResults Optional parameter indicating the maximum number of results. + * @param pageToken Optional parameter indicating the next page token. + * @return a list of the resource identifiers with provided page size and next page token. + */ + default PagedList listResourcesPagedGlobally( + @Nullable String databaseNamePattern, + @Nullable String resourceNamePattern, + @Nullable Integer maxResults, + @Nullable String pageToken) { + throw new UnsupportedOperationException( + "Current Catalog does not support listResourcesPagedGlobally"); + } + + /** + * Get paged list resource details under this database. + * + * @param databaseName Name of the database to list resource details. + * @param maxResults Optional parameter indicating the maximum number of results. + * @param pageToken Optional parameter indicating the next page token. + * @param resourceNamePattern A sql LIKE pattern (%) for resource names. + * @return a list of the resource details with provided page size and next page token. + * @throws DatabaseNotExistException if the database does not exist + */ + default PagedList listResourceDetailsPaged( + String databaseName, + @Nullable Integer maxResults, + @Nullable String pageToken, + @Nullable String resourceNamePattern) + throws DatabaseNotExistException { + return new PagedList<>(Collections.emptyList(), null); + } + + /** + * Get resource by identifier. + * + * @param identifier path of the resource to get + * @return the requested resource + * @throws ResourceNotExistException if the resource does not exist + */ + Resource getResource(Identifier identifier) throws ResourceNotExistException; + + /** + * Create a new resource. + * + * @param identifier path of the resource to be created + * @param resource the resource definition + * @param ignoreIfExists flag to specify behavior when a resource already exists + * @throws ResourceAlreadyExistException if resource already exists and ignoreIfExists is false + * @throws DatabaseNotExistException if the database in identifier doesn't exist + */ + void createResource(Identifier identifier, Resource resource, boolean ignoreIfExists) + throws ResourceAlreadyExistException, DatabaseNotExistException; + + /** + * Drop resource. + * + * @param identifier path of the resource to be dropped + * @param ignoreIfNotExists Flag to specify behavior when the resource does not exist + * @throws ResourceNotExistException if the resource doesn't exist + */ + void dropResource(Identifier identifier, boolean ignoreIfNotExists) + throws ResourceNotExistException; + + /** + * Alter a resource. + * + * @param identifier path of the resource to be altered + * @param changes the changes to apply to the resource + * @param ignoreIfNotExists flag to specify behavior when the resource does not exist + * @throws ResourceNotExistException if the resource doesn't exist and ignoreIfNotExists is + * false + */ + void alterResource( + Identifier identifier, List changes, boolean ignoreIfNotExists) + throws ResourceNotExistException; + // ==================== Table Auth ========================== /** @@ -1834,4 +1947,46 @@ public String name() { return name; } } + + /** Exception for trying to create a resource that already exists. */ + class ResourceAlreadyExistException extends Exception { + + private static final String MSG = "Resource %s already exists."; + + private final Identifier identifier; + + public ResourceAlreadyExistException(Identifier identifier) { + this(identifier, null); + } + + public ResourceAlreadyExistException(Identifier identifier, Throwable cause) { + super(String.format(MSG, identifier.getFullName()), cause); + this.identifier = identifier; + } + + public Identifier identifier() { + return identifier; + } + } + + /** Exception for trying to get a resource that doesn't exist. */ + class ResourceNotExistException extends Exception { + + private static final String MSG = "Resource %s doesn't exist."; + + private final Identifier identifier; + + public ResourceNotExistException(Identifier identifier) { + this(identifier, null); + } + + public ResourceNotExistException(Identifier identifier, Throwable cause) { + super(String.format(MSG, identifier), cause); + this.identifier = identifier; + } + + public Identifier identifier() { + return identifier; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 0f18f7d04540..72f50e9dff56 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -24,6 +24,8 @@ import org.apache.paimon.function.FunctionChange; import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.PartitionStatistics; +import org.apache.paimon.resource.Resource; +import org.apache.paimon.resource.ResourceChange; import org.apache.paimon.rest.responses.GetTagResponse; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -364,6 +366,66 @@ public void alterFunction( wrapped.alterFunction(identifier, changes, ignoreIfNotExists); } + @Override + public List listResources(String databaseName) throws DatabaseNotExistException { + return wrapped.listResources(databaseName); + } + + @Override + public PagedList listResourcesPaged( + String databaseName, + @Nullable Integer maxResults, + @Nullable String pageToken, + @Nullable String resourceNamePattern) + throws DatabaseNotExistException { + return wrapped.listResourcesPaged(databaseName, maxResults, pageToken, resourceNamePattern); + } + + @Override + public PagedList listResourceDetailsPaged( + String databaseName, + @Nullable Integer maxResults, + @Nullable String pageToken, + @Nullable String resourceNamePattern) + throws DatabaseNotExistException { + return wrapped.listResourceDetailsPaged( + databaseName, maxResults, pageToken, resourceNamePattern); + } + + @Override + public PagedList listResourcesPagedGlobally( + @Nullable String databaseNamePattern, + @Nullable String resourceNamePattern, + @Nullable Integer maxResults, + @Nullable String pageToken) { + return wrapped.listResourcesPagedGlobally( + databaseNamePattern, resourceNamePattern, maxResults, pageToken); + } + + @Override + public Resource getResource(Identifier identifier) throws ResourceNotExistException { + return wrapped.getResource(identifier); + } + + @Override + public void createResource(Identifier identifier, Resource resource, boolean ignoreIfExists) + throws ResourceAlreadyExistException, DatabaseNotExistException { + wrapped.createResource(identifier, resource, ignoreIfExists); + } + + @Override + public void dropResource(Identifier identifier, boolean ignoreIfNotExists) + throws ResourceNotExistException { + wrapped.dropResource(identifier, ignoreIfNotExists); + } + + @Override + public void alterResource( + Identifier identifier, List changes, boolean ignoreIfNotExists) + throws ResourceNotExistException { + wrapped.alterResource(identifier, changes, ignoreIfNotExists); + } + @Override public void markDonePartitions(Identifier identifier, List> partitions) throws TableNotExistException { diff --git a/paimon-core/src/main/java/org/apache/paimon/resource/AbstractResource.java b/paimon-core/src/main/java/org/apache/paimon/resource/AbstractResource.java new file mode 100644 index 000000000000..e9948d907651 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/resource/AbstractResource.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.resource; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.utils.IOUtils; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; + +/** Abstract base implementation of {@link Resource} with common fields and accessors. */ +public abstract class AbstractResource implements Resource { + + private static final long serialVersionUID = 1L; + + private final Identifier identifier; + @Nullable private final String comment; + private final String uri; + private final long size; + private final long lastModifiedTime; + private final FileIO fileIO; + + protected AbstractResource( + Identifier identifier, + @Nullable String comment, + String uri, + long size, + long lastModifiedTime, + FileIO fileIO) { + this.identifier = identifier; + this.comment = comment; + this.uri = uri; + this.size = size; + this.lastModifiedTime = lastModifiedTime; + this.fileIO = fileIO; + } + + @JsonGetter("name") + @Override + public String name() { + return identifier.getObjectName(); + } + + @JsonGetter("fullName") + @Override + public String fullName() { + return identifier.getFullName(); + } + + @JsonIgnore + public Identifier identifier() { + return identifier; + } + + @JsonGetter("comment") + public @Nullable String commentOrNull() { + return comment; + } + + @Override + public Optional comment() { + return Optional.ofNullable(comment); + } + + @JsonGetter("uri") + @Override + public String uri() { + return uri; + } + + @JsonGetter("size") + @Override + public long size() { + return size; + } + + @JsonGetter("lastModifiedTime") + @Override + public long lastModifiedTime() { + return lastModifiedTime; + } + + @JsonGetter("resourceType") + public String resourceTypeValue() { + return resourceType().getValue(); + } + + @Override + public byte[] toBytes() { + try { + return IOUtils.readFully(newInputStream(), true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public SeekableInputStream newInputStream() throws IOException { + return fileIO.newInputStream(new Path(uri)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AbstractResource that = (AbstractResource) o; + return size == that.size + && lastModifiedTime == that.lastModifiedTime + && Objects.equals(identifier, that.identifier) + && Objects.equals(comment, that.comment) + && Objects.equals(uri, that.uri); + } + + @Override + public int hashCode() { + return Objects.hash(identifier, comment, uri, size, lastModifiedTime); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/resource/ArchiveResource.java b/paimon-core/src/main/java/org/apache/paimon/resource/ArchiveResource.java new file mode 100644 index 000000000000..2397fdda6227 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/resource/ArchiveResource.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.resource; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; + +import javax.annotation.Nullable; + +/** A {@link Resource} implementation for archive resources (e.g., zip, tar). */ +public class ArchiveResource extends AbstractResource { + + private static final long serialVersionUID = 1L; + + public ArchiveResource( + Identifier identifier, + @Nullable String comment, + String uri, + long size, + long lastModifiedTime, + FileIO fileIO) { + super(identifier, comment, uri, size, lastModifiedTime, fileIO); + } + + @Override + public ResourceType resourceType() { + return ResourceType.ARCHIVE; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/resource/FileResource.java b/paimon-core/src/main/java/org/apache/paimon/resource/FileResource.java new file mode 100644 index 000000000000..4e9266c8deaa --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/resource/FileResource.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.resource; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; + +import javax.annotation.Nullable; + +/** A {@link Resource} implementation for general file resources. */ +public class FileResource extends AbstractResource { + + private static final long serialVersionUID = 1L; + + public FileResource( + Identifier identifier, + @Nullable String comment, + String uri, + long size, + long lastModifiedTime, + FileIO fileIO) { + super(identifier, comment, uri, size, lastModifiedTime, fileIO); + } + + @Override + public ResourceType resourceType() { + return ResourceType.FILE; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/resource/JarResource.java b/paimon-core/src/main/java/org/apache/paimon/resource/JarResource.java new file mode 100644 index 000000000000..0b3fc32dc7a7 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/resource/JarResource.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.resource; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; + +import javax.annotation.Nullable; + +/** A {@link Resource} implementation for JAR resources. */ +public class JarResource extends AbstractResource { + + private static final long serialVersionUID = 1L; + + public JarResource( + Identifier identifier, + @Nullable String comment, + String uri, + long size, + long lastModifiedTime, + FileIO fileIO) { + super(identifier, comment, uri, size, lastModifiedTime, fileIO); + } + + @Override + public ResourceType resourceType() { + return ResourceType.JAR; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/resource/PyResource.java b/paimon-core/src/main/java/org/apache/paimon/resource/PyResource.java new file mode 100644 index 000000000000..4389e989282e --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/resource/PyResource.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.resource; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; + +import javax.annotation.Nullable; + +/** A {@link Resource} implementation for Python resources. */ +public class PyResource extends AbstractResource { + + private static final long serialVersionUID = 1L; + + public PyResource( + Identifier identifier, + @Nullable String comment, + String uri, + long size, + long lastModifiedTime, + FileIO fileIO) { + super(identifier, comment, uri, size, lastModifiedTime, fileIO); + } + + @Override + public ResourceType resourceType() { + return ResourceType.PY; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/resource/Resource.java b/paimon-core/src/main/java/org/apache/paimon/resource/Resource.java new file mode 100644 index 000000000000..0325f63b6dcd --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/resource/Resource.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.resource; + +import org.apache.paimon.annotation.Public; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.SeekableInputStream; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Optional; + +/** + * A resource provides basic abstraction for external resources managed by Paimon, such as files, + * archives, JARs, and Python scripts. + * + * @since 0.4.0 + */ +@Public +public interface Resource extends Serializable { + + /** A name to identify this resource. */ + String name(); + + /** Full name of the resource, default is database.resourceName. */ + String fullName(); + + /** Optional comment describing this resource. */ + Optional comment(); + + /** The URI pointing to the location of this resource. */ + String uri(); + + /** The size of this resource in bytes. */ + long size(); + + /** The last modified time of this resource in milliseconds since epoch. */ + long lastModifiedTime(); + + /** The type of this resource. */ + ResourceType resourceType(); + + /** Returns the contents of this resource as bytes. */ + byte[] toBytes(); + + /** Opens a new input stream for this resource. */ + SeekableInputStream newInputStream() throws IOException; + + /** + * Creates a {@link Resource} instance based on the given {@link ResourceType}. + * + * @param resourceType the type of resource to create + * @param identifier the identifier of the resource + * @param comment optional comment describing the resource + * @param uri the URI pointing to the resource location + * @param size the size of the resource in bytes + * @param lastModifiedTime the last modified time in milliseconds since epoch + * @param fileIO the {@link FileIO} used to read the resource URI + * @return a concrete {@link Resource} instance + */ + static Resource toResource( + ResourceType resourceType, + Identifier identifier, + @Nullable String comment, + String uri, + long size, + long lastModifiedTime, + FileIO fileIO) { + String name = identifier.getObjectName(); + switch (resourceType) { + case FILE: + return new FileResource(identifier, comment, uri, size, lastModifiedTime, fileIO); + case ARCHIVE: + return new ArchiveResource( + identifier, comment, uri, size, lastModifiedTime, fileIO); + case JAR: + if (!name.endsWith(".jar")) { + throw new IllegalArgumentException( + "JAR resource name must end with '.jar', but got: " + name); + } + return new JarResource(identifier, comment, uri, size, lastModifiedTime, fileIO); + case PY: + if (!name.endsWith(".py")) { + throw new IllegalArgumentException( + "PY resource name must end with '.py', but got: " + name); + } + return new PyResource(identifier, comment, uri, size, lastModifiedTime, fileIO); + default: + throw new IllegalArgumentException("Unknown resource type: " + resourceType); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 9d76cfdf4fef..ddfb25161cda 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -42,6 +42,9 @@ import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.PartitionStatistics; +import org.apache.paimon.resource.Resource; +import org.apache.paimon.resource.ResourceChange; +import org.apache.paimon.resource.ResourceType; import org.apache.paimon.rest.exceptions.AlreadyExistsException; import org.apache.paimon.rest.exceptions.BadRequestException; import org.apache.paimon.rest.exceptions.ForbiddenException; @@ -52,6 +55,7 @@ import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; import org.apache.paimon.rest.responses.GetFunctionResponse; +import org.apache.paimon.rest.responses.GetResourceResponse; import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.GetTagResponse; import org.apache.paimon.rest.responses.GetViewResponse; @@ -910,6 +914,139 @@ public PagedList listFunctionDetailsPaged( } } + @Override + public List listResources(String databaseName) throws DatabaseNotExistException { + try { + return api.listResources(databaseName); + } catch (NoSuchResourceException e) { + throw new DatabaseNotExistException(databaseName); + } catch (ForbiddenException e) { + throw new DatabaseNoPermissionException(databaseName, e); + } + } + + @Override + public Resource getResource(Identifier identifier) throws ResourceNotExistException { + try { + GetResourceResponse response = api.getResource(identifier); + return toResource(identifier, response); + } catch (NoSuchResourceException e) { + throw new ResourceNotExistException(identifier, e); + } catch (ForbiddenException e) { + throw new TableNoPermissionException(identifier, e); + } + } + + @Override + public void createResource(Identifier identifier, Resource resource, boolean ignoreIfExists) + throws ResourceAlreadyExistException, DatabaseNotExistException { + RESTFunctionValidator.checkFunctionName(identifier.getObjectName()); + try { + api.createResource( + identifier, + resource.comment().orElse(null), + resource.uri(), + resource.resourceType()); + } catch (NoSuchResourceException e) { + throw new DatabaseNotExistException(identifier.getDatabaseName(), e); + } catch (AlreadyExistsException e) { + if (ignoreIfExists) { + return; + } + throw new ResourceAlreadyExistException(identifier, e); + } + } + + @Override + public void dropResource(Identifier identifier, boolean ignoreIfNotExists) + throws ResourceNotExistException { + RESTFunctionValidator.checkFunctionName(identifier.getObjectName()); + try { + api.dropResource(identifier); + } catch (NoSuchResourceException e) { + if (ignoreIfNotExists) { + return; + } + throw new ResourceNotExistException(identifier, e); + } + } + + @Override + public void alterResource( + Identifier identifier, List changes, boolean ignoreIfNotExists) + throws ResourceNotExistException { + try { + api.alterResource(identifier, changes); + } catch (NoSuchResourceException e) { + if (!ignoreIfNotExists) { + throw new ResourceNotExistException(identifier, e); + } + } catch (ForbiddenException e) { + throw new TableNoPermissionException(identifier, e); + } catch (BadRequestException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + @Override + public PagedList listResourcesPaged( + String databaseName, + @Nullable Integer maxResults, + @Nullable String pageToken, + @Nullable String resourceNamePattern) + throws DatabaseNotExistException { + try { + return api.listResourcesPaged(databaseName, maxResults, pageToken, resourceNamePattern); + } catch (NoSuchResourceException e) { + throw new DatabaseNotExistException(databaseName); + } + } + + @Override + public PagedList listResourceDetailsPaged( + String databaseName, + @Nullable Integer maxResults, + @Nullable String pageToken, + @Nullable String resourceNamePattern) + throws DatabaseNotExistException { + try { + PagedList resources = + api.listResourceDetailsPaged( + databaseName, maxResults, pageToken, resourceNamePattern); + return new PagedList<>( + resources.getElements().stream() + .map(r -> toResource(Identifier.create(databaseName, r.name()), r)) + .collect(Collectors.toList()), + resources.getNextPageToken()); + } catch (NoSuchResourceException e) { + throw new DatabaseNotExistException(databaseName); + } + } + + private Resource toResource(Identifier identifier, GetResourceResponse response) { + String uri = response.uri(); + return Resource.toResource( + ResourceType.fromValue(response.resourceType()), + identifier, + response.comment(), + uri, + response.size(), + response.lastModifiedTime(), + fileIOForData(new Path(uri), identifier)); + } + + @Override + public PagedList listResourcesPagedGlobally( + @Nullable String databaseNamePattern, + @Nullable String resourceNamePattern, + @Nullable Integer maxResults, + @Nullable String pageToken) { + PagedList resources = + api.listResourcesPagedGlobally( + databaseNamePattern, resourceNamePattern, maxResults, pageToken); + return new PagedList<>(resources.getElements(), resources.getNextPageToken()); + } + @Override public View getView(Identifier identifier) throws ViewNotExistException { try { diff --git a/paimon-core/src/test/java/org/apache/paimon/resource/ResourceTest.java b/paimon-core/src/test/java/org/apache/paimon/resource/ResourceTest.java new file mode 100644 index 000000000000..c52b302de4fa --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/resource/ResourceTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.resource; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.fs.local.LocalFileIO; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link Resource}. */ +public class ResourceTest { + + @TempDir Path tempDir; + + @Test + public void testReadLocalResource() throws Exception { + byte[] data = new byte[] {1, 2, 3, 4}; + Path file = tempDir.resolve("resource.bin"); + Files.write(file, data); + + Resource resource = + Resource.toResource( + ResourceType.FILE, + Identifier.create("default", "resource"), + null, + file.toString(), + data.length, + Files.getLastModifiedTime(file).toMillis(), + LocalFileIO.create()); + + assertThat(resource.toBytes()).isEqualTo(data); + try (SeekableInputStream in = resource.newInputStream()) { + assertThat(in.read()).isEqualTo(1); + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java index 4d4f101bd614..43c300ae10b4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java @@ -19,6 +19,7 @@ package org.apache.paimon.rest; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.function.Function; import org.apache.paimon.function.FunctionChange; import org.apache.paimon.function.FunctionDefinition; @@ -28,12 +29,17 @@ import org.apache.paimon.predicate.FieldRef; import org.apache.paimon.predicate.LeafPredicate; import org.apache.paimon.predicate.UpperTransform; +import org.apache.paimon.resource.Resource; +import org.apache.paimon.resource.ResourceChange; +import org.apache.paimon.resource.ResourceType; import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.AlterFunctionRequest; +import org.apache.paimon.rest.requests.AlterResourceRequest; import org.apache.paimon.rest.requests.AlterTableRequest; import org.apache.paimon.rest.requests.AlterViewRequest; import org.apache.paimon.rest.requests.CreateDatabaseRequest; import org.apache.paimon.rest.requests.CreateFunctionRequest; +import org.apache.paimon.rest.requests.CreateResourceRequest; import org.apache.paimon.rest.requests.CreateTableRequest; import org.apache.paimon.rest.requests.CreateViewRequest; import org.apache.paimon.rest.requests.RenameTableRequest; @@ -42,6 +48,7 @@ import org.apache.paimon.rest.responses.AuthTableQueryResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; import org.apache.paimon.rest.responses.GetFunctionResponse; +import org.apache.paimon.rest.responses.GetResourceResponse; import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.GetTableTokenResponse; import org.apache.paimon.rest.responses.GetViewResponse; @@ -367,6 +374,49 @@ public static AlterFunctionRequest alterFunctionRequest() { return new AlterFunctionRequest(functionChanges); } + public static Resource resource(Identifier identifier) { + return Resource.toResource( + ResourceType.FILE, + identifier, + "comment", + "/path/to/" + identifier.getObjectName(), + 1024L, + System.currentTimeMillis(), + LocalFileIO.create()); + } + + public static GetResourceResponse getResourceResponse() { + Resource resource = resource(Identifier.create(databaseName(), "resource")); + return new GetResourceResponse( + resource.name(), + resource.comment().orElse(null), + resource.uri(), + resource.size(), + resource.lastModifiedTime(), + resource.resourceType().getValue(), + "owner", + 1L, + "owner", + 1L, + "owner"); + } + + public static CreateResourceRequest createResourceRequest() { + Resource resource = resource(Identifier.create(databaseName(), "resource")); + return new CreateResourceRequest( + resource.name(), + resource.comment().orElse(null), + resource.uri(), + resource.resourceType().getValue()); + } + + public static AlterResourceRequest alterResourceRequest() { + List resourceChanges = new ArrayList<>(); + resourceChanges.add(ResourceChange.updateComment("comment")); + resourceChanges.add(ResourceChange.updateUri("/new/path/to/resource")); + return new AlterResourceRequest(resourceChanges); + } + private static ViewSchema viewSchema() { List fields = Arrays.asList( diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTApiJsonTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTApiJsonTest.java index 1b8b1f71d0c0..b4ffcac45b32 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTApiJsonTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTApiJsonTest.java @@ -20,10 +20,12 @@ import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.AlterFunctionRequest; +import org.apache.paimon.rest.requests.AlterResourceRequest; import org.apache.paimon.rest.requests.AlterTableRequest; import org.apache.paimon.rest.requests.AlterViewRequest; import org.apache.paimon.rest.requests.CreateDatabaseRequest; import org.apache.paimon.rest.requests.CreateFunctionRequest; +import org.apache.paimon.rest.requests.CreateResourceRequest; import org.apache.paimon.rest.requests.CreateTableRequest; import org.apache.paimon.rest.requests.CreateViewRequest; import org.apache.paimon.rest.requests.RenameTableRequest; @@ -34,6 +36,7 @@ import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; import org.apache.paimon.rest.responses.GetFunctionResponse; +import org.apache.paimon.rest.responses.GetResourceResponse; import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.GetTableTokenResponse; import org.apache.paimon.rest.responses.GetViewResponse; @@ -300,6 +303,41 @@ public void alterFunctionRequestParseTest() throws JsonProcessingException { assertEquals(parseData.changes().size(), request.changes().size()); } + @Test + public void getResourceResponseParseTest() throws Exception { + GetResourceResponse response = MockRESTMessage.getResourceResponse(); + String responseStr = RESTApi.toJson(response); + GetResourceResponse parseData = RESTApi.fromJson(responseStr, GetResourceResponse.class); + assertEquals(response.name(), parseData.name()); + assertEquals(response.comment(), parseData.comment()); + assertEquals(response.uri(), parseData.uri()); + assertEquals(response.size(), parseData.size()); + assertEquals(response.lastModifiedTime(), parseData.lastModifiedTime()); + assertEquals(response.resourceType(), parseData.resourceType()); + } + + @Test + public void createResourceRequestParseTest() throws JsonProcessingException { + CreateResourceRequest request = MockRESTMessage.createResourceRequest(); + String requestStr = RESTApi.toJson(request); + CreateResourceRequest parseData = RESTApi.fromJson(requestStr, CreateResourceRequest.class); + assertEquals(request.name(), parseData.name()); + assertEquals(request.comment(), parseData.comment()); + assertEquals(request.uri(), parseData.uri()); + assertEquals(request.resourceType(), parseData.resourceType()); + } + + @Test + public void alterResourceRequestParseTest() throws JsonProcessingException { + AlterResourceRequest request = MockRESTMessage.alterResourceRequest(); + String requestStr = RESTApi.toJson(request); + AlterResourceRequest parseData = RESTApi.fromJson(requestStr, AlterResourceRequest.class); + assertEquals(request.changes().size(), parseData.changes().size()); + for (int i = 0; i < request.changes().size(); i++) { + assertEquals(request.changes().get(i), parseData.changes().get(i)); + } + } + @Test public void authTableQueryResponseParseTest() throws Exception { AuthTableQueryResponse response = MockRESTMessage.authTableQueryResponse(); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index af5d94e3f632..ec7e863b5c4e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -45,10 +45,14 @@ import org.apache.paimon.partition.PartitionUtils; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.Transform; +import org.apache.paimon.resource.Resource; +import org.apache.paimon.resource.ResourceChange; +import org.apache.paimon.resource.ResourceType; import org.apache.paimon.rest.auth.AuthProvider; import org.apache.paimon.rest.auth.RESTAuthParameter; import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.AlterFunctionRequest; +import org.apache.paimon.rest.requests.AlterResourceRequest; import org.apache.paimon.rest.requests.AlterTableRequest; import org.apache.paimon.rest.requests.AlterViewRequest; import org.apache.paimon.rest.requests.AuthTableQueryRequest; @@ -56,6 +60,7 @@ import org.apache.paimon.rest.requests.CreateBranchRequest; import org.apache.paimon.rest.requests.CreateDatabaseRequest; import org.apache.paimon.rest.requests.CreateFunctionRequest; +import org.apache.paimon.rest.requests.CreateResourceRequest; import org.apache.paimon.rest.requests.CreateTableRequest; import org.apache.paimon.rest.requests.CreateTagRequest; import org.apache.paimon.rest.requests.CreateViewRequest; @@ -73,6 +78,7 @@ import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; import org.apache.paimon.rest.responses.GetFunctionResponse; +import org.apache.paimon.rest.responses.GetResourceResponse; import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.GetTableSnapshotResponse; import org.apache.paimon.rest.responses.GetTableTokenResponse; @@ -86,6 +92,9 @@ import org.apache.paimon.rest.responses.ListFunctionsGloballyResponse; import org.apache.paimon.rest.responses.ListFunctionsResponse; import org.apache.paimon.rest.responses.ListPartitionsResponse; +import org.apache.paimon.rest.responses.ListResourceDetailsResponse; +import org.apache.paimon.rest.responses.ListResourcesGloballyResponse; +import org.apache.paimon.rest.responses.ListResourcesResponse; import org.apache.paimon.rest.responses.ListSnapshotsResponse; import org.apache.paimon.rest.responses.ListTableDetailsResponse; import org.apache.paimon.rest.responses.ListTablesGloballyResponse; @@ -163,12 +172,15 @@ import static org.apache.paimon.rest.RESTApi.MAX_RESULTS; import static org.apache.paimon.rest.RESTApi.PAGE_TOKEN; import static org.apache.paimon.rest.RESTApi.PARTITION_NAME_PATTERN; +import static org.apache.paimon.rest.RESTApi.RESOURCE_NAME_PATTERN; import static org.apache.paimon.rest.RESTApi.TABLE_NAME_PATTERN; import static org.apache.paimon.rest.RESTApi.TABLE_TYPE; import static org.apache.paimon.rest.RESTApi.TAG_NAME_PREFIX; import static org.apache.paimon.rest.RESTApi.VIEW_NAME_PATTERN; import static org.apache.paimon.rest.ResourcePaths.FUNCTIONS; import static org.apache.paimon.rest.ResourcePaths.FUNCTION_DETAILS; +import static org.apache.paimon.rest.ResourcePaths.RESOURCES; +import static org.apache.paimon.rest.ResourcePaths.RESOURCE_DETAILS; import static org.apache.paimon.rest.ResourcePaths.TABLE_DETAILS; import static org.apache.paimon.rest.ResourcePaths.VIEWS; import static org.apache.paimon.rest.ResourcePaths.VIEW_DETAILS; @@ -197,6 +209,7 @@ public class RESTCatalogServer { private final List noPermissionTables = new ArrayList<>(); private final List noPermissionViews = new ArrayList<>(); private final Map functionStore = new HashMap<>(); + private final Map resourceStore = new HashMap<>(); private final Map> columnAuthHandler = new HashMap<>(); private final Map> rowFilterAuthHandler = new HashMap<>(); private final Map> columnMaskingAuthHandler = new HashMap<>(); @@ -353,11 +366,12 @@ && isTableByIdRequest(request.getPath())) { } else if (StringUtils.startsWith( request.getPath(), resourcePaths.functions())) { return functionsHandle(parameters); - } else if (request.getPath().startsWith(databaseUri)) { + } else if (StringUtils.startsWith( + request.getPath(), resourcePaths.resources())) { + return resourcesHandle(parameters); + } else if (resourcePath.startsWith(databaseUri)) { String[] resources = - request.getPath() - .substring((databaseUri + "/").length()) - .split("/"); + resourcePath.substring((databaseUri + "/").length()).split("/"); String databaseName = RESTUtil.decodeString(resources[0]); if (noPermissionDatabases.contains(databaseName)) { throw new Catalog.DatabaseNoPermissionException(databaseName); @@ -462,6 +476,12 @@ && isTableByIdRequest(request.getPath())) { resources.length >= 5 && ResourcePaths.TABLES.equals(resources[1]) && ResourcePaths.TAGS.equals(resources[3]); + boolean isResources = + resources.length == 2 && resources[1].startsWith(RESOURCES); + boolean isResourceDetails = + resources.length == 2 && resources[1].startsWith(RESOURCE_DETAILS); + boolean isResource = + resources.length == 3 && RESOURCES.equals(resources[1]); Identifier identifier = resources.length >= 3 && !"rename".equals(resources[2]) @@ -594,6 +614,19 @@ && isTableByIdRequest(request.getPath())) { restAuthParameter.method(), identifier, restAuthParameter.data()); + } else if (isResource) { + return resourceApiHandler( + identifier, + restAuthParameter.method(), + restAuthParameter.data()); + } else if (isResources) { + return resourcesApiHandler( + databaseName, + restAuthParameter.method(), + restAuthParameter.data(), + parameters); + } else if (isResourceDetails) { + return resourceDetailsHandle(databaseName, parameters); } else { return databaseHandle( restAuthParameter.method(), @@ -738,6 +771,22 @@ && isTableByIdRequest(request.getPath())) { e.getMessage(), 409); return mockResponse(response, 409); + } catch (Catalog.ResourceNotExistException e) { + response = + new ErrorResponse( + "RESOURCE", + e.identifier().getObjectName(), + e.getMessage(), + 404); + return mockResponse(response, 404); + } catch (Catalog.ResourceAlreadyExistException e) { + response = + new ErrorResponse( + "RESOURCE", + e.identifier().getObjectName(), + e.getMessage(), + 409); + return mockResponse(response, 409); } catch (IllegalArgumentException e) { response = new ErrorResponse(null, null, e.getMessage(), 400); return mockResponse(response, 400); @@ -757,9 +806,11 @@ && isTableByIdRequest(request.getPath())) { response = new ErrorResponse(null, null, e.getMessage(), 500); return mockResponse(response, 500); } + String errorMsg = + e.getCause() != null ? e.getCause().getMessage() : e.getMessage(); return new MockResponse() .setResponseCode(500) - .setBody(e.getCause().getMessage()); + .setBody(errorMsg != null ? errorMsg : "Internal Server Error"); } } }; @@ -1271,6 +1322,164 @@ private GetFunctionResponse toGetFunctionResponse(Function function) { "owner"); } + // ==================== Resource Handlers ========================== + + private MockResponse resourcesApiHandler( + String databaseName, String method, String data, Map parameters) + throws Exception { + switch (method) { + case "GET": + String resourceNamePattern = parameters.get(RESOURCE_NAME_PATTERN); + List resourceNames = + resourceStore.keySet().stream() + .map(Identifier::fromString) + .filter( + id -> + id.getDatabaseName().equals(databaseName) + && (Objects.isNull(resourceNamePattern) + || matchNamePattern( + id.getObjectName(), + resourceNamePattern))) + .map(Identifier::getObjectName) + .collect(Collectors.toList()); + return generateFinalListResourcesResponse(parameters, resourceNames); + case "POST": + CreateResourceRequest createRequest = + RESTApi.fromJson(data, CreateResourceRequest.class); + Identifier resourceId = Identifier.create(databaseName, createRequest.name()); + LOG.info( + "resourcesApiHandler POST: databaseName={}, resourceId.fullName={}", + databaseName, + resourceId.getFullName()); + if (resourceStore.containsKey(resourceId.getFullName())) { + throw new Catalog.ResourceAlreadyExistException(resourceId); + } + Resource resource = + Resource.toResource( + ResourceType.fromValue(createRequest.resourceType()), + resourceId, + createRequest.comment(), + createRequest.uri(), + 0L, + System.currentTimeMillis(), + LocalFileIO.create()); + resourceStore.put(resourceId.getFullName(), resource); + return new MockResponse().setResponseCode(200); + default: + return new MockResponse().setResponseCode(404); + } + } + + private MockResponse resourceApiHandler(Identifier identifier, String method, String data) + throws Exception { + if (!resourceStore.containsKey(identifier.getFullName())) { + throw new Catalog.ResourceNotExistException(identifier); + } + switch (method) { + case "GET": + Resource resource = resourceStore.get(identifier.getFullName()); + GetResourceResponse getResponse = toGetResourceResponse(resource); + return mockResponse(getResponse, 200); + case "DELETE": + resourceStore.remove(identifier.getFullName()); + return new MockResponse().setResponseCode(200); + case "POST": + AlterResourceRequest alterRequest = + RESTApi.fromJson(data, AlterResourceRequest.class); + Resource existingResource = resourceStore.get(identifier.getFullName()); + String newComment = existingResource.comment().orElse(null); + String newUri = existingResource.uri(); + for (ResourceChange change : alterRequest.changes()) { + if (change instanceof ResourceChange.UpdateResourceComment) { + newComment = ((ResourceChange.UpdateResourceComment) change).comment(); + } else if (change instanceof ResourceChange.UpdateResourceUri) { + newUri = ((ResourceChange.UpdateResourceUri) change).uri(); + } + } + Resource updatedResource = + Resource.toResource( + existingResource.resourceType(), + identifier, + newComment, + newUri, + existingResource.size(), + System.currentTimeMillis(), + LocalFileIO.create()); + resourceStore.put(identifier.getFullName(), updatedResource); + return new MockResponse().setResponseCode(200); + default: + return new MockResponse().setResponseCode(404); + } + } + + private MockResponse resourceDetailsHandle( + String databaseName, Map parameters) { + String resourceNamePattern = parameters.get(RESOURCE_NAME_PATTERN); + List resourceDetails = + resourceStore.entrySet().stream() + .filter( + e -> { + Identifier id = Identifier.fromString(e.getKey()); + return id.getDatabaseName().equals(databaseName) + && (Objects.isNull(resourceNamePattern) + || matchNamePattern( + id.getObjectName(), + resourceNamePattern)); + }) + .map(e -> toGetResourceResponse(e.getValue())) + .collect(Collectors.toList()); + + int maxResults; + try { + maxResults = getMaxResults(parameters); + } catch (NumberFormatException e) { + return handleInvalidMaxResults(parameters); + } + String pageToken = parameters.getOrDefault(PAGE_TOKEN, null); + + if (!resourceDetails.isEmpty()) { + PagedList pagedResourceDetails = + buildPagedEntities(resourceDetails, maxResults, pageToken); + return mockResponse( + new ListResourceDetailsResponse( + pagedResourceDetails.getElements(), + pagedResourceDetails.getNextPageToken()), + 200); + } + return mockResponse(new ListResourceDetailsResponse(Collections.emptyList(), null), 200); + } + + private MockResponse generateFinalListResourcesResponse( + Map parameters, List resourceNames) { + int maxResults; + try { + maxResults = getMaxResults(parameters); + } catch (NumberFormatException e) { + return handleInvalidMaxResults(parameters); + } + String pageToken = parameters.getOrDefault(PAGE_TOKEN, null); + PagedList pagedResources = buildPagedEntities(resourceNames, maxResults, pageToken); + RESTResponse response = + new ListResourcesResponse( + pagedResources.getElements(), pagedResources.getNextPageToken()); + return mockResponse(response, 200); + } + + private GetResourceResponse toGetResourceResponse(Resource resource) { + return new GetResourceResponse( + resource.name(), + resource.comment().orElse(null), + resource.uri(), + resource.size(), + resource.lastModifiedTime(), + resource.resourceType().getValue(), + "owner", + 1L, + "owner", + 1L, + "owner"); + } + private MockResponse databasesApiHandler( String method, String data, Map parameters) throws Exception { switch (method) { @@ -2349,6 +2558,44 @@ private List listViews(Map parameters) { return fullViews; } + private MockResponse resourcesHandle(Map parameters) { + RESTResponse response; + List resources = listResources(parameters); + if (!resources.isEmpty()) { + int maxResults; + try { + maxResults = getMaxResults(parameters); + } catch (NumberFormatException e) { + return handleInvalidMaxResults(parameters); + } + String pageToken = parameters.get(PAGE_TOKEN); + PagedList pagedResources = + buildPagedEntities(resources, maxResults, pageToken); + response = + new ListResourcesGloballyResponse( + pagedResources.getElements(), pagedResources.getNextPageToken()); + } else { + response = new ListResourcesGloballyResponse(Collections.emptyList(), null); + } + return mockResponse(response, 200); + } + + private List listResources(Map parameters) { + String resourceNamePattern = parameters.get(RESOURCE_NAME_PATTERN); + String databaseNamePattern = parameters.get(DATABASE_NAME_PATTERN); + List fullResources = new ArrayList<>(); + for (Map.Entry entry : resourceStore.entrySet()) { + Identifier identifier = Identifier.fromString(entry.getKey()); + if ((Objects.isNull(databaseNamePattern) + || matchNamePattern(identifier.getDatabaseName(), databaseNamePattern)) + && (Objects.isNull(resourceNamePattern) + || matchNamePattern(identifier.getObjectName(), resourceNamePattern))) { + fullResources.add(identifier); + } + } + return fullResources; + } + private MockResponse viewHandle(String method, Identifier identifier, String requestData) throws Exception { RESTResponse response; @@ -2801,6 +3048,8 @@ private String getPagedKey(T entity) { } else if (entity instanceof GetFunctionResponse) { GetFunctionResponse functionResponse = (GetFunctionResponse) entity; return functionResponse.name(); + } else if (entity instanceof GetResourceResponse) { + return ((GetResourceResponse) entity).name(); } else if (entity instanceof Identifier) { Identifier identifier = (Identifier) entity; return identifier.getFullName(); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 6ff873aed17d..832c8963d676 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -62,6 +62,8 @@ import org.apache.paimon.predicate.Transform; import org.apache.paimon.predicate.UpperTransform; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.resource.Resource; +import org.apache.paimon.resource.ResourceChange; import org.apache.paimon.rest.auth.DLFToken; import org.apache.paimon.rest.exceptions.BadRequestException; import org.apache.paimon.rest.exceptions.ForbiddenException; @@ -2602,6 +2604,320 @@ public void testValidateFunctionName() throws Exception { () -> RESTFunctionValidator.checkFunctionName(null)); } + @Test + void testResource() throws Exception { + Identifier identifierWithSlash = new Identifier("rest_catalog_db", "resource/"); + catalog.createDatabase(identifierWithSlash.getDatabaseName(), false); + assertThrows( + IllegalArgumentException.class, + () -> + catalog.createResource( + identifierWithSlash, + MockRESTMessage.resource(identifierWithSlash), + false)); + assertThrows( + Catalog.ResourceNotExistException.class, + () -> catalog.getResource(identifierWithSlash)); + assertThrows( + IllegalArgumentException.class, + () -> catalog.dropResource(identifierWithSlash, true)); + + Identifier identifierWithoutAlphabet = new Identifier("rest_catalog_db", "-"); + assertThrows( + IllegalArgumentException.class, + () -> + catalog.createResource( + identifierWithoutAlphabet, + MockRESTMessage.resource(identifierWithoutAlphabet), + false)); + assertThrows( + Catalog.ResourceNotExistException.class, + () -> catalog.getResource(identifierWithoutAlphabet)); + assertThrows( + IllegalArgumentException.class, + () -> catalog.dropResource(identifierWithoutAlphabet, true)); + + Identifier identifier = Identifier.fromString("rest_catalog_db.resource.na_me-01"); + Resource resource = MockRESTMessage.resource(identifier); + + catalog.createResource(identifier, resource, true); + assertThrows( + Catalog.ResourceAlreadyExistException.class, + () -> catalog.createResource(identifier, resource, false)); + + assertThat(catalog.listResources(identifier.getDatabaseName()).contains(resource.name())) + .isTrue(); + + Resource getResource = catalog.getResource(identifier); + assertThat(getResource.name()).isEqualTo(resource.name()); + assertThat(getResource.uri()).isEqualTo(resource.uri()); + assertThat(getResource.resourceType()).isEqualTo(resource.resourceType()); + assertThat(getResource.comment()).isEqualTo(resource.comment()); + + catalog.dropResource(identifier, true); + + assertThat(catalog.listResources(identifier.getDatabaseName()).contains(resource.name())) + .isFalse(); + assertThrows( + Catalog.ResourceNotExistException.class, + () -> catalog.dropResource(identifier, false)); + assertThrows( + Catalog.ResourceNotExistException.class, () -> catalog.getResource(identifier)); + } + + @Test + void testListResourcesPaged() throws Exception { + String databaseName = "resources_paged_db"; + catalog.createDatabase(databaseName, false); + + // empty database returns empty list + PagedList pagedResources = + catalog.listResourcesPaged(databaseName, null, null, null); + assertThat(pagedResources.getElements()).isEmpty(); + assertNull(pagedResources.getNextPageToken()); + + String[] resourceNames = {"res1", "res2", "res3", "abd", "def", "resource_name"}; + for (String name : resourceNames) { + Identifier id = Identifier.create(databaseName, name); + catalog.createResource(id, MockRESTMessage.resource(id), false); + } + + // null maxResults returns all resources + String[] sortedNames = Arrays.stream(resourceNames).sorted().toArray(String[]::new); + pagedResources = catalog.listResourcesPaged(databaseName, null, null, null); + assertThat(pagedResources.getElements()).containsExactly(sortedNames); + assertNull(pagedResources.getNextPageToken()); + + // paged traversal + int maxResults = 2; + pagedResources = catalog.listResourcesPaged(databaseName, maxResults, null, null); + assertThat(pagedResources.getElements()).containsExactly("abd", "def"); + assertEquals("def", pagedResources.getNextPageToken()); + + pagedResources = + catalog.listResourcesPaged( + databaseName, maxResults, pagedResources.getNextPageToken(), null); + assertThat(pagedResources.getElements()).containsExactly("res1", "res2"); + assertEquals("res2", pagedResources.getNextPageToken()); + + pagedResources = + catalog.listResourcesPaged( + databaseName, maxResults, pagedResources.getNextPageToken(), null); + assertThat(pagedResources.getElements()).containsExactly("res3", "resource_name"); + assertEquals("resource_name", pagedResources.getNextPageToken()); + + pagedResources = + catalog.listResourcesPaged( + databaseName, maxResults, pagedResources.getNextPageToken(), null); + assertThat(pagedResources.getElements()).isEmpty(); + assertNull(pagedResources.getNextPageToken()); + + // pattern matching + pagedResources = catalog.listResourcesPaged(databaseName, null, null, "res%"); + assertThat(pagedResources.getElements()) + .containsExactly("res1", "res2", "res3", "resource_name"); + assertNull(pagedResources.getNextPageToken()); + + pagedResources = catalog.listResourcesPaged(databaseName, null, null, "resource_%"); + assertThat(pagedResources.getElements()).containsExactly("resource_name"); + assertNull(pagedResources.getNextPageToken()); + + // non-existing database + assertThatExceptionOfType(Catalog.DatabaseNotExistException.class) + .isThrownBy(() -> catalog.listResourcesPaged("non_existing_db", null, null, null)); + + // invalid patterns + Assertions.assertThrows( + BadRequestException.class, + () -> catalog.listResourcesPaged(databaseName, null, null, "%res")); + Assertions.assertThrows( + BadRequestException.class, + () -> catalog.listResourcesPaged(databaseName, null, null, "re%s")); + } + + @Test + void testListResourceDetailsPaged() throws Exception { + String databaseName = "resource_details_paged_db"; + catalog.createDatabase(databaseName, false); + + // empty database returns empty list + PagedList pagedDetails = + catalog.listResourceDetailsPaged(databaseName, null, null, null); + assertThat(pagedDetails.getElements()).isEmpty(); + assertNull(pagedDetails.getNextPageToken()); + + String[] resourceNames = {"res1", "res2", "res3", "abd", "def", "resource_name"}; + for (String name : resourceNames) { + Identifier id = Identifier.create(databaseName, name); + catalog.createResource(id, MockRESTMessage.resource(id), false); + } + + // null maxResults returns all + pagedDetails = catalog.listResourceDetailsPaged(databaseName, null, null, null); + List fullNames = + pagedDetails.getElements().stream() + .map(Resource::fullName) + .collect(Collectors.toList()); + String[] sortedNames = Arrays.stream(resourceNames).sorted().toArray(String[]::new); + assertThat(fullNames) + .containsExactly( + Arrays.stream(sortedNames) + .map(n -> Identifier.create(databaseName, n).getFullName()) + .toArray(String[]::new)); + assertNull(pagedDetails.getNextPageToken()); + + // paged traversal + int maxResults = 2; + pagedDetails = catalog.listResourceDetailsPaged(databaseName, maxResults, null, null); + assertEquals(maxResults, pagedDetails.getElements().size()); + assertThat( + pagedDetails.getElements().stream() + .map(Resource::fullName) + .collect(Collectors.toList())) + .containsExactly( + Identifier.create(databaseName, "abd").getFullName(), + Identifier.create(databaseName, "def").getFullName()); + assertEquals("def", pagedDetails.getNextPageToken()); + + pagedDetails = + catalog.listResourceDetailsPaged( + databaseName, maxResults, pagedDetails.getNextPageToken(), null); + assertEquals(maxResults, pagedDetails.getElements().size()); + assertThat( + pagedDetails.getElements().stream() + .map(Resource::fullName) + .collect(Collectors.toList())) + .containsExactly( + Identifier.create(databaseName, "res1").getFullName(), + Identifier.create(databaseName, "res2").getFullName()); + assertEquals("res2", pagedDetails.getNextPageToken()); + + pagedDetails = + catalog.listResourceDetailsPaged( + databaseName, maxResults, pagedDetails.getNextPageToken(), null); + assertEquals(maxResults, pagedDetails.getElements().size()); + assertThat( + pagedDetails.getElements().stream() + .map(Resource::fullName) + .collect(Collectors.toList())) + .containsExactly( + Identifier.create(databaseName, "res3").getFullName(), + Identifier.create(databaseName, "resource_name").getFullName()); + assertEquals("resource_name", pagedDetails.getNextPageToken()); + + pagedDetails = + catalog.listResourceDetailsPaged( + databaseName, maxResults, pagedDetails.getNextPageToken(), null); + assertThat(pagedDetails.getElements()).isEmpty(); + assertNull(pagedDetails.getNextPageToken()); + + // pattern matching + pagedDetails = catalog.listResourceDetailsPaged(databaseName, null, null, "res%"); + assertThat( + pagedDetails.getElements().stream() + .map(Resource::fullName) + .collect(Collectors.toList())) + .containsExactly( + Identifier.create(databaseName, "res1").getFullName(), + Identifier.create(databaseName, "res2").getFullName(), + Identifier.create(databaseName, "res3").getFullName(), + Identifier.create(databaseName, "resource_name").getFullName()); + + // non-existing database + assertThatExceptionOfType(Catalog.DatabaseNotExistException.class) + .isThrownBy( + () -> + catalog.listResourceDetailsPaged( + "non_existing_db", null, null, null)); + + // invalid patterns + Assertions.assertThrows( + BadRequestException.class, + () -> catalog.listResourceDetailsPaged(databaseName, null, null, "%res")); + Assertions.assertThrows( + BadRequestException.class, + () -> catalog.listResourceDetailsPaged(databaseName, null, null, "re%s")); + } + + @Test + void testListResourcesPagedGlobally() throws Exception { + String databaseName = "list_resources_paged_globally_db"; + String databaseName2 = "sample_resource_db"; + String databaseNamePattern = "list_resources_paged_globally%"; + + catalog.createDatabase(databaseName, false); + catalog.createDatabase(databaseName2, false); + + String[] resourceNames = {"res1", "res2", "abd"}; + for (String name : resourceNames) { + Identifier id = Identifier.create(databaseName, name); + catalog.createResource(id, MockRESTMessage.resource(id), false); + } + Identifier crossDbResource = Identifier.create(databaseName2, "res1"); + catalog.createResource(crossDbResource, MockRESTMessage.resource(crossDbResource), false); + + // list all under databaseNamePattern + Identifier[] expectedIdentifiers = + Arrays.stream(resourceNames) + .map(n -> Identifier.create(databaseName, n)) + .toArray(Identifier[]::new); + PagedList pagedResources = + catalog.listResourcesPagedGlobally(databaseNamePattern, null, null, null); + assertThat(pagedResources.getElements()).containsExactlyInAnyOrder(expectedIdentifiers); + assertNull(pagedResources.getNextPageToken()); + + // paged traversal with loop + List allCollected = new ArrayList<>(); + String pageToken = null; + do { + pagedResources = + catalog.listResourcesPagedGlobally(databaseNamePattern, null, 1, pageToken); + allCollected.addAll(pagedResources.getElements()); + pageToken = pagedResources.getNextPageToken(); + } while (pageToken != null); + assertThat(allCollected).containsExactlyInAnyOrder(expectedIdentifiers); + + // pattern matching on resource name + pagedResources = + catalog.listResourcesPagedGlobally(databaseNamePattern, "res%", null, null); + assertThat(pagedResources.getElements()) + .containsExactlyInAnyOrder( + Identifier.create(databaseName, "res1"), + Identifier.create(databaseName, "res2")); + + // null databaseNamePattern returns resources from all databases + pagedResources = catalog.listResourcesPagedGlobally(null, "res1", null, null); + assertThat(pagedResources.getElements()) + .containsExactlyInAnyOrder( + Identifier.create(databaseName, "res1"), crossDbResource); + } + + @Test + void testAlterResource() throws Exception { + Identifier identifier = new Identifier("rest_catalog_db", "alter_resource_name"); + catalog.createDatabase(identifier.getDatabaseName(), false); + Resource resource = MockRESTMessage.resource(identifier); + ResourceChange updateUri = ResourceChange.updateUri("/new/path/to/resource"); + assertDoesNotThrow( + () -> catalog.alterResource(identifier, ImmutableList.of(updateUri), true)); + assertThrows( + Catalog.ResourceNotExistException.class, + () -> catalog.alterResource(identifier, ImmutableList.of(updateUri), false)); + catalog.createResource(identifier, resource, true); + + // update uri + catalog.alterResource(identifier, ImmutableList.of(updateUri), false); + Resource catalogResource = catalog.getResource(identifier); + assertThat(catalogResource.uri()).isEqualTo("/new/path/to/resource"); + + // update comment + String newComment = "new comment"; + catalog.alterResource( + identifier, ImmutableList.of(ResourceChange.updateComment(newComment)), false); + catalogResource = catalog.getResource(identifier); + assertThat(catalogResource.comment().orElse(null)).isEqualTo(newComment); + } + @Test void testTableAuth() throws Exception { Identifier identifier = Identifier.create("test_table_db", "auth_table"); diff --git a/paimon-python/pypaimon/api/api_request.py b/paimon-python/pypaimon/api/api_request.py index 39a01da93e2a..f1c9517fe1c7 100644 --- a/paimon-python/pypaimon/api/api_request.py +++ b/paimon-python/pypaimon/api/api_request.py @@ -23,6 +23,7 @@ from pypaimon.common.json_util import json_field from pypaimon.function.function_change import FunctionChange from pypaimon.function.function_definition import FunctionDefinition +from pypaimon.resource.resource_change import ResourceChange from pypaimon.schema.data_types import DataField from pypaimon.schema.schema import Schema from pypaimon.schema.schema_change import SchemaChange @@ -157,6 +158,39 @@ def to_dict(self) -> Dict: } +@dataclass +class CreateResourceRequest(RESTRequest): + FIELD_NAME = "name" + FIELD_COMMENT = "comment" + FIELD_URI = "uri" + FIELD_RESOURCE_TYPE = "resourceType" + + name: str = json_field(FIELD_NAME) + comment: Optional[str] = json_field(FIELD_COMMENT, default=None) + uri: Optional[str] = json_field(FIELD_URI, default=None) + resource_type: Optional[str] = json_field(FIELD_RESOURCE_TYPE, default=None) + + def to_dict(self) -> Dict: + return { + self.FIELD_NAME: self.name, + self.FIELD_COMMENT: self.comment, + self.FIELD_URI: self.uri, + self.FIELD_RESOURCE_TYPE: self.resource_type, + } + + +@dataclass +class AlterResourceRequest(RESTRequest): + FIELD_CHANGES = "changes" + + changes: List[ResourceChange] = json_field(FIELD_CHANGES) + + def to_dict(self) -> Dict: + return { + self.FIELD_CHANGES: [c.to_dict() for c in self.changes] + } + + # Wire DTO for ``POST /databases/{db}/tables/{tbl}/tags``. Mirrors Java # ``CreateTagRequest`` (paimon-api/.../rest/requests/CreateTagRequest.java) — only # three fields are serialized. ``ignoreIfExists`` is intentionally NOT included diff --git a/paimon-python/pypaimon/api/api_response.py b/paimon-python/pypaimon/api/api_response.py index 2df704b234e6..3745c2915ec1 100644 --- a/paimon-python/pypaimon/api/api_response.py +++ b/paimon-python/pypaimon/api/api_response.py @@ -600,3 +600,175 @@ def to_dict(self) -> Dict: result["functions"] = None result["nextPageToken"] = self.next_page_token return result + + +@dataclass +class GetResourceResponse(AuditRESTResponse): + """Response for getting a resource.""" + FIELD_NAME = "name" + FIELD_COMMENT = "comment" + FIELD_URI = "uri" + FIELD_SIZE = "size" + FIELD_LAST_MODIFIED_TIME = "lastModifiedTime" + FIELD_RESOURCE_TYPE = "resourceType" + + name: Optional[str] = json_field(FIELD_NAME, default=None) + comment: Optional[str] = json_field(FIELD_COMMENT, default=None) + uri: Optional[str] = json_field(FIELD_URI, default=None) + size: int = json_field(FIELD_SIZE, default=0) + last_modified_time: int = json_field(FIELD_LAST_MODIFIED_TIME, default=0) + resource_type: Optional[str] = json_field(FIELD_RESOURCE_TYPE, default=None) + + def __init__( + self, + name: Optional[str] = None, + comment: Optional[str] = None, + uri: Optional[str] = None, + size: int = 0, + last_modified_time: int = 0, + resource_type: Optional[str] = None, + owner: Optional[str] = None, + created_at: Optional[int] = None, + created_by: Optional[str] = None, + updated_at: Optional[int] = None, + updated_by: Optional[str] = None, + ): + super().__init__(owner, created_at, created_by, updated_at, updated_by) + self.name = name + self.comment = comment + self.uri = uri + self.size = size + self.last_modified_time = last_modified_time + self.resource_type = resource_type + + @classmethod + def from_dict(cls, data: Dict) -> "GetResourceResponse": + return cls( + name=data.get("name"), + comment=data.get("comment"), + uri=data.get("uri"), + size=data.get("size", 0), + last_modified_time=data.get("lastModifiedTime", 0), + resource_type=data.get("resourceType"), + owner=data.get("owner"), + created_at=data.get("createdAt"), + created_by=data.get("createdBy"), + updated_at=data.get("updatedAt"), + updated_by=data.get("updatedBy"), + ) + + def to_dict(self) -> Dict: + result = { + "name": self.name, + "comment": self.comment, + "uri": self.uri, + "size": self.size, + "lastModifiedTime": self.last_modified_time, + "resourceType": self.resource_type, + } + if self.owner is not None: + result["owner"] = self.owner + if self.created_at is not None: + result["createdAt"] = self.created_at + if self.created_by is not None: + result["createdBy"] = self.created_by + if self.updated_at is not None: + result["updatedAt"] = self.updated_at + if self.updated_by is not None: + result["updatedBy"] = self.updated_by + return result + + +@dataclass +class ListResourcesResponse(PagedResponse[str]): + """Response for listing resources.""" + FIELD_RESOURCES = "resources" + + resources: Optional[List[str]] = json_field(FIELD_RESOURCES, default=None) + next_page_token: Optional[str] = json_field( + PagedResponse.FIELD_NEXT_PAGE_TOKEN, default=None) + + def data(self) -> Optional[List[str]]: + return self.resources + + def get_next_page_token(self) -> Optional[str]: + return self.next_page_token + + +@dataclass +class ListResourceDetailsResponse(PagedResponse['GetResourceResponse']): + """Response for listing resource details.""" + FIELD_RESOURCES = "resources" + + resources: Optional[List[GetResourceResponse]] = json_field( + FIELD_RESOURCES, default=None) + next_page_token: Optional[str] = json_field( + PagedResponse.FIELD_NEXT_PAGE_TOKEN, default=None) + + def data(self) -> Optional[List[GetResourceResponse]]: + return self.resources + + def get_next_page_token(self) -> Optional[str]: + return self.next_page_token + + @classmethod + def from_dict(cls, data: Dict) -> "ListResourceDetailsResponse": + resources = data.get("resources") + if resources is not None: + resources = [GetResourceResponse.from_dict(d) for d in resources] + return cls( + resources=resources, + next_page_token=data.get("nextPageToken"), + ) + + def to_dict(self) -> Dict: + result = {} + if self.resources is not None: + result["resources"] = [d.to_dict() for d in self.resources] + else: + result["resources"] = None + result["nextPageToken"] = self.next_page_token + return result + + +@dataclass +class ListResourcesGloballyResponse(PagedResponse[Identifier]): + """Response for listing resources globally across databases.""" + FIELD_RESOURCES = "resources" + + resources: Optional[List[Identifier]] = json_field(FIELD_RESOURCES, default=None) + next_page_token: Optional[str] = json_field( + PagedResponse.FIELD_NEXT_PAGE_TOKEN, default=None) + + def data(self) -> Optional[List[Identifier]]: + return self.resources + + def get_next_page_token(self) -> Optional[str]: + return self.next_page_token + + @classmethod + def from_dict(cls, data: Dict) -> "ListResourcesGloballyResponse": + resources = data.get("resources") + if resources is not None: + resources = [ + Identifier.from_string(f) if isinstance(f, str) else + Identifier.create(f.get("database"), f.get("object")) + if isinstance(f, dict) else f + for f in resources + ] + return cls( + resources=resources, + next_page_token=data.get("nextPageToken"), + ) + + def to_dict(self) -> Dict: + result = {} + if self.resources is not None: + result["resources"] = [ + {"database": f.get_database_name(), "object": f.get_object_name()} + for f in self.resources + ] + else: + result["resources"] = None + result["nextPageToken"] = self.next_page_token + return result diff --git a/paimon-python/pypaimon/api/resource_paths.py b/paimon-python/pypaimon/api/resource_paths.py index fad221c3bf87..89d2b0a117e7 100644 --- a/paimon-python/pypaimon/api/resource_paths.py +++ b/paimon-python/pypaimon/api/resource_paths.py @@ -30,6 +30,8 @@ class ResourcePaths: PARTITIONS = "partitions" FUNCTIONS = "functions" FUNCTION_DETAILS = "function-details" + RESOURCES = "resources" + RESOURCE_DETAILS = "resource-details" TAGS = "tags" BRANCHES = "branches" RENAME = "rename" @@ -104,6 +106,20 @@ def function(self, database_name: str, function_name: str) -> str: return "{}/{}/{}/{}/{}".format(self.base_path, self.DATABASES, RESTUtil.encode_string(database_name), self.FUNCTIONS, RESTUtil.encode_string(function_name)) + def resources(self, database_name: Optional[str] = None) -> str: + if database_name: + return "{}/{}/{}/{}".format(self.base_path, self.DATABASES, + RESTUtil.encode_string(database_name), self.RESOURCES) + return "{}/{}".format(self.base_path, self.RESOURCES) + + def resource_details(self, database_name: str) -> str: + return "{}/{}/{}/{}".format(self.base_path, self.DATABASES, + RESTUtil.encode_string(database_name), self.RESOURCE_DETAILS) + + def resource(self, database_name: str, resource_name: str) -> str: + return "{}/{}/{}/{}/{}".format(self.base_path, self.DATABASES, RESTUtil.encode_string(database_name), + self.RESOURCES, RESTUtil.encode_string(resource_name)) + def tags(self, database_name: str, table_name: str) -> str: return "{}/{}/{}/{}/{}/{}".format( self.base_path, self.DATABASES, RESTUtil.encode_string(database_name), diff --git a/paimon-python/pypaimon/api/rest_api.py b/paimon-python/pypaimon/api/rest_api.py index b6ed08860c5f..6f32a1e34fc4 100755 --- a/paimon-python/pypaimon/api/rest_api.py +++ b/paimon-python/pypaimon/api/rest_api.py @@ -26,7 +26,8 @@ CreateFunctionRequest, CreateTableRequest, CreateTagRequest, ForwardBranchRequest, RenameBranchRequest, RenameTableRequest, - RollbackTableRequest) + RollbackTableRequest, CreateResourceRequest, + AlterResourceRequest) from pypaimon.api.api_response import (CommitTableResponse, ConfigResponse, GetDatabaseResponse, GetFunctionResponse, GetTableResponse, @@ -40,7 +41,10 @@ ListTablesResponse, ListTagsResponse, PagedList, PagedResponse, GetTableSnapshotResponse, - Partition) + Partition, GetResourceResponse, + ListResourcesResponse, + ListResourceDetailsResponse, + ListResourcesGloballyResponse) from pypaimon.api.auth import AuthProviderFactory, RESTAuthFunction from pypaimon.api.client import HttpClient from pypaimon.api.resource_paths import ResourcePaths @@ -62,6 +66,7 @@ class RESTApi: TABLE_NAME_PATTERN = "tableNamePattern" TABLE_TYPE = "tableType" FUNCTION_NAME_PATTERN = "functionNamePattern" + RESOURCE_NAME_PATTERN = "resourceNamePattern" PARTITION_NAME_PATTERN = "partitionNamePattern" TAG_NAME_PREFIX = "tagNamePrefix" TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000 @@ -688,6 +693,124 @@ def alter_function(self, identifier: Identifier, changes: List) -> None: self.rest_auth_function, ) + # ==================== Resources ========================== + + def list_resources(self, database_name: str) -> List[str]: + return self.__list_data_from_page_api( + lambda query_params: self.client.get_with_params( + self.resource_paths.resources(database_name), + query_params, + ListResourcesResponse, + self.rest_auth_function, + ) + ) + + def list_resources_paged( + self, + database_name: str, + max_results: Optional[int] = None, + page_token: Optional[str] = None, + resource_name_pattern: Optional[str] = None, + ) -> PagedList[str]: + response = self.client.get_with_params( + self.resource_paths.resources(database_name), + self.__build_paged_query_params( + max_results, + page_token, + {self.RESOURCE_NAME_PATTERN: resource_name_pattern}, + ), + ListResourcesResponse, + self.rest_auth_function, + ) + resources = response.resources if response.resources else [] + return PagedList(resources, response.get_next_page_token()) + + def list_resource_details_paged( + self, + database_name: str, + max_results: Optional[int] = None, + page_token: Optional[str] = None, + resource_name_pattern: Optional[str] = None, + ) -> PagedList[GetResourceResponse]: + response = self.client.get_with_params( + self.resource_paths.resource_details(database_name), + self.__build_paged_query_params( + max_results, + page_token, + {self.RESOURCE_NAME_PATTERN: resource_name_pattern}, + ), + ListResourceDetailsResponse, + self.rest_auth_function, + ) + resource_details = response.data() if response.data() else [] + return PagedList(resource_details, response.get_next_page_token()) + + def list_resources_paged_globally( + self, + database_name_pattern: Optional[str] = None, + resource_name_pattern: Optional[str] = None, + max_results: Optional[int] = None, + page_token: Optional[str] = None, + ) -> PagedList: + response = self.client.get_with_params( + self.resource_paths.resources(), + self.__build_paged_query_params( + max_results, + page_token, + { + self.DATABASE_NAME_PATTERN: database_name_pattern, + self.RESOURCE_NAME_PATTERN: resource_name_pattern, + }, + ), + ListResourcesGloballyResponse, + self.rest_auth_function, + ) + resources = response.data() if response.data() else [] + return PagedList(resources, response.get_next_page_token()) + + def get_resource(self, identifier: Identifier) -> GetResourceResponse: + return self.client.get( + self.resource_paths.resource( + identifier.get_database_name(), identifier.get_object_name()), + GetResourceResponse, + self.rest_auth_function, + ) + + def create_resource( + self, + identifier: Identifier, + comment: Optional[str], + uri: str, + resource_type: 'ResourceType', + ) -> None: + request = CreateResourceRequest( + name=identifier.get_object_name(), + comment=comment, + uri=uri, + resource_type=resource_type.get_value(), + ) + self.client.post( + self.resource_paths.resources(identifier.get_database_name()), + request, + self.rest_auth_function, + ) + + def drop_resource(self, identifier: Identifier) -> None: + self.client.delete( + self.resource_paths.resource( + identifier.get_database_name(), identifier.get_object_name()), + self.rest_auth_function, + ) + + def alter_resource(self, identifier: Identifier, changes: List) -> None: + request = AlterResourceRequest(changes=changes) + self.client.post( + self.resource_paths.resource( + identifier.get_database_name(), identifier.get_object_name()), + request, + self.rest_auth_function, + ) + @staticmethod def __validate_identifier(identifier: Identifier): if not identifier: diff --git a/paimon-python/pypaimon/catalog/catalog_exception.py b/paimon-python/pypaimon/catalog/catalog_exception.py index d81c90f63b51..aa02494ada3a 100644 --- a/paimon-python/pypaimon/catalog/catalog_exception.py +++ b/paimon-python/pypaimon/catalog/catalog_exception.py @@ -103,6 +103,22 @@ def __init__(self, identifier: Identifier): super().__init__(f"Function {identifier.get_full_name()} already exists") +class ResourceNotExistException(CatalogException): + """Resource not exist exception""" + + def __init__(self, identifier: Identifier): + self.identifier = identifier + super().__init__(f"Resource {identifier.get_full_name()} doesn't exist.") + + +class ResourceAlreadyExistException(CatalogException): + """Resource already exist exception""" + + def __init__(self, identifier: Identifier): + self.identifier = identifier + super().__init__(f"Resource {identifier.get_full_name()} already exists.") + + class ColumnNotExistException(CatalogException): """Column not exist exception""" diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py index d6e89d50b9b1..3b54caed8f0c 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py +++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py @@ -33,6 +33,7 @@ DefinitionAlreadyExistException, DefinitionNotExistException, TagNotExistException, TagAlreadyExistException, BranchNotExistException, BranchAlreadyExistException, + ResourceNotExistException, ResourceAlreadyExistException, ) from pypaimon.catalog.database import Database from pypaimon.catalog.rest.property_change import PropertyChange @@ -493,6 +494,129 @@ def list_function_details_paged( except NoSuchResourceException as e: raise DatabaseNotExistException(database_name) from e + # Resource CRUD: mirrors Java RESTCatalog resource handlers. + def list_resources(self, database_name: str) -> List[str]: + try: + return self.rest_api.list_resources(database_name) + except NoSuchResourceException as e: + raise DatabaseNotExistException(database_name) from e + except ForbiddenException as e: + raise DatabaseNoPermissionException(database_name) from e + + def get_resource(self, identifier: Union[str, Identifier]) -> 'Resource': + if not isinstance(identifier, Identifier): + identifier = Identifier.from_string(identifier) + try: + response = self.rest_api.get_resource(identifier) + return self._to_resource(identifier, response) + except NoSuchResourceException as e: + raise ResourceNotExistException(identifier) from e + except ForbiddenException as e: + raise TableNoPermissionException(identifier) from e + + def create_resource(self, identifier: Union[str, Identifier], + resource: 'Resource', ignore_if_exists: bool = False) -> None: + if not isinstance(identifier, Identifier): + identifier = Identifier.from_string(identifier) + RESTApi.check_function_name(identifier.get_object_name()) + try: + self.rest_api.create_resource( + identifier, + resource.comment(), + resource.uri(), + resource.resource_type(), + ) + except NoSuchResourceException as e: + raise DatabaseNotExistException(identifier.get_database_name()) from e + except AlreadyExistsException as e: + if ignore_if_exists: + return + raise ResourceAlreadyExistException(identifier) from e + + def drop_resource(self, identifier: Union[str, Identifier], + ignore_if_not_exists: bool = False) -> None: + if not isinstance(identifier, Identifier): + identifier = Identifier.from_string(identifier) + RESTApi.check_function_name(identifier.get_object_name()) + try: + self.rest_api.drop_resource(identifier) + except NoSuchResourceException as e: + if ignore_if_not_exists: + return + raise ResourceNotExistException(identifier) from e + + def alter_resource(self, identifier: Union[str, Identifier], + changes: List['ResourceChange'], + ignore_if_not_exists: bool = False) -> None: + if not isinstance(identifier, Identifier): + identifier = Identifier.from_string(identifier) + try: + self.rest_api.alter_resource(identifier, changes) + except NoSuchResourceException as e: + if not ignore_if_not_exists: + raise ResourceNotExistException(identifier) from e + except ForbiddenException as e: + raise TableNoPermissionException(identifier) from e + except BadRequestException as e: + raise IllegalArgumentError(str(e)) from e + + def list_resources_paged( + self, + database_name: str, + max_results: Optional[int] = None, + page_token: Optional[str] = None, + resource_name_pattern: Optional[str] = None, + ) -> PagedList[str]: + try: + return self.rest_api.list_resources_paged( + database_name, max_results, page_token, resource_name_pattern) + except NoSuchResourceException as e: + raise DatabaseNotExistException(database_name) from e + + def list_resources_paged_globally( + self, + database_name_pattern: Optional[str] = None, + resource_name_pattern: Optional[str] = None, + max_results: Optional[int] = None, + page_token: Optional[str] = None, + ) -> PagedList[Identifier]: + result = self.rest_api.list_resources_paged_globally( + database_name_pattern, resource_name_pattern, max_results, page_token) + resources = result.elements if result.elements else [] + return PagedList(resources, result.next_page_token) + + def list_resource_details_paged( + self, + database_name: str, + max_results: Optional[int] = None, + page_token: Optional[str] = None, + resource_name_pattern: Optional[str] = None, + ) -> PagedList['Resource']: + try: + result = self.rest_api.list_resource_details_paged( + database_name, max_results, page_token, resource_name_pattern) + resources = [ + self._to_resource(Identifier.create(database_name, resp.name), resp) + for resp in result.elements + ] + return PagedList(resources, result.next_page_token) + except NoSuchResourceException as e: + raise DatabaseNotExistException(database_name) from e + + def _to_resource(self, identifier: Identifier, response) -> 'Resource': + from pypaimon.resource.resource import Resource + from pypaimon.resource.resource_type import ResourceType + uri = response.uri + return Resource.to_resource( + ResourceType.from_value(response.resource_type), + identifier, + response.comment, + uri, + response.size, + response.last_modified_time, + self.file_io_for_data(uri, identifier) if uri else None, + ) + # Tag CRUD: mirrors Java RESTCatalog tag handlers. def create_tag(self, identifier: Union[str, Identifier], tag_name: str, snapshot_id: Optional[int] = None, diff --git a/paimon-python/pypaimon/resource/__init__.py b/paimon-python/pypaimon/resource/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/paimon-python/pypaimon/resource/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/paimon-python/pypaimon/resource/resource.py b/paimon-python/pypaimon/resource/resource.py new file mode 100644 index 000000000000..2ff5f292e1a3 --- /dev/null +++ b/paimon-python/pypaimon/resource/resource.py @@ -0,0 +1,185 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Optional + +from pypaimon.common.identifier import Identifier +from pypaimon.resource.resource_type import ResourceType + + +class Resource: + """A resource provides basic abstraction for external resources managed by + Paimon, such as files, archives, JARs, and Python scripts. + + Mirrors Java ``org.apache.paimon.resource.Resource``. + """ + + def name(self) -> str: + """A name to identify this resource.""" + raise NotImplementedError + + def full_name(self) -> str: + """Full name of the resource, default is database.resourceName.""" + raise NotImplementedError + + def comment(self) -> Optional[str]: + """Optional comment describing this resource.""" + raise NotImplementedError + + def uri(self) -> str: + """The URI pointing to the location of this resource.""" + raise NotImplementedError + + def size(self) -> int: + """The size of this resource in bytes.""" + raise NotImplementedError + + def last_modified_time(self) -> int: + """The last modified time of this resource in milliseconds since epoch.""" + raise NotImplementedError + + def resource_type(self) -> ResourceType: + """The type of this resource.""" + raise NotImplementedError + + def to_bytes(self) -> bytes: + """Returns the contents of this resource as bytes.""" + raise NotImplementedError + + def new_input_stream(self): + """Opens a new input stream for this resource.""" + raise NotImplementedError + + @staticmethod + def to_resource( + resource_type: ResourceType, + identifier: Identifier, + comment: Optional[str], + uri: str, + size: int, + last_modified_time: int, + file_io=None, + ) -> "Resource": + """Creates a ``Resource`` instance based on the given ``ResourceType``.""" + name = identifier.get_object_name() + if resource_type == ResourceType.FILE: + return FileResource(identifier, comment, uri, size, last_modified_time, file_io) + elif resource_type == ResourceType.ARCHIVE: + return ArchiveResource(identifier, comment, uri, size, last_modified_time, file_io) + elif resource_type == ResourceType.JAR: + if not name.endswith(".jar"): + raise ValueError( + "JAR resource name must end with '.jar', but got: {}".format(name)) + return JarResource(identifier, comment, uri, size, last_modified_time, file_io) + elif resource_type == ResourceType.PY: + if not name.endswith(".py"): + raise ValueError( + "PY resource name must end with '.py', but got: {}".format(name)) + return PyResource(identifier, comment, uri, size, last_modified_time, file_io) + else: + raise ValueError("Unknown resource type: {}".format(resource_type)) + + +class AbstractResource(Resource): + """Abstract base implementation of ``Resource`` with common fields and accessors.""" + + def __init__( + self, + identifier: Identifier, + comment: Optional[str], + uri: str, + size: int, + last_modified_time: int, + file_io=None, + ): + self._identifier = identifier + self._comment = comment + self._uri = uri + self._size = size + self._last_modified_time = last_modified_time + self._file_io = file_io + + def name(self) -> str: + return self._identifier.get_object_name() + + def full_name(self) -> str: + return self._identifier.get_full_name() + + def identifier(self) -> Identifier: + return self._identifier + + def comment(self) -> Optional[str]: + return self._comment + + def uri(self) -> str: + return self._uri + + def size(self) -> int: + return self._size + + def last_modified_time(self) -> int: + return self._last_modified_time + + def to_bytes(self) -> bytes: + with self.new_input_stream() as stream: + return stream.read() + + def new_input_stream(self): + if self._file_io is None: + raise RuntimeError("FileIO is not available for resource: {}".format(self.full_name())) + return self._file_io.new_input_stream(self._uri) + + def __eq__(self, other): + if not isinstance(other, AbstractResource): + return False + return (self._size == other._size + and self._last_modified_time == other._last_modified_time + and self._identifier == other._identifier + and self._comment == other._comment + and self._uri == other._uri) + + def __hash__(self): + return hash((self._identifier, self._comment, self._uri, + self._size, self._last_modified_time)) + + +class FileResource(AbstractResource): + """A ``Resource`` implementation for general file resources.""" + + def resource_type(self) -> ResourceType: + return ResourceType.FILE + + +class ArchiveResource(AbstractResource): + """A ``Resource`` implementation for archive resources.""" + + def resource_type(self) -> ResourceType: + return ResourceType.ARCHIVE + + +class JarResource(AbstractResource): + """A ``Resource`` implementation for JAR resources.""" + + def resource_type(self) -> ResourceType: + return ResourceType.JAR + + +class PyResource(AbstractResource): + """A ``Resource`` implementation for Python resources.""" + + def resource_type(self) -> ResourceType: + return ResourceType.PY diff --git a/paimon-python/pypaimon/resource/resource_change.py b/paimon-python/pypaimon/resource/resource_change.py new file mode 100644 index 000000000000..af83f55f8ac9 --- /dev/null +++ b/paimon-python/pypaimon/resource/resource_change.py @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Dict, Optional + + +class Actions: + """Actions for resource change.""" + FIELD_TYPE = "action" + UPDATE_COMMENT_ACTION = "updateComment" + UPDATE_URI_ACTION = "updateUri" + + +class ResourceChange: + """Represents a change to a resource. + + Mirrors Java ``org.apache.paimon.resource.ResourceChange``. + """ + + def __init__(self, action: str): + self._action = action + + @staticmethod + def update_comment(comment: Optional[str]) -> "UpdateResourceComment": + return UpdateResourceComment(comment) + + @staticmethod + def update_uri(uri: str) -> "UpdateResourceUri": + return UpdateResourceUri(uri) + + def to_dict(self) -> Dict: + raise NotImplementedError + + @classmethod + def from_dict(cls, data: Dict) -> "ResourceChange": + action = data.get(Actions.FIELD_TYPE) + if action == Actions.UPDATE_COMMENT_ACTION: + return UpdateResourceComment(data.get("comment")) + elif action == Actions.UPDATE_URI_ACTION: + return UpdateResourceUri(data["uri"]) + else: + raise ValueError("Unknown resource change action: {}".format(action)) + + +class UpdateResourceComment(ResourceChange): + """Update comment for resource change.""" + + def __init__(self, comment: Optional[str]): + super().__init__(Actions.UPDATE_COMMENT_ACTION) + self.comment = comment + + def to_dict(self) -> Dict: + return {Actions.FIELD_TYPE: self._action, "comment": self.comment} + + def __eq__(self, other): + if not isinstance(other, UpdateResourceComment): + return False + return self.comment == other.comment + + def __hash__(self): + return hash(self.comment) + + +class UpdateResourceUri(ResourceChange): + """Update URI for resource change.""" + + def __init__(self, uri: str): + super().__init__(Actions.UPDATE_URI_ACTION) + self.uri = uri + + def to_dict(self) -> Dict: + return {Actions.FIELD_TYPE: self._action, "uri": self.uri} + + def __eq__(self, other): + if not isinstance(other, UpdateResourceUri): + return False + return self.uri == other.uri + + def __hash__(self): + return hash(self.uri) diff --git a/paimon-python/pypaimon/resource/resource_type.py b/paimon-python/pypaimon/resource/resource_type.py new file mode 100644 index 000000000000..ca201f2f7954 --- /dev/null +++ b/paimon-python/pypaimon/resource/resource_type.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from enum import Enum + + +class ResourceType(Enum): + """Enumeration of resource types supported by Paimon. + + Mirrors Java ``org.apache.paimon.resource.ResourceType``. + """ + + #: A general file resource. + FILE = "file" + + #: An archive resource (e.g., zip, tar). + ARCHIVE = "archive" + + #: A JAR resource. + JAR = "jar" + + #: A Python resource. + PY = "py" + + def get_value(self) -> str: + return self.value + + @staticmethod + def from_value(value: str) -> "ResourceType": + """Parse a string value to ``ResourceType``, case-insensitive.""" + if value is not None: + for resource_type in ResourceType: + if resource_type.value.lower() == value.lower(): + return resource_type + raise ValueError("Unknown resource type: {}".format(value)) + + def __str__(self) -> str: + return self.value diff --git a/paimon-python/pypaimon/tests/rest/rest_resource_test.py b/paimon-python/pypaimon/tests/rest/rest_resource_test.py new file mode 100644 index 000000000000..f427c51bed56 --- /dev/null +++ b/paimon-python/pypaimon/tests/rest/rest_resource_test.py @@ -0,0 +1,170 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import shutil +import tempfile +import unittest +import uuid + +from pypaimon.api.api_response import ConfigResponse +from pypaimon.api.auth import BearTokenAuthProvider +from pypaimon.catalog.catalog_exception import ( + ResourceNotExistException, + ResourceAlreadyExistException, +) +from pypaimon.catalog.catalog_context import CatalogContext +from pypaimon.catalog.rest.rest_catalog import RESTCatalog +from pypaimon.common.identifier import Identifier +from pypaimon.common.options import Options +from pypaimon.resource.resource import FileResource +from pypaimon.resource.resource_change import ResourceChange +from pypaimon.resource.resource_type import ResourceType +from pypaimon.tests.rest.rest_server import RESTCatalogServer + + +def _mock_resource(identifier: Identifier) -> FileResource: + return FileResource( + identifier=identifier, + comment="comment", + uri="/a/b/c.txt", + size=100, + last_modified_time=1, + file_io=None, + ) + + +class RESTResourceTest(unittest.TestCase): + + def setUp(self): + self.temp_dir = tempfile.mkdtemp(prefix="resource_test_") + self.config = ConfigResponse(defaults={"prefix": "mock-test"}) + self.token = str(uuid.uuid4()) + self.server = RESTCatalogServer( + data_path=self.temp_dir, + auth_provider=BearTokenAuthProvider(self.token), + config=self.config, + warehouse="warehouse", + ) + self.server.start() + + options = Options({ + "metastore": "rest", + "uri": f"http://localhost:{self.server.port}", + "warehouse": "warehouse", + "token.provider": "bear", + "token": self.token, + }) + self.catalog = RESTCatalog(CatalogContext.create_from_options(options)) + + def tearDown(self): + self.server.shutdown() + import gc + gc.collect() + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_resource_type(self): + self.assertEqual(ResourceType.from_value("FILE"), ResourceType.FILE) + self.assertEqual(ResourceType.from_value("jar"), ResourceType.JAR) + self.assertEqual(ResourceType.PY.get_value(), "py") + with self.assertRaises(ValueError): + ResourceType.from_value("unknown") + + def test_resource(self): + self.catalog.create_database("rest_catalog_db", True) + + identifier = Identifier.from_string("rest_catalog_db.resource_na_me-01") + resource = _mock_resource(identifier) + + self.catalog.drop_resource(identifier, True) + self.catalog.create_resource(identifier, resource, True) + with self.assertRaises(ResourceAlreadyExistException): + self.catalog.create_resource(identifier, resource, False) + + self.assertIn(resource.name(), + self.catalog.list_resources(identifier.get_database_name())) + + get_resource = self.catalog.get_resource(identifier) + self.assertEqual(get_resource.name(), resource.name()) + self.assertEqual(get_resource.uri(), resource.uri()) + self.assertEqual(get_resource.comment(), resource.comment()) + self.assertEqual(get_resource.resource_type(), ResourceType.FILE) + + self.catalog.drop_resource(identifier, True) + self.assertNotIn(resource.name(), + self.catalog.list_resources(identifier.get_database_name())) + + with self.assertRaises(ResourceNotExistException): + self.catalog.drop_resource(identifier, False) + with self.assertRaises(ResourceNotExistException): + self.catalog.get_resource(identifier) + + def test_alter_resource(self): + identifier = Identifier.create("rest_catalog_db", "alter_resource_name") + self.catalog.create_database(identifier.get_database_name(), True) + self.catalog.drop_resource(identifier, True) + + with self.assertRaises(ResourceNotExistException): + self.catalog.alter_resource( + identifier, [ResourceChange.update_comment("c")], False) + + self.catalog.create_resource(identifier, _mock_resource(identifier), True) + + new_comment = "new comment" + self.catalog.alter_resource( + identifier, [ResourceChange.update_comment(new_comment)], False) + self.assertEqual(self.catalog.get_resource(identifier).comment(), new_comment) + + new_uri = "/x/y/z.txt" + self.catalog.alter_resource( + identifier, [ResourceChange.update_uri(new_uri)], False) + self.assertEqual(self.catalog.get_resource(identifier).uri(), new_uri) + + def test_list_resources(self): + db1 = "db_rest_catalog_db" + db2 = "db2_rest_catalog" + identifier = Identifier.create(db1, "list_resource") + identifier1 = Identifier.create(db1, "resource") + identifier2 = Identifier.create(db2, "list_resource") + identifier3 = Identifier.create(db2, "resource") + + self.catalog.create_database(db1, True) + self.catalog.create_database(db2, True) + self.catalog.create_resource(identifier, _mock_resource(identifier), True) + self.catalog.create_resource(identifier1, _mock_resource(identifier1), True) + self.catalog.create_resource(identifier2, _mock_resource(identifier2), True) + self.catalog.create_resource(identifier3, _mock_resource(identifier3), True) + + result = self.catalog.list_resources_paged(db1, None, None, None) + self.assertEqual( + set(result.elements), + {identifier.get_object_name(), identifier1.get_object_name()}, + ) + + result = self.catalog.list_resources_paged(db1, None, None, "res%") + self.assertEqual(result.elements, [identifier1.get_object_name()]) + + result = self.catalog.list_resources_paged_globally("db2_rest%", "res%", None, None) + self.assertEqual(len(result.elements), 1) + self.assertEqual(result.elements[0].get_full_name(), identifier3.get_full_name()) + + result = self.catalog.list_resource_details_paged(db2, 4, None, "res%") + self.assertEqual(len(result.elements), 1) + self.assertEqual(result.elements[0].full_name(), identifier3.get_full_name()) + + +if __name__ == "__main__": + unittest.main() diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py b/paimon-python/pypaimon/tests/rest/rest_server.py index 55cf2d6baaee..4c015459bebb 100755 --- a/paimon-python/pypaimon/tests/rest/rest_server.py +++ b/paimon-python/pypaimon/tests/rest/rest_server.py @@ -45,7 +45,11 @@ ListPartitionsResponse, ListTablesResponse, ListTagsResponse, PagedList, Partition, - RESTResponse, ErrorResponse) + RESTResponse, ErrorResponse, + GetResourceResponse, + ListResourcesResponse, + ListResourceDetailsResponse, + ListResourcesGloballyResponse) from pypaimon.api.resource_paths import ResourcePaths from pypaimon.api.rest_util import RESTUtil from pypaimon.catalog.catalog_exception import (BranchAlreadyExistException, @@ -60,7 +64,9 @@ DefinitionAlreadyExistException, DefinitionNotExistException, TagNotExistException, - TagAlreadyExistException) + TagAlreadyExistException, + ResourceNotExistException, + ResourceAlreadyExistException) from pypaimon.catalog.rest.table_metadata import TableMetadata from pypaimon.common.identifier import Identifier from pypaimon.api.typedef import RESTAuthParameter @@ -80,6 +86,7 @@ TABLE_TYPE = "tableType" VIEW_NAME_PATTERN = "viewNamePattern" FUNCTION_NAME_PATTERN = "functionNamePattern" +RESOURCE_NAME_PATTERN = "resourceNamePattern" PARTITION_NAME_PATTERN = "partitionNamePattern" TAG_NAME_PREFIX = "tagNamePrefix" MAX_RESULTS = "maxResults" @@ -215,6 +222,7 @@ def __init__(self, data_path: str, auth_provider, config: ConfigResponse, wareho self.table_latest_snapshot_store: Dict[str, str] = {} self.table_partitions_store: Dict[str, List] = {} self.function_store: Dict[str, Dict] = {} # key: "db.func_name", value: GetFunctionResponse-like dict + self.resource_store: Dict[str, GetResourceResponse] = {} # key: "db.resource_name" # Tag store: key = full table name, value = {tag_name: GetTagResponse}. self.tag_store: Dict[str, Dict[str, GetTagResponse]] = {} # Branch store: key = full table name, value = set of branch names. @@ -392,6 +400,10 @@ def _route_request(self, method: str, resource_path: str, parameters: Dict[str, if resource_path == self.resource_paths.functions() and method == "GET": return self._functions_globally_handle(parameters) + # Global resources endpoint (catalog-scoped) + if resource_path == self.resource_paths.resources() and method == "GET": + return self._resources_globally_handle(parameters) + database = resource_path.split("/")[4] # Database-specific endpoints if resource_path.startswith(self.resource_paths.database(database)): @@ -422,6 +434,10 @@ def _route_request(self, method: str, resource_path: str, parameters: Dict[str, return self._functions_handle(method, data, database_name, parameters) elif resource_type == ResourcePaths.FUNCTION_DETAILS: return self._function_details_handle(database_name, parameters) + elif resource_type == ResourcePaths.RESOURCES: + return self._resources_handle(method, data, database_name, parameters) + elif resource_type == ResourcePaths.RESOURCE_DETAILS: + return self._resource_details_handle(database_name, parameters) elif len(path_parts) >= 3: # Individual resource operations @@ -435,6 +451,8 @@ def _route_request(self, method: str, resource_path: str, parameters: Dict[str, return self._table_partitions_handle(method, identifier, parameters) elif resource_type == ResourcePaths.FUNCTIONS: return self._function_handle(method, data, identifier) + elif resource_type == ResourcePaths.RESOURCES: + return self._resource_handle(method, data, identifier) return self._mock_response(ErrorResponse(None, None, "Not Found", 404), 404) @@ -480,6 +498,16 @@ def _route_request(self, method: str, resource_path: str, parameters: Dict[str, ErrorResponse.RESOURCE_TYPE_FUNCTION, e.identifier.get_full_name(), str(e), 409 ) return self._mock_response(response, 409) + except ResourceNotExistException as e: + response = ErrorResponse( + "RESOURCE", e.identifier.get_object_name(), str(e), 404 + ) + return self._mock_response(response, 404) + except ResourceAlreadyExistException as e: + response = ErrorResponse( + "RESOURCE", e.identifier.get_full_name(), str(e), 409 + ) + return self._mock_response(response, 409) except TagNotExistException as e: response = ErrorResponse( ErrorResponse.RESOURCE_TYPE_TAG, e.tag, str(e), 404 @@ -753,6 +781,165 @@ def _generate_final_list_functions_globally_response(self, parameters: Dict[str, response = ListFunctionsGloballyResponse(functions=[], next_page_token=None) return self._mock_response(response, 200) + # ======================= Resource Handlers =============================== + + def _resources_handle(self, method: str, data: str, database_name: str, + parameters: Dict[str, str]) -> Tuple[str, int]: + """Handle database-scoped resource list / create.""" + if method == "GET": + resource_name_pattern = parameters.get(RESOURCE_NAME_PATTERN) + resources = [ + key.split(".", 1)[1] + for key in self.resource_store.keys() + if key.startswith(database_name + ".") + and (not resource_name_pattern + or self._match_name_pattern(key.split(".", 1)[1], resource_name_pattern)) + ] + return self._generate_final_list_resources_response(parameters, resources) + elif method == "POST": + import json as json_module + request_dict = json_module.loads(data) + resource_name = request_dict.get("name") + key = f"{database_name}.{resource_name}" + if key in self.resource_store: + identifier = Identifier.create(database_name, resource_name) + raise ResourceAlreadyExistException(identifier) + self.resource_store[key] = GetResourceResponse( + name=resource_name, + comment=request_dict.get("comment"), + uri=request_dict.get("uri"), + size=request_dict.get("size", 0), + last_modified_time=request_dict.get("lastModifiedTime", 0), + resource_type=request_dict.get("resourceType"), + owner="owner", + created_at=1, + created_by="owner", + updated_at=1, + updated_by="owner", + ) + return self._mock_response("", 200) + return self._mock_response(ErrorResponse(None, None, "Method Not Allowed", 405), 405) + + def _resource_handle(self, method: str, data: str, identifier: Identifier) -> Tuple[str, int]: + """Handle individual resource operations (GET, POST alter, DELETE).""" + key = identifier.get_full_name() + if method == "GET": + if key not in self.resource_store: + raise ResourceNotExistException(identifier) + return self._mock_response(self.resource_store[key], 200) + elif method == "POST": + if key not in self.resource_store: + raise ResourceNotExistException(identifier) + import json as json_module + request_dict = json_module.loads(data) + changes = request_dict.get("changes", []) + self._apply_resource_changes(identifier, changes) + return self._mock_response("", 200) + elif method == "DELETE": + if key not in self.resource_store: + raise ResourceNotExistException(identifier) + del self.resource_store[key] + return self._mock_response("", 200) + return self._mock_response(ErrorResponse(None, None, "Method Not Allowed", 405), 405) + + def _resource_details_handle(self, database_name: str, + parameters: Dict[str, str]) -> Tuple[str, int]: + """Handle resource details listing.""" + resource_name_pattern = parameters.get(RESOURCE_NAME_PATTERN) + details = [] + for key, resp in self.resource_store.items(): + if key.startswith(database_name + "."): + resource_name = key.split(".", 1)[1] + if not resource_name_pattern or self._match_name_pattern(resource_name, resource_name_pattern): + details.append(resp) + return self._generate_final_list_resource_details_response(parameters, details) + + def _resources_globally_handle(self, parameters: Dict[str, str]) -> Tuple[str, int]: + """Handle catalog-scoped resource listing.""" + database_name_pattern = parameters.get(DATABASE_NAME_PATTERN) + resource_name_pattern = parameters.get(RESOURCE_NAME_PATTERN) + identifiers = [] + for key in self.resource_store.keys(): + db_name, resource_name = key.split(".", 1) + if database_name_pattern and not self._match_name_pattern(db_name, database_name_pattern): + continue + if resource_name_pattern and not self._match_name_pattern(resource_name, resource_name_pattern): + continue + identifiers.append(Identifier.create(db_name, resource_name)) + return self._generate_final_list_resources_globally_response(parameters, identifiers) + + def _apply_resource_changes(self, identifier: Identifier, changes: List[Dict]) -> None: + """Apply resource changes to the resource store, mirroring Java mock server logic.""" + from pypaimon.resource.resource_change import Actions + key = identifier.get_full_name() + resource_resp = self.resource_store[key] + + comment = resource_resp.comment + uri = resource_resp.uri + + for change in changes: + action = change.get(Actions.FIELD_TYPE) + if action == Actions.UPDATE_COMMENT_ACTION: + comment = change.get("comment") + elif action == Actions.UPDATE_URI_ACTION: + uri = change.get("uri") + + self.resource_store[key] = GetResourceResponse( + name=resource_resp.name, + comment=comment, + uri=uri, + size=resource_resp.size, + last_modified_time=resource_resp.last_modified_time, + resource_type=resource_resp.resource_type, + owner=resource_resp.owner, + created_at=resource_resp.created_at, + created_by=resource_resp.created_by, + updated_at=resource_resp.updated_at, + updated_by=resource_resp.updated_by, + ) + + def _generate_final_list_resources_response(self, parameters: Dict[str, str], + resources: List[str]) -> Tuple[str, int]: + if resources: + max_results = self._get_max_results(parameters) + page_token = parameters.get(PAGE_TOKEN) + paged = self._build_paged_entities(resources, max_results, page_token) + response = ListResourcesResponse( + resources=paged.elements, + next_page_token=paged.next_page_token + ) + else: + response = ListResourcesResponse(resources=[], next_page_token=None) + return self._mock_response(response, 200) + + def _generate_final_list_resource_details_response(self, parameters: Dict[str, str], + details: List) -> Tuple[str, int]: + if details: + max_results = self._get_max_results(parameters) + page_token = parameters.get(PAGE_TOKEN) + paged = self._build_paged_entities(details, max_results, page_token) + response = ListResourceDetailsResponse( + resources=paged.elements, + next_page_token=paged.next_page_token, + ) + else: + response = ListResourceDetailsResponse(resources=[], next_page_token=None) + return self._mock_response(response, 200) + + def _generate_final_list_resources_globally_response(self, parameters: Dict[str, str], + identifiers: List) -> Tuple[str, int]: + if identifiers: + max_results = self._get_max_results(parameters) + page_token = parameters.get(PAGE_TOKEN) + paged = self._build_paged_entities(identifiers, max_results, page_token) + response = ListResourcesGloballyResponse( + resources=paged.elements, + next_page_token=paged.next_page_token, + ) + else: + response = ListResourcesGloballyResponse(resources=[], next_page_token=None) + return self._mock_response(response, 200) + def _table_partitions_handle( self, method: str, identifier: Identifier, parameters: Dict[str, str]) -> Tuple[str, int]: """Handle table partitions listing"""