summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java20
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java251
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java22
3 files changed, 124 insertions, 169 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index a668016f93..0e2645689c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -336,7 +336,14 @@ public class AMQChannel
Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
if (sub != null)
{
- sub.getQueue().unregisterSubscription(sub);
+ try {
+ sub.getSendLock();
+ sub.getQueue().unregisterSubscription(sub);
+ }
+ finally
+ {
+ sub.releaseSendLock();
+ }
return true;
}
else
@@ -395,7 +402,16 @@ public class AMQChannel
Subscription sub = me.getValue();
- sub.getQueue().unregisterSubscription(sub);
+ try
+ {
+ sub.getSendLock();
+ sub.getQueue().unregisterSubscription(sub);
+ }
+ finally
+ {
+ sub.releaseSendLock();
+ }
+
}
_tag2SubscriptionMap.clear();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index bfbcb9e22f..bc335d0ba5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1174,7 +1174,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
boolean complete = false;
try
{
- complete = flushSubscription(_sub, MAX_ASYNC_DELIVERIES);
+ complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
}
catch (AMQException e)
@@ -1204,79 +1204,28 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
flushSubscription(sub, Long.MAX_VALUE);
}
- public boolean flushSubscription(Subscription sub, long deliveries) throws AMQException
+ public boolean flushSubscription(Subscription sub, Long deliveries) throws AMQException
{
boolean atTail = false;
- boolean advanced;
while (!sub.isSuspended() && !atTail && deliveries != 0)
{
-
- advanced = false;
- sub.getSendLock();
- try
+ try
{
- if (sub.isActive())
+ sub.getSendLock();
+ atTail = attemptDelivery(sub, deliveries);
+ if (atTail && sub.isAutoClose())
{
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (!(node.isAcquired() || node.isDeleted()))
- {
- if (!sub.isSuspended())
- {
- if (sub.hasInterest(node))
- {
- if (!sub.wouldSuspend(node))
- {
- if (!sub.isBrowser() && !node.acquire(sub))
- {
- sub.restoreCredit(node);
-
- }
- else
- {
- deliveries--;
- deliverMessage(sub, node);
-
- if (sub.isBrowser())
- {
- QueueEntry newNode = _entries.next(node);
-
- if (newNode != null)
- {
- advanced = true;
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
- }
- }
-
- }
- else
- {
- break;
- }
- }
- else
- {
- // this subscription is not interested in this node so we can skip over it
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- }
- }
- }
-
- }
- atTail = (_entries.next(node) == null) && !advanced;
+ unregisterSubscription(sub);
+ ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
}
}
finally
{
sub.releaseSendLock();
}
-
}
// if there's (potentially) more than one subscription the others will potentially not have been advanced to the
@@ -1287,16 +1236,72 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
advanceAllSubscriptions();
}
+ return atTail;
+ }
- if (atTail && sub.isAutoClose())
+ private boolean attemptDelivery(Subscription sub, Long deliveries) throws AMQException
+ {
+ boolean atTail = false;
+ boolean advanced = false;
+ boolean subActive = sub.isActive();
+ if (subActive)
{
- unregisterSubscription(sub);
+ QueueEntry node = moveSubscriptionToNextNode(sub);
+ if (!(node.isAcquired() || node.isDeleted()))
+ {
+ if (!sub.isSuspended())
+ {
+ if (sub.hasInterest(node))
+ {
+ if (!sub.wouldSuspend(node))
+ {
+ if (!sub.isBrowser() && !node.acquire(sub))
+ {
+ sub.restoreCredit(node);
- ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
- }
+ }
+ else
+ {
+ deliveries--;
+ deliverMessage(sub, node);
- return atTail;
+ if (sub.isBrowser())
+ {
+ QueueEntry newNode = _entries.next(node);
+
+ if (newNode != null)
+ {
+ advanced = true;
+ sub.setLastSeenEntry(node, newNode);
+ node = sub.getLastSeenEntry();
+ }
+ }
+ }
+
+ }
+ else // Not enough Credit for message and wouldSuspend
+ {
+ //QPID-1187 - Treat the subscription as suspended for this message
+ // and wait for the message to be removed to continue delivery.
+ subActive = false;
+ node.addStateChangeListener(new QueueEntryListener(sub, node));
+ }
+ }
+ else
+ {
+ // this subscription is not interested in this node so we can skip over it
+ QueueEntry newNode = _entries.next(node);
+ if (newNode != null)
+ {
+ sub.setLastSeenEntry(node, newNode);
+ }
+ }
+ }
+
+ }
+ atTail = (_entries.next(node) == null) && !advanced;
+ }
+ return atTail || !subActive;
}
protected void advanceAllSubscriptions() throws AMQException
@@ -1347,7 +1352,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
boolean deliveryIncomplete = true;
int extraLoops = 1;
- int deliveries = MAX_ASYNC_DELIVERIES;
+ Long deliveries = new Long(MAX_ASYNC_DELIVERIES);
_asynchronousRunner.compareAndSet(runner, null);
@@ -1372,110 +1377,46 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
boolean closeConsumer = false;
Subscription sub = subscriptionIter.getNode().getSubscription();
- if (sub != null)
+ sub.getSendLock();
+ try
{
- sub.getSendLock();
- try
+ if (sub != null)
{
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (node != null && sub.isActive())
+ QueueEntry node = moveSubscriptionToNextNode(sub);
+ if (node != null)
{
- boolean advanced = false;
- boolean subActive = false;
-
- if (!(node.isAcquired() || node.isDeleted()))
- {
- if (!sub.isSuspended())
- {
- subActive = true;
- if (sub.hasInterest(node))
- {
- if (!sub.wouldSuspend(node))
- {
- if (!sub.isBrowser() && !node.acquire(sub))
- {
- sub.restoreCredit(node);
-
- }
- else
- {
- deliverMessage(sub, node);
- deliveries--;
-
- if (sub.isBrowser())
- {
- QueueEntry newNode = _entries.next(node);
-
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- advanced = true;
- }
-
- }
- }
- done = false;
- }
- else // Not enough Credit for message and wouldSuspend
- {
- //QPID-1187 - Treat the subscription as suspended for this message
- // and wait for the message to be removed to continue delivery.
- subActive = false;
-
- node.addStateChangeListener(new QueueEntryListener(sub, node));
- }
- }
- else
- {
- // this subscription is not interested in this node so we can skip over it
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- }
- }
- }
- }
- final boolean atTail = (_entries.next(node) == null);
-
- done = done && (!subActive || atTail);
-
- closeConsumer = (atTail && !advanced && sub.isAutoClose());
+ done = attemptDelivery(sub, deliveries);
}
}
- finally
+ if (done)
{
- sub.releaseSendLock();
- }
-
- if (closeConsumer)
- {
- unregisterSubscription(sub);
-
- ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
- }
+ if (extraLoops == 0)
+ {
+ deliveryIncomplete = false;
+ if (sub.isAutoClose())
+ {
+ unregisterSubscription(sub);
- }
- if (done)
- {
- if (extraLoops == 0)
- {
- deliveryIncomplete = false;
+ ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
+ }
+ }
+ else
+ {
+ extraLoops--;
+ }
}
else
{
- extraLoops--;
+ extraLoops = 1;
}
}
- else
+ finally
{
- extraLoops = 1;
+ sub.releaseSendLock();
}
}
-
_asynchronousRunner.set(null);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
index 910d546034..91ed9766f6 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
@@ -21,6 +21,7 @@
package org.apache.qpid.test.client;
import org.apache.qpid.client.AMQSession_0_8;
+import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.log4j.Logger;
@@ -91,25 +92,22 @@ public class FlowControlTest extends QpidTestCase
assertNotNull("Second message not received", r2);
assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
- Message r3 = recv.receiveNoWait();
+ Message r3 = recv.receive(RECEIVE_TIMEOUT);
assertNull("Third message incorrectly delivered", r3);
- r1.acknowledge();
+ ((AbstractJMSMessage)r1).acknowledgeThis();
- r3 = recv.receiveNoWait();
+ r3 = recv.receive(RECEIVE_TIMEOUT);
assertNull("Third message incorrectly delivered", r3);
- r2.acknowledge();
+ ((AbstractJMSMessage)r2).acknowledgeThis();
r3 = recv.receive(RECEIVE_TIMEOUT);
assertNotNull("Third message not received", r3);
assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
- r3.acknowledge();
- recv.close();
- consumerSession.close();
+ ((AbstractJMSMessage)r3).acknowledgeThis();
consumerConnection.close();
-
}
public void testTwoConsumersBytesFlowControl() throws Exception
@@ -161,21 +159,21 @@ public class FlowControlTest extends QpidTestCase
assertNotNull("First message not received", r1);
assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
- Message r2 = recv1.receiveNoWait();
+ Message r2 = recv1.receive(RECEIVE_TIMEOUT);
assertNull("Second message incorrectly delivered", r2);
Session consumerSession2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
((AMQSession_0_8) consumerSession2).setPrefetchLimits(0, 256);
MessageConsumer recv2 = consumerSession2.createConsumer(_queue);
- r2 = recv2.receive(100000L);//RECEIVE_TIMEOUT);
+ r2 = recv2.receive(RECEIVE_TIMEOUT);
assertNotNull("Second message not received", r2);
assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
- Message r3 = recv2.receiveNoWait();
+ Message r3 = recv2.receive(RECEIVE_TIMEOUT);
assertNull("Third message incorrectly delivered", r3);
- r3 = recv1.receive(100000L);//RECEIVE_TIMEOUT);
+ r3 = recv1.receive(RECEIVE_TIMEOUT);
assertNotNull("Third message not received", r3);
assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));