diff options
Diffstat (limited to 'java')
5 files changed, 86 insertions, 35 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties index 8e9ee3720c..d9f95ecb8e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties +++ b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties @@ -235,7 +235,7 @@ MST-1006 = Recovery Complete[ : {0}] #Connection # 0 - Client id # 1 - Protocol Version -CON-1001 = Open : Client ID {0}[ : Protocol Version : {1}] +CON-1001 = Open[ : Client ID : {0}][ : Protocol Version : {1}] CON-1002 = Close #Channel diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index e46a52f3bf..c84408e680 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -54,6 +54,9 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.actors.AMQPConnectionActor; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -139,6 +142,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private final long _sessionID = idGenerator.getAndIncrement(); private AMQPConnectionActor _actor; + private LogSubject _logSubject; public ManagedObject getManagedObject() { @@ -156,6 +160,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); + _actor.message(ConnectionMessages.CON_1001(null, null, false, false)); + try { IoServiceConfig config = session.getServiceConfig(); @@ -171,6 +177,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } + // This is only used by two tests that do provide null values for stateManager + // so we can safely remove this and refactor. public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException { @@ -236,42 +244,45 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable int channelId = frame.getChannel(); AMQBody body = frame.getBodyFrame(); - if (_logger.isDebugEnabled()) - { - _logger.debug("Frame Received: " + frame); - } - - // Check that this channel is not closing - if (channelAwaitingClosure(channelId)) + CurrentActor.set(_actor); + try { - if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) + if (_logger.isDebugEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); - } + _logger.debug("Frame Received: " + frame); } - else + + // Check that this channel is not closing + if (channelAwaitingClosure(channelId)) { - if (_logger.isInfoEnabled()) + if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) { - _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame); + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); + } } + else + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame); + } - closeProtocolSession(); - return; + closeProtocolSession(); + return; + } } - } - CurrentActor.set(_actor); - try - { - body.handle(channelId, this); - } - catch (AMQException e) - { - closeChannel(channelId); - throw e; + try + { + body.handle(channelId, this); + } + catch (AMQException e) + { + closeChannel(channelId); + throw e; + } } finally { @@ -285,6 +296,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); try { + // Log incomming protocol negotiation request + _actor.message(ConnectionMessages.CON_1001(null, pi._protocolMajor + "-" + pi._protocolMinor, false, true)); + ProtocolVersion pv = pi.checkVersion(); // Fails if not correct // This sets the protocol version (and hence framing classes) for this session. @@ -643,6 +657,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable if (!_closed) { _closed = true; + + _actor.message(ConnectionMessages.CON_1002()); if (_virtualHost != null) { @@ -770,7 +786,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null) { - setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE))); + String clientID = _clientProperties.getString(CLIENT_PROPERTIES_INSTANCE); + setContextKey(new AMQShortString(clientID)); + + // Log the Opening of the connection for this client + _actor.message(ConnectionMessages.CON_1001(clientID, _protocolVersion.toString(), true, true)); } if (_clientProperties.getString(ClientProperties.version.toString()) != null) @@ -829,6 +849,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _virtualHost = virtualHost; _actor.virtualHostSelected(this); + _logSubject = new ConnectionLogSubject(this); _virtualHost.getConnectionRegistry().registerConnection(this); diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java index eb76029a5c..d234c88210 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java @@ -24,12 +24,12 @@ import java.util.List; public class ConnectionMessagesTest extends AbstractTestMessages { - public void testMessage1001_WithProtocolVersion() + public void testMessage1001_WithClientIDProtocolVersion() { String clientID = "client"; String protocolVersion = "8-0"; - _logMessage = ConnectionMessages.CON_1001(clientID, protocolVersion, true); + _logMessage = ConnectionMessages.CON_1001(clientID, protocolVersion, true , true); List<Object> log = performLog(); String[] expected = {"Open :", "Client ID", clientID, @@ -38,11 +38,11 @@ public class ConnectionMessagesTest extends AbstractTestMessages validateLogMessage(log, "CON-1001", expected); } - public void testMessage1001_NoProtocolVersion() + public void testMessage1001_WithClientIDNoProtocolVersion() { String clientID = "client"; - _logMessage = ConnectionMessages.CON_1001(clientID, null, false); + _logMessage = ConnectionMessages.CON_1001(clientID, null,true, false); List<Object> log = performLog(); String[] expected = {"Open :", "Client ID", clientID}; @@ -50,6 +50,29 @@ public class ConnectionMessagesTest extends AbstractTestMessages validateLogMessage(log, "CON-1001", expected); } + public void testMessage1001_WithNOClientIDProtocolVersion() + { + String protocolVersion = "8-0"; + + _logMessage = ConnectionMessages.CON_1001(null, protocolVersion, false , true); + List<Object> log = performLog(); + + String[] expected = {"Open", ": Protocol Version :", protocolVersion}; + + validateLogMessage(log, "CON-1001", expected); + } + + public void testMessage1001_WithNoClientIDNoProtocolVersion() + { + _logMessage = ConnectionMessages.CON_1001(null, null,false, false); + List<Object> log = performLog(); + + String[] expected = {"Open"}; + + validateLogMessage(log, "CON-1001", expected); + } + + public void testMessage1002() { diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java index 02cdbdbe6a..8a5b63d18e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java @@ -172,7 +172,7 @@ public abstract class AbstractTestLogSubject extends TestCase * @param message the message to search * @param vhost the vhostName to check against */ - protected void verifyVirtualHost(String message, VirtualHost vhost) + static public void verifyVirtualHost(String message, VirtualHost vhost) { String vhostSlice = getSlice("vh", message); @@ -199,7 +199,7 @@ public abstract class AbstractTestLogSubject extends TestCase * * @return the slice if found otherwise null is returned */ - protected String getSlice(String sliceID, String message) + static public String getSlice(String sliceID, String message) { int indexOfSlice = message.indexOf(sliceID + "("); diff --git a/java/common/templates/model/ProtocolVersionListClass.vm b/java/common/templates/model/ProtocolVersionListClass.vm index 9ac6adfdf5..d3e943e36f 100644 --- a/java/common/templates/model/ProtocolVersionListClass.vm +++ b/java/common/templates/model/ProtocolVersionListClass.vm @@ -41,12 +41,14 @@ public class ProtocolVersion implements Comparable { private final byte _majorVersion; private final byte _minorVersion; + private final String _stringFormat; public ProtocolVersion(byte majorVersion, byte minorVersion) { _majorVersion = majorVersion; _minorVersion = minorVersion; + _stringFormat = _majorVersion+"-"+_minorVersion; } public byte getMajorVersion() @@ -59,6 +61,11 @@ public class ProtocolVersion implements Comparable return _minorVersion; } + public String toString() + { + return _stringFormat; + } + public int compareTo(Object o) { |
