diff options
| author | Robert Gemmell <robbie@apache.org> | 2010-06-09 09:46:11 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2010-06-09 09:46:11 +0000 |
| commit | 56428d040f004086fd9c22847aec7e3c3ec8fd6a (patch) | |
| tree | c58591738d8f6a49e6a0b68b41d031747d5f2288 /java | |
| parent | c1af4037c6995891e88ee82da2b4454f819325ff (diff) | |
| download | qpid-python-56428d040f004086fd9c22847aec7e3c3ec8fd6a.tar.gz | |
QPID-2650: Make use of connections with auto-commit transactions disabled for metadata, content, and queue entries.
Additionally, make remaining uses of auto-commit enabled connections more visible and remove the erroneous explicit commits on these.
Close completed Statements after use. Add/correct various related debug log statements.
Stop adding vhost name to all environment paths, now just the default value.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@952930 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 106 insertions, 118 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index a87cd32e37..aeec8798d5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -218,9 +218,8 @@ public class DerbyMessageStore implements MessageStore { initialiseDriver(); - //Update to pick up QPID_WORK and use that as the default location not just derbyDB - - final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")+"/derbyDB"); + final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, + System.getProperty("QPID_WORK")+"/derbyDB/" + name); File environmentPath = new File(databasePath); if (!environmentPath.exists()) @@ -234,7 +233,7 @@ public class DerbyMessageStore implements MessageStore CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_STORE_LOCATION(environmentPath.getAbsolutePath())); - createOrOpenDatabase(name, databasePath); + createOrOpenDatabase(databasePath); } private static synchronized void initialiseDriver() throws ClassNotFoundException @@ -245,12 +244,11 @@ public class DerbyMessageStore implements MessageStore } } - private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException + private void createOrOpenDatabase(final String environmentPath) throws SQLException { - //fixme this the _vhost name should not be added here. - _connectionURL = "jdbc:derby:" + environmentPath + "/" + name + ";create=true"; + _connectionURL = "jdbc:derby:" + environmentPath + ";create=true"; - Connection conn = newConnection(); + Connection conn = newAutoCommitConnection(); createVersionTable(conn); createExchangeTable(conn); @@ -394,8 +392,7 @@ public class DerbyMessageStore implements MessageStore private List<String> loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException, AMQException { - Connection conn = newConnection(); - + Connection conn = newAutoCommitConnection(); Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE); @@ -425,6 +422,9 @@ public class DerbyMessageStore implements MessageStore queues.add(queueName); } + + conn.close(); + return queues; } @@ -436,8 +436,7 @@ public class DerbyMessageStore implements MessageStore Connection conn = null; try { - conn = newConnection(); - + conn = newAutoCommitConnection(); Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE); @@ -468,16 +467,12 @@ public class DerbyMessageStore implements MessageStore private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws AMQException, SQLException { - - _logger.info("Recovering bindings..."); - - Connection conn = null; try { - conn = newConnection(); + conn = newAutoCommitConnection(); PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS); @@ -504,6 +499,8 @@ public class DerbyMessageStore implements MessageStore brh.binding(exchangeName, queueName, bindingKey, buf); } + + stmt.close(); } finally { @@ -544,20 +541,16 @@ public class DerbyMessageStore implements MessageStore Connection conn = null; try { - - conn = newConnection(); PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_META_DATA); stmt.setLong(1,messageId); int results = stmt.executeUpdate(); - + stmt.close(); + if (results == 0) { - - throw new RuntimeException("Message metadata not found for message id " + messageId); } - stmt.close(); if (_logger.isDebugEnabled()) { @@ -567,8 +560,7 @@ public class DerbyMessageStore implements MessageStore stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT); stmt.setLong(1,messageId); results = stmt.executeUpdate(); - - + stmt.close(); conn.commit(); conn.close(); @@ -588,7 +580,7 @@ public class DerbyMessageStore implements MessageStore } } - throw new RuntimeException("Error removing Message with id " + messageId + " to database: " + e, e); + throw new RuntimeException("Error removing message with id " + messageId + " from database: " + e, e); } } @@ -603,13 +595,12 @@ public class DerbyMessageStore implements MessageStore try { - conn = newConnection(); + conn = newAutoCommitConnection(); PreparedStatement stmt = conn.prepareStatement(FIND_EXCHANGE); stmt.setString(1, exchange.getNameShortString().toString()); stmt.execute(); stmt.close(); - conn.commit(); ResultSet rs = stmt.executeQuery(); @@ -622,7 +613,6 @@ public class DerbyMessageStore implements MessageStore stmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0); stmt.execute(); stmt.close(); - conn.commit(); } } @@ -636,7 +626,7 @@ public class DerbyMessageStore implements MessageStore } catch (SQLException e) { - throw new AMQException("Error writing Exchange with name " + exchange.getNameShortString() + " to database: " + e, e); + throw new AMQException("Error adding Exchange with name " + exchange.getNameShortString() + " to database: " + e, e); } } @@ -648,23 +638,19 @@ public class DerbyMessageStore implements MessageStore try { - conn = newConnection(); + conn = newAutoCommitConnection(); PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE); stmt.setString(1, exchange.getNameShortString().toString()); int results = stmt.executeUpdate(); + stmt.close(); if(results == 0) { throw new AMQException("Exchange " + exchange.getNameShortString() + " not found"); } - else - { - conn.commit(); - stmt.close(); - } } catch (SQLException e) { - throw new AMQException("Error writing deleting with name " + exchange.getNameShortString() + " from database: " + e, e); + throw new AMQException("Error deleting exchange with name " + exchange.getNameShortString() + " from database: " + e, e); } finally { @@ -690,10 +676,9 @@ public class DerbyMessageStore implements MessageStore { Connection conn = null; - try { - conn = newConnection(); + conn = newAutoCommitConnection(); PreparedStatement stmt = conn.prepareStatement(FIND_BINDING); stmt.setString(1, exchange.getNameShortString().toString() ); @@ -726,7 +711,6 @@ public class DerbyMessageStore implements MessageStore } stmt.executeUpdate(); - conn.commit(); stmt.close(); } } @@ -761,24 +745,23 @@ public class DerbyMessageStore implements MessageStore { Connection conn = null; - try { - conn = newConnection(); + conn = newAutoCommitConnection(); // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS); stmt.setString(1, exchange.getNameShortString().toString() ); stmt.setString(2, queue.getNameShortString().toString()); stmt.setString(3, routingKey == null ? null : routingKey.toString()); - - - if(stmt.executeUpdate() != 1) + + int result = stmt.executeUpdate(); + stmt.close(); + + if(result != 1) { throw new AMQException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange " + exchange.getNameShortString() + " not found"); } - conn.commit(); - stmt.close(); } catch (SQLException e) { @@ -817,7 +800,7 @@ public class DerbyMessageStore implements MessageStore { try { - Connection conn = newConnection(); + Connection conn = newAutoCommitConnection(); PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE); stmt.setString(1, queue.getNameShortString().toString()); @@ -849,11 +832,8 @@ public class DerbyMessageStore implements MessageStore stmt.setBinaryStream(4,bis,underlying.length); stmt.execute(); - stmt.close(); - conn.commit(); - conn.close(); } } @@ -864,9 +844,27 @@ public class DerbyMessageStore implements MessageStore } } + /** + * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED + * isolation and with auto-commit transactions enabled. + */ + private Connection newAutoCommitConnection() throws SQLException + { + final Connection connection = newConnection(); + connection.setAutoCommit(true); + + return connection; + } + + /** + * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED + * isolation and with auto-commit transactions disabled. + */ private Connection newConnection() throws SQLException { final Connection connection = DriverManager.getConnection(_connectionURL); + connection.setAutoCommit(false); + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); return connection; } @@ -876,22 +874,18 @@ public class DerbyMessageStore implements MessageStore _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called"); Connection conn = null; - try { - conn = newConnection(); + conn = newAutoCommitConnection(); PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE); stmt.setString(1, name.toString()); int results = stmt.executeUpdate(); - - + stmt.close(); + if (results == 0) { throw new AMQException("Queue " + name + " not found"); } - - conn.commit(); - stmt.close(); } catch (SQLException e) { @@ -930,16 +924,16 @@ public class DerbyMessageStore implements MessageStore try { - PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY); - stmt.setString(1,name); - stmt.setLong(2,messageId); - stmt.executeUpdate(); - connWrapper.requiresCommit(); - if (_logger.isDebugEnabled()) { _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]"); } + + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY); + stmt.setString(1,name); + stmt.setLong(2,messageId); + stmt.executeUpdate(); + stmt.close(); } catch (SQLException e) { @@ -964,8 +958,7 @@ public class DerbyMessageStore implements MessageStore stmt.setString(1,name); stmt.setLong(2,messageId); int results = stmt.executeUpdate(); - - connWrapper.requiresCommit(); + stmt.close(); if(results != 1) { @@ -989,23 +982,12 @@ public class DerbyMessageStore implements MessageStore private static final class ConnectionWrapper { private final Connection _connection; - private boolean _requiresCommit; public ConnectionWrapper(Connection conn) { _connection = conn; } - public void setRequiresCommit() - { - _requiresCommit = true; - } - - public boolean requiresCommit() - { - return _requiresCommit; - } - public Connection getConnection() { return _connection; @@ -1071,11 +1053,7 @@ public class DerbyMessageStore implements MessageStore try { Connection conn = connWrapper.getConnection(); - if(connWrapper.requiresCommit()) - { - conn.rollback(); - } - + conn.rollback(); conn.close(); } catch (SQLException e) @@ -1094,6 +1072,11 @@ public class DerbyMessageStore implements MessageStore private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData) throws SQLException { + if(_logger.isDebugEnabled()) + { + _logger.debug("Adding metadata for message " +messageId); + } + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA); stmt.setLong(1,messageId); @@ -1107,8 +1090,13 @@ public class DerbyMessageStore implements MessageStore metaData.writeToBuffer(0, buf); ByteArrayInputStream bis = new ByteArrayInputStream(underlying); stmt.setBinaryStream(2,bis,underlying.length); - stmt.executeUpdate(); - + int result = stmt.executeUpdate(); + stmt.close(); + + if(result == 0) + { + throw new RuntimeException("Unable to add meta data for message " +messageId); + } } @@ -1116,7 +1104,7 @@ public class DerbyMessageStore implements MessageStore private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException { - Connection conn = newConnection(); + Connection conn = newAutoCommitConnection(); MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin(); @@ -1144,8 +1132,6 @@ public class DerbyMessageStore implements MessageStore StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false); messageHandler.message(message); - - } _messageId.set(maxId); @@ -1157,14 +1143,13 @@ public class DerbyMessageStore implements MessageStore private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException { - Connection conn = newConnection(); + Connection conn = newAutoCommitConnection(); TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this); Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); - while(rs.next()) { @@ -1172,17 +1157,16 @@ public class DerbyMessageStore implements MessageStore long messageId = rs.getLong(2); queueEntryHandler.queueEntry(queueName,messageId); } - - + + stmt.close(); queueEntryHandler.completeQueueEntryRecovery(); - } StorableMessageMetaData getMetaData(long messageId) throws SQLException { - Connection conn = newConnection(); + Connection conn = newAutoCommitConnection(); try { PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_META_DATA); @@ -1191,7 +1175,8 @@ public class DerbyMessageStore implements MessageStore if(rs.next()) { - + stmt.close(); + Blob dataAsBlob = rs.getBlob(1); byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length()); @@ -1205,6 +1190,8 @@ public class DerbyMessageStore implements MessageStore } else { + stmt.close(); + throw new RuntimeException("Meta data not found for message with id " + messageId); } @@ -1218,18 +1205,13 @@ public class DerbyMessageStore implements MessageStore private void addContent(Connection conn, long messageId, int offset, ByteBuffer src) { - - + if(_logger.isDebugEnabled()) + { + _logger.debug("Adding content chunk offset " + offset + " for message " +messageId); + } try { - final boolean newConnection = conn == null; - - if(newConnection) - { - conn = newConnection(); - } - src = src.slice(); byte[] chunkData = new byte[src.limit()]; @@ -1249,12 +1231,7 @@ public class DerbyMessageStore implements MessageStore ByteArrayInputStream bis = new ByteArrayInputStream(chunkData); stmt.setBinaryStream(4, bis, chunkData.length); stmt.executeUpdate(); - - if(newConnection) - { - conn.commit(); - conn.close(); - } + stmt.close(); } catch (SQLException e) { @@ -1270,10 +1247,9 @@ public class DerbyMessageStore implements MessageStore } } - throw new RuntimeException("Error reading AMQMessage with id " + messageId + " from database: " + e, e); + throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e, e); } - } @@ -1284,7 +1260,7 @@ public class DerbyMessageStore implements MessageStore try { - conn = newConnection(); + conn = newAutoCommitConnection(); PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT); stmt.setLong(1,messageId); @@ -1317,6 +1293,7 @@ public class DerbyMessageStore implements MessageStore } } + stmt.close(); conn.close(); return written; @@ -1335,7 +1312,7 @@ public class DerbyMessageStore implements MessageStore } } - throw new RuntimeException("Error reading AMQMessage with id " + messageId + " from database: " + e, e); + throw new RuntimeException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e, e); } @@ -1478,12 +1455,21 @@ public class DerbyMessageStore implements MessageStore { if(_conn != null) { + if(_logger.isDebugEnabled()) + { + _logger.debug("Flushing message " + _messageId + " to store"); + } + _conn.commit(); _conn.close(); } } catch (SQLException e) { + if(_logger.isDebugEnabled()) + { + _logger.debug("Error when trying to flush message " + _messageId + " to store: " + e); + } throw new RuntimeException(e); } finally diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index d6f9146742..a1d0f62348 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -167,9 +167,11 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa serverMessage = new MessageTransferMessage(message, null); break; default: - throw new RuntimeException("Unknown message type retreived from store " + message.getMetaData().getClass()); + throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass()); } + //_logger.debug("Recovered message with id " + serverMessage); + _recoveredMessages.put(message.getMessageNumber(), serverMessage); _unusedMessages.put(message.getMessageNumber(), message); @@ -222,7 +224,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName)); if (queue == null) { - _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: " + _logger.error("Unknown queue: " + queueName + " cannot be bound to exchange: " + exchange.getNameShortString()); } else @@ -302,7 +304,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } else { - _logger.warn("Message id " + messageId + " referenced in log as enqueue in queue " + queue.getNameShortString() + " is unknwon, entry will be discarded"); + _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded"); TransactionLog.Transaction txn = _transactionLog.newTransaction(); txn.dequeueMessage(queue, messageId); txn.commitTranAsync(); diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java index 979a27789a..bf9d0e0f7b 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java @@ -70,13 +70,13 @@ public class PersistentStoreTest extends QpidBrokerTestCase { Message msg = _consumer.receive(RECEIVE_TIMEOUT); assertNotNull("Message " + i + " not received", msg); - assertEquals("Did not recieve the expected message", i, msg.getIntProperty(INDEX)); + assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX)); } Message msg = _consumer.receive(100); if(msg != null) { - fail("No more messages should be received, but received message: " + msg.getIntProperty(INDEX)); + fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX)); } } |
