diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-10 09:10:51 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-10 09:10:51 +0000 |
| commit | eaa8c11396b13c46c59c2030a23cc7763ecee9d7 (patch) | |
| tree | 1035b7dd270a843436871ef4f321e956c5d220f3 /qpid/java/broker-plugins | |
| parent | 934d23d90cb12c820ff71e54f2220991fd72c081 (diff) | |
| download | qpid-python-eaa8c11396b13c46c59c2030a23cc7763ecee9d7.tar.gz | |
QPID-4983 : [Java Broker] Move store implementations to broker plugins
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1501682 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
19 files changed, 1625 insertions, 0 deletions
diff --git a/qpid/java/broker-plugins/derby-store/build.xml b/qpid/java/broker-plugins/derby-store/build.xml new file mode 100644 index 0000000000..e93b81aad7 --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/build.xml @@ -0,0 +1,32 @@ +<!-- + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + --> +<project name="Qpid Broker-Plugins Derby Store" default="build"> + <property name="module.depends" value="common broker" /> + <property name="module.test.depends" value="common/tests broker/tests" /> + + <property name="module.genpom" value="true"/> + <property name="module.genpom.args" value="-Sqpid-common=provided -Sqpid-broker=provided"/> + + <property name="broker.plugin" value="true"/> + + <import file="../../module.xml" /> + + <target name="bundle" depends="bundle-tasks"/> + +</project> diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java new file mode 100644 index 0000000000..ac310d02c9 --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -0,0 +1,466 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store.derby; + + +import java.io.File; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import org.apache.log4j.Logger; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.AbstractJDBCMessageStore; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreConstants; +import org.apache.qpid.util.FileUtils; + +/** + * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence + * mechanism. + * + */ +public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore +{ + + private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); + + private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; + + public static final String MEMORY_STORE_LOCATION = ":memory:"; + + private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; + + public static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; + + public static final String TYPE = "DERBY"; + + private long _totalStoreSize; + private boolean _limitBusted; + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + + protected String _connectionURL; + + private String _storeLocation; + private Class<Driver> _driverClass; + + public DerbyMessageStore() + { + } + + protected Logger getLogger() + { + return _logger; + } + + @Override + protected String getSqlBlobType() + { + return "blob"; + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return "varchar("+size+") for bit data"; + } + + @Override + protected String getSqlBigIntType() + { + return "bigint"; + } + + protected void doClose() throws SQLException + { + try + { + Connection conn = DriverManager.getConnection(_connectionURL + ";shutdown=true"); + // Shouldn't reach this point - shutdown=true should throw SQLException + conn.close(); + getLogger().error("Unable to shut down the store"); + } + catch (SQLException e) + { + if (e.getSQLState().equalsIgnoreCase(DerbyMessageStore.DERBY_SINGLE_DB_SHUTDOWN_CODE)) + { + //expected and represents a clean shutdown of this database only, do nothing. + } + else + { + getLogger().error("Exception whilst shutting down the store: " + e); + throw e; + } + } + } + + @Override + protected void implementationSpecificConfiguration(String name, + VirtualHost virtualHost) + throws ClassNotFoundException + { + //Update to pick up QPID_WORK and use that as the default location not just derbyDB + + _driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME); + + String defaultPath = System.getProperty("QPID_WORK") + File.separator + "derbyDB"; + String databasePath = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + if(databasePath == null) + { + databasePath = defaultPath; + } + + if(!MEMORY_STORE_LOCATION.equals(databasePath)) + { + File environmentPath = new File(databasePath); + if (!environmentPath.exists()) + { + if (!environmentPath.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } + } + } + + _storeLocation = databasePath; + + Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); + Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); + + _persistentSizeHighThreshold = overfullAttr == null ? -1l : + overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); + _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : + underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); + + if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } + + //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created. + _connectionURL = "jdbc:derby" + (databasePath.equals(MEMORY_STORE_LOCATION) ? databasePath: ":" + databasePath+ "/") + name + ";create=true"; + + + + _eventManager.addEventListener(new EventListener() + { + @Override + public void event(Event event) + { + setInitialSize(); + } + }, Event.BEFORE_ACTIVATE); + + } + + private void setInitialSize() + { + Connection conn = null; + try + { + + + try + { + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + } + finally + { + if(conn != null) + { + conn.close(); + + + } + } + } + catch (SQLException e) + { + getLogger().error("Unable to set initial store size", e); + } + } + + protected String getBlobAsString(ResultSet rs, int col) throws SQLException + { + Blob blob = rs.getBlob(col); + if(blob == null) + { + return null; + } + byte[] bytes = blob.getBytes(1, (int)blob.length()); + return new String(bytes, UTF8_CHARSET); + } + + protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException + { + Blob dataAsBlob = rs.getBlob(col); + return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + } + + + protected boolean tableExists(final String tableName, final Connection conn) throws SQLException + { + PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY); + try + { + stmt.setString(1, tableName); + ResultSet rs = stmt.executeQuery(); + try + { + return rs.next(); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + + + @Override + public String getStoreLocation() + { + return _storeLocation; + } + + protected synchronized void storedSizeChange(final int delta) + { + if(getPersistentSizeHighThreshold() > 0) + { + synchronized(this) + { + // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every + // time, so we do so only when there's been enough change that it is worth looking again. We do this by + // assuming the total size will change by less than twice the amount of the message data change. + long newSize = _totalStoreSize += 3*delta; + + Connection conn = null; + try + { + + if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) + { + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + if(_totalStoreSize > getPersistentSizeHighThreshold()) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + } + else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) + { + long oldSize = _totalStoreSize; + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + if(oldSize <= _totalStoreSize) + { + + reduceSizeOnDisk(conn); + + _totalStoreSize = getSizeOnDisk(conn); + } + + if(_totalStoreSize < getPersistentSizeLowThreshold()) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + + } + } + catch (SQLException e) + { + closeConnection(conn); + throw new RuntimeException("Exception while processing store size change", e); + } + } + } + } + + private void reduceSizeOnDisk(Connection conn) + { + CallableStatement cs = null; + PreparedStatement stmt = null; + try + { + String tableQuery = + "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'"; + stmt = conn.prepareStatement(tableQuery); + ResultSet rs = null; + + List<String> schemas = new ArrayList<String>(); + List<String> tables = new ArrayList<String>(); + + try + { + rs = stmt.executeQuery(); + while(rs.next()) + { + schemas.add(rs.getString(1)); + tables.add(rs.getString(2)); + } + } + finally + { + if(rs != null) + { + rs.close(); + } + } + + + cs = conn.prepareCall + ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)"); + + for(int i = 0; i < schemas.size(); i++) + { + cs.setString(1, schemas.get(i)); + cs.setString(2, tables.get(i)); + cs.setShort(3, (short) 0); + cs.execute(); + } + } + catch (SQLException e) + { + closeConnection(conn); + throw new RuntimeException("Error reducing on disk size", e); + } + finally + { + closePreparedStatement(stmt); + closePreparedStatement(cs); + } + + } + + private long getSizeOnDisk(Connection conn) + { + PreparedStatement stmt = null; + try + { + String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" + + " FROM " + + " SYS.SYSTABLES systabs," + + " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" + + " WHERE systabs.tabletype = 'T'"; + + stmt = conn.prepareStatement(sizeQuery); + + ResultSet rs = null; + long size = 0l; + + try + { + rs = stmt.executeQuery(); + while(rs.next()) + { + size = rs.getLong(1); + } + } + finally + { + if(rs != null) + { + rs.close(); + } + } + + return size; + + } + catch (SQLException e) + { + closeConnection(conn); + throw new RuntimeException("Error establishing on disk size", e); + } + finally + { + closePreparedStatement(stmt); + } + + } + + + private long getPersistentSizeLowThreshold() + { + return _persistentSizeLowThreshold; + } + + private long getPersistentSizeHighThreshold() + { + return _persistentSizeHighThreshold; + } + + @Override + public String getStoreType() + { + return TYPE; + } + + @Override + public void onDelete() + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Deleting store " + _storeLocation); + } + + if (MEMORY_STORE_LOCATION.equals(_storeLocation)) + { + return; + } + + if (_storeLocation != null) + { + File location = new File(_storeLocation); + if (location.exists()) + { + if (!FileUtils.delete(location, true)) + { + _logger.error("Cannot delete " + _storeLocation); + } + } + } + } + + protected Connection getConnection() throws SQLException + { + return DriverManager.getConnection(_connectionURL); + } +} diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java new file mode 100644 index 0000000000..1b111ad65e --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.derby; + +import java.util.Collections; +import java.util.Map; +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.MessageStoreFactory; +import org.apache.qpid.server.store.MessageStore; + +public class DerbyMessageStoreFactory implements MessageStoreFactory +{ + + @Override + public String getType() + { + return DerbyMessageStore.TYPE; + } + + @Override + public MessageStore createMessageStore() + { + return new DerbyMessageStore(); + } + + @Override + public Map<String, Object> convertStoreConfiguration(Configuration configuration) + { + return Collections.emptyMap(); + } + + + @Override + public void validateAttributes(Map<String, Object> attributes) + { + Object storePath = attributes.get(VirtualHost.STORE_PATH); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH + +"' is required and must be of type String."); + + } + } + +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/derby/add.js b/qpid/java/broker-plugins/derby-store/src/main/java/resources/js/qpid/management/virtualhost/store/derby/add.js index 04016b5fae..04016b5fae 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/derby/add.js +++ b/qpid/java/broker-plugins/derby-store/src/main/java/resources/js/qpid/management/virtualhost/store/derby/add.js diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/derby/add.html b/qpid/java/broker-plugins/derby-store/src/main/java/resources/virtualhost/store/derby/add.html index 2ed5b35c10..2ed5b35c10 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/derby/add.html +++ b/qpid/java/broker-plugins/derby-store/src/main/java/resources/virtualhost/store/derby/add.html diff --git a/qpid/java/broker-plugins/derby-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker-plugins/derby-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory new file mode 100644 index 0000000000..88ca1fed5e --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.derby.DerbyMessageStoreFactory diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java new file mode 100644 index 0000000000..ffb6ac479a --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.derby; + +import org.apache.qpid.server.store.AbstractDurableConfigurationStoreTestCase; + +public class DerbyMessageStoreConfigurationTest extends AbstractDurableConfigurationStoreTestCase +{ + + private DerbyMessageStore _derbyMessageStore; + + @Override + protected void onReopenStore() + { + _derbyMessageStore = null; + } + + @Override + protected DerbyMessageStore createMessageStore() throws Exception + { + createStoreIfNecessary(); + return _derbyMessageStore; + } + + + private void createStoreIfNecessary() + { + if(_derbyMessageStore == null) + { + _derbyMessageStore = new DerbyMessageStore(); + } + } + + @Override + protected DerbyMessageStore createConfigStore() throws Exception + { + createStoreIfNecessary(); + return _derbyMessageStore; + } +} diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java new file mode 100644 index 0000000000..479675dac1 --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.derby; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreConstants; +import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + +public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase +{ + private static final Logger _logger = Logger.getLogger(DerbyMessageStoreQuotaEventsTest.class); + + private static final int NUMBER_OF_MESSAGES_TO_OVERFILL_STORE = 10; + + /** + * Estimated using an assumption that a physical disk space occupied by a + * message is 3 times bigger then a message size + */ + private static final int OVERFULL_SIZE = (int) (MESSAGE_DATA.length * 3 * NUMBER_OF_MESSAGES_TO_OVERFILL_STORE * 0.8); + + private static final int UNDERFULL_SIZE = (int) (OVERFULL_SIZE * 0.8); + + @Override + protected int getNumberOfMessagesToFillStore() + { + return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE; + } + + @Override + protected void applyStoreSpecificConfiguration(VirtualHost vhost) + { + _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); + + when(vhost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE); + when(vhost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE); + } + + @Override + protected MessageStore createStore() throws Exception + { + return new DerbyMessageStore(); + } +} diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java new file mode 100644 index 0000000000..859fad629b --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.derby; + +import java.io.File; + +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreTestCase; +import org.apache.qpid.util.FileUtils; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + +public class DerbyMessageStoreTest extends MessageStoreTestCase +{ + private String _storeLocation; + + @Override + public void tearDown() throws Exception + { + try + { + deleteStoreIfExists(); + } + finally + { + super.tearDown(); + } + } + + public void testOnDelete() throws Exception + { + File location = new File(_storeLocation); + assertTrue("Store does not exist at " + _storeLocation, location.exists()); + + getStore().close(); + assertTrue("Store does not exist at " + _storeLocation, location.exists()); + + getStore().onDelete(); + assertFalse("Store exists at " + _storeLocation, location.exists()); + } + + @Override + protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception + { + _storeLocation = TMP_FOLDER + File.separator + getTestName(); + when(virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storeLocation); + deleteStoreIfExists(); + } + + private void deleteStoreIfExists() + { + File location = new File(_storeLocation); + if (location.exists()) + { + FileUtils.delete(location, true); + } + } + + @Override + protected MessageStore createMessageStore() + { + return new DerbyMessageStore(); + } + +} diff --git a/qpid/java/broker-plugins/jdbc-store/build.xml b/qpid/java/broker-plugins/jdbc-store/build.xml new file mode 100644 index 0000000000..de6ec59845 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/build.xml @@ -0,0 +1,31 @@ +<!-- + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + --> +<project name="Qpid Broker-Plugins JDBC Store" default="build"> + <property name="module.depends" value="common broker" /> + <property name="module.test.depends" value="common/tests broker/tests" /> + + <property name="module.genpom" value="true"/> + <property name="module.genpom.args" value="-Sqpid-common=provided -Sqpid-broker=provided"/> + + <property name="broker.plugin" value="true"/> + + <import file="../../module.xml" /> + + <target name="bundle" depends="bundle-tasks"/> +</project> diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java new file mode 100644 index 0000000000..7945ae3b46 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +class DefaultConnectionProvider implements ConnectionProvider +{ + private final String _connectionUrl; + + public DefaultConnectionProvider(String connectionUrl) + { + _connectionUrl = connectionUrl; + } + + @Override + public Connection getConnection() throws SQLException + { + return DriverManager.getConnection(_connectionUrl); + } + + @Override + public void close() throws SQLException + { + } +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java new file mode 100644 index 0000000000..8fc7de12d0 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.jdbc; + +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; + +public class DefaultConnectionProviderFactory implements JDBCConnectionProviderFactory +{ + + @Override + public String getType() + { + return "NONE"; + } + + @Override + public ConnectionProvider getConnectionProvider(String connectionUrl, + VirtualHost virtualHost) + { + return new DefaultConnectionProvider(connectionUrl); + } + +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java new file mode 100644 index 0000000000..f8d93536bb --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java @@ -0,0 +1,462 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store.jdbc; + + +import java.sql.Blob; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; +import org.apache.qpid.server.store.AbstractJDBCMessageStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.Transaction; + +/** + * An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence + * mechanism. + * + */ +public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStore +{ + + private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class); + + + public static final String TYPE = "JDBC"; + public static final String CONNECTION_URL = "connectionURL"; + + protected String _connectionURL; + private ConnectionProvider _connectionProvider; + + + private static class JDBCDetails + { + private final String _vendor; + private String _blobType; + private String _varBinaryType; + private String _bigintType; + private boolean _useBytesMethodsForBlob; + + private JDBCDetails(String vendor, + String blobType, + String varBinaryType, + String bigIntType, + boolean useBytesMethodsForBlob) + { + _vendor = vendor; + setBlobType(blobType); + setVarBinaryType(varBinaryType); + setBigintType(bigIntType); + setUseBytesMethodsForBlob(useBytesMethodsForBlob); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + JDBCDetails that = (JDBCDetails) o; + + if (!getVendor().equals(that.getVendor())) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return getVendor().hashCode(); + } + + @Override + public String toString() + { + return "JDBCDetails{" + + "vendor='" + getVendor() + '\'' + + ", blobType='" + getBlobType() + '\'' + + ", varBinaryType='" + getVarBinaryType() + '\'' + + ", bigIntType='" + getBigintType() + '\'' + + ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() + + '}'; + } + + public String getVendor() + { + return _vendor; + } + + public String getBlobType() + { + return _blobType; + } + + public void setBlobType(String blobType) + { + _blobType = blobType; + } + + public String getVarBinaryType() + { + return _varBinaryType; + } + + public void setVarBinaryType(String varBinaryType) + { + _varBinaryType = varBinaryType; + } + + public boolean isUseBytesMethodsForBlob() + { + return _useBytesMethodsForBlob; + } + + public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob) + { + _useBytesMethodsForBlob = useBytesMethodsForBlob; + } + + public String getBigintType() + { + return _bigintType; + } + + public void setBigintType(String bigintType) + { + _bigintType = bigintType; + } + } + + private static JDBCDetails DERBY_DETAILS = + new JDBCDetails("derby", + "blob", + "varchar(%d) for bit data", + "bigint", + false); + + private static JDBCDetails POSTGRESQL_DETAILS = + new JDBCDetails("postgresql", + "bytea", + "bytea", + "bigint", + true); + + private static JDBCDetails MYSQL_DETAILS = + new JDBCDetails("mysql", + "blob", + "varbinary(%d)", + "bigint", + false); + + + private static JDBCDetails SYBASE_DETAILS = + new JDBCDetails("sybase", + "image", + "varbinary(%d)", + "bigint", + false); + + + private static JDBCDetails ORACLE_DETAILS = + new JDBCDetails("oracle", + "blob", + "raw(%d)", + "number", + false); + + + private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<String,JDBCDetails>(); + + static + { + + addDetails(DERBY_DETAILS); + addDetails(POSTGRESQL_DETAILS); + addDetails(MYSQL_DETAILS); + addDetails(SYBASE_DETAILS); + addDetails(ORACLE_DETAILS); + } + + private static void addDetails(JDBCDetails details) + { + VENDOR_DETAILS.put(details.getVendor(), details); + } + + private String _blobType; + private String _varBinaryType; + private String _bigIntType; + private boolean _useBytesMethodsForBlob; + + private List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<RecordedJDBCTransaction>(); + + + public JDBCMessageStore() + { + } + + protected Logger getLogger() + { + return _logger; + } + + protected String getSqlBlobType() + { + return _blobType; + } + + protected String getSqlVarBinaryType(int size) + { + return String.format(_varBinaryType, size); + } + + public String getSqlBigIntType() + { + return _bigIntType; + } + + @Override + protected void doClose() throws AMQStoreException + { + while(!_transactions.isEmpty()) + { + RecordedJDBCTransaction txn = _transactions.get(0); + txn.abortTran(); + } + try + { + _connectionProvider.close(); + } + catch (SQLException e) + { + throw new AMQStoreException("Unable to close connection provider ", e); + } + } + + + protected Connection getConnection() throws SQLException + { + return _connectionProvider.getConnection(); + } + + + protected void implementationSpecificConfiguration(String name, + VirtualHost virtualHost) + throws ClassNotFoundException, SQLException + { + + + String connectionURL = virtualHost.getAttribute(CONNECTION_URL) == null + ? String.valueOf(virtualHost.getAttribute(VirtualHost.STORE_PATH)) + : String.valueOf(virtualHost.getAttribute(CONNECTION_URL)); + + JDBCDetails details = null; + + String[] components = connectionURL.split(":",3); + if(components.length >= 2) + { + String vendor = components[1]; + details = VENDOR_DETAILS.get(vendor); + } + + if(details == null) + { + getLogger().info("Do not recognize vendor from connection URL: " + connectionURL); + + // TODO - is there a better default than derby + details = DERBY_DETAILS; + } + + + Object poolAttribute = virtualHost.getAttribute("connectionPool"); + String connectionPoolType = poolAttribute == null ? "DEFAULT" : String.valueOf(poolAttribute); + + JDBCConnectionProviderFactory connectionProviderFactory = + JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); + if(connectionProviderFactory == null) + { + _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used"); + connectionProviderFactory = new DefaultConnectionProviderFactory(); + } + + _connectionProvider = connectionProviderFactory.getConnectionProvider(connectionURL, virtualHost); + + _blobType = getStringAttribute(virtualHost, "jdbcBlobType",details.getBlobType()); + _varBinaryType = getStringAttribute(virtualHost, "jdbcVarbinaryType",details.getVarBinaryType()); + _useBytesMethodsForBlob = getBooleanAttribute(virtualHost, "jdbcBytesForBlob",details.isUseBytesMethodsForBlob()); + _bigIntType = getStringAttribute(virtualHost, "jdbcBigIntType", details.getBigintType()); + } + + + private String getStringAttribute(VirtualHost virtualHost, String attributeName, String defaultVal) + { + Object attrValue = virtualHost.getAttribute(attributeName); + if(attrValue != null) + { + return attrValue.toString(); + } + return defaultVal; + } + + private boolean getBooleanAttribute(VirtualHost virtualHost, String attributeName, boolean defaultVal) + { + Object attrValue = virtualHost.getAttribute(attributeName); + if(attrValue != null) + { + if(attrValue instanceof Boolean) + { + return ((Boolean) attrValue).booleanValue(); + } + else if(attrValue instanceof String) + { + return Boolean.parseBoolean((String)attrValue); + } + + } + return defaultVal; + } + + + protected void storedSizeChange(int contentSize) + { + } + + @Override + public String getStoreLocation() + { + return ""; + } + + @Override + public String getStoreType() + { + return TYPE; + } + + @Override + protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException + { + if(_useBytesMethodsForBlob) + { + return rs.getBytes(col); + } + else + { + Blob dataAsBlob = rs.getBlob(col); + return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + + } + } + + @Override + protected String getBlobAsString(ResultSet rs, int col) throws SQLException + { + byte[] bytes; + if(_useBytesMethodsForBlob) + { + bytes = rs.getBytes(col); + return new String(bytes,UTF8_CHARSET); + } + else + { + Blob blob = rs.getBlob(col); + if(blob == null) + { + return null; + } + bytes = blob.getBytes(1, (int)blob.length()); + } + return new String(bytes, UTF8_CHARSET); + + } + + @Override + public Transaction newTransaction() + { + return new RecordedJDBCTransaction(); + } + + private class RecordedJDBCTransaction extends JDBCTransaction + { + private RecordedJDBCTransaction() + { + super(); + JDBCMessageStore.this._transactions.add(this); + } + + @Override + public void commitTran() throws AMQStoreException + { + try + { + super.commitTran(); + } + finally + { + JDBCMessageStore.this._transactions.remove(this); + } + } + + @Override + public StoreFuture commitTranAsync() throws AMQStoreException + { + try + { + return super.commitTranAsync(); + } + finally + { + JDBCMessageStore.this._transactions.remove(this); + } + } + + @Override + public void abortTran() throws AMQStoreException + { + try + { + super.abortTran(); + } + finally + { + JDBCMessageStore.this._transactions.remove(this); + } + } + } + +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java new file mode 100644 index 0000000000..82d2275156 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.jdbc; + +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.MessageStoreFactory; +import org.apache.qpid.server.store.MessageStore; + +public class JDBCMessageStoreFactory implements MessageStoreFactory +{ + + @Override + public String getType() + { + return JDBCMessageStore.TYPE; + } + + @Override + public MessageStore createMessageStore() + { + return new JDBCMessageStore(); + } + + @Override + public Map<String, Object> convertStoreConfiguration(Configuration storeConfiguration) + { + Map<String,Object> convertedMap = new HashMap<String,Object>(); + convertedMap.put("jdbcBlobType", storeConfiguration.getString("sqlBlobType")); + convertedMap.put("jdbcVarbinaryType", storeConfiguration.getString("sqlVarbinaryType")); + if(storeConfiguration.containsKey("useBytesForBlob")) + { + convertedMap.put("jdbcUseBytesForBlob", storeConfiguration.getBoolean("useBytesForBlob")); + } + convertedMap.put("jdbcBigIntType", storeConfiguration.getString("sqlBigIntType")); + convertedMap.put("connectionPool", storeConfiguration.getString("pool.type")); + convertedMap.put("minConnectionsPerPartition", storeConfiguration.getInteger("pool.minConnectionsPerPartition", + null)); + convertedMap.put("maxConnectionsPerPartition", storeConfiguration.getInteger("pool.maxConnectionsPerPartition", + null)); + convertedMap.put("partitionCount", storeConfiguration.getInteger("pool.partitionCount", null)); + + return convertedMap; + } + + + @Override + public void validateAttributes(Map<String, Object> attributes) + { + Object connectionURL = attributes.get(JDBCMessageStore.CONNECTION_URL); + if(!(connectionURL instanceof String)) + { + Object storePath = attributes.get(VirtualHost.STORE_PATH); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ JDBCMessageStore.CONNECTION_URL + +"' is required and must be of type String."); + + } + } + } + +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/jdbc/add.js b/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/js/qpid/management/virtualhost/store/jdbc/add.js index dd79aae2fa..dd79aae2fa 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/jdbc/add.js +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/js/qpid/management/virtualhost/store/jdbc/add.js diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/pool/none/add.js b/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/js/qpid/management/virtualhost/store/pool/none/add.js index 7276737873..7276737873 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/pool/none/add.js +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/js/qpid/management/virtualhost/store/pool/none/add.js diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/jdbc/add.html b/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/virtualhost/store/jdbc/add.html index 966b4fcc06..966b4fcc06 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/jdbc/add.html +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/virtualhost/store/jdbc/add.html diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker-plugins/jdbc-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory new file mode 100644 index 0000000000..a77458f27d --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.jdbc.JDBCMessageStoreFactory diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java new file mode 100644 index 0000000000..9c348383c6 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java @@ -0,0 +1,152 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.jdbc; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreTestCase; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + +public class JDBCMessageStoreTest extends MessageStoreTestCase +{ + private String _connectionURL; + + @Override + public void tearDown() throws Exception + { + try + { + shutdownDerby(); + } + finally + { + super.tearDown(); + } + } + + public void testOnDelete() throws Exception + { + String[] expectedTables = JDBCMessageStore.ALL_TABLES; + assertTablesExist(expectedTables, true); + getStore().close(); + assertTablesExist(expectedTables, true); + getStore().onDelete(); + assertTablesExist(expectedTables, false); + } + + @Override + protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception + { + _connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true"; + + when(virtualHost.getAttribute(eq("connectionURL"))).thenReturn(_connectionURL); + } + + @Override + protected MessageStore createMessageStore() + { + return new JDBCMessageStore(); + } + + private void assertTablesExist(String[] expectedTables, boolean exists) throws SQLException + { + Set<String> existingTables = getTableNames(); + for (String tableName : expectedTables) + { + assertEquals("Table " + tableName + (exists ? " is not found" : " actually exist"), exists, + existingTables.contains(tableName)); + } + } + + private Set<String> getTableNames() throws SQLException + { + Set<String> tableNames = new HashSet<String>(); + Connection conn = null; + try + { + conn = openConnection(); + DatabaseMetaData metaData = conn.getMetaData(); + ResultSet tables = metaData.getTables(null, null, null, new String[] { "TABLE" }); + try + { + while (tables.next()) + { + tableNames.add(tables.getString("TABLE_NAME")); + } + } + finally + { + tables.close(); + } + } + finally + { + if (conn != null) + { + conn.close(); + } + } + return tableNames; + } + + private Connection openConnection() throws SQLException + { + return DriverManager.getConnection(_connectionURL); + } + + + private void shutdownDerby() throws SQLException + { + Connection connection = null; + try + { + connection = DriverManager.getConnection("jdbc:derby:memory:/" + getTestName() + ";shutdown=true"); + } + catch(SQLException e) + { + if (e.getSQLState().equalsIgnoreCase("08006")) + { + //expected and represents a clean shutdown of this database only, do nothing. + } + else + { + throw e; + } + } + finally + { + if (connection != null) + { + connection.close(); + } + } + } +} |
