diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-01-25 15:38:45 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-01-25 15:38:45 +0000 |
| commit | e7ad0e742e5bd214bb750484047fde9bf434e1eb (patch) | |
| tree | 96f92bcf39e42484fd1058aea68620bdbb8a7fcf /qpid/java/broker/src | |
| parent | 6a359e9049e06c00e77ddbccb65b7f53cbc4b032 (diff) | |
| download | qpid-python-e7ad0e742e5bd214bb750484047fde9bf434e1eb.tar.gz | |
QPID-4550 : AMQP 1.0 Persistent Messages cause failure on restart
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1438556 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
5 files changed, 127 insertions, 18 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java index 2cc1a92853..e01f20d54f 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java @@ -342,6 +342,14 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData { private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance(); + private MetaDataFactory() + { + _typeRegistry.registerTransportLayer(); + _typeRegistry.registerMessagingLayer(); + _typeRegistry.registerTransactionLayer(); + _typeRegistry.registerSecurityLayer(); + } + public MessageMetaData_1_0 createMetaData(ByteBuffer buf) { ValueHandler valueHandler = new ValueHandler(_typeRegistry); @@ -354,7 +362,8 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData try { ByteBuffer encodedBuf = buf.duplicate(); - sections.add((Section) valueHandler.parse(buf)); + Object parse = valueHandler.parse(buf); + sections.add((Section) parse); encodedBuf.limit(buf.position()); encodedSections.add(encodedBuf); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java index baecb5b0fe..fbce1666b7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java @@ -23,7 +23,9 @@ package org.apache.qpid.server.protocol.v1_0; import java.lang.ref.WeakReference; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.MessageMetaData_1_0; @@ -33,11 +35,45 @@ import org.apache.qpid.server.store.StoredMessage; public class Message_1_0 implements ServerMessage, InboundMessage { + + + private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater = + AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount"); + + private volatile int _referenceCount = 0; + private final StoredMessage<MessageMetaData_1_0> _storedMessage; private List<ByteBuffer> _fragments; private WeakReference<Session_1_0> _session; + public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage) + { + _storedMessage = storedMessage; + _session = null; + _fragments = restoreFragments(storedMessage); + } + + private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage) + { + ArrayList<ByteBuffer> fragments = new ArrayList<ByteBuffer>(); + final int FRAGMENT_SIZE = 2048; + int offset = 0; + ByteBuffer b; + do + { + + b = storedMessage.getContent(offset,FRAGMENT_SIZE); + if(b.hasRemaining()) + { + fragments.add(b); + offset+= b.remaining(); + } + } + while(b.hasRemaining()); + return fragments; + } + public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage, final List<ByteBuffer> fragments, final Session_1_0 session) @@ -142,7 +178,61 @@ public class Message_1_0 implements ServerMessage, InboundMessage public Session_1_0 getSession() { - return _session.get(); + return _session == null ? null : _session.get(); + } + + + public boolean incrementReference() + { + if(_refCountUpdater.incrementAndGet(this) <= 0) + { + _refCountUpdater.decrementAndGet(this); + return false; + } + else + { + return true; + } + } + + /** + * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the + * message store. + * + * + * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that + * failed + */ + public void decrementReference() + { + int count = _refCountUpdater.decrementAndGet(this); + + // note that the operation of decrementing the reference count and then removing the message does not + // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after + // the message has been passed to all queues. i.e. we are + // not relying on the all the increments having taken place before the delivery manager decrements. + if (count == 0) + { + // set the reference count way below 0 so that we can detect that the message has been deleted + // this is to guard against the message being spontaneously recreated (from the mgmt console) + // by copying from other queues at the same time as it is being removed. + _refCountUpdater.set(this,Integer.MIN_VALUE/2); + + // must check if the handle is null since there may be cases where we decide to throw away a message + // and the handle has not yet been constructed + if (_storedMessage != null) + { + _storedMessage.remove(); + } + } + else + { + if (count < 0) + { + throw new RuntimeException("Reference count for message id " + getMessageNumber() + + " has gone below 0."); + } + } } public static class Reference extends MessageReference<Message_1_0> @@ -154,13 +244,13 @@ public class Message_1_0 implements ServerMessage, InboundMessage protected void onReference(Message_1_0 message) { - + message.incrementReference(); } protected void onRelease(Message_1_0 message) { - + message.decrementReference(); } -} + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index fca8d59836..2f6ad96fb1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -1793,8 +1793,9 @@ public class DerbyMessageStore implements MessageStore public ByteBuffer getContent(int offsetInMessage, int size) { ByteBuffer buf = ByteBuffer.allocate(size); - getContent(offsetInMessage, buf); + int length = getContent(offsetInMessage, buf); buf.position(0); + buf.limit(length); return buf; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index ff5f5c738a..ae88e3e9f7 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -41,8 +41,10 @@ import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.v1_0.Message_1_0; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueEntry; @@ -75,7 +77,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa private final VirtualHost _virtualHost; private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>(); - private final Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>(); + private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>(); private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>(); private MessageStoreLogSubject _logSubject; @@ -167,7 +169,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void message(StoredMessage message) { - AbstractServerMessageImpl serverMessage; + ServerMessage serverMessage; switch(message.getMetaData().getType()) { case META_DATA_0_8: @@ -176,6 +178,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa case META_DATA_0_10: serverMessage = new MessageTransferMessage(message, null); break; + case META_DATA_1_0: + serverMessage = new Message_1_0(message); + break; default: throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass()); } @@ -206,12 +211,13 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); - final AbstractServerMessageImpl message = _recoveredMessages.get(messageId); + final ServerMessage message = _recoveredMessages.get(messageId); _unusedMessages.remove(messageId); if(message != null) { - message.incrementReference(); + final MessageReference ref = message.newReference(); + branch.enqueue(queue,message); @@ -224,7 +230,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa { queue.enqueue(message, true, null); - message.decrementReference(); + ref.release(); } catch (AMQException e) { @@ -236,7 +242,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void onRollback() { - message.decrementReference(); + ref.release(); } }); } @@ -265,7 +271,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); - final AbstractServerMessageImpl message = _recoveredMessages.get(messageId); + final ServerMessage message = _recoveredMessages.get(messageId); _unusedMessages.remove(messageId); if(message != null) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java index 4aa023a25c..7c6891da71 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.test.utils.QpidTestCase; /** @@ -86,10 +87,12 @@ public class ReferenceCountingTest extends QpidTestCase AMQMessage message = new AMQMessage(storedMessage); - message.incrementReference(); + MessageReference ref = message.newReference(); assertEquals(1, _store.getMessageCount()); - message.decrementReference(); + + ref.release(); + assertEquals(0, _store.getMessageCount()); } @@ -142,13 +145,13 @@ public class ReferenceCountingTest extends QpidTestCase AMQMessage message = new AMQMessage(storedMessage); - message.incrementReference(); + MessageReference ref = message.newReference(); // we call routing complete to set up the handle // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); assertEquals(1, _store.getMessageCount()); - message.incrementReference(); - message.decrementReference(); + MessageReference ref2 = message.newReference(); + ref.release(); assertEquals(1, _store.getMessageCount()); } |
