summaryrefslogtreecommitdiff
path: root/java/broker/src/main
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-04-03 13:26:55 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-04-03 13:26:55 +0000
commitd62a09ce7b5e3c7a66ec85c4064d40e012552b0c (patch)
tree1364a8757dddef44ad897ebb61dfed30993722eb /java/broker/src/main
parent7c604ebe2d81a733df8a25a7c2b330e1556d3013 (diff)
downloadqpid-python-d62a09ce7b5e3c7a66ec85c4064d40e012552b0c.tar.gz
QPID-1764 : Update to BaseTransactionLog to create a TestableTransactionLog, which will replace TestableMessageStore. Update to BaseTransactionLog/Test to work correctly with transactions and to fully test that functionality. Updated StoreContext to know when it is in a transaction as relying on a payload being set is not sufficient as that is not set when running with the MessageMemoryStore and so transactional testing in the BTLT was not correct.
Update to Virtualhost to correctly set the RoutingTable when the specified TransactionLog is wrapped in a BaseTransactionLog. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@761670 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java32
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java34
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java13
3 files changed, 44 insertions, 35 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
index b5ae8ea284..bb50df139f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
@@ -44,6 +44,7 @@ public class StoreContext
private HashMap<Long, ArrayList<AMQQueue>> _enqueueMap;
private HashMap<Long, ArrayList<AMQQueue>> _dequeueMap;
private boolean _async;
+ private boolean _inTransaction;
public StoreContext()
{
@@ -64,6 +65,9 @@ public class StoreContext
{
_name = name;
_async = asynchrouous;
+ _inTransaction = false;
+ _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>();
+ _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>();
}
public StoreContext(boolean asynchronous)
@@ -82,7 +86,7 @@ public class StoreContext
{
_logger.debug("public void setPayload(Object payload = " + payload + "): called");
}
- _payload = payload;
+ _payload = payload;
}
/**
@@ -137,7 +141,7 @@ public class StoreContext
}
/**
- * Record the dequeue for processing on commit
+ * Record the dequeue for processing after the commit
*
* @param queue
* @param messageId
@@ -146,39 +150,37 @@ public class StoreContext
*/
public void dequeueMessage(AMQQueue queue, Long messageId) throws AMQException
{
- if (inTransaction())
- {
- ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId);
+ ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId);
- if (dequeues == null)
- {
- dequeues = new ArrayList<AMQQueue>();
- _dequeueMap.put(messageId, dequeues);
- }
-
- dequeues.add(queue);
+ if (dequeues == null)
+ {
+ dequeues = new ArrayList<AMQQueue>();
+ _dequeueMap.put(messageId, dequeues);
}
+
+ dequeues.add(queue);
}
public void beginTransaction() throws AMQException
{
- _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>();
- _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>();
+ _inTransaction = true;
}
public void commitTransaction() throws AMQException
{
_dequeueMap.clear();
+ _inTransaction = false;
}
public void abortTransaction() throws AMQException
{
_enqueueMap.clear();
+ _inTransaction = false;
}
public boolean inTransaction()
{
- return _payload != null;
+ return _inTransaction; // _payload != null;
}
public boolean isAsync()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
index 4c3f1fcc49..973ecd6c09 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
@@ -39,7 +39,7 @@ public class BaseTransactionLog implements TransactionLog
private static final Logger _logger = Logger.getLogger(BaseTransactionLog.class);
TransactionLog _delegate;
- private Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>();
+ protected Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>();
public BaseTransactionLog(TransactionLog delegate)
{
@@ -60,7 +60,7 @@ public class BaseTransactionLog implements TransactionLog
{
context.enqueueMessage(queues, messageId);
- if (queues.size() > 0)
+ if (queues.size() > 1)
{
_logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues);
@@ -73,10 +73,10 @@ public class BaseTransactionLog implements TransactionLog
public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
{
+ context.dequeueMessage(queue, messageId);
+
if (context.inTransaction())
{
- context.dequeueMessage(queue, messageId);
-
Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap();
//For each Message ID that is in the map check
@@ -97,11 +97,7 @@ public class BaseTransactionLog implements TransactionLog
if (!context.inTransaction())
{
- HashMap<Long, ArrayList<AMQQueue>> dequeue = new HashMap<Long, ArrayList<AMQQueue>>();
- ArrayList list = new ArrayList();
- list.add(queue);
- dequeue.put(messageId, list);
- processDequeues(dequeue);
+ processDequeues(context.getDequeueMap());
}
}
@@ -128,11 +124,7 @@ public class BaseTransactionLog implements TransactionLog
//Perform real commit of current data
_delegate.commitTran(context);
- // If we have dequeues to process then process them
- if (context.getDequeueMap() != null)
- {
- processDequeues(context.getDequeueMap());
- }
+ processDequeues(context.getDequeueMap());
//Commit the recorded state for this transaction.
context.commitTransaction();
@@ -141,10 +133,8 @@ public class BaseTransactionLog implements TransactionLog
public void abortTran(StoreContext context) throws AMQException
{
// If we have enqueues to rollback
- if (context.getEnqueueMap() != null)
- {
- processDequeues(context.getEnqueueMap());
- }
+ processDequeues(context.getEnqueueMap());
+
//Abort the recorded state for this transaction.
context.abortTransaction();
@@ -154,6 +144,12 @@ public class BaseTransactionLog implements TransactionLog
private void processDequeues(Map<Long, ArrayList<AMQQueue>> messageMap)
throws AMQException
{
+ // Check we have dequeues to process then process them
+ if (messageMap == null || messageMap.isEmpty())
+ {
+ return;
+ }
+
// Process any enqueues to bring our model up to date.
Set<Long> messageIDs = messageMap.keySet();
@@ -190,6 +186,8 @@ public class BaseTransactionLog implements TransactionLog
if (enqueuedList.isEmpty())
{
_delegate.removeMessage(removeContext, messageID);
+ //Remove references list
+ _idToQueues.remove(messageID);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 7bcfb9f59a..dc12d97557 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -295,14 +295,23 @@ public class VirtualHost implements Accessable
}
_transactionLog = (TransactionLog) o;
+ // If a TransactionLog uses the BaseTransactionLog then it will return this object.
+ _transactionLog = (TransactionLog) _transactionLog.configure(this, "store", config);
+
//Assign RoutingTable as old MessageStores converted to TransactionLog will require the _routingTable.
if (_transactionLog instanceof RoutingTable)
{
_routingTable = (RoutingTable) _transactionLog;
}
+ else if (_transactionLog instanceof BaseTransactionLog)
+ {
+ TransactionLog delegate = ((BaseTransactionLog) _transactionLog).getDelegate();
+ if (delegate instanceof RoutingTable)
+ {
+ _routingTable = (RoutingTable) delegate;
+ }
+ }
- // If a TransactionLog uses the BaseTransactionLog then it will return this object.
- _transactionLog = (TransactionLog) _transactionLog.configure(this, "store", config);
}
//todo we need to move from store.class to transactionlog.class