summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-10-08 14:14:56 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-10-08 14:14:56 +0000
commit9753f5c6b5e2f131776dc3797803a0322bce2616 (patch)
tree583bb1f600dd27b138302a9d1e094631956bdf2e /java/client/src
parentf77e3fc9579aa4041b08c8181cb7b951d04f0002 (diff)
downloadqpid-python-9753f5c6b5e2f131776dc3797803a0322bce2616.tar.gz
fixed issue with transactions
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@582832 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java33
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java63
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java5
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java6
6 files changed, 57 insertions, 64 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 89ec7a8029..01a04aca88 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1370,12 +1370,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- public void sendRollback() throws AMQException, FailoverException
- {
- _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+ public abstract void sendRollback() throws AMQException, FailoverException ;
- }
public void run()
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 41efb4b586..1595dfe20c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -452,6 +452,14 @@ public class AMQSession_0_10 extends AMQSession
}
+ public void sendRollback() throws AMQException, FailoverException
+ {
+ getQpidSession().txRollback();
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
+
//------ Private methods
/**
* Access to the underlying Qpid Session
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 8003f7b801..00c4a5365f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -31,32 +31,7 @@ import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -344,4 +319,10 @@ public class AMQSession_0_8 extends AMQSession
return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId,
this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
}
+
+ public void sendRollback() throws AMQException, FailoverException
+ {
+ _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index c801cf48fe..ca6312e79f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -869,35 +869,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
_logger.debug("Rejecting received messages in _receivedDTs (RQ)");
}
- // rollback received but not committed messages
- while (!_receivedDeliveryTags.isEmpty())
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Rejecting the messages(" + _receivedDeliveryTags
- .size() + ") in _receivedDTs (RQ)" + "for consumer with tag:" + _consumerTag);
- }
-
- Long tag = _receivedDeliveryTags.poll();
-
- if (tag != null)
- {
- if (_logger.isTraceEnabled())
- {
- _logger.trace("Rejecting tag from _receivedDTs:" + tag);
- }
-
- _session.rejectMessage(tag, true);
- }
- }
-
- if (!_receivedDeliveryTags.isEmpty())
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Queue _receivedDTs (RQ) was not empty after rejection");
- }
- }
+ rollbackReceivedMessages();
// rollback pending messages
if (_synchronousQueue.size() > 0)
@@ -944,6 +916,39 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
}
}
+ protected void rollbackReceivedMessages()
+ {
+ // rollback received but not committed messages
+ while (!_receivedDeliveryTags.isEmpty())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting the messages(" + _receivedDeliveryTags
+ .size() + ") in _receivedDTs (RQ)" + "for consumer with tag:" + _consumerTag);
+ }
+
+ Long tag = _receivedDeliveryTags.poll();
+
+ if (tag != null)
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Rejecting tag from _receivedDTs:" + tag);
+ }
+
+ _session.rejectMessage(tag, true);
+ }
+ }
+
+ if (!_receivedDeliveryTags.isEmpty())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Queue _receivedDTs (RQ) was not empty after rejection");
+ }
+ }
+ }
+
public String debugIdentity()
{
return String.valueOf(_consumerTag);
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 09895b7520..f1b8c2d3e7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -297,6 +297,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
}
+ protected void rollbackReceivedMessages()
+ {
+ // do nothing as the rollback operation will do the job.
+ }
+
/**
* Acquire a message
*
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index 1a86975b96..f60d944b42 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -27,8 +27,6 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Session;
import org.apache.qpid.testutil.QpidTestCase;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -247,13 +245,13 @@ public class TransactedTest extends QpidTestCase
{
try
{
- AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
MessageConsumer consumer = consumerSession.createConsumer(queue3);
- AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
+ AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = producerSession.createProducer(queue3);