summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-10-17 14:29:21 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-10-17 14:29:21 +0000
commit95fc93485ab66966713611a4e1429d917dabde64 (patch)
tree09ee31bc9462449dbcfc62379a393017c8f39843 /qpid/java/broker-plugins
parent28dbfe8d101dd14a95b1d75e799107bdaa6e18d0 (diff)
downloadqpid-python-95fc93485ab66966713611a4e1429d917dabde64.tar.gz
QPID-6164 : Add synchronous publish capability to 0-8/9/9-1
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1632585 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java119
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java14
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java3
3 files changed, 130 insertions, 6 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 5ca94891c1..d3ddaa16dd 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -201,6 +201,8 @@ public class AMQChannel
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
+ private boolean _confirmOnPublish;
+ private long _confirmedMessageCounter;
public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
@@ -394,6 +396,11 @@ public class AMQChannel
// check and deliver if header says body length is zero
if (_currentMessage.allContentReceived())
{
+ if(_confirmOnPublish)
+ {
+ _confirmedMessageCounter++;
+ }
+
try
{
@@ -421,6 +428,10 @@ public class AMQChannel
if(!checkMessageUserId(_currentMessage.getContentHeader()))
{
+ if(_confirmOnPublish)
+ {
+ _connection.writeFrame(new AMQFrame(_channelId, new BasicNackBody(_confirmedMessageCounter, false, false)));
+ }
_transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", amqMessage));
}
else
@@ -461,6 +472,12 @@ public class AMQChannel
}
else
{
+ if(_confirmOnPublish)
+ {
+ BasicAckBody responseBody = _connection.getMethodRegistry()
+ .createBasicAckBody(_confirmedMessageCounter, false);
+ _connection.writeFrame(responseBody.generateFrame(_channelId));
+ }
incrementOutstandingTxnsIfNecessary();
}
}
@@ -503,7 +520,7 @@ public class AMQChannel
description, mandatory, isTransactional(), closeOnNoRoute));
}
- if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute())
+ if (mandatory && isTransactional() && !_confirmOnPublish && _connection.isCloseWhenNoRoute())
{
_connection.closeConnection(AMQConstant.NO_ROUTE,
"No route for message " + currentMessageDescription(), _channelId);
@@ -512,6 +529,10 @@ public class AMQChannel
{
if (mandatory || message.isImmediate())
{
+ if(_confirmOnPublish)
+ {
+ _connection.writeFrame(new AMQFrame(_channelId, new BasicNackBody(_confirmedMessageCounter, false, false)));
+ }
_transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE,
"No Route for message "
+ currentMessageDescription(),
@@ -2236,8 +2257,6 @@ public class AMQChannel
if (requeue)
{
- //this requeue represents a message rejected from the pre-dispatch queue
- //therefore we need to amend the delivery counter.
message.decrementDeliveryCount();
requeue(deliveryTag);
@@ -2359,6 +2378,85 @@ public class AMQChannel
}
@Override
+ public void receiveBasicNack(final long deliveryTag, final boolean multiple, final boolean requeue)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicNack[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " requeue: " + requeue + " ]");
+ }
+
+ Map<Long, MessageInstance> nackedMessageMap = new LinkedHashMap<>();
+ _unacknowledgedMessageMap.collect(deliveryTag, multiple, nackedMessageMap);
+
+ for(MessageInstance message : nackedMessageMap.values())
+ {
+
+ if (message == null)
+ {
+ _logger.warn("Ignoring nack request as message is null for tag:" + deliveryTag);
+ }
+ else
+ {
+
+ if (message.getMessage() == null)
+ {
+ _logger.warn("Message has already been purged, unable to nack.");
+ }
+ else
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Nack-ing: DT:" + deliveryTag
+ + "-" + message.getMessage() +
+ ": Requeue:" + requeue
+ +
+ " on channel:" + debugIdentity());
+ }
+
+ if (requeue)
+ {
+ message.decrementDeliveryCount();
+
+ requeue(deliveryTag);
+ }
+ else
+ {
+ message.reject();
+
+ final boolean maxDeliveryCountEnabled = isMaxDeliveryCountEnabled(deliveryTag);
+ _logger.debug("maxDeliveryCountEnabled: "
+ + maxDeliveryCountEnabled
+ + " deliveryTag "
+ + deliveryTag);
+ if (maxDeliveryCountEnabled)
+ {
+ final boolean deliveredTooManyTimes = isDeliveredTooManyTimes(deliveryTag);
+ _logger.debug("deliveredTooManyTimes: "
+ + deliveredTooManyTimes
+ + " deliveryTag "
+ + deliveryTag);
+ if (deliveredTooManyTimes)
+ {
+ deadLetter(deliveryTag);
+ }
+ else
+ {
+ message.incrementDeliveryCount();
+ }
+ }
+ else
+ {
+ requeue(deliveryTag);
+ }
+ }
+ }
+ }
+
+ }
+
+ }
+
+ @Override
public void receiveChannelFlow(final boolean active)
{
if(_logger.isDebugEnabled())
@@ -3355,6 +3453,21 @@ public class AMQChannel
resend();
}
+ @Override
+ public void receiveConfirmSelect(final boolean nowait)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ConfirmSelect [ nowait: " + nowait + " ]");
+ }
+ _confirmOnPublish = true;
+
+ if(!nowait)
+ {
+ _connection.writeFrame(new AMQFrame(_channelId, ConfirmSelectOkBody.INSTANCE));
+ }
+ }
+
private void closeChannel(final AMQConstant cause, final String message)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 413cf49eaf..49db24be52 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -85,6 +85,7 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
@@ -432,6 +433,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
String.valueOf(_closeWhenNoRoute));
serverProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED,
String.valueOf(_broker.isMessageCompressionEnabled()));
+ serverProperties.setString(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED, Boolean.TRUE.toString());
AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
(short) pv.getActualMinorVersion(),
@@ -1119,9 +1121,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_currentClassId,
_currentMethodId);
- writeFrame(closeBody.generateFrame(0));
+ try
+ {
+ writeFrame(closeBody.generateFrame(0));
+
+ _sender.close();
+ }
+ catch(SenderException e)
+ {
+ // ignore
+ }
- _sender.close();
}
finally
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
index bd7b070cd2..198b7fe21b 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import java.util.Collection;
+import java.util.Map;
import java.util.Set;
import org.apache.qpid.AMQException;
@@ -63,7 +64,7 @@ public interface UnacknowledgedMessageMap
Set<Long> getDeliveryTags();
Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
-
+ void collect(long key, boolean multiple, Map<Long, MessageInstance> msgs);
}