summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-05-18 23:05:36 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-05-18 23:05:36 +0000
commit495a47e8feeb3cf3110ed9b972775fa40f58be00 (patch)
tree2bdb1ade7d0cf67f8fde7f211f7bccc9348fb044 /java/client/src
parent27920d16e70f590b49548877a2129a1d2162d985 (diff)
downloadqpid-python-495a47e8feeb3cf3110ed9b972775fa40f58be00.tar.gz
Implemented the feature described in QPID-2515
However a few issues needs to be ironed out - see the JIRA for these issues. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@945945 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java32
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java40
2 files changed, 55 insertions, 17 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 5344f77539..4aca7454bd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -142,7 +142,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* USed to store the range of in tx messages
*/
private RangeSet _txRangeSet = new RangeSet();
- private int _txSize = 0;
+ private int _txSize = 0;
//--- constructors
/**
@@ -560,6 +560,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
throws AMQException, FailoverException
{
boolean preAcquire;
+
+ long capacity = getCapacity(consumer.getDestination());
+
try
{
preAcquire = ( ! consumer.isNoConsume() &&
@@ -578,7 +581,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
- if (! prefetch())
+ if (capacity == 0)
{
getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT);
}
@@ -589,12 +592,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
Option.UNRELIABLE);
- if(prefetch() && _dispatcher != null && (isStarted() || _immediatePrefetch))
+ if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch))
{
// set the flow
getQpidSession().messageFlow(consumerTag,
MessageCreditUnit.MESSAGE,
- getAMQConnection().getMaxPrefetch(),
+ capacity,
Option.UNRELIABLE);
}
@@ -604,6 +607,21 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getCurrentException();
}
}
+
+ private long getCapacity(AMQDestination destination)
+ {
+ long capacity = 0;
+ if (destination.getDestSyntax() == DestSyntax.ADDR &&
+ destination.getSourceLink().getCapacity() > 0)
+ {
+ capacity = destination.getSourceLink().getCapacity();
+ }
+ else if (prefetch())
+ {
+ capacity = getAMQConnection().getMaxPrefetch();
+ }
+ return capacity;
+ }
/**
* Create an 0_10 message producer
@@ -744,7 +762,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
//only set if msg list is null
try
{
- if (! prefetch())
+ long capacity = getCapacity(consumer.getDestination());
+
+ if (capacity == 0)
{
if (consumer.getMessageListener() != null)
{
@@ -757,7 +777,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
getQpidSession()
.messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
- getAMQConnection().getMaxPrefetch(),
+ capacity,
Option.UNRELIABLE);
}
getQpidSession()
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 1b2d6876bd..321f5855d7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -19,6 +19,7 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
@@ -72,7 +73,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
private String _consumerTagString;
-
+
+ private long capacity = 0;
+
//--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -100,6 +103,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
_isStarted = connection.started();
+
+ // Destination setting overrides connection defaults
+ if (destination.getDestSyntax() == DestSyntax.ADDR &&
+ destination.getSourceLink().getCapacity() > 0)
+ {
+ capacity = destination.getSourceLink().getCapacity();
+ }
+ else if (getSession().prefetch())
+ {
+ capacity = _0_10session.getAMQConnection().getMaxPrefetch();
+ }
+
}
@@ -146,7 +161,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
if (messageOk)
{
- if (isMessageListenerSet() && ! getSession().prefetch())
+ if (isMessageListenerSet() && capacity == 0)
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1,
@@ -245,7 +260,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
// if we are syncrhonously waiting for a message
// and messages are not prefetched we then need to request another one
- if(! getSession().prefetch())
+ if(capacity == 0)
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1,
@@ -333,7 +348,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
- if (messageListener != null && ! getSession().prefetch())
+ if (messageListener != null && capacity == 0)
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1,
@@ -372,11 +387,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
public Object getMessageFromQueue(long l) throws InterruptedException
{
- if (! getSession().prefetch())
+ if (capacity == 0)
{
_syncReceive.set(true);
}
- if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
+ if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1,
@@ -385,23 +400,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
Object o = super.getMessageFromQueue(l);
if (o == null && _0_10session.isStarted())
{
+
_0_10session.getQpidSession().messageFlush
(getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
_0_10session.getQpidSession().sync();
_0_10session.getQpidSession().messageFlow
(getConsumerTagString(), MessageCreditUnit.BYTE,
0xFFFFFFFF, Option.UNRELIABLE);
- if (getSession().prefetch())
+
+ if (capacity > 0)
{
_0_10session.getQpidSession().messageFlow
- (getConsumerTagString(), MessageCreditUnit.MESSAGE,
- _0_10session.getAMQConnection().getMaxPrefetch(),
- Option.UNRELIABLE);
+ (getConsumerTagString(),
+ MessageCreditUnit.MESSAGE,
+ capacity,
+ Option.UNRELIABLE);
}
_0_10session.syncDispatchQueue();
o = super.getMessageFromQueue(-1);
}
- if (! getSession().prefetch())
+ if (capacity == 0)
{
_syncReceive.set(false);
}