diff options
| author | Keith Wall <kwall@apache.org> | 2014-06-18 20:51:43 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-06-18 20:51:43 +0000 |
| commit | 326b9560c14d1c30eb71c1396858791f9187d11e (patch) | |
| tree | 34ba78548d48295e88b9e038f382bd8861f32500 /qpid/java/broker-plugins | |
| parent | 2622dda9c7d3267efd985b5ae5928b99063d2fa7 (diff) | |
| download | qpid-python-326b9560c14d1c30eb71c1396858791f9187d11e.tar.gz | |
QPID-5800: [Java Broker] Refactor Derby/JDBC message store implementations to separate message and config store implementations.
* Message store implementations can now be used in isolation, which is useful when the user is using a JSON VirtualHostNode with
another persistent store implementation.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1603626 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
17 files changed, 1585 insertions, 990 deletions
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java new file mode 100644 index 0000000000..0a121ad476 --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java @@ -0,0 +1,337 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store.derby; + + +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.AbstractJDBCMessageStore; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.JdbcUtils; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreException; + +public abstract class AbstractDerbyMessageStore extends AbstractJDBCMessageStore +{ + private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(false); + + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + private long _totalStoreSize; + private boolean _limitBusted; + + private ConfiguredObject<?> _parent; + + @Override + public final void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + if (_messageStoreOpen.compareAndSet(false, true)) + { + _parent = parent; + + DerbyUtils.loadDerbyDriver(); + + doOpen(parent, messageStoreSettings); + + Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); + Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE); + + _persistentSizeHighThreshold = overfullAttr == null ? -1l : + overfullAttr instanceof Number + ? ((Number) overfullAttr).longValue() + : Long.parseLong(overfullAttr.toString()); + _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : + underfullAttr instanceof Number + ? ((Number) underfullAttr).longValue() + : Long.parseLong(underfullAttr.toString()); + + if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } + + + createOrOpenMessageStoreDatabase(); + setInitialSize(); + setMaximumMessageId(); + } + } + + protected abstract void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings); + + @Override + public final void upgradeStoreStructure() throws StoreException + { + checkMessageStoreOpen(); + + upgrade(_parent); + } + + @Override + public final void closeMessageStore() + { + if (_messageStoreOpen.compareAndSet(true, false)) + { + doClose(); + } + } + + protected abstract void doClose(); + + @Override + protected boolean isMessageStoreOpen() + { + return _messageStoreOpen.get(); + } + + @Override + protected void checkMessageStoreOpen() + { + if (!_messageStoreOpen.get()) + { + throw new IllegalStateException("Message store is not open"); + } + } + + @Override + protected String getSqlBlobType() + { + return "blob"; + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return "varchar("+size+") for bit data"; + } + + @Override + protected String getSqlBigIntType() + { + return "bigint"; + } + + @Override + protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException + { + return DerbyUtils.getBlobAsBytes(rs, col); + } + + @Override + protected boolean tableExists(final String tableName, final Connection conn) throws SQLException + { + return DerbyUtils.tableExists(tableName, conn); + } + + @Override + protected void storedSizeChange(final int delta) + { + if(getPersistentSizeHighThreshold() > 0) + { + synchronized(this) + { + // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every + // time, so we do so only when there's been enough change that it is worth looking again. We do this by + // assuming the total size will change by less than twice the amount of the message data change. + long newSize = _totalStoreSize += 3*delta; + + Connection conn = null; + try + { + + if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) + { + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + if(_totalStoreSize > getPersistentSizeHighThreshold()) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + } + else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) + { + long oldSize = _totalStoreSize; + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + if(oldSize <= _totalStoreSize) + { + + reduceSizeOnDisk(conn); + + _totalStoreSize = getSizeOnDisk(conn); + } + + if(_totalStoreSize < getPersistentSizeLowThreshold()) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + + } + } + catch (SQLException e) + { + JdbcUtils.closeConnection(conn, getLogger()); + throw new StoreException("Exception while processing store size change", e); + } + } + } + } + + private void setInitialSize() + { + Connection conn = null; + try + { + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + } + catch (SQLException e) + { + getLogger().error("Unable to set initial store size", e); + } + finally + { + JdbcUtils.closeConnection(conn, getLogger()); + } + } + + private long getSizeOnDisk(Connection conn) + { + PreparedStatement stmt = null; + try + { + String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" + + " FROM " + + " SYS.SYSTABLES systabs," + + " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" + + " WHERE systabs.tabletype = 'T'"; + + stmt = conn.prepareStatement(sizeQuery); + + ResultSet rs = null; + long size = 0l; + + try + { + rs = stmt.executeQuery(); + while(rs.next()) + { + size = rs.getLong(1); + } + } + finally + { + if(rs != null) + { + rs.close(); + } + } + + return size; + + } + catch (SQLException e) + { + throw new StoreException("Error establishing on disk size", e); + } + finally + { + JdbcUtils.closePreparedStatement(stmt, getLogger()); + } + } + + private void reduceSizeOnDisk(Connection conn) + { + CallableStatement cs = null; + PreparedStatement stmt = null; + try + { + String tableQuery = + "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'"; + stmt = conn.prepareStatement(tableQuery); + ResultSet rs = null; + + List<String> schemas = new ArrayList<String>(); + List<String> tables = new ArrayList<String>(); + + try + { + rs = stmt.executeQuery(); + while(rs.next()) + { + schemas.add(rs.getString(1)); + tables.add(rs.getString(2)); + } + } + finally + { + if(rs != null) + { + rs.close(); + } + } + + + cs = conn.prepareCall + ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)"); + + for(int i = 0; i < schemas.size(); i++) + { + cs.setString(1, schemas.get(i)); + cs.setString(2, tables.get(i)); + cs.setShort(3, (short) 0); + cs.execute(); + } + } + catch (SQLException e) + { + throw new StoreException("Error reducing on disk size", e); + } + finally + { + JdbcUtils.closePreparedStatement(stmt, getLogger()); + JdbcUtils.closePreparedStatement(cs, getLogger()); + } + } + + private long getPersistentSizeLowThreshold() + { + return _persistentSizeLowThreshold; + } + + private long getPersistentSizeHighThreshold() + { + return _persistentSizeHighThreshold; + } + +} diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java new file mode 100644 index 0000000000..540e92fac7 --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java @@ -0,0 +1,221 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store.derby; + + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.log4j.Logger; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.AbstractJDBCConfigurationStore; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreProvider; +import org.apache.qpid.server.store.StoreException; + +/** + * Implementation of a DurableConfigurationStore backed by Apache Derby + * that also provides a MessageStore. + */ +public class DerbyConfigurationStore extends AbstractJDBCConfigurationStore + implements MessageStoreProvider, DurableConfigurationStore +{ + private static final Logger LOGGER = Logger.getLogger(DerbyConfigurationStore.class); + + private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean(); + private final MessageStoreWrapper _messageStore = new MessageStoreWrapper(); + + private String _connectionURL; + private String _storeLocation; + + private ConfiguredObject<?> _parent; + + @Override + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) + throws StoreException + { + if (_configurationStoreOpen.compareAndSet(false, true)) + { + _parent = parent; + DerbyUtils.loadDerbyDriver(); + + String databasePath = (String) storeSettings.get(MessageStore.STORE_PATH); + + _storeLocation = databasePath; + _connectionURL = DerbyUtils.createConnectionUrl(parent.getName(), databasePath); + + createOrOpenConfigurationStoreDatabase(); + } + } + + @Override + public void upgradeStoreStructure() throws StoreException + { + checkConfigurationStoreOpen(); + upgradeIfNecessary(_parent); + } + + @Override + protected Connection getConnection() throws SQLException + { + checkConfigurationStoreOpen(); + return DriverManager.getConnection(_connectionURL); + } + + @Override + public void closeConfigurationStore() throws StoreException + { + if (_messageStore.isMessageStoreOpen()) + { + throw new IllegalStateException("Cannot close the store as the provided message store is still open"); + } + + if (_configurationStoreOpen.compareAndSet(true, false)) + { + try + { + DerbyUtils.shutdownDatabase(_connectionURL); + } + catch (SQLException e) + { + throw new StoreException("Error closing configuration store", e); + } + } + } + + @Override + protected String getSqlBlobType() + { + return "blob"; + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return "varchar("+size+") for bit data"; + } + + @Override + protected String getSqlBigIntType() + { + return "bigint"; + } + + @Override + protected String getBlobAsString(ResultSet rs, int col) throws SQLException + { + return DerbyUtils.getBlobAsString(rs, col); + } + + @Override + public void onDelete() + { + if (_messageStore.isMessageStoreOpen()) + { + throw new IllegalStateException("Cannot delete the store as the provided message store is still open"); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Deleting store " + _storeLocation); + } + + try + { + DerbyUtils.deleteDatabaseLocation(_storeLocation); + } + catch (StoreException se) + { + LOGGER.debug("Failed to delete the store at location " + _storeLocation); + } + finally + { + _storeLocation = null; + } + } + + @Override + public MessageStore getMessageStore() + { + return _messageStore; + } + + @Override + protected boolean tableExists(final String tableName, final Connection conn) throws SQLException + { + return DerbyUtils.tableExists(tableName, conn); + } + + @Override + protected void checkConfigurationStoreOpen() + { + if (!_configurationStoreOpen.get()) + { + throw new IllegalStateException("Configuration store is not open"); + } + } + + @Override + protected Logger getLogger() + { + return LOGGER; + } + + private class MessageStoreWrapper extends AbstractDerbyMessageStore + { + @Override + protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + // Nothing to do, store provided by DerbyConfigurationStore + } + + @Override + protected Connection getConnection() throws SQLException + { + checkMessageStoreOpen(); + return DerbyConfigurationStore.this.getConnection(); + } + + @Override + protected void doClose() + { + // Nothing to do, store provided by DerbyConfigurationStore + } + + @Override + public String getStoreLocation() + { + return DerbyConfigurationStore.this._storeLocation; + } + + @Override + protected Logger getLogger() + { + return DerbyConfigurationStore.this.getLogger(); + } + } +} diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 6f87e81ba1..9e2a2f63d4 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -21,522 +21,96 @@ package org.apache.qpid.server.store.derby; -import java.io.File; -import java.sql.Blob; -import java.sql.CallableStatement; import java.sql.Connection; -import java.sql.Driver; import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import org.apache.log4j.Logger; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.store.AbstractJDBCMessageStore; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreProvider; -import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.handler.DistributedTransactionHandler; -import org.apache.qpid.server.store.handler.MessageHandler; -import org.apache.qpid.server.store.handler.MessageInstanceHandler; -import org.apache.qpid.util.FileUtils; /** - * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence - * mechanism. - * + * Implementation of a MessageStore backed by Apache Derby. */ -public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider, - DurableConfigurationStore +public class DerbyMessageStore extends AbstractDerbyMessageStore { + private static final Logger LOGGER = Logger.getLogger(DerbyMessageStore.class); - private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); - - private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; - - public static final String MEMORY_STORE_LOCATION = ":memory:"; - - private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; - - public static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; - - public static final String TYPE = "DERBY"; - - private long _totalStoreSize; - private boolean _limitBusted; - private long _persistentSizeLowThreshold; - private long _persistentSizeHighThreshold; - - protected String _connectionURL; - + private String _connectionURL; private String _storeLocation; - private Class<Driver> _driverClass; - - private final MessageStore _messageStoreFacade = new MessageStoreWrapper(); - - public DerbyMessageStore() - { - } - - protected Logger getLogger() - { - return _logger; - } @Override - protected String getSqlBlobType() + protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) { - return "blob"; - } + String databasePath = (String) messageStoreSettings.get(MessageStore.STORE_PATH); + String name = parent.getName(); - @Override - protected String getSqlVarBinaryType(int size) - { - return "varchar("+size+") for bit data"; + _storeLocation = databasePath; + _connectionURL = DerbyUtils.createConnectionUrl(name, databasePath); } @Override - protected String getSqlBigIntType() + protected Connection getConnection() throws SQLException { - return "bigint"; + checkMessageStoreOpen(); + return DriverManager.getConnection(_connectionURL); } + @Override protected void doClose() { try { - Connection conn = DriverManager.getConnection(_connectionURL + ";shutdown=true"); - // Shouldn't reach this point - shutdown=true should throw SQLException - conn.close(); - getLogger().error("Unable to shut down the store"); + DerbyUtils.shutdownDatabase(_connectionURL); } catch (SQLException e) { - if (e.getSQLState().equalsIgnoreCase(DerbyMessageStore.DERBY_SINGLE_DB_SHUTDOWN_CODE)) - { - //expected and represents a clean shutdown of this database only, do nothing. - } - else - { - getLogger().error("Exception whilst shutting down the store: " + e); - throw new StoreException("Error closing message store", e); - } + throw new StoreException("Error closing configuration store", e); } } @Override - protected void implementationSpecificConfiguration(String name, Map<String, Object> messageStoreSettings) - throws ClassNotFoundException - { - //Update to pick up QPID_WORK and use that as the default location not just derbyDB - _driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME); - - String databasePath = (String) messageStoreSettings.get(MessageStore.STORE_PATH);; - - if(databasePath == null) - { - databasePath = System.getProperty("QPID_WORK") + File.separator + "derbyDB"; - } - - if(!MEMORY_STORE_LOCATION.equals(databasePath)) - { - File environmentPath = new File(databasePath); - if (!environmentPath.exists()) - { - if (!environmentPath.mkdirs()) - { - throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " - + "Ensure the path is correct and that the permissions are correct."); - } - } - } - - _storeLocation = databasePath; - - Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); - Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE); - - _persistentSizeHighThreshold = overfullAttr == null ? -1l : - overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); - _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : - underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); - - if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) - { - _persistentSizeLowThreshold = _persistentSizeHighThreshold; - } - - //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created. - _connectionURL = "jdbc:derby" + (databasePath.equals(MEMORY_STORE_LOCATION) ? databasePath: ":" + databasePath+ "/") + name + ";create=true"; - - setInitialSize(); - - } - - private void setInitialSize() - { - Connection conn = null; - try - { - - - try - { - conn = newAutoCommitConnection(); - _totalStoreSize = getSizeOnDisk(conn); - } - finally - { - if(conn != null) - { - conn.close(); - - - } - } - } - catch (SQLException e) - { - getLogger().error("Unable to set initial store size", e); - } - } - - protected String getBlobAsString(ResultSet rs, int col) throws SQLException - { - Blob blob = rs.getBlob(col); - if(blob == null) - { - return null; - } - byte[] bytes = blob.getBytes(1, (int)blob.length()); - return new String(bytes, UTF8_CHARSET); - } - - protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException - { - Blob dataAsBlob = rs.getBlob(col); - return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); - } - - - protected boolean tableExists(final String tableName, final Connection conn) throws SQLException - { - PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY); - try - { - stmt.setString(1, tableName); - ResultSet rs = stmt.executeQuery(); - try - { - return rs.next(); - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - - public String getStoreLocation() - { - return _storeLocation; - } - - protected synchronized void storedSizeChange(final int delta) + public void onDelete() { - if(getPersistentSizeHighThreshold() > 0) + if (isMessageStoreOpen()) { - synchronized(this) - { - // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every - // time, so we do so only when there's been enough change that it is worth looking again. We do this by - // assuming the total size will change by less than twice the amount of the message data change. - long newSize = _totalStoreSize += 3*delta; - - Connection conn = null; - try - { - - if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) - { - conn = newAutoCommitConnection(); - _totalStoreSize = getSizeOnDisk(conn); - if(_totalStoreSize > getPersistentSizeHighThreshold()) - { - _limitBusted = true; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); - } - } - else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) - { - long oldSize = _totalStoreSize; - conn = newAutoCommitConnection(); - _totalStoreSize = getSizeOnDisk(conn); - if(oldSize <= _totalStoreSize) - { - - reduceSizeOnDisk(conn); - - _totalStoreSize = getSizeOnDisk(conn); - } - - if(_totalStoreSize < getPersistentSizeLowThreshold()) - { - _limitBusted = false; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); - } - - - } - } - catch (SQLException e) - { - closeConnection(conn); - throw new StoreException("Exception while processing store size change", e); - } - } + throw new IllegalStateException("Cannot delete the store as the message store is still open"); } - } - - private void reduceSizeOnDisk(Connection conn) - { - CallableStatement cs = null; - PreparedStatement stmt = null; - try - { - String tableQuery = - "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'"; - stmt = conn.prepareStatement(tableQuery); - ResultSet rs = null; - List<String> schemas = new ArrayList<String>(); - List<String> tables = new ArrayList<String>(); - - try - { - rs = stmt.executeQuery(); - while(rs.next()) - { - schemas.add(rs.getString(1)); - tables.add(rs.getString(2)); - } - } - finally - { - if(rs != null) - { - rs.close(); - } - } - - - cs = conn.prepareCall - ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)"); - - for(int i = 0; i < schemas.size(); i++) - { - cs.setString(1, schemas.get(i)); - cs.setString(2, tables.get(i)); - cs.setShort(3, (short) 0); - cs.execute(); - } - } - catch (SQLException e) - { - closeConnection(conn); - throw new StoreException("Error reducing on disk size", e); - } - finally + if (LOGGER.isDebugEnabled()) { - closePreparedStatement(stmt); - closePreparedStatement(cs); + LOGGER.debug("Deleting store " + _storeLocation); } - } - - private long getSizeOnDisk(Connection conn) - { - PreparedStatement stmt = null; try { - String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" + - " FROM " + - " SYS.SYSTABLES systabs," + - " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" + - " WHERE systabs.tabletype = 'T'"; - - stmt = conn.prepareStatement(sizeQuery); - - ResultSet rs = null; - long size = 0l; - - try - { - rs = stmt.executeQuery(); - while(rs.next()) - { - size = rs.getLong(1); - } - } - finally - { - if(rs != null) - { - rs.close(); - } - } - - return size; - + DerbyUtils.deleteDatabaseLocation(_storeLocation); } - catch (SQLException e) + catch (StoreException se) { - closeConnection(conn); - throw new StoreException("Error establishing on disk size", e); + LOGGER.debug("Failed to delete the store at location " + _storeLocation); } finally { - closePreparedStatement(stmt); + _storeLocation = null; } - - } - - - private long getPersistentSizeLowThreshold() - { - return _persistentSizeLowThreshold; - } - - private long getPersistentSizeHighThreshold() - { - return _persistentSizeHighThreshold; } @Override - public void onDelete() + protected Logger getLogger() { - if (_logger.isDebugEnabled()) - { - _logger.debug("Deleting store " + _storeLocation); - } - - if (MEMORY_STORE_LOCATION.equals(_storeLocation)) - { - return; - } - - if (_storeLocation != null) - { - File location = new File(_storeLocation); - if (location.exists()) - { - if (!FileUtils.delete(location, true)) - { - _logger.error("Cannot delete " + _storeLocation); - } - } - } + return LOGGER; } - protected Connection getConnection() throws SQLException - { - return DriverManager.getConnection(_connectionURL); - } @Override - public MessageStore getMessageStore() + public String getStoreLocation() { - return _messageStoreFacade; + return _storeLocation; } - private class MessageStoreWrapper implements MessageStore - { - - @Override - public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) - { - DerbyMessageStore.this.openMessageStore(parent, messageStoreSettings); - } - - @Override - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) - { - return DerbyMessageStore.this.addMessage(metaData); - } - - @Override - public boolean isPersistent() - { - return DerbyMessageStore.this.isPersistent(); - } - - @Override - public Transaction newTransaction() - { - return DerbyMessageStore.this.newTransaction(); - } - - @Override - public void closeMessageStore() - { - DerbyMessageStore.this.closeMessageStore(); - } - - @Override - public void addEventListener(final EventListener eventListener, final Event... events) - { - DerbyMessageStore.this.addEventListener(eventListener, events); - } - - @Override - public void upgradeStoreStructure() throws StoreException - { - } - - @Override - public String getStoreLocation() - { - return DerbyMessageStore.this.getStoreLocation(); - } - - @Override - public void onDelete() - { - DerbyMessageStore.this.onDelete(); - } - - @Override - public void visitMessages(final MessageHandler handler) throws StoreException - { - DerbyMessageStore.this.visitMessages(handler); - } - - @Override - public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException - { - DerbyMessageStore.this.visitMessageInstances(handler); - } - - @Override - public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException - { - DerbyMessageStore.this.visitDistributedTransactions(handler); - } - } } diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java new file mode 100644 index 0000000000..b0f4a137f2 --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java @@ -0,0 +1,165 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store.derby; + + +import java.io.File; +import java.nio.charset.Charset; +import java.sql.Blob; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.util.FileUtils; + +public class DerbyUtils +{ + public static final String MEMORY_STORE_LOCATION = ":memory:"; + public static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; + private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; + private static final String TABLE_EXISTENCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; + private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); + + + public static void loadDerbyDriver() + { + try + { + Class<Driver> driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME); + } + catch (ClassNotFoundException e) + { + throw new StoreException("Failed to load driver " + SQL_DRIVER_NAME, e); + } + } + + public static String createConnectionUrl(final String name, final String databasePath) + { + // Derby wont use an existing directory, so we append parent name + if (MEMORY_STORE_LOCATION.equals(databasePath)) + { + return "jdbc:derby:" + MEMORY_STORE_LOCATION + "/" + name + ";create=true"; + } + else + { + File environmentPath = new File(databasePath); + if (!environmentPath.exists()) + { + if (!environmentPath.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + + environmentPath + + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } + } + return "jdbc:derby:" + databasePath + "/" + name + ";create=true"; + } + + } + + public static void shutdownDatabase(String connectionURL) throws SQLException + { + try + { + Connection conn = DriverManager.getConnection(connectionURL + ";shutdown=true"); + // Shouldn't reach this point - shutdown=true should throw SQLException + conn.close(); + } + catch (SQLException e) + { + if (e.getSQLState().equalsIgnoreCase(DerbyUtils.DERBY_SINGLE_DB_SHUTDOWN_CODE)) + { + //expected and represents a clean shutdown of this database only, do nothing. + } + else + { + throw e; + } + } + } + + public static void deleteDatabaseLocation(String storeLocation) + { + if (MEMORY_STORE_LOCATION.equals(storeLocation)) + { + return; + } + + if (storeLocation != null) + { + File location = new File(storeLocation); + if (location.exists()) + { + if (!FileUtils.delete(location, true)) + { + throw new StoreException("Failed to delete the store at location : " + storeLocation); + } + } + } + } + + public static String getBlobAsString(ResultSet rs, int col) throws SQLException + { + Blob blob = rs.getBlob(col); + if(blob == null) + { + return null; + } + byte[] bytes = blob.getBytes(1, (int) blob.length()); + return new String(bytes, UTF8_CHARSET); + } + + protected static byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException + { + Blob dataAsBlob = rs.getBlob(col); + return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + } + + public static boolean tableExists(final String tableName, final Connection conn) throws SQLException + { + PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTENCE_QUERY); + try + { + stmt.setString(1, tableName); + ResultSet rs = stmt.executeQuery(); + try + { + return rs.next(); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + + +} + diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java index 31c3f7c944..fc67d2fa50 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java @@ -45,7 +45,7 @@ public class DerbyVirtualHost extends AbstractVirtualHost<DerbyVirtualHost> @Override protected MessageStore createMessageStore() { - return new DerbyMessageStore().getMessageStore(); + return new DerbyMessageStore(); } } diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java index ffc19832fa..340d046033 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java @@ -28,7 +28,7 @@ import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.derby.DerbyMessageStore; +import org.apache.qpid.server.store.derby.DerbyConfigurationStore; import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode; import org.apache.qpid.server.virtualhostnode.FileBasedVirtualHostNode; @@ -49,7 +49,7 @@ public class DerbyVirtualHostNode extends AbstractStandardVirtualHostNode<DerbyV @Override protected DurableConfigurationStore createConfigurationStore() { - return new DerbyMessageStore(); + return new DerbyConfigurationStore(); } @Override diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java index aaf65e9ee0..08c421b606 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java @@ -26,9 +26,9 @@ public class DerbyMessageStoreConfigurationTest extends AbstractDurableConfigura { @Override - protected DerbyMessageStore createConfigStore() throws Exception + protected DerbyConfigurationStore createConfigStore() throws Exception { - return new DerbyMessageStore(); + return new DerbyConfigurationStore(); } } diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java index 1d35b9ef83..ba7ae26292 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java @@ -50,7 +50,7 @@ public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTes @Override protected MessageStore createStore() throws Exception { - return (new DerbyMessageStore()).getMessageStore(); + return new DerbyMessageStore(); } @Override diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java index 4594b7f223..9a2d945494 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java @@ -83,7 +83,7 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase @Override protected MessageStore createMessageStore() { - return (new DerbyMessageStore()).getMessageStore(); + return new DerbyMessageStore(); } } diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java new file mode 100644 index 0000000000..bd245fa4f4 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.store.jdbc; + + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.Transaction; + +public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.server.store.AbstractJDBCMessageStore +{ + private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(false); + private final List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<>(); + + private ConfiguredObject<?> _parent; + + @Override + public final void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) + { + if (_messageStoreOpen.compareAndSet(false, true)) + { + _parent = parent; + + doOpen(parent, storeSettings); + + createOrOpenMessageStoreDatabase(); + setMaximumMessageId(); + } + } + + protected abstract void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings) + throws StoreException; + + @Override + public final void upgradeStoreStructure() throws StoreException + { + checkMessageStoreOpen(); + + upgrade(_parent); + } + + @Override + public final void closeMessageStore() + { + if (_messageStoreOpen.compareAndSet(true, false)) + { + try + { + while(!_transactions.isEmpty()) + { + RecordedJDBCTransaction txn = _transactions.get(0); + txn.abortTran(); + } + } + finally + { + doClose(); + } + + } + } + + protected abstract void doClose(); + + protected boolean isMessageStoreOpen() + { + return _messageStoreOpen.get(); + } + + @Override + protected void checkMessageStoreOpen() + { + if (!_messageStoreOpen.get()) + { + throw new IllegalStateException("Message store is not open"); + } + } + + @Override + protected void storedSizeChange(int contentSize) + { + } + + @Override + public Transaction newTransaction() + { + return new RecordedJDBCTransaction(); + } + + + private class RecordedJDBCTransaction extends JDBCTransaction + { + private RecordedJDBCTransaction() + { + super(); + GenericAbstractJDBCMessageStore.this._transactions.add(this); + } + + @Override + public void commitTran() + { + try + { + super.commitTran(); + } + finally + { + GenericAbstractJDBCMessageStore.this._transactions.remove(this); + } + } + + @Override + public StoreFuture commitTranAsync() + { + try + { + return super.commitTranAsync(); + } + finally + { + GenericAbstractJDBCMessageStore.this._transactions.remove(this); + } + } + + @Override + public void abortTran() + { + try + { + super.abortTran(); + } + finally + { + GenericAbstractJDBCMessageStore.this._transactions.remove(this); + } + } + } +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java new file mode 100644 index 0000000000..7bafe5a859 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.store.jdbc; + + +import java.nio.charset.Charset; +import java.sql.Blob; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.log4j.Logger; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; +import org.apache.qpid.server.store.*; +import org.apache.qpid.server.util.MapValueConverter; + +/** + * Implementation of a DurableConfigurationStore backed by Generic JDBC Database + * that also provides a MessageStore. + */ +public class GenericJDBCConfigurationStore extends AbstractJDBCConfigurationStore implements MessageStoreProvider +{ + private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); + + private static final Logger LOGGER = Logger.getLogger(GenericJDBCConfigurationStore.class); + + public static final String CONNECTION_URL = "connectionUrl"; + public static final String CONNECTION_POOL_TYPE = "connectionPoolType"; + public static final String JDBC_BIG_INT_TYPE = "bigIntType"; + public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob"; + public static final String JDBC_VARBINARY_TYPE = "varbinaryType"; + public static final String JDBC_BLOB_TYPE = "blobType"; + + private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean(); + private final MessageStore _messageStoreFacade = new MessageStoreWrapper(); + + protected String _connectionURL; + private ConnectionProvider _connectionProvider; + + private String _blobType; + private String _varBinaryType; + private String _bigIntType; + private boolean _useBytesMethodsForBlob; + + private ConfiguredObject<?> _parent; + + @Override + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) + throws StoreException + { + if (_configurationStoreOpen.compareAndSet(false, true)) + { + _parent = parent; + _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL)); + Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE); + + JDBCDetails details = null; + + String[] components = _connectionURL.split(":", 3); + if(components.length >= 2) + { + String vendor = components[1]; + details = JDBCDetails.getDetails(vendor); + } + + if(details == null) + { + getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL); + + details = JDBCDetails.getDefaultDetails(); + } + + String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute); + + JDBCConnectionProviderFactory connectionProviderFactory = + JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); + if(connectionProviderFactory == null) + { + LOGGER.warn("Unknown connection pool type: " + + connectionPoolType + + ". no connection pooling will be used"); + connectionProviderFactory = new DefaultConnectionProviderFactory(); + } + + try + { + _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings); + } + catch (SQLException e) + { + throw new StoreException("Failed to create connection provider for " + _connectionURL); + } + _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType()); + _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType()); + _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob()); + _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE, + storeSettings, + details.getBigintType()); + + createOrOpenConfigurationStoreDatabase(); + } + } + + @Override + public void upgradeStoreStructure() throws StoreException + { + checkConfigurationStoreOpen(); + upgradeIfNecessary(_parent); + } + + @Override + protected Connection getConnection() throws SQLException + { + return _connectionProvider.getConnection(); + } + + @Override + public void closeConfigurationStore() throws StoreException + { + if (_configurationStoreOpen.compareAndSet(true, false)) + { + try + { + _connectionProvider.close(); + } + catch (SQLException e) + { + throw new StoreException("Unable to close connection provider ", e); + } + } + } + + @Override + protected String getSqlBlobType() + { + return _blobType; + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return String.format(_varBinaryType, size); + } + + @Override + public String getSqlBigIntType() + { + return _bigIntType; + } + + @Override + protected String getBlobAsString(ResultSet rs, int col) throws SQLException + { + byte[] bytes; + if(_useBytesMethodsForBlob) + { + bytes = rs.getBytes(col); + return new String(bytes,UTF8_CHARSET); + } + else + { + Blob blob = rs.getBlob(col); + if(blob == null) + { + return null; + } + bytes = blob.getBytes(1, (int)blob.length()); + } + return new String(bytes, UTF8_CHARSET); + + } + + protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException + { + if(_useBytesMethodsForBlob) + { + return rs.getBytes(col); + } + else + { + Blob dataAsBlob = rs.getBlob(col); + return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + + } + } + + @Override + protected void checkConfigurationStoreOpen() + { + if (!_configurationStoreOpen.get()) + { + throw new IllegalStateException("Configuration store is not open"); + } + } + + @Override + protected Logger getLogger() + { + return LOGGER; + } + + @Override + public MessageStore getMessageStore() + { + return _messageStoreFacade; + } + + private class MessageStoreWrapper extends GenericAbstractJDBCMessageStore + { + @Override + protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + // Nothing to do, store provided by DerbyConfigurationStore + } + + @Override + protected Connection getConnection() throws SQLException + { + return GenericJDBCConfigurationStore.this.getConnection(); + } + + @Override + protected void doClose() + { + // Nothing to do, store provided by DerbyConfigurationStore + } + + @Override + public String getStoreLocation() + { + return GenericJDBCConfigurationStore.this._connectionURL; + } + + @Override + protected Logger getLogger() + { + return GenericJDBCConfigurationStore.this.getLogger(); + } + + @Override + protected String getSqlBlobType() + { + return GenericJDBCConfigurationStore.this.getSqlBlobType(); + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return GenericJDBCConfigurationStore.this.getSqlVarBinaryType(size); + } + + @Override + protected String getSqlBigIntType() + { + return GenericJDBCConfigurationStore.this.getSqlBigIntType(); + } + + @Override + protected byte[] getBlobAsBytes(final ResultSet rs, final int col) throws SQLException + { + return GenericJDBCConfigurationStore.this.getBlobAsBytes(rs, col); + } + } + + +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java new file mode 100644 index 0000000000..dad4432183 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java @@ -0,0 +1,177 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store.jdbc; + + +import java.sql.Blob; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; + +import org.apache.log4j.Logger; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.util.MapValueConverter; + +/** + * Implementation of a MessageStore backed by a Generic JDBC Database. + */ +public class GenericJDBCMessageStore extends GenericAbstractJDBCMessageStore +{ + + private static final Logger _logger = Logger.getLogger(GenericJDBCMessageStore.class); + + public static final String TYPE = "JDBC"; + public static final String CONNECTION_URL = "connectionUrl"; + public static final String CONNECTION_POOL_TYPE = "connectionPoolType"; + public static final String JDBC_BIG_INT_TYPE = "bigIntType"; + public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob"; + public static final String JDBC_VARBINARY_TYPE = "varbinaryType"; + public static final String JDBC_BLOB_TYPE = "blobType"; + + protected String _connectionURL; + private ConnectionProvider _connectionProvider; + + private String _blobType; + private String _varBinaryType; + private String _bigIntType; + private boolean _useBytesMethodsForBlob; + + + @Override + protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings) throws StoreException + { + _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL)); + + org.apache.qpid.server.store.jdbc.JDBCDetails details = null; + + String[] components = _connectionURL.split(":", 3); + if(components.length >= 2) + { + String vendor = components[1]; + details = org.apache.qpid.server.store.jdbc.JDBCDetails.getDetails(vendor); + } + + if(details == null) + { + getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL); + + details = org.apache.qpid.server.store.jdbc.JDBCDetails.getDefaultDetails(); + } + + + _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType()); + _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType()); + _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob()); + _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE, + storeSettings, + details.getBigintType()); + + Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE); + String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute); + + JDBCConnectionProviderFactory connectionProviderFactory = + JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); + if(connectionProviderFactory == null) + { + _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used"); + connectionProviderFactory = new DefaultConnectionProviderFactory(); + } + + try + { + _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings); + } + catch (SQLException e) + { + throw new StoreException("Failed to create connection provider for " + _connectionURL); + } + + } + + @Override + protected Connection getConnection() throws SQLException + { + return _connectionProvider.getConnection(); + } + + protected void doClose() + { + try + { + _connectionProvider.close(); + } + catch (SQLException e) + { + throw new StoreException("Unable to close connection provider ", e); + } + } + + + @Override + protected Logger getLogger() + { + return _logger; + } + + @Override + protected String getSqlBlobType() + { + return _blobType; + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return String.format(_varBinaryType, size); + } + + @Override + protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException + { + if(_useBytesMethodsForBlob) + { + return rs.getBytes(col); + } + else + { + Blob dataAsBlob = rs.getBlob(col); + return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + + } + } + + @Override + public String getSqlBigIntType() + { + return _bigIntType; + } + + @Override + public String getStoreLocation() + { + return _connectionURL; + } + +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java new file mode 100644 index 0000000000..6cf1413b83 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.store.jdbc; + +import java.util.HashMap; +import java.util.Map; + +public class JDBCDetails +{ + + private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<>(); + + private static JDBCDetails DERBY_DETAILS = + new JDBCDetails("derby", + "blob", + "varchar(%d) for bit data", + "bigint", + false); + + private static JDBCDetails POSTGRESQL_DETAILS = + new JDBCDetails("postgresql", + "bytea", + "bytea", + "bigint", + true); + + private static JDBCDetails MYSQL_DETAILS = + new JDBCDetails("mysql", + "blob", + "varbinary(%d)", + "bigint", + false); + + + private static JDBCDetails SYBASE_DETAILS = + new JDBCDetails("sybase", + "image", + "varbinary(%d)", + "bigint", + false); + + + private static JDBCDetails ORACLE_DETAILS = + new JDBCDetails("oracle", + "blob", + "raw(%d)", + "number", + false); + + + static + { + + addDetails(DERBY_DETAILS); + addDetails(POSTGRESQL_DETAILS); + addDetails(MYSQL_DETAILS); + addDetails(SYBASE_DETAILS); + addDetails(ORACLE_DETAILS); + } + + public static JDBCDetails getDetails(String vendor) + { + return VENDOR_DETAILS.get(vendor); + } + + public static JDBCDetails getDefaultDetails() + { + return DERBY_DETAILS; + } + + private static void addDetails(JDBCDetails details) + { + VENDOR_DETAILS.put(details.getVendor(), details); + } + + private final String _vendor; + private String _blobType; + private String _varBinaryType; + private String _bigintType; + private boolean _useBytesMethodsForBlob; + + JDBCDetails(String vendor, + String blobType, + String varBinaryType, + String bigIntType, + boolean useBytesMethodsForBlob) + { + _vendor = vendor; + setBlobType(blobType); + setVarBinaryType(varBinaryType); + setBigintType(bigIntType); + setUseBytesMethodsForBlob(useBytesMethodsForBlob); + } + + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + JDBCDetails that = (JDBCDetails) o; + + if (!getVendor().equals(that.getVendor())) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return getVendor().hashCode(); + } + + @Override + public String toString() + { + return "JDBCDetails{" + + "vendor='" + getVendor() + '\'' + + ", blobType='" + getBlobType() + '\'' + + ", varBinaryType='" + getVarBinaryType() + '\'' + + ", bigIntType='" + getBigintType() + '\'' + + ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() + + '}'; + } + + public String getVendor() + { + return _vendor; + } + + public String getBlobType() + { + return _blobType; + } + + public void setBlobType(String blobType) + { + _blobType = blobType; + } + + public String getVarBinaryType() + { + return _varBinaryType; + } + + public void setVarBinaryType(String varBinaryType) + { + _varBinaryType = varBinaryType; + } + + public boolean isUseBytesMethodsForBlob() + { + return _useBytesMethodsForBlob; + } + + public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob) + { + _useBytesMethodsForBlob = useBytesMethodsForBlob; + } + + public String getBigintType() + { + return _bigintType; + } + + public void setBigintType(String bigintType) + { + _bigintType = bigintType; + } +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java deleted file mode 100644 index 61ecb6748c..0000000000 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ /dev/null @@ -1,522 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.store.jdbc; - - -import java.sql.Blob; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; - -import org.apache.log4j.Logger; - -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; -import org.apache.qpid.server.store.AbstractJDBCMessageStore; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreProvider; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.handler.DistributedTransactionHandler; -import org.apache.qpid.server.store.handler.MessageHandler; -import org.apache.qpid.server.store.handler.MessageInstanceHandler; -import org.apache.qpid.server.util.MapValueConverter; - -/** - * An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence - * mechanism. - * - */ -public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider -{ - - private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class); - - public static final String TYPE = "JDBC"; - public static final String CONNECTION_URL = "connectionUrl"; - public static final String CONNECTION_POOL_TYPE = "connectionPoolType"; - public static final String JDBC_BIG_INT_TYPE = "bigIntType"; - public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob"; - public static final String JDBC_VARBINARY_TYPE = "varbinaryType"; - public static final String JDBC_BLOB_TYPE = "blobType"; - - protected String _connectionURL; - private ConnectionProvider _connectionProvider; - - private final MessageStore _messageStoreFacade = new MessageStoreWrapper(); - - private static class JDBCDetails - { - private final String _vendor; - private String _blobType; - private String _varBinaryType; - private String _bigintType; - private boolean _useBytesMethodsForBlob; - - private JDBCDetails(String vendor, - String blobType, - String varBinaryType, - String bigIntType, - boolean useBytesMethodsForBlob) - { - _vendor = vendor; - setBlobType(blobType); - setVarBinaryType(varBinaryType); - setBigintType(bigIntType); - setUseBytesMethodsForBlob(useBytesMethodsForBlob); - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - JDBCDetails that = (JDBCDetails) o; - - if (!getVendor().equals(that.getVendor())) - { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - return getVendor().hashCode(); - } - - @Override - public String toString() - { - return "JDBCDetails{" + - "vendor='" + getVendor() + '\'' + - ", blobType='" + getBlobType() + '\'' + - ", varBinaryType='" + getVarBinaryType() + '\'' + - ", bigIntType='" + getBigintType() + '\'' + - ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() + - '}'; - } - - public String getVendor() - { - return _vendor; - } - - public String getBlobType() - { - return _blobType; - } - - public void setBlobType(String blobType) - { - _blobType = blobType; - } - - public String getVarBinaryType() - { - return _varBinaryType; - } - - public void setVarBinaryType(String varBinaryType) - { - _varBinaryType = varBinaryType; - } - - public boolean isUseBytesMethodsForBlob() - { - return _useBytesMethodsForBlob; - } - - public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob) - { - _useBytesMethodsForBlob = useBytesMethodsForBlob; - } - - public String getBigintType() - { - return _bigintType; - } - - public void setBigintType(String bigintType) - { - _bigintType = bigintType; - } - } - - private static JDBCDetails DERBY_DETAILS = - new JDBCDetails("derby", - "blob", - "varchar(%d) for bit data", - "bigint", - false); - - private static JDBCDetails POSTGRESQL_DETAILS = - new JDBCDetails("postgresql", - "bytea", - "bytea", - "bigint", - true); - - private static JDBCDetails MYSQL_DETAILS = - new JDBCDetails("mysql", - "blob", - "varbinary(%d)", - "bigint", - false); - - - private static JDBCDetails SYBASE_DETAILS = - new JDBCDetails("sybase", - "image", - "varbinary(%d)", - "bigint", - false); - - - private static JDBCDetails ORACLE_DETAILS = - new JDBCDetails("oracle", - "blob", - "raw(%d)", - "number", - false); - - - private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<String,JDBCDetails>(); - - static - { - - addDetails(DERBY_DETAILS); - addDetails(POSTGRESQL_DETAILS); - addDetails(MYSQL_DETAILS); - addDetails(SYBASE_DETAILS); - addDetails(ORACLE_DETAILS); - } - - private static void addDetails(JDBCDetails details) - { - VENDOR_DETAILS.put(details.getVendor(), details); - } - - private String _blobType; - private String _varBinaryType; - private String _bigIntType; - private boolean _useBytesMethodsForBlob; - - private List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<RecordedJDBCTransaction>(); - - - public JDBCMessageStore() - { - } - - protected Logger getLogger() - { - return _logger; - } - - protected String getSqlBlobType() - { - return _blobType; - } - - protected String getSqlVarBinaryType(int size) - { - return String.format(_varBinaryType, size); - } - - public String getSqlBigIntType() - { - return _bigIntType; - } - - @Override - protected void doClose() - { - try - { - while(!_transactions.isEmpty()) - { - RecordedJDBCTransaction txn = _transactions.get(0); - txn.abortTran(); - } - } - finally - { - try - { - _connectionProvider.close(); - } - catch (SQLException e) - { - throw new StoreException("Unable to close connection provider ", e); - } - } - } - - - protected Connection getConnection() throws SQLException - { - return _connectionProvider.getConnection(); - } - - - protected void implementationSpecificConfiguration(String name, Map<String, Object> storeSettings) - throws ClassNotFoundException, SQLException - { - _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL)); - Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE); - - JDBCDetails details = null; - - String[] components = _connectionURL.split(":",3); - if(components.length >= 2) - { - String vendor = components[1]; - details = VENDOR_DETAILS.get(vendor); - } - - if(details == null) - { - getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL); - - // TODO - is there a better default than derby - details = DERBY_DETAILS; - } - - String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute); - - JDBCConnectionProviderFactory connectionProviderFactory = - JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); - if(connectionProviderFactory == null) - { - _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used"); - connectionProviderFactory = new DefaultConnectionProviderFactory(); - } - - _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings); - _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType()); - _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType()); - _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob()); - _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE, storeSettings, details.getBigintType()); - } - - @Override - protected void storedSizeChange(int contentSize) - { - } - - public String getStoreLocation() - { - return _connectionURL; - } - - @Override - protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException - { - if(_useBytesMethodsForBlob) - { - return rs.getBytes(col); - } - else - { - Blob dataAsBlob = rs.getBlob(col); - return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); - - } - } - - @Override - protected String getBlobAsString(ResultSet rs, int col) throws SQLException - { - byte[] bytes; - if(_useBytesMethodsForBlob) - { - bytes = rs.getBytes(col); - return new String(bytes,UTF8_CHARSET); - } - else - { - Blob blob = rs.getBlob(col); - if(blob == null) - { - return null; - } - bytes = blob.getBytes(1, (int)blob.length()); - } - return new String(bytes, UTF8_CHARSET); - - } - - @Override - public Transaction newTransaction() - { - return new RecordedJDBCTransaction(); - } - - private class RecordedJDBCTransaction extends JDBCTransaction - { - private RecordedJDBCTransaction() - { - super(); - JDBCMessageStore.this._transactions.add(this); - } - - @Override - public void commitTran() - { - try - { - super.commitTran(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - - @Override - public StoreFuture commitTranAsync() - { - try - { - return super.commitTranAsync(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - - @Override - public void abortTran() - { - try - { - super.abortTran(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - } - - @Override - public MessageStore getMessageStore() - { - return _messageStoreFacade; - } - - private class MessageStoreWrapper implements MessageStore - { - - @Override - public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) - { - JDBCMessageStore.this.openMessageStore(parent, messageStoreSettings); - } - - @Override - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) - { - return JDBCMessageStore.this.addMessage(metaData); - } - - @Override - public boolean isPersistent() - { - return JDBCMessageStore.this.isPersistent(); - } - - @Override - public Transaction newTransaction() - { - return JDBCMessageStore.this.newTransaction(); - } - - @Override - public void closeMessageStore() - { - JDBCMessageStore.this.closeMessageStore(); - } - - @Override - public void addEventListener(final EventListener eventListener, final Event... events) - { - JDBCMessageStore.this.addEventListener(eventListener, events); - } - - @Override - public void upgradeStoreStructure() throws StoreException - { - } - - @Override - public String getStoreLocation() - { - return JDBCMessageStore.this.getStoreLocation(); - } - - @Override - public void onDelete() - { - JDBCMessageStore.this.onDelete(); - } - - @Override - public void visitMessages(final MessageHandler handler) throws StoreException - { - JDBCMessageStore.this.visitMessages(handler); - } - - @Override - public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException - { - JDBCMessageStore.this.visitMessageInstances(handler); - } - - @Override - public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException - { - JDBCMessageStore.this.visitDistributedTransactions(handler); - } - } - -} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java index 1dd39a8696..85e8f89dbe 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java @@ -26,7 +26,7 @@ import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.jdbc.JDBCMessageStore; +import org.apache.qpid.server.store.jdbc.GenericJDBCMessageStore; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; @ManagedObject(category = false, type = JDBCVirtualHost.VIRTUAL_HOST_TYPE) @@ -45,6 +45,6 @@ public class JDBCVirtualHost extends AbstractVirtualHost<JDBCVirtualHost> @Override protected MessageStore createMessageStore() { - return new JDBCMessageStore().getMessageStore(); + return new GenericJDBCMessageStore(); } } diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java index 2108b4e09a..ab8f4554cb 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java @@ -27,7 +27,7 @@ import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.jdbc.JDBCMessageStore; +import org.apache.qpid.server.store.jdbc.GenericJDBCConfigurationStore; import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode; @ManagedObject(type = JDBCVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category = false ) @@ -62,7 +62,7 @@ public class JDBCVirtualHostNodeImpl extends AbstractStandardVirtualHostNode<JDB @Override protected DurableConfigurationStore createConfigurationStore() { - return new JDBCMessageStore(); + return new GenericJDBCConfigurationStore(); } @Override diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java index 1f03b7f75f..8261e93347 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java +++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java @@ -52,7 +52,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase public void testOnDelete() throws Exception { - Set<String> expectedTables = JDBCMessageStore.MESSAGE_STORE_TABLE_NAMES; + Set<String> expectedTables = GenericJDBCMessageStore.MESSAGE_STORE_TABLE_NAMES; assertTablesExist(expectedTables, true); getStore().closeMessageStore(); assertTablesExist(expectedTables, true); @@ -65,7 +65,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase { _connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true"; Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); - messageStoreSettings.put(JDBCMessageStore.CONNECTION_URL, _connectionURL); + messageStoreSettings.put(GenericJDBCMessageStore.CONNECTION_URL, _connectionURL); return messageStoreSettings; } @@ -73,7 +73,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase @Override protected MessageStore createMessageStore() { - return (new JDBCMessageStore()).getMessageStore(); + return new GenericJDBCMessageStore(); } private void assertTablesExist(Set<String> expectedTables, boolean exists) throws SQLException |
