summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-08-13 16:19:28 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-08-13 16:19:28 +0000
commitc700093f4294bb1bcda42f3fc1982dcc57dc44da (patch)
tree5fc93a03fed2274513e7da3123258d12f58ffbcd /java/client/src
parentcf96b23f687c379bd71f465c837379b0966c2184 (diff)
downloadqpid-python-c700093f4294bb1bcda42f3fc1982dcc57dc44da.tar.gz
QPID-2657: Correct handling of sync on 0-10 client session for exceptions
AMQSession_0_10 is modified to contain a pair of get/set methods for the current exception, using the set method to post the exception to the listener. The sync method will now throw an exception if one has occurred and all other methods that used to call sync()/getCurrentException() can just call sync(0 and get the expected behaviour. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@985262 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java123
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java2
3 files changed, 95 insertions, 61 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index d24ad46512..430a4bd9e9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -361,8 +361,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (!nowait)
{
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
}
@@ -382,9 +381,21 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
flushTask = null;
}
flushAcknowledgments();
- getQpidSession().sync();
- getQpidSession().close();
- getCurrentException();
+ try
+ {
+ getQpidSession().sync();
+ getQpidSession().close();
+ }
+ catch (SessionException se)
+ {
+ setCurrentException(se);
+ }
+
+ AMQException amqe = getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
}
@@ -403,7 +414,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().setAutoSync(false);
}
// We need to sync so that we get notify of an error.
- getCurrentException();
+ sync();
}
/**
@@ -426,8 +437,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
autoDelete ? Option.AUTO_DELETE : Option.NONE,
exclusive ? Option.EXCLUSIVE : Option.NONE);
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
/**
@@ -451,8 +461,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
@@ -566,7 +575,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
try
{
boolean isTopic;
-
+
if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
{
isTopic = consumer.getDestination() instanceof AMQTopic ||
@@ -583,7 +592,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
(isTopic || consumer.getMessageSelector() == null ||
consumer.getMessageSelector().equals(""));
}
-
+
getQpidSession().messageSubscribe
(queueName.toString(), String.valueOf(tag),
getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
@@ -607,7 +616,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
Option.UNRELIABLE);
-
+
if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch))
{
// set the flow
@@ -619,11 +628,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (!nowait)
{
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
}
-
+
private long getCapacity(AMQDestination destination)
{
long capacity = 0;
@@ -677,8 +685,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// We need to sync so that we get notify of an error.
if (!nowait)
{
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
}
@@ -710,7 +717,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
queueName = amqd.getAMQQueueName();
}
-
+
if (amqd.getDestSyntax() == DestSyntax.BURL)
{
Map<String,Object> arguments = new HashMap<String,Object>();
@@ -718,7 +725,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
arguments.put("no-local", true);
}
-
+
getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
amqd.isDurable() ? Option.DURABLE : Option.NONE,
@@ -733,13 +740,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
node.isDurable() ? Option.DURABLE : Option.NONE,
node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
-
+
// passive --> false
if (!nowait)
{
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
return queueName;
}
@@ -753,8 +759,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// ifEmpty --> false
// ifUnused --> false
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
/**
@@ -807,8 +812,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
@@ -816,8 +820,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
getQpidSession().txRollback();
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
//------ Private methods
@@ -835,19 +838,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* Get the latest thrown exception.
*
- * @throws org.apache.qpid.AMQException get the latest thrown error.
+ * @throws SessionException get the latest thrown error.
*/
- public void getCurrentException() throws AMQException
+ public AMQException getCurrentException()
{
+ AMQException amqe = null;
synchronized (_currentExceptionLock)
{
if (_currentException != null)
{
- AMQException amqe = _currentException;
+ amqe = _currentException;
_currentException = null;
- throw amqe;
}
}
+ return amqe;
}
public void opened(Session ssn) {}
@@ -872,22 +876,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void exception(Session ssn, SessionException exc)
{
- synchronized (_currentExceptionLock)
- {
- ExecutionException ee = exc.getException();
- int code;
- if (ee == null)
- {
- code = AMQConstant.INTERNAL_ERROR.getCode();
- }
- else
- {
- code = ee.getErrorCode().getValue();
- }
- AMQException amqe = new AMQException(AMQConstant.getConstant(code), exc.getMessage(), exc.getCause());
- _connection.exceptionReceived(amqe);
- _currentException = amqe;
- }
+ setCurrentException(exc);
}
public void closed(Session ssn) {}
@@ -1041,11 +1030,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
return Serial.lt((int) currentMark, (int) deliveryTag);
}
-
+
public void sync() throws AMQException
{
- _qpidSession.sync();
- getCurrentException();
+ try
+ {
+ getQpidSession().sync();
+ }
+ catch (SessionException se)
+ {
+ setCurrentException(se);
+ }
+
+ AMQException amqe = getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
+ }
+
+ public void setCurrentException(SessionException se)
+ {
+ synchronized (_currentExceptionLock)
+ {
+ ExecutionException ee = se.getException();
+ int code = AMQConstant.INTERNAL_ERROR.getCode();
+ if (ee != null)
+ {
+ code = ee.getErrorCode().getValue();
+ }
+ AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
+
+ _connection.exceptionReceived(amqe);
+
+ _currentException = amqe;
+ }
}
public AMQMessageDelegateFactory getMessageDelegateFactory()
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index a942d808a9..eddaa1a6bb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -168,16 +168,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
@Override void sendCancel() throws AMQException
{
- ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString());
- ((AMQSession_0_10) getSession()).getQpidSession().sync();
- // confirm cancel
- getSession().confirmConsumerCancelled(getConsumerTag());
- ((AMQSession_0_10) getSession()).getCurrentException();
+ _0_10session.getQpidSession().messageCancel(getConsumerTagString());
+ try
+ {
+ _0_10session.getQpidSession().sync();
+ getSession().confirmConsumerCancelled(getConsumerTag()); // confirm cancel
+ }
+ catch (SessionException se)
+ {
+ _0_10session.setCurrentException(se);
+ }
+
+ AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
}
@Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)
{
-
super.notifyMessage(messageFrame);
}
@@ -285,7 +295,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_0_10session.messageAcknowledge
(ranges,
_acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
- _0_10session.getCurrentException();
+
+ AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
}
}
@@ -302,7 +317,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
RangeSet ranges = new RangeSet();
ranges.add((int) message.getDeliveryTag());
_0_10session.getQpidSession().messageRelease(ranges);
- _0_10session.getCurrentException();
+ _0_10session.sync();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index df59be25d0..14e1601993 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -266,7 +266,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
return _destination;
}
- public void close() throws JMSException
+ public void close()
{
_closed.set(true);
_session.deregisterProducer(_producerId);