diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-12-05 09:33:27 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-12-05 09:33:27 +0000 |
| commit | 4f3f5b6fb45b3f2f150c8534bc87e50b56ed71ee (patch) | |
| tree | d77ce7da6032086e2a0c7de4b19e9e49dfff7709 /qpid/java/broker-plugins | |
| parent | 24c57727be67f6b94057b135f00a393bc7b95a9c (diff) | |
| download | qpid-python-4f3f5b6fb45b3f2f150c8534bc87e50b56ed71ee.tar.gz | |
QPID-6257: Introduce operational log for connection being dropped by the clients or due to network issue and change the log level for SenderExceptions into INFO/DEBUG
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1643208 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
4 files changed, 56 insertions, 35 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index bc463ef59e..8567be37f0 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -174,7 +174,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { if(_logClosed.compareAndSet(false, true)) { - getEventLogger().message(this, ConnectionMessages.CLOSE()); + getEventLogger().message(this, isConnectionLost() ? ConnectionMessages.DROPPED_CONNECTION() : ConnectionMessages.CLOSE()); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 8b29c33e52..4212505d75 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -86,6 +86,7 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; @@ -331,6 +332,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _logger.error("Unexpected protocol version", e); closeProtocolSession(); } + catch (SenderClosedException e) + { + _logger.debug("Sender was closed abruptly, closing network.", e); + closeProtocolSession(); + } + catch (SenderException e) + { + _logger.info("Unexpected exception on send, closing network.", e); + closeProtocolSession(); + } catch (TransportException e) { _logger.error("Unexpected transport exception", e); @@ -766,7 +777,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } - public void closeSession() + public void closeSession(final boolean connectionDropped) { if(runningAsSubject()) @@ -782,34 +793,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, finally { _receivedLock.unlock(); + finishClose(connectionDropped); } - if (!_closed) - { - if (_virtualHost != null) - { - _virtualHost.getConnectionRegistry().deregisterConnection(this); - } - - try - { - closeAllChannels(); - } - finally - { - for (Action<? super AMQProtocolEngine> task : _taskList) - { - task.performAction(this); - } - - synchronized (this) - { - _closed = true; - notifyAll(); - } - getEventLogger().message(_logSubject, ConnectionMessages.CLOSE()); - } - } } else { @@ -823,7 +809,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, @Override public Object run() { - closeSession(); + closeSession(connectionDropped); return null; } }); @@ -831,6 +817,41 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } + private void finishClose(boolean connectionDropped) + { + if (!_closed) + { + + try + { + if (_virtualHost != null) + { + _virtualHost.getConnectionRegistry().deregisterConnection(this); + } + closeAllChannels(); + } + finally + { + try + { + for (Action<? super AMQProtocolEngine> task : _taskList) + { + task.performAction(this); + } + } + finally + { + synchronized (this) + { + _closed = true; + notifyAll(); + } + getEventLogger().message(_logSubject, connectionDropped ? ConnectionMessages.DROPPED_CONNECTION() : ConnectionMessages.CLOSE()); + } + } + } + } + private void awaitClosed() { synchronized(this) @@ -898,7 +919,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, try { markChannelAwaitingCloseOk(channelId); - closeSession(); + closeSession(false); } finally { @@ -1126,7 +1147,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { try { - closeSession(); + closeSession(true); } finally { @@ -1561,7 +1582,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } try { - closeSession(); + closeSession(false); } catch (Exception e) { @@ -1588,7 +1609,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, try { - closeSession(); + closeSession(false); } catch (Exception e) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index c01a349509..7407890b58 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -231,7 +231,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr //Simulate the Client responding with a CloseOK // should really update the StateManger but we don't have access here // changeState(AMQState.CONNECTION_CLOSED); - ((AMQChannel)session).getConnection().closeSession(); + ((AMQChannel)session).getConnection().closeSession(false); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java index 107e64bee5..c6f7defe56 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java @@ -62,7 +62,7 @@ public class MaxChannelsTest extends QpidTestCase try { _session.getVirtualHost().close(); - _session.closeSession(); + _session.closeSession(false); } finally { |
