Skip to content

Commit

Permalink
feat: add job verticle to deploy missing entity verticles
Browse files Browse the repository at this point in the history
  • Loading branch information
halber committed Nov 14, 2023
1 parent 559342a commit ff576d4
Show file tree
Hide file tree
Showing 6 changed files with 434 additions and 8 deletions.
18 changes: 17 additions & 1 deletion src/main/java/io/neonbee/NeonBee.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static io.neonbee.internal.deploy.Deployables.allTo;
import static io.neonbee.internal.deploy.Deployables.anyTo;
import static io.neonbee.internal.deploy.Deployables.fromDeployables;
import static io.neonbee.internal.helper.ConfigHelper.notFound;
import static io.neonbee.internal.helper.HostHelper.getHostIp;
import static io.neonbee.internal.scanner.DeployableScanner.scanForDeployableClasses;
import static io.vertx.core.Future.all;
Expand Down Expand Up @@ -70,7 +71,9 @@
import io.neonbee.internal.codec.ImmutableJsonObjectMessageCodec;
import io.neonbee.internal.deploy.Deployable;
import io.neonbee.internal.deploy.Deployables;
import io.neonbee.internal.helper.ConfigHelper;
import io.neonbee.internal.helper.FileSystemHelper;
import io.neonbee.internal.job.RedeployEntitiesJob;
import io.neonbee.internal.json.ConfigurableJsonFactory.ConfigurableJsonCodec;
import io.neonbee.internal.json.ImmutableJsonArray;
import io.neonbee.internal.json.ImmutableJsonObject;
Expand Down Expand Up @@ -547,6 +550,7 @@ private Future<Void> deploySystemVerticles() {
}
optionalVerticles.add(deployableWatchVerticle(options.getModelsDirectory(), ModelRefreshVerticle::new));
optionalVerticles.add(deployableWatchVerticle(options.getModulesDirectory(), DeployerVerticle::new));
optionalVerticles.add(deployableRedeployEntitiesJobVerticle(options));

LOGGER.info("Deploying system verticles ...");
return all(List.of(fromDeployables(requiredVerticles).compose(allTo(this)),
Expand All @@ -556,7 +560,8 @@ private Future<Void> deploySystemVerticles() {
}).map(Deployables::new).compose(anyTo(this)))).mapEmpty();
}

private Future<Optional<? extends Deployable>> deployableWatchVerticle(Path dirPath,
private Future<Optional<? extends Deployable>> deployableWatchVerticle(
Path dirPath,
Function<Path, ? extends Verticle> verticleFactory) {
if (options.doNotWatchFiles()) {
return succeededFuture(Optional.empty());
Expand All @@ -574,6 +579,17 @@ private Future<Optional<? extends Deployable>> deployableWatchVerticle(Path dirP
});
}

private Future<Optional<? extends Deployable>> deployableRedeployEntitiesJobVerticle(NeonBeeOptions options) {
if (!options.shouldRedeployEntities()) {
return succeededFuture(Optional.empty());
}

return ConfigHelper.readConfig(vertx, RedeployEntitiesJob.class.getName())
.compose(config -> fromVerticle(vertx, RedeployEntitiesJob.create(config)))
.recover(notFound(() -> fromVerticle(vertx, new RedeployEntitiesJob())))
.map(Optional::of);
}

/**
* Deploy the server verticle handling the endpoints.
*
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/neonbee/NeonBeeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ default Path getLogDirectory() {
*/
boolean doNotWatchFiles();

/**
* Check if NeonBee should redeploy EntityVerticles if there are not deployed.
*
* @return true if NeonBee should redeploy EntityVerticles if there are not deployed, otherwise false.
*/
boolean shouldRedeployEntities();

/**
* Get the port number of the event bus. If not set, a random port will be selected.
*
Expand Down Expand Up @@ -254,6 +261,8 @@ class Mutable implements NeonBeeOptions {

private boolean doNotWatchFiles;

private boolean shouldRedeployEntities;

private Integer serverPort;

private Set<NeonBeeProfile> activeProfiles = parseProfiles(DEFAULT_ACTIVE_PROFILES);
Expand Down Expand Up @@ -414,6 +423,24 @@ public Mutable setDoNotWatchFiles(boolean doNotWatchFiles) {
return this;
}

@Override
public boolean shouldRedeployEntities() {
return shouldRedeployEntities;
}

/**
* Sets whether NeonBee should re-deploy EntityVerticles if there are not deployed.
*
* @param shouldRedeployEntities flag true/false
* @return a reference to this, so the API can be used fluently
*/
@Option(longName = "should-redeploy-entities", shortName = "redeploy-entities", flag = false)
@Description("Set whether to redeploy entity verticles that are not deployed")
public Mutable setShouldRedeployEntities(boolean shouldRedeployEntities) {
this.shouldRedeployEntities = shouldRedeployEntities;
return this;
}

@Override
public boolean isClustered() {
return clustered;
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/io/neonbee/entity/EntityVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,17 @@ public final String getName() {
// Entity verticle are generally not exposed via any web interface, but only via the event bus. Also, they are
// generally never accessed directly, but only via the shared entity name map, so return a generated name here.
// The name must be unique in the Vert.x instance / cluster and the same for every entity verticle of this type.
return String.format("_%s-%d", getClass().getSimpleName(), getClass().getName().hashCode());
return getName(getClass());
}

/**
* Returns a unique name for a given EntityVerticle class.
*
* @param clazz the EntityVerticle class
* @return a unique name for a given EntityVerticle class
*/
public static String getName(Class<? extends EntityVerticle> clazz) {
return String.format("_%s-%d", clazz.getSimpleName(), clazz.getName().hashCode());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,15 @@
*/
public class ClusterEntityRegistry implements Registry<String> {

private static final String QUALIFIED_NAME = "qualifiedName";
/**
* The key for the qualified name.
*/
public static final String QUALIFIED_NAME_KEY = "qualifiedName";

private static final String ENTITY_NAME = "entityName";
/**
* The key for the entity name.
*/
public static final String ENTITY_NAME_KEY = "entityName";

@VisibleForTesting
final WriteSafeRegistry<JsonObject> clusteringInformation;
Expand All @@ -56,7 +62,7 @@ public ClusterEntityRegistry(Vertx vertx, String registryName) {

@VisibleForTesting
static JsonObject clusterRegistrationInformation(String sharedMapKey, String value) {
return JsonObject.of(QUALIFIED_NAME, value, ENTITY_NAME, sharedMapKey);
return JsonObject.of(QUALIFIED_NAME_KEY, value, ENTITY_NAME_KEY, sharedMapKey);
}

@Override
Expand Down Expand Up @@ -96,7 +102,7 @@ public Future<JsonArray> get(String sharedMapKey) {
* @param clusterNodeId the ID of the cluster node
* @return the future
*/
Future<JsonArray> getClusteringInformation(String clusterNodeId) {
public Future<JsonArray> getClusteringInformation(String clusterNodeId) {
return clusteringInformation.get(clusterNodeId);
}

Expand All @@ -123,8 +129,8 @@ public Future<Void> unregisterNode(String clusterNodeId) {
for (Object o : registeredEntities) {
if (remove(map, o)) {
JsonObject jo = (JsonObject) o;
String entityName = jo.getString(ENTITY_NAME);
String qualifiedName = jo.getString(QUALIFIED_NAME);
String entityName = jo.getString(ENTITY_NAME_KEY);
String qualifiedName = jo.getString(QUALIFIED_NAME_KEY);
futureList.add(unregister(entityName, qualifiedName));
}
}
Expand Down
214 changes: 214 additions & 0 deletions src/main/java/io/neonbee/internal/job/RedeployEntitiesJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package io.neonbee.internal.job;

import static io.neonbee.internal.deploy.DeployableVerticle.fromClass;
import static io.neonbee.internal.deploy.Deployables.fromDeployables;
import static io.neonbee.internal.scanner.DeployableScanner.scanForDeployableClasses;

import java.time.Duration;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;

import io.neonbee.NeonBee;
import io.neonbee.NeonBeeDeployable;
import io.neonbee.NeonBeeProfile;
import io.neonbee.data.DataContext;
import io.neonbee.entity.EntityVerticle;
import io.neonbee.internal.Registry;
import io.neonbee.internal.cluster.ClusterHelper;
import io.neonbee.internal.cluster.entity.ClusterEntityRegistry;
import io.neonbee.internal.deploy.Deployable;
import io.neonbee.internal.deploy.Deployables;
import io.neonbee.job.JobSchedule;
import io.neonbee.job.JobVerticle;
import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Future;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;

/**
* A job that redeploys all entity verticles that are not deployed in the cluster.
*/
@NeonBeeDeployable(namespace = NeonBeeDeployable.NEONBEE_NAMESPACE, autoDeploy = false)
public class RedeployEntitiesJob extends JobVerticle {

private static final LoggingFacade LOGGER = LoggingFacade.create();

private static final Duration DEFAULT_INTERVAL = Duration.ofMinutes(5L);

/**
* Create a new ReregisterEntitiesJob job verticle with the default configuration.
*/
public RedeployEntitiesJob() {
this(new JobSchedule(DEFAULT_INTERVAL));
}

/**
* Create a new ReregisterEntitiesJob job verticle.
*
* @param schedule the schedule to use when starting this job verticle
*/
public RedeployEntitiesJob(JobSchedule schedule) {
this(schedule, false);
}

/**
* Create a new ReregisterEntitiesJob job verticle. Optionally undeploy the verticle when the job execution ended
* (hit the end instant or one time execution)
*
* @param schedule the schedule to use when starting this job verticle
* @param undeployWhenDone if true, undeploy the verticle when done
*/
public RedeployEntitiesJob(JobSchedule schedule, boolean undeployWhenDone) {
super(schedule, undeployWhenDone);
}

/**
* Create a new ReregisterEntitiesJob job verticle from the given configuration.
*
* @param config the configuration
* @return a new ReregisterEntitiesJob job verticle
*/
public static RedeployEntitiesJob create(JsonObject config) {
boolean undeployWhenDone = config.getBoolean("undeployWhenDone", false);
Duration interval = Duration.parse(config.getString("interval", "PT5M"));
JobSchedule jobSchedule = new JobSchedule(interval);
return new RedeployEntitiesJob(jobSchedule, undeployWhenDone);
}

@Override
public Future<?> execute(DataContext context) {
LOGGER.correlateWith(context).info("Start scanning of missing entities in the cluster");
long startTime = System.currentTimeMillis();

// get the currently deployed entity verticles
NeonBee neonBee = NeonBee.get(getVertx());
Registry<String> entityRegistry = neonBee.getEntityRegistry();
if (entityRegistry instanceof ClusterEntityRegistry) {
LOGGER.correlateWith(context).debug("Getting registered entities from cluster");

ClusterEntityRegistry clusterEntityRegistry = ((ClusterEntityRegistry) entityRegistry);
Future<JsonArray> clusteringInformation = clusterEntityRegistry
.getClusteringInformation(ClusterHelper.getClusterNodeId(vertx))
.onSuccess(event -> LOGGER.correlateWith(context).debug("Got registered entities from cluster"))
.onFailure(error -> LOGGER.correlateWith(context)
.error("Failed getting registered entities from cluster", error));

Future<Map<String, Class<? extends EntityVerticle>>> entitiesFromClassPath = classPathEntityVerticles(vertx)
.onSuccess(event -> LOGGER.correlateWith(context)
.info("Finished re-registering of entities. Took {} ms",
System.currentTimeMillis() - startTime))
.onFailure(error -> LOGGER.correlateWith(context)
.error("Failed reregistering entities", error));

return Future.all(clusteringInformation, entitiesFromClassPath)
.map(compositeFuture -> findMissingEntityVerticles(
context,
entitiesFromClassPath.result(),
clusteringInformation.result()))
.compose(difference -> deployMissingEntityVerticles(context, neonBee, difference))
.onFailure(error -> LOGGER.correlateWith(context)
.error("Failed getting registered entities from cluster", error));
} else {
// if it is not a clustered deployment we have nothing to do
return Future.succeededFuture();
}
}

/**
* Find all entity verticles that are not deployed in the cluster.
*
* @param context the data context
* @param classPathEntitiesMap the entity verticles from the class path
* @param clusteringInformation the clustering information
* @return a map of entity verticles that are not deployed in the cluster
*/
private Map<String, Class<? extends EntityVerticle>> findMissingEntityVerticles(
DataContext context,
Map<String, Class<? extends EntityVerticle>> classPathEntitiesMap,
JsonArray clusteringInformation) {
Set<String> deployedEntitiesSet = qualifiedNamesSet(context, clusteringInformation);
Map<String, Class<? extends EntityVerticle>> difference = new HashMap<>(classPathEntitiesMap);
difference.keySet().removeAll(deployedEntitiesSet);
return difference;
}

private Set<String> qualifiedNamesSet(DataContext context, JsonArray clusteringInformation) {
Set<String> deployedEntitiesSet;
if (clusteringInformation == null) {
LOGGER.correlateWith(context).debug("No entities registered in cluster");
deployedEntitiesSet = Set.of();
} else {
deployedEntitiesSet = clusteringInformation
.stream()
.map(jo -> (JsonObject) jo)
.map(jo -> jo.getString(ClusterEntityRegistry.QUALIFIED_NAME_KEY))
.collect(Collectors.toSet());
}
return deployedEntitiesSet;
}

private Future<Object> deployMissingEntityVerticles(DataContext context, NeonBee neonBee,
Map<String, Class<? extends EntityVerticle>> difference) {
if (difference.isEmpty()) {
LOGGER.correlateWith(context).info(
"Skipping reconciliation as all EntityVerticles are already deployed on NeonBee node {}.",
neonBee.getNodeId());
return Future.succeededFuture();
} else {
List<Future<? extends Deployable>> toDeploy = difference.entrySet()
.stream()
.peek(stringClassEntry -> LOGGER.correlateWith(context).info(
"Deploying missing EntityVerticles \"{}\" on NeonBee node {}.",
stringClassEntry.getKey(),
neonBee.getNodeId()))
.map(stringClassEntry -> fromClass(vertx, stringClassEntry.getValue()))
.collect(Collectors.toList());

return fromDeployables(toDeploy)
.compose(Deployables.allTo(neonBee))
.onSuccess(deployment -> LOGGER.correlateWith(context).info(
"Successfully deployed EntityVerticles \"{}\" on NeonBee node {}.",
deployment.getDeployable().getIdentifier(), neonBee.getNodeId()))
.mapEmpty();
}
}

private Future<Map<String, Class<? extends EntityVerticle>>> classPathEntityVerticles(Vertx vertx) {
return scanForDeployableClasses(vertx).map(verticles -> verticles.stream()
.filter(EntityVerticle.class::isAssignableFrom)
.filter(verticleClass -> filterByAutoDeployAndProfiles(verticleClass, activeProfiles()))
.map(verticleClass -> (Class<? extends EntityVerticle>) verticleClass)
.map(verticleClass -> {
NeonBeeDeployable annotation = verticleClass.getAnnotation(NeonBeeDeployable.class);
String namespace = annotation != null ? annotation.namespace() + "/" : "";

return new AbstractMap.SimpleEntry<String, Class<? extends EntityVerticle>>(
namespace + EntityVerticle.getName(verticleClass),
verticleClass);
})
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
(existingValue, newValue) -> existingValue)));
}

private Set<NeonBeeProfile> activeProfiles() {
return NeonBee.get(getVertx()).getOptions().getActiveProfiles();
}

@VisibleForTesting
boolean filterByAutoDeployAndProfiles(Class<? extends Verticle> verticleClass,
Collection<NeonBeeProfile> activeProfiles) {
NeonBeeDeployable annotation = verticleClass.getAnnotation(NeonBeeDeployable.class);
return annotation.autoDeploy() && annotation.profile().isActive(activeProfiles);
}
}
Loading

0 comments on commit ff576d4

Please sign in to comment.