Load HDFS file system in separate class loader
electrum committed Nov 14, 2023
1 parent 0e5d54e commit e71bbdc
Showing 27 changed files with 667 additions and 134 deletions.
6 changes: 6 additions & 0 deletions client/trino-jdbc/pom.xml
@@ -186,6 +186,12 @@
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.trino</groupId>
            <artifactId>trino-hdfs</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.trino</groupId>
            <artifactId>trino-hive-hadoop2</artifactId>
2 changes: 1 addition & 1 deletion core/trino-server-rpm/pom.xml
@@ -252,7 +252,7 @@
    <destination>/usr/lib/trino/plugin</destination>
    <base>${server.tar.package}/plugin</base>
    <includes>
        <include>*/*</include>
        <include>*/**</include>
    </includes>
</rule>

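The include pattern widens from */* to */** so that files nested more than one level below a plugin directory, such as the hdfs jars unpacked by the provisio changes below, are packaged into the RPM as well. A rough illustration of the difference using NIO glob matching; this is an analogy for intuition only (the RPM rule's matcher may differ in detail), and the nested path is made up:

import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;

final class GlobDemo
{
    public static void main(String[] args)
    {
        PathMatcher shallow = FileSystems.getDefault().getPathMatcher("glob:*/*");
        PathMatcher deep = FileSystems.getDefault().getPathMatcher("glob:*/**");

        // Hypothetical layout: a plugin directory with an hdfs subdirectory
        Path nested = Path.of("iceberg/hdfs/hadoop-common.jar");

        System.out.println(shallow.matches(nested)); // false: * stops at one path element
        System.out.println(deep.matches(nested));    // true: ** crosses directory boundaries
    }
}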
12 changes: 12 additions & 0 deletions core/trino-server/src/main/provisio/trino.xml
@@ -66,6 +66,9 @@
    <artifact id="${project.groupId}:trino-delta-lake:zip:${project.version}">
        <unpack />
    </artifact>
    <artifact id="${project.groupId}:trino-hdfs:zip:${project.version}">
        <unpack useRoot="true" />
    </artifact>
</artifactSet>

<artifactSet to="plugin/druid">
@@ -114,6 +117,9 @@
    <artifact id="${project.groupId}:trino-hive-hadoop2:zip:${project.version}">
        <unpack />
    </artifact>
    <artifact id="${project.groupId}:trino-hdfs:zip:${project.version}">
        <unpack useRoot="true" />
    </artifact>
</artifactSet>

<artifactSet to="plugin/http-event-listener">
@@ -126,12 +132,18 @@
    <artifact id="${project.groupId}:trino-hudi:zip:${project.version}">
        <unpack />
    </artifact>
    <artifact id="${project.groupId}:trino-hdfs:zip:${project.version}">
        <unpack useRoot="true" />
    </artifact>
</artifactSet>

<artifactSet to="plugin/iceberg">
    <artifact id="${project.groupId}:trino-iceberg:zip:${project.version}">
        <unpack />
    </artifact>
    <artifact id="${project.groupId}:trino-hdfs:zip:${project.version}">
        <unpack useRoot="true" />
    </artifact>
</artifactSet>

<artifactSet to="plugin/ignite">
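With useRoot="true", provisio appears to keep the archive's root directory when unpacking, so the trino-hdfs jars land in their own subdirectory of each plugin directory rather than mixing with the plugin's own jars. A hedged sketch of how the runtime could collect those jars to feed the isolated class loader added later in this commit; the helper class and the "hdfs" directory name are assumptions for illustration, not code from this commit:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

final class HdfsJarLocator
{
    // Hypothetical: gather the hdfs jars unpacked under a plugin directory
    static List<URL> hdfsJars(Path pluginDir)
            throws IOException
    {
        try (Stream<Path> files = Files.walk(pluginDir.resolve("hdfs"))) {
            return files.filter(path -> path.toString().endsWith(".jar"))
                    .map(HdfsJarLocator::toUrl)
                    .toList();
        }
    }

    private static URL toUrl(Path path)
    {
        try {
            return path.toUri().toURL();
        }
        catch (MalformedURLException e) {
            throw new UncheckedIOException(e);
        }
    }
}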
11 changes: 8 additions & 3 deletions lib/trino-filesystem-manager/pom.xml
@@ -27,6 +27,11 @@
            <artifactId>guice</artifactId>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>bootstrap</artifactId>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>configuration</artifactId>
@@ -59,12 +64,12 @@

        <dependency>
            <groupId>io.trino</groupId>
            <artifactId>trino-hdfs</artifactId>
            <artifactId>trino-spi</artifactId>
        </dependency>

        <dependency>
            <groupId>io.trino</groupId>
            <artifactId>trino-spi</artifactId>
            <groupId>jakarta.annotation</groupId>
            <artifactId>jakarta.annotation-api</artifactId>
        </dependency>

        <dependency>
lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java

@@ -14,54 +14,62 @@
package io.trino.filesystem.manager;

import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.azure.AzureFileSystemFactory;
import io.trino.filesystem.azure.AzureFileSystemModule;
import io.trino.filesystem.gcs.GcsFileSystemFactory;
import io.trino.filesystem.gcs.GcsFileSystemModule;
import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
import io.trino.filesystem.hdfs.HdfsFileSystemModule;
import io.trino.filesystem.s3.S3FileSystemFactory;
import io.trino.filesystem.s3.S3FileSystemModule;
import io.trino.filesystem.tracing.TracingFileSystemFactory;
import io.trino.hdfs.HdfsModule;
import io.trino.hdfs.authentication.HdfsAuthenticationModule;
import io.trino.hdfs.azure.HiveAzureModule;
import io.trino.hdfs.cos.HiveCosModule;
import io.trino.hdfs.gcs.HiveGcsModule;
import io.trino.hdfs.rubix.RubixEnabledConfig;
import io.trino.hdfs.rubix.RubixModule;
import io.trino.hdfs.s3.HiveS3Module;
import io.trino.spi.NodeManager;

import java.util.Map;
import java.util.Optional;

import static com.google.inject.Scopes.SINGLETON;
import static com.google.inject.multibindings.MapBinder.newMapBinder;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static java.util.Objects.requireNonNull;

public class FileSystemModule
        extends AbstractConfigurationAwareModule
{
    private final String catalogName;
    private final NodeManager nodeManager;
    private final OpenTelemetry openTelemetry;

    public FileSystemModule(String catalogName, NodeManager nodeManager, OpenTelemetry openTelemetry)
    {
        this.catalogName = requireNonNull(catalogName, "catalogName is null");
        this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
        this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
    }

    @Override
    protected void setup(Binder binder)
    {
        FileSystemConfig config = buildConfigObject(FileSystemConfig.class);

        binder.bind(HdfsFileSystemFactoryHolder.class).in(SINGLETON);
        newOptionalBinder(binder, HdfsFileSystemLoader.class);

        if (config.isHadoopEnabled()) {
            install(new HdfsFileSystemModule());
            install(new HdfsModule());
            install(new HdfsAuthenticationModule());
            install(conditionalModule(RubixEnabledConfig.class, RubixEnabledConfig::isCacheEnabled, new RubixModule()));
            install(new HiveCosModule());
            install(new HiveGcsModule());
            HdfsFileSystemLoader loader = new HdfsFileSystemLoader(
                    getProperties(),
                    !config.isNativeAzureEnabled(),
                    !config.isNativeGcsEnabled(),
                    !config.isNativeS3Enabled(),
                    catalogName,
                    nodeManager,
                    openTelemetry);

            loader.configure().forEach(this::consumeProperty);
            binder.bind(HdfsFileSystemLoader.class).toInstance(loader);
        }

        var factories = newMapBinder(binder, String.class, TrinoFileSystemFactory.class);
@@ -71,48 +79,32 @@ protected void setup(Binder binder)
            factories.addBinding("abfs").to(AzureFileSystemFactory.class);
            factories.addBinding("abfss").to(AzureFileSystemFactory.class);
        }
        else if (config.isHadoopEnabled()) {
            install(new HiveAzureModule());
        }

        if (config.isNativeS3Enabled()) {
            install(new S3FileSystemModule());
            factories.addBinding("s3").to(S3FileSystemFactory.class);
            factories.addBinding("s3a").to(S3FileSystemFactory.class);
            factories.addBinding("s3n").to(S3FileSystemFactory.class);
        }
        else if (config.isHadoopEnabled()) {
            install(new HiveS3Module());
        }

        if (config.isNativeGcsEnabled()) {
            install(new GcsFileSystemModule());
            factories.addBinding("gs").to(GcsFileSystemFactory.class);
        }
        else {
            install(new HiveGcsModule());
        }
    }

    @Provides
    @Singleton
    public TrinoFileSystemFactory createFileSystemFactory(
            HdfsFileSystemFactoryHolder hdfsFileSystemFactory,
            Optional<HdfsFileSystemLoader> hdfsFileSystemLoader,
            LifeCycleManager lifeCycleManager,
            Map<String, TrinoFileSystemFactory> factories,
            Tracer tracer)
    {
        TrinoFileSystemFactory delegate = new SwitchingFileSystemFactory(hdfsFileSystemFactory.value(), factories);
        return new TracingFileSystemFactory(tracer, delegate);
    }
        Optional<TrinoFileSystemFactory> hdfsFactory = hdfsFileSystemLoader.map(HdfsFileSystemLoader::create);
        hdfsFactory.ifPresent(lifeCycleManager::addInstance);

    public static class HdfsFileSystemFactoryHolder
    {
        @Inject(optional = true)
        private HdfsFileSystemFactory hdfsFileSystemFactory;

        public Optional<TrinoFileSystemFactory> value()
        {
            return Optional.ofNullable(hdfsFileSystemFactory);
        }
        TrinoFileSystemFactory delegate = new SwitchingFileSystemFactory(hdfsFactory, factories);
        return new TracingFileSystemFactory(tracer, delegate);
    }
}
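SwitchingFileSystemFactory is referenced above but its source is not part of this excerpt. A simplified, self-contained sketch of the scheme dispatch it presumably performs, given the constructor arguments shown; the generic type and method names are stand-ins, not the real API:

import java.net.URI;
import java.util.Map;
import java.util.Optional;

final class SchemeSwitcher<T>
{
    private final Optional<T> hdfsFallback;         // the factory loaded in the isolated class loader, if Hadoop is enabled
    private final Map<String, T> factoriesByScheme; // e.g. "s3" -> S3 factory, "gs" -> GCS factory

    SchemeSwitcher(Optional<T> hdfsFallback, Map<String, T> factoriesByScheme)
    {
        this.hdfsFallback = hdfsFallback;
        this.factoriesByScheme = factoriesByScheme;
    }

    T forLocation(String location)
    {
        String scheme = URI.create(location).getScheme();
        T factory = factoriesByScheme.get(scheme);
        if (factory != null) {
            return factory;
        }
        // Anything without a native binding (e.g. hdfs://) falls back to Hadoop
        return hdfsFallback.orElseThrow(() ->
                new IllegalArgumentException("No file system registered for scheme: " + scheme));
    }
}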
lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/HdfsClassLoader.java
@@ -0,0 +1,123 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.filesystem.manager;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Enumeration;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;

// based on io.trino.server.PluginClassLoader
final class HdfsClassLoader
        extends URLClassLoader
{
    public HdfsClassLoader(List<URL> urls)
    {
        // This class loader should not have access to the system (application) class loader
        super(urls.toArray(URL[]::new), getPlatformClassLoader());
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve)
            throws ClassNotFoundException
    {
        synchronized (getClassLoadingLock(name)) {
            // Check if class is in the loaded classes cache
            Class<?> cachedClass = findLoadedClass(name);
            if (cachedClass != null) {
                return resolveClass(cachedClass, resolve);
            }

            // If this is an override class, only check override class loader
            if (isOverrideClass(name)) {
                return resolveClass(overrideClassLoader().loadClass(name), resolve);
            }

            // Look for class locally
            return super.loadClass(name, resolve);
        }
    }

    private Class<?> resolveClass(Class<?> clazz, boolean resolve)
    {
        if (resolve) {
            resolveClass(clazz);
        }
        return clazz;
    }

    @Override
    public URL getResource(String name)
    {
        // If this is an override resource, only check override class loader
        if (isOverrideResource(name)) {
            return overrideClassLoader().getResource(name);
        }

        // Look for resource locally
        return super.getResource(name);
    }

    @Override
    public Enumeration<URL> getResources(String name)
            throws IOException
    {
        // If this is an override resource, use override resources
        if (isOverrideResource(name)) {
            return overrideClassLoader().getResources(name);
        }

        // Use local resources
        return super.getResources(name);
    }

    private ClassLoader overrideClassLoader()
    {
        return getClass().getClassLoader();
    }

    private static boolean isOverrideResource(String name)
    {
        // Resource names use '/' separators; convert to a class-style name before matching
        return isOverrideClass(name.replace('/', '.'));
    }

    private static boolean isOverrideClass(String name)
    {
        // SPI packages from io.trino.server.PluginManager and dependencies of trino-filesystem
        return hasPackage(name, "io.trino.spi.") ||
                hasPackage(name, "com.fasterxml.jackson.annotation.") ||
                hasPackage(name, "io.airlift.slice.") ||
                hasPackage(name, "org.openjdk.jol.") ||
                hasPackage(name, "io.opentelemetry.api.") ||
                hasPackage(name, "io.opentelemetry.context.") ||
                hasPackage(name, "com.google.common.") ||
                hasExactPackage(name, "io.trino.memory.context.") ||
                hasExactPackage(name, "io.trino.filesystem.");
    }

    private static boolean hasPackage(String name, String packageName)
    {
        checkArgument(!packageName.isEmpty() && packageName.charAt(packageName.length() - 1) == '.');
        return name.startsWith(packageName);
    }

    private static boolean hasExactPackage(String name, String packageName)
    {
        checkArgument(!packageName.isEmpty() && packageName.charAt(packageName.length() - 1) == '.');
        return name.startsWith(packageName) && (name.lastIndexOf('.') == (packageName.length() - 1));
    }
}
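A hedged sketch of how this class loader might be used at runtime: construct it over the hdfs jars, then reflectively instantiate the factory inside the isolated world. The real wiring lives in HdfsFileSystemLoader, one of the changed files not shown in this excerpt; the factory class name is real (it was imported by the old FileSystemModule), but the no-arg constructor is an assumption for illustration:

import java.net.URL;
import java.util.List;

final class HdfsLoaderSketch
{
    // Hypothetical usage; the actual code is in HdfsFileSystemLoader (not shown here)
    static Object loadHdfsFactory(List<URL> hdfsJars)
            throws ReflectiveOperationException
    {
        // Isolated loader: sees only the hdfs jars, plus the override packages
        // (SPI, Guava, OpenTelemetry, ...) delegated back to the host class loader
        HdfsClassLoader classLoader = new HdfsClassLoader(hdfsJars);

        Class<?> factoryClass = classLoader.loadClass("io.trino.filesystem.hdfs.HdfsFileSystemFactory");
        return factoryClass.getConstructor().newInstance();
    }
}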
(diff truncated: remaining changed files not shown)