Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#6133] Improvement(core): Supports get Fileset schema location in the AuthorizationUtils #6211

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ public class Constants {

public static final String BUILTIN_LOCAL_FS_PROVIDER = "builtin-local";
public static final String BUILTIN_HDFS_FS_PROVIDER = "builtin-hdfs";
public static final String LOCATION = "location";
Abyss-lord marked this conversation as resolved.
Show resolved Hide resolved
public static final String SLASH = "/";
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
import org.apache.gravitino.GravitinoEnv;
Expand All @@ -39,8 +41,10 @@
import org.apache.gravitino.Schema;
import org.apache.gravitino.catalog.CatalogManager;
import org.apache.gravitino.catalog.FilesetDispatcher;
import org.apache.gravitino.catalog.hadoop.Constants;
import org.apache.gravitino.catalog.hive.HiveConstants;
import org.apache.gravitino.connector.BaseCatalog;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
import org.apache.gravitino.dto.authorization.PrivilegeDTO;
import org.apache.gravitino.dto.util.DTOConverters;
Expand All @@ -51,10 +55,12 @@
import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
import org.apache.gravitino.exceptions.NoSuchUserException;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetCatalog;
import org.apache.gravitino.meta.RoleEntity;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.utils.MetadataObjectUtil;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.NamespaceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -465,30 +471,72 @@ public static List<String> getMetadataObjectLocation(
}
break;
case SCHEMA:
{
Catalog catalogObj =
GravitinoEnv.getInstance()
.catalogDispatcher()
.loadCatalog(
NameIdentifier.of(ident.namespace().level(0), ident.namespace().level(1)));
LOG.info("Catalog provider is %s", catalogObj.provider());
if (catalogObj.provider().equals("hive")) {
Schema schema = GravitinoEnv.getInstance().schemaDispatcher().loadSchema(ident);
if (schema.properties().containsKey(HiveConstants.LOCATION)) {
Catalog catalogObj =
GravitinoEnv.getInstance()
.catalogDispatcher()
.loadCatalog(
NameIdentifier.of(ident.namespace().level(0), ident.namespace().level(1)));
Schema schema = GravitinoEnv.getInstance().schemaDispatcher().loadSchema(ident);

switch (catalogObj.type()) {
case RELATIONAL:
LOG.info("Catalog provider is {}", catalogObj.provider());
if ("hive".equals(catalogObj.provider())
&& schema.properties().containsKey(HiveConstants.LOCATION)) {
String schemaLocation = schema.properties().get(HiveConstants.LOCATION);
if (schemaLocation != null && schemaLocation.isEmpty()) {
if (StringUtils.isNotBlank(schemaLocation)) {
locations.add(schemaLocation);
} else {
LOG.warn("Schema %s location is not found", ident);
LOG.warn("Schema {} location is not found", ident);
}
}
}
// TODO: [#6133] Supports get Fileset schema location in the AuthorizationUtils
break;

case FILESET:
if (catalogObj instanceof HasPropertyMetadata) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to check for HasPropertyMetadata?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jerqi Is the fileset catalog guaranteed to implement the HasPropertyMetadata interface?

Copy link
Contributor

@jerqi jerqi Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the fileset catalogs have the `location` property even though they implement the HasPropertyMetadata interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current processing logic is

  1. If the provider is Hadoop, use HasPropertyMetadata to get the location of the Schema.
  2. In other cases, consider adding paths to all filesets below the Schema.
    WDYT, @jerqi

Copy link
Contributor

@jerqi jerqi Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option 2 seems weird to me. Is this consistent with the Hive schema implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 2 seems weird for me. Is this consistent with Hive schema implement.

If it's not a Hadoop fileset, can we get the path only from the fileset itself?

HasPropertyMetadata catalogObjWithProperties = (HasPropertyMetadata) catalogObj;
Map<String, String> properties = schema.properties();
String schemaLocation =
(String)
catalogObjWithProperties
.schemaPropertiesMetadata()
.getOrDefault(properties, Constants.LOCATION);
Abyss-lord marked this conversation as resolved.
Show resolved Hide resolved
if (StringUtils.isNotBlank(schemaLocation)) {
locations.add(normalizeFilesetLocation(schemaLocation));
}
} else {
FilesetCatalog filesetCatalog = catalogObj.asFilesetCatalog();
String catalogObjLocation = catalogObj.properties().get(Constants.LOCATION);
Namespace namespace = NamespaceUtil.toFileset(ident);
NameIdentifier[] nameIdentifiers = filesetCatalog.listFilesets(namespace);
if (nameIdentifiers.length == 0) {
LOG.warn(
Abyss-lord marked this conversation as resolved.
Show resolved Hide resolved
"{} is empty, use catalog location {} as schema location.",
ident.toString(),
catalogObjLocation);
locations.add(catalogObjLocation);
} else {
Arrays.stream(nameIdentifiers)
.forEach(
nameIdentifier -> {
Fileset fileset = filesetCatalog.loadFileset(nameIdentifier);
String filesetLocation = fileset.storageLocation();
if (!StringUtils.isNotBlank(filesetLocation))
locations.add(filesetLocation);
});
}
}
break;

default:
LOG.warn("Unsupported catalog type {}", catalogObj.type());
break;
}
break;

case TABLE:
{
Catalog catalogObj =
catalogObj =
GravitinoEnv.getInstance()
.catalogDispatcher()
.loadCatalog(
Expand All @@ -497,7 +545,7 @@ public static List<String> getMetadataObjectLocation(
Table table = GravitinoEnv.getInstance().tableDispatcher().loadTable(ident);
if (table.properties().containsKey(HiveConstants.LOCATION)) {
String tableLocation = table.properties().get(HiveConstants.LOCATION);
if (tableLocation != null && tableLocation.isEmpty()) {
if (StringUtils.isNotBlank(tableLocation)) {
locations.add(tableLocation);
} else {
LOG.warn("Table %s location is not found", ident);
Expand Down Expand Up @@ -531,4 +579,17 @@ private static NameIdentifier getObjectNameIdentifier(
String metalake, MetadataObject metadataObject) {
return NameIdentifier.parse(String.format("%s.%s", metalake, metadataObject.fullName()));
}

/**
* Normalize the fileset location to end with a slash.
*
* @param location the hadoop schema location
* @return if the location ends with a slash, return the location; otherwise, return the location
* with a slash at the end.
*/
public static String normalizeFilesetLocation(String location) {
Preconditions.checkArgument(StringUtils.isNotBlank(location), "Location is blank");
location = location.endsWith(Constants.SLASH) ? location : location + Constants.SLASH;
return location;
}
}
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.gravitino.utils;

import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import org.apache.gravitino.Entity;
Expand Down Expand Up @@ -107,6 +108,17 @@ public static Namespace ofFileset(String metalake, String catalog, String schema
return Namespace.of(metalake, catalog, schema);
}

/**
* Create a namespace for fileset from a schema name identifier.
*
* @param ident The schema name identifier.
* @return A namespace for fileset.
*/
public static Namespace toFileset(NameIdentifier ident) {
Abyss-lord marked this conversation as resolved.
Show resolved Hide resolved
Preconditions.checkArgument(ident.namespace().length() == 2, "Invalid namespace length");
return ofFileset(ident.namespace().level(0), ident.namespace().level(1), ident.name());
}

/**
* Create a namespace for topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,33 @@
*/
package org.apache.gravitino.utils;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.exceptions.IllegalNamespaceException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestNamespaceUtil {
private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
private final PrintStream originalOut = System.out;
private final PrintStream originalErr = System.err;

  @BeforeEach
  void setUp() {
    // Redirect stdout/stderr into in-memory buffers so tests can inspect console
    // output and the real streams stay clean; restored in restoreStreams().
    System.setOut(new PrintStream(outContent));
    System.setErr(new PrintStream(errContent));
  }

  @AfterEach
  public void restoreStreams() {
    // Undo the redirection done in setUp() so later tests/tools see the real streams.
    System.setOut(originalOut);
    System.setErr(originalErr);
  }

@Test
public void testCheckNamespace() {
Expand Down Expand Up @@ -82,4 +103,23 @@ public void testCheckNamespace() {
Assertions.assertTrue(
excep5.getMessage().contains("Model version namespace must be non-null and have 4 levels"));
}

@Test
void testToFileset() {
NameIdentifier ident = NameIdentifier.of("metalake_demo", "catalog", "schema");
Namespace filesetNamespace = NamespaceUtil.toFileset(ident);
Assertions.assertEquals(3, filesetNamespace.levels().length);
Assertions.assertEquals("metalake_demo", filesetNamespace.level(0));
Assertions.assertEquals("catalog", filesetNamespace.level(1));
Assertions.assertEquals("schema", filesetNamespace.level(2));
}

@Test
void testToFilesetWithIncorrectLevel() {
NameIdentifier ident1 = NameIdentifier.of("metalake_demo", "catalog", "schema", "table");
Assertions.assertThrows(IllegalArgumentException.class, () -> NamespaceUtil.toFileset(ident1));

NameIdentifier ident2 = NameIdentifier.of("metalake_demo", "catalog");
Assertions.assertThrows(IllegalArgumentException.class, () -> NamespaceUtil.toFileset(ident2));
}
}
Loading