diff options
| author | Keith Wall <kwall@apache.org> | 2014-06-18 20:51:43 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-06-18 20:51:43 +0000 |
| commit | 326b9560c14d1c30eb71c1396858791f9187d11e (patch) | |
| tree | 34ba78548d48295e88b9e038f382bd8861f32500 /qpid/java/broker-plugins/jdbc-store/src | |
| parent | 2622dda9c7d3267efd985b5ae5928b99063d2fa7 (diff) | |
| download | qpid-python-326b9560c14d1c30eb71c1396858791f9187d11e.tar.gz | |
QPID-5800: [Java Broker] Refactor Derby/JDBC message store implementations to separate message and config store implementations.
* Message store implementations can now be used in isolation, which is useful when the user is using a JSON VirtualHostNode with
another persistent store implementation.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1603626 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/jdbc-store/src')
8 files changed, 827 insertions, 529 deletions
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java new file mode 100644 index 0000000000..bd245fa4f4 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.store.jdbc; + + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.Transaction; + +public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.server.store.AbstractJDBCMessageStore +{ + private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(false); + private final List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<>(); + + private ConfiguredObject<?> _parent; + + @Override + public final void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) + { + if (_messageStoreOpen.compareAndSet(false, true)) + { + _parent = parent; + + doOpen(parent, storeSettings); + + createOrOpenMessageStoreDatabase(); + setMaximumMessageId(); + } + } + + protected abstract void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings) + throws StoreException; + + @Override + public final void upgradeStoreStructure() throws StoreException + { + checkMessageStoreOpen(); + + upgrade(_parent); + } + + @Override + public final void closeMessageStore() + { + if (_messageStoreOpen.compareAndSet(true, false)) + { + try + { + while(!_transactions.isEmpty()) + { + RecordedJDBCTransaction txn = _transactions.get(0); + txn.abortTran(); + } + } + finally + { + doClose(); + } + + } + } + + protected abstract void doClose(); + + protected boolean isMessageStoreOpen() + { + return _messageStoreOpen.get(); + } + + @Override + protected void checkMessageStoreOpen() + { + if (!_messageStoreOpen.get()) + { + throw new IllegalStateException("Message store is not open"); + } + } + + @Override + protected void storedSizeChange(int contentSize) + { + } + + @Override + public Transaction newTransaction() + { + return new RecordedJDBCTransaction(); + } + + + private class RecordedJDBCTransaction extends JDBCTransaction + { + private RecordedJDBCTransaction() + { + super(); + GenericAbstractJDBCMessageStore.this._transactions.add(this); + } + + @Override + public void commitTran() + { + try + { + super.commitTran(); + } + finally + { + GenericAbstractJDBCMessageStore.this._transactions.remove(this); + } + } + + @Override + public StoreFuture commitTranAsync() + { + try + { + return super.commitTranAsync(); + } + finally + { + GenericAbstractJDBCMessageStore.this._transactions.remove(this); + } + } + + @Override + public void abortTran() + { + try + { + super.abortTran(); + } + finally + { + GenericAbstractJDBCMessageStore.this._transactions.remove(this); + } + } + } +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java new file mode 100644 index 0000000000..7bafe5a859 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.store.jdbc; + + +import java.nio.charset.Charset; +import java.sql.Blob; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.log4j.Logger; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; +import org.apache.qpid.server.store.*; +import org.apache.qpid.server.util.MapValueConverter; + +/** + * Implementation of a DurableConfigurationStore backed by Generic JDBC Database + * that also provides a MessageStore. + */ +public class GenericJDBCConfigurationStore extends AbstractJDBCConfigurationStore implements MessageStoreProvider +{ + private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); + + private static final Logger LOGGER = Logger.getLogger(GenericJDBCConfigurationStore.class); + + public static final String CONNECTION_URL = "connectionUrl"; + public static final String CONNECTION_POOL_TYPE = "connectionPoolType"; + public static final String JDBC_BIG_INT_TYPE = "bigIntType"; + public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob"; + public static final String JDBC_VARBINARY_TYPE = "varbinaryType"; + public static final String JDBC_BLOB_TYPE = "blobType"; + + private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean(); + private final MessageStore _messageStoreFacade = new MessageStoreWrapper(); + + protected String _connectionURL; + private ConnectionProvider _connectionProvider; + + private String _blobType; + private String _varBinaryType; + private String _bigIntType; + private boolean _useBytesMethodsForBlob; + + private ConfiguredObject<?> _parent; + + @Override + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) + throws StoreException + { + if (_configurationStoreOpen.compareAndSet(false, true)) + { + _parent = parent; + _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL)); + Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE); + + JDBCDetails details = null; + + String[] components = _connectionURL.split(":", 3); + if(components.length >= 2) + { + String vendor = components[1]; + details = JDBCDetails.getDetails(vendor); + } + + if(details == null) + { + getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL); + + details = JDBCDetails.getDefaultDetails(); + } + + String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute); + + JDBCConnectionProviderFactory connectionProviderFactory = + JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); + if(connectionProviderFactory == null) + { + LOGGER.warn("Unknown connection pool type: " + + connectionPoolType + + ". no connection pooling will be used"); + connectionProviderFactory = new DefaultConnectionProviderFactory(); + } + + try + { + _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings); + } + catch (SQLException e) + { + throw new StoreException("Failed to create connection provider for " + _connectionURL); + } + _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType()); + _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType()); + _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob()); + _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE, + storeSettings, + details.getBigintType()); + + createOrOpenConfigurationStoreDatabase(); + } + } + + @Override + public void upgradeStoreStructure() throws StoreException + { + checkConfigurationStoreOpen(); + upgradeIfNecessary(_parent); + } + + @Override + protected Connection getConnection() throws SQLException + { + return _connectionProvider.getConnection(); + } + + @Override + public void closeConfigurationStore() throws StoreException + { + if (_configurationStoreOpen.compareAndSet(true, false)) + { + try + { + _connectionProvider.close(); + } + catch (SQLException e) + { + throw new StoreException("Unable to close connection provider ", e); + } + } + } + + @Override + protected String getSqlBlobType() + { + return _blobType; + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return String.format(_varBinaryType, size); + } + + @Override + public String getSqlBigIntType() + { + return _bigIntType; + } + + @Override + protected String getBlobAsString(ResultSet rs, int col) throws SQLException + { + byte[] bytes; + if(_useBytesMethodsForBlob) + { + bytes = rs.getBytes(col); + return new String(bytes,UTF8_CHARSET); + } + else + { + Blob blob = rs.getBlob(col); + if(blob == null) + { + return null; + } + bytes = blob.getBytes(1, (int)blob.length()); + } + return new String(bytes, UTF8_CHARSET); + + } + + protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException + { + if(_useBytesMethodsForBlob) + { + return rs.getBytes(col); + } + else + { + Blob dataAsBlob = rs.getBlob(col); + return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + + } + } + + @Override + protected void checkConfigurationStoreOpen() + { + if (!_configurationStoreOpen.get()) + { + throw new IllegalStateException("Configuration store is not open"); + } + } + + @Override + protected Logger getLogger() + { + return LOGGER; + } + + @Override + public MessageStore getMessageStore() + { + return _messageStoreFacade; + } + + private class MessageStoreWrapper extends GenericAbstractJDBCMessageStore + { + @Override + protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + // Nothing to do, store provided by DerbyConfigurationStore + } + + @Override + protected Connection getConnection() throws SQLException + { + return GenericJDBCConfigurationStore.this.getConnection(); + } + + @Override + protected void doClose() + { + // Nothing to do, store provided by DerbyConfigurationStore + } + + @Override + public String getStoreLocation() + { + return GenericJDBCConfigurationStore.this._connectionURL; + } + + @Override + protected Logger getLogger() + { + return GenericJDBCConfigurationStore.this.getLogger(); + } + + @Override + protected String getSqlBlobType() + { + return GenericJDBCConfigurationStore.this.getSqlBlobType(); + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return GenericJDBCConfigurationStore.this.getSqlVarBinaryType(size); + } + + @Override + protected String getSqlBigIntType() + { + return GenericJDBCConfigurationStore.this.getSqlBigIntType(); + } + + @Override + protected byte[] getBlobAsBytes(final ResultSet rs, final int col) throws SQLException + { + return GenericJDBCConfigurationStore.this.getBlobAsBytes(rs, col); + } + } + + +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java new file mode 100644 index 0000000000..dad4432183 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java @@ -0,0 +1,177 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store.jdbc; + + +import java.sql.Blob; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; + +import org.apache.log4j.Logger; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.util.MapValueConverter; + +/** + * Implementation of a MessageStore backed by a Generic JDBC Database. + */ +public class GenericJDBCMessageStore extends GenericAbstractJDBCMessageStore +{ + + private static final Logger _logger = Logger.getLogger(GenericJDBCMessageStore.class); + + public static final String TYPE = "JDBC"; + public static final String CONNECTION_URL = "connectionUrl"; + public static final String CONNECTION_POOL_TYPE = "connectionPoolType"; + public static final String JDBC_BIG_INT_TYPE = "bigIntType"; + public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob"; + public static final String JDBC_VARBINARY_TYPE = "varbinaryType"; + public static final String JDBC_BLOB_TYPE = "blobType"; + + protected String _connectionURL; + private ConnectionProvider _connectionProvider; + + private String _blobType; + private String _varBinaryType; + private String _bigIntType; + private boolean _useBytesMethodsForBlob; + + + @Override + protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings) throws StoreException + { + _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL)); + + org.apache.qpid.server.store.jdbc.JDBCDetails details = null; + + String[] components = _connectionURL.split(":", 3); + if(components.length >= 2) + { + String vendor = components[1]; + details = org.apache.qpid.server.store.jdbc.JDBCDetails.getDetails(vendor); + } + + if(details == null) + { + getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL); + + details = org.apache.qpid.server.store.jdbc.JDBCDetails.getDefaultDetails(); + } + + + _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType()); + _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType()); + _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob()); + _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE, + storeSettings, + details.getBigintType()); + + Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE); + String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute); + + JDBCConnectionProviderFactory connectionProviderFactory = + JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); + if(connectionProviderFactory == null) + { + _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used"); + connectionProviderFactory = new DefaultConnectionProviderFactory(); + } + + try + { + _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings); + } + catch (SQLException e) + { + throw new StoreException("Failed to create connection provider for " + _connectionURL); + } + + } + + @Override + protected Connection getConnection() throws SQLException + { + return _connectionProvider.getConnection(); + } + + protected void doClose() + { + try + { + _connectionProvider.close(); + } + catch (SQLException e) + { + throw new StoreException("Unable to close connection provider ", e); + } + } + + + @Override + protected Logger getLogger() + { + return _logger; + } + + @Override + protected String getSqlBlobType() + { + return _blobType; + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return String.format(_varBinaryType, size); + } + + @Override + protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException + { + if(_useBytesMethodsForBlob) + { + return rs.getBytes(col); + } + else + { + Blob dataAsBlob = rs.getBlob(col); + return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + + } + } + + @Override + public String getSqlBigIntType() + { + return _bigIntType; + } + + @Override + public String getStoreLocation() + { + return _connectionURL; + } + +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java new file mode 100644 index 0000000000..6cf1413b83 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.store.jdbc; + +import java.util.HashMap; +import java.util.Map; + +public class JDBCDetails +{ + + private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<>(); + + private static JDBCDetails DERBY_DETAILS = + new JDBCDetails("derby", + "blob", + "varchar(%d) for bit data", + "bigint", + false); + + private static JDBCDetails POSTGRESQL_DETAILS = + new JDBCDetails("postgresql", + "bytea", + "bytea", + "bigint", + true); + + private static JDBCDetails MYSQL_DETAILS = + new JDBCDetails("mysql", + "blob", + "varbinary(%d)", + "bigint", + false); + + + private static JDBCDetails SYBASE_DETAILS = + new JDBCDetails("sybase", + "image", + "varbinary(%d)", + "bigint", + false); + + + private static JDBCDetails ORACLE_DETAILS = + new JDBCDetails("oracle", + "blob", + "raw(%d)", + "number", + false); + + + static + { + + addDetails(DERBY_DETAILS); + addDetails(POSTGRESQL_DETAILS); + addDetails(MYSQL_DETAILS); + addDetails(SYBASE_DETAILS); + addDetails(ORACLE_DETAILS); + } + + public static JDBCDetails getDetails(String vendor) + { + return VENDOR_DETAILS.get(vendor); + } + + public static JDBCDetails getDefaultDetails() + { + return DERBY_DETAILS; + } + + private static void addDetails(JDBCDetails details) + { + VENDOR_DETAILS.put(details.getVendor(), details); + } + + private final String _vendor; + private String _blobType; + private String _varBinaryType; + private String _bigintType; + private boolean _useBytesMethodsForBlob; + + JDBCDetails(String vendor, + String blobType, + String varBinaryType, + String bigIntType, + boolean useBytesMethodsForBlob) + { + _vendor = vendor; + setBlobType(blobType); + setVarBinaryType(varBinaryType); + setBigintType(bigIntType); + setUseBytesMethodsForBlob(useBytesMethodsForBlob); + } + + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + JDBCDetails that = (JDBCDetails) o; + + if (!getVendor().equals(that.getVendor())) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return getVendor().hashCode(); + } + + @Override + public String toString() + { + return "JDBCDetails{" + + "vendor='" + getVendor() + '\'' + + ", blobType='" + getBlobType() + '\'' + + ", varBinaryType='" + getVarBinaryType() + '\'' + + ", bigIntType='" + getBigintType() + '\'' + + ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() + + '}'; + } + + public String getVendor() + { + return _vendor; + } + + public String getBlobType() + { + return _blobType; + } + + public void setBlobType(String blobType) + { + _blobType = blobType; + } + + public String getVarBinaryType() + { + return _varBinaryType; + } + + public void setVarBinaryType(String varBinaryType) + { + _varBinaryType = varBinaryType; + } + + public boolean isUseBytesMethodsForBlob() + { + return _useBytesMethodsForBlob; + } + + public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob) + { + _useBytesMethodsForBlob = useBytesMethodsForBlob; + } + + public String getBigintType() + { + return _bigintType; + } + + public void setBigintType(String bigintType) + { + _bigintType = bigintType; + } +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java deleted file mode 100644 index 61ecb6748c..0000000000 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ /dev/null @@ -1,522 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.store.jdbc; - - -import java.sql.Blob; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; - -import org.apache.log4j.Logger; - -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; -import org.apache.qpid.server.store.AbstractJDBCMessageStore; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreProvider; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.handler.DistributedTransactionHandler; -import org.apache.qpid.server.store.handler.MessageHandler; -import org.apache.qpid.server.store.handler.MessageInstanceHandler; -import org.apache.qpid.server.util.MapValueConverter; - -/** - * An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence - * mechanism. - * - */ -public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider -{ - - private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class); - - public static final String TYPE = "JDBC"; - public static final String CONNECTION_URL = "connectionUrl"; - public static final String CONNECTION_POOL_TYPE = "connectionPoolType"; - public static final String JDBC_BIG_INT_TYPE = "bigIntType"; - public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob"; - public static final String JDBC_VARBINARY_TYPE = "varbinaryType"; - public static final String JDBC_BLOB_TYPE = "blobType"; - - protected String _connectionURL; - private ConnectionProvider _connectionProvider; - - private final MessageStore _messageStoreFacade = new MessageStoreWrapper(); - - private static class JDBCDetails - { - private final String _vendor; - private String _blobType; - private String _varBinaryType; - private String _bigintType; - private boolean _useBytesMethodsForBlob; - - private JDBCDetails(String vendor, - String blobType, - String varBinaryType, - String bigIntType, - boolean useBytesMethodsForBlob) - { - _vendor = vendor; - setBlobType(blobType); - setVarBinaryType(varBinaryType); - setBigintType(bigIntType); - setUseBytesMethodsForBlob(useBytesMethodsForBlob); - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - JDBCDetails that = (JDBCDetails) o; - - if (!getVendor().equals(that.getVendor())) - { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - return getVendor().hashCode(); - } - - @Override - public String toString() - { - return "JDBCDetails{" + - "vendor='" + getVendor() + '\'' + - ", blobType='" + getBlobType() + '\'' + - ", varBinaryType='" + getVarBinaryType() + '\'' + - ", bigIntType='" + getBigintType() + '\'' + - ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() + - '}'; - } - - public String getVendor() - { - return _vendor; - } - - public String getBlobType() - { - return _blobType; - } - - public void setBlobType(String blobType) - { - _blobType = blobType; - } - - public String getVarBinaryType() - { - return _varBinaryType; - } - - public void setVarBinaryType(String varBinaryType) - { - _varBinaryType = varBinaryType; - } - - public boolean isUseBytesMethodsForBlob() - { - return _useBytesMethodsForBlob; - } - - public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob) - { - _useBytesMethodsForBlob = useBytesMethodsForBlob; - } - - public String getBigintType() - { - return _bigintType; - } - - public void setBigintType(String bigintType) - { - _bigintType = bigintType; - } - } - - private static JDBCDetails DERBY_DETAILS = - new JDBCDetails("derby", - "blob", - "varchar(%d) for bit data", - "bigint", - false); - - private static JDBCDetails POSTGRESQL_DETAILS = - new JDBCDetails("postgresql", - "bytea", - "bytea", - "bigint", - true); - - private static JDBCDetails MYSQL_DETAILS = - new JDBCDetails("mysql", - "blob", - "varbinary(%d)", - "bigint", - false); - - - private static JDBCDetails SYBASE_DETAILS = - new JDBCDetails("sybase", - "image", - "varbinary(%d)", - "bigint", - false); - - - private static JDBCDetails ORACLE_DETAILS = - new JDBCDetails("oracle", - "blob", - "raw(%d)", - "number", - false); - - - private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<String,JDBCDetails>(); - - static - { - - addDetails(DERBY_DETAILS); - addDetails(POSTGRESQL_DETAILS); - addDetails(MYSQL_DETAILS); - addDetails(SYBASE_DETAILS); - addDetails(ORACLE_DETAILS); - } - - private static void addDetails(JDBCDetails details) - { - VENDOR_DETAILS.put(details.getVendor(), details); - } - - private String _blobType; - private String _varBinaryType; - private String _bigIntType; - private boolean _useBytesMethodsForBlob; - - private List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<RecordedJDBCTransaction>(); - - - public JDBCMessageStore() - { - } - - protected Logger getLogger() - { - return _logger; - } - - protected String getSqlBlobType() - { - return _blobType; - } - - protected String getSqlVarBinaryType(int size) - { - return String.format(_varBinaryType, size); - } - - public String getSqlBigIntType() - { - return _bigIntType; - } - - @Override - protected void doClose() - { - try - { - while(!_transactions.isEmpty()) - { - RecordedJDBCTransaction txn = _transactions.get(0); - txn.abortTran(); - } - } - finally - { - try - { - _connectionProvider.close(); - } - catch (SQLException e) - { - throw new StoreException("Unable to close connection provider ", e); - } - } - } - - - protected Connection getConnection() throws SQLException - { - return _connectionProvider.getConnection(); - } - - - protected void implementationSpecificConfiguration(String name, Map<String, Object> storeSettings) - throws ClassNotFoundException, SQLException - { - _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL)); - Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE); - - JDBCDetails details = null; - - String[] components = _connectionURL.split(":",3); - if(components.length >= 2) - { - String vendor = components[1]; - details = VENDOR_DETAILS.get(vendor); - } - - if(details == null) - { - getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL); - - // TODO - is there a better default than derby - details = DERBY_DETAILS; - } - - String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute); - - JDBCConnectionProviderFactory connectionProviderFactory = - JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); - if(connectionProviderFactory == null) - { - _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used"); - connectionProviderFactory = new DefaultConnectionProviderFactory(); - } - - _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings); - _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType()); - _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType()); - _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob()); - _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE, storeSettings, details.getBigintType()); - } - - @Override - protected void storedSizeChange(int contentSize) - { - } - - public String getStoreLocation() - { - return _connectionURL; - } - - @Override - protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException - { - if(_useBytesMethodsForBlob) - { - return rs.getBytes(col); - } - else - { - Blob dataAsBlob = rs.getBlob(col); - return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); - - } - } - - @Override - protected String getBlobAsString(ResultSet rs, int col) throws SQLException - { - byte[] bytes; - if(_useBytesMethodsForBlob) - { - bytes = rs.getBytes(col); - return new String(bytes,UTF8_CHARSET); - } - else - { - Blob blob = rs.getBlob(col); - if(blob == null) - { - return null; - } - bytes = blob.getBytes(1, (int)blob.length()); - } - return new String(bytes, UTF8_CHARSET); - - } - - @Override - public Transaction newTransaction() - { - return new RecordedJDBCTransaction(); - } - - private class RecordedJDBCTransaction extends JDBCTransaction - { - private RecordedJDBCTransaction() - { - super(); - JDBCMessageStore.this._transactions.add(this); - } - - @Override - public void commitTran() - { - try - { - super.commitTran(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - - @Override - public StoreFuture commitTranAsync() - { - try - { - return super.commitTranAsync(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - - @Override - public void abortTran() - { - try - { - super.abortTran(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - } - - @Override - public MessageStore getMessageStore() - { - return _messageStoreFacade; - } - - private class MessageStoreWrapper implements MessageStore - { - - @Override - public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) - { - JDBCMessageStore.this.openMessageStore(parent, messageStoreSettings); - } - - @Override - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) - { - return JDBCMessageStore.this.addMessage(metaData); - } - - @Override - public boolean isPersistent() - { - return JDBCMessageStore.this.isPersistent(); - } - - @Override - public Transaction newTransaction() - { - return JDBCMessageStore.this.newTransaction(); - } - - @Override - public void closeMessageStore() - { - JDBCMessageStore.this.closeMessageStore(); - } - - @Override - public void addEventListener(final EventListener eventListener, final Event... events) - { - JDBCMessageStore.this.addEventListener(eventListener, events); - } - - @Override - public void upgradeStoreStructure() throws StoreException - { - } - - @Override - public String getStoreLocation() - { - return JDBCMessageStore.this.getStoreLocation(); - } - - @Override - public void onDelete() - { - JDBCMessageStore.this.onDelete(); - } - - @Override - public void visitMessages(final MessageHandler handler) throws StoreException - { - JDBCMessageStore.this.visitMessages(handler); - } - - @Override - public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException - { - JDBCMessageStore.this.visitMessageInstances(handler); - } - - @Override - public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException - { - JDBCMessageStore.this.visitDistributedTransactions(handler); - } - } - -} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java index 1dd39a8696..85e8f89dbe 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java @@ -26,7 +26,7 @@ import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.jdbc.JDBCMessageStore; +import org.apache.qpid.server.store.jdbc.GenericJDBCMessageStore; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; @ManagedObject(category = false, type = JDBCVirtualHost.VIRTUAL_HOST_TYPE) @@ -45,6 +45,6 @@ public class JDBCVirtualHost extends AbstractVirtualHost<JDBCVirtualHost> @Override protected MessageStore createMessageStore() { - return new JDBCMessageStore().getMessageStore(); + return new GenericJDBCMessageStore(); } } diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java index 2108b4e09a..ab8f4554cb 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java @@ -27,7 +27,7 @@ import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.jdbc.JDBCMessageStore; +import org.apache.qpid.server.store.jdbc.GenericJDBCConfigurationStore; import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode; @ManagedObject(type = JDBCVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category = false ) @@ -62,7 +62,7 @@ public class JDBCVirtualHostNodeImpl extends AbstractStandardVirtualHostNode<JDB @Override protected DurableConfigurationStore createConfigurationStore() { - return new JDBCMessageStore(); + return new GenericJDBCConfigurationStore(); } @Override diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java index 1f03b7f75f..8261e93347 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java +++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java @@ -52,7 +52,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase public void testOnDelete() throws Exception { - Set<String> expectedTables = JDBCMessageStore.MESSAGE_STORE_TABLE_NAMES; + Set<String> expectedTables = GenericJDBCMessageStore.MESSAGE_STORE_TABLE_NAMES; assertTablesExist(expectedTables, true); getStore().closeMessageStore(); assertTablesExist(expectedTables, true); @@ -65,7 +65,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase { _connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true"; Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); - messageStoreSettings.put(JDBCMessageStore.CONNECTION_URL, _connectionURL); + messageStoreSettings.put(GenericJDBCMessageStore.CONNECTION_URL, _connectionURL); return messageStoreSettings; } @@ -73,7 +73,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase @Override protected MessageStore createMessageStore() { - return (new JDBCMessageStore()).getMessageStore(); + return new GenericJDBCMessageStore(); } private void assertTablesExist(Set<String> expectedTables, boolean exists) throws SQLException |
