diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-02-13 11:24:44 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-02-13 11:24:44 +0000 |
| commit | fc9058fc3df68f6c8c0fae455f34f751b584698e (patch) | |
| tree | 949703225e5c97f96f6220376c8817aeed74ac0c /java/broker/src/main | |
| parent | 9b19b4a3d0b15ee73d7186302443c2fc4d8fab75 (diff) | |
| download | qpid-python-fc9058fc3df68f6c8c0fae455f34f751b584698e.tar.gz | |
QPID-1629 : Convered AMQMessage to Interface and created concrete Transient/PersistentAMQMessage implementations
Removed the use of WeakReferences from PersistentAMQMessage and therefore the need to have a StoreContext on get requests.
NOTE: this checking will break persistent recovery.
Coverted all uses of *MessageHandle to AMQMessage. A number of tests (SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message) still use a custom constructor on Transient/PersistentAMQMessage. This is because they have their own Message implemntations that are used for testing. However, I'm sure they could be modified to override the required functionality rather than attempt to use the existing Factory and Wrap the resulting Message. A new JIRA to address this QPID-1659.
QPID-1628 : The update to MessageFactory removes the commented out code
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@744079 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
16 files changed, 696 insertions, 960 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 5fde08cbdd..3b290b3d51 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -35,12 +35,12 @@ import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.UnauthorizedAccessException; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.ClientDeliveryMethod; @@ -108,7 +108,7 @@ public class AMQChannel private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>(); - private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory(); + private MessageFactory _messageHandleFactory = new MessageFactory(); // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index 4949e5b41d..ea94f23ff9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -28,7 +28,6 @@ package org.apache.qpid.server.output.amqp0_8; import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -37,8 +36,6 @@ import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.AMQException;
-import org.apache.mina.common.ByteBuffer;
-
import java.util.Iterator;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -79,11 +76,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
-
-
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
@@ -100,7 +93,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter // Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -112,7 +105,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter //
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -126,8 +119,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
final AMQMessage message = queueEntry.getMessage();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
AMQDataBlock deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
@@ -135,7 +126,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -150,7 +141,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter // Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -162,7 +153,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter //
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -179,7 +170,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter final AMQMessage message = queueEntry.getMessage();
final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicDeliverBody deliverBody =
@@ -188,18 +178,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter queueEntry.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey());
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
-
-
- return deliverFrame;
+ return deliverBody.generateFrame(channelId);
}
private AMQDataBlock createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
final AMQMessage message = queueEntry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicGetOkBody getOkBody =
@@ -208,9 +194,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter pb.getExchange(),
pb.getRoutingKey(),
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
- return getOkFrame;
+ return getOkBody.generateFrame(channelId);
}
public byte getProtocolMinorVersion()
@@ -231,9 +215,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
- return returnFrame;
+ return basicReturnBody.generateFrame(channelId);
+
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index 00a15d2d50..b71b118275 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -28,9 +28,7 @@ import java.util.Iterator; import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -77,12 +75,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter AMQBody deliverBody = createEncodedDeliverFrame(queueEntry, channelId, deliveryTag, consumerTag);
final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
-
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
-
-
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
@@ -99,7 +92,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter // Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
@@ -111,7 +104,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter //
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -123,9 +116,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
+ return ContentHeaderBody.createAMQFrame(channelId, contentHeaderBody);
}
@@ -133,15 +124,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter {
final AMQMessage message = queueEntry.getMessage();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
AMQFrame deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ final int bodyCount = message.getBodyCount();
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -156,7 +145,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter // Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+ ContentChunk cb = message.getContentChunk(0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -168,7 +157,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter //
for(int i = 1; i < bodyCount; i++)
{
- cb = messageHandle.getContentChunk(storeContext, i);
+ cb = message.getContentChunk(i);
writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
}
@@ -190,7 +179,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter final AMQShortString exchangeName = pb.getExchange();
final AMQShortString routingKey = pb.getRoutingKey();
- final AMQBody returnBlock = new AMQBody()
+ return new AMQBody()
{
public AMQBody _underlyingBody;
@@ -238,7 +227,6 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter throw new AMQException("This block should never be dispatched!");
}
};
- return returnBlock;
}
private AMQFrame createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
@@ -253,9 +241,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter pb.getExchange(),
pb.getRoutingKey(),
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
- return getOkFrame;
+ return getOkBody.generateFrame(channelId);
}
public byte getProtocolMinorVersion()
@@ -276,9 +262,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
- return returnFrame;
+ return basicReturnBody.generateFrame(channelId);
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index d73d37f48d..2bd6e612f8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -20,363 +20,52 @@ */ package org.apache.qpid.server.queue; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.txn.TransactionalContext; - +import org.apache.qpid.AMQException; import java.util.Iterator; -import java.util.concurrent.atomic.AtomicInteger; -/** - * A deliverable message. - */ -public class AMQMessage +public interface AMQMessage { - /** Used for debugging purposes. */ - private static final Logger _log = Logger.getLogger(AMQMessage.class); - - private final AtomicInteger _referenceCount = new AtomicInteger(1); - - private final AMQMessageHandle _messageHandle; - - /** Holds the transactional context in which this message is being processed. */ - private StoreContext _storeContext; - - /** Flag to indicate that this message requires 'immediate' delivery. */ - - private static final byte IMMEDIATE = 0x01; - - /** - * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality - * for messages published with the 'immediate' flag. - */ - - private static final byte DELIVERED_TO_CONSUMER = 0x02; - - private byte _flags = 0; - - private long _expiration; - - private final long _size; - - private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier; - private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); - - - - /** - * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory - * therefore is memory-efficient. - */ - private class BodyFrameIterator implements Iterator<AMQDataBlock> - { - private int _channel; - - private int _index = -1; - private AMQProtocolSession _protocolSession; - - private BodyFrameIterator(AMQProtocolSession protocolSession, int channel) - { - _channel = channel; - _protocolSession = protocolSession; - } - - public boolean hasNext() - { - try - { - return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1); - } - catch (AMQException e) - { - _log.error("Unable to get body count: " + e, e); - - return false; - } - } - - public AMQDataBlock next() - { - try - { - - AMQBody cb = - getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), - ++_index)); - - return new AMQFrame(_channel, cb); - } - catch (AMQException e) - { - // have no choice but to throw a runtime exception - throw new RuntimeException("Error getting content body: " + e, e); - } - - } - - private ProtocolVersionMethodConverter getProtocolVersionMethodConverter() - { - return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter(); - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - } - - public void clearStoreContext() - { - _storeContext = new StoreContext(); - } - - public StoreContext getStoreContext() - { - return _storeContext; - } - - private class BodyContentIterator implements Iterator<ContentChunk> - { - - private int _index = -1; - - public boolean hasNext() - { - try - { - return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1); - } - catch (AMQException e) - { - _log.error("Error getting body count: " + e, e); - - return false; - } - } - - public ContentChunk next() - { - try - { - return _messageHandle.getContentChunk(getStoreContext(), ++_index); - } - catch (AMQException e) - { - throw new RuntimeException("Error getting content body: " + e, e); - } - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - } - - - - /** - * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal - * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to - * queues. - * - * @param messageId - * @param store - * @param factory - * - * @throws AMQException - */ - public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) - throws AMQException - { - _messageHandle = factory.createMessageHandle(messageId, store, true); - _storeContext = txnConext.getStoreContext(); - _size = _messageHandle.getBodySize(txnConext.getStoreContext()); - } - - /** - * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal - * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to - * queues. - * - * @param messageHandle - * - * @throws AMQException - */ - public AMQMessage( - AMQMessageHandle messageHandle, - StoreContext storeConext, - MessagePublishInfo info) - throws AMQException - { - _messageHandle = messageHandle; - _storeContext = storeConext; - - if(info.isImmediate()) - { - _flags |= IMMEDIATE; - } - _size = messageHandle.getBodySize(storeConext); - - } + //Get Content relating to this message + Long getMessageId(); - protected AMQMessage(AMQMessage msg) throws AMQException - { - _messageHandle = msg._messageHandle; - _storeContext = msg._storeContext; - _flags = msg._flags; - _size = msg._size; + Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel); - } + Iterator<ContentChunk> getContentBodyIterator(); + ContentHeaderBody getContentHeaderBody(); - public String debugIdentity() - { - return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")"; - } + ContentChunk getContentChunk(int index); - public void setExpiration(final long expiration) - { + Object getPublisherClientInstance(); - _expiration = expiration; + Object getPublisherIdentifier(); - } + MessagePublishInfo getMessagePublishInfo(); - public boolean isReferenced() - { - return _referenceCount.get() > 0; - } + int getBodyCount(); - public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel) - { - return new BodyFrameIterator(protocolSession, channel); - } + long getSize(); - public Iterator<ContentChunk> getContentBodyIterator() - { - return new BodyContentIterator(); - } + long getArrivalTime(); - public ContentHeaderBody getContentHeaderBody() throws AMQException - { - return _messageHandle.getContentHeaderBody(getStoreContext()); - } - - public Long getMessageId() - { - return _messageHandle.getMessageId(); - } - - /** - * Creates a long-lived reference to this message, and increments the count of such references, as an atomic - * operation. - */ - public AMQMessage takeReference() - { - incrementReference(); // _referenceCount.incrementAndGet(); - - return this; - } - - public boolean incrementReference() - { - return incrementReference(1); - } - - /* Threadsafe. Increment the reference count on the message. */ - public boolean incrementReference(int count) - { - if(_referenceCount.addAndGet(count) <= 1) - { - _referenceCount.addAndGet(-count); - return false; - } - else - { - return true; - } - - } - - /** - * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the - * message store. - * - * @param storeContext - * - * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that - * failed - */ - public void decrementReference(StoreContext storeContext) throws MessageCleanupException - { - - int count = _referenceCount.decrementAndGet(); - - // note that the operation of decrementing the reference count and then removing the message does not - // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after - // the message has been passed to all queues. i.e. we are - // not relying on the all the increments having taken place before the delivery manager decrements. - if (count == 0) - { - // set the reference count way below 0 so that we can detect that the message has been deleted - // this is to guard against the message being spontaneously recreated (from the mgmt console) - // by copying from other queues at the same time as it is being removed. - _referenceCount.set(Integer.MIN_VALUE/2); - - try - { - // must check if the handle is null since there may be cases where we decide to throw away a message - // and the handle has not yet been constructed - if (_messageHandle != null) - { - _messageHandle.removeMessage(storeContext); - } - } - catch (AMQException e) - { - // to maintain consistency, we revert the count - incrementReference(); - throw new MessageCleanupException(getMessageId(), e); - } - } - else - { - if (count < 0) - { - throw new MessageCleanupException("Reference count for message id " + debugIdentity() - + " has gone below 0."); - } - } - } - + //Check the status of this message /** * Called selectors to determin if the message has already been sent * * @return _deliveredToConsumer */ - public boolean getDeliveredToConsumer() - { - return (_flags & DELIVERED_TO_CONSUMER) != 0; - } - - public boolean isPersistent() throws AMQException - { - return _messageHandle.isPersistent(); - } + boolean getDeliveredToConsumer(); /** * Called to enforce the 'immediate' flag. @@ -384,89 +73,62 @@ public class AMQMessage * @returns true if the message is marked for immediate delivery but has not been marked as delivered * to a consumer */ - public boolean immediateAndNotDelivered() - { - - return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE; - - } - - public MessagePublishInfo getMessagePublishInfo() throws AMQException - { - return _messageHandle.getMessagePublishInfo(getStoreContext()); - } - - public long getArrivalTime() - { - return _messageHandle.getArrivalTime(); - } + boolean immediateAndNotDelivered(); /** * Checks to see if the message has expired. If it has the message is dequeued. * - * @param queue The queue to check the expiration against. (Currently not used) - * * @return true if the message has expire * - * @throws AMQException + * @throws org.apache.qpid.AMQException */ - public boolean expired(AMQQueue queue) throws AMQException - { + boolean expired() throws AMQException; - if (_expiration != 0L) - { - long now = System.currentTimeMillis(); - - return (now > _expiration); - } + /** Is this a persistent message + * + * @return true if the message is persistent + */ + boolean isPersistent(); - return false; - } /** * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). * And for selector efficiency. */ - public void setDeliveredToConsumer() - { - _flags |= DELIVERED_TO_CONSUMER; - } + void setDeliveredToConsumer(); + + void setExpiration(long expiration); + + void setClientIdentifier(AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier); + + /** + * This is called when all the content has been received. + * @param storeContext + *@param messagePublishInfo + * @param contentHeaderBody @throws org.apache.qpid.AMQException + */ + void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, ContentHeaderBody contentHeaderBody) + throws AMQException; + void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) + throws AMQException; - public AMQMessageHandle getMessageHandle() - { - return _messageHandle; - } + void removeMessage(StoreContext storeContext) throws AMQException; - public long getSize() - { - return _size; + String toString(); - } + String debugIdentity(); - public Object getPublisherClientInstance() - { - return _sessionIdentifier.getSessionInstance(); - } - - public Object getPublisherIdentifier() - { - return _sessionIdentifier.getSessionIdentifier(); - } + // Reference counting methods - public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier) - { - _sessionIdentifier = sessionIdentifier; - } + void decrementReference(StoreContext storeContext) throws MessageCleanupException; + boolean incrementReference(int queueCount); - public String toString() - { - // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + - // _taken + " by :" + _takenBySubcription; + boolean incrementReference(); - return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount; - } + AMQMessage takeReference(); + boolean isReferenced(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index f5853bd303..a08719875d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -353,29 +353,20 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que } } - try + // Create header attributes list + CommonContentHeaderProperties headerProperties = + (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; + String mimeType = null, encoding = null; + if (headerProperties != null) { - // Create header attributes list - CommonContentHeaderProperties headerProperties = - (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; - String mimeType = null, encoding = null; - if (headerProperties != null) - { - AMQShortString mimeTypeShortSting = headerProperties.getContentType(); - mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString(); - encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString(); - } + AMQShortString mimeTypeShortSting = headerProperties.getContentType(); + mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString(); + encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString(); + } - Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; + Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; - return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); - } - catch (AMQException e) - { - JMException jme = new JMException("Error creating header attributes list: " + e); - jme.initCause(e); - throw jme; - } + return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); } /** @@ -392,27 +383,18 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que List<QueueEntry> list = _queue.getMessagesOnTheQueue(); TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); - try + // Create the tabular list of message header contents + for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) { - // Create the tabular list of message header contents - for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) - { - QueueEntry queueEntry = list.get(i - 1); - AMQMessage msg = queueEntry.getMessage(); - ContentHeaderBody headerBody = msg.getContentHeaderBody(); - // Create header attributes list - String[] headerAttributes = getMessageHeaderProperties(headerBody); - Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, - queueEntry.isRedelivered() }; - CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); - _messageList.put(messageData); - } - } - catch (AMQException e) - { - JMException jme = new JMException("Error creating message contents: " + e); - jme.initCause(e); - throw jme; + QueueEntry queueEntry = list.get(i - 1); + AMQMessage msg = queueEntry.getMessage(); + ContentHeaderBody headerBody = msg.getContentHeaderBody(); + // Create header attributes list + String[] headerAttributes = getMessageHeaderProperties(headerBody); + Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, + queueEntry.isRedelivered() }; + CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); + _messageList.put(messageData); } return _messageList; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java deleted file mode 100644 index 1092f67d94..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.LinkedList; -import java.util.List; -import java.util.Collections; -import java.util.ArrayList; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.store.StoreContext; - -/** - */ -public class InMemoryMessageHandle implements AMQMessageHandle -{ - - private ContentHeaderBody _contentHeaderBody; - - private MessagePublishInfo _messagePublishInfo; - - private List<ContentChunk> _contentBodies; - - private boolean _redelivered; - - private long _arrivalTime; - - private final Long _messageId; - - public InMemoryMessageHandle(final Long messageId) - { - _messageId = messageId; - } - - public ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException - { - return _contentHeaderBody; - } - - public Long getMessageId() - { - return _messageId; - } - - public int getBodyCount(StoreContext context) - { - return _contentBodies.size(); - } - - public long getBodySize(StoreContext context) throws AMQException - { - return getContentHeaderBody(context).bodySize; - } - - public ContentChunk getContentChunk(StoreContext context, int index) throws AMQException, IllegalArgumentException - { - if(_contentBodies == null) - { - throw new RuntimeException("No ContentBody has been set"); - } - - if (index > _contentBodies.size() - 1 || index < 0) - { - throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " + - (_contentBodies.size() - 1)); - } - return _contentBodies.get(index); - } - - public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentBody, boolean isLastContentBody) - throws AMQException - { - if(_contentBodies == null) - { - if(isLastContentBody) - { - _contentBodies = Collections.singletonList(contentBody); - } - else - { - _contentBodies = new ArrayList<ContentChunk>(); - _contentBodies.add(contentBody); - } - } - else - { - _contentBodies.add(contentBody); - } - } - - public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException - { - return _messagePublishInfo; - } - - public boolean isPersistent() - { - return false; - } - - /** - * This is called when all the content has been received. - * @param messagePublishInfo - * @param contentHeaderBody - * @throws AMQException - */ - public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, - ContentHeaderBody contentHeaderBody) - throws AMQException - { - _messagePublishInfo = messagePublishInfo; - _contentHeaderBody = contentHeaderBody; - if(contentHeaderBody.bodySize == 0) - { - _contentBodies = Collections.EMPTY_LIST; - } - _arrivalTime = System.currentTimeMillis(); - } - - public void removeMessage(StoreContext storeContext) throws AMQException - { - // NO OP - } - - public long getArrivalTime() - { - return _arrivalTime; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index b994040131..aad99da6c3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -35,7 +35,6 @@ import org.apache.qpid.AMQException; import org.apache.log4j.Logger; import java.util.ArrayList; -import java.util.Collection; public class IncomingMessage implements Filterable<RuntimeException> { @@ -48,7 +47,7 @@ public class IncomingMessage implements Filterable<RuntimeException> private final MessagePublishInfo _messagePublishInfo; private ContentHeaderBody _contentHeaderBody; - private AMQMessageHandle _messageHandle; + private AMQMessage _message; private final Long _messageId; private final TransactionalContext _txnContext; @@ -74,7 +73,6 @@ public class IncomingMessage implements Filterable<RuntimeException> private Exchange _exchange; - public IncomingMessage(final Long messageId, final MessagePublishInfo info, final TransactionalContext txnContext, @@ -124,11 +122,11 @@ public class IncomingMessage implements Filterable<RuntimeException> } public void routingComplete(final MessageStore store, - final MessageHandleFactory factory) throws AMQException + final MessageFactory factory) throws AMQException { final boolean persistent = isPersistent(); - _messageHandle = factory.createMessageHandle(_messageId, store, persistent); + _message = factory.createMessage(_messageId, store, persistent); if (persistent) { _txnContext.beginTranIfNecessary(); @@ -157,21 +155,16 @@ public class IncomingMessage implements Filterable<RuntimeException> _logger.debug("Delivering message " + _messageId + " to " + _destinationQueues); } - AMQMessage message = null; - try { // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks - _messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), - _messagePublishInfo, getContentHeaderBody()); + _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody()); + - - - message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo); - message.setExpiration(_expiration); - message.setClientIdentifier(_publisher.getSessionIdentifier()); + _message.setExpiration(_expiration); + _message.setClientIdentifier(_publisher.getSessionIdentifier()); // we then allow the transactional context to do something with the message content // now that it has all been received, before we attempt delivery @@ -182,7 +175,7 @@ public class IncomingMessage implements Filterable<RuntimeException> if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString())) { - throw new UnauthorizedAccessException("Acccess Refused",message); + throw new UnauthorizedAccessException("Acccess Refused", _message); } if ((_destinationQueues == null) || _destinationQueues.size() == 0) @@ -190,26 +183,26 @@ public class IncomingMessage implements Filterable<RuntimeException> if (isMandatory() || isImmediate()) { - throw new NoRouteException("No Route for message", message); + throw new NoRouteException("No Route for message", _message); } else { - _logger.warn("MESSAGE DISCARDED: No routes for message - " + message); + _logger.warn("MESSAGE DISCARDED: No routes for message - " + _message); } } else { int offset; final int queueCount = _destinationQueues.size(); - message.incrementReference(queueCount); + _message.incrementReference(queueCount); if(queueCount == 1) { offset = 0; } else { - offset = ((int)(message.getMessageId().longValue())) % queueCount; + offset = ((int)(_message.getMessageId().longValue())) % queueCount; if(offset < 0) { offset = -offset; @@ -218,22 +211,21 @@ public class IncomingMessage implements Filterable<RuntimeException> for (int i = offset; i < queueCount; i++) { // normal deliver so add this message at the end. - _txnContext.deliver(_destinationQueues.get(i), message); + _txnContext.deliver(_destinationQueues.get(i), _message); } for (int i = 0; i < offset; i++) { // normal deliver so add this message at the end. - _txnContext.deliver(_destinationQueues.get(i), message); + _txnContext.deliver(_destinationQueues.get(i), _message); } } - message.clearStoreContext(); - return message; + return _message; } finally { // Remove refence for routing process . Reference count should now == delivered queue count - if(message != null) message.decrementReference(_txnContext.getStoreContext()); + if(_message != null) _message.decrementReference(_txnContext.getStoreContext()); } } @@ -244,7 +236,7 @@ public class IncomingMessage implements Filterable<RuntimeException> _bodyLengthReceived += contentChunk.getSize(); - _messageHandle.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived()); + _message.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java index 7573a629c1..e18834874f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java @@ -21,29 +21,21 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; -/** - * Constructs a message handle based on the publish body, the content header and the queue to which the message - * has been routed. - * - * @author Robert Greig (robert.j.greig@jpmorgan.com) - */ -public class MessageHandleFactory +public class MessageFactory { - public AMQMessageHandle createMessageHandle(Long messageId, MessageStore store, boolean persistent) + public AMQMessage createMessage(Long messageId, MessageStore store, boolean persistent) { - // just hardcoded for now if (persistent) { - return new WeakReferenceMessageHandle(messageId, store); + return new PersistentAMQMessage(messageId, store); } else { - return new InMemoryMessageHandle(messageId); + return new TransientAMQMessage(messageId); } - -// return new AMQMessage(messageId, store, persistent); } - + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java index 6f9efd3200..e33b0c83c7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -47,20 +47,13 @@ public enum NotificationCheck if(maximumMessageSize != 0)
{
// Check for threshold message size
- long messageSize;
- try
- {
- messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
- }
- catch (AMQException e)
- {
- messageSize = 0;
- }
-
+ long messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
if (messageSize >= maximumMessageSize)
{
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold (" +
+ maximumMessageSize + ") breached. [Message ID=" +
+ (msg == null ? "null" : msg.getMessageId()) + "]");
return true;
}
}
@@ -110,7 +103,7 @@ public enum NotificationCheck }
}
return false;
-
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java new file mode 100644 index 0000000000..04e3635f92 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; + +public class PersistentAMQMessage extends TransientAMQMessage +{ + protected MessageStore _messageStore; + + public PersistentAMQMessage(Long messageId, MessageStore store) + { + super(messageId); + _messageStore = store; + } + + @Override + public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) + throws AMQException + { + super.addContentBodyFrame(storeContext, contentChunk, isLastContentBody); + _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1, + contentChunk, isLastContentBody); + } + + @Override + public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, + ContentHeaderBody contentHeaderBody) + throws AMQException + { + super.setPublishAndContentHeaderBody(storeContext, messagePublishInfo, contentHeaderBody); + MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime); + + _messageStore.storeMessageMetaData(storeContext, _messageId, mmd); + } + + @Override + public void removeMessage(StoreContext storeContext) throws AMQException + { + _messageStore.removeMessage(storeContext, _messageId); + } + + @Override + public boolean isPersistent() + { + return true; + } + + public void recoverFromMessageMetaData(MessageMetaData mmd) + { + _arrivalTime = mmd.getArrivalTime(); + _contentHeaderBody = mmd.getContentHeaderBody(); + _messagePublishInfo = mmd.getMessagePublishInfo(); + } + + public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException + { + super.addContentBodyFrame(null, contentChunk, isLastContentBody); + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index fd46a8a5ff..7be2827e0f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -54,25 +54,16 @@ public class PriorityQueueList implements QueueEntryList public QueueEntry add(AMQMessage message) { - try + int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset; + if(index >= _priorities) { - int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset; - if(index >= _priorities) - { - index = _priorities-1; - } - else if(index < 0) - { - index = 0; - } - return _priorityLists[index].add(message); + index = _priorities-1; } - catch (AMQException e) + else if(index < 0) { - // TODO - fix AMQ Exception - throw new RuntimeException(e); + index = 0; } - + return _priorityLists[index].add(message); } public QueueEntry next(QueueEntry node) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index fe9686e906..ba14be5580 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -132,7 +132,7 @@ public class QueueEntryImpl implements QueueEntry public boolean expired() throws AMQException { - return getMessage().expired(getQueue()); + return getMessage().expired(); } public boolean isAcquired() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java new file mode 100644 index 0000000000..8c62e046f8 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java @@ -0,0 +1,469 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.store.StoreContext; + +import java.util.Iterator; +import java.util.List; +import java.util.Collections; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A deliverable message. + */ +public class TransientAMQMessage implements AMQMessage +{ + /** Used for debugging purposes. */ + private static final Logger _log = Logger.getLogger(AMQMessage.class); + + private final AtomicInteger _referenceCount = new AtomicInteger(1); + + protected ContentHeaderBody _contentHeaderBody; + + protected MessagePublishInfo _messagePublishInfo; + + protected List<ContentChunk> _contentBodies; + + protected long _arrivalTime; + + protected final Long _messageId; + + + + /** Flag to indicate that this message requires 'immediate' delivery. */ + + private static final byte IMMEDIATE = 0x01; + + /** + * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality + * for messages published with the 'immediate' flag. + */ + + private static final byte DELIVERED_TO_CONSUMER = 0x02; + + private byte _flags = 0; + + private long _expiration; + + private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier; + private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); + + /** + * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory + * therefore is memory-efficient. + */ + private class BodyFrameIterator implements Iterator<AMQDataBlock> + { + private int _channel; + + private int _index = -1; + private AMQProtocolSession _protocolSession; + + private BodyFrameIterator(AMQProtocolSession protocolSession, int channel) + { + _channel = channel; + _protocolSession = protocolSession; + } + + public boolean hasNext() + { + return _index < (getBodyCount() - 1); + } + + public AMQDataBlock next() + { + AMQBody cb = + getProtocolVersionMethodConverter().convertToBody(getContentChunk(++_index)); + + return new AMQFrame(_channel, cb); + } + + private ProtocolVersionMethodConverter getProtocolVersionMethodConverter() + { + return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + } + + private class BodyContentIterator implements Iterator<ContentChunk> + { + + private int _index = -1; + + public boolean hasNext() + { + return _index < (getBodyCount() - 1); + } + + public ContentChunk next() + { + return getContentChunk(++_index); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + } + + /** + * Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message + * These all need refactoring to some sort of MockAMQMessageFactory. + */ + @Deprecated + protected TransientAMQMessage(AMQMessage message) throws AMQException + { + _messageId = message.getMessageId(); + _flags = ((TransientAMQMessage)message)._flags; + _contentHeaderBody = message.getContentHeaderBody(); + _messagePublishInfo = message.getMessagePublishInfo(); + } + + + /** + * Normal message creation via the MessageFactory uses this constructor + * Package scope limited as MessageFactory should be used + * @see MessageFactory + * + * @param messageId + */ + TransientAMQMessage(Long messageId) + { + _messageId = messageId; + } + + public String debugIdentity() + { + return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")"; + } + + public void setExpiration(final long expiration) + { + _expiration = expiration; + } + + public boolean isReferenced() + { + return _referenceCount.get() > 0; + } + + public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel) + { + return new BodyFrameIterator(protocolSession, channel); + } + + public Iterator<ContentChunk> getContentBodyIterator() + { + return new BodyContentIterator(); + } + + + public ContentHeaderBody getContentHeaderBody() + { + return _contentHeaderBody; + } + + public Long getMessageId() + { + return _messageId; + } + + /** + * Creates a long-lived reference to this message, and increments the count of such references, as an atomic + * operation. + */ + public AMQMessage takeReference() + { + incrementReference(); // _referenceCount.incrementAndGet(); + + return this; + } + + public boolean incrementReference() + { + return incrementReference(1); + } + + /* Threadsafe. Increment the reference count on the message. */ + public boolean incrementReference(int count) + { + if(_referenceCount.addAndGet(count) <= 1) + { + _referenceCount.addAndGet(-count); + return false; + } + else + { + return true; + } + + } + + /** + * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the + * message store. + * + * @param storeContext + * + * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that + * failed + */ + public void decrementReference(StoreContext storeContext) throws MessageCleanupException + { + + int count = _referenceCount.decrementAndGet(); + + // note that the operation of decrementing the reference count and then removing the message does not + // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after + // the message has been passed to all queues. i.e. we are + // not relying on the all the increments having taken place before the delivery manager decrements. + if (count == 0) + { + // set the reference count way below 0 so that we can detect that the message has been deleted + // this is to guard against the message being spontaneously recreated (from the mgmt console) + // by copying from other queues at the same time as it is being removed. + _referenceCount.set(Integer.MIN_VALUE/2); + + try + { + // must check if the handle is null since there may be cases where we decide to throw away a message + // and the handle has not yet been constructed + // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op + removeMessage(storeContext); + } + catch (AMQException e) + { + // to maintain consistency, we revert the count + incrementReference(); + throw new MessageCleanupException(getMessageId(), e); + } + } + else + { + if (count < 0) + { + throw new MessageCleanupException("Reference count for message id " + debugIdentity() + + " has gone below 0."); + } + } + } + + + /** + * Called selectors to determin if the message has already been sent + * + * @return _deliveredToConsumer + */ + public boolean getDeliveredToConsumer() + { + return (_flags & DELIVERED_TO_CONSUMER) != 0; + } + + /** + * Called to enforce the 'immediate' flag. + * + * @returns true if the message is marked for immediate delivery but has not been marked as delivered + * to a consumer + */ + public boolean immediateAndNotDelivered() + { + + return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE; + + } + + /** + * Checks to see if the message has expired. If it has the message is dequeued. + * + * @return true if the message has expire + * + * @throws AMQException + */ + public boolean expired() throws AMQException + { + + if (_expiration != 0L) + { + long now = System.currentTimeMillis(); + + return (now > _expiration); + } + + return false; + } + + /** + * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). + * And for selector efficiency. + */ + public void setDeliveredToConsumer() + { + _flags |= DELIVERED_TO_CONSUMER; + } + + + public long getSize() + { + return _contentHeaderBody.bodySize; + } + + public Object getPublisherClientInstance() + { + return _sessionIdentifier.getSessionInstance(); + } + + public Object getPublisherIdentifier() + { + return _sessionIdentifier.getSessionIdentifier(); + } + + public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier) + { + _sessionIdentifier = sessionIdentifier; + } + + /** From AMQMessageHandle **/ + + public int getBodyCount() + { + return _contentBodies.size(); + } + + public ContentChunk getContentChunk(int index) + { + if(_contentBodies == null) + { + throw new RuntimeException("No ContentBody has been set"); + } + + if (index > _contentBodies.size() - 1 || index < 0) + { + throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " + + (_contentBodies.size() - 1)); + } + return _contentBodies.get(index); + } + + public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) + throws AMQException + { + if(_contentBodies == null) + { + if(isLastContentBody) + { + _contentBodies = Collections.singletonList(contentChunk); + } + else + { + _contentBodies = new ArrayList<ContentChunk>(); + _contentBodies.add(contentChunk); + } + } + else + { + _contentBodies.add(contentChunk); + } + } + + public MessagePublishInfo getMessagePublishInfo() + { + return _messagePublishInfo; + } + + public boolean isPersistent() + { + return false; + } + + /** + * This is called when all the content has been received. + * @param storeContext + *@param messagePublishInfo + * @param contentHeaderBody @throws AMQException + */ + public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, + ContentHeaderBody contentHeaderBody) + throws AMQException + { + + if (contentHeaderBody == null) + { + throw new NullPointerException("HeaderBody cannot be null"); + } + + if( messagePublishInfo == null) + { + throw new NullPointerException("PublishInfo cannot be null"); + } + + _messagePublishInfo = messagePublishInfo; + _contentHeaderBody = contentHeaderBody; + + + if( contentHeaderBody.bodySize == 0) + { + _contentBodies = Collections.EMPTY_LIST; + } + + _arrivalTime = System.currentTimeMillis(); + + if(messagePublishInfo.isImmediate()) + { + _flags |= IMMEDIATE; + } + } + + public long getArrivalTime() + { + return _arrivalTime; + } + + public void removeMessage(StoreContext storeContext) throws AMQException + { + //no-op + } + + + public String toString() + { + // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + + // _taken + " by :" + _takenBySubcription; + + return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java deleted file mode 100644 index 804d2c2131..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreContext; - -/** - * @author Robert Greig (robert.j.greig@jpmorgan.com) - */ -public class WeakReferenceMessageHandle implements AMQMessageHandle -{ - private WeakReference<ContentHeaderBody> _contentHeaderBody; - - private WeakReference<MessagePublishInfo> _messagePublishInfo; - - private List<WeakReference<ContentChunk>> _contentBodies; - - private boolean _redelivered; - - private final MessageStore _messageStore; - - private final Long _messageId; - private long _arrivalTime; - - public WeakReferenceMessageHandle(final Long messageId, MessageStore messageStore) - { - _messageId = messageId; - _messageStore = messageStore; - } - - public ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException - { - ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null); - if (chb == null) - { - MessageMetaData mmd = loadMessageMetaData(context); - chb = mmd.getContentHeaderBody(); - } - return chb; - } - - public Long getMessageId() - { - return _messageId; - } - - private MessageMetaData loadMessageMetaData(StoreContext context) - throws AMQException - { - MessageMetaData mmd = _messageStore.getMessageMetaData(context, _messageId); - populateFromMessageMetaData(mmd); - return mmd; - } - - private void populateFromMessageMetaData(MessageMetaData mmd) - { - _arrivalTime = mmd.getArrivalTime(); - _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody()); - _messagePublishInfo = new WeakReference<MessagePublishInfo>(mmd.getMessagePublishInfo()); - } - - public int getBodyCount(StoreContext context) throws AMQException - { - if (_contentBodies == null) - { - MessageMetaData mmd = _messageStore.getMessageMetaData(context, _messageId); - int chunkCount = mmd.getContentChunkCount(); - _contentBodies = new ArrayList<WeakReference<ContentChunk>>(chunkCount); - for (int i = 0; i < chunkCount; i++) - { - _contentBodies.add(new WeakReference<ContentChunk>(null)); - } - } - return _contentBodies.size(); - } - - public long getBodySize(StoreContext context) throws AMQException - { - return getContentHeaderBody(context).bodySize; - } - - public ContentChunk getContentChunk(StoreContext context, int index) throws AMQException, IllegalArgumentException - { - if(_contentBodies == null) - { - throw new RuntimeException("No ContentBody has been set"); - } - - if (index > _contentBodies.size() - 1 || index < 0) - { - throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " + - (_contentBodies.size() - 1)); - } - WeakReference<ContentChunk> wr = _contentBodies.get(index); - ContentChunk cb = wr.get(); - if (cb == null) - { - cb = _messageStore.getContentBodyChunk(context, _messageId, index); - _contentBodies.set(index, new WeakReference<ContentChunk>(cb)); - } - return cb; - } - - /** - * Content bodies are set <i>before</i> the publish and header frames - * - * @param storeContext - * @param contentChunk - * @param isLastContentBody - * @throws AMQException - */ - public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException - { - if (_contentBodies == null && isLastContentBody) - { - _contentBodies = new ArrayList<WeakReference<ContentChunk>>(1); - } - else - { - if (_contentBodies == null) - { - _contentBodies = new LinkedList<WeakReference<ContentChunk>>(); - } - } - _contentBodies.add(new WeakReference<ContentChunk>(contentChunk)); - _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1, - contentChunk, isLastContentBody); - } - - public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException - { - MessagePublishInfo bpb = (_messagePublishInfo != null ? _messagePublishInfo.get() : null); - if (bpb == null) - { - MessageMetaData mmd = loadMessageMetaData(context); - - bpb = mmd.getMessagePublishInfo(); - } - return bpb; - } - - public boolean isRedelivered() - { - return _redelivered; - } - - public void setRedelivered(boolean redelivered) - { - _redelivered = redelivered; - } - - public boolean isPersistent() - { - return true; - } - - /** - * This is called when all the content has been received. - * - * @param publishBody - * @param contentHeaderBody - * @throws AMQException - */ - public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo publishBody, - ContentHeaderBody contentHeaderBody) - throws AMQException - { - // if there are no content bodies the list will be null so we must - // create en empty list here - if (contentHeaderBody.bodySize == 0) - { - _contentBodies = new LinkedList<WeakReference<ContentChunk>>(); - } - - final long arrivalTime = System.currentTimeMillis(); - - MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), arrivalTime); - - _messageStore.storeMessageMetaData(storeContext, _messageId, mmd); - - - populateFromMessageMetaData(mmd); - } - - public void removeMessage(StoreContext storeContext) throws AMQException - { - _messageStore.removeMessage(storeContext, _messageId); - } - - public long getArrivalTime() - { - return _arrivalTime; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index f23983641b..9de2d09b8e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -27,8 +27,8 @@ import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; @@ -93,7 +93,7 @@ public class DerbyMessageStore implements MessageStore private String _connectionURL; - + MessageFactory _messageFactory; private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )"; private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )"; @@ -167,6 +167,8 @@ public class DerbyMessageStore implements MessageStore // this recovers durable queues and persistent messages + _messageFactory = new MessageFactory(); + recover(); stateTransition(State.RECOVERING, State.STARTED); @@ -1299,7 +1301,7 @@ public class DerbyMessageStore implements MessageStore private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues) throws SQLException, AMQException { - Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>(); + Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>(); List<ProcessAction> actions = new ArrayList<ProcessAction>(); Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>(); @@ -1318,8 +1320,6 @@ public class DerbyMessageStore implements MessageStore conn = newConnection(); } - - MessageHandleFactory messageHandleFactory = new MessageHandleFactory(); long maxId = 1; TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null); @@ -1355,7 +1355,11 @@ public class DerbyMessageStore implements MessageStore } else { - message = new AMQMessage(messageId, this, messageHandleFactory, txnContext); + message = _messageFactory.createMessage(messageId, this, true); + + _logger.error("todo must do message recovery now."); + //todo must do message recovery now. + msgMap.put(messageId,message); } diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java index b5a91c8da6..d46ba85069 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java @@ -26,7 +26,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.QueueEntryImpl; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.tools.messagestore.MessageStoreTool; @@ -349,32 +348,15 @@ public class Show extends AbstractCommand arrival.add("" + msg.getArrivalTime()); - try - { - ispersitent.add(msg.isPersistent() ? "true" : "false"); - } - catch (AMQException e) - { - ispersitent.add("n/a"); - } + ispersitent.add(msg.isPersistent() ? "true" : "false"); isredelivered.add(entry.isRedelivered() ? "true" : "false"); isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false"); -// msg.getMessageHandle(); - BasicContentHeaderProperties headers = null; - try - { - headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties); - } - catch (AMQException e) - { - //ignore -// commandError("Unable to read properties for message: " + e.getMessage(), null); - } + headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties); if (headers != null) { @@ -414,15 +396,7 @@ public class Show extends AbstractCommand AMQShortString useridSS = headers.getUserId(); userid.add(useridSS == null ? "null" : useridSS.toString()); - MessagePublishInfo info = null; - try - { - info = msg.getMessagePublishInfo(); - } - catch (AMQException e) - { - //ignore - } + MessagePublishInfo info = msg.getMessagePublishInfo(); if (info != null) { |
