diff options
Diffstat (limited to 'qpid/java/client')
6 files changed, 148 insertions, 22 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java index 7e956698d1..109a72bcbf 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java @@ -42,7 +42,7 @@ public class Hello { } - public static void main(String[] args) + public static void main(String[] args) { Hello hello = new Hello(); hello.runTest(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 66cade18a4..35582d92b7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -21,16 +21,20 @@ package org.apache.qpid.client; import java.net.ConnectException; +import java.nio.ByteBuffer; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.EnumSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.JMSException; import javax.jms.XASession; +import org.apache.qpid.transport.Receiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +71,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); private final AMQConnection _conn; + private final long _timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT, + Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT, + ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT)); private boolean _messageCompressionSupported; private boolean _addrSyntaxSupported; private boolean _confirmedPublishSupported; @@ -136,7 +143,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion()); - NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), + ReceiverClosedWaiter monitoringReceiver = new ReceiverClosedWaiter(securityLayer.receiver(_conn.getProtocolHandler())); + + NetworkConnection network = transport.connect(settings, monitoringReceiver, _conn.getProtocolHandler()); try @@ -171,6 +180,19 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate network.close(); throw e; } + finally + { + // await the receiver to finish its execution (and so the IO threads too) + if (!_conn.isConnected()) + { + boolean closedWithinTimeout = monitoringReceiver.awaitClose(_timeout); + if (!closedWithinTimeout) + { + _logger.warn("Timed-out waiting for receiver for connection to " + + brokerDetail + " to be closed."); + } + } + } } @@ -503,4 +525,60 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return _confirmedPublishNonTransactionalSupported; } + + + private static class ReceiverClosedWaiter implements Receiver<ByteBuffer> + { + private final CountDownLatch _closedWatcher; + private final Receiver<ByteBuffer> _receiver; + + public ReceiverClosedWaiter(Receiver<ByteBuffer> receiver) + { + _receiver = receiver; + _closedWatcher = new CountDownLatch(1); + } + + @Override + public void received(ByteBuffer msg) + { + _receiver.received(msg); + } + + @Override + public void exception(Throwable t) + { + _receiver.exception(t); + } + + @Override + public void closed() + { + try + { + _receiver.closed(); + } + finally + { + _closedWatcher.countDown(); + } + } + + public boolean awaitClose(long timeout) + { + try + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Waiting " + timeout + "ms for receiver to be closed"); + } + + return _closedWatcher.await(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return _closedWatcher.getCount() == 0; + } + } + }; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index d86a2739f2..bb0f0d9b13 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -824,12 +824,13 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe throws AMQException, FailoverException { _currentPrefetch.set(0); - BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false); - - // todo send low water mark when protocol allows. - // todo Be aware of possible changes to parameter order as versions change. - getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); + if(messagePrefetch > 0 || sizePrefetch > 0) + { + BasicQosBody basicQosBody = + getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false); + getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); + } } @@ -842,13 +843,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe public Boolean execute() throws AMQException, FailoverException { int currentPrefetch = _currentPrefetch.get(); - if (currentPrefetch >= getPrefetch()) + if (currentPrefetch >= getPrefetch() && getPrefetch() >= 0) { BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry() .createBasicQosBody(0, currentPrefetch + 1, false); getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); + if(currentPrefetch == 0 && !isSuspended()) + { + sendSuspendChannel(false); + } _creditChanged.set(true); return true; } @@ -863,8 +868,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe protected void reduceCreditAfterAcknowledge() throws AMQException { - int acknowledgeMode = getAcknowledgeMode(); - boolean manageCredit = acknowledgeMode == javax.jms.Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED; + boolean manageCredit = isManagingCredit(); if(manageCredit && _creditChanged.compareAndSet(true,false)) { @@ -873,18 +877,40 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { public Void execute() throws AMQException, FailoverException { - BasicQosBody basicQosBody = - getProtocolHandler().getMethodRegistry() - .createBasicQosBody(0, getPrefetch(), false); + int prefetch = getPrefetch(); + if(prefetch == 0) + { + sendSuspendChannel(true); + } + else + { + BasicQosBody basicQosBody = + getProtocolHandler().getMethodRegistry() + .createBasicQosBody(0, prefetch == -1 ? 0 : prefetch, false); - getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), - BasicQosOkBody.class); + getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), + BasicQosOkBody.class); + } return null; } }, getProtocolHandler().getConnection()).execute(); } } + protected void reduceCreditInPostDeliver() + { + int acknowledgeMode = getAcknowledgeMode(); + boolean manageCredit = (acknowledgeMode == AUTO_ACKNOWLEDGE || acknowledgeMode == DUPS_OK_ACKNOWLEDGE) && getPrefetch() == 0; + + if(manageCredit && _creditChanged.compareAndSet(true,false)) + { + ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(false); + AMQFrame channelFlowFrame = body.generateFrame(getChannelId()); + getProtocolHandler().writeFrame(channelFlowFrame, true); + } + } + + protected void updateCurrentPrefetch(int delta) { _currentPrefetch.addAndGet(delta); @@ -1414,6 +1440,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } } + boolean isManagingCredit() + { + int acknowledgeMode = getAcknowledgeMode(); + return acknowledgeMode == CLIENT_ACKNOWLEDGE + || acknowledgeMode == SESSION_TRANSACTED + || ((acknowledgeMode == AUTO_ACKNOWLEDGE || acknowledgeMode == DUPS_OK_ACKNOWLEDGE) && getPrefetch() == 0); + } + + public boolean isFlowBlocked() { synchronized (_flowControl) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 3d0e972ca2..b1e606b8e9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -645,6 +645,12 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa _receivingThread.interrupt(); } + + + if(!(isBrowseOnly() || getSession().isClosing())) + { + rollback(); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 6718121e6b..1d7bb6087a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -170,8 +170,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe @Override public Message receive(final long l) throws JMSException { - int acknowledgeMode = getSession().getAcknowledgeMode(); - boolean manageCredit = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED; + boolean manageCredit = getSession().isManagingCredit(); boolean creditModified = false; try { @@ -184,7 +183,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe { getSession().reduceCreditAfterAcknowledge(); } - if (manageCredit && message != null) + if (manageCredit && !(getSession().getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE + || getSession().getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE) && message != null) { getSession().updateCurrentPrefetch(1); } @@ -199,8 +199,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe @Override public Message receiveNoWait() throws JMSException { - int acknowledgeMode = getSession().getAcknowledgeMode(); - boolean manageCredit = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED; + boolean manageCredit = getSession().isManagingCredit(); boolean creditModified = false; try { @@ -217,7 +216,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe { getSession().reduceCreditAfterAcknowledge(); } - if (manageCredit && message != null) + if (manageCredit && !(getSession().getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE + || getSession().getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE) && message != null) { getSession().updateCurrentPrefetch(1); } @@ -228,4 +228,11 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe throw new JMSAMQException(e); } } + + + void postDeliver(AbstractJMSMessage msg) + { + getSession().reduceCreditInPostDeliver(); + super.postDeliver(msg); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 0fe2ce232e..d5e3027601 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -240,7 +240,7 @@ public class AMQProtocolHandler implements ProtocolEngine } catch (Exception e) { - _logger.warn("Exception occured on closing the sender", e); + _logger.warn("Exception occurred on closing the sender", e); } if (_connection.failoverAllowed()) { |
