From d421eff48b17b37cb6c9ad226fde22dcc72cb6d4 Mon Sep 17 00:00:00 2001 From: Arnaud Simon Date: Thu, 17 Apr 2008 12:44:35 +0000 Subject: QPID-796 Made connection URL property + use session level method git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@649070 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 24 ++++++++++++++++++++++ .../java/org/apache/qpid/client/AMQSession.java | 11 ++++++++++ .../org/apache/qpid/client/AMQSession_0_10.java | 10 ++++----- .../qpid/client/BasicMessageConsumer_0_10.java | 12 +++++------ .../java/org/apache/qpid/jms/ConnectionURL.java | 1 + 5 files changed, 47 insertions(+), 11 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index adbe03e986..566582e666 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -152,6 +152,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect protected AMQConnectionDelegate _delegate; + // this connection maximum number of prefetched messages + private long _maxPrefetch; + /** * @param broker brokerdetails * @param username username @@ -231,6 +234,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { + // set this connection maxPrefetch + if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null) + { + _maxPrefetch = Long.parseLong( connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH)); + } + else + { + // use the defaul value set for all connections + _maxPrefetch = ClientProperties.MAX_PREFETCH; + } + _failoverPolicy = new FailoverPolicy(connectionURL); if (_failoverPolicy.getCurrentBrokerDetails().getTransport().equals(BrokerDetails.VM)) { @@ -1179,4 +1193,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _sessions.get(channelId); } + + /** + * Get the maximum number of messages that this connection can pre-fetch. + * + * @return The maximum number of messages that this connection can pre-fetch. + */ + public long getMaxPrefetch() + { + return _maxPrefetch; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index f79523e546..ee55743d0e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2339,6 +2339,17 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } + /** + * Indicates whether this session consumers pre-fetche messages + * + * @return true if this session consumers pre-fetche messages false otherwise + */ + public boolean prefetch() + { + return getAMQConnection().getMaxPrefetch() > 0; + } + + public abstract void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException; /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 2d60877c5e..d72668bb53 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -406,7 +406,7 @@ public class AMQSession_0_10 extends AMQSession new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); - if (ClientProperties.MAX_PREFETCH == 0) + if (! prefetch()) { getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT); } @@ -417,12 +417,12 @@ public class AMQSession_0_10 extends AMQSession getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. // only if not immediat prefetch - if(ClientProperties.MAX_PREFETCH > 0 && (consumer.isStrated() || _immediatePrefetch)) + if(prefetch() && (consumer.isStrated() || _immediatePrefetch)) { // set the flow getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE, - ClientProperties.MAX_PREFETCH); + getAMQConnection().getMaxPrefetch()); } getQpidSession().sync(); getCurrentException(); @@ -531,7 +531,7 @@ public class AMQSession_0_10 extends AMQSession //only set if msg list is null try { - if (ClientProperties.MAX_PREFETCH == 0) + if (! prefetch()) { if (consumer.getMessageListener() != null) { @@ -543,7 +543,7 @@ public class AMQSession_0_10 extends AMQSession { getQpidSession() .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE, - ClientProperties.MAX_PREFETCH); + getAMQConnection().getMaxPrefetch()); } getQpidSession() .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index d7cce986aa..4f2d5d8c34 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -141,7 +141,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer