summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/jdbc-store/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-06-18 20:51:43 +0000
committerKeith Wall <kwall@apache.org>2014-06-18 20:51:43 +0000
commit326b9560c14d1c30eb71c1396858791f9187d11e (patch)
tree34ba78548d48295e88b9e038f382bd8861f32500 /qpid/java/broker-plugins/jdbc-store/src
parent2622dda9c7d3267efd985b5ae5928b99063d2fa7 (diff)
downloadqpid-python-326b9560c14d1c30eb71c1396858791f9187d11e.tar.gz
QPID-5800: [Java Broker] Refactor Derby/JDBC message store implementations to separate message and config store implementations.
* Message store implementations can now be used in isolation, which is useful when the user is using a JSON VirtualHostNode with another persistent store implementation. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1603626 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/jdbc-store/src')
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java160
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java286
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java177
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java197
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java522
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java4
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java4
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java6
8 files changed, 827 insertions, 529 deletions
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
new file mode 100644
index 0000000000..bd245fa4f4
--- /dev/null
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.jdbc;
+
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.Transaction;
+
+public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.server.store.AbstractJDBCMessageStore
+{
+ private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(false);
+ private final List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<>();
+
+ private ConfiguredObject<?> _parent;
+
+ @Override
+ public final void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ {
+ if (_messageStoreOpen.compareAndSet(false, true))
+ {
+ _parent = parent;
+
+ doOpen(parent, storeSettings);
+
+ createOrOpenMessageStoreDatabase();
+ setMaximumMessageId();
+ }
+ }
+
+ protected abstract void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings)
+ throws StoreException;
+
+ @Override
+ public final void upgradeStoreStructure() throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ upgrade(_parent);
+ }
+
+ @Override
+ public final void closeMessageStore()
+ {
+ if (_messageStoreOpen.compareAndSet(true, false))
+ {
+ try
+ {
+ while(!_transactions.isEmpty())
+ {
+ RecordedJDBCTransaction txn = _transactions.get(0);
+ txn.abortTran();
+ }
+ }
+ finally
+ {
+ doClose();
+ }
+
+ }
+ }
+
+ protected abstract void doClose();
+
+ protected boolean isMessageStoreOpen()
+ {
+ return _messageStoreOpen.get();
+ }
+
+ @Override
+ protected void checkMessageStoreOpen()
+ {
+ if (!_messageStoreOpen.get())
+ {
+ throw new IllegalStateException("Message store is not open");
+ }
+ }
+
+ @Override
+ protected void storedSizeChange(int contentSize)
+ {
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return new RecordedJDBCTransaction();
+ }
+
+
+ private class RecordedJDBCTransaction extends JDBCTransaction
+ {
+ private RecordedJDBCTransaction()
+ {
+ super();
+ GenericAbstractJDBCMessageStore.this._transactions.add(this);
+ }
+
+ @Override
+ public void commitTran()
+ {
+ try
+ {
+ super.commitTran();
+ }
+ finally
+ {
+ GenericAbstractJDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+
+ @Override
+ public StoreFuture commitTranAsync()
+ {
+ try
+ {
+ return super.commitTranAsync();
+ }
+ finally
+ {
+ GenericAbstractJDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+
+ @Override
+ public void abortTran()
+ {
+ try
+ {
+ super.abortTran();
+ }
+ finally
+ {
+ GenericAbstractJDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
new file mode 100644
index 0000000000..7bafe5a859
--- /dev/null
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.jdbc;
+
+
+import java.nio.charset.Charset;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
+import org.apache.qpid.server.store.*;
+import org.apache.qpid.server.util.MapValueConverter;
+
+/**
+ * Implementation of a DurableConfigurationStore backed by Generic JDBC Database
+ * that also provides a MessageStore.
+ */
+public class GenericJDBCConfigurationStore extends AbstractJDBCConfigurationStore implements MessageStoreProvider
+{
+ private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+ private static final Logger LOGGER = Logger.getLogger(GenericJDBCConfigurationStore.class);
+
+ public static final String CONNECTION_URL = "connectionUrl";
+ public static final String CONNECTION_POOL_TYPE = "connectionPoolType";
+ public static final String JDBC_BIG_INT_TYPE = "bigIntType";
+ public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob";
+ public static final String JDBC_VARBINARY_TYPE = "varbinaryType";
+ public static final String JDBC_BLOB_TYPE = "blobType";
+
+ private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
+ private final MessageStore _messageStoreFacade = new MessageStoreWrapper();
+
+ protected String _connectionURL;
+ private ConnectionProvider _connectionProvider;
+
+ private String _blobType;
+ private String _varBinaryType;
+ private String _bigIntType;
+ private boolean _useBytesMethodsForBlob;
+
+ private ConfiguredObject<?> _parent;
+
+ @Override
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ throws StoreException
+ {
+ if (_configurationStoreOpen.compareAndSet(false, true))
+ {
+ _parent = parent;
+ _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL));
+ Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE);
+
+ JDBCDetails details = null;
+
+ String[] components = _connectionURL.split(":", 3);
+ if(components.length >= 2)
+ {
+ String vendor = components[1];
+ details = JDBCDetails.getDetails(vendor);
+ }
+
+ if(details == null)
+ {
+ getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL);
+
+ details = JDBCDetails.getDefaultDetails();
+ }
+
+ String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute);
+
+ JDBCConnectionProviderFactory connectionProviderFactory =
+ JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType);
+ if(connectionProviderFactory == null)
+ {
+ LOGGER.warn("Unknown connection pool type: "
+ + connectionPoolType
+ + ". no connection pooling will be used");
+ connectionProviderFactory = new DefaultConnectionProviderFactory();
+ }
+
+ try
+ {
+ _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Failed to create connection provider for " + _connectionURL);
+ }
+ _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType());
+ _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType());
+ _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob());
+ _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE,
+ storeSettings,
+ details.getBigintType());
+
+ createOrOpenConfigurationStoreDatabase();
+ }
+ }
+
+ @Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+ checkConfigurationStoreOpen();
+ upgradeIfNecessary(_parent);
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ return _connectionProvider.getConnection();
+ }
+
+ @Override
+ public void closeConfigurationStore() throws StoreException
+ {
+ if (_configurationStoreOpen.compareAndSet(true, false))
+ {
+ try
+ {
+ _connectionProvider.close();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Unable to close connection provider ", e);
+ }
+ }
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return _blobType;
+ }
+
+ @Override
+ protected String getSqlVarBinaryType(int size)
+ {
+ return String.format(_varBinaryType, size);
+ }
+
+ @Override
+ public String getSqlBigIntType()
+ {
+ return _bigIntType;
+ }
+
+ @Override
+ protected String getBlobAsString(ResultSet rs, int col) throws SQLException
+ {
+ byte[] bytes;
+ if(_useBytesMethodsForBlob)
+ {
+ bytes = rs.getBytes(col);
+ return new String(bytes,UTF8_CHARSET);
+ }
+ else
+ {
+ Blob blob = rs.getBlob(col);
+ if(blob == null)
+ {
+ return null;
+ }
+ bytes = blob.getBytes(1, (int)blob.length());
+ }
+ return new String(bytes, UTF8_CHARSET);
+
+ }
+
+ protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ {
+ if(_useBytesMethodsForBlob)
+ {
+ return rs.getBytes(col);
+ }
+ else
+ {
+ Blob dataAsBlob = rs.getBlob(col);
+ return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+
+ }
+ }
+
+ @Override
+ protected void checkConfigurationStoreOpen()
+ {
+ if (!_configurationStoreOpen.get())
+ {
+ throw new IllegalStateException("Configuration store is not open");
+ }
+ }
+
+ @Override
+ protected Logger getLogger()
+ {
+ return LOGGER;
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _messageStoreFacade;
+ }
+
+ private class MessageStoreWrapper extends GenericAbstractJDBCMessageStore
+ {
+ @Override
+ protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
+ {
+ // Nothing to do, store provided by DerbyConfigurationStore
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ return GenericJDBCConfigurationStore.this.getConnection();
+ }
+
+ @Override
+ protected void doClose()
+ {
+ // Nothing to do, store provided by DerbyConfigurationStore
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return GenericJDBCConfigurationStore.this._connectionURL;
+ }
+
+ @Override
+ protected Logger getLogger()
+ {
+ return GenericJDBCConfigurationStore.this.getLogger();
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return GenericJDBCConfigurationStore.this.getSqlBlobType();
+ }
+
+ @Override
+ protected String getSqlVarBinaryType(int size)
+ {
+ return GenericJDBCConfigurationStore.this.getSqlVarBinaryType(size);
+ }
+
+ @Override
+ protected String getSqlBigIntType()
+ {
+ return GenericJDBCConfigurationStore.this.getSqlBigIntType();
+ }
+
+ @Override
+ protected byte[] getBlobAsBytes(final ResultSet rs, final int col) throws SQLException
+ {
+ return GenericJDBCConfigurationStore.this.getBlobAsBytes(rs, col);
+ }
+ }
+
+
+}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
new file mode 100644
index 0000000000..dad4432183
--- /dev/null
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
@@ -0,0 +1,177 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store.jdbc;
+
+
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.util.MapValueConverter;
+
+/**
+ * Implementation of a MessageStore backed by a Generic JDBC Database.
+ */
+public class GenericJDBCMessageStore extends GenericAbstractJDBCMessageStore
+{
+
+ private static final Logger _logger = Logger.getLogger(GenericJDBCMessageStore.class);
+
+ public static final String TYPE = "JDBC";
+ public static final String CONNECTION_URL = "connectionUrl";
+ public static final String CONNECTION_POOL_TYPE = "connectionPoolType";
+ public static final String JDBC_BIG_INT_TYPE = "bigIntType";
+ public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob";
+ public static final String JDBC_VARBINARY_TYPE = "varbinaryType";
+ public static final String JDBC_BLOB_TYPE = "blobType";
+
+ protected String _connectionURL;
+ private ConnectionProvider _connectionProvider;
+
+ private String _blobType;
+ private String _varBinaryType;
+ private String _bigIntType;
+ private boolean _useBytesMethodsForBlob;
+
+
+ @Override
+ protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings) throws StoreException
+ {
+ _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL));
+
+ org.apache.qpid.server.store.jdbc.JDBCDetails details = null;
+
+ String[] components = _connectionURL.split(":", 3);
+ if(components.length >= 2)
+ {
+ String vendor = components[1];
+ details = org.apache.qpid.server.store.jdbc.JDBCDetails.getDetails(vendor);
+ }
+
+ if(details == null)
+ {
+ getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL);
+
+ details = org.apache.qpid.server.store.jdbc.JDBCDetails.getDefaultDetails();
+ }
+
+
+ _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType());
+ _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType());
+ _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob());
+ _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE,
+ storeSettings,
+ details.getBigintType());
+
+ Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE);
+ String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute);
+
+ JDBCConnectionProviderFactory connectionProviderFactory =
+ JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType);
+ if(connectionProviderFactory == null)
+ {
+ _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used");
+ connectionProviderFactory = new DefaultConnectionProviderFactory();
+ }
+
+ try
+ {
+ _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Failed to create connection provider for " + _connectionURL);
+ }
+
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ return _connectionProvider.getConnection();
+ }
+
+ protected void doClose()
+ {
+ try
+ {
+ _connectionProvider.close();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Unable to close connection provider ", e);
+ }
+ }
+
+
+ @Override
+ protected Logger getLogger()
+ {
+ return _logger;
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return _blobType;
+ }
+
+ @Override
+ protected String getSqlVarBinaryType(int size)
+ {
+ return String.format(_varBinaryType, size);
+ }
+
+ @Override
+ protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ {
+ if(_useBytesMethodsForBlob)
+ {
+ return rs.getBytes(col);
+ }
+ else
+ {
+ Blob dataAsBlob = rs.getBlob(col);
+ return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+
+ }
+ }
+
+ @Override
+ public String getSqlBigIntType()
+ {
+ return _bigIntType;
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return _connectionURL;
+ }
+
+}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
new file mode 100644
index 0000000000..6cf1413b83
--- /dev/null
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.store.jdbc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class JDBCDetails
+{
+
+ private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<>();
+
+ private static JDBCDetails DERBY_DETAILS =
+ new JDBCDetails("derby",
+ "blob",
+ "varchar(%d) for bit data",
+ "bigint",
+ false);
+
+ private static JDBCDetails POSTGRESQL_DETAILS =
+ new JDBCDetails("postgresql",
+ "bytea",
+ "bytea",
+ "bigint",
+ true);
+
+ private static JDBCDetails MYSQL_DETAILS =
+ new JDBCDetails("mysql",
+ "blob",
+ "varbinary(%d)",
+ "bigint",
+ false);
+
+
+ private static JDBCDetails SYBASE_DETAILS =
+ new JDBCDetails("sybase",
+ "image",
+ "varbinary(%d)",
+ "bigint",
+ false);
+
+
+ private static JDBCDetails ORACLE_DETAILS =
+ new JDBCDetails("oracle",
+ "blob",
+ "raw(%d)",
+ "number",
+ false);
+
+
+ static
+ {
+
+ addDetails(DERBY_DETAILS);
+ addDetails(POSTGRESQL_DETAILS);
+ addDetails(MYSQL_DETAILS);
+ addDetails(SYBASE_DETAILS);
+ addDetails(ORACLE_DETAILS);
+ }
+
+ public static JDBCDetails getDetails(String vendor)
+ {
+ return VENDOR_DETAILS.get(vendor);
+ }
+
+ public static JDBCDetails getDefaultDetails()
+ {
+ return DERBY_DETAILS;
+ }
+
+ private static void addDetails(JDBCDetails details)
+ {
+ VENDOR_DETAILS.put(details.getVendor(), details);
+ }
+
+ private final String _vendor;
+ private String _blobType;
+ private String _varBinaryType;
+ private String _bigintType;
+ private boolean _useBytesMethodsForBlob;
+
+ JDBCDetails(String vendor,
+ String blobType,
+ String varBinaryType,
+ String bigIntType,
+ boolean useBytesMethodsForBlob)
+ {
+ _vendor = vendor;
+ setBlobType(blobType);
+ setVarBinaryType(varBinaryType);
+ setBigintType(bigIntType);
+ setUseBytesMethodsForBlob(useBytesMethodsForBlob);
+ }
+
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ JDBCDetails that = (JDBCDetails) o;
+
+ if (!getVendor().equals(that.getVendor()))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return getVendor().hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "JDBCDetails{" +
+ "vendor='" + getVendor() + '\'' +
+ ", blobType='" + getBlobType() + '\'' +
+ ", varBinaryType='" + getVarBinaryType() + '\'' +
+ ", bigIntType='" + getBigintType() + '\'' +
+ ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() +
+ '}';
+ }
+
+ public String getVendor()
+ {
+ return _vendor;
+ }
+
+ public String getBlobType()
+ {
+ return _blobType;
+ }
+
+ public void setBlobType(String blobType)
+ {
+ _blobType = blobType;
+ }
+
+ public String getVarBinaryType()
+ {
+ return _varBinaryType;
+ }
+
+ public void setVarBinaryType(String varBinaryType)
+ {
+ _varBinaryType = varBinaryType;
+ }
+
+ public boolean isUseBytesMethodsForBlob()
+ {
+ return _useBytesMethodsForBlob;
+ }
+
+ public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob)
+ {
+ _useBytesMethodsForBlob = useBytesMethodsForBlob;
+ }
+
+ public String getBigintType()
+ {
+ return _bigintType;
+ }
+
+ public void setBigintType(String bigintType)
+ {
+ _bigintType = bigintType;
+ }
+}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
deleted file mode 100644
index 61ecb6748c..0000000000
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.store.jdbc;
-
-
-import java.sql.Blob;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
-import org.apache.qpid.server.store.AbstractJDBCMessageStore;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreProvider;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
-import org.apache.qpid.server.store.handler.MessageHandler;
-import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-import org.apache.qpid.server.util.MapValueConverter;
-
-/**
- * An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence
- * mechanism.
- *
- */
-public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider
-{
-
- private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class);
-
- public static final String TYPE = "JDBC";
- public static final String CONNECTION_URL = "connectionUrl";
- public static final String CONNECTION_POOL_TYPE = "connectionPoolType";
- public static final String JDBC_BIG_INT_TYPE = "bigIntType";
- public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob";
- public static final String JDBC_VARBINARY_TYPE = "varbinaryType";
- public static final String JDBC_BLOB_TYPE = "blobType";
-
- protected String _connectionURL;
- private ConnectionProvider _connectionProvider;
-
- private final MessageStore _messageStoreFacade = new MessageStoreWrapper();
-
- private static class JDBCDetails
- {
- private final String _vendor;
- private String _blobType;
- private String _varBinaryType;
- private String _bigintType;
- private boolean _useBytesMethodsForBlob;
-
- private JDBCDetails(String vendor,
- String blobType,
- String varBinaryType,
- String bigIntType,
- boolean useBytesMethodsForBlob)
- {
- _vendor = vendor;
- setBlobType(blobType);
- setVarBinaryType(varBinaryType);
- setBigintType(bigIntType);
- setUseBytesMethodsForBlob(useBytesMethodsForBlob);
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- JDBCDetails that = (JDBCDetails) o;
-
- if (!getVendor().equals(that.getVendor()))
- {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode()
- {
- return getVendor().hashCode();
- }
-
- @Override
- public String toString()
- {
- return "JDBCDetails{" +
- "vendor='" + getVendor() + '\'' +
- ", blobType='" + getBlobType() + '\'' +
- ", varBinaryType='" + getVarBinaryType() + '\'' +
- ", bigIntType='" + getBigintType() + '\'' +
- ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() +
- '}';
- }
-
- public String getVendor()
- {
- return _vendor;
- }
-
- public String getBlobType()
- {
- return _blobType;
- }
-
- public void setBlobType(String blobType)
- {
- _blobType = blobType;
- }
-
- public String getVarBinaryType()
- {
- return _varBinaryType;
- }
-
- public void setVarBinaryType(String varBinaryType)
- {
- _varBinaryType = varBinaryType;
- }
-
- public boolean isUseBytesMethodsForBlob()
- {
- return _useBytesMethodsForBlob;
- }
-
- public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob)
- {
- _useBytesMethodsForBlob = useBytesMethodsForBlob;
- }
-
- public String getBigintType()
- {
- return _bigintType;
- }
-
- public void setBigintType(String bigintType)
- {
- _bigintType = bigintType;
- }
- }
-
- private static JDBCDetails DERBY_DETAILS =
- new JDBCDetails("derby",
- "blob",
- "varchar(%d) for bit data",
- "bigint",
- false);
-
- private static JDBCDetails POSTGRESQL_DETAILS =
- new JDBCDetails("postgresql",
- "bytea",
- "bytea",
- "bigint",
- true);
-
- private static JDBCDetails MYSQL_DETAILS =
- new JDBCDetails("mysql",
- "blob",
- "varbinary(%d)",
- "bigint",
- false);
-
-
- private static JDBCDetails SYBASE_DETAILS =
- new JDBCDetails("sybase",
- "image",
- "varbinary(%d)",
- "bigint",
- false);
-
-
- private static JDBCDetails ORACLE_DETAILS =
- new JDBCDetails("oracle",
- "blob",
- "raw(%d)",
- "number",
- false);
-
-
- private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<String,JDBCDetails>();
-
- static
- {
-
- addDetails(DERBY_DETAILS);
- addDetails(POSTGRESQL_DETAILS);
- addDetails(MYSQL_DETAILS);
- addDetails(SYBASE_DETAILS);
- addDetails(ORACLE_DETAILS);
- }
-
- private static void addDetails(JDBCDetails details)
- {
- VENDOR_DETAILS.put(details.getVendor(), details);
- }
-
- private String _blobType;
- private String _varBinaryType;
- private String _bigIntType;
- private boolean _useBytesMethodsForBlob;
-
- private List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<RecordedJDBCTransaction>();
-
-
- public JDBCMessageStore()
- {
- }
-
- protected Logger getLogger()
- {
- return _logger;
- }
-
- protected String getSqlBlobType()
- {
- return _blobType;
- }
-
- protected String getSqlVarBinaryType(int size)
- {
- return String.format(_varBinaryType, size);
- }
-
- public String getSqlBigIntType()
- {
- return _bigIntType;
- }
-
- @Override
- protected void doClose()
- {
- try
- {
- while(!_transactions.isEmpty())
- {
- RecordedJDBCTransaction txn = _transactions.get(0);
- txn.abortTran();
- }
- }
- finally
- {
- try
- {
- _connectionProvider.close();
- }
- catch (SQLException e)
- {
- throw new StoreException("Unable to close connection provider ", e);
- }
- }
- }
-
-
- protected Connection getConnection() throws SQLException
- {
- return _connectionProvider.getConnection();
- }
-
-
- protected void implementationSpecificConfiguration(String name, Map<String, Object> storeSettings)
- throws ClassNotFoundException, SQLException
- {
- _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL));
- Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE);
-
- JDBCDetails details = null;
-
- String[] components = _connectionURL.split(":",3);
- if(components.length >= 2)
- {
- String vendor = components[1];
- details = VENDOR_DETAILS.get(vendor);
- }
-
- if(details == null)
- {
- getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL);
-
- // TODO - is there a better default than derby
- details = DERBY_DETAILS;
- }
-
- String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute);
-
- JDBCConnectionProviderFactory connectionProviderFactory =
- JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType);
- if(connectionProviderFactory == null)
- {
- _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used");
- connectionProviderFactory = new DefaultConnectionProviderFactory();
- }
-
- _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings);
- _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType());
- _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType());
- _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob());
- _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE, storeSettings, details.getBigintType());
- }
-
- @Override
- protected void storedSizeChange(int contentSize)
- {
- }
-
- public String getStoreLocation()
- {
- return _connectionURL;
- }
-
- @Override
- protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
- {
- if(_useBytesMethodsForBlob)
- {
- return rs.getBytes(col);
- }
- else
- {
- Blob dataAsBlob = rs.getBlob(col);
- return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
-
- }
- }
-
- @Override
- protected String getBlobAsString(ResultSet rs, int col) throws SQLException
- {
- byte[] bytes;
- if(_useBytesMethodsForBlob)
- {
- bytes = rs.getBytes(col);
- return new String(bytes,UTF8_CHARSET);
- }
- else
- {
- Blob blob = rs.getBlob(col);
- if(blob == null)
- {
- return null;
- }
- bytes = blob.getBytes(1, (int)blob.length());
- }
- return new String(bytes, UTF8_CHARSET);
-
- }
-
- @Override
- public Transaction newTransaction()
- {
- return new RecordedJDBCTransaction();
- }
-
- private class RecordedJDBCTransaction extends JDBCTransaction
- {
- private RecordedJDBCTransaction()
- {
- super();
- JDBCMessageStore.this._transactions.add(this);
- }
-
- @Override
- public void commitTran()
- {
- try
- {
- super.commitTran();
- }
- finally
- {
- JDBCMessageStore.this._transactions.remove(this);
- }
- }
-
- @Override
- public StoreFuture commitTranAsync()
- {
- try
- {
- return super.commitTranAsync();
- }
- finally
- {
- JDBCMessageStore.this._transactions.remove(this);
- }
- }
-
- @Override
- public void abortTran()
- {
- try
- {
- super.abortTran();
- }
- finally
- {
- JDBCMessageStore.this._transactions.remove(this);
- }
- }
- }
-
- @Override
- public MessageStore getMessageStore()
- {
- return _messageStoreFacade;
- }
-
- private class MessageStoreWrapper implements MessageStore
- {
-
- @Override
- public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
- {
- JDBCMessageStore.this.openMessageStore(parent, messageStoreSettings);
- }
-
- @Override
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
- {
- return JDBCMessageStore.this.addMessage(metaData);
- }
-
- @Override
- public boolean isPersistent()
- {
- return JDBCMessageStore.this.isPersistent();
- }
-
- @Override
- public Transaction newTransaction()
- {
- return JDBCMessageStore.this.newTransaction();
- }
-
- @Override
- public void closeMessageStore()
- {
- JDBCMessageStore.this.closeMessageStore();
- }
-
- @Override
- public void addEventListener(final EventListener eventListener, final Event... events)
- {
- JDBCMessageStore.this.addEventListener(eventListener, events);
- }
-
- @Override
- public void upgradeStoreStructure() throws StoreException
- {
- }
-
- @Override
- public String getStoreLocation()
- {
- return JDBCMessageStore.this.getStoreLocation();
- }
-
- @Override
- public void onDelete()
- {
- JDBCMessageStore.this.onDelete();
- }
-
- @Override
- public void visitMessages(final MessageHandler handler) throws StoreException
- {
- JDBCMessageStore.this.visitMessages(handler);
- }
-
- @Override
- public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
- {
- JDBCMessageStore.this.visitMessageInstances(handler);
- }
-
- @Override
- public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
- {
- JDBCMessageStore.this.visitDistributedTransactions(handler);
- }
- }
-
-}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java
index 1dd39a8696..85e8f89dbe 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java
@@ -26,7 +26,7 @@ import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.jdbc.JDBCMessageStore;
+import org.apache.qpid.server.store.jdbc.GenericJDBCMessageStore;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
@ManagedObject(category = false, type = JDBCVirtualHost.VIRTUAL_HOST_TYPE)
@@ -45,6 +45,6 @@ public class JDBCVirtualHost extends AbstractVirtualHost<JDBCVirtualHost>
@Override
protected MessageStore createMessageStore()
{
- return new JDBCMessageStore().getMessageStore();
+ return new GenericJDBCMessageStore();
}
}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
index 2108b4e09a..ab8f4554cb 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
@@ -27,7 +27,7 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.jdbc.JDBCMessageStore;
+import org.apache.qpid.server.store.jdbc.GenericJDBCConfigurationStore;
import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode;
@ManagedObject(type = JDBCVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category = false )
@@ -62,7 +62,7 @@ public class JDBCVirtualHostNodeImpl extends AbstractStandardVirtualHostNode<JDB
@Override
protected DurableConfigurationStore createConfigurationStore()
{
- return new JDBCMessageStore();
+ return new GenericJDBCConfigurationStore();
}
@Override
diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 1f03b7f75f..8261e93347 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -52,7 +52,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
public void testOnDelete() throws Exception
{
- Set<String> expectedTables = JDBCMessageStore.MESSAGE_STORE_TABLE_NAMES;
+ Set<String> expectedTables = GenericJDBCMessageStore.MESSAGE_STORE_TABLE_NAMES;
assertTablesExist(expectedTables, true);
getStore().closeMessageStore();
assertTablesExist(expectedTables, true);
@@ -65,7 +65,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
{
_connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true";
Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
- messageStoreSettings.put(JDBCMessageStore.CONNECTION_URL, _connectionURL);
+ messageStoreSettings.put(GenericJDBCMessageStore.CONNECTION_URL, _connectionURL);
return messageStoreSettings;
}
@@ -73,7 +73,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
@Override
protected MessageStore createMessageStore()
{
- return (new JDBCMessageStore()).getMessageStore();
+ return new GenericJDBCMessageStore();
}
private void assertTablesExist(Set<String> expectedTables, boolean exists) throws SQLException