diff --git a/pom.xml b/pom.xml
index c7e5e061f..6cd3d5448 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,17 +81,6 @@
Sonatype Nexus Snapshots
https://s01.oss.sonatype.org/content/repositories/snapshots/
-
- apache.snapshots
- Apache Development Snapshot Repository
- https://repository.apache.org/content/repositories/snapshots/
-
- true
-
-
- true
-
-
@@ -171,7 +160,7 @@
2.5.21
3.21.5
4.1.82.Final
- 1.19-SNAPSHOT
+ 1.18.0
1.17
3.1.1
0.4.0-incubating
diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewayService.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewayService.java
index 7df328d90..50ebec309 100644
--- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewayService.java
+++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewayService.java
@@ -33,17 +33,38 @@
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import cn.sliew.scaleph.engine.sql.gateway.services.CatalogService;
+import cn.sliew.scaleph.engine.sql.gateway.services.OperationService;
+import cn.sliew.scaleph.engine.sql.gateway.services.ResultFetcherService;
import cn.sliew.scaleph.engine.sql.gateway.services.SessionService;
+import cn.sliew.scaleph.engine.sql.gateway.services.SqlService;
import lombok.extern.slf4j.Slf4j;
@Slf4j
+@Component
public class ScalephSqlGatewayService implements SqlGatewayService {
- private SessionService sessionService;
-
- public ScalephSqlGatewayService(SessionService sessionService) {
+ private final SessionService sessionService;
+ private final CatalogService catalogService;
+ private final OperationService operationService;
+ private final SqlService sqlService;
+ private final ResultFetcherService resultFetcherService;
+
+ @Autowired
+ public ScalephSqlGatewayService(
+ SessionService sessionService,
+ CatalogService catalogService,
+ OperationService operationService,
+ SqlService sqlService,
+ ResultFetcherService resultFetcherService) {
this.sessionService = sessionService;
+ this.catalogService = catalogService;
+ this.operationService = operationService;
+ this.sqlService = sqlService;
+ this.resultFetcherService = resultFetcherService;
}
@Override
@@ -75,91 +96,102 @@ public EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) th
@Override
public OperationHandle submitOperation(SessionHandle sessionHandle, Callable callable)
throws SqlGatewayException {
- return null;
+ return operationService.submitOperation(sessionHandle, callable);
}
@Override
public void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle)
- throws SqlGatewayException {}
+ throws SqlGatewayException {
+ operationService.cancelOperation(sessionHandle, operationHandle);
+ }
@Override
public void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle)
- throws SqlGatewayException {}
+ throws SqlGatewayException {
+ operationService.closeOperation(sessionHandle, operationHandle);
+ }
@Override
public OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle)
throws SqlGatewayException {
- return null;
+ return operationService.getOperationInfo(sessionHandle, operationHandle);
}
@Override
public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle)
throws SqlGatewayException {
- return null;
+ return operationService.getOperationResultSchema(sessionHandle, operationHandle);
}
@Override
- public OperationHandle executeStatement(SessionHandle sessionHandle, String s, long l, Configuration configuration)
+ public OperationHandle executeStatement(
+ SessionHandle sessionHandle, String statement, long executionTimeoutMs, Configuration configuration)
throws SqlGatewayException {
- return null;
+ return sqlService.executeStatement(sessionHandle, statement, executionTimeoutMs, configuration);
}
@Override
- public ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long l, int i)
+ public ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long token, int maxRows)
throws SqlGatewayException {
- return null;
+ return resultFetcherService.fetchResults(sessionHandle, operationHandle, token, maxRows);
}
@Override
public ResultSet fetchResults(
- SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation fetchOrientation, int i)
+ SessionHandle sessionHandle,
+ OperationHandle operationHandle,
+ FetchOrientation fetchOrientation,
+ int maxRows)
throws SqlGatewayException {
- return null;
+ return resultFetcherService.fetchResults(sessionHandle, operationHandle, fetchOrientation, maxRows);
}
@Override
public String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException {
- return null;
+ return catalogService.getCurrentCatalog(sessionHandle);
}
@Override
public Set listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException {
- return null;
+ return catalogService.listCatalogs(sessionHandle);
}
@Override
- public Set listDatabases(SessionHandle sessionHandle, String s) throws SqlGatewayException {
- return null;
+ public Set listDatabases(SessionHandle sessionHandle, String catalogName) throws SqlGatewayException {
+ return catalogService.listDatabases(sessionHandle, catalogName);
}
@Override
public Set listTables(
- SessionHandle sessionHandle, String s, String s1, Set set)
+ SessionHandle sessionHandle,
+ String catalogName,
+ String databaseName,
+ Set tableKinds)
throws SqlGatewayException {
- return null;
+ return catalogService.listTables(sessionHandle, catalogName, databaseName, tableKinds);
}
@Override
- public ResolvedCatalogBaseTable> getTable(SessionHandle sessionHandle, ObjectIdentifier objectIdentifier)
+ public ResolvedCatalogBaseTable> getTable(SessionHandle sessionHandle, ObjectIdentifier tableIdentifier)
throws SqlGatewayException {
- return null;
+ return catalogService.getTable(sessionHandle, tableIdentifier);
}
@Override
- public Set listUserDefinedFunctions(SessionHandle sessionHandle, String s, String s1)
- throws SqlGatewayException {
- return null;
+ public Set listUserDefinedFunctions(
+ SessionHandle sessionHandle, String catalogName, String databaseName) throws SqlGatewayException {
+ return catalogService.listUserDefinedFunctions(sessionHandle, catalogName, databaseName);
}
@Override
public Set listSystemFunctions(SessionHandle sessionHandle) throws SqlGatewayException {
- return null;
+ return catalogService.listSystemFunctions(sessionHandle);
}
@Override
public FunctionDefinition getFunctionDefinition(
- SessionHandle sessionHandle, UnresolvedIdentifier unresolvedIdentifier) throws SqlGatewayException {
- return null;
+ SessionHandle sessionHandle, UnresolvedIdentifier functionIdentifier) throws SqlGatewayException {
+ return catalogService.getFunctionDefinition(sessionHandle, functionIdentifier);
}
@Override
@@ -168,7 +200,8 @@ public GatewayInfo getGatewayInfo() {
}
@Override
- public List completeStatement(SessionHandle sessionHandle, String s, int i) throws SqlGatewayException {
- return null;
+ public List completeStatement(SessionHandle sessionHandle, String statement, int position)
+ throws SqlGatewayException {
+ return sqlService.completeStatement(sessionHandle, statement, position);
}
}
diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/OperationService.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/OperationService.java
index 8327c92c7..093fbf2bd 100644
--- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/OperationService.java
+++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/OperationService.java
@@ -82,5 +82,5 @@ OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle oper
* @param operationHandle handle to identify the operation.
*/
ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle)
- throws Exception;
+ throws SqlGatewayException;
}
diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/OperationServiceImpl.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/OperationServiceImpl.java
index a556f7ee3..ce8eda287 100644
--- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/OperationServiceImpl.java
+++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/OperationServiceImpl.java
@@ -99,9 +99,13 @@ public OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHand
@Override
public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle)
- throws Exception {
+ throws SqlGatewayException {
FlinkSqlGatewaySession session = sessionService.getSession(sessionHandle);
SessionContext sessionContext = session.getSessionContext();
- return sessionContext.getOperationManager().getOperationResultSchema(operationHandle);
+ try {
+ return sessionContext.getOperationManager().getOperationResultSchema(operationHandle);
+ } catch (Exception e) {
+ throw new SqlGatewayException(e);
+ }
}
}