summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-02-13 18:10:53 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-02-13 18:10:53 +0000
commit91dfa2865cb9998a379e099ff58e830b4b1ba8a4 (patch)
tree6bd402edb2385a218cf1d13129173a9c3a7619cf /java/broker/src
parent62ef6db190b842c39a7101a0d108c28554171b1b (diff)
downloadqpid-python-91dfa2865cb9998a379e099ff58e830b4b1ba8a4.tar.gz
QPID-790 : Performance Improvements
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@627552 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java63
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java17
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java5
9 files changed, 86 insertions, 38 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 1bf0cd027a..10184a79e5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.exchange.NoRouteException;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.store.MessageStore;
@@ -199,11 +200,12 @@ public class AMQChannel
_prefetch_HighWaterMark = prefetchCount;
}
- public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException
+ public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher, final Exchange e) throws AMQException
{
_currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext);
_currentMessage.setPublisher(publisher);
+ _currentMessage.setExchange(e);
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
@@ -285,7 +287,7 @@ public class AMQChannel
{
try
{
- _exchanges.routeContent(_currentMessage);
+ _currentMessage.route();
}
catch (NoRouteException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index 75be86a387..19172b98f3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -239,7 +239,7 @@ public class DestWildExchange extends AbstractExchange
{
MessagePublishInfo info = payload.getMessagePublishInfo();
- final AMQShortString routingKey = normalize(info.getRoutingKey());
+ final AMQShortString routingKey = info.getRoutingKey();
List<AMQQueue> queues = getMatchedQueues(routingKey);
// if we have no registered queues we have nothing to do
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index 66afc61751..687ec33ba0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -91,7 +91,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
}
MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
- channel.setPublishFrame(info, session);
+ info.setExchange(exchange);
+ channel.setPublishFrame(info, session, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
index 1bfe1e3d35..98c77d8d32 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -72,7 +72,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
@@ -127,7 +127,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final StoreContext storeContext = message.getStoreContext();
final long messageId = message.getMessageId();
- ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
@@ -171,7 +171,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
- private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -187,10 +187,10 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
- return deliverFrame.toByteBuffer();
+ return deliverFrame;
}
- private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -205,7 +205,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
queueSize);
AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame.toByteBuffer();
+ return getOkFrame;
}
public byte getProtocolMinorVersion()
@@ -218,7 +218,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return getProtocolSession().getProtocolMajorVersion();
}
- private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicReturnBody basicReturnBody =
@@ -228,13 +228,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
message.getMessagePublishInfo().getRoutingKey());
AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
- return returnFrame.toByteBuffer();
+ return returnFrame;
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+ AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
index 0bc2fcf6f7..b14f03e617 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
@@ -16,6 +16,7 @@ import org.apache.qpid.AMQException;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
+ private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
public static Factory getInstanceFactory()
@@ -46,9 +47,10 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ contentHeaderBody);
final AMQMessageHandle messageHandle = message.getMessageHandle();
final StoreContext storeContext = message.getStoreContext();
@@ -101,7 +103,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final StoreContext storeContext = message.getStoreContext();
final long messageId = message.getMessageId();
- ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
@@ -145,41 +147,54 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
- private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- BasicDeliverBody deliverBody =
- methodRegistry.createBasicDeliverBody(consumerTag,
- deliveryTag,
- messageHandle.isRedelivered(),
- pb.getExchange(),
- pb.getRoutingKey());
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
+ final boolean isRedelivered = messageHandle.isRedelivered();
+ final AMQShortString exchangeName = pb.getExchange();
+ final AMQShortString routingKey = pb.getRoutingKey();
+
+ final AMQDataBlock returnBlock = new DeferredDataBlock()
+ {
+
+ protected AMQDataBlock createAMQDataBlock()
+ {
+ BasicDeliverBody deliverBody =
+ METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
+ AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
+
+
+ return deliverFrame;
- return deliverFrame.toByteBuffer();
+ }
+ };
+ return returnBlock;
}
- private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+
BasicGetOkBody getOkBody =
- methodRegistry.createBasicGetOkBody(deliveryTag,
+ METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
messageHandle.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame.toByteBuffer();
+ return getOkFrame;
}
public byte getProtocolMinorVersion()
@@ -192,23 +207,23 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return getProtocolSession().getProtocolMajorVersion();
}
- private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+
BasicReturnBody basicReturnBody =
- methodRegistry.createBasicReturnBody(replyCode,
+ METHOD_REGISTRY.createBasicReturnBody(replyCode,
replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
- return returnFrame.toByteBuffer();
+ return returnFrame;
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+ AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
@@ -252,8 +267,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+
+ BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index d9a9d2273b..80158779b2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.exchange.Exchange;
import java.util.HashMap;
import java.util.HashSet;
@@ -84,6 +85,10 @@ public class AMQMessage
private final int hashcode = System.identityHashCode(this);
+ private Exchange _exchange;
+ private static final boolean SYNCED_CLOCKS =
+ ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false);
+
public String debugIdentity()
{
@@ -97,7 +102,7 @@ public class AMQMessage
long timestamp =
((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
- if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
+ if (SYNCED_CLOCKS)
{
_expiration = expiration;
}
@@ -126,6 +131,16 @@ public class AMQMessage
return _referenceCount.get() > 0;
}
+ public void setExchange(final Exchange exchange)
+ {
+ _exchange = exchange;
+ }
+
+ public void route() throws AMQException
+ {
+ _exchange.route(this);
+ }
+
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
* therefore is memory-efficient.
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index 8e5879a51e..7e2d56b460 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -589,6 +589,11 @@ public class DestWildExchangeTest extends TestCase
return null;
}
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
public boolean isImmediate()
{
return false;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 81b0ae2213..fbd9e65480 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -242,6 +242,11 @@ public class AMQQueueAlertTest extends TestCase
return null;
}
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isImmediate()
{
return immediate;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index d86c90bdae..e72e1bf1f0 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -234,6 +234,11 @@ public class AMQQueueMBeanTest extends TestCase
return null;
}
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isImmediate()
{
return immediate;