diff options
| author | Keith Wall <kwall@apache.org> | 2012-09-13 20:23:21 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-09-13 20:23:21 +0000 |
| commit | 48ad8d0334be1f009400d8e30dda0ef17d322830 (patch) | |
| tree | 230e564e0ea6f56157513098b0b6f7eb4f76a673 /java/client | |
| parent | 2a9c39a0c669a2de76fecbef5b55e3b46d5278ed (diff) | |
| download | qpid-python-48ad8d0334be1f009400d8e30dda0ef17d322830.tar.gz | |
QPID-4302: 0-8..0-9-1 client should sync after message.acknowledge()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1384512 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java | 23 | ||||
| -rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java | 8 |
2 files changed, 21 insertions, 10 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index ade7ab8033..ccae5e31e5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -64,6 +64,11 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); + public static final String QPID_SYNC_AFTER_CLIENT_ACK = "qpid.sync_after_client.ack"; + + private final boolean _syncAfterClientAck = + Boolean.parseBoolean(System.getProperty(QPID_SYNC_AFTER_CLIENT_ACK, "true")); + /** * The period to wait while flow controlled before sending a log message confirming that the session is still * waiting on flow control being revoked @@ -120,8 +125,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe return getProtocolHandler().getProtocolVersion(); } - protected void acknowledgeImpl() + protected void acknowledgeImpl() throws JMSException { + boolean syncRequired = false; while (true) { Long tag = getUnacknowledgedMessageTags().poll(); @@ -131,6 +137,19 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } acknowledgeMessage(tag, false); + syncRequired = true; + } + + try + { + if (syncRequired && _syncAfterClientAck) + { + sync(); + } + } + catch (AMQException a) + { + throw new JMSAMQException("Failed to sync after acknowledge", a); } } @@ -681,7 +700,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe boolean noLocal, boolean noWait) throws AMQException { - throw new UnsupportedOperationException("The new addressing based sytanx is " + throw new UnsupportedOperationException("The new addressing based syntax is " + "not supported for AMQP 0-8/0-9 versions"); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index 751066abbc..4ad9069ba0 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -189,14 +189,6 @@ public class TestAMQSession extends AMQSession_0_8 { } - public void handleAddressBasedDestination(AMQDestination dest, - boolean isConsumer, - boolean noWait) throws AMQException - { - throw new UnsupportedOperationException("The new addressing based sytanx is " - + "not supported for AMQP 0-8/0-9 versions"); - } - @Override protected void flushAcknowledgments() { |
