From eaa8c11396b13c46c59c2030a23cc7763ecee9d7 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 10 Jul 2013 09:10:51 +0000 Subject: QPID-4983 : [Java Broker] Move store implementations to broker plugins git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1501682 13f79535-47bb-0310-9956-ffa450edef68 --- .../BDBMessageStoreConfigurationTest.java | 24 +- .../store/berkeleydb/MessageStoreCreatorTest.java | 4 +- qpid/java/broker-plugins/derby-store/build.xml | 32 ++ .../qpid/server/store/derby/DerbyMessageStore.java | 466 ++++++++++++++++++ .../store/derby/DerbyMessageStoreFactory.java | 64 +++ .../qpid/management/virtualhost/store/derby/add.js | 56 +++ .../resources/virtualhost/store/derby/add.html | 9 + ...g.apache.qpid.server.plugin.MessageStoreFactory | 19 + .../derby/DerbyMessageStoreConfigurationTest.java | 58 +++ .../derby/DerbyMessageStoreQuotaEventsTest.java | 66 +++ .../server/store/derby/DerbyMessageStoreTest.java | 85 ++++ qpid/java/broker-plugins/jdbc-store/build.xml | 31 ++ .../store/jdbc/DefaultConnectionProvider.java | 46 ++ .../jdbc/DefaultConnectionProviderFactory.java | 42 ++ .../qpid/server/store/jdbc/JDBCMessageStore.java | 462 ++++++++++++++++++ .../server/store/jdbc/JDBCMessageStoreFactory.java | 83 ++++ .../qpid/management/virtualhost/store/jdbc/add.js | 93 ++++ .../management/virtualhost/store/pool/none/add.js | 56 +++ .../java/resources/virtualhost/store/jdbc/add.html | 15 + ...g.apache.qpid.server.plugin.MessageStoreFactory | 19 + .../server/store/jdbc/JDBCMessageStoreTest.java | 152 ++++++ .../qpid/management/virtualhost/store/derby/add.js | 56 --- .../qpid/management/virtualhost/store/jdbc/add.js | 93 ---- .../management/virtualhost/store/pool/none/add.js | 56 --- .../resources/virtualhost/store/derby/add.html | 9 - .../java/resources/virtualhost/store/jdbc/add.html | 15 - qpid/java/broker/build.xml | 4 +- .../qpid/server/store/derby/DerbyMessageStore.java | 466 ------------------ .../store/derby/DerbyMessageStoreFactory.java | 64 --- .../qpid/server/store/jdbc/ConnectionProvider.java | 1 + .../store/jdbc/DefaultConnectionProvider.java | 46 -- .../jdbc/DefaultConnectionProviderFactory.java | 42 -- .../qpid/server/store/jdbc/JDBCMessageStore.java | 462 ------------------ .../server/store/jdbc/JDBCMessageStoreFactory.java | 83 ---- ...g.apache.qpid.server.plugin.MessageStoreFactory | 2 - .../startup/VirtualHostRecovererTest.java | 6 +- .../AbstractDurableConfigurationStoreTestCase.java | 519 ++++++++++++++++++++ .../store/DurableConfigurationStoreTest.java | 520 --------------------- .../qpid/server/store/MessageStoreCreatorTest.java | 39 -- .../derby/DerbyMessageStoreQuotaEventsTest.java | 66 --- .../server/store/derby/DerbyMessageStoreTest.java | 85 ---- .../server/store/jdbc/JDBCMessageStoreTest.java | 153 ------ qpid/java/build.deps | 2 + .../apache/qpid/test/utils/QpidBrokerTestCase.java | 3 +- 44 files changed, 2401 insertions(+), 2273 deletions(-) create mode 100644 qpid/java/broker-plugins/derby-store/build.xml create mode 100644 qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java create mode 100644 qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java create mode 100644 qpid/java/broker-plugins/derby-store/src/main/java/resources/js/qpid/management/virtualhost/store/derby/add.js create mode 100644 qpid/java/broker-plugins/derby-store/src/main/java/resources/virtualhost/store/derby/add.html create mode 100644 qpid/java/broker-plugins/derby-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory create mode 100644 qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java create mode 100644 qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java create mode 100644 qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java create mode 100644 qpid/java/broker-plugins/jdbc-store/build.xml create mode 100644 qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java create mode 100644 qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java create mode 100644 qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java create mode 100644 qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java create mode 100644 qpid/java/broker-plugins/jdbc-store/src/main/java/resources/js/qpid/management/virtualhost/store/jdbc/add.js create mode 100644 qpid/java/broker-plugins/jdbc-store/src/main/java/resources/js/qpid/management/virtualhost/store/pool/none/add.js create mode 100644 qpid/java/broker-plugins/jdbc-store/src/main/java/resources/virtualhost/store/jdbc/add.html create mode 100644 qpid/java/broker-plugins/jdbc-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory create mode 100644 qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java delete mode 100644 qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/derby/add.js delete mode 100644 qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/jdbc/add.js delete mode 100644 qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/pool/none/add.js delete mode 100644 qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/derby/add.html delete mode 100644 qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/jdbc/add.html delete mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java delete mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java delete mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java delete mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java delete mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java delete mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreCreatorTest.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java index 6c6145fabb..da1038284e 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java @@ -20,26 +20,40 @@ */ package org.apache.qpid.server.store.berkeleydb; +import org.apache.qpid.server.store.AbstractDurableConfigurationStoreTestCase; import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.DurableConfigurationStoreTest; -import org.apache.qpid.server.store.MessageStore; -public class BDBMessageStoreConfigurationTest extends DurableConfigurationStoreTest +public class BDBMessageStoreConfigurationTest extends AbstractDurableConfigurationStoreTestCase { private BDBMessageStore _bdbMessageStore; + @Override + protected void onReopenStore() + { + _bdbMessageStore = null; + } + @Override protected BDBMessageStore createMessageStore() throws Exception { - _bdbMessageStore = new BDBMessageStore(); + createStoreIfNecessary(); return _bdbMessageStore; } - // TODO - this only works so long as createConfigStore is called after createMessageStore + private void createStoreIfNecessary() + { + if(_bdbMessageStore == null) + { + _bdbMessageStore = new BDBMessageStore(); + } + } + @Override protected DurableConfigurationStore createConfigStore() throws Exception { + createStoreIfNecessary(); + return _bdbMessageStore; } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java index 8564199009..730001d849 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java @@ -20,16 +20,14 @@ */ package org.apache.qpid.server.store.berkeleydb; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreCreator; import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; -import org.apache.qpid.server.store.derby.DerbyMessageStore; import org.apache.qpid.test.utils.QpidTestCase; public class MessageStoreCreatorTest extends QpidTestCase { - private static final String[] STORE_TYPES = {MemoryMessageStore.TYPE, DerbyMessageStore.TYPE, BDBMessageStore.TYPE}; + private static final String[] STORE_TYPES = {BDBMessageStore.TYPE}; public void testMessageStoreCreator() { diff --git a/qpid/java/broker-plugins/derby-store/build.xml b/qpid/java/broker-plugins/derby-store/build.xml new file mode 100644 index 0000000000..e93b81aad7 --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/build.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java new file mode 100644 index 0000000000..ac310d02c9 --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -0,0 +1,466 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store.derby; + + +import java.io.File; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import org.apache.log4j.Logger; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.AbstractJDBCMessageStore; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreConstants; +import org.apache.qpid.util.FileUtils; + +/** + * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence + * mechanism. + * + */ +public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore +{ + + private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); + + private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; + + public static final String MEMORY_STORE_LOCATION = ":memory:"; + + private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; + + public static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; + + public static final String TYPE = "DERBY"; + + private long _totalStoreSize; + private boolean _limitBusted; + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + + protected String _connectionURL; + + private String _storeLocation; + private Class _driverClass; + + public DerbyMessageStore() + { + } + + protected Logger getLogger() + { + return _logger; + } + + @Override + protected String getSqlBlobType() + { + return "blob"; + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return "varchar("+size+") for bit data"; + } + + @Override + protected String getSqlBigIntType() + { + return "bigint"; + } + + protected void doClose() throws SQLException + { + try + { + Connection conn = DriverManager.getConnection(_connectionURL + ";shutdown=true"); + // Shouldn't reach this point - shutdown=true should throw SQLException + conn.close(); + getLogger().error("Unable to shut down the store"); + } + catch (SQLException e) + { + if (e.getSQLState().equalsIgnoreCase(DerbyMessageStore.DERBY_SINGLE_DB_SHUTDOWN_CODE)) + { + //expected and represents a clean shutdown of this database only, do nothing. + } + else + { + getLogger().error("Exception whilst shutting down the store: " + e); + throw e; + } + } + } + + @Override + protected void implementationSpecificConfiguration(String name, + VirtualHost virtualHost) + throws ClassNotFoundException + { + //Update to pick up QPID_WORK and use that as the default location not just derbyDB + + _driverClass = (Class) Class.forName(SQL_DRIVER_NAME); + + String defaultPath = System.getProperty("QPID_WORK") + File.separator + "derbyDB"; + String databasePath = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + if(databasePath == null) + { + databasePath = defaultPath; + } + + if(!MEMORY_STORE_LOCATION.equals(databasePath)) + { + File environmentPath = new File(databasePath); + if (!environmentPath.exists()) + { + if (!environmentPath.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } + } + } + + _storeLocation = databasePath; + + Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); + Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); + + _persistentSizeHighThreshold = overfullAttr == null ? -1l : + overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); + _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : + underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); + + if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } + + //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created. + _connectionURL = "jdbc:derby" + (databasePath.equals(MEMORY_STORE_LOCATION) ? databasePath: ":" + databasePath+ "/") + name + ";create=true"; + + + + _eventManager.addEventListener(new EventListener() + { + @Override + public void event(Event event) + { + setInitialSize(); + } + }, Event.BEFORE_ACTIVATE); + + } + + private void setInitialSize() + { + Connection conn = null; + try + { + + + try + { + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + } + finally + { + if(conn != null) + { + conn.close(); + + + } + } + } + catch (SQLException e) + { + getLogger().error("Unable to set initial store size", e); + } + } + + protected String getBlobAsString(ResultSet rs, int col) throws SQLException + { + Blob blob = rs.getBlob(col); + if(blob == null) + { + return null; + } + byte[] bytes = blob.getBytes(1, (int)blob.length()); + return new String(bytes, UTF8_CHARSET); + } + + protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException + { + Blob dataAsBlob = rs.getBlob(col); + return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + } + + + protected boolean tableExists(final String tableName, final Connection conn) throws SQLException + { + PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY); + try + { + stmt.setString(1, tableName); + ResultSet rs = stmt.executeQuery(); + try + { + return rs.next(); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + + + @Override + public String getStoreLocation() + { + return _storeLocation; + } + + protected synchronized void storedSizeChange(final int delta) + { + if(getPersistentSizeHighThreshold() > 0) + { + synchronized(this) + { + // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every + // time, so we do so only when there's been enough change that it is worth looking again. We do this by + // assuming the total size will change by less than twice the amount of the message data change. + long newSize = _totalStoreSize += 3*delta; + + Connection conn = null; + try + { + + if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) + { + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + if(_totalStoreSize > getPersistentSizeHighThreshold()) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + } + else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) + { + long oldSize = _totalStoreSize; + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + if(oldSize <= _totalStoreSize) + { + + reduceSizeOnDisk(conn); + + _totalStoreSize = getSizeOnDisk(conn); + } + + if(_totalStoreSize < getPersistentSizeLowThreshold()) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + + } + } + catch (SQLException e) + { + closeConnection(conn); + throw new RuntimeException("Exception while processing store size change", e); + } + } + } + } + + private void reduceSizeOnDisk(Connection conn) + { + CallableStatement cs = null; + PreparedStatement stmt = null; + try + { + String tableQuery = + "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'"; + stmt = conn.prepareStatement(tableQuery); + ResultSet rs = null; + + List schemas = new ArrayList(); + List tables = new ArrayList(); + + try + { + rs = stmt.executeQuery(); + while(rs.next()) + { + schemas.add(rs.getString(1)); + tables.add(rs.getString(2)); + } + } + finally + { + if(rs != null) + { + rs.close(); + } + } + + + cs = conn.prepareCall + ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)"); + + for(int i = 0; i < schemas.size(); i++) + { + cs.setString(1, schemas.get(i)); + cs.setString(2, tables.get(i)); + cs.setShort(3, (short) 0); + cs.execute(); + } + } + catch (SQLException e) + { + closeConnection(conn); + throw new RuntimeException("Error reducing on disk size", e); + } + finally + { + closePreparedStatement(stmt); + closePreparedStatement(cs); + } + + } + + private long getSizeOnDisk(Connection conn) + { + PreparedStatement stmt = null; + try + { + String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" + + " FROM " + + " SYS.SYSTABLES systabs," + + " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" + + " WHERE systabs.tabletype = 'T'"; + + stmt = conn.prepareStatement(sizeQuery); + + ResultSet rs = null; + long size = 0l; + + try + { + rs = stmt.executeQuery(); + while(rs.next()) + { + size = rs.getLong(1); + } + } + finally + { + if(rs != null) + { + rs.close(); + } + } + + return size; + + } + catch (SQLException e) + { + closeConnection(conn); + throw new RuntimeException("Error establishing on disk size", e); + } + finally + { + closePreparedStatement(stmt); + } + + } + + + private long getPersistentSizeLowThreshold() + { + return _persistentSizeLowThreshold; + } + + private long getPersistentSizeHighThreshold() + { + return _persistentSizeHighThreshold; + } + + @Override + public String getStoreType() + { + return TYPE; + } + + @Override + public void onDelete() + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Deleting store " + _storeLocation); + } + + if (MEMORY_STORE_LOCATION.equals(_storeLocation)) + { + return; + } + + if (_storeLocation != null) + { + File location = new File(_storeLocation); + if (location.exists()) + { + if (!FileUtils.delete(location, true)) + { + _logger.error("Cannot delete " + _storeLocation); + } + } + } + } + + protected Connection getConnection() throws SQLException + { + return DriverManager.getConnection(_connectionURL); + } +} diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java new file mode 100644 index 0000000000..1b111ad65e --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.derby; + +import java.util.Collections; +import java.util.Map; +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.MessageStoreFactory; +import org.apache.qpid.server.store.MessageStore; + +public class DerbyMessageStoreFactory implements MessageStoreFactory +{ + + @Override + public String getType() + { + return DerbyMessageStore.TYPE; + } + + @Override + public MessageStore createMessageStore() + { + return new DerbyMessageStore(); + } + + @Override + public Map convertStoreConfiguration(Configuration configuration) + { + return Collections.emptyMap(); + } + + + @Override + public void validateAttributes(Map attributes) + { + Object storePath = attributes.get(VirtualHost.STORE_PATH); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH + +"' is required and must be of type String."); + + } + } + +} diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/resources/js/qpid/management/virtualhost/store/derby/add.js b/qpid/java/broker-plugins/derby-store/src/main/java/resources/js/qpid/management/virtualhost/store/derby/add.js new file mode 100644 index 0000000000..04016b5fae --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/java/resources/js/qpid/management/virtualhost/store/derby/add.js @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +define(["dojo/_base/xhr", + "dojo/dom", + "dojo/dom-construct", + "dojo/_base/window", + "dijit/registry", + "dojo/parser", + "dojo/_base/array", + "dojo/_base/event", + "dojo/_base/json", + "dojo/string", + "dojo/store/Memory", + "dijit/form/FilteringSelect", + "dojo/domReady!"], + function (xhr, dom, construct, win, registry, parser, array, event, json, string, Memory, FilteringSelect) { + return { + show: function() { + var node = dom.byId("addVirtualHost.storeSpecificDiv"); + var that = this; + + array.forEach(registry.toArray(), + function(item) { + if(item.id.substr(0,34) == "formAddVirtualHost.specific.store.") { + item.destroyRecursive(); + } + }); + + xhr.get({url: "virtualhost/store/derby/add.html", + sync: true, + load: function(data) { + node.innerHTML = data; + parser.parse(node); + + }}); + } + }; + }); diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/resources/virtualhost/store/derby/add.html b/qpid/java/broker-plugins/derby-store/src/main/java/resources/virtualhost/store/derby/add.html new file mode 100644 index 0000000000..2ed5b35c10 --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/java/resources/virtualhost/store/derby/add.html @@ -0,0 +1,9 @@ + + + + + +
Path to store location*: + +
diff --git a/qpid/java/broker-plugins/derby-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker-plugins/derby-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory new file mode 100644 index 0000000000..88ca1fed5e --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.derby.DerbyMessageStoreFactory diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java new file mode 100644 index 0000000000..ffb6ac479a --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.derby; + +import org.apache.qpid.server.store.AbstractDurableConfigurationStoreTestCase; + +public class DerbyMessageStoreConfigurationTest extends AbstractDurableConfigurationStoreTestCase +{ + + private DerbyMessageStore _derbyMessageStore; + + @Override + protected void onReopenStore() + { + _derbyMessageStore = null; + } + + @Override + protected DerbyMessageStore createMessageStore() throws Exception + { + createStoreIfNecessary(); + return _derbyMessageStore; + } + + + private void createStoreIfNecessary() + { + if(_derbyMessageStore == null) + { + _derbyMessageStore = new DerbyMessageStore(); + } + } + + @Override + protected DerbyMessageStore createConfigStore() throws Exception + { + createStoreIfNecessary(); + return _derbyMessageStore; + } +} diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java new file mode 100644 index 0000000000..479675dac1 --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.derby; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreConstants; +import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + +public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase +{ + private static final Logger _logger = Logger.getLogger(DerbyMessageStoreQuotaEventsTest.class); + + private static final int NUMBER_OF_MESSAGES_TO_OVERFILL_STORE = 10; + + /** + * Estimated using an assumption that a physical disk space occupied by a + * message is 3 times bigger then a message size + */ + private static final int OVERFULL_SIZE = (int) (MESSAGE_DATA.length * 3 * NUMBER_OF_MESSAGES_TO_OVERFILL_STORE * 0.8); + + private static final int UNDERFULL_SIZE = (int) (OVERFULL_SIZE * 0.8); + + @Override + protected int getNumberOfMessagesToFillStore() + { + return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE; + } + + @Override + protected void applyStoreSpecificConfiguration(VirtualHost vhost) + { + _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); + + when(vhost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE); + when(vhost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE); + } + + @Override + protected MessageStore createStore() throws Exception + { + return new DerbyMessageStore(); + } +} diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java new file mode 100644 index 0000000000..859fad629b --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.derby; + +import java.io.File; + +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreTestCase; +import org.apache.qpid.util.FileUtils; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + +public class DerbyMessageStoreTest extends MessageStoreTestCase +{ + private String _storeLocation; + + @Override + public void tearDown() throws Exception + { + try + { + deleteStoreIfExists(); + } + finally + { + super.tearDown(); + } + } + + public void testOnDelete() throws Exception + { + File location = new File(_storeLocation); + assertTrue("Store does not exist at " + _storeLocation, location.exists()); + + getStore().close(); + assertTrue("Store does not exist at " + _storeLocation, location.exists()); + + getStore().onDelete(); + assertFalse("Store exists at " + _storeLocation, location.exists()); + } + + @Override + protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception + { + _storeLocation = TMP_FOLDER + File.separator + getTestName(); + when(virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storeLocation); + deleteStoreIfExists(); + } + + private void deleteStoreIfExists() + { + File location = new File(_storeLocation); + if (location.exists()) + { + FileUtils.delete(location, true); + } + } + + @Override + protected MessageStore createMessageStore() + { + return new DerbyMessageStore(); + } + +} diff --git a/qpid/java/broker-plugins/jdbc-store/build.xml b/qpid/java/broker-plugins/jdbc-store/build.xml new file mode 100644 index 0000000000..de6ec59845 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/build.xml @@ -0,0 +1,31 @@ + + + + + + + + + + + + + + diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java new file mode 100644 index 0000000000..7945ae3b46 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +class DefaultConnectionProvider implements ConnectionProvider +{ + private final String _connectionUrl; + + public DefaultConnectionProvider(String connectionUrl) + { + _connectionUrl = connectionUrl; + } + + @Override + public Connection getConnection() throws SQLException + { + return DriverManager.getConnection(_connectionUrl); + } + + @Override + public void close() throws SQLException + { + } +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java new file mode 100644 index 0000000000..8fc7de12d0 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.jdbc; + +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; + +public class DefaultConnectionProviderFactory implements JDBCConnectionProviderFactory +{ + + @Override + public String getType() + { + return "NONE"; + } + + @Override + public ConnectionProvider getConnectionProvider(String connectionUrl, + VirtualHost virtualHost) + { + return new DefaultConnectionProvider(connectionUrl); + } + +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java new file mode 100644 index 0000000000..f8d93536bb --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java @@ -0,0 +1,462 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store.jdbc; + + +import java.sql.Blob; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; +import org.apache.qpid.server.store.AbstractJDBCMessageStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.Transaction; + +/** + * An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence + * mechanism. + * + */ +public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStore +{ + + private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class); + + + public static final String TYPE = "JDBC"; + public static final String CONNECTION_URL = "connectionURL"; + + protected String _connectionURL; + private ConnectionProvider _connectionProvider; + + + private static class JDBCDetails + { + private final String _vendor; + private String _blobType; + private String _varBinaryType; + private String _bigintType; + private boolean _useBytesMethodsForBlob; + + private JDBCDetails(String vendor, + String blobType, + String varBinaryType, + String bigIntType, + boolean useBytesMethodsForBlob) + { + _vendor = vendor; + setBlobType(blobType); + setVarBinaryType(varBinaryType); + setBigintType(bigIntType); + setUseBytesMethodsForBlob(useBytesMethodsForBlob); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + JDBCDetails that = (JDBCDetails) o; + + if (!getVendor().equals(that.getVendor())) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return getVendor().hashCode(); + } + + @Override + public String toString() + { + return "JDBCDetails{" + + "vendor='" + getVendor() + '\'' + + ", blobType='" + getBlobType() + '\'' + + ", varBinaryType='" + getVarBinaryType() + '\'' + + ", bigIntType='" + getBigintType() + '\'' + + ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() + + '}'; + } + + public String getVendor() + { + return _vendor; + } + + public String getBlobType() + { + return _blobType; + } + + public void setBlobType(String blobType) + { + _blobType = blobType; + } + + public String getVarBinaryType() + { + return _varBinaryType; + } + + public void setVarBinaryType(String varBinaryType) + { + _varBinaryType = varBinaryType; + } + + public boolean isUseBytesMethodsForBlob() + { + return _useBytesMethodsForBlob; + } + + public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob) + { + _useBytesMethodsForBlob = useBytesMethodsForBlob; + } + + public String getBigintType() + { + return _bigintType; + } + + public void setBigintType(String bigintType) + { + _bigintType = bigintType; + } + } + + private static JDBCDetails DERBY_DETAILS = + new JDBCDetails("derby", + "blob", + "varchar(%d) for bit data", + "bigint", + false); + + private static JDBCDetails POSTGRESQL_DETAILS = + new JDBCDetails("postgresql", + "bytea", + "bytea", + "bigint", + true); + + private static JDBCDetails MYSQL_DETAILS = + new JDBCDetails("mysql", + "blob", + "varbinary(%d)", + "bigint", + false); + + + private static JDBCDetails SYBASE_DETAILS = + new JDBCDetails("sybase", + "image", + "varbinary(%d)", + "bigint", + false); + + + private static JDBCDetails ORACLE_DETAILS = + new JDBCDetails("oracle", + "blob", + "raw(%d)", + "number", + false); + + + private static Map VENDOR_DETAILS = new HashMap(); + + static + { + + addDetails(DERBY_DETAILS); + addDetails(POSTGRESQL_DETAILS); + addDetails(MYSQL_DETAILS); + addDetails(SYBASE_DETAILS); + addDetails(ORACLE_DETAILS); + } + + private static void addDetails(JDBCDetails details) + { + VENDOR_DETAILS.put(details.getVendor(), details); + } + + private String _blobType; + private String _varBinaryType; + private String _bigIntType; + private boolean _useBytesMethodsForBlob; + + private List _transactions = new CopyOnWriteArrayList(); + + + public JDBCMessageStore() + { + } + + protected Logger getLogger() + { + return _logger; + } + + protected String getSqlBlobType() + { + return _blobType; + } + + protected String getSqlVarBinaryType(int size) + { + return String.format(_varBinaryType, size); + } + + public String getSqlBigIntType() + { + return _bigIntType; + } + + @Override + protected void doClose() throws AMQStoreException + { + while(!_transactions.isEmpty()) + { + RecordedJDBCTransaction txn = _transactions.get(0); + txn.abortTran(); + } + try + { + _connectionProvider.close(); + } + catch (SQLException e) + { + throw new AMQStoreException("Unable to close connection provider ", e); + } + } + + + protected Connection getConnection() throws SQLException + { + return _connectionProvider.getConnection(); + } + + + protected void implementationSpecificConfiguration(String name, + VirtualHost virtualHost) + throws ClassNotFoundException, SQLException + { + + + String connectionURL = virtualHost.getAttribute(CONNECTION_URL) == null + ? String.valueOf(virtualHost.getAttribute(VirtualHost.STORE_PATH)) + : String.valueOf(virtualHost.getAttribute(CONNECTION_URL)); + + JDBCDetails details = null; + + String[] components = connectionURL.split(":",3); + if(components.length >= 2) + { + String vendor = components[1]; + details = VENDOR_DETAILS.get(vendor); + } + + if(details == null) + { + getLogger().info("Do not recognize vendor from connection URL: " + connectionURL); + + // TODO - is there a better default than derby + details = DERBY_DETAILS; + } + + + Object poolAttribute = virtualHost.getAttribute("connectionPool"); + String connectionPoolType = poolAttribute == null ? "DEFAULT" : String.valueOf(poolAttribute); + + JDBCConnectionProviderFactory connectionProviderFactory = + JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); + if(connectionProviderFactory == null) + { + _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used"); + connectionProviderFactory = new DefaultConnectionProviderFactory(); + } + + _connectionProvider = connectionProviderFactory.getConnectionProvider(connectionURL, virtualHost); + + _blobType = getStringAttribute(virtualHost, "jdbcBlobType",details.getBlobType()); + _varBinaryType = getStringAttribute(virtualHost, "jdbcVarbinaryType",details.getVarBinaryType()); + _useBytesMethodsForBlob = getBooleanAttribute(virtualHost, "jdbcBytesForBlob",details.isUseBytesMethodsForBlob()); + _bigIntType = getStringAttribute(virtualHost, "jdbcBigIntType", details.getBigintType()); + } + + + private String getStringAttribute(VirtualHost virtualHost, String attributeName, String defaultVal) + { + Object attrValue = virtualHost.getAttribute(attributeName); + if(attrValue != null) + { + return attrValue.toString(); + } + return defaultVal; + } + + private boolean getBooleanAttribute(VirtualHost virtualHost, String attributeName, boolean defaultVal) + { + Object attrValue = virtualHost.getAttribute(attributeName); + if(attrValue != null) + { + if(attrValue instanceof Boolean) + { + return ((Boolean) attrValue).booleanValue(); + } + else if(attrValue instanceof String) + { + return Boolean.parseBoolean((String)attrValue); + } + + } + return defaultVal; + } + + + protected void storedSizeChange(int contentSize) + { + } + + @Override + public String getStoreLocation() + { + return ""; + } + + @Override + public String getStoreType() + { + return TYPE; + } + + @Override + protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException + { + if(_useBytesMethodsForBlob) + { + return rs.getBytes(col); + } + else + { + Blob dataAsBlob = rs.getBlob(col); + return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + + } + } + + @Override + protected String getBlobAsString(ResultSet rs, int col) throws SQLException + { + byte[] bytes; + if(_useBytesMethodsForBlob) + { + bytes = rs.getBytes(col); + return new String(bytes,UTF8_CHARSET); + } + else + { + Blob blob = rs.getBlob(col); + if(blob == null) + { + return null; + } + bytes = blob.getBytes(1, (int)blob.length()); + } + return new String(bytes, UTF8_CHARSET); + + } + + @Override + public Transaction newTransaction() + { + return new RecordedJDBCTransaction(); + } + + private class RecordedJDBCTransaction extends JDBCTransaction + { + private RecordedJDBCTransaction() + { + super(); + JDBCMessageStore.this._transactions.add(this); + } + + @Override + public void commitTran() throws AMQStoreException + { + try + { + super.commitTran(); + } + finally + { + JDBCMessageStore.this._transactions.remove(this); + } + } + + @Override + public StoreFuture commitTranAsync() throws AMQStoreException + { + try + { + return super.commitTranAsync(); + } + finally + { + JDBCMessageStore.this._transactions.remove(this); + } + } + + @Override + public void abortTran() throws AMQStoreException + { + try + { + super.abortTran(); + } + finally + { + JDBCMessageStore.this._transactions.remove(this); + } + } + } + +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java new file mode 100644 index 0000000000..82d2275156 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.jdbc; + +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.MessageStoreFactory; +import org.apache.qpid.server.store.MessageStore; + +public class JDBCMessageStoreFactory implements MessageStoreFactory +{ + + @Override + public String getType() + { + return JDBCMessageStore.TYPE; + } + + @Override + public MessageStore createMessageStore() + { + return new JDBCMessageStore(); + } + + @Override + public Map convertStoreConfiguration(Configuration storeConfiguration) + { + Map convertedMap = new HashMap(); + convertedMap.put("jdbcBlobType", storeConfiguration.getString("sqlBlobType")); + convertedMap.put("jdbcVarbinaryType", storeConfiguration.getString("sqlVarbinaryType")); + if(storeConfiguration.containsKey("useBytesForBlob")) + { + convertedMap.put("jdbcUseBytesForBlob", storeConfiguration.getBoolean("useBytesForBlob")); + } + convertedMap.put("jdbcBigIntType", storeConfiguration.getString("sqlBigIntType")); + convertedMap.put("connectionPool", storeConfiguration.getString("pool.type")); + convertedMap.put("minConnectionsPerPartition", storeConfiguration.getInteger("pool.minConnectionsPerPartition", + null)); + convertedMap.put("maxConnectionsPerPartition", storeConfiguration.getInteger("pool.maxConnectionsPerPartition", + null)); + convertedMap.put("partitionCount", storeConfiguration.getInteger("pool.partitionCount", null)); + + return convertedMap; + } + + + @Override + public void validateAttributes(Map attributes) + { + Object connectionURL = attributes.get(JDBCMessageStore.CONNECTION_URL); + if(!(connectionURL instanceof String)) + { + Object storePath = attributes.get(VirtualHost.STORE_PATH); + if(!(storePath instanceof String)) + { + throw new IllegalArgumentException("Attribute '"+ JDBCMessageStore.CONNECTION_URL + +"' is required and must be of type String."); + + } + } + } + +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/js/qpid/management/virtualhost/store/jdbc/add.js b/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/js/qpid/management/virtualhost/store/jdbc/add.js new file mode 100644 index 0000000000..dd79aae2fa --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/js/qpid/management/virtualhost/store/jdbc/add.js @@ -0,0 +1,93 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +define(["dojo/_base/xhr", + "dojo/dom", + "dojo/dom-construct", + "dojo/_base/window", + "dijit/registry", + "dojo/parser", + "dojo/_base/array", + "dojo/_base/event", + "dojo/_base/json", + "dojo/string", + "dojo/store/Memory", + "dijit/form/FilteringSelect", + "dojo/domReady!"], + function (xhr, dom, construct, win, registry, parser, array, event, json, string, Memory, FilteringSelect) { + return { + show: function() { + var node = dom.byId("addVirtualHost.storeSpecificDiv"); + var that = this; + + array.forEach(registry.toArray(), + function(item) { + if(item.id.substr(0,34) == "formAddVirtualHost.specific.store.") { + item.destroyRecursive(); + } + }); + + xhr.get({url: "virtualhost/store/jdbc/add.html", + sync: true, + load: function(data) { + node.innerHTML = data; + parser.parse(node); + + if (that.hasOwnProperty("poolTypeChooser")) + { + that.poolTypeChooser.destroy(); + } + + var selectPoolType = function(type) { + if(type && string.trim(type) != "") { + require(["qpid/management/virtualhost/store/pool/"+type.toLowerCase()+"/add"], + function(poolType) + { + poolType.show(); + }); + } + } + + xhr.get({ + sync: true, + url: "rest/helper?action=pluginList&plugin=JDBCConnectionProviderFactory", + handleAs: "json" + }).then( + function(data) { + var poolTypes = data; + var poolTypesData = []; + for (var i =0 ; i < poolTypes.length; i++) + { + poolTypesData[i]= {id: poolTypes[i], name: poolTypes[i]}; + } + var poolTypesStore = new Memory({ data: poolTypesData }); + var poolTypesDiv = dom.byId("addVirtualHost.specific.selectPoolType"); + var input = construct.create("input", {id: "addPoolType", required: false}, poolTypesDiv); + that.poolTypeChooser = new FilteringSelect({ id: "addVirtualHost.specific.store.poolType", + name: "connectionPool", + store: poolTypesStore, + searchAttr: "name", required: false, + onChange: selectPoolType }, input); + }); + + }}); + } + }; + }); diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/js/qpid/management/virtualhost/store/pool/none/add.js b/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/js/qpid/management/virtualhost/store/pool/none/add.js new file mode 100644 index 0000000000..7276737873 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/js/qpid/management/virtualhost/store/pool/none/add.js @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +define(["dojo/_base/xhr", + "dojo/dom", + "dojo/dom-construct", + "dojo/_base/window", + "dijit/registry", + "dojo/parser", + "dojo/_base/array", + "dojo/_base/event", + "dojo/_base/json", + "dojo/string", + "dojo/store/Memory", + "dijit/form/FilteringSelect", + "dojo/domReady!"], + function (xhr, dom, construct, win, registry, parser, array, event, json, string, Memory, FilteringSelect) { + return { + show: function() { + var node = dom.byId("addVirtualHost.poolSpecificDiv"); + var that = this; + + array.forEach(registry.toArray(), + function(item) { + if(item.id.substr(0,39) == "formAddVirtualHost.specific.store.pool.") { + item.destroyRecursive(); + } + }); + + xhr.get({url: "virtualhost/store/pool/none/add.html", + sync: true, + load: function(data) { + node.innerHTML = data; + parser.parse(node); + + }}); + } + }; + }); diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/virtualhost/store/jdbc/add.html b/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/virtualhost/store/jdbc/add.html new file mode 100644 index 0000000000..966b4fcc06 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/virtualhost/store/jdbc/add.html @@ -0,0 +1,15 @@ + + + + + + + + + +
JDBC Url*: + +
Connection Pool:
+
+
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker-plugins/jdbc-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory new file mode 100644 index 0000000000..a77458f27d --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.jdbc.JDBCMessageStoreFactory diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java new file mode 100644 index 0000000000..9c348383c6 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java @@ -0,0 +1,152 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.jdbc; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreTestCase; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + +public class JDBCMessageStoreTest extends MessageStoreTestCase +{ + private String _connectionURL; + + @Override + public void tearDown() throws Exception + { + try + { + shutdownDerby(); + } + finally + { + super.tearDown(); + } + } + + public void testOnDelete() throws Exception + { + String[] expectedTables = JDBCMessageStore.ALL_TABLES; + assertTablesExist(expectedTables, true); + getStore().close(); + assertTablesExist(expectedTables, true); + getStore().onDelete(); + assertTablesExist(expectedTables, false); + } + + @Override + protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception + { + _connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true"; + + when(virtualHost.getAttribute(eq("connectionURL"))).thenReturn(_connectionURL); + } + + @Override + protected MessageStore createMessageStore() + { + return new JDBCMessageStore(); + } + + private void assertTablesExist(String[] expectedTables, boolean exists) throws SQLException + { + Set existingTables = getTableNames(); + for (String tableName : expectedTables) + { + assertEquals("Table " + tableName + (exists ? " is not found" : " actually exist"), exists, + existingTables.contains(tableName)); + } + } + + private Set getTableNames() throws SQLException + { + Set tableNames = new HashSet(); + Connection conn = null; + try + { + conn = openConnection(); + DatabaseMetaData metaData = conn.getMetaData(); + ResultSet tables = metaData.getTables(null, null, null, new String[] { "TABLE" }); + try + { + while (tables.next()) + { + tableNames.add(tables.getString("TABLE_NAME")); + } + } + finally + { + tables.close(); + } + } + finally + { + if (conn != null) + { + conn.close(); + } + } + return tableNames; + } + + private Connection openConnection() throws SQLException + { + return DriverManager.getConnection(_connectionURL); + } + + + private void shutdownDerby() throws SQLException + { + Connection connection = null; + try + { + connection = DriverManager.getConnection("jdbc:derby:memory:/" + getTestName() + ";shutdown=true"); + } + catch(SQLException e) + { + if (e.getSQLState().equalsIgnoreCase("08006")) + { + //expected and represents a clean shutdown of this database only, do nothing. + } + else + { + throw e; + } + } + finally + { + if (connection != null) + { + connection.close(); + } + } + } +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/derby/add.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/derby/add.js deleted file mode 100644 index 04016b5fae..0000000000 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/derby/add.js +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -define(["dojo/_base/xhr", - "dojo/dom", - "dojo/dom-construct", - "dojo/_base/window", - "dijit/registry", - "dojo/parser", - "dojo/_base/array", - "dojo/_base/event", - "dojo/_base/json", - "dojo/string", - "dojo/store/Memory", - "dijit/form/FilteringSelect", - "dojo/domReady!"], - function (xhr, dom, construct, win, registry, parser, array, event, json, string, Memory, FilteringSelect) { - return { - show: function() { - var node = dom.byId("addVirtualHost.storeSpecificDiv"); - var that = this; - - array.forEach(registry.toArray(), - function(item) { - if(item.id.substr(0,34) == "formAddVirtualHost.specific.store.") { - item.destroyRecursive(); - } - }); - - xhr.get({url: "virtualhost/store/derby/add.html", - sync: true, - load: function(data) { - node.innerHTML = data; - parser.parse(node); - - }}); - } - }; - }); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/jdbc/add.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/jdbc/add.js deleted file mode 100644 index dd79aae2fa..0000000000 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/jdbc/add.js +++ /dev/null @@ -1,93 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -define(["dojo/_base/xhr", - "dojo/dom", - "dojo/dom-construct", - "dojo/_base/window", - "dijit/registry", - "dojo/parser", - "dojo/_base/array", - "dojo/_base/event", - "dojo/_base/json", - "dojo/string", - "dojo/store/Memory", - "dijit/form/FilteringSelect", - "dojo/domReady!"], - function (xhr, dom, construct, win, registry, parser, array, event, json, string, Memory, FilteringSelect) { - return { - show: function() { - var node = dom.byId("addVirtualHost.storeSpecificDiv"); - var that = this; - - array.forEach(registry.toArray(), - function(item) { - if(item.id.substr(0,34) == "formAddVirtualHost.specific.store.") { - item.destroyRecursive(); - } - }); - - xhr.get({url: "virtualhost/store/jdbc/add.html", - sync: true, - load: function(data) { - node.innerHTML = data; - parser.parse(node); - - if (that.hasOwnProperty("poolTypeChooser")) - { - that.poolTypeChooser.destroy(); - } - - var selectPoolType = function(type) { - if(type && string.trim(type) != "") { - require(["qpid/management/virtualhost/store/pool/"+type.toLowerCase()+"/add"], - function(poolType) - { - poolType.show(); - }); - } - } - - xhr.get({ - sync: true, - url: "rest/helper?action=pluginList&plugin=JDBCConnectionProviderFactory", - handleAs: "json" - }).then( - function(data) { - var poolTypes = data; - var poolTypesData = []; - for (var i =0 ; i < poolTypes.length; i++) - { - poolTypesData[i]= {id: poolTypes[i], name: poolTypes[i]}; - } - var poolTypesStore = new Memory({ data: poolTypesData }); - var poolTypesDiv = dom.byId("addVirtualHost.specific.selectPoolType"); - var input = construct.create("input", {id: "addPoolType", required: false}, poolTypesDiv); - that.poolTypeChooser = new FilteringSelect({ id: "addVirtualHost.specific.store.poolType", - name: "connectionPool", - store: poolTypesStore, - searchAttr: "name", required: false, - onChange: selectPoolType }, input); - }); - - }}); - } - }; - }); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/pool/none/add.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/pool/none/add.js deleted file mode 100644 index 7276737873..0000000000 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/pool/none/add.js +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -define(["dojo/_base/xhr", - "dojo/dom", - "dojo/dom-construct", - "dojo/_base/window", - "dijit/registry", - "dojo/parser", - "dojo/_base/array", - "dojo/_base/event", - "dojo/_base/json", - "dojo/string", - "dojo/store/Memory", - "dijit/form/FilteringSelect", - "dojo/domReady!"], - function (xhr, dom, construct, win, registry, parser, array, event, json, string, Memory, FilteringSelect) { - return { - show: function() { - var node = dom.byId("addVirtualHost.poolSpecificDiv"); - var that = this; - - array.forEach(registry.toArray(), - function(item) { - if(item.id.substr(0,39) == "formAddVirtualHost.specific.store.pool.") { - item.destroyRecursive(); - } - }); - - xhr.get({url: "virtualhost/store/pool/none/add.html", - sync: true, - load: function(data) { - node.innerHTML = data; - parser.parse(node); - - }}); - } - }; - }); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/derby/add.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/derby/add.html deleted file mode 100644 index 2ed5b35c10..0000000000 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/derby/add.html +++ /dev/null @@ -1,9 +0,0 @@ - - - - - -
Path to store location*: - -
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/jdbc/add.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/jdbc/add.html deleted file mode 100644 index 966b4fcc06..0000000000 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/jdbc/add.html +++ /dev/null @@ -1,15 +0,0 @@ - - - - - - - - - -
JDBC Url*: - -
Connection Pool:
-
-
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml index dbb2ee8aee..3c83715305 100644 --- a/qpid/java/broker/build.xml +++ b/qpid/java/broker/build.xml @@ -25,9 +25,9 @@ - + - + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java deleted file mode 100644 index ac310d02c9..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ /dev/null @@ -1,466 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.store.derby; - - -import java.io.File; -import java.sql.Blob; -import java.sql.CallableStatement; -import java.sql.Connection; -import java.sql.Driver; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import org.apache.log4j.Logger; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.AbstractJDBCMessageStore; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreConstants; -import org.apache.qpid.util.FileUtils; - -/** - * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence - * mechanism. - * - */ -public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore -{ - - private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); - - private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; - - public static final String MEMORY_STORE_LOCATION = ":memory:"; - - private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; - - public static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; - - public static final String TYPE = "DERBY"; - - private long _totalStoreSize; - private boolean _limitBusted; - private long _persistentSizeLowThreshold; - private long _persistentSizeHighThreshold; - - protected String _connectionURL; - - private String _storeLocation; - private Class _driverClass; - - public DerbyMessageStore() - { - } - - protected Logger getLogger() - { - return _logger; - } - - @Override - protected String getSqlBlobType() - { - return "blob"; - } - - @Override - protected String getSqlVarBinaryType(int size) - { - return "varchar("+size+") for bit data"; - } - - @Override - protected String getSqlBigIntType() - { - return "bigint"; - } - - protected void doClose() throws SQLException - { - try - { - Connection conn = DriverManager.getConnection(_connectionURL + ";shutdown=true"); - // Shouldn't reach this point - shutdown=true should throw SQLException - conn.close(); - getLogger().error("Unable to shut down the store"); - } - catch (SQLException e) - { - if (e.getSQLState().equalsIgnoreCase(DerbyMessageStore.DERBY_SINGLE_DB_SHUTDOWN_CODE)) - { - //expected and represents a clean shutdown of this database only, do nothing. - } - else - { - getLogger().error("Exception whilst shutting down the store: " + e); - throw e; - } - } - } - - @Override - protected void implementationSpecificConfiguration(String name, - VirtualHost virtualHost) - throws ClassNotFoundException - { - //Update to pick up QPID_WORK and use that as the default location not just derbyDB - - _driverClass = (Class) Class.forName(SQL_DRIVER_NAME); - - String defaultPath = System.getProperty("QPID_WORK") + File.separator + "derbyDB"; - String databasePath = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); - if(databasePath == null) - { - databasePath = defaultPath; - } - - if(!MEMORY_STORE_LOCATION.equals(databasePath)) - { - File environmentPath = new File(databasePath); - if (!environmentPath.exists()) - { - if (!environmentPath.mkdirs()) - { - throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " - + "Ensure the path is correct and that the permissions are correct."); - } - } - } - - _storeLocation = databasePath; - - Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); - Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); - - _persistentSizeHighThreshold = overfullAttr == null ? -1l : - overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); - _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : - underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); - - if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) - { - _persistentSizeLowThreshold = _persistentSizeHighThreshold; - } - - //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created. - _connectionURL = "jdbc:derby" + (databasePath.equals(MEMORY_STORE_LOCATION) ? databasePath: ":" + databasePath+ "/") + name + ";create=true"; - - - - _eventManager.addEventListener(new EventListener() - { - @Override - public void event(Event event) - { - setInitialSize(); - } - }, Event.BEFORE_ACTIVATE); - - } - - private void setInitialSize() - { - Connection conn = null; - try - { - - - try - { - conn = newAutoCommitConnection(); - _totalStoreSize = getSizeOnDisk(conn); - } - finally - { - if(conn != null) - { - conn.close(); - - - } - } - } - catch (SQLException e) - { - getLogger().error("Unable to set initial store size", e); - } - } - - protected String getBlobAsString(ResultSet rs, int col) throws SQLException - { - Blob blob = rs.getBlob(col); - if(blob == null) - { - return null; - } - byte[] bytes = blob.getBytes(1, (int)blob.length()); - return new String(bytes, UTF8_CHARSET); - } - - protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException - { - Blob dataAsBlob = rs.getBlob(col); - return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); - } - - - protected boolean tableExists(final String tableName, final Connection conn) throws SQLException - { - PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY); - try - { - stmt.setString(1, tableName); - ResultSet rs = stmt.executeQuery(); - try - { - return rs.next(); - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - - - @Override - public String getStoreLocation() - { - return _storeLocation; - } - - protected synchronized void storedSizeChange(final int delta) - { - if(getPersistentSizeHighThreshold() > 0) - { - synchronized(this) - { - // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every - // time, so we do so only when there's been enough change that it is worth looking again. We do this by - // assuming the total size will change by less than twice the amount of the message data change. - long newSize = _totalStoreSize += 3*delta; - - Connection conn = null; - try - { - - if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) - { - conn = newAutoCommitConnection(); - _totalStoreSize = getSizeOnDisk(conn); - if(_totalStoreSize > getPersistentSizeHighThreshold()) - { - _limitBusted = true; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); - } - } - else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) - { - long oldSize = _totalStoreSize; - conn = newAutoCommitConnection(); - _totalStoreSize = getSizeOnDisk(conn); - if(oldSize <= _totalStoreSize) - { - - reduceSizeOnDisk(conn); - - _totalStoreSize = getSizeOnDisk(conn); - } - - if(_totalStoreSize < getPersistentSizeLowThreshold()) - { - _limitBusted = false; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); - } - - - } - } - catch (SQLException e) - { - closeConnection(conn); - throw new RuntimeException("Exception while processing store size change", e); - } - } - } - } - - private void reduceSizeOnDisk(Connection conn) - { - CallableStatement cs = null; - PreparedStatement stmt = null; - try - { - String tableQuery = - "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'"; - stmt = conn.prepareStatement(tableQuery); - ResultSet rs = null; - - List schemas = new ArrayList(); - List tables = new ArrayList(); - - try - { - rs = stmt.executeQuery(); - while(rs.next()) - { - schemas.add(rs.getString(1)); - tables.add(rs.getString(2)); - } - } - finally - { - if(rs != null) - { - rs.close(); - } - } - - - cs = conn.prepareCall - ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)"); - - for(int i = 0; i < schemas.size(); i++) - { - cs.setString(1, schemas.get(i)); - cs.setString(2, tables.get(i)); - cs.setShort(3, (short) 0); - cs.execute(); - } - } - catch (SQLException e) - { - closeConnection(conn); - throw new RuntimeException("Error reducing on disk size", e); - } - finally - { - closePreparedStatement(stmt); - closePreparedStatement(cs); - } - - } - - private long getSizeOnDisk(Connection conn) - { - PreparedStatement stmt = null; - try - { - String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" + - " FROM " + - " SYS.SYSTABLES systabs," + - " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" + - " WHERE systabs.tabletype = 'T'"; - - stmt = conn.prepareStatement(sizeQuery); - - ResultSet rs = null; - long size = 0l; - - try - { - rs = stmt.executeQuery(); - while(rs.next()) - { - size = rs.getLong(1); - } - } - finally - { - if(rs != null) - { - rs.close(); - } - } - - return size; - - } - catch (SQLException e) - { - closeConnection(conn); - throw new RuntimeException("Error establishing on disk size", e); - } - finally - { - closePreparedStatement(stmt); - } - - } - - - private long getPersistentSizeLowThreshold() - { - return _persistentSizeLowThreshold; - } - - private long getPersistentSizeHighThreshold() - { - return _persistentSizeHighThreshold; - } - - @Override - public String getStoreType() - { - return TYPE; - } - - @Override - public void onDelete() - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Deleting store " + _storeLocation); - } - - if (MEMORY_STORE_LOCATION.equals(_storeLocation)) - { - return; - } - - if (_storeLocation != null) - { - File location = new File(_storeLocation); - if (location.exists()) - { - if (!FileUtils.delete(location, true)) - { - _logger.error("Cannot delete " + _storeLocation); - } - } - } - } - - protected Connection getConnection() throws SQLException - { - return DriverManager.getConnection(_connectionURL); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java deleted file mode 100644 index 1b111ad65e..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.derby; - -import java.util.Collections; -import java.util.Map; -import org.apache.commons.configuration.Configuration; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.plugin.MessageStoreFactory; -import org.apache.qpid.server.store.MessageStore; - -public class DerbyMessageStoreFactory implements MessageStoreFactory -{ - - @Override - public String getType() - { - return DerbyMessageStore.TYPE; - } - - @Override - public MessageStore createMessageStore() - { - return new DerbyMessageStore(); - } - - @Override - public Map convertStoreConfiguration(Configuration configuration) - { - return Collections.emptyMap(); - } - - - @Override - public void validateAttributes(Map attributes) - { - Object storePath = attributes.get(VirtualHost.STORE_PATH); - if(!(storePath instanceof String)) - { - throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH - +"' is required and must be of type String."); - - } - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java index c66fa4e869..54978776e7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/ConnectionProvider.java @@ -29,3 +29,4 @@ public interface ConnectionProvider void close() throws SQLException; } + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java deleted file mode 100644 index 7945ae3b46..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProvider.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.jdbc; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -class DefaultConnectionProvider implements ConnectionProvider -{ - private final String _connectionUrl; - - public DefaultConnectionProvider(String connectionUrl) - { - _connectionUrl = connectionUrl; - } - - @Override - public Connection getConnection() throws SQLException - { - return DriverManager.getConnection(_connectionUrl); - } - - @Override - public void close() throws SQLException - { - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java deleted file mode 100644 index 8fc7de12d0..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.jdbc; - -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; - -public class DefaultConnectionProviderFactory implements JDBCConnectionProviderFactory -{ - - @Override - public String getType() - { - return "NONE"; - } - - @Override - public ConnectionProvider getConnectionProvider(String connectionUrl, - VirtualHost virtualHost) - { - return new DefaultConnectionProvider(connectionUrl); - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java deleted file mode 100644 index f8d93536bb..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ /dev/null @@ -1,462 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.store.jdbc; - - -import java.sql.Blob; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; -import org.apache.qpid.server.store.AbstractJDBCMessageStore; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.Transaction; - -/** - * An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence - * mechanism. - * - */ -public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStore -{ - - private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class); - - - public static final String TYPE = "JDBC"; - public static final String CONNECTION_URL = "connectionURL"; - - protected String _connectionURL; - private ConnectionProvider _connectionProvider; - - - private static class JDBCDetails - { - private final String _vendor; - private String _blobType; - private String _varBinaryType; - private String _bigintType; - private boolean _useBytesMethodsForBlob; - - private JDBCDetails(String vendor, - String blobType, - String varBinaryType, - String bigIntType, - boolean useBytesMethodsForBlob) - { - _vendor = vendor; - setBlobType(blobType); - setVarBinaryType(varBinaryType); - setBigintType(bigIntType); - setUseBytesMethodsForBlob(useBytesMethodsForBlob); - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - JDBCDetails that = (JDBCDetails) o; - - if (!getVendor().equals(that.getVendor())) - { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - return getVendor().hashCode(); - } - - @Override - public String toString() - { - return "JDBCDetails{" + - "vendor='" + getVendor() + '\'' + - ", blobType='" + getBlobType() + '\'' + - ", varBinaryType='" + getVarBinaryType() + '\'' + - ", bigIntType='" + getBigintType() + '\'' + - ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() + - '}'; - } - - public String getVendor() - { - return _vendor; - } - - public String getBlobType() - { - return _blobType; - } - - public void setBlobType(String blobType) - { - _blobType = blobType; - } - - public String getVarBinaryType() - { - return _varBinaryType; - } - - public void setVarBinaryType(String varBinaryType) - { - _varBinaryType = varBinaryType; - } - - public boolean isUseBytesMethodsForBlob() - { - return _useBytesMethodsForBlob; - } - - public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob) - { - _useBytesMethodsForBlob = useBytesMethodsForBlob; - } - - public String getBigintType() - { - return _bigintType; - } - - public void setBigintType(String bigintType) - { - _bigintType = bigintType; - } - } - - private static JDBCDetails DERBY_DETAILS = - new JDBCDetails("derby", - "blob", - "varchar(%d) for bit data", - "bigint", - false); - - private static JDBCDetails POSTGRESQL_DETAILS = - new JDBCDetails("postgresql", - "bytea", - "bytea", - "bigint", - true); - - private static JDBCDetails MYSQL_DETAILS = - new JDBCDetails("mysql", - "blob", - "varbinary(%d)", - "bigint", - false); - - - private static JDBCDetails SYBASE_DETAILS = - new JDBCDetails("sybase", - "image", - "varbinary(%d)", - "bigint", - false); - - - private static JDBCDetails ORACLE_DETAILS = - new JDBCDetails("oracle", - "blob", - "raw(%d)", - "number", - false); - - - private static Map VENDOR_DETAILS = new HashMap(); - - static - { - - addDetails(DERBY_DETAILS); - addDetails(POSTGRESQL_DETAILS); - addDetails(MYSQL_DETAILS); - addDetails(SYBASE_DETAILS); - addDetails(ORACLE_DETAILS); - } - - private static void addDetails(JDBCDetails details) - { - VENDOR_DETAILS.put(details.getVendor(), details); - } - - private String _blobType; - private String _varBinaryType; - private String _bigIntType; - private boolean _useBytesMethodsForBlob; - - private List _transactions = new CopyOnWriteArrayList(); - - - public JDBCMessageStore() - { - } - - protected Logger getLogger() - { - return _logger; - } - - protected String getSqlBlobType() - { - return _blobType; - } - - protected String getSqlVarBinaryType(int size) - { - return String.format(_varBinaryType, size); - } - - public String getSqlBigIntType() - { - return _bigIntType; - } - - @Override - protected void doClose() throws AMQStoreException - { - while(!_transactions.isEmpty()) - { - RecordedJDBCTransaction txn = _transactions.get(0); - txn.abortTran(); - } - try - { - _connectionProvider.close(); - } - catch (SQLException e) - { - throw new AMQStoreException("Unable to close connection provider ", e); - } - } - - - protected Connection getConnection() throws SQLException - { - return _connectionProvider.getConnection(); - } - - - protected void implementationSpecificConfiguration(String name, - VirtualHost virtualHost) - throws ClassNotFoundException, SQLException - { - - - String connectionURL = virtualHost.getAttribute(CONNECTION_URL) == null - ? String.valueOf(virtualHost.getAttribute(VirtualHost.STORE_PATH)) - : String.valueOf(virtualHost.getAttribute(CONNECTION_URL)); - - JDBCDetails details = null; - - String[] components = connectionURL.split(":",3); - if(components.length >= 2) - { - String vendor = components[1]; - details = VENDOR_DETAILS.get(vendor); - } - - if(details == null) - { - getLogger().info("Do not recognize vendor from connection URL: " + connectionURL); - - // TODO - is there a better default than derby - details = DERBY_DETAILS; - } - - - Object poolAttribute = virtualHost.getAttribute("connectionPool"); - String connectionPoolType = poolAttribute == null ? "DEFAULT" : String.valueOf(poolAttribute); - - JDBCConnectionProviderFactory connectionProviderFactory = - JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); - if(connectionProviderFactory == null) - { - _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used"); - connectionProviderFactory = new DefaultConnectionProviderFactory(); - } - - _connectionProvider = connectionProviderFactory.getConnectionProvider(connectionURL, virtualHost); - - _blobType = getStringAttribute(virtualHost, "jdbcBlobType",details.getBlobType()); - _varBinaryType = getStringAttribute(virtualHost, "jdbcVarbinaryType",details.getVarBinaryType()); - _useBytesMethodsForBlob = getBooleanAttribute(virtualHost, "jdbcBytesForBlob",details.isUseBytesMethodsForBlob()); - _bigIntType = getStringAttribute(virtualHost, "jdbcBigIntType", details.getBigintType()); - } - - - private String getStringAttribute(VirtualHost virtualHost, String attributeName, String defaultVal) - { - Object attrValue = virtualHost.getAttribute(attributeName); - if(attrValue != null) - { - return attrValue.toString(); - } - return defaultVal; - } - - private boolean getBooleanAttribute(VirtualHost virtualHost, String attributeName, boolean defaultVal) - { - Object attrValue = virtualHost.getAttribute(attributeName); - if(attrValue != null) - { - if(attrValue instanceof Boolean) - { - return ((Boolean) attrValue).booleanValue(); - } - else if(attrValue instanceof String) - { - return Boolean.parseBoolean((String)attrValue); - } - - } - return defaultVal; - } - - - protected void storedSizeChange(int contentSize) - { - } - - @Override - public String getStoreLocation() - { - return ""; - } - - @Override - public String getStoreType() - { - return TYPE; - } - - @Override - protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException - { - if(_useBytesMethodsForBlob) - { - return rs.getBytes(col); - } - else - { - Blob dataAsBlob = rs.getBlob(col); - return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); - - } - } - - @Override - protected String getBlobAsString(ResultSet rs, int col) throws SQLException - { - byte[] bytes; - if(_useBytesMethodsForBlob) - { - bytes = rs.getBytes(col); - return new String(bytes,UTF8_CHARSET); - } - else - { - Blob blob = rs.getBlob(col); - if(blob == null) - { - return null; - } - bytes = blob.getBytes(1, (int)blob.length()); - } - return new String(bytes, UTF8_CHARSET); - - } - - @Override - public Transaction newTransaction() - { - return new RecordedJDBCTransaction(); - } - - private class RecordedJDBCTransaction extends JDBCTransaction - { - private RecordedJDBCTransaction() - { - super(); - JDBCMessageStore.this._transactions.add(this); - } - - @Override - public void commitTran() throws AMQStoreException - { - try - { - super.commitTran(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - - @Override - public StoreFuture commitTranAsync() throws AMQStoreException - { - try - { - return super.commitTranAsync(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - - @Override - public void abortTran() throws AMQStoreException - { - try - { - super.abortTran(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java deleted file mode 100644 index 82d2275156..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.jdbc; - -import java.util.HashMap; -import java.util.Map; -import org.apache.commons.configuration.Configuration; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.plugin.MessageStoreFactory; -import org.apache.qpid.server.store.MessageStore; - -public class JDBCMessageStoreFactory implements MessageStoreFactory -{ - - @Override - public String getType() - { - return JDBCMessageStore.TYPE; - } - - @Override - public MessageStore createMessageStore() - { - return new JDBCMessageStore(); - } - - @Override - public Map convertStoreConfiguration(Configuration storeConfiguration) - { - Map convertedMap = new HashMap(); - convertedMap.put("jdbcBlobType", storeConfiguration.getString("sqlBlobType")); - convertedMap.put("jdbcVarbinaryType", storeConfiguration.getString("sqlVarbinaryType")); - if(storeConfiguration.containsKey("useBytesForBlob")) - { - convertedMap.put("jdbcUseBytesForBlob", storeConfiguration.getBoolean("useBytesForBlob")); - } - convertedMap.put("jdbcBigIntType", storeConfiguration.getString("sqlBigIntType")); - convertedMap.put("connectionPool", storeConfiguration.getString("pool.type")); - convertedMap.put("minConnectionsPerPartition", storeConfiguration.getInteger("pool.minConnectionsPerPartition", - null)); - convertedMap.put("maxConnectionsPerPartition", storeConfiguration.getInteger("pool.maxConnectionsPerPartition", - null)); - convertedMap.put("partitionCount", storeConfiguration.getInteger("pool.partitionCount", null)); - - return convertedMap; - } - - - @Override - public void validateAttributes(Map attributes) - { - Object connectionURL = attributes.get(JDBCMessageStore.CONNECTION_URL); - if(!(connectionURL instanceof String)) - { - Object storePath = attributes.get(VirtualHost.STORE_PATH); - if(!(storePath instanceof String)) - { - throw new IllegalArgumentException("Attribute '"+ JDBCMessageStore.CONNECTION_URL - +"' is required and must be of type String."); - - } - } - } - -} diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory index 0edd44f5a5..02f22eb21a 100644 --- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory +++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory @@ -16,6 +16,4 @@ # specific language governing permissions and limitations # under the License. # -org.apache.qpid.server.store.derby.DerbyMessageStoreFactory org.apache.qpid.server.store.MemoryMessageStoreFactory -org.apache.qpid.server.store.jdbc.JDBCMessageStoreFactory diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java index c6473d9520..042abca9c4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java @@ -78,8 +78,7 @@ public class VirtualHostRecovererTest extends TestCase attributes.put(VirtualHost.NAME, getName()); attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); - attributes.put(VirtualHost.STORE_PATH, "/path/to/virtualhost/store"); - attributes.put(VirtualHost.STORE_TYPE, "DERBY"); + attributes.put(VirtualHost.STORE_TYPE, "MEMORY"); when(entry.getAttributes()).thenReturn(attributes); VirtualHost host = recoverer.create(null, entry, parent); @@ -99,8 +98,7 @@ public class VirtualHostRecovererTest extends TestCase attributes = new HashMap(); attributes.put(VirtualHost.NAME, getName()); - attributes.put(VirtualHost.STORE_PATH, "/path/to/store"); - attributes.put(VirtualHost.STORE_TYPE, "DERBY"); + attributes.put(VirtualHost.STORE_TYPE, "MEMORY"); mandatoryAttributes = new String[]{VirtualHost.NAME, VirtualHost.STORE_TYPE}; checkMandatoryAttributesAreValidated(mandatoryAttributes, attributes); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java new file mode 100644 index 0000000000..c6d166bc4c --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -0,0 +1,519 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockStoredMessage; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; +import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; + +public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase +{ + private static final String EXCHANGE_NAME = "exchangeName"; + private String _storePath; + private String _storeName; + private MessageStore _messageStore; + private Configuration _configuration; + private VirtualHost _virtualHost; + + private ConfigurationRecoveryHandler _recoveryHandler; + private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; + private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; + private TransactionLogRecoveryHandler _logRecoveryHandler; + private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler; + private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler; + + private Exchange _exchange = mock(Exchange.class); + private static final String ROUTING_KEY = "routingKey"; + private static final String QUEUE_NAME = "queueName"; + private FieldTable _bindingArgs; + private UUID _queueId; + private UUID _exchangeId; + private DurableConfigurationStore _configStore; + + public void setUp() throws Exception + { + super.setUp(); + + _queueId = UUIDGenerator.generateRandomUUID(); + _exchangeId = UUIDGenerator.generateRandomUUID(); + + _storeName = getName(); + _storePath = TMP_FOLDER + File.separator + _storeName; + FileUtils.delete(new File(_storePath), true); + setTestSystemProperty("QPID_WORK", TMP_FOLDER); + _configuration = mock(Configuration.class); + _recoveryHandler = mock(ConfigurationRecoveryHandler.class); + _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); + _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); + _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); + _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class); + _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); + _virtualHost = mock(VirtualHost.class); + + when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); + when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); + when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); + when(_exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(EXCHANGE_NAME)); + when(_exchange.getId()).thenReturn(_exchangeId); + when(_configuration.getString(eq(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY), anyString())).thenReturn( + _storePath); + when(_virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storePath); + + _bindingArgs = new FieldTable(); + AMQShortString argKey = AMQPFilterTypes.JMS_SELECTOR.getValue(); + String argValue = "some selector expression"; + _bindingArgs.put(argKey, argValue); + + reopenStore(); + } + + public void tearDown() throws Exception + { + FileUtils.delete(new File(_storePath), true); + super.tearDown(); + } + + public void testCreateExchange() throws Exception + { + Exchange exchange = createTestExchange(); + DurableConfigurationStoreHelper.createExchange(_configStore, exchange); + + reopenStore(); + verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(org.apache.qpid.server.model.Exchange.class.getName()), + eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(), + org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type", + org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString()))); + } + + private Map map(Object... vals) + { + Map map = new HashMap(); + boolean isValue = false; + String key = null; + for(Object obj : vals) + { + if(isValue) + { + map.put(key,obj); + } + else + { + key = (String) obj; + } + isValue = !isValue; + } + return map; + } + + public void testRemoveExchange() throws Exception + { + Exchange exchange = createTestExchange(); + DurableConfigurationStoreHelper.createExchange(_configStore, exchange); + + DurableConfigurationStoreHelper.removeExchange(_configStore, exchange); + + reopenStore(); + verify(_recoveryHandler, never()).configuredObject(any(UUID.class), anyString(), anyMap()); + } + + public void testBindQueue() throws Exception + { + AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); + Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, + _exchange, FieldTable.convertToMap(_bindingArgs)); + DurableConfigurationStoreHelper.createBinding(_configStore, binding); + + reopenStore(); + + Map map = new HashMap(); + map.put(org.apache.qpid.server.model.Binding.EXCHANGE, _exchange.getId().toString()); + map.put(org.apache.qpid.server.model.Binding.QUEUE, queue.getId().toString()); + map.put(org.apache.qpid.server.model.Binding.NAME, ROUTING_KEY); + map.put(org.apache.qpid.server.model.Binding.ARGUMENTS,FieldTable.convertToMap(_bindingArgs)); + + verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(org.apache.qpid.server.model.Binding.class.getName()), + eq(map)); + } + + public void testUnbindQueue() throws Exception + { + AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); + Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, + _exchange, FieldTable.convertToMap(_bindingArgs)); + DurableConfigurationStoreHelper.createBinding(_configStore, binding); + + DurableConfigurationStoreHelper.removeBinding(_configStore, binding); + reopenStore(); + + verify(_recoveryHandler, never()).configuredObject(any(UUID.class), + eq(org.apache.qpid.server.model.Binding.class.getName()), + anyMap()); + } + + public void testCreateQueueAMQQueue() throws Exception + { + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, null); + + reopenStore(); + Map queueAttributes = new HashMap(); + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + } + + public void testCreateQueueAMQQueueFieldTable() throws Exception + { + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); + Map attributes = new HashMap(); + attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); + attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); + + FieldTable arguments = FieldTable.convertToFieldTable(attributes); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + + reopenStore(); + + + Map queueAttributes = new HashMap(); + + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + queueAttributes.put(Queue.ARGUMENTS, attributes); + + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + } + + public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception + { + Exchange alternateExchange = createTestAlternateExchange(); + + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, null); + + reopenStore(); + + Map queueAttributes = new HashMap(); + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); + + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + } + + private Exchange createTestAlternateExchange() + { + UUID exchUuid = UUID.randomUUID(); + Exchange alternateExchange = mock(Exchange.class); + when(alternateExchange.getId()).thenReturn(exchUuid); + return alternateExchange; + } + + public void testUpdateQueueExclusivity() throws Exception + { + // create queue + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); + Map attributes = new HashMap(); + attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); + attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); + FieldTable arguments = FieldTable.convertToFieldTable(attributes); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + + // update the queue to have exclusive=false + queue = createTestQueue(getName(), getName() + "Owner", false); + when(queue.getArguments()).thenReturn(attributes); + + DurableConfigurationStoreHelper.updateQueue(_configStore, queue); + + reopenStore(); + + Map queueAttributes = new HashMap(); + + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); + queueAttributes.put(Queue.ARGUMENTS, attributes); + + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + + } + + public void testUpdateQueueAlternateExchange() throws Exception + { + // create queue + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); + Map attributes = new HashMap(); + attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); + attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); + FieldTable arguments = FieldTable.convertToFieldTable(attributes); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + + // update the queue to have exclusive=false + Exchange alternateExchange = createTestAlternateExchange(); + queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange); + when(queue.getArguments()).thenReturn(attributes); + + DurableConfigurationStoreHelper.updateQueue(_configStore, queue); + + reopenStore(); + + Map queueAttributes = new HashMap(); + + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); + queueAttributes.put(Queue.ARGUMENTS, attributes); + queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); + + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + } + + public void testRemoveQueue() throws Exception + { + // create queue + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); + Map attributes = new HashMap(); + attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); + attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); + FieldTable arguments = FieldTable.convertToFieldTable(attributes); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + + // remove queue + DurableConfigurationStoreHelper.removeQueue(_configStore,queue); + reopenStore(); + verify(_recoveryHandler, never()).configuredObject(any(UUID.class), + eq(org.apache.qpid.server.model.Queue.class.getName()), + anyMap()); + } + + private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException + { + return createTestQueue(queueName, queueOwner, exclusive, null); + } + + private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive, Exchange alternateExchange) throws AMQStoreException + { + AMQQueue queue = mock(AMQQueue.class); + when(queue.getName()).thenReturn(queueName); + when(queue.getNameShortString()).thenReturn(AMQShortString.valueOf(queueName)); + when(queue.getOwner()).thenReturn(AMQShortString.valueOf(queueOwner)); + when(queue.isExclusive()).thenReturn(exclusive); + when(queue.getId()).thenReturn(_queueId); + when(queue.getAlternateExchange()).thenReturn(alternateExchange); + return queue; + } + + private Exchange createTestExchange() + { + Exchange exchange = mock(Exchange.class); + when(exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(getName())); + when(exchange.getName()).thenReturn(getName()); + when(exchange.getTypeShortString()).thenReturn(AMQShortString.valueOf(getName() + "Type")); + when(exchange.isAutoDelete()).thenReturn(true); + when(exchange.getId()).thenReturn(_exchangeId); + return exchange; + } + + private void reopenStore() throws Exception + { + onReopenStore(); + if (_messageStore != null) + { + _messageStore.close(); + } + _messageStore = createMessageStore(); + _configStore = createConfigStore(); + + _configStore.configureConfigStore(_storeName, _recoveryHandler, _virtualHost); + _messageStore.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler); + _messageStore.activate(); + } + + protected abstract void onReopenStore(); + + abstract protected MessageStore createMessageStore() throws Exception; + /*{ + String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY); + if (storeClass == null) + { + storeClass = DerbyMessageStore.class.getName(); + } + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + MessageStore messageStore = (MessageStore) Class.forName(storeClass).newInstance(); + return messageStore; + } +*/ + abstract protected DurableConfigurationStore createConfigStore() throws Exception; + /*{ + String storeClass = System.getProperty(CONFIGURATION_STORE_CLASS_NAME_KEY); + if (storeClass == null) + { + storeClass = DerbyMessageStore.class.getName(); + } + Class clazz = (Class) Class.forName(storeClass); + DurableConfigurationStore configurationStore ; + if(clazz.isInstance(_messageStore)) + { + configurationStore = (DurableConfigurationStore) _messageStore; + } + else + { + configurationStore = (DurableConfigurationStore) Class.forName(storeClass).newInstance(); + } + return configurationStore; + } +*/ + public void testRecordXid() throws Exception + { + Record enqueueRecord = getTestRecord(1); + Record dequeueRecord = getTestRecord(2); + Record[] enqueues = { enqueueRecord }; + Record[] dequeues = { dequeueRecord }; + byte[] globalId = new byte[] { 1 }; + byte[] branchId = new byte[] { 2 }; + + Transaction transaction = _messageStore.newTransaction(); + transaction.recordXid(1l, globalId, branchId, enqueues, dequeues); + transaction.commitTran(); + reopenStore(); + verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues); + + transaction = _messageStore.newTransaction(); + transaction.removeXid(1l, globalId, branchId); + transaction.commitTran(); + + reopenStore(); + verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues); + } + + private Record getTestRecord(long messageNumber) + { + UUID queueId1 = UUIDGenerator.generateRandomUUID(); + TransactionLogResource queue1 = mock(TransactionLogResource.class); + when(queue1.getId()).thenReturn(queueId1); + EnqueableMessage message1 = mock(EnqueableMessage.class); + when(message1.isPersistent()).thenReturn(true); + when(message1.getMessageNumber()).thenReturn(messageNumber); + when(message1.getStoredMessage()).thenReturn(new MockStoredMessage(messageNumber)); + Record enqueueRecord = new TestRecord(queue1, message1); + return enqueueRecord; + } + + private static class TestRecord implements Record + { + private TransactionLogResource _queue; + private EnqueableMessage _message; + + public TestRecord(TransactionLogResource queue, EnqueableMessage message) + { + super(); + _queue = queue; + _message = message; + } + + @Override + public TransactionLogResource getQueue() + { + return _queue; + } + + @Override + public EnqueableMessage getMessage() + { + return _message; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode()); + result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (obj == null) + { + return false; + } + if (!(obj instanceof Record)) + { + return false; + } + Record other = (Record) obj; + if (_message == null && other.getMessage() != null) + { + return false; + } + if (_queue == null && other.getQueue() != null) + { + return false; + } + if (_message.getMessageNumber() != other.getMessage().getMessageNumber()) + { + return false; + } + return _queue.getId().equals(other.getQueue().getId()); + } + + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java deleted file mode 100644 index 4a6b3f2cad..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java +++ /dev/null @@ -1,520 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyMap; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; - -import java.io.File; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import org.apache.commons.configuration.Configuration; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.logging.SystemOutMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.TestLogActor; -import org.apache.qpid.server.message.EnqueableMessage; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MockStoredMessage; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; -import org.apache.qpid.server.store.Transaction.Record; -import org.apache.qpid.server.store.derby.DerbyMessageStore; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.util.FileUtils; - -public class DurableConfigurationStoreTest extends QpidTestCase -{ - private static final String EXCHANGE_NAME = "exchangeName"; - private String _storePath; - private String _storeName; - private MessageStore _messageStore; - private Configuration _configuration; - private VirtualHost _virtualHost; - - private ConfigurationRecoveryHandler _recoveryHandler; - private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; - private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; - private TransactionLogRecoveryHandler _logRecoveryHandler; - private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler; - private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler; - - private Exchange _exchange = mock(Exchange.class); - private static final String ROUTING_KEY = "routingKey"; - private static final String QUEUE_NAME = "queueName"; - private FieldTable _bindingArgs; - private UUID _queueId; - private UUID _exchangeId; - private DurableConfigurationStore _configStore; - - public void setUp() throws Exception - { - super.setUp(); - - _queueId = UUIDGenerator.generateRandomUUID(); - _exchangeId = UUIDGenerator.generateRandomUUID(); - - _storeName = getName(); - _storePath = TMP_FOLDER + File.separator + _storeName; - FileUtils.delete(new File(_storePath), true); - setTestSystemProperty("QPID_WORK", TMP_FOLDER); - _configuration = mock(Configuration.class); - _recoveryHandler = mock(ConfigurationRecoveryHandler.class); - _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); - _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); - _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); - _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class); - _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); - _virtualHost = mock(VirtualHost.class); - - when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); - when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); - when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); - when(_exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(EXCHANGE_NAME)); - when(_exchange.getId()).thenReturn(_exchangeId); - when(_configuration.getString(eq(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY), anyString())).thenReturn( - _storePath); - when(_virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storePath); - - _bindingArgs = new FieldTable(); - AMQShortString argKey = AMQPFilterTypes.JMS_SELECTOR.getValue(); - String argValue = "some selector expression"; - _bindingArgs.put(argKey, argValue); - - reopenStore(); - } - - public void tearDown() throws Exception - { - FileUtils.delete(new File(_storePath), true); - super.tearDown(); - } - - public void testCreateExchange() throws Exception - { - Exchange exchange = createTestExchange(); - DurableConfigurationStoreHelper.createExchange(_configStore, exchange); - - reopenStore(); - verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(org.apache.qpid.server.model.Exchange.class.getName()), - eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(), - org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type", - org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString()))); - } - - private Map map(Object... vals) - { - Map map = new HashMap(); - boolean isValue = false; - String key = null; - for(Object obj : vals) - { - if(isValue) - { - map.put(key,obj); - } - else - { - key = (String) obj; - } - isValue = !isValue; - } - return map; - } - - public void testRemoveExchange() throws Exception - { - Exchange exchange = createTestExchange(); - DurableConfigurationStoreHelper.createExchange(_configStore, exchange); - - DurableConfigurationStoreHelper.removeExchange(_configStore, exchange); - - reopenStore(); - verify(_recoveryHandler, never()).configuredObject(any(UUID.class), anyString(), anyMap()); - } - - public void testBindQueue() throws Exception - { - AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); - Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, - _exchange, FieldTable.convertToMap(_bindingArgs)); - DurableConfigurationStoreHelper.createBinding(_configStore, binding); - - reopenStore(); - - Map map = new HashMap(); - map.put(org.apache.qpid.server.model.Binding.EXCHANGE, _exchange.getId().toString()); - map.put(org.apache.qpid.server.model.Binding.QUEUE, queue.getId().toString()); - map.put(org.apache.qpid.server.model.Binding.NAME, ROUTING_KEY); - map.put(org.apache.qpid.server.model.Binding.ARGUMENTS,FieldTable.convertToMap(_bindingArgs)); - - verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(org.apache.qpid.server.model.Binding.class.getName()), - eq(map)); - } - - public void testUnbindQueue() throws Exception - { - AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); - Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, - _exchange, FieldTable.convertToMap(_bindingArgs)); - DurableConfigurationStoreHelper.createBinding(_configStore, binding); - - DurableConfigurationStoreHelper.removeBinding(_configStore, binding); - reopenStore(); - - verify(_recoveryHandler, never()).configuredObject(any(UUID.class), - eq(org.apache.qpid.server.model.Binding.class.getName()), - anyMap()); - } - - public void testCreateQueueAMQQueue() throws Exception - { - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, null); - - reopenStore(); - Map queueAttributes = new HashMap(); - queueAttributes.put(Queue.NAME, getName()); - queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); - } - - public void testCreateQueueAMQQueueFieldTable() throws Exception - { - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); - Map attributes = new HashMap(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); - - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); - - reopenStore(); - - - Map queueAttributes = new HashMap(); - - queueAttributes.put(Queue.NAME, getName()); - queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); - queueAttributes.put(Queue.ARGUMENTS, attributes); - - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); - } - - public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception - { - Exchange alternateExchange = createTestAlternateExchange(); - - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, null); - - reopenStore(); - - Map queueAttributes = new HashMap(); - queueAttributes.put(Queue.NAME, getName()); - queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); - queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); - - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); - } - - private Exchange createTestAlternateExchange() - { - UUID exchUuid = UUID.randomUUID(); - Exchange alternateExchange = mock(Exchange.class); - when(alternateExchange.getId()).thenReturn(exchUuid); - return alternateExchange; - } - - public void testUpdateQueueExclusivity() throws Exception - { - // create queue - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); - Map attributes = new HashMap(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); - - // update the queue to have exclusive=false - queue = createTestQueue(getName(), getName() + "Owner", false); - when(queue.getArguments()).thenReturn(attributes); - - DurableConfigurationStoreHelper.updateQueue(_configStore, queue); - - reopenStore(); - - Map queueAttributes = new HashMap(); - - queueAttributes.put(Queue.NAME, getName()); - queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); - queueAttributes.put(Queue.ARGUMENTS, attributes); - - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); - - } - - public void testUpdateQueueAlternateExchange() throws Exception - { - // create queue - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); - Map attributes = new HashMap(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); - - // update the queue to have exclusive=false - Exchange alternateExchange = createTestAlternateExchange(); - queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange); - when(queue.getArguments()).thenReturn(attributes); - - DurableConfigurationStoreHelper.updateQueue(_configStore, queue); - - reopenStore(); - - Map queueAttributes = new HashMap(); - - queueAttributes.put(Queue.NAME, getName()); - queueAttributes.put(Queue.OWNER, getName()+"Owner"); - queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); - queueAttributes.put(Queue.ARGUMENTS, attributes); - queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); - - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); - } - - public void testRemoveQueue() throws Exception - { - // create queue - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); - Map attributes = new HashMap(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); - - // remove queue - DurableConfigurationStoreHelper.removeQueue(_configStore,queue); - reopenStore(); - verify(_recoveryHandler, never()).configuredObject(any(UUID.class), - eq(org.apache.qpid.server.model.Queue.class.getName()), - anyMap()); - } - - private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException - { - return createTestQueue(queueName, queueOwner, exclusive, null); - } - - private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive, Exchange alternateExchange) throws AMQStoreException - { - AMQQueue queue = mock(AMQQueue.class); - when(queue.getName()).thenReturn(queueName); - when(queue.getNameShortString()).thenReturn(AMQShortString.valueOf(queueName)); - when(queue.getOwner()).thenReturn(AMQShortString.valueOf(queueOwner)); - when(queue.isExclusive()).thenReturn(exclusive); - when(queue.getId()).thenReturn(_queueId); - when(queue.getAlternateExchange()).thenReturn(alternateExchange); - return queue; - } - - private Exchange createTestExchange() - { - Exchange exchange = mock(Exchange.class); - when(exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(getName())); - when(exchange.getName()).thenReturn(getName()); - when(exchange.getTypeShortString()).thenReturn(AMQShortString.valueOf(getName() + "Type")); - when(exchange.isAutoDelete()).thenReturn(true); - when(exchange.getId()).thenReturn(_exchangeId); - return exchange; - } - - private void reopenStore() throws Exception - { - if (_messageStore != null) - { - _messageStore.close(); - } - _messageStore = createMessageStore(); - _configStore = createConfigStore(); - - _configStore.configureConfigStore(_storeName, _recoveryHandler, _virtualHost); - _messageStore.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler); - _messageStore.activate(); - } - - protected MessageStore createMessageStore() throws Exception - { - String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY); - if (storeClass == null) - { - storeClass = DerbyMessageStore.class.getName(); - } - CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); - MessageStore messageStore = (MessageStore) Class.forName(storeClass).newInstance(); - return messageStore; - } - - protected DurableConfigurationStore createConfigStore() throws Exception - { - String storeClass = System.getProperty(CONFIGURATION_STORE_CLASS_NAME_KEY); - if (storeClass == null) - { - storeClass = DerbyMessageStore.class.getName(); - } - Class clazz = (Class) Class.forName(storeClass); - DurableConfigurationStore configurationStore ; - if(clazz.isInstance(_messageStore)) - { - configurationStore = (DurableConfigurationStore) _messageStore; - } - else - { - configurationStore = (DurableConfigurationStore) Class.forName(storeClass).newInstance(); - } - return configurationStore; - } - - public void testRecordXid() throws Exception - { - Record enqueueRecord = getTestRecord(1); - Record dequeueRecord = getTestRecord(2); - Record[] enqueues = { enqueueRecord }; - Record[] dequeues = { dequeueRecord }; - byte[] globalId = new byte[] { 1 }; - byte[] branchId = new byte[] { 2 }; - - Transaction transaction = _messageStore.newTransaction(); - transaction.recordXid(1l, globalId, branchId, enqueues, dequeues); - transaction.commitTran(); - reopenStore(); - verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues); - - transaction = _messageStore.newTransaction(); - transaction.removeXid(1l, globalId, branchId); - transaction.commitTran(); - - reopenStore(); - verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues); - } - - private Record getTestRecord(long messageNumber) - { - UUID queueId1 = UUIDGenerator.generateRandomUUID(); - TransactionLogResource queue1 = mock(TransactionLogResource.class); - when(queue1.getId()).thenReturn(queueId1); - EnqueableMessage message1 = mock(EnqueableMessage.class); - when(message1.isPersistent()).thenReturn(true); - when(message1.getMessageNumber()).thenReturn(messageNumber); - when(message1.getStoredMessage()).thenReturn(new MockStoredMessage(messageNumber)); - Record enqueueRecord = new TestRecord(queue1, message1); - return enqueueRecord; - } - - private static class TestRecord implements Record - { - private TransactionLogResource _queue; - private EnqueableMessage _message; - - public TestRecord(TransactionLogResource queue, EnqueableMessage message) - { - super(); - _queue = queue; - _message = message; - } - - @Override - public TransactionLogResource getQueue() - { - return _queue; - } - - @Override - public EnqueableMessage getMessage() - { - return _message; - } - - @Override - public int hashCode() - { - final int prime = 31; - int result = 1; - result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode()); - result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) - { - return true; - } - if (obj == null) - { - return false; - } - if (!(obj instanceof Record)) - { - return false; - } - Record other = (Record) obj; - if (_message == null && other.getMessage() != null) - { - return false; - } - if (_queue == null && other.getQueue() != null) - { - return false; - } - if (_message.getMessageNumber() != other.getMessage().getMessageNumber()) - { - return false; - } - return _queue.getId().equals(other.getQueue().getId()); - } - - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreCreatorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreCreatorTest.java deleted file mode 100644 index e74937dd1c..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreCreatorTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import org.apache.qpid.server.store.derby.DerbyMessageStore; -import org.apache.qpid.test.utils.QpidTestCase; - -public class MessageStoreCreatorTest extends QpidTestCase -{ - private static final String[] STORE_TYPES = {MemoryMessageStore.TYPE, DerbyMessageStore.TYPE}; - - public void testMessageStoreCreator() - { - MessageStoreCreator messageStoreCreator = new MessageStoreCreator(); - for (String type : STORE_TYPES) - { - MessageStore store = messageStoreCreator.createMessageStore(type); - assertNotNull("Store of type " + type + " is not created", store); - } - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java deleted file mode 100644 index 479675dac1..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.derby; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreConstants; -import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.when; - -public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase -{ - private static final Logger _logger = Logger.getLogger(DerbyMessageStoreQuotaEventsTest.class); - - private static final int NUMBER_OF_MESSAGES_TO_OVERFILL_STORE = 10; - - /** - * Estimated using an assumption that a physical disk space occupied by a - * message is 3 times bigger then a message size - */ - private static final int OVERFULL_SIZE = (int) (MESSAGE_DATA.length * 3 * NUMBER_OF_MESSAGES_TO_OVERFILL_STORE * 0.8); - - private static final int UNDERFULL_SIZE = (int) (OVERFULL_SIZE * 0.8); - - @Override - protected int getNumberOfMessagesToFillStore() - { - return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE; - } - - @Override - protected void applyStoreSpecificConfiguration(VirtualHost vhost) - { - _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); - - when(vhost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE); - when(vhost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE); - } - - @Override - protected MessageStore createStore() throws Exception - { - return new DerbyMessageStore(); - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java deleted file mode 100644 index 859fad629b..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.derby; - -import java.io.File; - -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreTestCase; -import org.apache.qpid.util.FileUtils; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.when; - -public class DerbyMessageStoreTest extends MessageStoreTestCase -{ - private String _storeLocation; - - @Override - public void tearDown() throws Exception - { - try - { - deleteStoreIfExists(); - } - finally - { - super.tearDown(); - } - } - - public void testOnDelete() throws Exception - { - File location = new File(_storeLocation); - assertTrue("Store does not exist at " + _storeLocation, location.exists()); - - getStore().close(); - assertTrue("Store does not exist at " + _storeLocation, location.exists()); - - getStore().onDelete(); - assertFalse("Store exists at " + _storeLocation, location.exists()); - } - - @Override - protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception - { - _storeLocation = TMP_FOLDER + File.separator + getTestName(); - when(virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storeLocation); - deleteStoreIfExists(); - } - - private void deleteStoreIfExists() - { - File location = new File(_storeLocation); - if (location.exists()) - { - FileUtils.delete(location, true); - } - } - - @Override - protected MessageStore createMessageStore() - { - return new DerbyMessageStore(); - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java deleted file mode 100644 index a8e0460cea..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.jdbc; - -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashSet; -import java.util.Set; - -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreTestCase; -import org.apache.qpid.server.store.derby.DerbyMessageStore; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.when; - -public class JDBCMessageStoreTest extends MessageStoreTestCase -{ - private String _connectionURL; - - @Override - public void tearDown() throws Exception - { - try - { - shutdownDerby(); - } - finally - { - super.tearDown(); - } - } - - public void testOnDelete() throws Exception - { - String[] expectedTables = JDBCMessageStore.ALL_TABLES; - assertTablesExist(expectedTables, true); - getStore().close(); - assertTablesExist(expectedTables, true); - getStore().onDelete(); - assertTablesExist(expectedTables, false); - } - - @Override - protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception - { - _connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true"; - - when(virtualHost.getAttribute(eq("connectionURL"))).thenReturn(_connectionURL); - } - - @Override - protected MessageStore createMessageStore() - { - return new JDBCMessageStore(); - } - - private void assertTablesExist(String[] expectedTables, boolean exists) throws SQLException - { - Set existingTables = getTableNames(); - for (String tableName : expectedTables) - { - assertEquals("Table " + tableName + (exists ? " is not found" : " actually exist"), exists, - existingTables.contains(tableName)); - } - } - - private Set getTableNames() throws SQLException - { - Set tableNames = new HashSet(); - Connection conn = null; - try - { - conn = openConnection(); - DatabaseMetaData metaData = conn.getMetaData(); - ResultSet tables = metaData.getTables(null, null, null, new String[] { "TABLE" }); - try - { - while (tables.next()) - { - tableNames.add(tables.getString("TABLE_NAME")); - } - } - finally - { - tables.close(); - } - } - finally - { - if (conn != null) - { - conn.close(); - } - } - return tableNames; - } - - private Connection openConnection() throws SQLException - { - return DriverManager.getConnection(_connectionURL); - } - - - private void shutdownDerby() throws SQLException - { - Connection connection = null; - try - { - connection = DriverManager.getConnection("jdbc:derby:memory:/" + getTestName() + ";shutdown=true"); - } - catch(SQLException e) - { - if (e.getSQLState().equalsIgnoreCase(DerbyMessageStore.DERBY_SINGLE_DB_SHUTDOWN_CODE)) - { - //expected and represents a clean shutdown of this database only, do nothing. - } - else - { - throw e; - } - } - finally - { - if (connection != null) - { - connection.close(); - } - } - } -} diff --git a/qpid/java/build.deps b/qpid/java/build.deps index e8c56f19af..630932638a 100644 --- a/qpid/java/build.deps +++ b/qpid/java/build.deps @@ -96,6 +96,8 @@ perftests.test.libs=${test.libs} broker-plugins-access-control.test.libs=${test.libs} broker-plugins-management-http.test.libs=${test.libs} broker-plugins-management-jmx.test.libs=${test.libs} +broker-plugins-jdbc-store.test.libs=${test.libs} +broker-plugins-derby-store.test.libs=${test.libs} management-common.test.libs=${test.libs} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index 67e3e3b8f1..cf05cc0304 100755 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -69,7 +69,6 @@ import org.apache.qpid.server.protocol.AmqpProtocolVersion; import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.store.MessageStoreCreator; -import org.apache.qpid.server.store.derby.DerbyMessageStore; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.util.FileUtils; @@ -839,7 +838,7 @@ public class QpidBrokerTestCase extends QpidTestCase if (System.getProperty("profile", "").startsWith("java-dby-mem")) { - storeDir = DerbyMessageStore.MEMORY_STORE_LOCATION; + storeDir = ":memory:"; } else if (!MEMORY_STORE_CLASS_NAME.equals(storeClassName)) { -- cgit v1.2.1