diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java index f66b5242d888..ba18738c0d96 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java @@ -50,6 +50,7 @@ public void setup(Binder binder) install(new JdbcDiagnosticModule()); install(new IdentifierMappingModule()); install(new RemoteQueryModifierModule()); + install(new RetryingConnectionFactoryModule()); newOptionalBinder(binder, ConnectorAccessControl.class); newOptionalBinder(binder, QueryBuilder.class).setDefault().to(DefaultQueryBuilder.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/LazyConnectionFactory.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/LazyConnectionFactory.java index 9db72ad77fa5..284cc1ef8d01 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/LazyConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/LazyConnectionFactory.java @@ -16,7 +16,6 @@ import com.google.errorprone.annotations.ThreadSafe; import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; -import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory; import io.trino.spi.connector.ConnectorSession; import jakarta.annotation.Nullable; @@ -30,10 +29,10 @@ public final class LazyConnectionFactory implements ConnectionFactory { - private final StatisticsAwareConnectionFactory delegate; + private final ConnectionFactory delegate; @Inject - public LazyConnectionFactory(StatisticsAwareConnectionFactory delegate) + public LazyConnectionFactory(RetryingConnectionFactory delegate) { this.delegate = requireNonNull(delegate, "delegate is null"); } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java index 38cb7bdf1b4a..0993cdb26e59 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java @@ -24,7 +24,7 @@ import java.sql.Connection; import java.sql.SQLException; -import java.sql.SQLRecoverableException; +import java.sql.SQLTransientException; import static java.time.temporal.ChronoUnit.MILLIS; import static java.time.temporal.ChronoUnit.SECONDS; @@ -33,20 +33,22 @@ public class RetryingConnectionFactory implements ConnectionFactory { - private static final RetryPolicy RETRY_POLICY = RetryPolicy.builder() - .withMaxDuration(java.time.Duration.of(30, SECONDS)) - .withMaxAttempts(5) - .withBackoff(50, 5_000, MILLIS, 4) - .handleIf(RetryingConnectionFactory::isSqlRecoverableException) - .abortOn(TrinoException.class) - .build(); + private final RetryPolicy retryPolicy; - private final StatisticsAwareConnectionFactory delegate; + private final ConnectionFactory delegate; @Inject - public RetryingConnectionFactory(StatisticsAwareConnectionFactory delegate) + public RetryingConnectionFactory(StatisticsAwareConnectionFactory delegate, RetryStrategy retryStrategy) { + requireNonNull(retryStrategy); this.delegate = requireNonNull(delegate, "delegate is null"); + this.retryPolicy = RetryPolicy.builder() + .withMaxDuration(java.time.Duration.of(30, SECONDS)) + .withMaxAttempts(5) + .withBackoff(50, 5_000, MILLIS, 4) + .handleIf(retryStrategy::isExceptionRecoverable) + .abortOn(TrinoException.class) + .build(); } @Override @@ -54,7 +56,7 @@ public Connection openConnection(ConnectorSession session) throws SQLException { try { - return Failsafe.with(RETRY_POLICY) + return Failsafe.with(retryPolicy) .get(() -> delegate.openConnection(session)); } catch (FailsafeException ex) { @@ -72,9 +74,19 @@ public void close() delegate.close(); } - private static boolean isSqlRecoverableException(Throwable exception) + public interface RetryStrategy { - return Throwables.getCausalChain(exception).stream() - .anyMatch(SQLRecoverableException.class::isInstance); + boolean isExceptionRecoverable(Throwable exception); + } + + public static class DefaultRetryStrategy + implements RetryStrategy + { + @Override + public boolean isExceptionRecoverable(Throwable exception) + { + return Throwables.getCausalChain(exception).stream() + .anyMatch(SQLTransientException.class::isInstance); + } } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java new file mode 100644 index 000000000000..a0815d38e84c --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.inject.AbstractModule; +import com.google.inject.Scopes; +import io.trino.plugin.jdbc.RetryingConnectionFactory.DefaultRetryStrategy; +import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; + +import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; + +public class RetryingConnectionFactoryModule + extends AbstractModule +{ + @Override + public void configure() + { + bind(RetryingConnectionFactory.class).in(Scopes.SINGLETON); + newOptionalBinder(binder(), RetryStrategy.class) + .setDefault() + .to(DefaultRetryStrategy.class) + .in(Scopes.SINGLETON); + } +} diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestLazyConnectionFactory.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestLazyConnectionFactory.java index a6b41a1f9f07..cd0fb3dd1d6e 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestLazyConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestLazyConnectionFactory.java @@ -32,10 +32,13 @@ public class TestLazyConnectionFactory public void testNoConnectionIsCreated() throws Exception { - Injector injector = Guice.createInjector(binder -> binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).toInstance( - session -> { - throw new AssertionError("Expected no connection creation"); - })); + Injector injector = Guice.createInjector(binder -> { + binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).toInstance( + session -> { + throw new AssertionError("Expected no connection creation"); + }); + binder.install(new RetryingConnectionFactoryModule()); + }); try (LazyConnectionFactory lazyConnectionFactory = injector.getInstance(LazyConnectionFactory.class); Connection ignored = lazyConnectionFactory.openConnection(SESSION)) { @@ -50,8 +53,11 @@ public void testConnectionCannotBeReusedAfterClose() BaseJdbcConfig config = new BaseJdbcConfig() .setConnectionUrl(format("jdbc:h2:mem:test%s;DB_CLOSE_DELAY=-1", System.nanoTime() + ThreadLocalRandom.current().nextLong())); - Injector injector = Guice.createInjector(binder -> binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).toInstance( - new DriverConnectionFactory(new Driver(), config, new EmptyCredentialProvider()))); + Injector injector = Guice.createInjector(binder -> { + binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).toInstance( + new DriverConnectionFactory(new Driver(), config, new EmptyCredentialProvider())); + binder.install(new RetryingConnectionFactoryModule()); + }); try (LazyConnectionFactory lazyConnectionFactory = injector.getInstance(LazyConnectionFactory.class)) { Connection connection = lazyConnectionFactory.openConnection(SESSION); diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java index eda556b7be9d..d85c1c5d1ef8 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java @@ -13,11 +13,13 @@ */ package io.trino.plugin.jdbc; +import com.google.common.base.Throwables; import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Scopes; +import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; import io.trino.spi.StandardErrorCode; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; @@ -26,17 +28,20 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.SQLRecoverableException; +import java.sql.SQLTransientException; import java.util.ArrayDeque; import java.util.Deque; import java.util.stream.Stream; import static com.google.common.reflect.Reflection.newProxy; +import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.RETURN; import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.THROW_NPE; import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.THROW_SQL_EXCEPTION; import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.THROW_SQL_RECOVERABLE_EXCEPTION; +import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.THROW_SQL_TRANSIENT_EXCEPTION; import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.THROW_TRINO_EXCEPTION; -import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.THROW_WRAPPED_SQL_RECOVERABLE_EXCEPTION; +import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.THROW_WRAPPED_SQL_TRANSIENT_EXCEPTION; import static io.trino.spi.block.TestingSession.SESSION; import static io.trino.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden; import static java.util.Objects.requireNonNull; @@ -68,7 +73,7 @@ public void testSimplyReturnConnection() @Test public void testRetryAndStopOnTrinoException() { - Injector injector = createInjector(THROW_SQL_RECOVERABLE_EXCEPTION, THROW_TRINO_EXCEPTION); + Injector injector = createInjector(THROW_SQL_TRANSIENT_EXCEPTION, THROW_TRINO_EXCEPTION); ConnectionFactory factory = injector.getInstance(RetryingConnectionFactory.class); MockConnectorFactory mock = injector.getInstance(MockConnectorFactory.class); @@ -82,7 +87,7 @@ public void testRetryAndStopOnTrinoException() @Test public void testRetryAndStopOnSqlException() { - Injector injector = createInjector(THROW_SQL_RECOVERABLE_EXCEPTION, THROW_SQL_EXCEPTION); + Injector injector = createInjector(THROW_SQL_TRANSIENT_EXCEPTION, THROW_SQL_EXCEPTION); ConnectionFactory factory = injector.getInstance(RetryingConnectionFactory.class); MockConnectorFactory mock = injector.getInstance(MockConnectorFactory.class); @@ -111,7 +116,7 @@ public void testNullPointerException() public void testRetryAndReturn() throws Exception { - Injector injector = createInjector(THROW_SQL_RECOVERABLE_EXCEPTION, RETURN); + Injector injector = createInjector(THROW_SQL_TRANSIENT_EXCEPTION, RETURN); ConnectionFactory factory = injector.getInstance(RetryingConnectionFactory.class); MockConnectorFactory mock = injector.getInstance(MockConnectorFactory.class); @@ -125,7 +130,21 @@ public void testRetryAndReturn() public void testRetryOnWrappedAndReturn() throws Exception { - Injector injector = createInjector(THROW_WRAPPED_SQL_RECOVERABLE_EXCEPTION, RETURN); + Injector injector = createInjector(THROW_WRAPPED_SQL_TRANSIENT_EXCEPTION, RETURN); + ConnectionFactory factory = injector.getInstance(RetryingConnectionFactory.class); + MockConnectorFactory mock = injector.getInstance(MockConnectorFactory.class); + + Connection connection = factory.openConnection(SESSION); + + assertThat(connection).isNotNull(); + assertThat(mock.getCallCount()).isEqualTo(2); + } + + @Test + public void testOverridingRetryStrategyWorks() + throws Exception + { + Injector injector = createInjectorWithOverridenStrategy(THROW_SQL_RECOVERABLE_EXCEPTION, RETURN); ConnectionFactory factory = injector.getInstance(RetryingConnectionFactory.class); MockConnectorFactory mock = injector.getInstance(MockConnectorFactory.class); @@ -141,9 +160,32 @@ private static Injector createInjector(MockConnectorFactory.Action... actions) binder.bind(MockConnectorFactory.Action[].class).toInstance(actions); binder.bind(MockConnectorFactory.class).in(Scopes.SINGLETON); binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).to(Key.get(MockConnectorFactory.class)); + binder.install(new RetryingConnectionFactoryModule()); + }); + } + + private static Injector createInjectorWithOverridenStrategy(MockConnectorFactory.Action... actions) + { + return Guice.createInjector(binder -> { + binder.bind(MockConnectorFactory.Action[].class).toInstance(actions); + binder.bind(MockConnectorFactory.class).in(Scopes.SINGLETON); + binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).to(Key.get(MockConnectorFactory.class)); + binder.install(new RetryingConnectionFactoryModule()); + newOptionalBinder(binder, RetryStrategy.class).setBinding().to(OverrideRetryStrategy.class).in(Scopes.SINGLETON); }); } + private static class OverrideRetryStrategy + implements RetryStrategy + { + @Override + public boolean isExceptionRecoverable(Throwable exception) + { + return Throwables.getCausalChain(exception).stream() + .anyMatch(SQLRecoverableException.class::isInstance); + } + } + public static class MockConnectorFactory implements ConnectionFactory { @@ -181,6 +223,10 @@ public Connection openConnection(ConnectorSession session) throw new SQLRecoverableException("Testing sql recoverable exception"); case THROW_WRAPPED_SQL_RECOVERABLE_EXCEPTION: throw new RuntimeException(new SQLRecoverableException("Testing sql recoverable exception")); + case THROW_SQL_TRANSIENT_EXCEPTION: + throw new SQLTransientException("Testing sql transient exception"); + case THROW_WRAPPED_SQL_TRANSIENT_EXCEPTION: + throw new RuntimeException(new SQLTransientException("Testing sql transient exception")); } throw new IllegalStateException("Unsupported action:" + action); } @@ -191,6 +237,8 @@ public enum Action THROW_SQL_EXCEPTION, THROW_SQL_RECOVERABLE_EXCEPTION, THROW_WRAPPED_SQL_RECOVERABLE_EXCEPTION, + THROW_SQL_TRANSIENT_EXCEPTION, + THROW_WRAPPED_SQL_TRANSIENT_EXCEPTION, THROW_NPE, RETURN, } diff --git a/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java b/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java index 891844ed1d51..d3c2f545861c 100644 --- a/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java +++ b/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.oracle; +import com.google.common.base.Throwables; import com.google.inject.Binder; import com.google.inject.Key; import com.google.inject.Module; @@ -26,15 +27,15 @@ import io.trino.plugin.jdbc.ForBaseJdbc; import io.trino.plugin.jdbc.JdbcClient; import io.trino.plugin.jdbc.MaxDomainCompactionThreshold; -import io.trino.plugin.jdbc.RetryingConnectionFactory; +import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; import io.trino.plugin.jdbc.credential.CredentialProvider; -import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory; import io.trino.plugin.jdbc.ptf.Query; import io.trino.spi.function.table.ConnectorTableFunction; import oracle.jdbc.OracleConnection; import oracle.jdbc.OracleDriver; import java.sql.SQLException; +import java.sql.SQLRecoverableException; import java.util.Properties; import static com.google.inject.multibindings.Multibinder.newSetBinder; @@ -54,6 +55,7 @@ public void configure(Binder binder) configBinder(binder).bindConfig(OracleConfig.class); newOptionalBinder(binder, Key.get(int.class, MaxDomainCompactionThreshold.class)).setBinding().toInstance(ORACLE_MAX_LIST_EXPRESSIONS); newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, RetryStrategy.class).setBinding().to(OracleRetryStrategy.class).in(Scopes.SINGLETON); } @Provides @@ -77,11 +79,22 @@ public static ConnectionFactory connectionFactory(BaseJdbcConfig config, Credent openTelemetry); } - return new RetryingConnectionFactory(new StatisticsAwareConnectionFactory(new DriverConnectionFactory( + return new DriverConnectionFactory( new OracleDriver(), config.getConnectionUrl(), connectionProperties, credentialProvider, - openTelemetry))); + openTelemetry); + } + + private static class OracleRetryStrategy + implements RetryStrategy + { + @Override + public boolean isExceptionRecoverable(Throwable exception) + { + return Throwables.getCausalChain(exception).stream() + .anyMatch(SQLRecoverableException.class::isInstance); + } } } diff --git a/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java b/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java index 976d80e5e83e..6bf8e1ed04d9 100644 --- a/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java +++ b/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java @@ -22,6 +22,7 @@ import io.trino.plugin.jdbc.ConnectionFactory; import io.trino.plugin.jdbc.DriverConnectionFactory; import io.trino.plugin.jdbc.RetryingConnectionFactory; +import io.trino.plugin.jdbc.RetryingConnectionFactory.DefaultRetryStrategy; import io.trino.plugin.jdbc.credential.StaticCredentialProvider; import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory; import io.trino.testing.ResourcePresence; @@ -130,7 +131,7 @@ private ConnectionFactory getConnectionFactory(String connectionUrl, String user new OracleDriver(), new BaseJdbcConfig().setConnectionUrl(connectionUrl), StaticCredentialProvider.of(username, password))); - return new RetryingConnectionFactory(connectionFactory); + return new RetryingConnectionFactory(connectionFactory, new DefaultRetryStrategy()); } @Override diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java index 3a617b6ede59..c18bdc26d041 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java @@ -47,6 +47,7 @@ import io.trino.plugin.jdbc.LazyConnectionFactory; import io.trino.plugin.jdbc.MaxDomainCompactionThreshold; import io.trino.plugin.jdbc.QueryBuilder; +import io.trino.plugin.jdbc.RetryingConnectionFactoryModule; import io.trino.plugin.jdbc.ReusableConnectionFactoryModule; import io.trino.plugin.jdbc.StatsCollecting; import io.trino.plugin.jdbc.TypeHandlingJdbcConfig; @@ -96,6 +97,7 @@ public PhoenixClientModule(String catalogName) protected void setup(Binder binder) { install(new RemoteQueryModifierModule()); + install(new RetryingConnectionFactoryModule()); binder.bind(ConnectorSplitManager.class).annotatedWith(ForJdbcDynamicFiltering.class).to(PhoenixSplitManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcDynamicFilteringSplitManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).to(ClassLoaderSafeConnectorSplitManager.class).in(Scopes.SINGLETON);