summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-11-06 09:56:56 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-11-06 09:56:56 +0000
commitf04d8a5bdde4c141f6d8ad04149e8c757fedf49a (patch)
tree84fea5d986ec21c670f1f994f8559afe4798137d /qpid/java/client/src/main
parentb32e4e24efed5adfb9cbfe89c020fdce4c9e6999 (diff)
downloadqpid-python-f04d8a5bdde4c141f6d8ad04149e8c757fedf49a.tar.gz
QPID-1431 : Prevent listeners being added to _frameListeners when an exception state has occured due to connection closure. This adds additional synchronisation to the error handling condition. It also removes the need for a CopyOnWriteArrayList that was perhaps the wrong data structure for this list.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@711822 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java49
1 files changed, 36 insertions, 13 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 116b163b3c..bf32b4df1f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -452,13 +452,16 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void propagateExceptionToFrameListeners(Exception e)
{
- if (!_frameListeners.isEmpty())
+ synchronized (_frameListeners)
{
- final Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener ml = (AMQMethodListener) it.next();
- ml.error(e);
+ final Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener ml = (AMQMethodListener) it.next();
+ ml.error(e);
+ }
}
}
}
@@ -558,18 +561,20 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
+ synchronized (_frameListeners)
{
- //This iterator is safe from the error state as the frame listeners always add before they send so their
- // will be ready and waiting for this response.
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ //This iterator is safe from the error state as the frame listeners always add before they send so their
+ // will be ready and waiting for this response.
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
}
}
-
if (!wasAnyoneInterested)
{
throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
@@ -659,6 +664,24 @@ public class AMQProtocolHandler extends IoHandlerAdapter
throw _lastFailoverException;
}
+ if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED)
+ {
+ Exception e = _stateManager.getLastException();
+ if (e != null)
+ {
+ if (e instanceof AMQException)
+ {
+ AMQException amqe = (AMQException) e;
+
+ amqe.rethrow();
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
+ }
+ }
+ }
+
_frameListeners.add(listener);
//FIXME: At this point here we should check or before add we should check _stateManager is in an open
// state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255