diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-08-07 11:15:01 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-08-07 11:15:01 +0000 |
| commit | 011ba4a3e4990077c468f122a9575018a9f09965 (patch) | |
| tree | be24a7dd8f4561aeaf7edff2ff96a589254140d9 /qpid/java | |
| parent | 1a73ea214ea2dd7e0542f97d8bf9df88855f9084 (diff) | |
| download | qpid-python-011ba4a3e4990077c468f122a9575018a9f09965.tar.gz | |
QPID-1218: Boost broker performance by lots.
AMQMessage: Allow references to be incremented in a pile
IncomingMessage: Increment message references in one go, flatten delivery loop a little.
Make _destinationQueues an ArrayList, massively increasing performance. Iter
ate through it with indexing
AccessResult: don't use StringBuilder so much
Update tests and exchanges to reflect new API usage, almost all of this is just type narrowing except for Topic where there's an extra copy, but it isn't too bad relative to the number of HashSet and HashMap operations that go on inside there.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@683583 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
11 files changed, 59 insertions, 73 deletions
diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java index 0b7e300cec..027d220538 100644 --- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java +++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java @@ -204,7 +204,7 @@ public class DiagnosticExchange extends AbstractExchange ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers); AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue")); - Collection<AMQQueue> queues = new ArrayList<AMQQueue>(); + ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>(); queues.add(q); payload.enqueue(queues); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 4da639567a..616f47bd24 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -191,7 +191,7 @@ public class DirectExchange extends AbstractExchange final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey(); - final List<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey); + final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey); if (_logger.isDebugEnabled()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index d1bea3410b..1ee1f35de6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -249,7 +249,7 @@ public class HeadersExchange extends AbstractExchange _logger.debug("Exchange " + getName() + ": routing message with headers " + headers); } boolean routed = false; - Collection<AMQQueue> queues = new ArrayList<AMQQueue>(); + ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>(); for (Registration e : _bindings) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java index 9bf82a3730..ec83161029 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java @@ -37,12 +37,12 @@ import org.apache.qpid.server.queue.AMQQueue; */ class Index { - private ConcurrentMap<AMQShortString, List<AMQQueue>> _index - = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + private ConcurrentMap<AMQShortString, ArrayList<AMQQueue>> _index + = new ConcurrentHashMap<AMQShortString, ArrayList<AMQQueue>>(); synchronized boolean add(AMQShortString key, AMQQueue queue) { - List<AMQQueue> queues = _index.get(key); + ArrayList<AMQQueue> queues = _index.get(key); if(queues == null) { queues = new ArrayList<AMQQueue>(); @@ -66,7 +66,7 @@ class Index synchronized boolean remove(AMQShortString key, AMQQueue queue) { - List<AMQQueue> queues = _index.get(key); + ArrayList<AMQQueue> queues = _index.get(key); if (queues != null) { queues = new ArrayList<AMQQueue>(queues); @@ -87,7 +87,7 @@ class Index return false; } - List<AMQQueue> get(AMQShortString key) + ArrayList<AMQQueue> get(AMQShortString key) { return _index.get(key); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index d07501a188..c18cc337fe 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -32,7 +32,6 @@ import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.topic.TopicParser; import org.apache.qpid.server.exchange.topic.TopicMatcherResult; @@ -48,9 +47,6 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicInteger; import java.lang.ref.WeakReference; public class TopicExchange extends AbstractExchange @@ -532,7 +528,10 @@ public class TopicExchange extends AbstractExchange final AMQShortString routingKey = payload.getRoutingKey(); - Collection<AMQQueue> queues = getMatchedQueues(payload, routingKey); + // The copy here is unfortunate, but not too bad relevant to the amount of + // things created and copied in getMatchedQueues + ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>(); + queues.addAll(getMatchedQueues(payload, routingKey)); if(queues == null || queues.isEmpty()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 0e5e7aa68c..a485649410 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -291,12 +291,17 @@ public class AMQMessage implements Filterable<AMQException> return this; } - /** Threadsafe. Increment the reference count on the message. */ public boolean incrementReference() { - if(_referenceCount.incrementAndGet() <= 1) + return incrementReference(1); + } + + /* Threadsafe. Increment the reference count on the message. */ + public boolean incrementReference(int count) + { + if(_referenceCount.addAndGet(count) <= 1) { - _referenceCount.decrementAndGet(); + _referenceCount.addAndGet(-count); return false; } else diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 9d769d7582..6b498d4d98 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; +import java.util.ArrayList; import java.util.Collection; public class IncomingMessage implements Filterable<RuntimeException> @@ -63,7 +64,7 @@ public class IncomingMessage implements Filterable<RuntimeException> * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done * by the message handle. */ - private Collection<AMQQueue> _destinationQueues; + private ArrayList<AMQQueue> _destinationQueues; private AMQProtocolSession _publisher; private MessageStore _messageStore; @@ -134,21 +135,13 @@ public class IncomingMessage implements Filterable<RuntimeException> if(_destinationQueues != null) { - for (AMQQueue q : _destinationQueues) + for (int i = 0; i < _destinationQueues.size(); i++) { - if(q.isDurable()) - { - - _messageStore.enqueueMessage(_txnContext.getStoreContext(), q, _messageId); - } + _messageStore.enqueueMessage(_txnContext.getStoreContext(), + _destinationQueues.get(i), _messageId); } } - } - - - - } public AMQMessage deliverToQueues() @@ -157,10 +150,9 @@ public class IncomingMessage implements Filterable<RuntimeException> // we get a reference to the destination queues now so that we can clear the // transient message data as quickly as possible - Collection<AMQQueue> destinationQueues = _destinationQueues; if (_logger.isDebugEnabled()) { - _logger.debug("Delivering message " + _messageId + " to " + destinationQueues); + _logger.debug("Delivering message " + _messageId + " to " + _destinationQueues); } AMQMessage message = null; @@ -178,10 +170,7 @@ public class IncomingMessage implements Filterable<RuntimeException> message.setExpiration(_expiration); message.setClientIdentifier(_publisher.getSessionIdentifier()); - - - - if ((destinationQueues == null) || destinationQueues.isEmpty()) + if ((_destinationQueues == null) || _destinationQueues.size() == 0) { if (isMandatory() || isImmediate()) @@ -196,10 +185,9 @@ public class IncomingMessage implements Filterable<RuntimeException> } else { - // TODO - int offset; - final int queueCount = destinationQueues.size(); + final int queueCount = _destinationQueues.size(); + message.incrementReference(queueCount); if(queueCount == 1) { offset = 0; @@ -212,33 +200,16 @@ public class IncomingMessage implements Filterable<RuntimeException> offset = -offset; } } - - int i = 0; - for (AMQQueue q : destinationQueues) + for (int i = offset; i < queueCount; i++) { - if(++i > offset) - { - // Increment the references to this message for each queue delivery. - message.incrementReference(); - // normal deliver so add this message at the end. - _txnContext.deliver(q, message); - } + // normal deliver so add this message at the end. + _txnContext.deliver(_destinationQueues.get(i), message); } - i = 0; - if(offset != 0) + for (int i = 0; i < offset; i++) { - for (AMQQueue q : destinationQueues) - { - if(i++ < offset) - { - // Increment the references to this message for each queue delivery. - message.incrementReference(); - // normal deliver so add this message at the end. - _txnContext.deliver(q, message); - } - } + // normal deliver so add this message at the end. + _txnContext.deliver(_destinationQueues.get(i), message); } - } // we then allow the transactional context to do something with the message content @@ -329,7 +300,7 @@ public class IncomingMessage implements Filterable<RuntimeException> _exchange.route(this); } - public void enqueue(final Collection<AMQQueue> queues) + public void enqueue(final ArrayList<AMQQueue> queues) { _destinationQueues = queues; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java index 89cead69b3..86f155d862 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java @@ -27,23 +27,23 @@ public class AccessResult GRANTED, REFUSED } - StringBuilder _authorizer; - AccessStatus _status; + private String _authorizer; + private AccessStatus _status; public AccessResult(ACLPlugin authorizer, AccessStatus status) { _status = status; - _authorizer = new StringBuilder(authorizer.getPluginName()); + _authorizer = authorizer.getPluginName(); } public void setAuthorizer(ACLPlugin authorizer) { - _authorizer.append(authorizer.getPluginName()); + _authorizer += authorizer.getPluginName(); } public String getAuthorizer() { - return _authorizer.toString(); + return _authorizer; } public void setStatus(AccessStatus status) @@ -58,8 +58,7 @@ public class AccessResult public void addAuthorizer(ACLPlugin accessManager) { - _authorizer.insert(0, "->"); - _authorizer.insert(0, accessManager.getPluginName()); + _authorizer = accessManager.getPluginName() + "->" + _authorizer; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index ca614e053a..712d3abc8f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -43,6 +43,8 @@ import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.mina.common.ByteBuffer; import javax.management.Notification; + +import java.util.ArrayList; import java.util.LinkedList; import java.util.Collections; @@ -304,7 +306,9 @@ public class AMQQueueAlertTest extends TestCase for (int i = 0; i < messages.length; i++) { messages[i] = message(false, size); - messages[i].enqueue(Collections.singleton(_queue)); + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + qs.add(_queue); + messages[i].enqueue(qs); messages[i].routingComplete(_messageStore, new MessageHandleFactory()); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index bf0a8a6d90..17f8a751de 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -47,6 +47,8 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.mina.common.ByteBuffer; import javax.management.JMException; + +import java.util.ArrayList; import java.util.LinkedList; import java.util.Collections; @@ -216,8 +218,9 @@ public class AMQQueueMBeanTest extends TestCase IncomingMessage msg = message(false, false); long id = msg.getMessageId(); _queue.clearQueue(_storeContext); - - msg.enqueue(Collections.singleton(_queue)); + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + qs.add(_queue); + msg.enqueue(qs); msg.routingComplete(_messageStore, new MessageHandleFactory()); msg.addContentBodyFrame(new ContentChunk() @@ -319,7 +322,9 @@ public class AMQQueueMBeanTest extends TestCase for (int i = 0; i < messageCount; i++) { IncomingMessage currentMessage = message(false, persistent); - currentMessage.enqueue(Collections.singleton(_queue)); + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + qs.add(_queue); + currentMessage.enqueue(qs); // route header currentMessage.routingComplete(_messageStore, new MessageHandleFactory()); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index bbd6deffd3..afa0f84d71 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.util.NullApplicationRegistry; +import java.util.ArrayList; import java.util.LinkedList; import java.util.Set; import java.util.Collections; @@ -146,7 +147,9 @@ public class AckTest extends TestCase // we increment the reference here since we are not delivering the messaging to any queues, which is where // the reference is normally incremented. The test is easier to construct if we have direct access to the // subscription - msg.enqueue(Collections.singleton(_queue)); + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + qs.add(_queue); + msg.enqueue(qs); msg.routingComplete(_messageStore, factory); if(msg.allContentReceived()) { |
