diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2011-12-28 13:02:41 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2011-12-28 13:02:41 +0000 |
| commit | 55ccbf149980b06c7b7effa36871ffbdf50550fa (patch) | |
| tree | f5fc6181438968f82af0528c751af32ea8fef64e /qpid/java/client/src | |
| parent | f085f3b0ce89af428e75bf2ae3b8c65ecdd16ad6 (diff) | |
| download | qpid-python-55ccbf149980b06c7b7effa36871ffbdf50550fa.tar.gz | |
QPID-3714 : [Java] Performance Improvements
Persistence:
Store message in same transaction as enqueue if possible
Memory:
Remove unnecessary (un)boxing
Reduce unnecessary copying of message data
Cache short strings
Cache queues for a given routing key on an Exchange
(0-9) Use a fixed size buffer for preparing frames to write out
Other:
Reduce calls to System.currentTimeMillis
(0-10) Special case immutable RangeSets, in particular RangeSets of a single range/point
(0-10) Special case delivery properties and message properties in headers
(0-9) send commit-ok as soon as data committed to store
Cache publishing access control queries
(0-9) Optimised long and int typed values for FieldTables
(0-9) Retain FieldTable encoded form
(0-9) Cache queue and topic destinations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1225178 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
14 files changed, 408 insertions, 121 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 756b5cacb0..8dc36673dc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -30,7 +30,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Timer; import java.util.TimerTask; import java.util.UUID; @@ -60,23 +59,7 @@ import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.ExchangeBoundResult; -import org.apache.qpid.transport.ExchangeQueryResult; -import org.apache.qpid.transport.ExecutionErrorCode; -import org.apache.qpid.transport.ExecutionException; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.QueueQueryResult; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.SessionListener; -import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.*; import org.apache.qpid.util.Serial; import org.apache.qpid.util.Strings; import org.slf4j.Logger; @@ -141,13 +124,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000); private TimerTask flushTask = null; - private RangeSet unacked = new RangeSet(); + private RangeSet unacked = RangeSetFactory.createRangeSet(); private int unackedCount = 0; /** * Used to store the range of in tx messages */ - private final RangeSet _txRangeSet = new RangeSet(); + private final RangeSet _txRangeSet = RangeSetFactory.createRangeSet(); private int _txSize = 0; //--- constructors @@ -460,7 +443,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void sendRecover() throws AMQException, FailoverException { // release all unacked messages - RangeSet all = new RangeSet(); + RangeSet all = RangeSetFactory.createRangeSet(); RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();) @@ -483,7 +466,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private RangeSet gatherRangeSet(ConcurrentLinkedQueue<Long> messageTags) { - RangeSet ranges = new RangeSet(); + RangeSet ranges = RangeSetFactory.createRangeSet(); while (true) { Long tag = messageTags.poll(); @@ -518,7 +501,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void rejectMessage(long deliveryTag, boolean requeue) { // The value of requeue is always true - RangeSet ranges = new RangeSet(); + RangeSet ranges = RangeSetFactory.createRangeSet(); ranges.add((int) deliveryTag); flushProcessed(ranges, false); if (requeue) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 96df463481..ab3a0284a6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -21,6 +21,8 @@ package org.apache.qpid.client; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.ArrayList; import java.util.Map; @@ -80,7 +82,7 @@ import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> +public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -96,8 +98,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. */ - AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) + protected AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, + MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark); @@ -150,7 +152,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); } - getProtocolHandler().writeFrame(ackFrame); + getProtocolHandler().writeFrame(ackFrame, !getTransacted()); _unacknowledgedMessageTags.remove(deliveryTag); } @@ -512,7 +514,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B // Bounced message is processed here, away from the mina thread AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), - msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies()); + msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(),_queueDestinationCache,_topicDestinationCache); AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); AMQShortString reason = msg.getReplyText(); _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); @@ -572,6 +574,16 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B }, _connection).execute(); } + public DestinationCache<AMQQueue> getQueueDestinationCache() + { + return _queueDestinationCache; + } + + public DestinationCache<AMQTopic> getTopicDestinationCache() + { + return _topicDestinationCache; + } + class QueueDeclareOkHandler extends SpecificMethodFrameListener { @@ -613,12 +625,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B return okHandler._messageCount; } - protected final boolean tagLE(long tag1, long tag2) + protected boolean tagLE(long tag1, long tag2) { return tag1 <= tag2; } - protected final boolean updateRollbackMark(long currentMark, long deliveryTag) + protected boolean updateRollbackMark(long currentMark, long deliveryTag) { return false; } @@ -695,4 +707,55 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B return null; } } + + public abstract static class DestinationCache<T extends AMQDestination> + { + private final Map<AMQShortString, Map<AMQShortString, T>> cache = new HashMap<AMQShortString, Map<AMQShortString, T>>(); + + public T getDestination(AMQShortString exchangeName, AMQShortString routingKey) + { + Map<AMQShortString, T> routingMap = cache.get(exchangeName); + if(routingMap == null) + { + routingMap = new LinkedHashMap<AMQShortString, T>() + { + + protected boolean removeEldestEntry(Map.Entry<AMQShortString, T> eldest) + { + return size() >= 200; + } + }; + cache.put(exchangeName,routingMap); + } + T destination = routingMap.get(routingKey); + if(destination == null) + { + destination = newDestination(exchangeName, routingKey); + routingMap.put(routingKey,destination); + } + return destination; + } + + protected abstract T newDestination(AMQShortString exchangeName, AMQShortString routingKey); + } + + private static class TopicDestinationCache extends DestinationCache<AMQTopic> + { + protected AMQTopic newDestination(AMQShortString exchangeName, AMQShortString routingKey) + { + return new AMQTopic(exchangeName, routingKey, null); + } + } + + private static class QueueDestinationCache extends DestinationCache<AMQQueue> + { + protected AMQQueue newDestination(AMQShortString exchangeName, AMQShortString routingKey) + { + return new AMQQueue(exchangeName, routingKey, routingKey); + } + } + + private final TopicDestinationCache _topicDestinationCache = new TopicDestinationCache(); + private final QueueDestinationCache _queueDestinationCache = new QueueDestinationCache(); + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index bb277887aa..a49e31ce8c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -272,10 +272,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException { - final RangeSet ranges = new RangeSet(); - ranges.add((int) message.getDeliveryTag()); _0_10session.messageAcknowledge - (ranges, + (Range.newInstance((int) message.getDeliveryTag()), _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); final AMQException amqe = _0_10session.getCurrentException(); @@ -294,9 +292,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException { - final RangeSet ranges = new RangeSet(); - ranges.add((int) message.getDeliveryTag()); - _0_10session.flushProcessed(ranges,false); + _0_10session.flushProcessed(Range.newInstance((int) message.getDeliveryTag()),false); final AMQException amqe = _0_10session.getCurrentException(); if (amqe != null) @@ -315,10 +311,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException { boolean result = false; - final RangeSet ranges = new RangeSet(); - ranges.add((int) message.getDeliveryTag()); - final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get(); + final Acquired acq = _0_10session.getQpidSession().messageAcquire(Range.newInstance((int)message.getDeliveryTag())).get(); final RangeSet acquired = acq.getTransfers(); if (acquired != null && acquired.size() > 0) @@ -451,7 +445,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (_synchronousQueue.size() > 0) { - RangeSet ranges = new RangeSet(); + RangeSet ranges = RangeSetFactory.createRangeSet(); Iterator iterator = _synchronousQueue.iterator(); while (iterator.hasNext()) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index efcbfd5532..b2f4fcef84 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -38,11 +38,13 @@ import org.slf4j.LoggerFactory; public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8> { protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private AMQSession_0_8.DestinationCache<AMQTopic> _topicDestinationCache; + private AMQSession_0_8.DestinationCache<AMQQueue> _queueDestinationCache; private final RejectBehaviour _rejectBehaviour; protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination, - String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession_0_8 session, AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { @@ -60,6 +62,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); } + _topicDestinationCache = session.getTopicDestinationCache(); + _queueDestinationCache = session.getQueueDestinationCache(); + if (destination.getRejectBehaviour() != null) { _rejectBehaviour = destination.getRejectBehaviour(); @@ -100,7 +105,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(), messageFrame.getExchange(), - messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(), + _queueDestinationCache, _topicDestinationCache); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java index 6237234c4d..33ca584b34 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java @@ -48,7 +48,10 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic body.getExchange(), body.getRoutingKey(), body.getRedelivered()); - _logger.debug("New JmsDeliver method received:" + session); + if(_logger.isDebugEnabled()) + { + _logger.debug("New JmsDeliver method received:" + session); + } session.unprocessedMessageReceived(channelId, msg); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index f360b546b2..179ebd66d1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -124,7 +124,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate */ public static void updateExchangeTypeMapping(Header header, org.apache.qpid.transport.Session session) { - DeliveryProperties deliveryProps = header.get(DeliveryProperties.class); + DeliveryProperties deliveryProps = header.getDeliveryProperties(); if (deliveryProps != null) { String exchange = deliveryProps.getExchange(); @@ -132,7 +132,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate } - MessageProperties msgProps = header.get(MessageProperties.class); + MessageProperties msgProps = header.getMessageProperties(); if (msgProps != null && msgProps.getReplyTo() != null) { String exchange = msgProps.getReplyTo().getExchange(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index 9ab03412fe..ab7061c382 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -31,12 +31,7 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageNotWriteableException; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.CustomJMSXProperty; -import org.apache.qpid.client.JMSAMQException; +import org.apache.qpid.client.*; import org.apache.qpid.collections.ReferenceMap; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -81,7 +76,8 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate // Used when generating a received message object protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, - AMQShortString routingKey) + AMQShortString routingKey, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, + AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) { this(contentHeader, deliveryTag); @@ -95,10 +91,10 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate switch (type.intValue()) { case AMQDestination.QUEUE_TYPE: - dest = new AMQQueue(exchange, routingKey, routingKey); + dest = queueDestinationCache.getDestination(exchange, routingKey); break; case AMQDestination.TOPIC_TYPE: - dest = new AMQTopic(exchange, routingKey, null); + dest = topicDestinationCache.getDestination(exchange, routingKey); break; default: // Use the generateDestination method @@ -133,10 +129,66 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (messageId != null) { - getContentHeaderProperties().setMessageId("ID:" + messageId); + getContentHeaderProperties().setMessageId(asShortStringMsgId(messageId)); } } + private static final byte[] HEX_DIGITS = {0x30,0x31,0x32,0x33,0x34,0x35,0x36,0x37,0x38,0x39, + 0x61,0x62,0x63,0x64,0x65,0x66}; + + private static AMQShortString asShortStringMsgId(UUID messageId) + { + long msb = messageId.getMostSignificantBits(); + long lsb = messageId.getLeastSignificantBits(); + + byte[] messageIdBytes = new byte[39]; + messageIdBytes[0] = (byte) 'I'; + messageIdBytes[1] = (byte) 'D'; + messageIdBytes[2] = (byte) ':'; + + messageIdBytes[3] = HEX_DIGITS[(int)((msb >> 60) & 0xFl)]; + messageIdBytes[4] = HEX_DIGITS[(int)((msb >> 56) & 0xFl)]; + messageIdBytes[5] = HEX_DIGITS[(int)((msb >> 52) & 0xFl)]; + messageIdBytes[6] = HEX_DIGITS[(int)((msb >> 48) & 0xFl)]; + messageIdBytes[7] = HEX_DIGITS[(int)((msb >> 44) & 0xFl)]; + messageIdBytes[8] = HEX_DIGITS[(int)((msb >> 40) & 0xFl)]; + messageIdBytes[9] = HEX_DIGITS[(int)((msb >> 36) & 0xFl)]; + messageIdBytes[10] = HEX_DIGITS[(int)((msb >> 32) & 0xFl)]; + + messageIdBytes[11] = (byte) '-'; + messageIdBytes[12] = HEX_DIGITS[(int)((msb >> 28) & 0xFl)]; + messageIdBytes[13] = HEX_DIGITS[(int)((msb >> 24) & 0xFl)]; + messageIdBytes[14] = HEX_DIGITS[(int)((msb >> 20) & 0xFl)]; + messageIdBytes[15] = HEX_DIGITS[(int)((msb >> 16) & 0xFl)]; + messageIdBytes[16] = (byte) '-'; + messageIdBytes[17] = HEX_DIGITS[(int)((msb >> 12) & 0xFl)]; + messageIdBytes[18] = HEX_DIGITS[(int)((msb >> 8) & 0xFl)]; + messageIdBytes[19] = HEX_DIGITS[(int)((msb >> 4) & 0xFl)]; + messageIdBytes[20] = HEX_DIGITS[(int)(msb & 0xFl)]; + messageIdBytes[21] = (byte) '-'; + + messageIdBytes[22] = HEX_DIGITS[(int)((lsb >> 60) & 0xFl)]; + messageIdBytes[23] = HEX_DIGITS[(int)((lsb >> 56) & 0xFl)]; + messageIdBytes[24] = HEX_DIGITS[(int)((lsb >> 52) & 0xFl)]; + messageIdBytes[25] = HEX_DIGITS[(int)((lsb >> 48) & 0xFl)]; + + messageIdBytes[26] = (byte) '-'; + + messageIdBytes[27] = HEX_DIGITS[(int)((lsb >> 44) & 0xFl)]; + messageIdBytes[28] = HEX_DIGITS[(int)((lsb >> 40) & 0xFl)]; + messageIdBytes[29] = HEX_DIGITS[(int)((lsb >> 36) & 0xFl)]; + messageIdBytes[30] = HEX_DIGITS[(int)((lsb >> 32) & 0xFl)]; + messageIdBytes[31] = HEX_DIGITS[(int)((lsb >> 28) & 0xFl)]; + messageIdBytes[32] = HEX_DIGITS[(int)((lsb >> 24) & 0xFl)]; + messageIdBytes[33] = HEX_DIGITS[(int)((lsb >> 20) & 0xFl)]; + messageIdBytes[34] = HEX_DIGITS[(int)((lsb >> 16) & 0xFl)]; + messageIdBytes[35] = HEX_DIGITS[(int)((lsb >> 12) & 0xFl)]; + messageIdBytes[36] = HEX_DIGITS[(int)((lsb >> 8) & 0xFl)]; + messageIdBytes[37] = HEX_DIGITS[(int)((lsb >> 4) & 0xFl)]; + messageIdBytes[38] = HEX_DIGITS[(int)(lsb & 0xFl)]; + + return new AMQShortString(messageIdBytes,0,39); + } public long getJMSTimestamp() throws JMSException { @@ -413,7 +465,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate } checkWritableProperties(); - getJmsHeaders().setByte(propertyName, new Byte(b)); + getJmsHeaders().setByte(propertyName, b); } public void setShortProperty(String propertyName, short i) throws JMSException @@ -424,13 +476,13 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate } checkWritableProperties(); - getJmsHeaders().setShort(propertyName, new Short(i)); + getJmsHeaders().setShort(propertyName, i); } public void setIntProperty(String propertyName, int i) throws JMSException { checkWritableProperties(); - getJmsHeaders().setInteger(propertyName, new Integer(i)); + getJmsHeaders().setInteger(propertyName, i); } public void setLongProperty(String propertyName, long l) throws JMSException @@ -441,7 +493,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate } checkWritableProperties(); - getJmsHeaders().setLong(propertyName, new Long(l)); + getJmsHeaders().setLong(propertyName, l); } public void setFloatProperty(String propertyName, float f) throws JMSException @@ -452,7 +504,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate } checkWritableProperties(); - getJmsHeaders().setFloat(propertyName, new Float(f)); + getJmsHeaders().setFloat(propertyName, f); } public void setDoubleProperty(String propertyName, double v) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 967a1fb49f..16b71db77e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -21,6 +21,9 @@ package org.apache.qpid.client.message; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession_0_8; +import org.apache.qpid.client.AMQTopic; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; @@ -44,7 +47,9 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, - List bodies) throws AMQException + List bodies, + AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, + AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) throws AMQException { ByteBuffer data; final boolean debug = _logger.isDebugEnabled(); @@ -99,7 +104,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr, (BasicContentHeaderProperties) contentHeader.getProperties(), - exchange, routingKey); + exchange, routingKey, queueDestinationCache, topicDestinationCache); return createMessage(delegate, data); } @@ -149,10 +154,12 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader, - AMQShortString exchange, AMQShortString routingKey, List bodies) + AMQShortString exchange, AMQShortString routingKey, List bodies, + AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, + AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) throws JMSException, AMQException { - final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies); + final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache); msg.setJMSRedelivered(redelivered); msg.setReceivedFromServer(); return msg; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java index f3d96cd855..93c2872b2e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java @@ -25,6 +25,9 @@ import java.util.List; import javax.jms.JMSException; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession_0_8; +import org.apache.qpid.client.AMQTopic; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.transport.DeliveryProperties; @@ -36,7 +39,7 @@ public interface MessageFactory AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, - List bodies) + List bodies, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) throws JMSException, AMQException; AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index cdb75fc9a9..15ad3ed89f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -28,6 +28,9 @@ import java.nio.ByteBuffer; import javax.jms.JMSException; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession_0_8; +import org.apache.qpid.client.AMQTopic; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; @@ -93,15 +96,19 @@ public class MessageFactoryRegistry * Create a message. This looks up the MIME type from the content header and instantiates the appropriate * concrete message type. * + * * @param deliveryTag the AMQ message id * @param redelivered true if redelivered * @param contentHeader the content header that was received * @param bodies a list of ContentBody instances @return the message. - * @throws AMQException + * @param queueDestinationCache + *@param topicDestinationCache @throws AMQException * @throws JMSException */ public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange, - AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies) + AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies, + AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, + AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) throws AMQException, JMSException { BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties(); @@ -118,13 +125,13 @@ public class MessageFactoryRegistry mf = _default; } - return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies); + return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache); } public AbstractJMSMessage createMessage(MessageTransfer transfer) throws AMQException, JMSException { - MessageProperties mprop = transfer.getHeader().get(MessageProperties.class); + MessageProperties mprop = transfer.getHeader().getMessageProperties(); String messageType = ""; if ( mprop == null || mprop.getContentType() == null) { @@ -143,7 +150,7 @@ public class MessageFactoryRegistry boolean redelivered = false; DeliveryProperties deliverProps; - if((deliverProps = transfer.getHeader().get(DeliveryProperties.class)) != null) + if((deliverProps = transfer.getHeader().getDeliveryProperties()) != null) { redelivered = deliverProps.getRedelivered(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 284954edba..8911d4ee3e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client.protocol; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -409,10 +410,10 @@ public class AMQProtocolHandler implements ProtocolEngine final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); // Decode buffer - - for (AMQDataBlock message : dataBlocks) + int size = dataBlocks.size(); + for (int i = 0; i < size; i++) { - + AMQDataBlock message = dataBlocks.get(i); if (PROTOCOL_DEBUG) { _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); @@ -420,10 +421,10 @@ public class AMQProtocolHandler implements ProtocolEngine if(message instanceof AMQFrame) { - final boolean debug = _logger.isDebugEnabled(); + final long msgNumber = ++_messageReceivedCount; - if (debug && ((msgNumber % 1000) == 0)) + if (((msgNumber % 1000) == 0) && _logger.isDebugEnabled()) { _logger.debug("Received " + _messageReceivedCount + " protocol messages"); } @@ -514,12 +515,20 @@ public class AMQProtocolHandler implements ProtocolEngine return getStateManager().createWaiter(states); } - public synchronized void writeFrame(AMQDataBlock frame) + public void writeFrame(AMQDataBlock frame) + { + writeFrame(frame, true); + } + + public synchronized void writeFrame(AMQDataBlock frame, boolean flush) { final ByteBuffer buf = asByteBuffer(frame); _writtenBytes += buf.remaining(); _sender.send(buf); - _sender.flush(); + if(flush) + { + _sender.flush(); + } if (PROTOCOL_DEBUG) { @@ -539,35 +548,51 @@ public class AMQProtocolHandler implements ProtocolEngine } + private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; + private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY]; + private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes); + private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes); + private ByteBuffer asByteBuffer(AMQDataBlock block) { - final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize()); + final int size = (int) block.getSize(); - try - { - block.writePayload(new DataOutputStream(new OutputStream() - { + final byte[] data; - @Override - public void write(int b) throws IOException - { - buf.put((byte) b); - } + if(size > REUSABLE_BYTE_BUFFER_CAPACITY) + { + data= new byte[size]; + } + else + { - @Override - public void write(byte[] b, int off, int len) throws IOException - { - buf.put(b, off, len); - } - })); + data = _reusableBytes; + } + _reusableDataOutput.setBuffer(data); + + try + { + block.writePayload(_reusableDataOutput); } catch (IOException e) { throw new RuntimeException(e); } - buf.flip(); + final ByteBuffer buf; + + if(size < REUSABLE_BYTE_BUFFER_CAPACITY) + { + buf = _reusableByteBuffer; + buf.position(0); + } + else + { + buf = ByteBuffer.wrap(data); + } + buf.limit(_reusableDataOutput.length()); + return buf; } @@ -840,4 +865,160 @@ public class AMQProtocolHandler implements ProtocolEngine return _suggestedProtocolVersion; } + private static class BytesDataOutput implements DataOutput + { + int _pos = 0; + byte[] _buf; + + public BytesDataOutput(byte[] buf) + { + _buf = buf; + } + + public void setBuffer(byte[] buf) + { + _buf = buf; + _pos = 0; + } + + public void reset() + { + _pos = 0; + } + + public int length() + { + return _pos; + } + + public void write(int b) + { + _buf[_pos++] = (byte) b; + } + + public void write(byte[] b) + { + System.arraycopy(b, 0, _buf, _pos, b.length); + _pos+=b.length; + } + + + public void write(byte[] b, int off, int len) + { + System.arraycopy(b, off, _buf, _pos, len); + _pos+=len; + + } + + public void writeBoolean(boolean v) + { + _buf[_pos++] = v ? (byte) 1 : (byte) 0; + } + + public void writeByte(int v) + { + _buf[_pos++] = (byte) v; + } + + public void writeShort(int v) + { + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeChar(int v) + { + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeInt(int v) + { + _buf[_pos++] = (byte) (v >>> 24); + _buf[_pos++] = (byte) (v >>> 16); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeLong(long v) + { + _buf[_pos++] = (byte) (v >>> 56); + _buf[_pos++] = (byte) (v >>> 48); + _buf[_pos++] = (byte) (v >>> 40); + _buf[_pos++] = (byte) (v >>> 32); + _buf[_pos++] = (byte) (v >>> 24); + _buf[_pos++] = (byte) (v >>> 16); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte)v; + } + + public void writeFloat(float v) + { + writeInt(Float.floatToIntBits(v)); + } + + public void writeDouble(double v) + { + writeLong(Double.doubleToLongBits(v)); + } + + public void writeBytes(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) + { + _buf[_pos++] = ((byte)s.charAt(i)); + } + } + + public void writeChars(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) + { + int v = s.charAt(i); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + } + + public void writeUTF(String s) + { + int strlen = s.length(); + + int pos = _pos; + _pos+=2; + + + for (int i = 0; i < strlen; i++) + { + int c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) + { + c = s.charAt(i); + _buf[_pos++] = (byte) c; + + } + else if (c > 0x07FF) + { + _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); + } + else + { + _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); + } + } + + int len = _pos - (pos + 2); + + _buf[pos++] = (byte) (len >>> 8); + _buf[pos] = (byte) len; + } + + } + + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java index 10fd8d2a80..2f1eda6ef2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java @@ -49,19 +49,9 @@ public class MessagePartListenerAdapter implements MessagePartListener { _currentMsg = new ByteBufferMessage(xfr.getId()); - for (Struct st : xfr.getHeader().getStructs()) - { - if(st instanceof DeliveryProperties) - { - _currentMsg.setDeliveryProperties((DeliveryProperties)st); - - } - else if(st instanceof MessageProperties) - { - _currentMsg.setMessageProperties((MessageProperties)st); - } - - } + Header header = xfr.getHeader(); + _currentMsg.setDeliveryProperties(header.getDeliveryProperties()); + _currentMsg.setMessageProperties(header.getMessageProperties()); ByteBuffer body = xfr.getBody(); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java index d8d94ba40e..02089cc382 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java @@ -46,8 +46,9 @@ public class BasicMessageConsumer_0_8_Test extends TestCase AMQBindingURL burl = new AMQBindingURL(url); AMQDestination queue = new AMQQueue(burl); - AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn); - BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + TestAMQSession testSession = new TestAMQSession(conn); + BasicMessageConsumer_0_8 consumer = + new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); assertEquals("Reject behaviour was was not as expected", RejectBehaviour.SERVER, consumer.getRejectBehaviour()); } @@ -65,8 +66,9 @@ public class BasicMessageConsumer_0_8_Test extends TestCase final AMQBindingURL burl = new AMQBindingURL(url); final AMQDestination queue = new AMQQueue(burl); - final AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn); - final BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + final TestAMQSession testSession = new TestAMQSession(conn); + final BasicMessageConsumer_0_8 consumer = + new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour()); } @@ -90,8 +92,9 @@ public class BasicMessageConsumer_0_8_Test extends TestCase assertNull("Reject behaviour should have been null", queue.getRejectBehaviour()); - AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn); - BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); + TestAMQSession testSession = new TestAMQSession(conn); + BasicMessageConsumer_0_8 consumer = + new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false); assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour()); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index 06d0f4a3f9..4c3e9c2390 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -29,12 +29,7 @@ import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.BasicMessageConsumer_0_8; -import org.apache.qpid.client.BasicMessageProducer_0_8; -import org.apache.qpid.client.MockAMQConnection; +import org.apache.qpid.client.*; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; @@ -42,7 +37,7 @@ import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> +public class TestAMQSession extends AMQSession_0_8 { public TestAMQSession(AMQConnection connection) @@ -92,7 +87,7 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe return null; } - protected void sendRecover() throws AMQException, FailoverException + public void sendRecover() throws AMQException, FailoverException { } |
