diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-03-05 12:17:54 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-03-05 12:17:54 +0000 |
| commit | 2637290ed5226aadc31c44ea0a8cfb31e3a3843b (patch) | |
| tree | e5b23745cc28a1201ef17198d8bcd3f01a4f5987 /qpid/java/broker/src/main | |
| parent | bb2137a1492b9f16cd9e2f94da58bf03e142d27d (diff) | |
| download | qpid-python-2637290ed5226aadc31c44ea0a8cfb31e3a3843b.tar.gz | |
QPID-3881: Ensure we only put 0-8/0-9/0-9-1 messages in the store if they are actually routable. Remove some unused and test-only methods.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1297026 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 62 | ||||
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java | 29 |
2 files changed, 38 insertions, 53 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 1724b0e4ec..588b2079f2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -295,26 +296,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm _currentMessage.setExpiration(); + _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime()); - MessageMetaData mmd = _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime()); - final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(mmd); - _currentMessage.setStoredMessage(handle); - - routeCurrentMessage(); - - - _transaction.addPostTransactionAction(new ServerTransaction.Action() - { - - public void postCommit() - { - } - - public void onRollback() - { - handle.remove(); - } - }); + _currentMessage.route(); deliverCurrentMessageIfComplete(); } @@ -346,17 +330,41 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm { _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey())); } - } else { + final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(_currentMessage.getMessageMetaData()); + _currentMessage.setStoredMessage(handle); + int bodyCount = _currentMessage.getBodyCount(); + if(bodyCount > 0) + { + long bodyLengthReceived = 0; + for(int i = 0 ; i < bodyCount ; i++) + { + ContentChunk contentChunk = _currentMessage.getContentChunk(i); + handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData())); + bodyLengthReceived += contentChunk.getSize(); + } + } + + _transaction.addPostTransactionAction(new ServerTransaction.Action() + { + public void postCommit() + { + } + + public void onRollback() + { + handle.remove(); + } + }); + _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime()); incrementOutstandingTxnsIfNecessary(); - updateTransactionalActivity(); + updateTransactionalActivity(); + _currentMessage.getStoredMessage().flushToStore(); } } - _currentMessage.getStoredMessage().flushToStore(); - } finally { @@ -383,9 +391,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm try { - - // returns true iff the message was delivered (i.e. if all data was - // received final ContentChunk contentChunk = _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody); @@ -409,11 +414,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } } - protected void routeCurrentMessage() throws AMQException - { - _currentMessage.route(); - } - public long getNextDeliveryTag() { return ++_deliveryTag; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index cb018f1772..c5a610c7b6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -70,8 +70,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes private Exchange _exchange; - - private int _receivedChunkCount = 0; private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>(); // we keep both the original meta data object and the store reference to it just in case the @@ -132,12 +130,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes } - public MessageMetaData headersReceived() - { - - return headersReceived(System.currentTimeMillis()); - } - public MessageMetaData headersReceived(long currentTime) { _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime); @@ -150,16 +142,10 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes return _destinationQueues; } - public int addContentBodyFrame(final ContentChunk contentChunk) - throws AMQException + public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException { - _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData())); _bodyLengthReceived += contentChunk.getSize(); _contentChunks.add(contentChunk); - - - - return _receivedChunkCount++; } public boolean allContentReceived() @@ -259,18 +245,12 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes return _expiration; } - public int getReceivedChunkCount() - { - return _receivedChunkCount; - } - - public int getBodyCount() throws AMQException { return _contentChunks.size(); } - public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException + public ContentChunk getContentChunk(int index) { return _contentChunks.get(index); } @@ -330,4 +310,9 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes { return _connectionReference; } + + public MessageMetaData getMessageMetaData() + { + return _messageMetaData; + } } |
