summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-02-25 11:32:24 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-02-25 11:32:24 +0000
commit357424abb8b425a1c1b1d8ac24d1d480d9c41bb3 (patch)
tree1b19832fc5771d8c35d3f0deaccc7a0a591adf94 /qpid/java
parentf3d424977961151a0ce4676eb93ecaff9ceb8569 (diff)
downloadqpid-python-357424abb8b425a1c1b1d8ac24d1d480d9c41bb3.tar.gz
QPID-1634 : Created QueueBackingStore interface and implementation FileQueueBackingStore. Tested by FileQueueBackingStoreTest.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@747754 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java419
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java36
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java35
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java11
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java217
10 files changed, 775 insertions, 18 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 2a235fe9a2..1077306eac 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
@@ -71,7 +72,7 @@ public class VirtualHostConfiguration
public VirtualHostConfiguration(String name, Configuration mungedConf) throws ConfigurationException
{
- this(name,mungedConf, null);
+ this(name,mungedConf, new ServerConfiguration(new PropertiesConfiguration()));
}
public String getName()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index e96d2ba874..5bde27dba5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -113,6 +113,9 @@ public interface AMQMessage
void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
throws AMQException;
+ void recoverFromMessageMetaData(MessageMetaData mmd);
+
+ void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException;
String toString();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
new file mode 100644
index 0000000000..7acb683e3b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
@@ -0,0 +1,419 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.util.FileUtils;
+import org.apache.qpid.AMQException;
+import org.apache.commons.configuration.ConfigurationException;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class FileQueueBackingStore implements QueueBackingStore
+{
+ private static final Logger _log = Logger.getLogger(FileQueueBackingStore.class);
+
+ private AtomicBoolean _closed = new AtomicBoolean(false);
+ private String _flowToDiskLocation;
+ private static final String QUEUE_BACKING_DIR = "queueBacking";
+
+ public void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException
+ {
+ setFlowToDisk(virtualHost.getName(), config.getFlowToDiskLocation());
+ }
+
+ private void setFlowToDisk(String vHostName, String location) throws ConfigurationException
+ {
+ if (vHostName == null)
+ {
+ throw new ConfigurationException("Unable to setup to Flow to Disk as Virtualhost name was not specified");
+ }
+
+ if (location == null)
+ {
+ throw new ConfigurationException("Unable to setup to Flow to Disk as location was not specified.");
+ }
+
+ _flowToDiskLocation = location;
+
+ _flowToDiskLocation += File.separator + QUEUE_BACKING_DIR + File.separator + vHostName;
+
+ File root = new File(location);
+ if (!root.exists())
+ {
+ throw new ConfigurationException("Specified Flow to Disk root does not exist:" + root.getAbsolutePath());
+ }
+ else
+ {
+
+ if (root.isFile())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified root is a file:"+
+ root.getAbsolutePath());
+ }
+
+ if(!root.canWrite())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to specified root:"+
+ root.getAbsolutePath());
+ }
+
+ }
+
+
+ File store = new File(_flowToDiskLocation);
+ if (store.exists())
+ {
+ if (!FileUtils.delete(store, true))
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk store as directory already exsits:"
+ + store.getAbsolutePath());
+ }
+
+ if (store.isFile())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified location is a file:"+
+ store.getAbsolutePath());
+ }
+
+ }
+ else
+ {
+ if (!store.getParentFile().getParentFile().canWrite())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to parent location:"+
+ store.getParentFile().getParentFile().getAbsolutePath());
+ }
+ }
+
+
+ _log.info("Creating Flow to Disk Store : " + store.getAbsolutePath());
+ store.deleteOnExit();
+ if (!store.mkdirs())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk store:" + store.getAbsolutePath());
+ }
+ }
+
+
+ public AMQMessage recover(Long messageId)
+ {
+ MessageMetaData mmd;
+ List<ContentChunk> contentBodies = new LinkedList<ContentChunk>();
+
+ File handle = getFileHandle(messageId);
+ handle.deleteOnExit();
+
+ ObjectInputStream input = null;
+
+ Exception error = null;
+ try
+ {
+ input = new ObjectInputStream(new FileInputStream(handle));
+
+ long arrivaltime = input.readLong();
+
+ final AMQShortString exchange = new AMQShortString(input.readUTF());
+ final AMQShortString routingKey = new AMQShortString(input.readUTF());
+ final boolean mandatory = input.readBoolean();
+ final boolean immediate = input.readBoolean();
+
+ int bodySize = input.readInt();
+ byte[] underlying = new byte[bodySize];
+
+ input.readFully(underlying, 0, bodySize);
+
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
+
+ ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, bodySize);
+
+ int chunkCount = input.readInt();
+
+ // There are WAY to many annonymous MPIs in the code this should be made concrete.
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return routingKey;
+ }
+ };
+
+ mmd = new MessageMetaData(info, chb, chunkCount);
+ mmd.setArrivalTime(arrivaltime);
+
+ AMQMessage message;
+ if (((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2)
+ {
+ message = new PersistentAMQMessage(messageId, null);
+ }
+ else
+ {
+ message = new TransientAMQMessage(messageId);
+ }
+
+ message.recoverFromMessageMetaData(mmd);
+
+ for (int chunk = 0; chunk < chunkCount; chunk++)
+ {
+ int length = input.readInt();
+
+ byte[] data = new byte[length];
+
+ input.readFully(data, 0, length);
+
+ // There are WAY to many annonymous CCs in the code this should be made concrete.
+ try
+ {
+ message.recoverContentBodyFrame(new RecoverDataBuffer(length, data), (chunk + 1 == chunkCount));
+ }
+ catch (AMQException e)
+ {
+ //ignore as this will not occur.
+ // It is thrown by the _transactionLog method in recover on PersistentAMQMessage
+ // but we have created the message with a null log and will never call that method.
+ }
+ }
+
+ return message;
+ }
+ catch (Exception e)
+ {
+ error = e;
+ }
+ finally
+ {
+ try
+ {
+ input.close();
+ }
+ catch (IOException e)
+ {
+ _log.info("Unable to close input on message("+messageId+") recovery due to:"+e.getMessage());
+ }
+ }
+
+ throw new UnableToRecoverMessageException(error);
+ }
+
+
+ public void flow(AMQMessage message) throws UnableToFlowMessageException
+ {
+ long messageId = message.getMessageId();
+
+ File handle = getFileHandle(messageId);
+
+ //If we have written the data once then we don't need to do it again.
+ if (handle.exists())
+ {
+ _log.debug("Message(" + messageId + ") already flowed to disk.");
+ return;
+ }
+
+ handle.deleteOnExit();
+
+ ObjectOutputStream writer = null;
+ Exception error = null;
+
+ try
+ {
+ writer = new ObjectOutputStream(new FileOutputStream(handle));
+
+ writer.writeLong(message.getArrivalTime());
+
+ MessagePublishInfo mpi = message.getMessagePublishInfo();
+ writer.writeUTF(String.valueOf(mpi.getExchange()));
+ writer.writeUTF(String.valueOf(mpi.getRoutingKey()));
+ writer.writeBoolean(mpi.isMandatory());
+ writer.writeBoolean(mpi.isImmediate());
+ ContentHeaderBody chb = message.getContentHeaderBody();
+
+ // write out the content header body
+ final int bodySize = chb.getSize();
+ byte[] underlying = new byte[bodySize];
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
+ chb.writePayload(buf);
+
+ writer.writeInt(bodySize);
+ writer.write(underlying, 0, bodySize);
+
+ int bodyCount = message.getBodyCount();
+ writer.writeInt(bodyCount);
+
+ //WriteContentBody
+ for (int index = 0; index < bodyCount; index++)
+ {
+ ContentChunk chunk = message.getContentChunk(index);
+ chunk.reduceToFit();
+
+ byte[] chunkData = chunk.getData().array();
+
+ int length = chunk.getSize();
+ writer.writeInt(length);
+ writer.write(chunkData, 0, length);
+ }
+ }
+ catch (FileNotFoundException e)
+ {
+ error = e;
+ }
+ catch (IOException e)
+ {
+ error = e;
+ }
+ finally
+ {
+ try
+ {
+ writer.flush();
+ writer.close();
+ }
+ catch (IOException e)
+ {
+ error = e;
+ }
+ }
+
+ if (error != null)
+ {
+ _log.error("Unable to flow message(" + messageId + ") to disk, restoring state.");
+ handle.delete();
+ throw new UnableToFlowMessageException(messageId, error);
+ }
+ }
+
+ /**
+ * Use the messageId to calculate the file path on disk.
+ *
+ * Current implementation will give us 256 bins.
+ * Therefore the maximum messages that can be flowed before error/platform is:
+ * ext3 : 256 bins * 32000 = 8192000
+ * FAT32 : 256 bins * 65534 = 16776704
+ * Other FS have much greater limits than we need to worry about.
+ *
+ * @param messageId the Message we need a file Handle for.
+ *
+ * @return the File handle
+ */
+ private File getFileHandle(long messageId)
+ {
+ // grab the 8 LSB to give us 256 bins
+ long bin = messageId & 0xFFL;
+
+ String bin_path =_flowToDiskLocation + File.separator + bin;
+ File bin_dir = new File(bin_path);
+
+ if (!bin_dir.exists())
+ {
+ bin_dir.mkdirs();
+ bin_dir.deleteOnExit();
+ }
+
+ String id = bin_path + File.separator + messageId;
+
+ return new File(id);
+ }
+
+ public void delete(Long messageId)
+ {
+ String id = String.valueOf(messageId);
+ File handle = new File(_flowToDiskLocation, id);
+
+ if (handle.exists())
+ {
+ _log.debug("Message(" + messageId + ") delete flowToDisk.");
+ if (!handle.delete())
+ {
+ throw new RuntimeException("Unable to delete flowToDisk data");
+ }
+ }
+ }
+
+ private class RecoverDataBuffer implements ContentChunk
+ {
+ private int _length;
+ private ByteBuffer _dataBuffer;
+
+ public RecoverDataBuffer(int length, byte[] data)
+ {
+ _length = length;
+ _dataBuffer = ByteBuffer.wrap(data);
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public ByteBuffer getData()
+ {
+ return _dataBuffer;
+ }
+
+ public void reduceToFit()
+ {
+
+ }
+
+ }
+
+}
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
index 92c10b0347..9c644cc010 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
@@ -52,7 +52,8 @@ public class PersistentAMQMessage extends TransientAMQMessage
throws AMQException
{
super.setPublishAndContentHeaderBody(storeContext, messagePublishInfo, contentHeaderBody);
- MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime);
+ MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody,
+ _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime);
_transactionLog.storeMessageMetaData(storeContext, _messageId, mmd);
}
@@ -63,13 +64,7 @@ public class PersistentAMQMessage extends TransientAMQMessage
return true;
}
- public void recoverFromMessageMetaData(MessageMetaData mmd)
- {
- _arrivalTime = mmd.getArrivalTime();
- _contentHeaderBody = mmd.getContentHeaderBody();
- _messagePublishInfo = mmd.getMessagePublishInfo();
- }
-
+ @Override
public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
{
super.addContentBodyFrame(null, contentChunk, isLastContentBody);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
new file mode 100644
index 0000000000..376e6f1b57
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.commons.configuration.ConfigurationException;
+
+public interface QueueBackingStore
+{
+ void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException;
+
+ AMQMessage recover(Long messageId);
+
+ void flow(AMQMessage message) throws UnableToFlowMessageException;
+
+ void delete(Long messageId);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
index fa4e85a043..bea10cef53 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
@@ -339,27 +339,48 @@ public class TransientAMQMessage implements AMQMessage
throw new NullPointerException("PublishInfo cannot be null");
}
- _messagePublishInfo = messagePublishInfo;
+ _arrivalTime = System.currentTimeMillis();
+
+
_contentHeaderBody = contentHeaderBody;
+ _messagePublishInfo = messagePublishInfo;
+
+ updateHeaderAndFlags();
+ }
+
+ public long getArrivalTime()
+ {
+ return _arrivalTime;
+ }
+
+ public void recoverFromMessageMetaData(MessageMetaData mmd)
+ {
+ _arrivalTime = mmd.getArrivalTime();
+ _contentHeaderBody = mmd.getContentHeaderBody();
+ _messagePublishInfo = mmd.getMessagePublishInfo();
- if (contentHeaderBody.bodySize == 0)
+ updateHeaderAndFlags();
+ }
+
+ private void updateHeaderAndFlags()
+ {
+ if (_contentHeaderBody.bodySize == 0)
{
_contentBodies = Collections.EMPTY_LIST;
}
- _arrivalTime = System.currentTimeMillis();
-
- if (messagePublishInfo.isImmediate())
+ if (_messagePublishInfo.isImmediate())
{
_flags |= IMMEDIATE;
}
}
- public long getArrivalTime()
+ public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
{
- return _arrivalTime;
+ addContentBodyFrame(null, contentChunk, isLastContentBody);
}
+
public String toString()
{
// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java
new file mode 100644
index 0000000000..03cfed8533
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public class UnableToFlowMessageException extends Exception
+{
+ public UnableToFlowMessageException(long messageId, Exception error)
+ {
+ super("Unable to Flow Message:"+messageId, error);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java
new file mode 100644
index 0000000000..cae5bc6327
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public class UnableToRecoverMessageException extends RuntimeException
+{
+ public UnableToRecoverMessageException(Exception error)
+ {
+ super(error);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index e1b770b1d3..e10697f580 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -84,7 +84,9 @@ public class VirtualHost implements Accessable
private ACLManager _accessManager;
private final Timer _houseKeepingTimer;
-
+
+ private VirtualHostConfiguration _configuration;
+
public void setAccessableName(String name)
{
_logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
@@ -106,6 +108,11 @@ public class VirtualHost implements Accessable
return _routingTable;
}
+ public VirtualHostConfiguration getConfiguration()
+ {
+ return _configuration ;
+ }
+
/**
* Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
* implementaion of an Exchange MBean should extend this class.
@@ -137,7 +144,6 @@ public class VirtualHost implements Accessable
/**
* Normal Constructor
- * @param name
* @param hostConfig
* @throws Exception
*/
@@ -148,6 +154,7 @@ public class VirtualHost implements Accessable
public VirtualHost(VirtualHostConfiguration hostConfig, TransactionLog transactionLog) throws Exception
{
+ _configuration = hostConfig;
_name = hostConfig.getName();
if (_name == null || _name.length() == 0)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
new file mode 100644
index 0000000000..bb2a5f3d3b
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
@@ -0,0 +1,217 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.io.File;
+
+public class FileQueueBackingStoreTest extends TestCase
+{
+ FileQueueBackingStore _backing;
+ private TransactionLog _transactionLog;
+ VirtualHost _vhost;
+ VirtualHostConfiguration _vhostConfig;
+
+ public void setUp() throws Exception
+ {
+ _backing = new FileQueueBackingStore();
+ PropertiesConfiguration config = new PropertiesConfiguration();
+ config.addProperty("store.class", MemoryMessageStore.class.getName());
+ _vhostConfig = new VirtualHostConfiguration(this.getName() + "-Vhost", config);
+ _vhost = new VirtualHost(_vhostConfig);
+ _transactionLog = _vhost.getTransactionLog();
+
+ _backing.configure(_vhost, _vhost.getConfiguration());
+
+ }
+
+ private void resetBacking(Configuration configuration) throws Exception
+ {
+ configuration.addProperty("store.class", MemoryMessageStore.class.getName());
+ _vhostConfig = new VirtualHostConfiguration(this.getName() + "-Vhost", configuration);
+ _vhost = new VirtualHost(_vhostConfig);
+ _transactionLog = _vhost.getTransactionLog();
+
+ _backing = new FileQueueBackingStore();
+
+ _backing.configure(_vhost, _vhost.getConfiguration());
+ }
+
+ public void testInvalidSetupRootExistsIsFile() throws Exception
+ {
+
+ File fileAsRoot = File.createTempFile("tmpRoot", "");
+ fileAsRoot.deleteOnExit();
+
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.addProperty(VirtualHostConfiguration.FLOW_TO_DISK_PATH, fileAsRoot.getAbsolutePath());
+
+ try
+ {
+ resetBacking(configuration);
+ fail("Exception expected to be thrown");
+ }
+ catch (ConfigurationException ce)
+ {
+ assertTrue("Expected Exception not thrown, expecting:" +
+ "Unable to create Temporary Flow to Disk store as specified root is a file:",
+ ce.getMessage().
+ startsWith("Unable to create Temporary Flow to Disk store as specified root is a file:"));
+ }
+
+ }
+
+ public void testInvalidSetupRootExistsCantWrite() throws Exception
+ {
+
+ File fileAsRoot = new File("/var/log");
+
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+
+ configuration.addProperty(VirtualHostConfiguration.FLOW_TO_DISK_PATH, fileAsRoot.getAbsolutePath());
+
+ try
+ {
+ resetBacking(configuration);
+ fail("Exception expected to be thrown");
+ }
+ catch (ConfigurationException ce)
+ {
+ assertEquals("Unable to create Temporary Flow to Disk store. Unable to write to specified root:/var/log",
+ ce.getMessage());
+ }
+
+ }
+
+ public void testEmptyTransientFlowToDisk() throws UnableToFlowMessageException, AMQException
+ {
+ AMQMessage original = MessageFactory.getInstance().createMessage(null, false);
+
+ ContentHeaderBody chb = new ContentHeaderBody(new BasicContentHeaderProperties(), BasicPublishBodyImpl.CLASS_ID);
+ chb.bodySize = 0L;
+
+ runTestWithMessage(original, chb);
+ }
+
+ public void testEmptyPersistentFlowToDisk() throws UnableToFlowMessageException, AMQException
+ {
+
+ AMQMessage original = MessageFactory.getInstance().createMessage(_transactionLog, true);
+ ContentHeaderBody chb = new ContentHeaderBody(new BasicContentHeaderProperties(), BasicPublishBodyImpl.CLASS_ID);
+ chb.bodySize = 0L;
+ ((BasicContentHeaderProperties) chb.properties).setDeliveryMode((byte) 2);
+
+ runTestWithMessage(original, chb);
+
+ }
+
+ public void testNonEmptyTransientFlowToDisk() throws UnableToFlowMessageException, AMQException
+ {
+ AMQMessage original = MessageFactory.getInstance().createMessage(null, false);
+
+ ContentHeaderBody chb = new ContentHeaderBody(new BasicContentHeaderProperties(), BasicPublishBodyImpl.CLASS_ID);
+ chb.bodySize = 100L;
+
+ runTestWithMessage(original, chb);
+ }
+
+ public void testNonEmptyPersistentFlowToDisk() throws UnableToFlowMessageException, AMQException
+ {
+ AMQMessage original = MessageFactory.getInstance().createMessage(_transactionLog, true);
+ ContentHeaderBody chb = new ContentHeaderBody(new BasicContentHeaderProperties(), BasicPublishBodyImpl.CLASS_ID);
+ chb.bodySize = 100L;
+ ((BasicContentHeaderProperties) chb.properties).setDeliveryMode((byte) 2);
+
+ runTestWithMessage(original, chb);
+ }
+
+ void runTestWithMessage(AMQMessage original, ContentHeaderBody chb) throws UnableToFlowMessageException, AMQException
+ {
+
+ // Create message
+
+ original.setPublishAndContentHeaderBody(null,
+ new MessagePublishInfoImpl(ExchangeDefaults.DIRECT_EXCHANGE_NAME,
+ false, false, new AMQShortString("routing")),
+ chb);
+ if (chb.bodySize > 0)
+ {
+ ContentChunk chunk = new MockContentChunk((int) chb.bodySize/2);
+
+ original.addContentBodyFrame(null, chunk, false);
+
+ chunk = new MockContentChunk((int) chb.bodySize/2);
+
+ original.addContentBodyFrame(null, chunk, true);
+ }
+
+ _backing.flow(original);
+
+ AMQMessage fromDisk = _backing.recover(original.getMessageId());
+
+ assertEquals("Message IDs do not match", original.getMessageId(), fromDisk.getMessageId());
+ assertEquals("Message arrival times do not match", original.getArrivalTime(), fromDisk.getArrivalTime());
+ assertEquals(original.isPersistent(), fromDisk.isPersistent());
+
+ // Validate the MPI data was restored correctly
+ MessagePublishInfo originalMPI = original.getMessagePublishInfo();
+ MessagePublishInfo fromDiskMPI = fromDisk.getMessagePublishInfo();
+ assertEquals("Exchange", originalMPI.getExchange(), fromDiskMPI.getExchange());
+ assertEquals(originalMPI.isImmediate(), fromDiskMPI.isImmediate());
+ assertEquals(originalMPI.isMandatory(), fromDiskMPI.isMandatory());
+ assertEquals(originalMPI.getRoutingKey(), fromDiskMPI.getRoutingKey());
+
+ // Validate BodyCounts.
+ int originalBodyCount = original.getBodyCount();
+ assertEquals(originalBodyCount, fromDisk.getBodyCount());
+
+ if (originalBodyCount > 0)
+ {
+ for (int index = 0; index < originalBodyCount; index++)
+ {
+ ContentChunk originalChunk = original.getContentChunk(index);
+ ContentChunk fromDiskChunk = fromDisk.getContentChunk(index);
+
+ assertEquals(originalChunk.getSize(), fromDiskChunk.getSize());
+ assertEquals(originalChunk.getData(), fromDiskChunk.getData());
+ }
+ }
+
+ }
+
+}