From b30200fcf8321639d521b2066c526c1b18e2651e Mon Sep 17 00:00:00 2001 From: sdkottegoda Date: Fri, 3 Aug 2018 09:36:11 +0530 Subject: [PATCH 1/4] Set close parameters in REST --- .../broker/amqp/rest/api/ConnectionsApi.java | 17 ++++--- .../broker/amqp/AmqpConnectionManager.java | 36 ++++++-------- .../codec/handlers/AmqpConnectionHandler.java | 47 ++++++++++++++----- .../amqp/rest/ConnectionsApiDelegate.java | 33 ++++++++----- .../src/main/resources/swagger.yaml | 46 ++++++++++++------ 5 files changed, 114 insertions(+), 65 deletions(-) diff --git a/modules/broker-amqp/src/gen/java/io/ballerina/messaging/broker/amqp/rest/api/ConnectionsApi.java b/modules/broker-amqp/src/gen/java/io/ballerina/messaging/broker/amqp/rest/api/ConnectionsApi.java index 17a2a6b0c27..17b9dc91679 100644 --- a/modules/broker-amqp/src/gen/java/io/ballerina/messaging/broker/amqp/rest/api/ConnectionsApi.java +++ b/modules/broker-amqp/src/gen/java/io/ballerina/messaging/broker/amqp/rest/api/ConnectionsApi.java @@ -72,11 +72,13 @@ public ConnectionsApi(AmqpConnectionManager connectionManager, AuthorizationHand @ApiResponse(code = 403, message = "User is not autherized to perform operation", response = Error.class), @ApiResponse(code = 404, message = "The specified resource was not found", response = Error.class) }) - public Response closeConnection(@Context Request request, @PathParam("id") @ApiParam("Identifier of the " - + "connection") Integer id, - @DefaultValue("false") @QueryParam("force") @ApiParam("If set to true the broker" - + " will close the underlying connection without trying to communicate with the connected AMQP client. If set to false, the broker will send a connection close frame to the client which will respond back with a connection close ok. Once this, frame is received, the connection will be closed.") Boolean force) { - return connectionsApiDelegate.closeConnection(id, force, (Subject) request.getSession().getAttribute + public Response closeConnection(@Context Request request, @PathParam("id") @ApiParam("Identifier of the connection") Integer id, + @DefaultValue("false") @QueryParam("force") @ApiParam("If set to true the broker will close the underlying connection without trying to communicate with the connected AMQP client." + + " If set to false, the broker will send a connection close frame to the client and the connection will be closed when the " + + "client responds back with a connection close ok.") Boolean force, + @DefaultValue("false") @QueryParam("used") @ApiParam("If set to false, the broker will close the connection only if there are no AMQP channels registered on it. If set to true," + + " the connection will be closed regardless of the registered number of channels.") Boolean used) { + return connectionsApiDelegate.closeConnection(id, force, used, (Subject) request.getSession().getAttribute (BrokerAuthConstants.AUTHENTICATION_ID)); } @@ -110,8 +112,11 @@ public Response getAllConnections(@Context Request request) { }) public Response closeChannel(@Context Request request, @PathParam("connectionId") @ApiParam("Identifier of the connection") Integer connectionId, - @PathParam("channelId") @ApiParam("Identifier of the channel") Integer channelId) { + @PathParam("channelId") @ApiParam("Identifier of the channel") Integer channelId, + @DefaultValue("false") @QueryParam("used") @ApiParam("If set to false, the broker will close the channel only if there are no AMQP consumers for it. If set to true, " + + "the channel will be closed regardless of the number of active consumers.") Boolean used) { return connectionsApiDelegate.closeChannel(connectionId, channelId, + used, (Subject) request.getSession().getAttribute(BrokerAuthConstants.AUTHENTICATION_ID)); } diff --git a/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/AmqpConnectionManager.java b/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/AmqpConnectionManager.java index f1b3e608a13..bb0a980357e 100644 --- a/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/AmqpConnectionManager.java +++ b/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/AmqpConnectionManager.java @@ -22,6 +22,7 @@ import io.ballerina.messaging.broker.amqp.codec.AmqpChannelView; import io.ballerina.messaging.broker.amqp.codec.handlers.AmqpConnectionHandler; import io.ballerina.messaging.broker.common.ResourceNotFoundException; +import io.ballerina.messaging.broker.common.ValidationException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -72,29 +73,20 @@ public List getConnections() { } /** - * Closes an AMQP connection specified by the identifier. + * Closes an AMQP connection specified by the identifier according to the parameters set. * * @param id the connection identifier - * @param reason reason to close connection - * @return integer representing the number of channels registered + * @param force if set to true the connection will be closed by the broker without negotiating with the AMQP client + * @param used if set to true, the connection will be closed regardless of the number of active channels + * registered + * @param reason reason to close connection @return integer representing the number of channels registered * @throws ResourceNotFoundException if the the specified connection id does not exist + * @throws ValidationException if active channels exist for the connection, and if used is set to false */ - public int closeConnection(int id, String reason) throws ResourceNotFoundException { + public int closeConnection(int id, boolean force, boolean used, String reason) + throws ResourceNotFoundException, ValidationException { AmqpConnectionHandler connectionHandler = getConnection(id); - return connectionHandler.closeConnection(reason); - } - - /** - * Forces disconnection of an AMQP connection specified by the identifier from the broker side. - * - * @param id the connection identifier - * @param reason reason to close connection - * @return integer representing the number of channels registered - * @throws ResourceNotFoundException if the the specified connection id does not exist - */ - public int forceCloseConnection(int id, String reason) throws ResourceNotFoundException { - AmqpConnectionHandler connectionHandler = getConnection(id); - return connectionHandler.forceCloseConnection(reason); + return connectionHandler.closeConnection(reason, force, used); } /** @@ -102,13 +94,15 @@ public int forceCloseConnection(int id, String reason) throws ResourceNotFoundEx * * @param connectionId unique integer representing the connection * @param channelId integer representing the channel id within the connection id + * @param used if set to true, the channel will be closed regardless of the number of active consumers + * registered * @param reason reason to close channel */ - public void forceDisconnectChannel(int connectionId, int channelId, String reason) throws - ResourceNotFoundException { + public void closeChannel(int connectionId, int channelId, boolean used, String reason) + throws ResourceNotFoundException, ValidationException { AmqpConnectionHandler connectionHandler = getConnection(connectionId); if (Objects.nonNull(connectionHandler.getChannel(channelId))) { - connectionHandler.forceDisconnectChannel(channelId, reason); + connectionHandler.closeChannel(channelId, used, reason); } else { throw new ResourceNotFoundException( "Channel id " + channelId + " does not exist for connection " + connectionId); diff --git a/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/codec/handlers/AmqpConnectionHandler.java b/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/codec/handlers/AmqpConnectionHandler.java index 8d37d7a92ca..ba80f219560 100644 --- a/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/codec/handlers/AmqpConnectionHandler.java +++ b/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/codec/handlers/AmqpConnectionHandler.java @@ -34,6 +34,7 @@ import io.ballerina.messaging.broker.amqp.codec.frames.GeneralFrame; import io.ballerina.messaging.broker.amqp.codec.frames.ProtocolInitFrame; import io.ballerina.messaging.broker.amqp.metrics.AmqpMetricManager; +import io.ballerina.messaging.broker.common.ValidationException; import io.ballerina.messaging.broker.common.data.types.ShortString; import io.ballerina.messaging.broker.core.Broker; import io.netty.channel.Channel; @@ -286,29 +287,47 @@ public int getId() { } /** - * Sends a connection close frame to the client. + * Closes the AMQP connection with a client according to the parameters set. * * @param reason reason to close connection + * @param force if set to true the connection will be closed by the broker without negotiating with the AMQP client + * @param used if set to true, the connection will be closed regardless of the number of active channels + * registered * @return int representing the number of channels registered on the connection */ - public int closeConnection(String reason) { - LOGGER.info("Closing connection {}. Reason: {}", getId(), reason); + public int closeConnection(String reason, boolean force, boolean used) throws ValidationException { int numberOfChannels = channels.size(); + if (!used && numberOfChannels > 0) { + throw new ValidationException("Cannot close connection. " + numberOfChannels + " active channels exist " + + "and used parameter is not set."); + } + if (force) { + forceCloseConnection(reason); + } else { + closeConnection(reason); + } + return numberOfChannels; + } + + /** + * Sends a connection close frame to the client. + * + * @param reason reason to close connection + */ + private void closeConnection(String reason) { + LOGGER.info("Closing connection {}. Reason: {}", getId(), reason); ctx.writeAndFlush(new ConnectionClose(AmqConstant.CONNECTION_FORCED, ShortString.parseString("Broker forced close connection. " + reason), 0, 0)); - return numberOfChannels; } /** * Closes the underlying connection with the client. * * @param reason reason to force close connection - * @return int representing the number of channels registered on the connection */ - public int forceCloseConnection(String reason) { + private void forceCloseConnection(String reason) { LOGGER.info("Force closing connection {}. Reason: {}", getId(), reason); - int numberOfChannels = channels.size(); ChannelFuture close = ctx.close(); close.addListener(future -> { if (future.isSuccess()) { @@ -317,17 +336,23 @@ public int forceCloseConnection(String reason) { LOGGER.error("Error occurred while closing connection {}", getId(), future.cause()); } }); - return numberOfChannels; } /** * Sends a channel close frame to the client. * * @param channelId the identifier of the channel - * @param reason reason to disconnection channel + * @param used if set to true, the channel will be closed regardless of the number of active consumers + * registered + * @param reason reason to disconnection channel */ - public void forceDisconnectChannel(int channelId, String reason) { - LOGGER.info("Force closing channel {} of connection {}. Reason: {}", channelId, getId(), reason); + public void closeChannel(int channelId, boolean used, String reason) throws ValidationException { + int numberOfConsumers = channelViews.get(channelId).getConsumerCount(); + if (!used && numberOfConsumers > 0) { + throw new ValidationException("Cannot close channel. " + numberOfConsumers + " active consumers exist " + + "and used parameter is not set."); + } + LOGGER.info("Closing channel {} of connection {}. Reason: {}", channelId, getId(), reason); ctx.writeAndFlush(new ChannelClose(channelId, AmqConstant.CHANNEL_CLOSED, ShortString.parseString("Broker forced close channel. " + reason), 0, 0)); diff --git a/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/rest/ConnectionsApiDelegate.java b/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/rest/ConnectionsApiDelegate.java index 9f71ccfbdfb..a3cde7781ae 100644 --- a/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/rest/ConnectionsApiDelegate.java +++ b/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/rest/ConnectionsApiDelegate.java @@ -30,9 +30,11 @@ import io.ballerina.messaging.broker.auth.authorization.AuthorizationHandler; import io.ballerina.messaging.broker.auth.authorization.enums.ResourceAuthScope; import io.ballerina.messaging.broker.common.ResourceNotFoundException; +import io.ballerina.messaging.broker.common.ValidationException; import java.util.ArrayList; import java.util.List; import javax.security.auth.Subject; +import javax.ws.rs.BadRequestException; import javax.ws.rs.NotAuthorizedException; import javax.ws.rs.NotFoundException; import javax.ws.rs.core.Response; @@ -78,22 +80,21 @@ public Response getAllConnections(Subject subject) { * * @param id connection identifier * @param force indicates if the connection should be closed forcefully or not + * @param used if set to true, the connection will be closed regardless of the number of active channels + * registered * @param subject authentication subject containing user information of the user that has invoked the API * @return HTTP/1.1 202 accepted with {@link RequestAcceptedResponse} */ - public Response closeConnection(int id, boolean force, Subject subject) { + public Response closeConnection(int id, boolean force, boolean used, Subject subject) { try { authHandler.handle(ResourceAuthScope.CONNECTIONS_CLOSE, subject); - int registeredChannelCount; - if (force) { - registeredChannelCount = connectionManager.forceCloseConnection(id, - "Forced connection close request " - + "received from REST API."); - } else { - registeredChannelCount = connectionManager.closeConnection(id, + int registeredChannelCount = connectionManager.closeConnection(id, + force, + used, "Connection close request received from " - + "REST API."); - } + + "REST API with parameters " + + "force=" + force + ", " + + "used=" + used + ""); return Response.accepted() .entity(new CloseConnectionResponse().numberOfChannelsRegistered(registeredChannelCount)) .build(); @@ -101,6 +102,8 @@ public Response closeConnection(int id, boolean force, Subject subject) { throw new NotFoundException(e.getMessage(), e); } catch (AuthException e) { throw new NotAuthorizedException(e.getMessage(), e); + } catch (ValidationException e) { + throw new BadRequestException(e.getMessage(), e); } } @@ -109,14 +112,16 @@ public Response closeConnection(int id, boolean force, Subject subject) { * * @param connectionId connection identifier * @param channelId channel id + * @param used if set to true, the channel will be closed regardless of the number of active consumers + * registered * @param subject authentication subject containing user information of the user that has invoked the API * @return HTTP/1.1 202 accepted with {@link RequestAcceptedResponse} */ - public Response closeChannel(Integer connectionId, Integer channelId, Subject subject) { + public Response closeChannel(Integer connectionId, Integer channelId, boolean used, Subject subject) { try { authHandler.handle(ResourceAuthScope.CHANNEL_CLOSE, subject); - connectionManager.forceDisconnectChannel(connectionId, channelId, "Channel close request received from " - + "REST API."); + connectionManager.closeChannel(connectionId, channelId, used, "Channel close request received from " + + "REST API."); return Response.accepted() .entity(new RequestAcceptedResponse().message( "Request accepted for forceful disconnection of channel " + channelId + " of " @@ -126,6 +131,8 @@ public Response closeChannel(Integer connectionId, Integer channelId, Subject su throw new NotFoundException(e.getMessage(), e); } catch (AuthException e) { throw new NotAuthorizedException(e.getMessage(), e); + } catch (ValidationException e) { + throw new BadRequestException(e.getMessage(), e); } } diff --git a/modules/broker-amqp/src/main/resources/swagger.yaml b/modules/broker-amqp/src/main/resources/swagger.yaml index e562b8cdd9f..5664653f505 100644 --- a/modules/broker-amqp/src/main/resources/swagger.yaml +++ b/modules/broker-amqp/src/main/resources/swagger.yaml @@ -46,20 +46,29 @@ paths: type: integer required: true description: Identifier of the connection - - in: query - name: force - type: boolean - required: false - default: false - description: >- - If set to true the broker will close the underlying connection without trying to communicate with the - connected AMQP client. If set to false, the broker will send a connection close frame to the client and the - connection will be closed when the client responds back with a connection close ok. delete: operationId: closeConnection summary: Close the specified connection. description: >- Disconnects the specified amqp connection if the connection exists in the broker + parameters: + - in: query + name: force + type: boolean + required: false + default: false + description: >- + If set to true the broker will close the underlying connection without trying to communicate with the + connected AMQP client. If set to false, the broker will send a connection close frame to the client and the + connection will be closed when the client responds back with a connection close ok. + - in: query + name: used + type: boolean + required: false + default: false + description: >- + If set to false, the broker will close the connection only if there are no AMQP channels registered on + it. If set to true, the connection will be closed regardless of the registered number of channels. produces: - application/json responses: @@ -82,16 +91,25 @@ paths: type: integer required: true description: Identifier of the connection - - in: path - name: channelId - type: integer - required: true - description: Identifier of the channel delete: operationId: closeChannel summary: Force disconnect the specified channel. description: >- Disconnects the specified amqp channel if an active channel exists in the broker + parameters: + - in: path + name: channelId + type: integer + required: true + description: Identifier of the channel + - in: query + name: used + type: boolean + required: false + default: false + description: >- + If set to false, the broker will close the channel only if there are no AMQP consumers for it. If set to + true, the channel will be closed regardless of the number of active consumers. produces: - application/json responses: From 8f70be9fb26bb8026577cb15613b37281b0daa66 Mon Sep 17 00:00:00 2001 From: sdkottegoda Date: Fri, 3 Aug 2018 10:56:25 +0530 Subject: [PATCH 2/4] Tests for close parameters --- .../handlers/AmqpConnectionHandlerTest.java | 71 +++++++++++++++++++ .../rest/ConnectionsRestApiTest.java | 59 +++++++++++++-- 2 files changed, 126 insertions(+), 4 deletions(-) create mode 100644 modules/broker-amqp/src/test/java/io/ballerina/messaging/broker/amqp/codec/handlers/AmqpConnectionHandlerTest.java diff --git a/modules/broker-amqp/src/test/java/io/ballerina/messaging/broker/amqp/codec/handlers/AmqpConnectionHandlerTest.java b/modules/broker-amqp/src/test/java/io/ballerina/messaging/broker/amqp/codec/handlers/AmqpConnectionHandlerTest.java new file mode 100644 index 00000000000..6da3afdfd62 --- /dev/null +++ b/modules/broker-amqp/src/test/java/io/ballerina/messaging/broker/amqp/codec/handlers/AmqpConnectionHandlerTest.java @@ -0,0 +1,71 @@ +package io.ballerina.messaging.broker.amqp.codec.handlers; + +import io.ballerina.messaging.broker.amqp.AmqpConnectionManager; +import io.ballerina.messaging.broker.amqp.codec.AmqpChannel; +import io.ballerina.messaging.broker.amqp.codec.AmqpChannelFactory; +import io.ballerina.messaging.broker.amqp.metrics.AmqpMetricManager; +import io.ballerina.messaging.broker.common.ValidationException; +import io.ballerina.messaging.broker.core.Broker; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class AmqpConnectionHandlerTest { + + AmqpConnectionHandler connectionHandler; + + AmqpChannel amqpChannel; + + @BeforeMethod + public void setUp() { + Broker broker = Mockito.mock(Broker.class); + AmqpMetricManager metricManager = Mockito.mock(AmqpMetricManager.class); + Mockito.doNothing().when(metricManager).incrementChannelCount(); + AmqpChannelFactory amqpChannelFactory = Mockito.mock(AmqpChannelFactory.class); + amqpChannel = Mockito.mock(AmqpChannel.class); + AmqpConnectionManager amqpConnectionManager = Mockito.mock(AmqpConnectionManager.class); + connectionHandler = new AmqpConnectionHandler(metricManager, amqpChannelFactory, amqpConnectionManager); + connectionHandler.attachBroker(broker); + Mockito.when(amqpChannelFactory.createChannel(broker, 1, connectionHandler)).thenReturn(amqpChannel); + } + + @Test + public void testCloseConnection() throws Exception { + + int channelCount = 5; + for (int i = 1; i <= channelCount; i++) { + connectionHandler.createChannel(i); + } + + //test force=false, used=false + try { + connectionHandler.closeConnection("something", false, false); + Assert.fail("Expected ValidationException not thrown"); + } catch (ValidationException e) { + Assert.assertEquals(e.getMessage(), + "Cannot close connection. " + channelCount + " active channels exist and used " + + "parameter is not set."); + } + } + + @Test + public void testCloseChannel() throws Exception { + + int channelId = 1; + int consumerCount = 5; + connectionHandler.createChannel(channelId); + Mockito.when(amqpChannel.getConsumerCount()).thenReturn(consumerCount); + + //test force=false, used=false + try { + connectionHandler.closeChannel(channelId, false, "something"); + Assert.fail("Expected ValidationException not thrown"); + } catch (ValidationException e) { + Assert.assertEquals(e.getMessage(), + "Cannot close channel. " + consumerCount + " active consumers exist and used " + + "parameter is not set."); + } + } + +} diff --git a/modules/integration/src/test/java/io/ballerina/messaging/broker/integration/standalone/rest/ConnectionsRestApiTest.java b/modules/integration/src/test/java/io/ballerina/messaging/broker/integration/standalone/rest/ConnectionsRestApiTest.java index 5a0236b0f47..53fdc225a68 100644 --- a/modules/integration/src/test/java/io/ballerina/messaging/broker/integration/standalone/rest/ConnectionsRestApiTest.java +++ b/modules/integration/src/test/java/io/ballerina/messaging/broker/integration/standalone/rest/ConnectionsRestApiTest.java @@ -71,7 +71,8 @@ public class ConnectionsRestApiTest { private static final String CONNECTIONS_API_PATH = "/transports/amqp/connections"; - private static final String FORCE_TRUE_QUERY_PARAM = "?force=true"; + private static final String FORCE_TRUE_QUERY_PARAM = "force=true"; + private static final String USED_TRUE_QUERY_PARAM = "used=true"; private List connections; @@ -165,7 +166,8 @@ public void testCloseConnections(String username, String password, String hostNa //Send delete request HttpDelete httpDelete = new HttpDelete(apiBasePath + CONNECTIONS_API_PATH + "/" - + connectionMetadataBeforeClosing[1].getId()); + + connectionMetadataBeforeClosing[1].getId() + "?" + + USED_TRUE_QUERY_PARAM); ClientHelper.setAuthHeader(httpDelete, username, password); CloseableHttpResponse connectionCloseResponse = client.execute(httpDelete); Assert.assertEquals(connectionCloseResponse.getStatusLine().getStatusCode(), HttpStatus.SC_ACCEPTED, @@ -187,6 +189,30 @@ public void testCloseConnections(String username, String password, String hostNa "Connection " + connectionMetadataBeforeClosing[2].getId() + " does not exist."); } + @Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"}) + @Test + public void testCloseConnectionsIfUsedFalse(String username, String password, String hostName, String port) throws + Exception { + + int connectionCount = 3; + //Create 3 connections each having 0, 1 and 2 channels respectively + for (int i = 0; i < connectionCount; i++) { + connections.add(createConnection(i, username, password, hostName, port)); + } + + ConnectionMetadata[] connectionMetadataBeforeClosing = getConnections(username, password); + Assert.assertEquals(connectionMetadataBeforeClosing.length, connectionCount, + "Incorrect connection count before closing connection."); + + //Send delete request + HttpDelete httpDelete = new HttpDelete(apiBasePath + CONNECTIONS_API_PATH + "/" + + connectionMetadataBeforeClosing[1].getId()); + ClientHelper.setAuthHeader(httpDelete, username, password); + CloseableHttpResponse connectionCloseResponse = client.execute(httpDelete); + Assert.assertEquals(connectionCloseResponse.getStatusLine().getStatusCode(), HttpStatus.SC_BAD_REQUEST, + "Incorrect status code while closing connections"); + } + @Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"}) @Test public void testCloseNonExistentConnection(String username, String password, String hostName, String port) @@ -243,7 +269,8 @@ public void testForceCloseConnections(String username, String password, String h //Send delete request HttpDelete httpDelete = new HttpDelete(apiBasePath + CONNECTIONS_API_PATH + "/" - + connectionMetadataBeforeClosing[1].getId() + FORCE_TRUE_QUERY_PARAM); + + connectionMetadataBeforeClosing[1].getId() + "?" + + FORCE_TRUE_QUERY_PARAM + "&" + USED_TRUE_QUERY_PARAM); ClientHelper.setAuthHeader(httpDelete, username, password); CloseableHttpResponse connectionCloseResponse = client.execute(httpDelete); Assert.assertEquals(connectionCloseResponse.getStatusLine().getStatusCode(), HttpStatus.SC_ACCEPTED, @@ -329,7 +356,8 @@ public void testCloseChannels(String username, String password, String hostName, //Send delete request HttpDelete httpDelete = new HttpDelete(apiBasePath + CONNECTIONS_API_PATH + "/" - + connectionMetadataBeforeClosing[0].getId() + "/" + "channels" + "/2"); + + connectionMetadataBeforeClosing[0].getId() + "/" + "channels" + "/2" + + "?" + USED_TRUE_QUERY_PARAM); ClientHelper.setAuthHeader(httpDelete, username, password); CloseableHttpResponse channelCloseResponse = client.execute(httpDelete); Assert.assertEquals(channelCloseResponse.getStatusLine().getStatusCode(), HttpStatus.SC_ACCEPTED, @@ -344,6 +372,29 @@ public void testCloseChannels(String username, String password, String hostName, } + @Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"}) + @Test + public void testCloseChannelsIfUsedFalse(String username, String password, String hostName, String port) throws + Exception { + + int channelCount = 3; + connections.add(createConnection(channelCount, username, password, hostName, port)); + + ConnectionMetadata[] connectionMetadataBeforeClosing = getConnections(username, password); + Assert.assertEquals(connectionMetadataBeforeClosing.length, 1, + "Incorrect connection count before closing channel."); + Assert.assertEquals(connectionMetadataBeforeClosing[0].getChannelCount().intValue(), channelCount, + "Incorrect channel count before closing channel."); + + //Send delete request + HttpDelete httpDelete = new HttpDelete(apiBasePath + CONNECTIONS_API_PATH + "/" + + connectionMetadataBeforeClosing[0].getId() + "/" + "channels" + "/2"); + ClientHelper.setAuthHeader(httpDelete, username, password); + CloseableHttpResponse channelCloseResponse = client.execute(httpDelete); + Assert.assertEquals(channelCloseResponse.getStatusLine().getStatusCode(), HttpStatus.SC_BAD_REQUEST, + "Incorrect status code while closing connections"); + } + @Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"}) @Test public void testCloseNonExistentChannel(String username, String password, String hostName, String port) From b23ef1dfb706b55f5c3bcbfe60e849cf0e519809 Mon Sep 17 00:00:00 2001 From: sdkottegoda Date: Fri, 3 Aug 2018 13:31:20 +0530 Subject: [PATCH 3/4] Add CLI close connection/channel parameters --- .../broker/client/cmd/impl/close/CloseChannelCmd.java | 7 ++++++- .../broker/client/cmd/impl/close/CloseConnectionCmd.java | 9 ++++++++- .../messaging/broker/client/utils/Constants.java | 6 +++++- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/modules/broker-cli-client/src/main/java/io/ballerina/messaging/broker/client/cmd/impl/close/CloseChannelCmd.java b/modules/broker-cli-client/src/main/java/io/ballerina/messaging/broker/client/cmd/impl/close/CloseChannelCmd.java index 4eb3286b0fb..11597cfdd06 100644 --- a/modules/broker-cli-client/src/main/java/io/ballerina/messaging/broker/client/cmd/impl/close/CloseChannelCmd.java +++ b/modules/broker-cli-client/src/main/java/io/ballerina/messaging/broker/client/cmd/impl/close/CloseChannelCmd.java @@ -36,6 +36,11 @@ public class CloseChannelCmd extends CloseCmd { @Parameter(description = "Identifier of the channel to be closed", required = true) private String channelId = ""; + @Parameter(names = {Constants.IF_USED_FLAG}, + description = "If set to true, the connection will be closed from the broker without " + + "communicating with the amqp client") + private boolean ifUsed = false; + public CloseChannelCmd(String rootCommand) { super(rootCommand); } @@ -43,7 +48,7 @@ public CloseChannelCmd(String rootCommand) { @Override public void execute() { executeClose(Constants.CONNECTIONS_URL_PARAM + connectionId + "/" + Constants.CHANNELS_URL_PARAM - + channelId, + + channelId + Constants.QUERY_PARAM_BEGINNING + Constants.USED_QUERY_PARAM + ifUsed, "Channel close request submitted successfully"); } diff --git a/modules/broker-cli-client/src/main/java/io/ballerina/messaging/broker/client/cmd/impl/close/CloseConnectionCmd.java b/modules/broker-cli-client/src/main/java/io/ballerina/messaging/broker/client/cmd/impl/close/CloseConnectionCmd.java index b55eccad475..b73c4e46d07 100644 --- a/modules/broker-cli-client/src/main/java/io/ballerina/messaging/broker/client/cmd/impl/close/CloseConnectionCmd.java +++ b/modules/broker-cli-client/src/main/java/io/ballerina/messaging/broker/client/cmd/impl/close/CloseConnectionCmd.java @@ -37,13 +37,20 @@ public class CloseConnectionCmd extends CloseCmd { + "communicating with the amqp client") private boolean force = false; + @Parameter(names = {Constants.IF_USED_FLAG}, + description = "If set to true, the connection will be closed from the broker without " + + "communicating with the amqp client") + private boolean ifUsed = false; + public CloseConnectionCmd(String rootCommand) { super(rootCommand); } @Override public void execute() { - executeClose(Constants.CONNECTIONS_URL_PARAM + connectionId + Constants.FORCE_QUERY_PARAM + force, + executeClose(Constants.CONNECTIONS_URL_PARAM + connectionId + Constants.QUERY_PARAM_BEGINNING + + Constants.FORCE_QUERY_PARAM + force + Constants.QUERY_PARAM_APPENDING + Constants + .USED_QUERY_PARAM + ifUsed, "Connection close request submitted successfully"); } diff --git a/modules/broker-cli-client/src/main/java/io/ballerina/messaging/broker/client/utils/Constants.java b/modules/broker-cli-client/src/main/java/io/ballerina/messaging/broker/client/utils/Constants.java index f6d33d8967e..36f66aa2b03 100644 --- a/modules/broker-cli-client/src/main/java/io/ballerina/messaging/broker/client/utils/Constants.java +++ b/modules/broker-cli-client/src/main/java/io/ballerina/messaging/broker/client/utils/Constants.java @@ -68,9 +68,13 @@ public class Constants { public static final String HTTP_DELETE = "DELETE"; //http query parameters - public static final String FORCE_QUERY_PARAM = "?force="; + public static final String QUERY_PARAM_BEGINNING = "?"; + public static final String QUERY_PARAM_APPENDING = "&"; + public static final String FORCE_QUERY_PARAM = "force="; + public static final String USED_QUERY_PARAM = "used="; //flags public static final String HELP_FLAG = "--help"; + public static final String IF_USED_FLAG = "--if-used"; public static final String QUEUE_FLAG = "--queue"; } From ee042a71c1abb78c42bc61c056620ec7d40e05d3 Mon Sep 17 00:00:00 2001 From: sdkottegoda Date: Fri, 3 Aug 2018 13:32:00 +0530 Subject: [PATCH 4/4] Modify CLI close tests --- .../broker/integration/standalone/cli/CloseCmdTest.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/modules/integration/src/test/java/io/ballerina/messaging/broker/integration/standalone/cli/CloseCmdTest.java b/modules/integration/src/test/java/io/ballerina/messaging/broker/integration/standalone/cli/CloseCmdTest.java index e86b2602954..65013e90d1b 100644 --- a/modules/integration/src/test/java/io/ballerina/messaging/broker/integration/standalone/cli/CloseCmdTest.java +++ b/modules/integration/src/test/java/io/ballerina/messaging/broker/integration/standalone/cli/CloseCmdTest.java @@ -123,7 +123,7 @@ public void testCloseChannel(String username, String password, String hostName, ConnectionMetadata[] connectionMetadataBeforeClosing = getConnections(username, password); String[] cmd = {CLI_ROOT_COMMAND, Constants.CMD_CLOSE, Constants.CMD_CHANNEL, "1", "--connection", - connectionMetadataBeforeClosing[0].getId().toString()}; + connectionMetadataBeforeClosing[0].getId().toString(), Constants.IF_USED_FLAG}; String expectedLog = "Request accepted for forceful disconnection of channel 1 of connection " + connectionMetadataBeforeClosing[0].getId().toString(); @@ -180,13 +180,12 @@ public void testCloseConnection(String username, String password, String hostNam int channelCount = 3; //Create 3 connections each having 0, 1 and 2 channels respectively - List connections = new ArrayList<>(); connections.add(createConnection(channelCount, username, password, hostName, port)); ConnectionMetadata[] connectionMetadataBeforeClosing = getConnections(username, password); String[] cmd = {CLI_ROOT_COMMAND, Constants.CMD_CLOSE, Constants.CMD_CONNECTION, - connectionMetadataBeforeClosing[0].getId().toString()}; + connectionMetadataBeforeClosing[0].getId().toString(), Constants.IF_USED_FLAG}; String expectedLog = "Connection close request submitted successfully"; Main.main(cmd); @@ -198,9 +197,6 @@ public void testCloseConnection(String username, String password, String hostNam username, password); Assert.assertEquals(connectionMetadataAfterClosing.length, expectedConnectionCount, "Incorrect connection count after closing connection."); - - - closeConnections(connections); } @Test(groups = "StreamReading", description = "test command 'close connection [connection-id]' with a "