diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-05-09 15:33:17 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-05-09 15:33:17 +0000 |
| commit | 6b4d8ff004e42b3f7c22b5632f317e81f1589ce3 (patch) | |
| tree | 26ce3d66664e8dd9ea2f8f3edd61601396c31cf0 /java/broker/src | |
| parent | 1bb43f379eda76afca9df488cd9510f60ad89717 (diff) | |
| download | qpid-python-6b4d8ff004e42b3f7c22b5632f317e81f1589ce3.tar.gz | |
Merged revisions 536506 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r536506 | ritchiem | 2007-05-09 13:32:27 +0100 (Wed, 09 May 2007) | 10 lines
QPID-25 TimeToLive Basic implementation.
Messages are not automatically purged rather they are checked as they are selected for delivery. If they have expired they are dequeued.
AMQChannel - Update to call setExpiration on the message so the time can be adjusted if client & broker clocks are out of sync.
AMQMessage - Caches the _expiration time for internal use, adjusted for broker time. This leaves message headers unchanged so receiving client can see actual value requested by producer.
ConcurrentSelectorDeliveryManager - Updated to check for expired messages when getNextMessage is called. Immediate messages are NOT expired.
Subscription - Added method to getChannel that this Subscription is attatched to so we can retrieve the StoreContext for dequeue-ing the message.
TimeToLiveTest - Test of Time to live. Sends 50 msgs. 1 non-timed 48 1 second and 1 non-timed ensure only 2 msgs come back after 2 seconds
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@536567 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
5 files changed, 81 insertions, 8 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 6cf9fe6bf7..1cd098a64f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -228,6 +228,8 @@ public class AMQChannel _log.trace(debugIdentity() + "Content header received on channel " + _channelId); } _currentMessage.setContentHeaderBody(contentHeaderBody); + _currentMessage.setExpiration(); + routeCurrentMessage(); _currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 282569eed5..6ffe1af018 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; @@ -34,6 +35,7 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.messageStore.StorableMessage; import org.apache.qpid.server.messageStore.StorableQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; /** Combines the information that make up a deliverable message into a more manageable form. */ @@ -100,12 +102,42 @@ public class AMQMessage implements StorableMessage private final int hashcode = System.identityHashCode(this); + private long _expiration; public String debugIdentity() { return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")"; } + public void setExpiration() + { + long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration(); + long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp(); + + if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false)) + { + _expiration = expiration; + } + else + { + // Update TTL to be in broker time. + if (expiration != 0L) + { + if (timestamp != 0L) + { + //todo perhaps use arrival time + long diff = (System.currentTimeMillis() - timestamp); + + if (diff > 1000L || diff < 1000L) + { + _expiration = expiration + diff; + } + } + } + } + + } + /** * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory * therefore is memory-efficient. @@ -212,8 +244,6 @@ public class AMQMessage implements StorableMessage _immediate = info.isImmediate(); _transientMessageData.setMessagePublishInfo(info); -// _taken = new AtomicBoolean(false); - } /** @@ -731,10 +761,35 @@ public class AMQMessage implements StorableMessage return _messageHandle.getArrivalTime(); } - /** - * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). + * Checks to see if the message has expired. If it has the message is dequeued. + * + * @param storecontext + * @param queue + * + * @return true if the message has expire + * + * @throws AMQException */ + public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException + { + //note: If the storecontext isn't need then we can remove the getChannel() from Subscription. + + if (_expiration != 0L) + { + long now = System.currentTimeMillis(); + + if (now > _expiration) + { + dequeue(storecontext, queue); + return true; + } + } + + return false; + } + + /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */ public void setDeliveredToConsumer() { _deliveredToConsumer = true; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 1f92cee1df..bdc2189676 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -434,13 +434,19 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return count; } - /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */ + /** + * This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. + * + * @return the next message or null + * + * @throws org.apache.qpid.AMQException + */ private AMQMessage getNextMessage() throws AMQException { return getNextMessage(_messages, null); } - private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) + private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) throws AMQException { AMQMessage message = messages.peek(); @@ -449,9 +455,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager && ( ((sub != null && !sub.isBrowser()) || message.isTaken(_queue)) || sub == null) - && message.taken(_queue, sub)) + && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired + || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired + ) { - //remove the already taken message + //remove the already taken message or expired AMQMessage removed = messages.poll(); assert removed == message; @@ -466,6 +474,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // try the next message message = messages.peek(); } + return message; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index e6d5d0c88d..77688f19be 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import java.util.Queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.AMQChannel; public interface Subscription { @@ -57,4 +58,6 @@ public interface Subscription void addToResendQueue(AMQMessage msg); Object getSendLock(); + + AMQChannel getChannel(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index c496996002..a7be9f2ad2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -668,5 +668,9 @@ public class SubscriptionImpl implements Subscription return _sendLock; } + public AMQChannel getChannel() + { + return channel; + } } |
