diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-07-17 11:38:10 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-07-17 11:38:10 +0000 |
| commit | 0579b1408d8dae4d85089fba5f1c8c14e04d1b61 (patch) | |
| tree | c9ed9c9079aa881f03a750287989907ddb264363 /java | |
| parent | b101bd671ef4372c91642c8284820595a4c04521 (diff) | |
| download | qpid-python-0579b1408d8dae4d85089fba5f1c8c14e04d1b61.tar.gz | |
QPID-541 A large portion of memory was being wasted in 32k ByteBuffers being held by the AMQShortStrings.
Patch submitted by Robert Godfrey to intern() the AMQSSs to reduce memory usage. Current implementation *will* impact performance due to the usage of a static Map for storage. However, a thread local implementation is in the works.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@556890 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
6 files changed, 69 insertions, 8 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 56eae279dc..d1e4ae6aa6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -98,6 +98,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } else { + + if (body.consumerTag != null) + { + body.consumerTag = body.consumerTag.intern(); + } + try { AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, @@ -136,15 +142,15 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic // If the above doesn't work then perhaps this is wrong too. // throw body.getConnectionException(AMQConstant.NOT_ALLOWED, // "Non-unique consumer tag, '" + body.consumerTag + "'"); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - BasicConsumeBody.getClazz((byte)8, (byte)0), // classId - BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId - AMQConstant.NOT_ALLOWED.getCode(), // replyCode - msg)); // replyText + (byte) 8, (byte) 0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte) 8, (byte) 0), // classId + BasicConsumeBody.getMethod((byte) 8, (byte) 0), // methodId + AMQConstant.NOT_ALLOWED.getCode(), // replyCode + msg)); // replyText } catch (AMQQueue.ExistingExclusiveSubscription e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index 67ade0a744..68d4483df3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -67,6 +67,10 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi body.exchange = ExchangeDefaults.DEFAULT_EXCHANGE_NAME; } + else + { + body.exchange = body.exchange.intern(); + } VirtualHost vHost = session.getVirtualHost(); Exchange e = vHost.getExchangeRegistry().getExchange(body.exchange); // if the exchange does not exist we raise a channel exception @@ -86,6 +90,11 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi throw body.getChannelNotFoundException(evt.getChannelId()); } + if(body.routingKey != null) + { + body.routingKey = body.routingKey.intern(); + } + MessagePublishInfo info = session.getRegistry().getProtocolVersionMethodConverter().convertToInfo(body); channel.setPublishFrame(info, session); } @@ -93,3 +102,4 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi } + diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index be3ffcc698..8ab8366d9e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -83,7 +83,9 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange try { - exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable, + exchange = exchangeFactory.createExchange(body.exchange == null ? null : body.exchange.intern(), + body.type == null ? null : body.type.intern(), + body.durable, body.passive, body.ticket); exchangeRegistry.registerExchange(exchange); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 2b751ab692..3e68069838 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -97,6 +97,12 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist."); } + + if (body.routingKey != null) + { + body.routingKey = body.routingKey.intern(); + } + try { if (!exch.isBound(body.routingKey, body.arguments, queue)) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 2e697d4564..2bc185cfb4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -91,8 +91,15 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar synchronized (queueRegistry) { + + if (((queue = queueRegistry.getQueue(body.queue)) == null)) { + if(body.queue != null) + { + body.queue = body.queue.intern(); + } + if (body.passive) { String msg = "Queue: " + body.queue + " not found on VirtualHost(" + virtualHost + ")."; diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 05e9473463..ec29d62847 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -26,6 +26,10 @@ import org.apache.mina.common.ByteBuffer; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.lang.ref.WeakReference;
+
/**
* A short string is a representation of an AMQ Short String
* Short strings differ from the Java String class by being limited to on ASCII characters (0-127)
@@ -34,6 +38,10 @@ import org.slf4j.LoggerFactory; */
public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
{
+
+ private static final Map<AMQShortString, WeakReference<AMQShortString>> internMap =
+ new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>();
+
private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
private final ByteBuffer _data;
@@ -43,7 +51,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt public AMQShortString(byte[] data)
{
-
_data = ByteBuffer.wrap(data);
_length = data.length;
}
@@ -376,4 +383,27 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return (length() == name.length()) ? 0 : -1;
}
}
+
+
+ public AMQShortString intern()
+ {
+ hashCode();
+ synchronized(internMap)
+ {
+
+ WeakReference<AMQShortString> ref = internMap.get(this);
+ if(ref != null)
+ {
+ AMQShortString internString = ref.get();
+ if(internString != null)
+ {
+ return internString;
+ }
+ }
+
+ AMQShortString internString = new AMQShortString(getBytes());
+ internMap.put(internString, new WeakReference(internString));
+ return internString;
+ }
+ }
}
|
