summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java80
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java61
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java2
6 files changed, 148 insertions, 22 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java
index 7e956698d1..109a72bcbf 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java
@@ -42,7 +42,7 @@ public class Hello
{
}
- public static void main(String[] args)
+ public static void main(String[] args)
{
Hello hello = new Hello();
hello.runTest();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 66cade18a4..35582d92b7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -21,16 +21,20 @@
package org.apache.qpid.client;
import java.net.ConnectException;
+import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.XASession;
+import org.apache.qpid.transport.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +71,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
private final AMQConnection _conn;
+ private final long _timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
+ Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
+ ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
private boolean _messageCompressionSupported;
private boolean _addrSyntaxSupported;
private boolean _confirmedPublishSupported;
@@ -136,7 +143,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
- NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()),
+ ReceiverClosedWaiter monitoringReceiver = new ReceiverClosedWaiter(securityLayer.receiver(_conn.getProtocolHandler()));
+
+ NetworkConnection network = transport.connect(settings, monitoringReceiver,
_conn.getProtocolHandler());
try
@@ -171,6 +180,19 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
network.close();
throw e;
}
+ finally
+ {
+ // await the receiver to finish its execution (and so the IO threads too)
+ if (!_conn.isConnected())
+ {
+ boolean closedWithinTimeout = monitoringReceiver.awaitClose(_timeout);
+ if (!closedWithinTimeout)
+ {
+ _logger.warn("Timed-out waiting for receiver for connection to "
+ + brokerDetail + " to be closed.");
+ }
+ }
+ }
}
@@ -503,4 +525,60 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
return _confirmedPublishNonTransactionalSupported;
}
+
+
+ private static class ReceiverClosedWaiter implements Receiver<ByteBuffer>
+ {
+ private final CountDownLatch _closedWatcher;
+ private final Receiver<ByteBuffer> _receiver;
+
+ public ReceiverClosedWaiter(Receiver<ByteBuffer> receiver)
+ {
+ _receiver = receiver;
+ _closedWatcher = new CountDownLatch(1);
+ }
+
+ @Override
+ public void received(ByteBuffer msg)
+ {
+ _receiver.received(msg);
+ }
+
+ @Override
+ public void exception(Throwable t)
+ {
+ _receiver.exception(t);
+ }
+
+ @Override
+ public void closed()
+ {
+ try
+ {
+ _receiver.closed();
+ }
+ finally
+ {
+ _closedWatcher.countDown();
+ }
+ }
+
+ public boolean awaitClose(long timeout)
+ {
+ try
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Waiting " + timeout + "ms for receiver to be closed");
+ }
+
+ return _closedWatcher.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ return _closedWatcher.getCount() == 0;
+ }
+ }
+ };
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index d86a2739f2..bb0f0d9b13 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -824,12 +824,13 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
throws AMQException, FailoverException
{
_currentPrefetch.set(0);
- BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
-
- // todo send low water mark when protocol allows.
- // todo Be aware of possible changes to parameter order as versions change.
- getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+ if(messagePrefetch > 0 || sizePrefetch > 0)
+ {
+ BasicQosBody basicQosBody =
+ getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+ }
}
@@ -842,13 +843,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
public Boolean execute() throws AMQException, FailoverException
{
int currentPrefetch = _currentPrefetch.get();
- if (currentPrefetch >= getPrefetch())
+ if (currentPrefetch >= getPrefetch() && getPrefetch() >= 0)
{
BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry()
.createBasicQosBody(0, currentPrefetch + 1, false);
getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()),
BasicQosOkBody.class);
+ if(currentPrefetch == 0 && !isSuspended())
+ {
+ sendSuspendChannel(false);
+ }
_creditChanged.set(true);
return true;
}
@@ -863,8 +868,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
protected void reduceCreditAfterAcknowledge() throws AMQException
{
- int acknowledgeMode = getAcknowledgeMode();
- boolean manageCredit = acknowledgeMode == javax.jms.Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED;
+ boolean manageCredit = isManagingCredit();
if(manageCredit && _creditChanged.compareAndSet(true,false))
{
@@ -873,18 +877,40 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
public Void execute() throws AMQException, FailoverException
{
- BasicQosBody basicQosBody =
- getProtocolHandler().getMethodRegistry()
- .createBasicQosBody(0, getPrefetch(), false);
+ int prefetch = getPrefetch();
+ if(prefetch == 0)
+ {
+ sendSuspendChannel(true);
+ }
+ else
+ {
+ BasicQosBody basicQosBody =
+ getProtocolHandler().getMethodRegistry()
+ .createBasicQosBody(0, prefetch == -1 ? 0 : prefetch, false);
- getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()),
- BasicQosOkBody.class);
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()),
+ BasicQosOkBody.class);
+ }
return null;
}
}, getProtocolHandler().getConnection()).execute();
}
}
+ protected void reduceCreditInPostDeliver()
+ {
+ int acknowledgeMode = getAcknowledgeMode();
+ boolean manageCredit = (acknowledgeMode == AUTO_ACKNOWLEDGE || acknowledgeMode == DUPS_OK_ACKNOWLEDGE) && getPrefetch() == 0;
+
+ if(manageCredit && _creditChanged.compareAndSet(true,false))
+ {
+ ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(false);
+ AMQFrame channelFlowFrame = body.generateFrame(getChannelId());
+ getProtocolHandler().writeFrame(channelFlowFrame, true);
+ }
+ }
+
+
protected void updateCurrentPrefetch(int delta)
{
_currentPrefetch.addAndGet(delta);
@@ -1414,6 +1440,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
}
+ boolean isManagingCredit()
+ {
+ int acknowledgeMode = getAcknowledgeMode();
+ return acknowledgeMode == CLIENT_ACKNOWLEDGE
+ || acknowledgeMode == SESSION_TRANSACTED
+ || ((acknowledgeMode == AUTO_ACKNOWLEDGE || acknowledgeMode == DUPS_OK_ACKNOWLEDGE) && getPrefetch() == 0);
+ }
+
+
public boolean isFlowBlocked()
{
synchronized (_flowControl)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 3d0e972ca2..b1e606b8e9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -645,6 +645,12 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
_receivingThread.interrupt();
}
+
+
+ if(!(isBrowseOnly() || getSession().isClosing()))
+ {
+ rollback();
+ }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 6718121e6b..1d7bb6087a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -170,8 +170,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
@Override
public Message receive(final long l) throws JMSException
{
- int acknowledgeMode = getSession().getAcknowledgeMode();
- boolean manageCredit = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED;
+ boolean manageCredit = getSession().isManagingCredit();
boolean creditModified = false;
try
{
@@ -184,7 +183,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
{
getSession().reduceCreditAfterAcknowledge();
}
- if (manageCredit && message != null)
+ if (manageCredit && !(getSession().getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE
+ || getSession().getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE) && message != null)
{
getSession().updateCurrentPrefetch(1);
}
@@ -199,8 +199,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
@Override
public Message receiveNoWait() throws JMSException
{
- int acknowledgeMode = getSession().getAcknowledgeMode();
- boolean manageCredit = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED;
+ boolean manageCredit = getSession().isManagingCredit();
boolean creditModified = false;
try
{
@@ -217,7 +216,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
{
getSession().reduceCreditAfterAcknowledge();
}
- if (manageCredit && message != null)
+ if (manageCredit && !(getSession().getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE
+ || getSession().getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE) && message != null)
{
getSession().updateCurrentPrefetch(1);
}
@@ -228,4 +228,11 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
throw new JMSAMQException(e);
}
}
+
+
+ void postDeliver(AbstractJMSMessage msg)
+ {
+ getSession().reduceCreditInPostDeliver();
+ super.postDeliver(msg);
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 0fe2ce232e..d5e3027601 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -240,7 +240,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
catch (Exception e)
{
- _logger.warn("Exception occured on closing the sender", e);
+ _logger.warn("Exception occurred on closing the sender", e);
}
if (_connection.failoverAllowed())
{