diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-08-06 17:01:48 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-08-06 17:01:48 +0000 |
| commit | 36b1a55deef53c2e5667aa0fa638002ea2da1f75 (patch) | |
| tree | b38e219e988f178a22b0bcf0c5859771f1ea2880 | |
| parent | 0bcc71e12f76fca002c3b1b3c88333bffe5d5a0c (diff) | |
| download | qpid-python-36b1a55deef53c2e5667aa0fa638002ea2da1f75.tar.gz | |
QPID-2002, QPID-2001 : Add new SubscriptionActor to perform Subscription close logging on the Subscription Flush thread. Alternative would be to create a Virtualhost Logger.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@801725 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 203 insertions, 11 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/actors/SubscriptionActor.java b/java/broker/src/main/java/org/apache/qpid/server/logging/actors/SubscriptionActor.java new file mode 100644 index 0000000000..ab33a29eac --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/logging/actors/SubscriptionActor.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.subscription.Subscription; + +import java.text.MessageFormat; + +public class SubscriptionActor extends AbstractActor +{ + public static String SUBSCRIBER_FORMAT = "sub:{0}(vh({1})/qu({2}))"; + + public SubscriptionActor(RootMessageLogger logger, Subscription subscription) + { + super(logger); + + _logString = "[" + MessageFormat.format(SUBSCRIBER_FORMAT, + subscription.getSubscriptionID(), + subscription.getQueue().getVirtualHost().getName(), + subscription.getQueue().getName()) + + "] "; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 46b2cf8fb4..929045599b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -236,15 +236,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } _bindings.addBinding(routingKey, arguments, exchange); -// ExchangeBinding binding = new ExchangeBinding(routingKey, exchange, arguments); - - //fixme MR logging in progress -// _bindings.addBinding(binding); -// -// if (_logger.isMessageEnabled(binding)) -// { -// _logger.message(binding, "QM-1001 : Created Binding"); -// } } public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException @@ -1238,6 +1229,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener boolean complete = false; try { + CurrentActor.set(_sub.getLogActor()); complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); } @@ -1245,11 +1237,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _logger.error(e); } + finally + { + CurrentActor.remove(); + } if (!complete && !_sub.isSuspended()) { _asyncDelivery.execute(this); } + } public boolean isRead() diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 5001b0baf1..c9042325bf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -23,12 +23,13 @@ package org.apache.qpid.server.subscription; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; public interface Subscription { - + LogActor getLogActor(); public static enum State { diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index a959944434..54bd568bca 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -34,12 +34,13 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.SubscriptionActor; import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; @@ -75,6 +76,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage // Create a simple ID that increments for ever new Subscription private final long _subscriptionID = idGenerator.getAndIncrement(); private LogSubject _logSubject; + private LogActor _logActor; static final class BrowserSubscription extends SubscriptionImpl { @@ -340,6 +342,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage _queue = queue; _logSubject = new SubscriptionLogSubject(this); + _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this); if (CurrentActor.get().getRootMessageLogger(). isMessageEnabled(CurrentActor.get(), _logSubject)) @@ -572,6 +575,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return _channel.getProtocolSession(); } + public LogActor getLogActor() + { + return _logActor; + } + public AMQQueue getQueue() { return _queue; diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java new file mode 100644 index 0000000000..4e357ec52e --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java @@ -0,0 +1,132 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import junit.framework.TestCase; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.RootMessageLoggerImpl; +import org.apache.qpid.server.logging.rawloggers.UnitTestMessageLogger; +import org.apache.qpid.server.subscription.MockSubscription; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; + +import java.security.Principal; +import java.util.List; + +/** + * Test : AMQPConnectionActorTest + * Validate the AMQPConnectionActor class. + * + * The test creates a new AMQPActor and then logs a message using it. + * + * The test then verifies that the logged message was the only one created and + * that the message contains the required message. + */ +public class SubscriptionActorTest extends TestCase +{ + + LogActor _amqpActor; + UnitTestMessageLogger _rawLogger; + + public void setUp() throws ConfigurationException + { + Configuration config = new PropertiesConfiguration(); + ServerConfiguration serverConfig = new ServerConfiguration(config); + + _rawLogger = new UnitTestMessageLogger(); + RootMessageLogger rootLogger = + new RootMessageLoggerImpl(serverConfig, _rawLogger); + + MockSubscription mockSubscription = new MockSubscription(); + + MockAMQQueue queue = new MockAMQQueue(getName()); + + queue.setVirtualHost(ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next()); + + mockSubscription.setQueue(queue,false); + + _amqpActor = new SubscriptionActor(rootLogger, mockSubscription); + } + + public void tearDown() + { + _rawLogger.clearLogMessages(); + ApplicationRegistry.remove(); + } + + /** + * Test the AMQPActor logging as a Subscription logger. + * + * The test sends a message then verifies that it entered the logs. + * + * The log message should be fully repalaced (no '{n}' values) and should + * contain subscription identification. + */ + public void testSubscription() + { + final String message = "test logging"; + + _amqpActor.message(new LogSubject() + { + public String toString() + { + return "[AMQPActorTest]"; + } + + }, new LogMessage() + { + public String toString() + { + return message; + } + }); + + List<Object> logs = _rawLogger.getLogMessages(); + + assertEquals("Message log size not as expected.", 1, logs.size()); + + // Verify that the logged message is present in the output + assertTrue("Message was not found in log message", + logs.get(0).toString().contains(message)); + + // Verify that all the values were presented to the MessageFormatter + // so we will not end up with '{n}' entries in the log. + assertFalse("Verify that the string does not contain any '{'.", + logs.get(0).toString().contains("{")); + + // Verify that the message has the correct type + assertTrue("Message contains the [sub: prefix", + logs.get(0).toString().contains("[sub:")); + + // Verify that the logged message does not contains the 'ch:' marker + assertFalse("Message was logged with a channel identifier." + logs.get(0), + logs.get(0).toString().contains("/ch:")); + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index f5348ce1bb..a3274a3a05 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -24,6 +24,7 @@ package org.apache.qpid.server.subscription; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; @@ -90,6 +91,11 @@ public class MockSubscription implements Subscription return new QueueEntry.SubscriptionAcquiredState(this); } + public LogActor getLogActor() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public AMQQueue getQueue() { return queue; diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 418cb99354..509c027cbf 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.framing.AMQShortString; @@ -151,6 +152,11 @@ public class SubscriptionTestHelper implements Subscription return false; //To change body of implemented methods use File | Settings | File Templates. } + public LogActor getLogActor() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public AMQQueue getQueue() { return null; |
