From 8bcfb7bb278644a547bddf4719265d806ea69d72 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 3 Apr 2015 22:21:05 +0000 Subject: QPID-6476 : [Java Broker] Refactor MessageStore to allow more efficient implementations git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1671184 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/protocol/v0_10/ConsumerTarget_0_10.java | 2 +- .../v0_10/MessageConverter_Internal_to_v0_10.java | 6 --- .../protocol/v0_10/MessageConverter_v0_10.java | 6 --- .../qpid/server/protocol/v0_10/ServerSession.java | 2 +- .../protocol/v0_10/ServerSessionDelegate.java | 8 +-- .../qpid/server/protocol/v0_8/AMQChannel.java | 33 ++++++------ .../server/protocol/v0_8/ConsumerTarget_0_8.java | 2 +- .../v0_8/MessageConverter_Internal_to_v0_8.java | 6 --- .../apache/qpid/server/protocol/v0_8/AckTest.java | 10 ++-- .../qpid/server/protocol/v0_8/AcknowledgeTest.java | 2 +- .../server/protocol/v0_8/MockStoredMessage.java | 16 ++++-- .../protocol/v0_8/QueueBrowserUsesNoAckTest.java | 2 +- .../protocol/v0_8/ReferenceCountingTest.java | 6 +-- .../server/protocol/v1_0/ConsumerTarget_1_0.java | 2 +- .../protocol/v1_0/MessageConverter_to_1_0.java | 6 --- .../server/protocol/v1_0/QueueDestination.java | 7 +-- .../server/protocol/v1_0/ReceivingLink_1_0.java | 7 +-- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 63 ++++++++-------------- .../v0_10_v1_0/MessageConverter_1_0_to_v0_10.java | 6 --- .../v0_8_v0_10/MessageConverter_0_10_to_0_8.java | 6 --- .../v0_8_v0_10/MessageConverter_0_8_to_0_10.java | 6 --- .../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java | 6 --- .../server/management/amqp/ManagementNode.java | 7 +++ .../server/management/amqp/ManagementResponse.java | 7 +++ .../server/management/plugin/HttpManagement.java | 1 + 25 files changed, 93 insertions(+), 132 deletions(-) (limited to 'qpid/java/broker-plugins') diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 5affe3019c..db053eef26 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -377,7 +377,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private void forceDequeue(final MessageInstance entry, final boolean restoreCredit) { AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore()); - dequeueTxn.dequeue(entry.getOwningResource(), entry.getMessage(), + dequeueTxn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action() { public void postCommit() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java index 69abcd7727..bd04db11ae 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java @@ -75,12 +75,6 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter createStoreMessage(final MessageTransfer xfr, final MessageMetaData_0_10 messageMetaData, final MessageStore store) { - final StoredMessage storeMessage = store.addMessage(messageMetaData); + final MessageHandle addedMessage = store.addMessage(messageMetaData); ByteBuffer body = xfr.getBody(); if(body != null) { - storeMessage.addContent(0, body); + addedMessage.addContent(body); } - return storeMessage; + final StoredMessage storedMessage = addedMessage.allContentAdded(); + return storedMessage; } @Override diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 16ea23b765..522408910d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -96,6 +96,7 @@ import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; @@ -429,22 +430,24 @@ public class AMQChannel contentHeader, getConnection().getLastReceivedTime()); - final StoredMessage handle = _messageStore.addMessage(messageMetaData); - final AMQMessage amqMessage = createAMQMessage(handle); - MessageReference reference = amqMessage.newReference(); - try + final MessageHandle handle = _messageStore.addMessage(messageMetaData); + int bodyCount = _currentMessage.getBodyCount(); + if(bodyCount > 0) { - int bodyCount = _currentMessage.getBodyCount(); - if(bodyCount > 0) + long bodyLengthReceived = 0; + for(int i = 0 ; i < bodyCount ; i++) { - long bodyLengthReceived = 0; - for(int i = 0 ; i < bodyCount ; i++) - { - ContentBody contentChunk = _currentMessage.getContentChunk(i); - handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getPayload())); - bodyLengthReceived += contentChunk.getSize(); - } + ContentBody contentChunk = _currentMessage.getContentChunk(i); + handle.addContent(ByteBuffer.wrap(contentChunk.getPayload())); + bodyLengthReceived += contentChunk.getSize(); } + } + final StoredMessage storedMessage = handle.allContentAdded(); + + final AMQMessage amqMessage = createAMQMessage(storedMessage); + MessageReference reference = amqMessage.newReference(); + try + { _currentMessage = null; @@ -500,7 +503,7 @@ public class AMQChannel .createBasicAckBody(_confirmedMessageCounter, false); _connection.writeFrame(responseBody.generateFrame(_channelId)); } - incrementUncommittedMessageSize(handle); + incrementUncommittedMessageSize(storedMessage); incrementOutstandingTxnsIfNecessary(); } } @@ -1512,7 +1515,7 @@ public class AMQChannel try { entry.delete(); - txn.dequeue(queue, message, + txn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action() { @Override diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 5f7d5fe46e..d46ed719b2 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -189,7 +189,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen // The send may of course still fail, in which case, as // the message is unacked, it will be lost. - _txn.dequeue(entry.getOwningResource(), entry.getMessage(), NOOP); + _txn.dequeue(entry.getEnqueueRecord(), NOOP); ServerMessage message = entry.getMessage(); MessageReference ref = message.newReference(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java index 55746dfa95..ef42a1311f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java @@ -87,12 +87,6 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter result =_messageStore.addMessage(mmd); + final StoredMessage result =_messageStore.addMessage(mmd).allContentAdded(); final StoredMessage storedMessage = result; final AMQMessage message = new AMQMessage(storedMessage); ServerTransaction txn = new AutoCommitTransaction(_messageStore); txn.enqueue(_queue, message, - new ServerTransaction.Action() + new ServerTransaction.EnqueueAction() { - public void postCommit() + public void postCommit(MessageEnqueueRecord... records) { - _queue.enqueue(message,null); + _queue.enqueue(message,null, null); } public void onRollback() { - //To change body of implemented methods use File | Settings | File Templates. } }); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java index dcdac07eae..7dd4734e6b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -178,7 +178,7 @@ public class AcknowledgeTest extends QpidTestCase private void checkStoreContents(int messageCount) { MessageCounter counter = new MessageCounter(); - _messageStore.visitMessages(counter); + _messageStore.newMessageStoreReader().visitMessages(counter); assertEquals("Message header count incorrect in the MetaDataMap", messageCount, counter.getCount()); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java index f21bf07fae..c6aea39aa6 100755 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java @@ -26,9 +26,10 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MessagePublishInfo; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.StoredMessage; -public class MockStoredMessage implements StoredMessage +public class MockStoredMessage implements StoredMessage, MessageHandle { private long _messageId; private MessageMetaData _metaData; @@ -72,12 +73,17 @@ public class MockStoredMessage implements StoredMessage return _messageId; } - public void addContent(int offsetInMessage, ByteBuffer src) + public void addContent(ByteBuffer src) { src = src.duplicate(); - ByteBuffer dst = _content.duplicate(); - dst.position(offsetInMessage); - dst.put(src); + _content.put(src); + } + + @Override + public StoredMessage allContentAdded() + { + _content.flip(); + return this; } public int getContent(int offset, ByteBuffer dst) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java index 816485c42b..264350ff8d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -135,7 +135,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase private void checkStoreContents(int messageCount) { MessageCounter counter = new MessageCounter(); - _messageStore.visitMessages(counter); + _messageStore.newMessageStoreReader().visitMessages(counter); assertEquals("Message header count incorrect in the MetaDataMap", messageCount, counter.getCount()); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java index 6059f7f4e5..de3b68a0bc 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java @@ -60,7 +60,7 @@ public class ReferenceCountingTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(info, chb); - StoredMessage storedMessage = _store.addMessage(mmd); + StoredMessage storedMessage = _store.addMessage(mmd).allContentAdded(); Transaction txn = _store.newTransaction(); txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage)); txn.commitTran(); @@ -78,7 +78,7 @@ public class ReferenceCountingTest extends QpidTestCase private int getStoreMessageCount() { MessageCounter counter = new MessageCounter(); - _store.visitMessages(counter); + _store.newMessageStoreReader().visitMessages(counter); return counter.getCount(); } @@ -99,7 +99,7 @@ public class ReferenceCountingTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(info, chb); - StoredMessage storedMessage = _store.addMessage(mmd); + StoredMessage storedMessage = _store.addMessage(mmd).allContentAdded(); Transaction txn = _store.newTransaction(); txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage)); txn.commitTran(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index fa2e543f8d..680c05ec0a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -390,7 +390,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget if(outcome instanceof Accepted) { _queueEntry.lockAcquisition(); - txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(), + txn.dequeue(_queueEntry.getEnqueueRecord(), new ServerTransaction.Action() { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index 3572b98cad..e9e81bb623 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -234,12 +234,6 @@ public abstract class MessageConverter_to_1_0 implement return serverMessage.getMessageNumber(); } - @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java index d0843c349d..e8e31dcd15 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java @@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.txn.ServerTransaction; public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination @@ -45,16 +46,16 @@ public class QueueDestination extends MessageSourceDestination implements Sendin public Outcome send(final Message_1_0 message, ServerTransaction txn) { - txn.enqueue(getQueue(),message, new ServerTransaction.Action() + txn.enqueue(getQueue(),message, new ServerTransaction.EnqueueAction() { MessageReference _reference = message.newReference(); - public void postCommit() + public void postCommit(MessageEnqueueRecord... records) { try { - getQueue().enqueue(message, null); + getQueue().enqueue(message, null, records[0]); } finally { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java index 83430e6008..5deec2bb35 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java @@ -42,6 +42,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Detach; import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -150,17 +151,17 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv _sectionDecoder, immutableSections); - StoredMessage storedMessage = _vhost.getMessageStore().addMessage(mmd); + MessageHandle handle = _vhost.getMessageStore().addMessage(mmd); boolean skipping = true; int offset = 0; for(ByteBuffer bareMessageBuf : immutableSections) { - storedMessage.addContent(offset, bareMessageBuf.duplicate()); + handle.addContent(bareMessageBuf.duplicate()); offset += bareMessageBuf.remaining(); } - + final StoredMessage storedMessage = handle.allContentAdded(); Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference()); MessageReference reference = message.newReference(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 0353b9375d..ec57284a78 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -410,59 +410,40 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(Boolean.TRUE.equals(detach.getClosed()) || !(TerminusDurability.UNSETTLED_STATE.equals(_durability)|| TerminusDurability.CONFIGURATION.equals(_durability))) { - while(!_consumer.trySendLock()) - { - synchronized (endpoint.getLock()) - { - try - { - endpoint.getLock().wait(100); - } - catch (InterruptedException e) - { - } - } - } - try - { - _consumer.close(); + _consumer.close(); - Modified state = new Modified(); - state.setDeliveryFailed(true); + Modified state = new Modified(); + state.setDeliveryFailed(true); - for(UnsettledAction action : _unsettledActionMap.values()) - { + for(UnsettledAction action : _unsettledActionMap.values()) + { - action.process(state,Boolean.TRUE); - } - _unsettledActionMap.clear(); + action.process(state,Boolean.TRUE); + } + _unsettledActionMap.clear(); - endpoint.close(); + endpoint.close(); - if(_destination instanceof ExchangeDestination - && (_durability == TerminusDurability.CONFIGURATION - || _durability == TerminusDurability.UNSETTLED_STATE)) + if(_destination instanceof ExchangeDestination + && (_durability == TerminusDurability.CONFIGURATION + || _durability == TerminusDurability.UNSETTLED_STATE)) + { + try { - try - { - _vhost.removeQueue((AMQQueue)_queue); - } - catch (AccessControlException e) - { - //TODO - _logger.error("Error registering subscription", e); - } + _vhost.removeQueue((AMQQueue)_queue); } - - if(_closeAction != null) + catch (AccessControlException e) { - _closeAction.run(); + //TODO + _logger.error("Error registering subscription", e); } } - finally + + if(_closeAction != null) { - _consumer.releaseSendLock(); + _closeAction.run(); } + } else if(detach.getError() != null && !_linkAttachment.getEndpoint().getSession().isSyntheticError(detach.getError())) diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java index 0c47ddf137..789789ac33 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -86,12 +86,6 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter implem ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); root.setContextPath("/"); + root.setCompactPath(true); server.setHandler(root); server.setSendServerVersion(false); final ErrorHandler errorHandler = new ErrorHandler() -- cgit v1.2.1