summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-01-27 11:18:39 +0000
committerRobert Gemmell <robbie@apache.org>2011-01-27 11:18:39 +0000
commit1a1f4659ff6466dc25fa206201d52e3b2fee5a2a (patch)
tree321ac32a8c528071ad6f3373b89443258e63e7fe /java
parentfab618e011e19610bb9be31037465ff1cc70063b (diff)
downloadqpid-python-1a1f4659ff6466dc25fa206201d52e3b2fee5a2a.tar.gz
QPID-3021: set the session/connection actor when the connection recieves new events, ensure the correct thread logs close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1064084 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java6
7 files changed, 66 insertions, 16 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index b1172a880e..ff2a8c959b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -41,7 +41,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
public void close()
{
//Set the Actor for Broker Shutdown
- CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger));
+ CurrentActor.set(new BrokerActor(_rootMessageLogger));
try
{
super.close();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index e301996113..d8b7c2e80e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
import java.text.MessageFormat;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -39,11 +40,13 @@ import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.ProtocolEvent;
public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject
{
private ConnectionConfig _config;
private Runnable _onOpenTask;
+ private AtomicBoolean _logClosed = new AtomicBoolean(false);
private LogActor _actor = GenericActor.getInstance(this);
public ServerConnection()
@@ -73,6 +76,14 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
if (state == State.CLOSED)
{
+ logClosed();
+ }
+ }
+
+ protected void logClosed()
+ {
+ if(_logClosed.compareAndSet(false, true))
+ {
CurrentActor.get().message(this, ConnectionMessages.CLOSE());
}
}
@@ -135,13 +146,36 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
((ServerSession)session).close();
}
- public String toLogString() {
+ @Override
+ public void received(ProtocolEvent event)
+ {
+ ServerSession channel = (ServerSession) getSession(event.getChannel());
+ LogActor channelActor = null;
+
+ if (channel != null)
+ {
+ channelActor = channel.getLogActor();
+ }
+
+ CurrentActor.set(channelActor == null ? _actor : channelActor);
+ try
+ {
+ super.received(event);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ }
+
+ public String toLogString()
+ {
boolean hasVirtualHost = (null != this.getVirtualHost());
boolean hasPrincipal = (null != getAuthorizationID());
if (hasPrincipal && hasVirtualHost)
{
- return " [" +
+ return "[" +
MessageFormat.format(CONNECTION_FORMAT,
getConnectionId(),
getClientId(),
@@ -151,7 +185,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
}
else if (hasPrincipal)
{
- return " [" +
+ return "[" +
MessageFormat.format(USER_FORMAT,
getConnectionId(),
getClientId(),
@@ -161,7 +195,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
}
else
{
- return " [" +
+ return "[" +
MessageFormat.format(SOCKET_FORMAT,
getConnectionId(),
getConfig().getAddress())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index a9b7d99503..7ba85ffe14 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -84,7 +84,22 @@ public class ServerConnectionDelegate extends ServerDelegate
}
- @Override public void connectionOpen(Connection conn, ConnectionOpen open)
+ @Override
+ public void connectionClose(Connection conn, ConnectionClose close)
+ {
+ try
+ {
+ ((ServerConnection) conn).logClosed();
+ }
+ finally
+ {
+ super.connectionClose(conn, close);
+ }
+
+ }
+
+ @Override
+ public void connectionOpen(Connection conn, ConnectionOpen open)
{
ServerConnection sconn = (ServerConnection) conn;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index c53f65f302..540ad3fffd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ConnectionConfig;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.configuration.SessionConfigType;
+import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
@@ -57,7 +58,6 @@ import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.Session.State;
import java.lang.ref.WeakReference;
import java.security.Principal;
@@ -81,6 +81,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
private final UUID _id;
private ConnectionConfig _connectionConfig;
private long _createTime = System.currentTimeMillis();
+ private LogActor _actor = GenericActor.getInstance(this);
public static interface MessageDispositionChangeListener
{
@@ -130,7 +131,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
if (state == State.OPEN)
{
- GenericActor.getInstance(this).message(ChannelMessages.CREATE());
+ _actor.message(ChannelMessages.CREATE());
}
}
@@ -595,6 +596,11 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
return getConnection().getClientId();
}
+ public LogActor getLogActor()
+ {
+ return _actor;
+ }
+
public LogSubject getLogSubject()
{
return (LogSubject) this;
@@ -603,7 +609,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
@Override
public String toLogString()
{
- return " [" +
+ return "[" +
MessageFormat.format(CHANNEL_FORMAT,
getConnection().getConnectionId(),
getClientID(),
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 17d7dc90e3..42a3975e24 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -1223,7 +1223,6 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void closed(Session session)
{
- super.closed(session);
for(Subscription_0_10 sub : getSubscriptions(session))
{
((ServerSession)session).unregister(sub);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index fa3c1737a7..8abae7a23e 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -434,7 +434,7 @@ public class Connection extends ConnectionInvoker
}
}
- Session getSession(int channel)
+ protected Session getSession(int channel)
{
synchronized (lock)
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index 4e6d2130ae..dd6a37eca2 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -52,7 +52,6 @@ public class IoNetworkTransport implements NetworkTransport, IoContext
private long timeout = 60000;
private ConnectionSettings settings;
- @Override
public void init(ConnectionSettings settings)
{
try
@@ -84,20 +83,17 @@ public class IoNetworkTransport implements NetworkTransport, IoContext
}
}
- @Override
public void receiver(Receiver<ByteBuffer> delegate)
{
receiver = new IoReceiver(this, delegate,
2*settings.getReadBufferSize() , timeout);
}
- @Override
public Sender<ByteBuffer> sender()
{
return new IoSender(this, 2*settings.getWriteBufferSize(), timeout);
}
-
- @Override
+
public void close()
{