summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-09-08 15:37:20 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-09-08 15:37:20 +0000
commit7f5b8221bb212a12d588785c1881dabd3538ccdf (patch)
tree0b5aaf6512994b1fa5107fd834d1f2b621cd7057 /qpid/java/client
parent76b0ca0a375198db3bd5f73687c0b24f713b1143 (diff)
downloadqpid-python-7f5b8221bb212a12d588785c1881dabd3538ccdf.tar.gz
QPID-6088 : [Java Client] AMQP 0-8/8/9-1 prefetch should auto expand when receive is called in a situation where the prefetch buffer is full
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1623422 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java36
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java28
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java92
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java62
4 files changed, 151 insertions, 67 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 2a91ff3ce2..6b3b4601d9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -67,13 +67,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Connection;
import org.apache.qpid.jms.ConnectionListener;
@@ -696,36 +690,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
- throws AMQException, FailoverException
- {
-
- ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
-
- // TODO: Be aware of possible changes to parameter order as versions change.
-
- _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
-
- BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false);
-
- // todo send low water mark when protocol allows.
- // todo Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class);
-
- if (transacted)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Issuing TxSelect for " + channelId);
- }
-
- TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody();
-
- // TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
- }
- }
-
public void setFailoverPolicy(FailoverPolicy policy)
{
_failoverPolicy = policy;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 9650ad76fb..176eb5d0c4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -44,8 +44,6 @@ import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.FieldTable;
@@ -55,6 +53,7 @@ import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.Session;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -182,10 +181,10 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
throw new ChannelLimitReachedException(_conn.getMaximumChannelCount());
}
- return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
- new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
+ return new FailoverRetrySupport<Session, JMSException>(
+ new FailoverProtectedOperation<Session, JMSException>()
{
- public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
+ public Session execute() throws JMSException, FailoverException
{
int channelId = _conn.getNextChannelID();
@@ -197,7 +196,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
// We must create the session and register it before actually sending the frame to the server to
// open it, so that there is no window where we could receive data on the channel and not be set
// up to handle it appropriately.
- AMQSession session =
+ AMQSession_0_8 session =
new AMQSession_0_8(_conn, channelId, transacted, acknowledgeMode, prefetchHigh,
prefetchLow);
_conn.registerSession(channelId, session);
@@ -205,7 +204,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
boolean success = false;
try
{
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+ createChannelOverWire(channelId, transacted);
+ session.setPrefetchLimits(prefetchHigh, 0);
success = true;
}
catch (AMQException e)
@@ -252,18 +252,12 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
}
- private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+ private void createChannelOverWire(int channelId, boolean transacted)
throws AMQException, FailoverException
{
ChannelOpenBody channelOpenBody = _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
- // TODO: Be aware of possible changes to parameter order as versions change.
_conn.getProtocolHandler().syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
- // todo send low water mark when protocol allows.
- // todo Be aware of possible changes to parameter order as versions change.
- BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
- _conn.getProtocolHandler().syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
-
if (transacted)
{
if (_logger.isDebugEnabled())
@@ -292,7 +286,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
_logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
for (Iterator it = sessions.iterator(); it.hasNext();)
{
- AMQSession s = (AMQSession) it.next();
+ AMQSession_0_8 s = (AMQSession_0_8) it.next();
// reset the flow control flag
// on opening channel, broker sends flow blocked if virtual host is blocked
@@ -300,7 +294,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
// that's why we need to reset the flow control flag
s.setFlowControl(true);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.isTransacted());
-
+ s.setPrefetchLimits(s.getDefaultPrefetchHigh(), 0);
s.resubscribe();
}
}
@@ -310,7 +304,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
try
{
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+ createChannelOverWire(channelId, transacted);
}
catch (AMQException e)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index f3db245e45..693358c3ae 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -32,6 +32,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -93,6 +94,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
*/
private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
+ private AtomicInteger _currentPrefetch = new AtomicInteger();
/** Flow control */
private FlowControlIndicator _flowControl = new FlowControlIndicator();
@@ -112,7 +114,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
- super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
+ super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
+ _currentPrefetch.set(0);
}
/**
@@ -140,6 +143,14 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
protected void acknowledgeImpl() throws JMSException
{
boolean syncRequired = false;
+ try
+ {
+ reduceCreditAfterAcknowledge();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
while (true)
{
Long tag = getUnacknowledgedMessageTags().poll();
@@ -151,7 +162,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
acknowledgeMessage(tag, false);
syncRequired = true;
}
-
+ _currentPrefetch.set(0);
try
{
if (syncRequired && _syncAfterClientAck)
@@ -262,8 +273,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
final AMQProtocolHandler handler = getProtocolHandler();
-
+ reduceCreditAfterAcknowledge();
handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(getChannelId()), TxCommitOkBody.class);
+ _currentPrefetch.set(0);
}
public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException,
@@ -817,25 +829,77 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
}
- public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException
+ public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch)
+ throws AMQException, FailoverException
{
- new FailoverRetrySupport<Object, AMQException>(
- new FailoverProtectedOperation<Object, AMQException>()
+ _currentPrefetch.set(0);
+ BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+
+ // todo send low water mark when protocol allows.
+ // todo Be aware of possible changes to parameter order as versions change.
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+
+ }
+
+
+
+ protected boolean ensureCreditForReceive() throws AMQException
+ {
+ return new FailoverNoopSupport<>(
+ new FailoverProtectedOperation<Boolean, AMQException>()
{
- public Object execute() throws AMQException, FailoverException
+ public Boolean execute() throws AMQException, FailoverException
{
+ int currentPrefetch = _currentPrefetch.get();
+ if (currentPrefetch >= getPrefetch())
+ {
+ BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry()
+ .createBasicQosBody(0, currentPrefetch + 1, false);
- BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()),
+ BasicQosOkBody.class);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }, getProtocolHandler().getConnection()).execute();
- // todo send low water mark when protocol allows.
- // todo Be aware of possible changes to parameter order as versions change.
- getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+ }
- return null;
- }
- }, getAMQConnection()).execute();
+ protected void reduceCreditAfterAcknowledge() throws AMQException
+ {
+ int acknowledgeMode = getAcknowledgeMode();
+ boolean manageCredit = acknowledgeMode == javax.jms.Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED;
+
+ if(manageCredit)
+ {
+ new FailoverNoopSupport<>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+ BasicQosBody basicQosBody =
+ getProtocolHandler().getMethodRegistry()
+ .createBasicQosBody(0, getPrefetch(), false);
+
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()),
+ BasicQosOkBody.class);
+ return null;
+ }
+ }, getProtocolHandler().getConnection()).execute();
+ }
}
+ protected void updateCurrentPrefetch(int delta)
+ {
+ _currentPrefetch.addAndGet(delta);
+ }
+
+
+
public DestinationCache<AMQQueue> getQueueDestinationCache()
{
return _queueDestinationCache;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 459030a10d..6718121e6b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -166,4 +167,65 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
}
+ @Override
+ public Message receive(final long l) throws JMSException
+ {
+ int acknowledgeMode = getSession().getAcknowledgeMode();
+ boolean manageCredit = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED;
+ boolean creditModified = false;
+ try
+ {
+ if (manageCredit)
+ {
+ creditModified = getSession().ensureCreditForReceive();
+ }
+ Message message = super.receive(l);
+ if (creditModified && message == null)
+ {
+ getSession().reduceCreditAfterAcknowledge();
+ }
+ if (manageCredit && message != null)
+ {
+ getSession().updateCurrentPrefetch(1);
+ }
+ return message;
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
+ @Override
+ public Message receiveNoWait() throws JMSException
+ {
+ int acknowledgeMode = getSession().getAcknowledgeMode();
+ boolean manageCredit = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED;
+ boolean creditModified = false;
+ try
+ {
+ if (manageCredit)
+ {
+ creditModified = getSession().ensureCreditForReceive();
+ if (creditModified)
+ {
+ getSession().sync();
+ }
+ }
+ Message message = super.receiveNoWait();
+ if (creditModified && message == null)
+ {
+ getSession().reduceCreditAfterAcknowledge();
+ }
+ if (manageCredit && message != null)
+ {
+ getSession().updateCurrentPrefetch(1);
+ }
+ return message;
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
}