diff options
| author | Gordon Sim <gsim@apache.org> | 2007-05-22 08:49:53 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-05-22 08:49:53 +0000 |
| commit | 65647bc9645a0201f9c9e8ad91469ec2b9e9db17 (patch) | |
| tree | 71a9a9c10e2d1bf9e5b7bafdd93c70b7620ffd3a /java/broker | |
| parent | 3e981a1d1308bd57df075f423041d45bef73fed6 (diff) | |
| download | qpid-python-65647bc9645a0201f9c9e8ad91469ec2b9e9db17.tar.gz | |
Patch from Arnaud Simon (asimon@redhat.com) in connection with QPID-496. This adds a JDBC based message store implementation.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@540493 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
13 files changed, 3344 insertions, 276 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 5de59f47a3..43a04dbfa1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -473,7 +473,7 @@ public class AMQChannel unacked.message.setRedelivered(true); // Deliver Message - deliveryContext.deliver(unacked.message, unacked.queue, false); + deliveryContext.deliver(unacked.message, unacked.queue, true); // Should we allow access To the DM to directy deliver the message? // As we don't need to check for Consumers or worry about incrementing the message count? diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java new file mode 100644 index 0000000000..44de0d3d98 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java @@ -0,0 +1,1759 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.messageStore; + +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exception.*; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.txn.*; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.AMQException; +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +import javax.transaction.xa.Xid; +import java.util.Collection; +import java.util.List; +import java.util.ArrayList; +import java.util.HashMap; +import java.sql.*; + +/** + * Created by Arnaud Simon + * Date: 15-May-2007 + * Time: 09:59:12 + */ +public class JDBCStore implements MessageStore +{ + //======================================================================== + // Static Constants + //======================================================================== + // The logger for this class + private static final Logger _log = Logger.getLogger(JDBCStore.class); + // the database connection pool + public static ConnectionPool _connectionPool = null; + // the prepared statements + //==== IMPORTANT: remember to update if we add more prepared statements! + private static final int CREATE_EXCHANGE = 0; + private static final int DELETE_EXCHANGE = 1; + private static final int BIND_QUEUE = 2; + private static final int UNBIND_QUEUE = 3; + private static final int CREATE_QUEUE = 4; + private static final int DELETE_QUEUE = 5; + private static final int STAGE_MESSAGE = 6; + private static final int UPDATE_MESSAGE_PAYLOAD = 7; + private static final int SELECT_MESSAGE_PAYLOAD = 8; + private static final int DELETE_MESSAGE = 9; + private static final int ENQUEUE = 10; + private static final int DEQUEUE = 11; + private static final int GET_ALL_QUEUES = 12; + private static final int GET_ALL_MESSAGES = 13; + private static final int SAVE_RECORD = 14; + private static final int SAVE_XID = 15; + private static final int DELETE_RECORD = 16; + private static final int DELETE_XID = 17; + private static final int UPDATE_QMR = 18; + private static final int GET_CONTENT_HEADER = 19; + private static final int GET_MESSAGE_INFO = 20; + //==== size: + private static final int STATEMENT_SIZE = 21; + //======================================================================== + // field properties + //======================================================================== + //The default URL + protected String _connectionURL = "jdbc:derby:derbyDB;create=true"; + // The default driver + private String _driver = "org.apache.derby.jdbc.EmbeddedDriver"; + // The pool max size + private int _maxSize = 40; + // The tables + // the table containing the messages + private String _tableNameMessage = "MessageTable"; + private String _tableNameQueue = "QueueTable"; + private String _tableNameQueueMessageRelation = "QeueMessageRelation"; + private String _tableNameExchange = "Exchange"; + private String _tableNameExchangeQueueRelation = "ExchangeQueueRelation"; + private String _tableNameTransaction = "TransactionTable"; + private String _tableNameRecord = "RecordTable"; + + // The transaction maanger + private JDBCTransactionManager _tm; + // the message ID + private long _messageID = 0; + // the virtual host + private VirtualHost _virtualHost; + // indicate whether this store is recovering + private boolean _recovering = false; + // the recovered queues + private HashMap<Integer, AMQQueue> _queueMap; + + //======================================================================== + // Interface MessageStore + //======================================================================== + public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config) + throws + InternalErrorException, + IllegalArgumentException + { + _log.info("Configuring Derby message store"); + // the virtual host + _virtualHost = virtualHost; + // Specify that the tables must be dropped. + // If true then this means that recovery is not possible. + boolean dropTables = true; + if (config != null) + { + dropTables = config.getBoolean(base + "dropTables", false); + _driver = config.getString(base + "driver", _driver); + _connectionURL = config.getString(base + "connectionURL", _connectionURL); + _maxSize = config.getInt(base + "connectionPoolSize", 20); + } + if (dropTables) + { + _log.info("Dropping table of Derby message store"); + } + if (!setupStore(dropTables)) + { + _log.error("Error configuration of Derby store failed"); + throw new InternalErrorException("Error configuration of Derby store failed"); + } + // recovery + _recovering = true; + _queueMap = recover(); //==> recover the queues and the messages + // recreate the excahnges and bind the queues + recoverExchanges(_queueMap); + _recovering = false; + _tm = (JDBCTransactionManager) tm; + _tm.configure(this, "txn", config); + _queueMap.clear(); + _queueMap = null; + } + + public void close() + throws + InternalErrorException + { + // nothing has to be done + } + + public void createExchange(Exchange exchange) + throws + InternalErrorException + { + if (!_recovering) + { + MyConnection connection = null; + try + { + connection = (MyConnection) _connectionPool.acquireInstance(); + PreparedStatement pstmt = connection.getStatements()[CREATE_EXCHANGE]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameExchange + + " (Name,Type) VALUES (?,?)"); + connection.getStatements()[CREATE_EXCHANGE] = pstmt; + } + pstmt.setString(1, exchange.getName().asString()); + pstmt.setString(2, exchange.getType().asString()); + pstmt.executeUpdate(); + } catch (Exception e) + { + throw new InternalErrorException("Cannot create Exchange: " + exchange); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot create Exchange: " + exchange); + } + } + } + } + } + + public void removeExchange(Exchange exchange) + throws + InternalErrorException + { + if (!_recovering) + { + MyConnection connection = null; + try + { + connection = (MyConnection) _connectionPool.acquireInstance(); + PreparedStatement pstmt = connection.getStatements()[DELETE_EXCHANGE]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameExchange + + " WHERE Name = ?"); + connection.getStatements()[DELETE_EXCHANGE] = pstmt; + } + pstmt.setString(1, exchange.getName().asString()); + pstmt.executeUpdate(); + } catch (Exception e) + { + throw new InternalErrorException("Cannot remove Exchange: " + exchange); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot remove Exchange: " + exchange); + } + } + } + } + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args) + throws + InternalErrorException + { + if (!_recovering) + { + MyConnection connection = null; + try + { + connection = (MyConnection) _connectionPool.acquireInstance(); + PreparedStatement pstmt = connection.getStatements()[BIND_QUEUE]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameExchangeQueueRelation + + " (QueueID,Name,RoutingKey,fieldTable) VALUES (?,?,?,?)"); + connection.getStatements()[BIND_QUEUE] = pstmt; + } + pstmt.setInt(1, queue.getQueueID()); + pstmt.setString(2, exchange.getName().asString()); + pstmt.setString(3, routingKey.asString()); + if (args != null) + { + pstmt.setBytes(4, args.getDataAsBytes()); + } else + { + pstmt.setBytes(4, null); + } + pstmt.executeUpdate(); + } catch (Exception e) + { + throw new InternalErrorException("Cannot create Exchange: " + exchange); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot create Exchange: " + exchange); + } + } + } + } + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args) + throws + InternalErrorException + { + MyConnection connection = null; + try + { + connection = (MyConnection) _connectionPool.acquireInstance(); + PreparedStatement pstmt = connection.getStatements()[UNBIND_QUEUE]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameExchangeQueueRelation + + " WHERE QueueID = ? AND NAME = ? AND RoutingKey = ?"); + connection.getStatements()[UNBIND_QUEUE] = pstmt; + } + pstmt.setInt(1, queue.getQueueID()); + pstmt.setString(2, exchange.getName().asString()); + pstmt.setString(3, routingKey.asString()); + pstmt.executeUpdate(); + } catch (Exception e) + { + throw new InternalErrorException("Cannot remove Exchange: " + exchange); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot remove Exchange: " + exchange); + } + } + } + } + + public void createQueue(StorableQueue queue) + throws + InternalErrorException, + QueueAlreadyExistsException + { + MyConnection connection = null; + try + { + connection = (MyConnection) _connectionPool.acquireInstance(); + PreparedStatement pstmt = connection.getStatements()[CREATE_QUEUE]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameQueue + + " (QueueID,Name,Owner) VALUES (?,?,?)"); + connection.getStatements()[CREATE_QUEUE] = pstmt; + } + pstmt.setInt(1, queue.getQueueID()); + pstmt.setString(2, queue.getName().asString()); + if (queue.getOwner() != null) + { + pstmt.setString(3, queue.getOwner().asString()); + } else + { + pstmt.setString(3, null); + } + pstmt.executeUpdate(); + } catch (Exception e) + { + throw new InternalErrorException("Cannot create Queue: " + queue); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot create Queue: " + queue); + } + } + } + } + + public void destroyQueue(StorableQueue queue) + throws + InternalErrorException, + QueueDoesntExistException + { + MyConnection connection = null; + try + { + connection = (MyConnection) _connectionPool.acquireInstance(); + PreparedStatement pstmt = connection.getStatements()[DELETE_QUEUE]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameQueue + + " WHERE QueueID = ?"); + connection.getStatements()[DELETE_QUEUE] = pstmt; + } + pstmt.setInt(1, queue.getQueueID()); + pstmt.executeUpdate(); + } catch (Exception e) + { + throw new InternalErrorException("Cannot remove Queue: " + queue); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot remove Queue: " + queue); + } + } + } + } + + public void stage(StorableMessage m) + throws + InternalErrorException, + MessageAlreadyStagedException + { + if (m.isStaged() || m.isEnqueued()) + { + _log.error("Message with Id " + m.getMessageId() + " is already staged"); + throw new MessageAlreadyStagedException("Message eith Id " + m.getMessageId() + " is already staged"); + } + MyConnection connection = null; + try + { + connection = (MyConnection) _connectionPool.acquireInstance(); + stage(connection, m); + } catch (Exception e) + { + throw new InternalErrorException("Cannot stage Message: " + m); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot stage Message: " + m); + } + } + } + } + + public void appendContent(StorableMessage m, byte[] data, int offset, int size) + throws + InternalErrorException, + MessageDoesntExistException + { + // The message must have been staged + if (!m.isStaged()) + { + _log.error("Cannot append content of message Id " + + m.getMessageId() + " as it has not been staged"); + throw new MessageDoesntExistException("Cannot append content of message Id " + + m.getMessageId() + " as it has not been staged"); + } + MyConnection connection = null; + try + { + connection = (MyConnection) _connectionPool.acquireInstance(); + appendContent(connection, m, data, offset, size); + } catch (Exception e) + { + throw new InternalErrorException("Cannot stage Message: " + m); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot stage Message: " + m); + } + } + } + } + + public byte[] loadContent(StorableMessage m, int offset, int size) + throws + InternalErrorException, + MessageDoesntExistException + { + MyConnection connection = null; + try + { + byte[] result; + connection = (MyConnection) _connectionPool.acquireInstance(); + PreparedStatement pstmt = connection.getStatements()[SELECT_MESSAGE_PAYLOAD]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("SELECT Payload FROM " + _tableNameMessage + + " WHERE MessageID = ? "); + connection.getStatements()[SELECT_MESSAGE_PAYLOAD] = pstmt; + } + pstmt.setLong(1, m.getMessageId()); + ResultSet rs = pstmt.executeQuery(); + if (!rs.next()) + { + throw new MessageDoesntExistException("Cannot load content of message Id " + + m.getMessageId() + " as it has not been found"); + } + Blob myBlob = rs.getBlob(1); + + if (myBlob.length() > 0) + { + if (size == 0) + { + result = myBlob.getBytes(offset, (int) myBlob.length()); + } else + { + result = myBlob.getBytes(offset, size); + } + } else + { + throw new MessageDoesntExistException("Cannot load content of message Id " + + m.getMessageId() + " as it has not been found"); + } + rs.close(); + return result; + } catch (Exception e) + { + throw new InternalErrorException("Cannot load Message: " + m); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot load Message: " + m); + } + } + } + } + + public void destroy(StorableMessage m) + throws + InternalErrorException, + MessageDoesntExistException + { + MyConnection connection = null; + try + { + connection = (MyConnection) _connectionPool.acquireInstance(); + destroy(connection, m); + } catch (Exception e) + { + throw new InternalErrorException("Cannot destroy message: " + m); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot destroy message: " + m); + } + } + } + } + + public void enqueue(Xid xid, StorableMessage m, StorableQueue queue) + throws + InternalErrorException, + QueueDoesntExistException, + InvalidXidException, + UnknownXidException, + MessageDoesntExistException + { + MyConnection connection = null; + // Get the current tx + JDBCTransaction tx = getTx(xid); + // If this operation is transacted then we need to add a record + if (tx != null && !tx.isPrepared()) + { + // add an enqueue record + tx.addRecord(new JDBCEnqueueRecord(m, queue)); + } else + { + try + { + if (tx != null) + { + connection = tx.getConnection(); + } else + { + connection = (MyConnection) _connectionPool.acquireInstance(); + } + if (!m.isStaged() && !m.isEnqueued()) + { + //This is the first time this message is enqueued and it has not been staged. + stage(connection, m); + appendContent(connection, m, m.getData(), 0, m.getData().length); + } + PreparedStatement pstmt = connection.getStatements()[ENQUEUE]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameQueueMessageRelation + + " (QueueID,MessageID,Prepared) VALUES (?,?,0)"); + connection.getStatements()[ENQUEUE] = pstmt; + } + pstmt.setInt(1, queue.getQueueID()); + pstmt.setLong(2, m.getMessageId()); + pstmt.executeUpdate(); + m.enqueue(queue); + queue.enqueue(m); + } catch (Exception e) + { + throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue); + } finally + { + if (tx == null && connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue); + } + } + } + } + } + + public void dequeue(Xid xid, StorableMessage m, StorableQueue queue) + throws + InternalErrorException, + QueueDoesntExistException, + InvalidXidException, + UnknownXidException + { + MyConnection connection = null; + // Get the current tx + JDBCTransaction tx = getTx(xid); + // If this operation is transacted then we need to add a record + if (tx != null && !tx.isPrepared()) + { + // add an dequeue record + tx.addRecord(new JDBCDequeueRecord(m, queue)); + } else + { + try + { + if (tx != null) + { + connection = tx.getConnection(); + } else + { + connection = (MyConnection) _connectionPool.acquireInstance(); + } + PreparedStatement pstmt = connection.getStatements()[DEQUEUE]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameQueueMessageRelation + + " WHERE QueueID = ? AND MessageID = ?"); + connection.getStatements()[DEQUEUE] = pstmt; + } + pstmt.setInt(1, queue.getQueueID()); + pstmt.setLong(2, m.getMessageId()); + pstmt.executeUpdate(); + m.dequeue(queue); + if (!m.isEnqueued()) + { + // delete this message from persistence store + destroy(connection, m); + } + queue.dequeue(m); + } catch (Exception e) + { + throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue); + } finally + { + if (tx == null && connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue); + } + } + } + } + } + + public Collection<StorableQueue> getAllQueues() + throws + InternalErrorException + { + MyConnection connection = null; + List<StorableQueue> result = new ArrayList<StorableQueue>(); + try + { + connection = (MyConnection) _connectionPool.acquireInstance(); + PreparedStatement pstmt = connection.getStatements()[GET_ALL_QUEUES]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("SELECT * FROM " + _tableNameQueue); + connection.getStatements()[GET_ALL_QUEUES] = pstmt; + } + ResultSet rs = pstmt.executeQuery(); + while (rs.next()) + { + //the queue owner may be null + AMQShortString queueOwner = null; + if (rs.getString(3) != null) + { + queueOwner = new AMQShortString(rs.getString(3)); + } + result.add(new AMQQueue(new AMQShortString(rs.getString(2)), true, queueOwner, + false, _virtualHost)); + } + rs.close(); + return result; + } catch (Exception e) + { + throw new InternalErrorException("Cannot get all queues"); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot get all queues"); + } + } + } + } + + public Collection<StorableMessage> getAllMessages(StorableQueue queue) + throws + InternalErrorException + { + MyConnection connection = null; + try + { + connection = (MyConnection) _connectionPool.acquireInstance(); + return getAllMessages(connection, queue); + } catch (Exception e) + { + throw new InternalErrorException("Cannot get all queues"); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot get all queues"); + } + } + } + } + + public HashMap<Xid, Transaction> getAllInddoubt() + throws + InternalErrorException + { + MyConnection connection = null; + HashMap<Xid, Transaction> result = new HashMap<Xid, Transaction>(); + try + { + TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null); + MessageHandleFactory messageHandleFactory = new MessageHandleFactory(); + // re-create all the tx + connection = (MyConnection) _connectionPool.acquireInstance(); + Statement stmt = connection.getConnection().createStatement(); + ResultSet rs = stmt.executeQuery("SELECT * FROM " + _tableNameTransaction); + JDBCTransaction foundTx; + Xid foundXid; + long foundXIDID; + while (rs.next()) + { + // set the XID_ID + foundXIDID = rs.getLong(1); + if (foundXIDID > JDBCTransaction._xidId) + { + JDBCTransaction._xidId = foundXIDID; + } + foundTx = new JDBCTransaction(); + foundXid = new XidImpl(rs.getBlob(3).getBytes(1, (int) rs.getBlob(3).length()), + rs.getInt(2), rs.getBlob(4).getBytes(1, (int) rs.getBlob(4).length())); + // get all the records + Statement stmtr = connection.getConnection().createStatement(); + ResultSet rsr = stmtr.executeQuery("SELECT * FROM " + _tableNameRecord + + " WHERE XID_ID = " + rs.getLong(1)); + int foundType; + AMQQueue foundQueue; + StorableMessage foundMessage; + TransactionRecord foundRecord; + while (rsr.next()) + { + // those messages were not recovered before so they need to be recreated + foundType = rsr.getInt(2); + foundQueue = _queueMap.get(new Integer(rsr.getInt(4))); + foundMessage = new AMQMessage(rs.getLong(3), this, messageHandleFactory, txnContext); + if (foundType == JDBCAbstractRecord.TYPE_DEQUEUE) + { + foundRecord = new JDBCDequeueRecord(foundMessage, foundQueue); + } else + { + foundRecord = new JDBCEnqueueRecord(foundMessage, foundQueue); + } + foundTx.addRecord(foundRecord); + } + rsr.close(); + // add this tx to the map + result.put(foundXid, foundTx); + } + rs.close(); + return result; + } catch (Exception e) + { + throw new InternalErrorException("Cannot recover: ", e); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot recover: ", e); + } + } + } + } + + + public long getNewMessageId() + { + return _messageID++; + } + + //======================================================================== + // Public methods + //======================================================================== + + public MyConnection getConnection() + throws + Exception + { + return (MyConnection) _connectionPool.acquireInstance(); + } + + public void commitConnection(MyConnection connection) + throws + InternalErrorException + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot commit connection ="); + } + } + + public void rollbackConnection(MyConnection connection) + throws + InternalErrorException + { + try + { + connection.getConnection().rollback(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to rollback this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot rollback connection"); + } + } + + public void appendContent(MyConnection connection, StorableMessage m, byte[] data, int offset, int size) + throws + SQLException, + MessageDoesntExistException + { + PreparedStatement pstmt = connection.getStatements()[SELECT_MESSAGE_PAYLOAD]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("SELECT Payload FROM " + _tableNameMessage + + " WHERE MessageID = ? "); + connection.getStatements()[SELECT_MESSAGE_PAYLOAD] = pstmt; + } + pstmt.setLong(1, m.getMessageId()); + ResultSet rs = pstmt.executeQuery(); + if (!rs.next()) + { + throw new MessageDoesntExistException("Cannot append content of message Id " + + m.getMessageId() + " as it has not been found"); + } + Blob myBlob = rs.getBlob(1); + byte[] oldPayload; + if (myBlob != null && myBlob.length() > 0) + { + oldPayload = myBlob.getBytes(1, (int) myBlob.length()); + } else + { + oldPayload = new byte[0]; + } + rs.close(); + byte[] newPayload = new byte[oldPayload.length + size]; + ByteBuffer buffer = ByteBuffer.wrap(newPayload); + buffer.put(oldPayload); + buffer.put(data, offset, size); + PreparedStatement pstmtUpdate = connection.getStatements()[UPDATE_MESSAGE_PAYLOAD]; + if (pstmtUpdate == null) + { + pstmtUpdate = connection.getConnection().prepareStatement("UPDATE " + _tableNameMessage + + " SET Payload = ? WHERE MessageID = ?"); + connection.getStatements()[UPDATE_MESSAGE_PAYLOAD] = pstmtUpdate; + } + pstmtUpdate.setBytes(1, newPayload); + pstmtUpdate.setLong(2, m.getMessageId()); + pstmtUpdate.executeUpdate(); + } + + public void stage(MyConnection connection, StorableMessage m) + throws + Exception + { + PreparedStatement pstmt = connection.getStatements()[STAGE_MESSAGE]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameMessage + + " (MessageID,Header,ExchangeName,RoutingKey,Mandatory,Is_Immediate) VALUES (?,?,?,?,?,?)"); + connection.getStatements()[STAGE_MESSAGE] = pstmt; + } + pstmt.setLong(1, m.getMessageId()); + pstmt.setBytes(2, m.getHeaderBody()); + pstmt.setString(3, ((AMQMessage) m).getMessagePublishInfo().getExchange().asString()); + pstmt.setString(4, ((AMQMessage) m).getMessagePublishInfo().getRoutingKey().asString()); + pstmt.setBoolean(5, ((AMQMessage) m).getMessagePublishInfo().isMandatory()); + pstmt.setBoolean(6, ((AMQMessage) m).getMessagePublishInfo().isImmediate()); + pstmt.executeUpdate(); + m.staged(); + } + + public void saveRecord(MyConnection connection, JDBCTransaction tx, JDBCAbstractRecord record) + throws + InternalErrorException + { + try + { + PreparedStatement pstmt = connection.getStatements()[SAVE_RECORD]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameRecord + + " (XID_ID,Type,MessageID,QueueID) VALUES (?,?,?,?)"); + connection.getStatements()[SAVE_RECORD] = pstmt; + } + pstmt.setLong(1, tx.getXidID()); + pstmt.setInt(2, record.getType()); + pstmt.setLong(3, record.getMessageID()); + pstmt.setLong(4, record.getQueueID()); + pstmt.executeUpdate(); + } catch (Exception e) + { + throw new InternalErrorException("Cannot save record: " + record); + } + } + + public void saveXID(MyConnection connection, JDBCTransaction tx, Xid xid) + throws + InternalErrorException + { + try + { + PreparedStatement pstmt = connection.getStatements()[SAVE_XID]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameTransaction + + " (XID_ID,FormatId, BranchQualifier,GlobalTransactionId) VALUES (?,?,?,?)"); + connection.getStatements()[SAVE_XID] = pstmt; + } + pstmt.setLong(1, tx.getXidID()); + pstmt.setInt(2, xid.getFormatId()); + pstmt.setBytes(3, xid.getBranchQualifier()); + pstmt.setBytes(4, xid.getGlobalTransactionId()); + pstmt.executeUpdate(); + } catch (Exception e) + { + throw new InternalErrorException("Cannot save xid: " + xid); + } + } + + public void deleteRecords(MyConnection connection, JDBCTransaction tx) + throws + InternalErrorException + { + try + { + PreparedStatement pstmt = connection.getStatements()[DELETE_RECORD]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameRecord + + " WHERE XID_ID = ?"); + connection.getStatements()[DELETE_RECORD] = pstmt; + } + pstmt.setLong(1, tx.getXidID()); + pstmt.executeUpdate(); + } catch (Exception e) + { + throw new InternalErrorException("Cannot delete record: " + tx.getXidID()); + } + } + + public void deleteXID(MyConnection connection, JDBCTransaction tx) + throws + InternalErrorException + { + try + { + PreparedStatement pstmt = connection.getStatements()[DELETE_XID]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameTransaction + + " WHERE XID_ID = ?"); + connection.getStatements()[DELETE_XID] = pstmt; + } + pstmt.setLong(1, tx.getXidID()); + pstmt.executeUpdate(); + } catch (Exception e) + { + throw new InternalErrorException("Cannot delete xid: " + tx.getXidID()); + } + } + + public void prepareDequeu(Xid xid, StorableMessage m, StorableQueue queue) + throws + UnknownXidException, + InternalErrorException + { + JDBCTransaction tx = getTx(xid); + if (tx == null) + { + throw new UnknownXidException(xid); + } + updateQueueMessageRelation(tx.getConnection(), queue.getQueueID(), m.getMessageId(), 1); + + } + + public void rollbackDequeu(Xid xid, StorableMessage m, StorableQueue queue) + throws + UnknownXidException, + InternalErrorException + { + JDBCTransaction tx = getTx(xid); + if (tx == null) + { + throw new UnknownXidException(xid); + } + updateQueueMessageRelation(tx.getConnection(), queue.getQueueID(), m.getMessageId(), 0); + } + + //======================================================================== + // Private methods + //======================================================================== + + + private void updateQueueMessageRelation(MyConnection connection, + int queueID, long messageId, int prepared) + throws + InternalErrorException + { + try + { + PreparedStatement pstmt = connection.getStatements()[UPDATE_QMR]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("UPDATE " + _tableNameQueueMessageRelation + + " SET Prepared = ? WHERE MessageID = ? AND QueueID = ?"); + connection.getStatements()[UPDATE_QMR] = pstmt; + } + pstmt.setInt(1, prepared); + pstmt.setLong(2, messageId); + pstmt.setInt(3, queueID); + pstmt.executeUpdate(); + } catch (Exception e) + { + throw new InternalErrorException("Cannot update QMR", e); + } + + } + + public MessagePublishInfo getMessagePublishInfo(StorableMessage m) + throws + InternalErrorException + { + MyConnection connection = null; + MessagePublishInfo result; + try + { + connection = (MyConnection) _connectionPool.acquireInstance(); + PreparedStatement pstmt = connection.getStatements()[GET_MESSAGE_INFO]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("SELECT ExchangeName, RoutingKey," + + " Mandatory, Is_Immediate from " + _tableNameMessage + + " WHERE MessageID = ?"); + connection.getStatements()[GET_MESSAGE_INFO] = pstmt; + } + pstmt.setLong(1, m.getMessageId()); + ResultSet rs = pstmt.executeQuery(); + if (rs.next()) + { + final AMQShortString exchange = new AMQShortString(rs.getString(1)); + final AMQShortString routingKey = new AMQShortString(rs.getString(2)); + final boolean mandatory = rs.getBoolean(3); + final boolean immediate = rs.getBoolean(4); + result = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return exchange; + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return mandatory; + } + + public AMQShortString getRoutingKey() + { + return routingKey; + } + }; + } else + { + throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m); + } + rs.close(); + return result; + } catch (Exception e) + { + throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m); + } + } + } + } + + public ContentHeaderBody getContentHeaderBody(StorableMessage m) + throws + InternalErrorException + { + MyConnection connection = null; + ContentHeaderBody result; + try + { + connection = (MyConnection) _connectionPool.acquireInstance(); + PreparedStatement pstmt = connection.getStatements()[GET_CONTENT_HEADER]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("SELECT Header from " + _tableNameMessage + + " WHERE MessageID = ?"); + connection.getStatements()[GET_CONTENT_HEADER] = pstmt; + } + pstmt.setLong(1, m.getMessageId()); + ResultSet rs = pstmt.executeQuery(); + if (rs.next()) + { + result = new ContentHeaderBody(ByteBuffer.wrap(rs.getBlob(1).getBytes(1, (int) rs.getBlob(1).length())), 0); + } else + { + throw new InternalErrorException("Cannot get Content Header of message: " + m); + } + rs.close(); + return result; + } catch (Exception e) + { + throw new InternalErrorException("Cannot get Content Header of message: " + m); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot get Content Header of message: " + m); + } + } + } + } + + private List<StorableMessage> getAllMessages(MyConnection connection, StorableQueue queue) + throws + SQLException, + AMQException + { + List<StorableMessage> result = new ArrayList<StorableMessage>(); + TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null); + MessageHandleFactory messageHandleFactory = new MessageHandleFactory(); + PreparedStatement pstmt = connection.getStatements()[GET_ALL_MESSAGES]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("SELECT " + _tableNameMessage + ".MessageID, Header FROM " + + _tableNameMessage + + " INNER JOIN " + + _tableNameQueueMessageRelation + + " ON " + + _tableNameMessage + ".MessageID = " + _tableNameQueueMessageRelation + ".MessageID" + + " WHERE " + + _tableNameQueueMessageRelation + ".QueueID = ?" + + " AND " + + _tableNameQueueMessageRelation + ".Prepared = 0"); + connection.getStatements()[GET_ALL_MESSAGES] = pstmt; + } + pstmt.setInt(1, queue.getQueueID()); + ResultSet rs = pstmt.executeQuery(); + AMQMessage foundMessage; + // ContentHeaderBody hb; + while (rs.next()) + { + foundMessage = new AMQMessage(rs.getLong(1), this, messageHandleFactory, txnContext); + result.add(foundMessage); + } + rs.close(); + return result; + } + + private HashMap<Integer, AMQQueue> recover() + throws + InternalErrorException + { + MyConnection connection = null; + HashMap<Integer, AMQQueue> result = new HashMap<Integer, AMQQueue>(); + try + { + // re-create all the queues + connection = (MyConnection) _connectionPool.acquireInstance(); + Statement stmt = connection.getConnection().createStatement(); + ResultSet rs = stmt.executeQuery("SELECT * FROM " + _tableNameQueue); + AMQQueue foundQueue; + List<StorableMessage> foundMessages; + StoreContext context = new StoreContext(); + while (rs.next()) + { + AMQShortString owner = null; + if (rs.getString(3) != null) + { + owner = new AMQShortString(rs.getString(3)); + } + foundQueue = new AMQQueue(new AMQShortString(rs.getString(2)), + true, owner, false, _virtualHost); + // get all the Messages of that queue + foundMessages = getAllMessages(connection, foundQueue); + // enqueue those messages + if (_log.isDebugEnabled()) + { + _log.debug("Recovering " + foundMessages.size() + " messages for queue " + foundQueue.getName()); + } + for (StorableMessage foundMessage : foundMessages) + { + foundMessage.staged(); + foundMessage.enqueue(foundQueue); + foundQueue.enqueue(foundMessage); + foundQueue.process(context, (AMQMessage) foundMessage, false); + } + // add the queue in the result map + result.put(foundQueue.getQueueID(), foundQueue); + // add it in the registry + _virtualHost.getQueueRegistry().registerQueue(foundQueue); + } + rs.close(); + return result; + } catch (Exception e) + { + throw new InternalErrorException("Cannot recover: ", e); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot recover: ", e); + } + } + } + } + + private void recoverExchanges(HashMap<Integer, AMQQueue> queueMap) + throws + InternalErrorException + { + MyConnection connection = null; + try + { + // re-create all the exchanges + connection = (MyConnection) _connectionPool.acquireInstance(); + Statement stmt = connection.getConnection().createStatement(); + ResultSet rs = stmt.executeQuery("SELECT * FROM " + _tableNameExchange); + Exchange foundExchange; + AMQQueue foundQueue; + while (rs.next()) + { + foundExchange = _virtualHost.getExchangeFactory().createExchange( + new AMQShortString(rs.getString(1)), new AMQShortString(rs.getString(2)), true, false, 0); + // get all the bindings + Statement stmtb = connection.getConnection().createStatement(); + ResultSet rsb = stmtb.executeQuery("SELECT * FROM " + _tableNameExchangeQueueRelation + + " WHERE Name = '" + rs.getString(1) + "'"); + while (rsb.next()) + { + foundQueue = queueMap.get(new Integer(rsb.getInt(1))); + if (foundQueue != null) + { + // the field table + FieldTable ft = null; + if (rsb.getBlob(4) != null) + { + long length = rsb.getBlob(4).length(); + ByteBuffer buffer = ByteBuffer.wrap(rsb.getBlob(4).getBytes(1, (int) length)); + ft = new FieldTable(buffer, length); + } + foundQueue.bind(new AMQShortString(rsb.getString(3)), ft, foundExchange); + } + } + rsb.close(); + // register this exchange + _virtualHost.getExchangeRegistry().registerExchange(foundExchange); + } + rs.close(); + } catch (Exception e) + { + throw new InternalErrorException("Cannot recover: ", e); + } finally + { + if (connection != null) + { + try + { + connection.getConnection().commit(); + _connectionPool.releaseInstance(connection); + } catch (SQLException e) + { + // we did not manage to commit this connection + // it is better to release it + _connectionPool.releaseDeadInstance(); + throw new InternalErrorException("Cannot recover: ", e); + } + } + } + } + + private void destroy(MyConnection connection, StorableMessage m) + throws + SQLException + { + PreparedStatement pstmt = connection.getStatements()[DELETE_MESSAGE]; + if (pstmt == null) + { + pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameMessage + + " WHERE MessageID = ?"); + connection.getStatements()[DELETE_MESSAGE] = pstmt; + } + pstmt.setLong(1, m.getMessageId()); + pstmt.executeUpdate(); + } + + private JDBCTransaction getTx(Xid xid) + throws + UnknownXidException + { + JDBCTransaction tx = null; + if (xid != null) + { + tx = _tm.getTransaction(xid); + } + return tx; + } + + /** + * setupConnections - Initialize the connections + * + * @return true if ok + */ + private synchronized boolean setupConnections() + { + try + { + if (_connectionPool == null) + { + // In an embedded environment, loading the driver also starts Derby. + Class.forName(_driver).newInstance(); + _connectionPool = new ConnectionPool(_maxSize); + } + } + catch (Exception e) + { + _log.warn("Setup connections trouble", e); + return false; + } + return true; + } + + /** + * Try to create the connection and table. + * If this fails, then we will exit. + */ + protected synchronized boolean setupStore(boolean dropTables) + { + if (!setupConnections()) + { + return false; + } + MyConnection myconnection = null; + try + { + myconnection = (MyConnection) _connectionPool.acquireInstance(); + Statement stmt = myconnection._connection.createStatement(); + /* + * TODO Need some management interface to delete the table! + */ + if (dropTables) + { + try + { + stmt.executeUpdate("DROP TABLE " + _tableNameMessage); + myconnection._connection.commit(); + } + catch (SQLException ex) + { + // don't want to print error - chances are it + // just reports that the table does not exist + // ex.printStackTrace(); + } + try + { + stmt.executeUpdate("DROP TABLE " + _tableNameQueue); + myconnection._connection.commit(); + } + catch (SQLException ex) + { + // ex.printStackTrace(); + } + try + { + stmt.executeUpdate("DROP TABLE " + _tableNameQueueMessageRelation); + myconnection._connection.commit(); + } + catch (SQLException ex) + { + // ex.printStackTrace(); + } + try + { + stmt.executeUpdate("DROP TABLE " + _tableNameExchange); + myconnection._connection.commit(); + } + catch (SQLException ex) + { + // ex.printStackTrace(); + } + try + { + stmt.executeUpdate("DROP TABLE " + _tableNameExchangeQueueRelation); + myconnection._connection.commit(); + } + catch (SQLException ex) + { + // ex.printStackTrace(); + } + try + { + stmt.executeUpdate("DROP TABLE " + _tableNameRecord); + myconnection._connection.commit(); + } + catch (SQLException ex) + { + // ex.printStackTrace(); + } + try + { + stmt.executeUpdate("DROP TABLE " + _tableNameTransaction); + myconnection._connection.commit(); + } + catch (SQLException ex) + { + // ex.printStackTrace(); + } + } + // create the table for messages + try + { + stmt.executeUpdate("CREATE TABLE " + _tableNameMessage + " (MessageID FLOAT NOT NULL, Header BLOB," + + " Payload BLOB, ExchangeName VARCHAR(1024), RoutingKey VARCHAR(1024)," + + " Mandatory INTEGER, Is_Immediate INTEGER, PRIMARY KEY(MessageID))"); + myconnection._connection.commit(); + } + catch (SQLException ex) + { + // ex.printStackTrace(); + // assume this is reporting that the table already exists: + } + // create the table for queues + try + { + stmt.executeUpdate("CREATE TABLE " + _tableNameQueue + " (QueueID INTEGER NOT NULL, " + + "Name VARCHAR(1024) NOT NULL, Owner VARCHAR(1024), PRIMARY KEY(QueueID))"); + myconnection._connection.commit(); + } + catch (SQLException ex) + { + //ex.printStackTrace(); + // assume this is reporting that the table already exists: + } + // create the table for queue to message mapping + try + { + stmt.executeUpdate("CREATE TABLE " + _tableNameQueueMessageRelation + " (QueueID INTEGER NOT NULL, " + + "MessageID FLOAT NOT NULL, Prepared INTEGER)"); + myconnection._connection.commit(); + } + catch (SQLException ex) + { + //ex.printStackTrace(); + // assume this is reporting that the table already exists: + } + try + { + stmt.executeUpdate("CREATE TABLE " + _tableNameExchange + " (Name VARCHAR(1024) NOT NULL, " + + "Type VARCHAR(1024) NOT NULL, PRIMARY KEY(Name))"); + myconnection._connection.commit(); + } + catch (SQLException ex) + { + //ex.printStackTrace(); + // assume this is reporting that the table already exists: + } + try + { + stmt.executeUpdate("CREATE TABLE " + _tableNameExchangeQueueRelation + " (QueueID INTEGER NOT NULL, " + + "Name VARCHAR(1024) NOT NULL, RoutingKey VARCHAR(1024), FieldTable BLOB )"); + myconnection._connection.commit(); + } + catch (SQLException ex) + { + //ex.printStackTrace(); + // assume this is reporting that the table already exists: + } + try + { + stmt.executeUpdate("CREATE TABLE " + _tableNameRecord + " (XID_ID FLOAT, Type INTEGER, MessageID FLOAT, " + + "QueueID INTEGER, PRIMARY KEY(Type, MessageID, QueueID))"); + // we could alter the table with QueueID as foreign key + myconnection._connection.commit(); + } + catch (SQLException ex) + { + //ex.printStackTrace(); + // assume this is reporting that the table already exists: + } + try + { + stmt.executeUpdate("CREATE TABLE " + _tableNameTransaction + " (XID_ID FLOAT, FormatId INTEGER, " + + "BranchQualifier BLOB, GlobalTransactionId BLOB, PRIMARY KEY(XID_ID))"); + myconnection._connection.commit(); + } + catch (SQLException ex) + { + // ex.printStackTrace(); + // assume this is reporting that the table already exists: + } + } + catch (Throwable e) + { + _log.warn("Setup Store trouble: ", e); + return false; + } + finally + { + if (myconnection != null) + { + _connectionPool.releaseInstance(myconnection); + } + } + return true; + } + //======================================================================================== + //============== the connection pool ===================================================== + //======================================================================================== + + private class ConnectionPool extends Pool + { + + /** + * Create a pool of specified size. Negative or null pool sizes are + * disallowed. + * + * @param poolSize The size of the pool to create. Should be 1 or + * greater. + * @throws Exception If the pool size is less than 1. + */ + public ConnectionPool(int poolSize) + throws + Exception + { + super(poolSize); + } + + /** + * @return An instance of the pooled object. + * @throws Exception In case of internal error. + */ + protected MyConnection createInstance() + throws + Exception + { + try + { + // standard way to obtain a Connection object is to call the method DriverManager.getConnection, + // which takes a String containing a connection URL (uniform resource locator). + Connection conn = DriverManager.getConnection(_connectionURL); + //conn.setAutoCommit(true); + PreparedStatement[] st = new PreparedStatement[STATEMENT_SIZE]; + for (int j = 0; j < STATEMENT_SIZE; j++) + { + st[j] = null; + } + return new MyConnection(conn, st); + } + catch (SQLException e) + { + throw new Exception("sqlException when creating connection to " + _connectionURL, e); + } + } + } + + public class MyConnection + { + // the connection + private Connection _connection = null; + // its associated prepared statements + private PreparedStatement[] _preparedStatements = null; + + MyConnection(Connection con, PreparedStatement[] st) + { + _connection = con; + _preparedStatements = st; + } + + public Connection getConnection() + { + return _connection; + } + + public PreparedStatement[] getStatements() + { + return _preparedStatements; + } + + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java index 2bcebe0fa7..8954ffc4d7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java @@ -29,6 +29,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exception.*; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.txn.MemoryDequeueRecord; @@ -249,4 +251,23 @@ public class MemoryMessageStore implements MessageStore { return _messageID++; } + + + public ContentHeaderBody getContentHeaderBody(StorableMessage m) + throws + InternalErrorException, + MessageDoesntExistException + { + // do nothing this is only used during recovery + return null; + } + + public MessagePublishInfo getMessagePublishInfo(StorableMessage m) + throws + InternalErrorException, + MessageDoesntExistException + { + // do nothing this is only used during recovery + return null; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java index c4b1d3182f..913f3ed9c6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java @@ -1,270 +1,300 @@ -/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.server.messageStore;
-
-import org.apache.qpid.server.exception.*;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.txn.TransactionManager;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.commons.configuration.Configuration;
-
-import javax.transaction.xa.Xid;
-import java.util.Collection;
-
-/**
- * Created by Arnaud Simon
- * Date: 29-Mar-2007
- * Time: 17:34:02
- */
-public interface MessageStore
-{
- /**
- * Create a new exchange
- *
- * @param exchange the exchange to be persisted
- * @throws InternalErrorException If an error occurs
- */
- public void createExchange(Exchange exchange)
- throws
- InternalErrorException;
-
- /**
- * Remove an exchange
- * @param exchange The exchange to be removed
- * @throws InternalErrorException If an error occurs
- */
- public void removeExchange(Exchange exchange) throws
- InternalErrorException;
-
- /**
- * Bind a queue with an exchange given a routing key
- *
- * @param exchange The exchange to bind the queue with
- * @param routingKey The routing key
- * @param queue The queue to be bound
- * @param args Args
- * @throws InternalErrorException If an error occurs
- */
- public void bindQueue(Exchange exchange,
- AMQShortString routingKey,
- StorableQueue queue, FieldTable args)
- throws
- InternalErrorException;
-
- /**
- * Unbind a queue from an exchange
- *
- * @param exchange The exchange the queue was bound to
- * @param routingKey The routing queue
- * @param queue The queue to unbind
- * @param args args
- * @throws InternalErrorException If an error occurs
- */
- public void unbindQueue(Exchange exchange,
- AMQShortString routingKey,
- StorableQueue queue, FieldTable args)
- throws
- InternalErrorException;
-
- /**
- * Called after instantiation in order to configure the message store. A particular implementation can define
- * whatever parameters it wants.
- *
- * @param virtualHost The virtual host using by this store
- * @param tm The transaction manager implementation
- * @param base The base element identifier from which all configuration items are relative. For example, if the base
- * element is "store", the all elements used by concrete classes will be "store.foo" etc.
- * @param config The apache commons configuration object
- * @throws InternalErrorException If an error occurs that means the store is unable to configure itself
- * @throws IllegalArgumentException If the configuration arguments are illegal
- */
- void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
- throws
- InternalErrorException,
- IllegalArgumentException;
-
- /**
- * Called to close and cleanup any resources used by the message store.
- *
- * @throws InternalErrorException if close fails
- */
- void close()
- throws
- InternalErrorException;
-
- /**
- * Create a queue
- *
- * @param queue the queue to be created
- * @throws InternalErrorException In case of internal message store problem
- * @throws QueueAlreadyExistsException If the queue already exists in the store
- */
- public void createQueue(StorableQueue queue)
- throws
- InternalErrorException,
- QueueAlreadyExistsException;
-
- /**
- * Destroy a queue
- *
- * @param queue The queue to be destroyed
- * @throws InternalErrorException In case of internal message store problem
- * @throws QueueDoesntExistException If the queue does not exist in the store
- */
- public void destroyQueue(StorableQueue queue)
- throws
- InternalErrorException,
- QueueDoesntExistException;
-
- /**
- * Stage the message before effective enqueue
- *
- * @param m The message to stage
- * @throws InternalErrorException In case of internal message store problem
- * @throws MessageAlreadyStagedException If the message is already staged
- */
- public void stage(StorableMessage m)
- throws
- InternalErrorException,
- MessageAlreadyStagedException;
-
-
- /**
- * Append more data with a previously staged message
- *
- * @param m The message to which data must be appended
- * @param data Data to happen to the message
- * @param offset The number of bytes from the beginning of the payload
- * @param size The number of bytes to be written
- * @throws InternalErrorException In case of internal message store problem
- * @throws MessageDoesntExistException If the message has not been staged
- */
- public void appendContent(StorableMessage m, byte[] data, int offset, int size)
- throws
- InternalErrorException,
- MessageDoesntExistException;
-
- /**
- * Get the content of previously staged or enqueued message.
- * The message headers are also set.
- *
- * @param m The message for which the content must be loaded
- * @param offset The number of bytes from the beginning of the payload
- * @param size The number of bytes to be loaded
- * @return The message content
- * @throws InternalErrorException In case of internal message store problem
- * @throws MessageDoesntExistException If the message does not exist
- */
- public byte[] loadContent(StorableMessage m, int offset, int size)
- throws
- InternalErrorException,
- MessageDoesntExistException;
-
- /**
- * Destroy a previously staged message
- *
- * @param m the message to be destroyed
- * @throws InternalErrorException In case of internal message store problem
- * @throws MessageDoesntExistException If the message does not exist in the store
- */
- public void destroy(StorableMessage m)
- throws
- InternalErrorException,
- MessageDoesntExistException;
-
- /**
- * Enqueue a message under the scope of the transaction branch
- * identified by xid when specified.
- * <p> This operation is propagated to the queue and the message.
- * <p> A message that has been previously staged is assumed to have had
- * its payload already added (see appendContent)
- *
- * @param xid The xid of the transaction branch under which the message must be enqueued.
- * <p> It he xid is null then the message is enqueued outside the scope of any transaction.
- * @param m The message to be enqueued
- * @param queue The queue into which the message must be enqueued
- * @throws InternalErrorException In case of internal message store problem
- * @throws QueueDoesntExistException If the queue does not exist in the store
- * @throws InvalidXidException The transaction branch is invalid
- * @throws UnknownXidException The transaction branch is unknown
- * @throws MessageDoesntExistException If the Message does not exist
- */
- public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
- throws
- InternalErrorException,
- QueueDoesntExistException,
- InvalidXidException,
- UnknownXidException,
- MessageDoesntExistException;
-
- /**
- * Dequeue a message under the scope of the transaction branch identified by xid
- * if specified.
- * <p> This operation is propagated to the queue and the message.
- *
- * @param xid The xid of the transaction branch under which the message must be dequeued.
- * <p> It he xid is null then the message is dequeued outside the scope of any transaction.
- * @param m The message to be dequeued
- * @param queue The queue from which the message must be dequeued
- * @throws InternalErrorException In case of internal message store problem
- * @throws QueueDoesntExistException If the queue does not exist in the store
- * @throws InvalidXidException The transaction branch is invalid
- * @throws UnknownXidException The transaction branch is unknown
- */
- public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
- throws
- InternalErrorException,
- QueueDoesntExistException,
- InvalidXidException,
- UnknownXidException;
-
- //=========================================================
- // Recovery specific methods
- //=========================================================
-
- /**
- * List all the persistent queues
- *
- * @return All the persistent queues
- * @throws InternalErrorException In case of internal message store problem
- */
- public Collection<StorableQueue> getAllQueues()
- throws
- InternalErrorException;
-
- /**
- * All enqueued messages of a given queue
- *
- * @param queue The queue where the message are retrieved from
- * @return The list all enqueued messages of a given queue
- * @throws InternalErrorException In case of internal message store problem
- */
- public Collection<StorableMessage> getAllMessages(StorableQueue queue)
- throws
- InternalErrorException;
-
- /**
- * Get a new message ID
- *
- * @return A new message ID
- */
- public long getNewMessageId();
-}
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.messageStore; + +import org.apache.qpid.server.exception.*; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.txn.TransactionManager; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.commons.configuration.Configuration; + +import javax.transaction.xa.Xid; +import java.util.Collection; + +/** + * Created by Arnaud Simon + * Date: 29-Mar-2007 + * Time: 17:34:02 + */ +public interface MessageStore +{ + /** + * Create a new exchange + * + * @param exchange the exchange to be persisted + * @throws InternalErrorException If an error occurs + */ + public void createExchange(Exchange exchange) + throws + InternalErrorException; + + /** + * Remove an exchange + * + * @param exchange The exchange to be removed + * @throws InternalErrorException If an error occurs + */ + public void removeExchange(Exchange exchange) + throws + InternalErrorException; + + /** + * Bind a queue with an exchange given a routing key + * + * @param exchange The exchange to bind the queue with + * @param routingKey The routing key + * @param queue The queue to be bound + * @param args Args + * @throws InternalErrorException If an error occurs + */ + public void bindQueue(Exchange exchange, + AMQShortString routingKey, + StorableQueue queue, FieldTable args) + throws + InternalErrorException; + + /** + * Unbind a queue from an exchange + * + * @param exchange The exchange the queue was bound to + * @param routingKey The routing queue + * @param queue The queue to unbind + * @param args args + * @throws InternalErrorException If an error occurs + */ + public void unbindQueue(Exchange exchange, + AMQShortString routingKey, + StorableQueue queue, FieldTable args) + throws + InternalErrorException; + + /** + * Called after instantiation in order to configure the message store. A particular implementation can define + * whatever parameters it wants. + * + * @param virtualHost The virtual host using by this store + * @param tm The transaction manager implementation + * @param base The base element identifier from which all configuration items are relative. For example, if the base + * element is "store", the all elements used by concrete classes will be "store.foo" etc. + * @param config The apache commons configuration object + * @throws InternalErrorException If an error occurs that means the store is unable to configure itself + * @throws IllegalArgumentException If the configuration arguments are illegal + */ + void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config) + throws + InternalErrorException, + IllegalArgumentException; + + /** + * Called to close and cleanup any resources used by the message store. + * + * @throws InternalErrorException if close fails + */ + void close() + throws + InternalErrorException; + + /** + * Create a queue + * + * @param queue the queue to be created + * @throws InternalErrorException In case of internal message store problem + * @throws QueueAlreadyExistsException If the queue already exists in the store + */ + public void createQueue(StorableQueue queue) + throws + InternalErrorException, + QueueAlreadyExistsException; + + /** + * Destroy a queue + * + * @param queue The queue to be destroyed + * @throws InternalErrorException In case of internal message store problem + * @throws QueueDoesntExistException If the queue does not exist in the store + */ + public void destroyQueue(StorableQueue queue) + throws + InternalErrorException, + QueueDoesntExistException; + + /** + * Stage the message before effective enqueue + * + * @param m The message to stage + * @throws InternalErrorException In case of internal message store problem + * @throws MessageAlreadyStagedException If the message is already staged + */ + public void stage(StorableMessage m) + throws + InternalErrorException, + MessageAlreadyStagedException; + + + /** + * Append more data with a previously staged message + * + * @param m The message to which data must be appended + * @param data Data to happen to the message + * @param offset The number of bytes from the beginning of the payload + * @param size The number of bytes to be written + * @throws InternalErrorException In case of internal message store problem + * @throws MessageDoesntExistException If the message has not been staged + */ + public void appendContent(StorableMessage m, byte[] data, int offset, int size) + throws + InternalErrorException, + MessageDoesntExistException; + + /** + * Get the content of previously staged or enqueued message. + * The message headers are also set. + * + * @param m The message for which the content must be loaded + * @param offset The number of bytes from the beginning of the payload + * @param size The number of bytes to be loaded + * @return The message content + * @throws InternalErrorException In case of internal message store problem + * @throws MessageDoesntExistException If the message does not exist + */ + public byte[] loadContent(StorableMessage m, int offset, int size) + throws + InternalErrorException, + MessageDoesntExistException; + + /** + * Get the content header of this message + * + * @param m The message + * @return The message content + * @throws InternalErrorException In case of internal message store problem + * @throws MessageDoesntExistException If the message does not exist + */ + public ContentHeaderBody getContentHeaderBody(StorableMessage m) + throws + InternalErrorException, + MessageDoesntExistException; + + /** + * Get the MessagePublishInfo of this message + * + * @param m The message + * @return The message content + * @throws InternalErrorException In case of internal message store problem + * @throws MessageDoesntExistException If the message does not exist + */ + public MessagePublishInfo getMessagePublishInfo(StorableMessage m) + throws + InternalErrorException, + MessageDoesntExistException; + + /** + * Destroy a previously staged message + * + * @param m the message to be destroyed + * @throws InternalErrorException In case of internal message store problem + * @throws MessageDoesntExistException If the message does not exist in the store + */ + public void destroy(StorableMessage m) + throws + InternalErrorException, + MessageDoesntExistException; + + /** + * Enqueue a message under the scope of the transaction branch + * identified by xid when specified. + * <p> This operation is propagated to the queue and the message. + * <p> A message that has been previously staged is assumed to have had + * its payload already added (see appendContent) + * + * @param xid The xid of the transaction branch under which the message must be enqueued. + * <p> It he xid is null then the message is enqueued outside the scope of any transaction. + * @param m The message to be enqueued + * @param queue The queue into which the message must be enqueued + * @throws InternalErrorException In case of internal message store problem + * @throws QueueDoesntExistException If the queue does not exist in the store + * @throws InvalidXidException The transaction branch is invalid + * @throws UnknownXidException The transaction branch is unknown + * @throws MessageDoesntExistException If the Message does not exist + */ + public void enqueue(Xid xid, StorableMessage m, StorableQueue queue) + throws + InternalErrorException, + QueueDoesntExistException, + InvalidXidException, + UnknownXidException, + MessageDoesntExistException; + + /** + * Dequeue a message under the scope of the transaction branch identified by xid + * if specified. + * <p> This operation is propagated to the queue and the message. + * + * @param xid The xid of the transaction branch under which the message must be dequeued. + * <p> It he xid is null then the message is dequeued outside the scope of any transaction. + * @param m The message to be dequeued + * @param queue The queue from which the message must be dequeued + * @throws InternalErrorException In case of internal message store problem + * @throws QueueDoesntExistException If the queue does not exist in the store + * @throws InvalidXidException The transaction branch is invalid + * @throws UnknownXidException The transaction branch is unknown + */ + public void dequeue(Xid xid, StorableMessage m, StorableQueue queue) + throws + InternalErrorException, + QueueDoesntExistException, + InvalidXidException, + UnknownXidException; + + //========================================================= + // Recovery specific methods + //========================================================= + + /** + * List all the persistent queues + * + * @return All the persistent queues + * @throws InternalErrorException In case of internal message store problem + */ + public Collection<StorableQueue> getAllQueues() + throws + InternalErrorException; + + /** + * All enqueued messages of a given queue + * + * @param queue The queue where the message are retrieved from + * @return The list all enqueued messages of a given queue + * @throws InternalErrorException In case of internal message store problem + */ + public Collection<StorableMessage> getAllMessages(StorableQueue queue) + throws + InternalErrorException; + + /** + * Get a new message ID + * + * @return A new message ID + */ + public long getNewMessageId(); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java new file mode 100644 index 0000000000..49af218b7a --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java @@ -0,0 +1,135 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.messageStore; + +import java.util.ArrayList; + +/** + * Created by Arnaud Simon + * Date: 15-May-2007 + * Time: 10:11:49 + */ +public abstract class Pool +{ + // The maximum size of the pool. + private int _maxPoolSize = -1; + // The current size of the pool. + private int _currentPoolSize = 0; + // The pool objects. + private volatile ArrayList<Object> _poolObjects = new ArrayList<Object>(); + //The current number of created instances. + private int _instanceCount = 0; + + /** + * Create a pool of specified size. Negative or null pool sizes are + * disallowed. + * + * @param poolSize The size of the pool to create. Should be 1 or + * greater. + * @throws Exception If the pool size is less than 1. + */ + public Pool(int poolSize) throws Exception + { + if (poolSize <= 0) + { + throw new Exception("pool size is less than 1: " + poolSize); + } + _maxPoolSize = poolSize; + } + + /** + * Return the maximum size of this pool. + * + * @return The maximum size of this pool. + */ + public final int maxSize() + { + return _maxPoolSize; + } + + /** + * Return the current number of created instances. + * + * @return The current number of created instances in this pool. + */ + public final int numberOfInstances() + { + return _instanceCount; + } + + /** + * Extending classes MUST define how to create an instance of the object + * that they pool. + * + * @return An instance of the pooled object. + * @throws Exception In case of internal error. + */ + abstract protected Object createInstance() throws Exception; + + /** + * Remove the next available object from the pool or wait for one to become + * available. + * + * @return The next available instance. + * @throws Exception If the call is interrupted + */ + public final synchronized Object acquireInstance() throws Exception + { + while (_currentPoolSize == _maxPoolSize) + { + try + { + this.wait(); + } + catch (InterruptedException e) + { + throw new Exception("pool wait threw interrupted exception", e); + } + } + if (_poolObjects.size() == 0) + { + _poolObjects.add(createInstance()); + _instanceCount++; + } + _currentPoolSize++; + return _poolObjects.remove(0); + } + + /** + * Return an object back into this pool. + * + * @param object The returning object. + */ + public synchronized void releaseInstance(Object object) + { + _poolObjects.add(object); + _currentPoolSize--; + this.notify(); + } + + /** + * Return a dead object back into this pool. + * + */ + public synchronized void releaseDeadInstance() + { + _instanceCount--; + _currentPoolSize--; + this.notify(); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java index 560549c126..47126f4b68 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,7 +24,11 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.messageStore.MessageStore; import org.apache.qpid.server.messageStore.StorableMessage; +import org.apache.qpid.server.messageStore.JDBCStore; +import org.apache.qpid.server.exception.InternalErrorException; +import org.apache.qpid.server.exception.MessageDoesntExistException; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.log4j.Logger; import javax.transaction.xa.Xid; @@ -65,7 +69,7 @@ public class StorableMessageHandle implements AMQMessageHandle // MessagePublishInfo private MessagePublishInfo _messagePublishInfo; // list of chunks - private List<ContentChunk> _chunks = new LinkedList<ContentChunk>(); + private List<ContentChunk> _chunks; //======================================================================== // Constructors @@ -84,6 +88,17 @@ public class StorableMessageHandle implements AMQMessageHandle throws AMQException { + if (_contentHeaderBody == null) + { + // load it from the store + try + { + _contentHeaderBody = _messageStore.getContentHeaderBody(_message); + } catch (Exception e) + { + throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e); + } + } return _contentHeaderBody; } @@ -91,6 +106,17 @@ public class StorableMessageHandle implements AMQMessageHandle throws AMQException { + if (_chunks == null ) + { + if(_message.isStaged() ) + { + loadChunks(); + } + else + { + return 0; + } + } return _chunks.size(); } @@ -106,13 +132,57 @@ public class StorableMessageHandle implements AMQMessageHandle IllegalArgumentException, AMQException { + if (_chunks == null) + { + loadChunks(); + } return _chunks.get(index); } + private void loadChunks() + throws + AMQException + { + try + { + _chunks = new LinkedList<ContentChunk>(); + byte[] underlying = _messageStore.loadContent(_message, 1, 0); + final int size = underlying.length; + final org.apache.mina.common.ByteBuffer data = + org.apache.mina.common.ByteBuffer.wrap(underlying); + ContentChunk cb = new ContentChunk() + { + + public int getSize() + { + return size; + } + + public org.apache.mina.common.ByteBuffer getData() + { + return data; + } + + public void reduceToFit() + { + + } + }; + _chunks.add(cb); + } catch (Exception e) + { + throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e); + } + } + public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody) throws AMQException { + if (_chunks == null) + { + _chunks = new LinkedList<ContentChunk>(); + } _chunks.add(contentBody); // if rquired this message can be added to the store //_messageStore.appendContent(_message, _payload, 0, 10); @@ -123,6 +193,17 @@ public class StorableMessageHandle implements AMQMessageHandle throws AMQException { + if (_messagePublishInfo == null) + { + // read it from the store + try + { + _messagePublishInfo = _messageStore.getMessagePublishInfo(_message); + } catch (Exception e) + { + throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e); + } + } return _messagePublishInfo; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java new file mode 100644 index 0000000000..cc0cc7140b --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java @@ -0,0 +1,93 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.messageStore.StorableQueue; +import org.apache.qpid.server.messageStore.StorableMessage; +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.qpid.server.exception.InternalErrorException; +import org.apache.qpid.server.exception.UnknownXidException; + +import javax.transaction.xa.Xid; + +/** + * Created by Arnaud Simon + * Date: 16-May-2007 + * Time: 15:15:18 + */ +public abstract class JDBCAbstractRecord implements TransactionRecord +{ + //======================================================================== + // Static Constants + //======================================================================== + // The logger for this class + private static final Logger _log = Logger.getLogger(JDBCEnqueueRecord.class); + // The record types + static public final int TYPE_DEQUEUE = 1; + static public final int TYPE_ENQUEUE = 2; + + // the queue + StorableQueue _queue; + // the message + StorableMessage _message; + + //======================================================================== + // Constructor + //======================================================================== + public JDBCAbstractRecord(StorableMessage m, StorableQueue queue) + { + _queue = queue; + _message = m; + } + + public abstract int getType(); + public long getMessageID() + { + return _message.getMessageId(); + } + + public int getQueueID() + { + return _queue.getQueueID(); + } + + public void rollback(MessageStore store) + throws + InternalErrorException + { + + } + + public void prepare(MessageStore store) + throws + InternalErrorException + { + } + + + public abstract void rollback(MessageStore store, Xid xid) + throws + InternalErrorException, + UnknownXidException; + + public abstract void prepare(MessageStore store, Xid xid) + throws + InternalErrorException, + UnknownXidException; +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java new file mode 100644 index 0000000000..c18a6a7fcf --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java @@ -0,0 +1,85 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.messageStore.StorableQueue; +import org.apache.qpid.server.messageStore.StorableMessage; +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.qpid.server.messageStore.JDBCStore; +import org.apache.qpid.server.exception.*; + +import javax.transaction.xa.Xid; + +/** + * Created by Arnaud Simon + * Date: 16-May-2007 + * Time: 14:50:34 + */ +public class JDBCDequeueRecord extends JDBCAbstractRecord +{ + //======================================================================== + // Static Constants + //======================================================================== + // The logger for this class + private static final Logger _log = Logger.getLogger(JDBCDequeueRecord.class); + + //======================================================================== + // Constructor + //======================================================================== + public JDBCDequeueRecord( StorableMessage m, StorableQueue queue) + { + super(m, queue); + } + + //======================================================================== + // Interface TransactionRecord + //======================================================================== + + public void commit(MessageStore store, Xid xid) + throws + InternalErrorException, + QueueDoesntExistException, + InvalidXidException, + UnknownXidException, + MessageDoesntExistException + { + store.dequeue(xid, _message, _queue); + } + + public void rollback(MessageStore store, Xid xid) + throws + InternalErrorException, + UnknownXidException + { + ((JDBCStore) store).rollbackDequeu(xid, _message, _queue); + } + + public void prepare(MessageStore store, Xid xid) + throws + InternalErrorException, + UnknownXidException + { + ((JDBCStore) store).prepareDequeu(xid, _message, _queue); + } + + public int getType() + { + return TYPE_DEQUEUE; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java new file mode 100644 index 0000000000..4a4a23153e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java @@ -0,0 +1,106 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.messageStore.StorableQueue; +import org.apache.qpid.server.messageStore.StorableMessage; +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.qpid.server.messageStore.JDBCStore; +import org.apache.qpid.server.exception.*; + +import javax.transaction.xa.Xid; + +/** + * Created by Arnaud Simon + * Date: 16-May-2007 + * Time: 14:50:20 + */ +public class JDBCEnqueueRecord extends JDBCAbstractRecord +{ + //======================================================================== + // Static Constants + //======================================================================== + // The logger for this class + private static final Logger _log = Logger.getLogger(JDBCEnqueueRecord.class); + + //======================================================================== + // Constructor + //======================================================================== + public JDBCEnqueueRecord(StorableMessage m, StorableQueue queue) + { + super(m, queue); + } + + //======================================================================== + // Interface TransactionRecord + //======================================================================== + + public void commit(MessageStore store, Xid xid) + throws + InternalErrorException, + QueueDoesntExistException, + InvalidXidException, + UnknownXidException, + MessageDoesntExistException + { + store.enqueue(xid, _message, _queue); + } + + public void rollback(MessageStore store, Xid xid) + throws + InternalErrorException, + UnknownXidException + { + if (!_message.isEnqueued()) + { + // try to delete the message + try + { + store.destroy(_message); + } catch (Exception e) + { + throw new InternalErrorException("Problem when destoying message ", e); + } + } + } + + public void prepare(MessageStore store, Xid xid) + throws + InternalErrorException, + UnknownXidException + { + try + { + if (!_message.isEnqueued() && !_message.isStaged()) + { + store.stage(_message); + store.appendContent(_message, _message.getData(), 0, _message.getData().length); + } + } + catch (Exception e) + { + throw new InternalErrorException("Problem when persisting message ", e); + } + } + + public int getType() + { + return TYPE_ENQUEUE; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java new file mode 100644 index 0000000000..cc35b50cc8 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java @@ -0,0 +1,196 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.messageStore.JDBCStore; + +import java.util.List; +import java.util.LinkedList; + +/** + * Created by Arnaud Simon + * Date: 16-May-2007 + * Time: 14:09:35 + */ +public class JDBCTransaction implements Transaction +{ + //======================================================================== + // Static Constants + //======================================================================== + // The logger for this class + private static final Logger _log = Logger.getLogger(JDBCTransaction.class); + public static long _xidId = 0; + //======================================================================== + // Instance Fields + //======================================================================== + // the associated connection + private JDBCStore.MyConnection _connection; + // Indicates whether this transaction is prepared + private boolean _prepared = false; + // Indicates that this transaction has heuristically rolled back + private boolean _heurRollBack = false; + // The list of records associated with this tx + private List<TransactionRecord> _records = new LinkedList<TransactionRecord>(); + // The date when this tx has been created. + private long _dateCreated; + // The timeout in seconds + private long _timeout; + // this instance xid id used as primary key + private long _thisxidId; + + //========================================================= + // Constructors + //========================================================= + + /** + * Create a transaction + * + */ + public JDBCTransaction() + { + _dateCreated = System.currentTimeMillis(); + _thisxidId = _xidId++; + } + + //========================================================= + // Getter and Setter methods + //========================================================= + + /** + * Notify that this tx has been prepared + */ + public void prepare() + { + _prepared = true; + } + + /** + * Specify whether this transaction is prepared + * + * @return true if this transaction is prepared, false otherwise + */ + public boolean isPrepared() + { + return _prepared; + } + + /** + * Notify that this tx has been heuristically rolled back + */ + public void heurRollback() + { + _heurRollBack = true; + } + + /** + * Specify whether this transaction has been heuristically rolled back + * + * @return true if this transaction has been heuristically rolled back , false otherwise + */ + public boolean isHeurRollback() + { + return _heurRollBack; + } + + /** + * Add an abstract record to this tx. + * + * @param record The record to be added + */ + public void addRecord(TransactionRecord record) + { + _records.add(record); + } + + /** + * Get the list of records associated with this tx. + * + * @return The list of records associated with this tx. + */ + public List<TransactionRecord> getrecords() + { + return _records; + } + + /** + * Set this tx timeout + * + * @param timeout This tx timeout in seconds + */ + public void setTimeout(long timeout) + { + _timeout = timeout; + } + + /** + * Get this tx timeout + * + * @return This tx timeout in seconds + */ + public long getTimeout() + { + return _timeout; + } + + /** + * Specify whether this tx has expired + * + * @return true if this tx has expired, false otherwise + */ + public boolean hasExpired() + { + long currentDate = System.currentTimeMillis(); + boolean result = currentDate - _dateCreated > _timeout * 1000; + if (_log.isDebugEnabled() && result) + { + _log.debug("transaction has expired"); + } + return result; + } + + /** + * Get the JDBC connection + * @return The JDBC connection + */ + public JDBCStore.MyConnection getConnection() + { + return _connection; + } + + /** + * Set the JDBC connection + * + * @param connection The new JDBC connection + */ + public void setConnection(JDBCStore.MyConnection connection) + { + _connection = connection; + } + + /** + * This tx xid id used as primary key + * + * @return this tx xid id + */ + public long getXidID() + { + return _thisxidId; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java new file mode 100644 index 0000000000..e60fb89bf7 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java @@ -0,0 +1,554 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.qpid.server.messageStore.JDBCStore; +import org.apache.qpid.server.exception.*; +import org.apache.commons.configuration.Configuration; + +import javax.transaction.xa.Xid; +import java.util.HashMap; +import java.util.Set; + + +/** + * Created by Arnaud Simon + * Date: 16-May-2007 + * Time: 14:05:45 + */ +public class JDBCTransactionManager implements TransactionManager +{ + //======================================================================== + // Static Constants + //======================================================================== + // The logger for this class + private static final Logger _log = Logger.getLogger(JDBCTransactionManager.class); + + private static final String ENVIRONMENT_TX_TIMEOUT = "environment-tx-timeout"; + + //======================================================================== + // Instance Fields + //======================================================================== + // The underlying jdbc message store + private JDBCStore _messagStore; + + // A map of XID/x + private HashMap<Xid, Transaction> _xidMap; + + // A map of in-doubt txs + private HashMap<Xid, Transaction> _indoubtXidMap; + + // A default tx timeout in sec + private int _defaultTimeout; // set to 10s if not specified in the config + + //=================================== + //=== Configuartion + //=================================== + + /** + * Configure this TM with the Message store implementation + * + * @param base The base element identifier from which all configuration items are relative. For example, if the base + * element is "store", the all elements used by concrete classes will be "store.foo" etc. + * @param config The apache commons configuration object + * @param messageStroe the message store associated with the TM + */ + public void configure(MessageStore messageStroe, String base, Configuration config) + { + _messagStore = (JDBCStore) messageStroe; + if (config != null) + { + _defaultTimeout = config.getInt(base + "." + ENVIRONMENT_TX_TIMEOUT, 120); + } else + { + _defaultTimeout = 120; + } + _log.info("Using transaction timeout of " + _defaultTimeout + " s"); + // get the list of in-doubt transactions + try + { + _indoubtXidMap = _messagStore.getAllInddoubt(); + _xidMap = _indoubtXidMap; + } catch (Exception e) + { + _log.fatal("Cannot recover in-doubt transactions", e); + } + } + + //=================================== + //=== TransactionManager interface + //=================================== + + /** + * Begin a transaction branch identified by Xid + * + * @param xid The xid of the branch to begin + * @return <ul> + * <li> <code>XAFlag.ok</code>: Normal execution. + * </ul> + * @throws InternalErrorException In case of internal problem + * @throws InvalidXidException The Xid is invalid + */ + public synchronized XAFlag begin(Xid xid) + throws + InternalErrorException, + InvalidXidException + { + if (xid == null) + { + throw new InvalidXidException(xid, "null xid"); + } + if (_xidMap.containsKey(xid)) + { + throw new InvalidXidException(xid, "Xid already exist"); + } + Transaction tx = new JDBCTransaction(); + tx.setTimeout(_defaultTimeout); + _xidMap.put(xid, tx); + return XAFlag.ok; + } + + /** + * Prepare the transaction branch identified by Xid + * + * @param xid The xid of the branch to prepare + * @return <ul> + * <li> <code>XAFlag.ok</code>: Normal execution. + * <li> <code>XAFlag.rdonly</code>: The transaction branch was read-only and has been committed. + * <li> <code>XAFlag.rbrollback</code>: The transaction branch was marked rollback-only for an unspecied reason. + * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long. + * </ul> + * @throws InternalErrorException In case of internal problem + * @throws CommandInvalidException Prepare has been call in an improper context + * @throws UnknownXidException The Xid is unknown + */ + public synchronized XAFlag prepare(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException + { + // get the transaction + JDBCTransaction tx = getTransaction(xid); + XAFlag result = XAFlag.ok; + if (tx.isHeurRollback()) + { + result = XAFlag.rdonly; + } else if (tx.hasExpired()) + { + result = XAFlag.rbtimeout; + // rollback this tx branch + rollback(xid); + } else + { + if (tx.isPrepared()) + { + throw new CommandInvalidException("TransactionImpl is already prepared"); + } + if (tx.getrecords().size() == 0) + { + // the tx was read only (no work has been done) + _xidMap.remove(xid); + result = XAFlag.rdonly; + } else + { + try + { + JDBCStore.MyConnection con = _messagStore.getConnection(); + tx.setConnection(con); + // save the xid + _messagStore.saveXID(con, tx, xid); + for (TransactionRecord record : tx.getrecords()) + { + if (record instanceof JDBCAbstractRecord) + { + ((JDBCAbstractRecord) record).prepare(_messagStore, xid); + _messagStore.saveRecord(con, tx, (JDBCAbstractRecord) record); + } else + { + record.prepare(_messagStore); + } + } + _messagStore.commitConnection(con); + tx.setConnection(null); + } catch (Exception e) + { + _log.error("Cannot prepare tx: " + xid); + throw new InternalErrorException("Cannot prepare tx: " + xid); + } + tx.prepare(); + } + } + return result; + } + + /** + * Rollback the transaction branch identified by Xid + * + * @param xid The xid of the branch to rollback + * @return <ul> + * <li> <code>XAFlag.ok</code>: Normal execution, + * <li> <code>NOT SUPPORTED XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed. + * <li> <code>NOT SUPPORTED XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was committed. + * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back. + * <li> <code>NOT SUPPORTED XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back. + * <li> <code>NOT SUPPORTED XAFlag.rbrollback</code>: The broker marked the transaction branch rollback-only for an unspeci?ed reason. + * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long. + * </ul> + * @throws InternalErrorException In case of internal problem + * @throws CommandInvalidException Rollback has been call in an improper context + * @throws UnknownXidException The Xid is unknown + */ + public synchronized XAFlag rollback(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException + { + // get the transaction + JDBCTransaction tx = getTransaction(xid); + XAFlag flag = XAFlag.ok; + if (tx.isHeurRollback()) + { + flag = XAFlag.heurrb; + } else + { + try + { + JDBCStore.MyConnection con = _messagStore.getConnection(); + tx.setConnection(con); + for (TransactionRecord record : tx.getrecords()) + { + if (record instanceof JDBCAbstractRecord) + { + ((JDBCAbstractRecord) record).rollback(_messagStore, xid); + } else + { + record.rollback(_messagStore); + } + } + if (tx.isPrepared()) + { + _messagStore.deleteRecords(con, tx); + _messagStore.deleteXID(con, tx); + _messagStore.commitConnection(con); + } + _messagStore.commitConnection(con); + tx.setConnection(null); + } + catch (Exception e) + { + // this should not happen + _log.error("Error when rolling back distributed transaction: " + xid); + throw new InternalErrorException("Error when rolling back distributed transaction: " + xid, e); + } + removeTransaction(xid); + } + if (tx.hasExpired()) + { + flag = XAFlag.rbtimeout; + } + return flag; + } + + /** + * Commit the transaction branch identified by Xid + * + * @param xid The xid of the branch to commit + * @return <ul> + * <li> <code>XAFlag.ok</code>: Normal execution, + * <li> <code>NOT SUPPORTED XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed. + * <li> <code>NOT SUPPORTED XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the specied transaction branch was committed. + * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back. + * <li> <code>XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back. + * </ul> + * @throws InternalErrorException In case of internal problem + * @throws CommandInvalidException Commit has been call in an improper context + * @throws UnknownXidException The Xid is unknown + * @throws org.apache.qpid.server.exception.NotPreparedException + * The branch was not prepared prior to commit + */ + public synchronized XAFlag commit(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException, + NotPreparedException + { + // get the transaction + JDBCTransaction tx = getTransaction(xid); + XAFlag flag = XAFlag.ok; + if (tx.isHeurRollback()) + { + flag = XAFlag.heurrb; + } else if (tx.hasExpired()) + { + flag = XAFlag.rbtimeout; + // rollback this tx branch + rollback(xid); + } else + { + if (!tx.isPrepared()) + { + throw new NotPreparedException("TransactionImpl is not prepared"); + } + try + { + JDBCStore.MyConnection con = _messagStore.getConnection(); + tx.setConnection(con); + for (TransactionRecord record : tx.getrecords()) + { + try + { + record.commit(_messagStore, xid); + } catch (InvalidXidException e) + { + throw new UnknownXidException(xid, e); + } catch (Exception e) + { + // this should not happen as the queue and the message must exist + _log.error("Error when committing distributed transaction heurmix mode returned: " + xid); + flag = XAFlag.heurmix; + } + } + _messagStore.deleteRecords(con, tx); + _messagStore.deleteXID(con, tx); + _messagStore.commitConnection(con); + tx.setConnection(null); + } catch (Exception e) + { + // this should not happen + _log.error("Error when committing distributed transaction heurrb mode returned: " + xid); + throw new InternalErrorException("Error when committing distributed transaction: " + xid, e); + } + removeTransaction(xid); + } + return flag; + } + + /** + * One phase commit the transaction branch identified by Xid + * + * @param xid The xid of the branch to one phase commit + * @return <ul> + * <li> <code>XAFlag.ok</code>: Normal execution, + * <li> <code>NOT SUPPORTED XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed. + * <li> <code>NOT SUPPORTED XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was committed. + * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back. + * <li> <code>NOT SUPPORTED XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back. + * <li> <code>NOT SUPPORTED XAFlag.rbrollback</code>: The broker marked the transaction branch rollback-only for an unspeci?ed reason. + * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long. + * </ul> + * @throws InternalErrorException In case of internal problem + * @throws CommandInvalidException Commit has been call in an improper context + * @throws UnknownXidException The Xid is unknown + */ + public synchronized XAFlag commit_one_phase(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException + { + XAFlag flag = XAFlag.ok; + JDBCTransaction tx = getTransaction(xid); + if (tx.isHeurRollback()) + { + flag = XAFlag.heurrb; + } else if (tx.hasExpired()) + { + flag = XAFlag.rbtimeout; + // rollback this tx branch + rollback(xid); + } else + { + try + { + // we do not need to prepare the tx + tx.prepare(); + JDBCStore.MyConnection con = _messagStore.getConnection(); + tx.setConnection(con); + for (TransactionRecord record : tx.getrecords()) + { + try + { + record.commit(_messagStore, xid); + } catch (InvalidXidException e) + { + throw new UnknownXidException(xid, e); + } catch (Exception e) + { + // this should not happen as the queue and the message must exist + _log.error("Error when committing transaction heurmix mode returned: " + xid); + flag = XAFlag.heurmix; + } + } + _messagStore.commitConnection(con); + tx.setConnection(null); + } catch (Exception e) + { + e.printStackTrace(); + throw new InternalErrorException("cannot commit transaxtion with xid " + xid + " " + e, e); + } + finally + { + removeTransaction(xid); + } + } + return flag; + } + + /** + * Forget about the transaction branch identified by Xid + * + * @param xid The xid of the branch to forget + * @throws InternalErrorException In case of internal problem + * @throws CommandInvalidException Forget has been call in an improper context + * @throws UnknownXidException The Xid is unknown + */ + public void forget(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException + { + synchronized (xid) + { + getTransaction(xid); + removeTransaction(xid); + } + } + + /** + * Set the transaction branch timeout value in seconds + * + * @param xid The xid of the branch to set timeout + * @param timeout Timeout value in seconds + * @throws InternalErrorException In case of internal problem + * @throws UnknownXidException The Xid is unknown + */ + public void setTimeout(Xid xid, long timeout) + throws + InternalErrorException, + UnknownXidException + { + JDBCTransaction tx = getTransaction(xid); + tx.setTimeout(timeout); + } + + /** + * Get the transaction branch timeout + * + * @param xid The xid of the branch to get the timeout from + * @return The timeout associated with the branch identified with xid + * @throws InternalErrorException In case of internal problem + * @throws UnknownXidException The Xid is unknown + */ + public long getTimeout(Xid xid) + throws + InternalErrorException, + UnknownXidException + { + JDBCTransaction tx = getTransaction(xid); + return tx.getTimeout(); + } + + /** + * Get a set of Xids the RM has prepared or heuristically completed + * + * @param startscan Indicates that recovery scan should start + * @param endscan Indicates that the recovery scan should end after returning the Xids + * @return Set of Xids the RM has prepared or heuristically completed + * @throws InternalErrorException In case of internal problem + * @throws CommandInvalidException Recover has been call in an improper context + */ + public Set<Xid> recover(boolean startscan, boolean endscan) + throws + InternalErrorException, + CommandInvalidException + { + return _indoubtXidMap.keySet(); + } + + /** + * An error happened (for example the channel has been abruptly closed) + * with this Xid, TM must make a heuristical decision. + * + * @param xid The Xid of the transaction branch to be heuristically completed + * @throws UnknownXidException The Xid is unknown + * @throws InternalErrorException In case of internal problem + */ + public void HeuristicOutcome(Xid xid) + throws + UnknownXidException, + InternalErrorException + { + synchronized (xid) + { + JDBCTransaction tx = getTransaction(xid); + if (!tx.isPrepared()) + { + // heuristically rollback this tx + for (TransactionRecord record : tx.getrecords()) + { + record.rollback(_messagStore); + } + tx.heurRollback(); + } + // add this branch in the list of indoubt tx + _indoubtXidMap.put(xid, tx); + } + } + + + public JDBCTransaction getTransaction(Xid xid) + throws + UnknownXidException + { + Transaction tx = _xidMap.get(xid); + if (tx == null) + { + throw new UnknownXidException(xid); + } + return (JDBCTransaction) tx; + } + + //========================================================================== + //== Methods for Message Store + //========================================================================== + + /** + * Get the default tx timeout in seconds + * + * @return the default tx timeout in seconds + */ + public int getDefaultTimeout() + { + return _defaultTimeout; + } + //========================================================================== + //== Private Methods + //========================================================================== + + private void removeTransaction(Xid xid) + { + _xidMap.remove(xid); + _indoubtXidMap.remove(xid); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index addb0c791f..8becaf52b9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -93,7 +93,10 @@ public class NonTransactionalContext implements TransactionalContext { try { - message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue); + if( ! deliverFirst ) + { + message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue); + } queue.process(_storeContext, message, deliverFirst); //following check implements the functionality //required by the 'immediate' flag: diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java index f7c578d9a0..12b2a4f7a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -57,10 +57,14 @@ public class NullApplicationRegistry extends ApplicationRegistry super(new MapConfiguration(new HashMap())); } - public void initialise() throws Exception + public void initialise() + throws + Exception { _configuration.addProperty("store.class", "org.apache.qpid.server.messageStore.MemoryMessageStore"); _configuration.addProperty("txn.class", "org.apache.qpid.server.txn.MemoryTransactionManager"); + // _configuration.addProperty("store.class", "org.apache.qpid.server.messageStore.JDBCStore"); + // _configuration.addProperty("txn.class", "org.apache.qpid.server.txn.JDBCTransactionManager"); Properties users = new Properties(); @@ -121,3 +125,4 @@ public class NullApplicationRegistry extends ApplicationRegistry } + |
