summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-03-07 16:36:26 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-03-07 16:36:26 +0000
commit9e9d87a76f12567c2e0f65485bbf85b39fc6e437 (patch)
treee8caf31a36a4618fbaa33b765ca68f9c48942226 /qpid/java/broker-plugins
parent735d2fbd9b1cf73410363d23591d50fe34c61dc4 (diff)
downloadqpid-python-9e9d87a76f12567c2e0f65485bbf85b39fc6e437.tar.gz
QPID-5611 : [Java Broker] remove LogActors
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1575315 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java6
-rw-r--r--qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java10
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java53
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java42
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java82
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java3
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java3
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java66
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java322
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java10
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java6
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java16
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java57
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java9
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java15
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java6
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java10
-rw-r--r--qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java19
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java23
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java32
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java49
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java29
23 files changed, 435 insertions, 438 deletions
diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java
index fcb5bcbf70..58f3c71c7c 100644
--- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java
+++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java
@@ -38,7 +38,7 @@ import javax.security.auth.Subject;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.AccessControlMessages;
import org.apache.qpid.server.security.Result;
import org.apache.qpid.server.security.access.ObjectProperties;
@@ -321,14 +321,14 @@ public class RuleSet
switch (permission)
{
case ALLOW_LOG:
- CurrentActor.get().message(AccessControlMessages.ALLOWED(
+ SystemLog.message(AccessControlMessages.ALLOWED(
action.getOperation().toString(),
action.getObjectType().toString(),
action.getProperties().toString()));
case ALLOW:
return Result.ALLOWED;
case DENY_LOG:
- CurrentActor.get().message(AccessControlMessages.DENIED(
+ SystemLog.message(AccessControlMessages.DENIED(
action.getOperation().toString(),
action.getObjectType().toString(),
action.getProperties().toString()));
diff --git a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java
index e907a88001..f5f1866c3a 100644
--- a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java
+++ b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java
@@ -33,9 +33,8 @@ import junit.framework.TestCase;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.server.connection.ConnectionPrincipal;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.UnitTestMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.Result;
import org.apache.qpid.server.security.access.ObjectProperties;
@@ -67,7 +66,7 @@ public class DefaultAccessControlTest extends TestCase
private void configureAccessControl(final RuleSet rs) throws ConfigurationException
{
_plugin = new DefaultAccessControl(rs);
- CurrentActor.set(new TestLogActor(messageLogger));
+ SystemLog.setRootMessageLogger(messageLogger);
}
private RuleSet createGroupRuleSet()
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 120ba2d951..d41af30a09 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -22,8 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
@@ -387,7 +386,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
protected void sendToDLQOrDiscard(MessageInstance entry)
{
- final LogActor logActor = CurrentActor.get();
final ServerMessage msg = entry.getMessage();
int requeues = entry.routeToAlternate(new Action<MessageInstance>()
@@ -395,7 +393,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
@Override
public void performAction(final MessageInstance requeueEntry)
{
- logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ SystemLog.message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
requeueEntry.getOwningResource().getName()));
}
}, null);
@@ -410,12 +408,12 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
if(alternateExchange != null)
{
- logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
+ SystemLog.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
alternateExchange.getName()));
}
else
{
- logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
+ SystemLog.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
queue.getName(),
msg.getInitialRoutingAddress()));
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 9fe1babe20..9346e3e7bb 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
@@ -30,8 +31,11 @@ import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
+import javax.security.auth.Subject;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine
@@ -63,15 +67,31 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
}
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
{
- _network = network;
+ if(!getSubject().equals(Subject.getSubject(AccessController.getContext())))
+ {
+ Subject.doAs(getSubject(), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ setNetworkConnection(network,sender);
+ return null;
+ }
+ });
+ }
+ else
+ {
+ SystemLog.message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false));
+ _network = network;
+
+ _connection.setNetworkConnection(network);
+ _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
+ // FIXME Two log messages to maintain compatibility with earlier protocol versions
+ SystemLog.message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false));
- _connection.setNetworkConnection(network);
- _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
- // FIXME Two log messages to maintain compatibility with earlier protocol versions
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false));
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false));
+ }
}
private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender)
@@ -155,8 +175,17 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void readerIdle()
{
- _connection.getLogActor().message(ConnectionMessages.IDLE_CLOSE());
- _network.close();
+ Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ SystemLog.message(ConnectionMessages.IDLE_CLOSE());
+ _network.close();
+ return null;
+ }
+ });
+
}
public String getAddress()
@@ -189,4 +218,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
return _connection.getConnectionId();
}
+
+ @Override
+ public Subject getSubject()
+ {
+ return _connection.getAuthorizedSubject();
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 5bfc398bcf..5096223889 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -32,10 +32,8 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.connection.ConnectionPrincipal;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
@@ -65,7 +63,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
- private LogActor _actor;
private final Subject _authorizedSubject = new Subject();
private Principal _authorizedPrincipal = null;
@@ -87,7 +84,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
_connectionId = connectionId;
_authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
- _actor = new AMQPConnectionActor(this, broker.getRootMessageLogger());
}
public Object getReference()
@@ -112,7 +108,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
_onOpenTask.run();
}
- _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), getClientProduct(), true, true, true, true));
+ SystemLog.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), getClientProduct(), true, true, true, true));
getVirtualHost().getConnectionRegistry().registerConnection(this);
}
@@ -135,7 +131,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
if(_logClosed.compareAndSet(false, true))
{
- CurrentActor.get().message(this, ConnectionMessages.CLOSE());
+ SystemLog.message(this, ConnectionMessages.CLOSE());
}
}
@@ -258,42 +254,31 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
Subject subject;
if (event.isConnectionControl())
{
- CurrentActor.set(_actor);
subject = _authorizedSubject;
}
else
{
ServerSession channel = (ServerSession) getSession(event.getChannel());
- LogActor channelActor = null;
if (channel != null)
{
subject = channel.getAuthorizedSubject();
- channelActor = channel.getLogActor();
}
else
{
subject = _authorizedSubject;
}
-
- CurrentActor.set(channelActor == null ? _actor : channelActor);
}
- try
+ Subject.doAs(subject, new PrivilegedAction<Void>()
{
- Subject.doAs(subject, new PrivilegedAction<Void>()
+ @Override
+ public Void run()
{
- @Override
- public Void run()
- {
- ServerConnection.super.received(event);
- return null;
- }
- });
- }
- finally
- {
- CurrentActor.remove();
- }
+ ServerConnection.super.received(event);
+ return null;
+ }
+ });
+
}
public String toLogString()
@@ -331,11 +316,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
}
}
- public LogActor getLogActor()
- {
- return _actor;
- }
-
public void close(AMQConstant cause, String message)
{
closeSubscriptions();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 5627b2eabe..453b3f85a5 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.security.AccessController;
import java.security.Principal;
+import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
@@ -43,15 +45,13 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
@@ -102,7 +102,6 @@ public class ServerSession extends Session
private final UUID _id = UUID.randomUUID();
private final Subject _subject = new Subject();
private long _createTime = System.currentTimeMillis();
- private LogActor _actor = GenericActor.getInstance(this);
private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
@@ -161,18 +160,44 @@ public class ServerSession extends Session
});
}
- protected void setState(State state)
+ protected void setState(final State state)
{
- super.setState(state);
-
- if (state == State.OPEN)
+ if(runningAsSubject())
{
- _actor.message(ChannelMessages.CREATE());
- if(_blocking.get())
+ super.setState(state);
+
+ if (state == State.OPEN)
{
- invokeBlock();
+ SystemLog.message(ChannelMessages.CREATE());
+ if(_blocking.get())
+ {
+ invokeBlock();
+ }
}
}
+ else
+ {
+ runAsSubject(new PrivilegedAction<Void>() {
+
+ @Override
+ public Void run()
+ {
+ setState(state);
+ return null;
+ }
+ });
+
+ }
+ }
+
+ private <T> T runAsSubject(final PrivilegedAction<T> privilegedAction)
+ {
+ return Subject.doAs(getAuthorizedSubject(), privilegedAction);
+ }
+
+ private boolean runningAsSubject()
+ {
+ return getAuthorizedSubject().equals(Subject.getSubject(AccessController.getContext()));
}
private void invokeBlock()
@@ -394,7 +419,7 @@ public class ServerSession extends Session
{
operationalLoggingMessage = ChannelMessages.CLOSE();
}
- CurrentActor.get().message(getLogSubject(), operationalLoggingMessage);
+ SystemLog.message(getLogSubject(), operationalLoggingMessage);
}
@Override
@@ -678,14 +703,10 @@ public class ServerSession extends Session
return (ServerConnection) super.getConnection();
}
- public LogActor getLogActor()
- {
- return _actor;
- }
public LogSubject getLogSubject()
{
- return (LogSubject) this;
+ return this;
}
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
@@ -704,7 +725,7 @@ public class ServerSession extends Session
}
- private void block(Object queue, String name)
+ private void block(final Object queue, final String name)
{
synchronized (_blockingEntities)
{
@@ -717,7 +738,7 @@ public class ServerSession extends Session
{
invokeBlock();
}
- _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
+ SystemLog.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
}
@@ -735,25 +756,22 @@ public class ServerSession extends Session
unblock(this);
}
- private void unblock(Object queue)
+ private void unblock(final Object queue)
{
- synchronized(_blockingEntities)
+ if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty())
{
- if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty())
+ if(_blocking.compareAndSet(true,false) && !isClosing())
{
- if(_blocking.compareAndSet(true,false) && !isClosing())
- {
- _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
- MessageFlow mf = new MessageFlow();
- mf.setUnit(MessageCreditUnit.MESSAGE);
- mf.setDestination("");
- _outstandingCredit.set(Integer.MAX_VALUE);
- mf.setValue(Integer.MAX_VALUE);
- invoke(mf);
+ SystemLog.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ MessageFlow mf = new MessageFlow();
+ mf.setUnit(MessageCreditUnit.MESSAGE);
+ mf.setDestination("");
+ _outstandingCredit.set(Integer.MAX_VALUE);
+ mf.setValue(Integer.MAX_VALUE);
+ invoke(mf);
- }
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 9d7764414f..d21da824e9 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -29,6 +29,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.store.StoreException;
@@ -345,7 +346,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey()));
+ SystemLog.message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey()));
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
index 421adb33a8..858482a034 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
@@ -19,8 +19,6 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -41,7 +39,6 @@ public class ServerSessionTest extends QpidTestCase
super.setUp();
BrokerTestHelper.setUp();
_virtualHost = BrokerTestHelper.createVirtualHost(getName());
- GenericActor.setDefaultMessageLogger(CurrentActor.get().getRootMessageLogger());
}
@Override
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 99068a9d6c..e248fc539a 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_8;
import java.nio.ByteBuffer;
import java.security.AccessControlException;
+import java.security.PrivilegedAction;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -54,11 +55,9 @@ import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.AMQPChannelActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
@@ -154,7 +153,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private final AtomicBoolean _blocking = new AtomicBoolean(false);
- private LogActor _actor;
private LogSubject _logSubject;
private volatile boolean _rollingBack;
@@ -178,19 +176,17 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private Subject _subject;
- public AMQChannel(T session, int channelId, MessageStore messageStore)
+ public AMQChannel(T session, int channelId, final MessageStore messageStore)
throws AMQException
{
_session = session;
_channelId = channelId;
- _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
_subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(),
session.getAuthorizedSubject().getPublicCredentials(),
session.getAuthorizedSubject().getPrivateCredentials());
_subject.getPrincipals().add(new SessionPrincipal(this));
_logSubject = new ChannelLogSubject(this);
- _actor.message(ChannelMessages.CREATE());
_messageStore = messageStore;
@@ -214,6 +210,18 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
}
});
+
+ Subject.doAs(_subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ SystemLog.message(ChannelMessages.CREATE());
+
+ return null;
+ }
+ });
+
}
/** Sets this channel to be part of a local transaction */
@@ -449,12 +457,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
else
{
- _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchangeName().asString(),
- _currentMessage.getMessagePublishInfo().getRoutingKey() == null
- ? null
- : _currentMessage.getMessagePublishInfo()
- .getRoutingKey()
- .toString()));
+ SystemLog.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchangeName().asString(),
+ _currentMessage.getMessagePublishInfo().getRoutingKey()
+ == null
+ ? null
+ : _currentMessage.getMessagePublishInfo()
+ .getRoutingKey()
+ .toString()));
}
}
@@ -684,7 +693,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
LogMessage operationalLogMessage = cause == null ?
ChannelMessages.CLOSE() :
ChannelMessages.CLOSE_FORCED(cause.getCode(), message);
- CurrentActor.get().message(_logSubject, operationalLogMessage);
+ SystemLog.message(_logSubject, operationalLogMessage);
unsubscribeAllConsumers();
@@ -977,7 +986,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
// Log Flow Started before we start the subscriptions
if (!suspended)
{
- _actor.message(_logSubject, ChannelMessages.FLOW("Started"));
+ SystemLog.message(_logSubject, ChannelMessages.FLOW("Started"));
}
@@ -1028,7 +1037,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
// stopped.
if (suspended)
{
- _actor.message(_logSubject, ChannelMessages.FLOW("Stopped"));
+ SystemLog.message(_logSubject, ChannelMessages.FLOW("Stopped"));
}
}
@@ -1174,7 +1183,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
public void setCredit(final long prefetchSize, final int prefetchCount)
{
- _actor.message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount));
+ SystemLog.message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount));
_creditManager.setCreditLimits(prefetchSize, prefetchCount);
}
@@ -1431,19 +1440,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
}
-
- public LogActor getLogActor()
- {
- return _actor;
- }
-
public synchronized void block()
{
if(_blockingEntities.add(this))
{
if(_blocking.compareAndSet(false,true))
{
- _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **"));
+ SystemLog.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **"));
flow(false);
}
}
@@ -1455,7 +1458,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false))
{
- _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ SystemLog.message(_logSubject, ChannelMessages.FLOW_REMOVED());
flow(true);
}
@@ -1469,7 +1472,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
if(_blocking.compareAndSet(false,true))
{
- _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName()));
+ SystemLog.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName()));
flow(false);
}
}
@@ -1481,8 +1484,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing())
{
- _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
-
+ SystemLog.message(_logSubject, ChannelMessages.FLOW_REMOVED());
flow(true);
}
}
@@ -1552,14 +1554,14 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
else
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
- final Consumer sub = rejectedQueueEntry.getDeliveredConsumer();
+
int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
{
@Override
public void performAction(final MessageInstance requeueEntry)
{
- _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ SystemLog.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
requeueEntry.getOwningResource().getName()));
}
}, null);
@@ -1577,7 +1579,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
if (altExchange == null)
{
_logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress()));
+ SystemLog.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress()));
}
else
@@ -1585,7 +1587,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
_logger.debug(
"Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
+ deliveryTag);
- _actor.message(_logSubject,
+ SystemLog.message(_logSubject,
ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 167a505c19..80c4a9fde7 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@@ -55,15 +56,12 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.connection.ConnectionPrincipal;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.model.Broker;
@@ -143,7 +141,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final long _connectionID;
private Object _reference = new Object();
- private AMQPConnectionActor _actor;
private LogSubject _logSubject;
private long _lastIoTime;
@@ -172,7 +169,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private long _readBytes;
public AMQProtocolEngine(Broker broker,
- NetworkConnection network,
+ final NetworkConnection network,
final long connectionId,
Port port,
Transport transport)
@@ -184,21 +181,44 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_receivedLock = new ReentrantLock();
_stateManager = new AMQStateManager(broker, this);
_codecFactory = new AMQCodecFactory(true, this);
-
- setNetworkConnection(network);
_connectionID = connectionId;
+ _logSubject = new ConnectionLogSubject(this);
- _actor = new AMQPConnectionActor(this, _broker.getRootMessageLogger());
_authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
+ runAsSubject(new PrivilegedAction<Void>()
+ {
- _logSubject = new ConnectionLogSubject(this);
+ @Override
+ public Void run()
+ {
+ setNetworkConnection(network);
+
+ SystemLog.message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false));
+
+ _closeWhenNoRoute = (Boolean)_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE);
- _actor.message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false));
+ initialiseStatistics();
- _closeWhenNoRoute = (Boolean)_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE);
+ return null;
+ }
+ });
- initialiseStatistics();
+ }
+ private <T> T runAsSubject(PrivilegedAction<T> action)
+ {
+ return Subject.doAs(getAuthorizedSubject(), action);
+ }
+
+ private boolean runningAsSubject()
+ {
+ return getAuthorizedSubject().equals(Subject.getSubject(AccessController.getContext()));
+ }
+
+ @Override
+ public Subject getSubject()
+ {
+ return _authorizedSubject;
}
public void setNetworkConnection(NetworkConnection network)
@@ -217,11 +237,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _connectionID;
}
- public LogActor getLogActor()
- {
- return _actor;
- }
-
public void setMaxFrameSize(long frameMax)
{
_maxFrameSize = frameMax;
@@ -400,74 +415,57 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
AMQBody body = frame.getBodyFrame();
- //Look up the Channel's Actor and set that as the current actor
- // If that is not available then we can use the ConnectionActor
- // that is associated with this AMQMPSession.
- LogActor channelActor = null;
- if (amqChannel != null)
+ long startTime = 0;
+ String frameToString = null;
+ if (_logger.isDebugEnabled())
{
- channelActor = amqChannel.getLogActor();
+ startTime = System.currentTimeMillis();
+ frameToString = frame.toString();
+ _logger.debug("RECV: " + frame);
}
- CurrentActor.set(channelActor == null ? _actor : channelActor);
- try
+ // Check that this channel is not closing
+ if (channelAwaitingClosure(channelId))
{
- long startTime = 0;
- String frameToString = null;
- if (_logger.isDebugEnabled())
+ if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
{
- startTime = System.currentTimeMillis();
- frameToString = frame.toString();
- _logger.debug("RECV: " + frame);
- }
-
- // Check that this channel is not closing
- if (channelAwaitingClosure(channelId))
- {
- if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
- }
- }
- else
+ if (_logger.isInfoEnabled())
{
- // The channel has been told to close, we don't process any more frames until
- // it's closed.
- return;
+ _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
}
}
-
- try
- {
- body.handle(channelId, this);
- }
- catch(AMQConnectionException e)
- {
- _logger.info(e.getMessage() + " whilst processing frame: " + body);
- closeConnection(channelId, e);
- throw e;
- }
- catch (AMQException e)
- {
- closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
- throw e;
- }
- catch (TransportException e)
+ else
{
- closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage());
- throw e;
+ // The channel has been told to close, we don't process any more frames until
+ // it's closed.
+ return;
}
+ }
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString);
- }
+ try
+ {
+ body.handle(channelId, this);
}
- finally
+ catch(AMQConnectionException e)
+ {
+ _logger.info(e.getMessage() + " whilst processing frame: " + body);
+ closeConnection(channelId, e);
+ throw e;
+ }
+ catch (AMQException e)
{
- CurrentActor.remove();
+ closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
+ throw e;
+ }
+ catch (TransportException e)
+ {
+ closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage());
+ throw e;
+ }
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString);
}
}
@@ -478,7 +476,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
try
{
// Log incoming protocol negotiation request
- _actor.message(ConnectionMessages.OPEN(null, pi.getProtocolMajor() + "-" + pi.getProtocolMinor(), null, null, false, true, false, false));
+ SystemLog.message(ConnectionMessages.OPEN(null,
+ pi.getProtocolMajor() + "-" + pi.getProtocolMinor(),
+ null,
+ null,
+ false,
+ true,
+ false,
+ false));
ProtocolVersion pv = pi.checkVersion(); // Fails if not correct
@@ -778,22 +783,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_maxNoOfChannels = value;
}
- public void commitTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException
- {
- if ((channel != null) && channel.isTransactional())
- {
- channel.commit();
- }
- }
-
- public void rollbackTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException
- {
- if ((channel != null) && channel.isTransactional())
- {
- channel.rollback();
- }
- }
-
/**
* Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
* subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
@@ -908,77 +897,89 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
@Override
public void closeSession()
{
- if(_closing.compareAndSet(false,true))
- {
- // force sync of outstanding async work
- _receivedLock.lock();
- try
- {
- receivedComplete();
- }
- finally
- {
- _receivedLock.unlock();
- }
- // REMOVE THIS SHOULD NOT BE HERE.
- if (CurrentActor.get() == null)
- {
- CurrentActor.set(_actor);
- }
- if (!_closed)
+ if(runningAsSubject())
+ {
+ if(_closing.compareAndSet(false,true))
{
- if (_virtualHost != null)
+ // force sync of outstanding async work
+ _receivedLock.lock();
+ try
{
- _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ receivedComplete();
}
-
- closeAllChannels();
-
- for (Action<? super AMQProtocolEngine> task : _taskList)
+ finally
{
- task.performAction(this);
+ _receivedLock.unlock();
}
- synchronized(this)
+ if (!_closed)
{
- _closed = true;
- notifyAll();
+ if (_virtualHost != null)
+ {
+ _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ }
+
+ closeAllChannels();
+
+ for (Action<? super AMQProtocolEngine> task : _taskList)
+ {
+ task.performAction(this);
+ }
+
+ synchronized(this)
+ {
+ _closed = true;
+ notifyAll();
+ }
+ SystemLog.message(_logSubject, ConnectionMessages.CLOSE());
}
- CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE());
}
- }
- else
- {
- synchronized(this)
+ else
{
+ synchronized(this)
+ {
- boolean lockHeld = _receivedLock.isHeldByCurrentThread();
+ boolean lockHeld = _receivedLock.isHeldByCurrentThread();
- while(!_closed)
- {
- try
+ while(!_closed)
{
- if(lockHeld)
+ try
+ {
+ if(lockHeld)
+ {
+ _receivedLock.unlock();
+ }
+ wait(1000);
+ }
+ catch (InterruptedException e)
{
- _receivedLock.unlock();
+ // do nothing
}
- wait(1000);
- }
- catch (InterruptedException e)
- {
- // do nothing
- }
- finally
- {
- if(lockHeld)
+ finally
{
- _receivedLock.lock();
+ if(lockHeld)
+ {
+ _receivedLock.lock();
+ }
}
}
}
}
}
+ else
+ {
+ runAsSubject(new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ closeSession();
+ return null;
+ }
+ });
+
+ }
}
private void closeConnection(int channelId, AMQConnectionException e)
@@ -1091,7 +1092,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
setContextKey(new AMQShortString(clientId));
}
- _actor.message(ConnectionMessages.OPEN(clientId, _protocolVersion.toString(), _clientVersion, _clientProduct, true, true, true, true));
+ SystemLog.message(ConnectionMessages.OPEN(clientId,
+ _protocolVersion.toString(),
+ _clientVersion,
+ _clientProduct,
+ true,
+ true,
+ true,
+ true));
}
}
@@ -1361,40 +1369,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return String.valueOf(getRemoteAddress());
}
- public void mgmtCloseChannel(int channelId)
- {
- MethodRegistry methodRegistry = getMethodRegistry();
- ChannelCloseBody responseBody =
- methodRegistry.createChannelCloseBody(
- AMQConstant.REPLY_SUCCESS.getCode(),
- new AMQShortString("The channel was closed using the broker's management interface."),
- 0,0);
-
- // This seems ugly but because we use AMQChannel.close() in both normal
- // broker operation and as part of the management interface it cannot
- // be avoided. The Current Actor will be null when this method is
- // called via the QMF management interface. As such we need to set one.
- boolean removeActor = false;
- if (CurrentActor.get() == null)
- {
- removeActor = true;
- CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger()));
- }
-
- try
- {
- writeFrame(responseBody.generateFrame(channelId));
- closeChannel(channelId);
- }
- finally
- {
- if (removeActor)
- {
- CurrentActor.remove();
- }
- }
- }
-
public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message)
{
int channelId = session.getChannelId();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
index 045367b667..ade087ef21 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
@@ -35,9 +35,7 @@ import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -48,8 +46,6 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>>
{
long getSessionID();
- LogActor getLogActor();
-
void setMaxFrameSize(long frameMax);
long getMaxFrameSize();
@@ -202,14 +198,8 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>>
void setMaximumNumberOfChannels(Long value);
- void commitTransactions(AMQChannel<T> channel) throws AMQException;
-
- void rollbackTransactions(AMQChannel<T> channel) throws AMQException;
-
List<AMQChannel<T>> getChannels();
- void mgmtCloseChannel(int channelId);
-
public Principal getPeerPrincipal();
Lock getReceivedLock();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index f82689e36a..3c3c17a6fb 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -511,6 +511,12 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
return _connectionId;
}
+ @Override
+ public Subject getSubject()
+ {
+ return _connection.getSubject();
+ }
+
public long getLastReadTime()
{
return _lastReadTime;
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index d11b1e029c..9a4adbc4c7 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -33,7 +33,7 @@ import javax.servlet.DispatcherType;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
import org.apache.qpid.server.management.plugin.filter.ForbiddingAuthorisationFilter;
import org.apache.qpid.server.management.plugin.filter.RedirectingAuthorisationFilter;
@@ -140,7 +140,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
private void start()
{
- CurrentActor.get().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME));
+ SystemLog.message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME));
Collection<Port> httpPorts = getHttpPorts(getBroker().getPorts());
_server = createServer(httpPorts);
@@ -154,7 +154,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
throw new ServerScopedRuntimeException("Failed to start HTTP management on ports : " + httpPorts, e);
}
- CurrentActor.get().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
+ SystemLog.message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
}
private void stop()
@@ -172,7 +172,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
}
}
- CurrentActor.get().message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME));
+ SystemLog.message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME));
}
public int getSessionTimeout()
@@ -381,13 +381,14 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
Connector[] connectors = server.getConnectors();
for (Connector connector : connectors)
{
- CurrentActor.get().message(ManagementConsoleMessages.LISTENING(stringifyConnectorScheme(connector), connector.getPort()));
+ SystemLog.message(ManagementConsoleMessages.LISTENING(stringifyConnectorScheme(connector),
+ connector.getPort()));
if (connector instanceof SslSocketConnector)
{
SslContextFactory sslContextFactory = ((SslSocketConnector)connector).getSslContextFactory();
if (sslContextFactory != null && sslContextFactory.getKeyStorePath() != null)
{
- CurrentActor.get().message(ManagementConsoleMessages.SSL_KEYSTORE(sslContextFactory.getKeyStorePath()));
+ SystemLog.message(ManagementConsoleMessages.SSL_KEYSTORE(sslContextFactory.getKeyStorePath()));
}
}
}
@@ -398,7 +399,8 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
Connector[] connectors = server.getConnectors();
for (Connector connector : connectors)
{
- CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN(stringifyConnectorScheme(connector), connector.getPort()));
+ SystemLog.message(ManagementConsoleMessages.SHUTTING_DOWN(stringifyConnectorScheme(connector),
+ connector.getPort()));
}
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
index 5d39b3c0c1..023453e74f 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
@@ -22,11 +22,8 @@ package org.apache.qpid.server.management.plugin;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.security.AccessControlException;
import java.security.Principal;
import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
import java.security.cert.X509Certificate;
import java.util.Collections;
@@ -37,9 +34,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import org.apache.commons.codec.binary.Base64;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.HttpManagementActor;
import org.apache.qpid.server.management.plugin.servlet.ServletConnectionPrincipal;
import org.apache.qpid.server.management.plugin.session.LoginLogoutReporter;
import org.apache.qpid.server.model.AuthenticationProvider;
@@ -51,7 +45,6 @@ import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationS
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerFactory;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
public class HttpManagementUtil
@@ -117,55 +110,33 @@ public class HttpManagementUtil
subject.getPrincipals().add(new ServletConnectionPrincipal(request));
subject.setReadOnly();
- LogActor actor = createHttpManagementActor(broker, request);
+ assertManagementAccess(broker.getSecurityManager(), subject);
- assertManagementAccess(broker.getSecurityManager(), subject, actor);
-
- saveAuthorisedSubject(session, subject, actor);
+ saveAuthorisedSubject(session, subject);
}
}
- public static void assertManagementAccess(final SecurityManager securityManager, Subject subject, LogActor actor)
+ public static void assertManagementAccess(final SecurityManager securityManager, Subject subject)
{
- CurrentActor.set(actor);
- try
+ Subject.doAs(subject, new PrivilegedAction<Void>()
{
- Subject.doAs(subject, new PrivilegedAction<Void>()
+ @Override
+ public Void run()
{
- @Override
- public Void run()
- {
- securityManager.accessManagement();
- return null;
- }
- });
- }
- finally
- {
- CurrentActor.remove();
- }
- }
-
- public static HttpManagementActor getOrCreateAndCacheLogActor(HttpServletRequest request, Broker broker)
- {
- HttpSession session = request.getSession();
- HttpManagementActor actor = (HttpManagementActor) session.getAttribute(ATTR_LOG_ACTOR);
- if (actor == null)
- {
- actor = createHttpManagementActor(broker, request);
- session.setAttribute(ATTR_LOG_ACTOR, actor);
- }
- return actor;
+ securityManager.accessManagement();
+ return null;
+ }
+ });
}
- public static void saveAuthorisedSubject(HttpSession session, Subject subject, LogActor logActor)
+ public static void saveAuthorisedSubject(HttpSession session, Subject subject)
{
session.setAttribute(ATTR_SUBJECT, subject);
// Cause the user logon to be logged.
- session.setAttribute(ATTR_LOGIN_LOGOUT_REPORTER, new LoginLogoutReporter(logActor, subject));
+ session.setAttribute(ATTR_LOGIN_LOGOUT_REPORTER, new LoginLogoutReporter(subject));
}
public static Subject tryToAuthenticate(HttpServletRequest request, HttpManagementConfiguration managementConfig)
@@ -245,9 +216,5 @@ public class HttpManagementUtil
return null;
}
- private static HttpManagementActor createHttpManagementActor(Broker broker, HttpServletRequest request)
- {
- return new HttpManagementActor(broker.getRootMessageLogger(), request.getRemoteAddr(), request.getRemotePort());
- }
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java
index 0925e2b088..18a026ec93 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.server.management.plugin.servlet;
+import org.apache.qpid.server.security.auth.ManagementConnectionPrincipal;
import org.apache.qpid.server.security.auth.SocketConnectionPrincipal;
import javax.servlet.ServletRequest;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-public class ServletConnectionPrincipal implements SocketConnectionPrincipal
+public class ServletConnectionPrincipal implements ManagementConnectionPrincipal
{
private final InetSocketAddress _address;
@@ -74,4 +75,10 @@ public class ServletConnectionPrincipal implements SocketConnectionPrincipal
{
return _address.hashCode();
}
+
+ @Override
+ public String getType()
+ {
+ return "HTTP";
+ }
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
index 1133b6e091..a9e80db3bf 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
@@ -33,13 +33,9 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.HttpManagementActor;
import org.apache.qpid.server.management.plugin.HttpManagementConfiguration;
import org.apache.qpid.server.management.plugin.HttpManagementUtil;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
@@ -88,7 +84,6 @@ public abstract class AbstractServlet extends HttpServlet
/**
* Performs the GET action as the logged-in {@link Subject}.
- * The {@link LogActor} is set before this method is called.
* Subclasses commonly override this method
*/
protected void doGetWithSubjectAndActor(HttpServletRequest request, HttpServletResponse resp) throws ServletException, IOException
@@ -117,7 +112,6 @@ public abstract class AbstractServlet extends HttpServlet
/**
* Performs the POST action as the logged-in {@link Subject}.
- * The {@link LogActor} is set before this method is called.
* Subclasses commonly override this method
*/
protected void doPostWithSubjectAndActor(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@@ -145,7 +139,6 @@ public abstract class AbstractServlet extends HttpServlet
/**
* Performs the PUT action as the logged-in {@link Subject}.
- * The {@link LogActor} is set before this method is called.
* Subclasses commonly override this method
*/
protected void doPutWithSubjectAndActor(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@@ -174,7 +167,6 @@ public abstract class AbstractServlet extends HttpServlet
/**
* Performs the PUT action as the logged-in {@link Subject}.
- * The {@link LogActor} is set before this method is called.
* Subclasses commonly override this method
*/
protected void doDeleteWithSubjectAndActor(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
@@ -198,8 +190,6 @@ public abstract class AbstractServlet extends HttpServlet
return;
}
- HttpManagementActor logActor = HttpManagementUtil.getOrCreateAndCacheLogActor(request, _broker);
- CurrentActor.set(logActor);
try
{
Subject.doAs(subject, privilegedExceptionAction);
@@ -223,11 +213,6 @@ public abstract class AbstractServlet extends HttpServlet
}
throw new ConnectionScopedRuntimeException(e.getCause());
}
- finally
- {
- CurrentActor.remove();
- }
-
}
protected Subject getAuthorisedSubject(HttpServletRequest request)
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java
index b59e9bc04d..2ca67fadc9 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java
@@ -27,7 +27,6 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.management.plugin.HttpManagementConfiguration;
import org.apache.qpid.server.management.plugin.HttpManagementUtil;
import org.apache.qpid.server.model.Broker;
@@ -247,10 +246,9 @@ public class SaslServlet extends AbstractServlet
subject.setReadOnly();
Broker broker = getBroker();
- LogActor actor = HttpManagementUtil.getOrCreateAndCacheLogActor(request, broker);
try
{
- HttpManagementUtil.assertManagementAccess(broker.getSecurityManager(), subject, actor);
+ HttpManagementUtil.assertManagementAccess(broker.getSecurityManager(), subject);
}
catch(SecurityException e)
{
@@ -258,7 +256,7 @@ public class SaslServlet extends AbstractServlet
return;
}
- HttpManagementUtil.saveAuthorisedSubject(request.getSession(), subject, actor);
+ HttpManagementUtil.saveAuthorisedSubject(request.getSession(), subject);
session.removeAttribute(ATTR_ID);
session.removeAttribute(ATTR_SASL_SERVER);
session.removeAttribute(ATTR_EXPIRY);
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java
index 238f1b4719..5155654e82 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java
@@ -28,7 +28,7 @@ import javax.servlet.http.HttpSessionBindingEvent;
import javax.servlet.http.HttpSessionBindingListener;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
@@ -40,14 +40,12 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
public class LoginLogoutReporter implements HttpSessionBindingListener
{
private static final Logger LOGGER = Logger.getLogger(LoginLogoutReporter.class);
- private final LogActor _logActor;
private final Subject _subject;
private final Principal _principal;
- public LoginLogoutReporter(LogActor logActor, Subject subject)
+ public LoginLogoutReporter(Subject subject)
{
super();
- _logActor = logActor;
_subject = subject;
_principal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(_subject);
}
@@ -76,7 +74,7 @@ public class LoginLogoutReporter implements HttpSessionBindingListener
@Override
public Void run()
{
- _logActor.message(ManagementConsoleMessages.OPEN(_principal.getName()));
+ SystemLog.message(ManagementConsoleMessages.OPEN(_principal.getName()));
return null;
}
});
@@ -94,7 +92,7 @@ public class LoginLogoutReporter implements HttpSessionBindingListener
@Override
public Void run()
{
- _logActor.message(ManagementConsoleMessages.CLOSE(_principal.getName()));
+ SystemLog.message(ManagementConsoleMessages.CLOSE(_principal.getName()));
return null;
}
});
diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java
index 1d43c44587..16ae5220ab 100644
--- a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java
+++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java
@@ -19,16 +19,20 @@
*/
package org.apache.qpid.server.management.plugin.session;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.when;
import javax.security.auth.Subject;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
import junit.framework.TestCase;
@@ -36,7 +40,7 @@ public class LoginLogoutReporterTest extends TestCase
{
private LoginLogoutReporter _loginLogoutReport;
private Subject _subject = new Subject();
- private LogActor _logActor = Mockito.mock(LogActor.class);
+ private RootMessageLogger _logger = mock(RootMessageLogger.class);
@Override
protected void setUp() throws Exception
@@ -44,19 +48,22 @@ public class LoginLogoutReporterTest extends TestCase
super.setUp();
_subject.getPrincipals().add(new AuthenticatedPrincipal("mockusername"));
- _loginLogoutReport = new LoginLogoutReporter(_logActor, _subject);
+ when(_logger.isEnabled()).thenReturn(true);
+ when(_logger.isMessageEnabled(anyString())).thenReturn(true);
+ SystemLog.setRootMessageLogger(_logger);
+ _loginLogoutReport = new LoginLogoutReporter(_subject);
}
public void testLoginLogged()
{
_loginLogoutReport.valueBound(null);
- verify(_logActor).message(isLogMessageWithMessage("MNG-1007 : Open : User mockusername"));
+ verify(_logger).message(isLogMessageWithMessage("MNG-1007 : Open : User mockusername"));
}
public void testLogoutLogged()
{
_loginLogoutReport.valueUnbound(null);
- verify(_logActor).message(isLogMessageWithMessage("MNG-1008 : Close : User mockusername"));
+ verify(_logger).message(isLogMessageWithMessage("MNG-1008 : Close : User mockusername"));
}
private LogMessage isLogMessageWithMessage(final String expectedMessage)
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java
index e2b9a98784..8b1463b476 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java
@@ -20,10 +20,9 @@
*/
package org.apache.qpid.server.jmx;
-import javax.net.ssl.KeyManager;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.KeyStore;
@@ -32,7 +31,6 @@ import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.security.auth.jmx.JMXPasswordAuthenticator;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.ssl.SSLContextFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
@@ -99,12 +97,12 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
@Override
public void start() throws IOException
{
- CurrentActor.get().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME));
+ SystemLog.message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME));
//check if system properties are set to use the JVM's out-of-the-box JMXAgent
if (areOutOfTheBoxJMXOptionsSet())
{
- CurrentActor.get().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
+ SystemLog.message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
}
else
{
@@ -138,7 +136,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
throw new ServerScopedRuntimeException("Unable to create SSLContext for key store", e);
}
- CurrentActor.get().message(ManagementConsoleMessages.SSL_KEYSTORE(keyStore.getName()));
+ SystemLog.message(ManagementConsoleMessages.SSL_KEYSTORE(keyStore.getName()));
//create the SSL RMI socket factories
csf = new SslRMIClientSocketFactory();
@@ -250,8 +248,8 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
_cs.start();
String connectorServer = (connectorSslEnabled ? "SSL " : "") + "JMX RMIConnectorServer";
- CurrentActor.get().message(ManagementConsoleMessages.LISTENING(connectorServer, jmxPortConnectorServer));
- CurrentActor.get().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
+ SystemLog.message(ManagementConsoleMessages.LISTENING(connectorServer, jmxPortConnectorServer));
+ SystemLog.message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
}
private Registry createRmiRegistry(int jmxPortRegistryServer, boolean useCustomRmiRegistry)
@@ -269,7 +267,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
rmiRegistry = LocateRegistry.createRegistry(jmxPortRegistryServer, null, null);
}
- CurrentActor.get().message(ManagementConsoleMessages.LISTENING("RMI Registry", jmxPortRegistryServer));
+ SystemLog.message(ManagementConsoleMessages.LISTENING("RMI Registry", jmxPortRegistryServer));
return rmiRegistry;
}
@@ -294,7 +292,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
unregisterAllMbeans();
- CurrentActor.get().message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME));
+ SystemLog.message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME));
}
private void closeConnectorAndRegistryServers()
@@ -338,7 +336,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
if (_rmiRegistry != null)
{
// Stopping the RMI registry
- CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN("RMI Registry", _registryPort.getPort()));
+ SystemLog.message(ManagementConsoleMessages.SHUTTING_DOWN("RMI Registry", _registryPort.getPort()));
try
{
boolean success = UnicastRemoteObject.unexportObject(_rmiRegistry, false);
@@ -365,7 +363,8 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
// Stopping the JMX ConnectorServer
try
{
- CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN("JMX RMIConnectorServer", _cs.getAddress().getPort()));
+ SystemLog.message(ManagementConsoleMessages.SHUTTING_DOWN("JMX RMIConnectorServer",
+ _cs.getAddress().getPort()));
_cs.stop();
}
catch (IOException e)
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java
index a38addae7b..b20685985f 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java
@@ -23,8 +23,6 @@ package org.apache.qpid.server.jmx;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.security.SecurityManager;
@@ -39,17 +37,13 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.RuntimeErrorException;
import javax.management.remote.MBeanServerForwarder;
-import javax.management.remote.rmi.RMIServer;
import javax.security.auth.Subject;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
-import java.rmi.server.RemoteServer;
-import java.rmi.server.ServerNotActiveException;
import java.security.AccessControlContext;
import java.security.AccessController;
-import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
@@ -64,7 +58,6 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
private final static String DELEGATE = "JMImplementation:type=MBeanServerDelegate";
private MBeanServer _mbs;
- private final ManagementActor _logActor;
private final boolean _managementRightsInferAllAccess;
private final Broker _broker;
@@ -73,7 +66,6 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
{
_managementRightsInferAllAccess = Boolean.valueOf(System.getProperty(BrokerProperties.PROPERTY_MANAGEMENT_RIGHTS_INFER_ALL_ACCESS, "true"));
_broker = broker;
- _logActor = new ManagementActor(broker.getRootMessageLogger());
}
public static MBeanServerForwarder newProxyInstance(Broker broker)
@@ -117,9 +109,9 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
return false;
}
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable
{
- String methodName = method.getName();
+ final String methodName = method.getName();
if (methodName.equals("getMBeanServer"))
{
@@ -167,16 +159,7 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
throw new SecurityException("Access denied: no authenticated principal", e);
}
- // Save the subject
- CurrentActor.set(_logActor);
- try
- {
- return authoriseAndInvoke(method, args);
- }
- finally
- {
- CurrentActor.remove();
- }
+ return authoriseAndInvoke(method, args);
}
catch (InvocationTargetException e)
{
@@ -207,7 +190,7 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
}
}
- private Object authoriseAndInvoke(final Method method, final Object[] args) throws Throwable
+ private Object authoriseAndInvoke(final Method method, final Object[] args) throws Exception
{
String methodName;
// Get the component, type and impact, which may be null
@@ -239,8 +222,11 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
{
try
{
+ Subject subject = Subject.getSubject(AccessController.getContext());
+ subject = new Subject(false, subject.getPrincipals(), subject.getPublicCredentials(), subject.getPrivateCredentials());
+ subject.getPrincipals().addAll(SecurityManager.SYSTEM.getPrincipals());
- return Subject.doAs(SecurityManager.SYSTEM, new PrivilegedExceptionAction<Object>()
+ return Subject.doAs(subject, new PrivilegedExceptionAction<Object>()
{
@Override
public Object run() throws IllegalAccessException, InvocationTargetException
@@ -251,7 +237,7 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
}
catch (PrivilegedActionException e)
{
- throw e.getCause();
+ throw (Exception) e.getCause();
}
}
else
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java
index ae0574dc21..79d944fc5c 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java
@@ -27,12 +27,19 @@ import javax.management.Notification;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.remote.JMXConnectionNotification;
+import javax.security.auth.Subject;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.security.auth.jmx.JMXConnectionPrincipal;
+
+import java.rmi.server.RemoteServer;
+import java.rmi.server.ServerNotActiveException;
+import java.security.PrivilegedAction;
+import java.util.Collections;
public class ManagementLogonLogoffReporter implements NotificationListener, NotificationFilter
{
@@ -65,19 +72,41 @@ public class ManagementLogonLogoffReporter implements NotificationListener, Not
final String[] splitConnectionId = connectionId.split(" ");
user = splitConnectionId[1];
}
+ Subject originalSubject = new Subject(false, Collections.singleton(new AuthenticatedPrincipal(user)), Collections.emptySet(), Collections.emptySet());
+ Subject subject;
- // use a separate instance of actor as subject is not set on connect/disconnect
- // we need to pass principal name explicitly into log actor
- LogActor logActor = new ManagementActor(_rootMessageLogger, user);
- if (JMXConnectionNotification.OPENED.equals(type))
+ try
{
- logActor.message(ManagementConsoleMessages.OPEN(user));
+ String clientHost = RemoteServer.getClientHost();
+ subject = new Subject(false,
+ originalSubject.getPrincipals(),
+ originalSubject.getPublicCredentials(),
+ originalSubject.getPrivateCredentials());
+ subject.getPrincipals().add(new JMXConnectionPrincipal(clientHost));
+ subject.setReadOnly();
}
- else if (JMXConnectionNotification.CLOSED.equals(type) ||
- JMXConnectionNotification.FAILED.equals(type))
+ catch(ServerNotActiveException e)
{
- logActor.message(ManagementConsoleMessages.CLOSE(user));
+ subject = originalSubject;
}
+ final String username = user;
+ Subject.doAs(subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ if (JMXConnectionNotification.OPENED.equals(type))
+ {
+ SystemLog.message(ManagementConsoleMessages.OPEN(username));
+ }
+ else if (JMXConnectionNotification.CLOSED.equals(type) ||
+ JMXConnectionNotification.FAILED.equals(type))
+ {
+ SystemLog.message(ManagementConsoleMessages.CLOSE(username));
+ }
+ return null;
+ }
+ });
}
@Override
diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java
index ba9c2cdaa5..515a9d88f2 100644
--- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java
+++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java
@@ -23,6 +23,7 @@ import static javax.management.remote.JMXConnectionNotification.OPENED;
import static javax.management.remote.JMXConnectionNotification.CLOSED;
import static javax.management.remote.JMXConnectionNotification.FAILED;
+import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
@@ -31,10 +32,14 @@ import static org.mockito.Matchers.anyString;
import javax.management.remote.JMXConnectionNotification;
-import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.RootMessageLogger;
import junit.framework.TestCase;
+import org.apache.qpid.server.logging.SystemLog;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.mockito.ArgumentMatcher;
public class ManagementLogonLogoffReporterTest extends TestCase
{
@@ -52,8 +57,8 @@ public class ManagementLogonLogoffReporterTest extends TestCase
_usernameAccessor = mock(UsernameAccessor.class);
_rootMessageLogger = mock(RootMessageLogger.class);
// Enable messaging so we can valid the generated strings
- when(_rootMessageLogger.isMessageEnabled(any(LogActor.class), anyString())).thenReturn(true);
-
+ when(_rootMessageLogger.isMessageEnabled(anyString())).thenReturn(true);
+ SystemLog.setRootMessageLogger(_rootMessageLogger);
_reporter = new ManagementLogonLogoffReporter(_rootMessageLogger, _usernameAccessor);
}
@@ -64,7 +69,21 @@ public class ManagementLogonLogoffReporterTest extends TestCase
_reporter.handleNotification(openNotification, null);
- verify(_rootMessageLogger).rawMessage("[main] MNG-1007 : Open : User jmxuser", "qpid.message.managementconsole.open");
+ verify(_rootMessageLogger).message(messageMatch("MNG-1007 : Open : User jmxuser",
+ "qpid.message.managementconsole.open"));
+ }
+
+ private LogMessage messageMatch(final String message, final String hierarchy)
+ {
+ return argThat(new ArgumentMatcher<LogMessage>()
+ {
+ @Override
+ public boolean matches(final Object argument)
+ {
+ LogMessage actual = (LogMessage) argument;
+ return actual.getLogHierarchy().equals(hierarchy) && actual.toString().equals(message);
+ }
+ });
}
public void testClosedNotification()
@@ -74,7 +93,7 @@ public class ManagementLogonLogoffReporterTest extends TestCase
_reporter.handleNotification(closeNotification, null);
- verify(_rootMessageLogger).rawMessage("[main] MNG-1008 : Close : User jmxuser", "qpid.message.managementconsole.close");
+ verify(_rootMessageLogger).message(messageMatch("MNG-1008 : Close : User jmxuser", "qpid.message.managementconsole.close"));
}
public void tesNotifiedForLogOnTypeEvents()