summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java100
1 files changed, 78 insertions, 22 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index b2046efee3..e19038504f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -80,18 +80,17 @@ public class AMQMessage
*/
private boolean _immediate;
- private AtomicBoolean _taken = new AtomicBoolean(false);
+ // private Subscription _takenBySubcription;
+ // private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new TransientMessageData();
- private Subscription _takenBySubcription;
+
private Set<Subscription> _rejectedBy = null;
+
+
private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
- public boolean isTaken(AMQQueue queue)
- {
- return _taken.get();
- }
private final int hashcode = System.identityHashCode(this);
@@ -206,7 +205,7 @@ public class AMQMessage
_immediate = info.isImmediate();
_transientMessageData.setMessagePublishInfo(info);
- _taken = new AtomicBoolean(false);
+// _taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
@@ -326,7 +325,6 @@ public class AMQMessage
for (AMQQueue q : _transientMessageData.getDestinationQueues())
{
- _takenMap.put(q, new AtomicBoolean(false));
_messageHandle.enqueue(storeContext, _messageId, q);
}
@@ -459,17 +457,53 @@ public class AMQMessage
return _deliveredToConsumer;
}
-
- public boolean taken(AMQQueue queue, Subscription sub)
+ public boolean isTaken(AMQQueue queue)
{
- if (_taken.getAndSet(true))
+ //return _taken.get();
+
+ synchronized (this)
{
- return true;
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ _takenMap.put(queue, taken);
+ }
+
+ return taken.get();
}
- else
+ }
+
+ public boolean taken(AMQQueue queue, Subscription sub)
+ {
+// if (_taken.getAndSet(true))
+// {
+// return true;
+// }
+// else
+// {
+// _takenBySubcription = sub;
+// return false;
+// }
+
+ synchronized (this)
{
- _takenBySubcription = sub;
- return false;
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ }
+
+ if (taken.getAndSet(true))
+ {
+ return true;
+ }
+ else
+ {
+ _takenMap.put(queue, taken);
+ _takenBySubcriptionMap.put(queue, sub);
+ return false;
+ }
}
}
@@ -479,8 +513,26 @@ public class AMQMessage
{
_log.trace("Releasing Message:" + debugIdentity());
}
- _taken.set(false);
- _takenBySubcription = null;
+
+// _taken.set(false);
+// _takenBySubcription = null;
+
+
+ synchronized (this)
+ {
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ }
+ else
+ {
+ taken.set(false);
+ }
+
+ _takenMap.put(queue, taken);
+ _takenBySubcriptionMap.put(queue, null);
+ }
}
public boolean checkToken(Object token)
@@ -833,16 +885,20 @@ public class AMQMessage
public String toString()
{
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
- _taken + " by :" + _takenBySubcription;
+// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+// _taken + " by :" + _takenBySubcription;
-// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
-// _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
+ _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
public Subscription getDeliveredSubscription(AMQQueue queue)
{
- return _takenBySubcription;
+// return _takenBySubcription;
+ synchronized (this)
+ {
+ return _takenBySubcriptionMap.get(queue);
+ }
}
public void reject(Subscription subscription)