summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-10-protocol
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-03-07 16:36:26 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-03-07 16:36:26 +0000
commit9e9d87a76f12567c2e0f65485bbf85b39fc6e437 (patch)
treee8caf31a36a4618fbaa33b765ca68f9c48942226 /qpid/java/broker-plugins/amqp-0-10-protocol
parent735d2fbd9b1cf73410363d23591d50fe34c61dc4 (diff)
downloadqpid-python-9e9d87a76f12567c2e0f65485bbf85b39fc6e437.tar.gz
QPID-5611 : [Java Broker] remove LogActors
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1575315 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java10
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java53
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java42
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java82
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java3
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java3
6 files changed, 111 insertions, 82 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 120ba2d951..d41af30a09 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -22,8 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
@@ -387,7 +386,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
protected void sendToDLQOrDiscard(MessageInstance entry)
{
- final LogActor logActor = CurrentActor.get();
final ServerMessage msg = entry.getMessage();
int requeues = entry.routeToAlternate(new Action<MessageInstance>()
@@ -395,7 +393,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
@Override
public void performAction(final MessageInstance requeueEntry)
{
- logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ SystemLog.message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
requeueEntry.getOwningResource().getName()));
}
}, null);
@@ -410,12 +408,12 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
if(alternateExchange != null)
{
- logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
+ SystemLog.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
alternateExchange.getName()));
}
else
{
- logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
+ SystemLog.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
queue.getName(),
msg.getInitialRoutingAddress()));
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 9fe1babe20..9346e3e7bb 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
@@ -30,8 +31,11 @@ import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
+import javax.security.auth.Subject;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine
@@ -63,15 +67,31 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
}
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
{
- _network = network;
+ if(!getSubject().equals(Subject.getSubject(AccessController.getContext())))
+ {
+ Subject.doAs(getSubject(), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ setNetworkConnection(network,sender);
+ return null;
+ }
+ });
+ }
+ else
+ {
+ SystemLog.message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false));
+ _network = network;
+
+ _connection.setNetworkConnection(network);
+ _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
+ // FIXME Two log messages to maintain compatibility with earlier protocol versions
+ SystemLog.message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false));
- _connection.setNetworkConnection(network);
- _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
- // FIXME Two log messages to maintain compatibility with earlier protocol versions
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false));
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false));
+ }
}
private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender)
@@ -155,8 +175,17 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void readerIdle()
{
- _connection.getLogActor().message(ConnectionMessages.IDLE_CLOSE());
- _network.close();
+ Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ SystemLog.message(ConnectionMessages.IDLE_CLOSE());
+ _network.close();
+ return null;
+ }
+ });
+
}
public String getAddress()
@@ -189,4 +218,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
return _connection.getConnectionId();
}
+
+ @Override
+ public Subject getSubject()
+ {
+ return _connection.getAuthorizedSubject();
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 5bfc398bcf..5096223889 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -32,10 +32,8 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.connection.ConnectionPrincipal;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
@@ -65,7 +63,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
- private LogActor _actor;
private final Subject _authorizedSubject = new Subject();
private Principal _authorizedPrincipal = null;
@@ -87,7 +84,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
_connectionId = connectionId;
_authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
- _actor = new AMQPConnectionActor(this, broker.getRootMessageLogger());
}
public Object getReference()
@@ -112,7 +108,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
_onOpenTask.run();
}
- _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), getClientProduct(), true, true, true, true));
+ SystemLog.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), getClientProduct(), true, true, true, true));
getVirtualHost().getConnectionRegistry().registerConnection(this);
}
@@ -135,7 +131,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
if(_logClosed.compareAndSet(false, true))
{
- CurrentActor.get().message(this, ConnectionMessages.CLOSE());
+ SystemLog.message(this, ConnectionMessages.CLOSE());
}
}
@@ -258,42 +254,31 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
Subject subject;
if (event.isConnectionControl())
{
- CurrentActor.set(_actor);
subject = _authorizedSubject;
}
else
{
ServerSession channel = (ServerSession) getSession(event.getChannel());
- LogActor channelActor = null;
if (channel != null)
{
subject = channel.getAuthorizedSubject();
- channelActor = channel.getLogActor();
}
else
{
subject = _authorizedSubject;
}
-
- CurrentActor.set(channelActor == null ? _actor : channelActor);
}
- try
+ Subject.doAs(subject, new PrivilegedAction<Void>()
{
- Subject.doAs(subject, new PrivilegedAction<Void>()
+ @Override
+ public Void run()
{
- @Override
- public Void run()
- {
- ServerConnection.super.received(event);
- return null;
- }
- });
- }
- finally
- {
- CurrentActor.remove();
- }
+ ServerConnection.super.received(event);
+ return null;
+ }
+ });
+
}
public String toLogString()
@@ -331,11 +316,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
}
}
- public LogActor getLogActor()
- {
- return _actor;
- }
-
public void close(AMQConstant cause, String message)
{
closeSubscriptions();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 5627b2eabe..453b3f85a5 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.security.AccessController;
import java.security.Principal;
+import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
@@ -43,15 +45,13 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
@@ -102,7 +102,6 @@ public class ServerSession extends Session
private final UUID _id = UUID.randomUUID();
private final Subject _subject = new Subject();
private long _createTime = System.currentTimeMillis();
- private LogActor _actor = GenericActor.getInstance(this);
private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
@@ -161,18 +160,44 @@ public class ServerSession extends Session
});
}
- protected void setState(State state)
+ protected void setState(final State state)
{
- super.setState(state);
-
- if (state == State.OPEN)
+ if(runningAsSubject())
{
- _actor.message(ChannelMessages.CREATE());
- if(_blocking.get())
+ super.setState(state);
+
+ if (state == State.OPEN)
{
- invokeBlock();
+ SystemLog.message(ChannelMessages.CREATE());
+ if(_blocking.get())
+ {
+ invokeBlock();
+ }
}
}
+ else
+ {
+ runAsSubject(new PrivilegedAction<Void>() {
+
+ @Override
+ public Void run()
+ {
+ setState(state);
+ return null;
+ }
+ });
+
+ }
+ }
+
+ private <T> T runAsSubject(final PrivilegedAction<T> privilegedAction)
+ {
+ return Subject.doAs(getAuthorizedSubject(), privilegedAction);
+ }
+
+ private boolean runningAsSubject()
+ {
+ return getAuthorizedSubject().equals(Subject.getSubject(AccessController.getContext()));
}
private void invokeBlock()
@@ -394,7 +419,7 @@ public class ServerSession extends Session
{
operationalLoggingMessage = ChannelMessages.CLOSE();
}
- CurrentActor.get().message(getLogSubject(), operationalLoggingMessage);
+ SystemLog.message(getLogSubject(), operationalLoggingMessage);
}
@Override
@@ -678,14 +703,10 @@ public class ServerSession extends Session
return (ServerConnection) super.getConnection();
}
- public LogActor getLogActor()
- {
- return _actor;
- }
public LogSubject getLogSubject()
{
- return (LogSubject) this;
+ return this;
}
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
@@ -704,7 +725,7 @@ public class ServerSession extends Session
}
- private void block(Object queue, String name)
+ private void block(final Object queue, final String name)
{
synchronized (_blockingEntities)
{
@@ -717,7 +738,7 @@ public class ServerSession extends Session
{
invokeBlock();
}
- _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
+ SystemLog.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
}
@@ -735,25 +756,22 @@ public class ServerSession extends Session
unblock(this);
}
- private void unblock(Object queue)
+ private void unblock(final Object queue)
{
- synchronized(_blockingEntities)
+ if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty())
{
- if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty())
+ if(_blocking.compareAndSet(true,false) && !isClosing())
{
- if(_blocking.compareAndSet(true,false) && !isClosing())
- {
- _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
- MessageFlow mf = new MessageFlow();
- mf.setUnit(MessageCreditUnit.MESSAGE);
- mf.setDestination("");
- _outstandingCredit.set(Integer.MAX_VALUE);
- mf.setValue(Integer.MAX_VALUE);
- invoke(mf);
+ SystemLog.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ MessageFlow mf = new MessageFlow();
+ mf.setUnit(MessageCreditUnit.MESSAGE);
+ mf.setDestination("");
+ _outstandingCredit.set(Integer.MAX_VALUE);
+ mf.setValue(Integer.MAX_VALUE);
+ invoke(mf);
- }
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 9d7764414f..d21da824e9 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -29,6 +29,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.logging.SystemLog;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.store.StoreException;
@@ -345,7 +346,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey()));
+ SystemLog.message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey()));
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
index 421adb33a8..858482a034 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
@@ -19,8 +19,6 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -41,7 +39,6 @@ public class ServerSessionTest extends QpidTestCase
super.setUp();
BrokerTestHelper.setUp();
_virtualHost = BrokerTestHelper.createVirtualHost(getName());
- GenericActor.setDefaultMessageLogger(CurrentActor.get().getRootMessageLogger());
}
@Override