diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-11-24 13:08:57 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-11-24 13:08:57 +0000 |
| commit | 20be0aafe66dbf776e4097c2ead6d39671a777fd (patch) | |
| tree | 101f56b16d4eebd29017971a67af17771fde675c /qpid/java/broker-plugins | |
| parent | 69b55cde71675856699af2675e4c907fae80654e (diff) | |
| download | qpid-python-20be0aafe66dbf776e4097c2ead6d39671a777fd.tar.gz | |
QPID-6245 : [Java Broker] release reference to IncomingMessage once AMQMessage has been created
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1641389 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
| -rw-r--r-- | qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java | 73 |
1 files changed, 36 insertions, 37 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 012d7bffd6..7604662980 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -409,16 +409,24 @@ public class AMQChannel _confirmedMessageCounter++; } Runnable finallyAction = null; + ContentHeaderBody contentHeader = _currentMessage.getContentHeader(); + + long bodySize = _currentMessage.getSize(); + long timestamp = contentHeader.getProperties().getTimestamp(); + try { + final MessagePublishInfo messagePublishInfo = _currentMessage.getMessagePublishInfo(); + final MessageDestination destination = _currentMessage.getDestination(); + final MessageMetaData messageMetaData = - new MessageMetaData(_currentMessage.getMessagePublishInfo(), - _currentMessage.getContentHeader(), + new MessageMetaData(messagePublishInfo, + contentHeader, getConnection().getLastReceivedTime()); final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData); - final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle); + final AMQMessage amqMessage = createAMQMessage(handle); MessageReference reference = amqMessage.newReference(); try { @@ -434,7 +442,9 @@ public class AMQChannel } } - if(!checkMessageUserId(_currentMessage.getContentHeader())) + _currentMessage = null; + + if(!checkMessageUserId(contentHeader)) { if(_confirmOnPublish) { @@ -444,7 +454,7 @@ public class AMQChannel } else { - final boolean immediate = _currentMessage.getMessagePublishInfo().isImmediate(); + final boolean immediate = messagePublishInfo.isImmediate(); final InstanceProperties instanceProperties = new InstanceProperties() @@ -461,7 +471,7 @@ public class AMQChannel case PERSISTENT: return amqMessage.isPersistent(); case MANDATORY: - return _currentMessage.getMessagePublishInfo().isMandatory(); + return messagePublishInfo.isMandatory(); case REDELIVERED: return false; } @@ -469,11 +479,11 @@ public class AMQChannel } }; - int enqueues = _currentMessage.getDestination().send(amqMessage, - amqMessage.getInitialRoutingAddress(), - instanceProperties, _transaction, - immediate ? _immediateAction : _capacityCheckAction - ); + int enqueues = destination.send(amqMessage, + amqMessage.getInitialRoutingAddress(), + instanceProperties, _transaction, + immediate ? _immediateAction : _capacityCheckAction + ); if(enqueues == 0) { finallyAction = handleUnroutableMessage(amqMessage); @@ -503,8 +513,6 @@ public class AMQChannel } finally { - long bodySize = _currentMessage.getSize(); - long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp(); _connection.registerMessageReceived(bodySize, timestamp); _currentMessage = null; } @@ -553,7 +561,17 @@ public class AMQChannel private Runnable handleUnroutableMessage(AMQMessage message) { boolean mandatory = message.isMandatory(); - String description = currentMessageDescription(); + + String exchangeName = message.getMessagePublishInfo().getExchange() == null + ? null : message.getMessagePublishInfo().getExchange().asString(); + String routingKey = message.getMessagePublishInfo().getRoutingKey() == null + ? null : message.getMessagePublishInfo().getRoutingKey().asString(); + + final String description = String.format( + "[Exchange: %s, Routing key: %s]", + exchangeName, + routingKey); + boolean closeOnNoRoute = _connection.isCloseWhenNoRoute(); Runnable returnVal = null; if(_logger.isDebugEnabled()) @@ -571,7 +589,7 @@ public class AMQChannel public void run() { _connection.closeConnection(AMQConstant.NO_ROUTE, - "No route for message " + currentMessageDescription(), _channelId); + "No route for message " + description, _channelId); } }; @@ -586,37 +604,18 @@ public class AMQChannel } _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " - + currentMessageDescription(), + + description, message)); } else { - AMQShortString exchangeName = _currentMessage.getExchangeName(); - AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey(); - getVirtualHost().getEventLogger().message( - ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(), - routingKey == null ? null : routingKey.asString())); + getVirtualHost().getEventLogger().message(ExchangeMessages.DISCARDMSG(exchangeName, routingKey)); } } return returnVal; } - private String currentMessageDescription() - { - if(_currentMessage == null || !_currentMessage.allContentReceived()) - { - throw new IllegalStateException("Cannot create message description for message: " + _currentMessage); - } - - return String.format( - "[Exchange: %s, Routing key: %s]", - _currentMessage.getExchangeName(), - _currentMessage.getMessagePublishInfo().getRoutingKey() == null - ? null - : _currentMessage.getMessagePublishInfo().getRoutingKey().toString()); - } - public void publishContentBody(ContentBody contentBody) { if (_logger.isDebugEnabled()) @@ -1347,7 +1346,7 @@ public class AMQChannel } - private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle) + private AMQMessage createAMQMessage(StoredMessage<MessageMetaData> handle) { AMQMessage message = new AMQMessage(handle, _connection.getReference()); |
