summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-23 17:53:42 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-23 17:53:42 +0000
commit027c4c9398c0c15ed285d8a99df22d98e469cb0f (patch)
treed1e1d1e7b60f96c1c9c8ac0c0d2c4cd500d2eb0f /qpid
parent1dae32d6fd23383f759650607a7cc38e85ac3f79 (diff)
downloadqpid-python-027c4c9398c0c15ed285d8a99df22d98e469cb0f.tar.gz
QPID-5504 : simplify QueueEntry to remove discard/dispose/dequeue and only leave delete as the correct way to remove entries
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560770 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java33
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java19
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java29
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java109
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java52
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java19
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java6
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java10
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java58
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java45
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java10
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java10
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java33
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java10
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java13
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java13
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java20
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java32
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java26
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java13
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java33
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java32
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java12
32 files changed, 302 insertions, 362 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
index 6958ac106b..589e888059 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
@@ -23,8 +23,6 @@ package org.apache.qpid.server.filter;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
public interface Filterable
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java
index 6941ed119c..1d7b8627f4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java
@@ -50,14 +50,7 @@ public interface InstanceProperties
public static InstanceProperties fromMap(Map<Property, Object> map)
{
final Map<Property,Object> props = new EnumMap<Property,Object>(map);
- return new InstanceProperties()
- {
- @Override
- public Object getProperty(final Property prop)
- {
- return props.get(prop);
- }
- };
+ return new MapInstanceProperties(props);
}
public static Map<Property, Object> asMap(InstanceProperties props)
@@ -75,5 +68,29 @@ public interface InstanceProperties
return map;
}
+
+ public static InstanceProperties copy(InstanceProperties from)
+ {
+ final Map<Property,Object> props = asMap(from);
+
+ return new MapInstanceProperties(props);
+
+ }
+
+ private static class MapInstanceProperties implements InstanceProperties
+ {
+ private final Map<Property, Object> _props;
+
+ private MapInstanceProperties(final Map<Property, Object> props)
+ {
+ _props = props;
+ }
+
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ return _props.get(prop);
+ }
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 16151dbb63..859a7c784c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -767,7 +767,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
{
public void postCommit()
{
- entry.discard();
+ entry.delete();
}
public void onRollback()
@@ -836,7 +836,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
public void postCommit()
{
- entry.discard();
+ entry.delete();
}
public void onRollback()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index 000bfbfd6e..e4725e0e2a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -165,7 +165,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
@Override
public void postCommit()
{
- entry.discard();
+ entry.delete();
}
@Override
@@ -196,21 +196,14 @@ public class ConflationQueueList extends SimpleQueueEntryList
}
@Override
- public boolean delete()
+ protected void onDelete()
{
- if(super.delete())
+ if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress))
{
- if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress))
- {
- Object key = getMessage().getMessageHeader().getHeader(_conflationKey);
- _latestValuesMap.remove(key,_latestValueReference);
- }
- return true;
- }
- else
- {
- return false;
+ Object key = getMessage().getMessageHeader().getHeader(_conflationKey);
+ _latestValuesMap.remove(key,_latestValueReference);
}
+
}
public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index d5c987026c..80ccbe1649 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
@@ -191,9 +192,6 @@ public interface QueueEntry extends Comparable<QueueEntry>
boolean acquire();
boolean acquire(Subscription sub);
- boolean delete();
- boolean isDeleted();
-
boolean acquiredBySubscription();
boolean isAcquiredBy(Subscription subscription);
@@ -209,11 +207,14 @@ public interface QueueEntry extends Comparable<QueueEntry>
boolean isRejectedBy(long subscriptionId);
- void dequeue();
-
- void dispose();
+ void delete();
- void discard();
+ /**
+ * Returns true if entry is either DEQUED or DELETED state.
+ *
+ * @return true if entry is either DEQUED or DELETED state
+ */
+ boolean isDeleted();
void routeToAlternate();
@@ -226,19 +227,6 @@ public interface QueueEntry extends Comparable<QueueEntry>
void addStateChangeListener(StateChangeListener listener);
boolean removeStateChangeListener(StateChangeListener listener);
- /**
- * Returns true if entry is in DEQUEUED state, otherwise returns false.
- *
- * @return true if entry is in DEQUEUED state, otherwise returns false
- */
- boolean isDequeued();
-
- /**
- * Returns true if entry is either DEQUED or DELETED state.
- *
- * @return true if entry is either DEQUED or DELETED state
- */
- boolean isDispensed();
/**
* Number of times this queue entry has been delivered.
@@ -253,4 +241,5 @@ public interface QueueEntry extends Comparable<QueueEntry>
Filterable asFilterable();
+ InstanceProperties getInstanceProperties();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 1b9b0f6daa..e7270e2028 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -25,13 +25,14 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -77,16 +78,13 @@ public abstract class QueueEntryImpl implements QueueEntry
private volatile long _entryId;
- private static final int DELIVERED_TO_CONSUMER = 1;
- private static final int REDELIVERED = 2;
-
- private volatile int _deliveryState;
+ private final EntryInstanceProperties _instanceProperties = new EntryInstanceProperties();
/** Number of times this message has been delivered */
private volatile int _deliveryCount = 0;
private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater
.newUpdater(QueueEntryImpl.class, "_deliveryCount");
-
+ private boolean _deliveredToConsumer;
public QueueEntryImpl(QueueEntryList<?> queueEntryList)
@@ -103,12 +101,28 @@ public abstract class QueueEntryImpl implements QueueEntry
_message = message == null ? null : message.newReference();
_entryIdUpdater.set(this, entryId);
+ populateInstanceProperties();
}
public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
_message = message == null ? null : message.newReference();
+ populateInstanceProperties();
+ }
+
+ private void populateInstanceProperties()
+ {
+ if(_message != null)
+ {
+ _instanceProperties.setProperty(InstanceProperties.Property.PERSISTENT, _message.getMessage().isPersistent());
+ _instanceProperties.setProperty(InstanceProperties.Property.EXPIRATION, _message.getMessage().getExpiration());
+ }
+ }
+
+ public InstanceProperties getInstanceProperties()
+ {
+ return _instanceProperties;
}
protected void setEntryId(long entryId)
@@ -138,7 +152,7 @@ public abstract class QueueEntryImpl implements QueueEntry
public boolean getDeliveredToConsumer()
{
- return (_deliveryState & DELIVERED_TO_CONSUMER) != 0;
+ return _deliveredToConsumer;
}
public boolean expired() throws AMQException
@@ -190,7 +204,7 @@ public abstract class QueueEntryImpl implements QueueEntry
final boolean acquired = acquire(sub.getOwningState());
if(acquired)
{
- _deliveryState |= DELIVERED_TO_CONSUMER;
+ _deliveredToConsumer = true;
}
return acquired;
}
@@ -244,12 +258,12 @@ public abstract class QueueEntryImpl implements QueueEntry
public void setRedelivered()
{
- _deliveryState |= REDELIVERED;
+ _instanceProperties.setProperty(InstanceProperties.Property.REDELIVERED, Boolean.TRUE);
}
public boolean isRedelivered()
{
- return (_deliveryState & REDELIVERED) != 0;
+ return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED));
}
public Subscription getDeliveredSubscription()
@@ -297,7 +311,7 @@ public abstract class QueueEntryImpl implements QueueEntry
}
}
- public void dequeue()
+ private void dequeue()
{
EntryState state = _state;
@@ -329,21 +343,27 @@ public abstract class QueueEntryImpl implements QueueEntry
}
}
- public void dispose()
+ private boolean dispose()
{
- if(delete())
+ EntryState state = _state;
+
+ if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
+ _queueEntryList.entryDeleted(this);
+ onDelete();
_message.release();
+
+ return true;
+ }
+ else
+ {
+ return false;
}
}
- public void discard()
+ public void delete()
{
- //if the queue is null then the message is waiting to be acked, but has been removed.
- if (getQueue() != null)
- {
- dequeue();
- }
+ dequeue();
dispose();
}
@@ -355,12 +375,11 @@ public abstract class QueueEntryImpl implements QueueEntry
if (alternateExchange != null)
{
- QueueEntryInstanceProperties props = new QueueEntryInstanceProperties(this);
- List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), props);
+ List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), getInstanceProperties());
final ServerMessage message = getMessage();
if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null)
{
- queues = alternateExchange.getAlternateExchange().route(getMessage(), props);
+ queues = alternateExchange.getAlternateExchange().route(getMessage(), getInstanceProperties());
}
@@ -397,7 +416,7 @@ public abstract class QueueEntryImpl implements QueueEntry
{
public void postCommit()
{
- discard();
+ delete();
}
public void onRollback()
@@ -446,24 +465,8 @@ public abstract class QueueEntryImpl implements QueueEntry
return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
}
- public boolean isDeleted()
+ protected void onDelete()
{
- return _state == DELETED_STATE;
- }
-
- public boolean delete()
- {
- EntryState state = _state;
-
- if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
- {
- _queueEntryList.entryDeleted(this);
- return true;
- }
- else
- {
- return false;
- }
}
public QueueEntryList getQueueEntryList()
@@ -471,12 +474,7 @@ public abstract class QueueEntryImpl implements QueueEntry
return _queueEntryList;
}
- public boolean isDequeued()
- {
- return _state == DEQUEUED_STATE;
- }
-
- public boolean isDispensed()
+ public boolean isDeleted()
{
return _state.isDispensed();
}
@@ -499,7 +497,7 @@ public abstract class QueueEntryImpl implements QueueEntry
@Override
public Filterable asFilterable()
{
- return Filterable.Factory.newInstance(getMessage(), new QueueEntryInstanceProperties(this));
+ return Filterable.Factory.newInstance(getMessage(), getInstanceProperties());
}
public String toString()
@@ -509,4 +507,21 @@ public abstract class QueueEntryImpl implements QueueEntry
", _state=" + _state +
'}';
}
+
+ private static class EntryInstanceProperties implements InstanceProperties
+ {
+ private final EnumMap<Property, Object> _properties = new EnumMap<Property, Object>(Property.class);
+
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ return _properties.get(prop);
+ }
+
+ private void setProperty(Property prop, Object value)
+ {
+ _properties.put(prop, value);
+ }
+ }
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java
deleted file mode 100644
index 3affc69ed0..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.message.InstanceProperties;
-
-public class QueueEntryInstanceProperties implements InstanceProperties
-{
- private final QueueEntry _entry;
-
- public QueueEntryInstanceProperties(final QueueEntry entry)
- {
- _entry = entry;
- }
-
- @Override
- public Object getProperty(final Property prop)
- {
- switch(prop)
- {
- case REDELIVERED:
- return _entry.isRedelivered();
- case MANDATORY:
- return false;
- case PERSISTENT:
- return _entry.getMessage().isPersistent();
- case IMMEDIATE:
- return false;
- case EXPIRATION:
- return _entry.getMessage().getExpiration();
- }
- return null;
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index c0bc662039..4101437711 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1030,7 +1030,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (node != null && !node.isDispensed())
+ if (node != null && !node.isDeleted())
{
entryList.add(node);
}
@@ -1153,7 +1153,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
while (queueListIterator.advance() && !filter.filterComplete())
{
QueueEntry node = queueListIterator.getNode();
- if (!node.isDispensed() && filter.accept(node))
+ if (!node.isDeleted() && filter.accept(node))
{
entryList.add(node);
}
@@ -1170,7 +1170,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
QueueEntry node = queueListIterator.getNode();
- if(!node.isDispensed())
+ if(!node.isDeleted())
{
if(visitor.visit(node))
{
@@ -1290,7 +1290,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public void postCommit()
{
- node.discard();
+ node.delete();
}
public void onRollback()
@@ -1361,11 +1361,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
for(final QueueEntry entry : entries)
{
- QueueEntryInstanceProperties props = new QueueEntryInstanceProperties(entry);
- List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), props);
+ List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), entry.getInstanceProperties());
if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null)
{
- queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(),props);
+ queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(), entry.getInstanceProperties());
}
final ServerMessage message = entry.getMessage();
@@ -1403,7 +1402,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public void postCommit()
{
- entry.discard();
+ entry.delete();
}
public void onRollback()
@@ -1431,7 +1430,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public void postCommit()
{
- entry.discard();
+ entry.delete();
}
public void onRollback()
@@ -1908,7 +1907,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
QueueEntry node = queueListIterator.getNode();
// Only process nodes that are not currently deleted and not dequeued
- if (!node.isDispensed())
+ if (!node.isDeleted())
{
// If the node has exired then acquire it
if (node.expired() && node.acquire())
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java
index 4a10d31d37..251a1f55ed 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java
@@ -57,7 +57,7 @@ public class SimpleQueueEntryImpl extends QueueEntryImpl
{
SimpleQueueEntryImpl next = getNextNode();
- while(next != null && next.isDispensed())
+ while(next != null && next.isDeleted())
{
final SimpleQueueEntryImpl newNext = next.getNextNode();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
index 7f742d455d..85559157a9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
@@ -283,14 +283,14 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
{
synchronized(_lock)
{
- if(node.isDispensed() && _head != node)
+ if(node.isDeleted() && _head != node)
{
SortedQueueEntryImpl current = _head;
SortedQueueEntryImpl next;
while(current != null)
{
next = current.getNextValidEntry();
- if(current.compareTo(node)>0 && !current.isDispensed())
+ if(current.compareTo(node)>0 && !current.isDeleted())
{
break;
}
@@ -642,7 +642,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
if(!atTail())
{
SortedQueueEntryImpl nextNode = next(_lastNode);
- while(nextNode.isDispensed() && next(nextNode) != null)
+ while(nextNode.isDeleted() && next(nextNode) != null)
{
nextNode = next(nextNode);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
index 632b59d3fa..45a1978af1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
@@ -21,9 +21,11 @@
package org.apache.qpid.server.subscription;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
public interface ClientDeliveryMethod
{
- void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException;
+ void deliverToClient(final Subscription sub, final ServerMessage message, final InstanceProperties props,
+ final long deliveryTag) throws AMQException;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 39ca3197b4..d345148e5e 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -199,7 +199,7 @@ public class VirtualHostConfigRecoveryHandler implements
public void postCommit()
{
- entry.discard();
+ entry.delete();
}
public void onRollback()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
index 6538724a71..d67c70c831 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
@@ -66,7 +66,7 @@ public class ConflationQueueListTest extends TestCase
ServerMessage message = createTestServerMessage(null);
QueueEntry addedEntry = _list.add(message);
- addedEntry.discard();
+ addedEntry.delete();
int numberOfEntries = countEntries(_list);
assertEquals(0, numberOfEntries);
@@ -86,7 +86,7 @@ public class ConflationQueueListTest extends TestCase
ServerMessage message = createTestServerMessage(TEST_KEY_VALUE);
QueueEntry addedEntry = _list.add(message);
- addedEntry.discard();
+ addedEntry.delete();
int numberOfEntries = countEntries(_list);
assertEquals(0, numberOfEntries);
@@ -146,7 +146,7 @@ public class ConflationQueueListTest extends TestCase
assertEquals(1, countEntries(_list));
assertEquals(1, _list.getLatestValuesMap().size());
- addedEntry.discard();
+ addedEntry.delete();
assertEquals(0, countEntries(_list));
assertEquals(0, _list.getLatestValuesMap().size());
@@ -166,8 +166,8 @@ public class ConflationQueueListTest extends TestCase
assertEquals(2, countEntries(_list));
assertEquals(2, _list.getLatestValuesMap().size());
- addedEntry1.discard();
- addedEntry2.discard();
+ addedEntry1.delete();
+ addedEntry2.delete();
assertEquals(0, countEntries(_list));
assertEquals(0, _list.getLatestValuesMap().size());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index bbaca38c79..2e3231e208 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
@@ -56,17 +57,7 @@ public class MockQueueEntry implements QueueEntry
}
- public boolean delete()
- {
- return false;
- }
-
- public void dequeue()
- {
-
- }
-
- public void discard()
+ public void delete()
{
}
@@ -76,11 +67,6 @@ public class MockQueueEntry implements QueueEntry
}
- public void dispose()
- {
-
- }
-
public boolean expired() throws AMQException
{
return false;
@@ -121,11 +107,6 @@ public class MockQueueEntry implements QueueEntry
return false;
}
- public boolean isDeleted()
- {
- return false;
- }
-
public boolean isQueueDeleted()
{
@@ -161,26 +142,6 @@ public class MockQueueEntry implements QueueEntry
return false;
}
-
- public void requeue()
- {
-
-
- }
-
- public void requeue(Subscription subscription)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- public void setDeliveredToSubscription()
- {
-
-
- }
-
-
public void setRedelivered()
{
@@ -213,12 +174,7 @@ public class MockQueueEntry implements QueueEntry
_message = msg;
}
- public boolean isDequeued()
- {
- return false;
- }
-
- public boolean isDispensed()
+ public boolean isDeleted()
{
return false;
}
@@ -252,6 +208,12 @@ public class MockQueueEntry implements QueueEntry
@Override
public Filterable asFilterable()
{
- return Filterable.Factory.newInstance(_message, new QueueEntryInstanceProperties(this));
+ return Filterable.Factory.newInstance(_message, getInstanceProperties());
+ }
+
+ @Override
+ public InstanceProperties getInstanceProperties()
+ {
+ return InstanceProperties.EMPTY;
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index d348c3e67b..1c75c1ac6a 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -21,6 +21,7 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.QueueEntry.EntryState;
import org.apache.qpid.server.subscription.MockSubscription;
@@ -63,11 +64,6 @@ public abstract class QueueEntryImplTestBase extends TestCase
acquire();
}
- public void testDequeue()
- {
- dequeue();
- }
-
public void testDelete()
{
delete();
@@ -79,27 +75,12 @@ public abstract class QueueEntryImplTestBase extends TestCase
* Entry in state ACQUIRED should be released and its status should be
* changed to AVAILABLE.
*/
- public void testReleaseAquired()
+ public void testReleaseAcquired()
{
acquire();
_queueEntry.release();
assertTrue("Queue entry should be in AVAILABLE state after invoking of release method",
- _queueEntry.isAvailable());
- }
-
- /**
- * Tests release method for entry in dequeued state.
- * <p>
- * Invoking release on dequeued entry should not have any effect on its
- * state.
- */
- public void testReleaseDequeued()
- {
- dequeue();
- _queueEntry.release();
- EntryState state = getState();
- assertEquals("Invoking of release on entry in DEQUEUED state should not have any effect",
- QueueEntry.DEQUEUED_STATE, state);
+ _queueEntry.isAvailable());
}
/**
@@ -126,17 +107,6 @@ public abstract class QueueEntryImplTestBase extends TestCase
_queueEntry.isDeleted());
}
- /**
- * A helper method to put tested entry into dequeue state and assert the sate
- */
- private void dequeue()
- {
- acquire();
- _queueEntry.dequeue();
- EntryState state = getState();
- assertEquals("Queue entry should be in DEQUEUED state after invoking of dequeue method",
- QueueEntry.DEQUEUED_STATE, state);
- }
/**
* A helper method to put tested entry into acquired state and assert the sate
@@ -216,6 +186,9 @@ public abstract class QueueEntryImplTestBase extends TestCase
{
ServerMessage message = mock(ServerMessage.class);
when(message.getMessageNumber()).thenReturn((long)i);
+ final MessageReference reference = mock(MessageReference.class);
+ when(reference.getMessage()).thenReturn(message);
+ when(message.newReference()).thenReturn(reference);
QueueEntryImpl entry = queueEntryList.add(message);
entries[i] = entry;
}
@@ -235,13 +208,13 @@ public abstract class QueueEntryImplTestBase extends TestCase
}
}
- // delete second
+ // discard second
entries[1].acquire();
entries[1].delete();
- // dequeue third
+ // discard third
entries[2].acquire();
- entries[2].dequeue();
+ entries[2].delete();
QueueEntry next = entries[0].getNextValidEntry();
assertEquals("expected forth entry",entries[3], next);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
index beb5bda7ff..99e32855c7 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
@@ -156,7 +156,7 @@ public abstract class QueueEntryListTestBase extends TestCase
if(counter++ % 2 == 0)
{
queueEntry.acquire();
- queueEntry.dequeue();
+ queueEntry.delete();
}
}
@@ -225,7 +225,8 @@ public abstract class QueueEntryListTestBase extends TestCase
assertNull(list.next(queueEntry2));
//'delete' the 2nd QueueEntry
- assertTrue("Deleting node should have succeeded", queueEntry2.delete());
+ queueEntry2.delete();
+ assertTrue("Deleting node should have succeeded", queueEntry2.isDeleted());
QueueEntryIterator<QueueEntry> iter = list.iterator();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index a7dcaf6d49..0d6cddb842 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -936,12 +936,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
return new SimpleQueueEntryImpl(this, message)
{
- public boolean isDequeued()
- {
- return (message.getMessageNumber() % 2 == 0);
- }
- public boolean isDispensed()
+ public boolean isDeleted()
{
return (message.getMessageNumber() % 2 == 0);
}
@@ -1166,8 +1162,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
List<QueueEntry> entries = queue.getMessagesOnTheQueue();
QueueEntry entry = entries.get(dequeueMessageIndex);
entry.acquire();
- entry.dequeue();
- assertTrue(entry.isDequeued());
+ entry.delete();
+ assertTrue(entry.isDeleted());
return entry;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index c2c2fc4b98..11ff7ed192 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -21,18 +21,24 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase {
+public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
+{
private SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
- public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException {
+ public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException
+ {
ServerMessage message = mock(ServerMessage.class);
when(message.getMessageNumber()).thenReturn((long)msgId);
+ final MessageReference reference = mock(MessageReference.class);
+ when(reference.getMessage()).thenReturn(message);
+ when(message.newReference()).thenReturn(reference);
return queueEntryList.add(message);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
index 63b3a7d165..ae282d5f37 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
@@ -132,37 +132,45 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
//requiring a scavenge once the requested threshold of 9 deletes is passed
//Delete the 2nd message only
- assertTrue("Failed to delete QueueEntry", entriesMap.remove(2).delete());
+ assertTrue("Failed to delete QueueEntry", remove(entriesMap,2));
verifyDeletedButPresentBeforeScavenge(head, 2);
//Delete messages 12 to 14
- assertTrue("Failed to delete QueueEntry", entriesMap.remove(12).delete());
+ assertTrue("Failed to delete QueueEntry", remove(entriesMap,12));
verifyDeletedButPresentBeforeScavenge(head, 12);
- assertTrue("Failed to delete QueueEntry", entriesMap.remove(13).delete());
+ assertTrue("Failed to delete QueueEntry", remove(entriesMap,13));
verifyDeletedButPresentBeforeScavenge(head, 13);
- assertTrue("Failed to delete QueueEntry", entriesMap.remove(14).delete());
+ assertTrue("Failed to delete QueueEntry", remove(entriesMap,14));
verifyDeletedButPresentBeforeScavenge(head, 14);
//Delete message 20 only
- assertTrue("Failed to delete QueueEntry", entriesMap.remove(20).delete());
+ assertTrue("Failed to delete QueueEntry", remove(entriesMap,20));
verifyDeletedButPresentBeforeScavenge(head, 20);
//Delete messages 81 to 84
- assertTrue("Failed to delete QueueEntry", entriesMap.remove(81).delete());
+ assertTrue("Failed to delete QueueEntry", remove(entriesMap,81));
verifyDeletedButPresentBeforeScavenge(head, 81);
- assertTrue("Failed to delete QueueEntry", entriesMap.remove(82).delete());
+ assertTrue("Failed to delete QueueEntry", remove(entriesMap,82));
verifyDeletedButPresentBeforeScavenge(head, 82);
- assertTrue("Failed to delete QueueEntry", entriesMap.remove(83).delete());
+ assertTrue("Failed to delete QueueEntry", remove(entriesMap,83));
verifyDeletedButPresentBeforeScavenge(head, 83);
- assertTrue("Failed to delete QueueEntry", entriesMap.remove(84).delete());
+ assertTrue("Failed to delete QueueEntry", remove(entriesMap,84));
verifyDeletedButPresentBeforeScavenge(head, 84);
//Delete message 99 - this is the 10th message deleted that is after the queue head
//and so will invoke the scavenge() which is set to go after 9 previous deletions
- assertTrue("Failed to delete QueueEntry", entriesMap.remove(99).delete());
+ assertTrue("Failed to delete QueueEntry", remove(entriesMap,99));
verifyAllDeletedMessagedNotPresent(head, entriesMap);
}
+
+ private boolean remove(Map<Integer,QueueEntry> entriesMap, int pos)
+ {
+ QueueEntry entry = entriesMap.remove(pos);
+ boolean wasDeleted = entry.isDeleted();
+ entry.delete();
+ return entry.isDeleted() && !wasDeleted;
+ }
private void verifyDeletedButPresentBeforeScavenge(SimpleQueueEntryImpl head, long messageId)
{
@@ -211,6 +219,9 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
{
ServerMessage message = mock(ServerMessage.class);
when(message.getMessageNumber()).thenReturn((long)i);
+ final MessageReference reference = mock(MessageReference.class);
+ when(reference.getMessage()).thenReturn(message);
+ when(message.newReference()).thenReturn(reference);
entries[i] = queueEntryList.add(message);
}
@@ -235,7 +246,7 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
// dequeue third
entries[2].acquire();
- entries[2].dequeue();
+ entries[2].delete();
SimpleQueueEntryImpl next = entries[2].getNextValidEntry();
assertEquals("expected forth entry", entries[3], next);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java
index acd0ccbdeb..a84dd6c249 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java
@@ -22,19 +22,22 @@ package org.apache.qpid.server.queue;
import java.util.Collections;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class SortedQueueEntryImplTest extends QueueEntryImplTestBase {
+public class SortedQueueEntryImplTest extends QueueEntryImplTestBase
+{
public final static String keys[] = { "CCC", "AAA", "BBB" };
private SelfValidatingSortedQueueEntryList queueEntryList = new SelfValidatingSortedQueueEntryList(new MockAMQQueue("test"),"KEY");
- public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException {
+ public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException
+ {
final ServerMessage message = mock(ServerMessage.class);
AMQMessageHeader hdr = mock(AMQMessageHeader.class);
when(message.getMessageHeader()).thenReturn(hdr);
@@ -42,6 +45,9 @@ public class SortedQueueEntryImplTest extends QueueEntryImplTestBase {
when(hdr.containsHeader(eq("KEY"))).thenReturn(true);
when(hdr.getHeaderNames()).thenReturn(Collections.singleton("KEY"));
+ final MessageReference reference = mock(MessageReference.class);
+ when(reference.getMessage()).thenReturn(message);
+ when(message.newReference()).thenReturn(reference);
return queueEntryList.add(message);
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
index f68973096a..17d0e5cb64 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
@@ -37,7 +37,6 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -532,7 +531,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
restoreCredit(entry);
}
- entry.discard();
+ entry.delete();
}
public void onRollback()
@@ -548,7 +547,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
entry.routeToAlternate();
if(entry.isAcquiredBy(this))
{
- entry.discard();
+ entry.delete();
}
}
@@ -581,11 +580,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
final ServerMessage msg = entry.getMessage();
if (alternateExchange != null)
{
- final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), new QueueEntryInstanceProperties(entry));
+ final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), entry.getInstanceProperties());
if (destinationQueues == null || destinationQueues.isEmpty())
{
- entry.discard();
+ entry.delete();
logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName()));
}
@@ -602,7 +601,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
}
else
{
- entry.discard();
+ entry.delete();
logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey()));
}
}
@@ -787,7 +786,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
_unacknowledgedBytes.addAndGet(-entry.getSize());
_unacknowledgedCount.decrementAndGet();
- entry.discard();
+ entry.delete();
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index bb1d1949a2..5c674ef27e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -74,7 +74,6 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
@@ -736,7 +735,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
else
{
- unacked.discard();
+ unacked.delete();
}
}
@@ -771,7 +770,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
_logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
+ "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
- unacked.discard();
+ unacked.delete();
}
}
else
@@ -1362,7 +1361,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
for(QueueEntry entry : _ackedMessages)
{
- entry.discard();
+ entry.delete();
}
}
finally
@@ -1571,19 +1570,19 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
_logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
_actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
- rejectedQueueEntry.discard();
+ rejectedQueueEntry.delete();
return;
}
final List<? extends BaseQueue> destinationQueues =
- altExchange.route(rejectedQueueEntry.getMessage(), new QueueEntryInstanceProperties(rejectedQueueEntry));
+ altExchange.route(rejectedQueueEntry.getMessage(), rejectedQueueEntry.getInstanceProperties());
if (destinationQueues == null || destinationQueues.isEmpty())
{
_logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
_actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
- rejectedQueueEntry.discard();
+ rejectedQueueEntry.delete();
return;
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index f04475eb33..c3489d8c82 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -73,6 +73,8 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
@@ -88,7 +90,6 @@ import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
@@ -348,7 +349,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
* Process the data block.
* If the message is for a channel it is added to {@link #_channelsForCurrentMessage}.
*
- * @throws an AMQConnectionException if unable to process the data block. In this case,
+ * @throws AMQConnectionException if unable to process the data block. In this case,
* the connection is already closed by the time the exception is thrown. If any other
* type of exception is thrown, the connection is not already closed.
*/
@@ -376,7 +377,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
* Handle the supplied frame.
* Adds this frame's channel to {@link #_channelsForCurrentMessage}.
*
- * @throws an AMQConnectionException if unable to process the data block. In this case,
+ * @throws AMQConnectionException if unable to process the data block. In this case,
* the connection is already closed by the time the exception is thrown. If any other
* type of exception is thrown, the connection is not already closed.
*/
@@ -1667,12 +1668,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_channelId = channelId;
}
- public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ @Override
+ public void deliverToClient(final Subscription sub, final ServerMessage message,
+ final InstanceProperties props, final long deliveryTag)
throws AMQException
{
- registerMessageDelivered(entry.getMessage().getSize());
- _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, ((SubscriptionImpl)sub).getConsumerTag());
- entry.incrementDeliveryCount();
+ registerMessageDelivered(message.getSize());
+ _protocolOutputConverter.writeDeliver(message,
+ props,
+ _channelId,
+ deliveryTag,
+ ((SubscriptionImpl)sub).getConsumerTag());
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
index 5e416b52ca..ce0ef6cf50 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
@@ -115,7 +115,7 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
public void postCommit()
{
- node.discard();
+ node.delete();
}
public void onRollback()
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
index d48e8b3dea..3b087d263a 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
@@ -28,7 +28,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
-import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -36,6 +35,9 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.SubscriptionActor;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -132,7 +134,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
synchronized (getChannel())
{
long deliveryTag = getChannel().getNextDeliveryTag();
- sendToClient(entry, deliveryTag);
+ sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
}
}
@@ -147,7 +149,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public static class NoAckSubscription extends SubscriptionImpl
{
- private volatile AutoCommitTransaction _txn;
+ private final AutoCommitTransaction _txn;
public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
AMQShortString consumerTag, FieldTable filters,
@@ -157,6 +159,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
throws AMQException
{
super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+ _txn = new AutoCommitTransaction(protocolSession.getVirtualHost().getMessageStore());
}
@@ -192,23 +195,22 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
// The send may of course still fail, in which case, as
// the message is unacked, it will be lost.
- if(_txn == null)
- {
- _txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
- }
_txn.dequeue(getQueue(), entry.getMessage(), NOOP);
- entry.dequeue();
+ ServerMessage message = entry.getMessage();
+ MessageReference ref = message.newReference();
+ InstanceProperties props = entry.getInstanceProperties();
+ entry.delete();
synchronized (getChannel())
{
getChannel().getProtocolSession().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
- sendToClient(entry, deliveryTag);
+ sendToClient(message, props, deliveryTag);
}
- entry.dispose();
+ ref.release();
}
@@ -301,8 +303,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
addUnacknowledgedMessage(entry);
recordMessageDelivery(entry, deliveryTag);
- sendToClient(entry, deliveryTag);
-
+ sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ entry.incrementDeliveryCount();
}
}
@@ -688,12 +690,12 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
- protected void sendToClient(final QueueEntry entry, final long deliveryTag)
+ protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
throws AMQException
{
- _deliveryMethod.deliverToClient(this,entry,deliveryTag);
+ _deliveryMethod.deliverToClient(this, message, props, deliveryTag);
_deliveredCount.incrementAndGet();
- _deliveredBytes.addAndGet(entry.getSize());
+ _deliveredBytes.addAndGet(message.getSize());
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index 5238a41e49..4b569ccc71 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -28,10 +28,11 @@ import org.apache.qpid.framing.BasicGetBody;
import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.MessageOnlyCreditManager;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -126,21 +127,18 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
{
- public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ @Override
+ public void deliverToClient(final Subscription sub, final ServerMessage message, final
+ InstanceProperties props, final long deliveryTag)
throws AMQException
{
- singleMessageCredit.useCreditForMessage(entry.getMessage().getSize());
- if(entry.getMessage() instanceof AMQMessage)
- {
- session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
- deliveryTag, queue.getMessageCount());
- entry.incrementDeliveryCount();
- }
- else
- {
- //TODO Convert AMQP 0-10 message
- throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Not implemented conversion of 0-10 message", null);
- }
+ singleMessageCredit.useCreditForMessage(message.getSize());
+ session.getProtocolOutputConverter().writeGetOk(message,
+ props,
+ channel.getChannelId(),
+ deliveryTag,
+ queue.getMessageCount());
+
}
};
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
index 0cfdff3338..f2ab154b32 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
@@ -79,7 +79,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
if(message != null)
{
- message.discard();
+ message.delete();
}
return;
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
index 48e42ce5a3..2dae7d3e9a 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
@@ -31,7 +31,9 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
@@ -44,10 +46,17 @@ public interface ProtocolOutputConverter
ProtocolOutputConverter newInstance(AMQProtocolSession session);
}
- void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ void writeDeliver(final ServerMessage msg,
+ final InstanceProperties props, int channelId,
+ long deliveryTag,
+ AMQShortString consumerTag)
throws AMQException;
- void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException;
+ void writeGetOk(final ServerMessage msg,
+ final InstanceProperties props,
+ int channelId,
+ long deliveryTag,
+ int queueSize) throws AMQException;
byte getProtocolMinorVersion();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
index dd5e13e56a..290a859df6 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
@@ -33,13 +33,13 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueEntry;
import java.io.DataOutput;
import java.io.IOException;
@@ -64,24 +64,27 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return _protocolSession;
}
- public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ public void writeDeliver(final ServerMessage m,
+ final InstanceProperties props, int channelId,
+ long deliveryTag,
+ AMQShortString consumerTag)
throws AMQException
{
- AMQMessage msg = convertToAMQMessage(entry);
- AMQBody deliverBody = createEncodedDeliverBody(msg, entry.isRedelivered(), deliveryTag, consumerTag);
+ final AMQMessage msg = convertToAMQMessage(m);
+ final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
+ AMQBody deliverBody = createEncodedDeliverBody(msg, isRedelivered, deliveryTag, consumerTag);
writeMessageDelivery(msg, channelId, deliverBody);
}
- private AMQMessage convertToAMQMessage(QueueEntry entry)
+ private AMQMessage convertToAMQMessage(ServerMessage serverMessage)
{
- ServerMessage serverMessage = entry.getMessage();
if(serverMessage instanceof AMQMessage)
{
return (AMQMessage) serverMessage;
}
else
{
- return getMessageConverter(serverMessage).convert(serverMessage, entry.getQueue().getVirtualHost());
+ return getMessageConverter(serverMessage).convert(serverMessage, _protocolSession.getVirtualHost());
}
}
@@ -186,10 +189,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
}
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ public void writeGetOk(final ServerMessage msg,
+ final InstanceProperties props,
+ int channelId,
+ long deliveryTag,
+ int queueSize) throws AMQException
{
- AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
- writeMessageDelivery(convertToAMQMessage(entry), channelId, deliver);
+ AMQBody deliver = createEncodedGetOkBody(msg, props, deliveryTag, queueSize);
+ writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver);
}
@@ -274,18 +281,18 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
}
- private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+ private AMQBody createEncodedGetOkBody(ServerMessage msg, InstanceProperties props, long deliveryTag, int queueSize)
throws AMQException
{
final AMQShortString exchangeName;
final AMQShortString routingKey;
- final AMQMessage message = convertToAMQMessage(entry);
+ final AMQMessage message = convertToAMQMessage(msg);
final MessagePublishInfo pb = message.getMessagePublishInfo();
exchangeName = pb.getExchange();
routingKey = pb.getRoutingKey();
- final boolean isRedelivered = entry.isRedelivered();
+ final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
BasicGetOkBody getOkBody =
_methodRegistry.createBasicGetOkBody(deliveryTag,
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 1680a16b42..4a5ada89c2 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -41,7 +41,9 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
@@ -133,11 +135,6 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
}
}
- // *** ProtocolOutputConverter Implementation
- public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
- {
- }
-
public ClientDeliveryMethod createDeliveryMethod(int channelId)
{
return new InternalWriteDeliverMethod(channelId);
@@ -147,7 +144,10 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
{
}
- public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
+ public void writeDeliver(final ServerMessage msg,
+ final InstanceProperties props, int channelId,
+ long deliveryTag,
+ AMQShortString consumerTag) throws AMQException
{
_deliveryCount.incrementAndGet();
@@ -169,11 +169,15 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
consumers.put(consumerTag, consumerDelivers);
}
- consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
+ consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)msg));
}
}
- public void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ public void writeGetOk(final ServerMessage msg,
+ final InstanceProperties props,
+ int channelId,
+ long deliveryTag,
+ int queueSize) throws AMQException
{
}
@@ -195,15 +199,15 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
public class DeliveryPair
{
private long _deliveryTag;
- private AMQMessage _message;
+ private ServerMessage _message;
- public DeliveryPair(long deliveryTag, AMQMessage message)
+ public DeliveryPair(long deliveryTag, ServerMessage message)
{
_deliveryTag = deliveryTag;
_message = message;
}
- public AMQMessage getMessage()
+ public ServerMessage getMessage()
{
return _message;
}
@@ -242,7 +246,9 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
}
- public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException
+ @Override
+ public void deliverToClient(Subscription sub, ServerMessage message,
+ InstanceProperties props, long deliveryTag) throws AMQException
{
_deliveryCount.incrementAndGet();
@@ -264,7 +270,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers);
}
- consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
+ consumerDelivers.add(new DeliveryPair(deliveryTag, message));
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 034927c56b..4abf1bf76b 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -631,7 +631,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
public void postCommit()
{
- queueEntry.discard();
+ queueEntry.delete();
}
public void onRollback()
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index e5f3a52e3b..6a3f5b46e1 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -148,14 +148,12 @@ class
public boolean hasInterest(final QueueEntry entry)
{
- if(entry.getMessage() instanceof Message_1_0)
+ if(_noLocal && entry.getMessage().getConnectionReference() == getSession().getConnection().getReference())
{
- if(_noLocal && ((Message_1_0)entry.getMessage()).getConnectionReference() == getSession().getConnection().getReference())
- {
- return false;
- }
+ return false;
}
- else if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null)
+ else if(!(entry.getMessage() instanceof Message_1_0)
+ && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null)
{
return false;
}
@@ -537,7 +535,7 @@ class
{
if(_queueEntry.isAcquiredBy(Subscription_1_0.this))
{
- _queueEntry.discard();
+ _queueEntry.delete();
}
}