summaryrefslogtreecommitdiff
path: root/java/broker/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java39
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java36
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java436
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java62
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java151
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java (renamed from java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java)20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java84
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java469
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java223
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java32
16 files changed, 696 insertions, 960 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 5fde08cbdd..3b290b3d51 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -35,12 +35,12 @@ import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.UnauthorizedAccessException;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
@@ -108,7 +108,7 @@ public class AMQChannel
private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
- private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
+ private MessageFactory _messageHandleFactory = new MessageFactory();
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
index 4949e5b41d..ea94f23ff9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -28,7 +28,6 @@ package org.apache.qpid.server.output.amqp0_8;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -37,8 +36,6 @@ import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.AMQException;
-import org.apache.mina.common.ByteBuffer;
-
import java.util.Iterator;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -79,11 +76,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
-
-
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
@@ -100,7 +93,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -112,7 +105,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -126,8 +119,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
final AMQMessage message = queueEntry.getMessage();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
AMQDataBlock deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
@@ -135,7 +126,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -150,7 +141,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -162,7 +153,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -179,7 +170,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final AMQMessage message = queueEntry.getMessage();
final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicDeliverBody deliverBody =
@@ -188,18 +178,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
queueEntry.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey());
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
-
-
- return deliverFrame;
+ return deliverBody.generateFrame(channelId);
}
private AMQDataBlock createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
final AMQMessage message = queueEntry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicGetOkBody getOkBody =
@@ -208,9 +194,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
- return getOkFrame;
+ return getOkBody.generateFrame(channelId);
}
public byte getProtocolMinorVersion()
@@ -231,9 +215,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
- return returnFrame;
+ return basicReturnBody.generateFrame(channelId);
+
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
index 00a15d2d50..b71b118275 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
@@ -28,9 +28,7 @@ import java.util.Iterator;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -77,12 +75,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
AMQBody deliverBody = createEncodedDeliverFrame(queueEntry, channelId, deliveryTag, consumerTag);
final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
-
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
-
-
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
@@ -99,7 +92,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
@@ -111,7 +104,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -123,9 +116,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
+ return ContentHeaderBody.createAMQFrame(channelId, contentHeaderBody);
}
@@ -133,15 +124,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
final AMQMessage message = queueEntry.getMessage();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
AMQFrame deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -156,7 +145,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -168,7 +157,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
//
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -190,7 +179,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final AMQShortString exchangeName = pb.getExchange();
final AMQShortString routingKey = pb.getRoutingKey();
- final AMQBody returnBlock = new AMQBody()
+ return new AMQBody()
{
public AMQBody _underlyingBody;
@@ -238,7 +227,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
throw new AMQException("This block should never be dispatched!");
}
};
- return returnBlock;
}
private AMQFrame createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
@@ -253,9 +241,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
- return getOkFrame;
+ return getOkBody.generateFrame(channelId);
}
public byte getProtocolMinorVersion()
@@ -276,9 +262,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
- return returnFrame;
+ return basicReturnBody.generateFrame(channelId);
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index d73d37f48d..2bd6e612f8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -20,363 +20,52 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-
+import org.apache.qpid.AMQException;
import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicInteger;
-/**
- * A deliverable message.
- */
-public class AMQMessage
+public interface AMQMessage
{
- /** Used for debugging purposes. */
- private static final Logger _log = Logger.getLogger(AMQMessage.class);
-
- private final AtomicInteger _referenceCount = new AtomicInteger(1);
-
- private final AMQMessageHandle _messageHandle;
-
- /** Holds the transactional context in which this message is being processed. */
- private StoreContext _storeContext;
-
- /** Flag to indicate that this message requires 'immediate' delivery. */
-
- private static final byte IMMEDIATE = 0x01;
-
- /**
- * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
- * for messages published with the 'immediate' flag.
- */
-
- private static final byte DELIVERED_TO_CONSUMER = 0x02;
-
- private byte _flags = 0;
-
- private long _expiration;
-
- private final long _size;
-
- private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
- private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
-
-
-
- /**
- * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
- * therefore is memory-efficient.
- */
- private class BodyFrameIterator implements Iterator<AMQDataBlock>
- {
- private int _channel;
-
- private int _index = -1;
- private AMQProtocolSession _protocolSession;
-
- private BodyFrameIterator(AMQProtocolSession protocolSession, int channel)
- {
- _channel = channel;
- _protocolSession = protocolSession;
- }
-
- public boolean hasNext()
- {
- try
- {
- return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1);
- }
- catch (AMQException e)
- {
- _log.error("Unable to get body count: " + e, e);
-
- return false;
- }
- }
-
- public AMQDataBlock next()
- {
- try
- {
-
- AMQBody cb =
- getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
- ++_index));
-
- return new AMQFrame(_channel, cb);
- }
- catch (AMQException e)
- {
- // have no choice but to throw a runtime exception
- throw new RuntimeException("Error getting content body: " + e, e);
- }
-
- }
-
- private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
- {
- return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter();
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- public void clearStoreContext()
- {
- _storeContext = new StoreContext();
- }
-
- public StoreContext getStoreContext()
- {
- return _storeContext;
- }
-
- private class BodyContentIterator implements Iterator<ContentChunk>
- {
-
- private int _index = -1;
-
- public boolean hasNext()
- {
- try
- {
- return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1);
- }
- catch (AMQException e)
- {
- _log.error("Error getting body count: " + e, e);
-
- return false;
- }
- }
-
- public ContentChunk next()
- {
- try
- {
- return _messageHandle.getContentChunk(getStoreContext(), ++_index);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Error getting content body: " + e, e);
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
-
-
- /**
- * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
- * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
- * queues.
- *
- * @param messageId
- * @param store
- * @param factory
- *
- * @throws AMQException
- */
- public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
- throws AMQException
- {
- _messageHandle = factory.createMessageHandle(messageId, store, true);
- _storeContext = txnConext.getStoreContext();
- _size = _messageHandle.getBodySize(txnConext.getStoreContext());
- }
-
- /**
- * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
- * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
- * queues.
- *
- * @param messageHandle
- *
- * @throws AMQException
- */
- public AMQMessage(
- AMQMessageHandle messageHandle,
- StoreContext storeConext,
- MessagePublishInfo info)
- throws AMQException
- {
- _messageHandle = messageHandle;
- _storeContext = storeConext;
-
- if(info.isImmediate())
- {
- _flags |= IMMEDIATE;
- }
- _size = messageHandle.getBodySize(storeConext);
-
- }
+ //Get Content relating to this message
+ Long getMessageId();
- protected AMQMessage(AMQMessage msg) throws AMQException
- {
- _messageHandle = msg._messageHandle;
- _storeContext = msg._storeContext;
- _flags = msg._flags;
- _size = msg._size;
+ Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel);
- }
+ Iterator<ContentChunk> getContentBodyIterator();
+ ContentHeaderBody getContentHeaderBody();
- public String debugIdentity()
- {
- return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
- }
+ ContentChunk getContentChunk(int index);
- public void setExpiration(final long expiration)
- {
+ Object getPublisherClientInstance();
- _expiration = expiration;
+ Object getPublisherIdentifier();
- }
+ MessagePublishInfo getMessagePublishInfo();
- public boolean isReferenced()
- {
- return _referenceCount.get() > 0;
- }
+ int getBodyCount();
- public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
- {
- return new BodyFrameIterator(protocolSession, channel);
- }
+ long getSize();
- public Iterator<ContentChunk> getContentBodyIterator()
- {
- return new BodyContentIterator();
- }
+ long getArrivalTime();
- public ContentHeaderBody getContentHeaderBody() throws AMQException
- {
- return _messageHandle.getContentHeaderBody(getStoreContext());
- }
-
- public Long getMessageId()
- {
- return _messageHandle.getMessageId();
- }
-
- /**
- * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
- * operation.
- */
- public AMQMessage takeReference()
- {
- incrementReference(); // _referenceCount.incrementAndGet();
-
- return this;
- }
-
- public boolean incrementReference()
- {
- return incrementReference(1);
- }
-
- /* Threadsafe. Increment the reference count on the message. */
- public boolean incrementReference(int count)
- {
- if(_referenceCount.addAndGet(count) <= 1)
- {
- _referenceCount.addAndGet(-count);
- return false;
- }
- else
- {
- return true;
- }
-
- }
-
- /**
- * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
- * message store.
- *
- * @param storeContext
- *
- * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
- * failed
- */
- public void decrementReference(StoreContext storeContext) throws MessageCleanupException
- {
-
- int count = _referenceCount.decrementAndGet();
-
- // note that the operation of decrementing the reference count and then removing the message does not
- // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
- // the message has been passed to all queues. i.e. we are
- // not relying on the all the increments having taken place before the delivery manager decrements.
- if (count == 0)
- {
- // set the reference count way below 0 so that we can detect that the message has been deleted
- // this is to guard against the message being spontaneously recreated (from the mgmt console)
- // by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE/2);
-
- try
- {
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- if (_messageHandle != null)
- {
- _messageHandle.removeMessage(storeContext);
- }
- }
- catch (AMQException e)
- {
- // to maintain consistency, we revert the count
- incrementReference();
- throw new MessageCleanupException(getMessageId(), e);
- }
- }
- else
- {
- if (count < 0)
- {
- throw new MessageCleanupException("Reference count for message id " + debugIdentity()
- + " has gone below 0.");
- }
- }
- }
-
+ //Check the status of this message
/**
* Called selectors to determin if the message has already been sent
*
* @return _deliveredToConsumer
*/
- public boolean getDeliveredToConsumer()
- {
- return (_flags & DELIVERED_TO_CONSUMER) != 0;
- }
-
- public boolean isPersistent() throws AMQException
- {
- return _messageHandle.isPersistent();
- }
+ boolean getDeliveredToConsumer();
/**
* Called to enforce the 'immediate' flag.
@@ -384,89 +73,62 @@ public class AMQMessage
* @returns true if the message is marked for immediate delivery but has not been marked as delivered
* to a consumer
*/
- public boolean immediateAndNotDelivered()
- {
-
- return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
-
- }
-
- public MessagePublishInfo getMessagePublishInfo() throws AMQException
- {
- return _messageHandle.getMessagePublishInfo(getStoreContext());
- }
-
- public long getArrivalTime()
- {
- return _messageHandle.getArrivalTime();
- }
+ boolean immediateAndNotDelivered();
/**
* Checks to see if the message has expired. If it has the message is dequeued.
*
- * @param queue The queue to check the expiration against. (Currently not used)
- *
* @return true if the message has expire
*
- * @throws AMQException
+ * @throws org.apache.qpid.AMQException
*/
- public boolean expired(AMQQueue queue) throws AMQException
- {
+ boolean expired() throws AMQException;
- if (_expiration != 0L)
- {
- long now = System.currentTimeMillis();
-
- return (now > _expiration);
- }
+ /** Is this a persistent message
+ *
+ * @return true if the message is persistent
+ */
+ boolean isPersistent();
- return false;
- }
/**
* Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
* And for selector efficiency.
*/
- public void setDeliveredToConsumer()
- {
- _flags |= DELIVERED_TO_CONSUMER;
- }
+ void setDeliveredToConsumer();
+
+ void setExpiration(long expiration);
+
+ void setClientIdentifier(AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier);
+
+ /**
+ * This is called when all the content has been received.
+ * @param storeContext
+ *@param messagePublishInfo
+ * @param contentHeaderBody @throws org.apache.qpid.AMQException
+ */
+ void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, ContentHeaderBody contentHeaderBody)
+ throws AMQException;
+ void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
+ throws AMQException;
- public AMQMessageHandle getMessageHandle()
- {
- return _messageHandle;
- }
+ void removeMessage(StoreContext storeContext) throws AMQException;
- public long getSize()
- {
- return _size;
+ String toString();
- }
+ String debugIdentity();
- public Object getPublisherClientInstance()
- {
- return _sessionIdentifier.getSessionInstance();
- }
-
- public Object getPublisherIdentifier()
- {
- return _sessionIdentifier.getSessionIdentifier();
- }
+ // Reference counting methods
- public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier)
- {
- _sessionIdentifier = sessionIdentifier;
- }
+ void decrementReference(StoreContext storeContext) throws MessageCleanupException;
+ boolean incrementReference(int queueCount);
- public String toString()
- {
- // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
- // _taken + " by :" + _takenBySubcription;
+ boolean incrementReference();
- return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
- }
+ AMQMessage takeReference();
+ boolean isReferenced();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index f5853bd303..a08719875d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -353,29 +353,20 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
}
}
- try
+ // Create header attributes list
+ CommonContentHeaderProperties headerProperties =
+ (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
+ String mimeType = null, encoding = null;
+ if (headerProperties != null)
{
- // Create header attributes list
- CommonContentHeaderProperties headerProperties =
- (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = null, encoding = null;
- if (headerProperties != null)
- {
- AMQShortString mimeTypeShortSting = headerProperties.getContentType();
- mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
- encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
- }
+ AMQShortString mimeTypeShortSting = headerProperties.getContentType();
+ mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
+ encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
+ }
- Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+ Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
- return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
- }
- catch (AMQException e)
- {
- JMException jme = new JMException("Error creating header attributes list: " + e);
- jme.initCause(e);
- throw jme;
- }
+ return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
}
/**
@@ -392,27 +383,18 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
List<QueueEntry> list = _queue.getMessagesOnTheQueue();
TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
- try
+ // Create the tabular list of message header contents
+ for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
{
- // Create the tabular list of message header contents
- for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
- {
- QueueEntry queueEntry = list.get(i - 1);
- AMQMessage msg = queueEntry.getMessage();
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
- // Create header attributes list
- String[] headerAttributes = getMessageHeaderProperties(headerBody);
- Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize,
- queueEntry.isRedelivered() };
- CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
- _messageList.put(messageData);
- }
- }
- catch (AMQException e)
- {
- JMException jme = new JMException("Error creating message contents: " + e);
- jme.initCause(e);
- throw jme;
+ QueueEntry queueEntry = list.get(i - 1);
+ AMQMessage msg = queueEntry.getMessage();
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ // Create header attributes list
+ String[] headerAttributes = getMessageHeaderProperties(headerBody);
+ Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize,
+ queueEntry.isRedelivered() };
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
+ _messageList.put(messageData);
}
return _messageList;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
deleted file mode 100644
index 1092f67d94..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Collections;
-import java.util.ArrayList;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.store.StoreContext;
-
-/**
- */
-public class InMemoryMessageHandle implements AMQMessageHandle
-{
-
- private ContentHeaderBody _contentHeaderBody;
-
- private MessagePublishInfo _messagePublishInfo;
-
- private List<ContentChunk> _contentBodies;
-
- private boolean _redelivered;
-
- private long _arrivalTime;
-
- private final Long _messageId;
-
- public InMemoryMessageHandle(final Long messageId)
- {
- _messageId = messageId;
- }
-
- public ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException
- {
- return _contentHeaderBody;
- }
-
- public Long getMessageId()
- {
- return _messageId;
- }
-
- public int getBodyCount(StoreContext context)
- {
- return _contentBodies.size();
- }
-
- public long getBodySize(StoreContext context) throws AMQException
- {
- return getContentHeaderBody(context).bodySize;
- }
-
- public ContentChunk getContentChunk(StoreContext context, int index) throws AMQException, IllegalArgumentException
- {
- if(_contentBodies == null)
- {
- throw new RuntimeException("No ContentBody has been set");
- }
-
- if (index > _contentBodies.size() - 1 || index < 0)
- {
- throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
- (_contentBodies.size() - 1));
- }
- return _contentBodies.get(index);
- }
-
- public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentBody, boolean isLastContentBody)
- throws AMQException
- {
- if(_contentBodies == null)
- {
- if(isLastContentBody)
- {
- _contentBodies = Collections.singletonList(contentBody);
- }
- else
- {
- _contentBodies = new ArrayList<ContentChunk>();
- _contentBodies.add(contentBody);
- }
- }
- else
- {
- _contentBodies.add(contentBody);
- }
- }
-
- public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException
- {
- return _messagePublishInfo;
- }
-
- public boolean isPersistent()
- {
- return false;
- }
-
- /**
- * This is called when all the content has been received.
- * @param messagePublishInfo
- * @param contentHeaderBody
- * @throws AMQException
- */
- public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
- ContentHeaderBody contentHeaderBody)
- throws AMQException
- {
- _messagePublishInfo = messagePublishInfo;
- _contentHeaderBody = contentHeaderBody;
- if(contentHeaderBody.bodySize == 0)
- {
- _contentBodies = Collections.EMPTY_LIST;
- }
- _arrivalTime = System.currentTimeMillis();
- }
-
- public void removeMessage(StoreContext storeContext) throws AMQException
- {
- // NO OP
- }
-
- public long getArrivalTime()
- {
- return _arrivalTime;
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index b994040131..aad99da6c3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -35,7 +35,6 @@ import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import java.util.ArrayList;
-import java.util.Collection;
public class IncomingMessage implements Filterable<RuntimeException>
{
@@ -48,7 +47,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
- private AMQMessageHandle _messageHandle;
+ private AMQMessage _message;
private final Long _messageId;
private final TransactionalContext _txnContext;
@@ -74,7 +73,6 @@ public class IncomingMessage implements Filterable<RuntimeException>
private Exchange _exchange;
-
public IncomingMessage(final Long messageId,
final MessagePublishInfo info,
final TransactionalContext txnContext,
@@ -124,11 +122,11 @@ public class IncomingMessage implements Filterable<RuntimeException>
}
public void routingComplete(final MessageStore store,
- final MessageHandleFactory factory) throws AMQException
+ final MessageFactory factory) throws AMQException
{
final boolean persistent = isPersistent();
- _messageHandle = factory.createMessageHandle(_messageId, store, persistent);
+ _message = factory.createMessage(_messageId, store, persistent);
if (persistent)
{
_txnContext.beginTranIfNecessary();
@@ -157,21 +155,16 @@ public class IncomingMessage implements Filterable<RuntimeException>
_logger.debug("Delivering message " + _messageId + " to " + _destinationQueues);
}
- AMQMessage message = null;
-
try
{
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
- _messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(),
- _messagePublishInfo, getContentHeaderBody());
+ _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody());
+
-
-
- message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo);
- message.setExpiration(_expiration);
- message.setClientIdentifier(_publisher.getSessionIdentifier());
+ _message.setExpiration(_expiration);
+ _message.setClientIdentifier(_publisher.getSessionIdentifier());
// we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
@@ -182,7 +175,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
{
- throw new UnauthorizedAccessException("Acccess Refused",message);
+ throw new UnauthorizedAccessException("Acccess Refused", _message);
}
if ((_destinationQueues == null) || _destinationQueues.size() == 0)
@@ -190,26 +183,26 @@ public class IncomingMessage implements Filterable<RuntimeException>
if (isMandatory() || isImmediate())
{
- throw new NoRouteException("No Route for message", message);
+ throw new NoRouteException("No Route for message", _message);
}
else
{
- _logger.warn("MESSAGE DISCARDED: No routes for message - " + message);
+ _logger.warn("MESSAGE DISCARDED: No routes for message - " + _message);
}
}
else
{
int offset;
final int queueCount = _destinationQueues.size();
- message.incrementReference(queueCount);
+ _message.incrementReference(queueCount);
if(queueCount == 1)
{
offset = 0;
}
else
{
- offset = ((int)(message.getMessageId().longValue())) % queueCount;
+ offset = ((int)(_message.getMessageId().longValue())) % queueCount;
if(offset < 0)
{
offset = -offset;
@@ -218,22 +211,21 @@ public class IncomingMessage implements Filterable<RuntimeException>
for (int i = offset; i < queueCount; i++)
{
// normal deliver so add this message at the end.
- _txnContext.deliver(_destinationQueues.get(i), message);
+ _txnContext.deliver(_destinationQueues.get(i), _message);
}
for (int i = 0; i < offset; i++)
{
// normal deliver so add this message at the end.
- _txnContext.deliver(_destinationQueues.get(i), message);
+ _txnContext.deliver(_destinationQueues.get(i), _message);
}
}
- message.clearStoreContext();
- return message;
+ return _message;
}
finally
{
// Remove refence for routing process . Reference count should now == delivered queue count
- if(message != null) message.decrementReference(_txnContext.getStoreContext());
+ if(_message != null) _message.decrementReference(_txnContext.getStoreContext());
}
}
@@ -244,7 +236,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
_bodyLengthReceived += contentChunk.getSize();
- _messageHandle.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived());
+ _message.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived());
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
index 7573a629c1..e18834874f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
@@ -21,29 +21,21 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
-/**
- * Constructs a message handle based on the publish body, the content header and the queue to which the message
- * has been routed.
- *
- * @author Robert Greig (robert.j.greig@jpmorgan.com)
- */
-public class MessageHandleFactory
+public class MessageFactory
{
- public AMQMessageHandle createMessageHandle(Long messageId, MessageStore store, boolean persistent)
+ public AMQMessage createMessage(Long messageId, MessageStore store, boolean persistent)
{
- // just hardcoded for now
if (persistent)
{
- return new WeakReferenceMessageHandle(messageId, store);
+ return new PersistentAMQMessage(messageId, store);
}
else
{
- return new InMemoryMessageHandle(messageId);
+ return new TransientAMQMessage(messageId);
}
-
-// return new AMQMessage(messageId, store, persistent);
}
-
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
index 6f9efd3200..e33b0c83c7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
@@ -47,20 +47,13 @@ public enum NotificationCheck
if(maximumMessageSize != 0)
{
// Check for threshold message size
- long messageSize;
- try
- {
- messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
- }
- catch (AMQException e)
- {
- messageSize = 0;
- }
-
+ long messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
if (messageSize >= maximumMessageSize)
{
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold (" +
+ maximumMessageSize + ") breached. [Message ID=" +
+ (msg == null ? "null" : msg.getMessageId()) + "]");
return true;
}
}
@@ -110,7 +103,7 @@ public enum NotificationCheck
}
}
return false;
-
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
new file mode 100644
index 0000000000..04e3635f92
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+public class PersistentAMQMessage extends TransientAMQMessage
+{
+ protected MessageStore _messageStore;
+
+ public PersistentAMQMessage(Long messageId, MessageStore store)
+ {
+ super(messageId);
+ _messageStore = store;
+ }
+
+ @Override
+ public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
+ throws AMQException
+ {
+ super.addContentBodyFrame(storeContext, contentChunk, isLastContentBody);
+ _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1,
+ contentChunk, isLastContentBody);
+ }
+
+ @Override
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
+ ContentHeaderBody contentHeaderBody)
+ throws AMQException
+ {
+ super.setPublishAndContentHeaderBody(storeContext, messagePublishInfo, contentHeaderBody);
+ MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime);
+
+ _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
+ }
+
+ @Override
+ public void removeMessage(StoreContext storeContext) throws AMQException
+ {
+ _messageStore.removeMessage(storeContext, _messageId);
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public void recoverFromMessageMetaData(MessageMetaData mmd)
+ {
+ _arrivalTime = mmd.getArrivalTime();
+ _contentHeaderBody = mmd.getContentHeaderBody();
+ _messagePublishInfo = mmd.getMessagePublishInfo();
+ }
+
+ public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
+ {
+ super.addContentBodyFrame(null, contentChunk, isLastContentBody);
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index fd46a8a5ff..7be2827e0f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -54,25 +54,16 @@ public class PriorityQueueList implements QueueEntryList
public QueueEntry add(AMQMessage message)
{
- try
+ int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
+ if(index >= _priorities)
{
- int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
- if(index >= _priorities)
- {
- index = _priorities-1;
- }
- else if(index < 0)
- {
- index = 0;
- }
- return _priorityLists[index].add(message);
+ index = _priorities-1;
}
- catch (AMQException e)
+ else if(index < 0)
{
- // TODO - fix AMQ Exception
- throw new RuntimeException(e);
+ index = 0;
}
-
+ return _priorityLists[index].add(message);
}
public QueueEntry next(QueueEntry node)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index fe9686e906..ba14be5580 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -132,7 +132,7 @@ public class QueueEntryImpl implements QueueEntry
public boolean expired() throws AMQException
{
- return getMessage().expired(getQueue());
+ return getMessage().expired();
}
public boolean isAcquired()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
new file mode 100644
index 0000000000..8c62e046f8
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
@@ -0,0 +1,469 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A deliverable message.
+ */
+public class TransientAMQMessage implements AMQMessage
+{
+ /** Used for debugging purposes. */
+ private static final Logger _log = Logger.getLogger(AMQMessage.class);
+
+ private final AtomicInteger _referenceCount = new AtomicInteger(1);
+
+ protected ContentHeaderBody _contentHeaderBody;
+
+ protected MessagePublishInfo _messagePublishInfo;
+
+ protected List<ContentChunk> _contentBodies;
+
+ protected long _arrivalTime;
+
+ protected final Long _messageId;
+
+
+
+ /** Flag to indicate that this message requires 'immediate' delivery. */
+
+ private static final byte IMMEDIATE = 0x01;
+
+ /**
+ * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
+ * for messages published with the 'immediate' flag.
+ */
+
+ private static final byte DELIVERED_TO_CONSUMER = 0x02;
+
+ private byte _flags = 0;
+
+ private long _expiration;
+
+ private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
+ private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
+
+ /**
+ * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
+ * therefore is memory-efficient.
+ */
+ private class BodyFrameIterator implements Iterator<AMQDataBlock>
+ {
+ private int _channel;
+
+ private int _index = -1;
+ private AMQProtocolSession _protocolSession;
+
+ private BodyFrameIterator(AMQProtocolSession protocolSession, int channel)
+ {
+ _channel = channel;
+ _protocolSession = protocolSession;
+ }
+
+ public boolean hasNext()
+ {
+ return _index < (getBodyCount() - 1);
+ }
+
+ public AMQDataBlock next()
+ {
+ AMQBody cb =
+ getProtocolVersionMethodConverter().convertToBody(getContentChunk(++_index));
+
+ return new AMQFrame(_channel, cb);
+ }
+
+ private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
+ {
+ return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private class BodyContentIterator implements Iterator<ContentChunk>
+ {
+
+ private int _index = -1;
+
+ public boolean hasNext()
+ {
+ return _index < (getBodyCount() - 1);
+ }
+
+ public ContentChunk next()
+ {
+ return getContentChunk(++_index);
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message
+ * These all need refactoring to some sort of MockAMQMessageFactory.
+ */
+ @Deprecated
+ protected TransientAMQMessage(AMQMessage message) throws AMQException
+ {
+ _messageId = message.getMessageId();
+ _flags = ((TransientAMQMessage)message)._flags;
+ _contentHeaderBody = message.getContentHeaderBody();
+ _messagePublishInfo = message.getMessagePublishInfo();
+ }
+
+
+ /**
+ * Normal message creation via the MessageFactory uses this constructor
+ * Package scope limited as MessageFactory should be used
+ * @see MessageFactory
+ *
+ * @param messageId
+ */
+ TransientAMQMessage(Long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public String debugIdentity()
+ {
+ return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
+ }
+
+ public void setExpiration(final long expiration)
+ {
+ _expiration = expiration;
+ }
+
+ public boolean isReferenced()
+ {
+ return _referenceCount.get() > 0;
+ }
+
+ public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
+ {
+ return new BodyFrameIterator(protocolSession, channel);
+ }
+
+ public Iterator<ContentChunk> getContentBodyIterator()
+ {
+ return new BodyContentIterator();
+ }
+
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ return _contentHeaderBody;
+ }
+
+ public Long getMessageId()
+ {
+ return _messageId;
+ }
+
+ /**
+ * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
+ * operation.
+ */
+ public AMQMessage takeReference()
+ {
+ incrementReference(); // _referenceCount.incrementAndGet();
+
+ return this;
+ }
+
+ public boolean incrementReference()
+ {
+ return incrementReference(1);
+ }
+
+ /* Threadsafe. Increment the reference count on the message. */
+ public boolean incrementReference(int count)
+ {
+ if(_referenceCount.addAndGet(count) <= 1)
+ {
+ _referenceCount.addAndGet(-count);
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+
+ }
+
+ /**
+ * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
+ * message store.
+ *
+ * @param storeContext
+ *
+ * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
+ * failed
+ */
+ public void decrementReference(StoreContext storeContext) throws MessageCleanupException
+ {
+
+ int count = _referenceCount.decrementAndGet();
+
+ // note that the operation of decrementing the reference count and then removing the message does not
+ // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
+ // the message has been passed to all queues. i.e. we are
+ // not relying on the all the increments having taken place before the delivery manager decrements.
+ if (count == 0)
+ {
+ // set the reference count way below 0 so that we can detect that the message has been deleted
+ // this is to guard against the message being spontaneously recreated (from the mgmt console)
+ // by copying from other queues at the same time as it is being removed.
+ _referenceCount.set(Integer.MIN_VALUE/2);
+
+ try
+ {
+ // must check if the handle is null since there may be cases where we decide to throw away a message
+ // and the handle has not yet been constructed
+ // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op
+ removeMessage(storeContext);
+ }
+ catch (AMQException e)
+ {
+ // to maintain consistency, we revert the count
+ incrementReference();
+ throw new MessageCleanupException(getMessageId(), e);
+ }
+ }
+ else
+ {
+ if (count < 0)
+ {
+ throw new MessageCleanupException("Reference count for message id " + debugIdentity()
+ + " has gone below 0.");
+ }
+ }
+ }
+
+
+ /**
+ * Called selectors to determin if the message has already been sent
+ *
+ * @return _deliveredToConsumer
+ */
+ public boolean getDeliveredToConsumer()
+ {
+ return (_flags & DELIVERED_TO_CONSUMER) != 0;
+ }
+
+ /**
+ * Called to enforce the 'immediate' flag.
+ *
+ * @returns true if the message is marked for immediate delivery but has not been marked as delivered
+ * to a consumer
+ */
+ public boolean immediateAndNotDelivered()
+ {
+
+ return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
+
+ }
+
+ /**
+ * Checks to see if the message has expired. If it has the message is dequeued.
+ *
+ * @return true if the message has expire
+ *
+ * @throws AMQException
+ */
+ public boolean expired() throws AMQException
+ {
+
+ if (_expiration != 0L)
+ {
+ long now = System.currentTimeMillis();
+
+ return (now > _expiration);
+ }
+
+ return false;
+ }
+
+ /**
+ * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+ * And for selector efficiency.
+ */
+ public void setDeliveredToConsumer()
+ {
+ _flags |= DELIVERED_TO_CONSUMER;
+ }
+
+
+ public long getSize()
+ {
+ return _contentHeaderBody.bodySize;
+ }
+
+ public Object getPublisherClientInstance()
+ {
+ return _sessionIdentifier.getSessionInstance();
+ }
+
+ public Object getPublisherIdentifier()
+ {
+ return _sessionIdentifier.getSessionIdentifier();
+ }
+
+ public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier)
+ {
+ _sessionIdentifier = sessionIdentifier;
+ }
+
+ /** From AMQMessageHandle **/
+
+ public int getBodyCount()
+ {
+ return _contentBodies.size();
+ }
+
+ public ContentChunk getContentChunk(int index)
+ {
+ if(_contentBodies == null)
+ {
+ throw new RuntimeException("No ContentBody has been set");
+ }
+
+ if (index > _contentBodies.size() - 1 || index < 0)
+ {
+ throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
+ (_contentBodies.size() - 1));
+ }
+ return _contentBodies.get(index);
+ }
+
+ public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
+ throws AMQException
+ {
+ if(_contentBodies == null)
+ {
+ if(isLastContentBody)
+ {
+ _contentBodies = Collections.singletonList(contentChunk);
+ }
+ else
+ {
+ _contentBodies = new ArrayList<ContentChunk>();
+ _contentBodies.add(contentChunk);
+ }
+ }
+ else
+ {
+ _contentBodies.add(contentChunk);
+ }
+ }
+
+ public MessagePublishInfo getMessagePublishInfo()
+ {
+ return _messagePublishInfo;
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+
+ /**
+ * This is called when all the content has been received.
+ * @param storeContext
+ *@param messagePublishInfo
+ * @param contentHeaderBody @throws AMQException
+ */
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
+ ContentHeaderBody contentHeaderBody)
+ throws AMQException
+ {
+
+ if (contentHeaderBody == null)
+ {
+ throw new NullPointerException("HeaderBody cannot be null");
+ }
+
+ if( messagePublishInfo == null)
+ {
+ throw new NullPointerException("PublishInfo cannot be null");
+ }
+
+ _messagePublishInfo = messagePublishInfo;
+ _contentHeaderBody = contentHeaderBody;
+
+
+ if( contentHeaderBody.bodySize == 0)
+ {
+ _contentBodies = Collections.EMPTY_LIST;
+ }
+
+ _arrivalTime = System.currentTimeMillis();
+
+ if(messagePublishInfo.isImmediate())
+ {
+ _flags |= IMMEDIATE;
+ }
+ }
+
+ public long getArrivalTime()
+ {
+ return _arrivalTime;
+ }
+
+ public void removeMessage(StoreContext storeContext) throws AMQException
+ {
+ //no-op
+ }
+
+
+ public String toString()
+ {
+ // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+ // _taken + " by :" + _takenBySubcription;
+
+ return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
deleted file mode 100644
index 804d2c2131..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
-
-/**
- * @author Robert Greig (robert.j.greig@jpmorgan.com)
- */
-public class WeakReferenceMessageHandle implements AMQMessageHandle
-{
- private WeakReference<ContentHeaderBody> _contentHeaderBody;
-
- private WeakReference<MessagePublishInfo> _messagePublishInfo;
-
- private List<WeakReference<ContentChunk>> _contentBodies;
-
- private boolean _redelivered;
-
- private final MessageStore _messageStore;
-
- private final Long _messageId;
- private long _arrivalTime;
-
- public WeakReferenceMessageHandle(final Long messageId, MessageStore messageStore)
- {
- _messageId = messageId;
- _messageStore = messageStore;
- }
-
- public ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException
- {
- ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null);
- if (chb == null)
- {
- MessageMetaData mmd = loadMessageMetaData(context);
- chb = mmd.getContentHeaderBody();
- }
- return chb;
- }
-
- public Long getMessageId()
- {
- return _messageId;
- }
-
- private MessageMetaData loadMessageMetaData(StoreContext context)
- throws AMQException
- {
- MessageMetaData mmd = _messageStore.getMessageMetaData(context, _messageId);
- populateFromMessageMetaData(mmd);
- return mmd;
- }
-
- private void populateFromMessageMetaData(MessageMetaData mmd)
- {
- _arrivalTime = mmd.getArrivalTime();
- _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
- _messagePublishInfo = new WeakReference<MessagePublishInfo>(mmd.getMessagePublishInfo());
- }
-
- public int getBodyCount(StoreContext context) throws AMQException
- {
- if (_contentBodies == null)
- {
- MessageMetaData mmd = _messageStore.getMessageMetaData(context, _messageId);
- int chunkCount = mmd.getContentChunkCount();
- _contentBodies = new ArrayList<WeakReference<ContentChunk>>(chunkCount);
- for (int i = 0; i < chunkCount; i++)
- {
- _contentBodies.add(new WeakReference<ContentChunk>(null));
- }
- }
- return _contentBodies.size();
- }
-
- public long getBodySize(StoreContext context) throws AMQException
- {
- return getContentHeaderBody(context).bodySize;
- }
-
- public ContentChunk getContentChunk(StoreContext context, int index) throws AMQException, IllegalArgumentException
- {
- if(_contentBodies == null)
- {
- throw new RuntimeException("No ContentBody has been set");
- }
-
- if (index > _contentBodies.size() - 1 || index < 0)
- {
- throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
- (_contentBodies.size() - 1));
- }
- WeakReference<ContentChunk> wr = _contentBodies.get(index);
- ContentChunk cb = wr.get();
- if (cb == null)
- {
- cb = _messageStore.getContentBodyChunk(context, _messageId, index);
- _contentBodies.set(index, new WeakReference<ContentChunk>(cb));
- }
- return cb;
- }
-
- /**
- * Content bodies are set <i>before</i> the publish and header frames
- *
- * @param storeContext
- * @param contentChunk
- * @param isLastContentBody
- * @throws AMQException
- */
- public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
- {
- if (_contentBodies == null && isLastContentBody)
- {
- _contentBodies = new ArrayList<WeakReference<ContentChunk>>(1);
- }
- else
- {
- if (_contentBodies == null)
- {
- _contentBodies = new LinkedList<WeakReference<ContentChunk>>();
- }
- }
- _contentBodies.add(new WeakReference<ContentChunk>(contentChunk));
- _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1,
- contentChunk, isLastContentBody);
- }
-
- public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException
- {
- MessagePublishInfo bpb = (_messagePublishInfo != null ? _messagePublishInfo.get() : null);
- if (bpb == null)
- {
- MessageMetaData mmd = loadMessageMetaData(context);
-
- bpb = mmd.getMessagePublishInfo();
- }
- return bpb;
- }
-
- public boolean isRedelivered()
- {
- return _redelivered;
- }
-
- public void setRedelivered(boolean redelivered)
- {
- _redelivered = redelivered;
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- /**
- * This is called when all the content has been received.
- *
- * @param publishBody
- * @param contentHeaderBody
- * @throws AMQException
- */
- public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo publishBody,
- ContentHeaderBody contentHeaderBody)
- throws AMQException
- {
- // if there are no content bodies the list will be null so we must
- // create en empty list here
- if (contentHeaderBody.bodySize == 0)
- {
- _contentBodies = new LinkedList<WeakReference<ContentChunk>>();
- }
-
- final long arrivalTime = System.currentTimeMillis();
-
- MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), arrivalTime);
-
- _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
-
-
- populateFromMessageMetaData(mmd);
- }
-
- public void removeMessage(StoreContext storeContext) throws AMQException
- {
- _messageStore.removeMessage(storeContext, _messageId);
- }
-
- public long getArrivalTime()
- {
- return _arrivalTime;
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index f23983641b..9de2d09b8e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -27,8 +27,8 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -93,7 +93,7 @@ public class DerbyMessageStore implements MessageStore
private String _connectionURL;
-
+ MessageFactory _messageFactory;
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
@@ -167,6 +167,8 @@ public class DerbyMessageStore implements MessageStore
// this recovers durable queues and persistent messages
+ _messageFactory = new MessageFactory();
+
recover();
stateTransition(State.RECOVERING, State.STARTED);
@@ -1299,7 +1301,7 @@ public class DerbyMessageStore implements MessageStore
private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
throws SQLException, AMQException
{
- Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>();
+ Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>();
List<ProcessAction> actions = new ArrayList<ProcessAction>();
Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
@@ -1318,8 +1320,6 @@ public class DerbyMessageStore implements MessageStore
conn = newConnection();
}
-
- MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
long maxId = 1;
TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
@@ -1355,7 +1355,11 @@ public class DerbyMessageStore implements MessageStore
}
else
{
- message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
+ message = _messageFactory.createMessage(messageId, this, true);
+
+ _logger.error("todo must do message recovery now.");
+ //todo must do message recovery now.
+
msgMap.put(messageId,message);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
index b5a91c8da6..d46ba85069 100644
--- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
@@ -26,7 +26,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntryImpl;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
@@ -349,32 +348,15 @@ public class Show extends AbstractCommand
arrival.add("" + msg.getArrivalTime());
- try
- {
- ispersitent.add(msg.isPersistent() ? "true" : "false");
- }
- catch (AMQException e)
- {
- ispersitent.add("n/a");
- }
+ ispersitent.add(msg.isPersistent() ? "true" : "false");
isredelivered.add(entry.isRedelivered() ? "true" : "false");
isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false");
-// msg.getMessageHandle();
-
BasicContentHeaderProperties headers = null;
- try
- {
- headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
- }
- catch (AMQException e)
- {
- //ignore
-// commandError("Unable to read properties for message: " + e.getMessage(), null);
- }
+ headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
if (headers != null)
{
@@ -414,15 +396,7 @@ public class Show extends AbstractCommand
AMQShortString useridSS = headers.getUserId();
userid.add(useridSS == null ? "null" : useridSS.toString());
- MessagePublishInfo info = null;
- try
- {
- info = msg.getMessagePublishInfo();
- }
- catch (AMQException e)
- {
- //ignore
- }
+ MessagePublishInfo info = msg.getMessagePublishInfo();
if (info != null)
{