summaryrefslogtreecommitdiff
path: root/java/broker/src/main
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-04-01 16:25:58 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-04-01 16:25:58 +0000
commit08308d4404c46de8a2939ca420cc57f8fc6227eb (patch)
treedf351b77cd8b349f668f5f72024fc0e7bf6cad7c /java/broker/src/main
parent56b5194706dea6c2c53dc44c37499efecd042d76 (diff)
downloadqpid-python-08308d4404c46de8a2939ca420cc57f8fc6227eb.tar.gz
QPID-1764 : Add a BaseTransactionLog that takes care of handling persistent message references so that the underlying TransactionLog need not worry about that.
Updated MemoryMS to use this even to ensure that the code is exercised. To ensure that the new BaseTransactionLog was correctly used when used by a TransactionLog. The configure() method now returns an Object(TransactionLog) that is the newly configured TL. Existing tests and code where the original TL reference was used have been changed to use the output of the configure() call. NOTE: the return type should be changed to TransactionLog but until we have completely split the TransactionLog and RoutingTable implementations then this is not possible. The implementation also includes a number of items from the Flow To Disk review: - The old get* Methods have been removed from the TransactionLog interface. - Rollback should now rollback enqueues. (No test provided) - StoreContext now has enqueue/dequeue methods that track the messageId/Queue pairing - The linked list per message has been reduced to a link list per message that is enqueued on multiple queues. Messages that exist on only one queue have no additional overhead. - Optimisation also included to: Include message delete in 'dequeue transaction' where the message was only ever enqueued on a single queue. All other message deletes are peformed as part of an asynchrounous commit. The asynchrounous commit is setup via the StoreContext, which has had some work done to move it towards becomming a Qpid Transaction Object where all operations are performed against rather than going via the TransactionLog. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@760951 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java57
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java121
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java234
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java81
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java15
9 files changed, 456 insertions, 92 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 5eafd281c0..bab19fbc54 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -136,11 +136,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
if(_destinationQueues != null)
{
- for (int i = 0; i < _destinationQueues.size(); i++)
- {
- transactionLog.enqueueMessage(_txnContext.getStoreContext(),
- _destinationQueues.get(i), getMessageId());
- }
+ transactionLog.enqueueMessage(_txnContext.getStoreContext(), _destinationQueues, getMessageId());
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index ed9b1eb8d7..e5898ceda9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -849,7 +849,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (entry.isPersistent() && toQueue.isDurable())
{
- transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId());
+ //FIXME
+ //fixme
+ ArrayList list = new ArrayList();
+ list.add(toQueue);
+ transactionLog.enqueueMessage(storeContext, list, entry.getMessageId());
}
// dequeue will remove the messages from the queue
entry.dequeue(storeContext);
@@ -941,10 +945,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (!entry.isDeleted() && entry.isPersistent() && toQueue.isDurable())
{
- transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId());
+ //fixme
+ //FIXME
+ ArrayList list = new ArrayList();
+ list.add(toQueue);
+ transactionLog.enqueueMessage(storeContext, list, entry.getMessageId());
}
}
+
// Commit and flush the move transcations.
try
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java b/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java
index 0c62638710..883a41b55f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java
@@ -42,7 +42,7 @@ public interface RoutingTable
*
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception;
+ Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception;
/**
* Called to close and cleanup any resources used by the message store.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 33b3d8608e..157418d806 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -34,6 +34,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -41,7 +42,6 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
@@ -143,7 +143,7 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable
private State _state = State.INITIAL;
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
//Only initialise when loaded with the old 'store' confing ignore the new 'RoutingTable' config
if (base.equals("store"))
@@ -178,7 +178,9 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable
recover();
stateTransition(State.RECOVERING, State.STARTED);
+ return new BaseTransactionLog(this);
}
+ return null;
}
private static synchronized void initialiseDriver() throws ClassNotFoundException
@@ -825,7 +827,18 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable
}
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
+ {
+ for (AMQQueue q : queues)
+ {
+ if (q.isDurable())
+ {
+ enqueueMessage(context,q,messageId);
+ }
+ }
+ }
+
+ void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
AMQShortString name = queue.getName();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 3754b41a3e..d57b81c362 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.store;
-import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -30,17 +29,14 @@ import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -67,16 +63,16 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
private final AtomicLong _messageId = new AtomicLong(1);
private AtomicBoolean _closed = new AtomicBoolean(false);
- protected final Map<Long, List<AMQQueue>> _messageEnqueueMap = new HashMap<Long, List<AMQQueue>>();
- public void configure()
+ public TransactionLog configure()
{
_log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
_metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
_contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY);
+ return new BaseTransactionLog(this);
}
- public void configure(String base, VirtualHostConfiguration config)
+ public TransactionLog configure(String base, VirtualHostConfiguration config)
{
//Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable'
if (base.equals("store"))
@@ -85,12 +81,14 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
_log.info("Using capacity " + hashtableCapacity + " for hash tables");
_metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
_contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
+ return new BaseTransactionLog(this);
}
+ return null;
}
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
- configure(base, config);
+ return configure(base, config);
}
public void close() throws Exception
@@ -108,7 +106,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
}
}
- private void removeMessage(StoreContext context, Long messageId) throws AMQException
+ public void removeMessage(StoreContext context, Long messageId) throws AMQException
{
checkNotClosed();
if (_log.isDebugEnabled())
@@ -117,7 +115,6 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
}
_metaDataMap.remove(messageId);
_contentBodyMap.remove(messageId);
- _messageEnqueueMap.remove(messageId);
}
public void createExchange(Exchange exchange) throws AMQException
@@ -155,41 +152,25 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
// Not required to do anything
}
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException
{
- synchronized (_messageEnqueueMap)
+ for (AMQQueue q : queues)
{
- List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
- if (queues == null)
+ if (q.isDurable())
{
- queues = new LinkedList<AMQQueue>();
- _messageEnqueueMap.put(messageId, queues);
+ enqueueMessage(context,q,messageId);
}
-
- queues.add(queue);
}
}
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
- synchronized (_messageEnqueueMap)
- {
- List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
- if (queues == null || !queues.contains(queue))
- {
- throw new RuntimeException("Attempt to dequeue messageID:" + messageId + " from queue:" + queue.getName()
- + " but it is not enqueued on that queue.");
- }
- else
- {
- queues.remove(queue);
- if (queues.isEmpty())
- {
- removeMessage(context,messageId);
- }
- }
- }
+ // Not required to do anything
+ }
+ public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ {
+ // Not required to do anything
}
public void beginTran(StoreContext context) throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
index fdb56a1a55..b5ae8ea284 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
@@ -21,6 +21,12 @@
package org.apache.qpid.server.store;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQQueue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
/**
* A context that the store can use to associate with a transactional context. For example, it could store
@@ -32,17 +38,37 @@ public class StoreContext
{
private static final Logger _logger = Logger.getLogger(StoreContext.class);
+ private static final String DEFAULT_NAME = "StoreContext";
private String _name;
private Object _payload;
+ private HashMap<Long, ArrayList<AMQQueue>> _enqueueMap;
+ private HashMap<Long, ArrayList<AMQQueue>> _dequeueMap;
+ private boolean _async;
public StoreContext()
{
- _name = "StoreContext";
+ this(DEFAULT_NAME);
}
public StoreContext(String name)
{
+ this(name,false);
+ }
+
+ /**
+ *
+ * @param name The name of this Transaction
+ * @param asynchrouous Is this Transaction Asynchronous
+ */
+ public StoreContext(String name, boolean asynchrouous)
+ {
_name = name;
+ _async = asynchrouous;
+ }
+
+ public StoreContext(boolean asynchronous)
+ {
+ this(DEFAULT_NAME, asynchronous);
}
public Object getPayload()
@@ -52,7 +78,7 @@ public class StoreContext
public void setPayload(Object payload)
{
- if(_logger.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
_logger.debug("public void setPayload(Object payload = " + payload + "): called");
}
@@ -68,4 +94,95 @@ public class StoreContext
{
return "<_name = " + _name + ", _payload = " + _payload + ">";
}
+
+ public Map<Long, ArrayList<AMQQueue>> getEnqueueMap()
+ {
+ return _enqueueMap;
+ }
+
+ public Map<Long, ArrayList<AMQQueue>> getDequeueMap()
+ {
+ return _dequeueMap;
+ }
+
+ /**
+ * Record the enqueues for processing if we abort
+ *
+ * @param queues
+ * @param messageId
+ *
+ * @throws AMQException
+ */
+ public void enqueueMessage(ArrayList<AMQQueue> queues, Long messageId) throws AMQException
+ {
+ if (inTransaction())
+ {
+ ArrayList<AMQQueue> enqueues = _enqueueMap.get(messageId);
+
+ if (enqueues == null)
+ {
+ enqueues = new ArrayList<AMQQueue>();
+ _enqueueMap.put(messageId, enqueues);
+ }
+
+ for (AMQQueue q : queues)
+ {
+ if (!enqueues.contains(q))
+ {
+ enqueues.add(q);
+ }
+ }
+
+ }
+ }
+
+ /**
+ * Record the dequeue for processing on commit
+ *
+ * @param queue
+ * @param messageId
+ *
+ * @throws AMQException
+ */
+ public void dequeueMessage(AMQQueue queue, Long messageId) throws AMQException
+ {
+ if (inTransaction())
+ {
+ ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId);
+
+ if (dequeues == null)
+ {
+ dequeues = new ArrayList<AMQQueue>();
+ _dequeueMap.put(messageId, dequeues);
+ }
+
+ dequeues.add(queue);
+ }
+ }
+
+ public void beginTransaction() throws AMQException
+ {
+ _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>();
+ _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>();
+ }
+
+ public void commitTransaction() throws AMQException
+ {
+ _dequeueMap.clear();
+ }
+
+ public void abortTransaction() throws AMQException
+ {
+ _enqueueMap.clear();
+ }
+
+ public boolean inTransaction()
+ {
+ return _payload != null;
+ }
+
+ public boolean isAsync()
+ {
+ return _async;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
new file mode 100644
index 0000000000..4c3f1fcc49
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transactionlog;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class BaseTransactionLog implements TransactionLog
+{
+ private static final Logger _logger = Logger.getLogger(BaseTransactionLog.class);
+
+ TransactionLog _delegate;
+ private Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>();
+
+ public BaseTransactionLog(TransactionLog delegate)
+ {
+ _delegate = delegate;
+ }
+
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ {
+ return _delegate.configure(virtualHost, base, config);
+ }
+
+ public void close() throws Exception
+ {
+ _delegate.close();
+ }
+
+ public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
+ {
+ context.enqueueMessage(queues, messageId);
+
+ if (queues.size() > 0)
+ {
+ _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues);
+
+ //Clone the list incase someone else changes it.
+ _idToQueues.put(messageId, (ArrayList) queues.clone());
+ }
+
+ _delegate.enqueueMessage(context, queues, messageId);
+ }
+
+ public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ {
+ if (context.inTransaction())
+ {
+ context.dequeueMessage(queue, messageId);
+
+ Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap();
+
+ //For each Message ID that is in the map check
+ for (Long messageID : messageMap.keySet())
+ {
+ //If we don't have a gloabl reference for this message then there is only a single enqueue
+ if (_idToQueues.get(messageID) == null)
+ {
+ // Add the removal of the message to this transaction
+ _delegate.removeMessage(context,messageID);
+ // Remove this message ID as we have processed it so we don't reprocess after the main commmit
+ messageMap.remove(messageID);
+ }
+ }
+ }
+
+ _delegate.dequeueMessage(context, queue, messageId);
+
+ if (!context.inTransaction())
+ {
+ HashMap<Long, ArrayList<AMQQueue>> dequeue = new HashMap<Long, ArrayList<AMQQueue>>();
+ ArrayList list = new ArrayList();
+ list.add(queue);
+ dequeue.put(messageId, list);
+ processDequeues(dequeue);
+ }
+ }
+
+ /**
+ * This should not be called from main broker code.
+ * // Perhaps we need a new interface:
+ *
+ * Broker <->TransactionLog
+ * Broker <->BaseTransactionLog<->(Log with removeMessage())
+ */
+ public void removeMessage(StoreContext context, Long messageId) throws AMQException
+ {
+ _delegate.removeMessage(context, messageId);
+ }
+
+ public void beginTran(StoreContext context) throws AMQException
+ {
+ context.beginTransaction();
+ _delegate.beginTran(context);
+ }
+
+ public void commitTran(StoreContext context) throws AMQException
+ {
+ //Perform real commit of current data
+ _delegate.commitTran(context);
+
+ // If we have dequeues to process then process them
+ if (context.getDequeueMap() != null)
+ {
+ processDequeues(context.getDequeueMap());
+ }
+
+ //Commit the recorded state for this transaction.
+ context.commitTransaction();
+ }
+
+ public void abortTran(StoreContext context) throws AMQException
+ {
+ // If we have enqueues to rollback
+ if (context.getEnqueueMap() != null)
+ {
+ processDequeues(context.getEnqueueMap());
+ }
+ //Abort the recorded state for this transaction.
+ context.abortTransaction();
+
+ _delegate.abortTran(context);
+ }
+
+ private void processDequeues(Map<Long, ArrayList<AMQQueue>> messageMap)
+ throws AMQException
+ {
+ // Process any enqueues to bring our model up to date.
+ Set<Long> messageIDs = messageMap.keySet();
+
+ //Create a new Asynchronous Context.
+ StoreContext removeContext = new StoreContext(true);
+
+ //Batch Process the Dequeues on the delegate
+ _delegate.beginTran(removeContext);
+
+ try
+ {
+ //For each Message ID Decrement the reference for each of the queues it was on.
+ for (Long messageID : messageIDs)
+ {
+ ArrayList<AMQQueue> queueList = messageMap.get(messageID);
+
+ // For each of the queues decrement the reference
+ for (AMQQueue queue : queueList)
+ {
+ ArrayList<AMQQueue> enqueuedList = _idToQueues.get(messageID);
+
+ // If we have no mapping then this message was only enqueued on a single queue
+ // This will be the case when we are not in a larger transaction
+ if (enqueuedList == null)
+ {
+ _delegate.removeMessage(removeContext, messageID);
+ }
+ else
+ {
+ // Update the enqueued list
+ enqueuedList.remove(queue);
+
+ // If the list is now empty then remove the message
+ if (enqueuedList.isEmpty())
+ {
+ _delegate.removeMessage(removeContext, messageID);
+ }
+ }
+ }
+ }
+
+ //Commit the removes on the delegate.
+ _delegate.commitTran(removeContext);
+ }
+ finally
+ {
+ if (removeContext.inTransaction())
+ {
+ _delegate.abortTran(removeContext);
+ }
+ }
+ }
+
+ public boolean inTran(StoreContext context)
+ {
+ return _delegate.inTran(context);
+ }
+
+ public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ {
+ _delegate.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
+ }
+
+ public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ {
+ _delegate.storeMessageMetaData(context, messageId, messageMetaData);
+ }
+
+ public boolean isPersistent()
+ {
+ return _delegate.isPersistent();
+ }
+
+ public TransactionLog getDelegate()
+ {
+ return _delegate;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
index 97a1ecb38c..73d57df6e6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
@@ -20,19 +20,16 @@
*/
package org.apache.qpid.server.transactionlog;
-import org.apache.commons.configuration.Configuration;
-
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.store.StoreContext;
+import java.util.ArrayList;
+
/**
* TransactionLog defines the interface for performing transactions.
* This is used to preserve the state of messages, queues
@@ -68,7 +65,7 @@ public interface TransactionLog
*
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception;
+ Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception;
/**
* Called to close and cleanup any resources used by the message store.
@@ -81,27 +78,33 @@ public interface TransactionLog
* Places a message onto a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
- * @param queue The queue to place the message on.
- * @param messageId The message to enqueue.
- * @throws AMQException If the operation fails for any reason.
+ * @param queues
+ *@param messageId The message to enqueue. @throws AMQException If the operation fails for any reason. @throws org.apache.qpid.AMQException
*/
- void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
+ void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException;
/**
* Extracts a message from a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
- * @param queue The queue to place the message on.
- * @param messageId The message to dequeue.
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ * @param queue
+ * @param messageId The message to dequeue. @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
/**
+ * Remove the specified message from the log
+ *
+ * @param context The transactional context for the operation
+ * @param messageId The message to remove
+ * @throws AMQException
+ */
+ void removeMessage(StoreContext context, Long messageId) throws AMQException;
+
+ /**
* Begins a transactional context.
*
* @param context The transactional context to begin.
- *
* @throws AMQException If the operation fails for any reason.
*/
void beginTran(StoreContext context) throws AMQException;
@@ -158,31 +161,31 @@ public interface TransactionLog
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
-
- /**
- * Retrieves message meta-data.
- *
- * @param context The transactional context for the operation.
- * @param messageId The message to get the meta-data for.
- *
- * @return The message meta data.
- *
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
- */
- MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
-
- /**
- * Retrieves a chunk of message data.
- *
- * @param context The transactional context for the operation.
- * @param messageId The message to get the data chunk for.
- * @param index The offset index of the data chunk within the message.
- *
- * @return A chunk of message data.
- *
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
- */
- ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
+//
+// /**
+// * Retrieves message meta-data.
+// *
+// * @param context The transactional context for the operation.
+// * @param messageId The message to get the meta-data for.
+// *
+// * @return The message meta data.
+// *
+// * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+// */
+// MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
+//
+// /**
+// * Retrieves a chunk of message data.
+// *
+// * @param context The transactional context for the operation.
+// * @param messageId The message to get the data chunk for.
+// * @param index The offset index of the data chunk within the message.
+// *
+// * @return A chunk of message data.
+// *
+// * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+// */
+// ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
/**
* Is this store capable of persisting the data
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 8a8cbd23cf..7bcfb9f59a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -51,6 +51,7 @@ import org.apache.qpid.server.security.access.Accessable;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
import javax.management.NotCompliantMBeanException;
import java.util.Collections;
@@ -206,6 +207,14 @@ public class VirtualHost implements Accessable
{
_routingTable = (RoutingTable) _transactionLog;
}
+ else if (_transactionLog instanceof BaseTransactionLog)
+ {
+ TransactionLog delegate = ((BaseTransactionLog) _transactionLog).getDelegate();
+ if (delegate instanceof RoutingTable)
+ {
+ _routingTable = (RoutingTable) delegate;
+ }
+ }
}
else
{
@@ -292,7 +301,8 @@ public class VirtualHost implements Accessable
_routingTable = (RoutingTable) _transactionLog;
}
- _transactionLog.configure(this, "store", config);
+ // If a TransactionLog uses the BaseTransactionLog then it will return this object.
+ _transactionLog = (TransactionLog) _transactionLog.configure(this, "store", config);
}
//todo we need to move from store.class to transactionlog.class
@@ -497,8 +507,9 @@ public class VirtualHost implements Accessable
public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>();
public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>();
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
+ return null;
}
public void close() throws Exception