summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-07-28 20:43:20 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-07-28 20:43:20 +0000
commit6f7106e1ab47566a82cf6373b25dff1dc345fb25 (patch)
tree6f822e0160f7d681503985228908f3e99397d5bb /qpid/java
parentff054df6bad7466b7aacd5cdabe162397b25c88a (diff)
downloadqpid-python-6f7106e1ab47566a82cf6373b25dff1dc345fb25.tar.gz
QPID-5934 : [Java Broker] Allow TTL to be overridden on a per-Queue basis
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1614166 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java52
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java122
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html16
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js5
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html8
8 files changed, 207 insertions, 13 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index db53f840de..e98f6cd19b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -46,6 +46,8 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes";
String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes";
String QUEUE_FLOW_STOPPED = "queueFlowStopped";
+ String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
+ String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
@ManagedAttribute
Exchange getAlternateExchange();
@@ -135,16 +137,17 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedAttribute( defaultValue = "DEFAULT" )
MessageDurability getMessageDurability();
+ @ManagedAttribute
+ long getMinimumMessageTtl();
+ @ManagedAttribute
+ long getMaximumMessageTtl();
//children
Collection<? extends Binding> getBindings();
- // TODO - Undo this commented out line when we stop supporting 1.6 for compilation
- // In 1.6 this causes the build to break at AbstractQueue because the 1.6 compiler can't work out that
- // the definition in terms of the Consumer implementation meets both this, and the contract for AMQQueue
- // Collection<? extends Consumer> getConsumers();
+ Collection<? extends Consumer> getConsumers();
//operations
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 2aeca6f45f..11d5cc733f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -230,6 +230,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private String _messageGroupDefaultGroup;
@ManagedAttributeField
private int _maximumDistinctGroups;
+ @ManagedAttributeField
+ private long _minimumMessageTtl;
+ @ManagedAttributeField
+ private long _maximumMessageTtl;
+
private State _state = State.UNINITIALIZED;
private final AtomicBoolean _recovering = new AtomicBoolean(true);
@@ -547,6 +552,18 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
+ public long getMinimumMessageTtl()
+ {
+ return _minimumMessageTtl;
+ }
+
+ @Override
+ public long getMaximumMessageTtl()
+ {
+ return _maximumMessageTtl;
+ }
+
+ @Override
public Collection<String> getAvailableAttributes()
{
return new ArrayList<String>(_arguments.keySet());
@@ -967,6 +984,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber;
final QueueEntry entry = getEntries().add(message);
+ updateExpiration(entry);
try
{
@@ -1011,6 +1029,40 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
+ private void updateExpiration(final QueueEntry entry)
+ {
+ long expiration = entry.getMessage().getExpiration();
+ long arrivalTime = entry.getMessage().getArrivalTime();
+ if(_minimumMessageTtl != 0l)
+ {
+ if(arrivalTime == 0)
+ {
+ arrivalTime = System.currentTimeMillis();
+ }
+ if(expiration != 0l)
+ {
+ long calculatedExpiration = arrivalTime+_minimumMessageTtl;
+ if(calculatedExpiration > expiration)
+ {
+ entry.setExpiration(calculatedExpiration);
+ expiration = calculatedExpiration;
+ }
+ }
+ }
+ if(_maximumMessageTtl != 0l)
+ {
+ if(arrivalTime == 0)
+ {
+ arrivalTime = System.currentTimeMillis();
+ }
+ long calculatedExpiration = arrivalTime+_maximumMessageTtl;
+ if(expiration == 0l || expiration > calculatedExpiration)
+ {
+ entry.setExpiration(calculatedExpiration);
+ }
+ }
+ }
+
/**
* iterate over consumers and if any is at the end of the queue and can deliver this message,
* then deliver the message
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index d984cf8ab4..3ddcb98b53 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -36,4 +36,5 @@ public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
QueueEntry getNextValidEntry();
+ void setExpiration(long calculatedExpiration);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index e6cde6c934..49644f8d76 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -128,6 +128,11 @@ public abstract class QueueEntryImpl implements QueueEntry
}
}
+ public void setExpiration(long expiration)
+ {
+ _expiration = expiration;
+ }
+
public InstanceProperties getInstanceProperties()
{
return new EntryInstanceProperties();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index f20285660a..4cbc9ae57b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -21,32 +21,41 @@
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.contains;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Matchers.contains;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import org.apache.log4j.Logger;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
-import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -859,6 +868,101 @@ abstract class AbstractQueueTestBase extends QpidTestCase
verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
}
+
+ public void testMaximumMessageTtl() throws Exception
+ {
+
+ // Test scenarios where only the maximum TTL has been set
+
+ Map<String,Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME,"testTtlOverrideMaximumTTl");
+ attributes.put(Queue.MAXIMUM_MESSAGE_TTL, 10000l);
+
+ AMQQueue queue = _virtualHost.createQueue(attributes);
+
+ assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 0l));
+
+ assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 65000l));
+
+ assertEquals("TTL has been incorrectly overriden", 55000l, getExpirationOnQueue(queue, 50000l, 55000l));
+
+ long tooLateExpiration = System.currentTimeMillis() + 20000l;
+
+ assertTrue("TTL has not been overriden", tooLateExpiration != getExpirationOnQueue(queue, 0l, tooLateExpiration));
+
+ long acceptableExpiration = System.currentTimeMillis() + 5000l;
+
+ assertEquals("TTL has been incorrectly overriden", acceptableExpiration, getExpirationOnQueue(queue, 0l, acceptableExpiration));
+
+ // Test the scenarios where only the minimum TTL has been set
+
+ attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME,"testTtlOverrideMinimumTTl");
+ attributes.put(Queue.MINIMUM_MESSAGE_TTL, 10000l);
+
+ queue = _virtualHost.createQueue(attributes);
+
+ assertEquals("TTL has been overriden incorrectly", 0l, getExpirationOnQueue(queue, 50000l, 0l));
+
+ assertEquals("TTL has been overriden incorrectly", 65000l, getExpirationOnQueue(queue, 50000l, 65000l));
+
+ assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 55000l));
+
+ long unacceptableExpiration = System.currentTimeMillis() + 5000l;
+
+ assertTrue("TTL has not been overriden", unacceptableExpiration != getExpirationOnQueue(queue, 0l, tooLateExpiration));
+
+ acceptableExpiration = System.currentTimeMillis() + 20000l;
+
+ assertEquals("TTL has been incorrectly overriden", acceptableExpiration, getExpirationOnQueue(queue, 0l, acceptableExpiration));
+
+
+ // Test the scenarios where both the minimum and maximum TTL have been set
+
+ attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME,"testTtlOverrideBothTTl");
+ attributes.put(Queue.MINIMUM_MESSAGE_TTL, 10000l);
+ attributes.put(Queue.MAXIMUM_MESSAGE_TTL, 20000l);
+
+ queue = _virtualHost.createQueue(attributes);
+
+ assertEquals("TTL has not been overriden", 70000l, getExpirationOnQueue(queue, 50000l, 0l));
+
+ assertEquals("TTL has been overriden incorrectly", 65000l, getExpirationOnQueue(queue, 50000l, 65000l));
+
+ assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 55000l));
+
+
+
+ }
+
+ private long getExpirationOnQueue(final AMQQueue queue, long arrivalTime, long expiration)
+ {
+ final List<QueueEntry> entries = new ArrayList<>();
+
+ ServerMessage message = createMessage(1l);
+ when(message.getArrivalTime()).thenReturn(arrivalTime);
+ when(message.getExpiration()).thenReturn(expiration);
+ queue.enqueue(message,null);
+ queue.visit(new QueueEntryVisitor()
+ {
+ @Override
+ public boolean visit(final QueueEntry entry)
+ {
+ entries.add(entry);
+ return true;
+ }
+ });
+ assertEquals("Expected only one entry in the queue", 1, entries.size());
+
+ Long entryExpiration =
+ (Long) entries.get(0).getInstanceProperties().getProperty(InstanceProperties.Property.EXPIRATION);
+
+ queue.clearQueue();
+ entries.clear();
+ return entryExpiration;
+ }
+
/**
* A helper method to put given number of messages into queue
* <p>
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
index c32c1d3bba..8d7d47a530 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
@@ -44,6 +44,22 @@
</td>
</tr>
<tr>
+ <td valign="top"><strong>Max Ttl: </strong></td>
+ <td><input type="text" required="false" name="maximumMessageTtl" id="formAddQueue.maximumMessageTtl" placeholder="Ttl in ms."
+ dojoType="dijit.form.ValidationTextBox"
+ trim="true"
+ regexp="[0-9]+"
+ invalidMessage= "Invalid value" /></td>
+ </tr>
+ <tr>
+ <td valign="top"><strong>Min Ttl: </strong></td>
+ <td><input type="text" required="false" name="minimumMessageTtl" id="formAddQueue.minimumMessageTtl" placeholder="Ttl in ms."
+ dojoType="dijit.form.ValidationTextBox"
+ trim="true"
+ regexp="[0-9]+"
+ invalidMessage= "Invalid value" /></td>
+ </tr>
+ <tr>
<td valign="top"><strong>Queue Type: </strong></td>
<td>
<input type="radio" id="formAddQueueTypeStandard" name="type" value="standard" checked="checked" dojoType="dijit.form.RadioButton" />
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
index 6c0924d09b..3371445e5b 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
@@ -278,6 +278,8 @@ define(["dojo/_base/xhr",
"state",
"durable",
"messageDurability",
+ "maximumMessageTtl",
+ "minimumMessageTtl",
"exclusive",
"owner",
"lifetimePolicy",
@@ -353,6 +355,9 @@ define(["dojo/_base/xhr",
this.owner.innerHTML = this.queueData[ "owner" ] ? entities.encode(String(this.queueData[ "owner" ])) : "" ;
this.lifetimePolicy.innerHTML = entities.encode(String(this.queueData[ "lifetimePolicy" ]));
this.messageDurability.innerHTML = entities.encode(String(this.queueData[ "messageDurability" ]));
+ this.minimumMessageTtl.innerHTML = entities.encode(String(this.queueData[ "minimumMessageTtl" ]));
+ this.maximumMessageTtl.innerHTML = entities.encode(String(this.queueData[ "maximumMessageTtl" ]));
+
this.alternateExchange.innerHTML = this.queueData[ "alternateExchange" ] ? entities.encode(String(this.queueData[ "alternateExchange" ])) : "" ;
this.queueDepthMessages.innerHTML = entities.encode(String(this.queueData["queueDepthMessages"]));
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
index 8f2bf3364d..3a978f4fb5 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
@@ -37,6 +37,14 @@
<div class="messageDurability" style="float:left;"></div>
</div>
<div style="clear:both">
+ <div class="formLabel-labelCell" style="float:left; width: 150px;">Enforced Max. Ttl(ms):</div>
+ <div class="maximumMessageTtl" style="float:left;"></div>
+ </div>
+ <div style="clear:both">
+ <div class="formLabel-labelCell" style="float:left; width: 150px;">Enforced Min. Ttl(ms):</div>
+ <div class="minimumMessageTtl" style="float:left;"></div>
+ </div>
+ <div style="clear:both">
<div class="formLabel-labelCell" style="float:left; width: 150px;">Exclusive:</div>
<div class="exclusive" style="float:left;"></div>
</div>