summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-06-24 15:37:57 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-06-24 15:37:57 +0000
commit0920024e3b0ccae42e180e5d432714f013fd4eb3 (patch)
tree65acbeb44a8c2c5261a0fcf055265147ee22b3b9 /qpid/java/broker
parent31b9cc15ef54b61a14867e308a8d60208d4dec81 (diff)
downloadqpid-python-0920024e3b0ccae42e180e5d432714f013fd4eb3.tar.gz
QPID-4837 : [Java Broker] add ability to use connection pool for JDBC store
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1496099 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/build.xml4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java78
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java31
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java46
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java42
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java42
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java8
-rw-r--r--qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.JDBCConnectionProviderFactory19
10 files changed, 275 insertions, 11 deletions
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml
index 6e50bdae5a..dbb2ee8aee 100644
--- a/qpid/java/broker/build.xml
+++ b/qpid/java/broker/build.xml
@@ -25,9 +25,9 @@
<property name="module.genpom" value="true"/>
<!-- Add dependencies to the broker pom for the broker-plugins and bdbstore modules -->
- <property name="module.maven.depends" value="broker-plugins/management-http broker-plugins/management-jmx broker-plugins/access-control bdbstore bdbstore/jmx"/>
+ <property name="module.maven.depends" value="broker-plugins/management-http broker-plugins/management-jmx broker-plugins/access-control broker-plugins/jdbc-provider-bone bdbstore bdbstore/jmx"/>
<!-- Make them runtime dependencies, make bdbstore modules optional -->
- <property name="module.genpom.args" value="-Sqpid-broker-plugins-management-http=runtime -Sqpid-broker-plugins-management-jmx=runtime -Sqpid-broker-plugins-access-control=runtime -Sqpid-bdbstore=runtime -Oqpid-bdbstore -Sqpid-bdbstore-jmx=runtime -Oqpid-bdbstore-jmx"/>
+ <property name="module.genpom.args" value="-Sqpid-broker-plugins-management-http=runtime -Sqpid-broker-plugins-management-jmx=runtime -Sqpid-broker-plugins-access-control=runtime -Sqpid-bdbstore=runtime -Oqpid-bdbstore -Sqpid-bdbstore-jmx=runtime -Oqpid-bdbstore-jmx -Sqpid-broker-plugins-jdbc-provider-bone=runtime -Oqpid-broker-plugins-jdbc-provider-bone"/>
<import file="../module.xml"/>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java
new file mode 100644
index 0000000000..e3b7f03978
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.plugin;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.store.jdbc.ConnectionProvider;
+
+public interface JDBCConnectionProviderFactory
+{
+ String getType();
+
+ ConnectionProvider getConnectionProvider(String connectionUrl, Configuration storeConfiguration)
+ throws SQLException;
+
+ static final class TYPES
+ {
+ private TYPES()
+ {
+ }
+
+ public static Collection<String> get()
+ {
+ QpidServiceLoader<JDBCConnectionProviderFactory> qpidServiceLoader = new QpidServiceLoader<JDBCConnectionProviderFactory>();
+ Iterable<JDBCConnectionProviderFactory> factories = qpidServiceLoader.atLeastOneInstanceOf(JDBCConnectionProviderFactory.class);
+ List<String> names = new ArrayList<String>();
+ for(JDBCConnectionProviderFactory factory : factories)
+ {
+ names.add(factory.getType());
+ }
+ return Collections.unmodifiableCollection(names);
+ }
+ }
+
+
+ static final class FACTORIES
+ {
+ private FACTORIES()
+ {
+ }
+
+ public static JDBCConnectionProviderFactory get(String type)
+ {
+ QpidServiceLoader<JDBCConnectionProviderFactory> qpidServiceLoader = new QpidServiceLoader<JDBCConnectionProviderFactory>();
+ Iterable<JDBCConnectionProviderFactory> factories = qpidServiceLoader.atLeastOneInstanceOf(JDBCConnectionProviderFactory.class);
+ for(JDBCConnectionProviderFactory factory : factories)
+ {
+ if(factory.getType().equals(type))
+ {
+ return factory;
+ }
+ }
+ return null;
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 745a06c7fe..b7f5035de0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -80,9 +80,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
private final AtomicLong _messageId = new AtomicLong(0);
private AtomicBoolean _closed = new AtomicBoolean(false);
- protected String _connectionURL;
-
-
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
@@ -217,7 +214,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- protected abstract void implementationSpecificConfiguration(String name, Configuration storeConfiguration) throws ClassNotFoundException;
+ protected abstract void implementationSpecificConfiguration(String name, Configuration storeConfiguration) throws ClassNotFoundException, SQLException;
abstract protected Logger getLogger();
@@ -696,7 +693,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
*/
protected Connection newConnection() throws SQLException
{
- final Connection connection = DriverManager.getConnection(_connectionURL);
+ final Connection connection = getConnection();
try
{
connection.setAutoCommit(false);
@@ -716,6 +713,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
return connection;
}
+ protected abstract Connection getConnection() throws SQLException;
+
@Override
public void removeQueue(final AMQQueue queue) throws AMQStoreException
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 5b53f9ee6c..0d4231a10d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -67,6 +67,8 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
private long _persistentSizeLowThreshold;
private long _persistentSizeHighThreshold;
+ protected String _connectionURL;
+
private String _storeLocation;
private Class<Driver> _driverClass;
@@ -445,4 +447,9 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
}
}
}
+
+ protected Connection getConnection() throws SQLException
+ {
+ return DriverManager.getConnection(_connectionURL);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java
new file mode 100644
index 0000000000..c66fa4e869
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.jdbc;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public interface ConnectionProvider
+{
+ Connection getConnection() throws SQLException;
+
+ void close() throws SQLException;
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java
new file mode 100644
index 0000000000..7945ae3b46
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+class DefaultConnectionProvider implements ConnectionProvider
+{
+ private final String _connectionUrl;
+
+ public DefaultConnectionProvider(String connectionUrl)
+ {
+ _connectionUrl = connectionUrl;
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException
+ {
+ return DriverManager.getConnection(_connectionUrl);
+ }
+
+ @Override
+ public void close() throws SQLException
+ {
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java
new file mode 100644
index 0000000000..0f074cc95b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.jdbc;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
+
+public class DefaultConnectionProviderFactory implements JDBCConnectionProviderFactory
+{
+
+ @Override
+ public String getType()
+ {
+ return "DEFAULT";
+ }
+
+ @Override
+ public ConnectionProvider getConnectionProvider(String connectionUrl,
+ Configuration storeConfiguration)
+ {
+ return new DefaultConnectionProvider(connectionUrl);
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
index 42a32c1cbd..79093fe2e2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.jdbc;
import java.sql.Blob;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
@@ -31,6 +32,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
import org.apache.qpid.server.store.AbstractJDBCMessageStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreConstants;
@@ -50,6 +52,9 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
public static final String TYPE = "JDBC";
+ protected String _connectionURL;
+ private ConnectionProvider _connectionProvider;
+
private static class JDBCDetails
{
@@ -253,18 +258,34 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
RecordedJDBCTransaction txn = _transactions.get(0);
txn.abortTran();
}
+ try
+ {
+ _connectionProvider.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Unable to close connection provider ", e);
+ }
+ }
+
+
+ protected Connection getConnection() throws SQLException
+ {
+ return _connectionProvider.getConnection();
}
+
protected void implementationSpecificConfiguration(String name, Configuration storeConfiguration)
- throws ClassNotFoundException
+ throws ClassNotFoundException, SQLException
{
- _connectionURL = storeConfiguration.getString("connectionUrl",
+
+ String connectionURL = storeConfiguration.getString("connectionUrl",
storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY));
JDBCDetails details = null;
- String[] components = _connectionURL.split(":",3);
+ String[] components = connectionURL.split(":",3);
if(components.length >= 2)
{
String vendor = components[1];
@@ -273,12 +294,25 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
if(details == null)
{
- getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL);
+ getLogger().info("Do not recognize vendor from connection URL: " + connectionURL);
// TODO - is there a better default than derby
details = DERBY_DETAILS;
}
+
+ Configuration poolConfig = storeConfiguration.subset("pool");
+ String connectionPoolType = poolConfig.getString("type", "DEFAULT");
+ JDBCConnectionProviderFactory connectionProviderFactory =
+ JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType);
+ if(connectionProviderFactory == null)
+ {
+ _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used");
+ connectionProviderFactory = new DefaultConnectionProviderFactory();
+ }
+
+ _connectionProvider = connectionProviderFactory.getConnectionProvider(connectionURL, poolConfig);
+
_blobType = storeConfiguration.getString("sqlBlobType",details.getBlobType());
_varBinaryType = storeConfiguration.getString("sqlVarbinaryType",details.getVarBinaryType());
_useBytesMethodsForBlob = storeConfiguration.getBoolean("useBytesForBlob",details.isUseBytesMethodsForBlob());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
index b47a5dd149..3a604dbd90 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
@@ -93,6 +93,14 @@ public class StandardVirtualHostFactory implements VirtualHostFactory
Map<String,Object> convertedMap = new LinkedHashMap<String, Object>();
convertedMap.put("store.type", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TYPE));
convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH));
+
+ // TODO - this should all be inverted to populate vhost from xml and then pass model object to the store
+
+ convertedMap.put("store.pool.type",virtualHostAdapter.getAttribute("connectionPool"));
+ convertedMap.put("store.pool.minConnectionsPerPartition",virtualHostAdapter.getAttribute("minConnectionsPerPartition"));
+ convertedMap.put("store.pool.maxConnectionsPerPartition",virtualHostAdapter.getAttribute("maxConnectionsPerPartition"));
+ convertedMap.put("store.pool.partitionCount",virtualHostAdapter.getAttribute("partitionCount"));
+
return convertedMap;
}
}
diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.JDBCConnectionProviderFactory b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.JDBCConnectionProviderFactory
new file mode 100644
index 0000000000..e0ae6e97cc
--- /dev/null
+++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.JDBCConnectionProviderFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.store.jdbc.DefaultConnectionProviderFactory