summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-01-25 15:38:45 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-01-25 15:38:45 +0000
commite7ad0e742e5bd214bb750484047fde9bf434e1eb (patch)
tree96f92bcf39e42484fd1058aea68620bdbb8a7fcf /qpid/java/broker/src
parent6a359e9049e06c00e77ddbccb65b7f53cbc4b032 (diff)
downloadqpid-python-e7ad0e742e5bd214bb750484047fde9bf434e1eb.tar.gz
QPID-4550 : AMQP 1.0 Persistent Messages cause failure on restart
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1438556 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java98
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java3
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java20
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java13
5 files changed, 127 insertions, 18 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
index 2cc1a92853..e01f20d54f 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
@@ -342,6 +342,14 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
{
private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance();
+ private MetaDataFactory()
+ {
+ _typeRegistry.registerTransportLayer();
+ _typeRegistry.registerMessagingLayer();
+ _typeRegistry.registerTransactionLayer();
+ _typeRegistry.registerSecurityLayer();
+ }
+
public MessageMetaData_1_0 createMetaData(ByteBuffer buf)
{
ValueHandler valueHandler = new ValueHandler(_typeRegistry);
@@ -354,7 +362,8 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
try
{
ByteBuffer encodedBuf = buf.duplicate();
- sections.add((Section) valueHandler.parse(buf));
+ Object parse = valueHandler.parse(buf);
+ sections.add((Section) parse);
encodedBuf.limit(buf.position());
encodedSections.add(encodedBuf);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index baecb5b0fe..fbce1666b7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -23,7 +23,9 @@ package org.apache.qpid.server.protocol.v1_0;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageMetaData_1_0;
@@ -33,11 +35,45 @@ import org.apache.qpid.server.store.StoredMessage;
public class Message_1_0 implements ServerMessage, InboundMessage
{
+
+
+ private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount");
+
+ private volatile int _referenceCount = 0;
+
private final StoredMessage<MessageMetaData_1_0> _storedMessage;
private List<ByteBuffer> _fragments;
private WeakReference<Session_1_0> _session;
+ public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
+ {
+ _storedMessage = storedMessage;
+ _session = null;
+ _fragments = restoreFragments(storedMessage);
+ }
+
+ private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
+ {
+ ArrayList<ByteBuffer> fragments = new ArrayList<ByteBuffer>();
+ final int FRAGMENT_SIZE = 2048;
+ int offset = 0;
+ ByteBuffer b;
+ do
+ {
+
+ b = storedMessage.getContent(offset,FRAGMENT_SIZE);
+ if(b.hasRemaining())
+ {
+ fragments.add(b);
+ offset+= b.remaining();
+ }
+ }
+ while(b.hasRemaining());
+ return fragments;
+ }
+
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
final List<ByteBuffer> fragments,
final Session_1_0 session)
@@ -142,7 +178,61 @@ public class Message_1_0 implements ServerMessage, InboundMessage
public Session_1_0 getSession()
{
- return _session.get();
+ return _session == null ? null : _session.get();
+ }
+
+
+ public boolean incrementReference()
+ {
+ if(_refCountUpdater.incrementAndGet(this) <= 0)
+ {
+ _refCountUpdater.decrementAndGet(this);
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ /**
+ * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
+ * message store.
+ *
+ *
+ * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that
+ * failed
+ */
+ public void decrementReference()
+ {
+ int count = _refCountUpdater.decrementAndGet(this);
+
+ // note that the operation of decrementing the reference count and then removing the message does not
+ // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
+ // the message has been passed to all queues. i.e. we are
+ // not relying on the all the increments having taken place before the delivery manager decrements.
+ if (count == 0)
+ {
+ // set the reference count way below 0 so that we can detect that the message has been deleted
+ // this is to guard against the message being spontaneously recreated (from the mgmt console)
+ // by copying from other queues at the same time as it is being removed.
+ _refCountUpdater.set(this,Integer.MIN_VALUE/2);
+
+ // must check if the handle is null since there may be cases where we decide to throw away a message
+ // and the handle has not yet been constructed
+ if (_storedMessage != null)
+ {
+ _storedMessage.remove();
+ }
+ }
+ else
+ {
+ if (count < 0)
+ {
+ throw new RuntimeException("Reference count for message id " + getMessageNumber()
+ + " has gone below 0.");
+ }
+ }
}
public static class Reference extends MessageReference<Message_1_0>
@@ -154,13 +244,13 @@ public class Message_1_0 implements ServerMessage, InboundMessage
protected void onReference(Message_1_0 message)
{
-
+ message.incrementReference();
}
protected void onRelease(Message_1_0 message)
{
-
+ message.decrementReference();
}
-}
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index fca8d59836..2f6ad96fb1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -1793,8 +1793,9 @@ public class DerbyMessageStore implements MessageStore
public ByteBuffer getContent(int offsetInMessage, int size)
{
ByteBuffer buf = ByteBuffer.allocate(size);
- getContent(offsetInMessage, buf);
+ int length = getContent(offsetInMessage, buf);
buf.position(0);
+ buf.limit(length);
return buf;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index ff5f5c738a..ae88e3e9f7 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -41,8 +41,10 @@ import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.v1_0.Message_1_0;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueEntry;
@@ -75,7 +77,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
private final VirtualHost _virtualHost;
private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
- private final Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>();
+ private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
private MessageStoreLogSubject _logSubject;
@@ -167,7 +169,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
public void message(StoredMessage message)
{
- AbstractServerMessageImpl serverMessage;
+ ServerMessage serverMessage;
switch(message.getMetaData().getType())
{
case META_DATA_0_8:
@@ -176,6 +178,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
case META_DATA_0_10:
serverMessage = new MessageTransferMessage(message, null);
break;
+ case META_DATA_1_0:
+ serverMessage = new Message_1_0(message);
+ break;
default:
throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass());
}
@@ -206,12 +211,13 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
- final AbstractServerMessageImpl message = _recoveredMessages.get(messageId);
+ final ServerMessage message = _recoveredMessages.get(messageId);
_unusedMessages.remove(messageId);
if(message != null)
{
- message.incrementReference();
+ final MessageReference ref = message.newReference();
+
branch.enqueue(queue,message);
@@ -224,7 +230,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
{
queue.enqueue(message, true, null);
- message.decrementReference();
+ ref.release();
}
catch (AMQException e)
{
@@ -236,7 +242,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
public void onRollback()
{
- message.decrementReference();
+ ref.release();
}
});
}
@@ -265,7 +271,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
- final AbstractServerMessageImpl message = _recoveredMessages.get(messageId);
+ final ServerMessage message = _recoveredMessages.get(messageId);
_unusedMessages.remove(messageId);
if(message != null)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
index 4aa023a25c..7c6891da71 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.test.utils.QpidTestCase;
/**
@@ -86,10 +87,12 @@ public class ReferenceCountingTest extends QpidTestCase
AMQMessage message = new AMQMessage(storedMessage);
- message.incrementReference();
+ MessageReference ref = message.newReference();
assertEquals(1, _store.getMessageCount());
- message.decrementReference();
+
+ ref.release();
+
assertEquals(0, _store.getMessageCount());
}
@@ -142,13 +145,13 @@ public class ReferenceCountingTest extends QpidTestCase
AMQMessage message = new AMQMessage(storedMessage);
- message.incrementReference();
+ MessageReference ref = message.newReference();
// we call routing complete to set up the handle
// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertEquals(1, _store.getMessageCount());
- message.incrementReference();
- message.decrementReference();
+ MessageReference ref2 = message.newReference();
+ ref.release();
assertEquals(1, _store.getMessageCount());
}