Skip to content

Commit

Permalink
[Feature][sql-gateway] Update Flink to 1.18 stable version and comple…
Browse files Browse the repository at this point in the history
…te ScalephSqlGatewayService

format code
  • Loading branch information
LiuBodong committed Oct 30, 2023
1 parent 62a8c73 commit 2a36d23
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 45 deletions.
13 changes: 1 addition & 12 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,6 @@
<name>Sonatype Nexus Snapshots</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
</repository>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<modules>
Expand Down Expand Up @@ -171,7 +160,7 @@
<akka.version>2.5.21</akka.version>
<protobuf.version>3.21.5</protobuf.version>
<netty.version>4.1.82.Final</netty.version>
<flink.version>1.19-SNAPSHOT</flink.version>
<flink.version>1.18.0</flink.version>
<flink.base.version>1.17</flink.base.version>
<flink-jdbc.version>3.1.1</flink-jdbc.version>
<paimon.version>0.4.0-incubating</paimon.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,38 @@
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import cn.sliew.scaleph.engine.sql.gateway.services.CatalogService;
import cn.sliew.scaleph.engine.sql.gateway.services.OperationService;
import cn.sliew.scaleph.engine.sql.gateway.services.ResultFetcherService;
import cn.sliew.scaleph.engine.sql.gateway.services.SessionService;
import cn.sliew.scaleph.engine.sql.gateway.services.SqlService;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class ScalephSqlGatewayService implements SqlGatewayService {

private SessionService sessionService;

public ScalephSqlGatewayService(SessionService sessionService) {
private final SessionService sessionService;
private final CatalogService catalogService;
private final OperationService operationService;
private final SqlService sqlService;
private final ResultFetcherService resultFetcherService;

@Autowired
public ScalephSqlGatewayService(
SessionService sessionService,
CatalogService catalogService,
OperationService operationService,
SqlService sqlService,
ResultFetcherService resultFetcherService) {
this.sessionService = sessionService;
this.catalogService = catalogService;
this.operationService = operationService;
this.sqlService = sqlService;
this.resultFetcherService = resultFetcherService;
}

@Override
Expand Down Expand Up @@ -75,91 +96,102 @@ public EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) th
@Override
public OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> callable)
throws SqlGatewayException {
return null;
return operationService.submitOperation(sessionHandle, callable);
}

@Override
public void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle)
throws SqlGatewayException {}
throws SqlGatewayException {
operationService.cancelOperation(sessionHandle, operationHandle);
}

@Override
public void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle)
throws SqlGatewayException {}
throws SqlGatewayException {
operationService.closeOperation(sessionHandle, operationHandle);
}

@Override
public OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle)
throws SqlGatewayException {
return null;
return operationService.getOperationInfo(sessionHandle, operationHandle);
}

@Override
public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle)
throws SqlGatewayException {
return null;
return operationService.getOperationResultSchema(sessionHandle, operationHandle);
}

@Override
public OperationHandle executeStatement(SessionHandle sessionHandle, String s, long l, Configuration configuration)
public OperationHandle executeStatement(
SessionHandle sessionHandle, String statement, long executionTimeoutMs, Configuration configuration)
throws SqlGatewayException {
return null;
return sqlService.executeStatement(sessionHandle, statement, executionTimeoutMs, configuration);
}

@Override
public ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long l, int i)
public ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long token, int maxRows)
throws SqlGatewayException {
return null;
return resultFetcherService.fetchResults(sessionHandle, operationHandle, token, maxRows);
}

@Override
public ResultSet fetchResults(
SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation fetchOrientation, int i)
SessionHandle sessionHandle,
OperationHandle operationHandle,
FetchOrientation fetchOrientation,
int maxRows)
throws SqlGatewayException {
return null;
return resultFetcherService.fetchResults(sessionHandle, operationHandle, fetchOrientation, maxRows);
}

@Override
public String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException {
return null;
return catalogService.getCurrentCatalog(sessionHandle);
}

@Override
public Set<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException {
return null;
return catalogService.listCatalogs(sessionHandle);
}

@Override
public Set<String> listDatabases(SessionHandle sessionHandle, String s) throws SqlGatewayException {
return null;
public Set<String> listDatabases(SessionHandle sessionHandle, String catalogName) throws SqlGatewayException {
return catalogService.listDatabases(sessionHandle, catalogName);
}

@Override
public Set<TableInfo> listTables(
SessionHandle sessionHandle, String s, String s1, Set<CatalogBaseTable.TableKind> set)
SessionHandle sessionHandle,
String catalogName,
String databaseName,
Set<CatalogBaseTable.TableKind> tableKinds)
throws SqlGatewayException {
return null;
return catalogService.listTables(sessionHandle, catalogName, databaseName, tableKinds);
}

@Override
public ResolvedCatalogBaseTable<?> getTable(SessionHandle sessionHandle, ObjectIdentifier objectIdentifier)
public ResolvedCatalogBaseTable<?> getTable(SessionHandle sessionHandle, ObjectIdentifier tableIdentifier)
throws SqlGatewayException {
return null;
return catalogService.getTable(sessionHandle, tableIdentifier);
}

@Override
public Set<FunctionInfo> listUserDefinedFunctions(SessionHandle sessionHandle, String s, String s1)
throws SqlGatewayException {
return null;
public Set<FunctionInfo> listUserDefinedFunctions(
SessionHandle sessionHandle, String catalogName, String databaseName) throws SqlGatewayException {
return catalogService.listUserDefinedFunctions(sessionHandle, catalogName, databaseName);
}

@Override
public Set<FunctionInfo> listSystemFunctions(SessionHandle sessionHandle) throws SqlGatewayException {
return null;
return catalogService.listSystemFunctions(sessionHandle);
}

@Override
public FunctionDefinition getFunctionDefinition(
SessionHandle sessionHandle, UnresolvedIdentifier unresolvedIdentifier) throws SqlGatewayException {
return null;
SessionHandle sessionHandle, UnresolvedIdentifier functionIdentifier) throws SqlGatewayException {
return catalogService.getFunctionDefinition(sessionHandle, functionIdentifier);
}

@Override
Expand All @@ -168,7 +200,8 @@ public GatewayInfo getGatewayInfo() {
}

@Override
public List<String> completeStatement(SessionHandle sessionHandle, String s, int i) throws SqlGatewayException {
return null;
public List<String> completeStatement(SessionHandle sessionHandle, String statement, int position)
throws SqlGatewayException {
return sqlService.completeStatement(sessionHandle, statement, position);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,5 @@ OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle oper
* @param operationHandle handle to identify the operation.
*/
ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle)
throws Exception;
throws SqlGatewayException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,13 @@ public OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHand

@Override
public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle)
throws Exception {
throws SqlGatewayException {
FlinkSqlGatewaySession session = sessionService.getSession(sessionHandle);
SessionContext sessionContext = session.getSessionContext();
return sessionContext.getOperationManager().getOperationResultSchema(operationHandle);
try {
return sessionContext.getOperationManager().getOperationResultSchema(operationHandle);
} catch (Exception e) {
throw new SqlGatewayException(e);
}
}
}

0 comments on commit 2a36d23

Please sign in to comment.