diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-09-08 15:37:20 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-09-08 15:37:20 +0000 |
| commit | 7f5b8221bb212a12d588785c1881dabd3538ccdf (patch) | |
| tree | 0b5aaf6512994b1fa5107fd834d1f2b621cd7057 /qpid/java/client | |
| parent | 76b0ca0a375198db3bd5f73687c0b24f713b1143 (diff) | |
| download | qpid-python-7f5b8221bb212a12d588785c1881dabd3538ccdf.tar.gz | |
QPID-6088 : [Java Client] AMQP 0-8/8/9-1 prefetch should auto expand when receive is called in a situation where the prefetch buffer is full
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1623422 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
4 files changed, 151 insertions, 67 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 2a91ff3ce2..6b3b4601d9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -67,13 +67,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicQosBody; -import org.apache.qpid.framing.BasicQosOkBody; -import org.apache.qpid.framing.ChannelOpenBody; -import org.apache.qpid.framing.ChannelOpenOkBody; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.TxSelectBody; -import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Connection; import org.apache.qpid.jms.ConnectionListener; @@ -696,36 +690,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) - throws AMQException, FailoverException - { - - ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null); - - // TODO: Be aware of possible changes to parameter order as versions change. - - _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); - - BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false); - - // todo send low water mark when protocol allows. - // todo Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class); - - if (transacted) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Issuing TxSelect for " + channelId); - } - - TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody(); - - // TODO: Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class); - } - } - public void setFailoverPolicy(FailoverPolicy policy) { _failoverPolicy = policy; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 9650ad76fb..176eb5d0c4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -44,8 +44,6 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.framing.BasicQosBody; -import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; import org.apache.qpid.framing.FieldTable; @@ -55,6 +53,7 @@ import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.Session; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.network.NetworkConnection; @@ -182,10 +181,10 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate throw new ChannelLimitReachedException(_conn.getMaximumChannelCount()); } - return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>( - new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>() + return new FailoverRetrySupport<Session, JMSException>( + new FailoverProtectedOperation<Session, JMSException>() { - public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException + public Session execute() throws JMSException, FailoverException { int channelId = _conn.getNextChannelID(); @@ -197,7 +196,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate // We must create the session and register it before actually sending the frame to the server to // open it, so that there is no window where we could receive data on the channel and not be set // up to handle it appropriately. - AMQSession session = + AMQSession_0_8 session = new AMQSession_0_8(_conn, channelId, transacted, acknowledgeMode, prefetchHigh, prefetchLow); _conn.registerSession(channelId, session); @@ -205,7 +204,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate boolean success = false; try { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + createChannelOverWire(channelId, transacted); + session.setPrefetchLimits(prefetchHigh, 0); success = true; } catch (AMQException e) @@ -252,18 +252,12 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); } - private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) + private void createChannelOverWire(int channelId, boolean transacted) throws AMQException, FailoverException { ChannelOpenBody channelOpenBody = _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null); - // TODO: Be aware of possible changes to parameter order as versions change. _conn.getProtocolHandler().syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); - // todo send low water mark when protocol allows. - // todo Be aware of possible changes to parameter order as versions change. - BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false); - _conn.getProtocolHandler().syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class); - if (transacted) { if (_logger.isDebugEnabled()) @@ -292,7 +286,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey? for (Iterator it = sessions.iterator(); it.hasNext();) { - AMQSession s = (AMQSession) it.next(); + AMQSession_0_8 s = (AMQSession_0_8) it.next(); // reset the flow control flag // on opening channel, broker sends flow blocked if virtual host is blocked @@ -300,7 +294,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate // that's why we need to reset the flow control flag s.setFlowControl(true); reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.isTransacted()); - + s.setPrefetchLimits(s.getDefaultPrefetchHigh(), 0); s.resubscribe(); } } @@ -310,7 +304,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { try { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + createChannelOverWire(channelId, transacted); } catch (AMQException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index f3db245e45..693358c3ae 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -32,6 +32,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Destination; import javax.jms.JMSException; @@ -93,6 +94,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe */ private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE, DEFAULT_FLOW_CONTROL_WAIT_FAILURE); + private AtomicInteger _currentPrefetch = new AtomicInteger(); /** Flow control */ private FlowControlIndicator _flowControl = new FlowControlIndicator(); @@ -112,7 +114,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { - super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark); + super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark); + _currentPrefetch.set(0); } /** @@ -140,6 +143,14 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe protected void acknowledgeImpl() throws JMSException { boolean syncRequired = false; + try + { + reduceCreditAfterAcknowledge(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } while (true) { Long tag = getUnacknowledgedMessageTags().poll(); @@ -151,7 +162,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe acknowledgeMessage(tag, false); syncRequired = true; } - + _currentPrefetch.set(0); try { if (syncRequired && _syncAfterClientAck) @@ -262,8 +273,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } final AMQProtocolHandler handler = getProtocolHandler(); - + reduceCreditAfterAcknowledge(); handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(getChannelId()), TxCommitOkBody.class); + _currentPrefetch.set(0); } public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException, @@ -817,25 +829,77 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class); } - public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException + public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) + throws AMQException, FailoverException { - new FailoverRetrySupport<Object, AMQException>( - new FailoverProtectedOperation<Object, AMQException>() + _currentPrefetch.set(0); + BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false); + + // todo send low water mark when protocol allows. + // todo Be aware of possible changes to parameter order as versions change. + getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); + + } + + + + protected boolean ensureCreditForReceive() throws AMQException + { + return new FailoverNoopSupport<>( + new FailoverProtectedOperation<Boolean, AMQException>() { - public Object execute() throws AMQException, FailoverException + public Boolean execute() throws AMQException, FailoverException { + int currentPrefetch = _currentPrefetch.get(); + if (currentPrefetch >= getPrefetch()) + { + BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry() + .createBasicQosBody(0, currentPrefetch + 1, false); - BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false); + getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), + BasicQosOkBody.class); + return true; + } + else + { + return false; + } + } + }, getProtocolHandler().getConnection()).execute(); - // todo send low water mark when protocol allows. - // todo Be aware of possible changes to parameter order as versions change. - getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); + } - return null; - } - }, getAMQConnection()).execute(); + protected void reduceCreditAfterAcknowledge() throws AMQException + { + int acknowledgeMode = getAcknowledgeMode(); + boolean manageCredit = acknowledgeMode == javax.jms.Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED; + + if(manageCredit) + { + new FailoverNoopSupport<>( + new FailoverProtectedOperation<Void, AMQException>() + { + public Void execute() throws AMQException, FailoverException + { + BasicQosBody basicQosBody = + getProtocolHandler().getMethodRegistry() + .createBasicQosBody(0, getPrefetch(), false); + + getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), + BasicQosOkBody.class); + return null; + } + }, getProtocolHandler().getConnection()).execute(); + } } + protected void updateCurrentPrefetch(int delta) + { + _currentPrefetch.addAndGet(delta); + } + + + public DestinationCache<AMQQueue> getQueueDestinationCache() { return _queueDestinationCache; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 459030a10d..6718121e6b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,4 +167,65 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe } + @Override + public Message receive(final long l) throws JMSException + { + int acknowledgeMode = getSession().getAcknowledgeMode(); + boolean manageCredit = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED; + boolean creditModified = false; + try + { + if (manageCredit) + { + creditModified = getSession().ensureCreditForReceive(); + } + Message message = super.receive(l); + if (creditModified && message == null) + { + getSession().reduceCreditAfterAcknowledge(); + } + if (manageCredit && message != null) + { + getSession().updateCurrentPrefetch(1); + } + return message; + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + + @Override + public Message receiveNoWait() throws JMSException + { + int acknowledgeMode = getSession().getAcknowledgeMode(); + boolean manageCredit = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED; + boolean creditModified = false; + try + { + if (manageCredit) + { + creditModified = getSession().ensureCreditForReceive(); + if (creditModified) + { + getSession().sync(); + } + } + Message message = super.receiveNoWait(); + if (creditModified && message == null) + { + getSession().reduceCreditAfterAcknowledge(); + } + if (manageCredit && message != null) + { + getSession().updateCurrentPrefetch(1); + } + return message; + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } } |
