diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-23 17:53:42 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-23 17:53:42 +0000 |
| commit | 027c4c9398c0c15ed285d8a99df22d98e469cb0f (patch) | |
| tree | d1e1d1e7b60f96c1c9c8ac0c0d2c4cd500d2eb0f /qpid/java/broker-plugins | |
| parent | 1dae32d6fd23383f759650607a7cc38e85ac3f79 (diff) | |
| download | qpid-python-027c4c9398c0c15ed285d8a99df22d98e469cb0f.tar.gz | |
QPID-5504 : simplify QueueEntry to remove discard/dispose/dequeue and only leave delete as the correct way to remove entries
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560770 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
12 files changed, 112 insertions, 88 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java index f68973096a..17d0e5cb64 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java @@ -37,7 +37,6 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.QueueEntryInstanceProperties; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -532,7 +531,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { restoreCredit(entry); } - entry.discard(); + entry.delete(); } public void onRollback() @@ -548,7 +547,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr entry.routeToAlternate(); if(entry.isAcquiredBy(this)) { - entry.discard(); + entry.delete(); } } @@ -581,11 +580,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr final ServerMessage msg = entry.getMessage(); if (alternateExchange != null) { - final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), new QueueEntryInstanceProperties(entry)); + final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), entry.getInstanceProperties()); if (destinationQueues == null || destinationQueues.isEmpty()) { - entry.discard(); + entry.delete(); logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName())); } @@ -602,7 +601,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } else { - entry.discard(); + entry.delete(); logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey())); } } @@ -787,7 +786,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { _unacknowledgedBytes.addAndGet(-entry.getSize()); _unacknowledgedCount.decrementAndGet(); - entry.discard(); + entry.delete(); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index bb1d1949a2..5c674ef27e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -74,7 +74,6 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.QueueEntryInstanceProperties; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; @@ -736,7 +735,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } else { - unacked.discard(); + unacked.delete(); } } @@ -771,7 +770,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); - unacked.discard(); + unacked.delete(); } } else @@ -1362,7 +1361,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { for(QueueEntry entry : _ackedMessages) { - entry.discard(); + entry.delete(); } } finally @@ -1571,19 +1570,19 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); - rejectedQueueEntry.discard(); + rejectedQueueEntry.delete(); return; } final List<? extends BaseQueue> destinationQueues = - altExchange.route(rejectedQueueEntry.getMessage(), new QueueEntryInstanceProperties(rejectedQueueEntry)); + altExchange.route(rejectedQueueEntry.getMessage(), rejectedQueueEntry.getInstanceProperties()); if (destinationQueues == null || destinationQueues.isEmpty()) { _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag); _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); - rejectedQueueEntry.discard(); + rejectedQueueEntry.delete(); return; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index f04475eb33..c3489d8c82 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -73,6 +73,8 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; @@ -88,7 +90,6 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; @@ -348,7 +349,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * Process the data block. * If the message is for a channel it is added to {@link #_channelsForCurrentMessage}. * - * @throws an AMQConnectionException if unable to process the data block. In this case, + * @throws AMQConnectionException if unable to process the data block. In this case, * the connection is already closed by the time the exception is thrown. If any other * type of exception is thrown, the connection is not already closed. */ @@ -376,7 +377,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * Handle the supplied frame. * Adds this frame's channel to {@link #_channelsForCurrentMessage}. * - * @throws an AMQConnectionException if unable to process the data block. In this case, + * @throws AMQConnectionException if unable to process the data block. In this case, * the connection is already closed by the time the exception is thrown. If any other * type of exception is thrown, the connection is not already closed. */ @@ -1667,12 +1668,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _channelId = channelId; } - public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) + @Override + public void deliverToClient(final Subscription sub, final ServerMessage message, + final InstanceProperties props, final long deliveryTag) throws AMQException { - registerMessageDelivered(entry.getMessage().getSize()); - _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, ((SubscriptionImpl)sub).getConsumerTag()); - entry.incrementDeliveryCount(); + registerMessageDelivered(message.getSize()); + _protocolOutputConverter.writeDeliver(message, + props, + _channelId, + deliveryTag, + ((SubscriptionImpl)sub).getConsumerTag()); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java index 5e416b52ca..ce0ef6cf50 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java @@ -115,7 +115,7 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor public void postCommit() { - node.discard(); + node.delete(); } public void onRollback() diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java index d48e8b3dea..3b087d263a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java @@ -28,7 +28,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; -import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; @@ -36,6 +35,9 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.SubscriptionActor; import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -132,7 +134,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); - sendToClient(entry, deliveryTag); + sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); } } @@ -147,7 +149,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public static class NoAckSubscription extends SubscriptionImpl { - private volatile AutoCommitTransaction _txn; + private final AutoCommitTransaction _txn; public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, @@ -157,6 +159,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage throws AMQException { super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); + _txn = new AutoCommitTransaction(protocolSession.getVirtualHost().getMessageStore()); } @@ -192,23 +195,22 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage // The send may of course still fail, in which case, as // the message is unacked, it will be lost. - if(_txn == null) - { - _txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); - } _txn.dequeue(getQueue(), entry.getMessage(), NOOP); - entry.dequeue(); + ServerMessage message = entry.getMessage(); + MessageReference ref = message.newReference(); + InstanceProperties props = entry.getInstanceProperties(); + entry.delete(); synchronized (getChannel()) { getChannel().getProtocolSession().setDeferFlush(batch); long deliveryTag = getChannel().getNextDeliveryTag(); - sendToClient(entry, deliveryTag); + sendToClient(message, props, deliveryTag); } - entry.dispose(); + ref.release(); } @@ -301,8 +303,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage addUnacknowledgedMessage(entry); recordMessageDelivery(entry, deliveryTag); - sendToClient(entry, deliveryTag); - + sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + entry.incrementDeliveryCount(); } } @@ -688,12 +690,12 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } - protected void sendToClient(final QueueEntry entry, final long deliveryTag) + protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) throws AMQException { - _deliveryMethod.deliverToClient(this,entry,deliveryTag); + _deliveryMethod.deliverToClient(this, message, props, deliveryTag); _deliveredCount.incrementAndGet(); - _deliveredBytes.addAndGet(entry.getSize()); + _deliveredBytes.addAndGet(message.getSize()); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index 5238a41e49..4b569ccc71 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -28,10 +28,11 @@ import org.apache.qpid.framing.BasicGetBody; import org.apache.qpid.framing.BasicGetEmptyBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.MessageOnlyCreditManager; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -126,21 +127,18 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod() { - public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) + @Override + public void deliverToClient(final Subscription sub, final ServerMessage message, final + InstanceProperties props, final long deliveryTag) throws AMQException { - singleMessageCredit.useCreditForMessage(entry.getMessage().getSize()); - if(entry.getMessage() instanceof AMQMessage) - { - session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(), - deliveryTag, queue.getMessageCount()); - entry.incrementDeliveryCount(); - } - else - { - //TODO Convert AMQP 0-10 message - throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Not implemented conversion of 0-10 message", null); - } + singleMessageCredit.useCreditForMessage(message.getSize()); + session.getProtocolOutputConverter().writeGetOk(message, + props, + channel.getChannelId(), + deliveryTag, + queue.getMessageCount()); + } }; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java index 0cfdff3338..f2ab154b32 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java @@ -79,7 +79,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); if(message != null) { - message.discard(); + message.delete(); } return; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java index 48e42ce5a3..2dae7d3e9a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java @@ -31,7 +31,9 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
@@ -44,10 +46,17 @@ public interface ProtocolOutputConverter ProtocolOutputConverter newInstance(AMQProtocolSession session);
}
- void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ void writeDeliver(final ServerMessage msg,
+ final InstanceProperties props, int channelId,
+ long deliveryTag,
+ AMQShortString consumerTag)
throws AMQException;
- void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException;
+ void writeGetOk(final ServerMessage msg,
+ final InstanceProperties props,
+ int channelId,
+ long deliveryTag,
+ int queueSize) throws AMQException;
byte getProtocolMinorVersion();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java index dd5e13e56a..290a859df6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java @@ -33,13 +33,13 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueEntry; import java.io.DataOutput; import java.io.IOException; @@ -64,24 +64,27 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _protocolSession; } - public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) + public void writeDeliver(final ServerMessage m, + final InstanceProperties props, int channelId, + long deliveryTag, + AMQShortString consumerTag) throws AMQException { - AMQMessage msg = convertToAMQMessage(entry); - AMQBody deliverBody = createEncodedDeliverBody(msg, entry.isRedelivered(), deliveryTag, consumerTag); + final AMQMessage msg = convertToAMQMessage(m); + final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED)); + AMQBody deliverBody = createEncodedDeliverBody(msg, isRedelivered, deliveryTag, consumerTag); writeMessageDelivery(msg, channelId, deliverBody); } - private AMQMessage convertToAMQMessage(QueueEntry entry) + private AMQMessage convertToAMQMessage(ServerMessage serverMessage) { - ServerMessage serverMessage = entry.getMessage(); if(serverMessage instanceof AMQMessage) { return (AMQMessage) serverMessage; } else { - return getMessageConverter(serverMessage).convert(serverMessage, entry.getQueue().getVirtualHost()); + return getMessageConverter(serverMessage).convert(serverMessage, _protocolSession.getVirtualHost()); } } @@ -186,10 +189,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter } } - public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException + public void writeGetOk(final ServerMessage msg, + final InstanceProperties props, + int channelId, + long deliveryTag, + int queueSize) throws AMQException { - AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize); - writeMessageDelivery(convertToAMQMessage(entry), channelId, deliver); + AMQBody deliver = createEncodedGetOkBody(msg, props, deliveryTag, queueSize); + writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver); } @@ -274,18 +281,18 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter } } - private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize) + private AMQBody createEncodedGetOkBody(ServerMessage msg, InstanceProperties props, long deliveryTag, int queueSize) throws AMQException { final AMQShortString exchangeName; final AMQShortString routingKey; - final AMQMessage message = convertToAMQMessage(entry); + final AMQMessage message = convertToAMQMessage(msg); final MessagePublishInfo pb = message.getMessagePublishInfo(); exchangeName = pb.getExchange(); routingKey = pb.getRoutingKey(); - final boolean isRedelivered = entry.isRedelivered(); + final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED)); BasicGetOkBody getOkBody = _methodRegistry.createBasicGetOkBody(deliveryTag, diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 1680a16b42..4a5ada89c2 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -41,7 +41,9 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageContentSource; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; @@ -133,11 +135,6 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr } } - // *** ProtocolOutputConverter Implementation - public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException - { - } - public ClientDeliveryMethod createDeliveryMethod(int channelId) { return new InternalWriteDeliverMethod(channelId); @@ -147,7 +144,10 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { } - public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException + public void writeDeliver(final ServerMessage msg, + final InstanceProperties props, int channelId, + long deliveryTag, + AMQShortString consumerTag) throws AMQException { _deliveryCount.incrementAndGet(); @@ -169,11 +169,15 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr consumers.put(consumerTag, consumerDelivers); } - consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage())); + consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)msg)); } } - public void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException + public void writeGetOk(final ServerMessage msg, + final InstanceProperties props, + int channelId, + long deliveryTag, + int queueSize) throws AMQException { } @@ -195,15 +199,15 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr public class DeliveryPair { private long _deliveryTag; - private AMQMessage _message; + private ServerMessage _message; - public DeliveryPair(long deliveryTag, AMQMessage message) + public DeliveryPair(long deliveryTag, ServerMessage message) { _deliveryTag = deliveryTag; _message = message; } - public AMQMessage getMessage() + public ServerMessage getMessage() { return _message; } @@ -242,7 +246,9 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr } - public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException + @Override + public void deliverToClient(Subscription sub, ServerMessage message, + InstanceProperties props, long deliveryTag) throws AMQException { _deliveryCount.incrementAndGet(); @@ -264,7 +270,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers); } - consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage())); + consumerDelivers.add(new DeliveryPair(deliveryTag, message)); } } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 034927c56b..4abf1bf76b 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -631,7 +631,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { public void postCommit() { - queueEntry.discard(); + queueEntry.delete(); } public void onRollback() diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index e5f3a52e3b..6a3f5b46e1 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -148,14 +148,12 @@ class public boolean hasInterest(final QueueEntry entry) { - if(entry.getMessage() instanceof Message_1_0) + if(_noLocal && entry.getMessage().getConnectionReference() == getSession().getConnection().getReference()) { - if(_noLocal && ((Message_1_0)entry.getMessage()).getConnectionReference() == getSession().getConnection().getReference()) - { - return false; - } + return false; } - else if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null) + else if(!(entry.getMessage() instanceof Message_1_0) + && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null) { return false; } @@ -537,7 +535,7 @@ class { if(_queueEntry.isAcquiredBy(Subscription_1_0.this)) { - _queueEntry.discard(); + _queueEntry.delete(); } } |
