summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-05-09 08:48:18 +0000
committerGordon Sim <gsim@apache.org>2007-05-09 08:48:18 +0000
commit41d3e70cdf4c93aba0589ed7aec61499a86c6119 (patch)
treed4b52849b875b16590241747b899bf3cc1bc86cb /java/broker
parent04186f293032c5720f9500eb2a07433572f13d24 (diff)
downloadqpid-python-41d3e70cdf4c93aba0589ed7aec61499a86c6119.tar.gz
Patch from Arnaud Simon (asimon@redhat.com) to fix tests broken by earlier changes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@536458 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java197
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java148
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java45
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java82
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java82
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java158
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java228
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java13
14 files changed, 847 insertions, 152 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index ae1cf43f6c..40462cf2bf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -228,10 +228,14 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
try
{
queue.delete();
- _messageStore.destroyQueue(queue);
+ if( queue.isDurable() )
+ {
+ _messageStore.destroyQueue(queue);
+ }
}
catch (Exception ex)
{
+ ex.printStackTrace();
JMException jme = new JMException(ex.getMessage());
jme.initCause(ex);
throw new MBeanException(jme, "Error in deleting queue " + queueName);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 9711bbf4d2..6dd80b3bfa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -73,10 +73,14 @@ public class AMQChannel
*/
private AtomicLong _deliveryTag = new AtomicLong(0);
- /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
+ /**
+ * A channel has a default queue (the last declared) that is used when no queue name is explictily set
+ */
private AMQQueue _defaultQueue;
- /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
+ /**
+ * This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request.
+ */
private int _consumerTag;
/**
@@ -86,7 +90,9 @@ public class AMQChannel
*/
private AMQMessage _currentMessage;
- /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */
+ /**
+ * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
+ */
private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>();
private final MessageStore _messageStore;
@@ -118,7 +124,8 @@ public class AMQChannel
public AMQChannel(AMQProtocolSession session, int channelId, TransactionManager transactionManager, MessageStore messageStore, MessageRouter exchanges)
- throws AMQException
+ throws
+ AMQException
{
_session = session;
_channelId = channelId;
@@ -132,7 +139,9 @@ public class AMQChannel
_txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
- /** Sets this channel to be part of a local transaction */
+ /**
+ * Sets this channel to be part of a local transaction
+ */
public void setLocalTransactional()
{
_txnContext = new DistributedTransactionalContext(_transactionManager, _messageStore, _storeContext, _returnMessages);
@@ -193,23 +202,25 @@ public class AMQChannel
}
- public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException
+ public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher)
+ throws
+ AMQException
{
_currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
- _txnContext);
+ _txnContext);
_currentMessage.setPublisher(publisher);
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
- throws AMQException
+ throws
+ AMQException
{
if (_currentMessage == null)
{
throw new AMQException("Received content header without previously receiving a BasicPublish frame");
- }
- else
+ } else
{
if (_log.isTraceEnabled())
{
@@ -228,7 +239,8 @@ public class AMQChannel
}
public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession)
- throws AMQException
+ throws
+ AMQException
{
if (_currentMessage == null)
{
@@ -261,7 +273,9 @@ public class AMQChannel
}
}
- protected void routeCurrentMessage() throws AMQException
+ protected void routeCurrentMessage()
+ throws
+ AMQException
{
try
{
@@ -294,14 +308,15 @@ public class AMQChannel
* @param exclusive Flag requesting exclusive access to the queue
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
- *
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
- *
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
+ FieldTable filters, boolean noLocal, boolean exclusive)
+ throws
+ AMQException,
+ ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -318,28 +333,11 @@ public class AMQChannel
}
- public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
+ public void unsubscribeConsumer(AMQProtocolSession session, final AMQShortString consumerTag)
+ throws
+ AMQException
{
- if (_log.isDebugEnabled())
- {
- _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size());
- _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
-
- public boolean callback(UnacknowledgedMessage message) throws AMQException
- {
- _log.debug(message);
-
- return true;
- }
-
- public void visitComplete()
- {
- }
- });
- }
-
- AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
+ final AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
if (q != null)
{
q.unregisterProtocolSession(session, _channelId, consumerTag);
@@ -350,25 +348,27 @@ public class AMQChannel
* Called from the protocol session to close this channel and clean up. T
*
* @param session The session to close
- *
* @throws AMQException if there is an error during closure
*/
- public void close(AMQProtocolSession session) throws AMQException
+ public void close(AMQProtocolSession session)
+ throws
+ AMQException
{
_txnContext.rollback();
unsubscribeAllConsumers(session);
requeue();
}
- private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
+ private void unsubscribeAllConsumers(AMQProtocolSession session)
+ throws
+ AMQException
{
if (_log.isInfoEnabled())
{
if (!_consumerTag2QueueMap.isEmpty())
{
_log.info("Unsubscribing all consumers on channel " + toString());
- }
- else
+ } else
{
_log.info("No consumers to unsubscribe on channel " + toString());
}
@@ -400,13 +400,12 @@ public class AMQChannel
if (queue == null)
{
_log.debug("Adding unacked message with a null queue:" + message.debugIdentity());
- }
- else
+ } else
{
if (_log.isDebugEnabled())
{
_log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag +
- ") with a queue(" + queue + ") for " + consumerTag);
+ ") with a queue(" + queue + ") for " + consumerTag);
}
}
}
@@ -431,7 +430,9 @@ public class AMQChannel
*
* @throws org.apache.qpid.AMQException if the requeue fails
*/
- public void requeue() throws AMQException
+ public void requeue()
+ throws
+ AMQException
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
@@ -452,12 +453,11 @@ public class AMQChannel
// if (_nonTransactedContext == null)
{
_nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
- }
- else
+ } else
{
deliveryContext = _txnContext;
}
@@ -489,10 +489,11 @@ public class AMQChannel
* Requeue a single message
*
* @param deliveryTag The message to requeue
- *
* @throws AMQException If something goes wrong.
*/
- public void requeue(long deliveryTag) throws AMQException
+ public void requeue(long deliveryTag)
+ throws
+ AMQException
{
UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
@@ -516,12 +517,11 @@ public class AMQChannel
// if (_nonTransactedContext == null)
{
_nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
- }
- else
+ } else
{
deliveryContext = _txnContext;
}
@@ -532,19 +532,17 @@ public class AMQChannel
deliveryContext.deliver(unacked.message, unacked.queue, true);
//Deliver increments the message count but we have already deliverted this once so don't increment it again
// this was because deliver did an increment changed this.
- }
- else
+ } else
{
_log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag +
- " but no queue defined and no DeadLetter queue so DROPPING message.");
+ " but no queue defined and no DeadLetter queue so DROPPING message.");
// _log.error("Requested requeue of message:" + deliveryTag +
// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
//
// deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
//
}
- }
- else
+ } else
{
_log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size());
@@ -554,10 +552,12 @@ public class AMQChannel
{
int count = 0;
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ public boolean callback(UnacknowledgedMessage message)
+ throws
+ AMQException
{
_log.debug((count++) + ": (" + message.message.debugIdentity() + ")" +
- "[" + message.deliveryTag + "]");
+ "[" + message.deliveryTag + "]");
return false; // Continue
}
@@ -577,10 +577,11 @@ public class AMQChannel
* Called to resend all outstanding unacknowledged messages to this same channel.
*
* @param requeue Are the messages to be requeued or dropped.
- *
* @throws AMQException When something goes wrong.
*/
- public void resend(final boolean requeue) throws AMQException
+ public void resend(final boolean requeue)
+ throws
+ AMQException
{
final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
@@ -595,7 +596,9 @@ public class AMQChannel
// and those that don't to be requeued.
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ public boolean callback(UnacknowledgedMessage message)
+ throws
+ AMQException
{
AMQShortString consumerTag = message.consumerTag;
AMQMessage msg = message.message;
@@ -606,13 +609,11 @@ public class AMQChannel
if (_consumerTag2QueueMap.containsKey(consumerTag))
{
msgToResend.add(message);
- }
- else // consumer has gone
+ } else // consumer has gone
{
msgToRequeue.add(message);
}
- }
- else
+ } else
{
// Message has no consumer tag, so was "delivered" to a GET
// or consumer no longer registered
@@ -622,13 +623,11 @@ public class AMQChannel
if (requeue)
{
msgToRequeue.add(message);
- }
- else
+ } else
{
_log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
}
- }
- else
+ } else
{
_log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
}
@@ -649,8 +648,7 @@ public class AMQChannel
if (!msgToResend.isEmpty())
{
_log.info("Preparing (" + msgToResend.size() + ") message to resend.");
- }
- else
+ } else
{
_log.info("No message to resend.");
}
@@ -699,8 +697,7 @@ public class AMQChannel
}
//move this message to requeue
msgToRequeue.add(message);
- }
- else
+ } else
{
if (_log.isDebugEnabled())
{
@@ -710,8 +707,7 @@ public class AMQChannel
_unacknowledgedMessageMap.remove(message.deliveryTag);
}
} // sync(sub.getSendLock)
- }
- else
+ } else
{
if (_log.isInfoEnabled())
@@ -740,12 +736,11 @@ public class AMQChannel
if (_nonTransactedContext == null)
{
_nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
- }
- else
+ } else
{
deliveryContext = _txnContext;
}
@@ -768,14 +763,17 @@ public class AMQChannel
* since we may get an ack for a delivery tag that was generated from the deleted queue.
*
* @param queue the queue that has been deleted
- *
* @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
*/
- public void queueDeleted(final AMQQueue queue) throws AMQException
+ public void queueDeleted(final AMQQueue queue)
+ throws
+ AMQException
{
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ public boolean callback(UnacknowledgedMessage message)
+ throws
+ AMQException
{
if (message.queue == queue)
{
@@ -787,7 +785,7 @@ public class AMQChannel
catch (AMQException e)
{
_log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " +
- e, e);
+ e, e);
}
}
return false;
@@ -805,10 +803,11 @@ public class AMQChannel
* @param deliveryTag the last delivery tag
* @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only
* acknowledges the single message specified by the delivery tag
- *
* @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
*/
- public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
+ public void acknowledgeMessage(long deliveryTag, boolean multiple)
+ throws
+ AMQException
{
synchronized (_unacknowledgedMessageMap.getLock())
{
@@ -842,7 +841,7 @@ public class AMQChannel
boolean suspend;
suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
- || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+ || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
setSuspended(suspend);
}
@@ -868,8 +867,7 @@ public class AMQChannel
{
q.deliverAsync();
}
- }
- else
+ } else
{
_log.debug("Suspending channel " + this);
}
@@ -881,16 +879,20 @@ public class AMQChannel
return _suspended.get();
}
- public void commit() throws AMQException
+ public void commit()
+ throws
+ AMQException
{
if (!isTransactional())
{
throw new AMQException("Fatal error: commit called on non-transactional channel");
}
- _txnContext.commit();
+ _txnContext.commit();
}
- public void rollback() throws AMQException
+ public void rollback()
+ throws
+ AMQException
{
_txnContext.rollback();
}
@@ -919,14 +921,16 @@ public class AMQChannel
return _storeContext;
}
- public void processReturns(AMQProtocolSession session) throws AMQException
+ public void processReturns(AMQProtocolSession session)
+ throws
+ AMQException
{
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
AMQMessage message = bouncedMessage.getAMQMessage();
session.getProtocolOutputConverter().writeReturn(message, _channelId,
- bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
+ bouncedMessage.getReplyCode().getCode(),
+ new AMQShortString(bouncedMessage.getMessage()));
}
_returnMessages.clear();
}
@@ -937,8 +941,7 @@ public class AMQChannel
if (isSuspended())
{
return true;
- }
- else
+ } else
{
boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
if (!willSuspend)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
index 581bca2efe..a027b90743 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
@@ -19,49 +19,75 @@ package org.apache.qpid.server.messageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.txn.TransactionRecord;
+import org.apache.qpid.server.txn.MemoryEnqueueRecord;
+import org.apache.qpid.server.txn.MemoryDequeueRecord;
import org.apache.qpid.server.exception.*;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
import javax.transaction.xa.Xid;
-import java.util.Collection;
+import java.util.*;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
/**
+ * This a simple in-memory implementation of a message store i.e. nothing is persisted
+ * <p/>
* Created by Arnaud Simon
* Date: 26-Apr-2007
* Time: 08:23:45
*/
-public class MemoryMessageStore implements MessageStore
+public class MemoryMessageStore implements MessageStore
{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
+
+ // The table of message with its corresponding stream containing the message body
+ private Map<StorableMessage, ByteArrayOutputStream> _stagedMessages;
+ // The queue/messages association
+ private Map<StorableQueue, List<StorableMessage>> _queueMap;
+ // the message ID
+ private long _messageID = 0;
+ // The transaction manager
+ private TransactionManager _txm;
+
+ //========================================================================
+ // Interface MessageStore
+ //========================================================================
public void removeExchange(Exchange exchange)
throws
InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // do nothing this is inmemory
}
public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
- throws
- InternalErrorException
+ throws
+ InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // do nothing this is inmemory
}
public void createExchange(Exchange exchange)
throws
InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // do nothing this is inmemory
}
public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
throws
InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // do nothing this is inmemory
}
public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
@@ -69,14 +95,21 @@ public class MemoryMessageStore implements MessageStore
InternalErrorException,
IllegalArgumentException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ _log.info("Configuring memory message store");
+ // Initialise the maps
+ _stagedMessages = new HashMap<StorableMessage, ByteArrayOutputStream>();
+ _queueMap = new HashMap<StorableQueue, List<StorableMessage>>();
+ _txm = tm;
+ _txm.configure(this, "txn", config);
}
public void close()
throws
InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ _log.info("Closing memory message store");
+ _stagedMessages.clear();
+ _queueMap.clear();
}
public void createQueue(StorableQueue queue)
@@ -84,7 +117,12 @@ public class MemoryMessageStore implements MessageStore
InternalErrorException,
QueueAlreadyExistsException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (_queueMap.containsKey(queue))
+ {
+ throw new QueueAlreadyExistsException("queue " + queue + " already exists");
+ }
+ // add this queue into the map
+ _queueMap.put(queue, new LinkedList<StorableMessage>());
}
public void destroyQueue(StorableQueue queue)
@@ -92,7 +130,12 @@ public class MemoryMessageStore implements MessageStore
InternalErrorException,
QueueDoesntExistException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (!_queueMap.containsKey(queue))
+ {
+ throw new QueueDoesntExistException("queue " + queue + " does not exist");
+ }
+ // remove this queue from the map
+ _queueMap.remove(queue);
}
public void stage(StorableMessage m)
@@ -100,7 +143,12 @@ public class MemoryMessageStore implements MessageStore
InternalErrorException,
MessageAlreadyStagedException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (_stagedMessages.containsKey(m))
+ {
+ throw new MessageAlreadyStagedException("message " + m + " already staged");
+ }
+ _stagedMessages.put(m, new ByteArrayOutputStream());
+ m.staged();
}
public void appendContent(StorableMessage m, byte[] data, int offset, int size)
@@ -108,7 +156,11 @@ public class MemoryMessageStore implements MessageStore
InternalErrorException,
MessageDoesntExistException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (!_stagedMessages.containsKey(m))
+ {
+ throw new MessageDoesntExistException("message " + m + " has not been staged");
+ }
+ _stagedMessages.get(m).write(data, offset, size);
}
public byte[] loadContent(StorableMessage m, int offset, int size)
@@ -116,7 +168,15 @@ public class MemoryMessageStore implements MessageStore
InternalErrorException,
MessageDoesntExistException
{
- return new byte[0]; //To change body of implemented methods use File | Settings | File Templates.
+ if (!_stagedMessages.containsKey(m))
+ {
+ throw new MessageDoesntExistException("message " + m + " has not been staged");
+ }
+ byte[] result = new byte[size];
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ buf.put(_stagedMessages.get(m).toByteArray(), offset, size);
+ buf.get(result);
+ return result;
}
public void destroy(StorableMessage m)
@@ -124,7 +184,11 @@ public class MemoryMessageStore implements MessageStore
InternalErrorException,
MessageDoesntExistException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (!_stagedMessages.containsKey(m))
+ {
+ throw new MessageDoesntExistException("message " + m + " has not been staged");
+ }
+ _stagedMessages.remove(m);
}
public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
@@ -135,7 +199,31 @@ public class MemoryMessageStore implements MessageStore
UnknownXidException,
MessageDoesntExistException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (xid != null)
+ {
+ // this is a tx operation
+ TransactionRecord enqueueRecord = new MemoryEnqueueRecord(m, queue);
+ _txm.getTransaction(xid).addRecord(enqueueRecord);
+ } else
+ {
+ if (!_stagedMessages.containsKey(m))
+ {
+ try
+ {
+ stage(m);
+ } catch (MessageAlreadyStagedException e)
+ {
+ throw new InternalErrorException(e);
+ }
+ appendContent(m, m.getData(), 0, m.getPayloadSize());
+ }
+ if (!_queueMap.containsKey(queue))
+ {
+ throw new QueueDoesntExistException("queue " + queue + " dos not exist");
+ }
+ _queueMap.get(queue).add(m);
+ m.enqueue(queue);
+ }
}
public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
@@ -145,25 +233,43 @@ public class MemoryMessageStore implements MessageStore
InvalidXidException,
UnknownXidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (xid != null)
+ {
+ // this is a tx operation
+ TransactionRecord dequeueRecord = new MemoryDequeueRecord(m, queue);
+ _txm.getTransaction(xid).addRecord(dequeueRecord);
+ } else
+ {
+ if (!_queueMap.containsKey(queue))
+ {
+ throw new QueueDoesntExistException("queue " + queue + " dos not exist");
+ }
+ m.dequeue(queue);
+ _queueMap.get(queue).remove(m);
+ if (!m.isEnqueued())
+ {
+ // we can delete this message
+ _stagedMessages.remove(m);
+ }
+ }
}
public Collection<StorableQueue> getAllQueues()
throws
InternalErrorException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _queueMap.keySet();
}
public Collection<StorableMessage> getAllMessages(StorableQueue queue)
throws
InternalErrorException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _queueMap.get(queue);
}
public long getNewMessageId()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return _messageID++;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index eefff090df..32c6eb2c9b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -432,10 +432,10 @@ public class AMQMessage implements StorableMessage
// enqueuing the messages ensure that if required the destinations are recorded to a
// persistent store
- for (AMQQueue q : _transientMessageData.getDestinationQueues())
- {
- _messageHandle.enqueue(storeContext, _messageId, q);
- }
+ // for (AMQQueue q : _transientMessageData.getDestinationQueues())
+ // {
+ // _messageHandle.enqueue(storeContext, _messageId, q);
+ // }
if (_transientMessageData.getContentHeaderBody().bodySize == 0)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
index 5978e3b10d..5539627820 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
@@ -169,7 +169,10 @@ public class StorableMessageHandle implements AMQMessageHandle
{
try
{
- _messageStore.enqueue((Xid) storeContext.getPayload(), _message, queue);
+ if (queue.isDurable())
+ {
+ _messageStore.enqueue((Xid) storeContext.getPayload(), _message, queue);
+ }
} catch (Exception e)
{
throw new AMQException("PRoblem during message enqueue", e);
@@ -182,7 +185,10 @@ public class StorableMessageHandle implements AMQMessageHandle
{
try
{
- _messageStore.dequeue((Xid) storeContext.getPayload(), _message, queue);
+ if (queue.isDurable())
+ {
+ _messageStore.dequeue((Xid) storeContext.getPayload(), _message, queue);
+ }
} catch (Exception e)
{
throw new AMQException("PRoblem during message dequeue", e);
@@ -199,8 +205,8 @@ public class StorableMessageHandle implements AMQMessageHandle
if (_payload == null)
{
int bodySize = (int) _contentHeaderBody.bodySize;
- _buffer = ByteBuffer.allocate(bodySize);
_payload = new byte[bodySize];
+ _buffer = ByteBuffer.wrap(_payload);
for (ContentChunk contentBody : _chunks)
{
int chunkSize = contentBody.getSize();
@@ -208,7 +214,6 @@ public class StorableMessageHandle implements AMQMessageHandle
contentBody.getData().get(chunk);
_buffer.put(chunk);
}
- _buffer.get(_payload);
}
return _payload;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java
index 0d25ab0e32..d899bb972c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java
@@ -39,7 +39,7 @@ public class DequeueRecord implements TransactionRecord
UnknownXidException,
MessageDoesntExistException
{
- // do nothing
+ // nothing
}
public void rollback(MessageStore store)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
index eff623ca7c..05756a8c23 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
@@ -46,7 +46,8 @@ public class DistributedTransactionalContext implements TransactionalContext
//========================================================================
// The logger for this class
private static final Logger _log = Logger.getLogger(DistributedTransactionalContext.class);
-
+ private static final Object _lockXID = new Object();
+ private static int _count = 0;
//========================================================================
// Instance Fields
//========================================================================
@@ -60,7 +61,6 @@ public class DistributedTransactionalContext implements TransactionalContext
final private List<RequiredDeliveryException> _returnMessages;
// for generating xids
private byte[] _txId = ("txid").getBytes();
- private int _count = 0;
public DistributedTransactionalContext(TransactionManager transactionManager, MessageStore messageStore, StoreContext storeContext,
List<RequiredDeliveryException> returnMessages)
@@ -75,15 +75,21 @@ public class DistributedTransactionalContext implements TransactionalContext
throws
AMQException
{
- // begin the transaction and pass the XID through the context
- Xid xid = new XidImpl(("branch" + _count++).getBytes(), 1, _txId);
- try
+ if (_storeContext.getPayload() == null)
{
- _transactionManager.begin(xid);
- _storeContext.setPayload(xid);
- } catch (Exception e)
- {
- throw new AMQException("Problem during transaction begin", e);
+ synchronized (_lockXID)
+ {
+ // begin the transaction and pass the XID through the context
+ Xid xid = new XidImpl(("branch" + _count++).getBytes(), 1, _txId);
+ try
+ {
+ _transactionManager.begin(xid);
+ _storeContext.setPayload(xid);
+ } catch (Exception e)
+ {
+ throw new AMQException("Problem during transaction begin", e);
+ }
+ }
}
}
@@ -93,11 +99,18 @@ public class DistributedTransactionalContext implements TransactionalContext
{
try
{
- _transactionManager.commit_one_phase((Xid) _storeContext.getPayload());
+ if (_storeContext.getPayload() != null)
+ {
+ _transactionManager.commit_one_phase((Xid) _storeContext.getPayload());
+ }
} catch (Exception e)
{
throw new AMQException("Problem during transaction commit", e);
}
+ finally
+ {
+ _storeContext.setPayload(null);
+ }
}
public void rollback()
@@ -106,11 +119,18 @@ public class DistributedTransactionalContext implements TransactionalContext
{
try
{
- _transactionManager.rollback((Xid) _storeContext.getPayload());
+ if (_storeContext.getPayload() != null)
+ {
+ _transactionManager.rollback((Xid) _storeContext.getPayload());
+ }
} catch (Exception e)
{
throw new AMQException("Problem during transaction rollback", e);
}
+ finally
+ {
+ _storeContext.setPayload(null);
+ }
}
public void messageFullyReceived(boolean persistent)
@@ -142,6 +162,7 @@ public class DistributedTransactionalContext implements TransactionalContext
throws
AMQException
{
+ beginTranIfNecessary();
if (multiple)
{
if (deliveryTag == 0)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java
new file mode 100644
index 0000000000..abbd2ad923
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java
@@ -0,0 +1,82 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.exception.*;
+import org.apache.log4j.Logger;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 03-May-2007
+ * Time: 13:59:47
+ */
+public class MemoryDequeueRecord implements TransactionRecord
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(MemoryDequeueRecord.class);
+ // the queue
+ StorableQueue _queue;
+ // the message
+ StorableMessage _message;
+
+ //========================================================================
+ // Constructor
+ //========================================================================
+ public MemoryDequeueRecord( StorableMessage m, StorableQueue queue)
+ {
+ _queue = queue;
+ _message = m;
+ }
+
+ //========================================================================
+ // Interface TransactionRecord
+ //========================================================================
+
+ public void commit(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException
+ {
+ store.dequeue(null, _message, _queue);
+ }
+
+ public void rollback(MessageStore store)
+ throws
+ InternalErrorException
+ {
+ // do nothing
+ }
+
+ public void prepare(MessageStore store)
+ throws
+ InternalErrorException
+ {
+ // do nothing
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java
new file mode 100644
index 0000000000..159a5a471c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java
@@ -0,0 +1,82 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.exception.*;
+import org.apache.log4j.Logger;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 03-May-2007
+ * Time: 14:00:04
+ */
+public class MemoryEnqueueRecord implements TransactionRecord
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(MemoryDequeueRecord.class);
+
+ // the queue
+ StorableQueue _queue;
+ // the message
+ StorableMessage _message;
+
+ //========================================================================
+ // Constructor
+ //========================================================================
+ public MemoryEnqueueRecord(StorableMessage m, StorableQueue queue)
+ {
+ _queue = queue;
+ _message = m;
+ }
+ //========================================================================
+ // Interface TransactionRecord
+ //========================================================================
+
+ public void commit(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException
+ {
+ store.enqueue(null, _message, _queue);
+ }
+
+ public void rollback(MessageStore store)
+ throws
+ InternalErrorException
+ {
+ // do nothing
+ }
+
+ public void prepare(MessageStore store)
+ throws
+ InternalErrorException
+ {
+ // do nothing
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java
new file mode 100644
index 0000000000..4cd52b1a72
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java
@@ -0,0 +1,158 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 03-May-2007
+ * Time: 14:30:41
+ */
+public class MemoryTransaction implements Transaction
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(MemoryTransaction.class);
+
+ //========================================================================
+ // Instance Fields
+ //========================================================================
+ // Indicates whether this transaction is prepared
+ private boolean _prepared = false;
+ // Indicates that this transaction has heuristically rolled back
+ private boolean _heurRollBack = false;
+ // The list of Abstract records associated with this tx
+ private List<TransactionRecord> _records = new LinkedList<TransactionRecord>();
+ // The date when this tx has been created.
+ private long _dateCreated;
+ // The timeout in seconds
+ private long _timeout;
+
+ //=========================================================
+ // Constructors
+ //=========================================================
+ /**
+ * Create a transaction that wraps a BDB tx and set the creation date.
+ *
+ */
+ public MemoryTransaction()
+ {
+ _dateCreated = System.currentTimeMillis();
+ }
+
+ //=========================================================
+ // Getter and Setter methods
+ //=========================================================
+ /**
+ * Notify that this tx has been prepared
+ */
+ public void prepare()
+ {
+ _prepared = true;
+ }
+
+ /**
+ * Specify whether this transaction is prepared
+ *
+ * @return true if this transaction is prepared, false otherwise
+ */
+ public boolean isPrepared()
+ {
+ return _prepared;
+ }
+
+ /**
+ * Notify that this tx has been heuristically rolled back
+ */
+ public void heurRollback()
+ {
+ _heurRollBack = true;
+ }
+
+ /**
+ * Specify whether this transaction has been heuristically rolled back
+ *
+ * @return true if this transaction has been heuristically rolled back , false otherwise
+ */
+ public boolean isHeurRollback()
+ {
+ return _heurRollBack;
+ }
+
+ /**
+ * Add an abstract record to this tx.
+ *
+ * @param record The record to be added
+ */
+ public void addRecord(TransactionRecord record)
+ {
+ _records.add(record);
+ }
+
+ /**
+ * Get the list of records associated with this tx.
+ *
+ * @return The list of records associated with this tx.
+ */
+ public List<TransactionRecord> getrecords()
+ {
+ return _records;
+ }
+
+ /**
+ * Set this tx timeout
+ *
+ * @param timeout This tx timeout in seconds
+ */
+ public void setTimeout(long timeout)
+ {
+ _timeout = timeout;
+ }
+
+ /**
+ * Get this tx timeout
+ *
+ * @return This tx timeout in seconds
+ */
+ public long getTimeout()
+ {
+ return _timeout;
+ }
+
+ /**
+ * Specify whether this tx has expired
+ *
+ * @return true if this tx has expired, false otherwise
+ */
+ public boolean hasExpired()
+ {
+ long currentDate = System.currentTimeMillis();
+ boolean result = currentDate - _dateCreated > _timeout * 1000;
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("transaction has expired");
+ }
+ return result;
+ }
+ }
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java
index a132bcefe6..e740ff9c30 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java
@@ -18,9 +18,13 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
import javax.transaction.xa.Xid;
import java.util.Set;
+import java.util.HashMap;
/**
* Created by Arnaud Simon
@@ -29,13 +33,65 @@ import java.util.Set;
*/
public class MemoryTransactionManager implements TransactionManager
{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(MemoryTransactionManager.class);
+
+ private static final String ENVIRONMENT_TX_TIMEOUT = "environment-tx-timeout";
+
+ //========================================================================
+ // Instance Fields
+ //========================================================================
+ // The underlying BDB message store
+ private MessageStore _messagStore;
+ // A map of XID/BDBtx
+ private HashMap<Xid, Transaction> _xidMap;
+ // A map of in-doubt txs
+ private HashMap<Xid, MemoryTransaction> _indoubtXidMap;
+
+ // A default tx timeout in sec
+ private int _defaultTimeout; // set to 10s if not specified in the config
+
+ //========================================================================
+ // Interface TransactionManager
+ //========================================================================
+ public void configure(MessageStore messageStroe, String base, Configuration config)
+ {
+ _messagStore = messageStroe;
+ if (config != null)
+ {
+ _defaultTimeout = config.getInt(base + "." + ENVIRONMENT_TX_TIMEOUT, 10);
+ } else
+ {
+ _defaultTimeout = 10;
+ }
+ _log.info("Using transaction timeout of " + _defaultTimeout + " s");
+ _xidMap = new HashMap<Xid, Transaction>();
+ _indoubtXidMap = new HashMap<Xid, MemoryTransaction>();
+ }
public XAFlag begin(Xid xid)
throws
InternalErrorException,
InvalidXidException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ synchronized (xid)
+ {
+ if (xid == null)
+ {
+ throw new InvalidXidException(xid, "null xid");
+ }
+ if (_xidMap.containsKey(xid))
+ {
+ throw new InvalidXidException(xid, "Xid already exist");
+ }
+ MemoryTransaction tx = new MemoryTransaction();
+ tx.setTimeout(_defaultTimeout);
+ _xidMap.put(xid, tx);
+ return XAFlag.ok;
+ }
}
public XAFlag prepare(Xid xid)
@@ -44,7 +100,35 @@ public class MemoryTransactionManager implements TransactionManager
CommandInvalidException,
UnknownXidException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ synchronized (xid)
+ {
+ // get the transaction
+ MemoryTransaction tx = (MemoryTransaction) getTransaction(xid);
+ XAFlag result = XAFlag.ok;
+ if (tx.hasExpired())
+ {
+ result = XAFlag.rbtimeout;
+ // rollback this tx branch
+ rollback(xid);
+ } else
+ {
+ if (tx.isPrepared())
+ {
+ throw new CommandInvalidException("TransactionImpl is already prepared");
+ }
+ if (tx.getrecords().size() == 0)
+ {
+ // the tx was read only (no work has been done)
+ _xidMap.remove(xid);
+ result = XAFlag.rdonly;
+ } else
+ {
+ // we need to persist the tx records
+ tx.prepare();
+ }
+ }
+ return result;
+ }
}
public XAFlag rollback(Xid xid)
@@ -53,7 +137,28 @@ public class MemoryTransactionManager implements TransactionManager
CommandInvalidException,
UnknownXidException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ synchronized (xid)
+ {
+ // get the transaction
+ MemoryTransaction tx = (MemoryTransaction) getTransaction(xid);
+ XAFlag flag = XAFlag.ok;
+ if (tx.isHeurRollback())
+ {
+ flag = XAFlag.heurrb;
+ } else
+ {
+ for (TransactionRecord record : tx.getrecords())
+ {
+ record.rollback(_messagStore);
+ }
+ _xidMap.remove(xid);
+ }
+ if (tx.hasExpired())
+ {
+ flag = XAFlag.rbtimeout;
+ }
+ return flag;
+ }
}
public XAFlag commit(Xid xid)
@@ -63,7 +168,44 @@ public class MemoryTransactionManager implements TransactionManager
UnknownXidException,
NotPreparedException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ synchronized (xid)
+ {
+ // get the transaction
+ MemoryTransaction tx = (MemoryTransaction) getTransaction(xid);
+ XAFlag flag = XAFlag.ok;
+ if (tx.isHeurRollback())
+ {
+ flag = XAFlag.heurrb;
+ } else if (tx.hasExpired())
+ {
+ flag = XAFlag.rbtimeout;
+ // rollback this tx branch
+ rollback(xid);
+ } else
+ {
+ if (!tx.isPrepared())
+ {
+ throw new NotPreparedException("TransactionImpl is not prepared");
+ }
+ for (TransactionRecord record : tx.getrecords())
+ {
+ try
+ {
+ record.commit(_messagStore, xid);
+ } catch (InvalidXidException e)
+ {
+ throw new UnknownXidException(xid, e);
+ } catch (Exception e)
+ {
+ // this should not happen as the queue and the message must exist
+ _log.error("Error when committing distributed transaction heurmix mode returned: " + xid);
+ flag = XAFlag.heurmix;
+ }
+ }
+ _xidMap.remove(xid);
+ }
+ return flag;
+ }
}
public XAFlag commit_one_phase(Xid xid)
@@ -72,7 +214,47 @@ public class MemoryTransactionManager implements TransactionManager
CommandInvalidException,
UnknownXidException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ synchronized (xid)
+ {
+ XAFlag flag = XAFlag.ok;
+ MemoryTransaction tx = (MemoryTransaction) getTransaction(xid);
+ if (tx.isHeurRollback())
+ {
+ flag = XAFlag.heurrb;
+ } else if (tx.hasExpired())
+ {
+ flag = XAFlag.rbtimeout;
+ // rollback this tx branch
+ rollback(xid);
+ } else
+ {
+ // we need to prepare the tx
+ tx.prepare();
+ try
+ {
+ for (TransactionRecord record : tx.getrecords())
+ {
+ try
+ {
+ record.commit(_messagStore, xid);
+ } catch (InvalidXidException e)
+ {
+ throw new UnknownXidException(xid, e);
+ } catch (Exception e)
+ {
+ // this should not happen as the queue and the message must exist
+ _log.error("Error when committing transaction heurmix mode returned: " + xid);
+ flag = XAFlag.heurmix;
+ }
+ }
+ }
+ finally
+ {
+ _xidMap.remove(xid);
+ }
+ }
+ return flag;
+ }
}
public void forget(Xid xid)
@@ -81,7 +263,10 @@ public class MemoryTransactionManager implements TransactionManager
CommandInvalidException,
UnknownXidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ synchronized (xid)
+ {
+ _xidMap.remove(xid);
+ }
}
public void setTimeout(Xid xid, long timeout)
@@ -89,7 +274,8 @@ public class MemoryTransactionManager implements TransactionManager
InternalErrorException,
UnknownXidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ Transaction tx = getTransaction(xid);
+ tx.setTimeout(timeout);
}
public long getTimeout(Xid xid)
@@ -97,7 +283,8 @@ public class MemoryTransactionManager implements TransactionManager
InternalErrorException,
UnknownXidException
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ Transaction tx = getTransaction(xid);
+ return tx.getTimeout();
}
public Set<Xid> recover(boolean startscan, boolean endscan)
@@ -105,7 +292,7 @@ public class MemoryTransactionManager implements TransactionManager
InternalErrorException,
CommandInvalidException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _indoubtXidMap.keySet();
}
public void HeuristicOutcome(Xid xid)
@@ -113,13 +300,32 @@ public class MemoryTransactionManager implements TransactionManager
UnknownXidException,
InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ synchronized (xid)
+ {
+ MemoryTransaction tx = (MemoryTransaction) getTransaction(xid);
+ if (!tx.isPrepared())
+ {
+ // heuristically rollback this tx
+ for (TransactionRecord record : tx.getrecords())
+ {
+ record.rollback(_messagStore);
+ }
+ tx.heurRollback();
+ }
+ // add this branch in the list of indoubt tx
+ _indoubtXidMap.put(xid, tx);
+ }
}
public Transaction getTransaction(Xid xid)
throws
UnknownXidException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ Transaction tx = _xidMap.get(xid);
+ if (tx == null)
+ {
+ throw new UnknownXidException(xid);
+ }
+ return tx;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 857fb350a0..496c94dae9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -93,6 +93,7 @@ public class NonTransactionalContext implements TransactionalContext
{
try
{
+ message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
queue.process(_storeContext, message, deliverFirst);
//following check implements the functionality
//required by the 'immediate' flag:
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
index cd2f619f7e..2dc6ec2b77 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
@@ -26,10 +26,24 @@ package org.apache.qpid.server.txn;
public interface Transaction
{
- /**
+ /**
* Add an abstract record to this tx.
*
* @param record The record to be added
*/
public void addRecord(TransactionRecord record);
+
+ /**
+ * Set this tx timeout
+ *
+ * @param timeout This tx timeout in seconds
+ */
+ public void setTimeout(long timeout);
+
+ /**
+ * Get this tx timeout
+ *
+ * @return This tx timeout in seconds
+ */
+ public long getTimeout();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java
index 8047236985..bcbf2c9de4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java
@@ -19,6 +19,8 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.commons.configuration.Configuration;
import javax.transaction.xa.Xid;
import java.util.Set;
@@ -30,6 +32,17 @@ import java.util.Set;
*/
public interface TransactionManager
{
+
+ /**
+ * Configure this TM with the Message store implementation
+ *
+ * @param base The base element identifier from which all configuration items are relative. For example, if the base
+ * element is "store", the all elements used by concrete classes will be "store.foo" etc.
+ * @param config The apache commons configuration object
+ * @param messageStroe the message store associated with the TM
+ */
+ public void configure(MessageStore messageStroe, String base, Configuration config);
+
/**
* Begin a transaction branch identified by Xid
*