diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-04-03 22:21:05 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-04-03 22:21:05 +0000 |
| commit | 8bcfb7bb278644a547bddf4719265d806ea69d72 (patch) | |
| tree | 6aa69e2381cb2c7d71cbe59cbc8ed8005667937b /qpid/java/broker-plugins | |
| parent | 47f4f5148f7a6f4fa3c214cc2efd4e4a3f44641c (diff) | |
| download | qpid-python-8bcfb7bb278644a547bddf4719265d806ea69d72.tar.gz | |
QPID-6476 : [Java Broker] Refactor MessageStore to allow more efficient implementations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1671184 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
25 files changed, 93 insertions, 132 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 5affe3019c..db053eef26 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -377,7 +377,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private void forceDequeue(final MessageInstance entry, final boolean restoreCredit) { AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore()); - dequeueTxn.dequeue(entry.getOwningResource(), entry.getMessage(), + dequeueTxn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action() { public void postCommit() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java index 69abcd7727..bd04db11ae 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java @@ -76,12 +76,6 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { int size = messageContent.length - offsetInMessage; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java index 209eae9ad1..bd99458d51 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java @@ -83,12 +83,6 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { return serverMsg.getContent(dst, offsetInMessage); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 67204427fb..12f79cf203 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -514,7 +514,7 @@ public class ServerSession extends Session public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry) { - _transaction.dequeue(entry.getOwningResource(), entry.getMessage(), + _transaction.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action() { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 75a67c6c2a..694dda4da2 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -40,6 +40,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.ArrivalTimeFilter; @@ -493,13 +494,14 @@ public class ServerSessionDelegate extends SessionDelegate private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr, final MessageMetaData_0_10 messageMetaData, final MessageStore store) { - final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData); + final MessageHandle<MessageMetaData_0_10> addedMessage = store.addMessage(messageMetaData); ByteBuffer body = xfr.getBody(); if(body != null) { - storeMessage.addContent(0, body); + addedMessage.addContent(body); } - return storeMessage; + final StoredMessage<MessageMetaData_0_10> storedMessage = addedMessage.allContentAdded(); + return storedMessage; } @Override diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 16ea23b765..522408910d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -96,6 +96,7 @@ import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; @@ -429,22 +430,24 @@ public class AMQChannel contentHeader, getConnection().getLastReceivedTime()); - final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData); - final AMQMessage amqMessage = createAMQMessage(handle); - MessageReference reference = amqMessage.newReference(); - try + final MessageHandle<MessageMetaData> handle = _messageStore.addMessage(messageMetaData); + int bodyCount = _currentMessage.getBodyCount(); + if(bodyCount > 0) { - int bodyCount = _currentMessage.getBodyCount(); - if(bodyCount > 0) + long bodyLengthReceived = 0; + for(int i = 0 ; i < bodyCount ; i++) { - long bodyLengthReceived = 0; - for(int i = 0 ; i < bodyCount ; i++) - { - ContentBody contentChunk = _currentMessage.getContentChunk(i); - handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getPayload())); - bodyLengthReceived += contentChunk.getSize(); - } + ContentBody contentChunk = _currentMessage.getContentChunk(i); + handle.addContent(ByteBuffer.wrap(contentChunk.getPayload())); + bodyLengthReceived += contentChunk.getSize(); } + } + final StoredMessage<MessageMetaData> storedMessage = handle.allContentAdded(); + + final AMQMessage amqMessage = createAMQMessage(storedMessage); + MessageReference reference = amqMessage.newReference(); + try + { _currentMessage = null; @@ -500,7 +503,7 @@ public class AMQChannel .createBasicAckBody(_confirmedMessageCounter, false); _connection.writeFrame(responseBody.generateFrame(_channelId)); } - incrementUncommittedMessageSize(handle); + incrementUncommittedMessageSize(storedMessage); incrementOutstandingTxnsIfNecessary(); } } @@ -1512,7 +1515,7 @@ public class AMQChannel try { entry.delete(); - txn.dequeue(queue, message, + txn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action() { @Override diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 5f7d5fe46e..d46ed719b2 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -189,7 +189,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen // The send may of course still fail, in which case, as // the message is unacked, it will be lost. - _txn.dequeue(entry.getOwningResource(), entry.getMessage(), NOOP); + _txn.dequeue(entry.getEnqueueRecord(), NOOP); ServerMessage message = entry.getMessage(); MessageReference ref = message.newReference(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java index 55746dfa95..ef42a1311f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java @@ -88,12 +88,6 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { int size = messageContent.length - offsetInMessage; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 55fc865850..406566cd4c 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -33,6 +33,7 @@ import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -105,22 +106,21 @@ public class AckTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis()); - final StoredMessage<MessageMetaData> result =_messageStore.addMessage(mmd); + final StoredMessage<MessageMetaData> result =_messageStore.addMessage(mmd).allContentAdded(); final StoredMessage storedMessage = result; final AMQMessage message = new AMQMessage(storedMessage); ServerTransaction txn = new AutoCommitTransaction(_messageStore); txn.enqueue(_queue, message, - new ServerTransaction.Action() + new ServerTransaction.EnqueueAction() { - public void postCommit() + public void postCommit(MessageEnqueueRecord... records) { - _queue.enqueue(message,null); + _queue.enqueue(message,null, null); } public void onRollback() { - //To change body of implemented methods use File | Settings | File Templates. } }); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java index dcdac07eae..7dd4734e6b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -178,7 +178,7 @@ public class AcknowledgeTest extends QpidTestCase private void checkStoreContents(int messageCount) { MessageCounter counter = new MessageCounter(); - _messageStore.visitMessages(counter); + _messageStore.newMessageStoreReader().visitMessages(counter); assertEquals("Message header count incorrect in the MetaDataMap", messageCount, counter.getCount()); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java index f21bf07fae..c6aea39aa6 100755 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java @@ -26,9 +26,10 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MessagePublishInfo; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.StoredMessage; -public class MockStoredMessage implements StoredMessage<MessageMetaData> +public class MockStoredMessage implements StoredMessage<MessageMetaData>, MessageHandle<MessageMetaData> { private long _messageId; private MessageMetaData _metaData; @@ -72,12 +73,17 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData> return _messageId; } - public void addContent(int offsetInMessage, ByteBuffer src) + public void addContent(ByteBuffer src) { src = src.duplicate(); - ByteBuffer dst = _content.duplicate(); - dst.position(offsetInMessage); - dst.put(src); + _content.put(src); + } + + @Override + public StoredMessage<MessageMetaData> allContentAdded() + { + _content.flip(); + return this; } public int getContent(int offset, ByteBuffer dst) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java index 816485c42b..264350ff8d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -135,7 +135,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase private void checkStoreContents(int messageCount) { MessageCounter counter = new MessageCounter(); - _messageStore.visitMessages(counter); + _messageStore.newMessageStoreReader().visitMessages(counter); assertEquals("Message header count incorrect in the MetaDataMap", messageCount, counter.getCount()); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java index 6059f7f4e5..de3b68a0bc 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java @@ -60,7 +60,7 @@ public class ReferenceCountingTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(info, chb); - StoredMessage storedMessage = _store.addMessage(mmd); + StoredMessage storedMessage = _store.addMessage(mmd).allContentAdded(); Transaction txn = _store.newTransaction(); txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage)); txn.commitTran(); @@ -78,7 +78,7 @@ public class ReferenceCountingTest extends QpidTestCase private int getStoreMessageCount() { MessageCounter counter = new MessageCounter(); - _store.visitMessages(counter); + _store.newMessageStoreReader().visitMessages(counter); return counter.getCount(); } @@ -99,7 +99,7 @@ public class ReferenceCountingTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(info, chb); - StoredMessage storedMessage = _store.addMessage(mmd); + StoredMessage storedMessage = _store.addMessage(mmd).allContentAdded(); Transaction txn = _store.newTransaction(); txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage)); txn.commitTran(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index fa2e543f8d..680c05ec0a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -390,7 +390,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget if(outcome instanceof Accepted) { _queueEntry.lockAcquisition(); - txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(), + txn.dequeue(_queueEntry.getEnqueueRecord(), new ServerTransaction.Action() { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index 3572b98cad..e9e81bb623 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -235,12 +235,6 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { ByteBuffer buf = allData.duplicate(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java index d0843c349d..e8e31dcd15 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java @@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.txn.ServerTransaction; public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination @@ -45,16 +46,16 @@ public class QueueDestination extends MessageSourceDestination implements Sendin public Outcome send(final Message_1_0 message, ServerTransaction txn) { - txn.enqueue(getQueue(),message, new ServerTransaction.Action() + txn.enqueue(getQueue(),message, new ServerTransaction.EnqueueAction() { MessageReference _reference = message.newReference(); - public void postCommit() + public void postCommit(MessageEnqueueRecord... records) { try { - getQueue().enqueue(message, null); + getQueue().enqueue(message, null, records[0]); } finally { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java index 83430e6008..5deec2bb35 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java @@ -42,6 +42,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Detach; import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -150,17 +151,17 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv _sectionDecoder, immutableSections); - StoredMessage<MessageMetaData_1_0> storedMessage = _vhost.getMessageStore().addMessage(mmd); + MessageHandle<MessageMetaData_1_0> handle = _vhost.getMessageStore().addMessage(mmd); boolean skipping = true; int offset = 0; for(ByteBuffer bareMessageBuf : immutableSections) { - storedMessage.addContent(offset, bareMessageBuf.duplicate()); + handle.addContent(bareMessageBuf.duplicate()); offset += bareMessageBuf.remaining(); } - + final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded(); Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference()); MessageReference<Message_1_0> reference = message.newReference(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 0353b9375d..ec57284a78 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -410,59 +410,40 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(Boolean.TRUE.equals(detach.getClosed()) || !(TerminusDurability.UNSETTLED_STATE.equals(_durability)|| TerminusDurability.CONFIGURATION.equals(_durability))) { - while(!_consumer.trySendLock()) - { - synchronized (endpoint.getLock()) - { - try - { - endpoint.getLock().wait(100); - } - catch (InterruptedException e) - { - } - } - } - try - { - _consumer.close(); + _consumer.close(); - Modified state = new Modified(); - state.setDeliveryFailed(true); + Modified state = new Modified(); + state.setDeliveryFailed(true); - for(UnsettledAction action : _unsettledActionMap.values()) - { + for(UnsettledAction action : _unsettledActionMap.values()) + { - action.process(state,Boolean.TRUE); - } - _unsettledActionMap.clear(); + action.process(state,Boolean.TRUE); + } + _unsettledActionMap.clear(); - endpoint.close(); + endpoint.close(); - if(_destination instanceof ExchangeDestination - && (_durability == TerminusDurability.CONFIGURATION - || _durability == TerminusDurability.UNSETTLED_STATE)) + if(_destination instanceof ExchangeDestination + && (_durability == TerminusDurability.CONFIGURATION + || _durability == TerminusDurability.UNSETTLED_STATE)) + { + try { - try - { - _vhost.removeQueue((AMQQueue)_queue); - } - catch (AccessControlException e) - { - //TODO - _logger.error("Error registering subscription", e); - } + _vhost.removeQueue((AMQQueue)_queue); } - - if(_closeAction != null) + catch (AccessControlException e) { - _closeAction.run(); + //TODO + _logger.error("Error registering subscription", e); } } - finally + + if(_closeAction != null) { - _consumer.releaseSendLock(); + _closeAction.run(); } + } else if(detach.getError() != null && !_linkAttachment.getEndpoint().getSession().isSyntheticError(detach.getError())) diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java index 0c47ddf137..789789ac33 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -87,12 +87,6 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { int size = messageContent.length - offsetInMessage; diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java index 02c3373f85..a9637e9d4e 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java @@ -192,12 +192,6 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { return message.getContent(dst, offsetInMessage); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java index fbc809305e..ee16d96d5b 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java @@ -81,12 +81,6 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { return message_0_8.getContent(dst, offsetInMessage); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java index 647e6be28a..d4529aedb1 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java @@ -89,12 +89,6 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { int size = messageContent.length - offsetInMessage; diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index 8a3ef65979..8e64757cbb 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -59,6 +59,7 @@ import org.apache.qpid.server.plugin.SystemNodeCreator; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -1071,6 +1072,12 @@ class ManagementNode implements MessageSource, MessageDestination } @Override + public MessageEnqueueRecord getEnqueueRecord() + { + return null; + } + + @Override public boolean isAcquiredBy(final ConsumerImpl consumer) { return false; diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java index 501ce40db7..dd5b7540a7 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -90,6 +91,12 @@ class ManagementResponse implements MessageInstance } @Override + public MessageEnqueueRecord getEnqueueRecord() + { + return null; + } + + @Override public boolean isAcquiredBy(final ConsumerImpl consumer) { return consumer == _consumer && !isDeleted(); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index e30571615c..4639236d60 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -256,6 +256,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); root.setContextPath("/"); + root.setCompactPath(true); server.setHandler(root); server.setSendServerVersion(false); final ErrorHandler errorHandler = new ErrorHandler() |
