diff options
| author | Gordon Sim <gsim@apache.org> | 2007-05-09 08:48:18 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-05-09 08:48:18 +0000 |
| commit | 41d3e70cdf4c93aba0589ed7aec61499a86c6119 (patch) | |
| tree | d4b52849b875b16590241747b899bf3cc1bc86cb /java/broker/src | |
| parent | 04186f293032c5720f9500eb2a07433572f13d24 (diff) | |
| download | qpid-python-41d3e70cdf4c93aba0589ed7aec61499a86c6119.tar.gz | |
Patch from Arnaud Simon (asimon@redhat.com) to fix tests broken by earlier changes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@536458 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
14 files changed, 847 insertions, 152 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index ae1cf43f6c..40462cf2bf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -228,10 +228,14 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr try { queue.delete(); - _messageStore.destroyQueue(queue); + if( queue.isDurable() ) + { + _messageStore.destroyQueue(queue); + } } catch (Exception ex) { + ex.printStackTrace(); JMException jme = new JMException(ex.getMessage()); jme.initCause(ex); throw new MBeanException(jme, "Error in deleting queue " + queueName); diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 9711bbf4d2..6dd80b3bfa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -73,10 +73,14 @@ public class AMQChannel */ private AtomicLong _deliveryTag = new AtomicLong(0); - /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ + /** + * A channel has a default queue (the last declared) that is used when no queue name is explictily set + */ private AMQQueue _defaultQueue; - /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */ + /** + * This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. + */ private int _consumerTag; /** @@ -86,7 +90,9 @@ public class AMQChannel */ private AMQMessage _currentMessage; - /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */ + /** + * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. + */ private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>(); private final MessageStore _messageStore; @@ -118,7 +124,8 @@ public class AMQChannel public AMQChannel(AMQProtocolSession session, int channelId, TransactionManager transactionManager, MessageStore messageStore, MessageRouter exchanges) - throws AMQException + throws + AMQException { _session = session; _channelId = channelId; @@ -132,7 +139,9 @@ public class AMQChannel _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } - /** Sets this channel to be part of a local transaction */ + /** + * Sets this channel to be part of a local transaction + */ public void setLocalTransactional() { _txnContext = new DistributedTransactionalContext(_transactionManager, _messageStore, _storeContext, _returnMessages); @@ -193,23 +202,25 @@ public class AMQChannel } - public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException + public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) + throws + AMQException { _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, - _txnContext); + _txnContext); _currentMessage.setPublisher(publisher); } public void publishContentHeader(ContentHeaderBody contentHeaderBody) - throws AMQException + throws + AMQException { if (_currentMessage == null) { throw new AMQException("Received content header without previously receiving a BasicPublish frame"); - } - else + } else { if (_log.isTraceEnabled()) { @@ -228,7 +239,8 @@ public class AMQChannel } public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession) - throws AMQException + throws + AMQException { if (_currentMessage == null) { @@ -261,7 +273,9 @@ public class AMQChannel } } - protected void routeCurrentMessage() throws AMQException + protected void routeCurrentMessage() + throws + AMQException { try { @@ -294,14 +308,15 @@ public class AMQChannel * @param exclusive Flag requesting exclusive access to the queue * @param acks Are acks enabled for this subscriber * @param filters Filters to apply to this subscriber - * * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests - * * @throws ConsumerTagNotUniqueException if the tag is not unique * @throws AMQException if something goes wrong */ public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks, - FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException + FieldTable filters, boolean noLocal, boolean exclusive) + throws + AMQException, + ConsumerTagNotUniqueException { if (tag == null) { @@ -318,28 +333,11 @@ public class AMQChannel } - public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException + public void unsubscribeConsumer(AMQProtocolSession session, final AMQShortString consumerTag) + throws + AMQException { - if (_log.isDebugEnabled()) - { - _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size()); - _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() - { - - public boolean callback(UnacknowledgedMessage message) throws AMQException - { - _log.debug(message); - - return true; - } - - public void visitComplete() - { - } - }); - } - - AMQQueue q = _consumerTag2QueueMap.remove(consumerTag); + final AMQQueue q = _consumerTag2QueueMap.remove(consumerTag); if (q != null) { q.unregisterProtocolSession(session, _channelId, consumerTag); @@ -350,25 +348,27 @@ public class AMQChannel * Called from the protocol session to close this channel and clean up. T * * @param session The session to close - * * @throws AMQException if there is an error during closure */ - public void close(AMQProtocolSession session) throws AMQException + public void close(AMQProtocolSession session) + throws + AMQException { _txnContext.rollback(); unsubscribeAllConsumers(session); requeue(); } - private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException + private void unsubscribeAllConsumers(AMQProtocolSession session) + throws + AMQException { if (_log.isInfoEnabled()) { if (!_consumerTag2QueueMap.isEmpty()) { _log.info("Unsubscribing all consumers on channel " + toString()); - } - else + } else { _log.info("No consumers to unsubscribe on channel " + toString()); } @@ -400,13 +400,12 @@ public class AMQChannel if (queue == null) { _log.debug("Adding unacked message with a null queue:" + message.debugIdentity()); - } - else + } else { if (_log.isDebugEnabled()) { _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag + - ") with a queue(" + queue + ") for " + consumerTag); + ") with a queue(" + queue + ") for " + consumerTag); } } } @@ -431,7 +430,9 @@ public class AMQChannel * * @throws org.apache.qpid.AMQException if the requeue fails */ - public void requeue() throws AMQException + public void requeue() + throws + AMQException { // we must create a new map since all the messages will get a new delivery tag when they are redelivered Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); @@ -452,12 +453,11 @@ public class AMQChannel // if (_nonTransactedContext == null) { _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; - } - else + } else { deliveryContext = _txnContext; } @@ -489,10 +489,11 @@ public class AMQChannel * Requeue a single message * * @param deliveryTag The message to requeue - * * @throws AMQException If something goes wrong. */ - public void requeue(long deliveryTag) throws AMQException + public void requeue(long deliveryTag) + throws + AMQException { UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag); @@ -516,12 +517,11 @@ public class AMQChannel // if (_nonTransactedContext == null) { _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; - } - else + } else { deliveryContext = _txnContext; } @@ -532,19 +532,17 @@ public class AMQChannel deliveryContext.deliver(unacked.message, unacked.queue, true); //Deliver increments the message count but we have already deliverted this once so don't increment it again // this was because deliver did an increment changed this. - } - else + } else { _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag + - " but no queue defined and no DeadLetter queue so DROPPING message."); + " but no queue defined and no DeadLetter queue so DROPPING message."); // _log.error("Requested requeue of message:" + deliveryTag + // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue()); // // deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false); // } - } - else + } else { _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size()); @@ -554,10 +552,12 @@ public class AMQChannel { int count = 0; - public boolean callback(UnacknowledgedMessage message) throws AMQException + public boolean callback(UnacknowledgedMessage message) + throws + AMQException { _log.debug((count++) + ": (" + message.message.debugIdentity() + ")" + - "[" + message.deliveryTag + "]"); + "[" + message.deliveryTag + "]"); return false; // Continue } @@ -577,10 +577,11 @@ public class AMQChannel * Called to resend all outstanding unacknowledged messages to this same channel. * * @param requeue Are the messages to be requeued or dropped. - * * @throws AMQException When something goes wrong. */ - public void resend(final boolean requeue) throws AMQException + public void resend(final boolean requeue) + throws + AMQException { final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>(); final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>(); @@ -595,7 +596,9 @@ public class AMQChannel // and those that don't to be requeued. _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { - public boolean callback(UnacknowledgedMessage message) throws AMQException + public boolean callback(UnacknowledgedMessage message) + throws + AMQException { AMQShortString consumerTag = message.consumerTag; AMQMessage msg = message.message; @@ -606,13 +609,11 @@ public class AMQChannel if (_consumerTag2QueueMap.containsKey(consumerTag)) { msgToResend.add(message); - } - else // consumer has gone + } else // consumer has gone { msgToRequeue.add(message); } - } - else + } else { // Message has no consumer tag, so was "delivered" to a GET // or consumer no longer registered @@ -622,13 +623,11 @@ public class AMQChannel if (requeue) { msgToRequeue.add(message); - } - else + } else { _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); } - } - else + } else { _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message); } @@ -649,8 +648,7 @@ public class AMQChannel if (!msgToResend.isEmpty()) { _log.info("Preparing (" + msgToResend.size() + ") message to resend."); - } - else + } else { _log.info("No message to resend."); } @@ -699,8 +697,7 @@ public class AMQChannel } //move this message to requeue msgToRequeue.add(message); - } - else + } else { if (_log.isDebugEnabled()) { @@ -710,8 +707,7 @@ public class AMQChannel _unacknowledgedMessageMap.remove(message.deliveryTag); } } // sync(sub.getSendLock) - } - else + } else { if (_log.isInfoEnabled()) @@ -740,12 +736,11 @@ public class AMQChannel if (_nonTransactedContext == null) { _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; - } - else + } else { deliveryContext = _txnContext; } @@ -768,14 +763,17 @@ public class AMQChannel * since we may get an ack for a delivery tag that was generated from the deleted queue. * * @param queue the queue that has been deleted - * * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages */ - public void queueDeleted(final AMQQueue queue) throws AMQException + public void queueDeleted(final AMQQueue queue) + throws + AMQException { _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { - public boolean callback(UnacknowledgedMessage message) throws AMQException + public boolean callback(UnacknowledgedMessage message) + throws + AMQException { if (message.queue == queue) { @@ -787,7 +785,7 @@ public class AMQChannel catch (AMQException e) { _log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " + - e, e); + e, e); } } return false; @@ -805,10 +803,11 @@ public class AMQChannel * @param deliveryTag the last delivery tag * @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only * acknowledges the single message specified by the delivery tag - * * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel */ - public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException + public void acknowledgeMessage(long deliveryTag, boolean multiple) + throws + AMQException { synchronized (_unacknowledgedMessageMap.getLock()) { @@ -842,7 +841,7 @@ public class AMQChannel boolean suspend; suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark) - || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()); + || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()); setSuspended(suspend); } @@ -868,8 +867,7 @@ public class AMQChannel { q.deliverAsync(); } - } - else + } else { _log.debug("Suspending channel " + this); } @@ -881,16 +879,20 @@ public class AMQChannel return _suspended.get(); } - public void commit() throws AMQException + public void commit() + throws + AMQException { if (!isTransactional()) { throw new AMQException("Fatal error: commit called on non-transactional channel"); } - _txnContext.commit(); + _txnContext.commit(); } - public void rollback() throws AMQException + public void rollback() + throws + AMQException { _txnContext.rollback(); } @@ -919,14 +921,16 @@ public class AMQChannel return _storeContext; } - public void processReturns(AMQProtocolSession session) throws AMQException + public void processReturns(AMQProtocolSession session) + throws + AMQException { for (RequiredDeliveryException bouncedMessage : _returnMessages) { AMQMessage message = bouncedMessage.getAMQMessage(); session.getProtocolOutputConverter().writeReturn(message, _channelId, - bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); + bouncedMessage.getReplyCode().getCode(), + new AMQShortString(bouncedMessage.getMessage())); } _returnMessages.clear(); } @@ -937,8 +941,7 @@ public class AMQChannel if (isSuspended()) { return true; - } - else + } else { boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark); if (!willSuspend) diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java index 581bca2efe..a027b90743 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java @@ -19,49 +19,75 @@ package org.apache.qpid.server.messageStore; import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.txn.TransactionRecord;
+import org.apache.qpid.server.txn.MemoryEnqueueRecord;
+import org.apache.qpid.server.txn.MemoryDequeueRecord;
import org.apache.qpid.server.exception.*;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
import javax.transaction.xa.Xid;
-import java.util.Collection;
+import java.util.*;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
/**
+ * This a simple in-memory implementation of a message store i.e. nothing is persisted
+ * <p/>
* Created by Arnaud Simon
* Date: 26-Apr-2007
* Time: 08:23:45
*/
-public class MemoryMessageStore implements MessageStore
+public class MemoryMessageStore implements MessageStore
{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
+
+ // The table of message with its corresponding stream containing the message body
+ private Map<StorableMessage, ByteArrayOutputStream> _stagedMessages;
+ // The queue/messages association
+ private Map<StorableQueue, List<StorableMessage>> _queueMap;
+ // the message ID
+ private long _messageID = 0;
+ // The transaction manager
+ private TransactionManager _txm;
+
+ //========================================================================
+ // Interface MessageStore
+ //========================================================================
public void removeExchange(Exchange exchange)
throws
InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // do nothing this is inmemory
}
public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
- throws
- InternalErrorException
+ throws
+ InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // do nothing this is inmemory
}
public void createExchange(Exchange exchange)
throws
InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // do nothing this is inmemory
}
public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
throws
InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // do nothing this is inmemory
}
public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
@@ -69,14 +95,21 @@ public class MemoryMessageStore implements MessageStore InternalErrorException,
IllegalArgumentException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ _log.info("Configuring memory message store");
+ // Initialise the maps
+ _stagedMessages = new HashMap<StorableMessage, ByteArrayOutputStream>();
+ _queueMap = new HashMap<StorableQueue, List<StorableMessage>>();
+ _txm = tm;
+ _txm.configure(this, "txn", config);
}
public void close()
throws
InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ _log.info("Closing memory message store");
+ _stagedMessages.clear();
+ _queueMap.clear();
}
public void createQueue(StorableQueue queue)
@@ -84,7 +117,12 @@ public class MemoryMessageStore implements MessageStore InternalErrorException,
QueueAlreadyExistsException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (_queueMap.containsKey(queue))
+ {
+ throw new QueueAlreadyExistsException("queue " + queue + " already exists");
+ }
+ // add this queue into the map
+ _queueMap.put(queue, new LinkedList<StorableMessage>());
}
public void destroyQueue(StorableQueue queue)
@@ -92,7 +130,12 @@ public class MemoryMessageStore implements MessageStore InternalErrorException,
QueueDoesntExistException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (!_queueMap.containsKey(queue))
+ {
+ throw new QueueDoesntExistException("queue " + queue + " does not exist");
+ }
+ // remove this queue from the map
+ _queueMap.remove(queue);
}
public void stage(StorableMessage m)
@@ -100,7 +143,12 @@ public class MemoryMessageStore implements MessageStore InternalErrorException,
MessageAlreadyStagedException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (_stagedMessages.containsKey(m))
+ {
+ throw new MessageAlreadyStagedException("message " + m + " already staged");
+ }
+ _stagedMessages.put(m, new ByteArrayOutputStream());
+ m.staged();
}
public void appendContent(StorableMessage m, byte[] data, int offset, int size)
@@ -108,7 +156,11 @@ public class MemoryMessageStore implements MessageStore InternalErrorException,
MessageDoesntExistException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (!_stagedMessages.containsKey(m))
+ {
+ throw new MessageDoesntExistException("message " + m + " has not been staged");
+ }
+ _stagedMessages.get(m).write(data, offset, size);
}
public byte[] loadContent(StorableMessage m, int offset, int size)
@@ -116,7 +168,15 @@ public class MemoryMessageStore implements MessageStore InternalErrorException,
MessageDoesntExistException
{
- return new byte[0]; //To change body of implemented methods use File | Settings | File Templates.
+ if (!_stagedMessages.containsKey(m))
+ {
+ throw new MessageDoesntExistException("message " + m + " has not been staged");
+ }
+ byte[] result = new byte[size];
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ buf.put(_stagedMessages.get(m).toByteArray(), offset, size);
+ buf.get(result);
+ return result;
}
public void destroy(StorableMessage m)
@@ -124,7 +184,11 @@ public class MemoryMessageStore implements MessageStore InternalErrorException,
MessageDoesntExistException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (!_stagedMessages.containsKey(m))
+ {
+ throw new MessageDoesntExistException("message " + m + " has not been staged");
+ }
+ _stagedMessages.remove(m);
}
public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
@@ -135,7 +199,31 @@ public class MemoryMessageStore implements MessageStore UnknownXidException,
MessageDoesntExistException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (xid != null)
+ {
+ // this is a tx operation
+ TransactionRecord enqueueRecord = new MemoryEnqueueRecord(m, queue);
+ _txm.getTransaction(xid).addRecord(enqueueRecord);
+ } else
+ {
+ if (!_stagedMessages.containsKey(m))
+ {
+ try
+ {
+ stage(m);
+ } catch (MessageAlreadyStagedException e)
+ {
+ throw new InternalErrorException(e);
+ }
+ appendContent(m, m.getData(), 0, m.getPayloadSize());
+ }
+ if (!_queueMap.containsKey(queue))
+ {
+ throw new QueueDoesntExistException("queue " + queue + " dos not exist");
+ }
+ _queueMap.get(queue).add(m);
+ m.enqueue(queue);
+ }
}
public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
@@ -145,25 +233,43 @@ public class MemoryMessageStore implements MessageStore InvalidXidException,
UnknownXidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (xid != null)
+ {
+ // this is a tx operation
+ TransactionRecord dequeueRecord = new MemoryDequeueRecord(m, queue);
+ _txm.getTransaction(xid).addRecord(dequeueRecord);
+ } else
+ {
+ if (!_queueMap.containsKey(queue))
+ {
+ throw new QueueDoesntExistException("queue " + queue + " dos not exist");
+ }
+ m.dequeue(queue);
+ _queueMap.get(queue).remove(m);
+ if (!m.isEnqueued())
+ {
+ // we can delete this message
+ _stagedMessages.remove(m);
+ }
+ }
}
public Collection<StorableQueue> getAllQueues()
throws
InternalErrorException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _queueMap.keySet();
}
public Collection<StorableMessage> getAllMessages(StorableQueue queue)
throws
InternalErrorException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _queueMap.get(queue);
}
public long getNewMessageId()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return _messageID++;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index eefff090df..32c6eb2c9b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -432,10 +432,10 @@ public class AMQMessage implements StorableMessage // enqueuing the messages ensure that if required the destinations are recorded to a // persistent store - for (AMQQueue q : _transientMessageData.getDestinationQueues()) - { - _messageHandle.enqueue(storeContext, _messageId, q); - } + // for (AMQQueue q : _transientMessageData.getDestinationQueues()) + // { + // _messageHandle.enqueue(storeContext, _messageId, q); + // } if (_transientMessageData.getContentHeaderBody().bodySize == 0) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java index 5978e3b10d..5539627820 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java @@ -169,7 +169,10 @@ public class StorableMessageHandle implements AMQMessageHandle { try { - _messageStore.enqueue((Xid) storeContext.getPayload(), _message, queue); + if (queue.isDurable()) + { + _messageStore.enqueue((Xid) storeContext.getPayload(), _message, queue); + } } catch (Exception e) { throw new AMQException("PRoblem during message enqueue", e); @@ -182,7 +185,10 @@ public class StorableMessageHandle implements AMQMessageHandle { try { - _messageStore.dequeue((Xid) storeContext.getPayload(), _message, queue); + if (queue.isDurable()) + { + _messageStore.dequeue((Xid) storeContext.getPayload(), _message, queue); + } } catch (Exception e) { throw new AMQException("PRoblem during message dequeue", e); @@ -199,8 +205,8 @@ public class StorableMessageHandle implements AMQMessageHandle if (_payload == null) { int bodySize = (int) _contentHeaderBody.bodySize; - _buffer = ByteBuffer.allocate(bodySize); _payload = new byte[bodySize]; + _buffer = ByteBuffer.wrap(_payload); for (ContentChunk contentBody : _chunks) { int chunkSize = contentBody.getSize(); @@ -208,7 +214,6 @@ public class StorableMessageHandle implements AMQMessageHandle contentBody.getData().get(chunk); _buffer.put(chunk); } - _buffer.get(_payload); } return _payload; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java index 0d25ab0e32..d899bb972c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java @@ -39,7 +39,7 @@ public class DequeueRecord implements TransactionRecord UnknownXidException, MessageDoesntExistException { - // do nothing + // nothing } public void rollback(MessageStore store) diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java index eff623ca7c..05756a8c23 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java @@ -46,7 +46,8 @@ public class DistributedTransactionalContext implements TransactionalContext //======================================================================== // The logger for this class private static final Logger _log = Logger.getLogger(DistributedTransactionalContext.class); - + private static final Object _lockXID = new Object(); + private static int _count = 0; //======================================================================== // Instance Fields //======================================================================== @@ -60,7 +61,6 @@ public class DistributedTransactionalContext implements TransactionalContext final private List<RequiredDeliveryException> _returnMessages; // for generating xids private byte[] _txId = ("txid").getBytes(); - private int _count = 0; public DistributedTransactionalContext(TransactionManager transactionManager, MessageStore messageStore, StoreContext storeContext, List<RequiredDeliveryException> returnMessages) @@ -75,15 +75,21 @@ public class DistributedTransactionalContext implements TransactionalContext throws AMQException { - // begin the transaction and pass the XID through the context - Xid xid = new XidImpl(("branch" + _count++).getBytes(), 1, _txId); - try + if (_storeContext.getPayload() == null) { - _transactionManager.begin(xid); - _storeContext.setPayload(xid); - } catch (Exception e) - { - throw new AMQException("Problem during transaction begin", e); + synchronized (_lockXID) + { + // begin the transaction and pass the XID through the context + Xid xid = new XidImpl(("branch" + _count++).getBytes(), 1, _txId); + try + { + _transactionManager.begin(xid); + _storeContext.setPayload(xid); + } catch (Exception e) + { + throw new AMQException("Problem during transaction begin", e); + } + } } } @@ -93,11 +99,18 @@ public class DistributedTransactionalContext implements TransactionalContext { try { - _transactionManager.commit_one_phase((Xid) _storeContext.getPayload()); + if (_storeContext.getPayload() != null) + { + _transactionManager.commit_one_phase((Xid) _storeContext.getPayload()); + } } catch (Exception e) { throw new AMQException("Problem during transaction commit", e); } + finally + { + _storeContext.setPayload(null); + } } public void rollback() @@ -106,11 +119,18 @@ public class DistributedTransactionalContext implements TransactionalContext { try { - _transactionManager.rollback((Xid) _storeContext.getPayload()); + if (_storeContext.getPayload() != null) + { + _transactionManager.rollback((Xid) _storeContext.getPayload()); + } } catch (Exception e) { throw new AMQException("Problem during transaction rollback", e); } + finally + { + _storeContext.setPayload(null); + } } public void messageFullyReceived(boolean persistent) @@ -142,6 +162,7 @@ public class DistributedTransactionalContext implements TransactionalContext throws AMQException { + beginTranIfNecessary(); if (multiple) { if (deliveryTag == 0) diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java new file mode 100644 index 0000000000..abbd2ad923 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java @@ -0,0 +1,82 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.qpid.server.messageStore.StorableMessage; +import org.apache.qpid.server.messageStore.StorableQueue; +import org.apache.qpid.server.exception.*; +import org.apache.log4j.Logger; + +import javax.transaction.xa.Xid; + +/** + * Created by Arnaud Simon + * Date: 03-May-2007 + * Time: 13:59:47 + */ +public class MemoryDequeueRecord implements TransactionRecord +{ + //======================================================================== + // Static Constants + //======================================================================== + // The logger for this class + private static final Logger _log = Logger.getLogger(MemoryDequeueRecord.class); + // the queue + StorableQueue _queue; + // the message + StorableMessage _message; + + //======================================================================== + // Constructor + //======================================================================== + public MemoryDequeueRecord( StorableMessage m, StorableQueue queue) + { + _queue = queue; + _message = m; + } + + //======================================================================== + // Interface TransactionRecord + //======================================================================== + + public void commit(MessageStore store, Xid xid) + throws + InternalErrorException, + QueueDoesntExistException, + InvalidXidException, + UnknownXidException, + MessageDoesntExistException + { + store.dequeue(null, _message, _queue); + } + + public void rollback(MessageStore store) + throws + InternalErrorException + { + // do nothing + } + + public void prepare(MessageStore store) + throws + InternalErrorException + { + // do nothing + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java new file mode 100644 index 0000000000..159a5a471c --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java @@ -0,0 +1,82 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.qpid.server.messageStore.StorableMessage; +import org.apache.qpid.server.messageStore.StorableQueue; +import org.apache.qpid.server.exception.*; +import org.apache.log4j.Logger; + +import javax.transaction.xa.Xid; + +/** + * Created by Arnaud Simon + * Date: 03-May-2007 + * Time: 14:00:04 + */ +public class MemoryEnqueueRecord implements TransactionRecord +{ + //======================================================================== + // Static Constants + //======================================================================== + // The logger for this class + private static final Logger _log = Logger.getLogger(MemoryDequeueRecord.class); + + // the queue + StorableQueue _queue; + // the message + StorableMessage _message; + + //======================================================================== + // Constructor + //======================================================================== + public MemoryEnqueueRecord(StorableMessage m, StorableQueue queue) + { + _queue = queue; + _message = m; + } + //======================================================================== + // Interface TransactionRecord + //======================================================================== + + public void commit(MessageStore store, Xid xid) + throws + InternalErrorException, + QueueDoesntExistException, + InvalidXidException, + UnknownXidException, + MessageDoesntExistException + { + store.enqueue(null, _message, _queue); + } + + public void rollback(MessageStore store) + throws + InternalErrorException + { + // do nothing + } + + public void prepare(MessageStore store) + throws + InternalErrorException + { + // do nothing + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java new file mode 100644 index 0000000000..4cd52b1a72 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java @@ -0,0 +1,158 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import org.apache.log4j.Logger; + +import java.util.List; +import java.util.LinkedList; + +/** + * Created by Arnaud Simon + * Date: 03-May-2007 + * Time: 14:30:41 + */ +public class MemoryTransaction implements Transaction +{ + //======================================================================== + // Static Constants + //======================================================================== + // The logger for this class + private static final Logger _log = Logger.getLogger(MemoryTransaction.class); + + //======================================================================== + // Instance Fields + //======================================================================== + // Indicates whether this transaction is prepared + private boolean _prepared = false; + // Indicates that this transaction has heuristically rolled back + private boolean _heurRollBack = false; + // The list of Abstract records associated with this tx + private List<TransactionRecord> _records = new LinkedList<TransactionRecord>(); + // The date when this tx has been created. + private long _dateCreated; + // The timeout in seconds + private long _timeout; + + //========================================================= + // Constructors + //========================================================= + /** + * Create a transaction that wraps a BDB tx and set the creation date. + * + */ + public MemoryTransaction() + { + _dateCreated = System.currentTimeMillis(); + } + + //========================================================= + // Getter and Setter methods + //========================================================= + /** + * Notify that this tx has been prepared + */ + public void prepare() + { + _prepared = true; + } + + /** + * Specify whether this transaction is prepared + * + * @return true if this transaction is prepared, false otherwise + */ + public boolean isPrepared() + { + return _prepared; + } + + /** + * Notify that this tx has been heuristically rolled back + */ + public void heurRollback() + { + _heurRollBack = true; + } + + /** + * Specify whether this transaction has been heuristically rolled back + * + * @return true if this transaction has been heuristically rolled back , false otherwise + */ + public boolean isHeurRollback() + { + return _heurRollBack; + } + + /** + * Add an abstract record to this tx. + * + * @param record The record to be added + */ + public void addRecord(TransactionRecord record) + { + _records.add(record); + } + + /** + * Get the list of records associated with this tx. + * + * @return The list of records associated with this tx. + */ + public List<TransactionRecord> getrecords() + { + return _records; + } + + /** + * Set this tx timeout + * + * @param timeout This tx timeout in seconds + */ + public void setTimeout(long timeout) + { + _timeout = timeout; + } + + /** + * Get this tx timeout + * + * @return This tx timeout in seconds + */ + public long getTimeout() + { + return _timeout; + } + + /** + * Specify whether this tx has expired + * + * @return true if this tx has expired, false otherwise + */ + public boolean hasExpired() + { + long currentDate = System.currentTimeMillis(); + boolean result = currentDate - _dateCreated > _timeout * 1000; + if (_log.isDebugEnabled()) + { + _log.debug("transaction has expired"); + } + return result; + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java index a132bcefe6..e740ff9c30 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java @@ -18,9 +18,13 @@ package org.apache.qpid.server.txn; import org.apache.qpid.server.exception.*; +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; import javax.transaction.xa.Xid; import java.util.Set; +import java.util.HashMap; /** * Created by Arnaud Simon @@ -29,13 +33,65 @@ import java.util.Set; */ public class MemoryTransactionManager implements TransactionManager { + //======================================================================== + // Static Constants + //======================================================================== + // The logger for this class + private static final Logger _log = Logger.getLogger(MemoryTransactionManager.class); + + private static final String ENVIRONMENT_TX_TIMEOUT = "environment-tx-timeout"; + + //======================================================================== + // Instance Fields + //======================================================================== + // The underlying BDB message store + private MessageStore _messagStore; + // A map of XID/BDBtx + private HashMap<Xid, Transaction> _xidMap; + // A map of in-doubt txs + private HashMap<Xid, MemoryTransaction> _indoubtXidMap; + + // A default tx timeout in sec + private int _defaultTimeout; // set to 10s if not specified in the config + + //======================================================================== + // Interface TransactionManager + //======================================================================== + public void configure(MessageStore messageStroe, String base, Configuration config) + { + _messagStore = messageStroe; + if (config != null) + { + _defaultTimeout = config.getInt(base + "." + ENVIRONMENT_TX_TIMEOUT, 10); + } else + { + _defaultTimeout = 10; + } + _log.info("Using transaction timeout of " + _defaultTimeout + " s"); + _xidMap = new HashMap<Xid, Transaction>(); + _indoubtXidMap = new HashMap<Xid, MemoryTransaction>(); + } public XAFlag begin(Xid xid) throws InternalErrorException, InvalidXidException { - return null; //To change body of implemented methods use File | Settings | File Templates. + synchronized (xid) + { + if (xid == null) + { + throw new InvalidXidException(xid, "null xid"); + } + if (_xidMap.containsKey(xid)) + { + throw new InvalidXidException(xid, "Xid already exist"); + } + MemoryTransaction tx = new MemoryTransaction(); + tx.setTimeout(_defaultTimeout); + _xidMap.put(xid, tx); + return XAFlag.ok; + } } public XAFlag prepare(Xid xid) @@ -44,7 +100,35 @@ public class MemoryTransactionManager implements TransactionManager CommandInvalidException, UnknownXidException { - return null; //To change body of implemented methods use File | Settings | File Templates. + synchronized (xid) + { + // get the transaction + MemoryTransaction tx = (MemoryTransaction) getTransaction(xid); + XAFlag result = XAFlag.ok; + if (tx.hasExpired()) + { + result = XAFlag.rbtimeout; + // rollback this tx branch + rollback(xid); + } else + { + if (tx.isPrepared()) + { + throw new CommandInvalidException("TransactionImpl is already prepared"); + } + if (tx.getrecords().size() == 0) + { + // the tx was read only (no work has been done) + _xidMap.remove(xid); + result = XAFlag.rdonly; + } else + { + // we need to persist the tx records + tx.prepare(); + } + } + return result; + } } public XAFlag rollback(Xid xid) @@ -53,7 +137,28 @@ public class MemoryTransactionManager implements TransactionManager CommandInvalidException, UnknownXidException { - return null; //To change body of implemented methods use File | Settings | File Templates. + synchronized (xid) + { + // get the transaction + MemoryTransaction tx = (MemoryTransaction) getTransaction(xid); + XAFlag flag = XAFlag.ok; + if (tx.isHeurRollback()) + { + flag = XAFlag.heurrb; + } else + { + for (TransactionRecord record : tx.getrecords()) + { + record.rollback(_messagStore); + } + _xidMap.remove(xid); + } + if (tx.hasExpired()) + { + flag = XAFlag.rbtimeout; + } + return flag; + } } public XAFlag commit(Xid xid) @@ -63,7 +168,44 @@ public class MemoryTransactionManager implements TransactionManager UnknownXidException, NotPreparedException { - return null; //To change body of implemented methods use File | Settings | File Templates. + synchronized (xid) + { + // get the transaction + MemoryTransaction tx = (MemoryTransaction) getTransaction(xid); + XAFlag flag = XAFlag.ok; + if (tx.isHeurRollback()) + { + flag = XAFlag.heurrb; + } else if (tx.hasExpired()) + { + flag = XAFlag.rbtimeout; + // rollback this tx branch + rollback(xid); + } else + { + if (!tx.isPrepared()) + { + throw new NotPreparedException("TransactionImpl is not prepared"); + } + for (TransactionRecord record : tx.getrecords()) + { + try + { + record.commit(_messagStore, xid); + } catch (InvalidXidException e) + { + throw new UnknownXidException(xid, e); + } catch (Exception e) + { + // this should not happen as the queue and the message must exist + _log.error("Error when committing distributed transaction heurmix mode returned: " + xid); + flag = XAFlag.heurmix; + } + } + _xidMap.remove(xid); + } + return flag; + } } public XAFlag commit_one_phase(Xid xid) @@ -72,7 +214,47 @@ public class MemoryTransactionManager implements TransactionManager CommandInvalidException, UnknownXidException { - return null; //To change body of implemented methods use File | Settings | File Templates. + synchronized (xid) + { + XAFlag flag = XAFlag.ok; + MemoryTransaction tx = (MemoryTransaction) getTransaction(xid); + if (tx.isHeurRollback()) + { + flag = XAFlag.heurrb; + } else if (tx.hasExpired()) + { + flag = XAFlag.rbtimeout; + // rollback this tx branch + rollback(xid); + } else + { + // we need to prepare the tx + tx.prepare(); + try + { + for (TransactionRecord record : tx.getrecords()) + { + try + { + record.commit(_messagStore, xid); + } catch (InvalidXidException e) + { + throw new UnknownXidException(xid, e); + } catch (Exception e) + { + // this should not happen as the queue and the message must exist + _log.error("Error when committing transaction heurmix mode returned: " + xid); + flag = XAFlag.heurmix; + } + } + } + finally + { + _xidMap.remove(xid); + } + } + return flag; + } } public void forget(Xid xid) @@ -81,7 +263,10 @@ public class MemoryTransactionManager implements TransactionManager CommandInvalidException, UnknownXidException { - //To change body of implemented methods use File | Settings | File Templates. + synchronized (xid) + { + _xidMap.remove(xid); + } } public void setTimeout(Xid xid, long timeout) @@ -89,7 +274,8 @@ public class MemoryTransactionManager implements TransactionManager InternalErrorException, UnknownXidException { - //To change body of implemented methods use File | Settings | File Templates. + Transaction tx = getTransaction(xid); + tx.setTimeout(timeout); } public long getTimeout(Xid xid) @@ -97,7 +283,8 @@ public class MemoryTransactionManager implements TransactionManager InternalErrorException, UnknownXidException { - return 0; //To change body of implemented methods use File | Settings | File Templates. + Transaction tx = getTransaction(xid); + return tx.getTimeout(); } public Set<Xid> recover(boolean startscan, boolean endscan) @@ -105,7 +292,7 @@ public class MemoryTransactionManager implements TransactionManager InternalErrorException, CommandInvalidException { - return null; //To change body of implemented methods use File | Settings | File Templates. + return _indoubtXidMap.keySet(); } public void HeuristicOutcome(Xid xid) @@ -113,13 +300,32 @@ public class MemoryTransactionManager implements TransactionManager UnknownXidException, InternalErrorException { - //To change body of implemented methods use File | Settings | File Templates. + synchronized (xid) + { + MemoryTransaction tx = (MemoryTransaction) getTransaction(xid); + if (!tx.isPrepared()) + { + // heuristically rollback this tx + for (TransactionRecord record : tx.getrecords()) + { + record.rollback(_messagStore); + } + tx.heurRollback(); + } + // add this branch in the list of indoubt tx + _indoubtXidMap.put(xid, tx); + } } public Transaction getTransaction(Xid xid) throws UnknownXidException { - return null; //To change body of implemented methods use File | Settings | File Templates. + Transaction tx = _xidMap.get(xid); + if (tx == null) + { + throw new UnknownXidException(xid); + } + return tx; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 857fb350a0..496c94dae9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -93,6 +93,7 @@ public class NonTransactionalContext implements TransactionalContext { try { + message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue); queue.process(_storeContext, message, deliverFirst); //following check implements the functionality //required by the 'immediate' flag: diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java index cd2f619f7e..2dc6ec2b77 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java @@ -26,10 +26,24 @@ package org.apache.qpid.server.txn; public interface Transaction { - /** + /** * Add an abstract record to this tx. * * @param record The record to be added */ public void addRecord(TransactionRecord record); + + /** + * Set this tx timeout + * + * @param timeout This tx timeout in seconds + */ + public void setTimeout(long timeout); + + /** + * Get this tx timeout + * + * @return This tx timeout in seconds + */ + public long getTimeout(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java index 8047236985..bcbf2c9de4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java @@ -19,6 +19,8 @@ package org.apache.qpid.server.txn; import org.apache.qpid.server.exception.*; +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.commons.configuration.Configuration; import javax.transaction.xa.Xid; import java.util.Set; @@ -30,6 +32,17 @@ import java.util.Set; */ public interface TransactionManager { + + /** + * Configure this TM with the Message store implementation + * + * @param base The base element identifier from which all configuration items are relative. For example, if the base + * element is "store", the all elements used by concrete classes will be "store.foo" etc. + * @param config The apache commons configuration object + * @param messageStroe the message store associated with the TM + */ + public void configure(MessageStore messageStroe, String base, Configuration config); + /** * Begin a transaction branch identified by Xid * |
