summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-08 13:15:35 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-08 13:15:35 +0000
commit8468c9b9b067489aef2227456f662bfdbdc71272 (patch)
tree370659fd9ea9784d6e608f049a870433aa821353 /qpid/java/broker-core/src
parente7375322dc1083dbfffe49a903d4737a6943907e (diff)
downloadqpid-python-8468c9b9b067489aef2227456f662bfdbdc71272.tar.gz
QPID-3978 : [Java Broker] Allow for acquired messages to be removed from a queue due to TTL or management actions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616742 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java46
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java56
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java21
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java18
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java34
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java12
11 files changed, 221 insertions, 15 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index faf5a724f3..f8585344b0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -29,6 +29,8 @@ public interface ConsumerTarget
{
+ void acquisitionRemoved(MessageInstance node);
+
enum State
{
ACTIVE, SUSPENDED, CLOSED
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 4ee47e05e9..1bf451948d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -51,6 +51,8 @@ public interface MessageInstance
boolean isAcquiredBy(ConsumerImpl consumer);
+ boolean removeAcquisitionFromConsumer(ConsumerImpl consumer);
+
void setRedelivered();
boolean isRedelivered();
@@ -67,6 +69,10 @@ public interface MessageInstance
boolean acquire(ConsumerImpl sub);
+ boolean lockAcquisition();
+
+ boolean unlockAcquisition();
+
int getMaximumDeliveryCount();
int routeToAlternate(Action<? super MessageInstance> action, ServerTransaction txn);
@@ -99,6 +105,7 @@ public interface MessageInstance
State currentState = getState();
return currentState == State.DEQUEUED || currentState == State.DELETED;
}
+
}
@@ -162,10 +169,12 @@ public interface MessageInstance
public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
{
private final C _consumer;
+ private final LockedAcquiredState<C> _lockedState;
public ConsumerAcquiredState(C consumer)
{
_consumer = consumer;
+ _lockedState = new LockedAcquiredState<>(this);
}
@@ -183,6 +192,43 @@ public interface MessageInstance
{
return "{" + getState().name() + " : " + _consumer +"}";
}
+
+ public LockedAcquiredState<C> getLockedState()
+ {
+ return _lockedState;
+ }
+
+ }
+
+ public final class LockedAcquiredState<C extends ConsumerImpl> extends EntryState
+ {
+ private final ConsumerAcquiredState<C> _acquiredState;
+
+ public LockedAcquiredState(final ConsumerAcquiredState<C> acquiredState)
+ {
+ _acquiredState = acquiredState;
+ }
+
+ @Override
+ public State getState()
+ {
+ return State.ACQUIRED;
+ }
+
+ public C getConsumer()
+ {
+ return _acquiredState.getConsumer();
+ }
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _acquiredState.getConsumer() +"}";
+ }
+
+ public ConsumerAcquiredState<C> getUnlockedState()
+ {
+ return _acquiredState;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 54f3c4de09..8483e35b9e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1164,6 +1164,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, entry, false);
+ if(sub.acquires())
+ {
+ entry.unlockAcquisition();
+ }
}
}
}
@@ -2001,6 +2005,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
else
{
deliverMessage(sub, node, batch);
+ if(sub.acquires())
+ {
+ node.unlockAcquisition();
+ }
}
}
@@ -2253,14 +2261,28 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if (!node.isDeleted())
{
// If the node has expired then acquire it
- if (node.expired() && node.acquire())
+ if (node.expired())
{
- if (_logger.isDebugEnabled())
+ boolean acquiredForDequeueing = node.acquire();
+ if(!acquiredForDequeueing && node.getDeliveredToConsumer())
+ {
+ QueueConsumer consumer = (QueueConsumer) node.getDeliveredConsumer();
+ acquiredForDequeueing = node.removeAcquisitionFromConsumer(consumer);
+ if(acquiredForDequeueing)
+ {
+ consumer.acquisitionRemoved(node);
+ }
+ }
+
+ if(acquiredForDequeueing)
{
- _logger.debug("Dequeuing expired node " + node);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeuing expired node " + node);
+ }
+ // Then dequeue it.
+ dequeueEntry(node);
}
- // Then dequeue it.
- dequeueEntry(node);
}
else
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index 5ffbc0dbaa..71b7636159 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -39,6 +39,8 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl,
void send(QueueEntry entry, boolean batch);
+ void acquisitionRemoved(QueueEntry node);
+
void queueDeleted();
SubFlushRunner getRunner();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 55782ac095..d80aa92007 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -477,6 +477,13 @@ class QueueConsumerImpl
}
@Override
+ public void acquisitionRemoved(final QueueEntry node)
+ {
+ _target.acquisitionRemoved(node);
+ _queue.decrementUnackedMsgCount(node);
+ }
+
+ @Override
public String getDistributionMode()
{
return _distributionMode;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 49644f8d76..96916a02e2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -210,7 +210,7 @@ public abstract class QueueEntryImpl implements QueueEntry
public boolean acquire(ConsumerImpl sub)
{
- final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState());
+ final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState().getLockedState());
if(acquired)
{
_deliveryCountUpdater.compareAndSet(this,-1,0);
@@ -218,17 +218,57 @@ public abstract class QueueEntryImpl implements QueueEntry
return acquired;
}
+ @Override
+ public boolean lockAcquisition()
+ {
+ EntryState state = _state;
+ if(state instanceof ConsumerAcquiredState)
+ {
+ return _stateUpdater.compareAndSet(this, state, ((ConsumerAcquiredState)state).getLockedState());
+ }
+ return state instanceof LockedAcquiredState;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ EntryState state = _state;
+ if(state instanceof LockedAcquiredState)
+ {
+ return _stateUpdater.compareAndSet(this, state, ((LockedAcquiredState)state).getUnlockedState());
+ }
+ return false;
+ }
+
public boolean acquiredByConsumer()
{
- return (_state instanceof ConsumerAcquiredState);
+ return (_state instanceof ConsumerAcquiredState) || (_state instanceof LockedAcquiredState);
}
+ @Override
public boolean isAcquiredBy(ConsumerImpl consumer)
{
EntryState state = _state;
- return state instanceof ConsumerAcquiredState
- && ((ConsumerAcquiredState)state).getConsumer() == consumer;
+ return (state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+ || (state instanceof LockedAcquiredState
+ && ((LockedAcquiredState)state).getConsumer() == consumer);
+ }
+
+ @Override
+ public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer)
+ {
+ EntryState state = _state;
+ if(state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+ {
+ return _stateUpdater.compareAndSet(this,state,NON_CONSUMER_ACQUIRED_STATE);
+ }
+ else
+ {
+ return false;
+ }
}
public void release()
@@ -238,7 +278,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
- if(state instanceof ConsumerAcquiredState)
+ if(state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
@@ -268,6 +308,10 @@ public abstract class QueueEntryImpl implements QueueEntry
{
return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer();
}
+ else if (state instanceof LockedAcquiredState)
+ {
+ return (QueueConsumer) ((LockedAcquiredState) state).getConsumer();
+ }
else
{
return null;
@@ -312,7 +356,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
- if (state instanceof ConsumerAcquiredState)
+ if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index f15f608907..b72d44debf 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -68,6 +68,8 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueConsumer;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -953,15 +955,26 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
op.withinTransaction(new Transaction()
{
- public void dequeue(final MessageInstance entry)
+ public void dequeue(final MessageInstance messageInstance)
{
- if(entry.acquire())
+ boolean acquired = messageInstance.acquire();
+ if(!acquired && messageInstance instanceof QueueEntry)
+ {
+ QueueEntry entry = (QueueEntry) messageInstance;
+ QueueConsumer consumer = (QueueConsumer) entry.getDeliveredConsumer();
+ acquired = messageInstance.removeAcquisitionFromConsumer(consumer);
+ if(acquired)
+ {
+ consumer.acquisitionRemoved((QueueEntry)messageInstance);
+ }
+ }
+ if(acquired)
{
- txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action()
+ txn.dequeue(messageInstance.getOwningResource(), messageInstance.getMessage(), new ServerTransaction.Action()
{
public void postCommit()
{
- entry.delete();
+ messageInstance.delete();
}
public void onRollback()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index e6afbc6e90..c36f87c4ae 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -181,6 +181,12 @@ public class MockConsumer implements ConsumerTarget
}
+ @Override
+ public void acquisitionRemoved(final MessageInstance node)
+ {
+
+ }
+
public State getState()
{
return _state;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
index 74a2262265..37c4eeb127 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
@@ -65,6 +65,12 @@ public class MockMessageInstance implements MessageInstance
return false;
}
+ @Override
+ public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ {
+ return false;
+ }
+
public void delete()
{
@@ -81,6 +87,18 @@ public class MockMessageInstance implements MessageInstance
return false;
}
+ @Override
+ public boolean lockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return false;
+ }
+
public boolean isAvailable()
{
return false;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 3189010284..3a9f990846 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -137,6 +137,40 @@ public abstract class QueueEntryImplTestBase extends TestCase
return consumer;
}
+
+ public void testLocking()
+ {
+ QueueConsumer consumer = newConsumer();
+ QueueConsumer consumer2 = newConsumer();
+
+ _queueEntry.acquire(consumer);
+ assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
+ _queueEntry.isAcquired());
+
+ assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+ assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
+ assertFalse("Acquisition should not be able to be removed from the wrong consumer",_queueEntry.removeAcquisitionFromConsumer(consumer2));
+ assertTrue("Acquisition should be able to be removed once unlocked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+ assertTrue("Queue Entry should still be acquired", _queueEntry.isAcquired());
+ assertFalse("Queue Entry should not be marked as acquired by a consumer", _queueEntry.acquiredByConsumer());
+
+ _queueEntry.release();
+
+ assertFalse("Hijacked queue entry should be able to be released", _queueEntry.isAcquired());
+
+ _queueEntry.acquire(consumer);
+ assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
+ _queueEntry.isAcquired());
+
+ assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+ assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
+ assertTrue("Should be able to unlock locked queue entry",_queueEntry.lockAcquisition());
+ assertFalse("Acquisition should not be able to be hijacked when locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+
+ _queueEntry.delete();
+ assertTrue("Locked queue entry should be able to be deleted", _queueEntry.isDeleted());
+ }
+
/**
* A helper method to get entry state
*
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index d328e21a94..ce1c95e674 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -342,5 +342,17 @@ public class StandardQueueTest extends AbstractQueueTestBase
return super.acquire(sub);
}
}
+
+ @Override
+ public boolean lockAcquisition()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return true;
+ }
}
}