diff --git a/Makefile b/Makefile index 4d3f2b4df..f8fb89d7e 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ MVN_TEST := mvn test -T 4C CURRENT_DIR := $(shell pwd) DOCKER_VOLUME := -v $(HOME)/.m2:/root/.m2 -v $(CURRENT_DIR):/ws -DOCKER_IMG := flowci/javasdk:1.0 +DOCKER_IMG := maven:3-openjdk-8 DOCKER_RUN := docker run -it --rm -w /ws $(DOCKER_VOLUME) --network host $(DOCKER_IMG) DOCKER_BUILD := ./build.sh diff --git a/core/src/main/java/com/flowci/core/agent/config/AgentConfig.java b/core/src/main/java/com/flowci/core/agent/config/AgentConfig.java deleted file mode 100644 index f20464164..000000000 --- a/core/src/main/java/com/flowci/core/agent/config/AgentConfig.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2018 flow.ci - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.flowci.core.agent.config; - -import com.flowci.core.common.config.AppProperties; -import com.flowci.core.common.domain.Variables.App; -import com.flowci.core.common.helper.ThreadHelper; -import com.flowci.domain.Settings; -import java.util.Objects; -import lombok.extern.log4j.Log4j2; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.core.env.Environment; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - -/** - * @author yang - */ -@Log4j2 -@Configuration -public class AgentConfig { - - @Autowired - private Environment env; - - @Autowired - private AppProperties.Zookeeper zkProperties; - - @Autowired - private AppProperties.RabbitMQ rabbitProperties; - - @Bean("baseSettings") - public Settings baseSettings() { - Settings.Zookeeper zk = new Settings.Zookeeper(); - zk.setRoot(zkProperties.getAgentRoot()); - zk.setHost(getZkHost()); - - Settings.RabbitMQ mq = new Settings.RabbitMQ(); - mq.setUri(getRabbitUri()); - mq.setCallback(rabbitProperties.getCallbackQueue()); - mq.setShellLogEx(rabbitProperties.getShellLogEx()); - mq.setTtyLogEx(rabbitProperties.getTtyLogEx()); - - Settings settings = new Settings(); - settings.setZookeeper(zk); - settings.setQueue(mq); - - log.info(settings); - return settings; - } - - private String getZkHost() { - return env.getProperty(App.ZookeeperHost, zkProperties.getHost()); - } - - private String getRabbitUri() { - String uri = rabbitProperties.getUri().toString(); - String domain = env.getProperty(App.RabbitHost); - - if (Objects.isNull(domain)) { - return uri; - } - - return uri.replace(rabbitProperties.getUri().getHost(), domain); - } -} diff --git a/core/src/main/java/com/flowci/core/agent/consumer/OnAgentCreated.java b/core/src/main/java/com/flowci/core/agent/consumer/OnAgentCreated.java index ecc7ae855..2e7626da1 100644 --- a/core/src/main/java/com/flowci/core/agent/consumer/OnAgentCreated.java +++ b/core/src/main/java/com/flowci/core/agent/consumer/OnAgentCreated.java @@ -3,7 +3,7 @@ import com.flowci.core.agent.event.AgentCreatedEvent; import com.flowci.core.common.domain.PushEvent; import com.flowci.core.common.manager.SocketPushManager; -import com.flowci.domain.Agent; +import com.flowci.core.agent.domain.Agent; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; diff --git a/core/src/main/java/com/flowci/core/agent/consumer/OnAgentStatusChange.java b/core/src/main/java/com/flowci/core/agent/consumer/OnAgentStatusChange.java index ec0a10423..664107526 100644 --- a/core/src/main/java/com/flowci/core/agent/consumer/OnAgentStatusChange.java +++ b/core/src/main/java/com/flowci/core/agent/consumer/OnAgentStatusChange.java @@ -19,7 +19,7 @@ import com.flowci.core.agent.event.AgentStatusEvent; import com.flowci.core.common.domain.PushEvent; import com.flowci.core.common.manager.SocketPushManager; -import com.flowci.domain.Agent; +import com.flowci.core.agent.domain.Agent; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; diff --git a/core/src/main/java/com/flowci/core/agent/controller/AgentController.java b/core/src/main/java/com/flowci/core/agent/controller/AgentController.java index 68e0aa380..4116478be 100644 --- a/core/src/main/java/com/flowci/core/agent/controller/AgentController.java +++ b/core/src/main/java/com/flowci/core/agent/controller/AgentController.java @@ -17,21 +17,18 @@ package com.flowci.core.agent.controller; import com.flowci.core.agent.domain.AgentAction; -import com.flowci.core.agent.domain.AgentInit; import com.flowci.core.agent.domain.CreateOrUpdateAgent; import com.flowci.core.agent.domain.DeleteAgent; import com.flowci.core.agent.service.AgentService; import com.flowci.core.auth.annotation.Action; import com.flowci.core.job.service.LoggingService; -import com.flowci.domain.Agent; -import com.flowci.domain.Settings; +import com.flowci.core.agent.domain.Agent; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile; -import javax.servlet.http.HttpServletRequest; import java.io.IOException; import java.io.InputStream; import java.util.List; @@ -83,15 +80,6 @@ public Agent delete(@Validated @RequestBody DeleteAgent body) { // Functions require agent token header // -------------------------------------------------------- - @PostMapping("/api/connect") - public Settings connect(@RequestHeader(AgentAuth.HeaderAgentToken) String token, - @RequestBody AgentInit init, - HttpServletRequest request) { - init.setToken(token); - init.setIp(request.getRemoteHost()); - return agentService.connect(init); - } - @PostMapping("/api/profile") public void profile(@RequestHeader(AgentAuth.HeaderAgentToken) String token, @RequestBody Agent.Resource resource) { diff --git a/core/src/main/java/com/flowci/core/agent/controller/AgentHostController.java b/core/src/main/java/com/flowci/core/agent/controller/AgentHostController.java index 00bac2d44..ee91ea8f6 100644 --- a/core/src/main/java/com/flowci/core/agent/controller/AgentHostController.java +++ b/core/src/main/java/com/flowci/core/agent/controller/AgentHostController.java @@ -18,7 +18,7 @@ import com.flowci.core.agent.domain.AgentHost; import com.flowci.core.agent.domain.AgentHostAction; -import com.flowci.core.agent.domain.CreateOrUpdateSshAgentHost; +import com.flowci.core.agent.domain.SaveAgentHost; import com.flowci.core.agent.service.AgentHostService; import com.flowci.core.auth.annotation.Action; import lombok.extern.log4j.Log4j2; @@ -58,7 +58,7 @@ public AgentHost deleteByName(@PathVariable String name) { @PostMapping @Action(AgentHostAction.CREATE_UPDATE) - public AgentHost createOrUpdate(@RequestBody @Validated CreateOrUpdateSshAgentHost body) { + public AgentHost createOrUpdate(@RequestBody @Validated SaveAgentHost body) { AgentHost host = body.toObj(); agentHostService.createOrUpdate(host); return host; diff --git a/core/src/main/java/com/flowci/core/agent/dao/AgentDao.java b/core/src/main/java/com/flowci/core/agent/dao/AgentDao.java index 7ee5b710a..756c2d824 100644 --- a/core/src/main/java/com/flowci/core/agent/dao/AgentDao.java +++ b/core/src/main/java/com/flowci/core/agent/dao/AgentDao.java @@ -16,22 +16,20 @@ package com.flowci.core.agent.dao; -import com.flowci.domain.Agent; -import com.flowci.domain.Agent.Status; -import java.util.List; -import java.util.Set; +import com.flowci.core.agent.domain.Agent; import org.springframework.data.mongodb.repository.MongoRepository; import org.springframework.stereotype.Repository; +import java.util.List; +import java.util.Set; + /** * @author yang */ @Repository -public interface AgentDao extends MongoRepository { - - List findAllByStatusAndTagsIn(Status status, Set tags); +public interface AgentDao extends MongoRepository, CustomAgentDao { - List findAllByStatus(Status status); + List findAllByTagsIn(Set tags); List findAllByHostId(String hostId); diff --git a/core/src/main/java/com/flowci/core/agent/dao/AgentIndexInitializer.java b/core/src/main/java/com/flowci/core/agent/dao/AgentIndexInitializer.java deleted file mode 100644 index a7bafaaac..000000000 --- a/core/src/main/java/com/flowci/core/agent/dao/AgentIndexInitializer.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2018 flow.ci - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.flowci.core.agent.dao; - -import com.flowci.domain.Agent; -import javax.annotation.PostConstruct; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.domain.Sort.Direction; -import org.springframework.data.mongodb.core.MongoOperations; -import org.springframework.data.mongodb.core.index.Index; -import org.springframework.stereotype.Component; - -/** - * @author yang - */ -@Component -public class AgentIndexInitializer { - - @Autowired - protected MongoOperations mongoOps; - - @PostConstruct - public void createIndexOnName() { - mongoOps.indexOps(Agent.class) - .ensureIndex(new Index().on("name", Direction.ASC).unique()); - } - - @PostConstruct - public void createIndexOnToken() { - mongoOps.indexOps(Agent.class) - .ensureIndex(new Index().on("token", Direction.ASC).unique()); - } -} diff --git a/core/src/main/java/com/flowci/core/agent/dao/CustomAgentDao.java b/core/src/main/java/com/flowci/core/agent/dao/CustomAgentDao.java new file mode 100644 index 000000000..86d8e76c8 --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/dao/CustomAgentDao.java @@ -0,0 +1,8 @@ +package com.flowci.core.agent.dao; + +import com.flowci.core.agent.domain.Agent; + +public interface CustomAgentDao { + + long updateAllStatus(Agent.Status status); +} diff --git a/core/src/main/java/com/flowci/core/agent/dao/CustomAgentDaoImpl.java b/core/src/main/java/com/flowci/core/agent/dao/CustomAgentDaoImpl.java new file mode 100644 index 000000000..679e6f294 --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/dao/CustomAgentDaoImpl.java @@ -0,0 +1,20 @@ +package com.flowci.core.agent.dao; + +import com.flowci.core.agent.domain.Agent; +import com.mongodb.client.result.UpdateResult; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; + +public class CustomAgentDaoImpl implements CustomAgentDao { + + @Autowired + private MongoOperations operations; + + @Override + public long updateAllStatus(Agent.Status status) { + UpdateResult r = operations.updateMulti(new Query(), new Update().set("status", status), Agent.class); + return r.getModifiedCount(); + } +} diff --git a/domain/src/main/java/com/flowci/domain/Agent.java b/core/src/main/java/com/flowci/core/agent/domain/Agent.java similarity index 69% rename from domain/src/main/java/com/flowci/domain/Agent.java rename to core/src/main/java/com/flowci/core/agent/domain/Agent.java index 9ed2999e9..f3cdd396d 100644 --- a/domain/src/main/java/com/flowci/domain/Agent.java +++ b/core/src/main/java/com/flowci/core/agent/domain/Agent.java @@ -14,15 +14,19 @@ * limitations under the License. */ -package com.flowci.domain; +package com.flowci.core.agent.domain; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.flowci.core.common.domain.Mongoable; import com.flowci.domain.Common.OS; +import com.flowci.domain.SimpleKeyPair; import com.google.common.base.Strings; import lombok.*; import lombok.experimental.Accessors; +import org.springframework.data.mongodb.core.index.Indexed; +import org.springframework.data.mongodb.core.mapping.Document; -import java.io.Serializable; +import java.time.Instant; import java.util.Collections; import java.util.Date; import java.util.Set; @@ -30,23 +34,24 @@ /** * @author yang */ -@Data +@Getter +@Setter @NoArgsConstructor @Accessors(chain = true) -@EqualsAndHashCode(of = {"id"}) -public class Agent implements Serializable { +@Document +public class Agent extends Mongoable { public static final String PATH_SLASH = "/"; public enum Status { - CREATED, - OFFLINE, - BUSY, + STARTING, // sent start signal to provider + + IDLE, // started, without running task - IDLE; + BUSY; // running a task public byte[] getBytes() { return this.toString().getBytes(); @@ -73,10 +78,10 @@ public static class Resource { private int freeDisk; // in MB } - private String id; - + @Indexed(name = "index_agent_name", unique = true) private String name; + @Indexed(name = "index_agent_token", unique = true) private String token; private String url; @@ -86,18 +91,22 @@ public static class Resource { */ private String hostId; - private Common.OS os = OS.UNKNOWN; + private boolean k8sCluster; + + private OS os = OS.UNKNOWN; private Resource resource = new Resource(); private Set tags = Collections.emptySet(); - private Status status = Status.CREATED; + private Status status = Status.OFFLINE; private Date statusUpdatedAt; private String jobId; + private String containerId; // for started from host + @JsonIgnore private SimpleKeyPair rsa; @@ -116,8 +125,14 @@ public void setStatus(Status status) { } @JsonIgnore - public boolean hasUrl() { - return !Strings.isNullOrEmpty(url); + public boolean isStartingOver(int seconds) { + if (status == Status.STARTING) { + Instant expire = createdAt.toInstant().plusSeconds(seconds); + if (Instant.now().isAfter(expire)) { + return true; + } + } + return false; } @JsonIgnore @@ -125,14 +140,19 @@ public boolean hasJob() { return !Strings.isNullOrEmpty(jobId); } + @JsonIgnore + public String getQueueName() { + return "queue.agent." + id; + } + @JsonIgnore public boolean isBusy() { - return isOnline() && status == Status.BUSY; + return status == Status.BUSY; } @JsonIgnore public boolean isIdle() { - return isOnline() && status == Status.IDLE; + return status == Status.IDLE; } @JsonIgnore @@ -146,7 +166,7 @@ public boolean isOnline() { } @JsonIgnore - public String getQueueName() { - return "queue.agent." + id; + public boolean isStarting() { + return status == Status.STARTING; } } diff --git a/core/src/main/java/com/flowci/core/agent/domain/AgentInit.java b/core/src/main/java/com/flowci/core/agent/domain/AgentInit.java index 3cc905758..a4fbafa31 100644 --- a/core/src/main/java/com/flowci/core/agent/domain/AgentInit.java +++ b/core/src/main/java/com/flowci/core/agent/domain/AgentInit.java @@ -16,7 +16,6 @@ package com.flowci.core.agent.domain; -import com.flowci.domain.Agent; import com.flowci.domain.Common; import lombok.Getter; import lombok.Setter; @@ -28,6 +27,8 @@ @Setter public class AgentInit { + private Boolean k8sCluster; // is agent running in k8s cluster + private String token; private String ip; @@ -36,5 +37,7 @@ public class AgentInit { private Common.OS os; + private Agent.Status status; + private Agent.Resource resource = new Agent.Resource(); } diff --git a/core/src/main/java/com/flowci/core/agent/domain/AgentWithJob.java b/core/src/main/java/com/flowci/core/agent/domain/AgentWithJob.java index fdca14c0c..56a50ce36 100644 --- a/core/src/main/java/com/flowci/core/agent/domain/AgentWithJob.java +++ b/core/src/main/java/com/flowci/core/agent/domain/AgentWithJob.java @@ -17,7 +17,7 @@ package com.flowci.core.agent.domain; import com.flowci.core.job.domain.Job; -import com.flowci.domain.Agent.Status; +import com.flowci.core.agent.domain.Agent.Status; import java.util.Set; import lombok.Data; diff --git a/core/src/main/java/com/flowci/core/agent/domain/CmdStdLog.java b/core/src/main/java/com/flowci/core/agent/domain/CmdStdLog.java deleted file mode 100644 index b5dfc2643..000000000 --- a/core/src/main/java/com/flowci/core/agent/domain/CmdStdLog.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.flowci.core.agent.domain; - -import java.util.Map; -import java.util.Optional; - -public abstract class CmdStdLog { - - public static final String ID_HEADER = "id"; - - public static final String STEP_ID_HEADER = "stepId"; - - public static Optional getFromHeader(Map src, String header) { - if (src == null) { - return Optional.empty(); - } - - Object id = src.get(header); - if (id == null) { - return Optional.empty(); - } - - return Optional.of(id.toString()); - } -} diff --git a/core/src/main/java/com/flowci/core/agent/domain/K8sAgentHost.java b/core/src/main/java/com/flowci/core/agent/domain/K8sAgentHost.java new file mode 100644 index 000000000..d205e5af4 --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/domain/K8sAgentHost.java @@ -0,0 +1,22 @@ +package com.flowci.core.agent.domain; + +import lombok.Getter; +import lombok.NonNull; +import lombok.Setter; +import org.springframework.data.mongodb.core.mapping.Document; + +@Getter +@Setter +@Document(collection = "agent_host") +public class K8sAgentHost extends AgentHost { + + @NonNull + private String namespace; + + @NonNull + private String secret; // secret for config file + + public K8sAgentHost() { + setType(Type.K8s); + } +} diff --git a/core/src/main/java/com/flowci/core/agent/domain/CreateOrUpdateSshAgentHost.java b/core/src/main/java/com/flowci/core/agent/domain/SaveAgentHost.java similarity index 82% rename from core/src/main/java/com/flowci/core/agent/domain/CreateOrUpdateSshAgentHost.java rename to core/src/main/java/com/flowci/core/agent/domain/SaveAgentHost.java index 79bdef5b4..46ec6fd98 100644 --- a/core/src/main/java/com/flowci/core/agent/domain/CreateOrUpdateSshAgentHost.java +++ b/core/src/main/java/com/flowci/core/agent/domain/SaveAgentHost.java @@ -29,7 +29,7 @@ @Getter @Setter -public class CreateOrUpdateSshAgentHost { +public class SaveAgentHost { private String id; @@ -41,15 +41,14 @@ public class CreateOrUpdateSshAgentHost { @NotEmpty private String name; - @NotEmpty private String secret; - @NotEmpty private String user; - @NotEmpty private String ip; + private String namespace; + @Min(1) @Max(Integer.MAX_VALUE) private int port = 22; @@ -63,10 +62,10 @@ public AgentHost toObj() { SshAgentHost host = new SshAgentHost(); host.setId(id); host.setName(name); + host.setTags(tags); host.setSecret(secret); host.setUser(user); host.setIp(ip); - host.setTags(tags); host.setMaxSize(maxSize); host.setPort(port); return host; @@ -76,6 +75,18 @@ public AgentHost toObj() { LocalUnixAgentHost host = new LocalUnixAgentHost(); host.setId(id); host.setName(name); + host.setTags(tags); + host.setMaxSize(maxSize); + return host; + } + + if (type == AgentHost.Type.K8s) { + K8sAgentHost host = new K8sAgentHost(); + host.setId(id); + host.setName(name); + host.setTags(tags); + host.setSecret(secret); + host.setNamespace(namespace); host.setMaxSize(maxSize); return host; } diff --git a/core/src/main/java/com/flowci/core/agent/domain/ShellIn.java b/core/src/main/java/com/flowci/core/agent/domain/ShellIn.java index 4eea94d65..bdbf842fe 100644 --- a/core/src/main/java/com/flowci/core/agent/domain/ShellIn.java +++ b/core/src/main/java/com/flowci/core/agent/domain/ShellIn.java @@ -36,13 +36,13 @@ public final class ShellIn extends CmdIn { @JsonIgnore private String condition; - private List scripts = new LinkedList<>(); + private List scripts; private int timeout = 1800; - private Vars inputs = new StringVars(); + private Vars inputs; - private Set envFilters = new LinkedHashSet<>(); + private Set envFilters; public ShellIn() { super(Type.SHELL); diff --git a/core/src/main/java/com/flowci/core/agent/domain/ShellLog.java b/core/src/main/java/com/flowci/core/agent/domain/ShellLog.java new file mode 100644 index 000000000..122ed3467 --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/domain/ShellLog.java @@ -0,0 +1,15 @@ +package com.flowci.core.agent.domain; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public final class ShellLog { + + private String jobId; + + private String stepId; + + private String log; +} diff --git a/core/src/main/java/com/flowci/core/agent/domain/TtyCmd.java b/core/src/main/java/com/flowci/core/agent/domain/TtyCmd.java index bc3981337..50f96f666 100644 --- a/core/src/main/java/com/flowci/core/agent/domain/TtyCmd.java +++ b/core/src/main/java/com/flowci/core/agent/domain/TtyCmd.java @@ -54,6 +54,6 @@ public final static class Log { private String id; - private String content; + private String log; } } diff --git a/core/src/main/java/com/flowci/core/agent/domain/Util.java b/core/src/main/java/com/flowci/core/agent/domain/Util.java new file mode 100644 index 000000000..bbf3c10ac --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/domain/Util.java @@ -0,0 +1,14 @@ +package com.flowci.core.agent.domain; + +public abstract class Util { + + private static final String LockPathSuffix = "-lock"; + + public static String getZkLockPath(String root, Agent agent) { + return root + Agent.PATH_SLASH + agent.getId() + LockPathSuffix; + } + + public static String getAgentIdFromLockPath(String lockPath) { + return lockPath.replace(LockPathSuffix, ""); + } +} diff --git a/core/src/main/java/com/flowci/core/agent/domain/Variables.java b/core/src/main/java/com/flowci/core/agent/domain/Variables.java deleted file mode 100644 index 2be6844cf..000000000 --- a/core/src/main/java/com/flowci/core/agent/domain/Variables.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.flowci.core.agent.domain; - -public abstract class Variables { - - public static final String SERVER_URL = "FLOWCI_SERVER_URL"; - - public static final String AGENT_TOKEN = "FLOWCI_AGENT_TOKEN"; - - public static final String AGENT_LOG_LEVEL = "FLOWCI_AGENT_LOG_LEVEL"; - - public static final String AGENT_VOLUMES = "FLOWCI_AGENT_VOLUMES"; - - public static final String AGENT_WORKSPACE = "FLOWCI_AGENT_WORKSPACE"; -} diff --git a/core/src/main/java/com/flowci/core/agent/event/AgentCreatedEvent.java b/core/src/main/java/com/flowci/core/agent/event/AgentCreatedEvent.java index fcc58d05e..e0a65c26d 100644 --- a/core/src/main/java/com/flowci/core/agent/event/AgentCreatedEvent.java +++ b/core/src/main/java/com/flowci/core/agent/event/AgentCreatedEvent.java @@ -16,8 +16,7 @@ package com.flowci.core.agent.event; -import com.flowci.core.agent.domain.AgentHost; -import com.flowci.domain.Agent; +import com.flowci.core.agent.domain.Agent; import lombok.Getter; import org.springframework.context.ApplicationEvent; @@ -26,11 +25,8 @@ public class AgentCreatedEvent extends ApplicationEvent { private final Agent agent; - private final AgentHost host; - - public AgentCreatedEvent(Object source, Agent agent, AgentHost host) { + public AgentCreatedEvent(Object source, Agent agent) { super(source); this.agent = agent; - this.host = host; } } diff --git a/core/src/main/java/com/flowci/core/agent/event/AgentIdleEvent.java b/core/src/main/java/com/flowci/core/agent/event/AgentIdleEvent.java new file mode 100644 index 000000000..d069878c4 --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/event/AgentIdleEvent.java @@ -0,0 +1,23 @@ +package com.flowci.core.agent.event; + +import com.flowci.core.common.event.BroadcastEvent; +import com.flowci.core.agent.domain.Agent; +import lombok.Getter; + +/** + * Has a idle agent + */ +@Getter +public class AgentIdleEvent extends BroadcastEvent { + + private Agent agent; + + public AgentIdleEvent() { + super(); + } + + public AgentIdleEvent(Object source, Agent agent) { + super(source); + this.agent = agent; + } +} diff --git a/core/src/main/java/com/flowci/core/agent/event/AgentStatusEvent.java b/core/src/main/java/com/flowci/core/agent/event/AgentStatusEvent.java index 5fc7dae02..811e95d61 100644 --- a/core/src/main/java/com/flowci/core/agent/event/AgentStatusEvent.java +++ b/core/src/main/java/com/flowci/core/agent/event/AgentStatusEvent.java @@ -16,7 +16,7 @@ package com.flowci.core.agent.event; -import com.flowci.domain.Agent; +import com.flowci.core.agent.domain.Agent; import lombok.Getter; import org.springframework.context.ApplicationEvent; diff --git a/core/src/main/java/com/flowci/core/agent/event/CmdSentEvent.java b/core/src/main/java/com/flowci/core/agent/event/CmdSentEvent.java index e3f36afbf..4af093053 100644 --- a/core/src/main/java/com/flowci/core/agent/event/CmdSentEvent.java +++ b/core/src/main/java/com/flowci/core/agent/event/CmdSentEvent.java @@ -17,7 +17,7 @@ package com.flowci.core.agent.event; import com.flowci.core.agent.domain.CmdIn; -import com.flowci.domain.Agent; +import com.flowci.core.agent.domain.Agent; import lombok.Getter; import org.springframework.context.ApplicationEvent; diff --git a/core/src/main/java/com/flowci/core/agent/event/EventFromClient.java b/core/src/main/java/com/flowci/core/agent/event/EventFromClient.java new file mode 100644 index 000000000..e1756b111 --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/event/EventFromClient.java @@ -0,0 +1,19 @@ +package com.flowci.core.agent.event; + +import lombok.Getter; +import org.springframework.context.ApplicationEvent; +import org.springframework.web.socket.WebSocketSession; + +@Getter +public abstract class EventFromClient extends ApplicationEvent { + + private final String token; + + private final WebSocketSession session; + + protected EventFromClient(Object source, String token, WebSocketSession session) { + super(source); + this.token = token; + this.session = session; + } +} diff --git a/core/src/main/java/com/flowci/core/agent/event/OnCmdOutEvent.java b/core/src/main/java/com/flowci/core/agent/event/OnCmdOutEvent.java new file mode 100644 index 000000000..f76ca6337 --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/event/OnCmdOutEvent.java @@ -0,0 +1,14 @@ +package com.flowci.core.agent.event; + +import lombok.Getter; + +public class OnCmdOutEvent extends EventFromClient { + + @Getter + private final byte[] raw; + + public OnCmdOutEvent(Object source, byte[] raw) { + super(source, null, null); + this.raw = raw; + } +} diff --git a/core/src/main/java/com/flowci/core/agent/event/OnConnectedEvent.java b/core/src/main/java/com/flowci/core/agent/event/OnConnectedEvent.java new file mode 100644 index 000000000..bacd20758 --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/event/OnConnectedEvent.java @@ -0,0 +1,16 @@ +package com.flowci.core.agent.event; + +import com.flowci.core.agent.domain.AgentInit; +import lombok.Getter; +import org.springframework.web.socket.WebSocketSession; + +@Getter +public class OnConnectedEvent extends EventFromClient { + + private final AgentInit init; + + public OnConnectedEvent(Object source, String token, WebSocketSession session, AgentInit init) { + super(source, token, session); + this.init = init; + } +} diff --git a/core/src/main/java/com/flowci/core/agent/event/OnDisconnectedEvent.java b/core/src/main/java/com/flowci/core/agent/event/OnDisconnectedEvent.java new file mode 100644 index 000000000..0646b7cfa --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/event/OnDisconnectedEvent.java @@ -0,0 +1,10 @@ +package com.flowci.core.agent.event; + +import org.springframework.web.socket.WebSocketSession; + +public class OnDisconnectedEvent extends EventFromClient { + + public OnDisconnectedEvent(Object source, String token, WebSocketSession session) { + super(source, token, session); + } +} diff --git a/core/src/main/java/com/flowci/core/agent/event/OnShellLogEvent.java b/core/src/main/java/com/flowci/core/agent/event/OnShellLogEvent.java new file mode 100644 index 000000000..45dc24807 --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/event/OnShellLogEvent.java @@ -0,0 +1,20 @@ +package com.flowci.core.agent.event; + +import lombok.Getter; + +@Getter +public class OnShellLogEvent extends EventFromClient { + + private final String jobId; + + private final String stepId; + + private final String b64Log; + + public OnShellLogEvent(Object source, String jobId, String stepId, String b64Log) { + super(source, null, null); + this.jobId = jobId; + this.stepId = stepId; + this.b64Log = b64Log; + } +} diff --git a/core/src/main/java/com/flowci/core/agent/event/OnTTYLogEvent.java b/core/src/main/java/com/flowci/core/agent/event/OnTTYLogEvent.java new file mode 100644 index 000000000..2f590467a --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/event/OnTTYLogEvent.java @@ -0,0 +1,17 @@ +package com.flowci.core.agent.event; + +import lombok.Getter; + +@Getter +public class OnTTYLogEvent extends EventFromClient { + + private final String ttyId; + + private final String body; + + public OnTTYLogEvent(Object source, String ttyId, String body) { + super(source, null, null); + this.ttyId = ttyId; + this.body = body; + } +} diff --git a/core/src/main/java/com/flowci/core/agent/health/AgentSettingsCheck.java b/core/src/main/java/com/flowci/core/agent/health/AgentSettingsCheck.java deleted file mode 100644 index b622bd830..000000000 --- a/core/src/main/java/com/flowci/core/agent/health/AgentSettingsCheck.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2019 flow.ci - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package com.flowci.core.agent.health; - -import com.flowci.domain.Settings; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.actuate.health.AbstractHealthIndicator; -import org.springframework.boot.actuate.health.Health; -import org.springframework.stereotype.Component; - -@Component -public class AgentSettingsCheck extends AbstractHealthIndicator { - - @Autowired - private Settings baseSettings; - - @Override - protected void doHealthCheck(Health.Builder builder) throws Exception { - builder.withDetail("settings", baseSettings); - } -} diff --git a/core/src/main/java/com/flowci/core/agent/manager/AgentEventManager.java b/core/src/main/java/com/flowci/core/agent/manager/AgentEventManager.java new file mode 100644 index 000000000..a61af9f17 --- /dev/null +++ b/core/src/main/java/com/flowci/core/agent/manager/AgentEventManager.java @@ -0,0 +1,179 @@ +package com.flowci.core.agent.manager; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.flowci.core.agent.domain.AgentInit; +import com.flowci.core.agent.domain.ShellLog; +import com.flowci.core.agent.domain.TtyCmd; +import com.flowci.core.agent.event.*; +import com.flowci.core.common.domain.StatusCode; +import com.flowci.core.common.domain.http.ResponseMessage; +import com.flowci.core.common.manager.SpringEventManager; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.BinaryMessage; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.BinaryWebSocketHandler; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Handle event from agent via websocket + */ +@Log4j2 +@Component +public class AgentEventManager extends BinaryWebSocketHandler { + + private final static int EventLength = 10; + + private final static String EventConnect = "connect___"; + + private final static String EventCmdOut = "cmd_out___"; + + private final static String EventShellLog = "slog______"; + + private final static String EventTTYLog = "tlog______"; + + private final static String HeaderToken = "Token"; + + private final Map agentSessionStore = new ConcurrentHashMap<>(); + + @Autowired + private ObjectMapper objectMapper; + + @Autowired + private SpringEventManager eventManager; + + public void writeMessage(String token, ResponseMessage msg) { + WebSocketSession session = agentSessionStore.get(token); + + try { + byte[] bytes = objectMapper.writeValueAsBytes(msg); + session.sendMessage(new BinaryMessage(bytes)); + } catch (IOException e) { + log.warn("Unable to write response message for agent {}: {}", token, e.getMessage()); + } + } + + public void writeMessage(String token, byte[] bytes) throws IOException { + WebSocketSession session = agentSessionStore.get(token); + if (session == null) { + log.warn("Agent {} not connected", token); + return; + } + + session.sendMessage(new BinaryMessage(bytes)); + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + // ignore, handle connect event on connect event + } + + @Override + protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { + byte[] bytes = message.getPayload().array(); + + if (bytes.length < (EventLength + 2)) { + log.warn("invalid binary message"); + return; + } + + String token = getToken(session); + String event = getEvent(bytes); + byte[] body = getBody(bytes); + + if (EventConnect.equals(event)) { + onConnected(session, token, body); + return; + } + + if (EventCmdOut.equals(event)) { + onCmdOut(token, body); + return; + } + + if (EventShellLog.equals(event)) { + onShellLog(body); + return; + } + + if (EventTTYLog.equals(event)) { + onTtyLog(body); + } + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + // ignore, handled in afterConnectionClosed + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + String token = getToken(session); + eventManager.publish(new OnDisconnectedEvent(this, token, session)); + agentSessionStore.remove(token, session); + } + + private void onConnected(WebSocketSession session, String token, byte[] body) { + try { + AgentInit init = objectMapper.readValue(body, AgentInit.class); + Objects.requireNonNull(init.getStatus(), "Agent status is missing"); + + init.setToken(token); + init.setIp(session.getRemoteAddress() == null ? null : session.getRemoteAddress().toString()); + + eventManager.publish(new OnConnectedEvent(this, token, session, init)); + agentSessionStore.put(token, session); + writeMessage(token, new ResponseMessage(StatusCode.OK, null)); + log.debug("Agent {} is connected with status {}", token, init.getStatus()); + } catch (Exception e) { + log.warn(e); + writeMessage(token, new ResponseMessage(StatusCode.FATAL, e.getMessage(), null)); + } + } + + private void onCmdOut(String token, byte[] body) { + try { + eventManager.publish(new OnCmdOutEvent(this, body)); + log.debug("Agent {} got cmd back: {}", token, new String(body)); + } catch (Exception e) { + log.warn(e); + } + } + + private void onShellLog(byte[] body) { + try { + ShellLog item = objectMapper.readValue(body, ShellLog.class); + eventManager.publish(new OnShellLogEvent(this, item.getJobId(), item.getStepId(), item.getLog())); + } catch (IOException e) { + log.warn(e); + } + } + + private void onTtyLog(byte[] body) { + try { + TtyCmd.Log item = objectMapper.readValue(body, TtyCmd.Log.class); + eventManager.publish(new OnTTYLogEvent(this, item.getId(), item.getLog())); + } catch (IOException e) { + log.warn(e); + } + } + + private static String getToken(WebSocketSession session) { + return session.getHandshakeHeaders().get(HeaderToken).get(0); + } + + private static String getEvent(byte[] bytes) { + return new String(Arrays.copyOf(bytes, EventLength)).trim(); + } + + private static byte[] getBody(byte[] bytes) { + return Arrays.copyOfRange(bytes, EventLength + 1, bytes.length); + } +} diff --git a/core/src/main/java/com/flowci/core/agent/service/AgentHostService.java b/core/src/main/java/com/flowci/core/agent/service/AgentHostService.java index 1da1e26e5..00db29b1c 100644 --- a/core/src/main/java/com/flowci/core/agent/service/AgentHostService.java +++ b/core/src/main/java/com/flowci/core/agent/service/AgentHostService.java @@ -19,7 +19,6 @@ import java.util.List; import com.flowci.core.agent.domain.AgentHost; -import com.flowci.domain.Agent; public interface AgentHostService { diff --git a/core/src/main/java/com/flowci/core/agent/service/AgentHostServiceImpl.java b/core/src/main/java/com/flowci/core/agent/service/AgentHostServiceImpl.java index 64dbd1a57..81dbc5a48 100644 --- a/core/src/main/java/com/flowci/core/agent/service/AgentHostServiceImpl.java +++ b/core/src/main/java/com/flowci/core/agent/service/AgentHostServiceImpl.java @@ -18,15 +18,16 @@ import com.flowci.core.agent.dao.AgentDao; import com.flowci.core.agent.dao.AgentHostDao; -import com.flowci.core.agent.domain.AgentHost; -import com.flowci.core.agent.domain.LocalUnixAgentHost; -import com.flowci.core.agent.domain.SshAgentHost; -import com.flowci.core.agent.event.AgentCreatedEvent; +import com.flowci.core.agent.domain.*; import com.flowci.core.agent.event.AgentHostStatusEvent; import com.flowci.core.common.config.AppProperties; +import com.flowci.core.common.domain.Variables; import com.flowci.core.common.helper.CacheHelper; import com.flowci.core.common.manager.SpringEventManager; +import com.flowci.core.common.manager.SpringTaskManager; +import com.flowci.core.common.service.SettingService; import com.flowci.core.job.event.NoIdleAgentEvent; +import com.flowci.core.secret.domain.KubeConfigSecret; import com.flowci.core.secret.domain.RSASecret; import com.flowci.core.secret.domain.Secret; import com.flowci.core.secret.event.GetSecretEvent; @@ -34,26 +35,21 @@ import com.flowci.docker.ContainerManager; import com.flowci.docker.DockerManager; import com.flowci.docker.DockerSSHManager; -import com.flowci.docker.domain.DockerStartOption; -import com.flowci.docker.domain.SSHOption; -import com.flowci.domain.Agent; +import com.flowci.docker.K8sManager; +import com.flowci.docker.domain.*; import com.flowci.exception.NotAvailableException; import com.flowci.exception.NotFoundException; import com.flowci.util.StringHelper; -import com.flowci.zookeeper.ZookeeperClient; -import com.flowci.zookeeper.ZookeeperException; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.RemovalListener; -import com.github.dockerjava.api.model.Container; import com.google.common.base.Preconditions; import lombok.AllArgsConstructor; import lombok.extern.log4j.Log4j2; -import org.apache.curator.utils.ZKPaths; -import org.apache.zookeeper.CreateMode; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; +import org.springframework.core.env.Environment; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -64,33 +60,29 @@ import java.nio.file.Paths; import java.util.*; -import static com.flowci.core.agent.domain.Variables.*; +import static com.flowci.core.secret.domain.Secret.Category.KUBE_CONFIG; import static com.flowci.core.secret.domain.Secret.Category.SSH_RSA; @Log4j2 @Service public class AgentHostServiceImpl implements AgentHostService { - private static final String DefaultImage = "flowci/agent:latest"; - private static final String DefaultWorkspace = "/ws"; private static final String DockerSock = "/var/run/docker.sock"; - private static final String ContainerNamePrefix = "ci-agent"; + private static final String ContainerNamePrefix = "flowci-agent"; - private final Map, OnCreateAndInit> mapping = new HashMap<>(3); + private final Map, HostAdaptor> mapping = new HashMap<>(3); private final Cache poolManagerCache = CacheHelper.createLocalCache(10, 600, new PoolManagerRemover()); - private String collectTaskZkPath; - @Autowired - private AppProperties appProperties; + private Environment environment; @Autowired - private String serverUrl; + private AppProperties appProperties; @Autowired private AgentDao agentDao; @@ -102,23 +94,24 @@ public class AgentHostServiceImpl implements AgentHostService { private SpringEventManager eventManager; @Autowired - private ZookeeperClient zk; + private TaskExecutor appTaskExecutor; @Autowired - private AppProperties.Zookeeper zkProperties; + private DockerManager dockerManager; @Autowired - private TaskExecutor appTaskExecutor; + private SpringTaskManager taskManager; @Autowired - private DockerManager dockerManager; + private AgentService agentService; @Autowired - private AgentService agentService; + private SettingService settingService; { - mapping.put(LocalUnixAgentHost.class, new OnLocalSocketHostCreate()); - mapping.put(SshAgentHost.class, new OnSshHostCreate()); + mapping.put(LocalUnixAgentHost.class, new LocalSocketHostAdaptor()); + mapping.put(SshAgentHost.class, new SshHostAdaptor()); + mapping.put(K8sAgentHost.class, new K8sHostAdaptor()); } //==================================================================== @@ -167,7 +160,7 @@ public void sync(AgentHost host) { } ContainerManager cm = optional.get().getContainerManager(); - List containerList; + List containerList; try { containerList = cm.list(null, ContainerNamePrefix + "*"); @@ -185,7 +178,7 @@ public void sync(AgentHost host) { for (AgentItemWrapper item : containerSet) { try { - List list = cm.list(null, item.getName()); + List list = cm.list(null, item.getName()); if (list.size() > 0) { cm.delete(list.get(0).getId()); } @@ -203,46 +196,44 @@ public boolean start(AgentHost host) { return false; } + log.info("try to start agent from host {}", host.getName()); + List agents = agentDao.findAllByHostId(host.getId()); List startList = new LinkedList<>(); DockerManager dockerManager = manager.get(); ContainerManager cm = dockerManager.getContainerManager(); - + // try to resume if offline, add to start list if resume failed for (Agent agent : agents) { - // add just created agent to list to start later - if (agent.getStatus() == Agent.Status.CREATED) { - startList.add(agent); + if (!agent.isOffline()) { continue; } - // try to resume, add to start list if failed - if (agent.getStatus() == Agent.Status.OFFLINE) { - try { - List list = cm.list(null, getContainerName(agent)); - - // container not exist - if (list.isEmpty()) { - startList.add(agent); - continue; - } - - Container container = list.get(0); - cm.resume(container.getId()); - log.info("Agent {} been resumed", agent.getName()); - return true; - } catch (Exception e) { - log.warn("Unable to resume agent {}", agent.getName()); + try { + List list = cm.list(null, getContainerName(agent)); + + // container not exist + if (list.isEmpty()) { startList.add(agent); + continue; } + + Unit container = list.get(0); + cm.resume(container.getId()); + log.info("Agent {} been resumed", agent.getName()); + return true; + } catch (Exception e) { + log.warn("Unable to resume agent {}", agent.getName()); + startList.add(agent); } } // start from offline, and delete if cannot be started for (Agent agent : startList) { try { - cm.start(buildStartOption(agent)); + StartOption startOption = mapping.get(host.getClass()).buildStartOption(host, agent); + cm.start(startOption); log.info("Agent {} been started", agent.getName()); return true; } catch (Exception e) { @@ -256,12 +247,19 @@ public boolean start(AgentHost host) { // create new agent if (agents.size() < host.getMaxSize()) { - String name = String.format("%s-%s", host.getName(), StringHelper.randomString(5)); + String random = StringHelper.randomString(5); + String name = String.format("%s-%s", host.getName(), random); + Agent agent = null; try { agent = agentService.create(name, host.getTags(), Optional.of(host.getId())); - cm.start(buildStartOption(agent)); - eventManager.publish(new AgentCreatedEvent(this, agent, host)); + + StartOption startOption = mapping.get(host.getClass()).buildStartOption(host, agent); + String cid = cm.start(startOption); + + agent.setContainerId(cid); + agentService.update(agent, Agent.Status.STARTING); + log.info("Agent {} been created and started", name); return true; } catch (Exception e) { @@ -300,7 +298,27 @@ public void testConn(AgentHost host) { @Override public void collect(AgentHost host) { - // TODO: not implemented + Optional optional = getDockerManager(host); + if (!optional.isPresent()) { + log.warn("unable to collect agents in host {} since fail to get pool manager", host.getName()); + return; + } + + DockerManager manager = optional.get(); + List agents = agentDao.findAllByHostId(host.getId()); + + for (Agent agent : agents) { + // delete agent if not started after 60 seconds + if (agent.isStartingOver(60)) { + try { + manager.getContainerManager().delete(agent.getContainerId()); + agentService.delete(agent); + log.info("Agent {} is collected since not started over 60 seconds", agent.getName()); + } catch (Exception e) { + log.warn("failed to collect agent {} : {}", agent.getName(), e.getMessage()); + } + } + } } @Override @@ -318,8 +336,8 @@ public void removeAll(AgentHost host) { ContainerManager cm = optional.get().getContainerManager(); try { - List containers = cm.list(null, ContainerNamePrefix + "*"); - for (Container c : containers) { + List containers = cm.list(null, ContainerNamePrefix + "*"); + for (Unit c : containers) { cm.delete(c.getId()); } } catch (Exception e) { @@ -329,19 +347,11 @@ public void removeAll(AgentHost host) { @Scheduled(cron = "0 0/5 * * * ?") public void scheduleCollect() { - try { - if (!lock()) { - return; - } - - log.info("Start to collect agents from host"); + taskManager.run("agent-host-collect", () -> { for (AgentHost host : list()) { collect(host); } - log.info("Collection finished"); - } finally { - clean(); - } + }); } //==================================================================== @@ -350,7 +360,6 @@ public void scheduleCollect() { @EventListener public void onContextReady(ContextRefreshedEvent event) { - initZkNodeForCronTask(); autoCreateLocalAgentHost(); syncAgents(); } @@ -376,27 +385,8 @@ public void onNoIdleAgent(NoIdleAgentEvent event) { // %% Private functions //==================================================================== - private DockerStartOption buildStartOption(Agent agent) { - DockerStartOption option = new DockerStartOption(); - option.setImage(DefaultImage); - option.setName(getContainerName(agent)); - - option.addEnv(SERVER_URL, serverUrl); - option.addEnv(AGENT_TOKEN, agent.getToken()); - option.addEnv(AGENT_LOG_LEVEL, "DEBUG"); - option.addEnv(AGENT_VOLUMES, System.getenv(AGENT_VOLUMES)); - option.addEnv(AGENT_WORKSPACE, DefaultWorkspace); - - option.addBind(DockerSock, DockerSock); - return option; - } - public String getContainerName(Agent agent) { - return String.format("%s.%s", ContainerNamePrefix, agent.getName()); - } - - private void initZkNodeForCronTask() { - collectTaskZkPath = ZKPaths.makePath(zkProperties.getCronRoot(), "agent-host-collect"); + return String.format("%s-%s", ContainerNamePrefix, StringHelper.escapeNumber(agent.getName())); } public void autoCreateLocalAgentHost() { @@ -458,39 +448,87 @@ private void updateAgentHostStatus(AgentHost host, AgentHost.Status newStatus) { eventManager.publish(new AgentHostStatusEvent(this, host)); } - /** - * check zk and lock - */ - private boolean lock() { - try { - zk.create(CreateMode.EPHEMERAL, collectTaskZkPath, null); - return true; - } catch (ZookeeperException e) { - log.warn("Unable to init agent host collect task : {}", e.getMessage()); - return false; - } + //==================================================================== + // %% Private classes + //==================================================================== + + private Secret getSecret(String secret, Secret.Category expected) { + GetSecretEvent event = new GetSecretEvent(this, secret); + eventManager.publish(event); + + Secret c = event.getFetched(); + Preconditions.checkArgument(c != null, "Secret not found"); + Preconditions.checkArgument(c.getCategory() == expected, "Invalid secret category"); + + return c; } - private void clean() { - try { - zk.delete(collectTaskZkPath, false); - } catch (ZookeeperException ignore) { + private interface HostAdaptor { + void create(AgentHost host); + + DockerManager init(AgentHost host) throws Exception; + + StartOption buildStartOption(AgentHost host, Agent agent); + } + + private abstract class AbstractHostAdaptor implements HostAdaptor { + + protected void initStartOption(StartOption option, Agent agent) { + option.setImage(environment.getProperty(Variables.Agent.DockerImage, "flowci/agent")); + option.setName(getContainerName(agent)); + + option.addEnv(Variables.Agent.ServerUrl, settingService.get().getServerUrl()); + option.addEnv(Variables.Agent.Token, agent.getToken()); + option.addEnv(Variables.Agent.LogLevel, System.getenv(Variables.App.LogLevel)); + option.addEnv(Variables.Agent.Volumes, System.getenv(Variables.Agent.Volumes)); + option.addEnv(Variables.Agent.Workspace, DefaultWorkspace); } } - //==================================================================== - // %% Private classes - //==================================================================== + private class K8sHostAdaptor extends AbstractHostAdaptor { - private interface OnCreateAndInit { + @Override + public void create(AgentHost host) { + K8sAgentHost k8sHost = (K8sAgentHost) host; + Preconditions.checkArgument(k8sHost.getSecret() != null, "Secret name must be defined"); + agentHostDao.insert(k8sHost); + } - void create(AgentHost host); + @Override + public DockerManager init(AgentHost host) throws Exception { + K8sAgentHost k8sHost = (K8sAgentHost) host; + KubeConfigSecret secret = (KubeConfigSecret) getSecret(k8sHost.getSecret(), KUBE_CONFIG); + String namespace = k8sHost.getNamespace(); - DockerManager init(AgentHost host) throws Exception; + K8sOption option = new KubeConfigOption(namespace, secret.getContent().getData()); + K8sManager manager = new K8sManager(option); + + // check namespace + if (!manager.hasNamespace()) { + throw new Exception(String.format("namespace '%s' not exist", namespace)); + } + + log.debug("k8s manager initialized"); + return manager; + } + + @Override + public StartOption buildStartOption(AgentHost host, Agent agent) { + PodStartOption option = new PodStartOption(); + initStartOption(option, agent); + + option.setLabel(ContainerNamePrefix); + + option.addEnv(Variables.Agent.K8sEnabled, Boolean.TRUE.toString()); + option.addEnv(Variables.Agent.K8sInCluster, Boolean.TRUE.toString()); + + // TODO: check is deployed in the k8s cluster + return option; + } } - private class OnLocalSocketHostCreate implements OnCreateAndInit { + private class LocalSocketHostAdaptor extends AbstractHostAdaptor { @Override public void create(AgentHost host) { @@ -517,6 +555,14 @@ public DockerManager init(AgentHost host) { return dockerManager; } + @Override + public StartOption buildStartOption(AgentHost host, Agent agent) { + ContainerStartOption option = new ContainerStartOption(); + initStartOption(option, agent); + option.addBind(DockerSock, DockerSock); + return option; + } + private boolean hasCreated() { return agentHostDao.findAllByType(AgentHost.Type.LocalUnixSocket).size() > 0; } @@ -530,7 +576,7 @@ private void deleteIfExist() { } } - private class OnSshHostCreate implements OnCreateAndInit { + private class SshHostAdaptor extends AbstractHostAdaptor { @Override public void create(AgentHost host) { @@ -542,18 +588,19 @@ public void create(AgentHost host) { @Override public DockerManager init(AgentHost host) throws Exception { SshAgentHost sshHost = (SshAgentHost) host; - GetSecretEvent event = new GetSecretEvent(this, sshHost.getSecret()); - eventManager.publish(event); - - Secret c = event.getFetched(); - Preconditions.checkArgument(c != null, "Secret not found"); - Preconditions.checkArgument(c.getCategory() == SSH_RSA, "Invalid credential category"); - - RSASecret rsa = (RSASecret) c; + RSASecret rsa = (RSASecret) getSecret(sshHost.getSecret(), SSH_RSA); SSHOption option = SSHOption.of(rsa.getPrivateKey(), sshHost.getIp(), sshHost.getUser(), sshHost.getPort()); return new DockerSSHManager(option); } + + @Override + public StartOption buildStartOption(AgentHost host, Agent agent) { + ContainerStartOption option = new ContainerStartOption(); + initStartOption(option, agent); + option.addBind(DockerSock, DockerSock); + return option; + } } @AllArgsConstructor(staticName = "of") @@ -576,8 +623,8 @@ public String getName() { return ((Agent) object).getName(); } - if (object instanceof Container) { - return ((Container) object).getNames()[0]; + if (object instanceof Unit) { + return ((Unit) object).getName(); } throw new IllegalArgumentException(); diff --git a/core/src/main/java/com/flowci/core/agent/service/AgentService.java b/core/src/main/java/com/flowci/core/agent/service/AgentService.java index fa156cda6..f6d21e3a9 100644 --- a/core/src/main/java/com/flowci/core/agent/service/AgentService.java +++ b/core/src/main/java/com/flowci/core/agent/service/AgentService.java @@ -16,11 +16,9 @@ package com.flowci.core.agent.service; -import com.flowci.core.agent.domain.AgentInit; import com.flowci.core.agent.domain.CmdIn; import com.flowci.core.job.domain.Job; -import com.flowci.domain.Agent; -import com.flowci.domain.Settings; +import com.flowci.core.agent.domain.Agent; import java.util.List; import java.util.Optional; @@ -32,8 +30,6 @@ */ public interface AgentService { - Settings connect(AgentInit initData); - /** * Get agent by id */ @@ -49,11 +45,6 @@ public interface AgentService { */ Agent getByToken(String token); - /** - * Get zookeeper path - */ - String getPath(Agent agent); - /** * Check agent token is existed */ @@ -67,11 +58,10 @@ public interface AgentService { /** * Find agent by status and tags from database * - * @param status Status * @param tags Agent tags, optional * @throws com.flowci.exception.NotFoundException */ - List find(Agent.Status status, Set tags); + List find(Set tags); /** * Delete agent by token @@ -95,6 +85,7 @@ public interface AgentService { /** * Try to lock agent resource, and set agent status to BUSY + * * @return return agent instance, otherwise return empty */ Optional tryLock(String jobId, String agentId); @@ -119,6 +110,11 @@ public interface AgentService { */ Agent update(String token, Agent.Resource resource); + /** + * Update agent status + */ + Agent update(Agent agent, Agent.Status status); + /** * Dispatch cmd to agent */ diff --git a/core/src/main/java/com/flowci/core/agent/service/AgentServiceImpl.java b/core/src/main/java/com/flowci/core/agent/service/AgentServiceImpl.java index 16b3bfc88..3f7aae20b 100644 --- a/core/src/main/java/com/flowci/core/agent/service/AgentServiceImpl.java +++ b/core/src/main/java/com/flowci/core/agent/service/AgentServiceImpl.java @@ -16,38 +16,33 @@ package com.flowci.core.agent.service; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.flowci.core.agent.dao.AgentDao; import com.flowci.core.agent.domain.AgentInit; import com.flowci.core.agent.domain.CmdIn; -import com.flowci.core.agent.event.AgentStatusEvent; -import com.flowci.core.agent.event.CmdSentEvent; +import com.flowci.core.agent.domain.Util; +import com.flowci.core.agent.event.*; +import com.flowci.core.agent.manager.AgentEventManager; import com.flowci.core.common.config.AppProperties; import com.flowci.core.common.helper.CipherHelper; import com.flowci.core.common.helper.ThreadHelper; import com.flowci.core.common.manager.SpringEventManager; -import com.flowci.core.common.rabbit.RabbitOperations; +import com.flowci.core.common.manager.SpringTaskManager; import com.flowci.core.job.domain.Job; import com.flowci.core.job.event.NoIdleAgentEvent; import com.flowci.core.job.event.StopJobConsumerEvent; -import com.flowci.domain.Agent; -import com.flowci.domain.Agent.Status; -import com.flowci.domain.Settings; +import com.flowci.core.agent.domain.Agent; +import com.flowci.core.agent.domain.Agent.Status; import com.flowci.exception.DuplicateException; import com.flowci.exception.NotFoundException; import com.flowci.tree.Selector; import com.flowci.util.ObjectsHelper; import com.flowci.zookeeper.ZookeeperClient; import com.flowci.zookeeper.ZookeeperException; -import com.google.common.collect.ImmutableSet; import lombok.extern.log4j.Log4j2; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.zookeeper.CreateMode; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; @@ -58,6 +53,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +import static com.flowci.core.agent.domain.Agent.Status.IDLE; +import static com.flowci.core.agent.domain.Agent.Status.OFFLINE; + /** * Manage agent from zookeeper nodes * - The ephemeral node present agent, path is /{root}/{agent id} @@ -69,8 +67,6 @@ @Service public class AgentServiceImpl implements AgentService { - private static final String LockPathSuffix = "-lock"; - private static final long RetryIntervalOnNotFound = 10 * 1000; // 10 seconds @Autowired @@ -82,44 +78,39 @@ public class AgentServiceImpl implements AgentService { @Autowired private AgentDao agentDao; - @Autowired - private RabbitOperations agentQueueManager; - @Autowired private SpringEventManager eventManager; @Autowired - private Settings baseSettings; + private SpringTaskManager taskManager; @Autowired private ObjectMapper objectMapper; + @Autowired + private AgentEventManager agentEventManager; + // key is flow id, private final Map acquireLocks = new ConcurrentHashMap<>(); @PostConstruct - private void init() { - initRootNode(); - initAgentsFromZk(); + public void initAgentStatus() { + taskManager.run("init-agent-status", () -> { + for (Agent agent : agentDao.findAll()) { + if (agent.isStarting() || agent.isOffline()) { + continue; + } + + agent.setStatus(OFFLINE); + agentDao.save(agent); + } + }); } //==================================================================== // %% Public Methods //==================================================================== - @Override - public Settings connect(AgentInit init) { - Agent target = getByToken(init.getToken()); - target.setUrl("http://" + init.getIp() + ":" + init.getPort()); - target.setOs(init.getOs()); - target.setResource(init.getResource()); - agentDao.save(target); - - Settings settings = ObjectsHelper.copy(baseSettings); - settings.setAgent(target); - return settings; - } - @Override public Agent get(String id) { Optional optional = agentDao.findById(id); @@ -158,19 +149,13 @@ public List list() { } @Override - public String getPath(Agent agent) { - String root = zkProperties.getAgentRoot(); - return root + Agent.PATH_SLASH + agent.getId(); - } - - @Override - public List find(Status status, Set tags) { + public List find(Set tags) { List agents; if (ObjectsHelper.hasCollection(tags)) { - agents = agentDao.findAllByStatusAndTagsIn(status, tags); + agents = agentDao.findAllByTagsIn(tags); } else { - agents = agentDao.findAllByStatus(status); + agents = agentDao.findAll(); } return agents; @@ -186,8 +171,7 @@ public Agent delete(String token) { @Override public void delete(Agent agent) { agentDao.delete(agent); - agentQueueManager.delete(agent.getQueueName()); - log.debug("{} has been deleted", agent); + log.debug("{} has been deleted", agent.getName()); } @Override @@ -233,22 +217,17 @@ public Optional acquire(Job job, Function canContinue) { public Optional tryLock(String jobId, String agentId) { // check agent is available form db Agent agent = get(agentId); - if (agent.isBusy()) { return Optional.empty(); } + // lock and set status to busy try { - // check agent status from zk - Status status = getStatusFromZk(agent); - if (status != Status.IDLE) { - return Optional.empty(); - } - - // lock and set status to busy - agent.setJobId(jobId); - String zkLockPath = getLockPath(agent); - zk.lock(zkLockPath, path -> updateAgentStatus(agent, Status.BUSY)); + String zkLockPath = Util.getZkLockPath(zkProperties.getAgentRoot(), agent); + zk.lock(zkLockPath, path -> { + agent.setJobId(jobId); + update(agent, Status.BUSY); + }); return Optional.of(agent); } catch (ZookeeperException e) { log.debug(e); @@ -259,12 +238,15 @@ public Optional tryLock(String jobId, String agentId) { @Override public void tryRelease(String agentId) { Agent agent = get(agentId); - if (agent.isIdle()) { - return; - } - agent.setJobId(null); - updateAgentStatus(agent, Status.IDLE); + + switch (agent.getStatus()) { + case OFFLINE: + update(agent, OFFLINE); + return; + case BUSY: + update(agent, IDLE); + } } @Override @@ -283,13 +265,10 @@ public Agent create(String name, Set tags, Optional hostId) { try { agentDao.insert(agent); - agentQueueManager.declare(agent.getQueueName(), false); + eventManager.publish(new AgentCreatedEvent(this, agent)); return agent; } catch (DuplicateKeyException e) { throw new DuplicateException("Agent name {0} is already defined", name); - } catch (IOException e) { - log.warn("Unable to declare agent queue, cause {}", e.getMessage()); - return agent; } } @@ -314,14 +293,28 @@ public Agent update(String token, Agent.Resource resource) { return agent; } + @Override + public Agent update(Agent agent, Status status) { + if (agent.getStatus() == status) { + agentDao.save(agent); + return agent; + } + + agent.setStatus(status); + agentDao.save(agent); + + eventManager.publish(new AgentStatusEvent(this, agent)); + return agent; + } + @Override public void dispatch(CmdIn cmd, Agent agent) { try { byte[] body = objectMapper.writeValueAsBytes(cmd); - agentQueueManager.send(agent.getQueueName(), body); + agentEventManager.writeMessage(agent.getToken(), body); eventManager.publish(new CmdSentEvent(this, agent, cmd)); - } catch (JsonProcessingException e) { - e.printStackTrace(); + } catch (IOException e) { + log.warn(e); } } @@ -329,8 +322,46 @@ public void dispatch(CmdIn cmd, Agent agent) { // %% Spring Event Listener //==================================================================== + @EventListener(ContextRefreshedEvent.class) + public void lockNodeCleanup() { + List children = zk.children(zkProperties.getAgentRoot()); + for (String path : children) { + String agentId = Util.getAgentIdFromLockPath(path); + Optional optional = agentDao.findById(agentId); + + if (!optional.isPresent()) { + try { + zk.delete(path, true); + } catch (Throwable ignore) { + } + } + } + } + @EventListener - public void notifyToFindAvailableAgent(AgentStatusEvent event) { + public void onConnected(OnConnectedEvent event) { + Agent target = getByToken(event.getToken()); + AgentInit init = event.getInit(); + + target.setK8sCluster(init.getK8sCluster()); + target.setUrl("http://" + init.getIp() + ":" + init.getPort()); + target.setOs(init.getOs()); + target.setResource(init.getResource()); + + update(target, init.getStatus()); + syncLockNode(target, true); + } + + @EventListener + public void onDisconnected(OnDisconnectedEvent event) { + Agent target = getByToken(event.getToken()); + + update(target, OFFLINE); + syncLockNode(target, false); + } + + @EventListener + public void notifyToFindAgent(AgentStatusEvent event) { Agent agent = event.getAgent(); if (!agent.hasJob()) { @@ -341,7 +372,12 @@ public void notifyToFindAvailableAgent(AgentStatusEvent event) { return; } - // notify all consumer to find agent + eventManager.publish(new AgentIdleEvent(this, agent)); + } + + @EventListener + public void doNotifyToFindAgent(AgentIdleEvent event) { + Agent agent = event.getAgent(); acquireLocks.computeIfPresent(agent.getJobId(), (s, lock) -> { ThreadHelper.notifyAll(lock); return lock; @@ -363,87 +399,10 @@ public void stopJobsThatWaitingForAgent(StopJobConsumerEvent event) { // %% Private methods //==================================================================== - /** - * Get agent id from zookeeper path - *

- * Ex: /agents/123123, should get 123123 - */ - private static String getAgentIdFromPath(String path) { - int index = path.lastIndexOf(Agent.PATH_SLASH); - return path.substring(index + 1); - } - - private void initRootNode() { - String root = zkProperties.getAgentRoot(); + private void syncLockNode(Agent agent, boolean isCreate) { + String lockPath = Util.getZkLockPath(zkProperties.getAgentRoot(), agent); - try { - zk.create(CreateMode.PERSISTENT, root, null); - } catch (ZookeeperException ignore) { - - } - - try { - zk.watchChildren(root, new RootNodeListener()); - } catch (ZookeeperException e) { - log.error(e.getMessage()); - } - } - - private void initAgentsFromZk() { - for (Agent agent : agentDao.findAll()) { - String zkPath = getPath(agent); - String zkLockPath = getLockPath(agent); - - // set to offline if zk node not exist - if (!zk.exist(zkPath)) { - agent.setStatus(Status.OFFLINE); - agentDao.save(agent); - zk.delete(zkLockPath, false); - continue; - } - - // sync status and lock node - Status status = getStatusFromZk(agent); - agent.setStatus(status); - agentDao.save(agent); - syncLockNode(agent, Type.CHILD_ADDED); - } - } - - /** - * Update agent status from ZK and DB - * - * @param agent target agent - * @param status new status - */ - private void updateAgentStatus(Agent agent, Status status) { - if (agent.getStatus() == status) { - agentDao.save(agent); - return; - } - - agent.setStatus(status); - - try { - // try update zookeeper status if new status not same with zk - Status current = getStatusFromZk(agent); - if (current != status) { - zk.set(getPath(agent), status.getBytes()); - } - } catch (ZookeeperException e) { - // set agent to offline when zk exception - agent.setStatus(Status.OFFLINE); - log.warn("Unable to update status on zk node: {}", e.getMessage()); - } finally { - agentDao.save(agent); - eventManager.publish(new AgentStatusEvent(this, agent)); - } - } - - private void syncLockNode(Agent agent, Type type) { - String lockPath = getLockPath(agent); - - if (type == Type.CHILD_ADDED || type == Type.CHILD_UPDATED || type == Type.CONNECTION_RECONNECTED) { + if (isCreate) { try { zk.create(CreateMode.PERSISTENT, lockPath, null); } catch (Throwable ignore) { @@ -452,26 +411,15 @@ private void syncLockNode(Agent agent, Type type) { return; } - if (type == Type.CHILD_REMOVED) { - try { - zk.delete(lockPath, true); - } catch (Throwable ignore) { + try { + zk.delete(lockPath, true); + } catch (Throwable ignore) { - } } } - private String getLockPath(Agent agent) { - return getPath(agent) + LockPathSuffix; - } - - private Status getStatusFromZk(Agent agent) { - byte[] statusInBytes = zk.get(getPath(agent)); - return Status.fromBytes(statusInBytes); - } - private Optional acquire(String jobId, Selector selector) { - List agents = find(Agent.Status.IDLE, selector.getLabel()); + List agents = find(selector.getLabel()); if (agents.isEmpty()) { return Optional.empty(); @@ -482,8 +430,11 @@ private Optional acquire(String jobId, Selector selector) { // try to lock it while (availableList.hasNext()) { Agent agent = availableList.next(); - Optional locked = tryLock(jobId, agent.getId()); + if (!agent.isIdle()) { + continue; + } + Optional locked = tryLock(jobId, agent.getId()); if (locked.isPresent()) { return locked; } @@ -502,59 +453,4 @@ private static class AcquireLock { private boolean stop = false; } - - private class RootNodeListener implements PathChildrenCacheListener { - - private final Set ChildOperations = ImmutableSet.of( - Type.CHILD_ADDED, - Type.CHILD_REMOVED, - Type.CHILD_UPDATED - ); - - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { - if (ChildOperations.contains(event.getType())) { - handleAgentStatusChange(event); - } - } - - private void handleAgentStatusChange(PathChildrenCacheEvent event) { - String path = event.getData().getPath(); - - // TODO: handle status change only within one node - - // do not handle event from lock node - if (path.endsWith(LockPathSuffix)) { - log.debug("Lock node '{}' event '{}' received", path, event.getType()); - return; - } - - String agentId = getAgentIdFromPath(path); - Agent agent = get(agentId); - - if (event.getType() == Type.CHILD_ADDED) { - syncLockNode(agent, Type.CHILD_ADDED); - - // status is reported from agent - Status status = getStatusFromZk(agent); - updateAgentStatus(agent, status); - log.debug("Event '{}' of agent '{}' with status '{}'", event.getType(), agent.getName(), Status.IDLE); - return; - } - - if (event.getType() == Type.CHILD_REMOVED) { - syncLockNode(agent, Type.CHILD_REMOVED); - updateAgentStatus(agent, Status.OFFLINE); - log.debug("Event '{}' of agent '{}' with status '{}'", event.getType(), agent.getName(), - Status.OFFLINE); - return; - } - - if (event.getType() == Type.CONNECTION_RECONNECTED) { - Status status = getStatusFromZk(agent); - updateAgentStatus(agent, status); - log.debug("Event '{}' of agent '{}' with status '{}'", event.getType(), agent.getName(), status); - } - } - } } diff --git a/core/src/main/java/com/flowci/core/auth/config/AuthConfig.java b/core/src/main/java/com/flowci/core/auth/config/AuthConfig.java index d2f23bd21..e3c5504b3 100644 --- a/core/src/main/java/com/flowci/core/auth/config/AuthConfig.java +++ b/core/src/main/java/com/flowci/core/auth/config/AuthConfig.java @@ -19,55 +19,22 @@ import com.flowci.core.agent.domain.AgentAction; import com.flowci.core.agent.domain.AgentHostAction; import com.flowci.core.auth.domain.PermissionMap; -import com.flowci.core.common.config.AppProperties; +import com.flowci.core.common.domain.Settings; import com.flowci.core.config.domain.ConfigAction; -import com.flowci.core.secret.domain.SecretAction; import com.flowci.core.flow.domain.FlowAction; import com.flowci.core.job.domain.JobAction; +import com.flowci.core.secret.domain.SecretAction; import com.flowci.core.user.domain.User; import com.flowci.core.user.domain.UserAction; -import com.github.benmanes.caffeine.cache.Caffeine; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cache.Cache; -import org.springframework.cache.caffeine.CaffeineCache; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.concurrent.TimeUnit; - /** * @author yang */ @Configuration public class AuthConfig { - private static final String CACHE_ONLINE = "online_users"; - - private static final String CACHE_REFRESH_TOKEN = "refresh_tokens"; - - private static final long MaxCacheSize = 500; - - @Autowired - private AppProperties.Auth authProperties; - - @Bean - public Cache onlineUsersCache() { - return new CaffeineCache(CACHE_ONLINE, - Caffeine.newBuilder() - .maximumSize(MaxCacheSize) - .expireAfterWrite(authProperties.getExpireSeconds(), TimeUnit.SECONDS) - .build()); - } - - @Bean - public Cache refreshTokenCache() { - return new CaffeineCache(CACHE_REFRESH_TOKEN, - Caffeine.newBuilder() - .maximumSize(MaxCacheSize) - .expireAfterWrite(authProperties.getRefreshExpiredSeconds(), TimeUnit.SECONDS) - .build()); - } - @Bean public PermissionMap actionMap() { PermissionMap permissionMap = new PermissionMap(); @@ -80,6 +47,7 @@ public PermissionMap actionMap() { permissionMap.add(User.Role.Admin, AgentHostAction.ALL); permissionMap.add(User.Role.Admin, UserAction.ALL); permissionMap.add(User.Role.Admin, ConfigAction.ALL); + permissionMap.add(User.Role.Admin, Settings.Action.ALL); // developer permissionMap.add(User.Role.Developer, @@ -89,6 +57,7 @@ public PermissionMap actionMap() { permissionMap.add(User.Role.Developer, AgentAction.GET, AgentAction.LIST); permissionMap.add(User.Role.Developer, AgentHostAction.GET, AgentHostAction.LIST); permissionMap.add(User.Role.Developer, UserAction.CHANGE_PASSWORD, UserAction.UPDATE_AVATAR); + permissionMap.add(User.Role.Developer, Settings.Action.GET); return permissionMap; } diff --git a/core/src/main/java/com/flowci/core/auth/dao/CustomUserAuthDao.java b/core/src/main/java/com/flowci/core/auth/dao/CustomUserAuthDao.java new file mode 100644 index 000000000..3717d86e0 --- /dev/null +++ b/core/src/main/java/com/flowci/core/auth/dao/CustomUserAuthDao.java @@ -0,0 +1,8 @@ +package com.flowci.core.auth.dao; + +public interface CustomUserAuthDao { + + void update(String id, String token); + + void update(String id, String token, String refreshToken); +} diff --git a/core/src/main/java/com/flowci/core/auth/dao/CustomUserAuthDaoImpl.java b/core/src/main/java/com/flowci/core/auth/dao/CustomUserAuthDaoImpl.java new file mode 100644 index 000000000..26b3d5696 --- /dev/null +++ b/core/src/main/java/com/flowci/core/auth/dao/CustomUserAuthDaoImpl.java @@ -0,0 +1,31 @@ +package com.flowci.core.auth.dao; + +import com.flowci.core.auth.domain.UserAuth; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.query.Update; + +import static org.springframework.data.mongodb.core.query.Criteria.where; +import static org.springframework.data.mongodb.core.query.Query.query; + +public class CustomUserAuthDaoImpl implements CustomUserAuthDao { + + @Autowired + private MongoOperations operations; + + @Override + public void update(String id, String token) { + operations.findAndModify( + query(where("_id").is(id)), + new Update().set("token", token), + UserAuth.class); + } + + @Override + public void update(String id, String token, String refreshToken) { + operations.findAndModify( + query(where("_id").is(id)), + new Update().set("token", token).set("refreshToken", refreshToken), + UserAuth.class); + } +} diff --git a/core/src/main/java/com/flowci/core/auth/dao/UserAuthDao.java b/core/src/main/java/com/flowci/core/auth/dao/UserAuthDao.java new file mode 100644 index 000000000..8ad3429fc --- /dev/null +++ b/core/src/main/java/com/flowci/core/auth/dao/UserAuthDao.java @@ -0,0 +1,17 @@ +package com.flowci.core.auth.dao; + +import com.flowci.core.auth.domain.UserAuth; +import org.springframework.data.mongodb.repository.MongoRepository; +import org.springframework.stereotype.Repository; + +import java.util.Optional; + +@Repository +public interface UserAuthDao extends CustomUserAuthDao, MongoRepository { + + Optional findByEmail(String email); + + Optional findByToken(String token); + + void deleteByEmail(String email); +} diff --git a/core/src/main/java/com/flowci/core/auth/domain/UserAuth.java b/core/src/main/java/com/flowci/core/auth/domain/UserAuth.java new file mode 100644 index 000000000..ef64a3f09 --- /dev/null +++ b/core/src/main/java/com/flowci/core/auth/domain/UserAuth.java @@ -0,0 +1,21 @@ +package com.flowci.core.auth.domain; + +import com.flowci.core.common.domain.Mongoable; +import lombok.Getter; +import lombok.Setter; +import org.springframework.data.mongodb.core.index.Indexed; +import org.springframework.data.mongodb.core.mapping.Document; + +@Getter +@Setter +@Document("user_auth") +public final class UserAuth extends Mongoable { + + @Indexed(unique = true, name = "index_auth_email") + private String email; + + @Indexed(unique = true, name = "index_auth_token") + private String token; + + private String refreshToken; +} diff --git a/core/src/main/java/com/flowci/core/auth/service/AuthServiceImpl.java b/core/src/main/java/com/flowci/core/auth/service/AuthServiceImpl.java index 0d85edbb4..f254e33fa 100644 --- a/core/src/main/java/com/flowci/core/auth/service/AuthServiceImpl.java +++ b/core/src/main/java/com/flowci/core/auth/service/AuthServiceImpl.java @@ -18,18 +18,20 @@ package com.flowci.core.auth.service; import com.flowci.core.auth.annotation.Action; +import com.flowci.core.auth.dao.UserAuthDao; import com.flowci.core.auth.domain.PermissionMap; import com.flowci.core.auth.domain.Tokens; +import com.flowci.core.auth.domain.UserAuth; import com.flowci.core.auth.helper.JwtHelper; import com.flowci.core.common.config.AppProperties; import com.flowci.core.common.manager.SessionManager; import com.flowci.core.user.domain.User; import com.flowci.core.user.service.UserService; import com.flowci.exception.AuthenticationException; +import com.flowci.exception.NotFoundException; import com.flowci.util.HashingHelper; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cache.Cache; import org.springframework.stereotype.Service; import java.util.Objects; @@ -43,16 +45,13 @@ public class AuthServiceImpl implements AuthService { private AppProperties.Auth authProperties; @Autowired - private UserService userService; + private UserAuthDao userAuthDao; @Autowired - private SessionManager sessionManager; - - @Autowired - private Cache onlineUsersCache; + private UserService userService; @Autowired - private Cache refreshTokenCache; + private SessionManager sessionManager; @Autowired private PermissionMap permissionMap; @@ -70,23 +69,23 @@ public Tokens login(String email, String passwordOnMd5) { throw new AuthenticationException("Invalid password"); } - // create token String token = JwtHelper.create(user, authProperties.getExpireSeconds()); - sessionManager.set(user); - onlineUsersCache.put(email, user); - - // create refresh token String refreshToken = HashingHelper.md5(email + passwordOnMd5); - refreshTokenCache.put(email, refreshToken); + UserAuth auth = new UserAuth(); + auth.setEmail(email); + auth.setToken(token); + auth.setRefreshToken(refreshToken); + save(auth); + + sessionManager.set(user); return new Tokens(token, refreshToken); } @Override public void logout() { User user = sessionManager.remove(); - onlineUsersCache.evict(user.getEmail()); - refreshTokenCache.evict(user.getEmail()); + userAuthDao.deleteByEmail(user.getEmail()); } @Override @@ -107,20 +106,26 @@ public Tokens refresh(Tokens tokens) { String refreshToken = tokens.getRefreshToken(); String email = JwtHelper.decode(token); - if (!Objects.equals(refreshTokenCache.get(email, String.class), refreshToken)) { + Optional auth = userAuthDao.findByEmail(email); + if (!auth.isPresent()) { + throw new AuthenticationException("Not signed in"); + } + + UserAuth userAuth = auth.get(); + if (!Objects.equals(userAuth.getRefreshToken(), refreshToken)) { throw new AuthenticationException("Invalid refresh token"); } // find user from online cache or database - User user = onlineUsersCache.get(email, User.class); + User user = getUser(email); if (Objects.isNull(user)) { - user = userService.getByEmail(email); + throw new AuthenticationException("User not found"); } boolean verify = JwtHelper.verify(token, user, false); if (verify) { String newToken = JwtHelper.create(user, authProperties.getExpireSeconds()); - onlineUsersCache.put(email, user); + userAuthDao.update(userAuth.getId(), newToken); return new Tokens(newToken, refreshToken); } @@ -141,7 +146,7 @@ public boolean set(String token) { public Optional get(String token) { String email = JwtHelper.decode(token); - User user = onlineUsersCache.get(email, User.class); + User user = getUser(email); if (Objects.isNull(user)) { return Optional.empty(); } @@ -153,4 +158,22 @@ public Optional get(String token) { return Optional.empty(); } + + private User getUser(String email) { + try { + return userService.getByEmail(email); + } catch (NotFoundException e) { + return null; + } + } + + private void save(UserAuth auth) { + Optional optional = userAuthDao.findByEmail(auth.getEmail()); + if (optional.isPresent()) { + UserAuth exist = optional.get(); + userAuthDao.update(exist.getId(), auth.getToken(), auth.getRefreshToken()); + return; + } + userAuthDao.insert(auth); + } } diff --git a/core/src/main/java/com/flowci/core/common/adviser/ExceptionAdviser.java b/core/src/main/java/com/flowci/core/common/adviser/ExceptionAdviser.java index 1b27ec89e..c6e479fa6 100644 --- a/core/src/main/java/com/flowci/core/common/adviser/ExceptionAdviser.java +++ b/core/src/main/java/com/flowci/core/common/adviser/ExceptionAdviser.java @@ -34,15 +34,16 @@ */ @Log4j2 @ControllerAdvice({ - "com.flowci.core.auth", - "com.flowci.core.user", - "com.flowci.core.flow", - "com.flowci.core.job", - "com.flowci.core.agent", - "com.flowci.core.stats", - "com.flowci.core.secret", - "com.flowci.core.plugin", - "com.flowci.core.config" + "com.flowci.core.auth", + "com.flowci.core.user", + "com.flowci.core.flow", + "com.flowci.core.job", + "com.flowci.core.agent", + "com.flowci.core.stats", + "com.flowci.core.secret", + "com.flowci.core.plugin", + "com.flowci.core.config", + "com.flowci.core.common.controller" }) public class ExceptionAdviser { @@ -50,8 +51,8 @@ public class ExceptionAdviser { @ResponseBody @ResponseStatus(HttpStatus.OK) @ExceptionHandler({ - MethodArgumentNotValidException.class, - MissingServletRequestParameterException.class + MethodArgumentNotValidException.class, + MissingServletRequestParameterException.class }) public ResponseMessage inputArgumentException(Exception e) { return new ResponseMessage<>(ErrorCode.INVALID_ARGUMENT, e.getMessage(), null); diff --git a/core/src/main/java/com/flowci/core/common/adviser/ResponseMessageAdviser.java b/core/src/main/java/com/flowci/core/common/adviser/ResponseMessageAdviser.java index 28cb492c0..43f5acae9 100644 --- a/core/src/main/java/com/flowci/core/common/adviser/ResponseMessageAdviser.java +++ b/core/src/main/java/com/flowci/core/common/adviser/ResponseMessageAdviser.java @@ -30,15 +30,16 @@ * @author yang */ @ControllerAdvice({ - "com.flowci.core.auth", - "com.flowci.core.user", - "com.flowci.core.flow", - "com.flowci.core.job", - "com.flowci.core.agent", - "com.flowci.core.stats", - "com.flowci.core.secret", - "com.flowci.core.plugin", - "com.flowci.core.config" + "com.flowci.core.auth", + "com.flowci.core.user", + "com.flowci.core.flow", + "com.flowci.core.job", + "com.flowci.core.agent", + "com.flowci.core.stats", + "com.flowci.core.secret", + "com.flowci.core.plugin", + "com.flowci.core.config", + "com.flowci.core.common.controller" }) public class ResponseMessageAdviser implements ResponseBodyAdvice { diff --git a/core/src/main/java/com/flowci/core/common/config/AppConfig.java b/core/src/main/java/com/flowci/core/common/config/AppConfig.java index d317e384a..bfc7a42a7 100644 --- a/core/src/main/java/com/flowci/core/common/config/AppConfig.java +++ b/core/src/main/java/com/flowci/core/common/config/AppConfig.java @@ -22,7 +22,6 @@ import com.flowci.core.common.helper.ThreadHelper; import com.flowci.util.FileHelper; import lombok.extern.log4j.Log4j2; -import org.apache.http.client.utils.URIBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.web.servlet.MultipartProperties; import org.springframework.cache.annotation.EnableCaching; @@ -38,7 +37,6 @@ import javax.annotation.PostConstruct; import java.io.IOException; -import java.net.URISyntaxException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.concurrent.Executor; @@ -71,16 +69,6 @@ public void initUploadDir() throws IOException { FileHelper.createDirectory(path); } - @Bean("serverUrl") - public String serverUrl() throws URISyntaxException { - URIBuilder builder = new URIBuilder(appProperties.getUrl()); - String url = builder.toString(); - if (url.endsWith("/")) { - url = url.substring(0, url.length() - 1); - } - return url; - } - @Bean("tmpDir") public Path tmpDir() { return Paths.get(appProperties.getWorkspace().toString(), "tmp"); diff --git a/core/src/main/java/com/flowci/core/common/config/AppProperties.java b/core/src/main/java/com/flowci/core/common/config/AppProperties.java index 4fb6cae13..90605a627 100644 --- a/core/src/main/java/com/flowci/core/common/config/AppProperties.java +++ b/core/src/main/java/com/flowci/core/common/config/AppProperties.java @@ -16,9 +16,7 @@ package com.flowci.core.common.config; -import java.net.URI; -import java.net.URL; -import java.nio.file.Path; +import com.flowci.util.StringHelper; import lombok.Data; import org.hibernate.validator.constraints.Length; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -27,8 +25,10 @@ import org.springframework.context.annotation.PropertySource; import org.springframework.validation.annotation.Validated; -import javax.validation.constraints.Email; import javax.validation.constraints.NotBlank; +import java.net.URI; +import java.net.URL; +import java.nio.file.Path; /** * @author yang @@ -47,9 +47,6 @@ public class AppProperties { // static site resource private Path siteDir; - @NotBlank - private String url; - @NotBlank @Length(max = 16, min = 16) private String secret; @@ -147,9 +144,13 @@ public static class RabbitMQ { private String callbackQueue; - private String shellLogEx; // fanout exchange for shell log + private String shellLogQueue; // fanout exchange for shell log + + private String ttyLogQueue; - private String ttyLogEx; // fanout exchange for tty log + private String wsBroadcastEx; // fanout exchange for websocket broadcast event + + private String eventBroadcastEx; // fanout exchange for event broadcast event private String jobDlQueue; // job dead letter queue @@ -181,4 +182,21 @@ public static class Minio { private String secret; } + + @Data + public static class K8s { + + private String namespace; + + private String pod; + + private String podIp; + + // indicate that is deployed in cluster + public boolean isInCluster() { + return StringHelper.hasValue(namespace) + && StringHelper.hasValue(pod) + && StringHelper.hasValue(podIp); + } + } } diff --git a/core/src/main/java/com/flowci/core/common/config/K8sConfig.java b/core/src/main/java/com/flowci/core/common/config/K8sConfig.java new file mode 100644 index 000000000..c54589ddd --- /dev/null +++ b/core/src/main/java/com/flowci/core/common/config/K8sConfig.java @@ -0,0 +1,18 @@ +package com.flowci.core.common.config; + +import com.flowci.docker.domain.Variables; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class K8sConfig { + + @Bean("k8sProperties") + public AppProperties.K8s k8sProperties() { + AppProperties.K8s props = new AppProperties.K8s(); + props.setNamespace(System.getenv(Variables.NAMESPACE)); + props.setPod(System.getenv(Variables.POD_NAME)); + props.setPodIp(System.getenv(Variables.POD_IP)); + return props; + } +} diff --git a/core/src/main/java/com/flowci/core/common/config/MongoConfig.java b/core/src/main/java/com/flowci/core/common/config/MongoConfig.java index dd9e33706..9e3fc9ca1 100644 --- a/core/src/main/java/com/flowci/core/common/config/MongoConfig.java +++ b/core/src/main/java/com/flowci/core/common/config/MongoConfig.java @@ -17,17 +17,15 @@ package com.flowci.core.common.config; import com.fasterxml.jackson.databind.ObjectMapper; +import com.flowci.core.agent.domain.K8sAgentHost; import com.flowci.core.agent.domain.LocalUnixAgentHost; import com.flowci.core.agent.domain.SshAgentHost; import com.flowci.core.common.mongo.EncryptConverter; import com.flowci.core.common.mongo.VariableMapConverter; -import com.flowci.core.secret.domain.AndroidSign; +import com.flowci.core.secret.domain.*; import com.flowci.core.config.domain.SmtpConfig; import com.flowci.core.config.domain.TextConfig; import com.flowci.core.job.domain.JobItem; -import com.flowci.core.secret.domain.AuthSecret; -import com.flowci.core.secret.domain.RSASecret; -import com.flowci.core.secret.domain.TokenSecret; import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; import lombok.extern.log4j.Log4j2; @@ -88,9 +86,11 @@ public MongoMappingContext mongoMappingContext() throws ClassNotFoundException { context.addEntity(RSASecret.class); context.addEntity(TokenSecret.class); context.addEntity(AndroidSign.class); + context.addEntity(KubeConfigSecret.class); context.addEntity(LocalUnixAgentHost.class); context.addEntity(SshAgentHost.class); + context.addEntity(K8sAgentHost.class); return context; } diff --git a/core/src/main/java/com/flowci/core/common/config/QueueConfig.java b/core/src/main/java/com/flowci/core/common/config/QueueConfig.java index 08878767b..ccfbe5d0e 100644 --- a/core/src/main/java/com/flowci/core/common/config/QueueConfig.java +++ b/core/src/main/java/com/flowci/core/common/config/QueueConfig.java @@ -17,7 +17,6 @@ package com.flowci.core.common.config; import com.flowci.core.common.helper.ThreadHelper; -import com.flowci.core.common.rabbit.QueueOperations; import com.flowci.core.common.rabbit.RabbitOperations; import com.flowci.util.StringHelper; import com.rabbitmq.client.BuiltinExchangeType; @@ -61,64 +60,51 @@ public Connection rabbitConnection(ThreadPoolTaskExecutor rabbitTaskExecutor) th return factory.newConnection(rabbitTaskExecutor.getThreadPoolExecutor()); } - @Bean("callbackQueueManager") - public QueueOperations callbackQueueManager(Connection rabbitConnection) throws IOException { - String queue = rabbitProperties.getCallbackQueue(); - QueueOperations manager = new QueueOperations(rabbitConnection, 10, queue); - manager.declare(true); + @Bean("jobsQueueManager") + public RabbitOperations jobsQueueManager(Connection rabbitConnection) throws IOException { + RabbitOperations manager = new RabbitOperations(rabbitConnection, 1); + + // setup dead letter queue + String queue = rabbitProperties.getJobDlQueue(); + manager.declare(queue, true); + + Map args = new HashMap<>(0); + String exchange = rabbitProperties.getJobDlExchange(); + manager.declareExchangeAndBind(exchange, BuiltinExchangeType.DIRECT, true, false, args, queue, JobDlRoutingKey); + return manager; } - @Bean("shellLogQueue") - public String getShellLogQ() { - return "shelllog.q." + StringHelper.randomString(8); + @Bean("wsBroadcastQueue") + public String wsBroadcastQueue() { + return "bc.ws.q." + StringHelper.randomString(8); } - @Bean("ttyLogQueue") - public String getTtyLogQ() { - return "ttylog.q." + StringHelper.randomString(8); + @Bean + public String eventBroadcastQueue() { + return "bc.event.q." + StringHelper.randomString(8); } - @Bean("logQueueManager") - public RabbitOperations logQueueManager(Connection rabbitConnection, - String shellLogQueue, - String ttyLogQueue) throws IOException { + @Bean("broadcastQueueManager") + public RabbitOperations broadcastQueueManager(Connection rabbitConnection, + String wsBroadcastQueue, + String eventBroadcastQueue) throws IOException { RabbitOperations manager = new RabbitOperations(rabbitConnection, 10); - manager.declareTemp(shellLogQueue); + manager.declareTemp(wsBroadcastQueue); manager.declareExchangeAndBind( - rabbitProperties.getShellLogEx(), + rabbitProperties.getWsBroadcastEx(), BuiltinExchangeType.FANOUT, - shellLogQueue, + wsBroadcastQueue, StringHelper.EMPTY ); - manager.declareTemp(ttyLogQueue); + manager.declareTemp(eventBroadcastQueue); manager.declareExchangeAndBind( - rabbitProperties.getTtyLogEx(), + rabbitProperties.getEventBroadcastEx(), BuiltinExchangeType.FANOUT, - ttyLogQueue, + eventBroadcastQueue, StringHelper.EMPTY ); return manager; } - - @Bean("jobsQueueManager") - public RabbitOperations jobsQueueManager(Connection rabbitConnection) throws IOException { - RabbitOperations manager = new RabbitOperations(rabbitConnection, 1); - - // setup dead letter queue - String queue = rabbitProperties.getJobDlQueue(); - manager.declare(queue, true); - - Map args = new HashMap<>(0); - String exchange = rabbitProperties.getJobDlExchange(); - manager.declareExchangeAndBind(exchange, BuiltinExchangeType.DIRECT, true, false, args, queue, JobDlRoutingKey); - - return manager; - } - - @Bean("agentQueueManager") - public RabbitOperations agentQueueManager(Connection rabbitConnection) throws IOException { - return new RabbitOperations(rabbitConnection, 1); - } } diff --git a/core/src/main/java/com/flowci/core/common/config/WebConfig.java b/core/src/main/java/com/flowci/core/common/config/WebConfig.java index d8c673fc4..9fbad8dd4 100644 --- a/core/src/main/java/com/flowci/core/common/config/WebConfig.java +++ b/core/src/main/java/com/flowci/core/common/config/WebConfig.java @@ -87,6 +87,7 @@ public void addInterceptors(InterceptorRegistry registry) { .addPathPatterns("/plugins/**") .addPathPatterns("/secrets/**") .addPathPatterns("/configs/**") + .addPathPatterns("/system/**") .addPathPatterns("/auth/logout") .excludePathPatterns("/agents/api/**"); diff --git a/core/src/main/java/com/flowci/core/common/config/WebSocketConfig.java b/core/src/main/java/com/flowci/core/common/config/WebSocketConfig.java index 1ff17fdad..74396c8ec 100644 --- a/core/src/main/java/com/flowci/core/common/config/WebSocketConfig.java +++ b/core/src/main/java/com/flowci/core/common/config/WebSocketConfig.java @@ -16,22 +16,27 @@ package com.flowci.core.common.config; +import com.flowci.core.agent.manager.AgentEventManager; import com.flowci.core.common.helper.ThreadHelper; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; -import org.springframework.web.socket.config.annotation.StompEndpointRegistry; -import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; +import org.springframework.web.socket.config.annotation.*; +import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean; /** * @author yang */ @Configuration +@EnableWebSocket @EnableWebSocketMessageBroker -public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { +public class WebSocketConfig implements WebSocketMessageBrokerConfigurer, WebSocketConfigurer { + + @Autowired + private AgentEventManager agentEventManager; /** * To subscribe git test status update for flow @@ -133,4 +138,18 @@ public void configureClientOutboundChannel(ChannelRegistration registration) { ThreadPoolTaskExecutor executor = ThreadHelper.createTaskExecutor(10, 5, 10, "ws-outbound-"); registration.taskExecutor(executor); } + + @Bean + public ServletServerContainerFactoryBean createWebSocketContainer() { + int bufferSize = 64 * 1024; + ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean(); + container.setMaxTextMessageBufferSize(bufferSize); + container.setMaxBinaryMessageBufferSize(bufferSize); + return container; + } + + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(agentEventManager, "/agent").setAllowedOrigins("*"); + } } diff --git a/core/src/main/java/com/flowci/core/common/config/ZookeeperConfig.java b/core/src/main/java/com/flowci/core/common/config/ZookeeperConfig.java index 7ff7dcaf2..ff44ea2ad 100644 --- a/core/src/main/java/com/flowci/core/common/config/ZookeeperConfig.java +++ b/core/src/main/java/com/flowci/core/common/config/ZookeeperConfig.java @@ -22,6 +22,7 @@ import com.flowci.zookeeper.ZookeeperClient; import com.flowci.zookeeper.ZookeeperException; import lombok.extern.log4j.Log4j2; +import org.apache.zookeeper.CreateMode; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -59,6 +60,10 @@ public ZookeeperClient zookeeperClient(TaskExecutor appTaskExecutor) { client = new ZookeeperClient(host, retry, timeout, appTaskExecutor); client.start(); + + initRoots(client, zkProperties.getCronRoot()); + initRoots(client, zkProperties.getAgentRoot()); + return client; } @@ -73,6 +78,14 @@ public void close() { } } + private void initRoots(ZookeeperClient client, String rootPath) { + try { + client.create(CreateMode.PERSISTENT, rootPath, null); + } catch (ZookeeperException ignore) { + + } + } + private void startEmbeddedServer() { Path path = Paths.get(zkProperties.getDataDir()); String address = "0.0.0.0"; diff --git a/core/src/main/java/com/flowci/core/common/controller/SettingsController.java b/core/src/main/java/com/flowci/core/common/controller/SettingsController.java new file mode 100644 index 000000000..a7f1a4559 --- /dev/null +++ b/core/src/main/java/com/flowci/core/common/controller/SettingsController.java @@ -0,0 +1,27 @@ +package com.flowci.core.common.controller; + +import com.flowci.core.auth.annotation.Action; +import com.flowci.core.common.domain.Settings; +import com.flowci.core.common.service.SettingService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +@RestController +@RequestMapping("/system/settings") +public class SettingsController { + + @Autowired + private SettingService settingService; + + @GetMapping + @Action(Settings.Action.GET) + public Settings get() { + return settingService.get(); + } + + @PostMapping + @Action(Settings.Action.UPDATE) + public void save(@RequestBody Settings settings) { + settingService.save(settings); + } +} diff --git a/core/src/main/java/com/flowci/core/common/dao/SettingsDao.java b/core/src/main/java/com/flowci/core/common/dao/SettingsDao.java new file mode 100644 index 000000000..5ab7d2efd --- /dev/null +++ b/core/src/main/java/com/flowci/core/common/dao/SettingsDao.java @@ -0,0 +1,9 @@ +package com.flowci.core.common.dao; + +import com.flowci.core.common.domain.Settings; +import org.springframework.data.mongodb.repository.MongoRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface SettingsDao extends MongoRepository { +} diff --git a/core/src/main/java/com/flowci/core/common/domain/Settings.java b/core/src/main/java/com/flowci/core/common/domain/Settings.java new file mode 100644 index 000000000..a51a51abb --- /dev/null +++ b/core/src/main/java/com/flowci/core/common/domain/Settings.java @@ -0,0 +1,43 @@ +package com.flowci.core.common.domain; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.collect.ImmutableList; +import lombok.Getter; +import lombok.Setter; +import org.springframework.data.mongodb.core.mapping.Document; + +import java.util.List; + +/** + * System settings + */ +@Document +@Getter +@Setter +public class Settings extends Mongoable { + + public static class Action { + + public static final String GET = "get_sys_settings"; + + public static final String UPDATE = "update_sys_settings"; + + public static final List ALL = ImmutableList.of( + GET, + UPDATE + ); + } + + public final static String DefaultId = "system_settings"; + + private String serverUrl; + + public Settings() { + setId(DefaultId); + } + + @JsonIgnore + public String getId() { + return this.id; + } +} diff --git a/core/src/main/java/com/flowci/core/common/domain/Variables.java b/core/src/main/java/com/flowci/core/common/domain/Variables.java index fba52c8f7..a63d7f466 100644 --- a/core/src/main/java/com/flowci/core/common/domain/Variables.java +++ b/core/src/main/java/com/flowci/core/common/domain/Variables.java @@ -23,13 +23,11 @@ public abstract class Variables { public abstract static class App { - public static final String Url = "FLOWCI_SERVER_URL"; + public static final String LogLevel = "FLOWCI_LOG_LEVEL"; - public static final String Host = "FLOWCI_SERVER_HOST"; - - public static final String RabbitHost = "FLOWCI_RABBIT_HOST"; + public static final String ServerUrl = "FLOWCI_SERVER_URL"; - public static final String ZookeeperHost = "FLOWCI_ZOOKEEPER_HOST"; + public static final String Host = "FLOWCI_SERVER_HOST"; } @@ -37,8 +35,6 @@ public abstract static class Flow { public static final String Name = "FLOWCI_FLOW_NAME"; - public static final String Webhook = "FLOWCI_FLOW_WEBHOOK"; - public static final String GitUrl = "FLOWCI_GIT_URL"; // set public static final String GitBranch = "FLOWCI_GIT_BRANCH"; // set @@ -77,10 +73,22 @@ public abstract static class Step { public abstract static class Agent { + public static final String DockerImage = "FLOWCI_AGENT_IMAGE"; + + public static final String ServerUrl = "FLOWCI_SERVER_URL"; + public static final String Token = "FLOWCI_AGENT_TOKEN"; public static final String Workspace = "FLOWCI_AGENT_WORKSPACE"; public static final String PluginDir = "FLOWCI_AGENT_PLUGIN_DIR"; + + public static final String K8sEnabled = "FLOWCI_AGENT_K8S_ENABLED"; + + public static final String K8sInCluster = "FLOWCI_AGENT_K8S_IN_CLUSTER"; + + public static final String LogLevel = "FLOWCI_AGENT_LOG_LEVEL"; + + public static final String Volumes = "FLOWCI_AGENT_VOLUMES"; } } diff --git a/core/src/main/java/com/flowci/core/common/event/BroadcastEvent.java b/core/src/main/java/com/flowci/core/common/event/BroadcastEvent.java new file mode 100644 index 000000000..c5215b18c --- /dev/null +++ b/core/src/main/java/com/flowci/core/common/event/BroadcastEvent.java @@ -0,0 +1,22 @@ +package com.flowci.core.common.event; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.context.ApplicationEvent; + +@Getter +@Setter +public abstract class BroadcastEvent extends ApplicationEvent { + + private final static Object Source = new Object(); + + private boolean internal; + + public BroadcastEvent() { + super(Source); + } + + public BroadcastEvent(Object source) { + super(source); + } +} diff --git a/core/src/main/java/com/flowci/core/common/health/K8s.java b/core/src/main/java/com/flowci/core/common/health/K8s.java new file mode 100644 index 000000000..99163dc53 --- /dev/null +++ b/core/src/main/java/com/flowci/core/common/health/K8s.java @@ -0,0 +1,26 @@ +package com.flowci.core.common.health; + +import com.flowci.core.common.config.AppProperties; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.actuate.health.AbstractHealthIndicator; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.Status; +import org.springframework.stereotype.Component; + +@Component +public class K8s extends AbstractHealthIndicator { + + @Autowired + private AppProperties.K8s k8sProperties; + + @Override + protected void doHealthCheck(Health.Builder builder) throws Exception { + Status status = k8sProperties.isInCluster() ? Status.UP : Status.DOWN; + builder.status(status); + if (status == Status.UP) { + builder.withDetail("namespace", k8sProperties.getNamespace()) + .withDetail("pod", k8sProperties.getPod()) + .withDetail("pod ip", k8sProperties.getPodIp()); + } + } +} diff --git a/core/src/main/java/com/flowci/core/common/health/RabbitMQCheck.java b/core/src/main/java/com/flowci/core/common/health/RabbitMQ.java similarity index 96% rename from core/src/main/java/com/flowci/core/common/health/RabbitMQCheck.java rename to core/src/main/java/com/flowci/core/common/health/RabbitMQ.java index dad169028..10a0c8663 100644 --- a/core/src/main/java/com/flowci/core/common/health/RabbitMQCheck.java +++ b/core/src/main/java/com/flowci/core/common/health/RabbitMQ.java @@ -27,7 +27,7 @@ import java.util.Map; @Component -public class RabbitMQCheck extends AbstractHealthIndicator { +public class RabbitMQ extends AbstractHealthIndicator { @Autowired private Connection connection; diff --git a/core/src/main/java/com/flowci/core/common/helper/CacheHelper.java b/core/src/main/java/com/flowci/core/common/helper/CacheHelper.java index adc8bb1f6..6c59b08ac 100644 --- a/core/src/main/java/com/flowci/core/common/helper/CacheHelper.java +++ b/core/src/main/java/com/flowci/core/common/helper/CacheHelper.java @@ -19,6 +19,8 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalListener; +import org.springframework.cache.CacheManager; +import org.springframework.cache.caffeine.CaffeineCacheManager; import java.util.concurrent.TimeUnit; @@ -41,6 +43,17 @@ public static Cache createLocalCache(int maxSize, int expireInSecon .build(); } + public static CacheManager createCacheManager(int expireInSeconds, int maxSize) { + Caffeine caffeine = Caffeine.newBuilder() + .initialCapacity(maxSize) + .maximumSize(maxSize) + .expireAfterWrite(expireInSeconds, TimeUnit.SECONDS); + + CaffeineCacheManager caffeineCacheManager = new CaffeineCacheManager(); + caffeineCacheManager.setCaffeine(caffeine); + return caffeineCacheManager; + } + private CacheHelper() { } diff --git a/core/src/main/java/com/flowci/core/common/manager/SocketPushManager.java b/core/src/main/java/com/flowci/core/common/manager/SocketPushManager.java index cd3c303ee..8df659e51 100644 --- a/core/src/main/java/com/flowci/core/common/manager/SocketPushManager.java +++ b/core/src/main/java/com/flowci/core/common/manager/SocketPushManager.java @@ -17,14 +17,20 @@ package com.flowci.core.common.manager; import com.fasterxml.jackson.databind.ObjectMapper; +import com.flowci.core.common.config.AppProperties; import com.flowci.core.common.domain.PushBody; import com.flowci.core.common.domain.PushEvent; +import com.flowci.core.common.rabbit.RabbitOperations; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Component; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; /** * @author yang @@ -33,17 +39,41 @@ @Component public class SocketPushManager { + private final static String HeaderTopic = "TOPIC"; + @Autowired private ObjectMapper objectMapper; + @Autowired + private AppProperties.RabbitMQ rabbitProperties; + @Autowired private SimpMessagingTemplate simpMessagingTemplate; + @Autowired + private String wsBroadcastQueue; + + @Autowired + private RabbitOperations broadcastQueueManager; + + @EventListener(ContextRefreshedEvent.class) + public void subscribeBroadcastQueue() throws IOException { + broadcastQueueManager.startConsumer(wsBroadcastQueue, true, (headers, body, envelope) -> { + try { + String topic = headers.get(HeaderTopic).toString(); + simpMessagingTemplate.convertAndSend(topic, body); + } catch (Exception e) { + log.warn(e); + } + return false; + }); + } + public void push(String topic, PushEvent event, Object obj) { try { PushBody push = new PushBody(event, obj); - String json = objectMapper.writeValueAsString(push); - simpMessagingTemplate.convertAndSend(topic, json); + byte[] data = objectMapper.writeValueAsBytes(push); + push(topic, data); } catch (Exception e) { log.warn(e.getMessage()); } @@ -51,7 +81,9 @@ public void push(String topic, PushEvent event, Object obj) { public void push(String topic, byte[] bytes) { try { - simpMessagingTemplate.convertAndSend(topic, bytes); + Map headers = new HashMap<>(1); + headers.put(HeaderTopic, topic); + broadcastQueueManager.sendToEx(rabbitProperties.getWsBroadcastEx(), bytes, headers); } catch (Exception e) { log.warn(e.getMessage()); } diff --git a/core/src/main/java/com/flowci/core/common/manager/SpringEventManagerImpl.java b/core/src/main/java/com/flowci/core/common/manager/SpringEventManagerImpl.java index e0644ca19..57f4d05e2 100644 --- a/core/src/main/java/com/flowci/core/common/manager/SpringEventManagerImpl.java +++ b/core/src/main/java/com/flowci/core/common/manager/SpringEventManagerImpl.java @@ -17,19 +17,80 @@ package com.flowci.core.common.manager; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.flowci.core.common.config.AppProperties; +import com.flowci.core.common.event.BroadcastEvent; +import com.flowci.core.common.rabbit.RabbitOperations; +import com.google.common.collect.ImmutableMap; +import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; +import java.io.IOException; +import java.util.Map; + +@Log4j2 @Component("eventManager") public class SpringEventManagerImpl implements SpringEventManager { + private final static String HeaderClass = "CLASS"; + + @Autowired + private AppProperties.RabbitMQ rabbitProperties; + + @Autowired + private ObjectMapper objectMapper; + @Autowired private ApplicationEventPublisher applicationEventPublisher; + @Autowired + private String eventBroadcastQueue; + + @Autowired + private RabbitOperations broadcastQueueManager; + + @EventListener(ContextRefreshedEvent.class) + public void subscribeBroadcastQueue() throws IOException { + broadcastQueueManager.startConsumer(eventBroadcastQueue, true, (headers, body, envelope) -> { + try { + String className = headers.get(HeaderClass).toString(); + BroadcastEvent event = (BroadcastEvent) objectMapper.readValue(body, Class.forName(className)); + event.setInternal(true); + publish(event); + } catch (Exception e) { + log.warn(e); + } + return false; + }); + } + public T publish(T event) { + if (shouldBroadcast(event)) { + try { + byte[] bytes = objectMapper.writeValueAsBytes(event); + Map header = ImmutableMap.of(HeaderClass, event.getClass().getName()); + this.broadcastQueueManager.sendToEx(rabbitProperties.getEventBroadcastEx(), bytes, header); + } catch (JsonProcessingException e) { + log.warn(e); + } + return event; + } + applicationEventPublisher.publishEvent(event); return event; } + + private static boolean shouldBroadcast(T event) { + if (event instanceof BroadcastEvent) { + BroadcastEvent be = (BroadcastEvent) event; + return !be.isInternal(); + } + return false; + } } diff --git a/core/src/main/java/com/flowci/core/common/manager/SpringTaskManager.java b/core/src/main/java/com/flowci/core/common/manager/SpringTaskManager.java new file mode 100644 index 000000000..0e6d42cd8 --- /dev/null +++ b/core/src/main/java/com/flowci/core/common/manager/SpringTaskManager.java @@ -0,0 +1,57 @@ +package com.flowci.core.common.manager; + +import com.flowci.core.common.config.AppProperties; +import com.flowci.zookeeper.ZookeeperClient; +import com.flowci.zookeeper.ZookeeperException; +import lombok.extern.log4j.Log4j2; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * Run task among all duplications + */ +@Log4j2 +@Component +public class SpringTaskManager { + + @Autowired + private ZookeeperClient zk; + + @Autowired + private AppProperties.Zookeeper zkProperties; + + public void run(String name, Runnable task) { + try { + if (!lock(name)) { + return; + } + + log.info("task {} started", name); + task.run(); + log.info("task {} finished", name); + } finally { + release(name); + } + } + + private boolean lock(String name) { + try { + String path = ZKPaths.makePath(zkProperties.getCronRoot(), name); + zk.create(CreateMode.EPHEMERAL, path, null); + return true; + } catch (ZookeeperException e) { + return false; + } + } + + private void release(String name) { + try { + String path = ZKPaths.makePath(zkProperties.getCronRoot(), name); + zk.delete(path, false); + } catch (ZookeeperException ignore) { + + } + } +} diff --git a/core/src/main/java/com/flowci/core/common/rabbit/RabbitOperations.java b/core/src/main/java/com/flowci/core/common/rabbit/RabbitOperations.java index 05f0f18e1..b959fdae3 100644 --- a/core/src/main/java/com/flowci/core/common/rabbit/RabbitOperations.java +++ b/core/src/main/java/com/flowci/core/common/rabbit/RabbitOperations.java @@ -96,6 +96,16 @@ public boolean purge(String queue) { } } + public boolean sendToEx(String ex, byte[] body, Map headers) { + try { + AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build(); + this.channel.basicPublish(ex, StringHelper.EMPTY, props, body); + return true; + } catch (IOException e) { + return false; + } + } + /** * Send to routing key with default exchange */ diff --git a/core/src/main/java/com/flowci/core/common/service/SettingService.java b/core/src/main/java/com/flowci/core/common/service/SettingService.java new file mode 100644 index 000000000..083e4a7b8 --- /dev/null +++ b/core/src/main/java/com/flowci/core/common/service/SettingService.java @@ -0,0 +1,10 @@ +package com.flowci.core.common.service; + +import com.flowci.core.common.domain.Settings; + +public interface SettingService { + + Settings get(); + + void save(Settings settings); +} diff --git a/core/src/main/java/com/flowci/core/common/service/SettingServiceImpl.java b/core/src/main/java/com/flowci/core/common/service/SettingServiceImpl.java new file mode 100644 index 000000000..a6fdbc493 --- /dev/null +++ b/core/src/main/java/com/flowci/core/common/service/SettingServiceImpl.java @@ -0,0 +1,65 @@ +package com.flowci.core.common.service; + +import com.flowci.core.common.dao.SettingsDao; +import com.flowci.core.common.domain.Settings; +import com.flowci.core.common.domain.Variables; +import com.flowci.core.common.manager.SpringTaskManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.web.ServerProperties; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.Optional; + +@Service +public class SettingServiceImpl implements SettingService { + + @Autowired + private Environment environment; + + @Autowired + private ServerProperties serverProperties; + + @Autowired + private SettingsDao settingsDao; + + @Autowired + private SpringTaskManager taskManager; + + /** + * Set default server url + * value from FLOWCI_SERVER_URL or http://FLOWCI_SERVER_ADDRESS:FLOWCI_SERVER_PORT + */ + @PostConstruct + public void setDefaultValue() { + taskManager.run("init-default-settings", () -> { + Optional optional = settingsDao.findById(Settings.DefaultId); + if (!optional.isPresent()) { + + String address = serverProperties.getAddress().toString().replace("/", ""); + String serverUrl = environment.getProperty( + Variables.App.ServerUrl, + String.format("http://%s:%s", address, serverProperties.getPort()) + ); + + Settings s = new Settings(); + s.setServerUrl(serverUrl); + settingsDao.save(s); + } + }); + } + + @Override + public Settings get() { + return settingsDao.findById(Settings.DefaultId).get(); + } + + @Override + public void save(Settings settings) { + Settings o = get(); + o.setServerUrl(settings.getServerUrl()); + + settingsDao.save(o); + } +} diff --git a/core/src/main/java/com/flowci/core/flow/config/FlowConfig.java b/core/src/main/java/com/flowci/core/flow/config/FlowConfig.java index 6d0e3d8ba..349cab588 100644 --- a/core/src/main/java/com/flowci/core/flow/config/FlowConfig.java +++ b/core/src/main/java/com/flowci/core/flow/config/FlowConfig.java @@ -27,6 +27,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; import java.io.IOException; import java.util.List; @@ -60,4 +62,9 @@ public List