From a81b0113c7384cd8e9f9d0b4eab069718ade6471 Mon Sep 17 00:00:00 2001 From: Andrew Donald Kennedy Date: Sun, 3 Oct 2010 16:00:24 +0000 Subject: QPID-2835 Implement CON Operational Logging on 0-10 Committed patch from SorinS git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1003984 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/logging/actors/GenericActor.java | 2 +- .../qpid/server/protocol/AMQConnectionModel.java | 1 + .../qpid/server/protocol/AMQProtocolEngine.java | 5 ++ .../protocol/MultiVersionProtocolEngine.java | 2 +- .../qpid/server/protocol/ProtocolEngine_0_10.java | 7 ++- .../qpid/server/transport/ServerConnection.java | 59 ++++++++++++---------- .../server/transport/ServerConnectionDelegate.java | 7 ++- .../qpid/server/transport/ServerSession.java | 4 +- 8 files changed, 52 insertions(+), 35 deletions(-) (limited to 'java/broker') diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java b/java/broker/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java index 09db81e7e5..9afc76ce78 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java +++ b/java/broker/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java @@ -72,7 +72,7 @@ public class GenericActor extends AbstractActor { public String toLogString() { - return "[" + subjectMessage + "]"; + return "[" + subjectMessage + "] "; } }, _defaultMessageLogger); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 448c8508a5..bcda385f64 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -35,4 +35,5 @@ public interface AMQConnectionModel */ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException; + public long getConnectionId(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index c55c07a145..5368dfe532 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -1151,6 +1151,11 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol return _id; } + public long getConnectionId() + { + return getSessionID(); + } + public String getAddress() { return String.valueOf(getRemoteAddress()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index e894dda341..eb957ee33c 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.server.protocol; -import org.apache.log4j.Logger; +import org.apache.log4j.Logger; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION; import org.apache.qpid.server.registry.IApplicationRegistry; diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index f1e79839c9..1fe4ec792e 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -22,12 +22,13 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.server.configuration.*; import org.apache.qpid.server.transport.ServerConnection; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.registry.IApplicationRegistry; import java.net.SocketAddress; @@ -55,6 +56,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine _networkDriver = networkDriver; _id = appRegistry.getConfigStore().createId(); _appRegistry = appRegistry; + + // FIXME Two log messages to maintain compatinbility with earlier protocol versions + CurrentActor.get().message(ConnectionMessages.OPEN(null, null, false, false)); + CurrentActor.get().message(ConnectionMessages.OPEN(null, "0-10", false, true)); } public void setNetworkDriver(NetworkDriver driver) diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index 3c924f3231..a1a7bd119b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -20,38 +20,33 @@ */ package org.apache.qpid.server.transport; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*; +import java.text.MessageFormat; + +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.ConnectionConfig; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.ConnectionCloseCode; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionDetachCode; -import org.apache.qpid.transport.SessionDetach; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.SessionDetached; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.ExecutionErrorCode; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.AMQException; - -import java.text.MessageFormat; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.Method; -public class ServerConnection extends Connection implements AMQConnectionModel +public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject { private ConnectionConfig _config; private Runnable _onOpenTask; public ServerConnection() { + CurrentActor.set(GenericActor.getInstance(this)); } @Override @@ -64,9 +59,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel protected void setState(State state) { super.setState(state); - if(state == State.OPEN && _onOpenTask != null) + + if (state == State.OPEN) + { + if (_onOpenTask != null) + { + _onOpenTask.run(); + } + CurrentActor.get().message(ConnectionMessages.OPEN(getClientId(), "0-10", true, true)); + } + + if (state == State.CLOSED) { - _onOpenTask.run(); + CurrentActor.get().message(this, ConnectionMessages.CLOSE()); } } @@ -137,8 +142,8 @@ public class ServerConnection extends Connection implements AMQConnectionModel return " [" + MessageFormat.format(CONNECTION_FORMAT, getConnectionId(), - getAuthorizationID(), - _config.getAddress(), + getClientId(), + getConfig().getAddress(), getVirtualHost().getName()) + "] "; } @@ -147,8 +152,8 @@ public class ServerConnection extends Connection implements AMQConnectionModel return " [" + MessageFormat.format(USER_FORMAT, getConnectionId(), - getAuthorizationID(), - _config.getAddress()) + getClientId(), + getConfig().getAddress()) + "] "; } @@ -156,8 +161,8 @@ public class ServerConnection extends Connection implements AMQConnectionModel { return " [" + MessageFormat.format(SOCKET_FORMAT, - this.getConnectionId(), - _config.getAddress()) + getConnectionId(), + getConfig().getAddress()) + "] "; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index a95f4e5c42..4a304b3e66 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -21,7 +21,9 @@ package org.apache.qpid.server.transport; import org.apache.qpid.transport.*; - +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.common.ClientProperties; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -71,7 +73,7 @@ public class ServerConnectionDelegate extends ServerDelegate SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry); ServerSession ssn = new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0); - //ssn.setSessionListener(new Echo()); + return ssn; } @@ -112,6 +114,7 @@ public class ServerConnectionDelegate extends ServerDelegate else { sconn.invoke(new ConnectionOpenOk(Collections.emptyList())); + CurrentActor.set(GenericActor.getInstance(sconn)); sconn.setState(Connection.State.OPEN); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 1f4e32a3e0..71add9c097 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -578,9 +578,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public String getClientID() { - //fixme this will only work for 0-10 connections - // In 0-8 there is an explicit ClientID property that is != Principal. - return getPrincipal().getName(); + return getConnection().getClientId(); } public LogSubject getLogSubject() -- cgit v1.2.1