summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-08-13 16:19:28 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-08-13 16:19:28 +0000
commitc700093f4294bb1bcda42f3fc1982dcc57dc44da (patch)
tree5fc93a03fed2274513e7da3123258d12f58ffbcd
parentcf96b23f687c379bd71f465c837379b0966c2184 (diff)
downloadqpid-python-c700093f4294bb1bcda42f3fc1982dcc57dc44da.tar.gz
QPID-2657: Correct handling of sync on 0-10 client session for exceptions
AMQSession_0_10 is modified to contain a pair of get/set methods for the current exception, using the set method to post the exception to the listener. The sync method will now throw an exception if one has occurred and all other methods that used to call sync()/getCurrentException() can just call sync(0 and get the expected behaviour. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@985262 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java123
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java32
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java19
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java18
9 files changed, 147 insertions, 93 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index e71782b116..8c7b374791 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -105,11 +105,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
{
ExecutionException ex = new ExecutionException();
- ex.setErrorCode(ExecutionErrorCode.get(cause.getCode()));
+ ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
+ try
+ {
+ code = ExecutionErrorCode.get(cause.getCode());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // Ignore, already set to INTERNAL_ERROR
+ }
+ ex.setErrorCode(code);
ex.setDescription(message);
((ServerSession)session).invoke(ex);
((ServerSession)session).close();
}
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index d24ad46512..430a4bd9e9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -361,8 +361,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (!nowait)
{
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
}
@@ -382,9 +381,21 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
flushTask = null;
}
flushAcknowledgments();
- getQpidSession().sync();
- getQpidSession().close();
- getCurrentException();
+ try
+ {
+ getQpidSession().sync();
+ getQpidSession().close();
+ }
+ catch (SessionException se)
+ {
+ setCurrentException(se);
+ }
+
+ AMQException amqe = getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
}
@@ -403,7 +414,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().setAutoSync(false);
}
// We need to sync so that we get notify of an error.
- getCurrentException();
+ sync();
}
/**
@@ -426,8 +437,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
autoDelete ? Option.AUTO_DELETE : Option.NONE,
exclusive ? Option.EXCLUSIVE : Option.NONE);
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
/**
@@ -451,8 +461,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
@@ -566,7 +575,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
try
{
boolean isTopic;
-
+
if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
{
isTopic = consumer.getDestination() instanceof AMQTopic ||
@@ -583,7 +592,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
(isTopic || consumer.getMessageSelector() == null ||
consumer.getMessageSelector().equals(""));
}
-
+
getQpidSession().messageSubscribe
(queueName.toString(), String.valueOf(tag),
getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
@@ -607,7 +616,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
Option.UNRELIABLE);
-
+
if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch))
{
// set the flow
@@ -619,11 +628,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (!nowait)
{
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
}
-
+
private long getCapacity(AMQDestination destination)
{
long capacity = 0;
@@ -677,8 +685,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// We need to sync so that we get notify of an error.
if (!nowait)
{
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
}
@@ -710,7 +717,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
queueName = amqd.getAMQQueueName();
}
-
+
if (amqd.getDestSyntax() == DestSyntax.BURL)
{
Map<String,Object> arguments = new HashMap<String,Object>();
@@ -718,7 +725,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
arguments.put("no-local", true);
}
-
+
getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
amqd.isDurable() ? Option.DURABLE : Option.NONE,
@@ -733,13 +740,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
node.isDurable() ? Option.DURABLE : Option.NONE,
node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
-
+
// passive --> false
if (!nowait)
{
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
return queueName;
}
@@ -753,8 +759,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// ifEmpty --> false
// ifUnused --> false
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
/**
@@ -807,8 +812,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
@@ -816,8 +820,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
getQpidSession().txRollback();
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
//------ Private methods
@@ -835,19 +838,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* Get the latest thrown exception.
*
- * @throws org.apache.qpid.AMQException get the latest thrown error.
+ * @throws SessionException get the latest thrown error.
*/
- public void getCurrentException() throws AMQException
+ public AMQException getCurrentException()
{
+ AMQException amqe = null;
synchronized (_currentExceptionLock)
{
if (_currentException != null)
{
- AMQException amqe = _currentException;
+ amqe = _currentException;
_currentException = null;
- throw amqe;
}
}
+ return amqe;
}
public void opened(Session ssn) {}
@@ -872,22 +876,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void exception(Session ssn, SessionException exc)
{
- synchronized (_currentExceptionLock)
- {
- ExecutionException ee = exc.getException();
- int code;
- if (ee == null)
- {
- code = AMQConstant.INTERNAL_ERROR.getCode();
- }
- else
- {
- code = ee.getErrorCode().getValue();
- }
- AMQException amqe = new AMQException(AMQConstant.getConstant(code), exc.getMessage(), exc.getCause());
- _connection.exceptionReceived(amqe);
- _currentException = amqe;
- }
+ setCurrentException(exc);
}
public void closed(Session ssn) {}
@@ -1041,11 +1030,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
return Serial.lt((int) currentMark, (int) deliveryTag);
}
-
+
public void sync() throws AMQException
{
- _qpidSession.sync();
- getCurrentException();
+ try
+ {
+ getQpidSession().sync();
+ }
+ catch (SessionException se)
+ {
+ setCurrentException(se);
+ }
+
+ AMQException amqe = getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
+ }
+
+ public void setCurrentException(SessionException se)
+ {
+ synchronized (_currentExceptionLock)
+ {
+ ExecutionException ee = se.getException();
+ int code = AMQConstant.INTERNAL_ERROR.getCode();
+ if (ee != null)
+ {
+ code = ee.getErrorCode().getValue();
+ }
+ AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
+
+ _connection.exceptionReceived(amqe);
+
+ _currentException = amqe;
+ }
}
public AMQMessageDelegateFactory getMessageDelegateFactory()
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index a942d808a9..eddaa1a6bb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -168,16 +168,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
@Override void sendCancel() throws AMQException
{
- ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString());
- ((AMQSession_0_10) getSession()).getQpidSession().sync();
- // confirm cancel
- getSession().confirmConsumerCancelled(getConsumerTag());
- ((AMQSession_0_10) getSession()).getCurrentException();
+ _0_10session.getQpidSession().messageCancel(getConsumerTagString());
+ try
+ {
+ _0_10session.getQpidSession().sync();
+ getSession().confirmConsumerCancelled(getConsumerTag()); // confirm cancel
+ }
+ catch (SessionException se)
+ {
+ _0_10session.setCurrentException(se);
+ }
+
+ AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
}
@Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)
{
-
super.notifyMessage(messageFrame);
}
@@ -285,7 +295,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_0_10session.messageAcknowledge
(ranges,
_acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
- _0_10session.getCurrentException();
+
+ AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
}
}
@@ -302,7 +317,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
RangeSet ranges = new RangeSet();
ranges.add((int) message.getDeliveryTag());
_0_10session.getQpidSession().messageRelease(ranges);
- _0_10session.getCurrentException();
+ _0_10session.sync();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index df59be25d0..14e1601993 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -266,7 +266,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
return _destination;
}
- public void close() throws JMSException
+ public void close()
{
_closed.set(true);
_session.deregisterProducer(_producerId);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index ed6f00a51c..13b8e461d4 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -56,7 +56,7 @@ public class Connection extends ConnectionInvoker
implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
{
- private static final Logger log = Logger.get(Connection.class);
+ protected static final Logger log = Logger.get(Connection.class);
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
index 29389df99a..88dd2d6afa 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
@@ -99,5 +99,4 @@ public abstract class ConnectionDelegate
ssn.closed();
}
}
-
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 5e40527c2f..9b84ff422b 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -280,7 +280,7 @@ public class Session extends SessionInvoker
{
if (m != null)
{
- System.out.println(m);
+ log.debug("%s", m);
}
}
}
@@ -732,8 +732,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
- log.debug("%s waiting for[%d]: %d, %s", this, point,
- maxComplete, commands);
+ log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands);
w.await();
}
@@ -741,16 +740,23 @@ public class Session extends SessionInvoker
{
if (state != CLOSED)
{
- throw new SessionException
- (String.format
- ("timed out waiting for sync: complete = %s, point = %s", maxComplete, point));
+ throw new SessionException(
+ String.format("timed out waiting for sync: complete = %s, point = %s",
+ maxComplete, point));
+ }
+ else
+ {
+ ExecutionException ee = getException();
+ if (ee != null)
+ {
+ throw new SessionException(ee);
+ }
}
}
}
}
- private Map<Integer,ResultFuture<?>> results =
- new HashMap<Integer,ResultFuture<?>>();
+ private Map<Integer,ResultFuture<?>> results = new HashMap<Integer,ResultFuture<?>>();
private ExecutionException exception = null;
void result(int command, Struct result)
@@ -769,9 +775,8 @@ public class Session extends SessionInvoker
{
if (exception != null)
{
- throw new IllegalStateException
- (String.format
- ("too many exceptions: %s, %s", exception, exc));
+ throw new IllegalStateException(
+ String.format("too many exceptions: %s, %s", exception, exc));
}
exception = exc;
}
@@ -849,8 +854,8 @@ public class Session extends SessionInvoker
}
else
{
- throw new SessionException
- (String.format("%s timed out waiting for result: %s",
+ throw new SessionException(
+ String.format("%s timed out waiting for result: %s",
Session.this, this));
}
}
@@ -961,5 +966,4 @@ public class Session extends SessionInvoker
{
return String.format("ssn:%s", name);
}
-
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index 15539c1d07..5d8e4d5565 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -33,11 +33,15 @@ public class SessionDelegate
extends MethodDelegate<Session>
implements ProtocolDelegate<Session>
{
- private static final Logger log = Logger.get(SessionDelegate.class);
+ protected static final Logger log = Logger.get(SessionDelegate.class);
- public void init(Session ssn, ProtocolHeader hdr) { }
+ public void init(Session ssn, ProtocolHeader hdr)
+ {
+ log.warn("INIT: [%s] %s", ssn, hdr);
+ }
- public void control(Session ssn, Method method) {
+ public void control(Session ssn, Method method)
+ {
method.dispatch(ssn, this);
}
@@ -50,7 +54,10 @@ public class SessionDelegate
}
}
- public void error(Session ssn, ProtocolError error) { }
+ public void error(Session ssn, ProtocolError error)
+ {
+ log.warn("ERROR: [%s] %s", ssn, error);
+ }
public void handle(Session ssn, Method method)
{
@@ -195,9 +202,11 @@ public class SessionDelegate
public void closed(Session session)
{
+ log.warn("CLOSED: [%s]", session);
}
public void detached(Session session)
- {
+ {
+ log.warn("DETACHED: [%s]", session);
}
}
diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index bdd3a0c93b..375a326654 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -424,10 +424,6 @@ public class ConnectionTest extends QpidTestCase implements SessionListener
}
}
- /**
- * The 0-10 {@code executionSync} command should set the exception status in the session,
- * so that the client session object can then throw it as an {@link AMQException}.
- */
public void testExecutionExceptionSync() throws Exception
{
startServer();
@@ -436,11 +432,15 @@ public class ConnectionTest extends QpidTestCase implements SessionListener
conn.connect("localhost", port, null, "guest", "guest");
Session ssn = conn.createSession();
send(ssn, "EXCP 0", true);
- ExecutionException before = ssn.getException();
- assertNull("There should not be an exception stored in the session", before);
- ssn.sync();
- ExecutionException after = ssn.getException();
- assertNotNull("There should be an exception stored in the session", after);
+ try
+ {
+ ssn.sync();
+ fail("this should have failed");
+ }
+ catch (SessionException exc)
+ {
+ assertNotNull(exc.getException());
+ }
}
}