summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-03-05 12:17:54 +0000
committerRobert Gemmell <robbie@apache.org>2012-03-05 12:17:54 +0000
commit2637290ed5226aadc31c44ea0a8cfb31e3a3843b (patch)
treee5b23745cc28a1201ef17198d8bcd3f01a4f5987 /qpid/java/broker/src/main
parentbb2137a1492b9f16cd9e2f94da58bf03e142d27d (diff)
downloadqpid-python-2637290ed5226aadc31c44ea0a8cfb31e3a3843b.tar.gz
QPID-3881: Ensure we only put 0-8/0-9/0-9-1 messages in the store if they are actually routable. Remove some unused and test-only methods.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1297026 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java62
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java29
2 files changed, 38 insertions, 53 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 1724b0e4ec..588b2079f2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -295,26 +296,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
_currentMessage.setExpiration();
+ _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
- MessageMetaData mmd = _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
- final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(mmd);
- _currentMessage.setStoredMessage(handle);
-
- routeCurrentMessage();
-
-
- _transaction.addPostTransactionAction(new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- }
-
- public void onRollback()
- {
- handle.remove();
- }
- });
+ _currentMessage.route();
deliverCurrentMessageIfComplete();
}
@@ -346,17 +330,41 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
{
_actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey()));
}
-
}
else
{
+ final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(_currentMessage.getMessageMetaData());
+ _currentMessage.setStoredMessage(handle);
+ int bodyCount = _currentMessage.getBodyCount();
+ if(bodyCount > 0)
+ {
+ long bodyLengthReceived = 0;
+ for(int i = 0 ; i < bodyCount ; i++)
+ {
+ ContentChunk contentChunk = _currentMessage.getContentChunk(i);
+ handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
+ bodyLengthReceived += contentChunk.getSize();
+ }
+ }
+
+ _transaction.addPostTransactionAction(new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ }
+
+ public void onRollback()
+ {
+ handle.remove();
+ }
+ });
+
_transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime());
incrementOutstandingTxnsIfNecessary();
- updateTransactionalActivity();
+ updateTransactionalActivity();
+ _currentMessage.getStoredMessage().flushToStore();
}
}
- _currentMessage.getStoredMessage().flushToStore();
-
}
finally
{
@@ -383,9 +391,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
try
{
-
- // returns true iff the message was delivered (i.e. if all data was
- // received
final ContentChunk contentChunk =
_session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody);
@@ -409,11 +414,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
}
}
- protected void routeCurrentMessage() throws AMQException
- {
- _currentMessage.route();
- }
-
public long getNextDeliveryTag()
{
return ++_deliveryTag;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index cb018f1772..c5a610c7b6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -70,8 +70,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
private Exchange _exchange;
-
- private int _receivedChunkCount = 0;
private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
// we keep both the original meta data object and the store reference to it just in case the
@@ -132,12 +130,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
}
- public MessageMetaData headersReceived()
- {
-
- return headersReceived(System.currentTimeMillis());
- }
-
public MessageMetaData headersReceived(long currentTime)
{
_messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime);
@@ -150,16 +142,10 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
return _destinationQueues;
}
- public int addContentBodyFrame(final ContentChunk contentChunk)
- throws AMQException
+ public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException
{
- _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
-
-
-
- return _receivedChunkCount++;
}
public boolean allContentReceived()
@@ -259,18 +245,12 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
return _expiration;
}
- public int getReceivedChunkCount()
- {
- return _receivedChunkCount;
- }
-
-
public int getBodyCount() throws AMQException
{
return _contentChunks.size();
}
- public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException
+ public ContentChunk getContentChunk(int index)
{
return _contentChunks.get(index);
}
@@ -330,4 +310,9 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
{
return _connectionReference;
}
+
+ public MessageMetaData getMessageMetaData()
+ {
+ return _messageMetaData;
+ }
}