summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-04-19 15:10:10 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-04-19 15:10:10 +0000
commitbeb8918984d1afd7cce01a0d308c5e381bed47bd (patch)
treedf36d195a4b0970b85e3780f6c3015bfaa07fc45 /java
parentf97c8a999f3b62fa8212a8fd102ea31662bc00d3 (diff)
downloadqpid-python-beb8918984d1afd7cce01a0d308c5e381bed47bd.tar.gz
QPID-454 - Message 'taken' notion is per message. Adjusted to be per message per queue.
Previous commit was not sufficiently tested and other bugs were causing problems that were not related to this change. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@530444 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java100
1 files changed, 78 insertions, 22 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index b2046efee3..e19038504f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -80,18 +80,17 @@ public class AMQMessage
*/
private boolean _immediate;
- private AtomicBoolean _taken = new AtomicBoolean(false);
+ // private Subscription _takenBySubcription;
+ // private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new TransientMessageData();
- private Subscription _takenBySubcription;
+
private Set<Subscription> _rejectedBy = null;
+
+
private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
- public boolean isTaken(AMQQueue queue)
- {
- return _taken.get();
- }
private final int hashcode = System.identityHashCode(this);
@@ -206,7 +205,7 @@ public class AMQMessage
_immediate = info.isImmediate();
_transientMessageData.setMessagePublishInfo(info);
- _taken = new AtomicBoolean(false);
+// _taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
@@ -326,7 +325,6 @@ public class AMQMessage
for (AMQQueue q : _transientMessageData.getDestinationQueues())
{
- _takenMap.put(q, new AtomicBoolean(false));
_messageHandle.enqueue(storeContext, _messageId, q);
}
@@ -459,17 +457,53 @@ public class AMQMessage
return _deliveredToConsumer;
}
-
- public boolean taken(AMQQueue queue, Subscription sub)
+ public boolean isTaken(AMQQueue queue)
{
- if (_taken.getAndSet(true))
+ //return _taken.get();
+
+ synchronized (this)
{
- return true;
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ _takenMap.put(queue, taken);
+ }
+
+ return taken.get();
}
- else
+ }
+
+ public boolean taken(AMQQueue queue, Subscription sub)
+ {
+// if (_taken.getAndSet(true))
+// {
+// return true;
+// }
+// else
+// {
+// _takenBySubcription = sub;
+// return false;
+// }
+
+ synchronized (this)
{
- _takenBySubcription = sub;
- return false;
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ }
+
+ if (taken.getAndSet(true))
+ {
+ return true;
+ }
+ else
+ {
+ _takenMap.put(queue, taken);
+ _takenBySubcriptionMap.put(queue, sub);
+ return false;
+ }
}
}
@@ -479,8 +513,26 @@ public class AMQMessage
{
_log.trace("Releasing Message:" + debugIdentity());
}
- _taken.set(false);
- _takenBySubcription = null;
+
+// _taken.set(false);
+// _takenBySubcription = null;
+
+
+ synchronized (this)
+ {
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ }
+ else
+ {
+ taken.set(false);
+ }
+
+ _takenMap.put(queue, taken);
+ _takenBySubcriptionMap.put(queue, null);
+ }
}
public boolean checkToken(Object token)
@@ -833,16 +885,20 @@ public class AMQMessage
public String toString()
{
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
- _taken + " by :" + _takenBySubcription;
+// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+// _taken + " by :" + _takenBySubcription;
-// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
-// _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
+ _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
public Subscription getDeliveredSubscription(AMQQueue queue)
{
- return _takenBySubcription;
+// return _takenBySubcription;
+ synchronized (this)
+ {
+ return _takenBySubcriptionMap.get(queue);
+ }
}
public void reject(Subscription subscription)