diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-04-16 12:21:48 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-04-16 12:21:48 +0000 |
| commit | b5d81926754ceb238c330ba6e7ec2e37e53cbabe (patch) | |
| tree | 25846f595fd86704998bd8d373a142a0b10aa9b1 | |
| parent | a2669a1225e2e9a159ad951a16c36fcda8b105df (diff) | |
| download | qpid-python-b5d81926754ceb238c330ba6e7ec2e37e53cbabe.tar.gz | |
QPID-1814 : Relax Derby so that it does not error if you create an existing Exchange or Queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@765593 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java | 77 |
1 files changed, 50 insertions, 27 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index e7f9c777c9..47d9311453 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -104,9 +104,12 @@ public class DerbyMessageStore implements MessageStore private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )"; private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )"; private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME; + private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME; private static final String SELECT_FROM_BINDINGS = "SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?"; + private static final String FIND_BINDING = + "SELECT * FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? "; private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?"; private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?"; private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )"; @@ -631,29 +634,41 @@ public class DerbyMessageStore implements MessageStore try { conn = newConnection(); - PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS); + + PreparedStatement stmt = conn.prepareStatement(FIND_BINDING); stmt.setString(1, exchange.getName().toString() ); stmt.setString(2, queue.getName().toString()); stmt.setString(3, routingKey == null ? null : routingKey.toString()); - if(args != null) - { - /* This would be the Java 6 way of setting a Blob - Blob blobArgs = conn.createBlob(); - blobArgs.setBytes(0, args.getDataAsBytes()); - stmt.setBlob(4, blobArgs); - */ - byte[] bytes = args.getDataAsBytes(); - ByteArrayInputStream bis = new ByteArrayInputStream(bytes); - stmt.setBinaryStream(4, bis, bytes.length); - } - else + + ResultSet rs = stmt.executeQuery(); + + // If this binding is not already in the store then create it. + if (!rs.next()) { - stmt.setNull(4, Types.BLOB); - } + stmt = conn.prepareStatement(INSERT_INTO_BINDINGS); + stmt.setString(1, exchange.getName().toString() ); + stmt.setString(2, queue.getName().toString()); + stmt.setString(3, routingKey == null ? null : routingKey.toString()); + if(args != null) + { + /* This would be the Java 6 way of setting a Blob + Blob blobArgs = conn.createBlob(); + blobArgs.setBytes(0, args.getDataAsBytes()); + stmt.setBlob(4, blobArgs); + */ + byte[] bytes = args.getDataAsBytes(); + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + stmt.setBinaryStream(4, bis, bytes.length); + } + else + { + stmt.setNull(4, Types.BLOB); + } - stmt.executeUpdate(); - conn.commit(); - stmt.close(); + stmt.executeUpdate(); + conn.commit(); + stmt.close(); + } } catch (SQLException e) { @@ -744,19 +759,27 @@ public class DerbyMessageStore implements MessageStore { Connection conn = newConnection(); - PreparedStatement stmt = - conn.prepareStatement(INSERT_INTO_QUEUE); - + PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE); stmt.setString(1, queue.getName().toString()); - stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString()); - stmt.execute(); + ResultSet rs = stmt.executeQuery(); - stmt.close(); + // If we don't have any data in the result set then we can add this queue + if (!rs.next()) + { + stmt = conn.prepareStatement(INSERT_INTO_QUEUE); - conn.commit(); + stmt.setString(1, queue.getName().toString()); + stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString()); - conn.close(); + stmt.execute(); + + stmt.close(); + + conn.commit(); + + conn.close(); + } } catch (SQLException e) { @@ -889,7 +912,7 @@ public class DerbyMessageStore implements MessageStore if (_logger.isDebugEnabled()) { - _logger.debug("Dequeuing message " + messageId + " on queue " + name + "[Connection" + conn + "]"); + _logger.debug("Dequeuing message " + messageId + " on queue " + name );//+ "[Connection" + conn + "]"); } } catch (SQLException e) |
