Skip to content
This repository has been archived by the owner on Oct 5, 2021. It is now read-only.

Commit

Permalink
Merge pull request #370 from Asitha/dtx-restore
Browse files Browse the repository at this point in the history
Implement dtx-branch restore
  • Loading branch information
Asitha authored Mar 29, 2018
2 parents 8221c11 + 01b0c89 commit 5fbf275
Show file tree
Hide file tree
Showing 27 changed files with 501 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public void acknowledge(long deliveryTag, boolean multiple) throws BrokerExcepti
}
if (ackData != null) {
transaction.dequeue(ackData.getQueueName(), ackData.getMessage());
if (isNonTransactional()) {
if (!transaction.inTransactionBlock()) {
ackData = unackedMessageMap.removeMarkedAcknowledgment(deliveryTag);
ackData.getMessage().release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,4 +294,11 @@ static long getNextMessageId() {
* @return a new DistributedTransaction object
*/
BrokerTransaction newDistributedTransaction();

/**
* Restore enqueued data that is already in prepared state.
* @param xid
* @param messages
*/
Set<QueueHandler> restoreDtxPreparedMessages(Xid xid, Collection<Message> messages) throws BrokerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public BrokerImpl(StartupContext startupContext) throws Exception {
initDefaultDeadLetterQueue();

this.brokerTransactionFactory = new BrokerTransactionFactory(this, messageStore);
brokerTransactionFactory.syncWithMessageStore(messageStore);

startupContext.registerService(Broker.class, this);
initRestApi(startupContext);
Expand Down Expand Up @@ -625,6 +626,29 @@ public BrokerTransaction newDistributedTransaction() {
return brokerTransactionFactory.newDistributedTransaction();
}

@Override
public Set<QueueHandler> restoreDtxPreparedMessages(Xid xid, Collection<Message> messages) throws BrokerException {
Set<QueueHandler> queueHandlers = new HashSet<>();
lock.readLock().lock();
try {
for (Message message : messages) {
try {
messageStore.add(xid, message.shallowCopy());
for (String queueName : message.getAttachedDurableQueues()) {
QueueHandler queueHandler = queueRegistry.getQueueHandler(queueName);
queueHandler.prepareForEnqueue(xid, message.shallowCopy());
queueHandlers.add(queueHandler);
}
} finally {
message.release();
}
}
} finally {
lock.readLock().unlock();
}
return queueHandlers;
}

private class BrokerHelper {

public void startMessageDelivery() {
Expand Down Expand Up @@ -669,6 +693,7 @@ public void activate() {
try {
queueRegistry.reloadQueuesOnBecomingActive();
exchangeRegistry.reloadExchangesOnBecomingActive(queueRegistry);
brokerTransactionFactory.syncWithMessageStore(messageStore);
} catch (BrokerException e) {
LOGGER.error("Error on loading data from the database on becoming active ", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ public Metadata(String routingKey, String exchangeName, long contentLength) {
this.headers = FieldTable.EMPTY_TABLE;
}

public Metadata(String routingKey, String exchangeName, long contentLength, byte[] propertyBytes) throws Exception {
this.routingKey = routingKey;
this.exchangeName = exchangeName;
this.contentLength = contentLength;
this.properties = FieldTable.EMPTY_TABLE;
this.headers = FieldTable.EMPTY_TABLE;
setPropertiesFromBytes(propertyBytes);
}

public String getRoutingKey() {
return routingKey;
}
Expand All @@ -101,7 +110,6 @@ public Metadata shallowCopyWith(String routingKey, String exchangeName) {
metadata.properties = properties;
metadata.headers = headers;
return metadata;

}

@Override
Expand Down Expand Up @@ -153,7 +161,7 @@ public void addHeader(String name, String value) {
headers.add(ShortString.parseString(name), FieldValue.parseLongString(value));
}

public byte[] getBytes() {
public byte[] getPropertiesAsBytes() {
long size = properties.getSize() + headers.getSize();
byte[] bytes = new byte[(int) size];
ByteBuf buffer = Unpooled.wrappedBuffer(bytes);
Expand All @@ -162,4 +170,10 @@ public byte[] getBytes() {
headers.write(buffer);
return bytes;
}

private void setPropertiesFromBytes(byte[] bytes) throws Exception {
ByteBuf buffer = Unpooled.wrappedBuffer(bytes);
properties = FieldTable.parse(buffer);
headers = FieldTable.parse(buffer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,9 @@ public BrokerTransaction newLocalTransaction() {
public BrokerTransaction newDistributedTransaction() {
return new SecureBrokerTransaction(broker.newDistributedTransaction(), subject, authHandler);
}

@Override
public Set<QueueHandler> restoreDtxPreparedMessages(Xid xid, Collection<Message> messages) throws BrokerException {
return broker.restoreDtxPreparedMessages(xid, messages);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.util.Collection;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
import javax.transaction.xa.Xid;

Expand Down Expand Up @@ -112,7 +113,7 @@ void commit(Xid xid, TransactionData transactionData) throws BrokerException {
}

@Override
protected void rollback(Xid xid) throws BrokerException {
public void rollback(Xid xid) throws BrokerException {
try {
messageDao.rollbackPreparedData(xid);
} catch (DaoException e) {
Expand Down Expand Up @@ -142,4 +143,23 @@ public void prepare(Xid xid, TransactionData transactionData) throws BrokerExcep
throw new BrokerException(e.getMessage(), e);
}
}

@Override
public void retrieveStoredXids(Consumer<Xid> xidConsumer) throws BrokerException {
try {
messageDao.retrieveAllStoredXids(xidConsumer);
} catch (DaoException e) {
throw new BrokerException(e.getMessage(), e);
}
}

@Override
public Collection<Message> recoverEnqueuedMessages(Xid xid) throws BrokerException {
try {
return messageDao.retrieveAllEnqueuedMessages(xid);
} catch (DaoException e) {
throw new BrokerException(e.getMessage(), e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import javax.transaction.xa.Xid;

/**
Expand Down Expand Up @@ -131,7 +132,7 @@ private void updateDeletableMessages(TransactionData transactionData) {
}
}

public void cancel(Xid xid) throws BrokerException {
public void remove(Xid xid) throws BrokerException {
rollback(xid);
clear(xid);
}
Expand All @@ -142,8 +143,10 @@ public void branch(Xid xid) {

public void clear(Xid xid) {
TransactionData transactionData = transactionMap.remove(xid);
transactionData.releaseEnqueueMessages();
transactionData.clear();
if (Objects.nonNull(transactionData)) {
transactionData.releaseEnqueueMessages();
transactionData.clear();
}
}

abstract void publishMessageToStore(Message message);
Expand All @@ -156,11 +159,15 @@ public void clear(Xid xid) {

abstract void commit(Xid xid, TransactionData transactionData) throws BrokerException;

protected abstract void rollback(Xid xid) throws BrokerException;
public abstract void rollback(Xid xid) throws BrokerException;

public abstract void fillMessageData(QueueBuffer queueBuffer, Message message);

public abstract Collection<Message> readAllMessagesForQueue(String queueName) throws BrokerException;

public abstract void prepare(Xid xid, TransactionData transactionData) throws BrokerException;

public abstract void retrieveStoredXids(Consumer<Xid> consumer) throws BrokerException;

public abstract Collection<Message> recoverEnqueuedMessages(Xid xid) throws BrokerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

package io.ballerina.messaging.broker.core.store;

import io.ballerina.messaging.broker.core.BrokerException;
import io.ballerina.messaging.broker.core.Message;
import io.ballerina.messaging.broker.core.queue.QueueBuffer;

import java.util.Collection;
import java.util.Collections;
import java.util.function.Consumer;
import javax.transaction.xa.Xid;

/**
Expand Down Expand Up @@ -57,7 +59,7 @@ void commit(Xid xid, TransactionData transactionData) {
}

@Override
protected void rollback(Xid xid) {
public void rollback(Xid xid) {
// Do nothing
}

Expand All @@ -75,4 +77,14 @@ public Collection<Message> readAllMessagesForQueue(String queueName) {
public void prepare(Xid xid, TransactionData transactionData) {
// Do nothing
}

@Override
public void retrieveStoredXids(Consumer<Xid> consumer) throws BrokerException {
// Do nothing
}

@Override
public Collection<Message> recoverEnqueuedMessages(Xid xid) {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.transaction.xa.Xid;

/**
Expand Down Expand Up @@ -78,4 +79,21 @@ public interface MessageDao {
* @param xid {@link Xid} of the rollback operation related transaction
*/
void rollbackPreparedData(Xid xid) throws DaoException;

/**
* Retrieve all the xids stored on the database and invoke the xidConsumer for each {@link Xid} found.
*
* @param xidConsumer {@link Consumer} that accept an Xid.
* @throws DaoException throws on database failure.
*/
void retrieveAllStoredXids(Consumer<Xid> xidConsumer) throws DaoException;

/**
* Retrieve prepared enqueued messages for a given {@link Xid} from storage.
*
* @param xid {@link Xid} of the prepared branch
* @return Enqueued messages
* @throws DaoException throws when the message retrieval fails due to an error.
*/
Collection<Message> retrieveAllEnqueuedMessages(Xid xid) throws DaoException;
}
Loading

0 comments on commit 5fbf275

Please sign in to comment.