Skip to content
This repository has been archived by the owner on Oct 5, 2021. It is now read-only.

Commit

Permalink
Merge pull request #517 from sdkottegoda/channel_close_flags
Browse files Browse the repository at this point in the history
Introduce Channel and Connection close flags
  • Loading branch information
sdkottegoda authored Aug 10, 2018
2 parents ac85b94 + ee042a7 commit 25c26d7
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,13 @@ public ConnectionsApi(AmqpConnectionManager connectionManager, AuthorizationHand
@ApiResponse(code = 403, message = "User is not autherized to perform operation", response = Error.class),
@ApiResponse(code = 404, message = "The specified resource was not found", response = Error.class)
})
public Response closeConnection(@Context Request request, @PathParam("id") @ApiParam("Identifier of the "
+ "connection") Integer id,
@DefaultValue("false") @QueryParam("force") @ApiParam("If set to true the broker"
+ " will close the underlying connection without trying to communicate with the connected AMQP client. If set to false, the broker will send a connection close frame to the client which will respond back with a connection close ok. Once this, frame is received, the connection will be closed.") Boolean force) {
return connectionsApiDelegate.closeConnection(id, force, (Subject) request.getSession().getAttribute
public Response closeConnection(@Context Request request, @PathParam("id") @ApiParam("Identifier of the connection") Integer id,
@DefaultValue("false") @QueryParam("force") @ApiParam("If set to true the broker will close the underlying connection without trying to communicate with the connected AMQP client."
+ " If set to false, the broker will send a connection close frame to the client and the connection will be closed when the "
+ "client responds back with a connection close ok.") Boolean force,
@DefaultValue("false") @QueryParam("used") @ApiParam("If set to false, the broker will close the connection only if there are no AMQP channels registered on it. If set to true,"
+ " the connection will be closed regardless of the registered number of channels.") Boolean used) {
return connectionsApiDelegate.closeConnection(id, force, used, (Subject) request.getSession().getAttribute
(BrokerAuthConstants.AUTHENTICATION_ID));
}

Expand Down Expand Up @@ -110,8 +112,11 @@ public Response getAllConnections(@Context Request request) {
})
public Response closeChannel(@Context Request request,
@PathParam("connectionId") @ApiParam("Identifier of the connection") Integer connectionId,
@PathParam("channelId") @ApiParam("Identifier of the channel") Integer channelId) {
@PathParam("channelId") @ApiParam("Identifier of the channel") Integer channelId,
@DefaultValue("false") @QueryParam("used") @ApiParam("If set to false, the broker will close the channel only if there are no AMQP consumers for it. If set to true, "
+ "the channel will be closed regardless of the number of active consumers.") Boolean used) {
return connectionsApiDelegate.closeChannel(connectionId, channelId,
used,
(Subject) request.getSession().getAttribute(BrokerAuthConstants.AUTHENTICATION_ID));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.ballerina.messaging.broker.amqp.codec.AmqpChannelView;
import io.ballerina.messaging.broker.amqp.codec.handlers.AmqpConnectionHandler;
import io.ballerina.messaging.broker.common.ResourceNotFoundException;
import io.ballerina.messaging.broker.common.ValidationException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -72,43 +73,36 @@ public List<AmqpConnectionHandler> getConnections() {
}

/**
* Closes an AMQP connection specified by the identifier.
* Closes an AMQP connection specified by the identifier according to the parameters set.
*
* @param id the connection identifier
* @param reason reason to close connection
* @return integer representing the number of channels registered
* @param force if set to true the connection will be closed by the broker without negotiating with the AMQP client
* @param used if set to true, the connection will be closed regardless of the number of active channels
* registered
* @param reason reason to close connection @return integer representing the number of channels registered
* @throws ResourceNotFoundException if the the specified connection id does not exist
* @throws ValidationException if active channels exist for the connection, and if used is set to false
*/
public int closeConnection(int id, String reason) throws ResourceNotFoundException {
public int closeConnection(int id, boolean force, boolean used, String reason)
throws ResourceNotFoundException, ValidationException {
AmqpConnectionHandler connectionHandler = getConnection(id);
return connectionHandler.closeConnection(reason);
}

/**
* Forces disconnection of an AMQP connection specified by the identifier from the broker side.
*
* @param id the connection identifier
* @param reason reason to close connection
* @return integer representing the number of channels registered
* @throws ResourceNotFoundException if the the specified connection id does not exist
*/
public int forceCloseConnection(int id, String reason) throws ResourceNotFoundException {
AmqpConnectionHandler connectionHandler = getConnection(id);
return connectionHandler.forceCloseConnection(reason);
return connectionHandler.closeConnection(reason, force, used);
}

/**
* Closes an AMQP channel specified by the connection id and the channel id.
*
* @param connectionId unique integer representing the connection
* @param channelId integer representing the channel id within the connection id
* @param used if set to true, the channel will be closed regardless of the number of active consumers
* registered
* @param reason reason to close channel
*/
public void forceDisconnectChannel(int connectionId, int channelId, String reason) throws
ResourceNotFoundException {
public void closeChannel(int connectionId, int channelId, boolean used, String reason)
throws ResourceNotFoundException, ValidationException {
AmqpConnectionHandler connectionHandler = getConnection(connectionId);
if (Objects.nonNull(connectionHandler.getChannel(channelId))) {
connectionHandler.forceDisconnectChannel(channelId, reason);
connectionHandler.closeChannel(channelId, used, reason);
} else {
throw new ResourceNotFoundException(
"Channel id " + channelId + " does not exist for connection " + connectionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.ballerina.messaging.broker.amqp.codec.frames.GeneralFrame;
import io.ballerina.messaging.broker.amqp.codec.frames.ProtocolInitFrame;
import io.ballerina.messaging.broker.amqp.metrics.AmqpMetricManager;
import io.ballerina.messaging.broker.common.ValidationException;
import io.ballerina.messaging.broker.common.data.types.ShortString;
import io.ballerina.messaging.broker.core.Broker;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -286,29 +287,47 @@ public int getId() {
}

/**
* Sends a connection close frame to the client.
* Closes the AMQP connection with a client according to the parameters set.
*
* @param reason reason to close connection
* @param force if set to true the connection will be closed by the broker without negotiating with the AMQP client
* @param used if set to true, the connection will be closed regardless of the number of active channels
* registered
* @return int representing the number of channels registered on the connection
*/
public int closeConnection(String reason) {
LOGGER.info("Closing connection {}. Reason: {}", getId(), reason);
public int closeConnection(String reason, boolean force, boolean used) throws ValidationException {
int numberOfChannels = channels.size();
if (!used && numberOfChannels > 0) {
throw new ValidationException("Cannot close connection. " + numberOfChannels + " active channels exist "
+ "and used parameter is not set.");
}
if (force) {
forceCloseConnection(reason);
} else {
closeConnection(reason);
}
return numberOfChannels;
}

/**
* Sends a connection close frame to the client.
*
* @param reason reason to close connection
*/
private void closeConnection(String reason) {
LOGGER.info("Closing connection {}. Reason: {}", getId(), reason);
ctx.writeAndFlush(new ConnectionClose(AmqConstant.CONNECTION_FORCED,
ShortString.parseString("Broker forced close connection. " + reason),
0, 0));
return numberOfChannels;
}

/**
* Closes the underlying connection with the client.
*
* @param reason reason to force close connection
* @return int representing the number of channels registered on the connection
*/
public int forceCloseConnection(String reason) {
private void forceCloseConnection(String reason) {
LOGGER.info("Force closing connection {}. Reason: {}", getId(), reason);
int numberOfChannels = channels.size();
ChannelFuture close = ctx.close();
close.addListener(future -> {
if (future.isSuccess()) {
Expand All @@ -317,17 +336,23 @@ public int forceCloseConnection(String reason) {
LOGGER.error("Error occurred while closing connection {}", getId(), future.cause());
}
});
return numberOfChannels;
}

/**
* Sends a channel close frame to the client.
*
* @param channelId the identifier of the channel
* @param reason reason to disconnection channel
* @param used if set to true, the channel will be closed regardless of the number of active consumers
* registered
* @param reason reason to disconnection channel
*/
public void forceDisconnectChannel(int channelId, String reason) {
LOGGER.info("Force closing channel {} of connection {}. Reason: {}", channelId, getId(), reason);
public void closeChannel(int channelId, boolean used, String reason) throws ValidationException {
int numberOfConsumers = channelViews.get(channelId).getConsumerCount();
if (!used && numberOfConsumers > 0) {
throw new ValidationException("Cannot close channel. " + numberOfConsumers + " active consumers exist "
+ "and used parameter is not set.");
}
LOGGER.info("Closing channel {} of connection {}. Reason: {}", channelId, getId(), reason);
ctx.writeAndFlush(new ChannelClose(channelId, AmqConstant.CHANNEL_CLOSED,
ShortString.parseString("Broker forced close channel. " + reason),
0, 0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import io.ballerina.messaging.broker.auth.authorization.AuthorizationHandler;
import io.ballerina.messaging.broker.auth.authorization.enums.ResourceAuthScope;
import io.ballerina.messaging.broker.common.ResourceNotFoundException;
import io.ballerina.messaging.broker.common.ValidationException;
import java.util.ArrayList;
import java.util.List;
import javax.security.auth.Subject;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.NotAuthorizedException;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -78,29 +80,30 @@ public Response getAllConnections(Subject subject) {
*
* @param id connection identifier
* @param force indicates if the connection should be closed forcefully or not
* @param used if set to true, the connection will be closed regardless of the number of active channels
* registered
* @param subject authentication subject containing user information of the user that has invoked the API
* @return HTTP/1.1 202 accepted with {@link RequestAcceptedResponse}
*/
public Response closeConnection(int id, boolean force, Subject subject) {
public Response closeConnection(int id, boolean force, boolean used, Subject subject) {
try {
authHandler.handle(ResourceAuthScope.CONNECTIONS_CLOSE, subject);
int registeredChannelCount;
if (force) {
registeredChannelCount = connectionManager.forceCloseConnection(id,
"Forced connection close request "
+ "received from REST API.");
} else {
registeredChannelCount = connectionManager.closeConnection(id,
int registeredChannelCount = connectionManager.closeConnection(id,
force,
used,
"Connection close request received from "
+ "REST API.");
}
+ "REST API with parameters "
+ "force=" + force + ", "
+ "used=" + used + "");
return Response.accepted()
.entity(new CloseConnectionResponse().numberOfChannelsRegistered(registeredChannelCount))
.build();
} catch (ResourceNotFoundException e) {
throw new NotFoundException(e.getMessage(), e);
} catch (AuthException e) {
throw new NotAuthorizedException(e.getMessage(), e);
} catch (ValidationException e) {
throw new BadRequestException(e.getMessage(), e);
}
}

Expand All @@ -109,14 +112,16 @@ public Response closeConnection(int id, boolean force, Subject subject) {
*
* @param connectionId connection identifier
* @param channelId channel id
* @param used if set to true, the channel will be closed regardless of the number of active consumers
* registered
* @param subject authentication subject containing user information of the user that has invoked the API
* @return HTTP/1.1 202 accepted with {@link RequestAcceptedResponse}
*/
public Response closeChannel(Integer connectionId, Integer channelId, Subject subject) {
public Response closeChannel(Integer connectionId, Integer channelId, boolean used, Subject subject) {
try {
authHandler.handle(ResourceAuthScope.CHANNEL_CLOSE, subject);
connectionManager.forceDisconnectChannel(connectionId, channelId, "Channel close request received from "
+ "REST API.");
connectionManager.closeChannel(connectionId, channelId, used, "Channel close request received from "
+ "REST API.");
return Response.accepted()
.entity(new RequestAcceptedResponse().message(
"Request accepted for forceful disconnection of channel " + channelId + " of "
Expand All @@ -126,6 +131,8 @@ public Response closeChannel(Integer connectionId, Integer channelId, Subject su
throw new NotFoundException(e.getMessage(), e);
} catch (AuthException e) {
throw new NotAuthorizedException(e.getMessage(), e);
} catch (ValidationException e) {
throw new BadRequestException(e.getMessage(), e);
}
}

Expand Down
46 changes: 32 additions & 14 deletions modules/broker-amqp/src/main/resources/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,29 @@ paths:
type: integer
required: true
description: Identifier of the connection
- in: query
name: force
type: boolean
required: false
default: false
description: >-
If set to true the broker will close the underlying connection without trying to communicate with the
connected AMQP client. If set to false, the broker will send a connection close frame to the client and the
connection will be closed when the client responds back with a connection close ok.
delete:
operationId: closeConnection
summary: Close the specified connection.
description: >-
Disconnects the specified amqp connection if the connection exists in the broker
parameters:
- in: query
name: force
type: boolean
required: false
default: false
description: >-
If set to true the broker will close the underlying connection without trying to communicate with the
connected AMQP client. If set to false, the broker will send a connection close frame to the client and the
connection will be closed when the client responds back with a connection close ok.
- in: query
name: used
type: boolean
required: false
default: false
description: >-
If set to false, the broker will close the connection only if there are no AMQP channels registered on
it. If set to true, the connection will be closed regardless of the registered number of channels.
produces:
- application/json
responses:
Expand All @@ -82,16 +91,25 @@ paths:
type: integer
required: true
description: Identifier of the connection
- in: path
name: channelId
type: integer
required: true
description: Identifier of the channel
delete:
operationId: closeChannel
summary: Force disconnect the specified channel.
description: >-
Disconnects the specified amqp channel if an active channel exists in the broker
parameters:
- in: path
name: channelId
type: integer
required: true
description: Identifier of the channel
- in: query
name: used
type: boolean
required: false
default: false
description: >-
If set to false, the broker will close the channel only if there are no AMQP consumers for it. If set to
true, the channel will be closed regardless of the number of active consumers.
produces:
- application/json
responses:
Expand Down
Loading

0 comments on commit 25c26d7

Please sign in to comment.