From 48c533782ccf266d7dbad72686ae56be667e7fb8 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Thu, 7 Aug 2008 11:15:01 +0000 Subject: QPID-1218: Boost broker performance by lots. AMQMessage: Allow references to be incremented in a pile IncomingMessage: Increment message references in one go, flatten delivery loop a little. Make _destinationQueues an ArrayList, massively increasing performance. Iter ate through it with indexing AccessResult: don't use StringBuilder so much Update tests and exchanges to reflect new API usage, almost all of this is just type narrowing except for Topic where there's an extra copy, but it isn't too bad relative to the number of HashSet and HashMap operations that go on inside there. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683583 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/exchange/DirectExchange.java | 2 +- .../qpid/server/exchange/HeadersExchange.java | 2 +- .../org/apache/qpid/server/exchange/Index.java | 10 ++-- .../apache/qpid/server/exchange/TopicExchange.java | 9 ++-- .../org/apache/qpid/server/queue/AMQMessage.java | 11 ++-- .../apache/qpid/server/queue/IncomingMessage.java | 61 ++++++---------------- .../qpid/server/security/access/AccessResult.java | 13 +++-- .../qpid/server/queue/AMQQueueAlertTest.java | 6 ++- .../qpid/server/queue/AMQQueueMBeanTest.java | 11 ++-- 9 files changed, 54 insertions(+), 71 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 4da639567a..616f47bd24 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -191,7 +191,7 @@ public class DirectExchange extends AbstractExchange final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey(); - final List queues = (routingKey == null) ? null : _index.get(routingKey); + final ArrayList queues = (routingKey == null) ? null : _index.get(routingKey); if (_logger.isDebugEnabled()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index d1bea3410b..1ee1f35de6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -249,7 +249,7 @@ public class HeadersExchange extends AbstractExchange _logger.debug("Exchange " + getName() + ": routing message with headers " + headers); } boolean routed = false; - Collection queues = new ArrayList(); + ArrayList queues = new ArrayList(); for (Registration e : _bindings) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java index 9bf82a3730..ec83161029 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java @@ -37,12 +37,12 @@ import org.apache.qpid.server.queue.AMQQueue; */ class Index { - private ConcurrentMap> _index - = new ConcurrentHashMap>(); + private ConcurrentMap> _index + = new ConcurrentHashMap>(); synchronized boolean add(AMQShortString key, AMQQueue queue) { - List queues = _index.get(key); + ArrayList queues = _index.get(key); if(queues == null) { queues = new ArrayList(); @@ -66,7 +66,7 @@ class Index synchronized boolean remove(AMQShortString key, AMQQueue queue) { - List queues = _index.get(key); + ArrayList queues = _index.get(key); if (queues != null) { queues = new ArrayList(queues); @@ -87,7 +87,7 @@ class Index return false; } - List get(AMQShortString key) + ArrayList get(AMQShortString key) { return _index.get(key); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index d07501a188..c18cc337fe 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -32,7 +32,6 @@ import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.topic.TopicParser; import org.apache.qpid.server.exchange.topic.TopicMatcherResult; @@ -48,9 +47,6 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicInteger; import java.lang.ref.WeakReference; public class TopicExchange extends AbstractExchange @@ -532,7 +528,10 @@ public class TopicExchange extends AbstractExchange final AMQShortString routingKey = payload.getRoutingKey(); - Collection queues = getMatchedQueues(payload, routingKey); + // The copy here is unfortunate, but not too bad relevant to the amount of + // things created and copied in getMatchedQueues + ArrayList queues = new ArrayList(); + queues.addAll(getMatchedQueues(payload, routingKey)); if(queues == null || queues.isEmpty()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 0e5e7aa68c..a485649410 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -291,12 +291,17 @@ public class AMQMessage implements Filterable return this; } - /** Threadsafe. Increment the reference count on the message. */ public boolean incrementReference() { - if(_referenceCount.incrementAndGet() <= 1) + return incrementReference(1); + } + + /* Threadsafe. Increment the reference count on the message. */ + public boolean incrementReference(int count) + { + if(_referenceCount.addAndGet(count) <= 1) { - _referenceCount.decrementAndGet(); + _referenceCount.addAndGet(-count); return false; } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 9d769d7582..6b498d4d98 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; +import java.util.ArrayList; import java.util.Collection; public class IncomingMessage implements Filterable @@ -63,7 +64,7 @@ public class IncomingMessage implements Filterable * delivered. It is cleared after delivery has been attempted. Any persistent record of destinations is done * by the message handle. */ - private Collection _destinationQueues; + private ArrayList _destinationQueues; private AMQProtocolSession _publisher; private MessageStore _messageStore; @@ -134,21 +135,13 @@ public class IncomingMessage implements Filterable if(_destinationQueues != null) { - for (AMQQueue q : _destinationQueues) + for (int i = 0; i < _destinationQueues.size(); i++) { - if(q.isDurable()) - { - - _messageStore.enqueueMessage(_txnContext.getStoreContext(), q, _messageId); - } + _messageStore.enqueueMessage(_txnContext.getStoreContext(), + _destinationQueues.get(i), _messageId); } } - } - - - - } public AMQMessage deliverToQueues() @@ -157,10 +150,9 @@ public class IncomingMessage implements Filterable // we get a reference to the destination queues now so that we can clear the // transient message data as quickly as possible - Collection destinationQueues = _destinationQueues; if (_logger.isDebugEnabled()) { - _logger.debug("Delivering message " + _messageId + " to " + destinationQueues); + _logger.debug("Delivering message " + _messageId + " to " + _destinationQueues); } AMQMessage message = null; @@ -178,10 +170,7 @@ public class IncomingMessage implements Filterable message.setExpiration(_expiration); message.setClientIdentifier(_publisher.getSessionIdentifier()); - - - - if ((destinationQueues == null) || destinationQueues.isEmpty()) + if ((_destinationQueues == null) || _destinationQueues.size() == 0) { if (isMandatory() || isImmediate()) @@ -196,10 +185,9 @@ public class IncomingMessage implements Filterable } else { - // TODO - int offset; - final int queueCount = destinationQueues.size(); + final int queueCount = _destinationQueues.size(); + message.incrementReference(queueCount); if(queueCount == 1) { offset = 0; @@ -212,33 +200,16 @@ public class IncomingMessage implements Filterable offset = -offset; } } - - int i = 0; - for (AMQQueue q : destinationQueues) + for (int i = offset; i < queueCount; i++) { - if(++i > offset) - { - // Increment the references to this message for each queue delivery. - message.incrementReference(); - // normal deliver so add this message at the end. - _txnContext.deliver(q, message); - } + // normal deliver so add this message at the end. + _txnContext.deliver(_destinationQueues.get(i), message); } - i = 0; - if(offset != 0) + for (int i = 0; i < offset; i++) { - for (AMQQueue q : destinationQueues) - { - if(i++ < offset) - { - // Increment the references to this message for each queue delivery. - message.incrementReference(); - // normal deliver so add this message at the end. - _txnContext.deliver(q, message); - } - } + // normal deliver so add this message at the end. + _txnContext.deliver(_destinationQueues.get(i), message); } - } // we then allow the transactional context to do something with the message content @@ -329,7 +300,7 @@ public class IncomingMessage implements Filterable _exchange.route(this); } - public void enqueue(final Collection queues) + public void enqueue(final ArrayList queues) { _destinationQueues = queues; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java index 89cead69b3..86f155d862 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java @@ -27,23 +27,23 @@ public class AccessResult GRANTED, REFUSED } - StringBuilder _authorizer; - AccessStatus _status; + private String _authorizer; + private AccessStatus _status; public AccessResult(ACLPlugin authorizer, AccessStatus status) { _status = status; - _authorizer = new StringBuilder(authorizer.getPluginName()); + _authorizer = authorizer.getPluginName(); } public void setAuthorizer(ACLPlugin authorizer) { - _authorizer.append(authorizer.getPluginName()); + _authorizer += authorizer.getPluginName(); } public String getAuthorizer() { - return _authorizer.toString(); + return _authorizer; } public void setStatus(AccessStatus status) @@ -58,8 +58,7 @@ public class AccessResult public void addAuthorizer(ACLPlugin accessManager) { - _authorizer.insert(0, "->"); - _authorizer.insert(0, accessManager.getPluginName()); + _authorizer = accessManager.getPluginName() + "->" + _authorizer; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index ca614e053a..712d3abc8f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -43,6 +43,8 @@ import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.mina.common.ByteBuffer; import javax.management.Notification; + +import java.util.ArrayList; import java.util.LinkedList; import java.util.Collections; @@ -304,7 +306,9 @@ public class AMQQueueAlertTest extends TestCase for (int i = 0; i < messages.length; i++) { messages[i] = message(false, size); - messages[i].enqueue(Collections.singleton(_queue)); + ArrayList qs = new ArrayList(); + qs.add(_queue); + messages[i].enqueue(qs); messages[i].routingComplete(_messageStore, new MessageHandleFactory()); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index bf0a8a6d90..17f8a751de 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -47,6 +47,8 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.mina.common.ByteBuffer; import javax.management.JMException; + +import java.util.ArrayList; import java.util.LinkedList; import java.util.Collections; @@ -216,8 +218,9 @@ public class AMQQueueMBeanTest extends TestCase IncomingMessage msg = message(false, false); long id = msg.getMessageId(); _queue.clearQueue(_storeContext); - - msg.enqueue(Collections.singleton(_queue)); + ArrayList qs = new ArrayList(); + qs.add(_queue); + msg.enqueue(qs); msg.routingComplete(_messageStore, new MessageHandleFactory()); msg.addContentBodyFrame(new ContentChunk() @@ -319,7 +322,9 @@ public class AMQQueueMBeanTest extends TestCase for (int i = 0; i < messageCount; i++) { IncomingMessage currentMessage = message(false, persistent); - currentMessage.enqueue(Collections.singleton(_queue)); + ArrayList qs = new ArrayList(); + qs.add(_queue); + currentMessage.enqueue(qs); // route header currentMessage.routingComplete(_messageStore, new MessageHandleFactory()); -- cgit v1.2.1