diff options
| author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-26 14:37:43 +0000 |
|---|---|---|
| committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-26 14:37:43 +0000 |
| commit | 84a53cdf08a58991b80487be8fe062024175f7e9 (patch) | |
| tree | 16decc4e68eb607ca73fc22f691020478125534c /java | |
| parent | c36251461bb48a12e540abc554f5b2e0495f4ead (diff) | |
| download | qpid-python-84a53cdf08a58991b80487be8fe062024175f7e9.tar.gz | |
QPID-2659: Add AMQStoreException to message stores
This is a sub-class of AMQInternalException, which encapsulates error code 541,
or INTERNAL_ERROR.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@979315 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
14 files changed, 286 insertions, 189 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java index 7393f27ab4..400ce50bc4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -96,7 +97,7 @@ public class BindingFactory removeBinding(this); } - public void onClose(final Exchange exchange) throws AMQSecurityException + public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException { removeBinding(this); } @@ -140,7 +141,7 @@ public class BindingFactory - public boolean addBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException + public boolean addBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException { return makeBinding(bindingKey, queue, exchange, arguments, false, false); } @@ -149,12 +150,12 @@ public class BindingFactory public boolean replaceBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, - final Map<String, Object> arguments) throws AMQSecurityException + final Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException { return makeBinding(bindingKey, queue, exchange, arguments, false, true); } - private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException + private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException { assert queue != null; if (bindingKey == null) @@ -187,14 +188,7 @@ public class BindingFactory if (b.isDurable() && !restore) { - try - { - _configSource.getDurableConfigurationStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments)); - } - catch (AMQException e) - { - throw new RuntimeException(e); // FIXME - } + _configSource.getDurableConfigurationStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments)); } queue.addQueueDeleteTask(b); @@ -217,18 +211,18 @@ public class BindingFactory return getVirtualHost().getConfigStore(); } - public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException + public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException { makeBinding(bindingKey,queue,exchange,argumentMap,true, false); } - public void removeBinding(final Binding b) throws AMQSecurityException + public void removeBinding(final Binding b) throws AMQSecurityException, AMQInternalException { removeBinding(b.getBindingKey(), b.getQueue(), b.getExchange(), b.getArguments()); } - public Binding removeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException + public Binding removeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException { assert queue != null; if (bindingKey == null) @@ -261,17 +255,10 @@ public class BindingFactory if (b.isDurable()) { - try - { - _configSource.getDurableConfigurationStore().unbindQueue(exchange, - new AMQShortString(bindingKey), - queue, - FieldTable.convertToFieldTable(arguments)); - } - catch (AMQException e) - { - throw new RuntimeException(e); // FIXME - } + _configSource.getDurableConfigurationStore().unbindQueue(exchange, + new AMQShortString(bindingKey), + queue, + FieldTable.convertToFieldTable(arguments)); } b.logDestruction(); getConfigStore().removeConfiguredObject(b); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 13fe767d3f..356a7f89b9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -139,6 +140,6 @@ public interface Exchange extends ExchangeReferrer, ExchangeConfig public static interface Task { - public void onClose(Exchange exchange) throws AMQSecurityException; + public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 62f2d14e09..838867f233 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -20,20 +20,6 @@ */ package org.apache.qpid.server.store; -import org.apache.commons.configuration.Configuration; -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.ConfigStoreMessages; -import org.apache.qpid.server.logging.messages.MessageStoreMessages; -import org.apache.qpid.server.logging.messages.TransactionLogMessages; -import org.apache.qpid.server.queue.AMQQueue; - import java.io.ByteArrayInputStream; import java.io.File; import java.lang.ref.SoftReference; @@ -52,7 +38,26 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.messages.TransactionLogMessages; +import org.apache.qpid.server.queue.AMQQueue; +/** + * An implementation of a {@link MessageStore} that uses Apache Derby as the persistance + * mechanism. + * + * TODO extract the SQL statements into a generic JDBC store + */ public class DerbyMessageStore implements MessageStore { @@ -221,7 +226,8 @@ public class DerbyMessageStore implements MessageStore //Update to pick up QPID_WORK and use that as the default location not just derbyDB - final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")+"/derbyDB"); + final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK") + + File.separator + "derbyDB"); File environmentPath = new File(databasePath); if (!environmentPath.exists()) @@ -387,13 +393,13 @@ public class DerbyMessageStore implements MessageStore catch (SQLException e) { - throw new AMQException("Error recovering persistent state: " + e, e); + throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e); } } - private List<String> loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException, AMQException + private List<String> loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException { Connection conn = newAutoCommitConnection(); @@ -432,7 +438,7 @@ public class DerbyMessageStore implements MessageStore } - private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws AMQException, SQLException + private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws SQLException { List<String> exchanges = new ArrayList<String>(); @@ -468,7 +474,7 @@ public class DerbyMessageStore implements MessageStore } - private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws AMQException, SQLException + private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws SQLException { _logger.info("Recovering bindings..."); @@ -552,7 +558,7 @@ public class DerbyMessageStore implements MessageStore public StoredMessage getMessage(long messageNumber) { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public void removeMessage(long messageId) @@ -599,12 +605,12 @@ public class DerbyMessageStore implements MessageStore } } - throw new RuntimeException("Error removing message with id " + messageId + " from database: " + e, e); + throw new RuntimeException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e); } } - public void createExchange(Exchange exchange) throws AMQException + public void createExchange(Exchange exchange) throws AMQStoreException { if (_state != State.RECOVERING) { @@ -645,13 +651,13 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - throw new AMQException("Error adding Exchange with name " + exchange.getNameShortString() + " to database: " + e, e); + throw new AMQStoreException("Error writing Exchange with name " + exchange.getNameShortString() + " to database: " + e.getMessage(), e); } } } - public void removeExchange(Exchange exchange) throws AMQException + public void removeExchange(Exchange exchange) throws AMQStoreException { Connection conn = null; @@ -664,12 +670,12 @@ public class DerbyMessageStore implements MessageStore stmt.close(); if(results == 0) { - throw new AMQException("Exchange " + exchange.getNameShortString() + " not found"); + throw new AMQStoreException("Exchange " + exchange.getNameShortString() + " not found"); } } catch (SQLException e) { - throw new AMQException("Error deleting exchange with name " + exchange.getNameShortString() + " from database: " + e, e); + throw new AMQStoreException("Error deleting Exchange with name " + exchange.getNameShortString() + " from database: " + e.getMessage(), e); } finally { @@ -689,7 +695,7 @@ public class DerbyMessageStore implements MessageStore } public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) - throws AMQException + throws AMQStoreException { if (_state != State.RECOVERING) { @@ -735,8 +741,8 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - throw new AMQException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange " - + exchange.getNameShortString() + " to database: " + e, e); + throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange " + + exchange.getNameShortString() + " to database: " + e.getMessage(), e); } finally { @@ -760,7 +766,7 @@ public class DerbyMessageStore implements MessageStore } public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) - throws AMQException + throws AMQStoreException { Connection conn = null; @@ -778,14 +784,14 @@ public class DerbyMessageStore implements MessageStore if(result != 1) { - throw new AMQException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange " + throw new AMQStoreException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange " + exchange.getNameShortString() + " not found"); } } catch (SQLException e) { - throw new AMQException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange " - + exchange.getNameShortString() + " in database: " + e, e); + throw new AMQStoreException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange " + + exchange.getNameShortString() + " in database: " + e.getMessage(), e); } finally { @@ -806,12 +812,12 @@ public class DerbyMessageStore implements MessageStore } - public void createQueue(AMQQueue queue) throws AMQException + public void createQueue(AMQQueue queue) throws AMQStoreException { createQueue(queue, null); } - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException { _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called"); @@ -858,7 +864,7 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - throw new AMQException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e, e); + throw new AMQStoreException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e); } } } @@ -870,9 +876,9 @@ public class DerbyMessageStore implements MessageStore * NOTE: Currently only updates the exclusivity. * * @param queue The queue to update the entry for. - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - public void updateQueue(final AMQQueue queue) throws AMQException + public void updateQueue(final AMQQueue queue) throws AMQStoreException { if (_state != State.RECOVERING) { @@ -901,7 +907,7 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - throw new AMQException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e, e); + throw new AMQStoreException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e); } } @@ -931,7 +937,7 @@ public class DerbyMessageStore implements MessageStore return connection; } - public void removeQueue(final AMQQueue queue) throws AMQException + public void removeQueue(final AMQQueue queue) throws AMQStoreException { AMQShortString name = queue.getNameShortString(); _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called"); @@ -947,12 +953,12 @@ public class DerbyMessageStore implements MessageStore if (results == 0) { - throw new AMQException("Queue " + name + " not found"); + throw new AMQStoreException("Queue " + name + " not found"); } } catch (SQLException e) { - throw new AMQException("Error writing deleting with name " + name + " from database: " + e, e); + throw new AMQStoreException("Error deleting AMQQueue with name " + name + " from database: " + e.getMessage(), e); } finally { @@ -978,7 +984,7 @@ public class DerbyMessageStore implements MessageStore return new DerbyTransaction(); } - public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQException + public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException { String name = queue.getResourceName(); @@ -1000,14 +1006,14 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - _logger.error("Failed to enqueue: " + e, e); - throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name + _logger.error("Failed to enqueue: " + e.getMessage(), e); + throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name + " to database", e); } } - public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQException + public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException { String name = queue.getResourceName(); @@ -1025,7 +1031,7 @@ public class DerbyMessageStore implements MessageStore if(results != 1) { - throw new AMQException("Unable to find message with id " + messageId + " on queue " + name); + throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name); } if (_logger.isDebugEnabled()) @@ -1035,8 +1041,8 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - _logger.error("Failed to dequeue: " + e, e); - throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name + _logger.error("Failed to dequeue: " + e.getMessage(), e); + throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + name + " from database", e); } @@ -1058,7 +1064,7 @@ public class DerbyMessageStore implements MessageStore } - public void commitTran(ConnectionWrapper connWrapper) throws AMQException + public void commitTran(ConnectionWrapper connWrapper) throws AMQStoreException { try @@ -1075,7 +1081,7 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - throw new AMQException("Error commit tx: " + e, e); + throw new AMQStoreException("Error commit tx: " + e.getMessage(), e); } finally { @@ -1083,7 +1089,7 @@ public class DerbyMessageStore implements MessageStore } } - public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQException + public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQStoreException { commitTran(connWrapper); return new StoreFuture() @@ -1101,11 +1107,11 @@ public class DerbyMessageStore implements MessageStore } - public void abortTran(ConnectionWrapper connWrapper) throws AMQException + public void abortTran(ConnectionWrapper connWrapper) throws AMQStoreException { if (connWrapper == null) { - throw new AMQException("Fatal internal error: transactional context is empty at abortTran"); + throw new AMQStoreException("Fatal internal error: transactional context is empty at abortTran"); } if (_logger.isDebugEnabled()) @@ -1121,7 +1127,7 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - throw new AMQException("Error aborting transaction: " + e, e); + throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e); } } @@ -1310,7 +1316,7 @@ public class DerbyMessageStore implements MessageStore } } - throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e, e); + throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e.getMessage(), e); } } @@ -1375,7 +1381,7 @@ public class DerbyMessageStore implements MessageStore } } - throw new RuntimeException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e, e); + throw new RuntimeException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e.getMessage(), e); } @@ -1388,11 +1394,11 @@ public class DerbyMessageStore implements MessageStore } - private synchronized void stateTransition(State requiredState, State newState) throws AMQException + private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException { if (_state != requiredState) { - throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState + throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState + "; currently in state: " + _state); } @@ -1417,28 +1423,28 @@ public class DerbyMessageStore implements MessageStore } } - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException { DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, messageId); } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException { DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, messageId); } - public void commitTran() throws AMQException + public void commitTran() throws AMQStoreException { DerbyMessageStore.this.commitTran(_connWrapper); } - public StoreFuture commitTranAsync() throws AMQException + public StoreFuture commitTranAsync() throws AMQStoreException { return DerbyMessageStore.this.commitTranAsync(_connWrapper); } - public void abortTran() throws AMQException + public void abortTran() throws AMQStoreException { DerbyMessageStore.this.abortTran(_connWrapper); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index c169e3bcff..5fb23653cb 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.AMQException; +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.queue.AMQQueue; public interface DurableConfigurationStore { @@ -55,18 +55,18 @@ public interface DurableConfigurationStore * * @param exchange The exchange to persist. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void createExchange(Exchange exchange) throws AMQException; + void createExchange(Exchange exchange) throws AMQStoreException; /** * Removes the specified persistent exchange. * * @param exchange The exchange to remove. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void removeExchange(Exchange exchange) throws AMQException; + void removeExchange(Exchange exchange) throws AMQStoreException; /** * Binds the specified queue to an exchange with a routing key. @@ -76,9 +76,9 @@ public interface DurableConfigurationStore * @param queue The queue to bind. * @param args Additional parameters. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException if the operation fails for any reason. */ - void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; + void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException; /** * Unbinds the specified from an exchange under a particular routing key. @@ -88,43 +88,44 @@ public interface DurableConfigurationStore * @param queue The queue to unbind. * @param args Additonal parameters. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; + void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException; /** * Makes the specified queue persistent. * * @param queue The queue to store. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void createQueue(AMQQueue queue) throws AMQException; + void createQueue(AMQQueue queue) throws AMQStoreException; /** * Makes the specified queue persistent. * * @param queue The queue to store. - * * @param arguments The additional arguments to the binding - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * + * @throws AMQStoreException If the operation fails for any reason. */ - void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException; + void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException; /** * Removes the specified queue from the persistent store. * * @param queue The queue to remove. - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * + * @throws AMQStoreException If the operation fails for any reason. */ - void removeQueue(AMQQueue queue) throws AMQException; + void removeQueue(AMQQueue queue) throws AMQStoreException; /** * Updates the specified queue in the persistent store, IF it is already present. If the queue * is not present in the store, it will not be added. * * @param queue The queue to update the entry for. - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void updateQueue(AMQQueue queue) throws AMQException; + void updateQueue(AMQQueue queue) throws AMQStoreException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 9d9312cd26..d008d42fa0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -20,21 +20,22 @@ */ package org.apache.qpid.server.store; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.messages.MessageStoreMessages; -import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.commons.configuration.Configuration; - -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.queue.AMQQueue; /** A simple message store that stores the messages in a threadsafe structure in memory. */ public class MemoryMessageStore implements MessageStore @@ -52,24 +53,24 @@ public class MemoryMessageStore implements MessageStore private static final Transaction IN_MEMORY_TRANSACTION = new Transaction() { - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException { } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException { } - public void commitTran() throws AMQException + public void commitTran() throws AMQStoreException { } - public StoreFuture commitTranAsync() throws AMQException + public StoreFuture commitTranAsync() throws AMQStoreException { return IMMEDIATE_FUTURE; } - public void abortTran() throws AMQException + public void abortTran() throws AMQStoreException { } @@ -113,43 +114,43 @@ public class MemoryMessageStore implements MessageStore } - public void createExchange(Exchange exchange) throws AMQException + public void createExchange(Exchange exchange) throws AMQStoreException { } - public void removeExchange(Exchange exchange) throws AMQException + public void removeExchange(Exchange exchange) throws AMQStoreException { } - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { } - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { } - public void createQueue(AMQQueue queue) throws AMQException + public void createQueue(AMQQueue queue) throws AMQStoreException { // Not requred to do anything } - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException { // Not required to do anything } - public void removeQueue(final AMQQueue queue) throws AMQException + public void removeQueue(final AMQQueue queue) throws AMQStoreException { // Not required to do anything } - public void updateQueue(final AMQQueue queue) throws AMQException + public void updateQueue(final AMQQueue queue) throws AMQStoreException { // Not required to do anything } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java b/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java index e6a33e23d6..d196a91930 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java @@ -20,9 +20,8 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; import org.apache.commons.configuration.Configuration; public interface TransactionLog @@ -35,40 +34,40 @@ public interface TransactionLog * * @param queue The queue to place the message on. * @param messageId The message to enqueue. - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException; + void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException; /** * Extracts a message from a specified queue, in a given transactional context. * * @param queue The queue to place the message on. * @param messageId The message to dequeue. - * @throws org.apache.qpid.AMQException If the operation fails for any reason, or if the specified message does not exist. + * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. */ - void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException; + void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException; /** * Commits all operations performed within a given transactional context. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void commitTran() throws AMQException; + void commitTran() throws AMQStoreException; /** * Commits all operations performed within a given transactional context. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - StoreFuture commitTranAsync() throws AMQException; + StoreFuture commitTranAsync() throws AMQStoreException; /** * Abandons all operations performed within a given transactional context. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void abortTran() throws AMQException; + void abortTran() throws AMQStoreException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index c6055f35f6..f2444718af 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -36,6 +36,8 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQBrokerManagerMBean; @@ -696,7 +698,7 @@ public class VirtualHostImpl implements VirtualHost //To change body of implemented methods use File | Settings | File Templates. } - public void createExchange(Exchange exchange) throws AMQException + public void createExchange(Exchange exchange) throws AMQStoreException { if (exchange.isDurable()) { @@ -704,11 +706,11 @@ public class VirtualHostImpl implements VirtualHost } } - public void removeExchange(Exchange exchange) throws AMQException + public void removeExchange(Exchange exchange) throws AMQStoreException { } - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { if (exchange.isDurable() && queue.isDurable()) { @@ -716,16 +718,16 @@ public class VirtualHostImpl implements VirtualHost } } - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { } - public void createQueue(AMQQueue queue) throws AMQException + public void createQueue(AMQQueue queue) throws AMQStoreException { createQueue(queue, null); } - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException { if (queue.isDurable()) { @@ -733,7 +735,7 @@ public class VirtualHostImpl implements VirtualHost } } - public void removeQueue(AMQQueue queue) throws AMQException + public void removeQueue(AMQQueue queue) throws AMQStoreException { } @@ -766,7 +768,7 @@ public class VirtualHostImpl implements VirtualHost } } - public void updateQueue(AMQQueue queue) throws AMQException + public void updateQueue(AMQQueue queue) throws AMQStoreException { } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java index 691d5acfa9..e3b07b072b 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -54,7 +55,7 @@ public class TopicConfigurationTest extends InternalBrokerBaseCase * @throws ConfigurationException * @throws AMQSecurityException */ - public void testTopicCreation() throws ConfigurationException, AMQSecurityException + public void testTopicCreation() throws ConfigurationException, AMQSecurityException, AMQInternalException { Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME); _virtualHost.getBindingFactory().addBinding("stocks.nyse.appl", _queue, topicExchange, null); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index d9378a223b..b0a655e8b6 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -21,10 +21,10 @@ package org.apache.qpid.server.queue; */ -import junit.framework.TestCase; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -149,7 +149,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost()); } - public void testBinding() throws AMQSecurityException + public void testBinding() throws AMQSecurityException, AMQInternalException { _virtualHost.getBindingFactory().addBinding(String.valueOf(_routingKey), _queue, _exchange, Collections.EMPTY_MAP); diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 8cbc50ecbe..5ff84557d8 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; @@ -76,31 +77,31 @@ public class SkeletonMessageStore implements MessageStore { } - public void createExchange(Exchange exchange) throws AMQException + public void createExchange(Exchange exchange) throws AMQStoreException { //To change body of implemented methods use File | Settings | File Templates. } - public void removeExchange(Exchange exchange) throws AMQException + public void removeExchange(Exchange exchange) throws AMQStoreException { //To change body of implemented methods use File | Settings | File Templates. } - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { //To change body of implemented methods use File | Settings | File Templates. } - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { //To change body of implemented methods use File | Settings | File Templates. } - public void createQueue(AMQQueue queue) throws AMQException + public void createQueue(AMQQueue queue) throws AMQStoreException { } - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException { } @@ -161,7 +162,7 @@ public class SkeletonMessageStore implements MessageStore return null; //To change body of implemented methods use File | Settings | File Templates. } - public void removeQueue(final AMQQueue queue) throws AMQException + public void removeQueue(final AMQQueue queue) throws AMQStoreException { } @@ -179,22 +180,22 @@ public class SkeletonMessageStore implements MessageStore return new Transaction() { - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException { //To change body of implemented methods use File | Settings | File Templates. } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException { //To change body of implemented methods use File | Settings | File Templates. } - public void commitTran() throws AMQException + public void commitTran() throws AMQStoreException { //To change body of implemented methods use File | Settings | File Templates. } - public StoreFuture commitTranAsync() throws AMQException + public StoreFuture commitTranAsync() throws AMQStoreException { return new StoreFuture() { @@ -210,14 +211,14 @@ public class SkeletonMessageStore implements MessageStore }; } - public void abortTran() throws AMQException + public void abortTran() throws AMQStoreException { //To change body of implemented methods use File | Settings | File Templates. } }; } - public void updateQueue(AMQQueue queue) throws AMQException + public void updateQueue(AMQQueue queue) throws AMQStoreException { } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index ab8c1e7c9c..e8d0b99e6e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.framing.abstraction.ContentChunk; @@ -68,21 +68,21 @@ public class TestableMemoryMessageStore extends MemoryMessageStore private class TestableTransaction implements Transaction { - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException { getMessages().put(messageId, (AMQQueue)queue); } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException { getMessages().remove(messageId); } - public void commitTran() throws AMQException + public void commitTran() throws AMQStoreException { } - public StoreFuture commitTranAsync() throws AMQException + public StoreFuture commitTranAsync() throws AMQStoreException { return new StoreFuture() { @@ -98,7 +98,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore }; } - public void abortTran() throws AMQException + public void abortTran() throws AMQStoreException { } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQInternalException.java b/java/common/src/main/java/org/apache/qpid/AMQInternalException.java new file mode 100644 index 0000000000..59dc800c0e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQInternalException.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid; + +import org.apache.qpid.protocol.AMQConstant; + +/** + * InternalException encapsulates error code 541, or {@link AMQConstant#INTERNAL_ERROR} exceptions relating to the + * AMQ protocol. It is used to report internal failures and errors that occur within the broker. + */ +public class AMQInternalException extends AMQException +{ + /** serialVersionUID */ + private static final long serialVersionUID = 2544449432843381112L; + + /** + * Creates an exception with an optional message and optional underlying cause. + * + * @param msg The exception message. May be null if not to be set. + * @param cause The underlying cause of the exception. May be null if not to be set. + */ + public AMQInternalException(String msg, Throwable cause) + { + super(AMQConstant.INTERNAL_ERROR, ((msg == null) ? "Internal error" : msg), cause); + } + + public AMQInternalException(String msg) + { + this(msg, null); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/AMQStoreException.java b/java/common/src/main/java/org/apache/qpid/AMQStoreException.java new file mode 100644 index 0000000000..8389fe5efa --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQStoreException.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid; + +import java.sql.SQLException; + +/** + * StoreException is a specific type of internal error relating to errors in the message store, such as {@link SQLException}. + */ +public class AMQStoreException extends AMQInternalException +{ + /** serialVersionUID */ + private static final long serialVersionUID = 2859681947490637496L; + + /** + * Creates an exception with an optional message and optional underlying cause. + * + * @param msg The exception message. May be null if not to be set. + * @param cause The underlying cause of the exception. May be null if not to be set. + */ + public AMQStoreException(String msg, Throwable cause) + { + super(((msg == null) ? "Store error" : msg), cause); + } + + public AMQStoreException(String msg) + { + this(msg, null); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 8131e09b49..a5c38e7e33 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; @@ -174,47 +175,47 @@ public class SlowMessageStore implements MessageStore } - public void createExchange(Exchange exchange) throws AMQException + public void createExchange(Exchange exchange) throws AMQStoreException { doPreDelay("createExchange"); _realStore.createExchange(exchange); doPostDelay("createExchange"); } - public void removeExchange(Exchange exchange) throws AMQException + public void removeExchange(Exchange exchange) throws AMQStoreException { doPreDelay("removeExchange"); _realStore.removeExchange(exchange); doPostDelay("removeExchange"); } - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { doPreDelay("bindQueue"); _realStore.bindQueue(exchange, routingKey, queue, args); doPostDelay("bindQueue"); } - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { doPreDelay("unbindQueue"); _realStore.unbindQueue(exchange, routingKey, queue, args); doPostDelay("unbindQueue"); } - public void createQueue(AMQQueue queue) throws AMQException + public void createQueue(AMQQueue queue) throws AMQStoreException { createQueue(queue, null); } - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException { doPreDelay("createQueue"); _realStore.createQueue(queue, arguments); doPostDelay("createQueue"); } - public void removeQueue(AMQQueue queue) throws AMQException + public void removeQueue(AMQQueue queue) throws AMQStoreException { doPreDelay("removeQueue"); _realStore.removeQueue(queue); @@ -268,7 +269,7 @@ public class SlowMessageStore implements MessageStore } public void enqueueMessage(TransactionLogResource queue, Long messageId) - throws AMQException + throws AMQStoreException { doPreDelay("enqueueMessage"); _underlying.enqueueMessage(queue, messageId); @@ -276,7 +277,7 @@ public class SlowMessageStore implements MessageStore } public void dequeueMessage(TransactionLogResource queue, Long messageId) - throws AMQException + throws AMQStoreException { doPreDelay("dequeueMessage"); _underlying.dequeueMessage(queue, messageId); @@ -284,7 +285,7 @@ public class SlowMessageStore implements MessageStore } public void commitTran() - throws AMQException + throws AMQStoreException { doPreDelay("commitTran"); _underlying.commitTran(); @@ -292,7 +293,7 @@ public class SlowMessageStore implements MessageStore } public StoreFuture commitTranAsync() - throws AMQException + throws AMQStoreException { doPreDelay("commitTran"); StoreFuture future = _underlying.commitTranAsync(); @@ -301,7 +302,7 @@ public class SlowMessageStore implements MessageStore } public void abortTran() - throws AMQException + throws AMQStoreException { doPreDelay("abortTran"); _underlying.abortTran(); @@ -309,7 +310,7 @@ public class SlowMessageStore implements MessageStore } } - public void updateQueue(AMQQueue queue) throws AMQException + public void updateQueue(AMQQueue queue) throws AMQStoreException { doPreDelay("updateQueue"); _realStore.updateQueue(queue); |
