summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-06-09 09:46:11 +0000
committerRobert Gemmell <robbie@apache.org>2010-06-09 09:46:11 +0000
commit56428d040f004086fd9c22847aec7e3c3ec8fd6a (patch)
treec58591738d8f6a49e6a0b68b41d031747d5f2288 /java
parentc1af4037c6995891e88ee82da2b4454f819325ff (diff)
downloadqpid-python-56428d040f004086fd9c22847aec7e3c3ec8fd6a.tar.gz
QPID-2650: Make use of connections with auto-commit transactions disabled for metadata, content, and queue entries.
Additionally, make remaining uses of auto-commit enabled connections more visible and remove the erroneous explicit commits on these. Close completed Statements after use. Add/correct various related debug log statements. Stop adding vhost name to all environment paths, now just the default value. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@952930 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java212
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java8
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java4
3 files changed, 106 insertions, 118 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index a87cd32e37..aeec8798d5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -218,9 +218,8 @@ public class DerbyMessageStore implements MessageStore
{
initialiseDriver();
- //Update to pick up QPID_WORK and use that as the default location not just derbyDB
-
- final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")+"/derbyDB");
+ final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY,
+ System.getProperty("QPID_WORK")+"/derbyDB/" + name);
File environmentPath = new File(databasePath);
if (!environmentPath.exists())
@@ -234,7 +233,7 @@ public class DerbyMessageStore implements MessageStore
CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_STORE_LOCATION(environmentPath.getAbsolutePath()));
- createOrOpenDatabase(name, databasePath);
+ createOrOpenDatabase(databasePath);
}
private static synchronized void initialiseDriver() throws ClassNotFoundException
@@ -245,12 +244,11 @@ public class DerbyMessageStore implements MessageStore
}
}
- private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException
+ private void createOrOpenDatabase(final String environmentPath) throws SQLException
{
- //fixme this the _vhost name should not be added here.
- _connectionURL = "jdbc:derby:" + environmentPath + "/" + name + ";create=true";
+ _connectionURL = "jdbc:derby:" + environmentPath + ";create=true";
- Connection conn = newConnection();
+ Connection conn = newAutoCommitConnection();
createVersionTable(conn);
createExchangeTable(conn);
@@ -394,8 +392,7 @@ public class DerbyMessageStore implements MessageStore
private List<String> loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException, AMQException
{
- Connection conn = newConnection();
-
+ Connection conn = newAutoCommitConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
@@ -425,6 +422,9 @@ public class DerbyMessageStore implements MessageStore
queues.add(queueName);
}
+
+ conn.close();
+
return queues;
}
@@ -436,8 +436,7 @@ public class DerbyMessageStore implements MessageStore
Connection conn = null;
try
{
- conn = newConnection();
-
+ conn = newAutoCommitConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
@@ -468,16 +467,12 @@ public class DerbyMessageStore implements MessageStore
private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws AMQException, SQLException
{
-
-
_logger.info("Recovering bindings...");
-
-
Connection conn = null;
try
{
- conn = newConnection();
+ conn = newAutoCommitConnection();
PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
@@ -504,6 +499,8 @@ public class DerbyMessageStore implements MessageStore
brh.binding(exchangeName, queueName, bindingKey, buf);
}
+
+ stmt.close();
}
finally
{
@@ -544,20 +541,16 @@ public class DerbyMessageStore implements MessageStore
Connection conn = null;
try
{
-
-
conn = newConnection();
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_META_DATA);
stmt.setLong(1,messageId);
int results = stmt.executeUpdate();
-
+ stmt.close();
+
if (results == 0)
{
-
-
throw new RuntimeException("Message metadata not found for message id " + messageId);
}
- stmt.close();
if (_logger.isDebugEnabled())
{
@@ -567,8 +560,7 @@ public class DerbyMessageStore implements MessageStore
stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
results = stmt.executeUpdate();
-
-
+ stmt.close();
conn.commit();
conn.close();
@@ -588,7 +580,7 @@ public class DerbyMessageStore implements MessageStore
}
}
- throw new RuntimeException("Error removing Message with id " + messageId + " to database: " + e, e);
+ throw new RuntimeException("Error removing message with id " + messageId + " from database: " + e, e);
}
}
@@ -603,13 +595,12 @@ public class DerbyMessageStore implements MessageStore
try
{
- conn = newConnection();
+ conn = newAutoCommitConnection();
PreparedStatement stmt = conn.prepareStatement(FIND_EXCHANGE);
stmt.setString(1, exchange.getNameShortString().toString());
stmt.execute();
stmt.close();
- conn.commit();
ResultSet rs = stmt.executeQuery();
@@ -622,7 +613,6 @@ public class DerbyMessageStore implements MessageStore
stmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0);
stmt.execute();
stmt.close();
- conn.commit();
}
}
@@ -636,7 +626,7 @@ public class DerbyMessageStore implements MessageStore
}
catch (SQLException e)
{
- throw new AMQException("Error writing Exchange with name " + exchange.getNameShortString() + " to database: " + e, e);
+ throw new AMQException("Error adding Exchange with name " + exchange.getNameShortString() + " to database: " + e, e);
}
}
@@ -648,23 +638,19 @@ public class DerbyMessageStore implements MessageStore
try
{
- conn = newConnection();
+ conn = newAutoCommitConnection();
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
stmt.setString(1, exchange.getNameShortString().toString());
int results = stmt.executeUpdate();
+ stmt.close();
if(results == 0)
{
throw new AMQException("Exchange " + exchange.getNameShortString() + " not found");
}
- else
- {
- conn.commit();
- stmt.close();
- }
}
catch (SQLException e)
{
- throw new AMQException("Error writing deleting with name " + exchange.getNameShortString() + " from database: " + e, e);
+ throw new AMQException("Error deleting exchange with name " + exchange.getNameShortString() + " from database: " + e, e);
}
finally
{
@@ -690,10 +676,9 @@ public class DerbyMessageStore implements MessageStore
{
Connection conn = null;
-
try
{
- conn = newConnection();
+ conn = newAutoCommitConnection();
PreparedStatement stmt = conn.prepareStatement(FIND_BINDING);
stmt.setString(1, exchange.getNameShortString().toString() );
@@ -726,7 +711,6 @@ public class DerbyMessageStore implements MessageStore
}
stmt.executeUpdate();
- conn.commit();
stmt.close();
}
}
@@ -761,24 +745,23 @@ public class DerbyMessageStore implements MessageStore
{
Connection conn = null;
-
try
{
- conn = newConnection();
+ conn = newAutoCommitConnection();
// exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
stmt.setString(1, exchange.getNameShortString().toString() );
stmt.setString(2, queue.getNameShortString().toString());
stmt.setString(3, routingKey == null ? null : routingKey.toString());
-
-
- if(stmt.executeUpdate() != 1)
+
+ int result = stmt.executeUpdate();
+ stmt.close();
+
+ if(result != 1)
{
throw new AMQException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange "
+ exchange.getNameShortString() + " not found");
}
- conn.commit();
- stmt.close();
}
catch (SQLException e)
{
@@ -817,7 +800,7 @@ public class DerbyMessageStore implements MessageStore
{
try
{
- Connection conn = newConnection();
+ Connection conn = newAutoCommitConnection();
PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
stmt.setString(1, queue.getNameShortString().toString());
@@ -849,11 +832,8 @@ public class DerbyMessageStore implements MessageStore
stmt.setBinaryStream(4,bis,underlying.length);
stmt.execute();
-
stmt.close();
- conn.commit();
-
conn.close();
}
}
@@ -864,9 +844,27 @@ public class DerbyMessageStore implements MessageStore
}
}
+ /**
+ * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
+ * isolation and with auto-commit transactions enabled.
+ */
+ private Connection newAutoCommitConnection() throws SQLException
+ {
+ final Connection connection = newConnection();
+ connection.setAutoCommit(true);
+
+ return connection;
+ }
+
+ /**
+ * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
+ * isolation and with auto-commit transactions disabled.
+ */
private Connection newConnection() throws SQLException
{
final Connection connection = DriverManager.getConnection(_connectionURL);
+ connection.setAutoCommit(false);
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
return connection;
}
@@ -876,22 +874,18 @@ public class DerbyMessageStore implements MessageStore
_logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
Connection conn = null;
-
try
{
- conn = newConnection();
+ conn = newAutoCommitConnection();
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE);
stmt.setString(1, name.toString());
int results = stmt.executeUpdate();
-
-
+ stmt.close();
+
if (results == 0)
{
throw new AMQException("Queue " + name + " not found");
}
-
- conn.commit();
- stmt.close();
}
catch (SQLException e)
{
@@ -930,16 +924,16 @@ public class DerbyMessageStore implements MessageStore
try
{
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
- stmt.setString(1,name);
- stmt.setLong(2,messageId);
- stmt.executeUpdate();
- connWrapper.requiresCommit();
-
if (_logger.isDebugEnabled())
{
_logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
}
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
+ stmt.setString(1,name);
+ stmt.setLong(2,messageId);
+ stmt.executeUpdate();
+ stmt.close();
}
catch (SQLException e)
{
@@ -964,8 +958,7 @@ public class DerbyMessageStore implements MessageStore
stmt.setString(1,name);
stmt.setLong(2,messageId);
int results = stmt.executeUpdate();
-
- connWrapper.requiresCommit();
+ stmt.close();
if(results != 1)
{
@@ -989,23 +982,12 @@ public class DerbyMessageStore implements MessageStore
private static final class ConnectionWrapper
{
private final Connection _connection;
- private boolean _requiresCommit;
public ConnectionWrapper(Connection conn)
{
_connection = conn;
}
- public void setRequiresCommit()
- {
- _requiresCommit = true;
- }
-
- public boolean requiresCommit()
- {
- return _requiresCommit;
- }
-
public Connection getConnection()
{
return _connection;
@@ -1071,11 +1053,7 @@ public class DerbyMessageStore implements MessageStore
try
{
Connection conn = connWrapper.getConnection();
- if(connWrapper.requiresCommit())
- {
- conn.rollback();
- }
-
+ conn.rollback();
conn.close();
}
catch (SQLException e)
@@ -1094,6 +1072,11 @@ public class DerbyMessageStore implements MessageStore
private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
throws SQLException
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Adding metadata for message " +messageId);
+ }
+
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA);
stmt.setLong(1,messageId);
@@ -1107,8 +1090,13 @@ public class DerbyMessageStore implements MessageStore
metaData.writeToBuffer(0, buf);
ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
stmt.setBinaryStream(2,bis,underlying.length);
- stmt.executeUpdate();
-
+ int result = stmt.executeUpdate();
+ stmt.close();
+
+ if(result == 0)
+ {
+ throw new RuntimeException("Unable to add meta data for message " +messageId);
+ }
}
@@ -1116,7 +1104,7 @@ public class DerbyMessageStore implements MessageStore
private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
{
- Connection conn = newConnection();
+ Connection conn = newAutoCommitConnection();
MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin();
@@ -1144,8 +1132,6 @@ public class DerbyMessageStore implements MessageStore
StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false);
messageHandler.message(message);
-
-
}
_messageId.set(maxId);
@@ -1157,14 +1143,13 @@ public class DerbyMessageStore implements MessageStore
private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
{
- Connection conn = newConnection();
+ Connection conn = newAutoCommitConnection();
TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this);
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
-
while(rs.next())
{
@@ -1172,17 +1157,16 @@ public class DerbyMessageStore implements MessageStore
long messageId = rs.getLong(2);
queueEntryHandler.queueEntry(queueName,messageId);
}
-
-
+
+ stmt.close();
queueEntryHandler.completeQueueEntryRecovery();
-
}
StorableMessageMetaData getMetaData(long messageId) throws SQLException
{
- Connection conn = newConnection();
+ Connection conn = newAutoCommitConnection();
try
{
PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_META_DATA);
@@ -1191,7 +1175,8 @@ public class DerbyMessageStore implements MessageStore
if(rs.next())
{
-
+ stmt.close();
+
Blob dataAsBlob = rs.getBlob(1);
byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
@@ -1205,6 +1190,8 @@ public class DerbyMessageStore implements MessageStore
}
else
{
+ stmt.close();
+
throw new RuntimeException("Meta data not found for message with id " + messageId);
}
@@ -1218,18 +1205,13 @@ public class DerbyMessageStore implements MessageStore
private void addContent(Connection conn, long messageId, int offset, ByteBuffer src)
{
-
-
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Adding content chunk offset " + offset + " for message " +messageId);
+ }
try
{
- final boolean newConnection = conn == null;
-
- if(newConnection)
- {
- conn = newConnection();
- }
-
src = src.slice();
byte[] chunkData = new byte[src.limit()];
@@ -1249,12 +1231,7 @@ public class DerbyMessageStore implements MessageStore
ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
stmt.setBinaryStream(4, bis, chunkData.length);
stmt.executeUpdate();
-
- if(newConnection)
- {
- conn.commit();
- conn.close();
- }
+ stmt.close();
}
catch (SQLException e)
{
@@ -1270,10 +1247,9 @@ public class DerbyMessageStore implements MessageStore
}
}
- throw new RuntimeException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
+ throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e, e);
}
-
}
@@ -1284,7 +1260,7 @@ public class DerbyMessageStore implements MessageStore
try
{
- conn = newConnection();
+ conn = newAutoCommitConnection();
PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
@@ -1317,6 +1293,7 @@ public class DerbyMessageStore implements MessageStore
}
}
+ stmt.close();
conn.close();
return written;
@@ -1335,7 +1312,7 @@ public class DerbyMessageStore implements MessageStore
}
}
- throw new RuntimeException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
+ throw new RuntimeException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e, e);
}
@@ -1478,12 +1455,21 @@ public class DerbyMessageStore implements MessageStore
{
if(_conn != null)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Flushing message " + _messageId + " to store");
+ }
+
_conn.commit();
_conn.close();
}
}
catch (SQLException e)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Error when trying to flush message " + _messageId + " to store: " + e);
+ }
throw new RuntimeException(e);
}
finally
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index d6f9146742..a1d0f62348 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -167,9 +167,11 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
serverMessage = new MessageTransferMessage(message, null);
break;
default:
- throw new RuntimeException("Unknown message type retreived from store " + message.getMetaData().getClass());
+ throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass());
}
+ //_logger.debug("Recovered message with id " + serverMessage);
+
_recoveredMessages.put(message.getMessageNumber(), serverMessage);
_unusedMessages.put(message.getMessageNumber(), message);
@@ -222,7 +224,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName));
if (queue == null)
{
- _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: "
+ _logger.error("Unknown queue: " + queueName + " cannot be bound to exchange: "
+ exchange.getNameShortString());
}
else
@@ -302,7 +304,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
else
{
- _logger.warn("Message id " + messageId + " referenced in log as enqueue in queue " + queue.getNameShortString() + " is unknwon, entry will be discarded");
+ _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded");
TransactionLog.Transaction txn = _transactionLog.newTransaction();
txn.dequeueMessage(queue, messageId);
txn.commitTranAsync();
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java
index 979a27789a..bf9d0e0f7b 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java
@@ -70,13 +70,13 @@ public class PersistentStoreTest extends QpidBrokerTestCase
{
Message msg = _consumer.receive(RECEIVE_TIMEOUT);
assertNotNull("Message " + i + " not received", msg);
- assertEquals("Did not recieve the expected message", i, msg.getIntProperty(INDEX));
+ assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX));
}
Message msg = _consumer.receive(100);
if(msg != null)
{
- fail("No more messages should be received, but received message: " + msg.getIntProperty(INDEX));
+ fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX));
}
}