diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-10-17 14:29:21 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-10-17 14:29:21 +0000 |
| commit | 95fc93485ab66966713611a4e1429d917dabde64 (patch) | |
| tree | 09ee31bc9462449dbcfc62379a393017c8f39843 /qpid/java/broker-plugins | |
| parent | 28dbfe8d101dd14a95b1d75e799107bdaa6e18d0 (diff) | |
| download | qpid-python-95fc93485ab66966713611a4e1429d917dabde64.tar.gz | |
QPID-6164 : Add synchronous publish capability to 0-8/9/9-1
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1632585 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
3 files changed, 130 insertions, 6 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 5ca94891c1..d3ddaa16dd 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -201,6 +201,8 @@ public class AMQChannel private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); private Session<?> _modelObject; + private boolean _confirmOnPublish; + private long _confirmedMessageCounter; public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) @@ -394,6 +396,11 @@ public class AMQChannel // check and deliver if header says body length is zero if (_currentMessage.allContentReceived()) { + if(_confirmOnPublish) + { + _confirmedMessageCounter++; + } + try { @@ -421,6 +428,10 @@ public class AMQChannel if(!checkMessageUserId(_currentMessage.getContentHeader())) { + if(_confirmOnPublish) + { + _connection.writeFrame(new AMQFrame(_channelId, new BasicNackBody(_confirmedMessageCounter, false, false))); + } _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", amqMessage)); } else @@ -461,6 +472,12 @@ public class AMQChannel } else { + if(_confirmOnPublish) + { + BasicAckBody responseBody = _connection.getMethodRegistry() + .createBasicAckBody(_confirmedMessageCounter, false); + _connection.writeFrame(responseBody.generateFrame(_channelId)); + } incrementOutstandingTxnsIfNecessary(); } } @@ -503,7 +520,7 @@ public class AMQChannel description, mandatory, isTransactional(), closeOnNoRoute)); } - if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute()) + if (mandatory && isTransactional() && !_confirmOnPublish && _connection.isCloseWhenNoRoute()) { _connection.closeConnection(AMQConstant.NO_ROUTE, "No route for message " + currentMessageDescription(), _channelId); @@ -512,6 +529,10 @@ public class AMQChannel { if (mandatory || message.isImmediate()) { + if(_confirmOnPublish) + { + _connection.writeFrame(new AMQFrame(_channelId, new BasicNackBody(_confirmedMessageCounter, false, false))); + } _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), @@ -2236,8 +2257,6 @@ public class AMQChannel if (requeue) { - //this requeue represents a message rejected from the pre-dispatch queue - //therefore we need to amend the delivery counter. message.decrementDeliveryCount(); requeue(deliveryTag); @@ -2359,6 +2378,85 @@ public class AMQChannel } @Override + public void receiveBasicNack(final long deliveryTag, final boolean multiple, final boolean requeue) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicNack[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " requeue: " + requeue + " ]"); + } + + Map<Long, MessageInstance> nackedMessageMap = new LinkedHashMap<>(); + _unacknowledgedMessageMap.collect(deliveryTag, multiple, nackedMessageMap); + + for(MessageInstance message : nackedMessageMap.values()) + { + + if (message == null) + { + _logger.warn("Ignoring nack request as message is null for tag:" + deliveryTag); + } + else + { + + if (message.getMessage() == null) + { + _logger.warn("Message has already been purged, unable to nack."); + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Nack-ing: DT:" + deliveryTag + + "-" + message.getMessage() + + ": Requeue:" + requeue + + + " on channel:" + debugIdentity()); + } + + if (requeue) + { + message.decrementDeliveryCount(); + + requeue(deliveryTag); + } + else + { + message.reject(); + + final boolean maxDeliveryCountEnabled = isMaxDeliveryCountEnabled(deliveryTag); + _logger.debug("maxDeliveryCountEnabled: " + + maxDeliveryCountEnabled + + " deliveryTag " + + deliveryTag); + if (maxDeliveryCountEnabled) + { + final boolean deliveredTooManyTimes = isDeliveredTooManyTimes(deliveryTag); + _logger.debug("deliveredTooManyTimes: " + + deliveredTooManyTimes + + " deliveryTag " + + deliveryTag); + if (deliveredTooManyTimes) + { + deadLetter(deliveryTag); + } + else + { + message.incrementDeliveryCount(); + } + } + else + { + requeue(deliveryTag); + } + } + } + } + + } + + } + + @Override public void receiveChannelFlow(final boolean active) { if(_logger.isDebugEnabled()) @@ -3355,6 +3453,21 @@ public class AMQChannel resend(); } + @Override + public void receiveConfirmSelect(final boolean nowait) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ConfirmSelect [ nowait: " + nowait + " ]"); + } + _confirmOnPublish = true; + + if(!nowait) + { + _connection.writeFrame(new AMQFrame(_channelId, ConfirmSelectOkBody.INSTANCE)); + } + } + private void closeChannel(final AMQConstant cause, final String message) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 413cf49eaf..49db24be52 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -85,6 +85,7 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; @@ -432,6 +433,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, String.valueOf(_closeWhenNoRoute)); serverProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, String.valueOf(_broker.isMessageCompressionEnabled())); + serverProperties.setString(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED, Boolean.TRUE.toString()); AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(), (short) pv.getActualMinorVersion(), @@ -1119,9 +1121,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _currentClassId, _currentMethodId); - writeFrame(closeBody.generateFrame(0)); + try + { + writeFrame(closeBody.generateFrame(0)); + + _sender.close(); + } + catch(SenderException e) + { + // ignore + } - _sender.close(); } finally { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java index bd7b070cd2..198b7fe21b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_8; import java.util.Collection; +import java.util.Map; import java.util.Set; import org.apache.qpid.AMQException; @@ -63,7 +64,7 @@ public interface UnacknowledgedMessageMap Set<Long> getDeliveryTags(); Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple); - + void collect(long key, boolean multiple, Map<Long, MessageInstance> msgs); } |
