diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2008-11-06 09:56:56 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2008-11-06 09:56:56 +0000 |
| commit | f04d8a5bdde4c141f6d8ad04149e8c757fedf49a (patch) | |
| tree | 84fea5d986ec21c670f1f994f8559afe4798137d /qpid/java/client/src/main | |
| parent | b32e4e24efed5adfb9cbfe89c020fdce4c9e6999 (diff) | |
| download | qpid-python-f04d8a5bdde4c141f6d8ad04149e8c757fedf49a.tar.gz | |
QPID-1431 : Prevent listeners being added to _frameListeners when an exception state has occured due to connection closure. This adds additional synchronisation to the error handling condition. It also removes the need for a CopyOnWriteArrayList that was perhaps the wrong data structure for this list.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@711822 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 49 |
1 files changed, 36 insertions, 13 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 116b163b3c..bf32b4df1f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -452,13 +452,16 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void propagateExceptionToFrameListeners(Exception e) { - if (!_frameListeners.isEmpty()) + synchronized (_frameListeners) { - final Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + if (!_frameListeners.isEmpty()) { - final AMQMethodListener ml = (AMQMethodListener) it.next(); - ml.error(e); + final Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener ml = (AMQMethodListener) it.next(); + ml.error(e); + } } } } @@ -558,18 +561,20 @@ public class AMQProtocolHandler extends IoHandlerAdapter { boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) + synchronized (_frameListeners) { - //This iterator is safe from the error state as the frame listeners always add before they send so their - // will be ready and waiting for this response. - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + if (!_frameListeners.isEmpty()) { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + //This iterator is safe from the error state as the frame listeners always add before they send so their + // will be ready and waiting for this response. + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } } } - if (!wasAnyoneInterested) { throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" @@ -659,6 +664,24 @@ public class AMQProtocolHandler extends IoHandlerAdapter throw _lastFailoverException; } + if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED) + { + Exception e = _stateManager.getLastException(); + if (e != null) + { + if (e instanceof AMQException) + { + AMQException amqe = (AMQException) e; + + amqe.rethrow(); + } + else + { + throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e); + } + } + } + _frameListeners.add(listener); //FIXME: At this point here we should check or before add we should check _stateManager is in an open // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255 |
