diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-06-24 15:37:57 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-06-24 15:37:57 +0000 |
| commit | 0920024e3b0ccae42e180e5d432714f013fd4eb3 (patch) | |
| tree | 65acbeb44a8c2c5261a0fcf055265147ee22b3b9 /qpid/java/broker | |
| parent | 31b9cc15ef54b61a14867e308a8d60208d4dec81 (diff) | |
| download | qpid-python-0920024e3b0ccae42e180e5d432714f013fd4eb3.tar.gz | |
QPID-4837 : [Java Broker] add ability to use connection pool for JDBC store
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1496099 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
10 files changed, 275 insertions, 11 deletions
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml index 6e50bdae5a..dbb2ee8aee 100644 --- a/qpid/java/broker/build.xml +++ b/qpid/java/broker/build.xml @@ -25,9 +25,9 @@ <property name="module.genpom" value="true"/> <!-- Add dependencies to the broker pom for the broker-plugins and bdbstore modules --> - <property name="module.maven.depends" value="broker-plugins/management-http broker-plugins/management-jmx broker-plugins/access-control bdbstore bdbstore/jmx"/> + <property name="module.maven.depends" value="broker-plugins/management-http broker-plugins/management-jmx broker-plugins/access-control broker-plugins/jdbc-provider-bone bdbstore bdbstore/jmx"/> <!-- Make them runtime dependencies, make bdbstore modules optional --> - <property name="module.genpom.args" value="-Sqpid-broker-plugins-management-http=runtime -Sqpid-broker-plugins-management-jmx=runtime -Sqpid-broker-plugins-access-control=runtime -Sqpid-bdbstore=runtime -Oqpid-bdbstore -Sqpid-bdbstore-jmx=runtime -Oqpid-bdbstore-jmx"/> + <property name="module.genpom.args" value="-Sqpid-broker-plugins-management-http=runtime -Sqpid-broker-plugins-management-jmx=runtime -Sqpid-broker-plugins-access-control=runtime -Sqpid-bdbstore=runtime -Oqpid-bdbstore -Sqpid-bdbstore-jmx=runtime -Oqpid-bdbstore-jmx -Sqpid-broker-plugins-jdbc-provider-bone=runtime -Oqpid-broker-plugins-jdbc-provider-bone"/> <import file="../module.xml"/> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java new file mode 100644 index 0000000000..e3b7f03978 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.plugin; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.store.jdbc.ConnectionProvider; + +public interface JDBCConnectionProviderFactory +{ + String getType(); + + ConnectionProvider getConnectionProvider(String connectionUrl, Configuration storeConfiguration) + throws SQLException; + + static final class TYPES + { + private TYPES() + { + } + + public static Collection<String> get() + { + QpidServiceLoader<JDBCConnectionProviderFactory> qpidServiceLoader = new QpidServiceLoader<JDBCConnectionProviderFactory>(); + Iterable<JDBCConnectionProviderFactory> factories = qpidServiceLoader.atLeastOneInstanceOf(JDBCConnectionProviderFactory.class); + List<String> names = new ArrayList<String>(); + for(JDBCConnectionProviderFactory factory : factories) + { + names.add(factory.getType()); + } + return Collections.unmodifiableCollection(names); + } + } + + + static final class FACTORIES + { + private FACTORIES() + { + } + + public static JDBCConnectionProviderFactory get(String type) + { + QpidServiceLoader<JDBCConnectionProviderFactory> qpidServiceLoader = new QpidServiceLoader<JDBCConnectionProviderFactory>(); + Iterable<JDBCConnectionProviderFactory> factories = qpidServiceLoader.atLeastOneInstanceOf(JDBCConnectionProviderFactory.class); + for(JDBCConnectionProviderFactory factory : factories) + { + if(factory.getType().equals(type)) + { + return factory; + } + } + return null; + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 745a06c7fe..b7f5035de0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -80,9 +80,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC private final AtomicLong _messageId = new AtomicLong(0); private AtomicBoolean _closed = new AtomicBoolean(false); - protected String _connectionURL; - - private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )"; private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )"; @@ -217,7 +214,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } - protected abstract void implementationSpecificConfiguration(String name, Configuration storeConfiguration) throws ClassNotFoundException; + protected abstract void implementationSpecificConfiguration(String name, Configuration storeConfiguration) throws ClassNotFoundException, SQLException; abstract protected Logger getLogger(); @@ -696,7 +693,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC */ protected Connection newConnection() throws SQLException { - final Connection connection = DriverManager.getConnection(_connectionURL); + final Connection connection = getConnection(); try { connection.setAutoCommit(false); @@ -716,6 +713,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC return connection; } + protected abstract Connection getConnection() throws SQLException; + @Override public void removeQueue(final AMQQueue queue) throws AMQStoreException { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 5b53f9ee6c..0d4231a10d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -67,6 +67,8 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa private long _persistentSizeLowThreshold; private long _persistentSizeHighThreshold; + protected String _connectionURL; + private String _storeLocation; private Class<Driver> _driverClass; @@ -445,4 +447,9 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa } } } + + protected Connection getConnection() throws SQLException + { + return DriverManager.getConnection(_connectionURL); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java new file mode 100644 index 0000000000..c66fa4e869 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.jdbc; + +import java.sql.Connection; +import java.sql.SQLException; + +public interface ConnectionProvider +{ + Connection getConnection() throws SQLException; + + void close() throws SQLException; +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java new file mode 100644 index 0000000000..7945ae3b46 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +class DefaultConnectionProvider implements ConnectionProvider +{ + private final String _connectionUrl; + + public DefaultConnectionProvider(String connectionUrl) + { + _connectionUrl = connectionUrl; + } + + @Override + public Connection getConnection() throws SQLException + { + return DriverManager.getConnection(_connectionUrl); + } + + @Override + public void close() throws SQLException + { + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java new file mode 100644 index 0000000000..0f074cc95b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.jdbc; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; + +public class DefaultConnectionProviderFactory implements JDBCConnectionProviderFactory +{ + + @Override + public String getType() + { + return "DEFAULT"; + } + + @Override + public ConnectionProvider getConnectionProvider(String connectionUrl, + Configuration storeConfiguration) + { + return new DefaultConnectionProvider(connectionUrl); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java index 42a32c1cbd..79093fe2e2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store.jdbc; import java.sql.Blob; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; @@ -31,6 +32,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; import org.apache.qpid.server.store.AbstractJDBCMessageStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreConstants; @@ -50,6 +52,9 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag public static final String TYPE = "JDBC"; + protected String _connectionURL; + private ConnectionProvider _connectionProvider; + private static class JDBCDetails { @@ -253,18 +258,34 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag RecordedJDBCTransaction txn = _transactions.get(0); txn.abortTran(); } + try + { + _connectionProvider.close(); + } + catch (SQLException e) + { + throw new AMQStoreException("Unable to close connection provider ", e); + } + } + + + protected Connection getConnection() throws SQLException + { + return _connectionProvider.getConnection(); } + protected void implementationSpecificConfiguration(String name, Configuration storeConfiguration) - throws ClassNotFoundException + throws ClassNotFoundException, SQLException { - _connectionURL = storeConfiguration.getString("connectionUrl", + + String connectionURL = storeConfiguration.getString("connectionUrl", storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY)); JDBCDetails details = null; - String[] components = _connectionURL.split(":",3); + String[] components = connectionURL.split(":",3); if(components.length >= 2) { String vendor = components[1]; @@ -273,12 +294,25 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag if(details == null) { - getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL); + getLogger().info("Do not recognize vendor from connection URL: " + connectionURL); // TODO - is there a better default than derby details = DERBY_DETAILS; } + + Configuration poolConfig = storeConfiguration.subset("pool"); + String connectionPoolType = poolConfig.getString("type", "DEFAULT"); + JDBCConnectionProviderFactory connectionProviderFactory = + JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); + if(connectionProviderFactory == null) + { + _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used"); + connectionProviderFactory = new DefaultConnectionProviderFactory(); + } + + _connectionProvider = connectionProviderFactory.getConnectionProvider(connectionURL, poolConfig); + _blobType = storeConfiguration.getString("sqlBlobType",details.getBlobType()); _varBinaryType = storeConfiguration.getString("sqlVarbinaryType",details.getVarBinaryType()); _useBytesMethodsForBlob = storeConfiguration.getBoolean("useBytesForBlob",details.isUseBytesMethodsForBlob()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java index b47a5dd149..3a604dbd90 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java @@ -93,6 +93,14 @@ public class StandardVirtualHostFactory implements VirtualHostFactory Map<String,Object> convertedMap = new LinkedHashMap<String, Object>(); convertedMap.put("store.type", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TYPE)); convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH)); + + // TODO - this should all be inverted to populate vhost from xml and then pass model object to the store + + convertedMap.put("store.pool.type",virtualHostAdapter.getAttribute("connectionPool")); + convertedMap.put("store.pool.minConnectionsPerPartition",virtualHostAdapter.getAttribute("minConnectionsPerPartition")); + convertedMap.put("store.pool.maxConnectionsPerPartition",virtualHostAdapter.getAttribute("maxConnectionsPerPartition")); + convertedMap.put("store.pool.partitionCount",virtualHostAdapter.getAttribute("partitionCount")); + return convertedMap; } } diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.JDBCConnectionProviderFactory b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.JDBCConnectionProviderFactory new file mode 100644 index 0000000000..e0ae6e97cc --- /dev/null +++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.JDBCConnectionProviderFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.jdbc.DefaultConnectionProviderFactory |
