diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2008-06-19 13:29:23 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2008-06-19 13:29:23 +0000 |
| commit | 6914ef1fd32597442c8a4dff89a3675c82932084 (patch) | |
| tree | e0849a07aeb7819f418a8a6475d6226a5e5f98c0 | |
| parent | a16002f9be0a06da956eb548d70a3fcd1adeab89 (diff) | |
| download | qpid-python-6914ef1fd32597442c8a4dff89a3675c82932084.tar.gz | |
QPID-950 : Fixed Derby Message Store
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@669480 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java | 183 |
1 files changed, 111 insertions, 72 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index cc22569d77..9d22e2b929 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -1,11 +1,32 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ package org.apache.qpid.server.store; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.queue.AMQQueueFactory; + import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.txn.TransactionalContext; @@ -21,6 +42,7 @@ import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import java.io.File; +import java.io.ByteArrayInputStream; import java.sql.DriverManager; import java.sql.Driver; import java.sql.Connection; @@ -39,26 +61,6 @@ import java.util.HashMap; import java.util.TreeMap; -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ public class DerbyMessageStore implements MessageStore { @@ -67,7 +69,7 @@ public class DerbyMessageStore implements MessageStore private static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; - private static final String DERBY_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; + private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; @@ -91,6 +93,39 @@ public class DerbyMessageStore implements MessageStore private String _connectionURL; + + private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )"; + private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )"; + private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )"; + private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )"; + private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )"; + private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )"; + private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )"; + private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )"; + private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME; + private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME; + private static final String SELECT_FROM_BINDINGS = + "SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?"; + private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?"; + private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?"; + private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )"; + private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?"; + private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )"; + private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?"; + private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)"; + private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; + private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)"; + private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?"; + private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)"; + private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)"; + private static final String SELECT_FROM_MESSAGE_META_DATA = + "SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?"; + private static final String SELECT_FROM_MESSAGE_CONTENT = + "SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?"; + private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME; + private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; + + private enum State { INITIAL, @@ -129,10 +164,6 @@ public class DerbyMessageStore implements MessageStore createOrOpenDatabase(databasePath); - - - - // this recovers durable queues and persistent messages recover(); @@ -145,7 +176,7 @@ public class DerbyMessageStore implements MessageStore { if(DRIVER_CLASS == null) { - DRIVER_CLASS = (Class<Driver>) Class.forName(DERBY_DRIVER_NAME); + DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME); } } @@ -163,7 +194,7 @@ public class DerbyMessageStore implements MessageStore createMessageMetaDataTable(conn); createMessageContentTable(conn); - + conn.close(); } @@ -174,10 +205,10 @@ public class DerbyMessageStore implements MessageStore { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )"); + stmt.execute(CREATE_DB_VERSION_TABLE); stmt.close(); - PreparedStatement pstmt = conn.prepareStatement("INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )"); + PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION); pstmt.setInt(1, DB_VERSION); pstmt.execute(); pstmt.close(); @@ -191,8 +222,8 @@ public class DerbyMessageStore implements MessageStore if(!tableExists(EXCHANGE_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); - - stmt.execute("CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )"); + + stmt.execute(CREATE_EXCHANGE_TABLE); stmt.close(); } } @@ -202,7 +233,7 @@ public class DerbyMessageStore implements MessageStore if(!tableExists(QUEUE_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )"); + stmt.execute(CREATE_QUEUE_TABLE); stmt.close(); } } @@ -212,8 +243,7 @@ public class DerbyMessageStore implements MessageStore if(!tableExists(BINDINGS_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )"); - + stmt.execute(CREATE_BINDINGS_TABLE); stmt.close(); } @@ -225,7 +255,7 @@ public class DerbyMessageStore implements MessageStore if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )"); + stmt.execute(CREATE_QUEUE_ENTRY_TABLE); stmt.close(); } @@ -237,7 +267,7 @@ public class DerbyMessageStore implements MessageStore if(!tableExists(MESSAGE_META_DATA_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )"); + stmt.execute(CREATE_MESSAGE_META_DATA_TABLE); stmt.close(); } @@ -250,7 +280,7 @@ public class DerbyMessageStore implements MessageStore if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )"); + stmt.execute(CREATE_MESSAGE_CONTENT_TABLE); stmt.close(); } @@ -261,7 +291,7 @@ public class DerbyMessageStore implements MessageStore private boolean tableExists(final String tableName, final Connection conn) throws SQLException { - PreparedStatement stmt = conn.prepareStatement("SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"); + PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY); stmt.setString(1, tableName); ResultSet rs = stmt.executeQuery(); boolean exists = rs.next(); @@ -283,8 +313,6 @@ public class DerbyMessageStore implements MessageStore recoverExchanges(); -// - try { @@ -317,7 +345,7 @@ public class DerbyMessageStore implements MessageStore Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT name, owner FROM " + QUEUE_TABLE_NAME); + ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE); Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>(); while(rs.next()) { @@ -353,7 +381,7 @@ public class DerbyMessageStore implements MessageStore Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME); + ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE); Exchange exchange; while(rs.next()) @@ -391,7 +419,7 @@ public class DerbyMessageStore implements MessageStore { conn = newConnection(); - PreparedStatement stmt = conn.prepareStatement("SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?"); + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS); stmt.setString(1, exchange.getName().toString()); ResultSet rs = stmt.executeQuery(); @@ -425,6 +453,7 @@ public class DerbyMessageStore implements MessageStore } queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT); + } } } @@ -439,9 +468,7 @@ public class DerbyMessageStore implements MessageStore public void close() throws Exception { - _closed.getAndSet(true); - } public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException @@ -462,12 +489,11 @@ public class DerbyMessageStore implements MessageStore MessageMetaData mmd = getMessageMetaData(storeContext, messageId); try { - PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?"); + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA); stmt.setLong(1,messageId); wrapper.setRequiresCommit(); int results = stmt.executeUpdate(); - if (results == 0) { if (localTx) @@ -484,8 +510,7 @@ public class DerbyMessageStore implements MessageStore _logger.debug("Deleted metadata for message " + messageId); } - - stmt = conn.prepareStatement("DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?"); + stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT); stmt.setLong(1,messageId); results = stmt.executeUpdate(); @@ -528,7 +553,7 @@ public class DerbyMessageStore implements MessageStore { conn = newConnection(); - PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )"); + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_EXCHANGE); stmt.setString(1, exchange.getName().toString()); stmt.setString(2, exchange.getType().toString()); stmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0); @@ -542,7 +567,6 @@ public class DerbyMessageStore implements MessageStore if(conn != null) { conn.close(); - } } } @@ -561,7 +585,7 @@ public class DerbyMessageStore implements MessageStore try { conn = newConnection(); - PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?"); + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE); stmt.setString(1, exchange.getName().toString()); int results = stmt.executeUpdate(); if(results == 0) @@ -606,16 +630,20 @@ public class DerbyMessageStore implements MessageStore try { conn = newConnection(); - // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob - PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )"); + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS); stmt.setString(1, exchange.getName().toString() ); stmt.setString(2, queue.getName().toString()); stmt.setString(3, routingKey == null ? null : routingKey.toString()); if(args != null) { + /* This would be the Java 6 way of setting a Blob Blob blobArgs = conn.createBlob(); blobArgs.setBytes(0, args.getDataAsBytes()); stmt.setBlob(4, blobArgs); + */ + byte[] bytes = args.getDataAsBytes(); + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + stmt.setBinaryStream(4, bis, bytes.length); } else { @@ -662,7 +690,7 @@ public class DerbyMessageStore implements MessageStore { conn = newConnection(); // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob - PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?"); + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS); stmt.setString(1, exchange.getName().toString() ); stmt.setString(2, queue.getName().toString()); stmt.setString(3, routingKey == null ? null : routingKey.toString()); @@ -711,7 +739,7 @@ public class DerbyMessageStore implements MessageStore Connection conn = newConnection(); PreparedStatement stmt = - conn.prepareStatement("INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)"); + conn.prepareStatement(INSERT_INTO_QUEUE); stmt.setString(1, queue.getName().toString()); stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString()); @@ -733,12 +761,12 @@ public class DerbyMessageStore implements MessageStore private Connection newConnection() throws SQLException { - return DriverManager.getConnection(_connectionURL); + final Connection connection = DriverManager.getConnection(_connectionURL); + return connection; } public void removeQueue(final AMQQueue queue) throws AMQException { - AMQShortString name = queue.getName(); _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called"); Connection conn = null; @@ -747,7 +775,7 @@ public class DerbyMessageStore implements MessageStore try { conn = newConnection(); - PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"); + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE); stmt.setString(1, name.toString()); int results = stmt.executeUpdate(); @@ -785,15 +813,15 @@ public class DerbyMessageStore implements MessageStore public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { + AMQShortString name = queue.getName(); boolean localTx = getOrCreateTransaction(context); Connection conn = getConnection(context); ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); - AMQShortString name = queue.getName(); try { - PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)"); + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY); stmt.setString(1,name.toString()); stmt.setLong(2,messageId); stmt.executeUpdate(); @@ -826,15 +854,15 @@ public class DerbyMessageStore implements MessageStore public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { - AMQShortString name = queue.getName(); + boolean localTx = getOrCreateTransaction(context); Connection conn = getConnection(context); ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); try { - PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?"); + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY); stmt.setString(1,name.toString()); stmt.setLong(2,messageId); int results = stmt.executeUpdate(); @@ -931,17 +959,18 @@ public class DerbyMessageStore implements MessageStore try { + Connection conn = connWrapper.getConnection(); if(connWrapper.requiresCommit()) { - Connection conn = connWrapper.getConnection(); conn.commit(); if (_logger.isDebugEnabled()) { _logger.debug("commit tran completed"); } - conn.close(); + } + conn.close(); } catch (SQLException e) { @@ -1002,21 +1031,25 @@ public class DerbyMessageStore implements MessageStore int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException - { + { boolean localTx = getOrCreateTransaction(context); Connection conn = getConnection(context); ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); try { - PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)"); + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT); stmt.setLong(1,messageId); stmt.setInt(2, index); byte[] chunkData = new byte[contentBody.getSize()]; contentBody.getData().duplicate().get(chunkData); + /* this would be the Java 6 way of doing things Blob dataAsBlob = conn.createBlob(); dataAsBlob.setBytes(1L, chunkData); stmt.setBlob(3, dataAsBlob); + */ + ByteArrayInputStream bis = new ByteArrayInputStream(chunkData); + stmt.setBinaryStream(3, bis, chunkData.length); stmt.executeUpdate(); connWrapper.requiresCommit(); @@ -1048,7 +1081,7 @@ public class DerbyMessageStore implements MessageStore try { - PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)"); + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA); stmt.setLong(1,messageId); stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString()); stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString()); @@ -1060,9 +1093,13 @@ public class DerbyMessageStore implements MessageStore byte[] underlying = new byte[bodySize]; ByteBuffer buf = ByteBuffer.wrap(underlying); headerBody.writePayload(buf); +/* Blob dataAsBlob = conn.createBlob(); dataAsBlob.setBytes(1L, underlying); stmt.setBlob(6, dataAsBlob); +*/ + ByteArrayInputStream bis = new ByteArrayInputStream(underlying); + stmt.setBinaryStream(6,bis,underlying.length); stmt.setInt(7, mmd.getContentChunkCount()); @@ -1096,7 +1133,7 @@ public class DerbyMessageStore implements MessageStore try { - PreparedStatement stmt = conn.prepareStatement("SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?"); + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA); stmt.setLong(1,messageId); ResultSet rs = stmt.executeQuery(); @@ -1181,7 +1218,7 @@ public class DerbyMessageStore implements MessageStore try { - PreparedStatement stmt = conn.prepareStatement("SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?"); + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT); stmt.setLong(1,messageId); stmt.setInt(2, index); ResultSet rs = stmt.executeQuery(); @@ -1269,6 +1306,7 @@ public class DerbyMessageStore implements MessageStore public void process() throws AMQException { _queue.enqueue(_context, _message); + } } @@ -1303,7 +1341,7 @@ public class DerbyMessageStore implements MessageStore TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null); Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery("SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME); + ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); while (rs.next()) @@ -1318,6 +1356,7 @@ public class DerbyMessageStore implements MessageStore if (queue == null) { queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null); + _virtualHost.getQueueRegistry().registerQueue(queue); queues.put(queueName, queue); } |
