From 01b0c89e2e70ea54312509e0a348e4a945a4a208 Mon Sep 17 00:00:00 2001 From: Asitha Nanayakkara Date: Wed, 14 Mar 2018 09:28:30 +0530 Subject: [PATCH] Implement dtx-branch restore - Restore already prepared branches after node fail over or node restart. --- .../broker/amqp/codec/AmqpChannel.java | 2 +- .../messaging/broker/core/Broker.java | 7 + .../messaging/broker/core/BrokerImpl.java | 25 ++++ .../messaging/broker/core/Metadata.java | 18 ++- .../broker/core/SecureBrokerImpl.java | 5 + .../broker/core/store/DbMessageStore.java | 22 ++- .../broker/core/store/MessageStore.java | 15 +- .../broker/core/store/NullMessageStore.java | 14 +- .../broker/core/store/dao/MessageDao.java | 18 +++ .../store/dao/impl/DtxCrudOperationsDao.java | 135 +++++++++++++++--- .../dao/impl/MessageCrudOperationsDao.java | 13 +- .../core/store/dao/impl/MessageDaoImpl.java | 27 +++- .../core/store/dao/impl/RDBMSConstants.java | 13 ++ .../transaction/AutoCommitTransaction.java | 5 + .../broker/core/transaction/Branch.java | 34 ++++- .../core/transaction/BranchFactory.java | 1 + .../core/transaction/BrokerTransaction.java | 9 +- .../transaction/BrokerTransactionFactory.java | 9 +- .../transaction/DistributedTransaction.java | 5 + .../DtxStateTransitionException.java | 35 +++++ .../ForwardingBrokerTransaction.java | 5 + .../core/transaction/LocalTransaction.java | 5 + .../broker/core/transaction/Registry.java | 108 +++++++++----- .../UnknownDtxBranchException.java | 38 +++++ .../broker/core/transaction/XidImpl.java | 11 ++ .../DistributedTransactionValidationTest.java | 11 +- .../transaction/TransactionTimeoutTest.java | 10 +- 27 files changed, 501 insertions(+), 99 deletions(-) create mode 100644 modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/DtxStateTransitionException.java create mode 100644 modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/UnknownDtxBranchException.java diff --git a/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/codec/AmqpChannel.java b/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/codec/AmqpChannel.java index 8f8aa68247a..b987dcc070b 100644 --- a/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/codec/AmqpChannel.java +++ b/modules/broker-amqp/src/main/java/io/ballerina/messaging/broker/amqp/codec/AmqpChannel.java @@ -248,7 +248,7 @@ public void acknowledge(long deliveryTag, boolean multiple) throws BrokerExcepti } if (ackData != null) { transaction.dequeue(ackData.getQueueName(), ackData.getMessage()); - if (isNonTransactional()) { + if (!transaction.inTransactionBlock()) { ackData = unackedMessageMap.removeMarkedAcknowledgment(deliveryTag); ackData.getMessage().release(); } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/Broker.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/Broker.java index 4fbab9b9d55..83dd36882c6 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/Broker.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/Broker.java @@ -294,4 +294,11 @@ static long getNextMessageId() { * @return a new DistributedTransaction object */ BrokerTransaction newDistributedTransaction(); + + /** + * Restore enqueued data that is already in prepared state. + * @param xid + * @param messages + */ + Set restoreDtxPreparedMessages(Xid xid, Collection messages) throws BrokerException; } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/BrokerImpl.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/BrokerImpl.java index 16d38119773..853c801f4b9 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/BrokerImpl.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/BrokerImpl.java @@ -124,6 +124,7 @@ public BrokerImpl(StartupContext startupContext) throws Exception { initDefaultDeadLetterQueue(); this.brokerTransactionFactory = new BrokerTransactionFactory(this, messageStore); + brokerTransactionFactory.syncWithMessageStore(messageStore); startupContext.registerService(Broker.class, this); initRestApi(startupContext); @@ -625,6 +626,29 @@ public BrokerTransaction newDistributedTransaction() { return brokerTransactionFactory.newDistributedTransaction(); } + @Override + public Set restoreDtxPreparedMessages(Xid xid, Collection messages) throws BrokerException { + Set queueHandlers = new HashSet<>(); + lock.readLock().lock(); + try { + for (Message message : messages) { + try { + messageStore.add(xid, message.shallowCopy()); + for (String queueName : message.getAttachedDurableQueues()) { + QueueHandler queueHandler = queueRegistry.getQueueHandler(queueName); + queueHandler.prepareForEnqueue(xid, message.shallowCopy()); + queueHandlers.add(queueHandler); + } + } finally { + message.release(); + } + } + } finally { + lock.readLock().unlock(); + } + return queueHandlers; + } + private class BrokerHelper { public void startMessageDelivery() { @@ -669,6 +693,7 @@ public void activate() { try { queueRegistry.reloadQueuesOnBecomingActive(); exchangeRegistry.reloadExchangesOnBecomingActive(queueRegistry); + brokerTransactionFactory.syncWithMessageStore(messageStore); } catch (BrokerException e) { LOGGER.error("Error on loading data from the database on becoming active ", e); } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/Metadata.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/Metadata.java index 4cc7b05f320..edc71b5d8c5 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/Metadata.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/Metadata.java @@ -80,6 +80,15 @@ public Metadata(String routingKey, String exchangeName, long contentLength) { this.headers = FieldTable.EMPTY_TABLE; } + public Metadata(String routingKey, String exchangeName, long contentLength, byte[] propertyBytes) throws Exception { + this.routingKey = routingKey; + this.exchangeName = exchangeName; + this.contentLength = contentLength; + this.properties = FieldTable.EMPTY_TABLE; + this.headers = FieldTable.EMPTY_TABLE; + setPropertiesFromBytes(propertyBytes); + } + public String getRoutingKey() { return routingKey; } @@ -101,7 +110,6 @@ public Metadata shallowCopyWith(String routingKey, String exchangeName) { metadata.properties = properties; metadata.headers = headers; return metadata; - } @Override @@ -153,7 +161,7 @@ public void addHeader(String name, String value) { headers.add(ShortString.parseString(name), FieldValue.parseLongString(value)); } - public byte[] getBytes() { + public byte[] getPropertiesAsBytes() { long size = properties.getSize() + headers.getSize(); byte[] bytes = new byte[(int) size]; ByteBuf buffer = Unpooled.wrappedBuffer(bytes); @@ -162,4 +170,10 @@ public byte[] getBytes() { headers.write(buffer); return bytes; } + + private void setPropertiesFromBytes(byte[] bytes) throws Exception { + ByteBuf buffer = Unpooled.wrappedBuffer(bytes); + properties = FieldTable.parse(buffer); + headers = FieldTable.parse(buffer); + } } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/SecureBrokerImpl.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/SecureBrokerImpl.java index 9101d387a7e..a8d5548f61e 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/SecureBrokerImpl.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/SecureBrokerImpl.java @@ -299,4 +299,9 @@ public BrokerTransaction newLocalTransaction() { public BrokerTransaction newDistributedTransaction() { return new SecureBrokerTransaction(broker.newDistributedTransaction(), subject, authHandler); } + + @Override + public Set restoreDtxPreparedMessages(Xid xid, Collection messages) throws BrokerException { + return broker.restoreDtxPreparedMessages(xid, messages); + } } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/DbMessageStore.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/DbMessageStore.java index 1d194f1cf47..4ab03123f99 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/DbMessageStore.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/DbMessageStore.java @@ -33,6 +33,7 @@ import java.util.Collection; import java.util.concurrent.ThreadFactory; +import java.util.function.Consumer; import javax.annotation.concurrent.ThreadSafe; import javax.transaction.xa.Xid; @@ -112,7 +113,7 @@ void commit(Xid xid, TransactionData transactionData) throws BrokerException { } @Override - protected void rollback(Xid xid) throws BrokerException { + public void rollback(Xid xid) throws BrokerException { try { messageDao.rollbackPreparedData(xid); } catch (DaoException e) { @@ -142,4 +143,23 @@ public void prepare(Xid xid, TransactionData transactionData) throws BrokerExcep throw new BrokerException(e.getMessage(), e); } } + + @Override + public void retrieveStoredXids(Consumer xidConsumer) throws BrokerException { + try { + messageDao.retrieveAllStoredXids(xidConsumer); + } catch (DaoException e) { + throw new BrokerException(e.getMessage(), e); + } + } + + @Override + public Collection recoverEnqueuedMessages(Xid xid) throws BrokerException { + try { + return messageDao.retrieveAllEnqueuedMessages(xid); + } catch (DaoException e) { + throw new BrokerException(e.getMessage(), e); + } + } + } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/MessageStore.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/MessageStore.java index a27019d107f..778144504b4 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/MessageStore.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/MessageStore.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import javax.transaction.xa.Xid; /** @@ -131,7 +132,7 @@ private void updateDeletableMessages(TransactionData transactionData) { } } - public void cancel(Xid xid) throws BrokerException { + public void remove(Xid xid) throws BrokerException { rollback(xid); clear(xid); } @@ -142,8 +143,10 @@ public void branch(Xid xid) { public void clear(Xid xid) { TransactionData transactionData = transactionMap.remove(xid); - transactionData.releaseEnqueueMessages(); - transactionData.clear(); + if (Objects.nonNull(transactionData)) { + transactionData.releaseEnqueueMessages(); + transactionData.clear(); + } } abstract void publishMessageToStore(Message message); @@ -156,11 +159,15 @@ public void clear(Xid xid) { abstract void commit(Xid xid, TransactionData transactionData) throws BrokerException; - protected abstract void rollback(Xid xid) throws BrokerException; + public abstract void rollback(Xid xid) throws BrokerException; public abstract void fillMessageData(QueueBuffer queueBuffer, Message message); public abstract Collection readAllMessagesForQueue(String queueName) throws BrokerException; public abstract void prepare(Xid xid, TransactionData transactionData) throws BrokerException; + + public abstract void retrieveStoredXids(Consumer consumer) throws BrokerException; + + public abstract Collection recoverEnqueuedMessages(Xid xid) throws BrokerException; } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/NullMessageStore.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/NullMessageStore.java index 789976153b0..38343cc6505 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/NullMessageStore.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/NullMessageStore.java @@ -19,11 +19,13 @@ package io.ballerina.messaging.broker.core.store; +import io.ballerina.messaging.broker.core.BrokerException; import io.ballerina.messaging.broker.core.Message; import io.ballerina.messaging.broker.core.queue.QueueBuffer; import java.util.Collection; import java.util.Collections; +import java.util.function.Consumer; import javax.transaction.xa.Xid; /** @@ -57,7 +59,7 @@ void commit(Xid xid, TransactionData transactionData) { } @Override - protected void rollback(Xid xid) { + public void rollback(Xid xid) { // Do nothing } @@ -75,4 +77,14 @@ public Collection readAllMessagesForQueue(String queueName) { public void prepare(Xid xid, TransactionData transactionData) { // Do nothing } + + @Override + public void retrieveStoredXids(Consumer consumer) throws BrokerException { + // Do nothing + } + + @Override + public Collection recoverEnqueuedMessages(Xid xid) { + return Collections.emptyList(); + } } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/MessageDao.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/MessageDao.java index d8aca37edf6..89f94286dab 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/MessageDao.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/MessageDao.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import javax.transaction.xa.Xid; /** @@ -78,4 +79,21 @@ public interface MessageDao { * @param xid {@link Xid} of the rollback operation related transaction */ void rollbackPreparedData(Xid xid) throws DaoException; + + /** + * Retrieve all the xids stored on the database and invoke the xidConsumer for each {@link Xid} found. + * + * @param xidConsumer {@link Consumer} that accept an Xid. + * @throws DaoException throws on database failure. + */ + void retrieveAllStoredXids(Consumer xidConsumer) throws DaoException; + + /** + * Retrieve prepared enqueued messages for a given {@link Xid} from storage. + * + * @param xid {@link Xid} of the prepared branch + * @return Enqueued messages + * @throws DaoException throws when the message retrieval fails due to an error. + */ + Collection retrieveAllEnqueuedMessages(Xid xid) throws DaoException; } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/DtxCrudOperationsDao.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/DtxCrudOperationsDao.java index b7613bda60a..631fca92524 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/DtxCrudOperationsDao.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/DtxCrudOperationsDao.java @@ -21,17 +21,24 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.ballerina.messaging.broker.common.BaseDao; +import io.ballerina.messaging.broker.common.DaoException; import io.ballerina.messaging.broker.core.Broker; import io.ballerina.messaging.broker.core.ContentChunk; import io.ballerina.messaging.broker.core.Message; +import io.ballerina.messaging.broker.core.Metadata; import io.ballerina.messaging.broker.core.store.QueueDetachEventList; +import io.ballerina.messaging.broker.core.transaction.XidImpl; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collection; +import java.util.HashMap; import java.util.Map; +import java.util.function.Consumer; import javax.sql.DataSource; import javax.transaction.xa.Xid; @@ -106,7 +113,7 @@ private void prepareMetadataBatch(long internalXid, PreparedStatement insertMeta insertMetadataStatement.setString(3, message.getMetadata().getExchangeName()); insertMetadataStatement.setString(4, message.getMetadata().getRoutingKey()); insertMetadataStatement.setLong(5, message.getMetadata().getContentLength()); - insertMetadataStatement.setBytes(6, message.getMetadata().getBytes()); + insertMetadataStatement.setBytes(6, message.getMetadata().getPropertiesAsBytes()); insertMetadataStatement.addBatch(); } @@ -162,27 +169,6 @@ private void copyFromPreparedTables(Connection connection, long internalXid, } } - public long getInternalXid(Connection connection, Xid xid) throws SQLException { - - PreparedStatement selectInternalXidStatement = null; - ResultSet resultSet = null; - long internalXid = -1; - try { - selectInternalXidStatement = connection.prepareStatement(RDBMSConstants.PS_DTX_SELECT_INTERNAL_XID); - selectInternalXidStatement.setInt(1, xid.getFormatId()); - selectInternalXidStatement.setBytes(2, xid.getGlobalTransactionId()); - selectInternalXidStatement.setBytes(3, xid.getBranchQualifier()); - resultSet = selectInternalXidStatement.executeQuery(); - if (resultSet.first()) { - internalXid = resultSet.getLong(1); - } - return internalXid; - } finally { - close(resultSet); - close(selectInternalXidStatement); - } - } - public void removePreparedData(Connection connection, long internalXid) throws SQLException { PreparedStatement deleteXidStatement = null; try { @@ -204,4 +190,109 @@ public void restoreDequeueMessages(Connection connection, long internalXid) thro close(preparedStatement); } } + + public void retrieveAllXids(Connection connection, Consumer xidConsumer) throws SQLException { + PreparedStatement statement = null; + ResultSet resultSet = null; + try { + statement = connection.prepareStatement(RDBMSConstants.PS_DTX_SELECT_ALL_XIDS); + resultSet = statement.executeQuery(); + while (resultSet.next()) { + XidImpl xid = new XidImpl( + resultSet.getLong(1), + resultSet.getInt(2), + resultSet.getBytes(3), + resultSet.getBytes(4) + ); + xidConsumer.accept(xid); + } + } finally { + close(resultSet); + close(statement); + } + } + + public Collection retrieveEnqueuedMessages(Connection connection, long internalXid) + throws DaoException { + HashMap enqueuedMessages = new HashMap<>(); + try { + populateMetadata(connection, internalXid, enqueuedMessages); + populateContent(connection, internalXid, enqueuedMessages); + updateQueueMapping(connection, internalXid, enqueuedMessages); + return enqueuedMessages.values(); + } catch (Exception e) { + throw new DaoException("Error occurred while retrieving enqueued dtx messages", e); + } + } + + private void updateQueueMapping(Connection connection, long internalXid, HashMap enqueuedMessages) + throws SQLException { + PreparedStatement queueMappingStmt = null; + ResultSet resultSet = null; + + try { + queueMappingStmt = connection.prepareStatement(RDBMSConstants.PS_DTX_SELECT_QUEUE_MAPPING); + queueMappingStmt.setLong(1, internalXid); + + resultSet = queueMappingStmt.executeQuery(); + while (resultSet.next()) { + long messageId = resultSet.getLong("MESSAGE_ID"); + String queueName = resultSet.getString("QUEUE_NAME"); + Message message = enqueuedMessages.get(messageId); + message.addAttachedDurableQueue(queueName); + } + } finally { + close(resultSet); + close(queueMappingStmt); + } + } + + private void populateContent(Connection connection, long internalXid, HashMap enqueuedMessages) + throws SQLException { + PreparedStatement retrieveContentStmt = null; + ResultSet resultSet = null; + try { + retrieveContentStmt = connection.prepareStatement(RDBMSConstants.PS_DTX_SELECT_ENQUEUED_CONTENT); + retrieveContentStmt.setLong(1, internalXid); + + resultSet = retrieveContentStmt.executeQuery(); + + while (resultSet.next()) { + long messageId = resultSet.getLong("MESSAGE_ID"); + Message message = enqueuedMessages.get(messageId); + long offset = resultSet.getLong("CONTENT_OFFSET"); + byte[] contentBytes = resultSet.getBytes("MESSAGE_CONTENT"); + ByteBuf byteBuf = Unpooled.wrappedBuffer(contentBytes); + message.addChunk(new ContentChunk(offset, byteBuf)); + } + } finally { + close(resultSet); + close(retrieveContentStmt); + } + } + + private void populateMetadata(Connection connection, long internalXid, HashMap enqueuedMessages) + throws Exception { + PreparedStatement retrieveMetadataStmt = null; + ResultSet resultSet = null; + try { + retrieveMetadataStmt = connection.prepareStatement(RDBMSConstants.PS_DTX_SELECT_ENQUEUED_METADATA); + retrieveMetadataStmt.setLong(1, internalXid); + + resultSet = retrieveMetadataStmt.executeQuery(); + while (resultSet.next()) { + Metadata metadata = new Metadata(resultSet.getString("ROUTING_KEY"), + resultSet.getString("EXCHANGE_NAME"), + resultSet.getLong("CONTENT_LENGTH"), + resultSet.getBytes("MESSAGE_METADATA") + ); + + Message message = new Message(resultSet.getLong("MESSAGE_ID"), metadata); + enqueuedMessages.put(message.getInternalId(), message); + } + } finally { + close(resultSet); + close(retrieveMetadataStmt); + } + } } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/MessageCrudOperationsDao.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/MessageCrudOperationsDao.java index 04e58bc45fc..f80f4476a0b 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/MessageCrudOperationsDao.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/MessageCrudOperationsDao.java @@ -21,14 +21,12 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.ballerina.messaging.broker.common.BaseDao; -import io.ballerina.messaging.broker.common.data.types.FieldTable; import io.ballerina.messaging.broker.core.BrokerException; import io.ballerina.messaging.broker.core.ContentChunk; import io.ballerina.messaging.broker.core.Message; import io.ballerina.messaging.broker.core.Metadata; import io.ballerina.messaging.broker.core.metrics.BrokerMetricManager; import io.ballerina.messaging.broker.core.store.QueueDetachEventList; -import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.wso2.carbon.metrics.core.Timer.Context; @@ -112,7 +110,7 @@ private void prepareMetadata(PreparedStatement metadataStmt, Message message) th metadataStmt.setString(2, metadata.getExchangeName()); metadataStmt.setString(3, metadata.getRoutingKey()); metadataStmt.setLong(4, metadata.getContentLength()); - metadataStmt.setBytes(5, metadata.getBytes()); + metadataStmt.setBytes(5, metadata.getPropertiesAsBytes()); metadataStmt.addBatch(); } @@ -222,11 +220,8 @@ private void populateMessageWithMetadata(Connection connection, String routingKey = metadataResultSet.getString(3); long contentLength = metadataResultSet.getLong(4); byte[] bytes = metadataResultSet.getBytes(5); - ByteBuf buffer = Unpooled.wrappedBuffer(bytes); try { - Metadata metadata = new Metadata(routingKey, exchangeName, contentLength); - metadata.setProperties(FieldTable.parse(buffer)); - metadata.setHeaders(FieldTable.parse(buffer)); + Metadata metadata = new Metadata(routingKey, exchangeName, contentLength, bytes); List messages = messageMap.get(messageId); for (Message message : messages) { @@ -236,8 +231,6 @@ private void populateMessageWithMetadata(Connection connection, } } catch (Exception e) { throw new BrokerException("Error occurred while parsing metadata properties", e); - } finally { - buffer.release(); } } } finally { @@ -273,7 +266,7 @@ private void populateContent(Connection connection, String idList, List messages = messageMap.get(messageId); for (Message message : messages) { if (Objects.nonNull(message)) { - message.addChunk(new ContentChunk(offset, Unpooled.copiedBuffer(bytes))); + message.addChunk(new ContentChunk(offset, Unpooled.wrappedBuffer(bytes))); } } } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/MessageDaoImpl.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/MessageDaoImpl.java index c163f43e44d..7f913894890 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/MessageDaoImpl.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/MessageDaoImpl.java @@ -20,15 +20,18 @@ package io.ballerina.messaging.broker.core.store.dao.impl; import io.ballerina.messaging.broker.common.DaoException; +import io.ballerina.messaging.broker.common.util.function.ThrowingConsumer; import io.ballerina.messaging.broker.core.Message; import io.ballerina.messaging.broker.core.store.TransactionData; import io.ballerina.messaging.broker.core.store.dao.MessageDao; +import java.sql.Connection; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import javax.transaction.xa.Xid; /** @@ -74,10 +77,8 @@ public void read(Map> readList) throws DaoException { public void prepare(Xid xid, TransactionData transactionData) throws DaoException { dtxCrudOperationsDao.transaction(connection -> { long internalXid = dtxCrudOperationsDao.storeXid(connection, xid); - dtxCrudOperationsDao.prepareEnqueueMessages(connection, internalXid, - transactionData.getEnqueueMessages()); - dtxCrudOperationsDao.prepareDetachMessages(connection, internalXid, - transactionData.getDetachMessageMap()); + dtxCrudOperationsDao.prepareEnqueueMessages(connection, internalXid, transactionData.getEnqueueMessages()); + dtxCrudOperationsDao.prepareDetachMessages(connection, internalXid, transactionData.getDetachMessageMap()); crudOperationsDao.detachFromQueue(connection, transactionData.getDetachMessageMap()); xidToInternalIdMap.put(xid, internalXid); }); @@ -107,6 +108,24 @@ public void rollbackPreparedData(Xid xid) throws DaoException { xidToInternalIdMap.remove(xid); } + @Override + public void retrieveAllStoredXids(Consumer xidConsumer) throws DaoException { + dtxCrudOperationsDao.transaction((ThrowingConsumer) connection -> + dtxCrudOperationsDao.retrieveAllXids(connection, xid -> { + xidToInternalIdMap.put(xid, xid.getInternalXid()); + xidConsumer.accept(xid); + })); + } + + @Override + public Collection retrieveAllEnqueuedMessages(Xid xid) throws DaoException { + return dtxCrudOperationsDao.selectAndGetOperation(connection -> + dtxCrudOperationsDao.retrieveEnqueuedMessages(connection, getInternalXid(xid)) + ); + + + } + private long getInternalXid(Xid xid) { Long id = xidToInternalIdMap.get(xid); if (Objects.isNull(id)) { diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/RDBMSConstants.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/RDBMSConstants.java index 8f123b452a5..dae7a3ed570 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/RDBMSConstants.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/store/dao/impl/RDBMSConstants.java @@ -117,4 +117,17 @@ private RDBMSConstants() { public static final String PS_DTX_RESTORE_DEQUEUE_MAPPING = "INSERT INTO MB_QUEUE_MAPPING (MESSAGE_ID, QUEUE_NAME) " + "SELECT MESSAGE_ID, QUEUE_NAME FROM MB_DTX_DEQUEUE_MAPPING WHERE INTERNAL_XID=?"; + + public static final String PS_DTX_SELECT_ALL_XIDS = + "SELECT INTERNAL_XID, FORMAT_CODE, BRANCH_ID, GLOBAL_ID FROM MB_DTX_XID"; + + public static final String PS_DTX_SELECT_ENQUEUED_METADATA = + "SELECT MESSAGE_ID, EXCHANGE_NAME, ROUTING_KEY, CONTENT_LENGTH, MESSAGE_METADATA FROM " + + "MB_DTX_ENQUEUE_METADATA WHERE INTERNAL_XID=?"; + + public static final String PS_DTX_SELECT_ENQUEUED_CONTENT = + "SELECT MESSAGE_ID, CONTENT_OFFSET, MESSAGE_CONTENT FROM MB_DTX_ENQUEUE_CONTENT WHERE INTERNAL_XID=?"; + + public static final String PS_DTX_SELECT_QUEUE_MAPPING = + "SELECT MESSAGE_ID, QUEUE_NAME FROM MB_DTX_ENQUEUE_MAPPING WHERE INTERNAL_XID=?"; } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/AutoCommitTransaction.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/AutoCommitTransaction.java index a98faae0032..e747373f8cb 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/AutoCommitTransaction.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/AutoCommitTransaction.java @@ -104,5 +104,10 @@ public void setTimeout(Xid xid, long timeout, TimeUnit timeUnit) throws Validati throw new ValidationException("dtx.set-timeout called on non-transactional channel"); } + @Override + public boolean inTransactionBlock() { + return false; + } + } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/Branch.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/Branch.java index 6c8643b787d..981c42f1e8c 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/Branch.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/Branch.java @@ -27,6 +27,7 @@ import io.ballerina.messaging.broker.core.store.MessageStore; import io.ballerina.messaging.broker.core.util.MessageTracer; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -76,6 +77,11 @@ public enum State { */ PREPARED, + /** + * Already prepared branch that is yet to be fully restored from persistence store. + */ + PARTIAL_RESTORE, + /** * Branch heuristically committed */ @@ -84,9 +90,10 @@ public enum State { /** * Branch heuristically rolled back */ - HEUR_RB + HEUR_RB; } + /** * States of associated sessions for the branch. */ @@ -101,7 +108,6 @@ private enum SessionState { */ SUSPENDED } - private State state; private Xid xid; @@ -140,11 +146,16 @@ public void dequeue(String queueName, Message message) throws BrokerException { } public void prepare() throws BrokerException { + state = State.PRE_PREPARE; messageStore.prepare(xid); + state = State.PREPARED; MessageTracer.trace(xid, MessageTracer.PREPARED); } public void commit(boolean onePhase) throws BrokerException { + if (state == State.PARTIAL_RESTORE) { + affectedQueueHandlers.addAll(recoverEnqueuedMessages()); + } messageStore.flush(xid, onePhase); for (QueueHandler queueHandler: affectedQueueHandlers) { queueHandler.commit(xid); @@ -159,7 +170,7 @@ public void rollback() { } public void dtxRollback() throws BrokerException { - messageStore.cancel(xid); + messageStore.remove(xid); rollbackQueueHandlers(); MessageTracer.trace(xid, MessageTracer.ROLLBACK); } @@ -254,6 +265,11 @@ public void setTimeoutTaskFuture(ScheduledFuture future) { this.timeoutTaskFuture = future; } + private Set recoverEnqueuedMessages() throws BrokerException { + Collection messages = messageStore.recoverEnqueuedMessages(xid); + return broker.restoreDtxPreparedMessages(xid, messages); + } + /** * Check whether the branch has timed out. Returns true if the transaction timed out before prepare is invoked. * @return True if expired false otherwise. @@ -265,4 +281,16 @@ public boolean isExpired() { public Future getTimeoutTaskFuture() { return timeoutTaskFuture; } + + void markAsRecoveryBranch() { + state = State.PARTIAL_RESTORE; + } + + boolean isPrepared() { + return state == State.PREPARED || state == State.PARTIAL_RESTORE; + } + + boolean isRollbackOnly () { + return state == State.ROLLBACK_ONLY; + } } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/BranchFactory.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/BranchFactory.java index 39b129c9b07..290ae708f31 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/BranchFactory.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/BranchFactory.java @@ -37,6 +37,7 @@ public class BranchFactory { private final MessageStore messageStore; private final EnqueueDequeueStrategy enqueueDequeueStrategy; + BranchFactory(Broker broker, MessageStore messageStore) { this.broker = broker; this.messageStore = messageStore; diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/BrokerTransaction.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/BrokerTransaction.java index 1febbd8defa..be4824fc0cb 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/BrokerTransaction.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/BrokerTransaction.java @@ -47,6 +47,7 @@ interface Action { */ void onRollback(); + } /** * Dequeue a message from queue. @@ -55,7 +56,6 @@ interface Action { * @param message An acknowledgement message */ void dequeue(String queue, Message message) throws BrokerException; - /** * Enqueue a message to a queue. * @@ -145,4 +145,11 @@ interface Action { * @param timeUnit {@link TimeUnit} of the provided timeout */ void setTimeout(Xid xid, long timeout, TimeUnit timeUnit) throws ValidationException; + + /** + * Returns true if the transaction object is within a transaction block. + * + * @return true if within a transaction block, false otherwise. + */ + boolean inTransactionBlock(); } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/BrokerTransactionFactory.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/BrokerTransactionFactory.java index 26fa5948bb5..328b94131a0 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/BrokerTransactionFactory.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/BrokerTransactionFactory.java @@ -20,6 +20,7 @@ package io.ballerina.messaging.broker.core.transaction; import io.ballerina.messaging.broker.core.Broker; +import io.ballerina.messaging.broker.core.BrokerException; import io.ballerina.messaging.broker.core.store.MessageStore; /** @@ -31,9 +32,13 @@ public class BrokerTransactionFactory { private final Registry registry; - public BrokerTransactionFactory(Broker broker, MessageStore messageStore) { + public BrokerTransactionFactory(Broker broker, MessageStore messageStore) throws BrokerException { this.branchFactory = new BranchFactory(broker, messageStore); - this.registry = new Registry(); + this.registry = new Registry(branchFactory); + } + + public void syncWithMessageStore(MessageStore messageStore) throws BrokerException { + registry.syncWithMessageStore(messageStore); } public LocalTransaction newLocalTransaction() { diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/DistributedTransaction.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/DistributedTransaction.java index 59a7ce6aca2..386afdc9631 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/DistributedTransaction.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/DistributedTransaction.java @@ -178,4 +178,9 @@ public void forget(Xid xid) throws ValidationException { public void setTimeout(Xid xid, long timeout, TimeUnit timeUnit) throws ValidationException { transactionRegistry.setTimeout(xid, timeout, timeUnit); } + + @Override + public boolean inTransactionBlock() { + return enqueueDequeueStrategy instanceof Branch; + } } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/DtxStateTransitionException.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/DtxStateTransitionException.java new file mode 100644 index 00000000000..9c2f08d5c7b --- /dev/null +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/DtxStateTransitionException.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package io.ballerina.messaging.broker.core.transaction; + +import io.ballerina.messaging.broker.common.ValidationException; + +import javax.transaction.xa.Xid; + +/** + * Exception to handle invalid state transitions of a transaction branch. + */ +public class DtxStateTransitionException extends ValidationException { + + DtxStateTransitionException(Xid xid, Branch.State from, Branch.State to) { + super("Invalid branch state transition request for xid " + xid + " from " + from + " to " + to); + } + +} diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/ForwardingBrokerTransaction.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/ForwardingBrokerTransaction.java index 723278e82da..5d58e5df2fc 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/ForwardingBrokerTransaction.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/ForwardingBrokerTransaction.java @@ -102,4 +102,9 @@ public void forget(Xid xid) throws ValidationException { public void setTimeout(Xid xid, long timeout, TimeUnit timeUnit) throws ValidationException { brokerTransaction.setTimeout(xid, timeout, timeUnit); } + + @Override + public boolean inTransactionBlock() { + return brokerTransaction.inTransactionBlock(); + } } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/LocalTransaction.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/LocalTransaction.java index 4279fa784f6..8ac21f9e493 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/LocalTransaction.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/LocalTransaction.java @@ -148,6 +148,11 @@ public void setTimeout(Xid xid, long timeout, TimeUnit timeUnit) throws Validati throw new ValidationException("dtx.set-timeout called on local-transactional channel"); } + @Override + public boolean inTransactionBlock() { + return true; + } + @Override public void addPostTransactionAction(Action postTransactionAction) { postTransactionActions.add(postTransactionAction); diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/Registry.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/Registry.java index fc85d67a728..3d34f72012c 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/Registry.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/Registry.java @@ -22,11 +22,13 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.ballerina.messaging.broker.common.ValidationException; import io.ballerina.messaging.broker.core.BrokerException; +import io.ballerina.messaging.broker.core.store.MessageStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -51,12 +53,25 @@ public class Registry { private static final String TIMED_OUT_ERROR_MSG = "Transaction timed out for xid "; + /** + * {@link Xid} to branch mapping of branches in memory. + */ private final Map branchMap; private final ScheduledExecutorService branchTimeoutExecutorService; - Registry() { + private final BranchFactory branchFactory; + + /** + * {@link Xid}s of already prepared branches that are currently not in memory. This can be due to a node fail-over + * or a node restart. + */ + private final Set storedXidSet; + + Registry(BranchFactory branchFactory) { + this.branchFactory = branchFactory; branchMap = new ConcurrentHashMap<>(); + storedXidSet = ConcurrentHashMap.newKeySet(); ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("DtxBranchTimeoutExecutor-%d").build(); this.branchTimeoutExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory); } @@ -68,14 +83,24 @@ public void register(Branch branch) throws ValidationException { } public void unregister(Xid xid) { - branchMap.remove(xid); + if (Objects.isNull(branchMap.remove(xid))) { + storedXidSet.remove(xid); + } } - public Branch getBranch(Xid xid) { + public Branch getBranch(Xid xid) throws ValidationException { + if (storedXidSet.contains(xid)) { + throw new ValidationException("Branch is in prepared stage. Branch can be only be " + + "committed or rollbacked."); + } return branchMap.get(xid); } public synchronized void prepare(Xid xid) throws ValidationException, BrokerException { + if (storedXidSet.contains(xid)) { + throw new DtxStateTransitionException(xid, Branch.State.PREPARED, Branch.State.PREPARED); + } + Branch branch = branchMap.get(xid); if (Objects.isNull(branch)) { throw new ValidationException(UNKNOWN_XID_ERROR_MSG + xid); @@ -94,10 +119,7 @@ public synchronized void prepare(Xid xid) throws ValidationException, BrokerExce } else if (branch.getState() != Branch.State.ACTIVE) { throw new ValidationException("Cannot prepare a branch in state " + branch.getState()); } - - branch.setState(Branch.State.PRE_PREPARE); branch.prepare(); - branch.setState(Branch.State.PREPARED); } private void checkForBranchExpiration(Branch branch) throws ValidationException { @@ -109,14 +131,42 @@ private void checkForBranchExpiration(Branch branch) throws ValidationException public synchronized void commit(Xid xid, boolean onePhase) throws ValidationException, BrokerException { Branch branch = branchMap.get(xid); - validateCommitRequest(xid, onePhase, branch); + if (Objects.isNull(branch)) { + branch = checkForBranchRecovery(xid); + } else { + if (branch.hasAssociatedActiveSessions()) { + throw new ValidationException(ASSOCIATED_XID_ERROR_MSG + xid); + } + checkForBranchExpiration(branch); + if (branch.isRollbackOnly()) { + throw new ValidationException("Branch is set to rollback only. Can't commit with xid " + xid); + } + if (!onePhase && !branch.isPrepared()) { + throw new ValidationException("Cannot call two-phase commit on a non-prepared branch for xid " + xid); + } + } + + if (onePhase && branch.isPrepared()) { + throw new ValidationException("Cannot call one-phase commit on a prepared branch for xid " + xid); + } branch.clearAssociations(); - branch.setState(Branch.State.FORGOTTEN); branch.commit(onePhase); + branch.setState(Branch.State.FORGOTTEN); unregister(xid); } + private Branch checkForBranchRecovery(Xid xid) throws UnknownDtxBranchException { + Branch branch; + if (storedXidSet.contains(xid)) { + branch = branchFactory.createBranch(xid); + branch.markAsRecoveryBranch(); + } else { + throw new UnknownDtxBranchException(xid); + } + return branch; + } + private boolean cancelTimeoutTask(Branch branch) { Future timeoutTaskFuture = branch.getTimeoutTaskFuture(); return Objects.isNull(timeoutTaskFuture) @@ -124,38 +174,17 @@ private boolean cancelTimeoutTask(Branch branch) { || timeoutTaskFuture.cancel(false); } - private void validateCommitRequest(Xid xid, boolean onePhase, Branch branch) throws ValidationException { - if (Objects.isNull(branch)) { - throw new ValidationException(UNKNOWN_XID_ERROR_MSG + xid); - } - if (branch.hasAssociatedActiveSessions()) { - throw new ValidationException(ASSOCIATED_XID_ERROR_MSG + xid); - } - checkForBranchExpiration(branch); - if (branch.getState() == Branch.State.ROLLBACK_ONLY) { - throw new ValidationException("Branch is set to rollback only. Can't commit with xid " + xid); - } - if (onePhase && branch.getState() == Branch.State.PREPARED) { - throw new ValidationException("Cannot call one-phase commit on a prepared branch for xid " + xid); - } - if (!onePhase && branch.getState() != Branch.State.PREPARED) { - throw new ValidationException("Cannot call two-phase commit on a non-prepared branch for xid " + xid); - } - } - public synchronized void rollback(Xid xid) throws ValidationException, BrokerException { Branch branch = branchMap.get(xid); if (Objects.isNull(branch)) { - throw new ValidationException(UNKNOWN_XID_ERROR_MSG + xid); - } - - checkForBranchExpiration(branch); - - if (branch.hasAssociatedActiveSessions()) { - throw new ValidationException(ASSOCIATED_XID_ERROR_MSG + xid); + branch = checkForBranchRecovery(xid); + } else { + checkForBranchExpiration(branch); + if (branch.hasAssociatedActiveSessions()) { + throw new ValidationException(ASSOCIATED_XID_ERROR_MSG + xid); + } + branch.clearAssociations(); } - - branch.clearAssociations(); branch.dtxRollback(); branch.setState(Branch.State.FORGOTTEN); unregister(xid); @@ -196,7 +225,7 @@ public void setTimeout(Xid xid, long timeout, TimeUnit timeUnit) throws Validati LOGGER.debug("timing out dtx task with xid {}", xid); synchronized (branch) { - if (branch.getState() == Branch.State.PREPARED) { + if (branch.isPrepared()) { LOGGER.debug("Branch already prepared. Won't be timed out. Xid {}", xid); return; } @@ -211,4 +240,9 @@ public void setTimeout(Xid xid, long timeout, TimeUnit timeUnit) throws Validati }, timeout, timeUnit); branch.setTimeoutTaskFuture(future); } + + void syncWithMessageStore(MessageStore messageStore) throws BrokerException { + storedXidSet.clear(); + messageStore.retrieveStoredXids(storedXidSet::add); + } } diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/UnknownDtxBranchException.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/UnknownDtxBranchException.java new file mode 100644 index 00000000000..8c5598325ce --- /dev/null +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/UnknownDtxBranchException.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package io.ballerina.messaging.broker.core.transaction; + +import io.ballerina.messaging.broker.common.ValidationException; + +import javax.transaction.xa.Xid; + +/** + * Exception class to handle requests for branches with unknown {@link Xid}s. + */ +public class UnknownDtxBranchException extends ValidationException { + + UnknownDtxBranchException(Xid xid) { + super("Unknown branch for xid " + xid); + } + + UnknownDtxBranchException(Xid xid, Throwable cause) { + super("Unknown branch for xid " + xid, cause); + } +} diff --git a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/XidImpl.java b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/XidImpl.java index a2dd486cc46..22e6a61cb51 100644 --- a/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/XidImpl.java +++ b/modules/broker-core/src/main/java/io/ballerina/messaging/broker/core/transaction/XidImpl.java @@ -37,6 +37,8 @@ public class XidImpl implements Xid { private final byte[] globalTransactionId; + private long internalXid; + @SuppressFBWarnings(value = {"EI_EXPOSE_REP2", "EI_EXPOSE_REP2"}, justification = "Data holder of global transaction identifier and branch qualifier") public XidImpl(int formatId, byte[] branchQualifier, byte[] globalTransactionId) { @@ -45,6 +47,11 @@ public XidImpl(int formatId, byte[] branchQualifier, byte[] globalTransactionId) this.globalTransactionId = globalTransactionId; } + public XidImpl(long internalXid, int formatId, byte[] branchQualifier, byte[] globalTransactionId) { + this(formatId, branchQualifier, globalTransactionId); + this.internalXid = internalXid; + } + @Override public int getFormatId() { return formatId; @@ -87,4 +94,8 @@ public String toString() { ", globalTransactionId=" + new String(globalTransactionId, StandardCharsets.UTF_8) + '}'; } + + public long getInternalXid() { + return this.internalXid; + } } diff --git a/modules/broker-core/src/test/java/io/ballerina/messaging/broker/core/transaction/DistributedTransactionValidationTest.java b/modules/broker-core/src/test/java/io/ballerina/messaging/broker/core/transaction/DistributedTransactionValidationTest.java index 3bd511eb97f..3dd244d713f 100644 --- a/modules/broker-core/src/test/java/io/ballerina/messaging/broker/core/transaction/DistributedTransactionValidationTest.java +++ b/modules/broker-core/src/test/java/io/ballerina/messaging/broker/core/transaction/DistributedTransactionValidationTest.java @@ -20,6 +20,7 @@ package io.ballerina.messaging.broker.core.transaction; import io.ballerina.messaging.broker.common.ValidationException; +import io.ballerina.messaging.broker.core.BrokerException; import io.ballerina.messaging.broker.core.configuration.BrokerCoreConfiguration; import io.ballerina.messaging.broker.core.metrics.NullBrokerMetricManager; import io.ballerina.messaging.broker.core.store.MemBackedStoreFactory; @@ -50,8 +51,8 @@ public void setXid() { } @BeforeMethod - public void setUp() { - transactionRegistry = new Registry(); + public void setUp() throws BrokerException { + transactionRegistry = new Registry(new BranchFactory(null, new NullMessageStore())); transaction = new DistributedTransaction(new BranchFactory(null, new NullMessageStore()), transactionRegistry); } @@ -103,8 +104,7 @@ public void testPrepareWithAssociatedActiveSession() throws Exception { transaction.prepare(xid); } - @Test (expectedExceptions = ValidationException.class, - expectedExceptionsMessageRegExp = "Branch not found with xid .*") + @Test (expectedExceptions = UnknownDtxBranchException.class) public void testCommitWithUnknownXid() throws Exception { transaction.commit(xid, true); } @@ -127,8 +127,7 @@ public void testCommitWithRollbackOnlyBranch() throws Exception { transaction.commit(xid, true); } - @Test (expectedExceptions = ValidationException.class, - expectedExceptionsMessageRegExp = "Branch not found with xid .*") + @Test (expectedExceptions = UnknownDtxBranchException.class) public void testRollbackWithUnknownXid() throws Exception { transaction.rollback(xid); } diff --git a/modules/broker-core/src/test/java/io/ballerina/messaging/broker/core/transaction/TransactionTimeoutTest.java b/modules/broker-core/src/test/java/io/ballerina/messaging/broker/core/transaction/TransactionTimeoutTest.java index edd3a29d74c..ad46232ee29 100644 --- a/modules/broker-core/src/test/java/io/ballerina/messaging/broker/core/transaction/TransactionTimeoutTest.java +++ b/modules/broker-core/src/test/java/io/ballerina/messaging/broker/core/transaction/TransactionTimeoutTest.java @@ -52,9 +52,9 @@ public void setXid() { @BeforeMethod public void setUp() throws ValidationException { - Registry transactionRegistry = new Registry(); - transaction = new DistributedTransaction(new BranchFactory(null, new NullMessageStore()), - transactionRegistry); + BranchFactory branchFactory = new BranchFactory(null, new NullMessageStore()); + Registry transactionRegistry = new Registry(branchFactory); + transaction = new DistributedTransaction(branchFactory, transactionRegistry); StoreFactory storeFactory = new MemBackedStoreFactory(new NullBrokerMetricManager(), new BrokerCoreConfiguration()); branch = new Branch(xid, storeFactory.getMessageStore(), null); @@ -75,7 +75,7 @@ public void testTimeoutWithValueZero() throws Exception { @Test public void testTransactionAfterTimeout() throws Exception { transaction.setTimeout(xid, 2, TimeUnit.MILLISECONDS); - TimeUnit.MILLISECONDS.sleep(4); + TimeUnit.MILLISECONDS.sleep(10); Assert.assertEquals(branch.isExpired(), true); Assert.assertEquals(branch.getState(), Branch.State.TIMED_OUT); } @@ -91,7 +91,7 @@ public void testTransactionBeforeTimeout() throws Exception { public void testTimeoutForAlreadyPreparedBranch() throws Exception { branch.setState(Branch.State.PREPARED); transaction.setTimeout(xid, 2, TimeUnit.MILLISECONDS); - TimeUnit.MILLISECONDS.sleep(4); + TimeUnit.MILLISECONDS.sleep(10); Assert.assertEquals(branch.isExpired(), false); Assert.assertEquals(branch.getState(), Branch.State.PREPARED);