diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-03-07 16:36:26 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-03-07 16:36:26 +0000 |
| commit | 9e9d87a76f12567c2e0f65485bbf85b39fc6e437 (patch) | |
| tree | e8caf31a36a4618fbaa33b765ca68f9c48942226 /qpid/java/broker-plugins/amqp-0-10-protocol | |
| parent | 735d2fbd9b1cf73410363d23591d50fe34c61dc4 (diff) | |
| download | qpid-python-9e9d87a76f12567c2e0f65485bbf85b39fc6e437.tar.gz | |
QPID-5611 : [Java Broker] remove LogActors
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1575315 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol')
6 files changed, 111 insertions, 82 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 120ba2d951..d41af30a09 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -22,8 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; @@ -387,7 +386,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC protected void sendToDLQOrDiscard(MessageInstance entry) { - final LogActor logActor = CurrentActor.get(); final ServerMessage msg = entry.getMessage(); int requeues = entry.routeToAlternate(new Action<MessageInstance>() @@ -395,7 +393,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC @Override public void performAction(final MessageInstance requeueEntry) { - logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + SystemLog.message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), requeueEntry.getOwningResource().getName())); } }, null); @@ -410,12 +408,12 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC if(alternateExchange != null) { - logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), + SystemLog.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName())); } else { - logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), + SystemLog.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress())); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 9fe1babe20..9346e3e7bb 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; @@ -30,8 +31,11 @@ import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.NetworkConnection; +import javax.security.auth.Subject; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine @@ -63,15 +67,31 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) { - _network = network; + if(!getSubject().equals(Subject.getSubject(AccessController.getContext()))) + { + Subject.doAs(getSubject(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + setNetworkConnection(network,sender); + return null; + } + }); + } + else + { + SystemLog.message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false)); + _network = network; + + _connection.setNetworkConnection(network); + _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE)); + // FIXME Two log messages to maintain compatibility with earlier protocol versions + SystemLog.message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false)); - _connection.setNetworkConnection(network); - _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE)); - // FIXME Two log messages to maintain compatibility with earlier protocol versions - _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false)); - _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false)); + } } private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender) @@ -155,8 +175,17 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void readerIdle() { - _connection.getLogActor().message(ConnectionMessages.IDLE_CLOSE()); - _network.close(); + Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + SystemLog.message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); + return null; + } + }); + } public String getAddress() @@ -189,4 +218,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol { return _connection.getConnectionId(); } + + @Override + public Subject getSubject() + { + return _connection.getAuthorizedSubject(); + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 5bfc398bcf..5096223889 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -32,10 +32,8 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.connection.ConnectionPrincipal; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.AMQPConnectionActor; -import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; @@ -65,7 +63,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); - private LogActor _actor; private final Subject _authorizedSubject = new Subject(); private Principal _authorizedPrincipal = null; @@ -87,7 +84,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { _connectionId = connectionId; _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this)); - _actor = new AMQPConnectionActor(this, broker.getRootMessageLogger()); } public Object getReference() @@ -112,7 +108,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { _onOpenTask.run(); } - _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), getClientProduct(), true, true, true, true)); + SystemLog.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), getClientProduct(), true, true, true, true)); getVirtualHost().getConnectionRegistry().registerConnection(this); } @@ -135,7 +131,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { if(_logClosed.compareAndSet(false, true)) { - CurrentActor.get().message(this, ConnectionMessages.CLOSE()); + SystemLog.message(this, ConnectionMessages.CLOSE()); } } @@ -258,42 +254,31 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S Subject subject; if (event.isConnectionControl()) { - CurrentActor.set(_actor); subject = _authorizedSubject; } else { ServerSession channel = (ServerSession) getSession(event.getChannel()); - LogActor channelActor = null; if (channel != null) { subject = channel.getAuthorizedSubject(); - channelActor = channel.getLogActor(); } else { subject = _authorizedSubject; } - - CurrentActor.set(channelActor == null ? _actor : channelActor); } - try + Subject.doAs(subject, new PrivilegedAction<Void>() { - Subject.doAs(subject, new PrivilegedAction<Void>() + @Override + public Void run() { - @Override - public Void run() - { - ServerConnection.super.received(event); - return null; - } - }); - } - finally - { - CurrentActor.remove(); - } + ServerConnection.super.received(event); + return null; + } + }); + } public String toLogString() @@ -331,11 +316,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } } - public LogActor getLogActor() - { - return _actor; - } - public void close(AMQConstant cause, String message) { closeSubscriptions(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 5627b2eabe..453b3f85a5 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.security.AccessController; import java.security.Principal; +import java.security.PrivilegedAction; import java.text.MessageFormat; import java.util.Collection; import java.util.Collections; @@ -43,15 +45,13 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; @@ -102,7 +102,6 @@ public class ServerSession extends Session private final UUID _id = UUID.randomUUID(); private final Subject _subject = new Subject(); private long _createTime = System.currentTimeMillis(); - private LogActor _actor = GenericActor.getInstance(this); private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); @@ -161,18 +160,44 @@ public class ServerSession extends Session }); } - protected void setState(State state) + protected void setState(final State state) { - super.setState(state); - - if (state == State.OPEN) + if(runningAsSubject()) { - _actor.message(ChannelMessages.CREATE()); - if(_blocking.get()) + super.setState(state); + + if (state == State.OPEN) { - invokeBlock(); + SystemLog.message(ChannelMessages.CREATE()); + if(_blocking.get()) + { + invokeBlock(); + } } } + else + { + runAsSubject(new PrivilegedAction<Void>() { + + @Override + public Void run() + { + setState(state); + return null; + } + }); + + } + } + + private <T> T runAsSubject(final PrivilegedAction<T> privilegedAction) + { + return Subject.doAs(getAuthorizedSubject(), privilegedAction); + } + + private boolean runningAsSubject() + { + return getAuthorizedSubject().equals(Subject.getSubject(AccessController.getContext())); } private void invokeBlock() @@ -394,7 +419,7 @@ public class ServerSession extends Session { operationalLoggingMessage = ChannelMessages.CLOSE(); } - CurrentActor.get().message(getLogSubject(), operationalLoggingMessage); + SystemLog.message(getLogSubject(), operationalLoggingMessage); } @Override @@ -678,14 +703,10 @@ public class ServerSession extends Session return (ServerConnection) super.getConnection(); } - public LogActor getLogActor() - { - return _actor; - } public LogSubject getLogSubject() { - return (LogSubject) this; + return this; } public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) @@ -704,7 +725,7 @@ public class ServerSession extends Session } - private void block(Object queue, String name) + private void block(final Object queue, final String name) { synchronized (_blockingEntities) { @@ -717,7 +738,7 @@ public class ServerSession extends Session { invokeBlock(); } - _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); + SystemLog.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); } @@ -735,25 +756,22 @@ public class ServerSession extends Session unblock(this); } - private void unblock(Object queue) + private void unblock(final Object queue) { - synchronized(_blockingEntities) + if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty()) { - if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty()) + if(_blocking.compareAndSet(true,false) && !isClosing()) { - if(_blocking.compareAndSet(true,false) && !isClosing()) - { - _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); - MessageFlow mf = new MessageFlow(); - mf.setUnit(MessageCreditUnit.MESSAGE); - mf.setDestination(""); - _outstandingCredit.set(Integer.MAX_VALUE); - mf.setValue(Integer.MAX_VALUE); - invoke(mf); + SystemLog.message(_logSubject, ChannelMessages.FLOW_REMOVED()); + MessageFlow mf = new MessageFlow(); + mf.setUnit(MessageCreditUnit.MESSAGE); + mf.setDestination(""); + _outstandingCredit.set(Integer.MAX_VALUE); + mf.setValue(Integer.MAX_VALUE); + invoke(mf); - } } } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 9d7764414f..d21da824e9 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -29,6 +29,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.logging.SystemLog; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.store.StoreException; @@ -345,7 +346,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey())); + SystemLog.message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey())); } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java index 421adb33a8..858482a034 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java @@ -19,8 +19,6 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.server.logging.RootMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -41,7 +39,6 @@ public class ServerSessionTest extends QpidTestCase super.setUp(); BrokerTestHelper.setUp(); _virtualHost = BrokerTestHelper.createVirtualHost(getName()); - GenericActor.setDefaultMessageLogger(CurrentActor.get().getRootMessageLogger()); } @Override |
