summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java77
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java31
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java4
-rw-r--r--java/common/templates/model/ProtocolVersionListClass.vm7
5 files changed, 86 insertions, 35 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
index 8e9ee3720c..d9f95ecb8e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
+++ b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
@@ -235,7 +235,7 @@ MST-1006 = Recovery Complete[ : {0}]
#Connection
# 0 - Client id
# 1 - Protocol Version
-CON-1001 = Open : Client ID {0}[ : Protocol Version : {1}]
+CON-1001 = Open[ : Client ID : {0}][ : Protocol Version : {1}]
CON-1002 = Close
#Channel
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index e46a52f3bf..c84408e680 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -54,6 +54,9 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -139,6 +142,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private final long _sessionID = idGenerator.getAndIncrement();
private AMQPConnectionActor _actor;
+ private LogSubject _logSubject;
public ManagedObject getManagedObject()
{
@@ -156,6 +160,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
+ _actor.message(ConnectionMessages.CON_1001(null, null, false, false));
+
try
{
IoServiceConfig config = session.getServiceConfig();
@@ -171,6 +177,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
}
+ // This is only used by two tests that do provide null values for stateManager
+ // so we can safely remove this and refactor.
public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
AMQStateManager stateManager) throws AMQException
{
@@ -236,42 +244,45 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
int channelId = frame.getChannel();
AMQBody body = frame.getBodyFrame();
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Frame Received: " + frame);
- }
-
- // Check that this channel is not closing
- if (channelAwaitingClosure(channelId))
+ CurrentActor.set(_actor);
+ try
{
- if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
+ if (_logger.isDebugEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
- }
+ _logger.debug("Frame Received: " + frame);
}
- else
+
+ // Check that this channel is not closing
+ if (channelAwaitingClosure(channelId))
{
- if (_logger.isInfoEnabled())
+ if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
{
- _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
+ }
}
+ else
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
+ }
- closeProtocolSession();
- return;
+ closeProtocolSession();
+ return;
+ }
}
- }
- CurrentActor.set(_actor);
- try
- {
- body.handle(channelId, this);
- }
- catch (AMQException e)
- {
- closeChannel(channelId);
- throw e;
+ try
+ {
+ body.handle(channelId, this);
+ }
+ catch (AMQException e)
+ {
+ closeChannel(channelId);
+ throw e;
+ }
}
finally
{
@@ -285,6 +296,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
try
{
+ // Log incomming protocol negotiation request
+ _actor.message(ConnectionMessages.CON_1001(null, pi._protocolMajor + "-" + pi._protocolMinor, false, true));
+
ProtocolVersion pv = pi.checkVersion(); // Fails if not correct
// This sets the protocol version (and hence framing classes) for this session.
@@ -643,6 +657,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
if (!_closed)
{
_closed = true;
+
+ _actor.message(ConnectionMessages.CON_1002());
if (_virtualHost != null)
{
@@ -770,7 +786,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null)
{
- setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
+ String clientID = _clientProperties.getString(CLIENT_PROPERTIES_INSTANCE);
+ setContextKey(new AMQShortString(clientID));
+
+ // Log the Opening of the connection for this client
+ _actor.message(ConnectionMessages.CON_1001(clientID, _protocolVersion.toString(), true, true));
}
if (_clientProperties.getString(ClientProperties.version.toString()) != null)
@@ -829,6 +849,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_virtualHost = virtualHost;
_actor.virtualHostSelected(this);
+ _logSubject = new ConnectionLogSubject(this);
_virtualHost.getConnectionRegistry().registerConnection(this);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java
index eb76029a5c..d234c88210 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java
@@ -24,12 +24,12 @@ import java.util.List;
public class ConnectionMessagesTest extends AbstractTestMessages
{
- public void testMessage1001_WithProtocolVersion()
+ public void testMessage1001_WithClientIDProtocolVersion()
{
String clientID = "client";
String protocolVersion = "8-0";
- _logMessage = ConnectionMessages.CON_1001(clientID, protocolVersion, true);
+ _logMessage = ConnectionMessages.CON_1001(clientID, protocolVersion, true , true);
List<Object> log = performLog();
String[] expected = {"Open :", "Client ID", clientID,
@@ -38,11 +38,11 @@ public class ConnectionMessagesTest extends AbstractTestMessages
validateLogMessage(log, "CON-1001", expected);
}
- public void testMessage1001_NoProtocolVersion()
+ public void testMessage1001_WithClientIDNoProtocolVersion()
{
String clientID = "client";
- _logMessage = ConnectionMessages.CON_1001(clientID, null, false);
+ _logMessage = ConnectionMessages.CON_1001(clientID, null,true, false);
List<Object> log = performLog();
String[] expected = {"Open :", "Client ID", clientID};
@@ -50,6 +50,29 @@ public class ConnectionMessagesTest extends AbstractTestMessages
validateLogMessage(log, "CON-1001", expected);
}
+ public void testMessage1001_WithNOClientIDProtocolVersion()
+ {
+ String protocolVersion = "8-0";
+
+ _logMessage = ConnectionMessages.CON_1001(null, protocolVersion, false , true);
+ List<Object> log = performLog();
+
+ String[] expected = {"Open", ": Protocol Version :", protocolVersion};
+
+ validateLogMessage(log, "CON-1001", expected);
+ }
+
+ public void testMessage1001_WithNoClientIDNoProtocolVersion()
+ {
+ _logMessage = ConnectionMessages.CON_1001(null, null,false, false);
+ List<Object> log = performLog();
+
+ String[] expected = {"Open"};
+
+ validateLogMessage(log, "CON-1001", expected);
+ }
+
+
public void testMessage1002()
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
index 02cdbdbe6a..8a5b63d18e 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
@@ -172,7 +172,7 @@ public abstract class AbstractTestLogSubject extends TestCase
* @param message the message to search
* @param vhost the vhostName to check against
*/
- protected void verifyVirtualHost(String message, VirtualHost vhost)
+ static public void verifyVirtualHost(String message, VirtualHost vhost)
{
String vhostSlice = getSlice("vh", message);
@@ -199,7 +199,7 @@ public abstract class AbstractTestLogSubject extends TestCase
*
* @return the slice if found otherwise null is returned
*/
- protected String getSlice(String sliceID, String message)
+ static public String getSlice(String sliceID, String message)
{
int indexOfSlice = message.indexOf(sliceID + "(");
diff --git a/java/common/templates/model/ProtocolVersionListClass.vm b/java/common/templates/model/ProtocolVersionListClass.vm
index 9ac6adfdf5..d3e943e36f 100644
--- a/java/common/templates/model/ProtocolVersionListClass.vm
+++ b/java/common/templates/model/ProtocolVersionListClass.vm
@@ -41,12 +41,14 @@ public class ProtocolVersion implements Comparable
{
private final byte _majorVersion;
private final byte _minorVersion;
+ private final String _stringFormat;
public ProtocolVersion(byte majorVersion, byte minorVersion)
{
_majorVersion = majorVersion;
_minorVersion = minorVersion;
+ _stringFormat = _majorVersion+"-"+_minorVersion;
}
public byte getMajorVersion()
@@ -59,6 +61,11 @@ public class ProtocolVersion implements Comparable
return _minorVersion;
}
+ public String toString()
+ {
+ return _stringFormat;
+ }
+
public int compareTo(Object o)
{