diff options
| author | Robert Gemmell <robbie@apache.org> | 2010-07-16 15:19:46 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2010-07-16 15:19:46 +0000 |
| commit | ee7232042d82b6ce63c6398d61fa518e7beec4e3 (patch) | |
| tree | a197cbfe30bf8a472ce7bedf851352cf8db13d5f /java/broker/src/main | |
| parent | 9f89d600074f5e9f800d202e948adbe131cee5e8 (diff) | |
| download | qpid-python-ee7232042d82b6ce63c6398d61fa518e7beec4e3.tar.gz | |
QPID-2731: enable getting/setting queue exclusivity via JMX
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@964825 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
7 files changed, 94 insertions, 0 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 225fbec930..de9dc42de8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -280,4 +280,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer ConfigurationPlugin getConfiguration(); ManagedObject getManagedObject(); + + void setExclusive(boolean exclusive) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 806b7f3744..32b71a554b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -274,6 +274,23 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que return _queue.isOverfull(); } + public boolean isExclusive() + { + return _queue.isExclusive(); + } + + public void setExclusive(boolean exclusive) throws JMException + { + try + { + _queue.setExclusive(exclusive); + } + catch (AMQException e) + { + throw new JMException(e.toString()); + } + } + /** * Checks if there is any notification to be send to the listeners */ diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index d47d229658..489a724254 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -53,6 +53,8 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; + +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -328,6 +330,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _exclusive; } + + public void setExclusive(boolean exclusive) throws AMQException + { + _exclusive = exclusive; + + if(isDurable()) + { + getVirtualHost().getDurableConfigurationStore().updateQueue(this); + } + } public Exchange getAlternateExchange() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 40f265e00f..627f059c53 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -94,6 +94,7 @@ public class DerbyMessageStore implements MessageStore private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )"; private static final String SELECT_FROM_QUEUE = "SELECT name, owner, exclusive, arguments FROM " + QUEUE_TABLE_NAME; private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; + private static final String UPDATE_QUEUE_EXCLUSIVITY = "UPDATE " + QUEUE_TABLE_NAME + " SET exclusive = ? WHERE name = ?"; private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME; private static final String SELECT_FROM_BINDINGS = "SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name"; @@ -845,6 +846,50 @@ public class DerbyMessageStore implements MessageStore } } } + + /** + * Updates the specified queue in the persistent store, IF it is already present. If the queue + * is not present in the store, it will not be added. + * + * NOTE: Currently only updates the exclusivity. + * + * @param queue The queue to update the entry for. + * @throws org.apache.qpid.AMQException If the operation fails for any reason. + */ + public void updateQueue(final AMQQueue queue) throws AMQException + { + if (_state != State.RECOVERING) + { + try + { + Connection conn = newAutoCommitConnection(); + + PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE); + stmt.setString(1, queue.getNameShortString().toString()); + + ResultSet rs = stmt.executeQuery(); + + if (rs.next()) + { + PreparedStatement stmt2 = conn.prepareStatement(UPDATE_QUEUE_EXCLUSIVITY); + + stmt2.setBoolean(1,queue.isExclusive()); + stmt2.setString(2, queue.getNameShortString().toString()); + + stmt2.execute(); + stmt2.close(); + } + + stmt.close(); + conn.close(); + } + catch (SQLException e) + { + throw new AMQException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e, e); + } + } + + } /** * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index a50e8e99b4..c169e3bcff 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -118,4 +118,13 @@ public interface DurableConfigurationStore * @throws org.apache.qpid.AMQException If the operation fails for any reason. */ void removeQueue(AMQQueue queue) throws AMQException; + + /** + * Updates the specified queue in the persistent store, IF it is already present. If the queue + * is not present in the store, it will not be added. + * + * @param queue The queue to update the entry for. + * @throws org.apache.qpid.AMQException If the operation fails for any reason. + */ + void updateQueue(AMQQueue queue) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 565afd2539..9d9312cd26 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -148,6 +148,11 @@ public class MemoryMessageStore implements MessageStore { // Not required to do anything } + + public void updateQueue(final AMQQueue queue) throws AMQException + { + // Not required to do anything + } public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index fcb06f56bf..c6055f35f6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -765,6 +765,10 @@ public class VirtualHostImpl implements VirtualHost arguments = args; } } + + public void updateQueue(AMQQueue queue) throws AMQException + { + } } @Override |
