diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-23 17:53:42 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-23 17:53:42 +0000 |
| commit | 027c4c9398c0c15ed285d8a99df22d98e469cb0f (patch) | |
| tree | d1e1d1e7b60f96c1c9c8ac0c0d2c4cd500d2eb0f /qpid | |
| parent | 1dae32d6fd23383f759650607a7cc38e85ac3f79 (diff) | |
| download | qpid-python-027c4c9398c0c15ed285d8a99df22d98e469cb0f.tar.gz | |
QPID-5504 : simplify QueueEntry to remove discard/dispose/dequeue and only leave delete as the correct way to remove entries
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560770 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
32 files changed, 302 insertions, 362 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java index 6958ac106b..589e888059 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java @@ -23,8 +23,6 @@ package org.apache.qpid.server.filter; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.QueueEntryInstanceProperties; public interface Filterable { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java index 6941ed119c..1d7b8627f4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java @@ -50,14 +50,7 @@ public interface InstanceProperties public static InstanceProperties fromMap(Map<Property, Object> map) { final Map<Property,Object> props = new EnumMap<Property,Object>(map); - return new InstanceProperties() - { - @Override - public Object getProperty(final Property prop) - { - return props.get(prop); - } - }; + return new MapInstanceProperties(props); } public static Map<Property, Object> asMap(InstanceProperties props) @@ -75,5 +68,29 @@ public interface InstanceProperties return map; } + + public static InstanceProperties copy(InstanceProperties from) + { + final Map<Property,Object> props = asMap(from); + + return new MapInstanceProperties(props); + + } + + private static class MapInstanceProperties implements InstanceProperties + { + private final Map<Property, Object> _props; + + private MapInstanceProperties(final Map<Property, Object> props) + { + _props = props; + } + + @Override + public Object getProperty(final Property prop) + { + return _props.get(prop); + } + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 16151dbb63..859a7c784c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -767,7 +767,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { public void postCommit() { - entry.discard(); + entry.delete(); } public void onRollback() @@ -836,7 +836,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual public void postCommit() { - entry.discard(); + entry.delete(); } public void onRollback() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index 000bfbfd6e..e4725e0e2a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -165,7 +165,7 @@ public class ConflationQueueList extends SimpleQueueEntryList @Override public void postCommit() { - entry.discard(); + entry.delete(); } @Override @@ -196,21 +196,14 @@ public class ConflationQueueList extends SimpleQueueEntryList } @Override - public boolean delete() + protected void onDelete() { - if(super.delete()) + if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress)) { - if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress)) - { - Object key = getMessage().getMessageHeader().getHeader(_conflationKey); - _latestValuesMap.remove(key,_latestValueReference); - } - return true; - } - else - { - return false; + Object key = getMessage().getMessageHeader().getHeader(_conflationKey); + _latestValuesMap.remove(key,_latestValueReference); } + } public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index d5c987026c..80ccbe1649 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; @@ -191,9 +192,6 @@ public interface QueueEntry extends Comparable<QueueEntry> boolean acquire(); boolean acquire(Subscription sub); - boolean delete(); - boolean isDeleted(); - boolean acquiredBySubscription(); boolean isAcquiredBy(Subscription subscription); @@ -209,11 +207,14 @@ public interface QueueEntry extends Comparable<QueueEntry> boolean isRejectedBy(long subscriptionId); - void dequeue(); - - void dispose(); + void delete(); - void discard(); + /** + * Returns true if entry is either DEQUED or DELETED state. + * + * @return true if entry is either DEQUED or DELETED state + */ + boolean isDeleted(); void routeToAlternate(); @@ -226,19 +227,6 @@ public interface QueueEntry extends Comparable<QueueEntry> void addStateChangeListener(StateChangeListener listener); boolean removeStateChangeListener(StateChangeListener listener); - /** - * Returns true if entry is in DEQUEUED state, otherwise returns false. - * - * @return true if entry is in DEQUEUED state, otherwise returns false - */ - boolean isDequeued(); - - /** - * Returns true if entry is either DEQUED or DELETED state. - * - * @return true if entry is either DEQUED or DELETED state - */ - boolean isDispensed(); /** * Number of times this queue entry has been delivered. @@ -253,4 +241,5 @@ public interface QueueEntry extends Comparable<QueueEntry> Filterable asFilterable(); + InstanceProperties getInstanceProperties(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 1b9b0f6daa..e7270e2028 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -25,13 +25,14 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.filter.Filterable; -import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import java.util.EnumMap; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -77,16 +78,13 @@ public abstract class QueueEntryImpl implements QueueEntry private volatile long _entryId; - private static final int DELIVERED_TO_CONSUMER = 1; - private static final int REDELIVERED = 2; - - private volatile int _deliveryState; + private final EntryInstanceProperties _instanceProperties = new EntryInstanceProperties(); /** Number of times this message has been delivered */ private volatile int _deliveryCount = 0; private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater .newUpdater(QueueEntryImpl.class, "_deliveryCount"); - + private boolean _deliveredToConsumer; public QueueEntryImpl(QueueEntryList<?> queueEntryList) @@ -103,12 +101,28 @@ public abstract class QueueEntryImpl implements QueueEntry _message = message == null ? null : message.newReference(); _entryIdUpdater.set(this, entryId); + populateInstanceProperties(); } public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message) { _queueEntryList = queueEntryList; _message = message == null ? null : message.newReference(); + populateInstanceProperties(); + } + + private void populateInstanceProperties() + { + if(_message != null) + { + _instanceProperties.setProperty(InstanceProperties.Property.PERSISTENT, _message.getMessage().isPersistent()); + _instanceProperties.setProperty(InstanceProperties.Property.EXPIRATION, _message.getMessage().getExpiration()); + } + } + + public InstanceProperties getInstanceProperties() + { + return _instanceProperties; } protected void setEntryId(long entryId) @@ -138,7 +152,7 @@ public abstract class QueueEntryImpl implements QueueEntry public boolean getDeliveredToConsumer() { - return (_deliveryState & DELIVERED_TO_CONSUMER) != 0; + return _deliveredToConsumer; } public boolean expired() throws AMQException @@ -190,7 +204,7 @@ public abstract class QueueEntryImpl implements QueueEntry final boolean acquired = acquire(sub.getOwningState()); if(acquired) { - _deliveryState |= DELIVERED_TO_CONSUMER; + _deliveredToConsumer = true; } return acquired; } @@ -244,12 +258,12 @@ public abstract class QueueEntryImpl implements QueueEntry public void setRedelivered() { - _deliveryState |= REDELIVERED; + _instanceProperties.setProperty(InstanceProperties.Property.REDELIVERED, Boolean.TRUE); } public boolean isRedelivered() { - return (_deliveryState & REDELIVERED) != 0; + return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED)); } public Subscription getDeliveredSubscription() @@ -297,7 +311,7 @@ public abstract class QueueEntryImpl implements QueueEntry } } - public void dequeue() + private void dequeue() { EntryState state = _state; @@ -329,21 +343,27 @@ public abstract class QueueEntryImpl implements QueueEntry } } - public void dispose() + private boolean dispose() { - if(delete()) + EntryState state = _state; + + if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) { + _queueEntryList.entryDeleted(this); + onDelete(); _message.release(); + + return true; + } + else + { + return false; } } - public void discard() + public void delete() { - //if the queue is null then the message is waiting to be acked, but has been removed. - if (getQueue() != null) - { - dequeue(); - } + dequeue(); dispose(); } @@ -355,12 +375,11 @@ public abstract class QueueEntryImpl implements QueueEntry if (alternateExchange != null) { - QueueEntryInstanceProperties props = new QueueEntryInstanceProperties(this); - List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), props); + List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), getInstanceProperties()); final ServerMessage message = getMessage(); if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null) { - queues = alternateExchange.getAlternateExchange().route(getMessage(), props); + queues = alternateExchange.getAlternateExchange().route(getMessage(), getInstanceProperties()); } @@ -397,7 +416,7 @@ public abstract class QueueEntryImpl implements QueueEntry { public void postCommit() { - discard(); + delete(); } public void onRollback() @@ -446,24 +465,8 @@ public abstract class QueueEntryImpl implements QueueEntry return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0; } - public boolean isDeleted() + protected void onDelete() { - return _state == DELETED_STATE; - } - - public boolean delete() - { - EntryState state = _state; - - if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) - { - _queueEntryList.entryDeleted(this); - return true; - } - else - { - return false; - } } public QueueEntryList getQueueEntryList() @@ -471,12 +474,7 @@ public abstract class QueueEntryImpl implements QueueEntry return _queueEntryList; } - public boolean isDequeued() - { - return _state == DEQUEUED_STATE; - } - - public boolean isDispensed() + public boolean isDeleted() { return _state.isDispensed(); } @@ -499,7 +497,7 @@ public abstract class QueueEntryImpl implements QueueEntry @Override public Filterable asFilterable() { - return Filterable.Factory.newInstance(getMessage(), new QueueEntryInstanceProperties(this)); + return Filterable.Factory.newInstance(getMessage(), getInstanceProperties()); } public String toString() @@ -509,4 +507,21 @@ public abstract class QueueEntryImpl implements QueueEntry ", _state=" + _state + '}'; } + + private static class EntryInstanceProperties implements InstanceProperties + { + private final EnumMap<Property, Object> _properties = new EnumMap<Property, Object>(Property.class); + + @Override + public Object getProperty(final Property prop) + { + return _properties.get(prop); + } + + private void setProperty(Property prop, Object value) + { + _properties.put(prop, value); + } + } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java deleted file mode 100644 index 3affc69ed0..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.message.InstanceProperties; - -public class QueueEntryInstanceProperties implements InstanceProperties -{ - private final QueueEntry _entry; - - public QueueEntryInstanceProperties(final QueueEntry entry) - { - _entry = entry; - } - - @Override - public Object getProperty(final Property prop) - { - switch(prop) - { - case REDELIVERED: - return _entry.isRedelivered(); - case MANDATORY: - return false; - case PERSISTENT: - return _entry.getMessage().isPersistent(); - case IMMEDIATE: - return false; - case EXPIRATION: - return _entry.getMessage().getExpiration(); - } - return null; - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index c0bc662039..4101437711 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1030,7 +1030,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (node != null && !node.isDispensed()) + if (node != null && !node.isDeleted()) { entryList.add(node); } @@ -1153,7 +1153,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes while (queueListIterator.advance() && !filter.filterComplete()) { QueueEntry node = queueListIterator.getNode(); - if (!node.isDispensed() && filter.accept(node)) + if (!node.isDeleted() && filter.accept(node)) { entryList.add(node); } @@ -1170,7 +1170,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { QueueEntry node = queueListIterator.getNode(); - if(!node.isDispensed()) + if(!node.isDeleted()) { if(visitor.visit(node)) { @@ -1290,7 +1290,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public void postCommit() { - node.discard(); + node.delete(); } public void onRollback() @@ -1361,11 +1361,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes for(final QueueEntry entry : entries) { - QueueEntryInstanceProperties props = new QueueEntryInstanceProperties(entry); - List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), props); + List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), entry.getInstanceProperties()); if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null) { - queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(),props); + queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(), entry.getInstanceProperties()); } final ServerMessage message = entry.getMessage(); @@ -1403,7 +1402,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public void postCommit() { - entry.discard(); + entry.delete(); } public void onRollback() @@ -1431,7 +1430,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public void postCommit() { - entry.discard(); + entry.delete(); } public void onRollback() @@ -1908,7 +1907,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { QueueEntry node = queueListIterator.getNode(); // Only process nodes that are not currently deleted and not dequeued - if (!node.isDispensed()) + if (!node.isDeleted()) { // If the node has exired then acquire it if (node.expired() && node.acquire()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java index 4a10d31d37..251a1f55ed 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java @@ -57,7 +57,7 @@ public class SimpleQueueEntryImpl extends QueueEntryImpl { SimpleQueueEntryImpl next = getNextNode(); - while(next != null && next.isDispensed()) + while(next != null && next.isDeleted()) { final SimpleQueueEntryImpl newNext = next.getNextNode(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java index 7f742d455d..85559157a9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -283,14 +283,14 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl { synchronized(_lock) { - if(node.isDispensed() && _head != node) + if(node.isDeleted() && _head != node) { SortedQueueEntryImpl current = _head; SortedQueueEntryImpl next; while(current != null) { next = current.getNextValidEntry(); - if(current.compareTo(node)>0 && !current.isDispensed()) + if(current.compareTo(node)>0 && !current.isDeleted()) { break; } @@ -642,7 +642,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl if(!atTail()) { SortedQueueEntryImpl nextNode = next(_lastNode); - while(nextNode.isDispensed() && next(nextNode) != null) + while(nextNode.isDeleted() && next(nextNode) != null) { nextNode = next(nextNode); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java index 632b59d3fa..45a1978af1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java @@ -21,9 +21,11 @@ package org.apache.qpid.server.subscription; import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; public interface ClientDeliveryMethod { - void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException; + void deliverToClient(final Subscription sub, final ServerMessage message, final InstanceProperties props, + final long deliveryTag) throws AMQException; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 39ca3197b4..d345148e5e 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -199,7 +199,7 @@ public class VirtualHostConfigRecoveryHandler implements public void postCommit() { - entry.discard(); + entry.delete(); } public void onRollback() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java index 6538724a71..d67c70c831 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java @@ -66,7 +66,7 @@ public class ConflationQueueListTest extends TestCase ServerMessage message = createTestServerMessage(null); QueueEntry addedEntry = _list.add(message); - addedEntry.discard(); + addedEntry.delete(); int numberOfEntries = countEntries(_list); assertEquals(0, numberOfEntries); @@ -86,7 +86,7 @@ public class ConflationQueueListTest extends TestCase ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); QueueEntry addedEntry = _list.add(message); - addedEntry.discard(); + addedEntry.delete(); int numberOfEntries = countEntries(_list); assertEquals(0, numberOfEntries); @@ -146,7 +146,7 @@ public class ConflationQueueListTest extends TestCase assertEquals(1, countEntries(_list)); assertEquals(1, _list.getLatestValuesMap().size()); - addedEntry.discard(); + addedEntry.delete(); assertEquals(0, countEntries(_list)); assertEquals(0, _list.getLatestValuesMap().size()); @@ -166,8 +166,8 @@ public class ConflationQueueListTest extends TestCase assertEquals(2, countEntries(_list)); assertEquals(2, _list.getLatestValuesMap().size()); - addedEntry1.discard(); - addedEntry2.discard(); + addedEntry1.delete(); + addedEntry2.delete(); assertEquals(0, countEntries(_list)); assertEquals(0, _list.getLatestValuesMap().size()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index bbaca38c79..2e3231e208 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; @@ -56,17 +57,7 @@ public class MockQueueEntry implements QueueEntry } - public boolean delete() - { - return false; - } - - public void dequeue() - { - - } - - public void discard() + public void delete() { } @@ -76,11 +67,6 @@ public class MockQueueEntry implements QueueEntry } - public void dispose() - { - - } - public boolean expired() throws AMQException { return false; @@ -121,11 +107,6 @@ public class MockQueueEntry implements QueueEntry return false; } - public boolean isDeleted() - { - return false; - } - public boolean isQueueDeleted() { @@ -161,26 +142,6 @@ public class MockQueueEntry implements QueueEntry return false; } - - public void requeue() - { - - - } - - public void requeue(Subscription subscription) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - - public void setDeliveredToSubscription() - { - - - } - - public void setRedelivered() { @@ -213,12 +174,7 @@ public class MockQueueEntry implements QueueEntry _message = msg; } - public boolean isDequeued() - { - return false; - } - - public boolean isDispensed() + public boolean isDeleted() { return false; } @@ -252,6 +208,12 @@ public class MockQueueEntry implements QueueEntry @Override public Filterable asFilterable() { - return Filterable.Factory.newInstance(_message, new QueueEntryInstanceProperties(this)); + return Filterable.Factory.newInstance(_message, getInstanceProperties()); + } + + @Override + public InstanceProperties getInstanceProperties() + { + return InstanceProperties.EMPTY; } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index d348c3e67b..1c75c1ac6a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.QueueEntry.EntryState; import org.apache.qpid.server.subscription.MockSubscription; @@ -63,11 +64,6 @@ public abstract class QueueEntryImplTestBase extends TestCase acquire(); } - public void testDequeue() - { - dequeue(); - } - public void testDelete() { delete(); @@ -79,27 +75,12 @@ public abstract class QueueEntryImplTestBase extends TestCase * Entry in state ACQUIRED should be released and its status should be * changed to AVAILABLE. */ - public void testReleaseAquired() + public void testReleaseAcquired() { acquire(); _queueEntry.release(); assertTrue("Queue entry should be in AVAILABLE state after invoking of release method", - _queueEntry.isAvailable()); - } - - /** - * Tests release method for entry in dequeued state. - * <p> - * Invoking release on dequeued entry should not have any effect on its - * state. - */ - public void testReleaseDequeued() - { - dequeue(); - _queueEntry.release(); - EntryState state = getState(); - assertEquals("Invoking of release on entry in DEQUEUED state should not have any effect", - QueueEntry.DEQUEUED_STATE, state); + _queueEntry.isAvailable()); } /** @@ -126,17 +107,6 @@ public abstract class QueueEntryImplTestBase extends TestCase _queueEntry.isDeleted()); } - /** - * A helper method to put tested entry into dequeue state and assert the sate - */ - private void dequeue() - { - acquire(); - _queueEntry.dequeue(); - EntryState state = getState(); - assertEquals("Queue entry should be in DEQUEUED state after invoking of dequeue method", - QueueEntry.DEQUEUED_STATE, state); - } /** * A helper method to put tested entry into acquired state and assert the sate @@ -216,6 +186,9 @@ public abstract class QueueEntryImplTestBase extends TestCase { ServerMessage message = mock(ServerMessage.class); when(message.getMessageNumber()).thenReturn((long)i); + final MessageReference reference = mock(MessageReference.class); + when(reference.getMessage()).thenReturn(message); + when(message.newReference()).thenReturn(reference); QueueEntryImpl entry = queueEntryList.add(message); entries[i] = entry; } @@ -235,13 +208,13 @@ public abstract class QueueEntryImplTestBase extends TestCase } } - // delete second + // discard second entries[1].acquire(); entries[1].delete(); - // dequeue third + // discard third entries[2].acquire(); - entries[2].dequeue(); + entries[2].delete(); QueueEntry next = entries[0].getNextValidEntry(); assertEquals("expected forth entry",entries[3], next); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java index beb5bda7ff..99e32855c7 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java @@ -156,7 +156,7 @@ public abstract class QueueEntryListTestBase extends TestCase if(counter++ % 2 == 0) { queueEntry.acquire(); - queueEntry.dequeue(); + queueEntry.delete(); } } @@ -225,7 +225,8 @@ public abstract class QueueEntryListTestBase extends TestCase assertNull(list.next(queueEntry2)); //'delete' the 2nd QueueEntry - assertTrue("Deleting node should have succeeded", queueEntry2.delete()); + queueEntry2.delete(); + assertTrue("Deleting node should have succeeded", queueEntry2.isDeleted()); QueueEntryIterator<QueueEntry> iter = list.iterator(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index a7dcaf6d49..0d6cddb842 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -936,12 +936,8 @@ public class SimpleAMQQueueTest extends QpidTestCase { return new SimpleQueueEntryImpl(this, message) { - public boolean isDequeued() - { - return (message.getMessageNumber() % 2 == 0); - } - public boolean isDispensed() + public boolean isDeleted() { return (message.getMessageNumber() % 2 == 0); } @@ -1166,8 +1162,8 @@ public class SimpleAMQQueueTest extends QpidTestCase List<QueueEntry> entries = queue.getMessagesOnTheQueue(); QueueEntry entry = entries.get(dequeueMessageIndex); entry.acquire(); - entry.dequeue(); - assertTrue(entry.isDequeued()); + entry.delete(); + assertTrue(entry.isDeleted()); return entry; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java index c2c2fc4b98..11ff7ed192 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -21,18 +21,24 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase { +public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase +{ private SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); - public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException { + public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException + { ServerMessage message = mock(ServerMessage.class); when(message.getMessageNumber()).thenReturn((long)msgId); + final MessageReference reference = mock(MessageReference.class); + when(reference.getMessage()).thenReturn(message); + when(message.newReference()).thenReturn(reference); return queueEntryList.add(message); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java index 63b3a7d165..ae282d5f37 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java @@ -132,37 +132,45 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase //requiring a scavenge once the requested threshold of 9 deletes is passed //Delete the 2nd message only - assertTrue("Failed to delete QueueEntry", entriesMap.remove(2).delete()); + assertTrue("Failed to delete QueueEntry", remove(entriesMap,2)); verifyDeletedButPresentBeforeScavenge(head, 2); //Delete messages 12 to 14 - assertTrue("Failed to delete QueueEntry", entriesMap.remove(12).delete()); + assertTrue("Failed to delete QueueEntry", remove(entriesMap,12)); verifyDeletedButPresentBeforeScavenge(head, 12); - assertTrue("Failed to delete QueueEntry", entriesMap.remove(13).delete()); + assertTrue("Failed to delete QueueEntry", remove(entriesMap,13)); verifyDeletedButPresentBeforeScavenge(head, 13); - assertTrue("Failed to delete QueueEntry", entriesMap.remove(14).delete()); + assertTrue("Failed to delete QueueEntry", remove(entriesMap,14)); verifyDeletedButPresentBeforeScavenge(head, 14); //Delete message 20 only - assertTrue("Failed to delete QueueEntry", entriesMap.remove(20).delete()); + assertTrue("Failed to delete QueueEntry", remove(entriesMap,20)); verifyDeletedButPresentBeforeScavenge(head, 20); //Delete messages 81 to 84 - assertTrue("Failed to delete QueueEntry", entriesMap.remove(81).delete()); + assertTrue("Failed to delete QueueEntry", remove(entriesMap,81)); verifyDeletedButPresentBeforeScavenge(head, 81); - assertTrue("Failed to delete QueueEntry", entriesMap.remove(82).delete()); + assertTrue("Failed to delete QueueEntry", remove(entriesMap,82)); verifyDeletedButPresentBeforeScavenge(head, 82); - assertTrue("Failed to delete QueueEntry", entriesMap.remove(83).delete()); + assertTrue("Failed to delete QueueEntry", remove(entriesMap,83)); verifyDeletedButPresentBeforeScavenge(head, 83); - assertTrue("Failed to delete QueueEntry", entriesMap.remove(84).delete()); + assertTrue("Failed to delete QueueEntry", remove(entriesMap,84)); verifyDeletedButPresentBeforeScavenge(head, 84); //Delete message 99 - this is the 10th message deleted that is after the queue head //and so will invoke the scavenge() which is set to go after 9 previous deletions - assertTrue("Failed to delete QueueEntry", entriesMap.remove(99).delete()); + assertTrue("Failed to delete QueueEntry", remove(entriesMap,99)); verifyAllDeletedMessagedNotPresent(head, entriesMap); } + + private boolean remove(Map<Integer,QueueEntry> entriesMap, int pos) + { + QueueEntry entry = entriesMap.remove(pos); + boolean wasDeleted = entry.isDeleted(); + entry.delete(); + return entry.isDeleted() && !wasDeleted; + } private void verifyDeletedButPresentBeforeScavenge(SimpleQueueEntryImpl head, long messageId) { @@ -211,6 +219,9 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase { ServerMessage message = mock(ServerMessage.class); when(message.getMessageNumber()).thenReturn((long)i); + final MessageReference reference = mock(MessageReference.class); + when(reference.getMessage()).thenReturn(message); + when(message.newReference()).thenReturn(reference); entries[i] = queueEntryList.add(message); } @@ -235,7 +246,7 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase // dequeue third entries[2].acquire(); - entries[2].dequeue(); + entries[2].delete(); SimpleQueueEntryImpl next = entries[2].getNextValidEntry(); assertEquals("expected forth entry", entries[3], next); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java index acd0ccbdeb..a84dd6c249 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java @@ -22,19 +22,22 @@ package org.apache.qpid.server.queue; import java.util.Collections; import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class SortedQueueEntryImplTest extends QueueEntryImplTestBase { +public class SortedQueueEntryImplTest extends QueueEntryImplTestBase +{ public final static String keys[] = { "CCC", "AAA", "BBB" }; private SelfValidatingSortedQueueEntryList queueEntryList = new SelfValidatingSortedQueueEntryList(new MockAMQQueue("test"),"KEY"); - public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException { + public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException + { final ServerMessage message = mock(ServerMessage.class); AMQMessageHeader hdr = mock(AMQMessageHeader.class); when(message.getMessageHeader()).thenReturn(hdr); @@ -42,6 +45,9 @@ public class SortedQueueEntryImplTest extends QueueEntryImplTestBase { when(hdr.containsHeader(eq("KEY"))).thenReturn(true); when(hdr.getHeaderNames()).thenReturn(Collections.singleton("KEY")); + final MessageReference reference = mock(MessageReference.class); + when(reference.getMessage()).thenReturn(message); + when(message.newReference()).thenReturn(reference); return queueEntryList.add(message); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java index f68973096a..17d0e5cb64 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java @@ -37,7 +37,6 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.QueueEntryInstanceProperties; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -532,7 +531,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { restoreCredit(entry); } - entry.discard(); + entry.delete(); } public void onRollback() @@ -548,7 +547,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr entry.routeToAlternate(); if(entry.isAcquiredBy(this)) { - entry.discard(); + entry.delete(); } } @@ -581,11 +580,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr final ServerMessage msg = entry.getMessage(); if (alternateExchange != null) { - final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), new QueueEntryInstanceProperties(entry)); + final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), entry.getInstanceProperties()); if (destinationQueues == null || destinationQueues.isEmpty()) { - entry.discard(); + entry.delete(); logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName())); } @@ -602,7 +601,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } else { - entry.discard(); + entry.delete(); logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey())); } } @@ -787,7 +786,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { _unacknowledgedBytes.addAndGet(-entry.getSize()); _unacknowledgedCount.decrementAndGet(); - entry.discard(); + entry.delete(); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index bb1d1949a2..5c674ef27e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -74,7 +74,6 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.QueueEntryInstanceProperties; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; @@ -736,7 +735,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } else { - unacked.discard(); + unacked.delete(); } } @@ -771,7 +770,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); - unacked.discard(); + unacked.delete(); } } else @@ -1362,7 +1361,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { for(QueueEntry entry : _ackedMessages) { - entry.discard(); + entry.delete(); } } finally @@ -1571,19 +1570,19 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); - rejectedQueueEntry.discard(); + rejectedQueueEntry.delete(); return; } final List<? extends BaseQueue> destinationQueues = - altExchange.route(rejectedQueueEntry.getMessage(), new QueueEntryInstanceProperties(rejectedQueueEntry)); + altExchange.route(rejectedQueueEntry.getMessage(), rejectedQueueEntry.getInstanceProperties()); if (destinationQueues == null || destinationQueues.isEmpty()) { _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag); _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); - rejectedQueueEntry.discard(); + rejectedQueueEntry.delete(); return; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index f04475eb33..c3489d8c82 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -73,6 +73,8 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; @@ -88,7 +90,6 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; @@ -348,7 +349,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * Process the data block. * If the message is for a channel it is added to {@link #_channelsForCurrentMessage}. * - * @throws an AMQConnectionException if unable to process the data block. In this case, + * @throws AMQConnectionException if unable to process the data block. In this case, * the connection is already closed by the time the exception is thrown. If any other * type of exception is thrown, the connection is not already closed. */ @@ -376,7 +377,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * Handle the supplied frame. * Adds this frame's channel to {@link #_channelsForCurrentMessage}. * - * @throws an AMQConnectionException if unable to process the data block. In this case, + * @throws AMQConnectionException if unable to process the data block. In this case, * the connection is already closed by the time the exception is thrown. If any other * type of exception is thrown, the connection is not already closed. */ @@ -1667,12 +1668,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _channelId = channelId; } - public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) + @Override + public void deliverToClient(final Subscription sub, final ServerMessage message, + final InstanceProperties props, final long deliveryTag) throws AMQException { - registerMessageDelivered(entry.getMessage().getSize()); - _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, ((SubscriptionImpl)sub).getConsumerTag()); - entry.incrementDeliveryCount(); + registerMessageDelivered(message.getSize()); + _protocolOutputConverter.writeDeliver(message, + props, + _channelId, + deliveryTag, + ((SubscriptionImpl)sub).getConsumerTag()); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java index 5e416b52ca..ce0ef6cf50 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java @@ -115,7 +115,7 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor public void postCommit() { - node.discard(); + node.delete(); } public void onRollback() diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java index d48e8b3dea..3b087d263a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java @@ -28,7 +28,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; -import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; @@ -36,6 +35,9 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.SubscriptionActor; import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -132,7 +134,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); - sendToClient(entry, deliveryTag); + sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); } } @@ -147,7 +149,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public static class NoAckSubscription extends SubscriptionImpl { - private volatile AutoCommitTransaction _txn; + private final AutoCommitTransaction _txn; public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, @@ -157,6 +159,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage throws AMQException { super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); + _txn = new AutoCommitTransaction(protocolSession.getVirtualHost().getMessageStore()); } @@ -192,23 +195,22 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage // The send may of course still fail, in which case, as // the message is unacked, it will be lost. - if(_txn == null) - { - _txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); - } _txn.dequeue(getQueue(), entry.getMessage(), NOOP); - entry.dequeue(); + ServerMessage message = entry.getMessage(); + MessageReference ref = message.newReference(); + InstanceProperties props = entry.getInstanceProperties(); + entry.delete(); synchronized (getChannel()) { getChannel().getProtocolSession().setDeferFlush(batch); long deliveryTag = getChannel().getNextDeliveryTag(); - sendToClient(entry, deliveryTag); + sendToClient(message, props, deliveryTag); } - entry.dispose(); + ref.release(); } @@ -301,8 +303,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage addUnacknowledgedMessage(entry); recordMessageDelivery(entry, deliveryTag); - sendToClient(entry, deliveryTag); - + sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + entry.incrementDeliveryCount(); } } @@ -688,12 +690,12 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } - protected void sendToClient(final QueueEntry entry, final long deliveryTag) + protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) throws AMQException { - _deliveryMethod.deliverToClient(this,entry,deliveryTag); + _deliveryMethod.deliverToClient(this, message, props, deliveryTag); _deliveredCount.incrementAndGet(); - _deliveredBytes.addAndGet(entry.getSize()); + _deliveredBytes.addAndGet(message.getSize()); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index 5238a41e49..4b569ccc71 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -28,10 +28,11 @@ import org.apache.qpid.framing.BasicGetBody; import org.apache.qpid.framing.BasicGetEmptyBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.MessageOnlyCreditManager; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -126,21 +127,18 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod() { - public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) + @Override + public void deliverToClient(final Subscription sub, final ServerMessage message, final + InstanceProperties props, final long deliveryTag) throws AMQException { - singleMessageCredit.useCreditForMessage(entry.getMessage().getSize()); - if(entry.getMessage() instanceof AMQMessage) - { - session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(), - deliveryTag, queue.getMessageCount()); - entry.incrementDeliveryCount(); - } - else - { - //TODO Convert AMQP 0-10 message - throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Not implemented conversion of 0-10 message", null); - } + singleMessageCredit.useCreditForMessage(message.getSize()); + session.getProtocolOutputConverter().writeGetOk(message, + props, + channel.getChannelId(), + deliveryTag, + queue.getMessageCount()); + } }; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java index 0cfdff3338..f2ab154b32 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java @@ -79,7 +79,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); if(message != null) { - message.discard(); + message.delete(); } return; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java index 48e42ce5a3..2dae7d3e9a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java @@ -31,7 +31,9 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
@@ -44,10 +46,17 @@ public interface ProtocolOutputConverter ProtocolOutputConverter newInstance(AMQProtocolSession session);
}
- void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ void writeDeliver(final ServerMessage msg,
+ final InstanceProperties props, int channelId,
+ long deliveryTag,
+ AMQShortString consumerTag)
throws AMQException;
- void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException;
+ void writeGetOk(final ServerMessage msg,
+ final InstanceProperties props,
+ int channelId,
+ long deliveryTag,
+ int queueSize) throws AMQException;
byte getProtocolMinorVersion();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java index dd5e13e56a..290a859df6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java @@ -33,13 +33,13 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueEntry; import java.io.DataOutput; import java.io.IOException; @@ -64,24 +64,27 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _protocolSession; } - public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) + public void writeDeliver(final ServerMessage m, + final InstanceProperties props, int channelId, + long deliveryTag, + AMQShortString consumerTag) throws AMQException { - AMQMessage msg = convertToAMQMessage(entry); - AMQBody deliverBody = createEncodedDeliverBody(msg, entry.isRedelivered(), deliveryTag, consumerTag); + final AMQMessage msg = convertToAMQMessage(m); + final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED)); + AMQBody deliverBody = createEncodedDeliverBody(msg, isRedelivered, deliveryTag, consumerTag); writeMessageDelivery(msg, channelId, deliverBody); } - private AMQMessage convertToAMQMessage(QueueEntry entry) + private AMQMessage convertToAMQMessage(ServerMessage serverMessage) { - ServerMessage serverMessage = entry.getMessage(); if(serverMessage instanceof AMQMessage) { return (AMQMessage) serverMessage; } else { - return getMessageConverter(serverMessage).convert(serverMessage, entry.getQueue().getVirtualHost()); + return getMessageConverter(serverMessage).convert(serverMessage, _protocolSession.getVirtualHost()); } } @@ -186,10 +189,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter } } - public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException + public void writeGetOk(final ServerMessage msg, + final InstanceProperties props, + int channelId, + long deliveryTag, + int queueSize) throws AMQException { - AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize); - writeMessageDelivery(convertToAMQMessage(entry), channelId, deliver); + AMQBody deliver = createEncodedGetOkBody(msg, props, deliveryTag, queueSize); + writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver); } @@ -274,18 +281,18 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter } } - private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize) + private AMQBody createEncodedGetOkBody(ServerMessage msg, InstanceProperties props, long deliveryTag, int queueSize) throws AMQException { final AMQShortString exchangeName; final AMQShortString routingKey; - final AMQMessage message = convertToAMQMessage(entry); + final AMQMessage message = convertToAMQMessage(msg); final MessagePublishInfo pb = message.getMessagePublishInfo(); exchangeName = pb.getExchange(); routingKey = pb.getRoutingKey(); - final boolean isRedelivered = entry.isRedelivered(); + final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED)); BasicGetOkBody getOkBody = _methodRegistry.createBasicGetOkBody(deliveryTag, diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 1680a16b42..4a5ada89c2 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -41,7 +41,9 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageContentSource; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; @@ -133,11 +135,6 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr } } - // *** ProtocolOutputConverter Implementation - public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException - { - } - public ClientDeliveryMethod createDeliveryMethod(int channelId) { return new InternalWriteDeliverMethod(channelId); @@ -147,7 +144,10 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { } - public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException + public void writeDeliver(final ServerMessage msg, + final InstanceProperties props, int channelId, + long deliveryTag, + AMQShortString consumerTag) throws AMQException { _deliveryCount.incrementAndGet(); @@ -169,11 +169,15 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr consumers.put(consumerTag, consumerDelivers); } - consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage())); + consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)msg)); } } - public void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException + public void writeGetOk(final ServerMessage msg, + final InstanceProperties props, + int channelId, + long deliveryTag, + int queueSize) throws AMQException { } @@ -195,15 +199,15 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr public class DeliveryPair { private long _deliveryTag; - private AMQMessage _message; + private ServerMessage _message; - public DeliveryPair(long deliveryTag, AMQMessage message) + public DeliveryPair(long deliveryTag, ServerMessage message) { _deliveryTag = deliveryTag; _message = message; } - public AMQMessage getMessage() + public ServerMessage getMessage() { return _message; } @@ -242,7 +246,9 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr } - public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException + @Override + public void deliverToClient(Subscription sub, ServerMessage message, + InstanceProperties props, long deliveryTag) throws AMQException { _deliveryCount.incrementAndGet(); @@ -264,7 +270,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers); } - consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage())); + consumerDelivers.add(new DeliveryPair(deliveryTag, message)); } } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 034927c56b..4abf1bf76b 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -631,7 +631,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { public void postCommit() { - queueEntry.discard(); + queueEntry.delete(); } public void onRollback() diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index e5f3a52e3b..6a3f5b46e1 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -148,14 +148,12 @@ class public boolean hasInterest(final QueueEntry entry) { - if(entry.getMessage() instanceof Message_1_0) + if(_noLocal && entry.getMessage().getConnectionReference() == getSession().getConnection().getReference()) { - if(_noLocal && ((Message_1_0)entry.getMessage()).getConnectionReference() == getSession().getConnection().getReference()) - { - return false; - } + return false; } - else if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null) + else if(!(entry.getMessage() instanceof Message_1_0) + && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null) { return false; } @@ -537,7 +535,7 @@ class { if(_queueEntry.isAcquiredBy(Subscription_1_0.this)) { - _queueEntry.discard(); + _queueEntry.delete(); } } |
