Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Taskmanager stability issues #67

Merged
merged 4 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
Expand Down Expand Up @@ -64,8 +65,11 @@ public interface RabbitMQWorkerObserver {

private final BrokerConnectionFactory factory;
private final List<RabbitMQWorkerObserver> observers = new ArrayList<>();
private Consumer consumer;
private final AtomicBoolean tryStartingConsuming = new AtomicBoolean();
private boolean isShutdown;
private DefaultConsumer consumer;
private Channel channel;
private String queueName;

/**
* Constructor
Expand Down Expand Up @@ -104,51 +108,79 @@ public void removeObserver(final RabbitMQWorkerObserver observer) {
* @throws IOException
*/
public void start() throws IOException {
final Connection c = factory.getConnection();
channel = c.createChannel();
channel.exchangeDeclare(AERIUS_EVENT_EXCHANGE, EXCHANGE_TYPE);
final String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, AERIUS_EVENT_EXCHANGE, "");

consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body)
throws IOException {
RabbitMQWorkerMonitor.this.handleDelivery(properties);
tryStartingConsuming.set(true);
while (!isShutdown) {
try {
stopAndStartConsumer();
LOG.debug("Successfully (re)started consumer RabbitMQWorkerMonitor");
if (consumer.getChannel().isOpen()) {
tryStartingConsuming.set(false);
break;
}
} catch (final ShutdownSignalException | IOException e1) {
LOG.warn("(Re)starting consumer RabbitMQWorkerMonitor failed, retrying", e1);
}
}
}

private void stopAndStartConsumer() throws IOException {
synchronized (this) {
if (consumer != null) {
try {
if (consumer.getChannel().isOpen()) {
consumer.getChannel().basicCancel(queueName);
}
} catch (final AlreadyClosedException | IOException e) {
LOG.debug("Exception while stopping consuming, ignoring.", e);
}
}
final Connection c = factory.getConnection();
channel = c.createChannel();
channel.exchangeDeclare(AERIUS_EVENT_EXCHANGE, EXCHANGE_TYPE);
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, AERIUS_EVENT_EXCHANGE, "");
consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body)
throws IOException {
RabbitMQWorkerMonitor.this.handleDelivery(properties);
}

@Override
public void handleShutdownSignal(final String consumerTag, final ShutdownSignalException sig) {
if (sig.isInitiatedByApplication()) {
LOG.info("Worker event monitor {} was shut down by the application.", consumerTag);
BertScholten marked this conversation as resolved.
Show resolved Hide resolved
} else {
LOG.debug("Worker event monitor {} was shut down.", consumerTag);
// restart
try {
@Override
public void handleShutdownSignal(final String consumerTag, final ShutdownSignalException sig) {
if (sig.isInitiatedByApplication()) {
LOG.info("Worker event monitor {} was shut down by the application.", consumerTag);
} else {
LOG.debug("Worker event monitor {} was shut down.", consumerTag);
// restart
try {
channel.abort();
try {
channel.abort();
} catch (final IOException e) {
// Eat error when closing channel.
}
if (!tryStartingConsuming.get()) {
start();
LOG.info("Restarted worker event monitor {}", consumerTag);
}
} catch (final IOException e) {
// Eat error when closing channel.
LOG.debug("Worker event monitor restart failed", e);
}
start();
LOG.info("Restarted worker event monitor {}", consumerTag);
} catch (final IOException e) {
LOG.debug("Worker event monitor restart failed", e);
}
}
}
};
channel.basicConsume(queue, true, consumer);
};
channel.basicConsume(queueName, true, consumer);
}
}

private void handleDelivery(final AMQP.BasicProperties properties) {
final Map<String, Object> headers = properties.getHeaders();
final String queueName = getParam(headers, HEADER_PARAM_QUEUE);
final String workerQueueName = getParam(headers, HEADER_PARAM_QUEUE);
final int workerSize = getParamInt(headers, HEADER_PARAM_WORKER_SIZE, -1);
final int utilisationSize = getParamInt(headers, HEADER_PARAM_UTILISATION, -1);

synchronized (observers) {
observers.forEach(ro -> ro.updateWorkers(queueName, workerSize, utilisationSize));
observers.forEach(ro -> ro.updateWorkers(workerQueueName, workerSize, utilisationSize));
}
}

Expand Down Expand Up @@ -188,6 +220,7 @@ private int getParamInt(final Map<String, Object> headers, final String param, f
* Shutdown the monitoring process.
*/
public void shutdown() {
isShutdown = true;
try {
channel.close();
} catch (final IOException | TimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.HashMap;
Expand Down Expand Up @@ -74,6 +75,7 @@ void setUp() throws Exception {
final Connection mockConnection = Mockito.mock(Connection.class);
mockChannel = Mockito.mock(Channel.class);
doReturn(mockChannel).when(mockConnection).createChannel();
when(mockChannel.isOpen()).thenReturn(true);
final Queue.DeclareOk mockDeclareOk = Mockito.mock(Queue.DeclareOk.class);
doReturn(mockDeclareOk).when(mockChannel).queueDeclare();
monitor = new RabbitMQWorkerMonitor(new BrokerConnectionFactory(executor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public void start() throws IOException {
LOG.warn("(Re)starting consumer for {} failed, retrying in a while", taskQueueName, e1);
warned = true;
}
delayRetry();
if (!isShutdown) {
delayRetry();
}
}
}
}
Expand Down Expand Up @@ -168,7 +170,6 @@ private void handleShutdownSignal(final ShutdownSignalException ssg) {
return;
}
if (!tryStartingConsuming.get() && tryConnecting.compareAndSet(false, true) && messageReceivedHandler != null) {
delayRetry();
messageReceivedHandler.handleShutdownSignal();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,25 @@ private String getWorkerReplyQueue() {
private void tryStartReplyConsumer() {
boolean warn = true;
while (!isShutdown) {
final Connection connection = factory.getConnection();

Connection connection = null;
try {
connection = factory.getConnection();
connection.addShutdownListener(this::restartConnection);
startReplyConsumer(connection);
LOG.info("Successfully (re)started reply consumer for queue {}", workerQueueName);
break;
} catch (final ShutdownSignalException | IOException e1) {
connection.removeShutdownListener(this::restartConnection);
if (connection != null) {
connection.removeShutdownListener(this::restartConnection);
}
if (warn) {
LOG.warn("(Re)starting reply consumer for queue {} failed, retrying in a while", workerQueueName);
LOG.trace("(Re)starting failed with exception:", e1);
warn = false;
}
delayRetry(DEFAULT_RETRY_SECONDS);
if (!isShutdown) {
delayRetry(DEFAULT_RETRY_SECONDS);
}
}
}
}
Expand Down
Loading