summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java39
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java36
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java436
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java62
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java151
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java42
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java (renamed from qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java)20
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java84
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java469
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java223
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java32
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java32
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java72
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java311
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java48
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java20
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java (renamed from qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java)14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java (renamed from qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java)40
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java48
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java467
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java22
35 files changed, 1367 insertions, 1428 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 5fde08cbdd..3b290b3d51 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -35,12 +35,12 @@ import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.UnauthorizedAccessException;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
@@ -108,7 +108,7 @@ public class AMQChannel
private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
- private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
+ private MessageFactory _messageHandleFactory = new MessageFactory();
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
index 4949e5b41d..ea94f23ff9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -28,7 +28,6 @@ package org.apache.qpid.server.output.amqp0_8;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -37,8 +36,6 @@ import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.AMQException;
-import org.apache.mina.common.ByteBuffer;
-
import java.util.Iterator;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -79,11 +76,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
-
-
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
@@ -100,7 +93,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -112,7 +105,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -126,8 +119,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
final AMQMessage message = queueEntry.getMessage();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
AMQDataBlock deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
@@ -135,7 +126,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -150,7 +141,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -162,7 +153,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -179,7 +170,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final AMQMessage message = queueEntry.getMessage();
final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicDeliverBody deliverBody =
@@ -188,18 +178,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
queueEntry.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey());
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
-
-
- return deliverFrame;
+ return deliverBody.generateFrame(channelId);
}
private AMQDataBlock createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
final AMQMessage message = queueEntry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicGetOkBody getOkBody =
@@ -208,9 +194,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
- return getOkFrame;
+ return getOkBody.generateFrame(channelId);
}
public byte getProtocolMinorVersion()
@@ -231,9 +215,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
- return returnFrame;
+ return basicReturnBody.generateFrame(channelId);
+
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
index 00a15d2d50..b71b118275 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
@@ -28,9 +28,7 @@ import java.util.Iterator;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -77,12 +75,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
AMQBody deliverBody = createEncodedDeliverFrame(queueEntry, channelId, deliveryTag, consumerTag);
final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
-
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
-
-
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
@@ -99,7 +92,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
@@ -111,7 +104,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -123,9 +116,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
+ return ContentHeaderBody.createAMQFrame(channelId, contentHeaderBody);
}
@@ -133,15 +124,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
final AMQMessage message = queueEntry.getMessage();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
AMQFrame deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -156,7 +145,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -168,7 +157,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -190,7 +179,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final AMQShortString exchangeName = pb.getExchange();
final AMQShortString routingKey = pb.getRoutingKey();
- final AMQBody returnBlock = new AMQBody()
+ return new AMQBody()
{
public AMQBody _underlyingBody;
@@ -238,7 +227,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
throw new AMQException("This block should never be dispatched!");
}
};
- return returnBlock;
}
private AMQFrame createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
@@ -253,9 +241,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
- return getOkFrame;
+ return getOkBody.generateFrame(channelId);
}
public byte getProtocolMinorVersion()
@@ -276,9 +262,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
- return returnFrame;
+ return basicReturnBody.generateFrame(channelId);
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index d73d37f48d..2bd6e612f8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -20,363 +20,52 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-
+import org.apache.qpid.AMQException;
import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicInteger;
-/**
- * A deliverable message.
- */
-public class AMQMessage
+public interface AMQMessage
{
- /** Used for debugging purposes. */
- private static final Logger _log = Logger.getLogger(AMQMessage.class);
-
- private final AtomicInteger _referenceCount = new AtomicInteger(1);
-
- private final AMQMessageHandle _messageHandle;
-
- /** Holds the transactional context in which this message is being processed. */
- private StoreContext _storeContext;
-
- /** Flag to indicate that this message requires 'immediate' delivery. */
-
- private static final byte IMMEDIATE = 0x01;
-
- /**
- * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
- * for messages published with the 'immediate' flag.
- */
-
- private static final byte DELIVERED_TO_CONSUMER = 0x02;
-
- private byte _flags = 0;
-
- private long _expiration;
-
- private final long _size;
-
- private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
- private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
-
-
-
- /**
- * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
- * therefore is memory-efficient.
- */
- private class BodyFrameIterator implements Iterator<AMQDataBlock>
- {
- private int _channel;
-
- private int _index = -1;
- private AMQProtocolSession _protocolSession;
-
- private BodyFrameIterator(AMQProtocolSession protocolSession, int channel)
- {
- _channel = channel;
- _protocolSession = protocolSession;
- }
-
- public boolean hasNext()
- {
- try
- {
- return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1);
- }
- catch (AMQException e)
- {
- _log.error("Unable to get body count: " + e, e);
-
- return false;
- }
- }
-
- public AMQDataBlock next()
- {
- try
- {
-
- AMQBody cb =
- getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
- ++_index));
-
- return new AMQFrame(_channel, cb);
- }
- catch (AMQException e)
- {
- // have no choice but to throw a runtime exception
- throw new RuntimeException("Error getting content body: " + e, e);
- }
-
- }
-
- private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
- {
- return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter();
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- public void clearStoreContext()
- {
- _storeContext = new StoreContext();
- }
-
- public StoreContext getStoreContext()
- {
- return _storeContext;
- }
-
- private class BodyContentIterator implements Iterator<ContentChunk>
- {
-
- private int _index = -1;
-
- public boolean hasNext()
- {
- try
- {
- return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1);
- }
- catch (AMQException e)
- {
- _log.error("Error getting body count: " + e, e);
-
- return false;
- }
- }
-
- public ContentChunk next()
- {
- try
- {
- return _messageHandle.getContentChunk(getStoreContext(), ++_index);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Error getting content body: " + e, e);
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
-
-
- /**
- * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
- * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
- * queues.
- *
- * @param messageId
- * @param store
- * @param factory
- *
- * @throws AMQException
- */
- public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
- throws AMQException
- {
- _messageHandle = factory.createMessageHandle(messageId, store, true);
- _storeContext = txnConext.getStoreContext();
- _size = _messageHandle.getBodySize(txnConext.getStoreContext());
- }
-
- /**
- * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
- * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
- * queues.
- *
- * @param messageHandle
- *
- * @throws AMQException
- */
- public AMQMessage(
- AMQMessageHandle messageHandle,
- StoreContext storeConext,
- MessagePublishInfo info)
- throws AMQException
- {
- _messageHandle = messageHandle;
- _storeContext = storeConext;
-
- if(info.isImmediate())
- {
- _flags |= IMMEDIATE;
- }
- _size = messageHandle.getBodySize(storeConext);
-
- }
+ //Get Content relating to this message
+ Long getMessageId();
- protected AMQMessage(AMQMessage msg) throws AMQException
- {
- _messageHandle = msg._messageHandle;
- _storeContext = msg._storeContext;
- _flags = msg._flags;
- _size = msg._size;
+ Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel);
- }
+ Iterator<ContentChunk> getContentBodyIterator();
+ ContentHeaderBody getContentHeaderBody();
- public String debugIdentity()
- {
- return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
- }
+ ContentChunk getContentChunk(int index);
- public void setExpiration(final long expiration)
- {
+ Object getPublisherClientInstance();
- _expiration = expiration;
+ Object getPublisherIdentifier();
- }
+ MessagePublishInfo getMessagePublishInfo();
- public boolean isReferenced()
- {
- return _referenceCount.get() > 0;
- }
+ int getBodyCount();
- public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
- {
- return new BodyFrameIterator(protocolSession, channel);
- }
+ long getSize();
- public Iterator<ContentChunk> getContentBodyIterator()
- {
- return new BodyContentIterator();
- }
+ long getArrivalTime();
- public ContentHeaderBody getContentHeaderBody() throws AMQException
- {
- return _messageHandle.getContentHeaderBody(getStoreContext());
- }
-
- public Long getMessageId()
- {
- return _messageHandle.getMessageId();
- }
-
- /**
- * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
- * operation.
- */
- public AMQMessage takeReference()
- {
- incrementReference(); // _referenceCount.incrementAndGet();
-
- return this;
- }
-
- public boolean incrementReference()
- {
- return incrementReference(1);
- }
-
- /* Threadsafe. Increment the reference count on the message. */
- public boolean incrementReference(int count)
- {
- if(_referenceCount.addAndGet(count) <= 1)
- {
- _referenceCount.addAndGet(-count);
- return false;
- }
- else
- {
- return true;
- }
-
- }
-
- /**
- * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
- * message store.
- *
- * @param storeContext
- *
- * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
- * failed
- */
- public void decrementReference(StoreContext storeContext) throws MessageCleanupException
- {
-
- int count = _referenceCount.decrementAndGet();
-
- // note that the operation of decrementing the reference count and then removing the message does not
- // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
- // the message has been passed to all queues. i.e. we are
- // not relying on the all the increments having taken place before the delivery manager decrements.
- if (count == 0)
- {
- // set the reference count way below 0 so that we can detect that the message has been deleted
- // this is to guard against the message being spontaneously recreated (from the mgmt console)
- // by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE/2);
-
- try
- {
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- if (_messageHandle != null)
- {
- _messageHandle.removeMessage(storeContext);
- }
- }
- catch (AMQException e)
- {
- // to maintain consistency, we revert the count
- incrementReference();
- throw new MessageCleanupException(getMessageId(), e);
- }
- }
- else
- {
- if (count < 0)
- {
- throw new MessageCleanupException("Reference count for message id " + debugIdentity()
- + " has gone below 0.");
- }
- }
- }
-
+ //Check the status of this message
/**
* Called selectors to determin if the message has already been sent
*
* @return _deliveredToConsumer
*/
- public boolean getDeliveredToConsumer()
- {
- return (_flags & DELIVERED_TO_CONSUMER) != 0;
- }
-
- public boolean isPersistent() throws AMQException
- {
- return _messageHandle.isPersistent();
- }
+ boolean getDeliveredToConsumer();
/**
* Called to enforce the 'immediate' flag.
@@ -384,89 +73,62 @@ public class AMQMessage
* @returns true if the message is marked for immediate delivery but has not been marked as delivered
* to a consumer
*/
- public boolean immediateAndNotDelivered()
- {
-
- return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
-
- }
-
- public MessagePublishInfo getMessagePublishInfo() throws AMQException
- {
- return _messageHandle.getMessagePublishInfo(getStoreContext());
- }
-
- public long getArrivalTime()
- {
- return _messageHandle.getArrivalTime();
- }
+ boolean immediateAndNotDelivered();
/**
* Checks to see if the message has expired. If it has the message is dequeued.
*
- * @param queue The queue to check the expiration against. (Currently not used)
- *
* @return true if the message has expire
*
- * @throws AMQException
+ * @throws org.apache.qpid.AMQException
*/
- public boolean expired(AMQQueue queue) throws AMQException
- {
+ boolean expired() throws AMQException;
- if (_expiration != 0L)
- {
- long now = System.currentTimeMillis();
-
- return (now > _expiration);
- }
+ /** Is this a persistent message
+ *
+ * @return true if the message is persistent
+ */
+ boolean isPersistent();
- return false;
- }
/**
* Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
* And for selector efficiency.
*/
- public void setDeliveredToConsumer()
- {
- _flags |= DELIVERED_TO_CONSUMER;
- }
+ void setDeliveredToConsumer();
+
+ void setExpiration(long expiration);
+
+ void setClientIdentifier(AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier);
+
+ /**
+ * This is called when all the content has been received.
+ * @param storeContext
+ *@param messagePublishInfo
+ * @param contentHeaderBody @throws org.apache.qpid.AMQException
+ */
+ void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, ContentHeaderBody contentHeaderBody)
+ throws AMQException;
+ void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
+ throws AMQException;
- public AMQMessageHandle getMessageHandle()
- {
- return _messageHandle;
- }
+ void removeMessage(StoreContext storeContext) throws AMQException;
- public long getSize()
- {
- return _size;
+ String toString();
- }
+ String debugIdentity();
- public Object getPublisherClientInstance()
- {
- return _sessionIdentifier.getSessionInstance();
- }
-
- public Object getPublisherIdentifier()
- {
- return _sessionIdentifier.getSessionIdentifier();
- }
+ // Reference counting methods
- public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier)
- {
- _sessionIdentifier = sessionIdentifier;
- }
+ void decrementReference(StoreContext storeContext) throws MessageCleanupException;
+ boolean incrementReference(int queueCount);
- public String toString()
- {
- // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
- // _taken + " by :" + _takenBySubcription;
+ boolean incrementReference();
- return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
- }
+ AMQMessage takeReference();
+ boolean isReferenced();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index f5853bd303..a08719875d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -353,29 +353,20 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
}
}
- try
+ // Create header attributes list
+ CommonContentHeaderProperties headerProperties =
+ (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
+ String mimeType = null, encoding = null;
+ if (headerProperties != null)
{
- // Create header attributes list
- CommonContentHeaderProperties headerProperties =
- (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = null, encoding = null;
- if (headerProperties != null)
- {
- AMQShortString mimeTypeShortSting = headerProperties.getContentType();
- mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
- encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
- }
+ AMQShortString mimeTypeShortSting = headerProperties.getContentType();
+ mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
+ encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
+ }
- Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+ Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
- return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
- }
- catch (AMQException e)
- {
- JMException jme = new JMException("Error creating header attributes list: " + e);
- jme.initCause(e);
- throw jme;
- }
+ return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
}
/**
@@ -392,27 +383,18 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
List<QueueEntry> list = _queue.getMessagesOnTheQueue();
TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
- try
+ // Create the tabular list of message header contents
+ for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
{
- // Create the tabular list of message header contents
- for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
- {
- QueueEntry queueEntry = list.get(i - 1);
- AMQMessage msg = queueEntry.getMessage();
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
- // Create header attributes list
- String[] headerAttributes = getMessageHeaderProperties(headerBody);
- Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize,
- queueEntry.isRedelivered() };
- CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
- _messageList.put(messageData);
- }
- }
- catch (AMQException e)
- {
- JMException jme = new JMException("Error creating message contents: " + e);
- jme.initCause(e);
- throw jme;
+ QueueEntry queueEntry = list.get(i - 1);
+ AMQMessage msg = queueEntry.getMessage();
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ // Create header attributes list
+ String[] headerAttributes = getMessageHeaderProperties(headerBody);
+ Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize,
+ queueEntry.isRedelivered() };
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
+ _messageList.put(messageData);
}
return _messageList;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
deleted file mode 100644
index 1092f67d94..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Collections;
-import java.util.ArrayList;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.store.StoreContext;
-
-/**
- */
-public class InMemoryMessageHandle implements AMQMessageHandle
-{
-
- private ContentHeaderBody _contentHeaderBody;
-
- private MessagePublishInfo _messagePublishInfo;
-
- private List<ContentChunk> _contentBodies;
-
- private boolean _redelivered;
-
- private long _arrivalTime;
-
- private final Long _messageId;
-
- public InMemoryMessageHandle(final Long messageId)
- {
- _messageId = messageId;
- }
-
- public ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException
- {
- return _contentHeaderBody;
- }
-
- public Long getMessageId()
- {
- return _messageId;
- }
-
- public int getBodyCount(StoreContext context)
- {
- return _contentBodies.size();
- }
-
- public long getBodySize(StoreContext context) throws AMQException
- {
- return getContentHeaderBody(context).bodySize;
- }
-
- public ContentChunk getContentChunk(StoreContext context, int index) throws AMQException, IllegalArgumentException
- {
- if(_contentBodies == null)
- {
- throw new RuntimeException("No ContentBody has been set");
- }
-
- if (index > _contentBodies.size() - 1 || index < 0)
- {
- throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
- (_contentBodies.size() - 1));
- }
- return _contentBodies.get(index);
- }
-
- public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentBody, boolean isLastContentBody)
- throws AMQException
- {
- if(_contentBodies == null)
- {
- if(isLastContentBody)
- {
- _contentBodies = Collections.singletonList(contentBody);
- }
- else
- {
- _contentBodies = new ArrayList<ContentChunk>();
- _contentBodies.add(contentBody);
- }
- }
- else
- {
- _contentBodies.add(contentBody);
- }
- }
-
- public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException
- {
- return _messagePublishInfo;
- }
-
- public boolean isPersistent()
- {
- return false;
- }
-
- /**
- * This is called when all the content has been received.
- * @param messagePublishInfo
- * @param contentHeaderBody
- * @throws AMQException
- */
- public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
- ContentHeaderBody contentHeaderBody)
- throws AMQException
- {
- _messagePublishInfo = messagePublishInfo;
- _contentHeaderBody = contentHeaderBody;
- if(contentHeaderBody.bodySize == 0)
- {
- _contentBodies = Collections.EMPTY_LIST;
- }
- _arrivalTime = System.currentTimeMillis();
- }
-
- public void removeMessage(StoreContext storeContext) throws AMQException
- {
- // NO OP
- }
-
- public long getArrivalTime()
- {
- return _arrivalTime;
- }
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index b994040131..aad99da6c3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -35,7 +35,6 @@ import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import java.util.ArrayList;
-import java.util.Collection;
public class IncomingMessage implements Filterable<RuntimeException>
{
@@ -48,7 +47,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
- private AMQMessageHandle _messageHandle;
+ private AMQMessage _message;
private final Long _messageId;
private final TransactionalContext _txnContext;
@@ -74,7 +73,6 @@ public class IncomingMessage implements Filterable<RuntimeException>
private Exchange _exchange;
-
public IncomingMessage(final Long messageId,
final MessagePublishInfo info,
final TransactionalContext txnContext,
@@ -124,11 +122,11 @@ public class IncomingMessage implements Filterable<RuntimeException>
}
public void routingComplete(final MessageStore store,
- final MessageHandleFactory factory) throws AMQException
+ final MessageFactory factory) throws AMQException
{
final boolean persistent = isPersistent();
- _messageHandle = factory.createMessageHandle(_messageId, store, persistent);
+ _message = factory.createMessage(_messageId, store, persistent);
if (persistent)
{
_txnContext.beginTranIfNecessary();
@@ -157,21 +155,16 @@ public class IncomingMessage implements Filterable<RuntimeException>
_logger.debug("Delivering message " + _messageId + " to " + _destinationQueues);
}
- AMQMessage message = null;
-
try
{
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
- _messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(),
- _messagePublishInfo, getContentHeaderBody());
+ _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody());
+
-
-
- message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo);
- message.setExpiration(_expiration);
- message.setClientIdentifier(_publisher.getSessionIdentifier());
+ _message.setExpiration(_expiration);
+ _message.setClientIdentifier(_publisher.getSessionIdentifier());
// we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
@@ -182,7 +175,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
{
- throw new UnauthorizedAccessException("Acccess Refused",message);
+ throw new UnauthorizedAccessException("Acccess Refused", _message);
}
if ((_destinationQueues == null) || _destinationQueues.size() == 0)
@@ -190,26 +183,26 @@ public class IncomingMessage implements Filterable<RuntimeException>
if (isMandatory() || isImmediate())
{
- throw new NoRouteException("No Route for message", message);
+ throw new NoRouteException("No Route for message", _message);
}
else
{
- _logger.warn("MESSAGE DISCARDED: No routes for message - " + message);
+ _logger.warn("MESSAGE DISCARDED: No routes for message - " + _message);
}
}
else
{
int offset;
final int queueCount = _destinationQueues.size();
- message.incrementReference(queueCount);
+ _message.incrementReference(queueCount);
if(queueCount == 1)
{
offset = 0;
}
else
{
- offset = ((int)(message.getMessageId().longValue())) % queueCount;
+ offset = ((int)(_message.getMessageId().longValue())) % queueCount;
if(offset < 0)
{
offset = -offset;
@@ -218,22 +211,21 @@ public class IncomingMessage implements Filterable<RuntimeException>
for (int i = offset; i < queueCount; i++)
{
// normal deliver so add this message at the end.
- _txnContext.deliver(_destinationQueues.get(i), message);
+ _txnContext.deliver(_destinationQueues.get(i), _message);
}
for (int i = 0; i < offset; i++)
{
// normal deliver so add this message at the end.
- _txnContext.deliver(_destinationQueues.get(i), message);
+ _txnContext.deliver(_destinationQueues.get(i), _message);
}
}
- message.clearStoreContext();
- return message;
+ return _message;
}
finally
{
// Remove refence for routing process . Reference count should now == delivered queue count
- if(message != null) message.decrementReference(_txnContext.getStoreContext());
+ if(_message != null) _message.decrementReference(_txnContext.getStoreContext());
}
}
@@ -244,7 +236,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
_bodyLengthReceived += contentChunk.getSize();
- _messageHandle.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived());
+ _message.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived());
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
index bdb0707c27..e18834874f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
@@ -20,18 +20,22 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-public class MockAMQMessageHandle extends InMemoryMessageHandle
+public class MessageFactory
{
- public MockAMQMessageHandle(final Long messageId)
- {
- super(messageId);
- }
- @Override
- public long getBodySize(StoreContext store)
+ public AMQMessage createMessage(Long messageId, MessageStore store, boolean persistent)
{
- return 0l;
+ if (persistent)
+ {
+ return new PersistentAMQMessage(messageId, store);
+ }
+ else
+ {
+ return new TransientAMQMessage(messageId);
+ }
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
index 6f9efd3200..e33b0c83c7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
@@ -47,20 +47,13 @@ public enum NotificationCheck
if(maximumMessageSize != 0)
{
// Check for threshold message size
- long messageSize;
- try
- {
- messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
- }
- catch (AMQException e)
- {
- messageSize = 0;
- }
-
+ long messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
if (messageSize >= maximumMessageSize)
{
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold (" +
+ maximumMessageSize + ") breached. [Message ID=" +
+ (msg == null ? "null" : msg.getMessageId()) + "]");
return true;
}
}
@@ -110,7 +103,7 @@ public enum NotificationCheck
}
}
return false;
-
+
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
new file mode 100644
index 0000000000..04e3635f92
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+public class PersistentAMQMessage extends TransientAMQMessage
+{
+ protected MessageStore _messageStore;
+
+ public PersistentAMQMessage(Long messageId, MessageStore store)
+ {
+ super(messageId);
+ _messageStore = store;
+ }
+
+ @Override
+ public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
+ throws AMQException
+ {
+ super.addContentBodyFrame(storeContext, contentChunk, isLastContentBody);
+ _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1,
+ contentChunk, isLastContentBody);
+ }
+
+ @Override
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
+ ContentHeaderBody contentHeaderBody)
+ throws AMQException
+ {
+ super.setPublishAndContentHeaderBody(storeContext, messagePublishInfo, contentHeaderBody);
+ MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime);
+
+ _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
+ }
+
+ @Override
+ public void removeMessage(StoreContext storeContext) throws AMQException
+ {
+ _messageStore.removeMessage(storeContext, _messageId);
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public void recoverFromMessageMetaData(MessageMetaData mmd)
+ {
+ _arrivalTime = mmd.getArrivalTime();
+ _contentHeaderBody = mmd.getContentHeaderBody();
+ _messagePublishInfo = mmd.getMessagePublishInfo();
+ }
+
+ public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
+ {
+ super.addContentBodyFrame(null, contentChunk, isLastContentBody);
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index fd46a8a5ff..7be2827e0f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -54,25 +54,16 @@ public class PriorityQueueList implements QueueEntryList
public QueueEntry add(AMQMessage message)
{
- try
+ int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
+ if(index >= _priorities)
{
- int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
- if(index >= _priorities)
- {
- index = _priorities-1;
- }
- else if(index < 0)
- {
- index = 0;
- }
- return _priorityLists[index].add(message);
+ index = _priorities-1;
}
- catch (AMQException e)
+ else if(index < 0)
{
- // TODO - fix AMQ Exception
- throw new RuntimeException(e);
+ index = 0;
}
-
+ return _priorityLists[index].add(message);
}
public QueueEntry next(QueueEntry node)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index fe9686e906..ba14be5580 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -132,7 +132,7 @@ public class QueueEntryImpl implements QueueEntry
public boolean expired() throws AMQException
{
- return getMessage().expired(getQueue());
+ return getMessage().expired();
}
public boolean isAcquired()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
new file mode 100644
index 0000000000..8c62e046f8
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
@@ -0,0 +1,469 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A deliverable message.
+ */
+public class TransientAMQMessage implements AMQMessage
+{
+ /** Used for debugging purposes. */
+ private static final Logger _log = Logger.getLogger(AMQMessage.class);
+
+ private final AtomicInteger _referenceCount = new AtomicInteger(1);
+
+ protected ContentHeaderBody _contentHeaderBody;
+
+ protected MessagePublishInfo _messagePublishInfo;
+
+ protected List<ContentChunk> _contentBodies;
+
+ protected long _arrivalTime;
+
+ protected final Long _messageId;
+
+
+
+ /** Flag to indicate that this message requires 'immediate' delivery. */
+
+ private static final byte IMMEDIATE = 0x01;
+
+ /**
+ * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
+ * for messages published with the 'immediate' flag.
+ */
+
+ private static final byte DELIVERED_TO_CONSUMER = 0x02;
+
+ private byte _flags = 0;
+
+ private long _expiration;
+
+ private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
+ private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
+
+ /**
+ * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
+ * therefore is memory-efficient.
+ */
+ private class BodyFrameIterator implements Iterator<AMQDataBlock>
+ {
+ private int _channel;
+
+ private int _index = -1;
+ private AMQProtocolSession _protocolSession;
+
+ private BodyFrameIterator(AMQProtocolSession protocolSession, int channel)
+ {
+ _channel = channel;
+ _protocolSession = protocolSession;
+ }
+
+ public boolean hasNext()
+ {
+ return _index < (getBodyCount() - 1);
+ }
+
+ public AMQDataBlock next()
+ {
+ AMQBody cb =
+ getProtocolVersionMethodConverter().convertToBody(getContentChunk(++_index));
+
+ return new AMQFrame(_channel, cb);
+ }
+
+ private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
+ {
+ return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private class BodyContentIterator implements Iterator<ContentChunk>
+ {
+
+ private int _index = -1;
+
+ public boolean hasNext()
+ {
+ return _index < (getBodyCount() - 1);
+ }
+
+ public ContentChunk next()
+ {
+ return getContentChunk(++_index);
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message
+ * These all need refactoring to some sort of MockAMQMessageFactory.
+ */
+ @Deprecated
+ protected TransientAMQMessage(AMQMessage message) throws AMQException
+ {
+ _messageId = message.getMessageId();
+ _flags = ((TransientAMQMessage)message)._flags;
+ _contentHeaderBody = message.getContentHeaderBody();
+ _messagePublishInfo = message.getMessagePublishInfo();
+ }
+
+
+ /**
+ * Normal message creation via the MessageFactory uses this constructor
+ * Package scope limited as MessageFactory should be used
+ * @see MessageFactory
+ *
+ * @param messageId
+ */
+ TransientAMQMessage(Long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public String debugIdentity()
+ {
+ return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
+ }
+
+ public void setExpiration(final long expiration)
+ {
+ _expiration = expiration;
+ }
+
+ public boolean isReferenced()
+ {
+ return _referenceCount.get() > 0;
+ }
+
+ public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
+ {
+ return new BodyFrameIterator(protocolSession, channel);
+ }
+
+ public Iterator<ContentChunk> getContentBodyIterator()
+ {
+ return new BodyContentIterator();
+ }
+
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ return _contentHeaderBody;
+ }
+
+ public Long getMessageId()
+ {
+ return _messageId;
+ }
+
+ /**
+ * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
+ * operation.
+ */
+ public AMQMessage takeReference()
+ {
+ incrementReference(); // _referenceCount.incrementAndGet();
+
+ return this;
+ }
+
+ public boolean incrementReference()
+ {
+ return incrementReference(1);
+ }
+
+ /* Threadsafe. Increment the reference count on the message. */
+ public boolean incrementReference(int count)
+ {
+ if(_referenceCount.addAndGet(count) <= 1)
+ {
+ _referenceCount.addAndGet(-count);
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+
+ }
+
+ /**
+ * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
+ * message store.
+ *
+ * @param storeContext
+ *
+ * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
+ * failed
+ */
+ public void decrementReference(StoreContext storeContext) throws MessageCleanupException
+ {
+
+ int count = _referenceCount.decrementAndGet();
+
+ // note that the operation of decrementing the reference count and then removing the message does not
+ // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
+ // the message has been passed to all queues. i.e. we are
+ // not relying on the all the increments having taken place before the delivery manager decrements.
+ if (count == 0)
+ {
+ // set the reference count way below 0 so that we can detect that the message has been deleted
+ // this is to guard against the message being spontaneously recreated (from the mgmt console)
+ // by copying from other queues at the same time as it is being removed.
+ _referenceCount.set(Integer.MIN_VALUE/2);
+
+ try
+ {
+ // must check if the handle is null since there may be cases where we decide to throw away a message
+ // and the handle has not yet been constructed
+ // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op
+ removeMessage(storeContext);
+ }
+ catch (AMQException e)
+ {
+ // to maintain consistency, we revert the count
+ incrementReference();
+ throw new MessageCleanupException(getMessageId(), e);
+ }
+ }
+ else
+ {
+ if (count < 0)
+ {
+ throw new MessageCleanupException("Reference count for message id " + debugIdentity()
+ + " has gone below 0.");
+ }
+ }
+ }
+
+
+ /**
+ * Called selectors to determin if the message has already been sent
+ *
+ * @return _deliveredToConsumer
+ */
+ public boolean getDeliveredToConsumer()
+ {
+ return (_flags & DELIVERED_TO_CONSUMER) != 0;
+ }
+
+ /**
+ * Called to enforce the 'immediate' flag.
+ *
+ * @returns true if the message is marked for immediate delivery but has not been marked as delivered
+ * to a consumer
+ */
+ public boolean immediateAndNotDelivered()
+ {
+
+ return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
+
+ }
+
+ /**
+ * Checks to see if the message has expired. If it has the message is dequeued.
+ *
+ * @return true if the message has expire
+ *
+ * @throws AMQException
+ */
+ public boolean expired() throws AMQException
+ {
+
+ if (_expiration != 0L)
+ {
+ long now = System.currentTimeMillis();
+
+ return (now > _expiration);
+ }
+
+ return false;
+ }
+
+ /**
+ * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+ * And for selector efficiency.
+ */
+ public void setDeliveredToConsumer()
+ {
+ _flags |= DELIVERED_TO_CONSUMER;
+ }
+
+
+ public long getSize()
+ {
+ return _contentHeaderBody.bodySize;
+ }
+
+ public Object getPublisherClientInstance()
+ {
+ return _sessionIdentifier.getSessionInstance();
+ }
+
+ public Object getPublisherIdentifier()
+ {
+ return _sessionIdentifier.getSessionIdentifier();
+ }
+
+ public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier)
+ {
+ _sessionIdentifier = sessionIdentifier;
+ }
+
+ /** From AMQMessageHandle **/
+
+ public int getBodyCount()
+ {
+ return _contentBodies.size();
+ }
+
+ public ContentChunk getContentChunk(int index)
+ {
+ if(_contentBodies == null)
+ {
+ throw new RuntimeException("No ContentBody has been set");
+ }
+
+ if (index > _contentBodies.size() - 1 || index < 0)
+ {
+ throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
+ (_contentBodies.size() - 1));
+ }
+ return _contentBodies.get(index);
+ }
+
+ public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
+ throws AMQException
+ {
+ if(_contentBodies == null)
+ {
+ if(isLastContentBody)
+ {
+ _contentBodies = Collections.singletonList(contentChunk);
+ }
+ else
+ {
+ _contentBodies = new ArrayList<ContentChunk>();
+ _contentBodies.add(contentChunk);
+ }
+ }
+ else
+ {
+ _contentBodies.add(contentChunk);
+ }
+ }
+
+ public MessagePublishInfo getMessagePublishInfo()
+ {
+ return _messagePublishInfo;
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+
+ /**
+ * This is called when all the content has been received.
+ * @param storeContext
+ *@param messagePublishInfo
+ * @param contentHeaderBody @throws AMQException
+ */
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
+ ContentHeaderBody contentHeaderBody)
+ throws AMQException
+ {
+
+ if (contentHeaderBody == null)
+ {
+ throw new NullPointerException("HeaderBody cannot be null");
+ }
+
+ if( messagePublishInfo == null)
+ {
+ throw new NullPointerException("PublishInfo cannot be null");
+ }
+
+ _messagePublishInfo = messagePublishInfo;
+ _contentHeaderBody = contentHeaderBody;
+
+
+ if( contentHeaderBody.bodySize == 0)
+ {
+ _contentBodies = Collections.EMPTY_LIST;
+ }
+
+ _arrivalTime = System.currentTimeMillis();
+
+ if(messagePublishInfo.isImmediate())
+ {
+ _flags |= IMMEDIATE;
+ }
+ }
+
+ public long getArrivalTime()
+ {
+ return _arrivalTime;
+ }
+
+ public void removeMessage(StoreContext storeContext) throws AMQException
+ {
+ //no-op
+ }
+
+
+ public String toString()
+ {
+ // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+ // _taken + " by :" + _takenBySubcription;
+
+ return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
deleted file mode 100644
index 804d2c2131..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
-
-/**
- * @author Robert Greig (robert.j.greig@jpmorgan.com)
- */
-public class WeakReferenceMessageHandle implements AMQMessageHandle
-{
- private WeakReference<ContentHeaderBody> _contentHeaderBody;
-
- private WeakReference<MessagePublishInfo> _messagePublishInfo;
-
- private List<WeakReference<ContentChunk>> _contentBodies;
-
- private boolean _redelivered;
-
- private final MessageStore _messageStore;
-
- private final Long _messageId;
- private long _arrivalTime;
-
- public WeakReferenceMessageHandle(final Long messageId, MessageStore messageStore)
- {
- _messageId = messageId;
- _messageStore = messageStore;
- }
-
- public ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException
- {
- ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null);
- if (chb == null)
- {
- MessageMetaData mmd = loadMessageMetaData(context);
- chb = mmd.getContentHeaderBody();
- }
- return chb;
- }
-
- public Long getMessageId()
- {
- return _messageId;
- }
-
- private MessageMetaData loadMessageMetaData(StoreContext context)
- throws AMQException
- {
- MessageMetaData mmd = _messageStore.getMessageMetaData(context, _messageId);
- populateFromMessageMetaData(mmd);
- return mmd;
- }
-
- private void populateFromMessageMetaData(MessageMetaData mmd)
- {
- _arrivalTime = mmd.getArrivalTime();
- _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
- _messagePublishInfo = new WeakReference<MessagePublishInfo>(mmd.getMessagePublishInfo());
- }
-
- public int getBodyCount(StoreContext context) throws AMQException
- {
- if (_contentBodies == null)
- {
- MessageMetaData mmd = _messageStore.getMessageMetaData(context, _messageId);
- int chunkCount = mmd.getContentChunkCount();
- _contentBodies = new ArrayList<WeakReference<ContentChunk>>(chunkCount);
- for (int i = 0; i < chunkCount; i++)
- {
- _contentBodies.add(new WeakReference<ContentChunk>(null));
- }
- }
- return _contentBodies.size();
- }
-
- public long getBodySize(StoreContext context) throws AMQException
- {
- return getContentHeaderBody(context).bodySize;
- }
-
- public ContentChunk getContentChunk(StoreContext context, int index) throws AMQException, IllegalArgumentException
- {
- if(_contentBodies == null)
- {
- throw new RuntimeException("No ContentBody has been set");
- }
-
- if (index > _contentBodies.size() - 1 || index < 0)
- {
- throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
- (_contentBodies.size() - 1));
- }
- WeakReference<ContentChunk> wr = _contentBodies.get(index);
- ContentChunk cb = wr.get();
- if (cb == null)
- {
- cb = _messageStore.getContentBodyChunk(context, _messageId, index);
- _contentBodies.set(index, new WeakReference<ContentChunk>(cb));
- }
- return cb;
- }
-
- /**
- * Content bodies are set <i>before</i> the publish and header frames
- *
- * @param storeContext
- * @param contentChunk
- * @param isLastContentBody
- * @throws AMQException
- */
- public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
- {
- if (_contentBodies == null && isLastContentBody)
- {
- _contentBodies = new ArrayList<WeakReference<ContentChunk>>(1);
- }
- else
- {
- if (_contentBodies == null)
- {
- _contentBodies = new LinkedList<WeakReference<ContentChunk>>();
- }
- }
- _contentBodies.add(new WeakReference<ContentChunk>(contentChunk));
- _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1,
- contentChunk, isLastContentBody);
- }
-
- public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException
- {
- MessagePublishInfo bpb = (_messagePublishInfo != null ? _messagePublishInfo.get() : null);
- if (bpb == null)
- {
- MessageMetaData mmd = loadMessageMetaData(context);
-
- bpb = mmd.getMessagePublishInfo();
- }
- return bpb;
- }
-
- public boolean isRedelivered()
- {
- return _redelivered;
- }
-
- public void setRedelivered(boolean redelivered)
- {
- _redelivered = redelivered;
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- /**
- * This is called when all the content has been received.
- *
- * @param publishBody
- * @param contentHeaderBody
- * @throws AMQException
- */
- public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo publishBody,
- ContentHeaderBody contentHeaderBody)
- throws AMQException
- {
- // if there are no content bodies the list will be null so we must
- // create en empty list here
- if (contentHeaderBody.bodySize == 0)
- {
- _contentBodies = new LinkedList<WeakReference<ContentChunk>>();
- }
-
- final long arrivalTime = System.currentTimeMillis();
-
- MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), arrivalTime);
-
- _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
-
-
- populateFromMessageMetaData(mmd);
- }
-
- public void removeMessage(StoreContext storeContext) throws AMQException
- {
- _messageStore.removeMessage(storeContext, _messageId);
- }
-
- public long getArrivalTime()
- {
- return _arrivalTime;
- }
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index f23983641b..9de2d09b8e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -27,8 +27,8 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -93,7 +93,7 @@ public class DerbyMessageStore implements MessageStore
private String _connectionURL;
-
+ MessageFactory _messageFactory;
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
@@ -167,6 +167,8 @@ public class DerbyMessageStore implements MessageStore
// this recovers durable queues and persistent messages
+ _messageFactory = new MessageFactory();
+
recover();
stateTransition(State.RECOVERING, State.STARTED);
@@ -1299,7 +1301,7 @@ public class DerbyMessageStore implements MessageStore
private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
throws SQLException, AMQException
{
- Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>();
+ Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>();
List<ProcessAction> actions = new ArrayList<ProcessAction>();
Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
@@ -1318,8 +1320,6 @@ public class DerbyMessageStore implements MessageStore
conn = newConnection();
}
-
- MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
long maxId = 1;
TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
@@ -1355,7 +1355,11 @@ public class DerbyMessageStore implements MessageStore
}
else
{
- message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
+ message = _messageFactory.createMessage(messageId, this, true);
+
+ _logger.error("todo must do message recovery now.");
+ //todo must do message recovery now.
+
msgMap.put(messageId,message);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
index b5a91c8da6..d46ba85069 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
@@ -26,7 +26,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntryImpl;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
@@ -349,32 +348,15 @@ public class Show extends AbstractCommand
arrival.add("" + msg.getArrivalTime());
- try
- {
- ispersitent.add(msg.isPersistent() ? "true" : "false");
- }
- catch (AMQException e)
- {
- ispersitent.add("n/a");
- }
+ ispersitent.add(msg.isPersistent() ? "true" : "false");
isredelivered.add(entry.isRedelivered() ? "true" : "false");
isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false");
-// msg.getMessageHandle();
-
BasicContentHeaderProperties headers = null;
- try
- {
- headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
- }
- catch (AMQException e)
- {
- //ignore
-// commandError("Unable to read properties for message: " + e.getMessage(), null);
- }
+ headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
if (headers != null)
{
@@ -414,15 +396,7 @@ public class Show extends AbstractCommand
AMQShortString useridSS = headers.getUserId();
userid.add(useridSS == null ? "null" : useridSS.toString());
- MessagePublishInfo info = null;
- try
- {
- info = msg.getMessagePublishInfo();
- }
- catch (AMQException e)
- {
- //ignore
- }
+ MessagePublishInfo info = msg.getMessagePublishInfo();
if (info != null)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
index 5fbf9484f7..2a97db6066 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
@@ -22,14 +22,13 @@ package org.apache.qpid.server;
import junit.framework.TestCase;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.queue.MockQueueEntry;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleQueueEntryList;
import org.apache.qpid.server.queue.MockAMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.QueueEntryIterator;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.MockSubscription;
@@ -38,7 +37,6 @@ import org.apache.qpid.AMQException;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.LinkedList;
-import java.util.Iterator;
/**
* QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
@@ -62,7 +60,7 @@ public class ExtractResendAndRequeueTest extends TestCase
UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
private static final int INITIAL_MSG_COUNT = 10;
- private AMQQueue _queue = new MockAMQQueue();
+ private AMQQueue _queue = new MockAMQQueue("ExtractResendAndRequeueTest");
private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
@Override
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
index a705c8bbb4..228c99dcbd 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -28,11 +28,11 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.TransientAMQMessage;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
@@ -113,6 +113,8 @@ public class TxAckTest extends TestCase
private StoreContext _storeContext = new StoreContext();
private AMQQueue _queue;
+ private static final int MESSAGE_SIZE=100;
+
Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
{
TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(),
@@ -128,7 +130,12 @@ public class TxAckTest extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
- TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
+ AMQMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
+
+ ContentHeaderBody header = new ContentHeaderBody();
+ header.bodySize = MESSAGE_SIZE;
+ message.setPublishAndContentHeaderBody(_storeContext, info, header);
+
_map.add(deliveryTag, _queue.enqueue(new StoreContext(), message));
}
_acked = acked;
@@ -190,16 +197,15 @@ public class TxAckTest extends TestCase
}
}
- private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
+ private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody)
{
- final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
- null,
- false);
+ final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId,
+ null,
+ false);
try
{
- amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
- publishBody,
- new ContentHeaderBody()
+ // Safe to use null here as we just created a TransientMessage above
+ amqMessage.setPublishAndContentHeaderBody(null, publishBody, new ContentHeaderBody()
{
public int getSize()
{
@@ -213,11 +219,11 @@ public class TxAckTest extends TestCase
}
- return amqMessageHandle;
+ return amqMessage;
}
- private class TestMessage extends AMQMessage
+ private class TestMessage extends TransientAMQMessage
{
private final long _tag;
private int _count;
@@ -225,7 +231,7 @@ public class TxAckTest extends TestCase
TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
throws AMQException
{
- super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
+ super(createMessage(messageId, publishBody));
_tag = tag;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 883a712bef..e0a4357990 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
@@ -54,7 +55,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private StoreContext _storeContext = new StoreContext();
- private MessageHandleFactory _handleFactory = new MessageHandleFactory();
+ private MessageFactory _handleFactory = new MessageFactory();
private int count;
@@ -370,7 +371,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
/**
* Just add some extra utility methods to AMQMessage to aid testing.
*/
- static class Message extends AMQMessage
+ static class Message extends PersistentAMQMessage
{
private class TestIncomingMessage extends IncomingMessage
{
@@ -392,14 +393,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
public ContentHeaderBody getContentHeaderBody()
{
- try
- {
- return Message.this.getContentHeaderBody();
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
+ return Message.this.getContentHeaderBody();
}
}
@@ -407,10 +401,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private static MessageStore _messageStore = new SkeletonMessageStore();
- private static StoreContext _storeContext = new StoreContext();
-
-
- private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
+ private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, new StoreContext(),
null,
new LinkedList<RequiredDeliveryException>()
);
@@ -422,7 +413,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
Message(String id, FieldTable headers) throws AMQException
{
- this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
+ this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers));
}
public IncomingMessage getIncomingMessage()
@@ -432,42 +423,35 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private Message(long messageId,
MessagePublishInfo publish,
- ContentHeaderBody header,
- List<ContentBody> bodies) throws AMQException
- {
- super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish);
-
-
-
- _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore));
- _incoming.setContentHeaderBody(header);
-
-
- }
-
- private static AMQMessageHandle createMessageHandle(final long messageId,
- final MessagePublishInfo publish,
- final ContentHeaderBody header)
+ ContentHeaderBody header) throws AMQException
{
-
- final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
- _messageStore,
- true);
+ super(messageId, _messageStore);
try
{
- amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header);
+ setPublishAndContentHeaderBody(_txnContext.getStoreContext(), publish,header);
}
catch (AMQException e)
{
-
+
}
- return amqMessageHandle;
+
+ _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore));
+ _incoming.setContentHeaderBody(header);
}
private Message(AMQMessage msg) throws AMQException
{
- super(msg);
+ super(msg.getMessageId(), _messageStore);
+
+ this.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), msg.getMessagePublishInfo(), msg.getContentHeaderBody());
+
+ Iterator<ContentChunk> iterator = msg.getContentBodyIterator();
+
+ while(iterator.hasNext())
+ {
+ this.addContentBodyFrame(_txnContext.getStoreContext(), iterator.next(),iterator.hasNext());
+ }
}
@@ -500,15 +484,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private Object getKey()
{
- try
- {
- return getMessagePublishInfo().getRoutingKey();
- }
- catch (AMQException e)
- {
- _log.error("Error getting routing key: " + e, e);
- return null;
- }
+ return getMessagePublishInfo().getRoutingKey();
}
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index d1a69c9d3c..ddf177690c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -497,7 +497,7 @@ public class DestWildExchangeTest extends TestCase
throws AMQException
{
_exchange.route(message);
- message.routingComplete(_store, new MessageHandleFactory());
+ message.routingComplete(_store, new MessageFactory());
message.deliverToQueues();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index aff7af6952..ffe858f517 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -25,10 +25,12 @@ import java.util.ArrayList;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ContentHeaderBody;
import junit.framework.AssertionFailedError;
public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
+ private static final long MESSAGE_SIZE = 100L;
@Override
protected void setUp() throws Exception
@@ -92,11 +94,18 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
protected AMQMessage createMessage(Long id, byte i) throws AMQException
{
- AMQMessage msg = super.createMessage(id);
+ AMQMessage message = super.createMessage(id);
+
+ ContentHeaderBody header = new ContentHeaderBody();
+ header.bodySize = MESSAGE_SIZE;
+
+ //The createMessage above is for a Transient Message so it is safe to have no context.
+ message.setPublishAndContentHeaderBody(null, info, header);
+
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
props.setPriority(i);
- msg.getContentHeaderBody().properties = props;
- return msg;
+ message.getContentHeaderBody().properties = props;
+ return message;
}
protected AMQMessage createMessage(Long id) throws AMQException
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index fba30528ea..b159e2cda5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -136,6 +136,7 @@ public class AMQQueueAlertTest extends TestCase
while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH)
{
sendMessages(1, MAX_MESSAGE_SIZE);
+ System.err.println(_queue.getQueueDepth() + ":" + MAX_QUEUE_DEPTH);
}
Notification lastNotification = _queueMBean.getLastNotification();
@@ -307,7 +308,7 @@ public class AMQQueueAlertTest extends TestCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
messages[i].enqueue(qs);
- messages[i].routingComplete(_messageStore, new MessageHandleFactory());
+ messages[i].routingComplete(_messageStore, new MessageFactory());
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 38f030f670..a5e2da7b36 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -221,7 +221,7 @@ public class AMQQueueMBeanTest extends TestCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore, new MessageHandleFactory());
+ msg.routingComplete(_messageStore, new MessageFactory());
msg.addContentBodyFrame(new ContentChunk()
{
@@ -305,7 +305,7 @@ public class AMQQueueMBeanTest extends TestCase
currentMessage.enqueue(qs);
// route header
- currentMessage.routingComplete(_messageStore, new MessageHandleFactory());
+ currentMessage.routingComplete(_messageStore, new MessageFactory());
// Add the body so we have somthing to test later
currentMessage.addContentBodyFrame(
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 01674c5b3d..cd1ee65c0c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -98,7 +98,7 @@ public class AckTest extends TestCase
new LinkedList<RequiredDeliveryException>()
);
_queue.registerSubscription(_subscription,false);
- MessageHandleFactory factory = new MessageHandleFactory();
+ MessageFactory factory = new MessageFactory();
for (int i = 1; i <= count; i++)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java
deleted file mode 100644
index cac84c01b4..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import junit.framework.TestCase;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentHeaderProperties;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-
-public class InMemoryMessageHandleTest extends TestCase
-{
- AMQMessageHandle _handle;
-
- protected AMQMessageHandle newHandle(Long id)
- {
- return new InMemoryMessageHandle(id);
- }
-
- public void testMessageID()
- {
- Long id = 1L;
- _handle = newHandle(id);
-
- assertEquals("Message not set value", id, _handle.getMessageId());
- }
-
- public void testInvalidContentChunk()
- {
- _handle = newHandle(1L);
-
- try
- {
- _handle.getContentChunk(null, 0);
- fail("getContentChunk should not succeed");
- }
- catch (RuntimeException e)
- {
- assertTrue(e.getMessage().equals("No ContentBody has been set"));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- ContentChunk cc = new MockContentChunk(null, 100);
-
- try
- {
- _handle.addContentBodyFrame(null, cc, false);
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- _handle.getContentChunk(null, -1);
- fail("getContentChunk should not succeed");
- }
- catch (IllegalArgumentException e)
- {
- assertTrue(e.getMessage().contains("out of valid range"));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- _handle.getContentChunk(null, 1);
- fail("getContentChunk should not succeed");
- }
- catch (IllegalArgumentException e)
- {
- assertTrue(e.getMessage().contains("out of valid range"));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
- }
-
- public void testAddSingleContentChunk()
- {
-
- _handle = newHandle(1L);
-
- ContentChunk cc = new MockContentChunk(null, 100);
-
- try
- {
- _handle.addContentBodyFrame(null, cc, true);
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- assertEquals("Incorrect body count", 1, _handle.getBodyCount(null));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- assertEquals("Incorrect ContentChunk returned.", cc, _handle.getContentChunk(null, 0));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- cc = new MockContentChunk(null, 100);
-
- try
- {
- _handle.addContentBodyFrame(null, cc, true);
- fail("Exception should prevent adding two final chunks");
- }
- catch (UnsupportedOperationException e)
- {
- //normal path
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- }
-
- public void testAddMultipleContentChunk()
- {
-
- _handle = newHandle(1L);
-
- ContentChunk cc = new MockContentChunk(null, 100);
-
- try
- {
- _handle.addContentBodyFrame(null, cc, false);
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- assertEquals("Incorrect body count", 1, _handle.getBodyCount(null));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- assertEquals("Incorrect ContentChunk returned.", cc, _handle.getContentChunk(null, 0));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- cc = new MockContentChunk(null, 100);
-
- try
- {
- _handle.addContentBodyFrame(null, cc, true);
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- assertEquals("Incorrect body count", 2, _handle.getBodyCount(null));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- try
- {
- assertEquals("Incorrect ContentChunk returned.", cc, _handle.getContentChunk(null, 1));
- }
- catch (AMQException e)
- {
- fail("AMQException thrown:" + e.getMessage());
- }
-
- }
-
- // todo Move test to QueueEntry
-// public void testRedelivered()
-// {
-// _handle = newHandle(1L);
-//
-// assertFalse("New message should not be redelivered", _handle.isRedelivered());
-//
-// _handle.setRedelivered(true);
-//
-// assertTrue("New message should not be redelivered", _handle.isRedelivered());
-// }
-
- public void testInitialArrivalTime()
- {
- _handle = newHandle(1L);
-
- assertEquals("Initial Arrival time should be 0L", 0L, _handle.getArrivalTime());
- }
-
- public void testSetPublishAndContentHeaderBody_WithBody()
- {
- _handle = newHandle(1L);
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl();
- int bodySize = 100;
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, new BasicContentHeaderProperties(), bodySize);
-
- try
- {
- _handle.setPublishAndContentHeaderBody(null, mpi, chb);
-
- assertEquals("BodySize not returned correctly. ", bodySize, _handle.getBodySize(null));
- }
- catch (AMQException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
-
- public void testSetPublishAndContentHeaderBody_Empty()
- {
- _handle = newHandle(1L);
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl();
- int bodySize = 0;
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- props.setAppId("HandleTest");
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
- try
- {
- _handle.setPublishAndContentHeaderBody(null, mpi, chb);
-
- assertEquals("BodySize not returned correctly. ", bodySize, _handle.getBodySize(null));
-
- ContentHeaderBody retreived_chb = _handle.getContentHeaderBody(null);
-
- ContentHeaderProperties chp = retreived_chb.properties;
-
- assertEquals("ContentHeaderBody not correct", chb, retreived_chb);
-
- assertEquals("AppID not correctly retreived", "HandleTest",
- ((BasicContentHeaderProperties) chp).getAppIdAsString());
-
- MessagePublishInfo retreived_mpi = _handle.getMessagePublishInfo(null);
-
- assertEquals("MessagePublishInfo not correct", mpi, retreived_mpi);
-
-
- }
- catch (AMQException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
-
- public void testIsPersistent()
- {
- _handle = newHandle(1L);
-
- assertFalse(_handle.isPersistent());
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java
new file mode 100644
index 0000000000..582e2bfb00
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+
+public class MessageFactoryTest extends TestCase
+{
+ private MessageFactory _factory;
+
+ public void setUp()
+ {
+ _factory = new MessageFactory();
+ }
+
+ public void testTransientMessageCreation()
+ {
+ AMQMessage message = _factory.createMessage(0L, null, false);
+
+ assertEquals("Transient Message creation does not return correct class.", TransientAMQMessage.class, message.getClass());
+ }
+
+ public void testPersistentMessageCreation()
+ {
+ AMQMessage message = _factory.createMessage(0L, null, true);
+
+ assertEquals("Transient Message creation does not return correct class.", PersistentAMQMessage.class, message.getClass());
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
index a05eb0892b..cc6c486e11 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
@@ -22,23 +22,13 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-public class MockAMQMessage extends AMQMessage
+public class MockAMQMessage extends TransientAMQMessage
{
public MockAMQMessage(long messageId)
throws AMQException
{
- super(new MockAMQMessageHandle(messageId) ,
- (StoreContext)null,
- (MessagePublishInfo)new MessagePublishInfoImpl());
- }
-
- protected MockAMQMessage(AMQMessage msg)
- throws AMQException
- {
- super(msg);
+ super(messageId);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 758c8ddb2e..5f1cc81772 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -27,19 +27,16 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.AMQException;
import org.apache.commons.configuration.Configuration;
import java.util.List;
import java.util.Set;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.LinkedList;
public class MockAMQQueue implements AMQQueue
{
private boolean _deleted = false;
+ private int _queueCount;
private AMQShortString _name;
public MockAMQQueue(String name)
@@ -47,11 +44,6 @@ public class MockAMQQueue implements AMQQueue
_name = new AMQShortString(name);
}
- public MockAMQQueue()
- {
-
- }
-
public AMQShortString getName()
{
return _name;
@@ -134,7 +126,7 @@ public class MockAMQQueue implements AMQQueue
public long getQueueDepth()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return _queueCount;
}
public long getReceivedMessageCount()
@@ -159,6 +151,7 @@ public class MockAMQQueue implements AMQQueue
public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
{
+ _queueCount++;
return null; //To change body of implemented methods use File | Settings | File Templates.
}
@@ -169,7 +162,7 @@ public class MockAMQQueue implements AMQQueue
public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ _queueCount--;
}
public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java
index ee85fecfa3..8a9d1ae771 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java
@@ -20,14 +20,32 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.FixedSizeByteBufferAllocator;
+import org.apache.qpid.framing.abstraction.ContentChunk;
public class MockContentChunk implements ContentChunk
{
+ public static final int DEFAULT_SIZE=0;
+
private ByteBuffer _bytebuffer;
private int _size;
+
+
+ public MockContentChunk()
+ {
+ this(0);
+ }
+
+ public MockContentChunk(int size)
+ {
+ FixedSizeByteBufferAllocator allocator = new FixedSizeByteBufferAllocator();
+ _bytebuffer = allocator.allocate(size, false);
+
+ _size = size;
+ }
+
public MockContentChunk(ByteBuffer bytebuffer, int size)
{
_bytebuffer = bytebuffer;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
index c6e7e2ebe2..e213be7560 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
@@ -21,8 +21,9 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
-public class WeakMessageHandleTest extends InMemoryMessageHandleTest
+public class PersistentMessageTest extends TransientMessageTest
{
private MemoryMessageStore _messageStore;
@@ -30,19 +31,20 @@ public class WeakMessageHandleTest extends InMemoryMessageHandleTest
{
_messageStore = new MemoryMessageStore();
_messageStore.configure();
+ _storeContext = new StoreContext();
}
- protected AMQMessageHandle newHandle(Long id)
+ @Override
+ protected AMQMessage newMessage(Long id)
{
- return new WeakReferenceMessageHandle(id, _messageStore);
+ return new MessageFactory().createMessage(id, _messageStore, true);
}
@Override
public void testIsPersistent()
{
- _handle = newHandle(1L);
- assertTrue(_handle.isPersistent());
+ _message = newMessage(1L);
+ assertTrue(_message.isPersistent());
}
-
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
index 7573a629c1..f7cd860c22 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
@@ -20,30 +20,30 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.store.MessageStore;
+import junit.framework.TestCase;
-/**
- * Constructs a message handle based on the publish body, the content header and the queue to which the message
- * has been routed.
- *
- * @author Robert Greig (robert.j.greig@jpmorgan.com)
- */
-public class MessageHandleFactory
+public class QueueEntryImplTest extends TestCase
{
- public AMQMessageHandle createMessageHandle(Long messageId, MessageStore store, boolean persistent)
+ /**
+ * Test the Redelivered state of a QueueEntryImpl
+ */
+ public void testRedelivered()
{
- // just hardcoded for now
- if (persistent)
- {
- return new WeakReferenceMessageHandle(messageId, store);
- }
- else
- {
- return new InMemoryMessageHandle(messageId);
- }
-
-// return new AMQMessage(messageId, store, persistent);
+ QueueEntry entry = new QueueEntryImpl(null, null);
+
+ assertFalse("New message should not be redelivered", entry.isRedelivered());
+
+ entry.setRedelivered(true);
+
+ assertTrue("New message should not be redelivered", entry.isRedelivered());
+
+ //Check we can revert it.. not that we ever should.
+ entry.setRedelivered(false);
+
+ assertFalse("New message should not be redelivered", entry.isRedelivered());
+
}
+
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 500655c07c..2dcb081739 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -56,7 +56,8 @@ public class SimpleAMQQueueTest extends TestCase
protected FieldTable _arguments = null;
MessagePublishInfo info = new MessagePublishInfoImpl();
-
+ private static final long MESSAGE_SIZE = 100;
+
@Override
protected void setUp() throws Exception
{
@@ -317,7 +318,7 @@ public class SimpleAMQQueueTest extends TestCase
// Send persistent message
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_store, new MessageHandleFactory());
+ msg.routingComplete(_store, new MessageFactory());
_store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1));
// Check that it is enqueued
@@ -326,9 +327,14 @@ public class SimpleAMQQueueTest extends TestCase
// Dequeue message
MockQueueEntry entry = new MockQueueEntry();
- AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext);
+ AMQMessage message = new MessageFactory().createMessage(1L, _store, true);
- entry.setMessage(amqmsg);
+ ContentHeaderBody header = new ContentHeaderBody();
+ header.bodySize = MESSAGE_SIZE;
+ // This is a persist message but we are not in a transaction so create a new context for the message
+ message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
+
+ entry.setMessage(message);
_queue.dequeue(null, entry);
// Check that it is dequeued
@@ -338,22 +344,19 @@ public class SimpleAMQQueueTest extends TestCase
// FIXME: move this to somewhere useful
- private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
+ private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody)
{
- final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
- null,
- false);
+ final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId, null, false);
try
{
- amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
- publishBody,
- new ContentHeaderBody()
- {
- public int getSize()
- {
- return 1;
- }
- });
+ //Safe to use a null StoreContext as we have created a TransientMessage (see false param above)
+ amqMessage.setPublishAndContentHeaderBody( null, publishBody, new ContentHeaderBody()
+ {
+ public int getSize()
+ {
+ return 1;
+ }
+ });
}
catch (AMQException e)
{
@@ -361,18 +364,18 @@ public class SimpleAMQQueueTest extends TestCase
}
- return amqMessageHandle;
+ return amqMessage;
}
- public class TestMessage extends AMQMessage
+ public class TestMessage extends TransientAMQMessage
{
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody)
throws AMQException
{
- super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
+ super(createMessage(messageId, publishBody));
_tag = tag;
}
@@ -396,7 +399,8 @@ public class SimpleAMQQueueTest extends TestCase
protected AMQMessage createMessage(Long id) throws AMQException
{
- AMQMessage messageA = new TestMessage(id, id, info, new StoreContext());
+
+ AMQMessage messageA = new TestMessage(id, id, info);
return messageA;
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
new file mode 100644
index 0000000000..e37269526c
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
@@ -0,0 +1,467 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TransientMessageTest extends TestCase
+{
+ AMQMessage _message;
+ StoreContext _storeContext = null;
+
+ protected AMQMessage newMessage(Long id)
+ {
+ return new MessageFactory().createMessage(id, null, false);
+ }
+
+ public void testMessageID()
+ {
+ Long id = 1L;
+ _message = newMessage(id);
+
+ assertEquals("Message not set value", id, _message.getMessageId());
+ }
+
+ public void testInvalidContentChunk()
+ {
+ _message = newMessage(1L);
+
+ try
+ {
+ _message.getContentChunk(0);
+ fail("getContentChunk should not succeed");
+ }
+ catch (RuntimeException e)
+ {
+ assertTrue(e.getMessage().equals("No ContentBody has been set"));
+ }
+
+ ContentChunk cc = new MockContentChunk(100);
+
+ try
+ {
+ _message.addContentBodyFrame(_storeContext, cc, false);
+ }
+ catch (AMQException e)
+ {
+ fail("AMQException thrown:" + e.getMessage());
+ }
+
+ try
+ {
+ _message.getContentChunk(-1);
+ fail("getContentChunk should not succeed");
+ }
+ catch (IllegalArgumentException e)
+ {
+ assertTrue(e.getMessage().contains("out of valid range"));
+ }
+
+ try
+ {
+ _message.getContentChunk(1);
+ fail("getContentChunk should not succeed");
+ }
+ catch (IllegalArgumentException e)
+ {
+ assertTrue(e.getMessage().contains("out of valid range"));
+ }
+ }
+
+ public void testAddSingleContentChunk()
+ {
+
+ _message = newMessage(1L);
+
+ ContentChunk cc = new MockContentChunk(100);
+
+ try
+ {
+ _message.addContentBodyFrame(_storeContext, cc, true);
+ }
+ catch (AMQException e)
+ {
+ fail("AMQException thrown:" + e.getMessage());
+ }
+
+ assertEquals("Incorrect body count", 1, _message.getBodyCount());
+
+ assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(0));
+
+ cc = new MockContentChunk(100);
+
+ try
+ {
+ _message.addContentBodyFrame(_storeContext, cc, true);
+ fail("Exception should prevent adding two final chunks");
+ }
+ catch (UnsupportedOperationException e)
+ {
+ //normal path
+ }
+ catch (AMQException e)
+ {
+ fail("AMQException thrown:" + e.getMessage());
+ }
+
+ }
+
+ public void testAddMultipleContentChunk()
+ {
+
+ _message = newMessage(1L);
+
+ ContentChunk cc = new MockContentChunk(100);
+
+ try
+ {
+ _message.addContentBodyFrame(_storeContext, cc, false);
+ }
+ catch (AMQException e)
+ {
+ fail("AMQException thrown:" + e.getMessage());
+ }
+
+ assertEquals("Incorrect body count", 1, _message.getBodyCount());
+
+ assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(0));
+
+ cc = new MockContentChunk(100);
+
+ try
+ {
+ _message.addContentBodyFrame(_storeContext, cc, true);
+ }
+ catch (AMQException e)
+ {
+ fail("AMQException thrown:" + e.getMessage());
+ }
+
+ assertEquals("Incorrect body count", 2, _message.getBodyCount());
+
+ assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(1));
+
+ }
+
+ public void testInitialArrivalTime()
+ {
+ _message = newMessage(1L);
+
+ assertEquals("Initial Arrival time should be 0L", 0L, _message.getArrivalTime());
+ }
+
+ public void testSetPublishAndContentHeaderBody_WithBody()
+ {
+ _message = newMessage(1L);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl();
+ int bodySize = 100;
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, new BasicContentHeaderProperties(), bodySize);
+
+ try
+ {
+ _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+ assertEquals("BodySize not returned correctly. ", bodySize, _message.getSize());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testSetPublishAndContentHeaderBody_Null()
+ {
+ _message = newMessage(1L);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl();
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ try
+ {
+ _message.setPublishAndContentHeaderBody(_storeContext, mpi, null);
+ fail("setPublishAndContentHeaderBody with null ContentHeaederBody did not throw NPE.");
+ }
+ catch (NullPointerException npe)
+ {
+ assertEquals("HeaderBody cannot be null", npe.getMessage());
+ }
+ catch (AMQException e)
+ {
+ fail("setPublishAndContentHeaderBody should not throw AMQException here:" + e.getMessage());
+ }
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ try
+ {
+ _message.setPublishAndContentHeaderBody(_storeContext, null, chb);
+ fail("setPublishAndContentHeaderBody with null MessagePublishInfo did not throw NPE.");
+ }
+ catch (NullPointerException npe)
+ {
+ assertEquals("PublishInfo cannot be null", npe.getMessage());
+ }
+ catch (AMQException e)
+ {
+ fail("setPublishAndContentHeaderBody should not throw AMQException here:" + e.getMessage());
+ }
+ }
+
+ public void testSetPublishAndContentHeaderBody_Empty()
+ {
+ _message = newMessage(1L);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl();
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ try
+ {
+ _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+ assertEquals("BodySize not returned correctly. ", bodySize, _message.getSize());
+
+ ContentHeaderBody retreived_chb = _message.getContentHeaderBody();
+
+ ContentHeaderProperties chp = retreived_chb.properties;
+
+ assertEquals("ContentHeaderBody not correct", chb, retreived_chb);
+
+ assertEquals("AppID not correctly retreived", "HandleTest",
+ ((BasicContentHeaderProperties) chp).getAppIdAsString());
+
+ MessagePublishInfo retreived_mpi = _message.getMessagePublishInfo();
+
+ assertEquals("MessagePublishInfo not correct", mpi, retreived_mpi);
+
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testIsPersistent()
+ {
+ _message = newMessage(1L);
+
+ assertFalse(_message.isPersistent());
+ }
+
+ public void testImmediateAndNotDelivered()
+ {
+ _message = newMessage(1L);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ try
+ {
+ _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+ assertTrue("Undelivered Immediate message should still be marked as so", _message.immediateAndNotDelivered());
+
+ assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer());
+
+ _message.setDeliveredToConsumer();
+
+ assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer());
+
+ assertFalse("Delivered Immediate message now be marked as so", _message.immediateAndNotDelivered());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testNotImmediateAndNotDelivered()
+ {
+ _message = newMessage(1L);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ try
+ {
+ _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+ assertFalse("Undelivered Non-Immediate message should not result in true.", _message.immediateAndNotDelivered());
+
+ assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer());
+
+ _message.setDeliveredToConsumer();
+
+ assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer());
+
+ assertFalse("Delivered Non-Immediate message not change this return", _message.immediateAndNotDelivered());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testExpiry()
+ {
+ _message = newMessage(1L);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ ReentrantLock waitLock = new ReentrantLock();
+ Condition wait = waitLock.newCondition();
+ try
+ {
+ _message.setExpiration(System.currentTimeMillis() + 10L);
+
+ _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+ assertFalse("New messages should not be expired.", _message.expired());
+
+ final long MILLIS =1000000L;
+ long waitTime = 20 * MILLIS;
+
+ while (waitTime > 0)
+ {
+ try
+ {
+ waitLock.lock();
+
+ waitTime = wait.awaitNanos(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ //Stop if we are interrupted
+ fail(e.getMessage());
+ }
+ finally
+ {
+ waitLock.unlock();
+ }
+
+ }
+
+ assertTrue("After a sleep messages should now be expired.", _message.expired());
+
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+
+ public void testNoExpiry()
+ {
+ _message = newMessage(1L);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ ReentrantLock waitLock = new ReentrantLock();
+ Condition wait = waitLock.newCondition();
+ try
+ {
+
+ _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+ assertFalse("New messages should not be expired.", _message.expired());
+
+ final long MILLIS =1000000L;
+ long waitTime = 10 * MILLIS;
+
+ while (waitTime > 0)
+ {
+ try
+ {
+ waitLock.lock();
+
+ waitTime = wait.awaitNanos(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ //Stop if we are interrupted
+ fail(e.getMessage());
+ }
+ finally
+ {
+ waitLock.unlock();
+ }
+
+ }
+
+ assertFalse("After a sleep messages without an expiry should not expire.", _message.expired());
+
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 12ed928e7f..b4ed1f8709 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -30,7 +30,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.SimpleAMQQueue;
@@ -389,7 +389,7 @@ public class MessageStoreTest extends TestCase
try
{
- currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageHandleFactory());
+ currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageFactory());
}
catch (AMQException e)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 51820f72dd..9a9fe3644c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -26,9 +26,8 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.AMQMessageHandle;
/**
* Tests that reference counting works correctly with AMQMessage and the message store
@@ -56,10 +55,9 @@ public class TestReferenceCounting extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
final long messageId = _store.getNewMessageId();
- AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
- messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb);
- AMQMessage message = new AMQMessage(messageHandle,
- _storeContext,info);
+
+ AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true);
+ message.setPublishAndContentHeaderBody(_storeContext, info, chb);
message = message.takeReference();
@@ -88,18 +86,10 @@ public class TestReferenceCounting extends TestCase
final Long messageId = _store.getNewMessageId();
final ContentHeaderBody chb = createPersistentContentHeader();
- AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
- messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb);
- AMQMessage message = new AMQMessage(messageHandle,
- _storeContext,
- info);
-
+ AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true);
+ message.setPublishAndContentHeaderBody(_storeContext, info, chb);
message = message.takeReference();
- // we call routing complete to set up the handle
- // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
-
-
assertEquals(1, _store.getMessageMetaDataMap().size());
message = message.takeReference();