summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-23 17:53:42 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-23 17:53:42 +0000
commit027c4c9398c0c15ed285d8a99df22d98e469cb0f (patch)
treed1e1d1e7b60f96c1c9c8ac0c0d2c4cd500d2eb0f /qpid/java/broker-plugins
parent1dae32d6fd23383f759650607a7cc38e85ac3f79 (diff)
downloadqpid-python-027c4c9398c0c15ed285d8a99df22d98e469cb0f.tar.gz
QPID-5504 : simplify QueueEntry to remove discard/dispose/dequeue and only leave delete as the correct way to remove entries
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560770 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java13
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java13
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java20
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java32
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java26
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java13
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java33
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java32
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java12
12 files changed, 112 insertions, 88 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
index f68973096a..17d0e5cb64 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
@@ -37,7 +37,6 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -532,7 +531,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
restoreCredit(entry);
}
- entry.discard();
+ entry.delete();
}
public void onRollback()
@@ -548,7 +547,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
entry.routeToAlternate();
if(entry.isAcquiredBy(this))
{
- entry.discard();
+ entry.delete();
}
}
@@ -581,11 +580,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
final ServerMessage msg = entry.getMessage();
if (alternateExchange != null)
{
- final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), new QueueEntryInstanceProperties(entry));
+ final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), entry.getInstanceProperties());
if (destinationQueues == null || destinationQueues.isEmpty())
{
- entry.discard();
+ entry.delete();
logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName()));
}
@@ -602,7 +601,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
}
else
{
- entry.discard();
+ entry.delete();
logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey()));
}
}
@@ -787,7 +786,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
_unacknowledgedBytes.addAndGet(-entry.getSize());
_unacknowledgedCount.decrementAndGet();
- entry.discard();
+ entry.delete();
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index bb1d1949a2..5c674ef27e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -74,7 +74,6 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
@@ -736,7 +735,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
else
{
- unacked.discard();
+ unacked.delete();
}
}
@@ -771,7 +770,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
_logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
+ "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
- unacked.discard();
+ unacked.delete();
}
}
else
@@ -1362,7 +1361,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
for(QueueEntry entry : _ackedMessages)
{
- entry.discard();
+ entry.delete();
}
}
finally
@@ -1571,19 +1570,19 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
_logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
_actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
- rejectedQueueEntry.discard();
+ rejectedQueueEntry.delete();
return;
}
final List<? extends BaseQueue> destinationQueues =
- altExchange.route(rejectedQueueEntry.getMessage(), new QueueEntryInstanceProperties(rejectedQueueEntry));
+ altExchange.route(rejectedQueueEntry.getMessage(), rejectedQueueEntry.getInstanceProperties());
if (destinationQueues == null || destinationQueues.isEmpty())
{
_logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
_actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
- rejectedQueueEntry.discard();
+ rejectedQueueEntry.delete();
return;
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index f04475eb33..c3489d8c82 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -73,6 +73,8 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
@@ -88,7 +90,6 @@ import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
@@ -348,7 +349,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
* Process the data block.
* If the message is for a channel it is added to {@link #_channelsForCurrentMessage}.
*
- * @throws an AMQConnectionException if unable to process the data block. In this case,
+ * @throws AMQConnectionException if unable to process the data block. In this case,
* the connection is already closed by the time the exception is thrown. If any other
* type of exception is thrown, the connection is not already closed.
*/
@@ -376,7 +377,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
* Handle the supplied frame.
* Adds this frame's channel to {@link #_channelsForCurrentMessage}.
*
- * @throws an AMQConnectionException if unable to process the data block. In this case,
+ * @throws AMQConnectionException if unable to process the data block. In this case,
* the connection is already closed by the time the exception is thrown. If any other
* type of exception is thrown, the connection is not already closed.
*/
@@ -1667,12 +1668,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_channelId = channelId;
}
- public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ @Override
+ public void deliverToClient(final Subscription sub, final ServerMessage message,
+ final InstanceProperties props, final long deliveryTag)
throws AMQException
{
- registerMessageDelivered(entry.getMessage().getSize());
- _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, ((SubscriptionImpl)sub).getConsumerTag());
- entry.incrementDeliveryCount();
+ registerMessageDelivered(message.getSize());
+ _protocolOutputConverter.writeDeliver(message,
+ props,
+ _channelId,
+ deliveryTag,
+ ((SubscriptionImpl)sub).getConsumerTag());
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
index 5e416b52ca..ce0ef6cf50 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
@@ -115,7 +115,7 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
public void postCommit()
{
- node.discard();
+ node.delete();
}
public void onRollback()
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
index d48e8b3dea..3b087d263a 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
@@ -28,7 +28,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
-import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -36,6 +35,9 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.SubscriptionActor;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -132,7 +134,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
synchronized (getChannel())
{
long deliveryTag = getChannel().getNextDeliveryTag();
- sendToClient(entry, deliveryTag);
+ sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
}
}
@@ -147,7 +149,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public static class NoAckSubscription extends SubscriptionImpl
{
- private volatile AutoCommitTransaction _txn;
+ private final AutoCommitTransaction _txn;
public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
AMQShortString consumerTag, FieldTable filters,
@@ -157,6 +159,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
throws AMQException
{
super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+ _txn = new AutoCommitTransaction(protocolSession.getVirtualHost().getMessageStore());
}
@@ -192,23 +195,22 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
// The send may of course still fail, in which case, as
// the message is unacked, it will be lost.
- if(_txn == null)
- {
- _txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
- }
_txn.dequeue(getQueue(), entry.getMessage(), NOOP);
- entry.dequeue();
+ ServerMessage message = entry.getMessage();
+ MessageReference ref = message.newReference();
+ InstanceProperties props = entry.getInstanceProperties();
+ entry.delete();
synchronized (getChannel())
{
getChannel().getProtocolSession().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
- sendToClient(entry, deliveryTag);
+ sendToClient(message, props, deliveryTag);
}
- entry.dispose();
+ ref.release();
}
@@ -301,8 +303,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
addUnacknowledgedMessage(entry);
recordMessageDelivery(entry, deliveryTag);
- sendToClient(entry, deliveryTag);
-
+ sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ entry.incrementDeliveryCount();
}
}
@@ -688,12 +690,12 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
- protected void sendToClient(final QueueEntry entry, final long deliveryTag)
+ protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
throws AMQException
{
- _deliveryMethod.deliverToClient(this,entry,deliveryTag);
+ _deliveryMethod.deliverToClient(this, message, props, deliveryTag);
_deliveredCount.incrementAndGet();
- _deliveredBytes.addAndGet(entry.getSize());
+ _deliveredBytes.addAndGet(message.getSize());
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index 5238a41e49..4b569ccc71 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -28,10 +28,11 @@ import org.apache.qpid.framing.BasicGetBody;
import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.MessageOnlyCreditManager;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -126,21 +127,18 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
{
- public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ @Override
+ public void deliverToClient(final Subscription sub, final ServerMessage message, final
+ InstanceProperties props, final long deliveryTag)
throws AMQException
{
- singleMessageCredit.useCreditForMessage(entry.getMessage().getSize());
- if(entry.getMessage() instanceof AMQMessage)
- {
- session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
- deliveryTag, queue.getMessageCount());
- entry.incrementDeliveryCount();
- }
- else
- {
- //TODO Convert AMQP 0-10 message
- throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Not implemented conversion of 0-10 message", null);
- }
+ singleMessageCredit.useCreditForMessage(message.getSize());
+ session.getProtocolOutputConverter().writeGetOk(message,
+ props,
+ channel.getChannelId(),
+ deliveryTag,
+ queue.getMessageCount());
+
}
};
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
index 0cfdff3338..f2ab154b32 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
@@ -79,7 +79,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
if(message != null)
{
- message.discard();
+ message.delete();
}
return;
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
index 48e42ce5a3..2dae7d3e9a 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
@@ -31,7 +31,9 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
@@ -44,10 +46,17 @@ public interface ProtocolOutputConverter
ProtocolOutputConverter newInstance(AMQProtocolSession session);
}
- void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ void writeDeliver(final ServerMessage msg,
+ final InstanceProperties props, int channelId,
+ long deliveryTag,
+ AMQShortString consumerTag)
throws AMQException;
- void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException;
+ void writeGetOk(final ServerMessage msg,
+ final InstanceProperties props,
+ int channelId,
+ long deliveryTag,
+ int queueSize) throws AMQException;
byte getProtocolMinorVersion();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
index dd5e13e56a..290a859df6 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
@@ -33,13 +33,13 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueEntry;
import java.io.DataOutput;
import java.io.IOException;
@@ -64,24 +64,27 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return _protocolSession;
}
- public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ public void writeDeliver(final ServerMessage m,
+ final InstanceProperties props, int channelId,
+ long deliveryTag,
+ AMQShortString consumerTag)
throws AMQException
{
- AMQMessage msg = convertToAMQMessage(entry);
- AMQBody deliverBody = createEncodedDeliverBody(msg, entry.isRedelivered(), deliveryTag, consumerTag);
+ final AMQMessage msg = convertToAMQMessage(m);
+ final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
+ AMQBody deliverBody = createEncodedDeliverBody(msg, isRedelivered, deliveryTag, consumerTag);
writeMessageDelivery(msg, channelId, deliverBody);
}
- private AMQMessage convertToAMQMessage(QueueEntry entry)
+ private AMQMessage convertToAMQMessage(ServerMessage serverMessage)
{
- ServerMessage serverMessage = entry.getMessage();
if(serverMessage instanceof AMQMessage)
{
return (AMQMessage) serverMessage;
}
else
{
- return getMessageConverter(serverMessage).convert(serverMessage, entry.getQueue().getVirtualHost());
+ return getMessageConverter(serverMessage).convert(serverMessage, _protocolSession.getVirtualHost());
}
}
@@ -186,10 +189,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
}
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ public void writeGetOk(final ServerMessage msg,
+ final InstanceProperties props,
+ int channelId,
+ long deliveryTag,
+ int queueSize) throws AMQException
{
- AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
- writeMessageDelivery(convertToAMQMessage(entry), channelId, deliver);
+ AMQBody deliver = createEncodedGetOkBody(msg, props, deliveryTag, queueSize);
+ writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver);
}
@@ -274,18 +281,18 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
}
- private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+ private AMQBody createEncodedGetOkBody(ServerMessage msg, InstanceProperties props, long deliveryTag, int queueSize)
throws AMQException
{
final AMQShortString exchangeName;
final AMQShortString routingKey;
- final AMQMessage message = convertToAMQMessage(entry);
+ final AMQMessage message = convertToAMQMessage(msg);
final MessagePublishInfo pb = message.getMessagePublishInfo();
exchangeName = pb.getExchange();
routingKey = pb.getRoutingKey();
- final boolean isRedelivered = entry.isRedelivered();
+ final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
BasicGetOkBody getOkBody =
_methodRegistry.createBasicGetOkBody(deliveryTag,
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 1680a16b42..4a5ada89c2 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -41,7 +41,9 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
@@ -133,11 +135,6 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
}
}
- // *** ProtocolOutputConverter Implementation
- public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
- {
- }
-
public ClientDeliveryMethod createDeliveryMethod(int channelId)
{
return new InternalWriteDeliverMethod(channelId);
@@ -147,7 +144,10 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
{
}
- public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
+ public void writeDeliver(final ServerMessage msg,
+ final InstanceProperties props, int channelId,
+ long deliveryTag,
+ AMQShortString consumerTag) throws AMQException
{
_deliveryCount.incrementAndGet();
@@ -169,11 +169,15 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
consumers.put(consumerTag, consumerDelivers);
}
- consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
+ consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)msg));
}
}
- public void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ public void writeGetOk(final ServerMessage msg,
+ final InstanceProperties props,
+ int channelId,
+ long deliveryTag,
+ int queueSize) throws AMQException
{
}
@@ -195,15 +199,15 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
public class DeliveryPair
{
private long _deliveryTag;
- private AMQMessage _message;
+ private ServerMessage _message;
- public DeliveryPair(long deliveryTag, AMQMessage message)
+ public DeliveryPair(long deliveryTag, ServerMessage message)
{
_deliveryTag = deliveryTag;
_message = message;
}
- public AMQMessage getMessage()
+ public ServerMessage getMessage()
{
return _message;
}
@@ -242,7 +246,9 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
}
- public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException
+ @Override
+ public void deliverToClient(Subscription sub, ServerMessage message,
+ InstanceProperties props, long deliveryTag) throws AMQException
{
_deliveryCount.incrementAndGet();
@@ -264,7 +270,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers);
}
- consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
+ consumerDelivers.add(new DeliveryPair(deliveryTag, message));
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 034927c56b..4abf1bf76b 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -631,7 +631,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
public void postCommit()
{
- queueEntry.discard();
+ queueEntry.delete();
}
public void onRollback()
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index e5f3a52e3b..6a3f5b46e1 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -148,14 +148,12 @@ class
public boolean hasInterest(final QueueEntry entry)
{
- if(entry.getMessage() instanceof Message_1_0)
+ if(_noLocal && entry.getMessage().getConnectionReference() == getSession().getConnection().getReference())
{
- if(_noLocal && ((Message_1_0)entry.getMessage()).getConnectionReference() == getSession().getConnection().getReference())
- {
- return false;
- }
+ return false;
}
- else if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null)
+ else if(!(entry.getMessage() instanceof Message_1_0)
+ && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null)
{
return false;
}
@@ -537,7 +535,7 @@ class
{
if(_queueEntry.isAcquiredBy(Subscription_1_0.this))
{
- _queueEntry.discard();
+ _queueEntry.delete();
}
}