From 01b4cfa2d93ba951d0983bc4cd4b94dd87ea9400 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 12 Feb 2007 11:03:12 +0000 Subject: Fixed Xmx value ConcurrentSelectorDeliveryManager : Added trace logging. Ensured messages are removed when required, rather than leaking memory. AMQSession moved exceptions in recover to wrap method rather than individual suspend calls. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@506413 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/broker/etc/qpid-server.conf | 2 +- .../qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 12 ++++++++++-- .../org/apache/qpid/server/store/MemoryMessageStore.java | 12 ++++++++---- 3 files changed, 19 insertions(+), 7 deletions(-) (limited to 'qpid/java/broker') diff --git a/qpid/java/broker/etc/qpid-server.conf b/qpid/java/broker/etc/qpid-server.conf index 6d31db7fa9..0e98f72a37 100644 --- a/qpid/java/broker/etc/qpid-server.conf +++ b/qpid/java/broker/etc/qpid-server.conf @@ -21,5 +21,5 @@ QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar export JAVA=java \ JAVA_VM=-server \ - JAVA_MEM="-Xmx3160m -Xms512m "\ + JAVA_MEM="-Xmx1024m -Xms512m "\ CLASSPATH=$QPID_LIBS diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 1a26bab011..9b79657575 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -230,6 +230,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager Queue messageQueue = sub.getNextQueue(_messages); + if (_log.isTraceEnabled()) + { + _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) + + ") from queue (" + System.identityHashCode(messageQueue) + + ") AMQQueue (" + System.identityHashCode(queue) + ")"); + } + if (messageQueue == null) { // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector @@ -276,7 +283,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { //fixme _log.error("MEMORY LEAK: message from PreDeliveryQueue not removed from _messages"); - //_messages.remove(message); + //inefficient + _messages.remove(message); } } @@ -353,7 +361,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isDebugEnabled()) { _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + - " subscribers to give the message to."); + " subscribers to give the message to. Queued count (" + getMessageCount() + ")"); } for (Subscription sub : _subscriptions.getSubscriptions()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 328aed81d9..5b6cab294c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -32,9 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; -/** - * A simple message store that stores the messages in a threadsafe structure in memory. - */ +/** A simple message store that stores the messages in a threadsafe structure in memory. */ public class MemoryMessageStore implements MessageStore { private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); @@ -85,7 +83,13 @@ public class MemoryMessageStore implements MessageStore { _log.debug("Removing message with id " + messageId); } - _messageMap.remove(messageId); + Object o = _messageMap.remove(messageId); + + if (_log.isDebugEnabled()) + { + _log.debug("Removed message " + System.identityHashCode(o)); + } + } public void createQueue(AMQQueue queue) throws AMQException -- cgit v1.2.1