summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-06-18 20:51:43 +0000
committerKeith Wall <kwall@apache.org>2014-06-18 20:51:43 +0000
commit326b9560c14d1c30eb71c1396858791f9187d11e (patch)
tree34ba78548d48295e88b9e038f382bd8861f32500 /qpid/java/broker-plugins
parent2622dda9c7d3267efd985b5ae5928b99063d2fa7 (diff)
downloadqpid-python-326b9560c14d1c30eb71c1396858791f9187d11e.tar.gz
QPID-5800: [Java Broker] Refactor Derby/JDBC message store implementations to separate message and config store implementations.
* Message store implementations can now be used in isolation, which is useful when the user is using a JSON VirtualHostNode with another persistent store implementation. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1603626 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java337
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java221
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java482
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java165
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java2
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java4
-rw-r--r--qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java4
-rw-r--r--qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java2
-rw-r--r--qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java2
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java160
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java286
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java177
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java197
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java522
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java4
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java4
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java6
17 files changed, 1585 insertions, 990 deletions
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
new file mode 100644
index 0000000000..0a121ad476
--- /dev/null
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
@@ -0,0 +1,337 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store.derby;
+
+
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.AbstractJDBCMessageStore;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.JdbcUtils;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreException;
+
+public abstract class AbstractDerbyMessageStore extends AbstractJDBCMessageStore
+{
+ private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(false);
+
+ private long _persistentSizeLowThreshold;
+ private long _persistentSizeHighThreshold;
+ private long _totalStoreSize;
+ private boolean _limitBusted;
+
+ private ConfiguredObject<?> _parent;
+
+ @Override
+ public final void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
+ {
+ if (_messageStoreOpen.compareAndSet(false, true))
+ {
+ _parent = parent;
+
+ DerbyUtils.loadDerbyDriver();
+
+ doOpen(parent, messageStoreSettings);
+
+ Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
+ Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
+
+ _persistentSizeHighThreshold = overfullAttr == null ? -1l :
+ overfullAttr instanceof Number
+ ? ((Number) overfullAttr).longValue()
+ : Long.parseLong(overfullAttr.toString());
+ _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
+ underfullAttr instanceof Number
+ ? ((Number) underfullAttr).longValue()
+ : Long.parseLong(underfullAttr.toString());
+
+ if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+ }
+
+
+ createOrOpenMessageStoreDatabase();
+ setInitialSize();
+ setMaximumMessageId();
+ }
+ }
+
+ protected abstract void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings);
+
+ @Override
+ public final void upgradeStoreStructure() throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ upgrade(_parent);
+ }
+
+ @Override
+ public final void closeMessageStore()
+ {
+ if (_messageStoreOpen.compareAndSet(true, false))
+ {
+ doClose();
+ }
+ }
+
+ protected abstract void doClose();
+
+ @Override
+ protected boolean isMessageStoreOpen()
+ {
+ return _messageStoreOpen.get();
+ }
+
+ @Override
+ protected void checkMessageStoreOpen()
+ {
+ if (!_messageStoreOpen.get())
+ {
+ throw new IllegalStateException("Message store is not open");
+ }
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return "blob";
+ }
+
+ @Override
+ protected String getSqlVarBinaryType(int size)
+ {
+ return "varchar("+size+") for bit data";
+ }
+
+ @Override
+ protected String getSqlBigIntType()
+ {
+ return "bigint";
+ }
+
+ @Override
+ protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ {
+ return DerbyUtils.getBlobAsBytes(rs, col);
+ }
+
+ @Override
+ protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ return DerbyUtils.tableExists(tableName, conn);
+ }
+
+ @Override
+ protected void storedSizeChange(final int delta)
+ {
+ if(getPersistentSizeHighThreshold() > 0)
+ {
+ synchronized(this)
+ {
+ // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
+ // time, so we do so only when there's been enough change that it is worth looking again. We do this by
+ // assuming the total size will change by less than twice the amount of the message data change.
+ long newSize = _totalStoreSize += 3*delta;
+
+ Connection conn = null;
+ try
+ {
+
+ if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
+ {
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
+ if(_totalStoreSize > getPersistentSizeHighThreshold())
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ }
+ }
+ else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
+ {
+ long oldSize = _totalStoreSize;
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
+ if(oldSize <= _totalStoreSize)
+ {
+
+ reduceSizeOnDisk(conn);
+
+ _totalStoreSize = getSizeOnDisk(conn);
+ }
+
+ if(_totalStoreSize < getPersistentSizeLowThreshold())
+ {
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+
+ }
+ }
+ catch (SQLException e)
+ {
+ JdbcUtils.closeConnection(conn, getLogger());
+ throw new StoreException("Exception while processing store size change", e);
+ }
+ }
+ }
+ }
+
+ private void setInitialSize()
+ {
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
+ }
+ catch (SQLException e)
+ {
+ getLogger().error("Unable to set initial store size", e);
+ }
+ finally
+ {
+ JdbcUtils.closeConnection(conn, getLogger());
+ }
+ }
+
+ private long getSizeOnDisk(Connection conn)
+ {
+ PreparedStatement stmt = null;
+ try
+ {
+ String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" +
+ " FROM " +
+ " SYS.SYSTABLES systabs," +
+ " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" +
+ " WHERE systabs.tabletype = 'T'";
+
+ stmt = conn.prepareStatement(sizeQuery);
+
+ ResultSet rs = null;
+ long size = 0l;
+
+ try
+ {
+ rs = stmt.executeQuery();
+ while(rs.next())
+ {
+ size = rs.getLong(1);
+ }
+ }
+ finally
+ {
+ if(rs != null)
+ {
+ rs.close();
+ }
+ }
+
+ return size;
+
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error establishing on disk size", e);
+ }
+ finally
+ {
+ JdbcUtils.closePreparedStatement(stmt, getLogger());
+ }
+ }
+
+ private void reduceSizeOnDisk(Connection conn)
+ {
+ CallableStatement cs = null;
+ PreparedStatement stmt = null;
+ try
+ {
+ String tableQuery =
+ "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'";
+ stmt = conn.prepareStatement(tableQuery);
+ ResultSet rs = null;
+
+ List<String> schemas = new ArrayList<String>();
+ List<String> tables = new ArrayList<String>();
+
+ try
+ {
+ rs = stmt.executeQuery();
+ while(rs.next())
+ {
+ schemas.add(rs.getString(1));
+ tables.add(rs.getString(2));
+ }
+ }
+ finally
+ {
+ if(rs != null)
+ {
+ rs.close();
+ }
+ }
+
+
+ cs = conn.prepareCall
+ ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)");
+
+ for(int i = 0; i < schemas.size(); i++)
+ {
+ cs.setString(1, schemas.get(i));
+ cs.setString(2, tables.get(i));
+ cs.setShort(3, (short) 0);
+ cs.execute();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error reducing on disk size", e);
+ }
+ finally
+ {
+ JdbcUtils.closePreparedStatement(stmt, getLogger());
+ JdbcUtils.closePreparedStatement(cs, getLogger());
+ }
+ }
+
+ private long getPersistentSizeLowThreshold()
+ {
+ return _persistentSizeLowThreshold;
+ }
+
+ private long getPersistentSizeHighThreshold()
+ {
+ return _persistentSizeHighThreshold;
+ }
+
+}
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
new file mode 100644
index 0000000000..540e92fac7
--- /dev/null
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
@@ -0,0 +1,221 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store.derby;
+
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.AbstractJDBCConfigurationStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreProvider;
+import org.apache.qpid.server.store.StoreException;
+
+/**
+ * Implementation of a DurableConfigurationStore backed by Apache Derby
+ * that also provides a MessageStore.
+ */
+public class DerbyConfigurationStore extends AbstractJDBCConfigurationStore
+ implements MessageStoreProvider, DurableConfigurationStore
+{
+ private static final Logger LOGGER = Logger.getLogger(DerbyConfigurationStore.class);
+
+ private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
+ private final MessageStoreWrapper _messageStore = new MessageStoreWrapper();
+
+ private String _connectionURL;
+ private String _storeLocation;
+
+ private ConfiguredObject<?> _parent;
+
+ @Override
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ throws StoreException
+ {
+ if (_configurationStoreOpen.compareAndSet(false, true))
+ {
+ _parent = parent;
+ DerbyUtils.loadDerbyDriver();
+
+ String databasePath = (String) storeSettings.get(MessageStore.STORE_PATH);
+
+ _storeLocation = databasePath;
+ _connectionURL = DerbyUtils.createConnectionUrl(parent.getName(), databasePath);
+
+ createOrOpenConfigurationStoreDatabase();
+ }
+ }
+
+ @Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+ checkConfigurationStoreOpen();
+ upgradeIfNecessary(_parent);
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ checkConfigurationStoreOpen();
+ return DriverManager.getConnection(_connectionURL);
+ }
+
+ @Override
+ public void closeConfigurationStore() throws StoreException
+ {
+ if (_messageStore.isMessageStoreOpen())
+ {
+ throw new IllegalStateException("Cannot close the store as the provided message store is still open");
+ }
+
+ if (_configurationStoreOpen.compareAndSet(true, false))
+ {
+ try
+ {
+ DerbyUtils.shutdownDatabase(_connectionURL);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error closing configuration store", e);
+ }
+ }
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return "blob";
+ }
+
+ @Override
+ protected String getSqlVarBinaryType(int size)
+ {
+ return "varchar("+size+") for bit data";
+ }
+
+ @Override
+ protected String getSqlBigIntType()
+ {
+ return "bigint";
+ }
+
+ @Override
+ protected String getBlobAsString(ResultSet rs, int col) throws SQLException
+ {
+ return DerbyUtils.getBlobAsString(rs, col);
+ }
+
+ @Override
+ public void onDelete()
+ {
+ if (_messageStore.isMessageStoreOpen())
+ {
+ throw new IllegalStateException("Cannot delete the store as the provided message store is still open");
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleting store " + _storeLocation);
+ }
+
+ try
+ {
+ DerbyUtils.deleteDatabaseLocation(_storeLocation);
+ }
+ catch (StoreException se)
+ {
+ LOGGER.debug("Failed to delete the store at location " + _storeLocation);
+ }
+ finally
+ {
+ _storeLocation = null;
+ }
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ @Override
+ protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ return DerbyUtils.tableExists(tableName, conn);
+ }
+
+ @Override
+ protected void checkConfigurationStoreOpen()
+ {
+ if (!_configurationStoreOpen.get())
+ {
+ throw new IllegalStateException("Configuration store is not open");
+ }
+ }
+
+ @Override
+ protected Logger getLogger()
+ {
+ return LOGGER;
+ }
+
+ private class MessageStoreWrapper extends AbstractDerbyMessageStore
+ {
+ @Override
+ protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
+ {
+ // Nothing to do, store provided by DerbyConfigurationStore
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ checkMessageStoreOpen();
+ return DerbyConfigurationStore.this.getConnection();
+ }
+
+ @Override
+ protected void doClose()
+ {
+ // Nothing to do, store provided by DerbyConfigurationStore
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return DerbyConfigurationStore.this._storeLocation;
+ }
+
+ @Override
+ protected Logger getLogger()
+ {
+ return DerbyConfigurationStore.this.getLogger();
+ }
+ }
+}
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 6f87e81ba1..9e2a2f63d4 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -21,522 +21,96 @@
package org.apache.qpid.server.store.derby;
-import java.io.File;
-import java.sql.Blob;
-import java.sql.CallableStatement;
import java.sql.Connection;
-import java.sql.Driver;
import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.AbstractJDBCMessageStore;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreProvider;
-import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
-import org.apache.qpid.server.store.handler.MessageHandler;
-import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-import org.apache.qpid.util.FileUtils;
/**
- * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence
- * mechanism.
- *
+ * Implementation of a MessageStore backed by Apache Derby.
*/
-public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider,
- DurableConfigurationStore
+public class DerbyMessageStore extends AbstractDerbyMessageStore
{
+ private static final Logger LOGGER = Logger.getLogger(DerbyMessageStore.class);
- private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
-
- private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
-
- public static final String MEMORY_STORE_LOCATION = ":memory:";
-
- private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
-
- public static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
-
- public static final String TYPE = "DERBY";
-
- private long _totalStoreSize;
- private boolean _limitBusted;
- private long _persistentSizeLowThreshold;
- private long _persistentSizeHighThreshold;
-
- protected String _connectionURL;
-
+ private String _connectionURL;
private String _storeLocation;
- private Class<Driver> _driverClass;
-
- private final MessageStore _messageStoreFacade = new MessageStoreWrapper();
-
- public DerbyMessageStore()
- {
- }
-
- protected Logger getLogger()
- {
- return _logger;
- }
@Override
- protected String getSqlBlobType()
+ protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
{
- return "blob";
- }
+ String databasePath = (String) messageStoreSettings.get(MessageStore.STORE_PATH);
+ String name = parent.getName();
- @Override
- protected String getSqlVarBinaryType(int size)
- {
- return "varchar("+size+") for bit data";
+ _storeLocation = databasePath;
+ _connectionURL = DerbyUtils.createConnectionUrl(name, databasePath);
}
@Override
- protected String getSqlBigIntType()
+ protected Connection getConnection() throws SQLException
{
- return "bigint";
+ checkMessageStoreOpen();
+ return DriverManager.getConnection(_connectionURL);
}
+ @Override
protected void doClose()
{
try
{
- Connection conn = DriverManager.getConnection(_connectionURL + ";shutdown=true");
- // Shouldn't reach this point - shutdown=true should throw SQLException
- conn.close();
- getLogger().error("Unable to shut down the store");
+ DerbyUtils.shutdownDatabase(_connectionURL);
}
catch (SQLException e)
{
- if (e.getSQLState().equalsIgnoreCase(DerbyMessageStore.DERBY_SINGLE_DB_SHUTDOWN_CODE))
- {
- //expected and represents a clean shutdown of this database only, do nothing.
- }
- else
- {
- getLogger().error("Exception whilst shutting down the store: " + e);
- throw new StoreException("Error closing message store", e);
- }
+ throw new StoreException("Error closing configuration store", e);
}
}
@Override
- protected void implementationSpecificConfiguration(String name, Map<String, Object> messageStoreSettings)
- throws ClassNotFoundException
- {
- //Update to pick up QPID_WORK and use that as the default location not just derbyDB
- _driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
-
- String databasePath = (String) messageStoreSettings.get(MessageStore.STORE_PATH);;
-
- if(databasePath == null)
- {
- databasePath = System.getProperty("QPID_WORK") + File.separator + "derbyDB";
- }
-
- if(!MEMORY_STORE_LOCATION.equals(databasePath))
- {
- File environmentPath = new File(databasePath);
- if (!environmentPath.exists())
- {
- if (!environmentPath.mkdirs())
- {
- throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
- }
- }
- }
-
- _storeLocation = databasePath;
-
- Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
- Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
-
- _persistentSizeHighThreshold = overfullAttr == null ? -1l :
- overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
- _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
- underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
-
- if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
- {
- _persistentSizeLowThreshold = _persistentSizeHighThreshold;
- }
-
- //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created.
- _connectionURL = "jdbc:derby" + (databasePath.equals(MEMORY_STORE_LOCATION) ? databasePath: ":" + databasePath+ "/") + name + ";create=true";
-
- setInitialSize();
-
- }
-
- private void setInitialSize()
- {
- Connection conn = null;
- try
- {
-
-
- try
- {
- conn = newAutoCommitConnection();
- _totalStoreSize = getSizeOnDisk(conn);
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
-
-
- }
- }
- }
- catch (SQLException e)
- {
- getLogger().error("Unable to set initial store size", e);
- }
- }
-
- protected String getBlobAsString(ResultSet rs, int col) throws SQLException
- {
- Blob blob = rs.getBlob(col);
- if(blob == null)
- {
- return null;
- }
- byte[] bytes = blob.getBytes(1, (int)blob.length());
- return new String(bytes, UTF8_CHARSET);
- }
-
- protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
- {
- Blob dataAsBlob = rs.getBlob(col);
- return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
- }
-
-
- protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
- {
- PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
- try
- {
- stmt.setString(1, tableName);
- ResultSet rs = stmt.executeQuery();
- try
- {
- return rs.next();
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
-
- public String getStoreLocation()
- {
- return _storeLocation;
- }
-
- protected synchronized void storedSizeChange(final int delta)
+ public void onDelete()
{
- if(getPersistentSizeHighThreshold() > 0)
+ if (isMessageStoreOpen())
{
- synchronized(this)
- {
- // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
- // time, so we do so only when there's been enough change that it is worth looking again. We do this by
- // assuming the total size will change by less than twice the amount of the message data change.
- long newSize = _totalStoreSize += 3*delta;
-
- Connection conn = null;
- try
- {
-
- if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
- {
- conn = newAutoCommitConnection();
- _totalStoreSize = getSizeOnDisk(conn);
- if(_totalStoreSize > getPersistentSizeHighThreshold())
- {
- _limitBusted = true;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
- }
- }
- else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
- {
- long oldSize = _totalStoreSize;
- conn = newAutoCommitConnection();
- _totalStoreSize = getSizeOnDisk(conn);
- if(oldSize <= _totalStoreSize)
- {
-
- reduceSizeOnDisk(conn);
-
- _totalStoreSize = getSizeOnDisk(conn);
- }
-
- if(_totalStoreSize < getPersistentSizeLowThreshold())
- {
- _limitBusted = false;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
- }
-
-
- }
- }
- catch (SQLException e)
- {
- closeConnection(conn);
- throw new StoreException("Exception while processing store size change", e);
- }
- }
+ throw new IllegalStateException("Cannot delete the store as the message store is still open");
}
- }
-
- private void reduceSizeOnDisk(Connection conn)
- {
- CallableStatement cs = null;
- PreparedStatement stmt = null;
- try
- {
- String tableQuery =
- "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'";
- stmt = conn.prepareStatement(tableQuery);
- ResultSet rs = null;
- List<String> schemas = new ArrayList<String>();
- List<String> tables = new ArrayList<String>();
-
- try
- {
- rs = stmt.executeQuery();
- while(rs.next())
- {
- schemas.add(rs.getString(1));
- tables.add(rs.getString(2));
- }
- }
- finally
- {
- if(rs != null)
- {
- rs.close();
- }
- }
-
-
- cs = conn.prepareCall
- ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)");
-
- for(int i = 0; i < schemas.size(); i++)
- {
- cs.setString(1, schemas.get(i));
- cs.setString(2, tables.get(i));
- cs.setShort(3, (short) 0);
- cs.execute();
- }
- }
- catch (SQLException e)
- {
- closeConnection(conn);
- throw new StoreException("Error reducing on disk size", e);
- }
- finally
+ if (LOGGER.isDebugEnabled())
{
- closePreparedStatement(stmt);
- closePreparedStatement(cs);
+ LOGGER.debug("Deleting store " + _storeLocation);
}
- }
-
- private long getSizeOnDisk(Connection conn)
- {
- PreparedStatement stmt = null;
try
{
- String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" +
- " FROM " +
- " SYS.SYSTABLES systabs," +
- " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" +
- " WHERE systabs.tabletype = 'T'";
-
- stmt = conn.prepareStatement(sizeQuery);
-
- ResultSet rs = null;
- long size = 0l;
-
- try
- {
- rs = stmt.executeQuery();
- while(rs.next())
- {
- size = rs.getLong(1);
- }
- }
- finally
- {
- if(rs != null)
- {
- rs.close();
- }
- }
-
- return size;
-
+ DerbyUtils.deleteDatabaseLocation(_storeLocation);
}
- catch (SQLException e)
+ catch (StoreException se)
{
- closeConnection(conn);
- throw new StoreException("Error establishing on disk size", e);
+ LOGGER.debug("Failed to delete the store at location " + _storeLocation);
}
finally
{
- closePreparedStatement(stmt);
+ _storeLocation = null;
}
-
- }
-
-
- private long getPersistentSizeLowThreshold()
- {
- return _persistentSizeLowThreshold;
- }
-
- private long getPersistentSizeHighThreshold()
- {
- return _persistentSizeHighThreshold;
}
@Override
- public void onDelete()
+ protected Logger getLogger()
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Deleting store " + _storeLocation);
- }
-
- if (MEMORY_STORE_LOCATION.equals(_storeLocation))
- {
- return;
- }
-
- if (_storeLocation != null)
- {
- File location = new File(_storeLocation);
- if (location.exists())
- {
- if (!FileUtils.delete(location, true))
- {
- _logger.error("Cannot delete " + _storeLocation);
- }
- }
- }
+ return LOGGER;
}
- protected Connection getConnection() throws SQLException
- {
- return DriverManager.getConnection(_connectionURL);
- }
@Override
- public MessageStore getMessageStore()
+ public String getStoreLocation()
{
- return _messageStoreFacade;
+ return _storeLocation;
}
- private class MessageStoreWrapper implements MessageStore
- {
-
- @Override
- public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
- {
- DerbyMessageStore.this.openMessageStore(parent, messageStoreSettings);
- }
-
- @Override
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
- {
- return DerbyMessageStore.this.addMessage(metaData);
- }
-
- @Override
- public boolean isPersistent()
- {
- return DerbyMessageStore.this.isPersistent();
- }
-
- @Override
- public Transaction newTransaction()
- {
- return DerbyMessageStore.this.newTransaction();
- }
-
- @Override
- public void closeMessageStore()
- {
- DerbyMessageStore.this.closeMessageStore();
- }
-
- @Override
- public void addEventListener(final EventListener eventListener, final Event... events)
- {
- DerbyMessageStore.this.addEventListener(eventListener, events);
- }
-
- @Override
- public void upgradeStoreStructure() throws StoreException
- {
- }
-
- @Override
- public String getStoreLocation()
- {
- return DerbyMessageStore.this.getStoreLocation();
- }
-
- @Override
- public void onDelete()
- {
- DerbyMessageStore.this.onDelete();
- }
-
- @Override
- public void visitMessages(final MessageHandler handler) throws StoreException
- {
- DerbyMessageStore.this.visitMessages(handler);
- }
-
- @Override
- public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
- {
- DerbyMessageStore.this.visitMessageInstances(handler);
- }
-
- @Override
- public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
- {
- DerbyMessageStore.this.visitDistributedTransactions(handler);
- }
- }
}
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
new file mode 100644
index 0000000000..b0f4a137f2
--- /dev/null
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
@@ -0,0 +1,165 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store.derby;
+
+
+import java.io.File;
+import java.nio.charset.Charset;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.util.FileUtils;
+
+public class DerbyUtils
+{
+ public static final String MEMORY_STORE_LOCATION = ":memory:";
+ public static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
+ private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
+ private static final String TABLE_EXISTENCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
+ private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+
+ public static void loadDerbyDriver()
+ {
+ try
+ {
+ Class<Driver> driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new StoreException("Failed to load driver " + SQL_DRIVER_NAME, e);
+ }
+ }
+
+ public static String createConnectionUrl(final String name, final String databasePath)
+ {
+ // Derby wont use an existing directory, so we append parent name
+ if (MEMORY_STORE_LOCATION.equals(databasePath))
+ {
+ return "jdbc:derby:" + MEMORY_STORE_LOCATION + "/" + name + ";create=true";
+ }
+ else
+ {
+ File environmentPath = new File(databasePath);
+ if (!environmentPath.exists())
+ {
+ if (!environmentPath.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path "
+ + environmentPath
+ + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
+ }
+ return "jdbc:derby:" + databasePath + "/" + name + ";create=true";
+ }
+
+ }
+
+ public static void shutdownDatabase(String connectionURL) throws SQLException
+ {
+ try
+ {
+ Connection conn = DriverManager.getConnection(connectionURL + ";shutdown=true");
+ // Shouldn't reach this point - shutdown=true should throw SQLException
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ if (e.getSQLState().equalsIgnoreCase(DerbyUtils.DERBY_SINGLE_DB_SHUTDOWN_CODE))
+ {
+ //expected and represents a clean shutdown of this database only, do nothing.
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+
+ public static void deleteDatabaseLocation(String storeLocation)
+ {
+ if (MEMORY_STORE_LOCATION.equals(storeLocation))
+ {
+ return;
+ }
+
+ if (storeLocation != null)
+ {
+ File location = new File(storeLocation);
+ if (location.exists())
+ {
+ if (!FileUtils.delete(location, true))
+ {
+ throw new StoreException("Failed to delete the store at location : " + storeLocation);
+ }
+ }
+ }
+ }
+
+ public static String getBlobAsString(ResultSet rs, int col) throws SQLException
+ {
+ Blob blob = rs.getBlob(col);
+ if(blob == null)
+ {
+ return null;
+ }
+ byte[] bytes = blob.getBytes(1, (int) blob.length());
+ return new String(bytes, UTF8_CHARSET);
+ }
+
+ protected static byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ {
+ Blob dataAsBlob = rs.getBlob(col);
+ return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+ }
+
+ public static boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTENCE_QUERY);
+ try
+ {
+ stmt.setString(1, tableName);
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ return rs.next();
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+
+}
+
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java
index 31c3f7c944..fc67d2fa50 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java
@@ -45,7 +45,7 @@ public class DerbyVirtualHost extends AbstractVirtualHost<DerbyVirtualHost>
@Override
protected MessageStore createMessageStore()
{
- return new DerbyMessageStore().getMessageStore();
+ return new DerbyMessageStore();
}
}
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java
index ffc19832fa..340d046033 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java
@@ -28,7 +28,7 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.derby.DerbyMessageStore;
+import org.apache.qpid.server.store.derby.DerbyConfigurationStore;
import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode;
import org.apache.qpid.server.virtualhostnode.FileBasedVirtualHostNode;
@@ -49,7 +49,7 @@ public class DerbyVirtualHostNode extends AbstractStandardVirtualHostNode<DerbyV
@Override
protected DurableConfigurationStore createConfigurationStore()
{
- return new DerbyMessageStore();
+ return new DerbyConfigurationStore();
}
@Override
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java
index aaf65e9ee0..08c421b606 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java
@@ -26,9 +26,9 @@ public class DerbyMessageStoreConfigurationTest extends AbstractDurableConfigura
{
@Override
- protected DerbyMessageStore createConfigStore() throws Exception
+ protected DerbyConfigurationStore createConfigStore() throws Exception
{
- return new DerbyMessageStore();
+ return new DerbyConfigurationStore();
}
}
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
index 1d35b9ef83..ba7ae26292 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
@@ -50,7 +50,7 @@ public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTes
@Override
protected MessageStore createStore() throws Exception
{
- return (new DerbyMessageStore()).getMessageStore();
+ return new DerbyMessageStore();
}
@Override
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
index 4594b7f223..9a2d945494 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
@@ -83,7 +83,7 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase
@Override
protected MessageStore createMessageStore()
{
- return (new DerbyMessageStore()).getMessageStore();
+ return new DerbyMessageStore();
}
}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
new file mode 100644
index 0000000000..bd245fa4f4
--- /dev/null
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.jdbc;
+
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.Transaction;
+
+public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.server.store.AbstractJDBCMessageStore
+{
+ private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(false);
+ private final List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<>();
+
+ private ConfiguredObject<?> _parent;
+
+ @Override
+ public final void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ {
+ if (_messageStoreOpen.compareAndSet(false, true))
+ {
+ _parent = parent;
+
+ doOpen(parent, storeSettings);
+
+ createOrOpenMessageStoreDatabase();
+ setMaximumMessageId();
+ }
+ }
+
+ protected abstract void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings)
+ throws StoreException;
+
+ @Override
+ public final void upgradeStoreStructure() throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ upgrade(_parent);
+ }
+
+ @Override
+ public final void closeMessageStore()
+ {
+ if (_messageStoreOpen.compareAndSet(true, false))
+ {
+ try
+ {
+ while(!_transactions.isEmpty())
+ {
+ RecordedJDBCTransaction txn = _transactions.get(0);
+ txn.abortTran();
+ }
+ }
+ finally
+ {
+ doClose();
+ }
+
+ }
+ }
+
+ protected abstract void doClose();
+
+ protected boolean isMessageStoreOpen()
+ {
+ return _messageStoreOpen.get();
+ }
+
+ @Override
+ protected void checkMessageStoreOpen()
+ {
+ if (!_messageStoreOpen.get())
+ {
+ throw new IllegalStateException("Message store is not open");
+ }
+ }
+
+ @Override
+ protected void storedSizeChange(int contentSize)
+ {
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return new RecordedJDBCTransaction();
+ }
+
+
+ private class RecordedJDBCTransaction extends JDBCTransaction
+ {
+ private RecordedJDBCTransaction()
+ {
+ super();
+ GenericAbstractJDBCMessageStore.this._transactions.add(this);
+ }
+
+ @Override
+ public void commitTran()
+ {
+ try
+ {
+ super.commitTran();
+ }
+ finally
+ {
+ GenericAbstractJDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+
+ @Override
+ public StoreFuture commitTranAsync()
+ {
+ try
+ {
+ return super.commitTranAsync();
+ }
+ finally
+ {
+ GenericAbstractJDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+
+ @Override
+ public void abortTran()
+ {
+ try
+ {
+ super.abortTran();
+ }
+ finally
+ {
+ GenericAbstractJDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
new file mode 100644
index 0000000000..7bafe5a859
--- /dev/null
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.jdbc;
+
+
+import java.nio.charset.Charset;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
+import org.apache.qpid.server.store.*;
+import org.apache.qpid.server.util.MapValueConverter;
+
+/**
+ * Implementation of a DurableConfigurationStore backed by Generic JDBC Database
+ * that also provides a MessageStore.
+ */
+public class GenericJDBCConfigurationStore extends AbstractJDBCConfigurationStore implements MessageStoreProvider
+{
+ private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+ private static final Logger LOGGER = Logger.getLogger(GenericJDBCConfigurationStore.class);
+
+ public static final String CONNECTION_URL = "connectionUrl";
+ public static final String CONNECTION_POOL_TYPE = "connectionPoolType";
+ public static final String JDBC_BIG_INT_TYPE = "bigIntType";
+ public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob";
+ public static final String JDBC_VARBINARY_TYPE = "varbinaryType";
+ public static final String JDBC_BLOB_TYPE = "blobType";
+
+ private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
+ private final MessageStore _messageStoreFacade = new MessageStoreWrapper();
+
+ protected String _connectionURL;
+ private ConnectionProvider _connectionProvider;
+
+ private String _blobType;
+ private String _varBinaryType;
+ private String _bigIntType;
+ private boolean _useBytesMethodsForBlob;
+
+ private ConfiguredObject<?> _parent;
+
+ @Override
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ throws StoreException
+ {
+ if (_configurationStoreOpen.compareAndSet(false, true))
+ {
+ _parent = parent;
+ _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL));
+ Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE);
+
+ JDBCDetails details = null;
+
+ String[] components = _connectionURL.split(":", 3);
+ if(components.length >= 2)
+ {
+ String vendor = components[1];
+ details = JDBCDetails.getDetails(vendor);
+ }
+
+ if(details == null)
+ {
+ getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL);
+
+ details = JDBCDetails.getDefaultDetails();
+ }
+
+ String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute);
+
+ JDBCConnectionProviderFactory connectionProviderFactory =
+ JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType);
+ if(connectionProviderFactory == null)
+ {
+ LOGGER.warn("Unknown connection pool type: "
+ + connectionPoolType
+ + ". no connection pooling will be used");
+ connectionProviderFactory = new DefaultConnectionProviderFactory();
+ }
+
+ try
+ {
+ _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Failed to create connection provider for " + _connectionURL);
+ }
+ _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType());
+ _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType());
+ _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob());
+ _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE,
+ storeSettings,
+ details.getBigintType());
+
+ createOrOpenConfigurationStoreDatabase();
+ }
+ }
+
+ @Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+ checkConfigurationStoreOpen();
+ upgradeIfNecessary(_parent);
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ return _connectionProvider.getConnection();
+ }
+
+ @Override
+ public void closeConfigurationStore() throws StoreException
+ {
+ if (_configurationStoreOpen.compareAndSet(true, false))
+ {
+ try
+ {
+ _connectionProvider.close();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Unable to close connection provider ", e);
+ }
+ }
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return _blobType;
+ }
+
+ @Override
+ protected String getSqlVarBinaryType(int size)
+ {
+ return String.format(_varBinaryType, size);
+ }
+
+ @Override
+ public String getSqlBigIntType()
+ {
+ return _bigIntType;
+ }
+
+ @Override
+ protected String getBlobAsString(ResultSet rs, int col) throws SQLException
+ {
+ byte[] bytes;
+ if(_useBytesMethodsForBlob)
+ {
+ bytes = rs.getBytes(col);
+ return new String(bytes,UTF8_CHARSET);
+ }
+ else
+ {
+ Blob blob = rs.getBlob(col);
+ if(blob == null)
+ {
+ return null;
+ }
+ bytes = blob.getBytes(1, (int)blob.length());
+ }
+ return new String(bytes, UTF8_CHARSET);
+
+ }
+
+ protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ {
+ if(_useBytesMethodsForBlob)
+ {
+ return rs.getBytes(col);
+ }
+ else
+ {
+ Blob dataAsBlob = rs.getBlob(col);
+ return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+
+ }
+ }
+
+ @Override
+ protected void checkConfigurationStoreOpen()
+ {
+ if (!_configurationStoreOpen.get())
+ {
+ throw new IllegalStateException("Configuration store is not open");
+ }
+ }
+
+ @Override
+ protected Logger getLogger()
+ {
+ return LOGGER;
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _messageStoreFacade;
+ }
+
+ private class MessageStoreWrapper extends GenericAbstractJDBCMessageStore
+ {
+ @Override
+ protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
+ {
+ // Nothing to do, store provided by DerbyConfigurationStore
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ return GenericJDBCConfigurationStore.this.getConnection();
+ }
+
+ @Override
+ protected void doClose()
+ {
+ // Nothing to do, store provided by DerbyConfigurationStore
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return GenericJDBCConfigurationStore.this._connectionURL;
+ }
+
+ @Override
+ protected Logger getLogger()
+ {
+ return GenericJDBCConfigurationStore.this.getLogger();
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return GenericJDBCConfigurationStore.this.getSqlBlobType();
+ }
+
+ @Override
+ protected String getSqlVarBinaryType(int size)
+ {
+ return GenericJDBCConfigurationStore.this.getSqlVarBinaryType(size);
+ }
+
+ @Override
+ protected String getSqlBigIntType()
+ {
+ return GenericJDBCConfigurationStore.this.getSqlBigIntType();
+ }
+
+ @Override
+ protected byte[] getBlobAsBytes(final ResultSet rs, final int col) throws SQLException
+ {
+ return GenericJDBCConfigurationStore.this.getBlobAsBytes(rs, col);
+ }
+ }
+
+
+}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
new file mode 100644
index 0000000000..dad4432183
--- /dev/null
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
@@ -0,0 +1,177 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store.jdbc;
+
+
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.util.MapValueConverter;
+
+/**
+ * Implementation of a MessageStore backed by a Generic JDBC Database.
+ */
+public class GenericJDBCMessageStore extends GenericAbstractJDBCMessageStore
+{
+
+ private static final Logger _logger = Logger.getLogger(GenericJDBCMessageStore.class);
+
+ public static final String TYPE = "JDBC";
+ public static final String CONNECTION_URL = "connectionUrl";
+ public static final String CONNECTION_POOL_TYPE = "connectionPoolType";
+ public static final String JDBC_BIG_INT_TYPE = "bigIntType";
+ public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob";
+ public static final String JDBC_VARBINARY_TYPE = "varbinaryType";
+ public static final String JDBC_BLOB_TYPE = "blobType";
+
+ protected String _connectionURL;
+ private ConnectionProvider _connectionProvider;
+
+ private String _blobType;
+ private String _varBinaryType;
+ private String _bigIntType;
+ private boolean _useBytesMethodsForBlob;
+
+
+ @Override
+ protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings) throws StoreException
+ {
+ _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL));
+
+ org.apache.qpid.server.store.jdbc.JDBCDetails details = null;
+
+ String[] components = _connectionURL.split(":", 3);
+ if(components.length >= 2)
+ {
+ String vendor = components[1];
+ details = org.apache.qpid.server.store.jdbc.JDBCDetails.getDetails(vendor);
+ }
+
+ if(details == null)
+ {
+ getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL);
+
+ details = org.apache.qpid.server.store.jdbc.JDBCDetails.getDefaultDetails();
+ }
+
+
+ _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType());
+ _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType());
+ _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob());
+ _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE,
+ storeSettings,
+ details.getBigintType());
+
+ Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE);
+ String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute);
+
+ JDBCConnectionProviderFactory connectionProviderFactory =
+ JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType);
+ if(connectionProviderFactory == null)
+ {
+ _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used");
+ connectionProviderFactory = new DefaultConnectionProviderFactory();
+ }
+
+ try
+ {
+ _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Failed to create connection provider for " + _connectionURL);
+ }
+
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ return _connectionProvider.getConnection();
+ }
+
+ protected void doClose()
+ {
+ try
+ {
+ _connectionProvider.close();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Unable to close connection provider ", e);
+ }
+ }
+
+
+ @Override
+ protected Logger getLogger()
+ {
+ return _logger;
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return _blobType;
+ }
+
+ @Override
+ protected String getSqlVarBinaryType(int size)
+ {
+ return String.format(_varBinaryType, size);
+ }
+
+ @Override
+ protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ {
+ if(_useBytesMethodsForBlob)
+ {
+ return rs.getBytes(col);
+ }
+ else
+ {
+ Blob dataAsBlob = rs.getBlob(col);
+ return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+
+ }
+ }
+
+ @Override
+ public String getSqlBigIntType()
+ {
+ return _bigIntType;
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return _connectionURL;
+ }
+
+}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
new file mode 100644
index 0000000000..6cf1413b83
--- /dev/null
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.store.jdbc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class JDBCDetails
+{
+
+ private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<>();
+
+ private static JDBCDetails DERBY_DETAILS =
+ new JDBCDetails("derby",
+ "blob",
+ "varchar(%d) for bit data",
+ "bigint",
+ false);
+
+ private static JDBCDetails POSTGRESQL_DETAILS =
+ new JDBCDetails("postgresql",
+ "bytea",
+ "bytea",
+ "bigint",
+ true);
+
+ private static JDBCDetails MYSQL_DETAILS =
+ new JDBCDetails("mysql",
+ "blob",
+ "varbinary(%d)",
+ "bigint",
+ false);
+
+
+ private static JDBCDetails SYBASE_DETAILS =
+ new JDBCDetails("sybase",
+ "image",
+ "varbinary(%d)",
+ "bigint",
+ false);
+
+
+ private static JDBCDetails ORACLE_DETAILS =
+ new JDBCDetails("oracle",
+ "blob",
+ "raw(%d)",
+ "number",
+ false);
+
+
+ static
+ {
+
+ addDetails(DERBY_DETAILS);
+ addDetails(POSTGRESQL_DETAILS);
+ addDetails(MYSQL_DETAILS);
+ addDetails(SYBASE_DETAILS);
+ addDetails(ORACLE_DETAILS);
+ }
+
+ public static JDBCDetails getDetails(String vendor)
+ {
+ return VENDOR_DETAILS.get(vendor);
+ }
+
+ public static JDBCDetails getDefaultDetails()
+ {
+ return DERBY_DETAILS;
+ }
+
+ private static void addDetails(JDBCDetails details)
+ {
+ VENDOR_DETAILS.put(details.getVendor(), details);
+ }
+
+ private final String _vendor;
+ private String _blobType;
+ private String _varBinaryType;
+ private String _bigintType;
+ private boolean _useBytesMethodsForBlob;
+
+ JDBCDetails(String vendor,
+ String blobType,
+ String varBinaryType,
+ String bigIntType,
+ boolean useBytesMethodsForBlob)
+ {
+ _vendor = vendor;
+ setBlobType(blobType);
+ setVarBinaryType(varBinaryType);
+ setBigintType(bigIntType);
+ setUseBytesMethodsForBlob(useBytesMethodsForBlob);
+ }
+
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ JDBCDetails that = (JDBCDetails) o;
+
+ if (!getVendor().equals(that.getVendor()))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return getVendor().hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "JDBCDetails{" +
+ "vendor='" + getVendor() + '\'' +
+ ", blobType='" + getBlobType() + '\'' +
+ ", varBinaryType='" + getVarBinaryType() + '\'' +
+ ", bigIntType='" + getBigintType() + '\'' +
+ ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() +
+ '}';
+ }
+
+ public String getVendor()
+ {
+ return _vendor;
+ }
+
+ public String getBlobType()
+ {
+ return _blobType;
+ }
+
+ public void setBlobType(String blobType)
+ {
+ _blobType = blobType;
+ }
+
+ public String getVarBinaryType()
+ {
+ return _varBinaryType;
+ }
+
+ public void setVarBinaryType(String varBinaryType)
+ {
+ _varBinaryType = varBinaryType;
+ }
+
+ public boolean isUseBytesMethodsForBlob()
+ {
+ return _useBytesMethodsForBlob;
+ }
+
+ public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob)
+ {
+ _useBytesMethodsForBlob = useBytesMethodsForBlob;
+ }
+
+ public String getBigintType()
+ {
+ return _bigintType;
+ }
+
+ public void setBigintType(String bigintType)
+ {
+ _bigintType = bigintType;
+ }
+}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
deleted file mode 100644
index 61ecb6748c..0000000000
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.store.jdbc;
-
-
-import java.sql.Blob;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
-import org.apache.qpid.server.store.AbstractJDBCMessageStore;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreProvider;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
-import org.apache.qpid.server.store.handler.MessageHandler;
-import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-import org.apache.qpid.server.util.MapValueConverter;
-
-/**
- * An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence
- * mechanism.
- *
- */
-public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider
-{
-
- private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class);
-
- public static final String TYPE = "JDBC";
- public static final String CONNECTION_URL = "connectionUrl";
- public static final String CONNECTION_POOL_TYPE = "connectionPoolType";
- public static final String JDBC_BIG_INT_TYPE = "bigIntType";
- public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob";
- public static final String JDBC_VARBINARY_TYPE = "varbinaryType";
- public static final String JDBC_BLOB_TYPE = "blobType";
-
- protected String _connectionURL;
- private ConnectionProvider _connectionProvider;
-
- private final MessageStore _messageStoreFacade = new MessageStoreWrapper();
-
- private static class JDBCDetails
- {
- private final String _vendor;
- private String _blobType;
- private String _varBinaryType;
- private String _bigintType;
- private boolean _useBytesMethodsForBlob;
-
- private JDBCDetails(String vendor,
- String blobType,
- String varBinaryType,
- String bigIntType,
- boolean useBytesMethodsForBlob)
- {
- _vendor = vendor;
- setBlobType(blobType);
- setVarBinaryType(varBinaryType);
- setBigintType(bigIntType);
- setUseBytesMethodsForBlob(useBytesMethodsForBlob);
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- JDBCDetails that = (JDBCDetails) o;
-
- if (!getVendor().equals(that.getVendor()))
- {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode()
- {
- return getVendor().hashCode();
- }
-
- @Override
- public String toString()
- {
- return "JDBCDetails{" +
- "vendor='" + getVendor() + '\'' +
- ", blobType='" + getBlobType() + '\'' +
- ", varBinaryType='" + getVarBinaryType() + '\'' +
- ", bigIntType='" + getBigintType() + '\'' +
- ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() +
- '}';
- }
-
- public String getVendor()
- {
- return _vendor;
- }
-
- public String getBlobType()
- {
- return _blobType;
- }
-
- public void setBlobType(String blobType)
- {
- _blobType = blobType;
- }
-
- public String getVarBinaryType()
- {
- return _varBinaryType;
- }
-
- public void setVarBinaryType(String varBinaryType)
- {
- _varBinaryType = varBinaryType;
- }
-
- public boolean isUseBytesMethodsForBlob()
- {
- return _useBytesMethodsForBlob;
- }
-
- public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob)
- {
- _useBytesMethodsForBlob = useBytesMethodsForBlob;
- }
-
- public String getBigintType()
- {
- return _bigintType;
- }
-
- public void setBigintType(String bigintType)
- {
- _bigintType = bigintType;
- }
- }
-
- private static JDBCDetails DERBY_DETAILS =
- new JDBCDetails("derby",
- "blob",
- "varchar(%d) for bit data",
- "bigint",
- false);
-
- private static JDBCDetails POSTGRESQL_DETAILS =
- new JDBCDetails("postgresql",
- "bytea",
- "bytea",
- "bigint",
- true);
-
- private static JDBCDetails MYSQL_DETAILS =
- new JDBCDetails("mysql",
- "blob",
- "varbinary(%d)",
- "bigint",
- false);
-
-
- private static JDBCDetails SYBASE_DETAILS =
- new JDBCDetails("sybase",
- "image",
- "varbinary(%d)",
- "bigint",
- false);
-
-
- private static JDBCDetails ORACLE_DETAILS =
- new JDBCDetails("oracle",
- "blob",
- "raw(%d)",
- "number",
- false);
-
-
- private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<String,JDBCDetails>();
-
- static
- {
-
- addDetails(DERBY_DETAILS);
- addDetails(POSTGRESQL_DETAILS);
- addDetails(MYSQL_DETAILS);
- addDetails(SYBASE_DETAILS);
- addDetails(ORACLE_DETAILS);
- }
-
- private static void addDetails(JDBCDetails details)
- {
- VENDOR_DETAILS.put(details.getVendor(), details);
- }
-
- private String _blobType;
- private String _varBinaryType;
- private String _bigIntType;
- private boolean _useBytesMethodsForBlob;
-
- private List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<RecordedJDBCTransaction>();
-
-
- public JDBCMessageStore()
- {
- }
-
- protected Logger getLogger()
- {
- return _logger;
- }
-
- protected String getSqlBlobType()
- {
- return _blobType;
- }
-
- protected String getSqlVarBinaryType(int size)
- {
- return String.format(_varBinaryType, size);
- }
-
- public String getSqlBigIntType()
- {
- return _bigIntType;
- }
-
- @Override
- protected void doClose()
- {
- try
- {
- while(!_transactions.isEmpty())
- {
- RecordedJDBCTransaction txn = _transactions.get(0);
- txn.abortTran();
- }
- }
- finally
- {
- try
- {
- _connectionProvider.close();
- }
- catch (SQLException e)
- {
- throw new StoreException("Unable to close connection provider ", e);
- }
- }
- }
-
-
- protected Connection getConnection() throws SQLException
- {
- return _connectionProvider.getConnection();
- }
-
-
- protected void implementationSpecificConfiguration(String name, Map<String, Object> storeSettings)
- throws ClassNotFoundException, SQLException
- {
- _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL));
- Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE);
-
- JDBCDetails details = null;
-
- String[] components = _connectionURL.split(":",3);
- if(components.length >= 2)
- {
- String vendor = components[1];
- details = VENDOR_DETAILS.get(vendor);
- }
-
- if(details == null)
- {
- getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL);
-
- // TODO - is there a better default than derby
- details = DERBY_DETAILS;
- }
-
- String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute);
-
- JDBCConnectionProviderFactory connectionProviderFactory =
- JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType);
- if(connectionProviderFactory == null)
- {
- _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used");
- connectionProviderFactory = new DefaultConnectionProviderFactory();
- }
-
- _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings);
- _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType());
- _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType());
- _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob());
- _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE, storeSettings, details.getBigintType());
- }
-
- @Override
- protected void storedSizeChange(int contentSize)
- {
- }
-
- public String getStoreLocation()
- {
- return _connectionURL;
- }
-
- @Override
- protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
- {
- if(_useBytesMethodsForBlob)
- {
- return rs.getBytes(col);
- }
- else
- {
- Blob dataAsBlob = rs.getBlob(col);
- return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
-
- }
- }
-
- @Override
- protected String getBlobAsString(ResultSet rs, int col) throws SQLException
- {
- byte[] bytes;
- if(_useBytesMethodsForBlob)
- {
- bytes = rs.getBytes(col);
- return new String(bytes,UTF8_CHARSET);
- }
- else
- {
- Blob blob = rs.getBlob(col);
- if(blob == null)
- {
- return null;
- }
- bytes = blob.getBytes(1, (int)blob.length());
- }
- return new String(bytes, UTF8_CHARSET);
-
- }
-
- @Override
- public Transaction newTransaction()
- {
- return new RecordedJDBCTransaction();
- }
-
- private class RecordedJDBCTransaction extends JDBCTransaction
- {
- private RecordedJDBCTransaction()
- {
- super();
- JDBCMessageStore.this._transactions.add(this);
- }
-
- @Override
- public void commitTran()
- {
- try
- {
- super.commitTran();
- }
- finally
- {
- JDBCMessageStore.this._transactions.remove(this);
- }
- }
-
- @Override
- public StoreFuture commitTranAsync()
- {
- try
- {
- return super.commitTranAsync();
- }
- finally
- {
- JDBCMessageStore.this._transactions.remove(this);
- }
- }
-
- @Override
- public void abortTran()
- {
- try
- {
- super.abortTran();
- }
- finally
- {
- JDBCMessageStore.this._transactions.remove(this);
- }
- }
- }
-
- @Override
- public MessageStore getMessageStore()
- {
- return _messageStoreFacade;
- }
-
- private class MessageStoreWrapper implements MessageStore
- {
-
- @Override
- public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
- {
- JDBCMessageStore.this.openMessageStore(parent, messageStoreSettings);
- }
-
- @Override
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
- {
- return JDBCMessageStore.this.addMessage(metaData);
- }
-
- @Override
- public boolean isPersistent()
- {
- return JDBCMessageStore.this.isPersistent();
- }
-
- @Override
- public Transaction newTransaction()
- {
- return JDBCMessageStore.this.newTransaction();
- }
-
- @Override
- public void closeMessageStore()
- {
- JDBCMessageStore.this.closeMessageStore();
- }
-
- @Override
- public void addEventListener(final EventListener eventListener, final Event... events)
- {
- JDBCMessageStore.this.addEventListener(eventListener, events);
- }
-
- @Override
- public void upgradeStoreStructure() throws StoreException
- {
- }
-
- @Override
- public String getStoreLocation()
- {
- return JDBCMessageStore.this.getStoreLocation();
- }
-
- @Override
- public void onDelete()
- {
- JDBCMessageStore.this.onDelete();
- }
-
- @Override
- public void visitMessages(final MessageHandler handler) throws StoreException
- {
- JDBCMessageStore.this.visitMessages(handler);
- }
-
- @Override
- public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
- {
- JDBCMessageStore.this.visitMessageInstances(handler);
- }
-
- @Override
- public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
- {
- JDBCMessageStore.this.visitDistributedTransactions(handler);
- }
- }
-
-}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java
index 1dd39a8696..85e8f89dbe 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java
@@ -26,7 +26,7 @@ import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.jdbc.JDBCMessageStore;
+import org.apache.qpid.server.store.jdbc.GenericJDBCMessageStore;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
@ManagedObject(category = false, type = JDBCVirtualHost.VIRTUAL_HOST_TYPE)
@@ -45,6 +45,6 @@ public class JDBCVirtualHost extends AbstractVirtualHost<JDBCVirtualHost>
@Override
protected MessageStore createMessageStore()
{
- return new JDBCMessageStore().getMessageStore();
+ return new GenericJDBCMessageStore();
}
}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
index 2108b4e09a..ab8f4554cb 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
@@ -27,7 +27,7 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.jdbc.JDBCMessageStore;
+import org.apache.qpid.server.store.jdbc.GenericJDBCConfigurationStore;
import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode;
@ManagedObject(type = JDBCVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category = false )
@@ -62,7 +62,7 @@ public class JDBCVirtualHostNodeImpl extends AbstractStandardVirtualHostNode<JDB
@Override
protected DurableConfigurationStore createConfigurationStore()
{
- return new JDBCMessageStore();
+ return new GenericJDBCConfigurationStore();
}
@Override
diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 1f03b7f75f..8261e93347 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -52,7 +52,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
public void testOnDelete() throws Exception
{
- Set<String> expectedTables = JDBCMessageStore.MESSAGE_STORE_TABLE_NAMES;
+ Set<String> expectedTables = GenericJDBCMessageStore.MESSAGE_STORE_TABLE_NAMES;
assertTablesExist(expectedTables, true);
getStore().closeMessageStore();
assertTablesExist(expectedTables, true);
@@ -65,7 +65,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
{
_connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true";
Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
- messageStoreSettings.put(JDBCMessageStore.CONNECTION_URL, _connectionURL);
+ messageStoreSettings.put(GenericJDBCMessageStore.CONNECTION_URL, _connectionURL);
return messageStoreSettings;
}
@@ -73,7 +73,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
@Override
protected MessageStore createMessageStore()
{
- return (new JDBCMessageStore()).getMessageStore();
+ return new GenericJDBCMessageStore();
}
private void assertTablesExist(Set<String> expectedTables, boolean exists) throws SQLException