From 3e981a1d1308bd57df075f423041d45bef73fed6 Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Mon, 21 May 2007 16:34:03 +0000 Subject: Documented remaining exceptions. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@540198 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/exception/CommandInvalidException.java | 39 +++--- .../server/exception/InternalErrorException.java | 25 +++- .../qpid/server/exception/InvalidXidException.java | 20 ++- .../exception/MessageAlreadyStagedException.java | 12 +- .../exception/MessageDoesntExistException.java | 18 ++- .../server/exception/NotPreparedException.java | 21 ++- .../exception/QueueAlreadyExistsException.java | 17 ++- .../exception/QueueDoesntExistException.java | 19 ++- .../qpid/server/exception/UnknownXidException.java | 23 +++- .../server/messageStore/MemoryMessageStore.java | 133 ++++++++---------- .../org/apache/qpid/server/txn/EnqueueRecord.java | 37 ++--- .../qpid/server/txn/MemoryTransactionManager.java | 152 ++++++++++----------- 12 files changed, 277 insertions(+), 239 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java index ae1b916144..2b5fbf9795 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,22 +18,16 @@ package org.apache.qpid.server.exception; /** - * Created by Arnaud Simon - * Date: 29-Mar-2007 - * Time: 15:52:29 + * CommandInvalidException indicates that an innapropriate request has been made to a transaction manager. For example, + * calling prepare on an already prepared transaction. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Represents an error due to an innapropriate request to a transction manager. + *
*/ public class CommandInvalidException extends Exception { - /** - * Constructs a new CommandInvalidException with the specified detail message. - * - * @param message the detail message. - */ - public CommandInvalidException(String message) - { - super(message); - } - /** * Constructs a new CommandInvalidException with the specified detail message and * cause. @@ -46,14 +40,27 @@ public class CommandInvalidException extends Exception super(message, cause); } + /** + * Constructs a new CommandInvalidException with the specified detail message. + * + * @param message the detail message. + * + * @deprected + */ + public CommandInvalidException(String message) + { + super(message); + } + /** * Constructs a new CommandInvalidException with the specified cause. * * @param cause the cause + * + * @deprected */ public CommandInvalidException(Throwable cause) { super(cause); } - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java index f5fcfeee8f..60e1514d2b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,9 +18,15 @@ package org.apache.qpid.server.exception; /** - * Created by Arnaud Simon - * Date: 29-Mar-2007 - * Time: 14:41:53 + * General purpose exception for non-specifc error cases. Do not use. + * + * @deprected Far too broad to be a checked exception. Will be abused as a "don't know what to do with it" exception + * when Runtimes should be used. If this has a specific meaning within transaction managers, it should + * be renamed to something like TxManagerException, for example. At the moment, transaction managers are not + * catching this exception and taking some action, so it is clear that it is being used for errors that are + * not recoverable/handleable from; use runtimes. So far, it is only caught to be rethrown as AMQException, + * which is the other catch-all exception case to be eliminated. There are sequences in the code where + * AMQException is caught and rethrown as InternalErrorException, which is cause and rethrown as AMQException. */ public class InternalErrorException extends Exception { @@ -28,6 +34,8 @@ public class InternalErrorException extends Exception * Constructs a new InternalErrorException with the specified detail message. * * @param message the detail message. + * + * @deprected */ public InternalErrorException(String message) { @@ -50,8 +58,11 @@ public class InternalErrorException extends Exception * Constructs a new InternalErrorException with the specified cause. * * @param cause the cause + * + * @deprected */ - public InternalErrorException(Throwable cause) { + public InternalErrorException(Throwable cause) + { super(cause); } -} \ No newline at end of file +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java index 3cae098403..277614afff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,9 +20,13 @@ package org.apache.qpid.server.exception; import javax.transaction.xa.Xid; /** - * Created by Arnaud Simon - * Date: 29-Mar-2007 - * Time: 14:12:27 + * InvalidXidException indicates that an Xid under which to conduct a transaction is invalid. This may be because it + * has an incorrect format, is null, or a transcaction with the same Xid is already running. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Represents an invalid Xid for a transaction. + *
*/ public class InvalidXidException extends Exception { @@ -30,6 +34,8 @@ public class InvalidXidException extends Exception * Constructs a newr InvalidXidException with a standard message * * @param xid The invalid xid. + * + * @deprected */ public InvalidXidException(Xid xid) { @@ -41,6 +47,8 @@ public class InvalidXidException extends Exception * * @param xid The invalid xid. * @param cause The casue for the xid to be invalid + * + * @deprected */ public InvalidXidException(Xid xid, Throwable cause) { @@ -64,6 +72,8 @@ public class InvalidXidException extends Exception * @param reason The reason why the xid is invalid * @param xid The invalid xid. * @param cause The casue for the xid to be invalid + * + * @deprected */ public InvalidXidException(Xid xid, String reason, Throwable cause) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java index f95132a450..336f5fdf64 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,9 +18,7 @@ package org.apache.qpid.server.exception; /** - * Created by Arnaud Simon - * Date: 03-Apr-2007 - * Time: 09:46:31 + * @todo Need to understand what message staging is to document properly. */ public class MessageAlreadyStagedException extends Exception { @@ -40,6 +38,8 @@ public class MessageAlreadyStagedException extends Exception * * @param message the detail message . * @param cause the cause. + * + * @deprected */ public MessageAlreadyStagedException(String message, Throwable cause) { @@ -50,6 +50,8 @@ public class MessageAlreadyStagedException extends Exception * Constructs a new MessageAlreadyStagedException with the specified cause. * * @param cause the cause + * + * @deprected */ public MessageAlreadyStagedException(Throwable cause) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java index b8ffe91247..544d669d4b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,9 +18,13 @@ package org.apache.qpid.server.exception; /** - * Created by Arnaud Simon - * Date: 30-Mar-2007 - * Time: 10:52:29 + * MessageDoesntExistException indicates that a message store cannot find a message looked up by its id. This may + * indicate message loss. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Represents failure of message store to find a message. + *
*/ public class MessageDoesntExistException extends Exception { @@ -40,6 +44,8 @@ public class MessageDoesntExistException extends Exception * * @param message the detail message . * @param cause the cause. + * + * @deprected */ public MessageDoesntExistException(String message, Throwable cause) { @@ -50,6 +56,8 @@ public class MessageDoesntExistException extends Exception * Constructs a new MessageDoesntExistException with the specified cause. * * @param cause the cause + * + * @deprected */ public MessageDoesntExistException(Throwable cause) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java index af8c7374bf..ab288748f5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,9 +18,14 @@ package org.apache.qpid.server.exception; /** - * Created by Arnaud Simon - * Date: 29-Mar-2007 - * Time: 16:47:40 + * NotPreparedException indicates a failure to commit a transaction that has not been prepared. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Represents failure to commit an unprepared transaction. + *
+ * + * @todo There is already a CommandInvalidException which would seem to cover this too. Use it instead? */ public class NotPreparedException extends Exception { @@ -40,6 +45,8 @@ public class NotPreparedException extends Exception * * @param message the detail message . * @param cause the cause. + * + * @deprecated */ public NotPreparedException(String message, Throwable cause) { @@ -50,9 +57,11 @@ public class NotPreparedException extends Exception * Constructs a new NotPreparedException with the specified cause. * * @param cause the cause + * + * @deprected */ public NotPreparedException(Throwable cause) { super(cause); } -} \ No newline at end of file +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java index 39751261e4..3635d2c19d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,9 +18,12 @@ package org.apache.qpid.server.exception; /** - * Created by Arnaud Simon - * Date: 30-Mar-2007 - * Time: 10:49:00 + * QueueAlreadyExistsException inidicates failure of a message store to create a queue that already exists. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Represents failure to create a queue that already exists. + *
*/ public class QueueAlreadyExistsException extends Exception { @@ -40,6 +43,8 @@ public class QueueAlreadyExistsException extends Exception * * @param message the detail message . * @param cause the cause. + * + * @deprecated */ public QueueAlreadyExistsException(String message, Throwable cause) { @@ -50,6 +55,8 @@ public class QueueAlreadyExistsException extends Exception * Constructs a new QueueDoesntExistException with the specified cause. * * @param cause the cause + * + * @deprecated */ public QueueAlreadyExistsException(Throwable cause) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java index 88dea864a5..d05c152228 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,9 +18,12 @@ package org.apache.qpid.server.exception; /** - * Created by Arnaud Simon - * Date: 29-Mar-2007 - * Time: 17:38:24 + * MessageDoesntExistException indicates that a message store cannot find a queue. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Represents failure to find a queue on a message store. + *
*/ public class QueueDoesntExistException extends Exception { @@ -40,6 +43,8 @@ public class QueueDoesntExistException extends Exception * * @param message the detail message . * @param cause the cause. + * + * @deprecated */ public QueueDoesntExistException(String message, Throwable cause) { @@ -50,9 +55,11 @@ public class QueueDoesntExistException extends Exception * Constructs a new QueueDoesntExistException with the specified cause. * * @param cause the cause + * + * @deprecated */ public QueueDoesntExistException(Throwable cause) { super(cause); } -} \ No newline at end of file +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java index 9c1a28413f..b8a8de90a0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,9 +20,16 @@ package org.apache.qpid.server.exception; import javax.transaction.xa.Xid; /** - * Created by Arnaud Simon - * Date: 29-Mar-2007 - * Time: 15:45:06 + * UnknownXidException indicates that an Xid under which a transactional operation is to be run is not known. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Represents failure of a transaction manager to recognize an Xid. + *
+ * + * @todo Already have an InvalidXidException, this might be splitting things too far? There are cases where invalid is + * caught and rethrown as unknown. What is unknown specifically used for that invalid is not? For example, when + * recovering, is it important to distinguish between invalid and unknown? */ public class UnknownXidException extends Exception { @@ -30,6 +37,8 @@ public class UnknownXidException extends Exception * Constructs a newr UnknownXidException with a standard message * * @param xid The unknown xid. + * + * @deprecated */ public UnknownXidException(Xid xid) { @@ -41,6 +50,8 @@ public class UnknownXidException extends Exception * * @param xid The unknown xid. * @param cause The casue for the xid to be unknown + * + * @deprecated */ public UnknownXidException(Xid xid, Throwable cause) { @@ -52,6 +63,8 @@ public class UnknownXidException extends Exception * * @param reason The reason why the xid is unknown * @param xid The unknown xid. + * + * @deprecated */ public UnknownXidException(Xid xid, String reason) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java index 38fd9daa39..2bcebe0fa7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -17,22 +17,25 @@ */ package org.apache.qpid.server.messageStore; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.txn.TransactionManager; -import org.apache.qpid.server.txn.TransactionRecord; -import org.apache.qpid.server.txn.MemoryEnqueueRecord; -import org.apache.qpid.server.txn.MemoryDequeueRecord; -import org.apache.qpid.server.exception.*; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.*; + +import javax.transaction.xa.Xid; + import org.apache.commons.configuration.Configuration; + import org.apache.log4j.Logger; -import javax.transaction.xa.Xid; -import java.util.*; -import java.io.ByteArrayOutputStream; -import java.nio.ByteBuffer; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exception.*; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.txn.MemoryDequeueRecord; +import org.apache.qpid.server.txn.MemoryEnqueueRecord; +import org.apache.qpid.server.txn.TransactionManager; +import org.apache.qpid.server.txn.TransactionRecord; +import org.apache.qpid.server.virtualhost.VirtualHost; /** * This a simple in-memory implementation of a message store i.e. nothing is persisted @@ -43,9 +46,9 @@ import java.nio.ByteBuffer; */ public class MemoryMessageStore implements MessageStore { - //======================================================================== + // ======================================================================== // Static Constants - //======================================================================== + // ======================================================================== // The logger for this class private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); @@ -58,42 +61,34 @@ public class MemoryMessageStore implements MessageStore // The transaction manager private TransactionManager _txm; - //======================================================================== + // ======================================================================== // Interface MessageStore - //======================================================================== + // ======================================================================== - public void removeExchange(Exchange exchange) - throws - InternalErrorException + public void removeExchange(Exchange exchange) throws InternalErrorException { - // do nothing this is inmemory + // do nothing this is inmemory } public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args) - throws - InternalErrorException + throws InternalErrorException { // do nothing this is inmemory } - public void createExchange(Exchange exchange) - throws - InternalErrorException + public void createExchange(Exchange exchange) throws InternalErrorException { // do nothing this is inmemory } public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args) - throws - InternalErrorException + throws InternalErrorException { // do nothing this is inmemory } public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config) - throws - InternalErrorException, - IllegalArgumentException + throws InternalErrorException, IllegalArgumentException { _log.info("Configuring memory message store"); // Initialise the maps @@ -103,19 +98,14 @@ public class MemoryMessageStore implements MessageStore _txm.configure(this, "txn", config); } - public void close() - throws - InternalErrorException + public void close() throws InternalErrorException { _log.info("Closing memory message store"); _stagedMessages.clear(); _queueMap.clear(); } - public void createQueue(StorableQueue queue) - throws - InternalErrorException, - QueueAlreadyExistsException + public void createQueue(StorableQueue queue) throws InternalErrorException, QueueAlreadyExistsException { if (_queueMap.containsKey(queue)) { @@ -125,10 +115,7 @@ public class MemoryMessageStore implements MessageStore _queueMap.put(queue, new LinkedList()); } - public void destroyQueue(StorableQueue queue) - throws - InternalErrorException, - QueueDoesntExistException + public void destroyQueue(StorableQueue queue) throws InternalErrorException, QueueDoesntExistException { if (!_queueMap.containsKey(queue)) { @@ -138,65 +125,56 @@ public class MemoryMessageStore implements MessageStore _queueMap.remove(queue); } - public void stage(StorableMessage m) - throws - InternalErrorException, - MessageAlreadyStagedException + public void stage(StorableMessage m) throws InternalErrorException, MessageAlreadyStagedException { if (_stagedMessages.containsKey(m)) { throw new MessageAlreadyStagedException("message " + m + " already staged"); } + _stagedMessages.put(m, new ByteArrayOutputStream()); m.staged(); } public void appendContent(StorableMessage m, byte[] data, int offset, int size) - throws - InternalErrorException, - MessageDoesntExistException + throws InternalErrorException, MessageDoesntExistException { if (!_stagedMessages.containsKey(m)) { throw new MessageDoesntExistException("message " + m + " has not been staged"); } + _stagedMessages.get(m).write(data, offset, size); } public byte[] loadContent(StorableMessage m, int offset, int size) - throws - InternalErrorException, - MessageDoesntExistException + throws InternalErrorException, MessageDoesntExistException { if (!_stagedMessages.containsKey(m)) { throw new MessageDoesntExistException("message " + m + " has not been staged"); } + byte[] result = new byte[size]; ByteBuffer buf = ByteBuffer.allocate(size); buf.put(_stagedMessages.get(m).toByteArray(), offset, size); buf.get(result); + return result; } - public void destroy(StorableMessage m) - throws - InternalErrorException, - MessageDoesntExistException + public void destroy(StorableMessage m) throws InternalErrorException, MessageDoesntExistException { if (!_stagedMessages.containsKey(m)) { throw new MessageDoesntExistException("message " + m + " has not been staged"); } + _stagedMessages.remove(m); } public void enqueue(Xid xid, StorableMessage m, StorableQueue queue) - throws - InternalErrorException, - QueueDoesntExistException, - InvalidXidException, - UnknownXidException, + throws InternalErrorException, QueueDoesntExistException, InvalidXidException, UnknownXidException, MessageDoesntExistException { if (xid != null) @@ -204,46 +182,49 @@ public class MemoryMessageStore implements MessageStore // this is a tx operation TransactionRecord enqueueRecord = new MemoryEnqueueRecord(m, queue); _txm.getTransaction(xid).addRecord(enqueueRecord); - } else + } + else { if (!_stagedMessages.containsKey(m)) { try { stage(m); - } catch (MessageAlreadyStagedException e) + } + catch (MessageAlreadyStagedException e) { - throw new InternalErrorException(e); + throw new InternalErrorException(e.getMessage(), e); } + appendContent(m, m.getData(), 0, m.getPayloadSize()); } + if (!_queueMap.containsKey(queue)) { throw new QueueDoesntExistException("queue " + queue + " dos not exist"); } + _queueMap.get(queue).add(m); m.enqueue(queue); } } public void dequeue(Xid xid, StorableMessage m, StorableQueue queue) - throws - InternalErrorException, - QueueDoesntExistException, - InvalidXidException, - UnknownXidException + throws InternalErrorException, QueueDoesntExistException, InvalidXidException, UnknownXidException { if (xid != null) { // this is a tx operation TransactionRecord dequeueRecord = new MemoryDequeueRecord(m, queue); _txm.getTransaction(xid).addRecord(dequeueRecord); - } else + } + else { if (!_queueMap.containsKey(queue)) { throw new QueueDoesntExistException("queue " + queue + " dos not exist"); } + m.dequeue(queue); _queueMap.get(queue).remove(m); if (!m.isEnqueued()) @@ -254,16 +235,12 @@ public class MemoryMessageStore implements MessageStore } } - public Collection getAllQueues() - throws - InternalErrorException + public Collection getAllQueues() throws InternalErrorException { return _queueMap.keySet(); } - public Collection getAllMessages(StorableQueue queue) - throws - InternalErrorException + public Collection getAllMessages(StorableQueue queue) throws InternalErrorException { return _queueMap.get(queue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java index cdf209fb12..6770d8a8dd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -17,14 +17,14 @@ */ package org.apache.qpid.server.txn; -import org.apache.qpid.server.messageStore.MessageStore; +import javax.transaction.xa.Xid; + +import org.apache.qpid.AMQException; import org.apache.qpid.server.exception.*; -import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.messageStore.MessageStore; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.AMQException; - -import javax.transaction.xa.Xid; +import org.apache.qpid.server.store.StoreContext; /** * Created by Arnaud Simon @@ -47,33 +47,26 @@ public class EnqueueRecord implements TransactionRecord } public void commit(MessageStore store, Xid xid) - throws - InternalErrorException, - QueueDoesntExistException, - InvalidXidException, - UnknownXidException, + throws InternalErrorException, QueueDoesntExistException, InvalidXidException, UnknownXidException, MessageDoesntExistException { try { _queue.process(_storeContext, _msg, _first); - } catch (AMQException e) + } + catch (AMQException e) { - throw new InternalErrorException(e); + throw new InternalErrorException(e.getMessage(), e); } } - public void rollback(MessageStore store) - throws - InternalErrorException + public void rollback(MessageStore store) throws InternalErrorException { - //To change body of implemented methods use File | Settings | File Templates. + // To change body of implemented methods use File | Settings | File Templates. } - public void prepare(MessageStore store) - throws - InternalErrorException + public void prepare(MessageStore store) throws InternalErrorException { - //To change body of implemented methods use File | Settings | File Templates. + // To change body of implemented methods use File | Settings | File Templates. } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java index e740ff9c30..a361f85e2d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -17,14 +17,17 @@ */ package org.apache.qpid.server.txn; -import org.apache.qpid.server.exception.*; -import org.apache.qpid.server.messageStore.MessageStore; +import java.util.HashMap; +import java.util.Set; + +import javax.transaction.xa.Xid; + import org.apache.commons.configuration.Configuration; + import org.apache.log4j.Logger; -import javax.transaction.xa.Xid; -import java.util.Set; -import java.util.HashMap; +import org.apache.qpid.server.exception.*; +import org.apache.qpid.server.messageStore.MessageStore; /** * Created by Arnaud Simon @@ -33,17 +36,17 @@ import java.util.HashMap; */ public class MemoryTransactionManager implements TransactionManager { - //======================================================================== + // ======================================================================== // Static Constants - //======================================================================== + // ======================================================================== // The logger for this class private static final Logger _log = Logger.getLogger(MemoryTransactionManager.class); private static final String ENVIRONMENT_TX_TIMEOUT = "environment-tx-timeout"; - //======================================================================== + // ======================================================================== // Instance Fields - //======================================================================== + // ======================================================================== // The underlying BDB message store private MessageStore _messagStore; // A map of XID/BDBtx @@ -52,30 +55,29 @@ public class MemoryTransactionManager implements TransactionManager private HashMap _indoubtXidMap; // A default tx timeout in sec - private int _defaultTimeout; // set to 10s if not specified in the config + private int _defaultTimeout; // set to 10s if not specified in the config - //======================================================================== + // ======================================================================== // Interface TransactionManager - //======================================================================== + // ======================================================================== public void configure(MessageStore messageStroe, String base, Configuration config) { _messagStore = messageStroe; if (config != null) { _defaultTimeout = config.getInt(base + "." + ENVIRONMENT_TX_TIMEOUT, 10); - } else + } + else { _defaultTimeout = 10; } + _log.info("Using transaction timeout of " + _defaultTimeout + " s"); _xidMap = new HashMap(); _indoubtXidMap = new HashMap(); } - public XAFlag begin(Xid xid) - throws - InternalErrorException, - InvalidXidException + public XAFlag begin(Xid xid) throws InternalErrorException, InvalidXidException { synchronized (xid) { @@ -83,22 +85,21 @@ public class MemoryTransactionManager implements TransactionManager { throw new InvalidXidException(xid, "null xid"); } + if (_xidMap.containsKey(xid)) { throw new InvalidXidException(xid, "Xid already exist"); } + MemoryTransaction tx = new MemoryTransaction(); tx.setTimeout(_defaultTimeout); _xidMap.put(xid, tx); + return XAFlag.ok; } } - public XAFlag prepare(Xid xid) - throws - InternalErrorException, - CommandInvalidException, - UnknownXidException + public XAFlag prepare(Xid xid) throws InternalErrorException, CommandInvalidException, UnknownXidException { synchronized (xid) { @@ -110,32 +111,32 @@ public class MemoryTransactionManager implements TransactionManager result = XAFlag.rbtimeout; // rollback this tx branch rollback(xid); - } else + } + else { if (tx.isPrepared()) { - throw new CommandInvalidException("TransactionImpl is already prepared"); + throw new CommandInvalidException("TransactionImpl is already prepared", null); } + if (tx.getrecords().size() == 0) { // the tx was read only (no work has been done) _xidMap.remove(xid); result = XAFlag.rdonly; - } else + } + else { // we need to persist the tx records tx.prepare(); } } + return result; } } - public XAFlag rollback(Xid xid) - throws - InternalErrorException, - CommandInvalidException, - UnknownXidException + public XAFlag rollback(Xid xid) throws InternalErrorException, CommandInvalidException, UnknownXidException { synchronized (xid) { @@ -145,28 +146,28 @@ public class MemoryTransactionManager implements TransactionManager if (tx.isHeurRollback()) { flag = XAFlag.heurrb; - } else + } + else { for (TransactionRecord record : tx.getrecords()) { record.rollback(_messagStore); } + _xidMap.remove(xid); } + if (tx.hasExpired()) { flag = XAFlag.rbtimeout; } + return flag; } } public XAFlag commit(Xid xid) - throws - InternalErrorException, - CommandInvalidException, - UnknownXidException, - NotPreparedException + throws InternalErrorException, CommandInvalidException, UnknownXidException, NotPreparedException { synchronized (xid) { @@ -176,43 +177,46 @@ public class MemoryTransactionManager implements TransactionManager if (tx.isHeurRollback()) { flag = XAFlag.heurrb; - } else if (tx.hasExpired()) + } + else if (tx.hasExpired()) { flag = XAFlag.rbtimeout; // rollback this tx branch rollback(xid); - } else + } + else { if (!tx.isPrepared()) { throw new NotPreparedException("TransactionImpl is not prepared"); } + for (TransactionRecord record : tx.getrecords()) { try { record.commit(_messagStore, xid); - } catch (InvalidXidException e) + } + catch (InvalidXidException e) { - throw new UnknownXidException(xid, e); - } catch (Exception e) + throw new UnknownXidException(xid, e.getMessage(), e); + } + catch (Exception e) { // this should not happen as the queue and the message must exist _log.error("Error when committing distributed transaction heurmix mode returned: " + xid); flag = XAFlag.heurmix; } } + _xidMap.remove(xid); } + return flag; } } - public XAFlag commit_one_phase(Xid xid) - throws - InternalErrorException, - CommandInvalidException, - UnknownXidException + public XAFlag commit_one_phase(Xid xid) throws InternalErrorException, CommandInvalidException, UnknownXidException { synchronized (xid) { @@ -221,12 +225,14 @@ public class MemoryTransactionManager implements TransactionManager if (tx.isHeurRollback()) { flag = XAFlag.heurrb; - } else if (tx.hasExpired()) + } + else if (tx.hasExpired()) { flag = XAFlag.rbtimeout; // rollback this tx branch rollback(xid); - } else + } + else { // we need to prepare the tx tx.prepare(); @@ -237,31 +243,30 @@ public class MemoryTransactionManager implements TransactionManager try { record.commit(_messagStore, xid); - } catch (InvalidXidException e) + } + catch (InvalidXidException e) { - throw new UnknownXidException(xid, e); - } catch (Exception e) + throw new UnknownXidException(xid, e.getMessage(), e); + } + catch (Exception e) { // this should not happen as the queue and the message must exist _log.error("Error when committing transaction heurmix mode returned: " + xid); flag = XAFlag.heurmix; } } - } + } finally { _xidMap.remove(xid); } } + return flag; } } - public void forget(Xid xid) - throws - InternalErrorException, - CommandInvalidException, - UnknownXidException + public void forget(Xid xid) throws InternalErrorException, CommandInvalidException, UnknownXidException { synchronized (xid) { @@ -269,36 +274,25 @@ public class MemoryTransactionManager implements TransactionManager } } - public void setTimeout(Xid xid, long timeout) - throws - InternalErrorException, - UnknownXidException + public void setTimeout(Xid xid, long timeout) throws InternalErrorException, UnknownXidException { Transaction tx = getTransaction(xid); tx.setTimeout(timeout); } - public long getTimeout(Xid xid) - throws - InternalErrorException, - UnknownXidException + public long getTimeout(Xid xid) throws InternalErrorException, UnknownXidException { Transaction tx = getTransaction(xid); + return tx.getTimeout(); } - public Set recover(boolean startscan, boolean endscan) - throws - InternalErrorException, - CommandInvalidException + public Set recover(boolean startscan, boolean endscan) throws InternalErrorException, CommandInvalidException { return _indoubtXidMap.keySet(); } - public void HeuristicOutcome(Xid xid) - throws - UnknownXidException, - InternalErrorException + public void HeuristicOutcome(Xid xid) throws UnknownXidException, InternalErrorException { synchronized (xid) { @@ -310,6 +304,7 @@ public class MemoryTransactionManager implements TransactionManager { record.rollback(_messagStore); } + tx.heurRollback(); } // add this branch in the list of indoubt tx @@ -317,15 +312,14 @@ public class MemoryTransactionManager implements TransactionManager } } - public Transaction getTransaction(Xid xid) - throws - UnknownXidException + public Transaction getTransaction(Xid xid) throws UnknownXidException { Transaction tx = _xidMap.get(xid); if (tx == null) { - throw new UnknownXidException(xid); + throw new UnknownXidException(xid, "", null); } + return tx; } } -- cgit v1.2.1