summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java29
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java77
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java82
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java15
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java17
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java229
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java16
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java15
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java11
14 files changed, 408 insertions, 121 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 756b5cacb0..8dc36673dc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -30,7 +30,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
@@ -60,23 +59,7 @@ import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.ExchangeBoundResult;
-import org.apache.qpid.transport.ExchangeQueryResult;
-import org.apache.qpid.transport.ExecutionErrorCode;
-import org.apache.qpid.transport.ExecutionException;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.QueueQueryResult;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.*;
import org.apache.qpid.util.Serial;
import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
@@ -141,13 +124,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
private TimerTask flushTask = null;
- private RangeSet unacked = new RangeSet();
+ private RangeSet unacked = RangeSetFactory.createRangeSet();
private int unackedCount = 0;
/**
* Used to store the range of in tx messages
*/
- private final RangeSet _txRangeSet = new RangeSet();
+ private final RangeSet _txRangeSet = RangeSetFactory.createRangeSet();
private int _txSize = 0;
//--- constructors
@@ -460,7 +443,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void sendRecover() throws AMQException, FailoverException
{
// release all unacked messages
- RangeSet all = new RangeSet();
+ RangeSet all = RangeSetFactory.createRangeSet();
RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
@@ -483,7 +466,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private RangeSet gatherRangeSet(ConcurrentLinkedQueue<Long> messageTags)
{
- RangeSet ranges = new RangeSet();
+ RangeSet ranges = RangeSetFactory.createRangeSet();
while (true)
{
Long tag = messageTags.poll();
@@ -518,7 +501,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void rejectMessage(long deliveryTag, boolean requeue)
{
// The value of requeue is always true
- RangeSet ranges = new RangeSet();
+ RangeSet ranges = RangeSetFactory.createRangeSet();
ranges.add((int) deliveryTag);
flushProcessed(ranges, false);
if (requeue)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 96df463481..ab3a0284a6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -21,6 +21,8 @@
package org.apache.qpid.client;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.Map;
@@ -80,7 +82,7 @@ import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
+public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -96,8 +98,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
*/
- AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ protected AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+ MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
@@ -150,7 +152,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
}
- getProtocolHandler().writeFrame(ackFrame);
+ getProtocolHandler().writeFrame(ackFrame, !getTransacted());
_unacknowledgedMessageTags.remove(deliveryTag);
}
@@ -512,7 +514,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage =
_messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(),_queueDestinationCache,_topicDestinationCache);
AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
AMQShortString reason = msg.getReplyText();
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
@@ -572,6 +574,16 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
}, _connection).execute();
}
+ public DestinationCache<AMQQueue> getQueueDestinationCache()
+ {
+ return _queueDestinationCache;
+ }
+
+ public DestinationCache<AMQTopic> getTopicDestinationCache()
+ {
+ return _topicDestinationCache;
+ }
+
class QueueDeclareOkHandler extends SpecificMethodFrameListener
{
@@ -613,12 +625,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
return okHandler._messageCount;
}
- protected final boolean tagLE(long tag1, long tag2)
+ protected boolean tagLE(long tag1, long tag2)
{
return tag1 <= tag2;
}
- protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ protected boolean updateRollbackMark(long currentMark, long deliveryTag)
{
return false;
}
@@ -695,4 +707,55 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
return null;
}
}
+
+ public abstract static class DestinationCache<T extends AMQDestination>
+ {
+ private final Map<AMQShortString, Map<AMQShortString, T>> cache = new HashMap<AMQShortString, Map<AMQShortString, T>>();
+
+ public T getDestination(AMQShortString exchangeName, AMQShortString routingKey)
+ {
+ Map<AMQShortString, T> routingMap = cache.get(exchangeName);
+ if(routingMap == null)
+ {
+ routingMap = new LinkedHashMap<AMQShortString, T>()
+ {
+
+ protected boolean removeEldestEntry(Map.Entry<AMQShortString, T> eldest)
+ {
+ return size() >= 200;
+ }
+ };
+ cache.put(exchangeName,routingMap);
+ }
+ T destination = routingMap.get(routingKey);
+ if(destination == null)
+ {
+ destination = newDestination(exchangeName, routingKey);
+ routingMap.put(routingKey,destination);
+ }
+ return destination;
+ }
+
+ protected abstract T newDestination(AMQShortString exchangeName, AMQShortString routingKey);
+ }
+
+ private static class TopicDestinationCache extends DestinationCache<AMQTopic>
+ {
+ protected AMQTopic newDestination(AMQShortString exchangeName, AMQShortString routingKey)
+ {
+ return new AMQTopic(exchangeName, routingKey, null);
+ }
+ }
+
+ private static class QueueDestinationCache extends DestinationCache<AMQQueue>
+ {
+ protected AMQQueue newDestination(AMQShortString exchangeName, AMQShortString routingKey)
+ {
+ return new AMQQueue(exchangeName, routingKey, routingKey);
+ }
+ }
+
+ private final TopicDestinationCache _topicDestinationCache = new TopicDestinationCache();
+ private final QueueDestinationCache _queueDestinationCache = new QueueDestinationCache();
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index bb277887aa..a49e31ce8c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -272,10 +272,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
{
- final RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
_0_10session.messageAcknowledge
- (ranges,
+ (Range.newInstance((int) message.getDeliveryTag()),
_acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
final AMQException amqe = _0_10session.getCurrentException();
@@ -294,9 +292,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
{
- final RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.flushProcessed(ranges,false);
+ _0_10session.flushProcessed(Range.newInstance((int) message.getDeliveryTag()),false);
final AMQException amqe = _0_10session.getCurrentException();
if (amqe != null)
@@ -315,10 +311,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
{
boolean result = false;
- final RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
+ final Acquired acq = _0_10session.getQpidSession().messageAcquire(Range.newInstance((int)message.getDeliveryTag())).get();
final RangeSet acquired = acq.getTransfers();
if (acquired != null && acquired.size() > 0)
@@ -451,7 +445,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (_synchronousQueue.size() > 0)
{
- RangeSet ranges = new RangeSet();
+ RangeSet ranges = RangeSetFactory.createRangeSet();
Iterator iterator = _synchronousQueue.iterator();
while (iterator.hasNext())
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index efcbfd5532..b2f4fcef84 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -38,11 +38,13 @@ import org.slf4j.LoggerFactory;
public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
{
protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ private AMQSession_0_8.DestinationCache<AMQTopic> _topicDestinationCache;
+ private AMQSession_0_8.DestinationCache<AMQQueue> _queueDestinationCache;
private final RejectBehaviour _rejectBehaviour;
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
- String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession_0_8 session,
AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
@@ -60,6 +62,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
+ _topicDestinationCache = session.getTopicDestinationCache();
+ _queueDestinationCache = session.getQueueDestinationCache();
+
if (destination.getRejectBehaviour() != null)
{
_rejectBehaviour = destination.getRejectBehaviour();
@@ -100,7 +105,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
messageFrame.isRedelivered(), messageFrame.getExchange(),
- messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+ messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
+ _queueDestinationCache, _topicDestinationCache);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
index 6237234c4d..33ca584b34 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
@@ -48,7 +48,10 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic
body.getExchange(),
body.getRoutingKey(),
body.getRedelivered());
- _logger.debug("New JmsDeliver method received:" + session);
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("New JmsDeliver method received:" + session);
+ }
session.unprocessedMessageReceived(channelId, msg);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index f360b546b2..179ebd66d1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -124,7 +124,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
*/
public static void updateExchangeTypeMapping(Header header, org.apache.qpid.transport.Session session)
{
- DeliveryProperties deliveryProps = header.get(DeliveryProperties.class);
+ DeliveryProperties deliveryProps = header.getDeliveryProperties();
if (deliveryProps != null)
{
String exchange = deliveryProps.getExchange();
@@ -132,7 +132,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
}
- MessageProperties msgProps = header.get(MessageProperties.class);
+ MessageProperties msgProps = header.getMessageProperties();
if (msgProps != null && msgProps.getReplyTo() != null)
{
String exchange = msgProps.getReplyTo().getExchange();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index 9ab03412fe..ab7061c382 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -31,12 +31,7 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.CustomJMSXProperty;
-import org.apache.qpid.client.JMSAMQException;
+import org.apache.qpid.client.*;
import org.apache.qpid.collections.ReferenceMap;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -81,7 +76,8 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
// Used when generating a received message object
protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey)
+ AMQShortString routingKey, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
{
this(contentHeader, deliveryTag);
@@ -95,10 +91,10 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
switch (type.intValue())
{
case AMQDestination.QUEUE_TYPE:
- dest = new AMQQueue(exchange, routingKey, routingKey);
+ dest = queueDestinationCache.getDestination(exchange, routingKey);
break;
case AMQDestination.TOPIC_TYPE:
- dest = new AMQTopic(exchange, routingKey, null);
+ dest = topicDestinationCache.getDestination(exchange, routingKey);
break;
default:
// Use the generateDestination method
@@ -133,10 +129,66 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (messageId != null)
{
- getContentHeaderProperties().setMessageId("ID:" + messageId);
+ getContentHeaderProperties().setMessageId(asShortStringMsgId(messageId));
}
}
+ private static final byte[] HEX_DIGITS = {0x30,0x31,0x32,0x33,0x34,0x35,0x36,0x37,0x38,0x39,
+ 0x61,0x62,0x63,0x64,0x65,0x66};
+
+ private static AMQShortString asShortStringMsgId(UUID messageId)
+ {
+ long msb = messageId.getMostSignificantBits();
+ long lsb = messageId.getLeastSignificantBits();
+
+ byte[] messageIdBytes = new byte[39];
+ messageIdBytes[0] = (byte) 'I';
+ messageIdBytes[1] = (byte) 'D';
+ messageIdBytes[2] = (byte) ':';
+
+ messageIdBytes[3] = HEX_DIGITS[(int)((msb >> 60) & 0xFl)];
+ messageIdBytes[4] = HEX_DIGITS[(int)((msb >> 56) & 0xFl)];
+ messageIdBytes[5] = HEX_DIGITS[(int)((msb >> 52) & 0xFl)];
+ messageIdBytes[6] = HEX_DIGITS[(int)((msb >> 48) & 0xFl)];
+ messageIdBytes[7] = HEX_DIGITS[(int)((msb >> 44) & 0xFl)];
+ messageIdBytes[8] = HEX_DIGITS[(int)((msb >> 40) & 0xFl)];
+ messageIdBytes[9] = HEX_DIGITS[(int)((msb >> 36) & 0xFl)];
+ messageIdBytes[10] = HEX_DIGITS[(int)((msb >> 32) & 0xFl)];
+
+ messageIdBytes[11] = (byte) '-';
+ messageIdBytes[12] = HEX_DIGITS[(int)((msb >> 28) & 0xFl)];
+ messageIdBytes[13] = HEX_DIGITS[(int)((msb >> 24) & 0xFl)];
+ messageIdBytes[14] = HEX_DIGITS[(int)((msb >> 20) & 0xFl)];
+ messageIdBytes[15] = HEX_DIGITS[(int)((msb >> 16) & 0xFl)];
+ messageIdBytes[16] = (byte) '-';
+ messageIdBytes[17] = HEX_DIGITS[(int)((msb >> 12) & 0xFl)];
+ messageIdBytes[18] = HEX_DIGITS[(int)((msb >> 8) & 0xFl)];
+ messageIdBytes[19] = HEX_DIGITS[(int)((msb >> 4) & 0xFl)];
+ messageIdBytes[20] = HEX_DIGITS[(int)(msb & 0xFl)];
+ messageIdBytes[21] = (byte) '-';
+
+ messageIdBytes[22] = HEX_DIGITS[(int)((lsb >> 60) & 0xFl)];
+ messageIdBytes[23] = HEX_DIGITS[(int)((lsb >> 56) & 0xFl)];
+ messageIdBytes[24] = HEX_DIGITS[(int)((lsb >> 52) & 0xFl)];
+ messageIdBytes[25] = HEX_DIGITS[(int)((lsb >> 48) & 0xFl)];
+
+ messageIdBytes[26] = (byte) '-';
+
+ messageIdBytes[27] = HEX_DIGITS[(int)((lsb >> 44) & 0xFl)];
+ messageIdBytes[28] = HEX_DIGITS[(int)((lsb >> 40) & 0xFl)];
+ messageIdBytes[29] = HEX_DIGITS[(int)((lsb >> 36) & 0xFl)];
+ messageIdBytes[30] = HEX_DIGITS[(int)((lsb >> 32) & 0xFl)];
+ messageIdBytes[31] = HEX_DIGITS[(int)((lsb >> 28) & 0xFl)];
+ messageIdBytes[32] = HEX_DIGITS[(int)((lsb >> 24) & 0xFl)];
+ messageIdBytes[33] = HEX_DIGITS[(int)((lsb >> 20) & 0xFl)];
+ messageIdBytes[34] = HEX_DIGITS[(int)((lsb >> 16) & 0xFl)];
+ messageIdBytes[35] = HEX_DIGITS[(int)((lsb >> 12) & 0xFl)];
+ messageIdBytes[36] = HEX_DIGITS[(int)((lsb >> 8) & 0xFl)];
+ messageIdBytes[37] = HEX_DIGITS[(int)((lsb >> 4) & 0xFl)];
+ messageIdBytes[38] = HEX_DIGITS[(int)(lsb & 0xFl)];
+
+ return new AMQShortString(messageIdBytes,0,39);
+ }
public long getJMSTimestamp() throws JMSException
{
@@ -413,7 +465,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
}
checkWritableProperties();
- getJmsHeaders().setByte(propertyName, new Byte(b));
+ getJmsHeaders().setByte(propertyName, b);
}
public void setShortProperty(String propertyName, short i) throws JMSException
@@ -424,13 +476,13 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
}
checkWritableProperties();
- getJmsHeaders().setShort(propertyName, new Short(i));
+ getJmsHeaders().setShort(propertyName, i);
}
public void setIntProperty(String propertyName, int i) throws JMSException
{
checkWritableProperties();
- getJmsHeaders().setInteger(propertyName, new Integer(i));
+ getJmsHeaders().setInteger(propertyName, i);
}
public void setLongProperty(String propertyName, long l) throws JMSException
@@ -441,7 +493,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
}
checkWritableProperties();
- getJmsHeaders().setLong(propertyName, new Long(l));
+ getJmsHeaders().setLong(propertyName, l);
}
public void setFloatProperty(String propertyName, float f) throws JMSException
@@ -452,7 +504,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
}
checkWritableProperties();
- getJmsHeaders().setFloat(propertyName, new Float(f));
+ getJmsHeaders().setFloat(propertyName, f);
}
public void setDoubleProperty(String propertyName, double v) throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index 967a1fb49f..16b71db77e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -21,6 +21,9 @@
package org.apache.qpid.client.message;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession_0_8;
+import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -44,7 +47,9 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
AMQShortString exchange, AMQShortString routingKey,
- List bodies) throws AMQException
+ List bodies,
+ AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) throws AMQException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
@@ -99,7 +104,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
(BasicContentHeaderProperties) contentHeader.getProperties(),
- exchange, routingKey);
+ exchange, routingKey, queueDestinationCache, topicDestinationCache);
return createMessage(delegate, data);
}
@@ -149,10 +154,12 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader,
- AMQShortString exchange, AMQShortString routingKey, List bodies)
+ AMQShortString exchange, AMQShortString routingKey, List bodies,
+ AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
throws JMSException, AMQException
{
- final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
+ final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache);
msg.setJMSRedelivered(redelivered);
msg.setReceivedFromServer();
return msg;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
index f3d96cd855..93c2872b2e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
@@ -25,6 +25,9 @@ import java.util.List;
import javax.jms.JMSException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession_0_8;
+import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.transport.DeliveryProperties;
@@ -36,7 +39,7 @@ public interface MessageFactory
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
ContentHeaderBody contentHeader,
AMQShortString exchange, AMQShortString routingKey,
- List bodies)
+ List bodies, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
throws JMSException, AMQException;
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index cdb75fc9a9..15ad3ed89f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -28,6 +28,9 @@ import java.nio.ByteBuffer;
import javax.jms.JMSException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession_0_8;
+import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -93,15 +96,19 @@ public class MessageFactoryRegistry
* Create a message. This looks up the MIME type from the content header and instantiates the appropriate
* concrete message type.
*
+ *
* @param deliveryTag the AMQ message id
* @param redelivered true if redelivered
* @param contentHeader the content header that was received
* @param bodies a list of ContentBody instances @return the message.
- * @throws AMQException
+ * @param queueDestinationCache
+ *@param topicDestinationCache @throws AMQException
* @throws JMSException
*/
public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
- AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
+ AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies,
+ AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
throws AMQException, JMSException
{
BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties();
@@ -118,13 +125,13 @@ public class MessageFactoryRegistry
mf = _default;
}
- return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies);
+ return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache);
}
public AbstractJMSMessage createMessage(MessageTransfer transfer) throws AMQException, JMSException
{
- MessageProperties mprop = transfer.getHeader().get(MessageProperties.class);
+ MessageProperties mprop = transfer.getHeader().getMessageProperties();
String messageType = "";
if ( mprop == null || mprop.getContentType() == null)
{
@@ -143,7 +150,7 @@ public class MessageFactoryRegistry
boolean redelivered = false;
DeliveryProperties deliverProps;
- if((deliverProps = transfer.getHeader().get(DeliveryProperties.class)) != null)
+ if((deliverProps = transfer.getHeader().getDeliveryProperties()) != null)
{
redelivered = deliverProps.getRedelivered();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 284954edba..8911d4ee3e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.protocol;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -409,10 +410,10 @@ public class AMQProtocolHandler implements ProtocolEngine
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
// Decode buffer
-
- for (AMQDataBlock message : dataBlocks)
+ int size = dataBlocks.size();
+ for (int i = 0; i < size; i++)
{
-
+ AMQDataBlock message = dataBlocks.get(i);
if (PROTOCOL_DEBUG)
{
_protocolLogger.info(String.format("RECV: [%s] %s", this, message));
@@ -420,10 +421,10 @@ public class AMQProtocolHandler implements ProtocolEngine
if(message instanceof AMQFrame)
{
- final boolean debug = _logger.isDebugEnabled();
+
final long msgNumber = ++_messageReceivedCount;
- if (debug && ((msgNumber % 1000) == 0))
+ if (((msgNumber % 1000) == 0) && _logger.isDebugEnabled())
{
_logger.debug("Received " + _messageReceivedCount + " protocol messages");
}
@@ -514,12 +515,20 @@ public class AMQProtocolHandler implements ProtocolEngine
return getStateManager().createWaiter(states);
}
- public synchronized void writeFrame(AMQDataBlock frame)
+ public void writeFrame(AMQDataBlock frame)
+ {
+ writeFrame(frame, true);
+ }
+
+ public synchronized void writeFrame(AMQDataBlock frame, boolean flush)
{
final ByteBuffer buf = asByteBuffer(frame);
_writtenBytes += buf.remaining();
_sender.send(buf);
- _sender.flush();
+ if(flush)
+ {
+ _sender.flush();
+ }
if (PROTOCOL_DEBUG)
{
@@ -539,35 +548,51 @@ public class AMQProtocolHandler implements ProtocolEngine
}
+ private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
+ private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
+ private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
+ private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
+
private ByteBuffer asByteBuffer(AMQDataBlock block)
{
- final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+ final int size = (int) block.getSize();
- try
- {
- block.writePayload(new DataOutputStream(new OutputStream()
- {
+ final byte[] data;
- @Override
- public void write(int b) throws IOException
- {
- buf.put((byte) b);
- }
+ if(size > REUSABLE_BYTE_BUFFER_CAPACITY)
+ {
+ data= new byte[size];
+ }
+ else
+ {
- @Override
- public void write(byte[] b, int off, int len) throws IOException
- {
- buf.put(b, off, len);
- }
- }));
+ data = _reusableBytes;
+ }
+ _reusableDataOutput.setBuffer(data);
+
+ try
+ {
+ block.writePayload(_reusableDataOutput);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
- buf.flip();
+ final ByteBuffer buf;
+
+ if(size < REUSABLE_BYTE_BUFFER_CAPACITY)
+ {
+ buf = _reusableByteBuffer;
+ buf.position(0);
+ }
+ else
+ {
+ buf = ByteBuffer.wrap(data);
+ }
+ buf.limit(_reusableDataOutput.length());
+
return buf;
}
@@ -840,4 +865,160 @@ public class AMQProtocolHandler implements ProtocolEngine
return _suggestedProtocolVersion;
}
+ private static class BytesDataOutput implements DataOutput
+ {
+ int _pos = 0;
+ byte[] _buf;
+
+ public BytesDataOutput(byte[] buf)
+ {
+ _buf = buf;
+ }
+
+ public void setBuffer(byte[] buf)
+ {
+ _buf = buf;
+ _pos = 0;
+ }
+
+ public void reset()
+ {
+ _pos = 0;
+ }
+
+ public int length()
+ {
+ return _pos;
+ }
+
+ public void write(int b)
+ {
+ _buf[_pos++] = (byte) b;
+ }
+
+ public void write(byte[] b)
+ {
+ System.arraycopy(b, 0, _buf, _pos, b.length);
+ _pos+=b.length;
+ }
+
+
+ public void write(byte[] b, int off, int len)
+ {
+ System.arraycopy(b, off, _buf, _pos, len);
+ _pos+=len;
+
+ }
+
+ public void writeBoolean(boolean v)
+ {
+ _buf[_pos++] = v ? (byte) 1 : (byte) 0;
+ }
+
+ public void writeByte(int v)
+ {
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeShort(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeChar(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeInt(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 24);
+ _buf[_pos++] = (byte) (v >>> 16);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeLong(long v)
+ {
+ _buf[_pos++] = (byte) (v >>> 56);
+ _buf[_pos++] = (byte) (v >>> 48);
+ _buf[_pos++] = (byte) (v >>> 40);
+ _buf[_pos++] = (byte) (v >>> 32);
+ _buf[_pos++] = (byte) (v >>> 24);
+ _buf[_pos++] = (byte) (v >>> 16);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte)v;
+ }
+
+ public void writeFloat(float v)
+ {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ public void writeDouble(double v)
+ {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ public void writeBytes(String s)
+ {
+ int len = s.length();
+ for (int i = 0 ; i < len ; i++)
+ {
+ _buf[_pos++] = ((byte)s.charAt(i));
+ }
+ }
+
+ public void writeChars(String s)
+ {
+ int len = s.length();
+ for (int i = 0 ; i < len ; i++)
+ {
+ int v = s.charAt(i);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+ }
+
+ public void writeUTF(String s)
+ {
+ int strlen = s.length();
+
+ int pos = _pos;
+ _pos+=2;
+
+
+ for (int i = 0; i < strlen; i++)
+ {
+ int c = s.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F))
+ {
+ c = s.charAt(i);
+ _buf[_pos++] = (byte) c;
+
+ }
+ else if (c > 0x07FF)
+ {
+ _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
+ }
+ else
+ {
+ _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
+ }
+ }
+
+ int len = _pos - (pos + 2);
+
+ _buf[pos++] = (byte) (len >>> 8);
+ _buf[pos] = (byte) len;
+ }
+
+ }
+
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
index 10fd8d2a80..2f1eda6ef2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
@@ -49,19 +49,9 @@ public class MessagePartListenerAdapter implements MessagePartListener
{
_currentMsg = new ByteBufferMessage(xfr.getId());
- for (Struct st : xfr.getHeader().getStructs())
- {
- if(st instanceof DeliveryProperties)
- {
- _currentMsg.setDeliveryProperties((DeliveryProperties)st);
-
- }
- else if(st instanceof MessageProperties)
- {
- _currentMsg.setMessageProperties((MessageProperties)st);
- }
-
- }
+ Header header = xfr.getHeader();
+ _currentMsg.setDeliveryProperties(header.getDeliveryProperties());
+ _currentMsg.setMessageProperties(header.getMessageProperties());
ByteBuffer body = xfr.getBody();
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
index d8d94ba40e..02089cc382 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
@@ -46,8 +46,9 @@ public class BasicMessageConsumer_0_8_Test extends TestCase
AMQBindingURL burl = new AMQBindingURL(url);
AMQDestination queue = new AMQQueue(burl);
- AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
- BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+ TestAMQSession testSession = new TestAMQSession(conn);
+ BasicMessageConsumer_0_8 consumer =
+ new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
assertEquals("Reject behaviour was was not as expected", RejectBehaviour.SERVER, consumer.getRejectBehaviour());
}
@@ -65,8 +66,9 @@ public class BasicMessageConsumer_0_8_Test extends TestCase
final AMQBindingURL burl = new AMQBindingURL(url);
final AMQDestination queue = new AMQQueue(burl);
- final AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
- final BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+ final TestAMQSession testSession = new TestAMQSession(conn);
+ final BasicMessageConsumer_0_8 consumer =
+ new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
}
@@ -90,8 +92,9 @@ public class BasicMessageConsumer_0_8_Test extends TestCase
assertNull("Reject behaviour should have been null", queue.getRejectBehaviour());
- AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
- BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+ TestAMQSession testSession = new TestAMQSession(conn);
+ BasicMessageConsumer_0_8 consumer =
+ new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index 06d0f4a3f9..4c3e9c2390 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -29,12 +29,7 @@ import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.BasicMessageConsumer_0_8;
-import org.apache.qpid.client.BasicMessageProducer_0_8;
-import org.apache.qpid.client.MockAMQConnection;
+import org.apache.qpid.client.*;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -42,7 +37,7 @@ import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
+public class TestAMQSession extends AMQSession_0_8
{
public TestAMQSession(AMQConnection connection)
@@ -92,7 +87,7 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe
return null;
}
- protected void sendRecover() throws AMQException, FailoverException
+ public void sendRecover() throws AMQException, FailoverException
{
}