diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-03-07 16:36:26 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-03-07 16:36:26 +0000 |
| commit | 9e9d87a76f12567c2e0f65485bbf85b39fc6e437 (patch) | |
| tree | e8caf31a36a4618fbaa33b765ca68f9c48942226 /qpid/java/broker-plugins | |
| parent | 735d2fbd9b1cf73410363d23591d50fe34c61dc4 (diff) | |
| download | qpid-python-9e9d87a76f12567c2e0f65485bbf85b39fc6e437.tar.gz | |
QPID-5611 : [Java Broker] remove LogActors
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1575315 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
23 files changed, 435 insertions, 438 deletions
diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java index fcb5bcbf70..58f3c71c7c 100644 --- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java +++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/RuleSet.java @@ -38,7 +38,7 @@ import javax.security.auth.Subject; import org.apache.commons.lang.BooleanUtils; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.AccessControlMessages; import org.apache.qpid.server.security.Result; import org.apache.qpid.server.security.access.ObjectProperties; @@ -321,14 +321,14 @@ public class RuleSet switch (permission) { case ALLOW_LOG: - CurrentActor.get().message(AccessControlMessages.ALLOWED( + SystemLog.message(AccessControlMessages.ALLOWED( action.getOperation().toString(), action.getObjectType().toString(), action.getProperties().toString())); case ALLOW: return Result.ALLOWED; case DENY_LOG: - CurrentActor.get().message(AccessControlMessages.DENIED( + SystemLog.message(AccessControlMessages.DENIED( action.getOperation().toString(), action.getObjectType().toString(), action.getProperties().toString())); diff --git a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java index e907a88001..f5f1866c3a 100644 --- a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java +++ b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java @@ -33,9 +33,8 @@ import junit.framework.TestCase; import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.server.connection.ConnectionPrincipal; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.UnitTestMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.Result; import org.apache.qpid.server.security.access.ObjectProperties; @@ -67,7 +66,7 @@ public class DefaultAccessControlTest extends TestCase private void configureAccessControl(final RuleSet rs) throws ConfigurationException { _plugin = new DefaultAccessControl(rs); - CurrentActor.set(new TestLogActor(messageLogger)); + SystemLog.setRootMessageLogger(messageLogger); } private RuleSet createGroupRuleSet() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 120ba2d951..d41af30a09 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -22,8 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; @@ -387,7 +386,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC protected void sendToDLQOrDiscard(MessageInstance entry) { - final LogActor logActor = CurrentActor.get(); final ServerMessage msg = entry.getMessage(); int requeues = entry.routeToAlternate(new Action<MessageInstance>() @@ -395,7 +393,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC @Override public void performAction(final MessageInstance requeueEntry) { - logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + SystemLog.message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), requeueEntry.getOwningResource().getName())); } }, null); @@ -410,12 +408,12 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC if(alternateExchange != null) { - logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), + SystemLog.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName())); } else { - logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), + SystemLog.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress())); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 9fe1babe20..9346e3e7bb 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; @@ -30,8 +31,11 @@ import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.NetworkConnection; +import javax.security.auth.Subject; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine @@ -63,15 +67,31 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) { - _network = network; + if(!getSubject().equals(Subject.getSubject(AccessController.getContext()))) + { + Subject.doAs(getSubject(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + setNetworkConnection(network,sender); + return null; + } + }); + } + else + { + SystemLog.message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false)); + _network = network; + + _connection.setNetworkConnection(network); + _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE)); + // FIXME Two log messages to maintain compatibility with earlier protocol versions + SystemLog.message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false)); - _connection.setNetworkConnection(network); - _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE)); - // FIXME Two log messages to maintain compatibility with earlier protocol versions - _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false)); - _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false)); + } } private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender) @@ -155,8 +175,17 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void readerIdle() { - _connection.getLogActor().message(ConnectionMessages.IDLE_CLOSE()); - _network.close(); + Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + SystemLog.message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); + return null; + } + }); + } public String getAddress() @@ -189,4 +218,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol { return _connection.getConnectionId(); } + + @Override + public Subject getSubject() + { + return _connection.getAuthorizedSubject(); + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 5bfc398bcf..5096223889 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -32,10 +32,8 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.connection.ConnectionPrincipal; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.AMQPConnectionActor; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; @@ -65,7 +63,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); - private LogActor _actor; private final Subject _authorizedSubject = new Subject(); private Principal _authorizedPrincipal = null; @@ -87,7 +84,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { _connectionId = connectionId; _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this)); - _actor = new AMQPConnectionActor(this, broker.getRootMessageLogger()); } public Object getReference() @@ -112,7 +108,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { _onOpenTask.run(); } - _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), getClientProduct(), true, true, true, true)); + SystemLog.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), getClientProduct(), true, true, true, true)); getVirtualHost().getConnectionRegistry().registerConnection(this); } @@ -135,7 +131,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { if(_logClosed.compareAndSet(false, true)) { - CurrentActor.get().message(this, ConnectionMessages.CLOSE()); + SystemLog.message(this, ConnectionMessages.CLOSE()); } } @@ -258,42 +254,31 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S Subject subject; if (event.isConnectionControl()) { - CurrentActor.set(_actor); subject = _authorizedSubject; } else { ServerSession channel = (ServerSession) getSession(event.getChannel()); - LogActor channelActor = null; if (channel != null) { subject = channel.getAuthorizedSubject(); - channelActor = channel.getLogActor(); } else { subject = _authorizedSubject; } - - CurrentActor.set(channelActor == null ? _actor : channelActor); } - try + Subject.doAs(subject, new PrivilegedAction<Void>() { - Subject.doAs(subject, new PrivilegedAction<Void>() + @Override + public Void run() { - @Override - public Void run() - { - ServerConnection.super.received(event); - return null; - } - }); - } - finally - { - CurrentActor.remove(); - } + ServerConnection.super.received(event); + return null; + } + }); + } public String toLogString() @@ -331,11 +316,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } } - public LogActor getLogActor() - { - return _actor; - } - public void close(AMQConstant cause, String message) { closeSubscriptions(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 5627b2eabe..453b3f85a5 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.security.AccessController; import java.security.Principal; +import java.security.PrivilegedAction; import java.text.MessageFormat; import java.util.Collection; import java.util.Collections; @@ -43,15 +45,13 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; @@ -102,7 +102,6 @@ public class ServerSession extends Session private final UUID _id = UUID.randomUUID(); private final Subject _subject = new Subject(); private long _createTime = System.currentTimeMillis(); - private LogActor _actor = GenericActor.getInstance(this); private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); @@ -161,18 +160,44 @@ public class ServerSession extends Session }); } - protected void setState(State state) + protected void setState(final State state) { - super.setState(state); - - if (state == State.OPEN) + if(runningAsSubject()) { - _actor.message(ChannelMessages.CREATE()); - if(_blocking.get()) + super.setState(state); + + if (state == State.OPEN) { - invokeBlock(); + SystemLog.message(ChannelMessages.CREATE()); + if(_blocking.get()) + { + invokeBlock(); + } } } + else + { + runAsSubject(new PrivilegedAction<Void>() { + + @Override + public Void run() + { + setState(state); + return null; + } + }); + + } + } + + private <T> T runAsSubject(final PrivilegedAction<T> privilegedAction) + { + return Subject.doAs(getAuthorizedSubject(), privilegedAction); + } + + private boolean runningAsSubject() + { + return getAuthorizedSubject().equals(Subject.getSubject(AccessController.getContext())); } private void invokeBlock() @@ -394,7 +419,7 @@ public class ServerSession extends Session { operationalLoggingMessage = ChannelMessages.CLOSE(); } - CurrentActor.get().message(getLogSubject(), operationalLoggingMessage); + SystemLog.message(getLogSubject(), operationalLoggingMessage); } @Override @@ -678,14 +703,10 @@ public class ServerSession extends Session return (ServerConnection) super.getConnection(); } - public LogActor getLogActor() - { - return _actor; - } public LogSubject getLogSubject() { - return (LogSubject) this; + return this; } public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) @@ -704,7 +725,7 @@ public class ServerSession extends Session } - private void block(Object queue, String name) + private void block(final Object queue, final String name) { synchronized (_blockingEntities) { @@ -717,7 +738,7 @@ public class ServerSession extends Session { invokeBlock(); } - _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); + SystemLog.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); } @@ -735,25 +756,22 @@ public class ServerSession extends Session unblock(this); } - private void unblock(Object queue) + private void unblock(final Object queue) { - synchronized(_blockingEntities) + if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty()) { - if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty()) + if(_blocking.compareAndSet(true,false) && !isClosing()) { - if(_blocking.compareAndSet(true,false) && !isClosing()) - { - _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); - MessageFlow mf = new MessageFlow(); - mf.setUnit(MessageCreditUnit.MESSAGE); - mf.setDestination(""); - _outstandingCredit.set(Integer.MAX_VALUE); - mf.setValue(Integer.MAX_VALUE); - invoke(mf); + SystemLog.message(_logSubject, ChannelMessages.FLOW_REMOVED()); + MessageFlow mf = new MessageFlow(); + mf.setUnit(MessageCreditUnit.MESSAGE); + mf.setDestination(""); + _outstandingCredit.set(Integer.MAX_VALUE); + mf.setValue(Integer.MAX_VALUE); + invoke(mf); - } } } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 9d7764414f..d21da824e9 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -29,6 +29,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.store.StoreException; @@ -345,7 +346,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey())); + SystemLog.message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey())); } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java index 421adb33a8..858482a034 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java @@ -19,8 +19,6 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -41,7 +39,6 @@ public class ServerSessionTest extends QpidTestCase super.setUp(); BrokerTestHelper.setUp(); _virtualHost = BrokerTestHelper.createVirtualHost(getName()); - GenericActor.setDefaultMessageLogger(CurrentActor.get().getRootMessageLogger()); } @Override diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 99068a9d6c..e248fc539a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_8; import java.nio.ByteBuffer; import java.security.AccessControlException; +import java.security.PrivilegedAction; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -54,11 +55,9 @@ import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.AMQPChannelActor; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; @@ -154,7 +153,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private final AtomicBoolean _blocking = new AtomicBoolean(false); - private LogActor _actor; private LogSubject _logSubject; private volatile boolean _rollingBack; @@ -178,19 +176,17 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private Subject _subject; - public AMQChannel(T session, int channelId, MessageStore messageStore) + public AMQChannel(T session, int channelId, final MessageStore messageStore) throws AMQException { _session = session; _channelId = channelId; - _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger()); _subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(), session.getAuthorizedSubject().getPublicCredentials(), session.getAuthorizedSubject().getPrivateCredentials()); _subject.getPrincipals().add(new SessionPrincipal(this)); _logSubject = new ChannelLogSubject(this); - _actor.message(ChannelMessages.CREATE()); _messageStore = messageStore; @@ -214,6 +210,18 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } } }); + + Subject.doAs(_subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + SystemLog.message(ChannelMessages.CREATE()); + + return null; + } + }); + } /** Sets this channel to be part of a local transaction */ @@ -449,12 +457,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } else { - _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchangeName().asString(), - _currentMessage.getMessagePublishInfo().getRoutingKey() == null - ? null - : _currentMessage.getMessagePublishInfo() - .getRoutingKey() - .toString())); + SystemLog.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchangeName().asString(), + _currentMessage.getMessagePublishInfo().getRoutingKey() + == null + ? null + : _currentMessage.getMessagePublishInfo() + .getRoutingKey() + .toString())); } } @@ -684,7 +693,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> LogMessage operationalLogMessage = cause == null ? ChannelMessages.CLOSE() : ChannelMessages.CLOSE_FORCED(cause.getCode(), message); - CurrentActor.get().message(_logSubject, operationalLogMessage); + SystemLog.message(_logSubject, operationalLogMessage); unsubscribeAllConsumers(); @@ -977,7 +986,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> // Log Flow Started before we start the subscriptions if (!suspended) { - _actor.message(_logSubject, ChannelMessages.FLOW("Started")); + SystemLog.message(_logSubject, ChannelMessages.FLOW("Started")); } @@ -1028,7 +1037,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> // stopped. if (suspended) { - _actor.message(_logSubject, ChannelMessages.FLOW("Stopped")); + SystemLog.message(_logSubject, ChannelMessages.FLOW("Stopped")); } } @@ -1174,7 +1183,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public void setCredit(final long prefetchSize, final int prefetchCount) { - _actor.message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount)); + SystemLog.message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount)); _creditManager.setCreditLimits(prefetchSize, prefetchCount); } @@ -1431,19 +1440,13 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } } - - public LogActor getLogActor() - { - return _actor; - } - public synchronized void block() { if(_blockingEntities.add(this)) { if(_blocking.compareAndSet(false,true)) { - _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **")); + SystemLog.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **")); flow(false); } } @@ -1455,7 +1458,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false)) { - _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); + SystemLog.message(_logSubject, ChannelMessages.FLOW_REMOVED()); flow(true); } @@ -1469,7 +1472,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> if(_blocking.compareAndSet(false,true)) { - _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName())); + SystemLog.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName())); flow(false); } } @@ -1481,8 +1484,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing()) { - _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); - + SystemLog.message(_logSubject, ChannelMessages.FLOW_REMOVED()); flow(true); } } @@ -1552,14 +1554,14 @@ public class AMQChannel<T extends AMQProtocolSession<T>> else { final ServerMessage msg = rejectedQueueEntry.getMessage(); - final Consumer sub = rejectedQueueEntry.getDeliveredConsumer(); + int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>() { @Override public void performAction(final MessageInstance requeueEntry) { - _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + SystemLog.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), requeueEntry.getOwningResource().getName())); } }, null); @@ -1577,7 +1579,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> if (altExchange == null) { _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); - _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress())); + SystemLog.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress())); } else @@ -1585,7 +1587,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> _logger.debug( "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag); - _actor.message(_logSubject, + SystemLog.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 167a505c19..80c4a9fde7 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.security.AccessController; import java.security.Principal; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -55,15 +56,12 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.AMQPConnectionActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; import org.apache.qpid.server.model.Broker; @@ -143,7 +141,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final long _connectionID; private Object _reference = new Object(); - private AMQPConnectionActor _actor; private LogSubject _logSubject; private long _lastIoTime; @@ -172,7 +169,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private long _readBytes; public AMQProtocolEngine(Broker broker, - NetworkConnection network, + final NetworkConnection network, final long connectionId, Port port, Transport transport) @@ -184,21 +181,44 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _receivedLock = new ReentrantLock(); _stateManager = new AMQStateManager(broker, this); _codecFactory = new AMQCodecFactory(true, this); - - setNetworkConnection(network); _connectionID = connectionId; + _logSubject = new ConnectionLogSubject(this); - _actor = new AMQPConnectionActor(this, _broker.getRootMessageLogger()); _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this)); + runAsSubject(new PrivilegedAction<Void>() + { - _logSubject = new ConnectionLogSubject(this); + @Override + public Void run() + { + setNetworkConnection(network); + + SystemLog.message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false)); + + _closeWhenNoRoute = (Boolean)_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE); - _actor.message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false)); + initialiseStatistics(); - _closeWhenNoRoute = (Boolean)_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE); + return null; + } + }); - initialiseStatistics(); + } + private <T> T runAsSubject(PrivilegedAction<T> action) + { + return Subject.doAs(getAuthorizedSubject(), action); + } + + private boolean runningAsSubject() + { + return getAuthorizedSubject().equals(Subject.getSubject(AccessController.getContext())); + } + + @Override + public Subject getSubject() + { + return _authorizedSubject; } public void setNetworkConnection(NetworkConnection network) @@ -217,11 +237,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _connectionID; } - public LogActor getLogActor() - { - return _actor; - } - public void setMaxFrameSize(long frameMax) { _maxFrameSize = frameMax; @@ -400,74 +415,57 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi AMQBody body = frame.getBodyFrame(); - //Look up the Channel's Actor and set that as the current actor - // If that is not available then we can use the ConnectionActor - // that is associated with this AMQMPSession. - LogActor channelActor = null; - if (amqChannel != null) + long startTime = 0; + String frameToString = null; + if (_logger.isDebugEnabled()) { - channelActor = amqChannel.getLogActor(); + startTime = System.currentTimeMillis(); + frameToString = frame.toString(); + _logger.debug("RECV: " + frame); } - CurrentActor.set(channelActor == null ? _actor : channelActor); - try + // Check that this channel is not closing + if (channelAwaitingClosure(channelId)) { - long startTime = 0; - String frameToString = null; - if (_logger.isDebugEnabled()) + if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) { - startTime = System.currentTimeMillis(); - frameToString = frame.toString(); - _logger.debug("RECV: " + frame); - } - - // Check that this channel is not closing - if (channelAwaitingClosure(channelId)) - { - if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); - } - } - else + if (_logger.isInfoEnabled()) { - // The channel has been told to close, we don't process any more frames until - // it's closed. - return; + _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); } } - - try - { - body.handle(channelId, this); - } - catch(AMQConnectionException e) - { - _logger.info(e.getMessage() + " whilst processing frame: " + body); - closeConnection(channelId, e); - throw e; - } - catch (AMQException e) - { - closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage()); - throw e; - } - catch (TransportException e) + else { - closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage()); - throw e; + // The channel has been told to close, we don't process any more frames until + // it's closed. + return; } + } - if(_logger.isDebugEnabled()) - { - _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString); - } + try + { + body.handle(channelId, this); } - finally + catch(AMQConnectionException e) + { + _logger.info(e.getMessage() + " whilst processing frame: " + body); + closeConnection(channelId, e); + throw e; + } + catch (AMQException e) { - CurrentActor.remove(); + closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage()); + throw e; + } + catch (TransportException e) + { + closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage()); + throw e; + } + + if(_logger.isDebugEnabled()) + { + _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString); } } @@ -478,7 +476,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi try { // Log incoming protocol negotiation request - _actor.message(ConnectionMessages.OPEN(null, pi.getProtocolMajor() + "-" + pi.getProtocolMinor(), null, null, false, true, false, false)); + SystemLog.message(ConnectionMessages.OPEN(null, + pi.getProtocolMajor() + "-" + pi.getProtocolMinor(), + null, + null, + false, + true, + false, + false)); ProtocolVersion pv = pi.checkVersion(); // Fails if not correct @@ -778,22 +783,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _maxNoOfChannels = value; } - public void commitTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException - { - if ((channel != null) && channel.isTransactional()) - { - channel.commit(); - } - } - - public void rollbackTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException - { - if ((channel != null) && channel.isTransactional()) - { - channel.rollback(); - } - } - /** * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue * subscriptions (this may in turn remove queues if they are auto delete</li> </ul> @@ -908,77 +897,89 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi @Override public void closeSession() { - if(_closing.compareAndSet(false,true)) - { - // force sync of outstanding async work - _receivedLock.lock(); - try - { - receivedComplete(); - } - finally - { - _receivedLock.unlock(); - } - // REMOVE THIS SHOULD NOT BE HERE. - if (CurrentActor.get() == null) - { - CurrentActor.set(_actor); - } - if (!_closed) + if(runningAsSubject()) + { + if(_closing.compareAndSet(false,true)) { - if (_virtualHost != null) + // force sync of outstanding async work + _receivedLock.lock(); + try { - _virtualHost.getConnectionRegistry().deregisterConnection(this); + receivedComplete(); } - - closeAllChannels(); - - for (Action<? super AMQProtocolEngine> task : _taskList) + finally { - task.performAction(this); + _receivedLock.unlock(); } - synchronized(this) + if (!_closed) { - _closed = true; - notifyAll(); + if (_virtualHost != null) + { + _virtualHost.getConnectionRegistry().deregisterConnection(this); + } + + closeAllChannels(); + + for (Action<? super AMQProtocolEngine> task : _taskList) + { + task.performAction(this); + } + + synchronized(this) + { + _closed = true; + notifyAll(); + } + SystemLog.message(_logSubject, ConnectionMessages.CLOSE()); } - CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE()); } - } - else - { - synchronized(this) + else { + synchronized(this) + { - boolean lockHeld = _receivedLock.isHeldByCurrentThread(); + boolean lockHeld = _receivedLock.isHeldByCurrentThread(); - while(!_closed) - { - try + while(!_closed) { - if(lockHeld) + try + { + if(lockHeld) + { + _receivedLock.unlock(); + } + wait(1000); + } + catch (InterruptedException e) { - _receivedLock.unlock(); + // do nothing } - wait(1000); - } - catch (InterruptedException e) - { - // do nothing - } - finally - { - if(lockHeld) + finally { - _receivedLock.lock(); + if(lockHeld) + { + _receivedLock.lock(); + } } } } } } + else + { + runAsSubject(new PrivilegedAction<Object>() + { + @Override + public Object run() + { + closeSession(); + return null; + } + }); + + } } private void closeConnection(int channelId, AMQConnectionException e) @@ -1091,7 +1092,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi setContextKey(new AMQShortString(clientId)); } - _actor.message(ConnectionMessages.OPEN(clientId, _protocolVersion.toString(), _clientVersion, _clientProduct, true, true, true, true)); + SystemLog.message(ConnectionMessages.OPEN(clientId, + _protocolVersion.toString(), + _clientVersion, + _clientProduct, + true, + true, + true, + true)); } } @@ -1361,40 +1369,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getRemoteAddress()); } - public void mgmtCloseChannel(int channelId) - { - MethodRegistry methodRegistry = getMethodRegistry(); - ChannelCloseBody responseBody = - methodRegistry.createChannelCloseBody( - AMQConstant.REPLY_SUCCESS.getCode(), - new AMQShortString("The channel was closed using the broker's management interface."), - 0,0); - - // This seems ugly but because we use AMQChannel.close() in both normal - // broker operation and as part of the management interface it cannot - // be avoided. The Current Actor will be null when this method is - // called via the QMF management interface. As such we need to set one. - boolean removeActor = false; - if (CurrentActor.get() == null) - { - removeActor = true; - CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger())); - } - - try - { - writeFrame(responseBody.generateFrame(channelId)); - closeChannel(channelId); - } - finally - { - if (removeActor) - { - CurrentActor.remove(); - } - } - } - public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message) { int channelId = session.getChannelId(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java index 045367b667..ade087ef21 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java @@ -35,9 +35,7 @@ import org.apache.qpid.framing.MethodDispatcher; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -48,8 +46,6 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>> { long getSessionID(); - LogActor getLogActor(); - void setMaxFrameSize(long frameMax); long getMaxFrameSize(); @@ -202,14 +198,8 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>> void setMaximumNumberOfChannels(Long value); - void commitTransactions(AMQChannel<T> channel) throws AMQException; - - void rollbackTransactions(AMQChannel<T> channel) throws AMQException; - List<AMQChannel<T>> getChannels(); - void mgmtCloseChannel(int channelId); - public Principal getPeerPrincipal(); Lock getReceivedLock(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index f82689e36a..3c3c17a6fb 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -511,6 +511,12 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut return _connectionId; } + @Override + public Subject getSubject() + { + return _connection.getSubject(); + } + public long getLastReadTime() { return _lastReadTime; diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index d11b1e029c..9a4adbc4c7 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -33,7 +33,7 @@ import javax.servlet.DispatcherType; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; import org.apache.qpid.server.management.plugin.filter.ForbiddingAuthorisationFilter; import org.apache.qpid.server.management.plugin.filter.RedirectingAuthorisationFilter; @@ -140,7 +140,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem private void start() { - CurrentActor.get().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME)); + SystemLog.message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME)); Collection<Port> httpPorts = getHttpPorts(getBroker().getPorts()); _server = createServer(httpPorts); @@ -154,7 +154,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem throw new ServerScopedRuntimeException("Failed to start HTTP management on ports : " + httpPorts, e); } - CurrentActor.get().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); + SystemLog.message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); } private void stop() @@ -172,7 +172,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem } } - CurrentActor.get().message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME)); + SystemLog.message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME)); } public int getSessionTimeout() @@ -381,13 +381,14 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem Connector[] connectors = server.getConnectors(); for (Connector connector : connectors) { - CurrentActor.get().message(ManagementConsoleMessages.LISTENING(stringifyConnectorScheme(connector), connector.getPort())); + SystemLog.message(ManagementConsoleMessages.LISTENING(stringifyConnectorScheme(connector), + connector.getPort())); if (connector instanceof SslSocketConnector) { SslContextFactory sslContextFactory = ((SslSocketConnector)connector).getSslContextFactory(); if (sslContextFactory != null && sslContextFactory.getKeyStorePath() != null) { - CurrentActor.get().message(ManagementConsoleMessages.SSL_KEYSTORE(sslContextFactory.getKeyStorePath())); + SystemLog.message(ManagementConsoleMessages.SSL_KEYSTORE(sslContextFactory.getKeyStorePath())); } } } @@ -398,7 +399,8 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem Connector[] connectors = server.getConnectors(); for (Connector connector : connectors) { - CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN(stringifyConnectorScheme(connector), connector.getPort())); + SystemLog.message(ManagementConsoleMessages.SHUTTING_DOWN(stringifyConnectorScheme(connector), + connector.getPort())); } } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java index 5d39b3c0c1..023453e74f 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java @@ -22,11 +22,8 @@ package org.apache.qpid.server.management.plugin; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.security.AccessControlException; import java.security.Principal; import java.security.PrivilegedAction; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; import java.security.cert.X509Certificate; import java.util.Collections; @@ -37,9 +34,6 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpSession; import org.apache.commons.codec.binary.Base64; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.HttpManagementActor; import org.apache.qpid.server.management.plugin.servlet.ServletConnectionPrincipal; import org.apache.qpid.server.management.plugin.session.LoginLogoutReporter; import org.apache.qpid.server.model.AuthenticationProvider; @@ -51,7 +45,6 @@ import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationS import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerFactory; -import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.transport.network.security.ssl.SSLUtil; public class HttpManagementUtil @@ -117,55 +110,33 @@ public class HttpManagementUtil subject.getPrincipals().add(new ServletConnectionPrincipal(request)); subject.setReadOnly(); - LogActor actor = createHttpManagementActor(broker, request); + assertManagementAccess(broker.getSecurityManager(), subject); - assertManagementAccess(broker.getSecurityManager(), subject, actor); - - saveAuthorisedSubject(session, subject, actor); + saveAuthorisedSubject(session, subject); } } - public static void assertManagementAccess(final SecurityManager securityManager, Subject subject, LogActor actor) + public static void assertManagementAccess(final SecurityManager securityManager, Subject subject) { - CurrentActor.set(actor); - try + Subject.doAs(subject, new PrivilegedAction<Void>() { - Subject.doAs(subject, new PrivilegedAction<Void>() + @Override + public Void run() { - @Override - public Void run() - { - securityManager.accessManagement(); - return null; - } - }); - } - finally - { - CurrentActor.remove(); - } - } - - public static HttpManagementActor getOrCreateAndCacheLogActor(HttpServletRequest request, Broker broker) - { - HttpSession session = request.getSession(); - HttpManagementActor actor = (HttpManagementActor) session.getAttribute(ATTR_LOG_ACTOR); - if (actor == null) - { - actor = createHttpManagementActor(broker, request); - session.setAttribute(ATTR_LOG_ACTOR, actor); - } - return actor; + securityManager.accessManagement(); + return null; + } + }); } - public static void saveAuthorisedSubject(HttpSession session, Subject subject, LogActor logActor) + public static void saveAuthorisedSubject(HttpSession session, Subject subject) { session.setAttribute(ATTR_SUBJECT, subject); // Cause the user logon to be logged. - session.setAttribute(ATTR_LOGIN_LOGOUT_REPORTER, new LoginLogoutReporter(logActor, subject)); + session.setAttribute(ATTR_LOGIN_LOGOUT_REPORTER, new LoginLogoutReporter(subject)); } public static Subject tryToAuthenticate(HttpServletRequest request, HttpManagementConfiguration managementConfig) @@ -245,9 +216,5 @@ public class HttpManagementUtil return null; } - private static HttpManagementActor createHttpManagementActor(Broker broker, HttpServletRequest request) - { - return new HttpManagementActor(broker.getRootMessageLogger(), request.getRemoteAddr(), request.getRemotePort()); - } } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java index 0925e2b088..18a026ec93 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/ServletConnectionPrincipal.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.server.management.plugin.servlet; +import org.apache.qpid.server.security.auth.ManagementConnectionPrincipal; import org.apache.qpid.server.security.auth.SocketConnectionPrincipal; import javax.servlet.ServletRequest; import java.net.InetSocketAddress; import java.net.SocketAddress; -public class ServletConnectionPrincipal implements SocketConnectionPrincipal +public class ServletConnectionPrincipal implements ManagementConnectionPrincipal { private final InetSocketAddress _address; @@ -74,4 +75,10 @@ public class ServletConnectionPrincipal implements SocketConnectionPrincipal { return _address.hashCode(); } + + @Override + public String getType() + { + return "HTTP"; + } } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java index 1133b6e091..a9e80db3bf 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java @@ -33,13 +33,9 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.HttpManagementActor; import org.apache.qpid.server.management.plugin.HttpManagementConfiguration; import org.apache.qpid.server.management.plugin.HttpManagementUtil; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; @@ -88,7 +84,6 @@ public abstract class AbstractServlet extends HttpServlet /** * Performs the GET action as the logged-in {@link Subject}. - * The {@link LogActor} is set before this method is called. * Subclasses commonly override this method */ protected void doGetWithSubjectAndActor(HttpServletRequest request, HttpServletResponse resp) throws ServletException, IOException @@ -117,7 +112,6 @@ public abstract class AbstractServlet extends HttpServlet /** * Performs the POST action as the logged-in {@link Subject}. - * The {@link LogActor} is set before this method is called. * Subclasses commonly override this method */ protected void doPostWithSubjectAndActor(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException @@ -145,7 +139,6 @@ public abstract class AbstractServlet extends HttpServlet /** * Performs the PUT action as the logged-in {@link Subject}. - * The {@link LogActor} is set before this method is called. * Subclasses commonly override this method */ protected void doPutWithSubjectAndActor(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException @@ -174,7 +167,6 @@ public abstract class AbstractServlet extends HttpServlet /** * Performs the PUT action as the logged-in {@link Subject}. - * The {@link LogActor} is set before this method is called. * Subclasses commonly override this method */ protected void doDeleteWithSubjectAndActor(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException @@ -198,8 +190,6 @@ public abstract class AbstractServlet extends HttpServlet return; } - HttpManagementActor logActor = HttpManagementUtil.getOrCreateAndCacheLogActor(request, _broker); - CurrentActor.set(logActor); try { Subject.doAs(subject, privilegedExceptionAction); @@ -223,11 +213,6 @@ public abstract class AbstractServlet extends HttpServlet } throw new ConnectionScopedRuntimeException(e.getCause()); } - finally - { - CurrentActor.remove(); - } - } protected Subject getAuthorisedSubject(HttpServletRequest request) diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java index b59e9bc04d..2ca67fadc9 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java @@ -27,7 +27,6 @@ import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.management.plugin.HttpManagementConfiguration; import org.apache.qpid.server.management.plugin.HttpManagementUtil; import org.apache.qpid.server.model.Broker; @@ -247,10 +246,9 @@ public class SaslServlet extends AbstractServlet subject.setReadOnly(); Broker broker = getBroker(); - LogActor actor = HttpManagementUtil.getOrCreateAndCacheLogActor(request, broker); try { - HttpManagementUtil.assertManagementAccess(broker.getSecurityManager(), subject, actor); + HttpManagementUtil.assertManagementAccess(broker.getSecurityManager(), subject); } catch(SecurityException e) { @@ -258,7 +256,7 @@ public class SaslServlet extends AbstractServlet return; } - HttpManagementUtil.saveAuthorisedSubject(request.getSession(), subject, actor); + HttpManagementUtil.saveAuthorisedSubject(request.getSession(), subject); session.removeAttribute(ATTR_ID); session.removeAttribute(ATTR_SASL_SERVER); session.removeAttribute(ATTR_EXPIRY); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java index 238f1b4719..5155654e82 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporter.java @@ -28,7 +28,7 @@ import javax.servlet.http.HttpSessionBindingEvent; import javax.servlet.http.HttpSessionBindingListener; import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; @@ -40,14 +40,12 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; public class LoginLogoutReporter implements HttpSessionBindingListener { private static final Logger LOGGER = Logger.getLogger(LoginLogoutReporter.class); - private final LogActor _logActor; private final Subject _subject; private final Principal _principal; - public LoginLogoutReporter(LogActor logActor, Subject subject) + public LoginLogoutReporter(Subject subject) { super(); - _logActor = logActor; _subject = subject; _principal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(_subject); } @@ -76,7 +74,7 @@ public class LoginLogoutReporter implements HttpSessionBindingListener @Override public Void run() { - _logActor.message(ManagementConsoleMessages.OPEN(_principal.getName())); + SystemLog.message(ManagementConsoleMessages.OPEN(_principal.getName())); return null; } }); @@ -94,7 +92,7 @@ public class LoginLogoutReporter implements HttpSessionBindingListener @Override public Void run() { - _logActor.message(ManagementConsoleMessages.CLOSE(_principal.getName())); + SystemLog.message(ManagementConsoleMessages.CLOSE(_principal.getName())); return null; } }); diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java index 1d43c44587..16ae5220ab 100644 --- a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java +++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/session/LoginLogoutReporterTest.java @@ -19,16 +19,20 @@ */ package org.apache.qpid.server.management.plugin.session; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.when; import javax.security.auth.Subject; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.mockito.ArgumentMatcher; -import org.mockito.Mockito; import junit.framework.TestCase; @@ -36,7 +40,7 @@ public class LoginLogoutReporterTest extends TestCase { private LoginLogoutReporter _loginLogoutReport; private Subject _subject = new Subject(); - private LogActor _logActor = Mockito.mock(LogActor.class); + private RootMessageLogger _logger = mock(RootMessageLogger.class); @Override protected void setUp() throws Exception @@ -44,19 +48,22 @@ public class LoginLogoutReporterTest extends TestCase super.setUp(); _subject.getPrincipals().add(new AuthenticatedPrincipal("mockusername")); - _loginLogoutReport = new LoginLogoutReporter(_logActor, _subject); + when(_logger.isEnabled()).thenReturn(true); + when(_logger.isMessageEnabled(anyString())).thenReturn(true); + SystemLog.setRootMessageLogger(_logger); + _loginLogoutReport = new LoginLogoutReporter(_subject); } public void testLoginLogged() { _loginLogoutReport.valueBound(null); - verify(_logActor).message(isLogMessageWithMessage("MNG-1007 : Open : User mockusername")); + verify(_logger).message(isLogMessageWithMessage("MNG-1007 : Open : User mockusername")); } public void testLogoutLogged() { _loginLogoutReport.valueUnbound(null); - verify(_logActor).message(isLogMessageWithMessage("MNG-1008 : Close : User mockusername")); + verify(_logger).message(isLogMessageWithMessage("MNG-1008 : Close : User mockusername")); } private LogMessage isLogMessageWithMessage(final String expectedMessage) diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java index e2b9a98784..8b1463b476 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java @@ -20,10 +20,9 @@ */ package org.apache.qpid.server.jmx; -import javax.net.ssl.KeyManager; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.KeyStore; @@ -32,7 +31,6 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.security.auth.jmx.JMXPasswordAuthenticator; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.ssl.SSLContextFactory; import javax.management.JMException; import javax.management.MBeanServer; @@ -99,12 +97,12 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry @Override public void start() throws IOException { - CurrentActor.get().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME)); + SystemLog.message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME)); //check if system properties are set to use the JVM's out-of-the-box JMXAgent if (areOutOfTheBoxJMXOptionsSet()) { - CurrentActor.get().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); + SystemLog.message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); } else { @@ -138,7 +136,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry throw new ServerScopedRuntimeException("Unable to create SSLContext for key store", e); } - CurrentActor.get().message(ManagementConsoleMessages.SSL_KEYSTORE(keyStore.getName())); + SystemLog.message(ManagementConsoleMessages.SSL_KEYSTORE(keyStore.getName())); //create the SSL RMI socket factories csf = new SslRMIClientSocketFactory(); @@ -250,8 +248,8 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry _cs.start(); String connectorServer = (connectorSslEnabled ? "SSL " : "") + "JMX RMIConnectorServer"; - CurrentActor.get().message(ManagementConsoleMessages.LISTENING(connectorServer, jmxPortConnectorServer)); - CurrentActor.get().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); + SystemLog.message(ManagementConsoleMessages.LISTENING(connectorServer, jmxPortConnectorServer)); + SystemLog.message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); } private Registry createRmiRegistry(int jmxPortRegistryServer, boolean useCustomRmiRegistry) @@ -269,7 +267,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry rmiRegistry = LocateRegistry.createRegistry(jmxPortRegistryServer, null, null); } - CurrentActor.get().message(ManagementConsoleMessages.LISTENING("RMI Registry", jmxPortRegistryServer)); + SystemLog.message(ManagementConsoleMessages.LISTENING("RMI Registry", jmxPortRegistryServer)); return rmiRegistry; } @@ -294,7 +292,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry unregisterAllMbeans(); - CurrentActor.get().message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME)); + SystemLog.message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME)); } private void closeConnectorAndRegistryServers() @@ -338,7 +336,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry if (_rmiRegistry != null) { // Stopping the RMI registry - CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN("RMI Registry", _registryPort.getPort())); + SystemLog.message(ManagementConsoleMessages.SHUTTING_DOWN("RMI Registry", _registryPort.getPort())); try { boolean success = UnicastRemoteObject.unexportObject(_rmiRegistry, false); @@ -365,7 +363,8 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry // Stopping the JMX ConnectorServer try { - CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN("JMX RMIConnectorServer", _cs.getAddress().getPort())); + SystemLog.message(ManagementConsoleMessages.SHUTTING_DOWN("JMX RMIConnectorServer", + _cs.getAddress().getPort())); _cs.stop(); } catch (IOException e) diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java index a38addae7b..b20685985f 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java @@ -23,8 +23,6 @@ package org.apache.qpid.server.jmx; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.security.SecurityManager; @@ -39,17 +37,13 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.RuntimeErrorException; import javax.management.remote.MBeanServerForwarder; -import javax.management.remote.rmi.RMIServer; import javax.security.auth.Subject; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; -import java.rmi.server.RemoteServer; -import java.rmi.server.ServerNotActiveException; import java.security.AccessControlContext; import java.security.AccessController; -import java.security.PrivilegedAction; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.Arrays; @@ -64,7 +58,6 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler private final static String DELEGATE = "JMImplementation:type=MBeanServerDelegate"; private MBeanServer _mbs; - private final ManagementActor _logActor; private final boolean _managementRightsInferAllAccess; private final Broker _broker; @@ -73,7 +66,6 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler { _managementRightsInferAllAccess = Boolean.valueOf(System.getProperty(BrokerProperties.PROPERTY_MANAGEMENT_RIGHTS_INFER_ALL_ACCESS, "true")); _broker = broker; - _logActor = new ManagementActor(broker.getRootMessageLogger()); } public static MBeanServerForwarder newProxyInstance(Broker broker) @@ -117,9 +109,9 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler return false; } - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable + public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { - String methodName = method.getName(); + final String methodName = method.getName(); if (methodName.equals("getMBeanServer")) { @@ -167,16 +159,7 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler throw new SecurityException("Access denied: no authenticated principal", e); } - // Save the subject - CurrentActor.set(_logActor); - try - { - return authoriseAndInvoke(method, args); - } - finally - { - CurrentActor.remove(); - } + return authoriseAndInvoke(method, args); } catch (InvocationTargetException e) { @@ -207,7 +190,7 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler } } - private Object authoriseAndInvoke(final Method method, final Object[] args) throws Throwable + private Object authoriseAndInvoke(final Method method, final Object[] args) throws Exception { String methodName; // Get the component, type and impact, which may be null @@ -239,8 +222,11 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler { try { + Subject subject = Subject.getSubject(AccessController.getContext()); + subject = new Subject(false, subject.getPrincipals(), subject.getPublicCredentials(), subject.getPrivateCredentials()); + subject.getPrincipals().addAll(SecurityManager.SYSTEM.getPrincipals()); - return Subject.doAs(SecurityManager.SYSTEM, new PrivilegedExceptionAction<Object>() + return Subject.doAs(subject, new PrivilegedExceptionAction<Object>() { @Override public Object run() throws IllegalAccessException, InvocationTargetException @@ -251,7 +237,7 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler } catch (PrivilegedActionException e) { - throw e.getCause(); + throw (Exception) e.getCause(); } } else diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java index ae0574dc21..79d944fc5c 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporter.java @@ -27,12 +27,19 @@ import javax.management.Notification; import javax.management.NotificationFilter; import javax.management.NotificationListener; import javax.management.remote.JMXConnectionNotification; +import javax.security.auth.Subject; import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.ManagementActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; +import org.apache.qpid.server.security.auth.jmx.JMXConnectionPrincipal; + +import java.rmi.server.RemoteServer; +import java.rmi.server.ServerNotActiveException; +import java.security.PrivilegedAction; +import java.util.Collections; public class ManagementLogonLogoffReporter implements NotificationListener, NotificationFilter { @@ -65,19 +72,41 @@ public class ManagementLogonLogoffReporter implements NotificationListener, Not final String[] splitConnectionId = connectionId.split(" "); user = splitConnectionId[1]; } + Subject originalSubject = new Subject(false, Collections.singleton(new AuthenticatedPrincipal(user)), Collections.emptySet(), Collections.emptySet()); + Subject subject; - // use a separate instance of actor as subject is not set on connect/disconnect - // we need to pass principal name explicitly into log actor - LogActor logActor = new ManagementActor(_rootMessageLogger, user); - if (JMXConnectionNotification.OPENED.equals(type)) + try { - logActor.message(ManagementConsoleMessages.OPEN(user)); + String clientHost = RemoteServer.getClientHost(); + subject = new Subject(false, + originalSubject.getPrincipals(), + originalSubject.getPublicCredentials(), + originalSubject.getPrivateCredentials()); + subject.getPrincipals().add(new JMXConnectionPrincipal(clientHost)); + subject.setReadOnly(); } - else if (JMXConnectionNotification.CLOSED.equals(type) || - JMXConnectionNotification.FAILED.equals(type)) + catch(ServerNotActiveException e) { - logActor.message(ManagementConsoleMessages.CLOSE(user)); + subject = originalSubject; } + final String username = user; + Subject.doAs(subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + if (JMXConnectionNotification.OPENED.equals(type)) + { + SystemLog.message(ManagementConsoleMessages.OPEN(username)); + } + else if (JMXConnectionNotification.CLOSED.equals(type) || + JMXConnectionNotification.FAILED.equals(type)) + { + SystemLog.message(ManagementConsoleMessages.CLOSE(username)); + } + return null; + } + }); } @Override diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java index ba9c2cdaa5..515a9d88f2 100644 --- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java +++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/ManagementLogonLogoffReporterTest.java @@ -23,6 +23,7 @@ import static javax.management.remote.JMXConnectionNotification.OPENED; import static javax.management.remote.JMXConnectionNotification.CLOSED; import static javax.management.remote.JMXConnectionNotification.FAILED; +import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; @@ -31,10 +32,14 @@ import static org.mockito.Matchers.anyString; import javax.management.remote.JMXConnectionNotification; -import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.RootMessageLogger; import junit.framework.TestCase; +import org.apache.qpid.server.logging.SystemLog; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.mockito.ArgumentMatcher; public class ManagementLogonLogoffReporterTest extends TestCase { @@ -52,8 +57,8 @@ public class ManagementLogonLogoffReporterTest extends TestCase _usernameAccessor = mock(UsernameAccessor.class); _rootMessageLogger = mock(RootMessageLogger.class); // Enable messaging so we can valid the generated strings - when(_rootMessageLogger.isMessageEnabled(any(LogActor.class), anyString())).thenReturn(true); - + when(_rootMessageLogger.isMessageEnabled(anyString())).thenReturn(true); + SystemLog.setRootMessageLogger(_rootMessageLogger); _reporter = new ManagementLogonLogoffReporter(_rootMessageLogger, _usernameAccessor); } @@ -64,7 +69,21 @@ public class ManagementLogonLogoffReporterTest extends TestCase _reporter.handleNotification(openNotification, null); - verify(_rootMessageLogger).rawMessage("[main] MNG-1007 : Open : User jmxuser", "qpid.message.managementconsole.open"); + verify(_rootMessageLogger).message(messageMatch("MNG-1007 : Open : User jmxuser", + "qpid.message.managementconsole.open")); + } + + private LogMessage messageMatch(final String message, final String hierarchy) + { + return argThat(new ArgumentMatcher<LogMessage>() + { + @Override + public boolean matches(final Object argument) + { + LogMessage actual = (LogMessage) argument; + return actual.getLogHierarchy().equals(hierarchy) && actual.toString().equals(message); + } + }); } public void testClosedNotification() @@ -74,7 +93,7 @@ public class ManagementLogonLogoffReporterTest extends TestCase _reporter.handleNotification(closeNotification, null); - verify(_rootMessageLogger).rawMessage("[main] MNG-1008 : Close : User jmxuser", "qpid.message.managementconsole.close"); + verify(_rootMessageLogger).message(messageMatch("MNG-1008 : Close : User jmxuser", "qpid.message.managementconsole.close")); } public void tesNotifiedForLogOnTypeEvents() |
