diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-04-08 19:46:29 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-04-08 19:46:29 +0000 |
| commit | 097dcd1f13d66d4726e05aae407415be88bfd19b (patch) | |
| tree | d7fde1d8d0e6188be53eea2886217f84b22875b0 /java/broker | |
| parent | c303e56a77c85134a94780a3368cd0970fbf9109 (diff) | |
| download | qpid-python-097dcd1f13d66d4726e05aae407415be88bfd19b.tar.gz | |
QPID-1794 : Removed unnecessary synchronisation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@763363 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java | 11 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java | 135 |
2 files changed, 70 insertions, 76 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java index 26892868f3..eb28d83d92 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java @@ -66,7 +66,7 @@ public class StoreContext _name = name; _async = asynchrouous; _inTransaction = false; - _dequeueMap = Collections.synchronizedMap(new HashMap<Long, List<AMQQueue>>()); + _dequeueMap = new HashMap<Long, List<AMQQueue>>(); } public StoreContext(boolean asynchronous) @@ -115,13 +115,10 @@ public class StoreContext { List<AMQQueue> dequeues = _dequeueMap.get(messageId); - synchronized (_dequeueMap) + if (dequeues == null) { - if (dequeues == null) - { - dequeues = Collections.synchronizedList(new ArrayList<AMQQueue>()); - _dequeueMap.put(messageId, dequeues); - } + dequeues = new ArrayList<AMQQueue>(); + _dequeueMap.put(messageId, dequeues); } dequeues.add(queue); diff --git a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java index ce2d67cf60..e200de7a6f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java @@ -69,7 +69,7 @@ public class BaseTransactionLog implements TransactionLog } //Clone the list incase someone else changes it. - _idToQueues.put(messageId, (List<AMQQueue>)Collections.synchronizedList((ArrayList<AMQQueue>)queues.clone())); + _idToQueues.put(messageId, Collections.synchronizedList((ArrayList<AMQQueue>)queues.clone())); } _delegate.enqueueMessage(context, queues, messageId); @@ -87,27 +87,24 @@ public class BaseTransactionLog implements TransactionLog //For each Message ID that is in the map check Set<Long> messageIDs = messageMap.keySet(); - synchronized (messageMap) + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info("Pre-Processing single dequeue of:" + messageIDs); - } + _logger.info("Pre-Processing single dequeue of:" + messageIDs); + } - Iterator iterator = messageIDs.iterator(); - - while (iterator.hasNext()) + Iterator iterator = messageIDs.iterator(); + + while (iterator.hasNext()) + { + Long messageID = (Long) iterator.next(); + //If we don't have a gloabl reference for this message then there is only a single enqueue + //can check here to see if this is the last reference? + if (_idToQueues.get(messageID) == null) { - Long messageID = (Long) iterator.next(); - //If we don't have a gloabl reference for this message then there is only a single enqueue - //can check here to see if this is the last reference? - if (_idToQueues.get(messageID) == null) - { - // Add the removal of the message to this transaction - _delegate.removeMessage(context, messageID); - // Remove this message ID as we have processed it so we don't reprocess after the main commmit - iterator.remove(); - } + // Add the removal of the message to this transaction + _delegate.removeMessage(context, messageID); + // Remove this message ID as we have processed it so we don't reprocess after the main commmit + iterator.remove(); } } } @@ -180,66 +177,66 @@ public class BaseTransactionLog implements TransactionLog { //For each Message ID Decrement the reference for each of the queues it was on. - synchronized (messageMap) + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info("Processing Dequeue for:" + messageIDs); - } + _logger.info("Processing Dequeue for:" + messageIDs); + } - Iterator<Long> messageIDIterator = messageIDs.iterator(); + Iterator<Long> messageIDIterator = messageIDs.iterator(); - while(messageIDIterator.hasNext()) - { - Long messageID = messageIDIterator.next(); - List<AMQQueue> queueList = messageMap.get(messageID); + while(messageIDIterator.hasNext()) + { + Long messageID = messageIDIterator.next(); + List<AMQQueue> queueList = messageMap.get(messageID); - //Remove this message from our DequeueMap as we are processing it. - messageIDIterator.remove(); + //Remove this message from our DequeueMap as we are processing it. + messageIDIterator.remove(); - // For each of the queues decrement the reference - for (AMQQueue queue : queueList) - { - List<AMQQueue> enqueuedList = _idToQueues.get(messageID); + // For each of the queues decrement the reference + for (AMQQueue queue : queueList) + { + List<AMQQueue> enqueuedList = _idToQueues.get(messageID); - if (_logger.isInfoEnabled()) - { - _logger.info("Dequeue message:" + messageID + " from :" + queue); - } + if (_logger.isInfoEnabled()) + { + _logger.info("Dequeue message:" + messageID + " from :" + queue); + } - // If we have no mapping then this message was only enqueued on a single queue - // This will be the case when we are not in a larger transaction - if (enqueuedList == null) - { - _delegate.removeMessage(removeContext, messageID); - } - else + // If we have no mapping then this message was only enqueued on a single queue + // This will be the case when we are not in a larger transaction + if (enqueuedList == null) + { + _delegate.removeMessage(removeContext, messageID); + } + else + { + //When a message is on more than one queue it is possible that this code section is exectuted + // by one thread per enqueue. + // It is however, thread safe because there is only removes being performed and so the + // last thread that does the remove will see the empty queue and remove the message + // At this stage there is nothing that is going to cause this operation to abort. So we don't + // need to worry about any potential adds. + // The message will no longer be enqueued as that operation has been committed before now so + // this is clean up of the data. + + //Must synchronize here as this list may have been extracted from _idToQueues in many threads + // and we must ensure only one of them update the list at a time. + synchronized (enqueuedList) { - //When a message is on more than one queue it is possible that this code section is exectuted - // by one thread per enqueue. - // It is however, thread safe because there is only removes being performed and so the - // last thread that does the remove will see the empty queue and remove the message - // At this stage there is nothing that is going to cause this operation to abort. So we don't - // need to worry about any potential adds. - // The message will no longer be enqueued as that operation has been committed before now so - // this is clean up of the data. - synchronized (enqueuedList) + // Update the enqueued list but if the queue is not in the list then we are trying + // to dequeue something that is not there anymore, or was never there. + if (!enqueuedList.remove(queue)) + { + throw new UnableToDequeueException(messageID, queue); + } + + // If the list is now empty then remove the message + if (enqueuedList.isEmpty()) { - // Update the enqueued list but if the queue is not in the list then we are trying - // to dequeue something that is not there anymore, or was never there. - if (!enqueuedList.remove(queue)) - { - throw new UnableToDequeueException(messageID, queue); - } - - // If the list is now empty then remove the message - if (enqueuedList.isEmpty()) - { - _delegate.removeMessage(removeContext, messageID); - //Remove references list - _idToQueues.remove(messageID); - } + _delegate.removeMessage(removeContext, messageID); + //Remove references list + _idToQueues.remove(messageID); } } } |
