diff --git a/source/pom.xml b/source/pom.xml
index c74c56b..e6f4e1f 100644
--- a/source/pom.xml
+++ b/source/pom.xml
@@ -19,7 +19,13 @@
4.0.0
- nl.aerius
+
+ nl.aerius
+ aerius-root-pom
+ 1.0.0
+
+
+
taskmanager-parent
1.10.0-SNAPSHOT
pom
@@ -45,24 +51,9 @@
HEAD
-
-
- aerius-nexus
- https://nexus.aerius.nl/repository/maven-releases/
-
-
- aerius-nexus
- https://nexus.aerius.nl/repository/maven-snapshots/
-
-
-
- 11
- 11
+ 11
- UTF-8
- UTF-8
- 1.2.0
2.28.0
4.2.13
2.0.5
@@ -70,9 +61,6 @@
4.9.0
aerius_taskmanager
- ${project.artifactId}
- aerius
- https://sonarcloud.io
taskmanager-report/target/site/jacoco-aggregate/jacoco.xml
${basedir}/../${aggregate.report.dir}
@@ -87,13 +75,6 @@
5.16.0
-
-
- com.google.code.gson
- gson
- 2.10
-
-
commons-cli
@@ -124,7 +105,7 @@
jcl-over-slf4j
${slf4j.version}
-
+
io.opentelemetry
@@ -365,26 +346,6 @@
-
- org.apache.maven.plugins
- maven-enforcer-plugin
- 3.1.0
-
-
- enforce-maven
-
- enforce
-
-
-
-
- [3.6.3,)
-
-
-
-
-
-
@@ -413,35 +374,5 @@
taskmanager-report
-
-
- dependency-check
-
-
-
- org.owasp
- dependency-check-maven
- false
-
-
-
- aggregate
-
-
-
-
-
-
-
- ALL
-
- false
-
- true
-
-
-
-
-
diff --git a/source/taskmanager-client/pom.xml b/source/taskmanager-client/pom.xml
index fbe03b7..3b42acb 100644
--- a/source/taskmanager-client/pom.xml
+++ b/source/taskmanager-client/pom.xml
@@ -37,12 +37,6 @@
amqp-client
-
-
- com.google.code.gson
- gson
-
-
org.apache.httpcomponents
diff --git a/source/taskmanager/pom.xml b/source/taskmanager/pom.xml
index e63e885..0d102f2 100644
--- a/source/taskmanager/pom.xml
+++ b/source/taskmanager/pom.xml
@@ -43,6 +43,12 @@
commons-cli
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+
+
io.opentelemetry
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/PriorityTaskSchedulerFileHandler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/PriorityTaskSchedulerFileHandler.java
index f49c17a..513fb50 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/PriorityTaskSchedulerFileHandler.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/PriorityTaskSchedulerFileHandler.java
@@ -18,17 +18,13 @@
import java.io.File;
import java.io.IOException;
-import java.io.Reader;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
import java.util.Locale;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import nl.aerius.taskmanager.domain.PriorityTaskQueue;
import nl.aerius.taskmanager.domain.PriorityTaskSchedule;
@@ -40,10 +36,9 @@ class PriorityTaskSchedulerFileHandler implements SchedulerFileConfigurationHand
private static final Logger LOG = LoggerFactory.getLogger(PriorityTaskSchedulerFileHandler.class);
- private static final String FILE_PREFIX = "priority-task-scheduler.";
private static final String ENV_PREFIX = "AERIUS_PRIORITY_TASK_SCHEDULER_";
- private final Gson gson = new GsonBuilder().setPrettyPrinting().excludeFieldsWithoutExposeAnnotation().create();
+ private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public PriorityTaskSchedule read(final File file) throws IOException {
@@ -53,26 +48,16 @@ public PriorityTaskSchedule read(final File file) throws IOException {
}
private PriorityTaskSchedule readFromFile(final File file) throws IOException {
- try (final Reader reader = Files.newBufferedReader(file.toPath(), StandardCharsets.UTF_8)) {
- return gson.fromJson(reader, PriorityTaskSchedule.class);
- }
+ return objectMapper.readValue(file, PriorityTaskSchedule.class);
}
- private PriorityTaskSchedule readFromEnvironment(final String workerQueueName) {
+ private PriorityTaskSchedule readFromEnvironment(final String workerQueueName) throws JsonProcessingException {
final String environmentKey = ENV_PREFIX + workerQueueName.toUpperCase(Locale.ROOT);
final String environmentValue = System.getenv(environmentKey);
if (environmentValue != null) {
LOG.info("Using configuration for worker queue {} from environment", workerQueueName);
- return gson.fromJson(environmentValue, PriorityTaskSchedule.class);
+ return objectMapper.readValue(environmentValue, PriorityTaskSchedule.class);
}
return null;
}
-
- @Override
- public void write(final File path, final PriorityTaskSchedule priorityTaskSchedule) throws IOException {
- final File targetFile = new File(path, FILE_PREFIX + priorityTaskSchedule.getWorkerQueueName() + ".json");
- try (final Writer writer = Files.newBufferedWriter(targetFile.toPath(), StandardCharsets.UTF_8)) {
- writer.write(gson.toJson(priorityTaskSchedule));
- }
- }
}
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/SchedulerFileConfigurationHandler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/SchedulerFileConfigurationHandler.java
index 08c9c81..845a2d5 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/SchedulerFileConfigurationHandler.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/SchedulerFileConfigurationHandler.java
@@ -37,13 +37,4 @@ interface SchedulerFileConfigurationHandler schedule) throws IOExce
}
final TaskScheduleBucket taskScheduleBucket = buckets.get(workerQueueName);
- taskScheduleBucket.updateQueues(schedule.getTaskQueues(), schedule.isDurable());
+ taskScheduleBucket.updateQueues(schedule.getQueues(), schedule.isDurable());
return taskScheduleBucket.isRunning();
}
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskSchedulerWatcher.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskSchedulerWatcher.java
index cdd8d53..a53eb3c 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskSchedulerWatcher.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskSchedulerWatcher.java
@@ -120,7 +120,7 @@ private S rewriteQueueNames(final File file) throws IOException {
final WorkerQueueType workerQueueType = new WorkerQueueType(schedule.getWorkerQueueName());
schedule.setWorkerQueueName(workerQueueType.getWorkerQueueName());
- schedule.getTaskQueues().forEach(s -> s.setQueueName(workerQueueType.getTaskQueueName(s.getQueueName())));
+ schedule.getQueues().forEach(s -> s.setQueueName(workerQueueType.getTaskQueueName(s.getQueueName())));
return schedule;
}
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/PriorityTaskQueue.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/PriorityTaskQueue.java
index 977b02c..0cd131d 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/PriorityTaskQueue.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/PriorityTaskQueue.java
@@ -16,22 +16,13 @@
*/
package nl.aerius.taskmanager.domain;
-import java.io.Serializable;
-
-import com.google.gson.annotations.Expose;
-
/**
* The configuration of a task, which is used by Task Consumer to retrieve the queue name
* and used by the scheduler to determine if a message should be handled or not.
- *
*/
-public class PriorityTaskQueue extends TaskQueue implements Serializable {
-
- private static final long serialVersionUID = 7719329377305394882L;
+public class PriorityTaskQueue extends TaskQueue {
- @Expose
private int priority;
- @Expose
private double maxCapacityUse;
/**
@@ -43,11 +34,10 @@ public PriorityTaskQueue() {
/**
* @param queueName The name of the queue the task corresponds to.
- * @param description The description of this task queue.
* @param priority The priority this task should have.
* @param maxCapacityUse The maximum capacity this task can use of the total workers assigned. Should be a fraction.
*/
- public PriorityTaskQueue(final String queueName, final String description, final int priority, final double maxCapacityUse) {
+ public PriorityTaskQueue(final String queueName, final int priority, final double maxCapacityUse) {
super(queueName);
this.priority = priority;
this.maxCapacityUse = maxCapacityUse;
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/PriorityTaskSchedule.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/PriorityTaskSchedule.java
index 42b77eb..d1bae4b 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/PriorityTaskSchedule.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/PriorityTaskSchedule.java
@@ -20,5 +20,4 @@
* The configuration of a TaskScheduler.
*/
public class PriorityTaskSchedule extends TaskSchedule {
-
}
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskQueue.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskQueue.java
index e53e118..3571f9e 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskQueue.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskQueue.java
@@ -16,13 +16,15 @@
*/
package nl.aerius.taskmanager.domain;
-import com.google.gson.annotations.Expose;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
* Base class for a single task queue configuration.
*/
+@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION)
+@JsonSubTypes({ @JsonSubTypes.Type(PriorityTaskQueue.class) })
public class TaskQueue {
- @Expose
private String queueName;
protected TaskQueue() {
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskSchedule.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskSchedule.java
index 2fd5719..d15377a 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskSchedule.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskSchedule.java
@@ -19,21 +19,16 @@
import java.util.ArrayList;
import java.util.List;
-import com.google.gson.annotations.Expose;
-
/**
* Base class for a single schedule configuration.
* @param specific task queue configuration class
*/
public class TaskSchedule {
- @Expose
private String workerQueueName;
- @Expose
private Boolean durable;
- @Expose
private List queues = new ArrayList<>();
public String getWorkerQueueName() {
@@ -44,12 +39,12 @@ public void setWorkerQueueName(final String workerQueueName) {
this.workerQueueName = workerQueueName;
}
- public List getTaskQueues() {
+ public List getQueues() {
return queues;
}
- public void setTaskConfigurations(final List taskConfigurations) {
- this.queues = taskConfigurations;
+ public void setQueues(final List queues) {
+ this.queues = queues;
}
public void setDurable(final boolean durable) {
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java
index d861dde..e50d7de 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java
@@ -20,6 +20,7 @@
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
@@ -41,10 +42,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.stream.JsonReader;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import nl.aerius.taskmanager.adaptor.WorkerSizeObserver;
import nl.aerius.taskmanager.client.configuration.ConnectionConfiguration;
@@ -59,6 +58,7 @@ public class RabbitMQQueueMonitor {
private static final Logger LOG = LoggerFactory.getLogger(RabbitMQQueueMonitor.class);
private static final int TIMEOUT = (int) TimeUnit.SECONDS.toMillis(3);
+ private final ObjectMapper objectMapper = new ObjectMapper();
private final ConnectionConfiguration configuration;
private final CloseableHttpClient httpClient;
private final HttpHost targetHost;
@@ -110,11 +110,11 @@ public void updateWorkerQueueState(final String queueName, final WorkerSizeObser
final String apiPath = "/api/queues/" + virtualHost + "/" + queueName;
try {
- final JsonElement je = getJsonResultFromApi(apiPath);
- if (je == null) {
+ final JsonNode jsonObject = getJsonResultFromApi(apiPath);
+
+ if (jsonObject == null) {
LOG.error("Queue configuration from RabbitMQ admin json get call returned null.");
} else {
- final JsonObject jsonObject = je.getAsJsonObject();
final int numberOfWorkers = getJsonIntPrimitive(jsonObject, "consumers");
final int numberOfMessages = getJsonIntPrimitive(jsonObject, "messages");
@@ -126,31 +126,23 @@ public void updateWorkerQueueState(final String queueName, final WorkerSizeObser
}
}
- private static int getJsonIntPrimitive(final JsonObject jsonObject, final String key) {
- final int value;
- if (jsonObject == null || jsonObject.getAsJsonPrimitive(key) == null) {
- value = 0;
- } else {
- value = jsonObject.getAsJsonPrimitive(key).getAsInt();
- }
- return value;
+ private static int getJsonIntPrimitive(final JsonNode jsonObject, final String key) {
+ return jsonObject == null || !jsonObject.has(key) ? 0 : jsonObject.get(key).intValue();
}
- protected JsonElement getJsonResultFromApi(final String apiPath) throws URISyntaxException, IOException {
- JsonElement returnElement = null;
- final URI uri = new URI("http://" + configuration.getBrokerHost() + ":" + configuration.getBrokerManagementPort() + apiPath);
+ protected JsonNode getJsonResultFromApi(final String apiPath) throws URISyntaxException, IOException {
+ final URI uri = new URL("http", configuration.getBrokerHost(), configuration.getBrokerManagementPort(), apiPath).toURI();
+
try (final CloseableHttpResponse response = httpClient.execute(targetHost, new HttpGet(uri), context)) {
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
- try (final InputStreamReader is = new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8);
- final JsonReader jr = new JsonReader(is)) {
- returnElement = JsonParser.parseReader(jr);
+ try (final InputStreamReader is = new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8)) {
+ return objectMapper.readTree(is);
}
} else {
throw new IOException(String.format("Status code wasn't 200 when retrieving json result. Status was: %d, %s",
response.getStatusLine().getStatusCode(), response.getStatusLine()));
}
}
- return returnElement;
}
private static RequestConfig getDefaultRequestConfig() {
diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/PriorityTaskSchedulerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/PriorityTaskSchedulerTest.java
index 4a8e8fa..cea1411 100644
--- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/PriorityTaskSchedulerTest.java
+++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/PriorityTaskSchedulerTest.java
@@ -71,14 +71,14 @@ void setUp() throws IOException, InterruptedException {
final TaskConsumer taskConsumer3 = createMockTaskConsumer(QUEUE3);
final PriorityTaskSchedule configuration = new PriorityTaskSchedule();
configuration.setWorkerQueueName("TEST");
- final PriorityTaskQueue tc1 = new PriorityTaskQueue(QUEUE1, "", 0, TEST_CAPACITY);
- final PriorityTaskQueue tc2 = new PriorityTaskQueue(QUEUE2, "", 1, TEST_CAPACITY);
- final PriorityTaskQueue tc3 = new PriorityTaskQueue(QUEUE3, "", 1, TEST_CAPACITY);
- configuration.getTaskQueues().add(tc1);
- configuration.getTaskQueues().add(tc2);
- configuration.getTaskQueues().add(tc3);
+ final PriorityTaskQueue tc1 = new PriorityTaskQueue(QUEUE1, 0, TEST_CAPACITY);
+ final PriorityTaskQueue tc2 = new PriorityTaskQueue(QUEUE2, 1, TEST_CAPACITY);
+ final PriorityTaskQueue tc3 = new PriorityTaskQueue(QUEUE3, 1, TEST_CAPACITY);
+ configuration.getQueues().add(tc1);
+ configuration.getQueues().add(tc2);
+ configuration.getQueues().add(tc3);
scheduler = (PriorityTaskScheduler) factory.createScheduler(configuration.getWorkerQueueName());
- configuration.getTaskQueues().forEach(scheduler::updateQueue);
+ configuration.getQueues().forEach(scheduler::updateQueue);
task1 = createTask(taskConsumer1, "1", QUEUE1);
task2a = createTask(taskConsumer2, "2a", QUEUE2);
task2b = createTask(taskConsumer2, "2b", QUEUE2);
diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java
index fbd7b5f..b2a7062 100644
--- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java
+++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java
@@ -68,7 +68,7 @@ void testAddScheduler() throws IOException, InterruptedException {
@Test
void testModifyQueue() throws IOException, InterruptedException {
assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler running");
- schedule.getTaskQueues().get(0).setPriority(30);
+ schedule.getQueues().get(0).setPriority(30);
assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler updated");
taskManager.removeTaskScheduler(schedule.getWorkerQueueName());
}
@@ -76,7 +76,7 @@ void testModifyQueue() throws IOException, InterruptedException {
@Test
void testRemoveQueue() throws IOException, InterruptedException {
assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler running");
- schedule.getTaskQueues().remove(0);
+ schedule.getQueues().remove(0);
assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler updated");
taskManager.removeTaskScheduler(schedule.getWorkerQueueName());
}
diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java
index 581591a..16182ac 100644
--- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java
+++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java
@@ -25,9 +25,8 @@
import org.junit.jupiter.api.Test;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-import com.google.gson.stream.JsonReader;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import nl.aerius.taskmanager.adaptor.WorkerSizeObserver;
import nl.aerius.taskmanager.client.configuration.ConnectionConfiguration;
@@ -38,6 +37,7 @@
class RabbitMQQueueMonitorTest {
private static final String DUMMY = "dummy";
+ private final ObjectMapper objectMapper = new ObjectMapper();
@Test
void testGetWorkerQueueState() throws InterruptedException {
@@ -57,13 +57,10 @@ public void onDeltaNumberOfWorkersUpdate(final int deltaNumberOfWorkers) {
};
final RabbitMQQueueMonitor rpm = new RabbitMQQueueMonitor(configuration) {
@Override
- protected JsonElement getJsonResultFromApi(final String apiPath) {
+ protected JsonNode getJsonResultFromApi(final String apiPath) throws IOException {
try (final InputStream fr = getClass().getResourceAsStream("queue_aerius.worker.ops.txt");
- final InputStreamReader is = new InputStreamReader(fr);
- final JsonReader jr = new JsonReader(is)) {
- return JsonParser.parseReader(jr);
- } catch (final IOException e) {
- throw new RuntimeException(e);
+ final InputStreamReader is = new InputStreamReader(fr)) {
+ return objectMapper.readTree(is);
}
}
};