Skip to content

Commit

Permalink
[Feature][scaleph-ui-react] update flink kubernetes job detail web (#698
Browse files Browse the repository at this point in the history
)

* feature: update swagger info

* feature: flink kuberentes job detail web

* feature: flink kuberentes job detail web

* feature: flink kuberentes job detail web

* feature: flink kuberentes job detail web

* feature: flink kuberentes job detail web

---------

Co-authored-by: wangqi <wangqi@xinxuan.net>
  • Loading branch information
kalencaya and wangqi authored Mar 15, 2024
1 parent 6e2fefb commit 335a54a
Show file tree
Hide file tree
Showing 23 changed files with 694 additions and 295 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private Info apiInfo() {
return new Info()
.title("Scaleph API文档")
.description("Scaleph API文档")
.version("1.0.5-SNAPSHOT")
.version("2.0.3-SNAPSHOT")
.termsOfService("https://flowerfine.github.io/scaleph-website/zh")
.license(new License().name("Apache 2.0").url("https://github.com/flowerfine/scaleph/blob/dev/LICENSE"))
.contact(contact());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import javax.validation.Valid;
import java.net.URI;
import java.util.List;
import java.util.Optional;

@Tag(name = "Flink Kubernetes管理-Job管理")
@RestController
Expand Down Expand Up @@ -171,6 +172,17 @@ public ResponseEntity<Page<WsFlinkKubernetesJobInstanceDTO>> listInstances(@Vali
return new ResponseEntity<>(result, HttpStatus.OK);
}

@Logging
@GetMapping("instances/current")
@Operation(summary = "获取任务当前实例", description = "获取任务当前实例")
public ResponseEntity<ResponseVO<WsFlinkKubernetesJobInstanceDTO>> currentInstance(@RequestParam("wsFlinkKubernetesJobId") Long wsFlinkKubernetesJobId) throws Exception {
Optional<WsFlinkKubernetesJobInstanceDTO> optional = wsFlinkKubernetesJobInstanceService.selectCurrent(wsFlinkKubernetesJobId);
if (optional.isPresent()) {
return new ResponseEntity<>(ResponseVO.success(optional.get()), HttpStatus.OK);
}
return new ResponseEntity<>(ResponseVO.error("not found"), HttpStatus.NOT_FOUND);
}

@Logging
@GetMapping("/instances/asYaml/{id}")
@Operation(summary = "查询 YAML 格式 Job 实例", description = "查询 YAML 格式 Job 实例")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public class WsFlinkKubernetesJobInstance extends BaseDO {
@TableField("user_flink_configuration")
private String userFlinkConfiguration;

@TableField("merged_flink_configuration")
private String mergedFlinkConfiguration;

@TableField(value = "`state`", updateStrategy = FieldStrategy.IGNORED)
private ResourceLifecycleState state;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
<result column="job_manager" property="jobManager"/>
<result column="task_manager" property="taskManager"/>
<result column="user_flink_configuration" property="userFlinkConfiguration"/>
<result column="merged_flink_configuration" property="mergedFlinkConfiguration"/>
<result column="state" property="state"/>
<result column="job_state" property="jobState"/>
<result column="error" property="error"/>
Expand All @@ -58,6 +59,7 @@
<result column="job_manager" property="jobManager"/>
<result column="task_manager" property="taskManager"/>
<result column="user_flink_configuration" property="userFlinkConfiguration"/>
<result column="merged_flink_configuration" property="mergedFlinkConfiguration"/>
<result column="state" property="state"/>
<result column="job_state" property="jobState"/>
<result column="error" property="error"/>
Expand All @@ -75,7 +77,7 @@
id, creator, create_time, editor, update_time,
ws_flink_kubernetes_job_id, instance_id,
parallelism, upgrade_mode, allow_non_restored_state,
job_manager, task_manager, user_flink_configuration,
job_manager, task_manager, user_flink_configuration, merged_flink_configuration,
`state`, job_state, `error`, cluster_info, task_manager_info,
start_time, end_time, duration
</sql>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Optional;

@Component
Expand Down Expand Up @@ -99,7 +98,7 @@ private void addLogging(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDep
private void mergeJobInstance(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) {
spec.setJobManager(TemplateMerger.merge(spec.getJobManager(), jobInstanceDTO.getJobManager(), JobManagerSpec.class));
spec.setTaskManager(TemplateMerger.merge(spec.getTaskManager(), jobInstanceDTO.getTaskManager(), TaskManagerSpec.class));
spec.setFlinkConfiguration(TemplateMerger.merge(spec.getFlinkConfiguration(), jobInstanceDTO.getUserFlinkConfiguration(), Map.class));
spec.setFlinkConfiguration(jobInstanceDTO.getMergedFlinkConfiguration());
JobSpec job = spec.getJob();
if (jobInstanceDTO.getParallelism() != null) {
job.setParallelism(jobInstanceDTO.getParallelism());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ default WsFlinkKubernetesJobInstance toDo(WsFlinkKubernetesJobInstanceDTO dto) {
if (CollectionUtils.isEmpty(dto.getUserFlinkConfiguration()) == false) {
entity.setUserFlinkConfiguration(JacksonUtil.toJsonString(dto.getUserFlinkConfiguration()));
}
if (CollectionUtils.isEmpty(dto.getMergedFlinkConfiguration()) == false) {
entity.setMergedFlinkConfiguration(JacksonUtil.toJsonString(dto.getMergedFlinkConfiguration()));
}
if (CollectionUtils.isEmpty(dto.getClusterInfo()) == false) {
entity.setClusterInfo(JacksonUtil.toJsonString(dto.getClusterInfo()));
}
Expand All @@ -79,6 +82,9 @@ default WsFlinkKubernetesJobInstanceDTO toDto(WsFlinkKubernetesJobInstance entit
if (StringUtils.hasText(entity.getUserFlinkConfiguration())) {
dto.setUserFlinkConfiguration(JacksonUtil.parseJsonString(entity.getUserFlinkConfiguration(), Map.class));
}
if (StringUtils.hasText(entity.getMergedFlinkConfiguration())) {
dto.setMergedFlinkConfiguration(JacksonUtil.parseJsonString(entity.getMergedFlinkConfiguration(), Map.class));
}
if (StringUtils.hasText(entity.getClusterInfo())) {
dto.setClusterInfo(JacksonUtil.parseJsonString(entity.getClusterInfo(), Map.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public class WsFlinkKubernetesJobInstanceDTO extends BaseDO {
@Schema(description = "user flink configuration")
private Map<String, String> userFlinkConfiguration;

@Schema(description = "merged flink configuration")
private Map<String, String> mergedFlinkConfiguration;

@Schema(description = "deploy state")
private ResourceLifecycleState state;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstanceSavepoint;
import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceMapper;
import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceSavepointMapper;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.util.TemplateMerger;
import cn.sliew.scaleph.engine.flink.kubernetes.service.param.WsFlinkKubernetesJobInstanceDeployParam;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.JobState;
Expand Down Expand Up @@ -161,16 +162,16 @@ public void deploy(WsFlinkKubernetesJobInstanceDeployParam param) throws Excepti
if (param.getUserFlinkConfiguration() != null) {
record.setUserFlinkConfiguration(JacksonUtil.toJsonString(param.getUserFlinkConfiguration()));
}
wsFlinkKubernetesJobInstanceMapper.insert(record);
WsFlinkKubernetesJobInstanceDTO jobInstanceDTO = selectOne(record.getId());
WsFlinkKubernetesJobDTO jobDTO = jobInstanceDTO.getWsFlinkKubernetesJob();
String yaml = asYaml(record.getId());
WsFlinkKubernetesJobDTO jobDTO = wsFlinkKubernetesJobService.selectOne(param.getWsFlinkKubernetesJobId());
Long clusterCredentialId = null;
String resource = null;
WatchCallbackHandler callbackHandler = null;
switch (jobDTO.getDeploymentKind()) {
case FLINK_DEPLOYMENT:
clusterCredentialId = jobDTO.getFlinkDeployment().getClusterCredentialId();
Map<String, String> flinkConfiguration = jobDTO.getFlinkDeployment().getFlinkConfiguration();
Map<String, String> mergedFlinkConfiguration = TemplateMerger.merge(flinkConfiguration, param.getUserFlinkConfiguration(), Map.class);
record.setMergedFlinkConfiguration(JacksonUtil.toJsonString(mergedFlinkConfiguration));
resource = Constant.FLINK_DEPLOYMENT;
callbackHandler = flinkDeploymentWatchCallbackHandler;
break;
Expand All @@ -181,6 +182,10 @@ public void deploy(WsFlinkKubernetesJobInstanceDeployParam param) throws Excepti
break;
default:
}
wsFlinkKubernetesJobInstanceMapper.insert(record);

WsFlinkKubernetesJobInstanceDTO jobInstanceDTO = selectOne(record.getId());
String yaml = asYaml(record.getId());
flinkKubernetesOperatorService.deployJob(clusterCredentialId, yaml);
// add watch
Map<String, String> lables = metadataHandler.generateLables(jobInstanceDTO);
Expand Down
27 changes: 17 additions & 10 deletions scaleph-ui-react/config/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,7 @@ export default [
{
path: '/workspace/engine/compute/flink/deployment/detail',
component: './Project/Workspace/Engine/Compute/Flink/Deployment/Detail',
},
{
name: 'job',
path: '/workspace/engine/compute/flink/job',
component: './Project/Workspace/Engine/Compute/Flink/Job',
},
{
path: '/workspace/engine/compute/flink/job/detail',
component: './Project/Workspace/Engine/Compute/Flink/Job/Detail',
},
}
]
}
]
Expand Down Expand Up @@ -283,6 +274,22 @@ export default [
},
]
},
{
name: 'project.operation',
path: '/workspace/operation',
icon: 'solution',
routes: [
{
name: 'flink',
path: '/workspace/operation/compute/flink/job',
component: './Project/Workspace/Engine/Compute/Flink/Job',
},
{
path: '/workspace/operation/compute/flink/job/detail',
component: './Project/Workspace/Engine/Compute/Flink/Job/Detail',
}
]
},
]
},
{
Expand Down
3 changes: 2 additions & 1 deletion scaleph-ui-react/src/locales/zh-CN/menu.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ export default {
'menu.project.engine.compute.flink.template': '部署模板',
'menu.project.engine.compute.flink.session-cluster': 'Session 集群',
'menu.project.engine.compute.flink.deployment': 'Deployment',
'menu.project.engine.compute.flink.job': '任务',
'menu.project.data-integration': '数据集成',
'menu.project.data-integration.seatunnel': 'SeaTunnel',
'menu.project.data-integration.flink-cdc': 'Flink CDC',
Expand All @@ -27,6 +26,8 @@ export default {
'menu.project.dag-scheduler': 'DAG 调度',
'menu.project.data-service': '数据服务',
'menu.project.data-service.config': '接口配置',
'menu.project.operation': '运维中心',
'menu.project.operation.flink': 'Flink任务',

'menu.resource': '资源',
'menu.resource.jar': '公共 Jar',
Expand Down
17 changes: 11 additions & 6 deletions scaleph-ui-react/src/locales/zh-CN/pages/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1014,17 +1014,22 @@ export default {
'pages.project.flink.kubernetes.job.detail.metrics': 'Metrics',
'pages.project.flink.kubernetes.job.detail.logs': 'Logs',

'pages.project.flink.kubernetes.job.detail.overview': '总览',
'pages.project.flink.kubernetes.job.detail.overview.artifact': 'Artifact',
'pages.project.flink.kubernetes.job.detail.overview.resource': '资源详情',
'pages.project.flink.kubernetes.job.detail.overview.configuration': 'Flink 配置',
'pages.project.flink.kubernetes.job.detail.savepoint': '状态管理',
'pages.project.flink.kubernetes.job.detail.savepoint.timeStamp': 'TimeStamp',
'pages.project.flink.kubernetes.job.detail.savepoint.formatType': 'Format',
'pages.project.flink.kubernetes.job.detail.savepoint.triggerType': 'Trigger Type',
'pages.project.flink.kubernetes.job.detail.savepoint.location': 'Location',
'pages.project.flink.kubernetes.job.detail.yaml': 'YAML',
'pages.project.flink.kubernetes.job.detail.instanceList': 'Intances',
'pages.project.flink.kubernetes.job.detail.instanceList': '历史实例',
'pages.project.flink.kubernetes.job.detail.instanceList.startTime': 'Start Time',
'pages.project.flink.kubernetes.job.detail.instanceList.endTime': 'End Time',
'pages.project.flink.kubernetes.job.detail.instanceList.duration': 'Duration',
'pages.project.flink.kubernetes.job.detail.instanceList.upgradeMode': 'Upgrade Mode',
'pages.project.flink.kubernetes.job.detail.savepoint': 'Savepoint',
'pages.project.flink.kubernetes.job.detail.savepoint.timeStamp': 'TimeStamp',
'pages.project.flink.kubernetes.job.detail.savepoint.formatType': 'Format',
'pages.project.flink.kubernetes.job.detail.savepoint.triggerType': 'Trigger Type',
'pages.project.flink.kubernetes.job.detail.savepoint.location': 'Location',


'pages.project.doris.template': 'Template',
'pages.project.doris.template.name': '名称',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import React, {useRef} from "react";
import {ActionType, ProCard, ProDescriptions, ProFormInstance} from "@ant-design/pro-components";
import {connect, useAccess, useIntl} from "@umijs/max";
import {Props} from '@/typings';
import {WsArtifactFlinkJar, WsFlinkKubernetesJob} from "@/services/project/typings";
import {ProDescriptionsItemProps} from "@ant-design/pro-descriptions";
import {ProCoreActionType} from "@ant-design/pro-utils/es/typing";
import {Button, message, Popconfirm} from "antd";
import {
AreaChartOutlined,
CameraOutlined,
CaretRightOutlined,
CloseOutlined,
DashboardOutlined, OrderedListOutlined,
RedoOutlined
} from "@ant-design/icons";
import {WsFlinkKubernetesJobService} from "@/services/project/WsFlinkKubernetesJobService";

const ArtifactFlinkJarWeb: React.FC<Props<WsFlinkKubernetesJob>> = (props: any) => {
const intl = useIntl();
const access = useAccess();
const actionRef = useRef<ActionType>();
const formRef = useRef<ProFormInstance>();


const descriptionColumns: ProDescriptionsItemProps<WsArtifactFlinkJar>[] = [
{
title: intl.formatMessage({id: 'pages.project.artifact.name'}),
key: `name`,
renderText: (text: any, record: WsArtifactFlinkJar, index: number, action: ProCoreActionType) => {
return record.artifact?.name
}
},
{
title: intl.formatMessage({id: 'pages.project.artifact.jar.fileName'}),
key: `fileName`,
dataIndex: 'fileName'
},
{
title: intl.formatMessage({id: 'pages.resource.flinkRelease.version'}),
key: `flinkVersion`,
dataIndex: 'flinkVersion',
renderText: (text: any, record: WsArtifactFlinkJar, index: number, action: ProCoreActionType) => {
return record.flinkVersion?.label
}
},
{
title: intl.formatMessage({id: 'pages.project.artifact.jar.entryClass'}),
key: `entryClass`,
dataIndex: 'entryClass'
},
{
title: intl.formatMessage({id: 'pages.project.artifact.jar.jarParams'}),
key: `jarParams`,
dataIndex: 'jarParams',
valueType: 'jsonCode'
},
{
title: intl.formatMessage({id: 'app.common.data.remark'}),
key: `remark`,
dataIndex: 'remark',
renderText: (text: any, record: WsArtifactFlinkJar, index: number, action: ProCoreActionType) => {
return record.artifact?.remark
}
},
{
title: intl.formatMessage({id: 'app.common.data.createTime'}),
key: `createTime`,
dataIndex: 'createTime',
},
{
title: intl.formatMessage({id: 'app.common.data.updateTime'}),
key: `updateTime`,
dataIndex: 'updateTime',
}
]

return (
<ProDescriptions
column={2}
bordered={true}
dataSource={props.flinkKubernetesJobDetail.job?.artifactFlinkJar}
columns={descriptionColumns}
/>
);
}


const mapModelToProps = ({flinkKubernetesJobDetail}: any) => ({flinkKubernetesJobDetail})
export default connect(mapModelToProps)(ArtifactFlinkJarWeb);
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import React, {useEffect, useState} from "react";
import {ProColumns, ProTable} from "@ant-design/pro-components";
import {connect, useAccess, useIntl} from "@umijs/max";

type Config = {
key: string;
value?: any;
};

const FlinkKubernetesJobDetailConfigurationWeb: React.FC = (props: any) => {
const intl = useIntl();
const access = useAccess();
const [dataSource, setDataSource] = useState<Config[]>([])

useEffect(() => {
if (props.flinkKubernetesJobDetail.job?.jobInstance) {
const config: Array<Config> = []
Object.entries<[string, any][]>(props.flinkKubernetesJobDetail.job?.jobInstance?.mergedFlinkConfiguration ? {...props.flinkKubernetesJobDetail.job?.jobInstance?.mergedFlinkConfiguration} : {}).forEach(([key, value]) => {
config.push({
key: key,
value: value
})
});
setDataSource(config)
}
}, [props.flinkKubernetesJobDetail.job]);

const tableColumns: ProColumns<Config>[] = [
{
dataIndex: 'key',
width: '40%'
},
{
dataIndex: 'value',
},
]

return (
<ProTable<Config>
rowKey="key"
columns={tableColumns}
dataSource={dataSource}
bordered
options={false}
search={false}
showHeader={false}
/>
);
}

const mapModelToProps = ({flinkKubernetesJobDetail}: any) => ({flinkKubernetesJobDetail})
export default connect(mapModelToProps)(FlinkKubernetesJobDetailConfigurationWeb);
Loading

0 comments on commit 335a54a

Please sign in to comment.