diff options
| author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-26 14:37:43 +0000 |
|---|---|---|
| committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-26 14:37:43 +0000 |
| commit | 84a53cdf08a58991b80487be8fe062024175f7e9 (patch) | |
| tree | 16decc4e68eb607ca73fc22f691020478125534c /java/broker/src/main | |
| parent | c36251461bb48a12e540abc554f5b2e0495f4ead (diff) | |
| download | qpid-python-84a53cdf08a58991b80487be8fe062024175f7e9.tar.gz | |
QPID-2659: Add AMQStoreException to message stores
This is a sub-class of AMQInternalException, which encapsulates error code 541,
or INTERNAL_ERROR.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@979315 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
7 files changed, 151 insertions, 154 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java index 7393f27ab4..400ce50bc4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -96,7 +97,7 @@ public class BindingFactory removeBinding(this); } - public void onClose(final Exchange exchange) throws AMQSecurityException + public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException { removeBinding(this); } @@ -140,7 +141,7 @@ public class BindingFactory - public boolean addBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException + public boolean addBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException { return makeBinding(bindingKey, queue, exchange, arguments, false, false); } @@ -149,12 +150,12 @@ public class BindingFactory public boolean replaceBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, - final Map<String, Object> arguments) throws AMQSecurityException + final Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException { return makeBinding(bindingKey, queue, exchange, arguments, false, true); } - private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException + private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException { assert queue != null; if (bindingKey == null) @@ -187,14 +188,7 @@ public class BindingFactory if (b.isDurable() && !restore) { - try - { - _configSource.getDurableConfigurationStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments)); - } - catch (AMQException e) - { - throw new RuntimeException(e); // FIXME - } + _configSource.getDurableConfigurationStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments)); } queue.addQueueDeleteTask(b); @@ -217,18 +211,18 @@ public class BindingFactory return getVirtualHost().getConfigStore(); } - public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException + public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException { makeBinding(bindingKey,queue,exchange,argumentMap,true, false); } - public void removeBinding(final Binding b) throws AMQSecurityException + public void removeBinding(final Binding b) throws AMQSecurityException, AMQInternalException { removeBinding(b.getBindingKey(), b.getQueue(), b.getExchange(), b.getArguments()); } - public Binding removeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException + public Binding removeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException { assert queue != null; if (bindingKey == null) @@ -261,17 +255,10 @@ public class BindingFactory if (b.isDurable()) { - try - { - _configSource.getDurableConfigurationStore().unbindQueue(exchange, - new AMQShortString(bindingKey), - queue, - FieldTable.convertToFieldTable(arguments)); - } - catch (AMQException e) - { - throw new RuntimeException(e); // FIXME - } + _configSource.getDurableConfigurationStore().unbindQueue(exchange, + new AMQShortString(bindingKey), + queue, + FieldTable.convertToFieldTable(arguments)); } b.logDestruction(); getConfigStore().removeConfiguredObject(b); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 13fe767d3f..356a7f89b9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -139,6 +140,6 @@ public interface Exchange extends ExchangeReferrer, ExchangeConfig public static interface Task { - public void onClose(Exchange exchange) throws AMQSecurityException; + public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 62f2d14e09..838867f233 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -20,20 +20,6 @@ */ package org.apache.qpid.server.store; -import org.apache.commons.configuration.Configuration; -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.ConfigStoreMessages; -import org.apache.qpid.server.logging.messages.MessageStoreMessages; -import org.apache.qpid.server.logging.messages.TransactionLogMessages; -import org.apache.qpid.server.queue.AMQQueue; - import java.io.ByteArrayInputStream; import java.io.File; import java.lang.ref.SoftReference; @@ -52,7 +38,26 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.messages.TransactionLogMessages; +import org.apache.qpid.server.queue.AMQQueue; +/** + * An implementation of a {@link MessageStore} that uses Apache Derby as the persistance + * mechanism. + * + * TODO extract the SQL statements into a generic JDBC store + */ public class DerbyMessageStore implements MessageStore { @@ -221,7 +226,8 @@ public class DerbyMessageStore implements MessageStore //Update to pick up QPID_WORK and use that as the default location not just derbyDB - final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")+"/derbyDB"); + final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK") + + File.separator + "derbyDB"); File environmentPath = new File(databasePath); if (!environmentPath.exists()) @@ -387,13 +393,13 @@ public class DerbyMessageStore implements MessageStore catch (SQLException e) { - throw new AMQException("Error recovering persistent state: " + e, e); + throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e); } } - private List<String> loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException, AMQException + private List<String> loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException { Connection conn = newAutoCommitConnection(); @@ -432,7 +438,7 @@ public class DerbyMessageStore implements MessageStore } - private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws AMQException, SQLException + private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws SQLException { List<String> exchanges = new ArrayList<String>(); @@ -468,7 +474,7 @@ public class DerbyMessageStore implements MessageStore } - private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws AMQException, SQLException + private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws SQLException { _logger.info("Recovering bindings..."); @@ -552,7 +558,7 @@ public class DerbyMessageStore implements MessageStore public StoredMessage getMessage(long messageNumber) { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public void removeMessage(long messageId) @@ -599,12 +605,12 @@ public class DerbyMessageStore implements MessageStore } } - throw new RuntimeException("Error removing message with id " + messageId + " from database: " + e, e); + throw new RuntimeException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e); } } - public void createExchange(Exchange exchange) throws AMQException + public void createExchange(Exchange exchange) throws AMQStoreException { if (_state != State.RECOVERING) { @@ -645,13 +651,13 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - throw new AMQException("Error adding Exchange with name " + exchange.getNameShortString() + " to database: " + e, e); + throw new AMQStoreException("Error writing Exchange with name " + exchange.getNameShortString() + " to database: " + e.getMessage(), e); } } } - public void removeExchange(Exchange exchange) throws AMQException + public void removeExchange(Exchange exchange) throws AMQStoreException { Connection conn = null; @@ -664,12 +670,12 @@ public class DerbyMessageStore implements MessageStore stmt.close(); if(results == 0) { - throw new AMQException("Exchange " + exchange.getNameShortString() + " not found"); + throw new AMQStoreException("Exchange " + exchange.getNameShortString() + " not found"); } } catch (SQLException e) { - throw new AMQException("Error deleting exchange with name " + exchange.getNameShortString() + " from database: " + e, e); + throw new AMQStoreException("Error deleting Exchange with name " + exchange.getNameShortString() + " from database: " + e.getMessage(), e); } finally { @@ -689,7 +695,7 @@ public class DerbyMessageStore implements MessageStore } public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) - throws AMQException + throws AMQStoreException { if (_state != State.RECOVERING) { @@ -735,8 +741,8 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - throw new AMQException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange " - + exchange.getNameShortString() + " to database: " + e, e); + throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange " + + exchange.getNameShortString() + " to database: " + e.getMessage(), e); } finally { @@ -760,7 +766,7 @@ public class DerbyMessageStore implements MessageStore } public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) - throws AMQException + throws AMQStoreException { Connection conn = null; @@ -778,14 +784,14 @@ public class DerbyMessageStore implements MessageStore if(result != 1) { - throw new AMQException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange " + throw new AMQStoreException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange " + exchange.getNameShortString() + " not found"); } } catch (SQLException e) { - throw new AMQException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange " - + exchange.getNameShortString() + " in database: " + e, e); + throw new AMQStoreException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange " + + exchange.getNameShortString() + " in database: " + e.getMessage(), e); } finally { @@ -806,12 +812,12 @@ public class DerbyMessageStore implements MessageStore } - public void createQueue(AMQQueue queue) throws AMQException + public void createQueue(AMQQueue queue) throws AMQStoreException { createQueue(queue, null); } - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException { _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called"); @@ -858,7 +864,7 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - throw new AMQException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e, e); + throw new AMQStoreException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e); } } } @@ -870,9 +876,9 @@ public class DerbyMessageStore implements MessageStore * NOTE: Currently only updates the exclusivity. * * @param queue The queue to update the entry for. - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - public void updateQueue(final AMQQueue queue) throws AMQException + public void updateQueue(final AMQQueue queue) throws AMQStoreException { if (_state != State.RECOVERING) { @@ -901,7 +907,7 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - throw new AMQException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e, e); + throw new AMQStoreException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e); } } @@ -931,7 +937,7 @@ public class DerbyMessageStore implements MessageStore return connection; } - public void removeQueue(final AMQQueue queue) throws AMQException + public void removeQueue(final AMQQueue queue) throws AMQStoreException { AMQShortString name = queue.getNameShortString(); _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called"); @@ -947,12 +953,12 @@ public class DerbyMessageStore implements MessageStore if (results == 0) { - throw new AMQException("Queue " + name + " not found"); + throw new AMQStoreException("Queue " + name + " not found"); } } catch (SQLException e) { - throw new AMQException("Error writing deleting with name " + name + " from database: " + e, e); + throw new AMQStoreException("Error deleting AMQQueue with name " + name + " from database: " + e.getMessage(), e); } finally { @@ -978,7 +984,7 @@ public class DerbyMessageStore implements MessageStore return new DerbyTransaction(); } - public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQException + public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException { String name = queue.getResourceName(); @@ -1000,14 +1006,14 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - _logger.error("Failed to enqueue: " + e, e); - throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name + _logger.error("Failed to enqueue: " + e.getMessage(), e); + throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name + " to database", e); } } - public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQException + public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException { String name = queue.getResourceName(); @@ -1025,7 +1031,7 @@ public class DerbyMessageStore implements MessageStore if(results != 1) { - throw new AMQException("Unable to find message with id " + messageId + " on queue " + name); + throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name); } if (_logger.isDebugEnabled()) @@ -1035,8 +1041,8 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - _logger.error("Failed to dequeue: " + e, e); - throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name + _logger.error("Failed to dequeue: " + e.getMessage(), e); + throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + name + " from database", e); } @@ -1058,7 +1064,7 @@ public class DerbyMessageStore implements MessageStore } - public void commitTran(ConnectionWrapper connWrapper) throws AMQException + public void commitTran(ConnectionWrapper connWrapper) throws AMQStoreException { try @@ -1075,7 +1081,7 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - throw new AMQException("Error commit tx: " + e, e); + throw new AMQStoreException("Error commit tx: " + e.getMessage(), e); } finally { @@ -1083,7 +1089,7 @@ public class DerbyMessageStore implements MessageStore } } - public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQException + public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQStoreException { commitTran(connWrapper); return new StoreFuture() @@ -1101,11 +1107,11 @@ public class DerbyMessageStore implements MessageStore } - public void abortTran(ConnectionWrapper connWrapper) throws AMQException + public void abortTran(ConnectionWrapper connWrapper) throws AMQStoreException { if (connWrapper == null) { - throw new AMQException("Fatal internal error: transactional context is empty at abortTran"); + throw new AMQStoreException("Fatal internal error: transactional context is empty at abortTran"); } if (_logger.isDebugEnabled()) @@ -1121,7 +1127,7 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - throw new AMQException("Error aborting transaction: " + e, e); + throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e); } } @@ -1310,7 +1316,7 @@ public class DerbyMessageStore implements MessageStore } } - throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e, e); + throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e.getMessage(), e); } } @@ -1375,7 +1381,7 @@ public class DerbyMessageStore implements MessageStore } } - throw new RuntimeException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e, e); + throw new RuntimeException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e.getMessage(), e); } @@ -1388,11 +1394,11 @@ public class DerbyMessageStore implements MessageStore } - private synchronized void stateTransition(State requiredState, State newState) throws AMQException + private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException { if (_state != requiredState) { - throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState + throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState + "; currently in state: " + _state); } @@ -1417,28 +1423,28 @@ public class DerbyMessageStore implements MessageStore } } - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException { DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, messageId); } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException { DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, messageId); } - public void commitTran() throws AMQException + public void commitTran() throws AMQStoreException { DerbyMessageStore.this.commitTran(_connWrapper); } - public StoreFuture commitTranAsync() throws AMQException + public StoreFuture commitTranAsync() throws AMQStoreException { return DerbyMessageStore.this.commitTranAsync(_connWrapper); } - public void abortTran() throws AMQException + public void abortTran() throws AMQStoreException { DerbyMessageStore.this.abortTran(_connWrapper); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index c169e3bcff..5fb23653cb 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.AMQException; +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.queue.AMQQueue; public interface DurableConfigurationStore { @@ -55,18 +55,18 @@ public interface DurableConfigurationStore * * @param exchange The exchange to persist. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void createExchange(Exchange exchange) throws AMQException; + void createExchange(Exchange exchange) throws AMQStoreException; /** * Removes the specified persistent exchange. * * @param exchange The exchange to remove. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void removeExchange(Exchange exchange) throws AMQException; + void removeExchange(Exchange exchange) throws AMQStoreException; /** * Binds the specified queue to an exchange with a routing key. @@ -76,9 +76,9 @@ public interface DurableConfigurationStore * @param queue The queue to bind. * @param args Additional parameters. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException if the operation fails for any reason. */ - void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; + void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException; /** * Unbinds the specified from an exchange under a particular routing key. @@ -88,43 +88,44 @@ public interface DurableConfigurationStore * @param queue The queue to unbind. * @param args Additonal parameters. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; + void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException; /** * Makes the specified queue persistent. * * @param queue The queue to store. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void createQueue(AMQQueue queue) throws AMQException; + void createQueue(AMQQueue queue) throws AMQStoreException; /** * Makes the specified queue persistent. * * @param queue The queue to store. - * * @param arguments The additional arguments to the binding - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * + * @throws AMQStoreException If the operation fails for any reason. */ - void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException; + void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException; /** * Removes the specified queue from the persistent store. * * @param queue The queue to remove. - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * + * @throws AMQStoreException If the operation fails for any reason. */ - void removeQueue(AMQQueue queue) throws AMQException; + void removeQueue(AMQQueue queue) throws AMQStoreException; /** * Updates the specified queue in the persistent store, IF it is already present. If the queue * is not present in the store, it will not be added. * * @param queue The queue to update the entry for. - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void updateQueue(AMQQueue queue) throws AMQException; + void updateQueue(AMQQueue queue) throws AMQStoreException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 9d9312cd26..d008d42fa0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -20,21 +20,22 @@ */ package org.apache.qpid.server.store; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.messages.MessageStoreMessages; -import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.commons.configuration.Configuration; - -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.queue.AMQQueue; /** A simple message store that stores the messages in a threadsafe structure in memory. */ public class MemoryMessageStore implements MessageStore @@ -52,24 +53,24 @@ public class MemoryMessageStore implements MessageStore private static final Transaction IN_MEMORY_TRANSACTION = new Transaction() { - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException { } - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException { } - public void commitTran() throws AMQException + public void commitTran() throws AMQStoreException { } - public StoreFuture commitTranAsync() throws AMQException + public StoreFuture commitTranAsync() throws AMQStoreException { return IMMEDIATE_FUTURE; } - public void abortTran() throws AMQException + public void abortTran() throws AMQStoreException { } @@ -113,43 +114,43 @@ public class MemoryMessageStore implements MessageStore } - public void createExchange(Exchange exchange) throws AMQException + public void createExchange(Exchange exchange) throws AMQStoreException { } - public void removeExchange(Exchange exchange) throws AMQException + public void removeExchange(Exchange exchange) throws AMQStoreException { } - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { } - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { } - public void createQueue(AMQQueue queue) throws AMQException + public void createQueue(AMQQueue queue) throws AMQStoreException { // Not requred to do anything } - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException { // Not required to do anything } - public void removeQueue(final AMQQueue queue) throws AMQException + public void removeQueue(final AMQQueue queue) throws AMQStoreException { // Not required to do anything } - public void updateQueue(final AMQQueue queue) throws AMQException + public void updateQueue(final AMQQueue queue) throws AMQStoreException { // Not required to do anything } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java b/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java index e6a33e23d6..d196a91930 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java @@ -20,9 +20,8 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; import org.apache.commons.configuration.Configuration; public interface TransactionLog @@ -35,40 +34,40 @@ public interface TransactionLog * * @param queue The queue to place the message on. * @param messageId The message to enqueue. - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException; + void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException; /** * Extracts a message from a specified queue, in a given transactional context. * * @param queue The queue to place the message on. * @param messageId The message to dequeue. - * @throws org.apache.qpid.AMQException If the operation fails for any reason, or if the specified message does not exist. + * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. */ - void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException; + void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException; /** * Commits all operations performed within a given transactional context. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void commitTran() throws AMQException; + void commitTran() throws AMQStoreException; /** * Commits all operations performed within a given transactional context. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - StoreFuture commitTranAsync() throws AMQException; + StoreFuture commitTranAsync() throws AMQStoreException; /** * Abandons all operations performed within a given transactional context. * - * @throws org.apache.qpid.AMQException If the operation fails for any reason. + * @throws AMQStoreException If the operation fails for any reason. */ - void abortTran() throws AMQException; + void abortTran() throws AMQStoreException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index c6055f35f6..f2444718af 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -36,6 +36,8 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQBrokerManagerMBean; @@ -696,7 +698,7 @@ public class VirtualHostImpl implements VirtualHost //To change body of implemented methods use File | Settings | File Templates. } - public void createExchange(Exchange exchange) throws AMQException + public void createExchange(Exchange exchange) throws AMQStoreException { if (exchange.isDurable()) { @@ -704,11 +706,11 @@ public class VirtualHostImpl implements VirtualHost } } - public void removeExchange(Exchange exchange) throws AMQException + public void removeExchange(Exchange exchange) throws AMQStoreException { } - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { if (exchange.isDurable() && queue.isDurable()) { @@ -716,16 +718,16 @@ public class VirtualHostImpl implements VirtualHost } } - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { } - public void createQueue(AMQQueue queue) throws AMQException + public void createQueue(AMQQueue queue) throws AMQStoreException { createQueue(queue, null); } - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException { if (queue.isDurable()) { @@ -733,7 +735,7 @@ public class VirtualHostImpl implements VirtualHost } } - public void removeQueue(AMQQueue queue) throws AMQException + public void removeQueue(AMQQueue queue) throws AMQStoreException { } @@ -766,7 +768,7 @@ public class VirtualHostImpl implements VirtualHost } } - public void updateQueue(AMQQueue queue) throws AMQException + public void updateQueue(AMQQueue queue) throws AMQStoreException { } } |
