summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-09-26 10:19:19 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-09-26 10:19:19 +0000
commit97bc49722766753312dbde7dcc18c57ebf43f04e (patch)
tree7ea9af207774736a14d4ca67d0f57811e05c6d96
parent624734f4cd4eb584dfafd55f4de8933e155f224d (diff)
downloadqpid-python-97bc49722766753312dbde7dcc18c57ebf43f04e.tar.gz
QPID-4901 : Queue Browser hangs on reaching end of queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1526438 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java17
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java12
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java2
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java6
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java2
5 files changed, 32 insertions, 7 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
index 575902ea03..29de87875c 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
@@ -146,13 +146,26 @@ public class QueueBrowserImpl implements QueueBrowser
if( _needNext )
{
_needNext = false;
- _nextElement = createJMSMessage(_receiver.receive(0L));
+ Message msg = _receiver.receive(0L);
+ if(msg != null)
+ {
+ _receiver.acknowledge(msg);
+ }
+ _nextElement = createJMSMessage(msg);
if( _nextElement == null )
{
+ _receiver.setCredit(UnsignedInteger.valueOf(100), true);
// Drain to verify there really are no more messages.
_receiver.drain();
_receiver.drainWait();
- _nextElement = createJMSMessage(_receiver.receive(0L));
+ msg = _receiver.receive(0L);
+
+ if(msg != null)
+ {
+ _receiver.acknowledge(msg);
+ }
+ _nextElement = createJMSMessage(msg);
+
if( _nextElement == null )
{
close();
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
index 3a7af39d8a..587d26026f 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
@@ -444,13 +444,25 @@ public abstract class LinkEndpoint<T extends LinkEventListener>
sendFlow(_flowTransactionId != null);
}
+ public void sendFlowWithEcho()
+ {
+ sendFlow(_flowTransactionId != null, true);
+ }
+
+
public void sendFlow(boolean setTransactionId)
{
+ sendFlow(setTransactionId, false);
+ }
+
+ public void sendFlow(boolean setTransactionId, boolean echo)
+ {
if(_state == State.ATTACHED || _state == State.ATTACH_SENT)
{
Flow flow = new Flow();
flow.setLinkCredit(_linkCredit);
flow.setDeliveryCount(_deliveryCount);
+ flow.setEcho(echo);
_lastSentCreditLimit = _linkCredit.add(_deliveryCount);
flow.setAvailable(_available);
flow.setDrain(_drain);
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
index e5019f9479..8a5940658c 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
@@ -288,7 +288,7 @@ public class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLinkListener>
setDrain(true);
_creditWindow = false;
_drainLimit = getDeliveryCount().add(getLinkCredit());
- sendFlow();
+ sendFlowWithEcho();
getLock().notifyAll();
}
}
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java
index 764fa222ee..be9b271f72 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/SourceConstructor.java
@@ -231,7 +231,7 @@ public class SourceConstructor extends DescribedTypeConstructor<Source>
try
{
- obj.setDistributionMode( (DistributionMode) val );
+ obj.setDistributionMode( StdDistMode.valueOf(val) );
}
catch(ClassCastException e)
{
@@ -326,7 +326,7 @@ public class SourceConstructor extends DescribedTypeConstructor<Source>
// TODO Error
}
}
-
+
}
@@ -360,7 +360,7 @@ public class SourceConstructor extends DescribedTypeConstructor<Source>
// TODO Error
}
}
-
+
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 35f24afbce..0351de3e00 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -164,7 +164,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- _subscription = new Subscription_1_0(this, qd);
+ _subscription = new Subscription_1_0(this, qd, source.getDistributionMode() != StdDistMode.COPY);
}
else if(destination instanceof ExchangeDestination)
{