diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-08 14:14:56 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-08 14:14:56 +0000 |
| commit | 9753f5c6b5e2f131776dc3797803a0322bce2616 (patch) | |
| tree | 583bb1f600dd27b138302a9d1e094631956bdf2e /java/client/src | |
| parent | f77e3fc9579aa4041b08c8181cb7b951d04f0002 (diff) | |
| download | qpid-python-9753f5c6b5e2f131776dc3797803a0322bce2616.tar.gz | |
fixed issue with transactions
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@582832 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
6 files changed, 57 insertions, 64 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 89ec7a8029..01a04aca88 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1370,12 +1370,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - public void sendRollback() throws AMQException, FailoverException - { - _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + public abstract void sendRollback() throws AMQException, FailoverException ; - } public void run() { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 41efb4b586..1595dfe20c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -452,6 +452,14 @@ public class AMQSession_0_10 extends AMQSession } + public void sendRollback() throws AMQException, FailoverException + { + getQpidSession().txRollback(); + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } + //------ Private methods /** * Access to the underlying Qpid Session diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 8003f7b801..00c4a5365f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -31,32 +31,7 @@ import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicAckBody; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicConsumeOkBody; -import org.apache.qpid.framing.BasicRecoverBody; -import org.apache.qpid.framing.BasicRecoverOkBody; -import org.apache.qpid.framing.BasicRejectBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.ExchangeBoundBody; -import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueBindOkBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.TxCommitBody; -import org.apache.qpid.framing.TxCommitOkBody; +import org.apache.qpid.framing.*; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -344,4 +319,10 @@ public class AMQSession_0_8 extends AMQSession return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId, this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); } + + public void sendRollback() throws AMQException, FailoverException + { + _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index c801cf48fe..ca6312e79f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -869,35 +869,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me _logger.debug("Rejecting received messages in _receivedDTs (RQ)"); } - // rollback received but not committed messages - while (!_receivedDeliveryTags.isEmpty()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejecting the messages(" + _receivedDeliveryTags - .size() + ") in _receivedDTs (RQ)" + "for consumer with tag:" + _consumerTag); - } - - Long tag = _receivedDeliveryTags.poll(); - - if (tag != null) - { - if (_logger.isTraceEnabled()) - { - _logger.trace("Rejecting tag from _receivedDTs:" + tag); - } - - _session.rejectMessage(tag, true); - } - } - - if (!_receivedDeliveryTags.isEmpty()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Queue _receivedDTs (RQ) was not empty after rejection"); - } - } + rollbackReceivedMessages(); // rollback pending messages if (_synchronousQueue.size() > 0) @@ -944,6 +916,39 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } } + protected void rollbackReceivedMessages() + { + // rollback received but not committed messages + while (!_receivedDeliveryTags.isEmpty()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting the messages(" + _receivedDeliveryTags + .size() + ") in _receivedDTs (RQ)" + "for consumer with tag:" + _consumerTag); + } + + Long tag = _receivedDeliveryTags.poll(); + + if (tag != null) + { + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting tag from _receivedDTs:" + tag); + } + + _session.rejectMessage(tag, true); + } + } + + if (!_receivedDeliveryTags.isEmpty()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Queue _receivedDTs (RQ) was not empty after rejection"); + } + } + } + public String debugIdentity() { return String.valueOf(_consumerTag); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 09895b7520..f1b8c2d3e7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -297,6 +297,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } } + protected void rollbackReceivedMessages() + { + // do nothing as the rollback operation will do the job. + } + /** * Acquire a message * diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 1a86975b96..f60d944b42 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -27,8 +27,6 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Session; import org.apache.qpid.testutil.QpidTestCase; -import org.apache.qpid.AMQException; -import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -247,13 +245,13 @@ public class TransactedTest extends QpidTestCase { try { - AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false); MessageConsumer consumer = consumerSession.createConsumer(queue3); - AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); + AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED); MessageProducer producer = producerSession.createProducer(queue3); |
