diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-04-19 15:10:10 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-04-19 15:10:10 +0000 |
| commit | beb8918984d1afd7cce01a0d308c5e381bed47bd (patch) | |
| tree | df36d195a4b0970b85e3780f6c3015bfaa07fc45 /java | |
| parent | f97c8a999f3b62fa8212a8fd102ea31662bc00d3 (diff) | |
| download | qpid-python-beb8918984d1afd7cce01a0d308c5e381bed47bd.tar.gz | |
QPID-454 - Message 'taken' notion is per message. Adjusted to be per message per queue.
Previous commit was not sufficiently tested and other bugs were causing problems that were not related to this change.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@530444 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java | 100 |
1 files changed, 78 insertions, 22 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index b2046efee3..e19038504f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -80,18 +80,17 @@ public class AMQMessage */ private boolean _immediate; - private AtomicBoolean _taken = new AtomicBoolean(false); + // private Subscription _takenBySubcription; + // private AtomicBoolean _taken = new AtomicBoolean(false); private TransientMessageData _transientMessageData = new TransientMessageData(); - private Subscription _takenBySubcription; + private Set<Subscription> _rejectedBy = null; + + private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>(); private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(); - public boolean isTaken(AMQQueue queue) - { - return _taken.get(); - } private final int hashcode = System.identityHashCode(this); @@ -206,7 +205,7 @@ public class AMQMessage _immediate = info.isImmediate(); _transientMessageData.setMessagePublishInfo(info); - _taken = new AtomicBoolean(false); +// _taken = new AtomicBoolean(false); if (_log.isDebugEnabled()) { @@ -326,7 +325,6 @@ public class AMQMessage for (AMQQueue q : _transientMessageData.getDestinationQueues()) { - _takenMap.put(q, new AtomicBoolean(false)); _messageHandle.enqueue(storeContext, _messageId, q); } @@ -459,17 +457,53 @@ public class AMQMessage return _deliveredToConsumer; } - - public boolean taken(AMQQueue queue, Subscription sub) + public boolean isTaken(AMQQueue queue) { - if (_taken.getAndSet(true)) + //return _taken.get(); + + synchronized (this) { - return true; + AtomicBoolean taken = _takenMap.get(queue); + if (taken == null) + { + taken = new AtomicBoolean(false); + _takenMap.put(queue, taken); + } + + return taken.get(); } - else + } + + public boolean taken(AMQQueue queue, Subscription sub) + { +// if (_taken.getAndSet(true)) +// { +// return true; +// } +// else +// { +// _takenBySubcription = sub; +// return false; +// } + + synchronized (this) { - _takenBySubcription = sub; - return false; + AtomicBoolean taken = _takenMap.get(queue); + if (taken == null) + { + taken = new AtomicBoolean(false); + } + + if (taken.getAndSet(true)) + { + return true; + } + else + { + _takenMap.put(queue, taken); + _takenBySubcriptionMap.put(queue, sub); + return false; + } } } @@ -479,8 +513,26 @@ public class AMQMessage { _log.trace("Releasing Message:" + debugIdentity()); } - _taken.set(false); - _takenBySubcription = null; + +// _taken.set(false); +// _takenBySubcription = null; + + + synchronized (this) + { + AtomicBoolean taken = _takenMap.get(queue); + if (taken == null) + { + taken = new AtomicBoolean(false); + } + else + { + taken.set(false); + } + + _takenMap.put(queue, taken); + _takenBySubcriptionMap.put(queue, null); + } } public boolean checkToken(Object token) @@ -833,16 +885,20 @@ public class AMQMessage public String toString() { - return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + - _taken + " by :" + _takenBySubcription; +// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + +// _taken + " by :" + _takenBySubcription; -// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + -// _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); + return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); } public Subscription getDeliveredSubscription(AMQQueue queue) { - return _takenBySubcription; +// return _takenBySubcription; + synchronized (this) + { + return _takenBySubcriptionMap.get(queue); + } } public void reject(Subscription subscription) |
