summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-05-22 08:49:53 +0000
committerGordon Sim <gsim@apache.org>2007-05-22 08:49:53 +0000
commit65647bc9645a0201f9c9e8ad91469ec2b9e9db17 (patch)
tree71a9a9c10e2d1bf9e5b7bafdd93c70b7620ffd3a /java/broker
parent3e981a1d1308bd57df075f423041d45bef73fed6 (diff)
downloadqpid-python-65647bc9645a0201f9c9e8ad91469ec2b9e9db17.tar.gz
Patch from Arnaud Simon (asimon@redhat.com) in connection with QPID-496. This adds a JDBC based message store implementation.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@540493 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java1759
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java570
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java135
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java87
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java93
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java85
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java106
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java196
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java554
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java7
13 files changed, 3344 insertions, 276 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 5de59f47a3..43a04dbfa1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -473,7 +473,7 @@ public class AMQChannel
unacked.message.setRedelivered(true);
// Deliver Message
- deliveryContext.deliver(unacked.message, unacked.queue, false);
+ deliveryContext.deliver(unacked.message, unacked.queue, true);
// Should we allow access To the DM to directy deliver the message?
// As we don't need to check for Consumers or worry about incrementing the message count?
diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java
new file mode 100644
index 0000000000..44de0d3d98
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java
@@ -0,0 +1,1759 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.*;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.sql.*;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 15-May-2007
+ * Time: 09:59:12
+ */
+public class JDBCStore implements MessageStore
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(JDBCStore.class);
+ // the database connection pool
+ public static ConnectionPool _connectionPool = null;
+ // the prepared statements
+ //==== IMPORTANT: remember to update if we add more prepared statements!
+ private static final int CREATE_EXCHANGE = 0;
+ private static final int DELETE_EXCHANGE = 1;
+ private static final int BIND_QUEUE = 2;
+ private static final int UNBIND_QUEUE = 3;
+ private static final int CREATE_QUEUE = 4;
+ private static final int DELETE_QUEUE = 5;
+ private static final int STAGE_MESSAGE = 6;
+ private static final int UPDATE_MESSAGE_PAYLOAD = 7;
+ private static final int SELECT_MESSAGE_PAYLOAD = 8;
+ private static final int DELETE_MESSAGE = 9;
+ private static final int ENQUEUE = 10;
+ private static final int DEQUEUE = 11;
+ private static final int GET_ALL_QUEUES = 12;
+ private static final int GET_ALL_MESSAGES = 13;
+ private static final int SAVE_RECORD = 14;
+ private static final int SAVE_XID = 15;
+ private static final int DELETE_RECORD = 16;
+ private static final int DELETE_XID = 17;
+ private static final int UPDATE_QMR = 18;
+ private static final int GET_CONTENT_HEADER = 19;
+ private static final int GET_MESSAGE_INFO = 20;
+ //==== size:
+ private static final int STATEMENT_SIZE = 21;
+ //========================================================================
+ // field properties
+ //========================================================================
+ //The default URL
+ protected String _connectionURL = "jdbc:derby:derbyDB;create=true";
+ // The default driver
+ private String _driver = "org.apache.derby.jdbc.EmbeddedDriver";
+ // The pool max size
+ private int _maxSize = 40;
+ // The tables
+ // the table containing the messages
+ private String _tableNameMessage = "MessageTable";
+ private String _tableNameQueue = "QueueTable";
+ private String _tableNameQueueMessageRelation = "QeueMessageRelation";
+ private String _tableNameExchange = "Exchange";
+ private String _tableNameExchangeQueueRelation = "ExchangeQueueRelation";
+ private String _tableNameTransaction = "TransactionTable";
+ private String _tableNameRecord = "RecordTable";
+
+ // The transaction maanger
+ private JDBCTransactionManager _tm;
+ // the message ID
+ private long _messageID = 0;
+ // the virtual host
+ private VirtualHost _virtualHost;
+ // indicate whether this store is recovering
+ private boolean _recovering = false;
+ // the recovered queues
+ private HashMap<Integer, AMQQueue> _queueMap;
+
+ //========================================================================
+ // Interface MessageStore
+ //========================================================================
+ public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
+ throws
+ InternalErrorException,
+ IllegalArgumentException
+ {
+ _log.info("Configuring Derby message store");
+ // the virtual host
+ _virtualHost = virtualHost;
+ // Specify that the tables must be dropped.
+ // If true then this means that recovery is not possible.
+ boolean dropTables = true;
+ if (config != null)
+ {
+ dropTables = config.getBoolean(base + "dropTables", false);
+ _driver = config.getString(base + "driver", _driver);
+ _connectionURL = config.getString(base + "connectionURL", _connectionURL);
+ _maxSize = config.getInt(base + "connectionPoolSize", 20);
+ }
+ if (dropTables)
+ {
+ _log.info("Dropping table of Derby message store");
+ }
+ if (!setupStore(dropTables))
+ {
+ _log.error("Error configuration of Derby store failed");
+ throw new InternalErrorException("Error configuration of Derby store failed");
+ }
+ // recovery
+ _recovering = true;
+ _queueMap = recover(); //==> recover the queues and the messages
+ // recreate the excahnges and bind the queues
+ recoverExchanges(_queueMap);
+ _recovering = false;
+ _tm = (JDBCTransactionManager) tm;
+ _tm.configure(this, "txn", config);
+ _queueMap.clear();
+ _queueMap = null;
+ }
+
+ public void close()
+ throws
+ InternalErrorException
+ {
+ // nothing has to be done
+ }
+
+ public void createExchange(Exchange exchange)
+ throws
+ InternalErrorException
+ {
+ if (!_recovering)
+ {
+ MyConnection connection = null;
+ try
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ PreparedStatement pstmt = connection.getStatements()[CREATE_EXCHANGE];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameExchange +
+ " (Name,Type) VALUES (?,?)");
+ connection.getStatements()[CREATE_EXCHANGE] = pstmt;
+ }
+ pstmt.setString(1, exchange.getName().asString());
+ pstmt.setString(2, exchange.getType().asString());
+ pstmt.executeUpdate();
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot create Exchange: " + exchange);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot create Exchange: " + exchange);
+ }
+ }
+ }
+ }
+ }
+
+ public void removeExchange(Exchange exchange)
+ throws
+ InternalErrorException
+ {
+ if (!_recovering)
+ {
+ MyConnection connection = null;
+ try
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ PreparedStatement pstmt = connection.getStatements()[DELETE_EXCHANGE];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameExchange +
+ " WHERE Name = ?");
+ connection.getStatements()[DELETE_EXCHANGE] = pstmt;
+ }
+ pstmt.setString(1, exchange.getName().asString());
+ pstmt.executeUpdate();
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot remove Exchange: " + exchange);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot remove Exchange: " + exchange);
+ }
+ }
+ }
+ }
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException
+ {
+ if (!_recovering)
+ {
+ MyConnection connection = null;
+ try
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ PreparedStatement pstmt = connection.getStatements()[BIND_QUEUE];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameExchangeQueueRelation +
+ " (QueueID,Name,RoutingKey,fieldTable) VALUES (?,?,?,?)");
+ connection.getStatements()[BIND_QUEUE] = pstmt;
+ }
+ pstmt.setInt(1, queue.getQueueID());
+ pstmt.setString(2, exchange.getName().asString());
+ pstmt.setString(3, routingKey.asString());
+ if (args != null)
+ {
+ pstmt.setBytes(4, args.getDataAsBytes());
+ } else
+ {
+ pstmt.setBytes(4, null);
+ }
+ pstmt.executeUpdate();
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot create Exchange: " + exchange);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot create Exchange: " + exchange);
+ }
+ }
+ }
+ }
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException
+ {
+ MyConnection connection = null;
+ try
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ PreparedStatement pstmt = connection.getStatements()[UNBIND_QUEUE];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameExchangeQueueRelation +
+ " WHERE QueueID = ? AND NAME = ? AND RoutingKey = ?");
+ connection.getStatements()[UNBIND_QUEUE] = pstmt;
+ }
+ pstmt.setInt(1, queue.getQueueID());
+ pstmt.setString(2, exchange.getName().asString());
+ pstmt.setString(3, routingKey.asString());
+ pstmt.executeUpdate();
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot remove Exchange: " + exchange);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot remove Exchange: " + exchange);
+ }
+ }
+ }
+ }
+
+ public void createQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueAlreadyExistsException
+ {
+ MyConnection connection = null;
+ try
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ PreparedStatement pstmt = connection.getStatements()[CREATE_QUEUE];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameQueue +
+ " (QueueID,Name,Owner) VALUES (?,?,?)");
+ connection.getStatements()[CREATE_QUEUE] = pstmt;
+ }
+ pstmt.setInt(1, queue.getQueueID());
+ pstmt.setString(2, queue.getName().asString());
+ if (queue.getOwner() != null)
+ {
+ pstmt.setString(3, queue.getOwner().asString());
+ } else
+ {
+ pstmt.setString(3, null);
+ }
+ pstmt.executeUpdate();
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot create Queue: " + queue);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot create Queue: " + queue);
+ }
+ }
+ }
+ }
+
+ public void destroyQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException
+ {
+ MyConnection connection = null;
+ try
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ PreparedStatement pstmt = connection.getStatements()[DELETE_QUEUE];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameQueue +
+ " WHERE QueueID = ?");
+ connection.getStatements()[DELETE_QUEUE] = pstmt;
+ }
+ pstmt.setInt(1, queue.getQueueID());
+ pstmt.executeUpdate();
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot remove Queue: " + queue);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot remove Queue: " + queue);
+ }
+ }
+ }
+ }
+
+ public void stage(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageAlreadyStagedException
+ {
+ if (m.isStaged() || m.isEnqueued())
+ {
+ _log.error("Message with Id " + m.getMessageId() + " is already staged");
+ throw new MessageAlreadyStagedException("Message eith Id " + m.getMessageId() + " is already staged");
+ }
+ MyConnection connection = null;
+ try
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ stage(connection, m);
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot stage Message: " + m);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot stage Message: " + m);
+ }
+ }
+ }
+ }
+
+ public void appendContent(StorableMessage m, byte[] data, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException
+ {
+ // The message must have been staged
+ if (!m.isStaged())
+ {
+ _log.error("Cannot append content of message Id "
+ + m.getMessageId() + " as it has not been staged");
+ throw new MessageDoesntExistException("Cannot append content of message Id "
+ + m.getMessageId() + " as it has not been staged");
+ }
+ MyConnection connection = null;
+ try
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ appendContent(connection, m, data, offset, size);
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot stage Message: " + m);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot stage Message: " + m);
+ }
+ }
+ }
+ }
+
+ public byte[] loadContent(StorableMessage m, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException
+ {
+ MyConnection connection = null;
+ try
+ {
+ byte[] result;
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ PreparedStatement pstmt = connection.getStatements()[SELECT_MESSAGE_PAYLOAD];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("SELECT Payload FROM " + _tableNameMessage +
+ " WHERE MessageID = ? ");
+ connection.getStatements()[SELECT_MESSAGE_PAYLOAD] = pstmt;
+ }
+ pstmt.setLong(1, m.getMessageId());
+ ResultSet rs = pstmt.executeQuery();
+ if (!rs.next())
+ {
+ throw new MessageDoesntExistException("Cannot load content of message Id "
+ + m.getMessageId() + " as it has not been found");
+ }
+ Blob myBlob = rs.getBlob(1);
+
+ if (myBlob.length() > 0)
+ {
+ if (size == 0)
+ {
+ result = myBlob.getBytes(offset, (int) myBlob.length());
+ } else
+ {
+ result = myBlob.getBytes(offset, size);
+ }
+ } else
+ {
+ throw new MessageDoesntExistException("Cannot load content of message Id "
+ + m.getMessageId() + " as it has not been found");
+ }
+ rs.close();
+ return result;
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot load Message: " + m);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot load Message: " + m);
+ }
+ }
+ }
+ }
+
+ public void destroy(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException
+ {
+ MyConnection connection = null;
+ try
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ destroy(connection, m);
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot destroy message: " + m);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot destroy message: " + m);
+ }
+ }
+ }
+ }
+
+ public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException
+ {
+ MyConnection connection = null;
+ // Get the current tx
+ JDBCTransaction tx = getTx(xid);
+ // If this operation is transacted then we need to add a record
+ if (tx != null && !tx.isPrepared())
+ {
+ // add an enqueue record
+ tx.addRecord(new JDBCEnqueueRecord(m, queue));
+ } else
+ {
+ try
+ {
+ if (tx != null)
+ {
+ connection = tx.getConnection();
+ } else
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ }
+ if (!m.isStaged() && !m.isEnqueued())
+ {
+ //This is the first time this message is enqueued and it has not been staged.
+ stage(connection, m);
+ appendContent(connection, m, m.getData(), 0, m.getData().length);
+ }
+ PreparedStatement pstmt = connection.getStatements()[ENQUEUE];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameQueueMessageRelation +
+ " (QueueID,MessageID,Prepared) VALUES (?,?,0)");
+ connection.getStatements()[ENQUEUE] = pstmt;
+ }
+ pstmt.setInt(1, queue.getQueueID());
+ pstmt.setLong(2, m.getMessageId());
+ pstmt.executeUpdate();
+ m.enqueue(queue);
+ queue.enqueue(m);
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue);
+ } finally
+ {
+ if (tx == null && connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue);
+ }
+ }
+ }
+ }
+ }
+
+ public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException
+ {
+ MyConnection connection = null;
+ // Get the current tx
+ JDBCTransaction tx = getTx(xid);
+ // If this operation is transacted then we need to add a record
+ if (tx != null && !tx.isPrepared())
+ {
+ // add an dequeue record
+ tx.addRecord(new JDBCDequeueRecord(m, queue));
+ } else
+ {
+ try
+ {
+ if (tx != null)
+ {
+ connection = tx.getConnection();
+ } else
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ }
+ PreparedStatement pstmt = connection.getStatements()[DEQUEUE];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameQueueMessageRelation +
+ " WHERE QueueID = ? AND MessageID = ?");
+ connection.getStatements()[DEQUEUE] = pstmt;
+ }
+ pstmt.setInt(1, queue.getQueueID());
+ pstmt.setLong(2, m.getMessageId());
+ pstmt.executeUpdate();
+ m.dequeue(queue);
+ if (!m.isEnqueued())
+ {
+ // delete this message from persistence store
+ destroy(connection, m);
+ }
+ queue.dequeue(m);
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue);
+ } finally
+ {
+ if (tx == null && connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue);
+ }
+ }
+ }
+ }
+ }
+
+ public Collection<StorableQueue> getAllQueues()
+ throws
+ InternalErrorException
+ {
+ MyConnection connection = null;
+ List<StorableQueue> result = new ArrayList<StorableQueue>();
+ try
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ PreparedStatement pstmt = connection.getStatements()[GET_ALL_QUEUES];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("SELECT * FROM " + _tableNameQueue);
+ connection.getStatements()[GET_ALL_QUEUES] = pstmt;
+ }
+ ResultSet rs = pstmt.executeQuery();
+ while (rs.next())
+ {
+ //the queue owner may be null
+ AMQShortString queueOwner = null;
+ if (rs.getString(3) != null)
+ {
+ queueOwner = new AMQShortString(rs.getString(3));
+ }
+ result.add(new AMQQueue(new AMQShortString(rs.getString(2)), true, queueOwner,
+ false, _virtualHost));
+ }
+ rs.close();
+ return result;
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot get all queues");
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot get all queues");
+ }
+ }
+ }
+ }
+
+ public Collection<StorableMessage> getAllMessages(StorableQueue queue)
+ throws
+ InternalErrorException
+ {
+ MyConnection connection = null;
+ try
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ return getAllMessages(connection, queue);
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot get all queues");
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot get all queues");
+ }
+ }
+ }
+ }
+
+ public HashMap<Xid, Transaction> getAllInddoubt()
+ throws
+ InternalErrorException
+ {
+ MyConnection connection = null;
+ HashMap<Xid, Transaction> result = new HashMap<Xid, Transaction>();
+ try
+ {
+ TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
+ MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
+ // re-create all the tx
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ Statement stmt = connection.getConnection().createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT * FROM " + _tableNameTransaction);
+ JDBCTransaction foundTx;
+ Xid foundXid;
+ long foundXIDID;
+ while (rs.next())
+ {
+ // set the XID_ID
+ foundXIDID = rs.getLong(1);
+ if (foundXIDID > JDBCTransaction._xidId)
+ {
+ JDBCTransaction._xidId = foundXIDID;
+ }
+ foundTx = new JDBCTransaction();
+ foundXid = new XidImpl(rs.getBlob(3).getBytes(1, (int) rs.getBlob(3).length()),
+ rs.getInt(2), rs.getBlob(4).getBytes(1, (int) rs.getBlob(4).length()));
+ // get all the records
+ Statement stmtr = connection.getConnection().createStatement();
+ ResultSet rsr = stmtr.executeQuery("SELECT * FROM " + _tableNameRecord +
+ " WHERE XID_ID = " + rs.getLong(1));
+ int foundType;
+ AMQQueue foundQueue;
+ StorableMessage foundMessage;
+ TransactionRecord foundRecord;
+ while (rsr.next())
+ {
+ // those messages were not recovered before so they need to be recreated
+ foundType = rsr.getInt(2);
+ foundQueue = _queueMap.get(new Integer(rsr.getInt(4)));
+ foundMessage = new AMQMessage(rs.getLong(3), this, messageHandleFactory, txnContext);
+ if (foundType == JDBCAbstractRecord.TYPE_DEQUEUE)
+ {
+ foundRecord = new JDBCDequeueRecord(foundMessage, foundQueue);
+ } else
+ {
+ foundRecord = new JDBCEnqueueRecord(foundMessage, foundQueue);
+ }
+ foundTx.addRecord(foundRecord);
+ }
+ rsr.close();
+ // add this tx to the map
+ result.put(foundXid, foundTx);
+ }
+ rs.close();
+ return result;
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot recover: ", e);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot recover: ", e);
+ }
+ }
+ }
+ }
+
+
+ public long getNewMessageId()
+ {
+ return _messageID++;
+ }
+
+ //========================================================================
+ // Public methods
+ //========================================================================
+
+ public MyConnection getConnection()
+ throws
+ Exception
+ {
+ return (MyConnection) _connectionPool.acquireInstance();
+ }
+
+ public void commitConnection(MyConnection connection)
+ throws
+ InternalErrorException
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot commit connection =");
+ }
+ }
+
+ public void rollbackConnection(MyConnection connection)
+ throws
+ InternalErrorException
+ {
+ try
+ {
+ connection.getConnection().rollback();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to rollback this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot rollback connection");
+ }
+ }
+
+ public void appendContent(MyConnection connection, StorableMessage m, byte[] data, int offset, int size)
+ throws
+ SQLException,
+ MessageDoesntExistException
+ {
+ PreparedStatement pstmt = connection.getStatements()[SELECT_MESSAGE_PAYLOAD];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("SELECT Payload FROM " + _tableNameMessage +
+ " WHERE MessageID = ? ");
+ connection.getStatements()[SELECT_MESSAGE_PAYLOAD] = pstmt;
+ }
+ pstmt.setLong(1, m.getMessageId());
+ ResultSet rs = pstmt.executeQuery();
+ if (!rs.next())
+ {
+ throw new MessageDoesntExistException("Cannot append content of message Id "
+ + m.getMessageId() + " as it has not been found");
+ }
+ Blob myBlob = rs.getBlob(1);
+ byte[] oldPayload;
+ if (myBlob != null && myBlob.length() > 0)
+ {
+ oldPayload = myBlob.getBytes(1, (int) myBlob.length());
+ } else
+ {
+ oldPayload = new byte[0];
+ }
+ rs.close();
+ byte[] newPayload = new byte[oldPayload.length + size];
+ ByteBuffer buffer = ByteBuffer.wrap(newPayload);
+ buffer.put(oldPayload);
+ buffer.put(data, offset, size);
+ PreparedStatement pstmtUpdate = connection.getStatements()[UPDATE_MESSAGE_PAYLOAD];
+ if (pstmtUpdate == null)
+ {
+ pstmtUpdate = connection.getConnection().prepareStatement("UPDATE " + _tableNameMessage +
+ " SET Payload = ? WHERE MessageID = ?");
+ connection.getStatements()[UPDATE_MESSAGE_PAYLOAD] = pstmtUpdate;
+ }
+ pstmtUpdate.setBytes(1, newPayload);
+ pstmtUpdate.setLong(2, m.getMessageId());
+ pstmtUpdate.executeUpdate();
+ }
+
+ public void stage(MyConnection connection, StorableMessage m)
+ throws
+ Exception
+ {
+ PreparedStatement pstmt = connection.getStatements()[STAGE_MESSAGE];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameMessage +
+ " (MessageID,Header,ExchangeName,RoutingKey,Mandatory,Is_Immediate) VALUES (?,?,?,?,?,?)");
+ connection.getStatements()[STAGE_MESSAGE] = pstmt;
+ }
+ pstmt.setLong(1, m.getMessageId());
+ pstmt.setBytes(2, m.getHeaderBody());
+ pstmt.setString(3, ((AMQMessage) m).getMessagePublishInfo().getExchange().asString());
+ pstmt.setString(4, ((AMQMessage) m).getMessagePublishInfo().getRoutingKey().asString());
+ pstmt.setBoolean(5, ((AMQMessage) m).getMessagePublishInfo().isMandatory());
+ pstmt.setBoolean(6, ((AMQMessage) m).getMessagePublishInfo().isImmediate());
+ pstmt.executeUpdate();
+ m.staged();
+ }
+
+ public void saveRecord(MyConnection connection, JDBCTransaction tx, JDBCAbstractRecord record)
+ throws
+ InternalErrorException
+ {
+ try
+ {
+ PreparedStatement pstmt = connection.getStatements()[SAVE_RECORD];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameRecord +
+ " (XID_ID,Type,MessageID,QueueID) VALUES (?,?,?,?)");
+ connection.getStatements()[SAVE_RECORD] = pstmt;
+ }
+ pstmt.setLong(1, tx.getXidID());
+ pstmt.setInt(2, record.getType());
+ pstmt.setLong(3, record.getMessageID());
+ pstmt.setLong(4, record.getQueueID());
+ pstmt.executeUpdate();
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot save record: " + record);
+ }
+ }
+
+ public void saveXID(MyConnection connection, JDBCTransaction tx, Xid xid)
+ throws
+ InternalErrorException
+ {
+ try
+ {
+ PreparedStatement pstmt = connection.getStatements()[SAVE_XID];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameTransaction +
+ " (XID_ID,FormatId, BranchQualifier,GlobalTransactionId) VALUES (?,?,?,?)");
+ connection.getStatements()[SAVE_XID] = pstmt;
+ }
+ pstmt.setLong(1, tx.getXidID());
+ pstmt.setInt(2, xid.getFormatId());
+ pstmt.setBytes(3, xid.getBranchQualifier());
+ pstmt.setBytes(4, xid.getGlobalTransactionId());
+ pstmt.executeUpdate();
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot save xid: " + xid);
+ }
+ }
+
+ public void deleteRecords(MyConnection connection, JDBCTransaction tx)
+ throws
+ InternalErrorException
+ {
+ try
+ {
+ PreparedStatement pstmt = connection.getStatements()[DELETE_RECORD];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameRecord +
+ " WHERE XID_ID = ?");
+ connection.getStatements()[DELETE_RECORD] = pstmt;
+ }
+ pstmt.setLong(1, tx.getXidID());
+ pstmt.executeUpdate();
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot delete record: " + tx.getXidID());
+ }
+ }
+
+ public void deleteXID(MyConnection connection, JDBCTransaction tx)
+ throws
+ InternalErrorException
+ {
+ try
+ {
+ PreparedStatement pstmt = connection.getStatements()[DELETE_XID];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameTransaction +
+ " WHERE XID_ID = ?");
+ connection.getStatements()[DELETE_XID] = pstmt;
+ }
+ pstmt.setLong(1, tx.getXidID());
+ pstmt.executeUpdate();
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot delete xid: " + tx.getXidID());
+ }
+ }
+
+ public void prepareDequeu(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ UnknownXidException,
+ InternalErrorException
+ {
+ JDBCTransaction tx = getTx(xid);
+ if (tx == null)
+ {
+ throw new UnknownXidException(xid);
+ }
+ updateQueueMessageRelation(tx.getConnection(), queue.getQueueID(), m.getMessageId(), 1);
+
+ }
+
+ public void rollbackDequeu(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ UnknownXidException,
+ InternalErrorException
+ {
+ JDBCTransaction tx = getTx(xid);
+ if (tx == null)
+ {
+ throw new UnknownXidException(xid);
+ }
+ updateQueueMessageRelation(tx.getConnection(), queue.getQueueID(), m.getMessageId(), 0);
+ }
+
+ //========================================================================
+ // Private methods
+ //========================================================================
+
+
+ private void updateQueueMessageRelation(MyConnection connection,
+ int queueID, long messageId, int prepared)
+ throws
+ InternalErrorException
+ {
+ try
+ {
+ PreparedStatement pstmt = connection.getStatements()[UPDATE_QMR];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("UPDATE " + _tableNameQueueMessageRelation +
+ " SET Prepared = ? WHERE MessageID = ? AND QueueID = ?");
+ connection.getStatements()[UPDATE_QMR] = pstmt;
+ }
+ pstmt.setInt(1, prepared);
+ pstmt.setLong(2, messageId);
+ pstmt.setInt(3, queueID);
+ pstmt.executeUpdate();
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot update QMR", e);
+ }
+
+ }
+
+ public MessagePublishInfo getMessagePublishInfo(StorableMessage m)
+ throws
+ InternalErrorException
+ {
+ MyConnection connection = null;
+ MessagePublishInfo result;
+ try
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ PreparedStatement pstmt = connection.getStatements()[GET_MESSAGE_INFO];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("SELECT ExchangeName, RoutingKey," +
+ " Mandatory, Is_Immediate from " + _tableNameMessage +
+ " WHERE MessageID = ?");
+ connection.getStatements()[GET_MESSAGE_INFO] = pstmt;
+ }
+ pstmt.setLong(1, m.getMessageId());
+ ResultSet rs = pstmt.executeQuery();
+ if (rs.next())
+ {
+ final AMQShortString exchange = new AMQShortString(rs.getString(1));
+ final AMQShortString routingKey = new AMQShortString(rs.getString(2));
+ final boolean mandatory = rs.getBoolean(3);
+ final boolean immediate = rs.getBoolean(4);
+ result = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return exchange;
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return routingKey;
+ }
+ };
+ } else
+ {
+ throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m);
+ }
+ rs.close();
+ return result;
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m);
+ }
+ }
+ }
+ }
+
+ public ContentHeaderBody getContentHeaderBody(StorableMessage m)
+ throws
+ InternalErrorException
+ {
+ MyConnection connection = null;
+ ContentHeaderBody result;
+ try
+ {
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ PreparedStatement pstmt = connection.getStatements()[GET_CONTENT_HEADER];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("SELECT Header from " + _tableNameMessage +
+ " WHERE MessageID = ?");
+ connection.getStatements()[GET_CONTENT_HEADER] = pstmt;
+ }
+ pstmt.setLong(1, m.getMessageId());
+ ResultSet rs = pstmt.executeQuery();
+ if (rs.next())
+ {
+ result = new ContentHeaderBody(ByteBuffer.wrap(rs.getBlob(1).getBytes(1, (int) rs.getBlob(1).length())), 0);
+ } else
+ {
+ throw new InternalErrorException("Cannot get Content Header of message: " + m);
+ }
+ rs.close();
+ return result;
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot get Content Header of message: " + m);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot get Content Header of message: " + m);
+ }
+ }
+ }
+ }
+
+ private List<StorableMessage> getAllMessages(MyConnection connection, StorableQueue queue)
+ throws
+ SQLException,
+ AMQException
+ {
+ List<StorableMessage> result = new ArrayList<StorableMessage>();
+ TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
+ MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
+ PreparedStatement pstmt = connection.getStatements()[GET_ALL_MESSAGES];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("SELECT " + _tableNameMessage + ".MessageID, Header FROM " +
+ _tableNameMessage +
+ " INNER JOIN " +
+ _tableNameQueueMessageRelation +
+ " ON " +
+ _tableNameMessage + ".MessageID = " + _tableNameQueueMessageRelation + ".MessageID" +
+ " WHERE " +
+ _tableNameQueueMessageRelation + ".QueueID = ?" +
+ " AND " +
+ _tableNameQueueMessageRelation + ".Prepared = 0");
+ connection.getStatements()[GET_ALL_MESSAGES] = pstmt;
+ }
+ pstmt.setInt(1, queue.getQueueID());
+ ResultSet rs = pstmt.executeQuery();
+ AMQMessage foundMessage;
+ // ContentHeaderBody hb;
+ while (rs.next())
+ {
+ foundMessage = new AMQMessage(rs.getLong(1), this, messageHandleFactory, txnContext);
+ result.add(foundMessage);
+ }
+ rs.close();
+ return result;
+ }
+
+ private HashMap<Integer, AMQQueue> recover()
+ throws
+ InternalErrorException
+ {
+ MyConnection connection = null;
+ HashMap<Integer, AMQQueue> result = new HashMap<Integer, AMQQueue>();
+ try
+ {
+ // re-create all the queues
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ Statement stmt = connection.getConnection().createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT * FROM " + _tableNameQueue);
+ AMQQueue foundQueue;
+ List<StorableMessage> foundMessages;
+ StoreContext context = new StoreContext();
+ while (rs.next())
+ {
+ AMQShortString owner = null;
+ if (rs.getString(3) != null)
+ {
+ owner = new AMQShortString(rs.getString(3));
+ }
+ foundQueue = new AMQQueue(new AMQShortString(rs.getString(2)),
+ true, owner, false, _virtualHost);
+ // get all the Messages of that queue
+ foundMessages = getAllMessages(connection, foundQueue);
+ // enqueue those messages
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Recovering " + foundMessages.size() + " messages for queue " + foundQueue.getName());
+ }
+ for (StorableMessage foundMessage : foundMessages)
+ {
+ foundMessage.staged();
+ foundMessage.enqueue(foundQueue);
+ foundQueue.enqueue(foundMessage);
+ foundQueue.process(context, (AMQMessage) foundMessage, false);
+ }
+ // add the queue in the result map
+ result.put(foundQueue.getQueueID(), foundQueue);
+ // add it in the registry
+ _virtualHost.getQueueRegistry().registerQueue(foundQueue);
+ }
+ rs.close();
+ return result;
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot recover: ", e);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot recover: ", e);
+ }
+ }
+ }
+ }
+
+ private void recoverExchanges(HashMap<Integer, AMQQueue> queueMap)
+ throws
+ InternalErrorException
+ {
+ MyConnection connection = null;
+ try
+ {
+ // re-create all the exchanges
+ connection = (MyConnection) _connectionPool.acquireInstance();
+ Statement stmt = connection.getConnection().createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT * FROM " + _tableNameExchange);
+ Exchange foundExchange;
+ AMQQueue foundQueue;
+ while (rs.next())
+ {
+ foundExchange = _virtualHost.getExchangeFactory().createExchange(
+ new AMQShortString(rs.getString(1)), new AMQShortString(rs.getString(2)), true, false, 0);
+ // get all the bindings
+ Statement stmtb = connection.getConnection().createStatement();
+ ResultSet rsb = stmtb.executeQuery("SELECT * FROM " + _tableNameExchangeQueueRelation +
+ " WHERE Name = '" + rs.getString(1) + "'");
+ while (rsb.next())
+ {
+ foundQueue = queueMap.get(new Integer(rsb.getInt(1)));
+ if (foundQueue != null)
+ {
+ // the field table
+ FieldTable ft = null;
+ if (rsb.getBlob(4) != null)
+ {
+ long length = rsb.getBlob(4).length();
+ ByteBuffer buffer = ByteBuffer.wrap(rsb.getBlob(4).getBytes(1, (int) length));
+ ft = new FieldTable(buffer, length);
+ }
+ foundQueue.bind(new AMQShortString(rsb.getString(3)), ft, foundExchange);
+ }
+ }
+ rsb.close();
+ // register this exchange
+ _virtualHost.getExchangeRegistry().registerExchange(foundExchange);
+ }
+ rs.close();
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Cannot recover: ", e);
+ } finally
+ {
+ if (connection != null)
+ {
+ try
+ {
+ connection.getConnection().commit();
+ _connectionPool.releaseInstance(connection);
+ } catch (SQLException e)
+ {
+ // we did not manage to commit this connection
+ // it is better to release it
+ _connectionPool.releaseDeadInstance();
+ throw new InternalErrorException("Cannot recover: ", e);
+ }
+ }
+ }
+ }
+
+ private void destroy(MyConnection connection, StorableMessage m)
+ throws
+ SQLException
+ {
+ PreparedStatement pstmt = connection.getStatements()[DELETE_MESSAGE];
+ if (pstmt == null)
+ {
+ pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameMessage +
+ " WHERE MessageID = ?");
+ connection.getStatements()[DELETE_MESSAGE] = pstmt;
+ }
+ pstmt.setLong(1, m.getMessageId());
+ pstmt.executeUpdate();
+ }
+
+ private JDBCTransaction getTx(Xid xid)
+ throws
+ UnknownXidException
+ {
+ JDBCTransaction tx = null;
+ if (xid != null)
+ {
+ tx = _tm.getTransaction(xid);
+ }
+ return tx;
+ }
+
+ /**
+ * setupConnections - Initialize the connections
+ *
+ * @return true if ok
+ */
+ private synchronized boolean setupConnections()
+ {
+ try
+ {
+ if (_connectionPool == null)
+ {
+ // In an embedded environment, loading the driver also starts Derby.
+ Class.forName(_driver).newInstance();
+ _connectionPool = new ConnectionPool(_maxSize);
+ }
+ }
+ catch (Exception e)
+ {
+ _log.warn("Setup connections trouble", e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Try to create the connection and table.
+ * If this fails, then we will exit.
+ */
+ protected synchronized boolean setupStore(boolean dropTables)
+ {
+ if (!setupConnections())
+ {
+ return false;
+ }
+ MyConnection myconnection = null;
+ try
+ {
+ myconnection = (MyConnection) _connectionPool.acquireInstance();
+ Statement stmt = myconnection._connection.createStatement();
+ /*
+ * TODO Need some management interface to delete the table!
+ */
+ if (dropTables)
+ {
+ try
+ {
+ stmt.executeUpdate("DROP TABLE " + _tableNameMessage);
+ myconnection._connection.commit();
+ }
+ catch (SQLException ex)
+ {
+ // don't want to print error - chances are it
+ // just reports that the table does not exist
+ // ex.printStackTrace();
+ }
+ try
+ {
+ stmt.executeUpdate("DROP TABLE " + _tableNameQueue);
+ myconnection._connection.commit();
+ }
+ catch (SQLException ex)
+ {
+ // ex.printStackTrace();
+ }
+ try
+ {
+ stmt.executeUpdate("DROP TABLE " + _tableNameQueueMessageRelation);
+ myconnection._connection.commit();
+ }
+ catch (SQLException ex)
+ {
+ // ex.printStackTrace();
+ }
+ try
+ {
+ stmt.executeUpdate("DROP TABLE " + _tableNameExchange);
+ myconnection._connection.commit();
+ }
+ catch (SQLException ex)
+ {
+ // ex.printStackTrace();
+ }
+ try
+ {
+ stmt.executeUpdate("DROP TABLE " + _tableNameExchangeQueueRelation);
+ myconnection._connection.commit();
+ }
+ catch (SQLException ex)
+ {
+ // ex.printStackTrace();
+ }
+ try
+ {
+ stmt.executeUpdate("DROP TABLE " + _tableNameRecord);
+ myconnection._connection.commit();
+ }
+ catch (SQLException ex)
+ {
+ // ex.printStackTrace();
+ }
+ try
+ {
+ stmt.executeUpdate("DROP TABLE " + _tableNameTransaction);
+ myconnection._connection.commit();
+ }
+ catch (SQLException ex)
+ {
+ // ex.printStackTrace();
+ }
+ }
+ // create the table for messages
+ try
+ {
+ stmt.executeUpdate("CREATE TABLE " + _tableNameMessage + " (MessageID FLOAT NOT NULL, Header BLOB," +
+ " Payload BLOB, ExchangeName VARCHAR(1024), RoutingKey VARCHAR(1024)," +
+ " Mandatory INTEGER, Is_Immediate INTEGER, PRIMARY KEY(MessageID))");
+ myconnection._connection.commit();
+ }
+ catch (SQLException ex)
+ {
+ // ex.printStackTrace();
+ // assume this is reporting that the table already exists:
+ }
+ // create the table for queues
+ try
+ {
+ stmt.executeUpdate("CREATE TABLE " + _tableNameQueue + " (QueueID INTEGER NOT NULL, " +
+ "Name VARCHAR(1024) NOT NULL, Owner VARCHAR(1024), PRIMARY KEY(QueueID))");
+ myconnection._connection.commit();
+ }
+ catch (SQLException ex)
+ {
+ //ex.printStackTrace();
+ // assume this is reporting that the table already exists:
+ }
+ // create the table for queue to message mapping
+ try
+ {
+ stmt.executeUpdate("CREATE TABLE " + _tableNameQueueMessageRelation + " (QueueID INTEGER NOT NULL, " +
+ "MessageID FLOAT NOT NULL, Prepared INTEGER)");
+ myconnection._connection.commit();
+ }
+ catch (SQLException ex)
+ {
+ //ex.printStackTrace();
+ // assume this is reporting that the table already exists:
+ }
+ try
+ {
+ stmt.executeUpdate("CREATE TABLE " + _tableNameExchange + " (Name VARCHAR(1024) NOT NULL, " +
+ "Type VARCHAR(1024) NOT NULL, PRIMARY KEY(Name))");
+ myconnection._connection.commit();
+ }
+ catch (SQLException ex)
+ {
+ //ex.printStackTrace();
+ // assume this is reporting that the table already exists:
+ }
+ try
+ {
+ stmt.executeUpdate("CREATE TABLE " + _tableNameExchangeQueueRelation + " (QueueID INTEGER NOT NULL, " +
+ "Name VARCHAR(1024) NOT NULL, RoutingKey VARCHAR(1024), FieldTable BLOB )");
+ myconnection._connection.commit();
+ }
+ catch (SQLException ex)
+ {
+ //ex.printStackTrace();
+ // assume this is reporting that the table already exists:
+ }
+ try
+ {
+ stmt.executeUpdate("CREATE TABLE " + _tableNameRecord + " (XID_ID FLOAT, Type INTEGER, MessageID FLOAT, " +
+ "QueueID INTEGER, PRIMARY KEY(Type, MessageID, QueueID))");
+ // we could alter the table with QueueID as foreign key
+ myconnection._connection.commit();
+ }
+ catch (SQLException ex)
+ {
+ //ex.printStackTrace();
+ // assume this is reporting that the table already exists:
+ }
+ try
+ {
+ stmt.executeUpdate("CREATE TABLE " + _tableNameTransaction + " (XID_ID FLOAT, FormatId INTEGER, " +
+ "BranchQualifier BLOB, GlobalTransactionId BLOB, PRIMARY KEY(XID_ID))");
+ myconnection._connection.commit();
+ }
+ catch (SQLException ex)
+ {
+ // ex.printStackTrace();
+ // assume this is reporting that the table already exists:
+ }
+ }
+ catch (Throwable e)
+ {
+ _log.warn("Setup Store trouble: ", e);
+ return false;
+ }
+ finally
+ {
+ if (myconnection != null)
+ {
+ _connectionPool.releaseInstance(myconnection);
+ }
+ }
+ return true;
+ }
+ //========================================================================================
+ //============== the connection pool =====================================================
+ //========================================================================================
+
+ private class ConnectionPool extends Pool
+ {
+
+ /**
+ * Create a pool of specified size. Negative or null pool sizes are
+ * disallowed.
+ *
+ * @param poolSize The size of the pool to create. Should be 1 or
+ * greater.
+ * @throws Exception If the pool size is less than 1.
+ */
+ public ConnectionPool(int poolSize)
+ throws
+ Exception
+ {
+ super(poolSize);
+ }
+
+ /**
+ * @return An instance of the pooled object.
+ * @throws Exception In case of internal error.
+ */
+ protected MyConnection createInstance()
+ throws
+ Exception
+ {
+ try
+ {
+ // standard way to obtain a Connection object is to call the method DriverManager.getConnection,
+ // which takes a String containing a connection URL (uniform resource locator).
+ Connection conn = DriverManager.getConnection(_connectionURL);
+ //conn.setAutoCommit(true);
+ PreparedStatement[] st = new PreparedStatement[STATEMENT_SIZE];
+ for (int j = 0; j < STATEMENT_SIZE; j++)
+ {
+ st[j] = null;
+ }
+ return new MyConnection(conn, st);
+ }
+ catch (SQLException e)
+ {
+ throw new Exception("sqlException when creating connection to " + _connectionURL, e);
+ }
+ }
+ }
+
+ public class MyConnection
+ {
+ // the connection
+ private Connection _connection = null;
+ // its associated prepared statements
+ private PreparedStatement[] _preparedStatements = null;
+
+ MyConnection(Connection con, PreparedStatement[] st)
+ {
+ _connection = con;
+ _preparedStatements = st;
+ }
+
+ public Connection getConnection()
+ {
+ return _connection;
+ }
+
+ public PreparedStatement[] getStatements()
+ {
+ return _preparedStatements;
+ }
+
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
index 2bcebe0fa7..8954ffc4d7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
@@ -29,6 +29,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exception.*;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.txn.MemoryDequeueRecord;
@@ -249,4 +251,23 @@ public class MemoryMessageStore implements MessageStore
{
return _messageID++;
}
+
+
+ public ContentHeaderBody getContentHeaderBody(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException
+ {
+ // do nothing this is only used during recovery
+ return null;
+ }
+
+ public MessagePublishInfo getMessagePublishInfo(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException
+ {
+ // do nothing this is only used during recovery
+ return null;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java
index c4b1d3182f..913f3ed9c6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java
@@ -1,270 +1,300 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.server.messageStore;
-
-import org.apache.qpid.server.exception.*;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.txn.TransactionManager;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.commons.configuration.Configuration;
-
-import javax.transaction.xa.Xid;
-import java.util.Collection;
-
-/**
- * Created by Arnaud Simon
- * Date: 29-Mar-2007
- * Time: 17:34:02
- */
-public interface MessageStore
-{
- /**
- * Create a new exchange
- *
- * @param exchange the exchange to be persisted
- * @throws InternalErrorException If an error occurs
- */
- public void createExchange(Exchange exchange)
- throws
- InternalErrorException;
-
- /**
- * Remove an exchange
- * @param exchange The exchange to be removed
- * @throws InternalErrorException If an error occurs
- */
- public void removeExchange(Exchange exchange) throws
- InternalErrorException;
-
- /**
- * Bind a queue with an exchange given a routing key
- *
- * @param exchange The exchange to bind the queue with
- * @param routingKey The routing key
- * @param queue The queue to be bound
- * @param args Args
- * @throws InternalErrorException If an error occurs
- */
- public void bindQueue(Exchange exchange,
- AMQShortString routingKey,
- StorableQueue queue, FieldTable args)
- throws
- InternalErrorException;
-
- /**
- * Unbind a queue from an exchange
- *
- * @param exchange The exchange the queue was bound to
- * @param routingKey The routing queue
- * @param queue The queue to unbind
- * @param args args
- * @throws InternalErrorException If an error occurs
- */
- public void unbindQueue(Exchange exchange,
- AMQShortString routingKey,
- StorableQueue queue, FieldTable args)
- throws
- InternalErrorException;
-
- /**
- * Called after instantiation in order to configure the message store. A particular implementation can define
- * whatever parameters it wants.
- *
- * @param virtualHost The virtual host using by this store
- * @param tm The transaction manager implementation
- * @param base The base element identifier from which all configuration items are relative. For example, if the base
- * element is "store", the all elements used by concrete classes will be "store.foo" etc.
- * @param config The apache commons configuration object
- * @throws InternalErrorException If an error occurs that means the store is unable to configure itself
- * @throws IllegalArgumentException If the configuration arguments are illegal
- */
- void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
- throws
- InternalErrorException,
- IllegalArgumentException;
-
- /**
- * Called to close and cleanup any resources used by the message store.
- *
- * @throws InternalErrorException if close fails
- */
- void close()
- throws
- InternalErrorException;
-
- /**
- * Create a queue
- *
- * @param queue the queue to be created
- * @throws InternalErrorException In case of internal message store problem
- * @throws QueueAlreadyExistsException If the queue already exists in the store
- */
- public void createQueue(StorableQueue queue)
- throws
- InternalErrorException,
- QueueAlreadyExistsException;
-
- /**
- * Destroy a queue
- *
- * @param queue The queue to be destroyed
- * @throws InternalErrorException In case of internal message store problem
- * @throws QueueDoesntExistException If the queue does not exist in the store
- */
- public void destroyQueue(StorableQueue queue)
- throws
- InternalErrorException,
- QueueDoesntExistException;
-
- /**
- * Stage the message before effective enqueue
- *
- * @param m The message to stage
- * @throws InternalErrorException In case of internal message store problem
- * @throws MessageAlreadyStagedException If the message is already staged
- */
- public void stage(StorableMessage m)
- throws
- InternalErrorException,
- MessageAlreadyStagedException;
-
-
- /**
- * Append more data with a previously staged message
- *
- * @param m The message to which data must be appended
- * @param data Data to happen to the message
- * @param offset The number of bytes from the beginning of the payload
- * @param size The number of bytes to be written
- * @throws InternalErrorException In case of internal message store problem
- * @throws MessageDoesntExistException If the message has not been staged
- */
- public void appendContent(StorableMessage m, byte[] data, int offset, int size)
- throws
- InternalErrorException,
- MessageDoesntExistException;
-
- /**
- * Get the content of previously staged or enqueued message.
- * The message headers are also set.
- *
- * @param m The message for which the content must be loaded
- * @param offset The number of bytes from the beginning of the payload
- * @param size The number of bytes to be loaded
- * @return The message content
- * @throws InternalErrorException In case of internal message store problem
- * @throws MessageDoesntExistException If the message does not exist
- */
- public byte[] loadContent(StorableMessage m, int offset, int size)
- throws
- InternalErrorException,
- MessageDoesntExistException;
-
- /**
- * Destroy a previously staged message
- *
- * @param m the message to be destroyed
- * @throws InternalErrorException In case of internal message store problem
- * @throws MessageDoesntExistException If the message does not exist in the store
- */
- public void destroy(StorableMessage m)
- throws
- InternalErrorException,
- MessageDoesntExistException;
-
- /**
- * Enqueue a message under the scope of the transaction branch
- * identified by xid when specified.
- * <p> This operation is propagated to the queue and the message.
- * <p> A message that has been previously staged is assumed to have had
- * its payload already added (see appendContent)
- *
- * @param xid The xid of the transaction branch under which the message must be enqueued.
- * <p> It he xid is null then the message is enqueued outside the scope of any transaction.
- * @param m The message to be enqueued
- * @param queue The queue into which the message must be enqueued
- * @throws InternalErrorException In case of internal message store problem
- * @throws QueueDoesntExistException If the queue does not exist in the store
- * @throws InvalidXidException The transaction branch is invalid
- * @throws UnknownXidException The transaction branch is unknown
- * @throws MessageDoesntExistException If the Message does not exist
- */
- public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
- throws
- InternalErrorException,
- QueueDoesntExistException,
- InvalidXidException,
- UnknownXidException,
- MessageDoesntExistException;
-
- /**
- * Dequeue a message under the scope of the transaction branch identified by xid
- * if specified.
- * <p> This operation is propagated to the queue and the message.
- *
- * @param xid The xid of the transaction branch under which the message must be dequeued.
- * <p> It he xid is null then the message is dequeued outside the scope of any transaction.
- * @param m The message to be dequeued
- * @param queue The queue from which the message must be dequeued
- * @throws InternalErrorException In case of internal message store problem
- * @throws QueueDoesntExistException If the queue does not exist in the store
- * @throws InvalidXidException The transaction branch is invalid
- * @throws UnknownXidException The transaction branch is unknown
- */
- public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
- throws
- InternalErrorException,
- QueueDoesntExistException,
- InvalidXidException,
- UnknownXidException;
-
- //=========================================================
- // Recovery specific methods
- //=========================================================
-
- /**
- * List all the persistent queues
- *
- * @return All the persistent queues
- * @throws InternalErrorException In case of internal message store problem
- */
- public Collection<StorableQueue> getAllQueues()
- throws
- InternalErrorException;
-
- /**
- * All enqueued messages of a given queue
- *
- * @param queue The queue where the message are retrieved from
- * @return The list all enqueued messages of a given queue
- * @throws InternalErrorException In case of internal message store problem
- */
- public Collection<StorableMessage> getAllMessages(StorableQueue queue)
- throws
- InternalErrorException;
-
- /**
- * Get a new message ID
- *
- * @return A new message ID
- */
- public long getNewMessageId();
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.commons.configuration.Configuration;
+
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 17:34:02
+ */
+public interface MessageStore
+{
+ /**
+ * Create a new exchange
+ *
+ * @param exchange the exchange to be persisted
+ * @throws InternalErrorException If an error occurs
+ */
+ public void createExchange(Exchange exchange)
+ throws
+ InternalErrorException;
+
+ /**
+ * Remove an exchange
+ *
+ * @param exchange The exchange to be removed
+ * @throws InternalErrorException If an error occurs
+ */
+ public void removeExchange(Exchange exchange)
+ throws
+ InternalErrorException;
+
+ /**
+ * Bind a queue with an exchange given a routing key
+ *
+ * @param exchange The exchange to bind the queue with
+ * @param routingKey The routing key
+ * @param queue The queue to be bound
+ * @param args Args
+ * @throws InternalErrorException If an error occurs
+ */
+ public void bindQueue(Exchange exchange,
+ AMQShortString routingKey,
+ StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException;
+
+ /**
+ * Unbind a queue from an exchange
+ *
+ * @param exchange The exchange the queue was bound to
+ * @param routingKey The routing queue
+ * @param queue The queue to unbind
+ * @param args args
+ * @throws InternalErrorException If an error occurs
+ */
+ public void unbindQueue(Exchange exchange,
+ AMQShortString routingKey,
+ StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException;
+
+ /**
+ * Called after instantiation in order to configure the message store. A particular implementation can define
+ * whatever parameters it wants.
+ *
+ * @param virtualHost The virtual host using by this store
+ * @param tm The transaction manager implementation
+ * @param base The base element identifier from which all configuration items are relative. For example, if the base
+ * element is "store", the all elements used by concrete classes will be "store.foo" etc.
+ * @param config The apache commons configuration object
+ * @throws InternalErrorException If an error occurs that means the store is unable to configure itself
+ * @throws IllegalArgumentException If the configuration arguments are illegal
+ */
+ void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
+ throws
+ InternalErrorException,
+ IllegalArgumentException;
+
+ /**
+ * Called to close and cleanup any resources used by the message store.
+ *
+ * @throws InternalErrorException if close fails
+ */
+ void close()
+ throws
+ InternalErrorException;
+
+ /**
+ * Create a queue
+ *
+ * @param queue the queue to be created
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueAlreadyExistsException If the queue already exists in the store
+ */
+ public void createQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueAlreadyExistsException;
+
+ /**
+ * Destroy a queue
+ *
+ * @param queue The queue to be destroyed
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ */
+ public void destroyQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException;
+
+ /**
+ * Stage the message before effective enqueue
+ *
+ * @param m The message to stage
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageAlreadyStagedException If the message is already staged
+ */
+ public void stage(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageAlreadyStagedException;
+
+
+ /**
+ * Append more data with a previously staged message
+ *
+ * @param m The message to which data must be appended
+ * @param data Data to happen to the message
+ * @param offset The number of bytes from the beginning of the payload
+ * @param size The number of bytes to be written
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message has not been staged
+ */
+ public void appendContent(StorableMessage m, byte[] data, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Get the content of previously staged or enqueued message.
+ * The message headers are also set.
+ *
+ * @param m The message for which the content must be loaded
+ * @param offset The number of bytes from the beginning of the payload
+ * @param size The number of bytes to be loaded
+ * @return The message content
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message does not exist
+ */
+ public byte[] loadContent(StorableMessage m, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Get the content header of this message
+ *
+ * @param m The message
+ * @return The message content
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message does not exist
+ */
+ public ContentHeaderBody getContentHeaderBody(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Get the MessagePublishInfo of this message
+ *
+ * @param m The message
+ * @return The message content
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message does not exist
+ */
+ public MessagePublishInfo getMessagePublishInfo(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Destroy a previously staged message
+ *
+ * @param m the message to be destroyed
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message does not exist in the store
+ */
+ public void destroy(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Enqueue a message under the scope of the transaction branch
+ * identified by xid when specified.
+ * <p> This operation is propagated to the queue and the message.
+ * <p> A message that has been previously staged is assumed to have had
+ * its payload already added (see appendContent)
+ *
+ * @param xid The xid of the transaction branch under which the message must be enqueued.
+ * <p> It he xid is null then the message is enqueued outside the scope of any transaction.
+ * @param m The message to be enqueued
+ * @param queue The queue into which the message must be enqueued
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ * @throws InvalidXidException The transaction branch is invalid
+ * @throws UnknownXidException The transaction branch is unknown
+ * @throws MessageDoesntExistException If the Message does not exist
+ */
+ public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException;
+
+ /**
+ * Dequeue a message under the scope of the transaction branch identified by xid
+ * if specified.
+ * <p> This operation is propagated to the queue and the message.
+ *
+ * @param xid The xid of the transaction branch under which the message must be dequeued.
+ * <p> It he xid is null then the message is dequeued outside the scope of any transaction.
+ * @param m The message to be dequeued
+ * @param queue The queue from which the message must be dequeued
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ * @throws InvalidXidException The transaction branch is invalid
+ * @throws UnknownXidException The transaction branch is unknown
+ */
+ public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException;
+
+ //=========================================================
+ // Recovery specific methods
+ //=========================================================
+
+ /**
+ * List all the persistent queues
+ *
+ * @return All the persistent queues
+ * @throws InternalErrorException In case of internal message store problem
+ */
+ public Collection<StorableQueue> getAllQueues()
+ throws
+ InternalErrorException;
+
+ /**
+ * All enqueued messages of a given queue
+ *
+ * @param queue The queue where the message are retrieved from
+ * @return The list all enqueued messages of a given queue
+ * @throws InternalErrorException In case of internal message store problem
+ */
+ public Collection<StorableMessage> getAllMessages(StorableQueue queue)
+ throws
+ InternalErrorException;
+
+ /**
+ * Get a new message ID
+ *
+ * @return A new message ID
+ */
+ public long getNewMessageId();
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java
new file mode 100644
index 0000000000..49af218b7a
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java
@@ -0,0 +1,135 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import java.util.ArrayList;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 15-May-2007
+ * Time: 10:11:49
+ */
+public abstract class Pool
+{
+ // The maximum size of the pool.
+ private int _maxPoolSize = -1;
+ // The current size of the pool.
+ private int _currentPoolSize = 0;
+ // The pool objects.
+ private volatile ArrayList<Object> _poolObjects = new ArrayList<Object>();
+ //The current number of created instances.
+ private int _instanceCount = 0;
+
+ /**
+ * Create a pool of specified size. Negative or null pool sizes are
+ * disallowed.
+ *
+ * @param poolSize The size of the pool to create. Should be 1 or
+ * greater.
+ * @throws Exception If the pool size is less than 1.
+ */
+ public Pool(int poolSize) throws Exception
+ {
+ if (poolSize <= 0)
+ {
+ throw new Exception("pool size is less than 1: " + poolSize);
+ }
+ _maxPoolSize = poolSize;
+ }
+
+ /**
+ * Return the maximum size of this pool.
+ *
+ * @return The maximum size of this pool.
+ */
+ public final int maxSize()
+ {
+ return _maxPoolSize;
+ }
+
+ /**
+ * Return the current number of created instances.
+ *
+ * @return The current number of created instances in this pool.
+ */
+ public final int numberOfInstances()
+ {
+ return _instanceCount;
+ }
+
+ /**
+ * Extending classes MUST define how to create an instance of the object
+ * that they pool.
+ *
+ * @return An instance of the pooled object.
+ * @throws Exception In case of internal error.
+ */
+ abstract protected Object createInstance() throws Exception;
+
+ /**
+ * Remove the next available object from the pool or wait for one to become
+ * available.
+ *
+ * @return The next available instance.
+ * @throws Exception If the call is interrupted
+ */
+ public final synchronized Object acquireInstance() throws Exception
+ {
+ while (_currentPoolSize == _maxPoolSize)
+ {
+ try
+ {
+ this.wait();
+ }
+ catch (InterruptedException e)
+ {
+ throw new Exception("pool wait threw interrupted exception", e);
+ }
+ }
+ if (_poolObjects.size() == 0)
+ {
+ _poolObjects.add(createInstance());
+ _instanceCount++;
+ }
+ _currentPoolSize++;
+ return _poolObjects.remove(0);
+ }
+
+ /**
+ * Return an object back into this pool.
+ *
+ * @param object The returning object.
+ */
+ public synchronized void releaseInstance(Object object)
+ {
+ _poolObjects.add(object);
+ _currentPoolSize--;
+ this.notify();
+ }
+
+ /**
+ * Return a dead object back into this pool.
+ *
+ */
+ public synchronized void releaseDeadInstance()
+ {
+ _instanceCount--;
+ _currentPoolSize--;
+ this.notify();
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
index 560549c126..47126f4b68 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,7 +24,11 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.JDBCStore;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.MessageDoesntExistException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.log4j.Logger;
import javax.transaction.xa.Xid;
@@ -65,7 +69,7 @@ public class StorableMessageHandle implements AMQMessageHandle
// MessagePublishInfo
private MessagePublishInfo _messagePublishInfo;
// list of chunks
- private List<ContentChunk> _chunks = new LinkedList<ContentChunk>();
+ private List<ContentChunk> _chunks;
//========================================================================
// Constructors
@@ -84,6 +88,17 @@ public class StorableMessageHandle implements AMQMessageHandle
throws
AMQException
{
+ if (_contentHeaderBody == null)
+ {
+ // load it from the store
+ try
+ {
+ _contentHeaderBody = _messageStore.getContentHeaderBody(_message);
+ } catch (Exception e)
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
+ }
+ }
return _contentHeaderBody;
}
@@ -91,6 +106,17 @@ public class StorableMessageHandle implements AMQMessageHandle
throws
AMQException
{
+ if (_chunks == null )
+ {
+ if(_message.isStaged() )
+ {
+ loadChunks();
+ }
+ else
+ {
+ return 0;
+ }
+ }
return _chunks.size();
}
@@ -106,13 +132,57 @@ public class StorableMessageHandle implements AMQMessageHandle
IllegalArgumentException,
AMQException
{
+ if (_chunks == null)
+ {
+ loadChunks();
+ }
return _chunks.get(index);
}
+ private void loadChunks()
+ throws
+ AMQException
+ {
+ try
+ {
+ _chunks = new LinkedList<ContentChunk>();
+ byte[] underlying = _messageStore.loadContent(_message, 1, 0);
+ final int size = underlying.length;
+ final org.apache.mina.common.ByteBuffer data =
+ org.apache.mina.common.ByteBuffer.wrap(underlying);
+ ContentChunk cb = new ContentChunk()
+ {
+
+ public int getSize()
+ {
+ return size;
+ }
+
+ public org.apache.mina.common.ByteBuffer getData()
+ {
+ return data;
+ }
+
+ public void reduceToFit()
+ {
+
+ }
+ };
+ _chunks.add(cb);
+ } catch (Exception e)
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
+ }
+ }
+
public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody)
throws
AMQException
{
+ if (_chunks == null)
+ {
+ _chunks = new LinkedList<ContentChunk>();
+ }
_chunks.add(contentBody);
// if rquired this message can be added to the store
//_messageStore.appendContent(_message, _payload, 0, 10);
@@ -123,6 +193,17 @@ public class StorableMessageHandle implements AMQMessageHandle
throws
AMQException
{
+ if (_messagePublishInfo == null)
+ {
+ // read it from the store
+ try
+ {
+ _messagePublishInfo = _messageStore.getMessagePublishInfo(_message);
+ } catch (Exception e)
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
+ }
+ }
return _messagePublishInfo;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java
new file mode 100644
index 0000000000..cc0cc7140b
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java
@@ -0,0 +1,93 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.UnknownXidException;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 15:15:18
+ */
+public abstract class JDBCAbstractRecord implements TransactionRecord
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(JDBCEnqueueRecord.class);
+ // The record types
+ static public final int TYPE_DEQUEUE = 1;
+ static public final int TYPE_ENQUEUE = 2;
+
+ // the queue
+ StorableQueue _queue;
+ // the message
+ StorableMessage _message;
+
+ //========================================================================
+ // Constructor
+ //========================================================================
+ public JDBCAbstractRecord(StorableMessage m, StorableQueue queue)
+ {
+ _queue = queue;
+ _message = m;
+ }
+
+ public abstract int getType();
+ public long getMessageID()
+ {
+ return _message.getMessageId();
+ }
+
+ public int getQueueID()
+ {
+ return _queue.getQueueID();
+ }
+
+ public void rollback(MessageStore store)
+ throws
+ InternalErrorException
+ {
+
+ }
+
+ public void prepare(MessageStore store)
+ throws
+ InternalErrorException
+ {
+ }
+
+
+ public abstract void rollback(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException;
+
+ public abstract void prepare(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException;
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java
new file mode 100644
index 0000000000..c18a6a7fcf
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java
@@ -0,0 +1,85 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.JDBCStore;
+import org.apache.qpid.server.exception.*;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 14:50:34
+ */
+public class JDBCDequeueRecord extends JDBCAbstractRecord
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(JDBCDequeueRecord.class);
+
+ //========================================================================
+ // Constructor
+ //========================================================================
+ public JDBCDequeueRecord( StorableMessage m, StorableQueue queue)
+ {
+ super(m, queue);
+ }
+
+ //========================================================================
+ // Interface TransactionRecord
+ //========================================================================
+
+ public void commit(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException
+ {
+ store.dequeue(xid, _message, _queue);
+ }
+
+ public void rollback(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException
+ {
+ ((JDBCStore) store).rollbackDequeu(xid, _message, _queue);
+ }
+
+ public void prepare(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException
+ {
+ ((JDBCStore) store).prepareDequeu(xid, _message, _queue);
+ }
+
+ public int getType()
+ {
+ return TYPE_DEQUEUE;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java
new file mode 100644
index 0000000000..4a4a23153e
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java
@@ -0,0 +1,106 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.JDBCStore;
+import org.apache.qpid.server.exception.*;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 14:50:20
+ */
+public class JDBCEnqueueRecord extends JDBCAbstractRecord
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(JDBCEnqueueRecord.class);
+
+ //========================================================================
+ // Constructor
+ //========================================================================
+ public JDBCEnqueueRecord(StorableMessage m, StorableQueue queue)
+ {
+ super(m, queue);
+ }
+
+ //========================================================================
+ // Interface TransactionRecord
+ //========================================================================
+
+ public void commit(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException
+ {
+ store.enqueue(xid, _message, _queue);
+ }
+
+ public void rollback(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException
+ {
+ if (!_message.isEnqueued())
+ {
+ // try to delete the message
+ try
+ {
+ store.destroy(_message);
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Problem when destoying message ", e);
+ }
+ }
+ }
+
+ public void prepare(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException
+ {
+ try
+ {
+ if (!_message.isEnqueued() && !_message.isStaged())
+ {
+ store.stage(_message);
+ store.appendContent(_message, _message.getData(), 0, _message.getData().length);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new InternalErrorException("Problem when persisting message ", e);
+ }
+ }
+
+ public int getType()
+ {
+ return TYPE_ENQUEUE;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java
new file mode 100644
index 0000000000..cc35b50cc8
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java
@@ -0,0 +1,196 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.JDBCStore;
+
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 14:09:35
+ */
+public class JDBCTransaction implements Transaction
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(JDBCTransaction.class);
+ public static long _xidId = 0;
+ //========================================================================
+ // Instance Fields
+ //========================================================================
+ // the associated connection
+ private JDBCStore.MyConnection _connection;
+ // Indicates whether this transaction is prepared
+ private boolean _prepared = false;
+ // Indicates that this transaction has heuristically rolled back
+ private boolean _heurRollBack = false;
+ // The list of records associated with this tx
+ private List<TransactionRecord> _records = new LinkedList<TransactionRecord>();
+ // The date when this tx has been created.
+ private long _dateCreated;
+ // The timeout in seconds
+ private long _timeout;
+ // this instance xid id used as primary key
+ private long _thisxidId;
+
+ //=========================================================
+ // Constructors
+ //=========================================================
+
+ /**
+ * Create a transaction
+ *
+ */
+ public JDBCTransaction()
+ {
+ _dateCreated = System.currentTimeMillis();
+ _thisxidId = _xidId++;
+ }
+
+ //=========================================================
+ // Getter and Setter methods
+ //=========================================================
+
+ /**
+ * Notify that this tx has been prepared
+ */
+ public void prepare()
+ {
+ _prepared = true;
+ }
+
+ /**
+ * Specify whether this transaction is prepared
+ *
+ * @return true if this transaction is prepared, false otherwise
+ */
+ public boolean isPrepared()
+ {
+ return _prepared;
+ }
+
+ /**
+ * Notify that this tx has been heuristically rolled back
+ */
+ public void heurRollback()
+ {
+ _heurRollBack = true;
+ }
+
+ /**
+ * Specify whether this transaction has been heuristically rolled back
+ *
+ * @return true if this transaction has been heuristically rolled back , false otherwise
+ */
+ public boolean isHeurRollback()
+ {
+ return _heurRollBack;
+ }
+
+ /**
+ * Add an abstract record to this tx.
+ *
+ * @param record The record to be added
+ */
+ public void addRecord(TransactionRecord record)
+ {
+ _records.add(record);
+ }
+
+ /**
+ * Get the list of records associated with this tx.
+ *
+ * @return The list of records associated with this tx.
+ */
+ public List<TransactionRecord> getrecords()
+ {
+ return _records;
+ }
+
+ /**
+ * Set this tx timeout
+ *
+ * @param timeout This tx timeout in seconds
+ */
+ public void setTimeout(long timeout)
+ {
+ _timeout = timeout;
+ }
+
+ /**
+ * Get this tx timeout
+ *
+ * @return This tx timeout in seconds
+ */
+ public long getTimeout()
+ {
+ return _timeout;
+ }
+
+ /**
+ * Specify whether this tx has expired
+ *
+ * @return true if this tx has expired, false otherwise
+ */
+ public boolean hasExpired()
+ {
+ long currentDate = System.currentTimeMillis();
+ boolean result = currentDate - _dateCreated > _timeout * 1000;
+ if (_log.isDebugEnabled() && result)
+ {
+ _log.debug("transaction has expired");
+ }
+ return result;
+ }
+
+ /**
+ * Get the JDBC connection
+ * @return The JDBC connection
+ */
+ public JDBCStore.MyConnection getConnection()
+ {
+ return _connection;
+ }
+
+ /**
+ * Set the JDBC connection
+ *
+ * @param connection The new JDBC connection
+ */
+ public void setConnection(JDBCStore.MyConnection connection)
+ {
+ _connection = connection;
+ }
+
+ /**
+ * This tx xid id used as primary key
+ *
+ * @return this tx xid id
+ */
+ public long getXidID()
+ {
+ return _thisxidId;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java
new file mode 100644
index 0000000000..e60fb89bf7
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java
@@ -0,0 +1,554 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.JDBCStore;
+import org.apache.qpid.server.exception.*;
+import org.apache.commons.configuration.Configuration;
+
+import javax.transaction.xa.Xid;
+import java.util.HashMap;
+import java.util.Set;
+
+
+/**
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 14:05:45
+ */
+public class JDBCTransactionManager implements TransactionManager
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(JDBCTransactionManager.class);
+
+ private static final String ENVIRONMENT_TX_TIMEOUT = "environment-tx-timeout";
+
+ //========================================================================
+ // Instance Fields
+ //========================================================================
+ // The underlying jdbc message store
+ private JDBCStore _messagStore;
+
+ // A map of XID/x
+ private HashMap<Xid, Transaction> _xidMap;
+
+ // A map of in-doubt txs
+ private HashMap<Xid, Transaction> _indoubtXidMap;
+
+ // A default tx timeout in sec
+ private int _defaultTimeout; // set to 10s if not specified in the config
+
+ //===================================
+ //=== Configuartion
+ //===================================
+
+ /**
+ * Configure this TM with the Message store implementation
+ *
+ * @param base The base element identifier from which all configuration items are relative. For example, if the base
+ * element is "store", the all elements used by concrete classes will be "store.foo" etc.
+ * @param config The apache commons configuration object
+ * @param messageStroe the message store associated with the TM
+ */
+ public void configure(MessageStore messageStroe, String base, Configuration config)
+ {
+ _messagStore = (JDBCStore) messageStroe;
+ if (config != null)
+ {
+ _defaultTimeout = config.getInt(base + "." + ENVIRONMENT_TX_TIMEOUT, 120);
+ } else
+ {
+ _defaultTimeout = 120;
+ }
+ _log.info("Using transaction timeout of " + _defaultTimeout + " s");
+ // get the list of in-doubt transactions
+ try
+ {
+ _indoubtXidMap = _messagStore.getAllInddoubt();
+ _xidMap = _indoubtXidMap;
+ } catch (Exception e)
+ {
+ _log.fatal("Cannot recover in-doubt transactions", e);
+ }
+ }
+
+ //===================================
+ //=== TransactionManager interface
+ //===================================
+
+ /**
+ * Begin a transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to begin
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws InvalidXidException The Xid is invalid
+ */
+ public synchronized XAFlag begin(Xid xid)
+ throws
+ InternalErrorException,
+ InvalidXidException
+ {
+ if (xid == null)
+ {
+ throw new InvalidXidException(xid, "null xid");
+ }
+ if (_xidMap.containsKey(xid))
+ {
+ throw new InvalidXidException(xid, "Xid already exist");
+ }
+ Transaction tx = new JDBCTransaction();
+ tx.setTimeout(_defaultTimeout);
+ _xidMap.put(xid, tx);
+ return XAFlag.ok;
+ }
+
+ /**
+ * Prepare the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to prepare
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution.
+ * <li> <code>XAFlag.rdonly</code>: The transaction branch was read-only and has been committed.
+ * <li> <code>XAFlag.rbrollback</code>: The transaction branch was marked rollback-only for an unspecied reason.
+ * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Prepare has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public synchronized XAFlag prepare(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException
+ {
+ // get the transaction
+ JDBCTransaction tx = getTransaction(xid);
+ XAFlag result = XAFlag.ok;
+ if (tx.isHeurRollback())
+ {
+ result = XAFlag.rdonly;
+ } else if (tx.hasExpired())
+ {
+ result = XAFlag.rbtimeout;
+ // rollback this tx branch
+ rollback(xid);
+ } else
+ {
+ if (tx.isPrepared())
+ {
+ throw new CommandInvalidException("TransactionImpl is already prepared");
+ }
+ if (tx.getrecords().size() == 0)
+ {
+ // the tx was read only (no work has been done)
+ _xidMap.remove(xid);
+ result = XAFlag.rdonly;
+ } else
+ {
+ try
+ {
+ JDBCStore.MyConnection con = _messagStore.getConnection();
+ tx.setConnection(con);
+ // save the xid
+ _messagStore.saveXID(con, tx, xid);
+ for (TransactionRecord record : tx.getrecords())
+ {
+ if (record instanceof JDBCAbstractRecord)
+ {
+ ((JDBCAbstractRecord) record).prepare(_messagStore, xid);
+ _messagStore.saveRecord(con, tx, (JDBCAbstractRecord) record);
+ } else
+ {
+ record.prepare(_messagStore);
+ }
+ }
+ _messagStore.commitConnection(con);
+ tx.setConnection(null);
+ } catch (Exception e)
+ {
+ _log.error("Cannot prepare tx: " + xid);
+ throw new InternalErrorException("Cannot prepare tx: " + xid);
+ }
+ tx.prepare();
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Rollback the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to rollback
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution,
+ * <li> <code>NOT SUPPORTED XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed.
+ * <li> <code>NOT SUPPORTED XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was committed.
+ * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back.
+ * <li> <code>NOT SUPPORTED XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back.
+ * <li> <code>NOT SUPPORTED XAFlag.rbrollback</code>: The broker marked the transaction branch rollback-only for an unspeci?ed reason.
+ * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Rollback has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public synchronized XAFlag rollback(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException
+ {
+ // get the transaction
+ JDBCTransaction tx = getTransaction(xid);
+ XAFlag flag = XAFlag.ok;
+ if (tx.isHeurRollback())
+ {
+ flag = XAFlag.heurrb;
+ } else
+ {
+ try
+ {
+ JDBCStore.MyConnection con = _messagStore.getConnection();
+ tx.setConnection(con);
+ for (TransactionRecord record : tx.getrecords())
+ {
+ if (record instanceof JDBCAbstractRecord)
+ {
+ ((JDBCAbstractRecord) record).rollback(_messagStore, xid);
+ } else
+ {
+ record.rollback(_messagStore);
+ }
+ }
+ if (tx.isPrepared())
+ {
+ _messagStore.deleteRecords(con, tx);
+ _messagStore.deleteXID(con, tx);
+ _messagStore.commitConnection(con);
+ }
+ _messagStore.commitConnection(con);
+ tx.setConnection(null);
+ }
+ catch (Exception e)
+ {
+ // this should not happen
+ _log.error("Error when rolling back distributed transaction: " + xid);
+ throw new InternalErrorException("Error when rolling back distributed transaction: " + xid, e);
+ }
+ removeTransaction(xid);
+ }
+ if (tx.hasExpired())
+ {
+ flag = XAFlag.rbtimeout;
+ }
+ return flag;
+ }
+
+ /**
+ * Commit the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to commit
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution,
+ * <li> <code>NOT SUPPORTED XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed.
+ * <li> <code>NOT SUPPORTED XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the specied transaction branch was committed.
+ * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back.
+ * <li> <code>XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Commit has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ * @throws org.apache.qpid.server.exception.NotPreparedException
+ * The branch was not prepared prior to commit
+ */
+ public synchronized XAFlag commit(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException,
+ NotPreparedException
+ {
+ // get the transaction
+ JDBCTransaction tx = getTransaction(xid);
+ XAFlag flag = XAFlag.ok;
+ if (tx.isHeurRollback())
+ {
+ flag = XAFlag.heurrb;
+ } else if (tx.hasExpired())
+ {
+ flag = XAFlag.rbtimeout;
+ // rollback this tx branch
+ rollback(xid);
+ } else
+ {
+ if (!tx.isPrepared())
+ {
+ throw new NotPreparedException("TransactionImpl is not prepared");
+ }
+ try
+ {
+ JDBCStore.MyConnection con = _messagStore.getConnection();
+ tx.setConnection(con);
+ for (TransactionRecord record : tx.getrecords())
+ {
+ try
+ {
+ record.commit(_messagStore, xid);
+ } catch (InvalidXidException e)
+ {
+ throw new UnknownXidException(xid, e);
+ } catch (Exception e)
+ {
+ // this should not happen as the queue and the message must exist
+ _log.error("Error when committing distributed transaction heurmix mode returned: " + xid);
+ flag = XAFlag.heurmix;
+ }
+ }
+ _messagStore.deleteRecords(con, tx);
+ _messagStore.deleteXID(con, tx);
+ _messagStore.commitConnection(con);
+ tx.setConnection(null);
+ } catch (Exception e)
+ {
+ // this should not happen
+ _log.error("Error when committing distributed transaction heurrb mode returned: " + xid);
+ throw new InternalErrorException("Error when committing distributed transaction: " + xid, e);
+ }
+ removeTransaction(xid);
+ }
+ return flag;
+ }
+
+ /**
+ * One phase commit the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to one phase commit
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution,
+ * <li> <code>NOT SUPPORTED XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed.
+ * <li> <code>NOT SUPPORTED XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was committed.
+ * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back.
+ * <li> <code>NOT SUPPORTED XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back.
+ * <li> <code>NOT SUPPORTED XAFlag.rbrollback</code>: The broker marked the transaction branch rollback-only for an unspeci?ed reason.
+ * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Commit has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public synchronized XAFlag commit_one_phase(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException
+ {
+ XAFlag flag = XAFlag.ok;
+ JDBCTransaction tx = getTransaction(xid);
+ if (tx.isHeurRollback())
+ {
+ flag = XAFlag.heurrb;
+ } else if (tx.hasExpired())
+ {
+ flag = XAFlag.rbtimeout;
+ // rollback this tx branch
+ rollback(xid);
+ } else
+ {
+ try
+ {
+ // we do not need to prepare the tx
+ tx.prepare();
+ JDBCStore.MyConnection con = _messagStore.getConnection();
+ tx.setConnection(con);
+ for (TransactionRecord record : tx.getrecords())
+ {
+ try
+ {
+ record.commit(_messagStore, xid);
+ } catch (InvalidXidException e)
+ {
+ throw new UnknownXidException(xid, e);
+ } catch (Exception e)
+ {
+ // this should not happen as the queue and the message must exist
+ _log.error("Error when committing transaction heurmix mode returned: " + xid);
+ flag = XAFlag.heurmix;
+ }
+ }
+ _messagStore.commitConnection(con);
+ tx.setConnection(null);
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ throw new InternalErrorException("cannot commit transaxtion with xid " + xid + " " + e, e);
+ }
+ finally
+ {
+ removeTransaction(xid);
+ }
+ }
+ return flag;
+ }
+
+ /**
+ * Forget about the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to forget
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Forget has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public void forget(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException
+ {
+ synchronized (xid)
+ {
+ getTransaction(xid);
+ removeTransaction(xid);
+ }
+ }
+
+ /**
+ * Set the transaction branch timeout value in seconds
+ *
+ * @param xid The xid of the branch to set timeout
+ * @param timeout Timeout value in seconds
+ * @throws InternalErrorException In case of internal problem
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public void setTimeout(Xid xid, long timeout)
+ throws
+ InternalErrorException,
+ UnknownXidException
+ {
+ JDBCTransaction tx = getTransaction(xid);
+ tx.setTimeout(timeout);
+ }
+
+ /**
+ * Get the transaction branch timeout
+ *
+ * @param xid The xid of the branch to get the timeout from
+ * @return The timeout associated with the branch identified with xid
+ * @throws InternalErrorException In case of internal problem
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public long getTimeout(Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException
+ {
+ JDBCTransaction tx = getTransaction(xid);
+ return tx.getTimeout();
+ }
+
+ /**
+ * Get a set of Xids the RM has prepared or heuristically completed
+ *
+ * @param startscan Indicates that recovery scan should start
+ * @param endscan Indicates that the recovery scan should end after returning the Xids
+ * @return Set of Xids the RM has prepared or heuristically completed
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Recover has been call in an improper context
+ */
+ public Set<Xid> recover(boolean startscan, boolean endscan)
+ throws
+ InternalErrorException,
+ CommandInvalidException
+ {
+ return _indoubtXidMap.keySet();
+ }
+
+ /**
+ * An error happened (for example the channel has been abruptly closed)
+ * with this Xid, TM must make a heuristical decision.
+ *
+ * @param xid The Xid of the transaction branch to be heuristically completed
+ * @throws UnknownXidException The Xid is unknown
+ * @throws InternalErrorException In case of internal problem
+ */
+ public void HeuristicOutcome(Xid xid)
+ throws
+ UnknownXidException,
+ InternalErrorException
+ {
+ synchronized (xid)
+ {
+ JDBCTransaction tx = getTransaction(xid);
+ if (!tx.isPrepared())
+ {
+ // heuristically rollback this tx
+ for (TransactionRecord record : tx.getrecords())
+ {
+ record.rollback(_messagStore);
+ }
+ tx.heurRollback();
+ }
+ // add this branch in the list of indoubt tx
+ _indoubtXidMap.put(xid, tx);
+ }
+ }
+
+
+ public JDBCTransaction getTransaction(Xid xid)
+ throws
+ UnknownXidException
+ {
+ Transaction tx = _xidMap.get(xid);
+ if (tx == null)
+ {
+ throw new UnknownXidException(xid);
+ }
+ return (JDBCTransaction) tx;
+ }
+
+ //==========================================================================
+ //== Methods for Message Store
+ //==========================================================================
+
+ /**
+ * Get the default tx timeout in seconds
+ *
+ * @return the default tx timeout in seconds
+ */
+ public int getDefaultTimeout()
+ {
+ return _defaultTimeout;
+ }
+ //==========================================================================
+ //== Private Methods
+ //==========================================================================
+
+ private void removeTransaction(Xid xid)
+ {
+ _xidMap.remove(xid);
+ _indoubtXidMap.remove(xid);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index addb0c791f..8becaf52b9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -93,7 +93,10 @@ public class NonTransactionalContext implements TransactionalContext
{
try
{
- message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
+ if( ! deliverFirst )
+ {
+ message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
+ }
queue.process(_storeContext, message, deliverFirst);
//following check implements the functionality
//required by the 'immediate' flag:
diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index f7c578d9a0..12b2a4f7a8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -57,10 +57,14 @@ public class NullApplicationRegistry extends ApplicationRegistry
super(new MapConfiguration(new HashMap()));
}
- public void initialise() throws Exception
+ public void initialise()
+ throws
+ Exception
{
_configuration.addProperty("store.class", "org.apache.qpid.server.messageStore.MemoryMessageStore");
_configuration.addProperty("txn.class", "org.apache.qpid.server.txn.MemoryTransactionManager");
+ // _configuration.addProperty("store.class", "org.apache.qpid.server.messageStore.JDBCStore");
+ // _configuration.addProperty("txn.class", "org.apache.qpid.server.txn.JDBCTransactionManager");
Properties users = new Properties();
@@ -121,3 +125,4 @@ public class NullApplicationRegistry extends ApplicationRegistry
}
+