summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-04-22 11:15:21 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-04-22 11:15:21 +0000
commitebf722ae2d391281275da1f830d0b35982cb188a (patch)
tree07e6f7b8dc25ef2f2e2e06d178cee2f776cb5870
parent79d512cb5a0cde119039f0b72459a9b37eaafcaa (diff)
downloadqpid-python-ebf722ae2d391281275da1f830d0b35982cb188a.tar.gz
QPID-4763 : [Java Broker] Add JDBC store
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1470454 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java2040
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java2056
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java396
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java41
-rw-r--r--qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory3
5 files changed, 2548 insertions, 1988 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
new file mode 100644
index 0000000000..3428e5735e
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -0,0 +1,2040 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
+
+abstract public class AbstractJDBCMessageStore implements MessageStore
+{
+ private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
+
+ private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
+
+ private static final String META_DATA_TABLE_NAME = "QPID_MESSAGE_METADATA";
+ private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
+
+ private static final String LINKS_TABLE_NAME = "QPID_LINKS";
+ private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES";
+
+ private static final String XID_TABLE_NAME = "QPID_XIDS";
+ private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
+
+ private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
+
+ private static final int DB_VERSION = 6;
+
+ private final AtomicLong _messageId = new AtomicLong(0);
+ private AtomicBoolean _closed = new AtomicBoolean(false);
+
+ protected String _connectionURL;
+
+
+ private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
+ private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
+
+ private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
+ private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
+ private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id";
+ private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME
+ + "( message_id, content ) values (?, ?)";
+ private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME
+ + " WHERE message_id = ?";
+ private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME
+ + " WHERE message_id = ?";
+
+ private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";
+ private static final String SELECT_FROM_META_DATA =
+ "SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
+ private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
+ private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
+
+ private static final String SELECT_FROM_LINKS =
+ "SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb";
+ private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME
+ + " WHERE id_lsb = ? and id_msb = ?";
+ private static final String SELECT_ALL_FROM_LINKS = "SELECT id_lsb, id_msb, create_time, "
+ + "arguments FROM " + LINKS_TABLE_NAME;
+ private static final String FIND_LINK = "SELECT id_lsb, id_msb FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and"
+ + " id_msb = ?";
+ private static final String INSERT_INTO_LINKS = "INSERT INTO " + LINKS_TABLE_NAME + "( id_lsb, "
+ + "id_msb, create_time, arguments ) values (?, ?, ?, ?)";
+ private static final String SELECT_FROM_BRIDGES =
+ "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM "
+ + BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
+ private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME
+ + " WHERE id_lsb = ? and id_msb = ?";
+ private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, "
+ + " create_time,"
+ + " link_id_lsb, link_id_msb, "
+ + "arguments FROM " + BRIDGES_TABLE_NAME
+ + " WHERE link_id_lsb = ? and link_id_msb = ?";
+ private static final String FIND_BRIDGE = "SELECT id_lsb, id_msb FROM " + BRIDGES_TABLE_NAME +
+ " WHERE id_lsb = ? and id_msb = ?";
+ private static final String INSERT_INTO_BRIDGES = "INSERT INTO " + BRIDGES_TABLE_NAME + "( id_lsb, id_msb, "
+ + "create_time, "
+ + "link_id_lsb, link_id_msb, "
+ + "arguments )"
+ + " values (?, ?, ?, ?, ?, ?)";
+
+ private static final String INSERT_INTO_XIDS =
+ "INSERT INTO "+ XID_TABLE_NAME +" ( format, global_id, branch_id ) values (?, ?, ?)";
+ private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME
+ + " WHERE format = ? and global_id = ? and branch_id = ?";
+ private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME;
+ private static final String INSERT_INTO_XID_ACTIONS =
+ "INSERT INTO "+ XID_ACTIONS_TABLE_NAME +" ( format, global_id, branch_id, action_type, " +
+ "queue_id, message_id ) values (?,?,?,?,?,?) ";
+ private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME
+ + " WHERE format = ? and global_id = ? and branch_id = ?";
+ private static final String SELECT_ALL_FROM_XID_ACTIONS =
+ "SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME +
+ " WHERE format = ? and global_id = ? and branch_id = ?";
+ private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " ( id, object_type, attributes) VALUES (?,?,?)";
+ private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " set object_type =?, attributes = ? where id = ?";
+ private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " where id = ?";
+ private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " where id = ?";
+ private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME;
+
+ protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+ protected final EventManager _eventManager = new EventManager();
+
+ protected final StateManager _stateManager;
+
+ private MessageStoreRecoveryHandler _messageRecoveryHandler;
+ private TransactionLogRecoveryHandler _tlogRecoveryHandler;
+ private ConfigurationRecoveryHandler _configRecoveryHandler;
+
+ public AbstractJDBCMessageStore()
+ {
+ _stateManager = new StateManager(_eventManager);
+ }
+
+ private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
+
+ @Override
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler configRecoveryHandler,
+ Configuration storeConfiguration) throws Exception
+ {
+ _stateManager.attainState(State.INITIALISING);
+ _configRecoveryHandler = configRecoveryHandler;
+
+ commonConfiguration(name, storeConfiguration);
+
+ }
+
+ @Override
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ Configuration storeConfiguration) throws Exception
+ {
+ _tlogRecoveryHandler = tlogRecoveryHandler;
+ _messageRecoveryHandler = recoveryHandler;
+
+ _stateManager.attainState(State.INITIALISED);
+ }
+
+ @Override
+ public void activate() throws Exception
+ {
+ _stateManager.attainState(State.ACTIVATING);
+
+ // this recovers durable exchanges, queues, and bindings
+ recoverConfiguration(_configRecoveryHandler);
+ recoverMessages(_messageRecoveryHandler);
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler);
+ recoverXids(dtxrh);
+
+ _stateManager.attainState(State.ACTIVE);
+ }
+
+ private void commonConfiguration(String name, Configuration storeConfiguration)
+ throws ClassNotFoundException, SQLException
+ {
+ implementationSpecificConfiguration(name, storeConfiguration);
+ createOrOpenDatabase();
+
+ }
+
+ protected abstract void implementationSpecificConfiguration(String name, Configuration storeConfiguration) throws ClassNotFoundException;
+
+ abstract protected Logger getLogger();
+
+ abstract protected String getSqlBlobType();
+
+ abstract protected String getSqlVarBinaryType(int size);
+
+ abstract protected String getSqlBigIntType();
+
+ protected void createOrOpenDatabase() throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+
+ createVersionTable(conn);
+ createConfiguredObjectsTable(conn);
+ createQueueEntryTable(conn);
+ createMetaDataTable(conn);
+ createMessageContentTable(conn);
+ createLinkTable(conn);
+ createBridgeTable(conn);
+ createXidTable(conn);
+ createXidActionTable(conn);
+ conn.close();
+ }
+
+ private void createVersionTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(DB_VERSION_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_DB_VERSION_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
+ try
+ {
+ pstmt.setInt(1, DB_VERSION);
+ pstmt.execute();
+ }
+ finally
+ {
+ pstmt.close();
+ }
+ }
+
+ }
+
+ private void createConfiguredObjectsTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " ( id VARCHAR(36) not null, object_type varchar(255), attributes "+getSqlBlobType()+", PRIMARY KEY (id))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ private void createQueueEntryTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE "+ QUEUE_ENTRY_TABLE_NAME +" ( queue_id varchar(36) not null, message_id "
+ + getSqlBigIntType() + " not null, PRIMARY KEY (queue_id, message_id) )");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+ }
+
+ private void createMetaDataTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(META_DATA_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE "
+ + META_DATA_TABLE_NAME
+ + " ( message_id "
+ + getSqlBigIntType()
+ + " not null, meta_data "
+ + getSqlBlobType()
+ + ", PRIMARY KEY ( message_id ) )");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+ }
+
+ private void createMessageContentTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE "
+ + MESSAGE_CONTENT_TABLE_NAME
+ + " ( message_id "
+ + getSqlBigIntType()
+ + " not null, content "
+ + getSqlBlobType()
+ + ", PRIMARY KEY (message_id) )");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+ }
+
+ private void createLinkTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(LINKS_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE "+ LINKS_TABLE_NAME +" ( id_lsb " + getSqlBigIntType() + " not null,"
+ + " id_msb " + getSqlBigIntType() + " not null,"
+ + " create_time " + getSqlBigIntType() + " not null,"
+ + " arguments "+getSqlBlobType()+", PRIMARY KEY ( id_lsb, id_msb ))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ private void createBridgeTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(BRIDGES_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE "+ BRIDGES_TABLE_NAME +" ( id_lsb " + getSqlBigIntType() + " not null,"
+ + " id_msb " + getSqlBigIntType() + " not null,"
+ + " create_time " + getSqlBigIntType() + " not null,"
+ + " link_id_lsb " + getSqlBigIntType() + " not null,"
+ + " link_id_msb " + getSqlBigIntType() + " not null,"
+ + " arguments "+getSqlBlobType()+", PRIMARY KEY ( id_lsb, id_msb ))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ private void createXidTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(XID_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE "
+ + XID_TABLE_NAME
+ + " ( format " + getSqlBigIntType() + " not null,"
+ + " global_id "
+ + getSqlVarBinaryType(64)
+ + ", branch_id "
+ + getSqlVarBinaryType(64)
+ + " , PRIMARY KEY ( format, "
+ +
+ "global_id, branch_id ))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ private void createXidActionTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(XID_ACTIONS_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE " + XID_ACTIONS_TABLE_NAME + " ( format " + getSqlBigIntType() + " not null,"
+ + " global_id " + getSqlVarBinaryType(64) + " not null, branch_id " + getSqlVarBinaryType(
+ 64) + " not null, " +
+ "action_type char not null, queue_id varchar(36) not null, message_id " + getSqlBigIntType() + " not null" +
+ ", PRIMARY KEY ( " +
+ "format, global_id, branch_id, action_type, queue_id, message_id))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ DatabaseMetaData metaData = conn.getMetaData();
+ ResultSet rs = metaData.getTables(null, null, "%", null);
+
+ try
+ {
+
+ while(rs.next())
+ {
+ final String table = rs.getString(3);
+ if(tableName.equalsIgnoreCase(table))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+
+ protected void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
+ {
+ try
+ {
+ List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
+
+ ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
+ _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
+
+ QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
+ _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
+
+ BindingRecoveryHandler brh = qrh.completeQueueRecovery();
+ _configuredObjectHelper.recoverBindings(brh, configuredObjects);
+
+ brh.completeBindingRecovery();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ _closed.getAndSet(true);
+ _stateManager.attainState(State.CLOSING);
+
+ doClose();
+
+ _stateManager.attainState(State.CLOSED);
+ }
+
+
+ protected abstract void doClose() throws Exception;
+
+ @Override
+ public StoredMessage addMessage(StorableMessageMetaData metaData)
+ {
+ if(metaData.isPersistent())
+ {
+ return new StoredJDBCMessage(_messageId.incrementAndGet(), metaData);
+ }
+ else
+ {
+ return new StoredMemoryMessage(_messageId.incrementAndGet(), metaData);
+ }
+ }
+
+ public StoredMessage getMessage(long messageNumber)
+ {
+ return null;
+ }
+
+ public void removeMessage(long messageId)
+ {
+ try
+ {
+ Connection conn = newConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_META_DATA);
+ try
+ {
+ stmt.setLong(1,messageId);
+ int results = stmt.executeUpdate();
+ stmt.close();
+
+ if (results == 0)
+ {
+ getLogger().warn("Message metadata not found for message id " + messageId);
+ }
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Deleted metadata for message " + messageId);
+ }
+
+ stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ results = stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+ conn.commit();
+ }
+ catch(SQLException e)
+ {
+ try
+ {
+ conn.rollback();
+ }
+ catch(SQLException t)
+ {
+ // ignore - we are re-throwing underlying exception
+ }
+
+ throw e;
+
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
+ }
+
+ }
+
+ @Override
+ public void createExchange(Exchange exchange) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange);
+ insertConfiguredObject(configuredObject);
+ }
+
+ }
+
+ @Override
+ public void removeExchange(Exchange exchange) throws AMQStoreException
+ {
+ int results = removeConfiguredObject(exchange.getId());
+ if (results == 0)
+ {
+ throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + exchange.getId() + " not found");
+ }
+ }
+
+ @Override
+ public void bindQueue(Binding binding)
+ throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding);
+ insertConfiguredObject(configuredObject);
+ }
+ }
+
+ @Override
+ public void unbindQueue(Binding binding)
+ throws AMQStoreException
+ {
+ int results = removeConfiguredObject(binding.getId());
+ if (results == 0)
+ {
+ throw new AMQStoreException("Binding " + binding + " not found");
+ }
+ }
+
+ @Override
+ public void createQueue(AMQQueue queue) throws AMQStoreException
+ {
+ createQueue(queue, null);
+ }
+
+ @Override
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
+ {
+ getLogger().debug("public void createQueue(AMQQueue queue = " + queue + "): called");
+
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ ConfiguredObjectRecord queueConfiguredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments);
+ insertConfiguredObject(queueConfiguredObject);
+ }
+ }
+
+ /**
+ * Updates the specified queue in the persistent store, IF it is already present. If the queue
+ * is not present in the store, it will not be added.
+ *
+ * NOTE: Currently only updates the exclusivity.
+ *
+ * @param queue The queue to update the entry for.
+ * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason.
+ */
+ @Override
+ public void updateQueue(final AMQQueue queue) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(queue.getId());
+ if (queueConfiguredObject != null)
+ {
+ ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueConfiguredObject);
+ updateConfiguredObject(newQueueRecord);
+ }
+ }
+
+ }
+
+
+ /**
+ * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
+ * isolation and with auto-commit transactions enabled.
+ */
+ protected Connection newAutoCommitConnection() throws SQLException
+ {
+ final Connection connection = newConnection();
+ try
+ {
+ connection.setAutoCommit(true);
+ }
+ catch (SQLException sqlEx)
+ {
+
+ try
+ {
+ connection.close();
+ }
+ finally
+ {
+ throw sqlEx;
+ }
+ }
+
+ return connection;
+ }
+
+ /**
+ * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
+ * isolation and with auto-commit transactions disabled.
+ */
+ protected Connection newConnection() throws SQLException
+ {
+ final Connection connection = DriverManager.getConnection(_connectionURL);
+ try
+ {
+ connection.setAutoCommit(false);
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ }
+ catch (SQLException sqlEx)
+ {
+ try
+ {
+ connection.close();
+ }
+ finally
+ {
+ throw sqlEx;
+ }
+ }
+ return connection;
+ }
+
+ @Override
+ public void removeQueue(final AMQQueue queue) throws AMQStoreException
+ {
+ AMQShortString name = queue.getNameShortString();
+ getLogger().debug("public void removeQueue(AMQShortString name = " + name + "): called");
+ int results = removeConfiguredObject(queue.getId());
+ if (results == 0)
+ {
+ throw new AMQStoreException("Queue " + name + " with id " + queue.getId() + " not found");
+ }
+ }
+
+ private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
+ {
+ byte[] argumentBytes;
+ if(arguments == null)
+ {
+ argumentBytes = new byte[0];
+ }
+ else
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+
+
+ try
+ {
+ dos.writeInt(arguments.size());
+ for(Map.Entry<String,String> arg : arguments.entrySet())
+ {
+ dos.writeUTF(arg.getKey());
+ dos.writeUTF(arg.getValue());
+ }
+ }
+ catch (IOException e)
+ {
+ // This should never happen
+ throw new AMQStoreException(e.getMessage(), e);
+ }
+ argumentBytes = bos.toByteArray();
+ }
+ return argumentBytes;
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return new JDBCTransaction();
+ }
+
+ public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ {
+ Connection conn = connWrapper.getConnection();
+
+
+ try
+ {
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Enqueuing message "
+ + messageId
+ + " on queue "
+ + (queue instanceof AMQQueue
+ ? ((AMQQueue) queue).getName()
+ : "")
+ + queue.getId()
+ + "[Connection"
+ + conn
+ + "]");
+ }
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
+ try
+ {
+ stmt.setString(1, queue.getId().toString());
+ stmt.setLong(2,messageId);
+ stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ catch (SQLException e)
+ {
+ getLogger().error("Failed to enqueue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId()
+ + " to database", e);
+ }
+
+ }
+
+ public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ {
+
+ Connection conn = connWrapper.getConnection();
+
+
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
+ try
+ {
+ stmt.setString(1, queue.getId().toString());
+ stmt.setLong(2, messageId);
+ int results = stmt.executeUpdate();
+
+
+
+ if(results != 1)
+ {
+ throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ + " with id " + queue.getId());
+ }
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue
+ ? ((AMQQueue) queue).getName()
+ : "")
+ + " with id " + queue.getId());
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ catch (SQLException e)
+ {
+ getLogger().error("Failed to dequeue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ + " with id " + queue.getId() + " from database", e);
+ }
+
+ }
+
+ private void removeXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId)
+ throws AMQStoreException
+ {
+ Connection conn = connWrapper.getConnection();
+
+
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_XIDS);
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2,globalId);
+ stmt.setBytes(3,branchId);
+ int results = stmt.executeUpdate();
+
+
+
+ if(results != 1)
+ {
+ throw new AMQStoreException("Unable to find message with xid");
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ stmt = conn.prepareStatement(DELETE_FROM_XID_ACTIONS);
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2,globalId);
+ stmt.setBytes(3,branchId);
+ int results = stmt.executeUpdate();
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ catch (SQLException e)
+ {
+ getLogger().error("Failed to dequeue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error deleting enqueued message with xid", e);
+ }
+
+ }
+
+ private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
+ Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws AMQStoreException
+ {
+ Connection conn = connWrapper.getConnection();
+
+
+ try
+ {
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_XIDS);
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2, globalId);
+ stmt.setBytes(3, branchId);
+ stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS);
+
+ try
+ {
+ stmt.setLong(1,format);
+ stmt.setBytes(2, globalId);
+ stmt.setBytes(3, branchId);
+
+ if(enqueues != null)
+ {
+ stmt.setString(4, "E");
+ for(Transaction.Record record : enqueues)
+ {
+ stmt.setString(5, record.getQueue().getId().toString());
+ stmt.setLong(6, record.getMessage().getMessageNumber());
+ stmt.executeUpdate();
+ }
+ }
+
+ if(dequeues != null)
+ {
+ stmt.setString(4, "D");
+ for(Transaction.Record record : dequeues)
+ {
+ stmt.setString(5, record.getQueue().getId().toString());
+ stmt.setLong(6, record.getMessage().getMessageNumber());
+ stmt.executeUpdate();
+ }
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ catch (SQLException e)
+ {
+ getLogger().error("Failed to enqueue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error writing xid ", e);
+ }
+
+ }
+
+ private static final class ConnectionWrapper
+ {
+ private final Connection _connection;
+
+ public ConnectionWrapper(Connection conn)
+ {
+ _connection = conn;
+ }
+
+ public Connection getConnection()
+ {
+ return _connection;
+ }
+ }
+
+
+ public void commitTran(ConnectionWrapper connWrapper) throws AMQStoreException
+ {
+
+ try
+ {
+ Connection conn = connWrapper.getConnection();
+ conn.commit();
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("commit tran completed");
+ }
+
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error commit tx: " + e.getMessage(), e);
+ }
+ finally
+ {
+
+ }
+ }
+
+ public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQStoreException
+ {
+ commitTran(connWrapper);
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ public void abortTran(ConnectionWrapper connWrapper) throws AMQStoreException
+ {
+ if (connWrapper == null)
+ {
+ throw new AMQStoreException("Fatal internal error: transactional context is empty at abortTran");
+ }
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("abort tran called: " + connWrapper.getConnection());
+ }
+
+ try
+ {
+ Connection conn = connWrapper.getConnection();
+ conn.rollback();
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e);
+ }
+
+ }
+
+ public Long getNewMessageId()
+ {
+ return _messageId.incrementAndGet();
+ }
+
+ private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
+ throws SQLException
+ {
+ if(getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Adding metadata for message " + messageId);
+ }
+
+ PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA);
+ try
+ {
+ stmt.setLong(1,messageId);
+
+ final int bodySize = 1 + metaData.getStorableSize();
+ byte[] underlying = new byte[bodySize];
+ underlying[0] = (byte) metaData.getType().ordinal();
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
+ buf.position(1);
+ buf = buf.slice();
+
+ metaData.writeToBuffer(0, buf);
+ ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
+ try
+ {
+ stmt.setBinaryStream(2,bis,underlying.length);
+ int result = stmt.executeUpdate();
+
+ if(result == 0)
+ {
+ throw new RuntimeException("Unable to add meta data for message " +messageId);
+ }
+ }
+ finally
+ {
+ try
+ {
+ bis.close();
+ }
+ catch (IOException e)
+ {
+
+ throw new SQLException(e);
+ }
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+
+ protected void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin();
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
+ try
+ {
+
+ long maxId = 0;
+
+ while(rs.next())
+ {
+
+ long messageId = rs.getLong(1);
+ if(messageId > maxId)
+ {
+ maxId = messageId;
+ }
+
+ byte[] dataAsBytes = getBlobAsBytes(rs, 2);
+
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
+ StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+ StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
+ messageHandler.message(message);
+ }
+
+ _messageId.set(maxId);
+
+ messageHandler.completeMessageRecovery();
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+
+ protected TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this);
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+ try
+ {
+ while(rs.next())
+ {
+
+ String id = rs.getString(1);
+ long messageId = rs.getLong(2);
+ queueEntryHandler.queueEntry(UUID.fromString(id), messageId);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ return queueEntryHandler.completeQueueEntryRecovery();
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ private static final class Xid
+ {
+
+ private final long _format;
+ private final byte[] _globalId;
+ private final byte[] _branchId;
+
+ public Xid(long format, byte[] globalId, byte[] branchId)
+ {
+ _format = format;
+ _globalId = globalId;
+ _branchId = branchId;
+ }
+
+ public long getFormat()
+ {
+ return _format;
+ }
+
+ public byte[] getGlobalId()
+ {
+ return _globalId;
+ }
+
+ public byte[] getBranchId()
+ {
+ return _branchId;
+ }
+ }
+
+ private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueableMessage
+ {
+
+ private long _messageNumber;
+ private UUID _queueId;
+
+ public RecordImpl(UUID queueId, long messageNumber)
+ {
+ _messageNumber = messageNumber;
+ _queueId = queueId;
+ }
+
+ @Override
+ public TransactionLogResource getQueue()
+ {
+ return this;
+ }
+
+ @Override
+ public EnqueableMessage getMessage()
+ {
+ return this;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return _messageNumber;
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ @Override
+ public StoredMessage getStoredMessage()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return _queueId;
+ }
+ }
+
+ protected void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ List<Xid> xids = new ArrayList<Xid>();
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
+ try
+ {
+ while(rs.next())
+ {
+
+ long format = rs.getLong(1);
+ byte[] globalId = rs.getBytes(2);
+ byte[] branchId = rs.getBytes(3);
+ xids.add(new Xid(format, globalId, branchId));
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+
+
+ for(Xid xid : xids)
+ {
+ List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
+ List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
+
+ PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
+
+ try
+ {
+ pstmt.setLong(1, xid.getFormat());
+ pstmt.setBytes(2, xid.getGlobalId());
+ pstmt.setBytes(3, xid.getBranchId());
+
+ ResultSet rs = pstmt.executeQuery();
+ try
+ {
+ while(rs.next())
+ {
+
+ String actionType = rs.getString(1);
+ UUID queueId = UUID.fromString(rs.getString(2));
+ long messageId = rs.getLong(3);
+
+ RecordImpl record = new RecordImpl(queueId, messageId);
+ List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
+ records.add(record);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ pstmt.close();
+ }
+
+ dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
+ enqueues.toArray(new RecordImpl[enqueues.size()]),
+ dequeues.toArray(new RecordImpl[dequeues.size()]));
+ }
+
+
+ dtxrh.completeDtxRecordRecovery();
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ }
+
+ StorableMessageMetaData getMetaData(long messageId) throws SQLException
+ {
+
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_META_DATA);
+ try
+ {
+ stmt.setLong(1,messageId);
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+
+ if(rs.next())
+ {
+ byte[] dataAsBytes = getBlobAsBytes(rs, 1);
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
+ StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+
+ return metaData;
+ }
+ else
+ {
+ throw new RuntimeException("Meta data not found for message with id " + messageId);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ protected abstract byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException;
+
+ private void addContent(Connection conn, long messageId, ByteBuffer src)
+ {
+ if(getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Adding content for message " + messageId);
+ }
+ PreparedStatement stmt = null;
+
+ try
+ {
+ src = src.slice();
+
+ byte[] chunkData = new byte[src.limit()];
+ src.duplicate().get(chunkData);
+
+ stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
+ stmt.setBinaryStream(2, bis, chunkData.length);
+ stmt.executeUpdate();
+ }
+ catch (SQLException e)
+ {
+ closeConnection(conn);
+ throw new RuntimeException("Error adding content for message " + messageId + ": " + e.getMessage(), e);
+ }
+ finally
+ {
+ closePreparedStatement(stmt);
+ }
+
+ }
+
+ public int getContent(long messageId, int offset, ByteBuffer dst)
+ {
+ Connection conn = null;
+ PreparedStatement stmt = null;
+
+ try
+ {
+ conn = newAutoCommitConnection();
+
+ stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ ResultSet rs = stmt.executeQuery();
+
+ int written = 0;
+
+ if (rs.next())
+ {
+
+ byte[] dataAsBytes = getBlobAsBytes(rs, 1);
+ int size = dataAsBytes.length;
+
+ if (offset > size)
+ {
+ throw new RuntimeException("Offset " + offset + " is greater than message size " + size
+ + " for message id " + messageId + "!");
+
+ }
+
+ written = size - offset;
+ if(written > dst.remaining())
+ {
+ written = dst.remaining();
+ }
+
+ dst.put(dataAsBytes, offset, written);
+ }
+
+ return written;
+
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e.getMessage(), e);
+ }
+ finally
+ {
+ closePreparedStatement(stmt);
+ closeConnection(conn);
+ }
+
+
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+
+ protected class JDBCTransaction implements Transaction
+ {
+ private final ConnectionWrapper _connWrapper;
+ private int _storeSizeIncrease;
+
+
+ protected JDBCTransaction()
+ {
+ try
+ {
+ _connWrapper = new ConnectionWrapper(newConnection());
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ {
+ final StoredMessage storedMessage = message.getStoredMessage();
+ if(storedMessage instanceof StoredJDBCMessage)
+ {
+ try
+ {
+ ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Exception on enqueuing message " + _messageId, e);
+ }
+ }
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
+ }
+
+ @Override
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ {
+ AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
+
+ }
+
+ @Override
+ public void commitTran() throws AMQStoreException
+ {
+ AbstractJDBCMessageStore.this.commitTran(_connWrapper);
+ storedSizeChange(_storeSizeIncrease);
+ }
+
+ @Override
+ public StoreFuture commitTranAsync() throws AMQStoreException
+ {
+ final StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
+ storedSizeChange(_storeSizeIncrease);
+ return storeFuture;
+ }
+
+ @Override
+ public void abortTran() throws AMQStoreException
+ {
+ AbstractJDBCMessageStore.this.abortTran(_connWrapper);
+ }
+
+ @Override
+ public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException
+ {
+ AbstractJDBCMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
+ }
+
+ @Override
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ throws AMQStoreException
+ {
+ AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
+ }
+ }
+
+ private class StoredJDBCMessage implements StoredMessage
+ {
+
+ private final long _messageId;
+ private final boolean _isRecovered;
+
+ private StorableMessageMetaData _metaData;
+ private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+ private byte[] _data;
+ private volatile SoftReference<byte[]> _dataRef;
+
+
+ StoredJDBCMessage(long messageId, StorableMessageMetaData metaData)
+ {
+ this(messageId, metaData, false);
+ }
+
+
+ StoredJDBCMessage(long messageId,
+ StorableMessageMetaData metaData, boolean isRecovered)
+ {
+ _messageId = messageId;
+ _isRecovered = isRecovered;
+
+ if(!_isRecovered)
+ {
+ _metaData = metaData;
+ }
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ }
+
+ @Override
+ public StorableMessageMetaData getMetaData()
+ {
+ StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
+ if(metaData == null)
+ {
+ try
+ {
+ metaData = AbstractJDBCMessageStore.this.getMetaData(_messageId);
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException(e);
+ }
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ }
+
+ return metaData;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ src = src.slice();
+
+ if(_data == null)
+ {
+ _data = new byte[src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+ src.duplicate().get(_data);
+ }
+ else
+ {
+ byte[] oldData = _data;
+ _data = new byte[oldData.length + src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+
+ System.arraycopy(oldData,0,_data,0,oldData.length);
+ src.duplicate().get(_data, oldData.length, src.remaining());
+ }
+
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
+ {
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
+ }
+ else
+ {
+ return AbstractJDBCMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+ }
+
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ int length = getContent(offsetInMessage, buf);
+ buf.position(0);
+ buf.limit(length);
+ return buf;
+ }
+
+ @Override
+ public synchronized StoreFuture flushToStore()
+ {
+ Connection conn = null;
+ try
+ {
+ if(!stored())
+ {
+ conn = newConnection();
+
+ store(conn);
+
+ conn.commit();
+ storedSizeChange(getMetaData().getContentSize());
+ }
+ }
+ catch (SQLException e)
+ {
+ if(getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Error when trying to flush message " + _messageId + " to store: " + e);
+ }
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ closeConnection(conn);
+ }
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void remove()
+ {
+ int delta = getMetaData().getContentSize();
+ AbstractJDBCMessageStore.this.removeMessage(_messageId);
+ storedSizeChange(-delta);
+ }
+
+ private synchronized void store(final Connection conn) throws SQLException
+ {
+ if (!stored())
+ {
+ try
+ {
+ storeMetaData(conn, _messageId, _metaData);
+ AbstractJDBCMessageStore.this.addContent(conn, _messageId,
+ _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ }
+ finally
+ {
+ _metaData = null;
+ _data = null;
+ }
+
+ if(getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Storing message " + _messageId + " to store");
+ }
+ }
+ }
+
+ private boolean stored()
+ {
+ return _metaData == null || _isRecovered;
+ }
+ }
+
+ protected void closeConnection(final Connection conn)
+ {
+ if(conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ getLogger().error("Problem closing connection", e);
+ }
+ }
+ }
+
+ protected void closePreparedStatement(final PreparedStatement stmt)
+ {
+ if (stmt != null)
+ {
+ try
+ {
+ stmt.close();
+ }
+ catch(SQLException e)
+ {
+ getLogger().error("Problem closing prepared statement", e);
+ }
+ }
+ }
+
+ @Override
+ public void addEventListener(EventListener eventListener, Event... events)
+ {
+ _eventManager.addEventListener(eventListener, events);
+ }
+
+ private void insertConfiguredObject(ConfiguredObjectRecord configuredObject) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ // If we don't have any data in the result set then we can add this configured object
+ if (!rs.next())
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+ try
+ {
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
+ {
+ insertStmt.setNull(3, Types.BLOB);
+ }
+ else
+ {
+ byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+ }
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private int removeConfiguredObject(UUID id) throws AMQStoreException
+ {
+ int results = 0;
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ stmt.setString(1, id.toString());
+ results = stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e);
+ }
+ return results;
+ }
+
+ private void updateConfiguredObject(final ConfiguredObjectRecord configuredObject) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ if (rs.next())
+ {
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
+ try
+ {
+ stmt2.setString(1, configuredObject.getType());
+ if (configuredObject.getAttributes() != null)
+ {
+ byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
+ }
+ else
+ {
+ stmt2.setNull(2, Types.BLOB);
+ }
+ stmt2.setString(3, configuredObject.getId().toString());
+ stmt2.execute();
+ }
+ finally
+ {
+ stmt2.close();
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws AMQStoreException
+ {
+ ConfiguredObjectRecord result = null;
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, id.toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ if (rs.next())
+ {
+ String type = rs.getString(1);
+ String attributes = getBlobAsString(rs, 2);
+ result = new ConfiguredObjectRecord(id, type, attributes);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+ + e.getMessage(), e);
+ }
+ return result;
+ }
+
+ private List<ConfiguredObjectRecord> loadConfiguredObjects() throws SQLException
+ {
+ ArrayList<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>();
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ String id = rs.getString(1);
+ String objectType = rs.getString(2);
+ String attributes = getBlobAsString(rs, 3);
+ results.add(new ConfiguredObjectRecord(UUID.fromString(id), objectType, attributes));
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ return results;
+ }
+
+ protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
+
+ protected abstract void storedSizeChange(int storeSizeIncrease);
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index e9946d1860..271b7f5551 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -21,14 +21,7 @@
package org.apache.qpid.server.store.derby;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.File;
-import java.io.IOException;
-import java.lang.ref.SoftReference;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
import java.sql.Blob;
import java.sql.CallableStatement;
import java.sql.Connection;
@@ -37,1871 +30,211 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.ConfiguredObjectHelper;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.AbstractJDBCMessageStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.EventManager;
-import org.apache.qpid.server.store.MessageMetaDataType;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreConstants;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.State;
-import org.apache.qpid.server.store.StateManager;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMemoryMessage;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
/**
* An implementation of a {@link MessageStore} that uses Apache Derby as the persistence
* mechanism.
*
- * TODO extract the SQL statements into a generic JDBC store
*/
-public class DerbyMessageStore implements MessageStore
+public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStore
{
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
- private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
-
- private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
-
- private static final String META_DATA_TABLE_NAME = "QPID_MESSAGE_METADATA";
- private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
-
- private static final String LINKS_TABLE_NAME = "QPID_LINKS";
- private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES";
-
- private static final String XID_TABLE_NAME = "QPID_XIDS";
- private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
-
- private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
-
- private static final int DB_VERSION = 6;
-
-
-
- private static Class<Driver> DRIVER_CLASS;
public static final String MEMORY_STORE_LOCATION = ":memory:";
- private final AtomicLong _messageId = new AtomicLong(0);
- private AtomicBoolean _closed = new AtomicBoolean(false);
-
- private String _connectionURL;
-
private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
- private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
- private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
-
- private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_id varchar(36) not null, message_id bigint not null, PRIMARY KEY (queue_id, message_id) )";
- private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
- private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
- private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id";
-
-
- private static final String CREATE_META_DATA_TABLE = "CREATE TABLE " + META_DATA_TABLE_NAME
- + " ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )";
- private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE " + MESSAGE_CONTENT_TABLE_NAME
- + " ( message_id bigint not null, content blob , PRIMARY KEY (message_id) )";
-
- private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME
- + "( message_id, content ) values (?, ?)";
- private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME
- + " WHERE message_id = ?";
- private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME
- + " WHERE message_id = ?";
-
- private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";;
- private static final String SELECT_FROM_META_DATA =
- "SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
- private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
- private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
-
- private static final String CREATE_LINKS_TABLE =
- "CREATE TABLE "+LINKS_TABLE_NAME+" ( id_lsb bigint not null,"
- + " id_msb bigint not null,"
- + " create_time bigint not null,"
- + " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))";
- private static final String SELECT_FROM_LINKS =
- "SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb";
- private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME
- + " WHERE id_lsb = ? and id_msb = ?";
- private static final String SELECT_ALL_FROM_LINKS = "SELECT id_lsb, id_msb, create_time, "
- + "arguments FROM " + LINKS_TABLE_NAME;
- private static final String FIND_LINK = "SELECT id_lsb, id_msb FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and"
- + " id_msb = ?";
- private static final String INSERT_INTO_LINKS = "INSERT INTO " + LINKS_TABLE_NAME + "( id_lsb, "
- + "id_msb, create_time, arguments ) values (?, ?, ?, ?)";
-
-
- private static final String CREATE_BRIDGES_TABLE =
- "CREATE TABLE "+BRIDGES_TABLE_NAME+" ( id_lsb bigint not null,"
- + " id_msb bigint not null,"
- + " create_time bigint not null,"
- + " link_id_lsb bigint not null,"
- + " link_id_msb bigint not null,"
- + " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))";
- private static final String SELECT_FROM_BRIDGES =
- "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM "
- + BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
- private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME
- + " WHERE id_lsb = ? and id_msb = ?";
- private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, "
- + " create_time,"
- + " link_id_lsb, link_id_msb, "
- + "arguments FROM " + BRIDGES_TABLE_NAME
- + " WHERE link_id_lsb = ? and link_id_msb = ?";
- private static final String FIND_BRIDGE = "SELECT id_lsb, id_msb FROM " + BRIDGES_TABLE_NAME +
- " WHERE id_lsb = ? and id_msb = ?";
- private static final String INSERT_INTO_BRIDGES = "INSERT INTO " + BRIDGES_TABLE_NAME + "( id_lsb, id_msb, "
- + "create_time, "
- + "link_id_lsb, link_id_msb, "
- + "arguments )"
- + " values (?, ?, ?, ?, ?, ?)";
-
- private static final String CREATE_XIDS_TABLE =
- "CREATE TABLE "+XID_TABLE_NAME+" ( format bigint not null,"
- + " global_id varchar(64) for bit data, branch_id varchar(64) for bit data, PRIMARY KEY ( format, " +
- "global_id, branch_id ))";
- private static final String INSERT_INTO_XIDS =
- "INSERT INTO "+XID_TABLE_NAME+" ( format, global_id, branch_id ) values (?, ?, ?)";
- private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME
- + " WHERE format = ? and global_id = ? and branch_id = ?";
- private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME;
-
-
- private static final String CREATE_XID_ACTIONS_TABLE =
- "CREATE TABLE "+XID_ACTIONS_TABLE_NAME+" ( format bigint not null,"
- + " global_id varchar(64) for bit data not null, branch_id varchar(64) for bit data not null, " +
- "action_type char not null, queue_id varchar(36) not null, message_id bigint not null" +
- ", PRIMARY KEY ( " +
- "format, global_id, branch_id, action_type, queue_id, message_id))";
- private static final String INSERT_INTO_XID_ACTIONS =
- "INSERT INTO "+XID_ACTIONS_TABLE_NAME+" ( format, global_id, branch_id, action_type, " +
- "queue_id, message_id ) values (?,?,?,?,?,?) ";
- private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME
- + " WHERE format = ? and global_id = ? and branch_id = ?";
- private static final String SELECT_ALL_FROM_XID_ACTIONS =
- "SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME +
- " WHERE format = ? and global_id = ? and branch_id = ?";
-
- private static final String CREATE_CONFIGURED_OBJECTS_TABLE = "CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
- + " ( id VARCHAR(36) not null, object_type varchar(255), attributes blob, PRIMARY KEY (id))";
- private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME
- + " ( id, object_type, attributes) VALUES (?,?,?)";
- private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME
- + " set object_type =?, attributes = ? where id = ?";
- private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME
- + " where id = ?";
- private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME
- + " where id = ?";
- private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME;
-
- private final Charset UTF8_CHARSET = Charset.forName("UTF-8");
-
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
public static final String TYPE = "DERBY";
- private final StateManager _stateManager;
-
- private final EventManager _eventManager = new EventManager();
-
private long _totalStoreSize;
private boolean _limitBusted;
private long _persistentSizeLowThreshold;
private long _persistentSizeHighThreshold;
- private MessageStoreRecoveryHandler _messageRecoveryHandler;
-
- private TransactionLogRecoveryHandler _tlogRecoveryHandler;
-
- private ConfigurationRecoveryHandler _configRecoveryHandler;
private String _storeLocation;
+ private Class<Driver> _driverClass;
public DerbyMessageStore()
{
- _stateManager = new StateManager(_eventManager);
}
- private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
-
- @Override
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler configRecoveryHandler,
- Configuration storeConfiguration) throws Exception
+ protected Logger getLogger()
{
- _stateManager.attainState(State.INITIALISING);
- _configRecoveryHandler = configRecoveryHandler;
-
- commonConfiguration(name, storeConfiguration);
-
+ return _logger;
}
@Override
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration storeConfiguration) throws Exception
+ protected String getSqlBlobType()
{
- _tlogRecoveryHandler = tlogRecoveryHandler;
- _messageRecoveryHandler = recoveryHandler;
-
- _stateManager.attainState(State.INITIALISED);
+ return "blob";
}
@Override
- public void activate() throws Exception
- {
- _stateManager.attainState(State.ACTIVATING);
-
- // this recovers durable exchanges, queues, and bindings
- recoverConfiguration(_configRecoveryHandler);
- recoverMessages(_messageRecoveryHandler);
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler);
- recoverXids(dtxrh);
-
- _stateManager.attainState(State.ACTIVE);
- }
-
- private void commonConfiguration(String name, Configuration storeConfiguration)
- throws ClassNotFoundException, SQLException
- {
- initialiseDriver();
-
- //Update to pick up QPID_WORK and use that as the default location not just derbyDB
-
- final String databasePath = storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")
- + File.separator + "derbyDB");
-
- if(!MEMORY_STORE_LOCATION.equals(databasePath))
- {
- File environmentPath = new File(databasePath);
- if (!environmentPath.exists())
- {
- if (!environmentPath.mkdirs())
- {
- throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
- }
- }
- }
-
- _storeLocation = databasePath;
-
- _persistentSizeHighThreshold = storeConfiguration.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, -1l);
- _persistentSizeLowThreshold = storeConfiguration.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold);
- if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
- {
- _persistentSizeLowThreshold = _persistentSizeHighThreshold;
- }
-
- createOrOpenDatabase(name, databasePath);
-
- Connection conn = newAutoCommitConnection();;
- try
- {
- _totalStoreSize = getSizeOnDisk(conn);
- }
- finally
- {
- conn.close();
- }
- }
-
- private static synchronized void initialiseDriver() throws ClassNotFoundException
- {
- if(DRIVER_CLASS == null)
- {
- DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
- }
- }
-
- private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException
- {
- //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created.
- _connectionURL = "jdbc:derby" + (environmentPath.equals(MEMORY_STORE_LOCATION) ? environmentPath : ":" + environmentPath + "/") + name + ";create=true";
-
- Connection conn = newAutoCommitConnection();
-
- createVersionTable(conn);
- createConfiguredObjectsTable(conn);
- createQueueEntryTable(conn);
- createMetaDataTable(conn);
- createMessageContentTable(conn);
- createLinkTable(conn);
- createBridgeTable(conn);
- createXidTable(conn);
- createXidActionTable(conn);
- conn.close();
- }
-
-
-
- private void createVersionTable(final Connection conn) throws SQLException
- {
- if(!tableExists(DB_VERSION_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(CREATE_DB_VERSION_TABLE);
- }
- finally
- {
- stmt.close();
- }
-
- PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
- try
- {
- pstmt.setInt(1, DB_VERSION);
- pstmt.execute();
- }
- finally
- {
- pstmt.close();
- }
- }
-
- }
-
- private void createConfiguredObjectsTable(final Connection conn) throws SQLException
- {
- if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(CREATE_CONFIGURED_OBJECTS_TABLE);
- }
- finally
- {
- stmt.close();
- }
- }
- }
-
- private void createQueueEntryTable(final Connection conn) throws SQLException
- {
- if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(CREATE_QUEUE_ENTRY_TABLE);
- }
- finally
- {
- stmt.close();
- }
- }
-
- }
-
- private void createMetaDataTable(final Connection conn) throws SQLException
- {
- if(!tableExists(META_DATA_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(CREATE_META_DATA_TABLE);
- }
- finally
- {
- stmt.close();
- }
- }
-
- }
-
-
- private void createMessageContentTable(final Connection conn) throws SQLException
- {
- if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(CREATE_MESSAGE_CONTENT_TABLE);
- }
- finally
- {
- stmt.close();
- }
- }
-
- }
-
- private void createLinkTable(final Connection conn) throws SQLException
- {
- if(!tableExists(LINKS_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(CREATE_LINKS_TABLE);
- }
- finally
- {
- stmt.close();
- }
- }
- }
-
-
- private void createBridgeTable(final Connection conn) throws SQLException
+ protected String getSqlVarBinaryType(int size)
{
- if(!tableExists(BRIDGES_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(CREATE_BRIDGES_TABLE);
- }
- finally
- {
- stmt.close();
- }
- }
+ return "varchar("+size+") for bit data";
}
- private void createXidTable(final Connection conn) throws SQLException
- {
- if(!tableExists(XID_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(CREATE_XIDS_TABLE);
- }
- finally
- {
- stmt.close();
- }
- }
- }
-
-
- private void createXidActionTable(final Connection conn) throws SQLException
- {
- if(!tableExists(XID_ACTIONS_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(CREATE_XID_ACTIONS_TABLE);
- }
- finally
- {
- stmt.close();
- }
- }
- }
-
- private boolean tableExists(final String tableName, final Connection conn) throws SQLException
- {
- PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
- try
- {
- stmt.setString(1, tableName);
- ResultSet rs = stmt.executeQuery();
- try
- {
- return rs.next();
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
-
- private void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
+ @Override
+ protected String getSqlBigIntType()
{
- try
- {
- List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
-
- ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
- _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
-
- QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
- _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
-
- BindingRecoveryHandler brh = qrh.completeQueueRecovery();
- _configuredObjectHelper.recoverBindings(brh, configuredObjects);
-
- brh.completeBindingRecovery();
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
+ return "bigint";
}
- @Override
- public void close() throws Exception
+ protected void doClose() throws SQLException
{
- _closed.getAndSet(true);
- _stateManager.attainState(State.CLOSING);
-
try
{
Connection conn = DriverManager.getConnection(_connectionURL + ";shutdown=true");
// Shouldn't reach this point - shutdown=true should throw SQLException
conn.close();
- _logger.error("Unable to shut down the store");
+ getLogger().error("Unable to shut down the store");
}
catch (SQLException e)
{
- if (e.getSQLState().equalsIgnoreCase(DERBY_SINGLE_DB_SHUTDOWN_CODE))
+ if (e.getSQLState().equalsIgnoreCase(DerbyMessageStore.DERBY_SINGLE_DB_SHUTDOWN_CODE))
{
//expected and represents a clean shutdown of this database only, do nothing.
}
else
{
- _logger.error("Exception whilst shutting down the store: " + e);
- }
- }
-
- _stateManager.attainState(State.CLOSED);
- }
-
- @Override
- public StoredMessage addMessage(StorableMessageMetaData metaData)
- {
- if(metaData.isPersistent())
- {
- return new StoredDerbyMessage(_messageId.incrementAndGet(), metaData);
- }
- else
- {
- return new StoredMemoryMessage(_messageId.incrementAndGet(), metaData);
- }
- }
-
- public StoredMessage getMessage(long messageNumber)
- {
- return null;
- }
-
- public void removeMessage(long messageId)
- {
- try
- {
- Connection conn = newConnection();
- try
- {
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_META_DATA);
- try
- {
- stmt.setLong(1,messageId);
- int results = stmt.executeUpdate();
- stmt.close();
-
- if (results == 0)
- {
- _logger.warn("Message metadata not found for message id " + messageId);
- }
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Deleted metadata for message " + messageId);
- }
-
- stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
- stmt.setLong(1,messageId);
- results = stmt.executeUpdate();
- }
- finally
- {
- stmt.close();
- }
- conn.commit();
- }
- catch(SQLException e)
- {
- try
- {
- conn.rollback();
- }
- catch(SQLException t)
- {
- // ignore - we are re-throwing underlying exception
- }
-
+ getLogger().error("Exception whilst shutting down the store: " + e);
throw e;
-
- }
- finally
- {
- conn.close();
}
}
- catch (SQLException e)
- {
- throw new RuntimeException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
- }
-
- }
-
- @Override
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange);
- insertConfiguredObject(configuredObject);
- }
-
- }
-
- @Override
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
- int results = removeConfiguredObject(exchange.getId());
- if (results == 0)
- {
- throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + exchange.getId() + " not found");
- }
- }
-
- @Override
- public void bindQueue(Binding binding)
- throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding);
- insertConfiguredObject(configuredObject);
- }
}
@Override
- public void unbindQueue(Binding binding)
- throws AMQStoreException
+ protected void implementationSpecificConfiguration(String name, Configuration storeConfiguration)
+ throws ClassNotFoundException
{
- int results = removeConfiguredObject(binding.getId());
- if (results == 0)
- {
- throw new AMQStoreException("Binding " + binding + " not found");
- }
- }
-
- @Override
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- createQueue(queue, null);
- }
-
- @Override
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
- {
- _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
-
- if (_stateManager.isInState(State.ACTIVE))
- {
- ConfiguredObjectRecord queueConfiguredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments);
- insertConfiguredObject(queueConfiguredObject);
- }
- }
-
- /**
- * Updates the specified queue in the persistent store, IF it is already present. If the queue
- * is not present in the store, it will not be added.
- *
- * NOTE: Currently only updates the exclusivity.
- *
- * @param queue The queue to update the entry for.
- * @throws AMQStoreException If the operation fails for any reason.
- */
- @Override
- public void updateQueue(final AMQQueue queue) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(queue.getId());
- if (queueConfiguredObject != null)
- {
- ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueConfiguredObject);
- updateConfiguredObject(newQueueRecord);
- }
- }
-
- }
-
- /**
- * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
- * isolation and with auto-commit transactions enabled.
- */
- private Connection newAutoCommitConnection() throws SQLException
- {
- final Connection connection = newConnection();
- try
- {
- connection.setAutoCommit(true);
- }
- catch (SQLException sqlEx)
- {
-
- try
- {
- connection.close();
- }
- finally
- {
- throw sqlEx;
- }
- }
+ //Update to pick up QPID_WORK and use that as the default location not just derbyDB
- return connection;
- }
+ _driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
- /**
- * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
- * isolation and with auto-commit transactions disabled.
- */
- private Connection newConnection() throws SQLException
- {
- final Connection connection = DriverManager.getConnection(_connectionURL);
- try
- {
- connection.setAutoCommit(false);
- connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
- }
- catch (SQLException sqlEx)
- {
- try
- {
- connection.close();
- }
- finally
- {
- throw sqlEx;
- }
- }
- return connection;
- }
-
- @Override
- public void removeQueue(final AMQQueue queue) throws AMQStoreException
- {
- AMQShortString name = queue.getNameShortString();
- _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
- int results = removeConfiguredObject(queue.getId());
- if (results == 0)
- {
- throw new AMQStoreException("Queue " + name + " with id " + queue.getId() + " not found");
- }
- }
+ final String databasePath = storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")
+ + File.separator + "derbyDB");
- private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
- {
- byte[] argumentBytes;
- if(arguments == null)
- {
- argumentBytes = new byte[0];
- }
- else
+ if(!MEMORY_STORE_LOCATION.equals(databasePath))
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
-
-
- try
+ File environmentPath = new File(databasePath);
+ if (!environmentPath.exists())
{
- dos.writeInt(arguments.size());
- for(Map.Entry<String,String> arg : arguments.entrySet())
+ if (!environmentPath.mkdirs())
{
- dos.writeUTF(arg.getKey());
- dos.writeUTF(arg.getValue());
+ throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
}
}
- catch (IOException e)
- {
- // This should never happen
- throw new AMQStoreException(e.getMessage(), e);
- }
- argumentBytes = bos.toByteArray();
}
- return argumentBytes;
- }
-
-
-
- @Override
- public Transaction newTransaction()
- {
- return new DerbyTransaction();
- }
-
- public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
- {
- Connection conn = connWrapper.getConnection();
+ _storeLocation = databasePath;
- try
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Enqueuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + queue.getId()+ "[Connection" + conn + "]");
- }
-
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
- try
- {
- stmt.setString(1, queue.getId().toString());
- stmt.setLong(2,messageId);
- stmt.executeUpdate();
- }
- finally
- {
- stmt.close();
- }
- }
- catch (SQLException e)
+ _persistentSizeHighThreshold = storeConfiguration.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, -1l);
+ _persistentSizeLowThreshold = storeConfiguration.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold);
+ if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
{
- _logger.error("Failed to enqueue: " + e.getMessage(), e);
- throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId()
- + " to database", e);
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
}
- }
-
- public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
- {
-
- Connection conn = connWrapper.getConnection();
-
-
- try
- {
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
- try
- {
- stmt.setString(1, queue.getId().toString());
- stmt.setLong(2,messageId);
- int results = stmt.executeUpdate();
+ //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created.
+ _connectionURL = "jdbc:derby" + (databasePath.equals(MEMORY_STORE_LOCATION) ? databasePath: ":" + databasePath+ "/") + name + ";create=true";
- if(results != 1)
- {
- throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
- + " with id " + queue.getId());
- }
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
- + " with id " + queue.getId());
- }
- }
- finally
- {
- stmt.close();
- }
- }
- catch (SQLException e)
- {
- _logger.error("Failed to dequeue: " + e.getMessage(), e);
- throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
- + " with id " + queue.getId() + " from database", e);
- }
+ _eventManager.addEventListener(new EventListener()
+ {
+ @Override
+ public void event(Event event)
+ {
+ setInitialSize();
+ }
+ }, Event.BEFORE_ACTIVATE);
}
-
- private void removeXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId)
- throws AMQStoreException
+ private void setInitialSize()
{
- Connection conn = connWrapper.getConnection();
-
-
+ Connection conn = null;
try
{
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_XIDS);
- try
- {
- stmt.setLong(1,format);
- stmt.setBytes(2,globalId);
- stmt.setBytes(3,branchId);
- int results = stmt.executeUpdate();
-
-
-
- if(results != 1)
- {
- throw new AMQStoreException("Unable to find message with xid");
- }
- }
- finally
- {
- stmt.close();
- }
-
- stmt = conn.prepareStatement(DELETE_FROM_XID_ACTIONS);
- try
- {
- stmt.setLong(1,format);
- stmt.setBytes(2,globalId);
- stmt.setBytes(3,branchId);
- int results = stmt.executeUpdate();
-
- }
- finally
- {
- stmt.close();
- }
-
- }
- catch (SQLException e)
- {
- _logger.error("Failed to dequeue: " + e.getMessage(), e);
- throw new AMQStoreException("Error deleting enqueued message with xid", e);
- }
- }
-
-
- private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
- Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws AMQStoreException
- {
- Connection conn = connWrapper.getConnection();
-
-
- try
- {
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_XIDS);
try
{
- stmt.setLong(1,format);
- stmt.setBytes(2, globalId);
- stmt.setBytes(3, branchId);
- stmt.executeUpdate();
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
}
finally
{
- stmt.close();
- }
-
- stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS);
-
- try
- {
- stmt.setLong(1,format);
- stmt.setBytes(2, globalId);
- stmt.setBytes(3, branchId);
-
- if(enqueues != null)
- {
- stmt.setString(4, "E");
- for(Transaction.Record record : enqueues)
- {
- stmt.setString(5, record.getQueue().getId().toString());
- stmt.setLong(6, record.getMessage().getMessageNumber());
- stmt.executeUpdate();
- }
- }
-
- if(dequeues != null)
+ if(conn != null)
{
- stmt.setString(4, "D");
- for(Transaction.Record record : dequeues)
- {
- stmt.setString(5, record.getQueue().getId().toString());
- stmt.setLong(6, record.getMessage().getMessageNumber());
- stmt.executeUpdate();
- }
- }
-
- }
- finally
- {
- stmt.close();
- }
+ conn.close();
- }
- catch (SQLException e)
- {
- _logger.error("Failed to enqueue: " + e.getMessage(), e);
- throw new AMQStoreException("Error writing xid ", e);
- }
-
- }
- private static final class ConnectionWrapper
- {
- private final Connection _connection;
-
- public ConnectionWrapper(Connection conn)
- {
- _connection = conn;
- }
-
- public Connection getConnection()
- {
- return _connection;
- }
- }
-
-
- public void commitTran(ConnectionWrapper connWrapper) throws AMQStoreException
- {
-
- try
- {
- Connection conn = connWrapper.getConnection();
- conn.commit();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("commit tran completed");
+ }
}
-
- conn.close();
}
catch (SQLException e)
{
- throw new AMQStoreException("Error commit tx: " + e.getMessage(), e);
- }
- finally
- {
-
+ getLogger().error("Unable to set initial store size", e);
}
}
- public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQStoreException
- {
- commitTran(connWrapper);
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- public void abortTran(ConnectionWrapper connWrapper) throws AMQStoreException
+ protected String getBlobAsString(ResultSet rs, int col) throws SQLException
{
- if (connWrapper == null)
- {
- throw new AMQStoreException("Fatal internal error: transactional context is empty at abortTran");
- }
-
- if (_logger.isDebugEnabled())
+ Blob blob = rs.getBlob(col);
+ if(blob == null)
{
- _logger.debug("abort tran called: " + connWrapper.getConnection());
+ return null;
}
-
- try
- {
- Connection conn = connWrapper.getConnection();
- conn.rollback();
- conn.close();
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e);
- }
-
+ byte[] bytes = blob.getBytes(1, (int)blob.length());
+ return new String(bytes, UTF8_CHARSET);
}
- public Long getNewMessageId()
+ protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
{
- return _messageId.incrementAndGet();
+ Blob dataAsBlob = rs.getBlob(col);
+ return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
}
- private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
- throws SQLException
+ protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
{
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Adding metadata for message " +messageId);
- }
-
- PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA);
+ PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
try
{
- stmt.setLong(1,messageId);
-
- final int bodySize = 1 + metaData.getStorableSize();
- byte[] underlying = new byte[bodySize];
- underlying[0] = (byte) metaData.getType().ordinal();
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
- buf.position(1);
- buf = buf.slice();
-
- metaData.writeToBuffer(0, buf);
- ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
+ stmt.setString(1, tableName);
+ ResultSet rs = stmt.executeQuery();
try
{
- stmt.setBinaryStream(2,bis,underlying.length);
- int result = stmt.executeUpdate();
-
- if(result == 0)
- {
- throw new RuntimeException("Unable to add meta data for message " +messageId);
- }
+ return rs.next();
}
finally
{
- try
- {
- bis.close();
- }
- catch (IOException e)
- {
-
- throw new SQLException(e);
- }
+ rs.close();
}
-
}
finally
{
stmt.close();
}
-
- }
-
-
-
-
- private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin();
-
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
- try
- {
-
- long maxId = 0;
-
- while(rs.next())
- {
-
- long messageId = rs.getLong(1);
- Blob dataAsBlob = rs.getBlob(2);
-
- if(messageId > maxId)
- {
- maxId = messageId;
- }
-
- byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
- buf.position(1);
- buf = buf.slice();
- MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
- StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
- StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, true);
- messageHandler.message(message);
- }
-
- _messageId.set(maxId);
-
- messageHandler.completeMessageRecovery();
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
-
-
-
- private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this);
-
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
- try
- {
- while(rs.next())
- {
-
- String id = rs.getString(1);
- long messageId = rs.getLong(2);
- queueEntryHandler.queueEntry(UUID.fromString(id), messageId);
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- return queueEntryHandler.completeQueueEntryRecovery();
- }
- finally
- {
- conn.close();
- }
- }
-
- private static final class Xid
- {
-
- private final long _format;
- private final byte[] _globalId;
- private final byte[] _branchId;
-
- public Xid(long format, byte[] globalId, byte[] branchId)
- {
- _format = format;
- _globalId = globalId;
- _branchId = branchId;
- }
-
- public long getFormat()
- {
- return _format;
- }
-
- public byte[] getGlobalId()
- {
- return _globalId;
- }
-
- public byte[] getBranchId()
- {
- return _branchId;
- }
- }
-
- private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueableMessage
- {
-
- private long _messageNumber;
- private UUID _queueId;
-
- public RecordImpl(UUID queueId, long messageNumber)
- {
- _messageNumber = messageNumber;
- _queueId = queueId;
- }
-
- @Override
- public TransactionLogResource getQueue()
- {
- return this;
- }
-
- @Override
- public EnqueableMessage getMessage()
- {
- return this;
- }
-
- @Override
- public long getMessageNumber()
- {
- return _messageNumber;
- }
-
- @Override
- public boolean isPersistent()
- {
- return true;
- }
-
- @Override
- public StoredMessage getStoredMessage()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public UUID getId()
- {
- return _queueId;
- }
- }
-
- private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- List<Xid> xids = new ArrayList<Xid>();
-
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
- try
- {
- while(rs.next())
- {
-
- long format = rs.getLong(1);
- byte[] globalId = rs.getBytes(2);
- byte[] branchId = rs.getBytes(3);
- xids.add(new Xid(format, globalId, branchId));
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
-
-
- for(Xid xid : xids)
- {
- List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
- List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
-
- PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
-
- try
- {
- pstmt.setLong(1, xid.getFormat());
- pstmt.setBytes(2, xid.getGlobalId());
- pstmt.setBytes(3, xid.getBranchId());
-
- ResultSet rs = pstmt.executeQuery();
- try
- {
- while(rs.next())
- {
-
- String actionType = rs.getString(1);
- UUID queueId = UUID.fromString(rs.getString(2));
- long messageId = rs.getLong(3);
-
- RecordImpl record = new RecordImpl(queueId, messageId);
- List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
- records.add(record);
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- pstmt.close();
- }
-
- dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
- enqueues.toArray(new RecordImpl[enqueues.size()]),
- dequeues.toArray(new RecordImpl[dequeues.size()]));
- }
-
-
- dtxrh.completeDtxRecordRecovery();
- }
- finally
- {
- conn.close();
- }
-
}
- StorableMessageMetaData getMetaData(long messageId) throws SQLException
- {
-
- Connection conn = newAutoCommitConnection();
- try
- {
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_META_DATA);
- try
- {
- stmt.setLong(1,messageId);
- ResultSet rs = stmt.executeQuery();
- try
- {
-
- if(rs.next())
- {
- Blob dataAsBlob = rs.getBlob(1);
-
- byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
- buf.position(1);
- buf = buf.slice();
- MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
- StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
-
- return metaData;
- }
- else
- {
- throw new RuntimeException("Meta data not found for message with id " + messageId);
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
-
-
- private void addContent(Connection conn, long messageId, ByteBuffer src)
- {
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Adding content for message " +messageId);
- }
- PreparedStatement stmt = null;
-
- try
- {
- src = src.slice();
-
- byte[] chunkData = new byte[src.limit()];
- src.duplicate().get(chunkData);
-
- stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
- stmt.setLong(1,messageId);
-
- ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
- stmt.setBinaryStream(2, bis, chunkData.length);
- stmt.executeUpdate();
- }
- catch (SQLException e)
- {
- closeConnection(conn);
- throw new RuntimeException("Error adding content for message " + messageId + ": " + e.getMessage(), e);
- }
- finally
- {
- closePreparedStatement(stmt);
- }
-
- }
-
-
- public int getContent(long messageId, int offset, ByteBuffer dst)
- {
- Connection conn = null;
- PreparedStatement stmt = null;
-
- try
- {
- conn = newAutoCommitConnection();
-
- stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
- stmt.setLong(1,messageId);
- ResultSet rs = stmt.executeQuery();
-
- int written = 0;
-
- if (rs.next())
- {
-
- Blob dataAsBlob = rs.getBlob(1);
-
- final int size = (int) dataAsBlob.length();
- byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
-
- if (offset > size)
- {
- throw new RuntimeException("Offset " + offset + " is greater than message size " + size
- + " for message id " + messageId + "!");
-
- }
-
- written = size - offset;
- if(written > dst.remaining())
- {
- written = dst.remaining();
- }
-
- dst.put(dataAsBytes, offset, written);
- }
-
- return written;
-
- }
- catch (SQLException e)
- {
- throw new RuntimeException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e.getMessage(), e);
- }
- finally
- {
- closePreparedStatement(stmt);
- closeConnection(conn);
- }
-
-
- }
-
- @Override
- public boolean isPersistent()
- {
- return true;
- }
-
-
- private class DerbyTransaction implements Transaction
- {
- private final ConnectionWrapper _connWrapper;
- private int _storeSizeIncrease;
-
-
- private DerbyTransaction()
- {
- try
- {
- _connWrapper = new ConnectionWrapper(newConnection());
- }
- catch (SQLException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
- {
- final StoredMessage storedMessage = message.getStoredMessage();
- if(storedMessage instanceof StoredDerbyMessage)
- {
- try
- {
- ((StoredDerbyMessage) storedMessage).store(_connWrapper.getConnection());
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Exception on enqueuing message " + _messageId, e);
- }
- }
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
- DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
- }
-
- @Override
- public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
- {
- DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
-
- }
-
- @Override
- public void commitTran() throws AMQStoreException
- {
- DerbyMessageStore.this.commitTran(_connWrapper);
- storedSizeChange(_storeSizeIncrease);
- }
-
- @Override
- public StoreFuture commitTranAsync() throws AMQStoreException
- {
- final StoreFuture storeFuture = DerbyMessageStore.this.commitTranAsync(_connWrapper);
- storedSizeChange(_storeSizeIncrease);
- return storeFuture;
- }
-
- @Override
- public void abortTran() throws AMQStoreException
- {
- DerbyMessageStore.this.abortTran(_connWrapper);
- }
-
- @Override
- public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException
- {
- DerbyMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
- }
-
- @Override
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
- throws AMQStoreException
- {
- DerbyMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
- }
- }
-
-
-
- private class StoredDerbyMessage implements StoredMessage
- {
-
- private final long _messageId;
- private final boolean _isRecovered;
-
- private StorableMessageMetaData _metaData;
- private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
- private byte[] _data;
- private volatile SoftReference<byte[]> _dataRef;
-
-
- StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
- {
- this(messageId, metaData, false);
- }
-
-
- StoredDerbyMessage(long messageId,
- StorableMessageMetaData metaData, boolean isRecovered)
- {
- _messageId = messageId;
- _isRecovered = isRecovered;
-
- if(!_isRecovered)
- {
- _metaData = metaData;
- }
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- }
-
- @Override
- public StorableMessageMetaData getMetaData()
- {
- StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
- if(metaData == null)
- {
- try
- {
- metaData = DerbyMessageStore.this.getMetaData(_messageId);
- }
- catch (SQLException e)
- {
- throw new RuntimeException(e);
- }
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- }
-
- return metaData;
- }
-
- @Override
- public long getMessageNumber()
- {
- return _messageId;
- }
-
- @Override
- public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
- {
- src = src.slice();
-
- if(_data == null)
- {
- _data = new byte[src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
- src.duplicate().get(_data);
- }
- else
- {
- byte[] oldData = _data;
- _data = new byte[oldData.length + src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
-
- System.arraycopy(oldData,0,_data,0,oldData.length);
- src.duplicate().get(_data, oldData.length, src.remaining());
- }
-
- }
-
- @Override
- public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
- {
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
- {
- int length = Math.min(dst.remaining(), data.length - offsetInMessage);
- dst.put(data, offsetInMessage, length);
- return length;
- }
- else
- {
- return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
- }
- }
-
-
- @Override
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- ByteBuffer buf = ByteBuffer.allocate(size);
- int length = getContent(offsetInMessage, buf);
- buf.position(0);
- buf.limit(length);
- return buf;
- }
-
- @Override
- public synchronized StoreFuture flushToStore()
- {
- Connection conn = null;
- try
- {
- if(!stored())
- {
- conn = newConnection();
-
- store(conn);
-
- conn.commit();
- storedSizeChange(getMetaData().getContentSize());
- }
- }
- catch (SQLException e)
- {
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Error when trying to flush message " + _messageId + " to store: " + e);
- }
- throw new RuntimeException(e);
- }
- finally
- {
- closeConnection(conn);
- }
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
- public void remove()
- {
- int delta = getMetaData().getContentSize();
- DerbyMessageStore.this.removeMessage(_messageId);
- storedSizeChange(-delta);
- }
-
- private synchronized void store(final Connection conn) throws SQLException
- {
- if (!stored())
- {
- try
- {
- storeMetaData(conn, _messageId, _metaData);
- DerbyMessageStore.this.addContent(conn, _messageId,
- _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
- }
- finally
- {
- _metaData = null;
- _data = null;
- }
-
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Storing message " + _messageId + " to store");
- }
- }
- }
-
- private boolean stored()
- {
- return _metaData == null || _isRecovered;
- }
- }
-
- private void closeConnection(final Connection conn)
- {
- if(conn != null)
- {
- try
- {
- conn.close();
- }
- catch (SQLException e)
- {
- _logger.error("Problem closing connection", e);
- }
- }
- }
-
- private void closePreparedStatement(final PreparedStatement stmt)
- {
- if (stmt != null)
- {
- try
- {
- stmt.close();
- }
- catch(SQLException e)
- {
- _logger.error("Problem closing prepared statement", e);
- }
- }
- }
-
- @Override
- public void addEventListener(EventListener eventListener, Event... events)
- {
- _eventManager.addEventListener(eventListener, events);
- }
@Override
public String getStoreLocation()
@@ -1909,258 +242,7 @@ public class DerbyMessageStore implements MessageStore
return _storeLocation;
}
- private void insertConfiguredObject(ConfiguredObjectRecord configuredObject) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- try
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
- try
- {
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
- try
- {
- // If we don't have any data in the result set then we can add this configured object
- if (!rs.next())
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
- try
- {
- insertStmt.setString(1, configuredObject.getId().toString());
- insertStmt.setString(2, configuredObject.getType());
- if(configuredObject.getAttributes() == null)
- {
- insertStmt.setNull(3, Types.BLOB);
- }
- else
- {
- byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET);
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
- }
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- }
- }
-
- private int removeConfiguredObject(UUID id) throws AMQStoreException
- {
- int results = 0;
- try
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
- try
- {
- stmt.setString(1, id.toString());
- results = stmt.executeUpdate();
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e);
- }
- return results;
- }
-
- private void updateConfiguredObject(final ConfiguredObjectRecord configuredObject) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- try
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
- try
- {
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
- try
- {
- if (rs.next())
- {
- PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
- try
- {
- stmt2.setString(1, configuredObject.getType());
- if (configuredObject.getAttributes() != null)
- {
- byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET);
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
- }
- else
- {
- stmt2.setNull(2, Types.BLOB);
- }
- stmt2.setString(3, configuredObject.getId().toString());
- stmt2.execute();
- }
- finally
- {
- stmt2.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
- }
- }
- }
-
- private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws AMQStoreException
- {
- ConfiguredObjectRecord result = null;
- try
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
- try
- {
- stmt.setString(1, id.toString());
- ResultSet rs = stmt.executeQuery();
- try
- {
- if (rs.next())
- {
- String type = rs.getString(1);
- Blob blob = rs.getBlob(2);
- String attributes = null;
- if (blob != null)
- {
- attributes = blobToString(blob);
- }
- result = new ConfiguredObjectRecord(id, type, attributes);
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
- + e.getMessage(), e);
- }
- return result;
- }
-
- private String blobToString(Blob blob) throws SQLException
- {
- byte[] bytes = blob.getBytes(1, (int)blob.length());
- return new String(bytes, UTF8_CHARSET);
- }
-
- private List<ConfiguredObjectRecord> loadConfiguredObjects() throws SQLException
- {
- ArrayList<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>();
- Connection conn = newAutoCommitConnection();
- try
- {
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
- try
- {
- ResultSet rs = stmt.executeQuery();
- try
- {
- while (rs.next())
- {
- String id = rs.getString(1);
- String objectType = rs.getString(2);
- String attributes = blobToString(rs.getBlob(3));
- results.add(new ConfiguredObjectRecord(UUID.fromString(id), objectType, attributes));
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- return results;
- }
-
- private synchronized void storedSizeChange(final int delta)
+ protected synchronized void storedSizeChange(final int delta)
{
if(getPersistentSizeHighThreshold() > 0)
{
@@ -2210,7 +292,7 @@ public class DerbyMessageStore implements MessageStore
catch (SQLException e)
{
closeConnection(conn);
- throw new RuntimeException("Exception will processing store size change", e);
+ throw new RuntimeException("Exception while processing store size change", e);
}
}
}
@@ -2336,4 +418,4 @@ public class DerbyMessageStore implements MessageStore
return TYPE;
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
new file mode 100644
index 0000000000..2c4b0e8119
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
@@ -0,0 +1,396 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store.jdbc;
+
+
+import java.sql.Blob;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.store.AbstractJDBCMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreConstants;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.Transaction;
+
+/**
+ * An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence
+ * mechanism.
+ *
+ */
+public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStore
+{
+
+ private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class);
+
+
+ public static final String TYPE = "JDBC";
+
+
+ private static class JDBCDetails
+ {
+ private final String _vendor;
+ private String _blobType;
+ private String _varBinaryType;
+ private String _bigintType;
+ private boolean _useBytesMethodsForBlob;
+
+ private JDBCDetails(String vendor,
+ String blobType,
+ String varBinaryType,
+ String bigIntType,
+ boolean useBytesMethodsForBlob)
+ {
+ _vendor = vendor;
+ setBlobType(blobType);
+ setVarBinaryType(varBinaryType);
+ setBigintType(bigIntType);
+ setUseBytesMethodsForBlob(useBytesMethodsForBlob);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ JDBCDetails that = (JDBCDetails) o;
+
+ if (!getVendor().equals(that.getVendor()))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return getVendor().hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "JDBCDetails{" +
+ "vendor='" + getVendor() + '\'' +
+ ", blobType='" + getBlobType() + '\'' +
+ ", varBinaryType='" + getVarBinaryType() + '\'' +
+ ", bigIntType='" + getBigintType() + '\'' +
+ ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() +
+ '}';
+ }
+
+ public String getVendor()
+ {
+ return _vendor;
+ }
+
+ public String getBlobType()
+ {
+ return _blobType;
+ }
+
+ public void setBlobType(String blobType)
+ {
+ _blobType = blobType;
+ }
+
+ public String getVarBinaryType()
+ {
+ return _varBinaryType;
+ }
+
+ public void setVarBinaryType(String varBinaryType)
+ {
+ _varBinaryType = varBinaryType;
+ }
+
+ public boolean isUseBytesMethodsForBlob()
+ {
+ return _useBytesMethodsForBlob;
+ }
+
+ public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob)
+ {
+ _useBytesMethodsForBlob = useBytesMethodsForBlob;
+ }
+
+ public String getBigintType()
+ {
+ return _bigintType;
+ }
+
+ public void setBigintType(String bigintType)
+ {
+ _bigintType = bigintType;
+ }
+ }
+
+ private static JDBCDetails DERBY_DETAILS =
+ new JDBCDetails("derby",
+ "blob",
+ "varchar(%d) for bit data",
+ "bigint",
+ false);
+
+ private static JDBCDetails POSTGRESQL_DETAILS =
+ new JDBCDetails("postgresql",
+ "bytea",
+ "bytea",
+ "bigint",
+ true);
+
+ private static JDBCDetails MYSQL_DETAILS =
+ new JDBCDetails("mysql",
+ "blob",
+ "varbinary(%d)",
+ "bigint",
+ false);
+
+
+ private static JDBCDetails SYBASE_DETAILS =
+ new JDBCDetails("sybase",
+ "image",
+ "varbinary(%d)",
+ "bigint",
+ false);
+
+
+ private static JDBCDetails ORACLE_DETAILS =
+ new JDBCDetails("oracle",
+ "blob",
+ "raw(%d)",
+ "number",
+ false);
+
+
+ private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<String,JDBCDetails>();
+
+ static
+ {
+
+ addDetails(DERBY_DETAILS);
+ addDetails(POSTGRESQL_DETAILS);
+ addDetails(MYSQL_DETAILS);
+ addDetails(SYBASE_DETAILS);
+ addDetails(ORACLE_DETAILS);
+ }
+
+ private static void addDetails(JDBCDetails details)
+ {
+ VENDOR_DETAILS.put(details.getVendor(), details);
+ }
+
+ private String _blobType;
+ private String _varBinaryType;
+ private String _bigIntType;
+ private boolean _useBytesMethodsForBlob;
+
+ private List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<RecordedJDBCTransaction>();
+
+
+ public JDBCMessageStore()
+ {
+ }
+
+ protected Logger getLogger()
+ {
+ return _logger;
+ }
+
+ protected String getSqlBlobType()
+ {
+ return _blobType;
+ }
+
+ protected String getSqlVarBinaryType(int size)
+ {
+ return String.format(_varBinaryType, size);
+ }
+
+ public String getSqlBigIntType()
+ {
+ return _bigIntType;
+ }
+
+ @Override
+ protected void doClose() throws AMQStoreException
+ {
+ while(!_transactions.isEmpty())
+ {
+ RecordedJDBCTransaction txn = _transactions.get(0);
+ txn.abortTran();
+ }
+ }
+
+ protected void implementationSpecificConfiguration(String name, Configuration storeConfiguration)
+ throws ClassNotFoundException
+ {
+
+ _connectionURL = storeConfiguration.getString("connectionUrl",
+ storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY));
+
+ JDBCDetails details = null;
+
+ String[] components = _connectionURL.split(":",3);
+ if(components.length >= 2)
+ {
+ String vendor = components[1];
+ details = VENDOR_DETAILS.get(vendor);
+ }
+
+ if(details == null)
+ {
+ getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL);
+
+ // TODO - is there a better default than derby
+ details = DERBY_DETAILS;
+ }
+
+ _blobType = storeConfiguration.getString("sqlBlobType",details.getBlobType());
+ _varBinaryType = storeConfiguration.getString("sqlVarbinaryType",details.getVarBinaryType());
+ _useBytesMethodsForBlob = storeConfiguration.getBoolean("useBytesForBlob",details.isUseBytesMethodsForBlob());
+ _bigIntType = storeConfiguration.getString("sqlBigIntType", details.getBigintType());
+ }
+
+ protected void storedSizeChange(int contentSize)
+ {
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return "";
+ }
+
+ @Override
+ public String getStoreType()
+ {
+ return TYPE;
+ }
+
+ @Override
+ protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ {
+ if(_useBytesMethodsForBlob)
+ {
+ return rs.getBytes(col);
+ }
+ else
+ {
+ Blob dataAsBlob = rs.getBlob(col);
+ return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+
+ }
+ }
+
+ @Override
+ protected String getBlobAsString(ResultSet rs, int col) throws SQLException
+ {
+ byte[] bytes;
+ if(_useBytesMethodsForBlob)
+ {
+ bytes = rs.getBytes(col);
+ return new String(bytes,UTF8_CHARSET);
+ }
+ else
+ {
+ Blob blob = rs.getBlob(col);
+ if(blob == null)
+ {
+ return null;
+ }
+ bytes = blob.getBytes(1, (int)blob.length());
+ }
+ return new String(bytes, UTF8_CHARSET);
+
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return new RecordedJDBCTransaction();
+ }
+
+ private class RecordedJDBCTransaction extends JDBCTransaction
+ {
+ private RecordedJDBCTransaction()
+ {
+ super();
+ JDBCMessageStore.this._transactions.add(this);
+ }
+
+ @Override
+ public void commitTran() throws AMQStoreException
+ {
+ try
+ {
+ super.commitTran();
+ }
+ finally
+ {
+ JDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+
+ @Override
+ public StoreFuture commitTranAsync() throws AMQStoreException
+ {
+ try
+ {
+ return super.commitTranAsync();
+ }
+ finally
+ {
+ JDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+
+ @Override
+ public void abortTran() throws AMQStoreException
+ {
+ try
+ {
+ super.abortTran();
+ }
+ finally
+ {
+ JDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java
new file mode 100644
index 0000000000..1446ad34e9
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.jdbc;
+
+import org.apache.qpid.server.plugin.MessageStoreFactory;
+import org.apache.qpid.server.store.MessageStore;
+
+public class JDBCMessageStoreFactory implements MessageStoreFactory
+{
+
+ @Override
+ public String getType()
+ {
+ return JDBCMessageStore.TYPE;
+ }
+
+ @Override
+ public MessageStore createMessageStore()
+ {
+ return new JDBCMessageStore();
+ }
+
+}
diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
index 1357f816b7..0edd44f5a5 100644
--- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
+++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
@@ -17,4 +17,5 @@
# under the License.
#
org.apache.qpid.server.store.derby.DerbyMessageStoreFactory
-org.apache.qpid.server.store.MemoryMessageStoreFactory \ No newline at end of file
+org.apache.qpid.server.store.MemoryMessageStoreFactory
+org.apache.qpid.server.store.jdbc.JDBCMessageStoreFactory