diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-08-06 17:01:48 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-08-06 17:01:48 +0000 |
| commit | d793cd4d6d1873af383a8f1dbd2aa4383f0be546 (patch) | |
| tree | 3a19ff9c0ada9be07050d01d1b4e6c9ea6b76c26 /qpid/java/broker/src/main | |
| parent | 6f91ef0947e70bfe266257dfe257617c9417e257 (diff) | |
| download | qpid-python-d793cd4d6d1873af383a8f1dbd2aa4383f0be546.tar.gz | |
QPID-2002, QPID-2001 : Add new SubscriptionActor to perform Subscription close logging on the Subscription Flush thread. Alternative would be to create a Virtualhost Logger.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@801725 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
4 files changed, 59 insertions, 11 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/SubscriptionActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/SubscriptionActor.java new file mode 100644 index 0000000000..ab33a29eac --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/SubscriptionActor.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.subscription.Subscription; + +import java.text.MessageFormat; + +public class SubscriptionActor extends AbstractActor +{ + public static String SUBSCRIBER_FORMAT = "sub:{0}(vh({1})/qu({2}))"; + + public SubscriptionActor(RootMessageLogger logger, Subscription subscription) + { + super(logger); + + _logString = "[" + MessageFormat.format(SUBSCRIBER_FORMAT, + subscription.getSubscriptionID(), + subscription.getQueue().getVirtualHost().getName(), + subscription.getQueue().getName()) + + "] "; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 46b2cf8fb4..929045599b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -236,15 +236,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } _bindings.addBinding(routingKey, arguments, exchange); -// ExchangeBinding binding = new ExchangeBinding(routingKey, exchange, arguments); - - //fixme MR logging in progress -// _bindings.addBinding(binding); -// -// if (_logger.isMessageEnabled(binding)) -// { -// _logger.message(binding, "QM-1001 : Created Binding"); -// } } public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException @@ -1238,6 +1229,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener boolean complete = false; try { + CurrentActor.set(_sub.getLogActor()); complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); } @@ -1245,11 +1237,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _logger.error(e); } + finally + { + CurrentActor.remove(); + } if (!complete && !_sub.isSuspended()) { _asyncDelivery.execute(this); } + } public boolean isRead() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 5001b0baf1..c9042325bf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -23,12 +23,13 @@ package org.apache.qpid.server.subscription; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; public interface Subscription { - + LogActor getLogActor(); public static enum State { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index a959944434..54bd568bca 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -34,12 +34,13 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.SubscriptionActor; import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; @@ -75,6 +76,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage // Create a simple ID that increments for ever new Subscription private final long _subscriptionID = idGenerator.getAndIncrement(); private LogSubject _logSubject; + private LogActor _logActor; static final class BrowserSubscription extends SubscriptionImpl { @@ -340,6 +342,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage _queue = queue; _logSubject = new SubscriptionLogSubject(this); + _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this); if (CurrentActor.get().getRootMessageLogger(). isMessageEnabled(CurrentActor.get(), _logSubject)) @@ -572,6 +575,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return _channel.getProtocolSession(); } + public LogActor getLogActor() + { + return _logActor; + } + public AMQQueue getQueue() { return _queue; |
