summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-04-03 22:21:05 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-04-03 22:21:05 +0000
commit8bcfb7bb278644a547bddf4719265d806ea69d72 (patch)
tree6aa69e2381cb2c7d71cbe59cbc8ed8005667937b /qpid/java/broker-plugins
parent47f4f5148f7a6f4fa3c214cc2efd4e4a3f44641c (diff)
downloadqpid-python-8bcfb7bb278644a547bddf4719265d806ea69d72.tar.gz
QPID-6476 : [Java Broker] Refactor MessageStore to allow more efficient implementations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1671184 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java33
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java10
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java2
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java16
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java6
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java6
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java7
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java7
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java63
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java6
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java6
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java7
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java7
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java1
25 files changed, 93 insertions, 132 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 5affe3019c..db053eef26 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -377,7 +377,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
private void forceDequeue(final MessageInstance entry, final boolean restoreCredit)
{
AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore());
- dequeueTxn.dequeue(entry.getOwningResource(), entry.getMessage(),
+ dequeueTxn.dequeue(entry.getEnqueueRecord(),
new ServerTransaction.Action()
{
public void postCommit()
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index 69abcd7727..bd04db11ae 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -76,12 +76,6 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
int size = messageContent.length - offsetInMessage;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index 209eae9ad1..bd99458d51 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -83,12 +83,6 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
return serverMsg.getContent(dst, offsetInMessage);
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 67204427fb..12f79cf203 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -514,7 +514,7 @@ public class ServerSession extends Session
public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry)
{
- _transaction.dequeue(entry.getOwningResource(), entry.getMessage(),
+ _transaction.dequeue(entry.getEnqueueRecord(),
new ServerTransaction.Action()
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 75a67c6c2a..694dda4da2 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -40,6 +40,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.ArrivalTimeFilter;
@@ -493,13 +494,14 @@ public class ServerSessionDelegate extends SessionDelegate
private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,
final MessageMetaData_0_10 messageMetaData, final MessageStore store)
{
- final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
+ final MessageHandle<MessageMetaData_0_10> addedMessage = store.addMessage(messageMetaData);
ByteBuffer body = xfr.getBody();
if(body != null)
{
- storeMessage.addContent(0, body);
+ addedMessage.addContent(body);
}
- return storeMessage;
+ final StoredMessage<MessageMetaData_0_10> storedMessage = addedMessage.allContentAdded();
+ return storedMessage;
}
@Override
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 16ea23b765..522408910d 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -96,6 +96,7 @@ import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -429,22 +430,24 @@ public class AMQChannel
contentHeader,
getConnection().getLastReceivedTime());
- final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
- final AMQMessage amqMessage = createAMQMessage(handle);
- MessageReference reference = amqMessage.newReference();
- try
+ final MessageHandle<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
+ int bodyCount = _currentMessage.getBodyCount();
+ if(bodyCount > 0)
{
- int bodyCount = _currentMessage.getBodyCount();
- if(bodyCount > 0)
+ long bodyLengthReceived = 0;
+ for(int i = 0 ; i < bodyCount ; i++)
{
- long bodyLengthReceived = 0;
- for(int i = 0 ; i < bodyCount ; i++)
- {
- ContentBody contentChunk = _currentMessage.getContentChunk(i);
- handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getPayload()));
- bodyLengthReceived += contentChunk.getSize();
- }
+ ContentBody contentChunk = _currentMessage.getContentChunk(i);
+ handle.addContent(ByteBuffer.wrap(contentChunk.getPayload()));
+ bodyLengthReceived += contentChunk.getSize();
}
+ }
+ final StoredMessage<MessageMetaData> storedMessage = handle.allContentAdded();
+
+ final AMQMessage amqMessage = createAMQMessage(storedMessage);
+ MessageReference reference = amqMessage.newReference();
+ try
+ {
_currentMessage = null;
@@ -500,7 +503,7 @@ public class AMQChannel
.createBasicAckBody(_confirmedMessageCounter, false);
_connection.writeFrame(responseBody.generateFrame(_channelId));
}
- incrementUncommittedMessageSize(handle);
+ incrementUncommittedMessageSize(storedMessage);
incrementOutstandingTxnsIfNecessary();
}
}
@@ -1512,7 +1515,7 @@ public class AMQChannel
try
{
entry.delete();
- txn.dequeue(queue, message,
+ txn.dequeue(entry.getEnqueueRecord(),
new ServerTransaction.Action()
{
@Override
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 5f7d5fe46e..d46ed719b2 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -189,7 +189,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
// The send may of course still fail, in which case, as
// the message is unacked, it will be lost.
- _txn.dequeue(entry.getOwningResource(), entry.getMessage(), NOOP);
+ _txn.dequeue(entry.getEnqueueRecord(), NOOP);
ServerMessage message = entry.getMessage();
MessageReference ref = message.newReference();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index 55746dfa95..ef42a1311f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -88,12 +88,6 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
int size = messageContent.length - offsetInMessage;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index 55fc865850..406566cd4c 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -33,6 +33,7 @@ import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -105,22 +106,21 @@ public class AckTest extends QpidTestCase
final MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis());
- final StoredMessage<MessageMetaData> result =_messageStore.addMessage(mmd);
+ final StoredMessage<MessageMetaData> result =_messageStore.addMessage(mmd).allContentAdded();
final StoredMessage storedMessage = result;
final AMQMessage message = new AMQMessage(storedMessage);
ServerTransaction txn = new AutoCommitTransaction(_messageStore);
txn.enqueue(_queue, message,
- new ServerTransaction.Action()
+ new ServerTransaction.EnqueueAction()
{
- public void postCommit()
+ public void postCommit(MessageEnqueueRecord... records)
{
- _queue.enqueue(message,null);
+ _queue.enqueue(message,null, null);
}
public void onRollback()
{
- //To change body of implemented methods use File | Settings | File Templates.
}
});
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
index dcdac07eae..7dd4734e6b 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
@@ -178,7 +178,7 @@ public class AcknowledgeTest extends QpidTestCase
private void checkStoreContents(int messageCount)
{
MessageCounter counter = new MessageCounter();
- _messageStore.visitMessages(counter);
+ _messageStore.newMessageStoreReader().visitMessages(counter);
assertEquals("Message header count incorrect in the MetaDataMap", messageCount, counter.getCount());
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
index f21bf07fae..c6aea39aa6 100755
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
@@ -26,9 +26,10 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MessagePublishInfo;
+import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.StoredMessage;
-public class MockStoredMessage implements StoredMessage<MessageMetaData>
+public class MockStoredMessage implements StoredMessage<MessageMetaData>, MessageHandle<MessageMetaData>
{
private long _messageId;
private MessageMetaData _metaData;
@@ -72,12 +73,17 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData>
return _messageId;
}
- public void addContent(int offsetInMessage, ByteBuffer src)
+ public void addContent(ByteBuffer src)
{
src = src.duplicate();
- ByteBuffer dst = _content.duplicate();
- dst.position(offsetInMessage);
- dst.put(src);
+ _content.put(src);
+ }
+
+ @Override
+ public StoredMessage<MessageMetaData> allContentAdded()
+ {
+ _content.flip();
+ return this;
}
public int getContent(int offset, ByteBuffer dst)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
index 816485c42b..264350ff8d 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
@@ -135,7 +135,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase
private void checkStoreContents(int messageCount)
{
MessageCounter counter = new MessageCounter();
- _messageStore.visitMessages(counter);
+ _messageStore.newMessageStoreReader().visitMessages(counter);
assertEquals("Message header count incorrect in the MetaDataMap", messageCount, counter.getCount());
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
index 6059f7f4e5..de3b68a0bc 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
@@ -60,7 +60,7 @@ public class ReferenceCountingTest extends QpidTestCase
final MessageMetaData mmd = new MessageMetaData(info, chb);
- StoredMessage storedMessage = _store.addMessage(mmd);
+ StoredMessage storedMessage = _store.addMessage(mmd).allContentAdded();
Transaction txn = _store.newTransaction();
txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage));
txn.commitTran();
@@ -78,7 +78,7 @@ public class ReferenceCountingTest extends QpidTestCase
private int getStoreMessageCount()
{
MessageCounter counter = new MessageCounter();
- _store.visitMessages(counter);
+ _store.newMessageStoreReader().visitMessages(counter);
return counter.getCount();
}
@@ -99,7 +99,7 @@ public class ReferenceCountingTest extends QpidTestCase
final MessageMetaData mmd = new MessageMetaData(info, chb);
- StoredMessage storedMessage = _store.addMessage(mmd);
+ StoredMessage storedMessage = _store.addMessage(mmd).allContentAdded();
Transaction txn = _store.newTransaction();
txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage));
txn.commitTran();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index fa2e543f8d..680c05ec0a 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -390,7 +390,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
if(outcome instanceof Accepted)
{
_queueEntry.lockAcquisition();
- txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(),
+ txn.dequeue(_queueEntry.getEnqueueRecord(),
new ServerTransaction.Action()
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 3572b98cad..e9e81bb623 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -235,12 +235,6 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
ByteBuffer buf = allData.duplicate();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
index d0843c349d..e8e31dcd15 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
@@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.txn.ServerTransaction;
public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination
@@ -45,16 +46,16 @@ public class QueueDestination extends MessageSourceDestination implements Sendin
public Outcome send(final Message_1_0 message, ServerTransaction txn)
{
- txn.enqueue(getQueue(),message, new ServerTransaction.Action()
+ txn.enqueue(getQueue(),message, new ServerTransaction.EnqueueAction()
{
MessageReference _reference = message.newReference();
- public void postCommit()
+ public void postCommit(MessageEnqueueRecord... records)
{
try
{
- getQueue().enqueue(message, null);
+ getQueue().enqueue(message, null, records[0]);
}
finally
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
index 83430e6008..5deec2bb35 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
@@ -42,6 +42,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -150,17 +151,17 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv
_sectionDecoder,
immutableSections);
- StoredMessage<MessageMetaData_1_0> storedMessage = _vhost.getMessageStore().addMessage(mmd);
+ MessageHandle<MessageMetaData_1_0> handle = _vhost.getMessageStore().addMessage(mmd);
boolean skipping = true;
int offset = 0;
for(ByteBuffer bareMessageBuf : immutableSections)
{
- storedMessage.addContent(offset, bareMessageBuf.duplicate());
+ handle.addContent(bareMessageBuf.duplicate());
offset += bareMessageBuf.remaining();
}
-
+ final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
MessageReference<Message_1_0> reference = message.newReference();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 0353b9375d..ec57284a78 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -410,59 +410,40 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
if(Boolean.TRUE.equals(detach.getClosed())
|| !(TerminusDurability.UNSETTLED_STATE.equals(_durability)|| TerminusDurability.CONFIGURATION.equals(_durability)))
{
- while(!_consumer.trySendLock())
- {
- synchronized (endpoint.getLock())
- {
- try
- {
- endpoint.getLock().wait(100);
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- try
- {
- _consumer.close();
+ _consumer.close();
- Modified state = new Modified();
- state.setDeliveryFailed(true);
+ Modified state = new Modified();
+ state.setDeliveryFailed(true);
- for(UnsettledAction action : _unsettledActionMap.values())
- {
+ for(UnsettledAction action : _unsettledActionMap.values())
+ {
- action.process(state,Boolean.TRUE);
- }
- _unsettledActionMap.clear();
+ action.process(state,Boolean.TRUE);
+ }
+ _unsettledActionMap.clear();
- endpoint.close();
+ endpoint.close();
- if(_destination instanceof ExchangeDestination
- && (_durability == TerminusDurability.CONFIGURATION
- || _durability == TerminusDurability.UNSETTLED_STATE))
+ if(_destination instanceof ExchangeDestination
+ && (_durability == TerminusDurability.CONFIGURATION
+ || _durability == TerminusDurability.UNSETTLED_STATE))
+ {
+ try
{
- try
- {
- _vhost.removeQueue((AMQQueue)_queue);
- }
- catch (AccessControlException e)
- {
- //TODO
- _logger.error("Error registering subscription", e);
- }
+ _vhost.removeQueue((AMQQueue)_queue);
}
-
- if(_closeAction != null)
+ catch (AccessControlException e)
{
- _closeAction.run();
+ //TODO
+ _logger.error("Error registering subscription", e);
}
}
- finally
+
+ if(_closeAction != null)
{
- _consumer.releaseSendLock();
+ _closeAction.run();
}
+
}
else if(detach.getError() != null
&& !_linkAttachment.getEndpoint().getSession().isSyntheticError(detach.getError()))
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
index 0c47ddf137..789789ac33 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
@@ -87,12 +87,6 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
int size = messageContent.length - offsetInMessage;
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
index 02c3373f85..a9637e9d4e 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -192,12 +192,6 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
return message.getContent(dst, offsetInMessage);
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
index fbc809305e..ee16d96d5b 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -81,12 +81,6 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
return message_0_8.getContent(dst, offsetInMessage);
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index 647e6be28a..d4529aedb1 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -89,12 +89,6 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
int size = messageContent.length - offsetInMessage;
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 8a3ef65979..8e64757cbb 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -59,6 +59,7 @@ import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -1071,6 +1072,12 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return null;
+ }
+
+ @Override
public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return false;
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index 501ce40db7..dd5b7540a7 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -90,6 +91,12 @@ class ManagementResponse implements MessageInstance
}
@Override
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return null;
+ }
+
+ @Override
public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return consumer == _consumer && !isDeleted();
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index e30571615c..4639236d60 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -256,6 +256,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.setContextPath("/");
+ root.setCompactPath(true);
server.setHandler(root);
server.setSendServerVersion(false);
final ErrorHandler errorHandler = new ErrorHandler()