diff options
author | Robert Gemmell <robbie@apache.org> | 2011-01-27 11:18:39 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-01-27 11:18:39 +0000 |
commit | 1a1f4659ff6466dc25fa206201d52e3b2fee5a2a (patch) | |
tree | 321ac32a8c528071ad6f3373b89443258e63e7fe /java | |
parent | fab618e011e19610bb9be31037465ff1cc70063b (diff) | |
download | qpid-python-1a1f4659ff6466dc25fa206201d52e3b2fee5a2a.tar.gz |
QPID-3021: set the session/connection actor when the connection recieves new events, ensure the correct thread logs close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1064084 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
7 files changed, 66 insertions, 16 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index b1172a880e..ff2a8c959b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -41,7 +41,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry public void close() { //Set the Actor for Broker Shutdown - CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger)); + CurrentActor.set(new BrokerActor(_rootMessageLogger)); try { super.close(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index e301996113..d8b7c2e80e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.transport; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*; import java.text.MessageFormat; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; @@ -39,11 +40,13 @@ import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ExecutionErrorCode; import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.ProtocolEvent; public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject { private ConnectionConfig _config; private Runnable _onOpenTask; + private AtomicBoolean _logClosed = new AtomicBoolean(false); private LogActor _actor = GenericActor.getInstance(this); public ServerConnection() @@ -73,6 +76,14 @@ public class ServerConnection extends Connection implements AMQConnectionModel, if (state == State.CLOSED) { + logClosed(); + } + } + + protected void logClosed() + { + if(_logClosed.compareAndSet(false, true)) + { CurrentActor.get().message(this, ConnectionMessages.CLOSE()); } } @@ -135,13 +146,36 @@ public class ServerConnection extends Connection implements AMQConnectionModel, ((ServerSession)session).close(); } - public String toLogString() { + @Override + public void received(ProtocolEvent event) + { + ServerSession channel = (ServerSession) getSession(event.getChannel()); + LogActor channelActor = null; + + if (channel != null) + { + channelActor = channel.getLogActor(); + } + + CurrentActor.set(channelActor == null ? _actor : channelActor); + try + { + super.received(event); + } + finally + { + CurrentActor.remove(); + } + } + + public String toLogString() + { boolean hasVirtualHost = (null != this.getVirtualHost()); boolean hasPrincipal = (null != getAuthorizationID()); if (hasPrincipal && hasVirtualHost) { - return " [" + + return "[" + MessageFormat.format(CONNECTION_FORMAT, getConnectionId(), getClientId(), @@ -151,7 +185,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, } else if (hasPrincipal) { - return " [" + + return "[" + MessageFormat.format(USER_FORMAT, getConnectionId(), getClientId(), @@ -161,7 +195,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, } else { - return " [" + + return "[" + MessageFormat.format(SOCKET_FORMAT, getConnectionId(), getConfig().getAddress()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index a9b7d99503..7ba85ffe14 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -84,7 +84,22 @@ public class ServerConnectionDelegate extends ServerDelegate } - @Override public void connectionOpen(Connection conn, ConnectionOpen open) + @Override + public void connectionClose(Connection conn, ConnectionClose close) + { + try + { + ((ServerConnection) conn).logClosed(); + } + finally + { + super.connectionClose(conn, close); + } + + } + + @Override + public void connectionOpen(Connection conn, ConnectionOpen open) { ServerConnection sconn = (ServerConnection) conn; diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index c53f65f302..540ad3fffd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ConnectionConfig; import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.configuration.SessionConfigType; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; @@ -57,7 +58,6 @@ import org.apache.qpid.transport.Range; import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionDelegate; -import org.apache.qpid.transport.Session.State; import java.lang.ref.WeakReference; import java.security.Principal; @@ -81,6 +81,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo private final UUID _id; private ConnectionConfig _connectionConfig; private long _createTime = System.currentTimeMillis(); + private LogActor _actor = GenericActor.getInstance(this); public static interface MessageDispositionChangeListener { @@ -130,7 +131,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo if (state == State.OPEN) { - GenericActor.getInstance(this).message(ChannelMessages.CREATE()); + _actor.message(ChannelMessages.CREATE()); } } @@ -595,6 +596,11 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo return getConnection().getClientId(); } + public LogActor getLogActor() + { + return _actor; + } + public LogSubject getLogSubject() { return (LogSubject) this; @@ -603,7 +609,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo @Override public String toLogString() { - return " [" + + return "[" + MessageFormat.format(CHANNEL_FORMAT, getConnection().getConnectionId(), getClientID(), diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 17d7dc90e3..42a3975e24 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -1223,7 +1223,6 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void closed(Session session) { - super.closed(session); for(Subscription_0_10 sub : getSubscriptions(session)) { ((ServerSession)session).unregister(sub); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index fa3c1737a7..8abae7a23e 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -434,7 +434,7 @@ public class Connection extends ConnectionInvoker } } - Session getSession(int channel) + protected Session getSession(int channel) { synchronized (lock) { diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 4e6d2130ae..dd6a37eca2 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -52,7 +52,6 @@ public class IoNetworkTransport implements NetworkTransport, IoContext private long timeout = 60000; private ConnectionSettings settings; - @Override public void init(ConnectionSettings settings) { try @@ -84,20 +83,17 @@ public class IoNetworkTransport implements NetworkTransport, IoContext } } - @Override public void receiver(Receiver<ByteBuffer> delegate) { receiver = new IoReceiver(this, delegate, 2*settings.getReadBufferSize() , timeout); } - @Override public Sender<ByteBuffer> sender() { return new IoSender(this, 2*settings.getWriteBufferSize(), timeout); } - - @Override + public void close() { |