diff options
Diffstat (limited to 'qpid/java')
35 files changed, 1367 insertions, 1428 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 5fde08cbdd..3b290b3d51 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -35,12 +35,12 @@ import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.UnauthorizedAccessException; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.ClientDeliveryMethod; @@ -108,7 +108,7 @@ public class AMQChannel private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>(); - private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory(); + private MessageFactory _messageHandleFactory = new MessageFactory(); // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index 4949e5b41d..ea94f23ff9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -28,7 +28,6 @@ package org.apache.qpid.server.output.amqp0_8; import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -37,8 +36,6 @@ import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.AMQException;
-import org.apache.mina.common.ByteBuffer;
-
import java.util.Iterator;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -79,11 +76,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
-
-
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
@@ -100,7 +93,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter // Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -112,7 +105,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter //
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -126,8 +119,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
final AMQMessage message = queueEntry.getMessage();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
AMQDataBlock deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
@@ -135,7 +126,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -150,7 +141,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter // Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -162,7 +153,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter //
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -179,7 +170,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter final AMQMessage message = queueEntry.getMessage();
final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicDeliverBody deliverBody =
@@ -188,18 +178,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter queueEntry.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey());
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
-
-
- return deliverFrame;
+ return deliverBody.generateFrame(channelId);
}
private AMQDataBlock createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
final AMQMessage message = queueEntry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicGetOkBody getOkBody =
@@ -208,9 +194,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter pb.getExchange(),
pb.getRoutingKey(),
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
- return getOkFrame;
+ return getOkBody.generateFrame(channelId);
}
public byte getProtocolMinorVersion()
@@ -231,9 +215,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
- return returnFrame;
+ return basicReturnBody.generateFrame(channelId);
+
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index 00a15d2d50..b71b118275 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -28,9 +28,7 @@ import java.util.Iterator; import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -77,12 +75,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter AMQBody deliverBody = createEncodedDeliverFrame(queueEntry, channelId, deliveryTag, consumerTag);
final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
-
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
-
-
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
@@ -99,7 +92,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter // Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
@@ -111,7 +104,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter //
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -123,9 +116,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
+ return ContentHeaderBody.createAMQFrame(channelId, contentHeaderBody);
}
@@ -133,15 +124,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter {
final AMQMessage message = queueEntry.getMessage();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
AMQFrame deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -156,7 +145,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter // Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -168,7 +157,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter //
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -190,7 +179,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter final AMQShortString exchangeName = pb.getExchange();
final AMQShortString routingKey = pb.getRoutingKey();
- final AMQBody returnBlock = new AMQBody()
+ return new AMQBody()
{
public AMQBody _underlyingBody;
@@ -238,7 +227,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter throw new AMQException("This block should never be dispatched!");
}
};
- return returnBlock;
}
private AMQFrame createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
@@ -253,9 +241,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter pb.getExchange(),
pb.getRoutingKey(),
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
- return getOkFrame;
+ return getOkBody.generateFrame(channelId);
}
public byte getProtocolMinorVersion()
@@ -276,9 +262,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
- return returnFrame;
+ return basicReturnBody.generateFrame(channelId);
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index d73d37f48d..2bd6e612f8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -20,363 +20,52 @@ */ package org.apache.qpid.server.queue; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.txn.TransactionalContext; - +import org.apache.qpid.AMQException; import java.util.Iterator; -import java.util.concurrent.atomic.AtomicInteger; -/** - * A deliverable message. - */ -public class AMQMessage +public interface AMQMessage { - /** Used for debugging purposes. */ - private static final Logger _log = Logger.getLogger(AMQMessage.class); - - private final AtomicInteger _referenceCount = new AtomicInteger(1); - - private final AMQMessageHandle _messageHandle; - - /** Holds the transactional context in which this message is being processed. */ - private StoreContext _storeContext; - - /** Flag to indicate that this message requires 'immediate' delivery. */ - - private static final byte IMMEDIATE = 0x01; - - /** - * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality - * for messages published with the 'immediate' flag. - */ - - private static final byte DELIVERED_TO_CONSUMER = 0x02; - - private byte _flags = 0; - - private long _expiration; - - private final long _size; - - private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier; - private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); - - - - /** - * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory - * therefore is memory-efficient. - */ - private class BodyFrameIterator implements Iterator<AMQDataBlock> - { - private int _channel; - - private int _index = -1; - private AMQProtocolSession _protocolSession; - - private BodyFrameIterator(AMQProtocolSession protocolSession, int channel) - { - _channel = channel; - _protocolSession = protocolSession; - } - - public boolean hasNext() - { - try - { - return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1); - } - catch (AMQException e) - { - _log.error("Unable to get body count: " + e, e); - - return false; - } - } - - public AMQDataBlock next() - { - try - { - - AMQBody cb = - getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), - ++_index)); - - return new AMQFrame(_channel, cb); - } - catch (AMQException e) - { - // have no choice but to throw a runtime exception - throw new RuntimeException("Error getting content body: " + e, e); - } - - } - - private ProtocolVersionMethodConverter getProtocolVersionMethodConverter() - { - return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter(); - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - } - - public void clearStoreContext() - { - _storeContext = new StoreContext(); - } - - public StoreContext getStoreContext() - { - return _storeContext; - } - - private class BodyContentIterator implements Iterator<ContentChunk> - { - - private int _index = -1; - - public boolean hasNext() - { - try - { - return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1); - } - catch (AMQException e) - { - _log.error("Error getting body count: " + e, e); - - return false; - } - } - - public ContentChunk next() - { - try - { - return _messageHandle.getContentChunk(getStoreContext(), ++_index); - } - catch (AMQException e) - { - throw new RuntimeException("Error getting content body: " + e, e); - } - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - } - - - - /** - * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal - * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to - * queues. - * - * @param messageId - * @param store - * @param factory - * - * @throws AMQException - */ - public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) - throws AMQException - { - _messageHandle = factory.createMessageHandle(messageId, store, true); - _storeContext = txnConext.getStoreContext(); - _size = _messageHandle.getBodySize(txnConext.getStoreContext()); - } - - /** - * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal - * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to - * queues. - * - * @param messageHandle - * - * @throws AMQException - */ - public AMQMessage( - AMQMessageHandle messageHandle, - StoreContext storeConext, - MessagePublishInfo info) - throws AMQException - { - _messageHandle = messageHandle; - _storeContext = storeConext; - - if(info.isImmediate()) - { - _flags |= IMMEDIATE; - } - _size = messageHandle.getBodySize(storeConext); - - } + //Get Content relating to this message + Long getMessageId(); - protected AMQMessage(AMQMessage msg) throws AMQException - { - _messageHandle = msg._messageHandle; - _storeContext = msg._storeContext; - _flags = msg._flags; - _size = msg._size; + Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel); - } + Iterator<ContentChunk> getContentBodyIterator(); + ContentHeaderBody getContentHeaderBody(); - public String debugIdentity() - { - return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")"; - } + ContentChunk getContentChunk(int index); - public void setExpiration(final long expiration) - { + Object getPublisherClientInstance(); - _expiration = expiration; + Object getPublisherIdentifier(); - } + MessagePublishInfo getMessagePublishInfo(); - public boolean isReferenced() - { - return _referenceCount.get() > 0; - } + int getBodyCount(); - public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel) - { - return new BodyFrameIterator(protocolSession, channel); - } + long getSize(); - public Iterator<ContentChunk> getContentBodyIterator() - { - return new BodyContentIterator(); - } + long getArrivalTime(); - public ContentHeaderBody getContentHeaderBody() throws AMQException - { - return _messageHandle.getContentHeaderBody(getStoreContext()); - } - - public Long getMessageId() - { - return _messageHandle.getMessageId(); - } - - /** - * Creates a long-lived reference to this message, and increments the count of such references, as an atomic - * operation. - */ - public AMQMessage takeReference() - { - incrementReference(); // _referenceCount.incrementAndGet(); - - return this; - } - - public boolean incrementReference() - { - return incrementReference(1); - } - - /* Threadsafe. Increment the reference count on the message. */ - public boolean incrementReference(int count) - { - if(_referenceCount.addAndGet(count) <= 1) - { - _referenceCount.addAndGet(-count); - return false; - } - else - { - return true; - } - - } - - /** - * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the - * message store. - * - * @param storeContext - * - * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that - * failed - */ - public void decrementReference(StoreContext storeContext) throws MessageCleanupException - { - - int count = _referenceCount.decrementAndGet(); - - // note that the operation of decrementing the reference count and then removing the message does not - // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after - // the message has been passed to all queues. i.e. we are - // not relying on the all the increments having taken place before the delivery manager decrements. - if (count == 0) - { - // set the reference count way below 0 so that we can detect that the message has been deleted - // this is to guard against the message being spontaneously recreated (from the mgmt console) - // by copying from other queues at the same time as it is being removed. - _referenceCount.set(Integer.MIN_VALUE/2); - - try - { - // must check if the handle is null since there may be cases where we decide to throw away a message - // and the handle has not yet been constructed - if (_messageHandle != null) - { - _messageHandle.removeMessage(storeContext); - } - } - catch (AMQException e) - { - // to maintain consistency, we revert the count - incrementReference(); - throw new MessageCleanupException(getMessageId(), e); - } - } - else - { - if (count < 0) - { - throw new MessageCleanupException("Reference count for message id " + debugIdentity() - + " has gone below 0."); - } - } - } - + //Check the status of this message /** * Called selectors to determin if the message has already been sent * * @return _deliveredToConsumer */ - public boolean getDeliveredToConsumer() - { - return (_flags & DELIVERED_TO_CONSUMER) != 0; - } - - public boolean isPersistent() throws AMQException - { - return _messageHandle.isPersistent(); - } + boolean getDeliveredToConsumer(); /** * Called to enforce the 'immediate' flag. @@ -384,89 +73,62 @@ public class AMQMessage * @returns true if the message is marked for immediate delivery but has not been marked as delivered * to a consumer */ - public boolean immediateAndNotDelivered() - { - - return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE; - - } - - public MessagePublishInfo getMessagePublishInfo() throws AMQException - { - return _messageHandle.getMessagePublishInfo(getStoreContext()); - } - - public long getArrivalTime() - { - return _messageHandle.getArrivalTime(); - } + boolean immediateAndNotDelivered(); /** * Checks to see if the message has expired. If it has the message is dequeued. * - * @param queue The queue to check the expiration against. (Currently not used) - * * @return true if the message has expire * - * @throws AMQException + * @throws org.apache.qpid.AMQException */ - public boolean expired(AMQQueue queue) throws AMQException - { + boolean expired() throws AMQException; - if (_expiration != 0L) - { - long now = System.currentTimeMillis(); - - return (now > _expiration); - } + /** Is this a persistent message + * + * @return true if the message is persistent + */ + boolean isPersistent(); - return false; - } /** * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). * And for selector efficiency. */ - public void setDeliveredToConsumer() - { - _flags |= DELIVERED_TO_CONSUMER; - } + void setDeliveredToConsumer(); + + void setExpiration(long expiration); + + void setClientIdentifier(AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier); + + /** + * This is called when all the content has been received. + * @param storeContext + *@param messagePublishInfo + * @param contentHeaderBody @throws org.apache.qpid.AMQException + */ + void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, ContentHeaderBody contentHeaderBody) + throws AMQException; + void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) + throws AMQException; - public AMQMessageHandle getMessageHandle() - { - return _messageHandle; - } + void removeMessage(StoreContext storeContext) throws AMQException; - public long getSize() - { - return _size; + String toString(); - } + String debugIdentity(); - public Object getPublisherClientInstance() - { - return _sessionIdentifier.getSessionInstance(); - } - - public Object getPublisherIdentifier() - { - return _sessionIdentifier.getSessionIdentifier(); - } + // Reference counting methods - public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier) - { - _sessionIdentifier = sessionIdentifier; - } + void decrementReference(StoreContext storeContext) throws MessageCleanupException; + boolean incrementReference(int queueCount); - public String toString() - { - // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + - // _taken + " by :" + _takenBySubcription; + boolean incrementReference(); - return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount; - } + AMQMessage takeReference(); + boolean isReferenced(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index f5853bd303..a08719875d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -353,29 +353,20 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que } } - try + // Create header attributes list + CommonContentHeaderProperties headerProperties = + (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; + String mimeType = null, encoding = null; + if (headerProperties != null) { - // Create header attributes list - CommonContentHeaderProperties headerProperties = - (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; - String mimeType = null, encoding = null; - if (headerProperties != null) - { - AMQShortString mimeTypeShortSting = headerProperties.getContentType(); - mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString(); - encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString(); - } + AMQShortString mimeTypeShortSting = headerProperties.getContentType(); + mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString(); + encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString(); + } - Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; + Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; - return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); - } - catch (AMQException e) - { - JMException jme = new JMException("Error creating header attributes list: " + e); - jme.initCause(e); - throw jme; - } + return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); } /** @@ -392,27 +383,18 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que List<QueueEntry> list = _queue.getMessagesOnTheQueue(); TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); - try + // Create the tabular list of message header contents + for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) { - // Create the tabular list of message header contents - for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) - { - QueueEntry queueEntry = list.get(i - 1); - AMQMessage msg = queueEntry.getMessage(); - ContentHeaderBody headerBody = msg.getContentHeaderBody(); - // Create header attributes list - String[] headerAttributes = getMessageHeaderProperties(headerBody); - Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, - queueEntry.isRedelivered() }; - CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); - _messageList.put(messageData); - } - } - catch (AMQException e) - { - JMException jme = new JMException("Error creating message contents: " + e); - jme.initCause(e); - throw jme; + QueueEntry queueEntry = list.get(i - 1); + AMQMessage msg = queueEntry.getMessage(); + ContentHeaderBody headerBody = msg.getContentHeaderBody(); + // Create header attributes list + String[] headerAttributes = getMessageHeaderProperties(headerBody); + Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, + queueEntry.isRedelivered() }; + CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); + _messageList.put(messageData); } return _messageList; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java deleted file mode 100644 index 1092f67d94..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.LinkedList; -import java.util.List; -import java.util.Collections; -import java.util.ArrayList; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.store.StoreContext; - -/** - */ -public class InMemoryMessageHandle implements AMQMessageHandle -{ - - private ContentHeaderBody _contentHeaderBody; - - private MessagePublishInfo _messagePublishInfo; - - private List<ContentChunk> _contentBodies; - - private boolean _redelivered; - - private long _arrivalTime; - - private final Long _messageId; - - public InMemoryMessageHandle(final Long messageId) - { - _messageId = messageId; - } - - public ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException - { - return _contentHeaderBody; - } - - public Long getMessageId() - { - return _messageId; - } - - public int getBodyCount(StoreContext context) - { - return _contentBodies.size(); - } - - public long getBodySize(StoreContext context) throws AMQException - { - return getContentHeaderBody(context).bodySize; - } - - public ContentChunk getContentChunk(StoreContext context, int index) throws AMQException, IllegalArgumentException - { - if(_contentBodies == null) - { - throw new RuntimeException("No ContentBody has been set"); - } - - if (index > _contentBodies.size() - 1 || index < 0) - { - throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " + - (_contentBodies.size() - 1)); - } - return _contentBodies.get(index); - } - - public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentBody, boolean isLastContentBody) - throws AMQException - { - if(_contentBodies == null) - { - if(isLastContentBody) - { - _contentBodies = Collections.singletonList(contentBody); - } - else - { - _contentBodies = new ArrayList<ContentChunk>(); - _contentBodies.add(contentBody); - } - } - else - { - _contentBodies.add(contentBody); - } - } - - public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException - { - return _messagePublishInfo; - } - - public boolean isPersistent() - { - return false; - } - - /** - * This is called when all the content has been received. - * @param messagePublishInfo - * @param contentHeaderBody - * @throws AMQException - */ - public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, - ContentHeaderBody contentHeaderBody) - throws AMQException - { - _messagePublishInfo = messagePublishInfo; - _contentHeaderBody = contentHeaderBody; - if(contentHeaderBody.bodySize == 0) - { - _contentBodies = Collections.EMPTY_LIST; - } - _arrivalTime = System.currentTimeMillis(); - } - - public void removeMessage(StoreContext storeContext) throws AMQException - { - // NO OP - } - - public long getArrivalTime() - { - return _arrivalTime; - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index b994040131..aad99da6c3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -35,7 +35,6 @@ import org.apache.qpid.AMQException; import org.apache.log4j.Logger; import java.util.ArrayList; -import java.util.Collection; public class IncomingMessage implements Filterable<RuntimeException> { @@ -48,7 +47,7 @@ public class IncomingMessage implements Filterable<RuntimeException> private final MessagePublishInfo _messagePublishInfo; private ContentHeaderBody _contentHeaderBody; - private AMQMessageHandle _messageHandle; + private AMQMessage _message; private final Long _messageId; private final TransactionalContext _txnContext; @@ -74,7 +73,6 @@ public class IncomingMessage implements Filterable<RuntimeException> private Exchange _exchange; - public IncomingMessage(final Long messageId, final MessagePublishInfo info, final TransactionalContext txnContext, @@ -124,11 +122,11 @@ public class IncomingMessage implements Filterable<RuntimeException> } public void routingComplete(final MessageStore store, - final MessageHandleFactory factory) throws AMQException + final MessageFactory factory) throws AMQException { final boolean persistent = isPersistent(); - _messageHandle = factory.createMessageHandle(_messageId, store, persistent); + _message = factory.createMessage(_messageId, store, persistent); if (persistent) { _txnContext.beginTranIfNecessary(); @@ -157,21 +155,16 @@ public class IncomingMessage implements Filterable<RuntimeException> _logger.debug("Delivering message " + _messageId + " to " + _destinationQueues); } - AMQMessage message = null; - try { // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks - _messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), - _messagePublishInfo, getContentHeaderBody()); + _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody()); + - - - message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo); - message.setExpiration(_expiration); - message.setClientIdentifier(_publisher.getSessionIdentifier()); + _message.setExpiration(_expiration); + _message.setClientIdentifier(_publisher.getSessionIdentifier()); // we then allow the transactional context to do something with the message content // now that it has all been received, before we attempt delivery @@ -182,7 +175,7 @@ public class IncomingMessage implements Filterable<RuntimeException> if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString())) { - throw new UnauthorizedAccessException("Acccess Refused",message); + throw new UnauthorizedAccessException("Acccess Refused", _message); } if ((_destinationQueues == null) || _destinationQueues.size() == 0) @@ -190,26 +183,26 @@ public class IncomingMessage implements Filterable<RuntimeException> if (isMandatory() || isImmediate()) { - throw new NoRouteException("No Route for message", message); + throw new NoRouteException("No Route for message", _message); } else { - _logger.warn("MESSAGE DISCARDED: No routes for message - " + message); + _logger.warn("MESSAGE DISCARDED: No routes for message - " + _message); } } else { int offset; final int queueCount = _destinationQueues.size(); - message.incrementReference(queueCount); + _message.incrementReference(queueCount); if(queueCount == 1) { offset = 0; } else { - offset = ((int)(message.getMessageId().longValue())) % queueCount; + offset = ((int)(_message.getMessageId().longValue())) % queueCount; if(offset < 0) { offset = -offset; @@ -218,22 +211,21 @@ public class IncomingMessage implements Filterable<RuntimeException> for (int i = offset; i < queueCount; i++) { // normal deliver so add this message at the end. - _txnContext.deliver(_destinationQueues.get(i), message); + _txnContext.deliver(_destinationQueues.get(i), _message); } for (int i = 0; i < offset; i++) { // normal deliver so add this message at the end. - _txnContext.deliver(_destinationQueues.get(i), message); + _txnContext.deliver(_destinationQueues.get(i), _message); } } - message.clearStoreContext(); - return message; + return _message; } finally { // Remove refence for routing process . Reference count should now == delivered queue count - if(message != null) message.decrementReference(_txnContext.getStoreContext()); + if(_message != null) _message.decrementReference(_txnContext.getStoreContext()); } } @@ -244,7 +236,7 @@ public class IncomingMessage implements Filterable<RuntimeException> _bodyLengthReceived += contentChunk.getSize(); - _messageHandle.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived()); + _message.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived()); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java index bdb0707c27..e18834874f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java @@ -20,18 +20,22 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; -public class MockAMQMessageHandle extends InMemoryMessageHandle +public class MessageFactory { - public MockAMQMessageHandle(final Long messageId) - { - super(messageId); - } - @Override - public long getBodySize(StoreContext store) + public AMQMessage createMessage(Long messageId, MessageStore store, boolean persistent) { - return 0l; + if (persistent) + { + return new PersistentAMQMessage(messageId, store); + } + else + { + return new TransientAMQMessage(messageId); + } } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java index 6f9efd3200..e33b0c83c7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -47,20 +47,13 @@ public enum NotificationCheck if(maximumMessageSize != 0)
{
// Check for threshold message size
- long messageSize;
- try
- {
- messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
- }
- catch (AMQException e)
- {
- messageSize = 0;
- }
-
+ long messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
if (messageSize >= maximumMessageSize)
{
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold (" +
+ maximumMessageSize + ") breached. [Message ID=" +
+ (msg == null ? "null" : msg.getMessageId()) + "]");
return true;
}
}
@@ -110,7 +103,7 @@ public enum NotificationCheck }
}
return false;
-
+
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java new file mode 100644 index 0000000000..04e3635f92 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; + +public class PersistentAMQMessage extends TransientAMQMessage +{ + protected MessageStore _messageStore; + + public PersistentAMQMessage(Long messageId, MessageStore store) + { + super(messageId); + _messageStore = store; + } + + @Override + public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) + throws AMQException + { + super.addContentBodyFrame(storeContext, contentChunk, isLastContentBody); + _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1, + contentChunk, isLastContentBody); + } + + @Override + public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, + ContentHeaderBody contentHeaderBody) + throws AMQException + { + super.setPublishAndContentHeaderBody(storeContext, messagePublishInfo, contentHeaderBody); + MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime); + + _messageStore.storeMessageMetaData(storeContext, _messageId, mmd); + } + + @Override + public void removeMessage(StoreContext storeContext) throws AMQException + { + _messageStore.removeMessage(storeContext, _messageId); + } + + @Override + public boolean isPersistent() + { + return true; + } + + public void recoverFromMessageMetaData(MessageMetaData mmd) + { + _arrivalTime = mmd.getArrivalTime(); + _contentHeaderBody = mmd.getContentHeaderBody(); + _messagePublishInfo = mmd.getMessagePublishInfo(); + } + + public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException + { + super.addContentBodyFrame(null, contentChunk, isLastContentBody); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index fd46a8a5ff..7be2827e0f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -54,25 +54,16 @@ public class PriorityQueueList implements QueueEntryList public QueueEntry add(AMQMessage message) { - try + int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset; + if(index >= _priorities) { - int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset; - if(index >= _priorities) - { - index = _priorities-1; - } - else if(index < 0) - { - index = 0; - } - return _priorityLists[index].add(message); + index = _priorities-1; } - catch (AMQException e) + else if(index < 0) { - // TODO - fix AMQ Exception - throw new RuntimeException(e); + index = 0; } - + return _priorityLists[index].add(message); } public QueueEntry next(QueueEntry node) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index fe9686e906..ba14be5580 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -132,7 +132,7 @@ public class QueueEntryImpl implements QueueEntry public boolean expired() throws AMQException { - return getMessage().expired(getQueue()); + return getMessage().expired(); } public boolean isAcquired() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java new file mode 100644 index 0000000000..8c62e046f8 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java @@ -0,0 +1,469 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.store.StoreContext; + +import java.util.Iterator; +import java.util.List; +import java.util.Collections; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A deliverable message. + */ +public class TransientAMQMessage implements AMQMessage +{ + /** Used for debugging purposes. */ + private static final Logger _log = Logger.getLogger(AMQMessage.class); + + private final AtomicInteger _referenceCount = new AtomicInteger(1); + + protected ContentHeaderBody _contentHeaderBody; + + protected MessagePublishInfo _messagePublishInfo; + + protected List<ContentChunk> _contentBodies; + + protected long _arrivalTime; + + protected final Long _messageId; + + + + /** Flag to indicate that this message requires 'immediate' delivery. */ + + private static final byte IMMEDIATE = 0x01; + + /** + * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality + * for messages published with the 'immediate' flag. + */ + + private static final byte DELIVERED_TO_CONSUMER = 0x02; + + private byte _flags = 0; + + private long _expiration; + + private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier; + private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); + + /** + * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory + * therefore is memory-efficient. + */ + private class BodyFrameIterator implements Iterator<AMQDataBlock> + { + private int _channel; + + private int _index = -1; + private AMQProtocolSession _protocolSession; + + private BodyFrameIterator(AMQProtocolSession protocolSession, int channel) + { + _channel = channel; + _protocolSession = protocolSession; + } + + public boolean hasNext() + { + return _index < (getBodyCount() - 1); + } + + public AMQDataBlock next() + { + AMQBody cb = + getProtocolVersionMethodConverter().convertToBody(getContentChunk(++_index)); + + return new AMQFrame(_channel, cb); + } + + private ProtocolVersionMethodConverter getProtocolVersionMethodConverter() + { + return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + } + + private class BodyContentIterator implements Iterator<ContentChunk> + { + + private int _index = -1; + + public boolean hasNext() + { + return _index < (getBodyCount() - 1); + } + + public ContentChunk next() + { + return getContentChunk(++_index); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + } + + /** + * Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message + * These all need refactoring to some sort of MockAMQMessageFactory. + */ + @Deprecated + protected TransientAMQMessage(AMQMessage message) throws AMQException + { + _messageId = message.getMessageId(); + _flags = ((TransientAMQMessage)message)._flags; + _contentHeaderBody = message.getContentHeaderBody(); + _messagePublishInfo = message.getMessagePublishInfo(); + } + + + /** + * Normal message creation via the MessageFactory uses this constructor + * Package scope limited as MessageFactory should be used + * @see MessageFactory + * + * @param messageId + */ + TransientAMQMessage(Long messageId) + { + _messageId = messageId; + } + + public String debugIdentity() + { + return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")"; + } + + public void setExpiration(final long expiration) + { + _expiration = expiration; + } + + public boolean isReferenced() + { + return _referenceCount.get() > 0; + } + + public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel) + { + return new BodyFrameIterator(protocolSession, channel); + } + + public Iterator<ContentChunk> getContentBodyIterator() + { + return new BodyContentIterator(); + } + + + public ContentHeaderBody getContentHeaderBody() + { + return _contentHeaderBody; + } + + public Long getMessageId() + { + return _messageId; + } + + /** + * Creates a long-lived reference to this message, and increments the count of such references, as an atomic + * operation. + */ + public AMQMessage takeReference() + { + incrementReference(); // _referenceCount.incrementAndGet(); + + return this; + } + + public boolean incrementReference() + { + return incrementReference(1); + } + + /* Threadsafe. Increment the reference count on the message. */ + public boolean incrementReference(int count) + { + if(_referenceCount.addAndGet(count) <= 1) + { + _referenceCount.addAndGet(-count); + return false; + } + else + { + return true; + } + + } + + /** + * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the + * message store. + * + * @param storeContext + * + * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that + * failed + */ + public void decrementReference(StoreContext storeContext) throws MessageCleanupException + { + + int count = _referenceCount.decrementAndGet(); + + // note that the operation of decrementing the reference count and then removing the message does not + // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after + // the message has been passed to all queues. i.e. we are + // not relying on the all the increments having taken place before the delivery manager decrements. + if (count == 0) + { + // set the reference count way below 0 so that we can detect that the message has been deleted + // this is to guard against the message being spontaneously recreated (from the mgmt console) + // by copying from other queues at the same time as it is being removed. + _referenceCount.set(Integer.MIN_VALUE/2); + + try + { + // must check if the handle is null since there may be cases where we decide to throw away a message + // and the handle has not yet been constructed + // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op + removeMessage(storeContext); + } + catch (AMQException e) + { + // to maintain consistency, we revert the count + incrementReference(); + throw new MessageCleanupException(getMessageId(), e); + } + } + else + { + if (count < 0) + { + throw new MessageCleanupException("Reference count for message id " + debugIdentity() + + " has gone below 0."); + } + } + } + + + /** + * Called selectors to determin if the message has already been sent + * + * @return _deliveredToConsumer + */ + public boolean getDeliveredToConsumer() + { + return (_flags & DELIVERED_TO_CONSUMER) != 0; + } + + /** + * Called to enforce the 'immediate' flag. + * + * @returns true if the message is marked for immediate delivery but has not been marked as delivered + * to a consumer + */ + public boolean immediateAndNotDelivered() + { + + return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE; + + } + + /** + * Checks to see if the message has expired. If it has the message is dequeued. + * + * @return true if the message has expire + * + * @throws AMQException + */ + public boolean expired() throws AMQException + { + + if (_expiration != 0L) + { + long now = System.currentTimeMillis(); + + return (now > _expiration); + } + + return false; + } + + /** + * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). + * And for selector efficiency. + */ + public void setDeliveredToConsumer() + { + _flags |= DELIVERED_TO_CONSUMER; + } + + + public long getSize() + { + return _contentHeaderBody.bodySize; + } + + public Object getPublisherClientInstance() + { + return _sessionIdentifier.getSessionInstance(); + } + + public Object getPublisherIdentifier() + { + return _sessionIdentifier.getSessionIdentifier(); + } + + public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier) + { + _sessionIdentifier = sessionIdentifier; + } + + /** From AMQMessageHandle **/ + + public int getBodyCount() + { + return _contentBodies.size(); + } + + public ContentChunk getContentChunk(int index) + { + if(_contentBodies == null) + { + throw new RuntimeException("No ContentBody has been set"); + } + + if (index > _contentBodies.size() - 1 || index < 0) + { + throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " + + (_contentBodies.size() - 1)); + } + return _contentBodies.get(index); + } + + public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) + throws AMQException + { + if(_contentBodies == null) + { + if(isLastContentBody) + { + _contentBodies = Collections.singletonList(contentChunk); + } + else + { + _contentBodies = new ArrayList<ContentChunk>(); + _contentBodies.add(contentChunk); + } + } + else + { + _contentBodies.add(contentChunk); + } + } + + public MessagePublishInfo getMessagePublishInfo() + { + return _messagePublishInfo; + } + + public boolean isPersistent() + { + return false; + } + + /** + * This is called when all the content has been received. + * @param storeContext + *@param messagePublishInfo + * @param contentHeaderBody @throws AMQException + */ + public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, + ContentHeaderBody contentHeaderBody) + throws AMQException + { + + if (contentHeaderBody == null) + { + throw new NullPointerException("HeaderBody cannot be null"); + } + + if( messagePublishInfo == null) + { + throw new NullPointerException("PublishInfo cannot be null"); + } + + _messagePublishInfo = messagePublishInfo; + _contentHeaderBody = contentHeaderBody; + + + if( contentHeaderBody.bodySize == 0) + { + _contentBodies = Collections.EMPTY_LIST; + } + + _arrivalTime = System.currentTimeMillis(); + + if(messagePublishInfo.isImmediate()) + { + _flags |= IMMEDIATE; + } + } + + public long getArrivalTime() + { + return _arrivalTime; + } + + public void removeMessage(StoreContext storeContext) throws AMQException + { + //no-op + } + + + public String toString() + { + // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + + // _taken + " by :" + _takenBySubcription; + + return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java deleted file mode 100644 index 804d2c2131..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreContext; - -/** - * @author Robert Greig (robert.j.greig@jpmorgan.com) - */ -public class WeakReferenceMessageHandle implements AMQMessageHandle -{ - private WeakReference<ContentHeaderBody> _contentHeaderBody; - - private WeakReference<MessagePublishInfo> _messagePublishInfo; - - private List<WeakReference<ContentChunk>> _contentBodies; - - private boolean _redelivered; - - private final MessageStore _messageStore; - - private final Long _messageId; - private long _arrivalTime; - - public WeakReferenceMessageHandle(final Long messageId, MessageStore messageStore) - { - _messageId = messageId; - _messageStore = messageStore; - } - - public ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException - { - ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null); - if (chb == null) - { - MessageMetaData mmd = loadMessageMetaData(context); - chb = mmd.getContentHeaderBody(); - } - return chb; - } - - public Long getMessageId() - { - return _messageId; - } - - private MessageMetaData loadMessageMetaData(StoreContext context) - throws AMQException - { - MessageMetaData mmd = _messageStore.getMessageMetaData(context, _messageId); - populateFromMessageMetaData(mmd); - return mmd; - } - - private void populateFromMessageMetaData(MessageMetaData mmd) - { - _arrivalTime = mmd.getArrivalTime(); - _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody()); - _messagePublishInfo = new WeakReference<MessagePublishInfo>(mmd.getMessagePublishInfo()); - } - - public int getBodyCount(StoreContext context) throws AMQException - { - if (_contentBodies == null) - { - MessageMetaData mmd = _messageStore.getMessageMetaData(context, _messageId); - int chunkCount = mmd.getContentChunkCount(); - _contentBodies = new ArrayList<WeakReference<ContentChunk>>(chunkCount); - for (int i = 0; i < chunkCount; i++) - { - _contentBodies.add(new WeakReference<ContentChunk>(null)); - } - } - return _contentBodies.size(); - } - - public long getBodySize(StoreContext context) throws AMQException - { - return getContentHeaderBody(context).bodySize; - } - - public ContentChunk getContentChunk(StoreContext context, int index) throws AMQException, IllegalArgumentException - { - if(_contentBodies == null) - { - throw new RuntimeException("No ContentBody has been set"); - } - - if (index > _contentBodies.size() - 1 || index < 0) - { - throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " + - (_contentBodies.size() - 1)); - } - WeakReference<ContentChunk> wr = _contentBodies.get(index); - ContentChunk cb = wr.get(); - if (cb == null) - { - cb = _messageStore.getContentBodyChunk(context, _messageId, index); - _contentBodies.set(index, new WeakReference<ContentChunk>(cb)); - } - return cb; - } - - /** - * Content bodies are set <i>before</i> the publish and header frames - * - * @param storeContext - * @param contentChunk - * @param isLastContentBody - * @throws AMQException - */ - public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException - { - if (_contentBodies == null && isLastContentBody) - { - _contentBodies = new ArrayList<WeakReference<ContentChunk>>(1); - } - else - { - if (_contentBodies == null) - { - _contentBodies = new LinkedList<WeakReference<ContentChunk>>(); - } - } - _contentBodies.add(new WeakReference<ContentChunk>(contentChunk)); - _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1, - contentChunk, isLastContentBody); - } - - public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException - { - MessagePublishInfo bpb = (_messagePublishInfo != null ? _messagePublishInfo.get() : null); - if (bpb == null) - { - MessageMetaData mmd = loadMessageMetaData(context); - - bpb = mmd.getMessagePublishInfo(); - } - return bpb; - } - - public boolean isRedelivered() - { - return _redelivered; - } - - public void setRedelivered(boolean redelivered) - { - _redelivered = redelivered; - } - - public boolean isPersistent() - { - return true; - } - - /** - * This is called when all the content has been received. - * - * @param publishBody - * @param contentHeaderBody - * @throws AMQException - */ - public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo publishBody, - ContentHeaderBody contentHeaderBody) - throws AMQException - { - // if there are no content bodies the list will be null so we must - // create en empty list here - if (contentHeaderBody.bodySize == 0) - { - _contentBodies = new LinkedList<WeakReference<ContentChunk>>(); - } - - final long arrivalTime = System.currentTimeMillis(); - - MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), arrivalTime); - - _messageStore.storeMessageMetaData(storeContext, _messageId, mmd); - - - populateFromMessageMetaData(mmd); - } - - public void removeMessage(StoreContext storeContext) throws AMQException - { - _messageStore.removeMessage(storeContext, _messageId); - } - - public long getArrivalTime() - { - return _arrivalTime; - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index f23983641b..9de2d09b8e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -27,8 +27,8 @@ import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; @@ -93,7 +93,7 @@ public class DerbyMessageStore implements MessageStore private String _connectionURL; - + MessageFactory _messageFactory; private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )"; private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )"; @@ -167,6 +167,8 @@ public class DerbyMessageStore implements MessageStore // this recovers durable queues and persistent messages + _messageFactory = new MessageFactory(); + recover(); stateTransition(State.RECOVERING, State.STARTED); @@ -1299,7 +1301,7 @@ public class DerbyMessageStore implements MessageStore private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues) throws SQLException, AMQException { - Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>(); + Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>(); List<ProcessAction> actions = new ArrayList<ProcessAction>(); Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>(); @@ -1318,8 +1320,6 @@ public class DerbyMessageStore implements MessageStore conn = newConnection(); } - - MessageHandleFactory messageHandleFactory = new MessageHandleFactory(); long maxId = 1; TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null); @@ -1355,7 +1355,11 @@ public class DerbyMessageStore implements MessageStore } else { - message = new AMQMessage(messageId, this, messageHandleFactory, txnContext); + message = _messageFactory.createMessage(messageId, this, true); + + _logger.error("todo must do message recovery now."); + //todo must do message recovery now. + msgMap.put(messageId,message); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java index b5a91c8da6..d46ba85069 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java @@ -26,7 +26,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.QueueEntryImpl; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.tools.messagestore.MessageStoreTool; @@ -349,32 +348,15 @@ public class Show extends AbstractCommand arrival.add("" + msg.getArrivalTime()); - try - { - ispersitent.add(msg.isPersistent() ? "true" : "false"); - } - catch (AMQException e) - { - ispersitent.add("n/a"); - } + ispersitent.add(msg.isPersistent() ? "true" : "false"); isredelivered.add(entry.isRedelivered() ? "true" : "false"); isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false"); -// msg.getMessageHandle(); - BasicContentHeaderProperties headers = null; - try - { - headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties); - } - catch (AMQException e) - { - //ignore -// commandError("Unable to read properties for message: " + e.getMessage(), null); - } + headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties); if (headers != null) { @@ -414,15 +396,7 @@ public class Show extends AbstractCommand AMQShortString useridSS = headers.getUserId(); userid.add(useridSS == null ? "null" : useridSS.toString()); - MessagePublishInfo info = null; - try - { - info = msg.getMessagePublishInfo(); - } - catch (AMQException e) - { - //ignore - } + MessagePublishInfo info = msg.getMessagePublishInfo(); if (info != null) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java index 5fbf9484f7..2a97db6066 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java @@ -22,14 +22,13 @@ package org.apache.qpid.server; import junit.framework.TestCase; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; -import org.apache.qpid.server.queue.MockQueueEntry; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.SimpleQueueEntryList; import org.apache.qpid.server.queue.MockAMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; -import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.QueueEntryIterator; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.MockSubscription; @@ -38,7 +37,6 @@ import org.apache.qpid.AMQException; import java.util.Map; import java.util.LinkedHashMap; import java.util.LinkedList; -import java.util.Iterator; /** * QPID-1385 : Race condition between added to unacked map and resending due to a rollback. @@ -62,7 +60,7 @@ public class ExtractResendAndRequeueTest extends TestCase UnacknowledgedMessageMapImpl _unacknowledgedMessageMap; private static final int INITIAL_MSG_COUNT = 10; - private AMQQueue _queue = new MockAMQQueue(); + private AMQQueue _queue = new MockAMQQueue("ExtractResendAndRequeueTest"); private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>(); @Override diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index a705c8bbb4..228c99dcbd 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -28,11 +28,11 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.AMQMessageHandle; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.TransientAMQMessage; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; @@ -113,6 +113,8 @@ public class TxAckTest extends TestCase private StoreContext _storeContext = new StoreContext(); private AMQQueue _queue; + private static final int MESSAGE_SIZE=100; + Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception { TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(), @@ -128,7 +130,12 @@ public class TxAckTest extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(); - TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); + AMQMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); + + ContentHeaderBody header = new ContentHeaderBody(); + header.bodySize = MESSAGE_SIZE; + message.setPublishAndContentHeaderBody(_storeContext, info, header); + _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message)); } _acked = acked; @@ -190,16 +197,15 @@ public class TxAckTest extends TestCase } } - private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) + private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody) { - final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, - null, - false); + final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId, + null, + false); try { - amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), - publishBody, - new ContentHeaderBody() + // Safe to use null here as we just created a TransientMessage above + amqMessage.setPublishAndContentHeaderBody(null, publishBody, new ContentHeaderBody() { public int getSize() { @@ -213,11 +219,11 @@ public class TxAckTest extends TestCase } - return amqMessageHandle; + return amqMessage; } - private class TestMessage extends AMQMessage + private class TestMessage extends TransientAMQMessage { private final long _tag; private int _count; @@ -225,7 +231,7 @@ public class TxAckTest extends TestCase TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) throws AMQException { - super(createMessageHandle(messageId, publishBody), storeContext, publishBody); + super(createMessage(messageId, publishBody)); _tag = tag; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 883a712bef..e0a4357990 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -25,6 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.queue.*; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; @@ -54,7 +55,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase private StoreContext _storeContext = new StoreContext(); - private MessageHandleFactory _handleFactory = new MessageHandleFactory(); + private MessageFactory _handleFactory = new MessageFactory(); private int count; @@ -370,7 +371,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase /** * Just add some extra utility methods to AMQMessage to aid testing. */ - static class Message extends AMQMessage + static class Message extends PersistentAMQMessage { private class TestIncomingMessage extends IncomingMessage { @@ -392,14 +393,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase public ContentHeaderBody getContentHeaderBody() { - try - { - return Message.this.getContentHeaderBody(); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } + return Message.this.getContentHeaderBody(); } } @@ -407,10 +401,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase private static MessageStore _messageStore = new SkeletonMessageStore(); - private static StoreContext _storeContext = new StoreContext(); - - - private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, + private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, new LinkedList<RequiredDeliveryException>() ); @@ -422,7 +413,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase Message(String id, FieldTable headers) throws AMQException { - this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null); + this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers)); } public IncomingMessage getIncomingMessage() @@ -432,42 +423,35 @@ public class AbstractHeadersExchangeTestBase extends TestCase private Message(long messageId, MessagePublishInfo publish, - ContentHeaderBody header, - List<ContentBody> bodies) throws AMQException - { - super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish); - - - - _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore)); - _incoming.setContentHeaderBody(header); - - - } - - private static AMQMessageHandle createMessageHandle(final long messageId, - final MessagePublishInfo publish, - final ContentHeaderBody header) + ContentHeaderBody header) throws AMQException { - - final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, - _messageStore, - true); + super(messageId, _messageStore); try { - amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header); + setPublishAndContentHeaderBody(_txnContext.getStoreContext(), publish,header); } catch (AMQException e) { - + } - return amqMessageHandle; + + _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore)); + _incoming.setContentHeaderBody(header); } private Message(AMQMessage msg) throws AMQException { - super(msg); + super(msg.getMessageId(), _messageStore); + + this.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), msg.getMessagePublishInfo(), msg.getContentHeaderBody()); + + Iterator<ContentChunk> iterator = msg.getContentBodyIterator(); + + while(iterator.hasNext()) + { + this.addContentBodyFrame(_txnContext.getStoreContext(), iterator.next(),iterator.hasNext()); + } } @@ -500,15 +484,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase private Object getKey() { - try - { - return getMessagePublishInfo().getRoutingKey(); - } - catch (AMQException e) - { - _log.error("Error getting routing key: " + e, e); - return null; - } + return getMessagePublishInfo().getRoutingKey(); } } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java index d1a69c9d3c..ddf177690c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java @@ -497,7 +497,7 @@ public class DestWildExchangeTest extends TestCase throws AMQException { _exchange.route(message); - message.routingComplete(_store, new MessageHandleFactory()); + message.routingComplete(_store, new MessageFactory()); message.deliverToQueues(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index aff7af6952..ffe858f517 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -25,10 +25,12 @@ import java.util.ArrayList; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.ContentHeaderBody; import junit.framework.AssertionFailedError; public class AMQPriorityQueueTest extends SimpleAMQQueueTest { + private static final long MESSAGE_SIZE = 100L; @Override protected void setUp() throws Exception @@ -92,11 +94,18 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest protected AMQMessage createMessage(Long id, byte i) throws AMQException { - AMQMessage msg = super.createMessage(id); + AMQMessage message = super.createMessage(id); + + ContentHeaderBody header = new ContentHeaderBody(); + header.bodySize = MESSAGE_SIZE; + + //The createMessage above is for a Transient Message so it is safe to have no context. + message.setPublishAndContentHeaderBody(null, info, header); + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); props.setPriority(i); - msg.getContentHeaderBody().properties = props; - return msg; + message.getContentHeaderBody().properties = props; + return message; } protected AMQMessage createMessage(Long id) throws AMQException diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index fba30528ea..b159e2cda5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -136,6 +136,7 @@ public class AMQQueueAlertTest extends TestCase while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH) { sendMessages(1, MAX_MESSAGE_SIZE); + System.err.println(_queue.getQueueDepth() + ":" + MAX_QUEUE_DEPTH); } Notification lastNotification = _queueMBean.getLastNotification(); @@ -307,7 +308,7 @@ public class AMQQueueAlertTest extends TestCase ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); messages[i].enqueue(qs); - messages[i].routingComplete(_messageStore, new MessageHandleFactory()); + messages[i].routingComplete(_messageStore, new MessageFactory()); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 38f030f670..a5e2da7b36 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -221,7 +221,7 @@ public class AMQQueueMBeanTest extends TestCase ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_messageStore, new MessageHandleFactory()); + msg.routingComplete(_messageStore, new MessageFactory()); msg.addContentBodyFrame(new ContentChunk() { @@ -305,7 +305,7 @@ public class AMQQueueMBeanTest extends TestCase currentMessage.enqueue(qs); // route header - currentMessage.routingComplete(_messageStore, new MessageHandleFactory()); + currentMessage.routingComplete(_messageStore, new MessageFactory()); // Add the body so we have somthing to test later currentMessage.addContentBodyFrame( diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 01674c5b3d..cd1ee65c0c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -98,7 +98,7 @@ public class AckTest extends TestCase new LinkedList<RequiredDeliveryException>() ); _queue.registerSubscription(_subscription,false); - MessageHandleFactory factory = new MessageHandleFactory(); + MessageFactory factory = new MessageFactory(); for (int i = 1; i <= count; i++) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java deleted file mode 100644 index cac84c01b4..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java +++ /dev/null @@ -1,311 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.TestCase; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentHeaderProperties; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; - -public class InMemoryMessageHandleTest extends TestCase -{ - AMQMessageHandle _handle; - - protected AMQMessageHandle newHandle(Long id) - { - return new InMemoryMessageHandle(id); - } - - public void testMessageID() - { - Long id = 1L; - _handle = newHandle(id); - - assertEquals("Message not set value", id, _handle.getMessageId()); - } - - public void testInvalidContentChunk() - { - _handle = newHandle(1L); - - try - { - _handle.getContentChunk(null, 0); - fail("getContentChunk should not succeed"); - } - catch (RuntimeException e) - { - assertTrue(e.getMessage().equals("No ContentBody has been set")); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - ContentChunk cc = new MockContentChunk(null, 100); - - try - { - _handle.addContentBodyFrame(null, cc, false); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - _handle.getContentChunk(null, -1); - fail("getContentChunk should not succeed"); - } - catch (IllegalArgumentException e) - { - assertTrue(e.getMessage().contains("out of valid range")); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - _handle.getContentChunk(null, 1); - fail("getContentChunk should not succeed"); - } - catch (IllegalArgumentException e) - { - assertTrue(e.getMessage().contains("out of valid range")); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - } - - public void testAddSingleContentChunk() - { - - _handle = newHandle(1L); - - ContentChunk cc = new MockContentChunk(null, 100); - - try - { - _handle.addContentBodyFrame(null, cc, true); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - assertEquals("Incorrect body count", 1, _handle.getBodyCount(null)); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - assertEquals("Incorrect ContentChunk returned.", cc, _handle.getContentChunk(null, 0)); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - cc = new MockContentChunk(null, 100); - - try - { - _handle.addContentBodyFrame(null, cc, true); - fail("Exception should prevent adding two final chunks"); - } - catch (UnsupportedOperationException e) - { - //normal path - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - } - - public void testAddMultipleContentChunk() - { - - _handle = newHandle(1L); - - ContentChunk cc = new MockContentChunk(null, 100); - - try - { - _handle.addContentBodyFrame(null, cc, false); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - assertEquals("Incorrect body count", 1, _handle.getBodyCount(null)); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - assertEquals("Incorrect ContentChunk returned.", cc, _handle.getContentChunk(null, 0)); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - cc = new MockContentChunk(null, 100); - - try - { - _handle.addContentBodyFrame(null, cc, true); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - assertEquals("Incorrect body count", 2, _handle.getBodyCount(null)); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - assertEquals("Incorrect ContentChunk returned.", cc, _handle.getContentChunk(null, 1)); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - } - - // todo Move test to QueueEntry -// public void testRedelivered() -// { -// _handle = newHandle(1L); -// -// assertFalse("New message should not be redelivered", _handle.isRedelivered()); -// -// _handle.setRedelivered(true); -// -// assertTrue("New message should not be redelivered", _handle.isRedelivered()); -// } - - public void testInitialArrivalTime() - { - _handle = newHandle(1L); - - assertEquals("Initial Arrival time should be 0L", 0L, _handle.getArrivalTime()); - } - - public void testSetPublishAndContentHeaderBody_WithBody() - { - _handle = newHandle(1L); - - MessagePublishInfo mpi = new MessagePublishInfoImpl(); - int bodySize = 100; - - ContentHeaderBody chb = new ContentHeaderBody(0, 0, new BasicContentHeaderProperties(), bodySize); - - try - { - _handle.setPublishAndContentHeaderBody(null, mpi, chb); - - assertEquals("BodySize not returned correctly. ", bodySize, _handle.getBodySize(null)); - } - catch (AMQException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - - public void testSetPublishAndContentHeaderBody_Empty() - { - _handle = newHandle(1L); - - MessagePublishInfo mpi = new MessagePublishInfoImpl(); - int bodySize = 0; - - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - - props.setAppId("HandleTest"); - - ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); - - try - { - _handle.setPublishAndContentHeaderBody(null, mpi, chb); - - assertEquals("BodySize not returned correctly. ", bodySize, _handle.getBodySize(null)); - - ContentHeaderBody retreived_chb = _handle.getContentHeaderBody(null); - - ContentHeaderProperties chp = retreived_chb.properties; - - assertEquals("ContentHeaderBody not correct", chb, retreived_chb); - - assertEquals("AppID not correctly retreived", "HandleTest", - ((BasicContentHeaderProperties) chp).getAppIdAsString()); - - MessagePublishInfo retreived_mpi = _handle.getMessagePublishInfo(null); - - assertEquals("MessagePublishInfo not correct", mpi, retreived_mpi); - - - } - catch (AMQException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - - public void testIsPersistent() - { - _handle = newHandle(1L); - - assertFalse(_handle.isPersistent()); - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java new file mode 100644 index 0000000000..582e2bfb00 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; + +public class MessageFactoryTest extends TestCase +{ + private MessageFactory _factory; + + public void setUp() + { + _factory = new MessageFactory(); + } + + public void testTransientMessageCreation() + { + AMQMessage message = _factory.createMessage(0L, null, false); + + assertEquals("Transient Message creation does not return correct class.", TransientAMQMessage.class, message.getClass()); + } + + public void testPersistentMessageCreation() + { + AMQMessage message = _factory.createMessage(0L, null, true); + + assertEquals("Transient Message creation does not return correct class.", PersistentAMQMessage.class, message.getClass()); + } + +}
\ No newline at end of file diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java index a05eb0892b..cc6c486e11 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java @@ -22,23 +22,13 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -public class MockAMQMessage extends AMQMessage +public class MockAMQMessage extends TransientAMQMessage { public MockAMQMessage(long messageId) throws AMQException { - super(new MockAMQMessageHandle(messageId) , - (StoreContext)null, - (MessagePublishInfo)new MessagePublishInfoImpl()); - } - - protected MockAMQMessage(AMQMessage msg) - throws AMQException - { - super(msg); + super(messageId); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 758c8ddb2e..5f1cc81772 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -27,19 +27,16 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.AMQException; import org.apache.commons.configuration.Configuration; import java.util.List; import java.util.Set; -import java.util.Map; -import java.util.HashMap; -import java.util.LinkedList; public class MockAMQQueue implements AMQQueue { private boolean _deleted = false; + private int _queueCount; private AMQShortString _name; public MockAMQQueue(String name) @@ -47,11 +44,6 @@ public class MockAMQQueue implements AMQQueue _name = new AMQShortString(name); } - public MockAMQQueue() - { - - } - public AMQShortString getName() { return _name; @@ -134,7 +126,7 @@ public class MockAMQQueue implements AMQQueue public long getQueueDepth() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return _queueCount; } public long getReceivedMessageCount() @@ -159,6 +151,7 @@ public class MockAMQQueue implements AMQQueue public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException { + _queueCount++; return null; //To change body of implemented methods use File | Settings | File Templates. } @@ -169,7 +162,7 @@ public class MockAMQQueue implements AMQQueue public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException { - //To change body of implemented methods use File | Settings | File Templates. + _queueCount--; } public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java index ee85fecfa3..8a9d1ae771 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java @@ -20,14 +20,32 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.FixedSizeByteBufferAllocator; +import org.apache.qpid.framing.abstraction.ContentChunk; public class MockContentChunk implements ContentChunk { + public static final int DEFAULT_SIZE=0; + private ByteBuffer _bytebuffer; private int _size; + + + public MockContentChunk() + { + this(0); + } + + public MockContentChunk(int size) + { + FixedSizeByteBufferAllocator allocator = new FixedSizeByteBufferAllocator(); + _bytebuffer = allocator.allocate(size, false); + + _size = size; + } + public MockContentChunk(ByteBuffer bytebuffer, int size) { _bytebuffer = bytebuffer; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java index c6e7e2ebe2..e213be7560 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java @@ -21,8 +21,9 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; -public class WeakMessageHandleTest extends InMemoryMessageHandleTest +public class PersistentMessageTest extends TransientMessageTest { private MemoryMessageStore _messageStore; @@ -30,19 +31,20 @@ public class WeakMessageHandleTest extends InMemoryMessageHandleTest { _messageStore = new MemoryMessageStore(); _messageStore.configure(); + _storeContext = new StoreContext(); } - protected AMQMessageHandle newHandle(Long id) + @Override + protected AMQMessage newMessage(Long id) { - return new WeakReferenceMessageHandle(id, _messageStore); + return new MessageFactory().createMessage(id, _messageStore, true); } @Override public void testIsPersistent() { - _handle = newHandle(1L); - assertTrue(_handle.isPersistent()); + _message = newMessage(1L); + assertTrue(_message.isPersistent()); } - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java index 7573a629c1..f7cd860c22 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java @@ -20,30 +20,30 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.store.MessageStore; +import junit.framework.TestCase; -/** - * Constructs a message handle based on the publish body, the content header and the queue to which the message - * has been routed. - * - * @author Robert Greig (robert.j.greig@jpmorgan.com) - */ -public class MessageHandleFactory +public class QueueEntryImplTest extends TestCase { - public AMQMessageHandle createMessageHandle(Long messageId, MessageStore store, boolean persistent) + /** + * Test the Redelivered state of a QueueEntryImpl + */ + public void testRedelivered() { - // just hardcoded for now - if (persistent) - { - return new WeakReferenceMessageHandle(messageId, store); - } - else - { - return new InMemoryMessageHandle(messageId); - } - -// return new AMQMessage(messageId, store, persistent); + QueueEntry entry = new QueueEntryImpl(null, null); + + assertFalse("New message should not be redelivered", entry.isRedelivered()); + + entry.setRedelivered(true); + + assertTrue("New message should not be redelivered", entry.isRedelivered()); + + //Check we can revert it.. not that we ever should. + entry.setRedelivered(false); + + assertFalse("New message should not be redelivered", entry.isRedelivered()); + } + } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 500655c07c..2dcb081739 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -56,7 +56,8 @@ public class SimpleAMQQueueTest extends TestCase protected FieldTable _arguments = null; MessagePublishInfo info = new MessagePublishInfoImpl(); - + private static final long MESSAGE_SIZE = 100; + @Override protected void setUp() throws Exception { @@ -317,7 +318,7 @@ public class SimpleAMQQueueTest extends TestCase // Send persistent message qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_store, new MessageHandleFactory()); + msg.routingComplete(_store, new MessageFactory()); _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1)); // Check that it is enqueued @@ -326,9 +327,14 @@ public class SimpleAMQQueueTest extends TestCase // Dequeue message MockQueueEntry entry = new MockQueueEntry(); - AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext); + AMQMessage message = new MessageFactory().createMessage(1L, _store, true); - entry.setMessage(amqmsg); + ContentHeaderBody header = new ContentHeaderBody(); + header.bodySize = MESSAGE_SIZE; + // This is a persist message but we are not in a transaction so create a new context for the message + message.setPublishAndContentHeaderBody(new StoreContext(), info, header); + + entry.setMessage(message); _queue.dequeue(null, entry); // Check that it is dequeued @@ -338,22 +344,19 @@ public class SimpleAMQQueueTest extends TestCase // FIXME: move this to somewhere useful - private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) + private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody) { - final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, - null, - false); + final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId, null, false); try { - amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), - publishBody, - new ContentHeaderBody() - { - public int getSize() - { - return 1; - } - }); + //Safe to use a null StoreContext as we have created a TransientMessage (see false param above) + amqMessage.setPublishAndContentHeaderBody( null, publishBody, new ContentHeaderBody() + { + public int getSize() + { + return 1; + } + }); } catch (AMQException e) { @@ -361,18 +364,18 @@ public class SimpleAMQQueueTest extends TestCase } - return amqMessageHandle; + return amqMessage; } - public class TestMessage extends AMQMessage + public class TestMessage extends TransientAMQMessage { private final long _tag; private int _count; - TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) + TestMessage(long tag, long messageId, MessagePublishInfo publishBody) throws AMQException { - super(createMessageHandle(messageId, publishBody), storeContext, publishBody); + super(createMessage(messageId, publishBody)); _tag = tag; } @@ -396,7 +399,8 @@ public class SimpleAMQQueueTest extends TestCase protected AMQMessage createMessage(Long id) throws AMQException { - AMQMessage messageA = new TestMessage(id, id, info, new StoreContext()); + + AMQMessage messageA = new TestMessage(id, id, info); return messageA; } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java new file mode 100644 index 0000000000..e37269526c --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java @@ -0,0 +1,467 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.ContentHeaderProperties; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.server.store.StoreContext; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +public class TransientMessageTest extends TestCase +{ + AMQMessage _message; + StoreContext _storeContext = null; + + protected AMQMessage newMessage(Long id) + { + return new MessageFactory().createMessage(id, null, false); + } + + public void testMessageID() + { + Long id = 1L; + _message = newMessage(id); + + assertEquals("Message not set value", id, _message.getMessageId()); + } + + public void testInvalidContentChunk() + { + _message = newMessage(1L); + + try + { + _message.getContentChunk(0); + fail("getContentChunk should not succeed"); + } + catch (RuntimeException e) + { + assertTrue(e.getMessage().equals("No ContentBody has been set")); + } + + ContentChunk cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, false); + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + try + { + _message.getContentChunk(-1); + fail("getContentChunk should not succeed"); + } + catch (IllegalArgumentException e) + { + assertTrue(e.getMessage().contains("out of valid range")); + } + + try + { + _message.getContentChunk(1); + fail("getContentChunk should not succeed"); + } + catch (IllegalArgumentException e) + { + assertTrue(e.getMessage().contains("out of valid range")); + } + } + + public void testAddSingleContentChunk() + { + + _message = newMessage(1L); + + ContentChunk cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, true); + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + assertEquals("Incorrect body count", 1, _message.getBodyCount()); + + assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(0)); + + cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, true); + fail("Exception should prevent adding two final chunks"); + } + catch (UnsupportedOperationException e) + { + //normal path + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + } + + public void testAddMultipleContentChunk() + { + + _message = newMessage(1L); + + ContentChunk cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, false); + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + assertEquals("Incorrect body count", 1, _message.getBodyCount()); + + assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(0)); + + cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, true); + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + assertEquals("Incorrect body count", 2, _message.getBodyCount()); + + assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(1)); + + } + + public void testInitialArrivalTime() + { + _message = newMessage(1L); + + assertEquals("Initial Arrival time should be 0L", 0L, _message.getArrivalTime()); + } + + public void testSetPublishAndContentHeaderBody_WithBody() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + int bodySize = 100; + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, new BasicContentHeaderProperties(), bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertEquals("BodySize not returned correctly. ", bodySize, _message.getSize()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + public void testSetPublishAndContentHeaderBody_Null() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, null); + fail("setPublishAndContentHeaderBody with null ContentHeaederBody did not throw NPE."); + } + catch (NullPointerException npe) + { + assertEquals("HeaderBody cannot be null", npe.getMessage()); + } + catch (AMQException e) + { + fail("setPublishAndContentHeaderBody should not throw AMQException here:" + e.getMessage()); + } + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, null, chb); + fail("setPublishAndContentHeaderBody with null MessagePublishInfo did not throw NPE."); + } + catch (NullPointerException npe) + { + assertEquals("PublishInfo cannot be null", npe.getMessage()); + } + catch (AMQException e) + { + fail("setPublishAndContentHeaderBody should not throw AMQException here:" + e.getMessage()); + } + } + + public void testSetPublishAndContentHeaderBody_Empty() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertEquals("BodySize not returned correctly. ", bodySize, _message.getSize()); + + ContentHeaderBody retreived_chb = _message.getContentHeaderBody(); + + ContentHeaderProperties chp = retreived_chb.properties; + + assertEquals("ContentHeaderBody not correct", chb, retreived_chb); + + assertEquals("AppID not correctly retreived", "HandleTest", + ((BasicContentHeaderProperties) chp).getAppIdAsString()); + + MessagePublishInfo retreived_mpi = _message.getMessagePublishInfo(); + + assertEquals("MessagePublishInfo not correct", mpi, retreived_mpi); + + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + public void testIsPersistent() + { + _message = newMessage(1L); + + assertFalse(_message.isPersistent()); + } + + public void testImmediateAndNotDelivered() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertTrue("Undelivered Immediate message should still be marked as so", _message.immediateAndNotDelivered()); + + assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer()); + + _message.setDeliveredToConsumer(); + + assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer()); + + assertFalse("Delivered Immediate message now be marked as so", _message.immediateAndNotDelivered()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + public void testNotImmediateAndNotDelivered() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertFalse("Undelivered Non-Immediate message should not result in true.", _message.immediateAndNotDelivered()); + + assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer()); + + _message.setDeliveredToConsumer(); + + assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer()); + + assertFalse("Delivered Non-Immediate message not change this return", _message.immediateAndNotDelivered()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + public void testExpiry() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + ReentrantLock waitLock = new ReentrantLock(); + Condition wait = waitLock.newCondition(); + try + { + _message.setExpiration(System.currentTimeMillis() + 10L); + + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertFalse("New messages should not be expired.", _message.expired()); + + final long MILLIS =1000000L; + long waitTime = 20 * MILLIS; + + while (waitTime > 0) + { + try + { + waitLock.lock(); + + waitTime = wait.awaitNanos(waitTime); + } + catch (InterruptedException e) + { + //Stop if we are interrupted + fail(e.getMessage()); + } + finally + { + waitLock.unlock(); + } + + } + + assertTrue("After a sleep messages should now be expired.", _message.expired()); + + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + + public void testNoExpiry() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + ReentrantLock waitLock = new ReentrantLock(); + Condition wait = waitLock.newCondition(); + try + { + + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertFalse("New messages should not be expired.", _message.expired()); + + final long MILLIS =1000000L; + long waitTime = 10 * MILLIS; + + while (waitTime > 0) + { + try + { + waitLock.lock(); + + waitTime = wait.awaitNanos(waitTime); + } + catch (InterruptedException e) + { + //Stop if we are interrupted + fail(e.getMessage()); + } + finally + { + waitLock.unlock(); + } + + } + + assertFalse("After a sleep messages without an expiry should not expire.", _message.expired()); + + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 12ed928e7f..b4ed1f8709 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -30,7 +30,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.SimpleAMQQueue; @@ -389,7 +389,7 @@ public class MessageStoreTest extends TestCase try { - currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageHandleFactory()); + currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageFactory()); } catch (AMQException e) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index 51820f72dd..9a9fe3644c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -26,9 +26,8 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.AMQMessageHandle; /** * Tests that reference counting works correctly with AMQMessage and the message store @@ -56,10 +55,9 @@ public class TestReferenceCounting extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(); final long messageId = _store.getNewMessageId(); - AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); - messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb); - AMQMessage message = new AMQMessage(messageHandle, - _storeContext,info); + + AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true); + message.setPublishAndContentHeaderBody(_storeContext, info, chb); message = message.takeReference(); @@ -88,18 +86,10 @@ public class TestReferenceCounting extends TestCase final Long messageId = _store.getNewMessageId(); final ContentHeaderBody chb = createPersistentContentHeader(); - AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); - messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb); - AMQMessage message = new AMQMessage(messageHandle, - _storeContext, - info); - + AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true); + message.setPublishAndContentHeaderBody(_storeContext, info, chb); message = message.takeReference(); - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - - assertEquals(1, _store.getMessageMetaDataMap().size()); message = message.takeReference(); |
