summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-07-26 14:37:43 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-07-26 14:37:43 +0000
commit84a53cdf08a58991b80487be8fe062024175f7e9 (patch)
tree16decc4e68eb607ca73fc22f691020478125534c /java
parentc36251461bb48a12e540abc554f5b2e0495f4ead (diff)
downloadqpid-python-84a53cdf08a58991b80487be8fe062024175f7e9.tar.gz
QPID-2659: Add AMQStoreException to message stores
This is a sub-class of AMQInternalException, which encapsulates error code 541, or INTERNAL_ERROR. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@979315 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java39
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java134
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java45
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java43
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java23
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java18
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java3
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java27
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java12
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQInternalException.java49
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQStoreException.java48
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java27
14 files changed, 286 insertions, 189 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
index 7393f27ab4..400ce50bc4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -96,7 +97,7 @@ public class BindingFactory
removeBinding(this);
}
- public void onClose(final Exchange exchange) throws AMQSecurityException
+ public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException
{
removeBinding(this);
}
@@ -140,7 +141,7 @@ public class BindingFactory
- public boolean addBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException
+ public boolean addBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException
{
return makeBinding(bindingKey, queue, exchange, arguments, false, false);
}
@@ -149,12 +150,12 @@ public class BindingFactory
public boolean replaceBinding(final String bindingKey,
final AMQQueue queue,
final Exchange exchange,
- final Map<String, Object> arguments) throws AMQSecurityException
+ final Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException
{
return makeBinding(bindingKey, queue, exchange, arguments, false, true);
}
- private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException
+ private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException
{
assert queue != null;
if (bindingKey == null)
@@ -187,14 +188,7 @@ public class BindingFactory
if (b.isDurable() && !restore)
{
- try
- {
- _configSource.getDurableConfigurationStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments));
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e); // FIXME
- }
+ _configSource.getDurableConfigurationStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments));
}
queue.addQueueDeleteTask(b);
@@ -217,18 +211,18 @@ public class BindingFactory
return getVirtualHost().getConfigStore();
}
- public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException
+ public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException
{
makeBinding(bindingKey,queue,exchange,argumentMap,true, false);
}
- public void removeBinding(final Binding b) throws AMQSecurityException
+ public void removeBinding(final Binding b) throws AMQSecurityException, AMQInternalException
{
removeBinding(b.getBindingKey(), b.getQueue(), b.getExchange(), b.getArguments());
}
- public Binding removeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException
+ public Binding removeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException
{
assert queue != null;
if (bindingKey == null)
@@ -261,17 +255,10 @@ public class BindingFactory
if (b.isDurable())
{
- try
- {
- _configSource.getDurableConfigurationStore().unbindQueue(exchange,
- new AMQShortString(bindingKey),
- queue,
- FieldTable.convertToFieldTable(arguments));
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e); // FIXME
- }
+ _configSource.getDurableConfigurationStore().unbindQueue(exchange,
+ new AMQShortString(bindingKey),
+ queue,
+ FieldTable.convertToFieldTable(arguments));
}
b.logDestruction();
getConfigStore().removeConfiguredObject(b);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 13fe767d3f..356a7f89b9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -139,6 +140,6 @@ public interface Exchange extends ExchangeReferrer, ExchangeConfig
public static interface Task
{
- public void onClose(Exchange exchange) throws AMQSecurityException;
+ public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 62f2d14e09..838867f233 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -20,20 +20,6 @@
*/
package org.apache.qpid.server.store;
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.queue.AMQQueue;
-
import java.io.ByteArrayInputStream;
import java.io.File;
import java.lang.ref.SoftReference;
@@ -52,7 +38,26 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.queue.AMQQueue;
+/**
+ * An implementation of a {@link MessageStore} that uses Apache Derby as the persistance
+ * mechanism.
+ *
+ * TODO extract the SQL statements into a generic JDBC store
+ */
public class DerbyMessageStore implements MessageStore
{
@@ -221,7 +226,8 @@ public class DerbyMessageStore implements MessageStore
//Update to pick up QPID_WORK and use that as the default location not just derbyDB
- final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")+"/derbyDB");
+ final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")
+ + File.separator + "derbyDB");
File environmentPath = new File(databasePath);
if (!environmentPath.exists())
@@ -387,13 +393,13 @@ public class DerbyMessageStore implements MessageStore
catch (SQLException e)
{
- throw new AMQException("Error recovering persistent state: " + e, e);
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
}
}
- private List<String> loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException, AMQException
+ private List<String> loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException
{
Connection conn = newAutoCommitConnection();
@@ -432,7 +438,7 @@ public class DerbyMessageStore implements MessageStore
}
- private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws AMQException, SQLException
+ private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws SQLException
{
List<String> exchanges = new ArrayList<String>();
@@ -468,7 +474,7 @@ public class DerbyMessageStore implements MessageStore
}
- private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws AMQException, SQLException
+ private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws SQLException
{
_logger.info("Recovering bindings...");
@@ -552,7 +558,7 @@ public class DerbyMessageStore implements MessageStore
public StoredMessage getMessage(long messageNumber)
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public void removeMessage(long messageId)
@@ -599,12 +605,12 @@ public class DerbyMessageStore implements MessageStore
}
}
- throw new RuntimeException("Error removing message with id " + messageId + " from database: " + e, e);
+ throw new RuntimeException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
}
}
- public void createExchange(Exchange exchange) throws AMQException
+ public void createExchange(Exchange exchange) throws AMQStoreException
{
if (_state != State.RECOVERING)
{
@@ -645,13 +651,13 @@ public class DerbyMessageStore implements MessageStore
}
catch (SQLException e)
{
- throw new AMQException("Error adding Exchange with name " + exchange.getNameShortString() + " to database: " + e, e);
+ throw new AMQStoreException("Error writing Exchange with name " + exchange.getNameShortString() + " to database: " + e.getMessage(), e);
}
}
}
- public void removeExchange(Exchange exchange) throws AMQException
+ public void removeExchange(Exchange exchange) throws AMQStoreException
{
Connection conn = null;
@@ -664,12 +670,12 @@ public class DerbyMessageStore implements MessageStore
stmt.close();
if(results == 0)
{
- throw new AMQException("Exchange " + exchange.getNameShortString() + " not found");
+ throw new AMQStoreException("Exchange " + exchange.getNameShortString() + " not found");
}
}
catch (SQLException e)
{
- throw new AMQException("Error deleting exchange with name " + exchange.getNameShortString() + " from database: " + e, e);
+ throw new AMQStoreException("Error deleting Exchange with name " + exchange.getNameShortString() + " from database: " + e.getMessage(), e);
}
finally
{
@@ -689,7 +695,7 @@ public class DerbyMessageStore implements MessageStore
}
public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
- throws AMQException
+ throws AMQStoreException
{
if (_state != State.RECOVERING)
{
@@ -735,8 +741,8 @@ public class DerbyMessageStore implements MessageStore
}
catch (SQLException e)
{
- throw new AMQException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
- + exchange.getNameShortString() + " to database: " + e, e);
+ throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
+ + exchange.getNameShortString() + " to database: " + e.getMessage(), e);
}
finally
{
@@ -760,7 +766,7 @@ public class DerbyMessageStore implements MessageStore
}
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
- throws AMQException
+ throws AMQStoreException
{
Connection conn = null;
@@ -778,14 +784,14 @@ public class DerbyMessageStore implements MessageStore
if(result != 1)
{
- throw new AMQException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange "
+ throw new AMQStoreException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange "
+ exchange.getNameShortString() + " not found");
}
}
catch (SQLException e)
{
- throw new AMQException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
- + exchange.getNameShortString() + " in database: " + e, e);
+ throw new AMQStoreException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
+ + exchange.getNameShortString() + " in database: " + e.getMessage(), e);
}
finally
{
@@ -806,12 +812,12 @@ public class DerbyMessageStore implements MessageStore
}
- public void createQueue(AMQQueue queue) throws AMQException
+ public void createQueue(AMQQueue queue) throws AMQStoreException
{
createQueue(queue, null);
}
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
{
_logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
@@ -858,7 +864,7 @@ public class DerbyMessageStore implements MessageStore
}
catch (SQLException e)
{
- throw new AMQException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e, e);
+ throw new AMQStoreException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e);
}
}
}
@@ -870,9 +876,9 @@ public class DerbyMessageStore implements MessageStore
* NOTE: Currently only updates the exclusivity.
*
* @param queue The queue to update the entry for.
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- public void updateQueue(final AMQQueue queue) throws AMQException
+ public void updateQueue(final AMQQueue queue) throws AMQStoreException
{
if (_state != State.RECOVERING)
{
@@ -901,7 +907,7 @@ public class DerbyMessageStore implements MessageStore
}
catch (SQLException e)
{
- throw new AMQException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e, e);
+ throw new AMQStoreException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e);
}
}
@@ -931,7 +937,7 @@ public class DerbyMessageStore implements MessageStore
return connection;
}
- public void removeQueue(final AMQQueue queue) throws AMQException
+ public void removeQueue(final AMQQueue queue) throws AMQStoreException
{
AMQShortString name = queue.getNameShortString();
_logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
@@ -947,12 +953,12 @@ public class DerbyMessageStore implements MessageStore
if (results == 0)
{
- throw new AMQException("Queue " + name + " not found");
+ throw new AMQStoreException("Queue " + name + " not found");
}
}
catch (SQLException e)
{
- throw new AMQException("Error writing deleting with name " + name + " from database: " + e, e);
+ throw new AMQStoreException("Error deleting AMQQueue with name " + name + " from database: " + e.getMessage(), e);
}
finally
{
@@ -978,7 +984,7 @@ public class DerbyMessageStore implements MessageStore
return new DerbyTransaction();
}
- public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQException
+ public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
{
String name = queue.getResourceName();
@@ -1000,14 +1006,14 @@ public class DerbyMessageStore implements MessageStore
}
catch (SQLException e)
{
- _logger.error("Failed to enqueue: " + e, e);
- throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name
+ _logger.error("Failed to enqueue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name
+ " to database", e);
}
}
- public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQException
+ public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
{
String name = queue.getResourceName();
@@ -1025,7 +1031,7 @@ public class DerbyMessageStore implements MessageStore
if(results != 1)
{
- throw new AMQException("Unable to find message with id " + messageId + " on queue " + name);
+ throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name);
}
if (_logger.isDebugEnabled())
@@ -1035,8 +1041,8 @@ public class DerbyMessageStore implements MessageStore
}
catch (SQLException e)
{
- _logger.error("Failed to dequeue: " + e, e);
- throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name
+ _logger.error("Failed to dequeue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + name
+ " from database", e);
}
@@ -1058,7 +1064,7 @@ public class DerbyMessageStore implements MessageStore
}
- public void commitTran(ConnectionWrapper connWrapper) throws AMQException
+ public void commitTran(ConnectionWrapper connWrapper) throws AMQStoreException
{
try
@@ -1075,7 +1081,7 @@ public class DerbyMessageStore implements MessageStore
}
catch (SQLException e)
{
- throw new AMQException("Error commit tx: " + e, e);
+ throw new AMQStoreException("Error commit tx: " + e.getMessage(), e);
}
finally
{
@@ -1083,7 +1089,7 @@ public class DerbyMessageStore implements MessageStore
}
}
- public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQException
+ public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQStoreException
{
commitTran(connWrapper);
return new StoreFuture()
@@ -1101,11 +1107,11 @@ public class DerbyMessageStore implements MessageStore
}
- public void abortTran(ConnectionWrapper connWrapper) throws AMQException
+ public void abortTran(ConnectionWrapper connWrapper) throws AMQStoreException
{
if (connWrapper == null)
{
- throw new AMQException("Fatal internal error: transactional context is empty at abortTran");
+ throw new AMQStoreException("Fatal internal error: transactional context is empty at abortTran");
}
if (_logger.isDebugEnabled())
@@ -1121,7 +1127,7 @@ public class DerbyMessageStore implements MessageStore
}
catch (SQLException e)
{
- throw new AMQException("Error aborting transaction: " + e, e);
+ throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e);
}
}
@@ -1310,7 +1316,7 @@ public class DerbyMessageStore implements MessageStore
}
}
- throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e, e);
+ throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e.getMessage(), e);
}
}
@@ -1375,7 +1381,7 @@ public class DerbyMessageStore implements MessageStore
}
}
- throw new RuntimeException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e, e);
+ throw new RuntimeException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e.getMessage(), e);
}
@@ -1388,11 +1394,11 @@ public class DerbyMessageStore implements MessageStore
}
- private synchronized void stateTransition(State requiredState, State newState) throws AMQException
+ private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException
{
if (_state != requiredState)
{
- throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
+ throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
+ "; currently in state: " + _state);
}
@@ -1417,28 +1423,28 @@ public class DerbyMessageStore implements MessageStore
}
}
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
{
DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, messageId);
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
{
DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, messageId);
}
- public void commitTran() throws AMQException
+ public void commitTran() throws AMQStoreException
{
DerbyMessageStore.this.commitTran(_connWrapper);
}
- public StoreFuture commitTranAsync() throws AMQException
+ public StoreFuture commitTranAsync() throws AMQStoreException
{
return DerbyMessageStore.this.commitTranAsync(_connWrapper);
}
- public void abortTran() throws AMQException
+ public void abortTran() throws AMQStoreException
{
DerbyMessageStore.this.abortTran(_connWrapper);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index c169e3bcff..5fb23653cb 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.AMQException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.queue.AMQQueue;
public interface DurableConfigurationStore
{
@@ -55,18 +55,18 @@ public interface DurableConfigurationStore
*
* @param exchange The exchange to persist.
*
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- void createExchange(Exchange exchange) throws AMQException;
+ void createExchange(Exchange exchange) throws AMQStoreException;
/**
* Removes the specified persistent exchange.
*
* @param exchange The exchange to remove.
*
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- void removeExchange(Exchange exchange) throws AMQException;
+ void removeExchange(Exchange exchange) throws AMQStoreException;
/**
* Binds the specified queue to an exchange with a routing key.
@@ -76,9 +76,9 @@ public interface DurableConfigurationStore
* @param queue The queue to bind.
* @param args Additional parameters.
*
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ * @throws AMQStoreException if the operation fails for any reason.
*/
- void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+ void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException;
/**
* Unbinds the specified from an exchange under a particular routing key.
@@ -88,43 +88,44 @@ public interface DurableConfigurationStore
* @param queue The queue to unbind.
* @param args Additonal parameters.
*
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+ void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException;
/**
* Makes the specified queue persistent.
*
* @param queue The queue to store.
*
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- void createQueue(AMQQueue queue) throws AMQException;
+ void createQueue(AMQQueue queue) throws AMQStoreException;
/**
* Makes the specified queue persistent.
*
* @param queue The queue to store.
- *
* @param arguments The additional arguments to the binding
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException;
+ void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException;
/**
* Removes the specified queue from the persistent store.
*
* @param queue The queue to remove.
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- void removeQueue(AMQQueue queue) throws AMQException;
+ void removeQueue(AMQQueue queue) throws AMQStoreException;
/**
* Updates the specified queue in the persistent store, IF it is already present. If the queue
* is not present in the store, it will not be added.
*
* @param queue The queue to update the entry for.
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- void updateQueue(AMQQueue queue) throws AMQException;
+ void updateQueue(AMQQueue queue) throws AMQStoreException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 9d9312cd26..d008d42fa0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -20,21 +20,22 @@
*/
package org.apache.qpid.server.store;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.commons.configuration.Configuration;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.queue.AMQQueue;
/** A simple message store that stores the messages in a threadsafe structure in memory. */
public class MemoryMessageStore implements MessageStore
@@ -52,24 +53,24 @@ public class MemoryMessageStore implements MessageStore
private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
{
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
{
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
{
}
- public void commitTran() throws AMQException
+ public void commitTran() throws AMQStoreException
{
}
- public StoreFuture commitTranAsync() throws AMQException
+ public StoreFuture commitTranAsync() throws AMQStoreException
{
return IMMEDIATE_FUTURE;
}
- public void abortTran() throws AMQException
+ public void abortTran() throws AMQStoreException
{
}
@@ -113,43 +114,43 @@ public class MemoryMessageStore implements MessageStore
}
- public void createExchange(Exchange exchange) throws AMQException
+ public void createExchange(Exchange exchange) throws AMQStoreException
{
}
- public void removeExchange(Exchange exchange) throws AMQException
+ public void removeExchange(Exchange exchange) throws AMQStoreException
{
}
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
}
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
}
- public void createQueue(AMQQueue queue) throws AMQException
+ public void createQueue(AMQQueue queue) throws AMQStoreException
{
// Not requred to do anything
}
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
{
// Not required to do anything
}
- public void removeQueue(final AMQQueue queue) throws AMQException
+ public void removeQueue(final AMQQueue queue) throws AMQStoreException
{
// Not required to do anything
}
- public void updateQueue(final AMQQueue queue) throws AMQException
+ public void updateQueue(final AMQQueue queue) throws AMQStoreException
{
// Not required to do anything
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java b/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
index e6a33e23d6..d196a91930 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
@@ -20,9 +20,8 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
import org.apache.commons.configuration.Configuration;
public interface TransactionLog
@@ -35,40 +34,40 @@ public interface TransactionLog
*
* @param queue The queue to place the message on.
* @param messageId The message to enqueue.
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException;
+ void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException;
/**
* Extracts a message from a specified queue, in a given transactional context.
*
* @param queue The queue to place the message on.
* @param messageId The message to dequeue.
- * @throws org.apache.qpid.AMQException If the operation fails for any reason, or if the specified message does not exist.
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException;
+ void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException;
/**
* Commits all operations performed within a given transactional context.
*
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- void commitTran() throws AMQException;
+ void commitTran() throws AMQStoreException;
/**
* Commits all operations performed within a given transactional context.
*
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- StoreFuture commitTranAsync() throws AMQException;
+ StoreFuture commitTranAsync() throws AMQStoreException;
/**
* Abandons all operations performed within a given transactional context.
*
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- void abortTran() throws AMQException;
+ void abortTran() throws AMQStoreException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index c6055f35f6..f2444718af 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -36,6 +36,8 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQBrokerManagerMBean;
@@ -696,7 +698,7 @@ public class VirtualHostImpl implements VirtualHost
//To change body of implemented methods use File | Settings | File Templates.
}
- public void createExchange(Exchange exchange) throws AMQException
+ public void createExchange(Exchange exchange) throws AMQStoreException
{
if (exchange.isDurable())
{
@@ -704,11 +706,11 @@ public class VirtualHostImpl implements VirtualHost
}
}
- public void removeExchange(Exchange exchange) throws AMQException
+ public void removeExchange(Exchange exchange) throws AMQStoreException
{
}
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
if (exchange.isDurable() && queue.isDurable())
{
@@ -716,16 +718,16 @@ public class VirtualHostImpl implements VirtualHost
}
}
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
}
- public void createQueue(AMQQueue queue) throws AMQException
+ public void createQueue(AMQQueue queue) throws AMQStoreException
{
createQueue(queue, null);
}
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
{
if (queue.isDurable())
{
@@ -733,7 +735,7 @@ public class VirtualHostImpl implements VirtualHost
}
}
- public void removeQueue(AMQQueue queue) throws AMQException
+ public void removeQueue(AMQQueue queue) throws AMQStoreException
{
}
@@ -766,7 +768,7 @@ public class VirtualHostImpl implements VirtualHost
}
}
- public void updateQueue(AMQQueue queue) throws AMQException
+ public void updateQueue(AMQQueue queue) throws AMQStoreException
{
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java
index 691d5acfa9..e3b07b072b 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -54,7 +55,7 @@ public class TopicConfigurationTest extends InternalBrokerBaseCase
* @throws ConfigurationException
* @throws AMQSecurityException
*/
- public void testTopicCreation() throws ConfigurationException, AMQSecurityException
+ public void testTopicCreation() throws ConfigurationException, AMQSecurityException, AMQInternalException
{
Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
_virtualHost.getBindingFactory().addBinding("stocks.nyse.appl", _queue, topicExchange, null);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index d9378a223b..b0a655e8b6 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -21,10 +21,10 @@ package org.apache.qpid.server.queue;
*/
-import junit.framework.TestCase;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -149,7 +149,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost());
}
- public void testBinding() throws AMQSecurityException
+ public void testBinding() throws AMQSecurityException, AMQInternalException
{
_virtualHost.getBindingFactory().addBinding(String.valueOf(_routingKey), _queue, _exchange, Collections.EMPTY_MAP);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 8cbc50ecbe..5ff84557d8 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
@@ -76,31 +77,31 @@ public class SkeletonMessageStore implements MessageStore
{
}
- public void createExchange(Exchange exchange) throws AMQException
+ public void createExchange(Exchange exchange) throws AMQStoreException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void removeExchange(Exchange exchange) throws AMQException
+ public void removeExchange(Exchange exchange) throws AMQStoreException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void createQueue(AMQQueue queue) throws AMQException
+ public void createQueue(AMQQueue queue) throws AMQStoreException
{
}
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
{
}
@@ -161,7 +162,7 @@ public class SkeletonMessageStore implements MessageStore
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public void removeQueue(final AMQQueue queue) throws AMQException
+ public void removeQueue(final AMQQueue queue) throws AMQStoreException
{
}
@@ -179,22 +180,22 @@ public class SkeletonMessageStore implements MessageStore
return new Transaction()
{
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void commitTran() throws AMQException
+ public void commitTran() throws AMQStoreException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public StoreFuture commitTranAsync() throws AMQException
+ public StoreFuture commitTranAsync() throws AMQStoreException
{
return new StoreFuture()
{
@@ -210,14 +211,14 @@ public class SkeletonMessageStore implements MessageStore
};
}
- public void abortTran() throws AMQException
+ public void abortTran() throws AMQStoreException
{
//To change body of implemented methods use File | Settings | File Templates.
}
};
}
- public void updateQueue(AMQQueue queue) throws AMQException
+ public void updateQueue(AMQQueue queue) throws AMQStoreException
{
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index ab8c1e7c9c..e8d0b99e6e 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.framing.abstraction.ContentChunk;
@@ -68,21 +68,21 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
private class TestableTransaction implements Transaction
{
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
{
getMessages().put(messageId, (AMQQueue)queue);
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
{
getMessages().remove(messageId);
}
- public void commitTran() throws AMQException
+ public void commitTran() throws AMQStoreException
{
}
- public StoreFuture commitTranAsync() throws AMQException
+ public StoreFuture commitTranAsync() throws AMQStoreException
{
return new StoreFuture()
{
@@ -98,7 +98,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
};
}
- public void abortTran() throws AMQException
+ public void abortTran() throws AMQStoreException
{
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQInternalException.java b/java/common/src/main/java/org/apache/qpid/AMQInternalException.java
new file mode 100644
index 0000000000..59dc800c0e
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQInternalException.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid;
+
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * InternalException encapsulates error code 541, or {@link AMQConstant#INTERNAL_ERROR} exceptions relating to the
+ * AMQ protocol. It is used to report internal failures and errors that occur within the broker.
+ */
+public class AMQInternalException extends AMQException
+{
+ /** serialVersionUID */
+ private static final long serialVersionUID = 2544449432843381112L;
+
+ /**
+ * Creates an exception with an optional message and optional underlying cause.
+ *
+ * @param msg The exception message. May be null if not to be set.
+ * @param cause The underlying cause of the exception. May be null if not to be set.
+ */
+ public AMQInternalException(String msg, Throwable cause)
+ {
+ super(AMQConstant.INTERNAL_ERROR, ((msg == null) ? "Internal error" : msg), cause);
+ }
+
+ public AMQInternalException(String msg)
+ {
+ this(msg, null);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQStoreException.java b/java/common/src/main/java/org/apache/qpid/AMQStoreException.java
new file mode 100644
index 0000000000..8389fe5efa
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQStoreException.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid;
+
+import java.sql.SQLException;
+
+/**
+ * StoreException is a specific type of internal error relating to errors in the message store, such as {@link SQLException}.
+ */
+public class AMQStoreException extends AMQInternalException
+{
+ /** serialVersionUID */
+ private static final long serialVersionUID = 2859681947490637496L;
+
+ /**
+ * Creates an exception with an optional message and optional underlying cause.
+ *
+ * @param msg The exception message. May be null if not to be set.
+ * @param cause The underlying cause of the exception. May be null if not to be set.
+ */
+ public AMQStoreException(String msg, Throwable cause)
+ {
+ super(((msg == null) ? "Store error" : msg), cause);
+ }
+
+ public AMQStoreException(String msg)
+ {
+ this(msg, null);
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index 8131e09b49..a5c38e7e33 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
@@ -174,47 +175,47 @@ public class SlowMessageStore implements MessageStore
}
- public void createExchange(Exchange exchange) throws AMQException
+ public void createExchange(Exchange exchange) throws AMQStoreException
{
doPreDelay("createExchange");
_realStore.createExchange(exchange);
doPostDelay("createExchange");
}
- public void removeExchange(Exchange exchange) throws AMQException
+ public void removeExchange(Exchange exchange) throws AMQStoreException
{
doPreDelay("removeExchange");
_realStore.removeExchange(exchange);
doPostDelay("removeExchange");
}
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
doPreDelay("bindQueue");
_realStore.bindQueue(exchange, routingKey, queue, args);
doPostDelay("bindQueue");
}
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
doPreDelay("unbindQueue");
_realStore.unbindQueue(exchange, routingKey, queue, args);
doPostDelay("unbindQueue");
}
- public void createQueue(AMQQueue queue) throws AMQException
+ public void createQueue(AMQQueue queue) throws AMQStoreException
{
createQueue(queue, null);
}
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
{
doPreDelay("createQueue");
_realStore.createQueue(queue, arguments);
doPostDelay("createQueue");
}
- public void removeQueue(AMQQueue queue) throws AMQException
+ public void removeQueue(AMQQueue queue) throws AMQStoreException
{
doPreDelay("removeQueue");
_realStore.removeQueue(queue);
@@ -268,7 +269,7 @@ public class SlowMessageStore implements MessageStore
}
public void enqueueMessage(TransactionLogResource queue, Long messageId)
- throws AMQException
+ throws AMQStoreException
{
doPreDelay("enqueueMessage");
_underlying.enqueueMessage(queue, messageId);
@@ -276,7 +277,7 @@ public class SlowMessageStore implements MessageStore
}
public void dequeueMessage(TransactionLogResource queue, Long messageId)
- throws AMQException
+ throws AMQStoreException
{
doPreDelay("dequeueMessage");
_underlying.dequeueMessage(queue, messageId);
@@ -284,7 +285,7 @@ public class SlowMessageStore implements MessageStore
}
public void commitTran()
- throws AMQException
+ throws AMQStoreException
{
doPreDelay("commitTran");
_underlying.commitTran();
@@ -292,7 +293,7 @@ public class SlowMessageStore implements MessageStore
}
public StoreFuture commitTranAsync()
- throws AMQException
+ throws AMQStoreException
{
doPreDelay("commitTran");
StoreFuture future = _underlying.commitTranAsync();
@@ -301,7 +302,7 @@ public class SlowMessageStore implements MessageStore
}
public void abortTran()
- throws AMQException
+ throws AMQStoreException
{
doPreDelay("abortTran");
_underlying.abortTran();
@@ -309,7 +310,7 @@ public class SlowMessageStore implements MessageStore
}
}
- public void updateQueue(AMQQueue queue) throws AMQException
+ public void updateQueue(AMQQueue queue) throws AMQStoreException
{
doPreDelay("updateQueue");
_realStore.updateQueue(queue);