summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-05-09 15:33:17 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-05-09 15:33:17 +0000
commit6b4d8ff004e42b3f7c22b5632f317e81f1589ce3 (patch)
tree26ce3d66664e8dd9ea2f8f3edd61601396c31cf0 /java/broker/src
parent1bb43f379eda76afca9df488cd9510f60ad89717 (diff)
downloadqpid-python-6b4d8ff004e42b3f7c22b5632f317e81f1589ce3.tar.gz
Merged revisions 536506 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r536506 | ritchiem | 2007-05-09 13:32:27 +0100 (Wed, 09 May 2007) | 10 lines QPID-25 TimeToLive Basic implementation. Messages are not automatically purged rather they are checked as they are selected for delivery. If they have expired they are dequeued. AMQChannel - Update to call setExpiration on the message so the time can be adjusted if client & broker clocks are out of sync. AMQMessage - Caches the _expiration time for internal use, adjusted for broker time. This leaves message headers unchanged so receiving client can see actual value requested by producer. ConcurrentSelectorDeliveryManager - Updated to check for expired messages when getNextMessage is called. Immediate messages are NOT expired. Subscription - Added method to getChannel that this Subscription is attatched to so we can retrieve the StoreContext for dequeue-ing the message. TimeToLiveTest - Test of Time to live. Sends 50 msgs. 1 non-timed 48 1 second and 1 non-timed ensure only 2 msgs come back after 2 seconds ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@536567 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java63
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java4
5 files changed, 81 insertions, 8 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 6cf9fe6bf7..1cd098a64f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -228,6 +228,8 @@ public class AMQChannel
_log.trace(debugIdentity() + "Content header received on channel " + _channelId);
}
_currentMessage.setContentHeaderBody(contentHeaderBody);
+ _currentMessage.setExpiration();
+
routeCurrentMessage();
_currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 282569eed5..6ffe1af018 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -34,6 +35,7 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.messageStore.StorableMessage;
import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
/** Combines the information that make up a deliverable message into a more manageable form. */
@@ -100,12 +102,42 @@ public class AMQMessage implements StorableMessage
private final int hashcode = System.identityHashCode(this);
+ private long _expiration;
public String debugIdentity()
{
return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
}
+ public void setExpiration()
+ {
+ long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
+ long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
+
+ if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
+ {
+ _expiration = expiration;
+ }
+ else
+ {
+ // Update TTL to be in broker time.
+ if (expiration != 0L)
+ {
+ if (timestamp != 0L)
+ {
+ //todo perhaps use arrival time
+ long diff = (System.currentTimeMillis() - timestamp);
+
+ if (diff > 1000L || diff < 1000L)
+ {
+ _expiration = expiration + diff;
+ }
+ }
+ }
+ }
+
+ }
+
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
* therefore is memory-efficient.
@@ -212,8 +244,6 @@ public class AMQMessage implements StorableMessage
_immediate = info.isImmediate();
_transientMessageData.setMessagePublishInfo(info);
-// _taken = new AtomicBoolean(false);
-
}
/**
@@ -731,10 +761,35 @@ public class AMQMessage implements StorableMessage
return _messageHandle.getArrivalTime();
}
-
/**
- * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+ * Checks to see if the message has expired. If it has the message is dequeued.
+ *
+ * @param storecontext
+ * @param queue
+ *
+ * @return true if the message has expire
+ *
+ * @throws AMQException
*/
+ public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException
+ {
+ //note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
+
+ if (_expiration != 0L)
+ {
+ long now = System.currentTimeMillis();
+
+ if (now > _expiration)
+ {
+ dequeue(storecontext, queue);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */
public void setDeliveredToConsumer()
{
_deliveredToConsumer = true;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 1f92cee1df..bdc2189676 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -434,13 +434,19 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return count;
}
- /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */
+ /**
+ * This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
+ *
+ * @return the next message or null
+ *
+ * @throws org.apache.qpid.AMQException
+ */
private AMQMessage getNextMessage() throws AMQException
{
return getNextMessage(_messages, null);
}
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) throws AMQException
{
AMQMessage message = messages.peek();
@@ -449,9 +455,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
&& (
((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
|| sub == null)
- && message.taken(_queue, sub))
+ && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired
+ || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired
+ )
{
- //remove the already taken message
+ //remove the already taken message or expired
AMQMessage removed = messages.poll();
assert removed == message;
@@ -466,6 +474,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// try the next message
message = messages.peek();
}
+
return message;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index e6d5d0c88d..77688f19be 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import java.util.Queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
public interface Subscription
{
@@ -57,4 +58,6 @@ public interface Subscription
void addToResendQueue(AMQMessage msg);
Object getSendLock();
+
+ AMQChannel getChannel();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index c496996002..a7be9f2ad2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -668,5 +668,9 @@ public class SubscriptionImpl implements Subscription
return _sendLock;
}
+ public AMQChannel getChannel()
+ {
+ return channel;
+ }
}