summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-05-02 16:49:03 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-05-02 16:49:03 +0000
commitc98198c56248d16465a1871aa11e9c94c0f950db (patch)
tree56272c427e9b36386a94b4f7bdddb402002bd1c0 /java/broker/src
parent4fa14823a4110d82c26edcc1aaf0cd9d325a9dd4 (diff)
downloadqpid-python-c98198c56248d16465a1871aa11e9c94c0f950db.tar.gz
I am commiting the patch supplied by Arnaud Simon. This patch contains support for dtx.
Currently there is one test case failing. I will try to fix it, if not Arnuad will provide a patch soon git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@534541 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java93
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java59
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java57
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java72
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java58
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java58
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java58
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java58
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java58
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java72
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java169
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java270
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java116
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java75
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java254
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java252
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java70
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java215
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java59
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java248
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java79
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java125
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java35
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java215
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TransactionRecord.java72
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/XAFlag.java50
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/XidImpl.java210
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java521
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java10
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java9
42 files changed, 3346 insertions, 457 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index d31359b019..ae1cf43f6c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -58,8 +58,10 @@ import org.apache.qpid.server.management.ManagedBroker;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.QueueAlreadyExistsException;
/**
* This MBean implements the broker management interface and exposes the
@@ -100,7 +102,6 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
* @param exchangeName
* @param type
* @param durable
- * @param autoDelete
* @throws JMException
*/
public void createNewExchange(String exchangeName, String type, boolean durable) throws JMException
@@ -158,7 +159,6 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
* @param queueName
* @param durable
* @param owner
- * @param autoDelete
* @throws JMException
*/
public void createNewQueue(String queueName, String owner, boolean durable) throws JMException
@@ -180,7 +180,13 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
queue = new AMQQueue(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost());
if (queue.isDurable() && !queue.isAutoDelete())
{
- _messageStore.createQueue(queue);
+ try
+ {
+ _messageStore.createQueue(queue);
+ } catch (Exception e)
+ {
+ throw new JMException("problem creating queue " + queue.getName());
+ }
}
Configuration virtualHostDefaultQueueConfiguration =
@@ -222,10 +228,9 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
try
{
queue.delete();
- _messageStore.removeQueue(new AMQShortString(queueName));
-
+ _messageStore.destroyQueue(queue);
}
- catch (AMQException ex)
+ catch (Exception ex)
{
JMException jme = new JMException(ex.getMessage());
jme.initCause(ex);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 918b5fc176..9711bbf4d2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -47,11 +47,9 @@ import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.Subscription;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.LocalTransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.*;
public class AMQChannel
{
@@ -93,6 +91,8 @@ public class AMQChannel
private final MessageStore _messageStore;
+ private final TransactionManager _transactionManager;
+
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
private final AtomicBoolean _suspended = new AtomicBoolean(false);
@@ -116,7 +116,8 @@ public class AMQChannel
//Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
- public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
+
+ public AMQChannel(AMQProtocolSession session, int channelId, TransactionManager transactionManager, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
{
_session = session;
@@ -125,6 +126,7 @@ public class AMQChannel
_prefetch_HighWaterMark = DEFAULT_PREFETCH;
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
+ _transactionManager = transactionManager;
_exchanges = exchanges;
// by default the session is non-transactional
_txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
@@ -133,7 +135,7 @@ public class AMQChannel
/** Sets this channel to be part of a local transaction */
public void setLocalTransactional()
{
- _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages);
+ _txnContext = new DistributedTransactionalContext(_transactionManager, _messageStore, _storeContext, _returnMessages);
}
public boolean isTransactional()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 8573902af4..e337b26b33 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -36,8 +36,10 @@ import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.QueueAlreadyExistsException;
public class VirtualHostConfiguration
{
@@ -48,7 +50,9 @@ public class VirtualHostConfiguration
private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost.";
- public VirtualHostConfiguration(String configFile) throws ConfigurationException
+ public VirtualHostConfiguration(String configFile)
+ throws
+ ConfigurationException
{
_logger.info("Loading Config file:" + configFile);
@@ -57,24 +61,25 @@ public class VirtualHostConfiguration
}
-
- private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException
+ private void configureVirtualHost(String virtualHostName, Configuration configuration)
+ throws
+ ConfigurationException,
+ AMQException
{
- _logger.debug("Loding configuration for virtaulhost: "+virtualHostName);
+ _logger.debug("Loding configuration for virtaulhost: " + virtualHostName);
VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName);
-
- if(virtualHost == null)
+ if (virtualHost == null)
{
throw new ConfigurationException("Unknown virtual host: " + virtualHostName);
}
List exchangeNames = configuration.getList("exchanges.exchange.name");
- for(Object exchangeNameObj : exchangeNames)
+ for (Object exchangeNameObj : exchangeNames)
{
String exchangeName = String.valueOf(exchangeNameObj);
configureExchange(virtualHost, exchangeName, configuration);
@@ -83,7 +88,7 @@ public class VirtualHostConfiguration
List queueNames = configuration.getList("queues.queue.name");
- for(Object queueNameObj : queueNames)
+ for (Object queueNameObj : queueNames)
{
String queueName = String.valueOf(queueNameObj);
configureQueue(virtualHost, queueName, configuration);
@@ -91,12 +96,14 @@ public class VirtualHostConfiguration
}
- private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration) throws AMQException
+ private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration)
+ throws
+ AMQException
{
CompositeConfiguration exchangeConfiguration = new CompositeConfiguration();
- exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString));
+ exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange." + exchangeNameString));
exchangeConfiguration.addConfiguration(configuration.subset("exchanges"));
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
@@ -110,18 +117,17 @@ public class VirtualHostConfiguration
Exchange exchange;
-
synchronized (exchangeRegistry)
{
exchange = exchangeRegistry.getExchange(exchangeName);
- if(exchange == null)
+ if (exchange == null)
{
- AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type","direct"));
- boolean durable = exchangeConfiguration.getBoolean("durable",false);
- boolean autodelete = exchangeConfiguration.getBoolean("autodelete",false);
+ AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type", "direct"));
+ boolean durable = exchangeConfiguration.getBoolean("durable", false);
+ boolean autodelete = exchangeConfiguration.getBoolean("autodelete", false);
- Exchange newExchange = exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0);
+ Exchange newExchange = exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0);
exchangeRegistry.registerExchange(newExchange);
}
@@ -149,11 +155,14 @@ public class VirtualHostConfiguration
return queueConfiguration;
}
- private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException
+ private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration)
+ throws
+ AMQException,
+ ConfigurationException
{
CompositeConfiguration queueConfiguration = new CompositeConfiguration();
- queueConfiguration.addConfiguration(configuration.subset("queues.queue."+ queueNameString));
+ queueConfiguration.addConfiguration(configuration.subset("queues.queue." + queueNameString));
queueConfiguration.addConfiguration(configuration.subset("queues"));
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
@@ -173,7 +182,7 @@ public class VirtualHostConfiguration
{
_logger.info("Creating queue '" + queueName + "' on virtual host " + virtualHost.getName());
- boolean durable = queueConfiguration.getBoolean("durable" ,false);
+ boolean durable = queueConfiguration.getBoolean("durable", false);
boolean autodelete = queueConfiguration.getBoolean("autodelete", false);
String owner = queueConfiguration.getString("owner", null);
@@ -184,21 +193,32 @@ public class VirtualHostConfiguration
if (queue.isDurable())
{
- messageStore.createQueue(queue);
+ try
+ {
+ messageStore.createQueue(queue);
+ } catch (InternalErrorException e)
+ {
+ _logger.error("Problem when creating Queue '" + queueNameString
+ + "' on virtual host " + virtualHost.getName() + ", not creating.");
+
+ } catch (QueueAlreadyExistsException e)
+ {
+ _logger.error("Queue '" + queueNameString
+ + "' already exists on virtual host " + virtualHost.getName() + ", not creating.");
+ }
}
queueRegistry.registerQueue(queue);
- }
- else
+ } else
{
- _logger.info("Queue '" + queueNameString + "' already exists on virtual host "+virtualHost.getName()+", not creating.");
+ _logger.info("Queue '" + queueNameString + "' already exists on virtual host " + virtualHost.getName() + ", not creating.");
}
String exchangeName = queueConfiguration.getString("exchange", null);
Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
- if(exchange == null)
+ if (exchange == null)
{
exchange = virtualHost.getExchangeRegistry().getDefaultExchange();
}
@@ -211,15 +231,15 @@ public class VirtualHostConfiguration
synchronized (exchange)
{
List routingKeys = queueConfiguration.getList("routingKey");
- if(routingKeys == null || routingKeys.isEmpty())
+ if (routingKeys == null || routingKeys.isEmpty())
{
routingKeys = Collections.singletonList(queue.getName());
}
- for(Object routingKeyNameObj : routingKeys)
+ for (Object routingKeyNameObj : routingKeys)
{
AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
-
+
queue.bind(routingKey, null, exchange);
@@ -227,31 +247,33 @@ public class VirtualHostConfiguration
_logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
}
- if(exchange != virtualHost.getExchangeRegistry().getDefaultExchange())
+ if (exchange != virtualHost.getExchangeRegistry().getDefaultExchange())
{
- queue.bind(queue.getName(), null, virtualHost.getExchangeRegistry().getDefaultExchange());
+ queue.bind(queue.getName(), null, virtualHost.getExchangeRegistry().getDefaultExchange());
}
}
}
-
Configurator.configure(queue, queueConfiguration);
}
- public void performBindings() throws AMQException, ConfigurationException
+ public void performBindings()
+ throws
+ AMQException,
+ ConfigurationException
{
List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name");
String defaultVirtualHostName = _config.getString("default");
- if(defaultVirtualHostName != null)
+ if (defaultVirtualHostName != null)
{
- ApplicationRegistry.getInstance().getVirtualHostRegistry().setDefaultVirtualHostName(defaultVirtualHostName);
+ ApplicationRegistry.getInstance().getVirtualHostRegistry().setDefaultVirtualHostName(defaultVirtualHostName);
}
_logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames);
- for(Object nameObject : virtualHostNames)
+ for (Object nameObject : virtualHostNames)
{
String name = String.valueOf(nameObject);
configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name));
@@ -265,5 +287,4 @@ public class VirtualHostConfiguration
}
-
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java
new file mode 100644
index 0000000000..ae1b916144
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 15:52:29
+ */
+public class CommandInvalidException extends Exception
+{
+ /**
+ * Constructs a new CommandInvalidException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public CommandInvalidException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new CommandInvalidException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message .
+ * @param cause the cause.
+ */
+ public CommandInvalidException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new CommandInvalidException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public CommandInvalidException(Throwable cause)
+ {
+ super(cause);
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java
new file mode 100644
index 0000000000..f5fcfeee8f
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java
@@ -0,0 +1,57 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 14:41:53
+ */
+public class InternalErrorException extends Exception
+{
+ /**
+ * Constructs a new InternalErrorException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public InternalErrorException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new InternalErrorException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message .
+ * @param cause the cause.
+ */
+ public InternalErrorException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new InternalErrorException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public InternalErrorException(Throwable cause) {
+ super(cause);
+ }
+} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java
new file mode 100644
index 0000000000..3cae098403
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java
@@ -0,0 +1,72 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 14:12:27
+ */
+public class InvalidXidException extends Exception
+{
+ /**
+ * Constructs a newr InvalidXidException with a standard message
+ *
+ * @param xid The invalid xid.
+ */
+ public InvalidXidException(Xid xid)
+ {
+ super("The Xid: " + xid + " is invalid");
+ }
+
+ /**
+ * Constructs a newr InvalidXidException with a cause
+ *
+ * @param xid The invalid xid.
+ * @param cause The casue for the xid to be invalid
+ */
+ public InvalidXidException(Xid xid, Throwable cause)
+ {
+ super("The Xid: " + xid + " is invalid", cause);
+ }
+
+ /**
+ * Constructs a newr InvalidXidException with a reason message
+ *
+ * @param reason The reason why the xid is invalid
+ * @param xid The invalid xid.
+ */
+ public InvalidXidException(Xid xid, String reason)
+ {
+ super("The Xid: " + xid + " is invalid, The reason is: " + reason);
+ }
+
+ /**
+ * Constructs a newr InvalidXidException with a reason message and cause
+ *
+ * @param reason The reason why the xid is invalid
+ * @param xid The invalid xid.
+ * @param cause The casue for the xid to be invalid
+ */
+ public InvalidXidException(Xid xid, String reason, Throwable cause)
+ {
+ super("The Xid: " + xid + " is invalid, The reason is: " + reason, cause);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java
new file mode 100644
index 0000000000..f95132a450
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 03-Apr-2007
+ * Time: 09:46:31
+ */
+public class MessageAlreadyStagedException extends Exception
+{
+ /**
+ * Constructs a new MessageAlreadyStagedException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public MessageAlreadyStagedException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new MessageAlreadyStagedException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message .
+ * @param cause the cause.
+ */
+ public MessageAlreadyStagedException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new MessageAlreadyStagedException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public MessageAlreadyStagedException(Throwable cause)
+ {
+ super(cause);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java
new file mode 100644
index 0000000000..b8ffe91247
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 30-Mar-2007
+ * Time: 10:52:29
+ */
+public class MessageDoesntExistException extends Exception
+{
+ /**
+ * Constructs a new MessageDoesntExistException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public MessageDoesntExistException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new MessageDoesntExistException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message .
+ * @param cause the cause.
+ */
+ public MessageDoesntExistException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new MessageDoesntExistException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public MessageDoesntExistException(Throwable cause)
+ {
+ super(cause);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java
new file mode 100644
index 0000000000..af8c7374bf
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 16:47:40
+ */
+public class NotPreparedException extends Exception
+{
+ /**
+ * Constructs a new NotPreparedException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public NotPreparedException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new NotPreparedException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message .
+ * @param cause the cause.
+ */
+ public NotPreparedException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new NotPreparedException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public NotPreparedException(Throwable cause)
+ {
+ super(cause);
+ }
+} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java
new file mode 100644
index 0000000000..39751261e4
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 30-Mar-2007
+ * Time: 10:49:00
+ */
+public class QueueAlreadyExistsException extends Exception
+{
+ /**
+ * Constructs a new QueueAlreadyExistsException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public QueueAlreadyExistsException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new QueueAlreadyExistsException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message .
+ * @param cause the cause.
+ */
+ public QueueAlreadyExistsException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new QueueDoesntExistException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public QueueAlreadyExistsException(Throwable cause)
+ {
+ super(cause);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java
new file mode 100644
index 0000000000..88dea864a5
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 17:38:24
+ */
+public class QueueDoesntExistException extends Exception
+{
+ /**
+ * Constructs a new QueueDoesntExistException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public QueueDoesntExistException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new QueueDoesntExistException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message .
+ * @param cause the cause.
+ */
+ public QueueDoesntExistException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new QueueDoesntExistException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public QueueDoesntExistException(Throwable cause)
+ {
+ super(cause);
+ }
+} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java
new file mode 100644
index 0000000000..9c1a28413f
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java
@@ -0,0 +1,72 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 15:45:06
+ */
+public class UnknownXidException extends Exception
+{
+ /**
+ * Constructs a newr UnknownXidException with a standard message
+ *
+ * @param xid The unknown xid.
+ */
+ public UnknownXidException(Xid xid)
+ {
+ super("The Xid: " + xid + " is unknown");
+ }
+
+ /**
+ * Constructs a newr UnknownXidException with a cause
+ *
+ * @param xid The unknown xid.
+ * @param cause The casue for the xid to be unknown
+ */
+ public UnknownXidException(Xid xid, Throwable cause)
+ {
+ super("The Xid: " + xid + " is unknown", cause);
+ }
+
+ /**
+ * Constructs a newr UnknownXidException with a reason message
+ *
+ * @param reason The reason why the xid is unknown
+ * @param xid The unknown xid.
+ */
+ public UnknownXidException(Xid xid, String reason)
+ {
+ super("The Xid: " + xid + " is unknown, The reason is: " + reason);
+ }
+
+ /**
+ * Constructs a newr UnknownXidException with a reason message and cause
+ *
+ * @param reason The reason why the xid is unknown
+ * @param xid The unknown xid.
+ * @param cause The casue for the xid to be unknown
+ */
+ public UnknownXidException(Xid xid, String reason, Throwable cause)
+ {
+ super("The Xid: " + xid + " is unknown, The reason is: " + reason, cause);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 4774383642..9066af70d9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -29,7 +29,8 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.exception.InternalErrorException;
public class DefaultExchangeRegistry implements ExchangeRegistry
{
@@ -65,7 +66,13 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
_exchangeMap.put(exchange.getName(), exchange);
if(exchange.isDurable())
{
- getMessageStore().createExchange(exchange);
+ try
+ {
+ getMessageStore().createExchange(exchange);
+ } catch (InternalErrorException e)
+ {
+ throw new AMQException("problem registering excahgne " + exchange, e);
+ }
}
}
@@ -87,7 +94,13 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
{
if(e.isDurable())
{
- getMessageStore().removeExchange(e);
+ try
+ {
+ getMessageStore().removeExchange(e);
+ } catch (InternalErrorException e1)
+ {
+ throw new AMQException("Problem unregistering Exchange " + name, e1);
+ }
}
e.close();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 03fc7a3926..e4abae4f28 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -49,7 +49,8 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getMessageStore(),
+ final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getTransactionManager(),
+ virtualHost.getMessageStore(),
virtualHost.getExchangeRegistry());
session.addChannel(channel);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 2e697d4564..ec9041c309 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -42,9 +42,11 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.QueueAlreadyExistsException;
import org.apache.commons.configuration.Configuration;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
@@ -103,7 +105,13 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
queue = createQueue(body, virtualHost, session);
if (queue.isDurable() && !queue.isAutoDelete())
{
- store.createQueue(queue);
+ try
+ {
+ store.createQueue(queue);
+ } catch (Exception e)
+ {
+ throw new AMQException("Problem when creating queue " + queue, e);
+ }
}
queueRegistry.registerQueue(queue);
if (autoRegister)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index eb7089afdc..cfea4637ab 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -30,9 +30,11 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.QueueDoesntExistException;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
{
@@ -107,7 +109,13 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
if (queue.isDurable())
{
- store.removeQueue(queue.getName());
+ try
+ {
+ store.destroyQueue(queue);
+ } catch (Exception e)
+ {
+ throw new AMQException("problem when destroying queue " + queue, e);
+ }
}
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
new file mode 100644
index 0000000000..581bca2efe
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
@@ -0,0 +1,169 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.commons.configuration.Configuration;
+
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 26-Apr-2007
+ * Time: 08:23:45
+ */
+public class MemoryMessageStore implements MessageStore
+{
+
+ public void removeExchange(Exchange exchange)
+ throws
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void createExchange(Exchange exchange)
+ throws
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
+ throws
+ InternalErrorException,
+ IllegalArgumentException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void close()
+ throws
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void createQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueAlreadyExistsException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void destroyQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void stage(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageAlreadyStagedException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void appendContent(StorableMessage m, byte[] data, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public byte[] loadContent(StorableMessage m, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException
+ {
+ return new byte[0]; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void destroy(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Collection<StorableQueue> getAllQueues()
+ throws
+ InternalErrorException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Collection<StorableMessage> getAllMessages(StorableQueue queue)
+ throws
+ InternalErrorException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getNewMessageId()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java
new file mode 100644
index 0000000000..c4b1d3182f
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.commons.configuration.Configuration;
+
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 17:34:02
+ */
+public interface MessageStore
+{
+ /**
+ * Create a new exchange
+ *
+ * @param exchange the exchange to be persisted
+ * @throws InternalErrorException If an error occurs
+ */
+ public void createExchange(Exchange exchange)
+ throws
+ InternalErrorException;
+
+ /**
+ * Remove an exchange
+ * @param exchange The exchange to be removed
+ * @throws InternalErrorException If an error occurs
+ */
+ public void removeExchange(Exchange exchange) throws
+ InternalErrorException;
+
+ /**
+ * Bind a queue with an exchange given a routing key
+ *
+ * @param exchange The exchange to bind the queue with
+ * @param routingKey The routing key
+ * @param queue The queue to be bound
+ * @param args Args
+ * @throws InternalErrorException If an error occurs
+ */
+ public void bindQueue(Exchange exchange,
+ AMQShortString routingKey,
+ StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException;
+
+ /**
+ * Unbind a queue from an exchange
+ *
+ * @param exchange The exchange the queue was bound to
+ * @param routingKey The routing queue
+ * @param queue The queue to unbind
+ * @param args args
+ * @throws InternalErrorException If an error occurs
+ */
+ public void unbindQueue(Exchange exchange,
+ AMQShortString routingKey,
+ StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException;
+
+ /**
+ * Called after instantiation in order to configure the message store. A particular implementation can define
+ * whatever parameters it wants.
+ *
+ * @param virtualHost The virtual host using by this store
+ * @param tm The transaction manager implementation
+ * @param base The base element identifier from which all configuration items are relative. For example, if the base
+ * element is "store", the all elements used by concrete classes will be "store.foo" etc.
+ * @param config The apache commons configuration object
+ * @throws InternalErrorException If an error occurs that means the store is unable to configure itself
+ * @throws IllegalArgumentException If the configuration arguments are illegal
+ */
+ void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
+ throws
+ InternalErrorException,
+ IllegalArgumentException;
+
+ /**
+ * Called to close and cleanup any resources used by the message store.
+ *
+ * @throws InternalErrorException if close fails
+ */
+ void close()
+ throws
+ InternalErrorException;
+
+ /**
+ * Create a queue
+ *
+ * @param queue the queue to be created
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueAlreadyExistsException If the queue already exists in the store
+ */
+ public void createQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueAlreadyExistsException;
+
+ /**
+ * Destroy a queue
+ *
+ * @param queue The queue to be destroyed
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ */
+ public void destroyQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException;
+
+ /**
+ * Stage the message before effective enqueue
+ *
+ * @param m The message to stage
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageAlreadyStagedException If the message is already staged
+ */
+ public void stage(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageAlreadyStagedException;
+
+
+ /**
+ * Append more data with a previously staged message
+ *
+ * @param m The message to which data must be appended
+ * @param data Data to happen to the message
+ * @param offset The number of bytes from the beginning of the payload
+ * @param size The number of bytes to be written
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message has not been staged
+ */
+ public void appendContent(StorableMessage m, byte[] data, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Get the content of previously staged or enqueued message.
+ * The message headers are also set.
+ *
+ * @param m The message for which the content must be loaded
+ * @param offset The number of bytes from the beginning of the payload
+ * @param size The number of bytes to be loaded
+ * @return The message content
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message does not exist
+ */
+ public byte[] loadContent(StorableMessage m, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Destroy a previously staged message
+ *
+ * @param m the message to be destroyed
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message does not exist in the store
+ */
+ public void destroy(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Enqueue a message under the scope of the transaction branch
+ * identified by xid when specified.
+ * <p> This operation is propagated to the queue and the message.
+ * <p> A message that has been previously staged is assumed to have had
+ * its payload already added (see appendContent)
+ *
+ * @param xid The xid of the transaction branch under which the message must be enqueued.
+ * <p> It he xid is null then the message is enqueued outside the scope of any transaction.
+ * @param m The message to be enqueued
+ * @param queue The queue into which the message must be enqueued
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ * @throws InvalidXidException The transaction branch is invalid
+ * @throws UnknownXidException The transaction branch is unknown
+ * @throws MessageDoesntExistException If the Message does not exist
+ */
+ public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException;
+
+ /**
+ * Dequeue a message under the scope of the transaction branch identified by xid
+ * if specified.
+ * <p> This operation is propagated to the queue and the message.
+ *
+ * @param xid The xid of the transaction branch under which the message must be dequeued.
+ * <p> It he xid is null then the message is dequeued outside the scope of any transaction.
+ * @param m The message to be dequeued
+ * @param queue The queue from which the message must be dequeued
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ * @throws InvalidXidException The transaction branch is invalid
+ * @throws UnknownXidException The transaction branch is unknown
+ */
+ public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException;
+
+ //=========================================================
+ // Recovery specific methods
+ //=========================================================
+
+ /**
+ * List all the persistent queues
+ *
+ * @return All the persistent queues
+ * @throws InternalErrorException In case of internal message store problem
+ */
+ public Collection<StorableQueue> getAllQueues()
+ throws
+ InternalErrorException;
+
+ /**
+ * All enqueued messages of a given queue
+ *
+ * @param queue The queue where the message are retrieved from
+ * @return The list all enqueued messages of a given queue
+ * @throws InternalErrorException In case of internal message store problem
+ */
+ public Collection<StorableMessage> getAllMessages(StorableQueue queue)
+ throws
+ InternalErrorException;
+
+ /**
+ * Get a new message ID
+ *
+ * @return A new message ID
+ */
+ public long getNewMessageId();
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java
new file mode 100644
index 0000000000..b228bb3027
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+/**
+ * A storable message can be persisted in the message store.
+ *
+ * Created by Arnaud Simon
+ * Date: 03-Apr-2007
+ * Time: 08:56:48
+ */
+public interface StorableMessage
+{
+ /**
+ * The message ID is used by the store to identify a message.
+ *
+ * @return The message identifier
+ */
+ public long getMessageId();
+
+ /**
+ * Get the message header body that is saved when the message is staged.
+ *
+ * @return The message header body
+ */
+ public byte[] getHeaderBody();
+
+ /**
+ * Get the message header body size in bytes.
+ *
+ * @return The message header body size
+ */
+ public int getHeaderSize();
+
+ /**
+ * Get the message payload. This is required when the message is
+ * enqueued without any prior staging.
+ * <p> When the message is staged, the payload can be partial or even empty.
+ *
+ * @return The message payload
+ */
+ public byte[] getData();
+
+ /**
+ * Get the message payload size in bytes.
+ *
+ * @return The message payload size in bytes
+ */
+ public int getPayloadSize();
+
+ /**
+ * Specify whether this message has been enqueued
+ *
+ * @return true if this message is enqueued, false otherwise
+ */
+ public boolean isEnqueued();
+
+ /**
+ * This is called by the message store when this message is enqueued in the message store.
+ *
+ * @param queue The storable queue into which the message is enqueued
+ */
+ public void enqueue(StorableQueue queue);
+
+ /**
+ * This is called by the message store when this message is dequeued.
+ *
+ * @param queue The storable queue out of which the message is dequeued
+ */
+ public void dequeue(StorableQueue queue);
+
+ /**
+ * A message can be enqueued in several queues.
+ * The queue position represents the index of the provided queue within the ordered
+ * list of queues the message has been enqueued.
+ * <p>For example:
+ * <p> If the message is successively enqueued in queue Q1, Q2 and Q3 then
+ * the position of Q1 is 0, position of Q2 is 1 and position of Q3 is 2.
+ * <p> If the message is dequeud form Q2 then position of Q1 is stil 0 but position
+ * of Q3 becomes 1.
+ *
+ * @param queue The storable queue for which the position should be determined
+ *
+ * @return The position of the specified storable queue
+ */
+ public int getQueuePosition(StorableQueue queue);
+
+ /**
+ * Indicates whether this message has been staged.
+ *
+ * @return True if the message has been staged, false otherwise
+ */
+ public boolean isStaged();
+
+ /**
+ * Call by the message store when this message is staged.
+ */
+ public void staged();
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java
new file mode 100644
index 0000000000..10a9a3b8b8
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * A storable queue can store storable messages and can be persisted in the store.
+ * Created by Arnaud Simon
+ * Date: 03-Apr-2007
+ * Time: 08:52:18
+ */
+public interface StorableQueue
+{
+ /**
+ * Get This queue unique id.
+ *
+ * @return The queue ID
+ */
+ public int getQueueID();
+
+ /**
+ * Set the queue ID.
+ *
+ * @param id This queue ID
+ */
+ public void setQueueID(int id);
+
+ /**
+ * Get this queue owner.
+ *
+ * @return This queue owner
+ */
+ public AMQShortString getOwner();
+
+ /**
+ * Get this queue name.
+ *
+ * @return the name of this queue
+ */
+ public AMQShortString getName();
+
+ /**
+ * Signifies to this queue that a message is dequeued.
+ * This operation is called by the store.
+ *
+ * @param m The dequeued message
+ */
+ public void dequeue(StorableMessage m);
+
+ /**
+ * Signifies to this queue that a message is enqueued.
+ * This operation is called by the store.
+ *
+ * @param m The enqueued message
+ */
+ public void enqueue(StorableMessage m);
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 955aaa6acb..eefff090df 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -25,38 +25,45 @@ import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.StorableQueue;
/** Combines the information that make up a deliverable message into a more manageable form. */
import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-/** Combines the information that make up a deliverable message into a more manageable form. */
-public class AMQMessage
+/**
+ * Combines the information that make up a deliverable message into a more manageable form.
+ */
+public class AMQMessage implements StorableMessage
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- /** Used in clustering */
+ // The ordered list of queues into which this message is enqueued.
+ private List<StorableQueue> _queues = new LinkedList<StorableQueue>();
+ // Indicates whether this message is staged
+ private boolean _isStaged = false;
+
+ /**
+ * Used in clustering
+ */
private Set<Object> _tokens;
- /** Only use in clustering - should ideally be removed? */
+ /**
+ * Only use in clustering - should ideally be removed?
+ */
private AMQProtocolSession _publisher;
private final Long _messageId;
@@ -221,13 +228,14 @@ public class AMQMessage
* @param messageId
* @param store
* @param factory
- *
* @throws AMQException
*/
- public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException
+ public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
+ throws
+ AMQException
{
_messageId = messageId;
- _messageHandle = factory.createMessageHandle(messageId, store, true);
+ _messageHandle = factory.createMessageHandle(store, this, true);
_txnContext = txnConext;
_transientMessageData = null;
}
@@ -241,7 +249,9 @@ public class AMQMessage
* @param contentHeader
*/
public AMQMessage(Long messageId, MessagePublishInfo info,
- TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException
+ TransactionalContext txnContext, ContentHeaderBody contentHeader)
+ throws
+ AMQException
{
this(messageId, info, txnContext);
setContentHeaderBody(contentHeader);
@@ -256,14 +266,15 @@ public class AMQMessage
* @param contentHeader
* @param destinationQueues
* @param contentBodies
- *
* @throws AMQException
*/
public AMQMessage(Long messageId, MessagePublishInfo info,
TransactionalContext txnContext,
ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext,
- MessageHandleFactory messageHandleFactory) throws AMQException
+ MessageHandleFactory messageHandleFactory)
+ throws
+ AMQException
{
this(messageId, info, txnContext, contentHeader);
_transientMessageData.setDestinationQueues(destinationQueues);
@@ -274,7 +285,9 @@ public class AMQMessage
}
}
- protected AMQMessage(AMQMessage msg) throws AMQException
+ protected AMQMessage(AMQMessage msg)
+ throws
+ AMQException
{
_messageId = msg._messageId;
_messageHandle = msg._messageHandle;
@@ -283,6 +296,98 @@ public class AMQMessage
_transientMessageData = msg._transientMessageData;
}
+ //========================================================================
+ // Interface StorableMessage
+ //========================================================================
+
+ public long getMessageId()
+ {
+ return _messageId;
+ }
+
+ public byte[] getHeaderBody()
+ {
+ byte[] result = null;
+ ContentHeaderBody headerBody;
+ ByteBuffer bufferedResult;
+ try
+ {
+ headerBody = _messageHandle.getContentHeaderBody(_txnContext.getStoreContext(), _messageId);
+ result = new byte[headerBody.getSize()];
+ bufferedResult = ByteBuffer.wrap(result);
+ headerBody.writePayload(bufferedResult);
+ } catch (AMQException e)
+ {
+ _log.error("Error when getting message header", e);
+ }
+ return result;
+ }
+
+ public int getHeaderSize()
+ {
+ int result = 0;
+ try
+ {
+ result = _messageHandle.getContentHeaderBody(_txnContext.getStoreContext(), _messageId).getSize();
+ } catch (AMQException e)
+ {
+ _log.error("Error when getting message header size", e);
+ }
+ return result;
+ }
+
+ public byte[] getData()
+ {
+ return _messageHandle.getMessagePayload();
+ }
+
+ public int getPayloadSize()
+ {
+ return _messageHandle.getMessagePayload().length;
+ }
+
+ public boolean isEnqueued()
+ {
+ return _queues.size() > 0;
+ }
+
+ public void enqueue(StorableQueue queue)
+ {
+ _queues.add(queue);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("enqueued");
+ }
+ }
+
+ public void dequeue(StorableQueue queue)
+ {
+ _queues.remove(queue);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("dequeued");
+ }
+ }
+
+ public int getQueuePosition(StorableQueue queue)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("The queue position is " + _queues.indexOf(queue));
+ }
+ return _queues.indexOf(queue);
+ }
+
+ public boolean isStaged()
+ {
+ return _isStaged;
+ }
+
+ public void staged()
+ {
+ _isStaged = true;
+ }
+
public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
{
return new BodyFrameIterator(protocolSession, channel);
@@ -293,32 +398,36 @@ public class AMQMessage
return new BodyContentIterator();
}
- public ContentHeaderBody getContentHeaderBody() throws AMQException
+ public ContentHeaderBody getContentHeaderBody()
+ throws
+ AMQException
{
if (_transientMessageData != null)
{
return _transientMessageData.getContentHeaderBody();
- }
- else
+ } else
{
return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId);
}
}
public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
- throws AMQException
+ throws
+ AMQException
{
_transientMessageData.setContentHeaderBody(contentHeaderBody);
}
- public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory) throws AMQException
+ public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory)
+ throws
+ AMQException
{
final boolean persistent = isPersistent();
- _messageHandle = factory.createMessageHandle(_messageId, store, persistent);
- if (persistent)
- {
+ _messageHandle = factory.createMessageHandle(store, this, persistent);
+ //if (persistent)
+ // {
_txnContext.beginTranIfNecessary();
- }
+ // }
// enqueuing the messages ensure that if required the destinations are recorded to a
// persistent store
@@ -334,7 +443,9 @@ public class AMQMessage
}
}
- public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) throws AMQException
+ public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk)
+ throws
+ AMQException
{
_transientMessageData.addBodyLength(contentChunk.getSize());
final boolean allContentReceived = isAllContentReceived();
@@ -343,22 +454,19 @@ public class AMQMessage
{
deliver(storeContext);
return true;
- }
- else
+ } else
{
return false;
}
}
- public boolean isAllContentReceived() throws AMQException
+ public boolean isAllContentReceived()
+ throws
+ AMQException
{
return _transientMessageData.isAllContentReceived();
}
- public long getMessageId()
- {
- return _messageId;
- }
/**
* Creates a long-lived reference to this message, and increments the count of such references, as an atomic
@@ -366,11 +474,13 @@ public class AMQMessage
*/
public AMQMessage takeReference()
{
- incrementReference();// _referenceCount.incrementAndGet();
+ _referenceCount.incrementAndGet();
return this;
}
- /** Threadsafe. Increment the reference count on the message. */
+ /**
+ * Threadsafe. Increment the reference count on the message.
+ */
protected void incrementReference()
{
_referenceCount.incrementAndGet();
@@ -385,11 +495,12 @@ public class AMQMessage
* message store.
*
* @param storeContext
- *
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
- public void decrementReference(StoreContext storeContext) throws MessageCleanupException
+ public void decrementReference(StoreContext storeContext)
+ throws
+ MessageCleanupException
{
int count = _referenceCount.decrementAndGet();
@@ -419,8 +530,7 @@ public class AMQMessage
incrementReference();
throw new MessageCleanupException(_messageId, e);
}
- }
- else
+ } else
{
if (_log.isDebugEnabled())
{
@@ -497,8 +607,7 @@ public class AMQMessage
if (taken.getAndSet(true))
{
return true;
- }
- else
+ } else
{
_takenMap.put(queue, taken);
_takenBySubcriptionMap.put(queue, sub);
@@ -524,8 +633,7 @@ public class AMQMessage
if (taken == null)
{
taken = new AtomicBoolean(false);
- }
- else
+ } else
{
taken.set(false);
}
@@ -546,8 +654,7 @@ public class AMQMessage
if (_tokens.contains(token))
{
return true;
- }
- else
+ } else
{
_tokens.add(token);
return false;
@@ -560,26 +667,30 @@ public class AMQMessage
* AMQMessageHandle implementation can be picked based on various criteria.
*
* @param queue the queue
- *
* @throws org.apache.qpid.AMQException if there is an error enqueuing the message
*/
- public void enqueue(AMQQueue queue) throws AMQException
+ public void enqueue(AMQQueue queue)
+ throws
+ AMQException
{
_transientMessageData.addDestinationQueue(queue);
}
- public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
+ public void dequeue(StoreContext storeContext, AMQQueue queue)
+ throws
+ AMQException
{
_messageHandle.dequeue(storeContext, _messageId, queue);
}
- public boolean isPersistent() throws AMQException
+ public boolean isPersistent()
+ throws
+ AMQException
{
if (_transientMessageData != null)
{
return _transientMessageData.isPersistent();
- }
- else
+ } else
{
return _messageHandle.isPersistent(getStoreContext(), _messageId);
}
@@ -591,7 +702,9 @@ public class AMQMessage
* @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
* to a consumer
*/
- public void checkDeliveredToConsumer() throws NoConsumersException
+ public void checkDeliveredToConsumer()
+ throws
+ NoConsumersException
{
if (_immediate && !_deliveredToConsumer)
@@ -600,14 +713,15 @@ public class AMQMessage
}
}
- public MessagePublishInfo getMessagePublishInfo() throws AMQException
+ public MessagePublishInfo getMessagePublishInfo()
+ throws
+ AMQException
{
MessagePublishInfo pb;
if (_transientMessageData != null)
{
pb = _transientMessageData.getMessagePublishInfo();
- }
- else
+ } else
{
pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId);
}
@@ -630,13 +744,17 @@ public class AMQMessage
}
- /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */
+ /**
+ * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+ */
public void setDeliveredToConsumer()
{
_deliveredToConsumer = true;
}
- private void deliver(StoreContext storeContext) throws AMQException
+ private void deliver(StoreContext storeContext)
+ throws
+ AMQException
{
// we get a reference to the destination queues now so that we can clear the
// transient message data as quickly as possible
@@ -650,7 +768,7 @@ public class AMQMessage
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
_messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getMessagePublishInfo(),
- _transientMessageData.getContentHeaderBody());
+ _transientMessageData.getContentHeaderBody());
// we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
@@ -867,7 +985,9 @@ public class AMQMessage
}
- public void restoreTransientMessageData() throws AMQException
+ public void restoreTransientMessageData()
+ throws
+ AMQException
{
TransientMessageData transientMessageData = new TransientMessageData();
transientMessageData.setMessagePublishInfo(getMessagePublishInfo());
@@ -889,7 +1009,7 @@ public class AMQMessage
// _taken + " by :" + _takenBySubcription;
return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
- _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+ _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
public Subscription getDeliveredSubscription(AMQQueue queue)
@@ -911,8 +1031,7 @@ public class AMQMessage
}
_rejectedBy.add(subscription);
- }
- else
+ } else
{
_log.warn("Requesting rejection by null subscriber:" + debugIdentity());
}
@@ -925,8 +1044,7 @@ public class AMQMessage
if (rejected) // We have subscriptions that rejected this message
{
return _rejectedBy.contains(subscription);
- }
- else // This messasge hasn't been rejected yet.
+ } else // This messasge hasn't been rejected yet.
{
return rejected;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
index ede55b3bbf..296e61bfa9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
@@ -76,4 +76,7 @@ public interface AMQMessageHandle
void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException;
long getArrivalTime();
+
+ // added by Arnaud
+ byte[] getMessagePayload();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 0adf6153f8..fa3b34a634 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
import java.text.MessageFormat;
import java.util.List;
+import java.util.Hashtable;
+import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,6 +38,9 @@ import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.StorableQueue;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -47,9 +52,10 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
* This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
* fully in RFC 006.
*/
-public class AMQQueue implements Managable, Comparable
+public class AMQQueue implements Managable, Comparable, StorableQueue
{
+ public static int s_queueID =0;
public static final class ExistingExclusiveSubscription extends AMQException
{
@@ -76,15 +82,27 @@ public class AMQQueue implements Managable, Comparable
private final AMQShortString _name;
- /** null means shared */
+ // The queueu ID
+ int _queueId;
+ // The list of enqueued messages.
+ Hashtable<Long, StorableMessage> _messages = new Hashtable<Long, StorableMessage>();
+
+
+ /**
+ * null means shared
+ */
private final AMQShortString _owner;
private final boolean _durable;
- /** If true, this queue is deleted when the last subscriber is removed */
+ /**
+ * If true, this queue is deleted when the last subscriber is removed
+ */
private final boolean _autoDelete;
- /** Holds subscribers to the queue. */
+ /**
+ * Holds subscribers to the queue.
+ */
private final SubscriptionSet _subscribers;
private final SubscriptionFactory _subscriptionFactory;
@@ -97,13 +115,19 @@ public class AMQQueue implements Managable, Comparable
private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
- /** Manages message delivery. */
+ /**
+ * Manages message delivery.
+ */
private final DeliveryManager _deliveryMgr;
- /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
+ /**
+ * Used to track bindings to exchanges so that on deletion they can easily be cancelled.
+ */
private final ExchangeBindings _bindings = new ExchangeBindings(this);
- /** Executor on which asynchronous delivery will be carriedout where required */
+ /**
+ * Executor on which asynchronous delivery will be carriedout where required
+ */
private final Executor _asyncDelivery;
private final AMQQueueMBean _managedObject;
@@ -111,27 +135,39 @@ public class AMQQueue implements Managable, Comparable
private final VirtualHost _virtualHost;
- /** max allowed size(KB) of a single message */
+ /**
+ * max allowed size(KB) of a single message
+ */
@Configured(path = "maximumMessageSize", defaultValue = "0")
public long _maximumMessageSize;
- /** max allowed number of messages on a queue. */
+ /**
+ * max allowed number of messages on a queue.
+ */
@Configured(path = "maximumMessageCount", defaultValue = "0")
public long _maximumMessageCount;
- /** max queue depth for the queue */
+ /**
+ * max queue depth for the queue
+ */
@Configured(path = "maximumQueueDepth", defaultValue = "0")
public long _maximumQueueDepth;
- /** maximum message age before alerts occur */
+ /**
+ * maximum message age before alerts occur
+ */
@Configured(path = "maximumMessageAge", defaultValue = "0")
public long _maximumMessageAge;
- /** the minimum interval between sending out consequetive alerts of the same type */
+ /**
+ * the minimum interval between sending out consequetive alerts of the same type
+ */
@Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
public long _minimumAlertRepeatGap;
- /** total messages received by the queue since startup. */
+ /**
+ * total messages received by the queue since startup.
+ */
public AtomicLong _totalMessagesReceived = new AtomicLong();
public int compareTo(Object o)
@@ -141,26 +177,29 @@ public class AMQQueue implements Managable, Comparable
public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
+ throws
+ AMQException
{
this(name, durable, owner, autoDelete, virtualHost,
- AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionSet(), new SubscriptionImpl.Factory());
+ AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionSet(), new SubscriptionImpl.Factory());
}
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
boolean autoDelete, VirtualHost virtualHost,
SubscriptionSet subscribers)
- throws AMQException
+ throws
+ AMQException
{
this(name, durable, owner, autoDelete, virtualHost,
- AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, new SubscriptionImpl.Factory());
+ AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, new SubscriptionImpl.Factory());
}
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
boolean autoDelete, VirtualHost virtualHost,
Executor asyncDelivery, SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory)
- throws AMQException
+ throws
+ AMQException
{
if (name == null)
{
@@ -183,9 +222,12 @@ public class AMQQueue implements Managable, Comparable
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+ _queueId = s_queueID++;
}
- private AMQQueueMBean createMBean() throws AMQException
+ private AMQQueueMBean createMBean()
+ throws
+ AMQException
{
try
{
@@ -222,13 +264,17 @@ public class AMQQueue implements Managable, Comparable
return _autoDelete;
}
- /** @return no of messages(undelivered) on the queue. */
+ /**
+ * @return no of messages(undelivered) on the queue.
+ */
public int getMessageCount()
{
return _deliveryMgr.getQueueMessageCount();
}
- /** @return List of messages(undelivered) on the queue. */
+ /**
+ * @return List of messages(undelivered) on the queue.
+ */
public List<AMQMessage> getMessagesOnTheQueue()
{
return _deliveryMgr.getMessages();
@@ -239,7 +285,6 @@ public class AMQQueue implements Managable, Comparable
*
* @param fromMessageId
* @param toMessageId
- *
* @return List of messages
*/
public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
@@ -254,7 +299,6 @@ public class AMQQueue implements Managable, Comparable
/**
* @param messageId
- *
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
*/
public AMQMessage getMessageOnTheQueue(long messageId)
@@ -321,7 +365,9 @@ public class AMQQueue implements Managable, Comparable
_deliveryMgr.processAsync(_asyncDelivery);
}
- /** @return MBean object associated with this Queue */
+ /**
+ * @return MBean object associated with this Queue
+ */
public ManagedObject getManagedObject()
{
return _managedObject;
@@ -379,34 +425,58 @@ public class AMQQueue implements Managable, Comparable
}
- /** Removes the AMQMessage from the top of the queue. */
- public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
+ /**
+ * Removes the AMQMessage from the top of the queue.
+ */
+ public synchronized void deleteMessageFromTop(StoreContext storeContext)
+ throws
+ AMQException
{
_deliveryMgr.removeAMessageFromTop(storeContext);
}
- /** removes all the messages from the queue. */
- public synchronized long clearQueue(StoreContext storeContext) throws AMQException
+ /**
+ * removes all the messages from the queue.
+ */
+ public synchronized long clearQueue(StoreContext storeContext)
+ throws
+ AMQException
{
return _deliveryMgr.clearAllMessages(storeContext);
}
- public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
+ public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
+ throws
+ AMQException
{
exchange.registerQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
{
- _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
+ try
+ {
+ _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
+ } catch (InternalErrorException e)
+ {
+ throw new AMQException("Problem binding queue ", e);
+ }
}
_bindings.addBinding(routingKey, arguments, exchange);
}
- public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
+ public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
+ throws
+ AMQException
{
exchange.deregisterQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
{
- _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
+ try
+ {
+ _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
+ } catch (InternalErrorException e)
+ {
+ throw new AMQException("problem unbinding queue", e);
+ }
}
_bindings.remove(routingKey, arguments, exchange);
}
@@ -414,7 +484,8 @@ public class AMQQueue implements Managable, Comparable
public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
FieldTable filters, boolean noLocal, boolean exclusive)
- throws AMQException
+ throws
+ AMQException
{
if (incrementSubscriberCount() > 1)
{
@@ -422,15 +493,13 @@ public class AMQQueue implements Managable, Comparable
{
decrementSubscriberCount();
throw EXISTING_EXCLUSIVE;
- }
- else if (exclusive)
+ } else if (exclusive)
{
decrementSubscriberCount();
throw EXISTING_SUBSCRIPTION;
}
- }
- else if (exclusive)
+ } else if (exclusive)
{
setExclusive(true);
}
@@ -438,11 +507,11 @@ public class AMQQueue implements Managable, Comparable
if (_logger.isDebugEnabled())
{
_logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and " +
- "consumer tag {2} with {3}", ps, channel, consumerTag, this));
+ "consumer tag {2} with {3}", ps, channel, consumerTag, this));
}
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
- filters, noLocal, this);
+ filters, noLocal, this);
if (subscription.filtersMessages())
{
@@ -477,22 +546,24 @@ public class AMQQueue implements Managable, Comparable
}
- public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
+ public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag)
+ throws
+ AMQException
{
if (_logger.isDebugEnabled())
{
_logger.debug(MessageFormat.format("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
- this));
+ this));
}
Subscription removedSubscription;
if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
- ps,
- consumerTag)))
- == null)
+ ps,
+ consumerTag)))
+ == null)
{
throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag +
- " and protocol session key " + ps.getKey() + " not registered with queue " + this);
+ " and protocol session key " + ps.getKey() + " not registered with queue " + this);
}
removedSubscription.close();
@@ -524,26 +595,28 @@ public class AMQQueue implements Managable, Comparable
}
- public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException
+ public int delete(boolean checkUnused, boolean checkEmpty)
+ throws
+ AMQException
{
if (checkUnused && !_subscribers.isEmpty())
{
_logger.info("Will not delete " + this + " as it is in use.");
return 0;
- }
- else if (checkEmpty && _deliveryMgr.hasQueuedMessages())
+ } else if (checkEmpty && _deliveryMgr.hasQueuedMessages())
{
_logger.info("Will not delete " + this + " as it is not empty.");
return 0;
- }
- else
+ } else
{
delete();
return _deliveryMgr.getQueueMessageCount();
}
}
- public void delete() throws AMQException
+ public void delete()
+ throws
+ AMQException
{
if (!_deleted.getAndSet(true))
{
@@ -559,7 +632,9 @@ public class AMQQueue implements Managable, Comparable
}
}
- protected void autodelete() throws AMQException
+ protected void autodelete()
+ throws
+ AMQException
{
if (_logger.isDebugEnabled())
{
@@ -568,7 +643,9 @@ public class AMQQueue implements Managable, Comparable
delete();
}
- public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
+ public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst)
+ throws
+ AMQException
{
//fixme not sure what this is doing. should we be passing deliverFirst through here?
// This code is not used so when it is perhaps it should
@@ -591,7 +668,9 @@ public class AMQQueue implements Managable, Comparable
// return _deliveryMgr;
// }
- public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
+ public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst)
+ throws
+ AMQException
{
_deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
try
@@ -607,7 +686,9 @@ public class AMQQueue implements Managable, Comparable
}
}
- void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
+ void dequeue(StoreContext storeContext, AMQMessage msg)
+ throws
+ FailedDequeueException
{
try
{
@@ -637,7 +718,9 @@ public class AMQQueue implements Managable, Comparable
return _subscribers;
}
- protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException
+ protected void updateReceivedMessageCount(AMQMessage msg)
+ throws
+ AMQException
{
if (!msg.isRedelivered())
{
@@ -680,7 +763,9 @@ public class AMQQueue implements Managable, Comparable
return "Queue(" + _name + ")@" + System.identityHashCode(this);
}
- public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException
+ public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks)
+ throws
+ AMQException
{
return _deliveryMgr.performGet(session, channel, acks);
}
@@ -697,7 +782,9 @@ public class AMQQueue implements Managable, Comparable
public static interface Task
{
- public void doTask(AMQQueue queue) throws AMQException;
+ public void doTask(AMQQueue queue)
+ throws
+ AMQException;
}
public void addQueueDeleteTask(Task task)
@@ -729,4 +816,53 @@ public class AMQQueue implements Managable, Comparable
{
_deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg);
}
+
+ //========================================================================
+ // Interface StorableQueue
+ //========================================================================
+
+ public int getQueueID()
+ {
+ return _queueId;
+ }
+
+ public void setQueueID(int id)
+ {
+ _queueId = id;
+ }
+
+ public void dequeue(StorableMessage m)
+ {
+ _messages.remove(m.getMessageId());
+ }
+
+ public void enqueue(StorableMessage m)
+ {
+ _messages.put(m.getMessageId(), m);
+ }
+
+ //========================================================================
+ // Used by the Store
+ //========================================================================
+
+ /**
+ * Get the list of enqueud messages
+ *
+ * @return The list of enqueud messages
+ */
+ public Collection<StorableMessage> getAllEnqueuedMessages()
+ {
+ return _messages.values();
+ }
+
+ /**
+ * Get the enqueued message identified by messageID
+ *
+ * @param messageId the id of the enqueued message to recover
+ * @return The enqueued message with the specified id
+ */
+ public StorableMessage getEnqueuedMessage(long messageId)
+ {
+ return _messages.get(messageId);
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
index 630186991b..1d9f56669e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import java.util.LinkedList;
import java.util.List;
+import java.nio.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -46,11 +47,18 @@ public class InMemoryMessageHandle implements AMQMessageHandle
private long _arrivalTime;
+ // the message payload
+ private byte[] _payload;
+ // a buffer to write the payload
+ ByteBuffer _buffer;
+
public InMemoryMessageHandle()
{
}
- public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException
+ public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId)
+ throws
+ AMQException
{
return _contentHeaderBody;
}
@@ -60,28 +68,36 @@ public class InMemoryMessageHandle implements AMQMessageHandle
return _contentBodies.size();
}
- public long getBodySize(StoreContext context, Long messageId) throws AMQException
+ public long getBodySize(StoreContext context, Long messageId)
+ throws
+ AMQException
{
return getContentHeaderBody(context, messageId).bodySize;
}
- public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentChunk getContentChunk(StoreContext context, Long messageId, int index)
+ throws
+ AMQException,
+ IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
- (_contentBodies.size() - 1));
+ (_contentBodies.size() - 1));
}
return _contentBodies.get(index);
}
public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody)
- throws AMQException
+ throws
+ AMQException
{
_contentBodies.add(contentBody);
}
- public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException
+ public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId)
+ throws
+ AMQException
{
return _messagePublishInfo;
}
@@ -97,40 +113,50 @@ public class InMemoryMessageHandle implements AMQMessageHandle
_redelivered = redelivered;
}
- public boolean isPersistent(StoreContext context, Long messageId) throws AMQException
+ public boolean isPersistent(StoreContext context, Long messageId)
+ throws
+ AMQException
{
//todo remove literal values to a constant file such as AMQConstants in common
ContentHeaderBody chb = getContentHeaderBody(context, messageId);
return chb.properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
+ ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
}
/**
* This is called when all the content has been received.
+ *
* @param messagePublishInfo
* @param contentHeaderBody
* @throws AMQException
*/
public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo,
ContentHeaderBody contentHeaderBody)
- throws AMQException
+ throws
+ AMQException
{
_messagePublishInfo = messagePublishInfo;
_contentHeaderBody = contentHeaderBody;
_arrivalTime = System.currentTimeMillis();
}
- public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+ public void removeMessage(StoreContext storeContext, Long messageId)
+ throws
+ AMQException
{
// NO OP
}
- public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
+ public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue)
+ throws
+ AMQException
{
// NO OP
}
- public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
+ public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue)
+ throws
+ AMQException
{
// NO OP
}
@@ -140,4 +166,24 @@ public class InMemoryMessageHandle implements AMQMessageHandle
return _arrivalTime;
}
+
+ // added by Arnaud
+ public byte[] getMessagePayload()
+ {
+ if (_payload == null)
+ {
+ int bodySize = (int) _contentHeaderBody.bodySize;
+ _buffer = ByteBuffer.allocate(bodySize);
+ _payload = new byte[bodySize];
+ for (ContentChunk contentBody : _contentBodies)
+ {
+ int chunkSize = contentBody.getSize();
+ byte[] chunk = new byte[chunkSize];
+ contentBody.getData().get(chunk);
+ _buffer.put(chunk);
+ }
+ _buffer.get(_payload);
+ }
+ return _payload;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
index 94ab935115..69aaffa907 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.StorableMessage;
/**
* Constructs a message handle based on the publish body, the content header and the queue to which the message
@@ -31,12 +32,13 @@ import org.apache.qpid.server.store.MessageStore;
public class MessageHandleFactory
{
- public AMQMessageHandle createMessageHandle(Long messageId, MessageStore store, boolean persistent)
+ public AMQMessageHandle createMessageHandle(MessageStore store, StorableMessage m, boolean persistent)
{
// just hardcoded for now
if (persistent)
{
- return new WeakReferenceMessageHandle(store);
+ // return new WeakReferenceMessageHandle(store);
+ return new StorableMessageHandle(store, m);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
index 3b1b5acf3c..ed2101fd75 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.messageStore.StorableQueue;
public interface QueueRegistry
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
new file mode 100644
index 0000000000..5978e3b10d
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
@@ -0,0 +1,215 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+import javax.transaction.xa.Xid;
+import java.util.List;
+import java.util.LinkedList;
+import java.nio.ByteBuffer;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 25-Apr-2007
+ * Time: 14:26:34
+ */
+public class StorableMessageHandle implements AMQMessageHandle
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(StorableMessageHandle.class);
+
+ //========================================================================
+ // Instance Fields
+ //========================================================================
+ // the message store
+ final private MessageStore _messageStore;
+ // A reference on the message itself
+ final private StorableMessage _message;
+ // the message payload
+ private byte[] _payload;
+ // a buffer to write the payload
+ ByteBuffer _buffer;
+ // the ContentHeaderBody
+ private ContentHeaderBody _contentHeaderBody;
+ // the arrival time
+ private long _arrivalTime;
+ // Specify if this messag is redelivered
+ private boolean _redelivered;
+ // MessagePublishInfo
+ private MessagePublishInfo _messagePublishInfo;
+ // list of chunks
+ private List<ContentChunk> _chunks = new LinkedList<ContentChunk>();
+
+ //========================================================================
+ // Constructors
+ //========================================================================
+
+ public StorableMessageHandle(MessageStore messageStore, StorableMessage message)
+ {
+ _messageStore = messageStore;
+ _message = message;
+ }
+
+ //========================================================================
+ // Interface AMQMessageHandle
+ //========================================================================
+ public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId)
+ throws
+ AMQException
+ {
+ return _contentHeaderBody;
+ }
+
+ public int getBodyCount(StoreContext context, Long messageId)
+ throws
+ AMQException
+ {
+ return _chunks.size();
+ }
+
+ public long getBodySize(StoreContext context, Long messageId)
+ throws
+ AMQException
+ {
+ return _payload.length;
+ }
+
+ public ContentChunk getContentChunk(StoreContext context, Long messageId, int index)
+ throws
+ IllegalArgumentException,
+ AMQException
+ {
+ return _chunks.get(index);
+ }
+
+ public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody)
+ throws
+ AMQException
+ {
+ _chunks.add(contentBody);
+ // if rquired this message can be added to the store
+ //_messageStore.appendContent(_message, _payload, 0, 10);
+
+ }
+
+ public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId)
+ throws
+ AMQException
+ {
+ return _messagePublishInfo;
+ }
+
+ public boolean isRedelivered()
+ {
+ return _redelivered;
+ }
+
+ public void setRedelivered(boolean redelivered)
+ {
+ _redelivered = redelivered;
+ }
+
+ public boolean isPersistent(StoreContext context, Long messageId)
+ throws
+ AMQException
+ {
+ return _contentHeaderBody.properties instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ }
+
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId,
+ MessagePublishInfo messagePublishInfo,
+ ContentHeaderBody contentHeaderBody)
+ throws
+ AMQException
+ {
+ _contentHeaderBody = contentHeaderBody;
+ _arrivalTime = System.currentTimeMillis();
+ _messagePublishInfo = messagePublishInfo;
+ }
+
+ public void removeMessage(StoreContext storeContext, Long messageId)
+ throws
+ AMQException
+ {
+ // This is already handled by the store but we can possibly do:
+ // _messageStore.destroy(_message);
+ }
+
+ public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue)
+ throws
+ AMQException
+ {
+ try
+ {
+ _messageStore.enqueue((Xid) storeContext.getPayload(), _message, queue);
+ } catch (Exception e)
+ {
+ throw new AMQException("PRoblem during message enqueue", e);
+ }
+ }
+
+ public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue)
+ throws
+ AMQException
+ {
+ try
+ {
+ _messageStore.dequeue((Xid) storeContext.getPayload(), _message, queue);
+ } catch (Exception e)
+ {
+ throw new AMQException("PRoblem during message dequeue", e);
+ }
+ }
+
+ public long getArrivalTime()
+ {
+ return _arrivalTime;
+ }
+
+ public byte[] getMessagePayload()
+ {
+ if (_payload == null)
+ {
+ int bodySize = (int) _contentHeaderBody.bodySize;
+ _buffer = ByteBuffer.allocate(bodySize);
+ _payload = new byte[bodySize];
+ for (ContentChunk contentBody : _chunks)
+ {
+ int chunkSize = contentBody.getSize();
+ byte[] chunk = new byte[chunkSize];
+ contentBody.getData().get(chunk);
+ _buffer.put(chunk);
+ }
+ _buffer.get(_payload);
+ }
+ return _payload;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
index 373a64e2eb..64fb6c15d5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
@@ -224,4 +224,10 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
return _arrivalTime;
}
+
+ // added by Arnaud
+ public byte[] getMessagePayload()
+ {
+ return new byte[0]; //To change body of implemented methods use File | Settings | File Templates.
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java
new file mode 100644
index 0000000000..0d25ab0e32
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.exception.*;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 25-Apr-2007
+ * Time: 17:13:07
+ */
+public class DequeueRecord implements TransactionRecord
+{
+
+
+ public void commit(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException
+ {
+ // do nothing
+ }
+
+ public void rollback(MessageStore store)
+ throws
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void prepare(MessageStore store)
+ throws
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
+
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
new file mode 100644
index 0000000000..eff623ca7c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
@@ -0,0 +1,248 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.InvalidXidException;
+import org.apache.log4j.Logger;
+
+import javax.transaction.xa.Xid;
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 25-Apr-2007
+ * Time: 15:58:07
+ */
+public class DistributedTransactionalContext implements TransactionalContext
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(DistributedTransactionalContext.class);
+
+ //========================================================================
+ // Instance Fields
+ //========================================================================
+ // the message store
+ final private MessageStore _messageStore;
+ // The transaction manager
+ final private TransactionManager _transactionManager;
+ // the store context
+ final private StoreContext _storeContext;
+ // the returned messages
+ final private List<RequiredDeliveryException> _returnMessages;
+ // for generating xids
+ private byte[] _txId = ("txid").getBytes();
+ private int _count = 0;
+
+ public DistributedTransactionalContext(TransactionManager transactionManager, MessageStore messageStore, StoreContext storeContext,
+ List<RequiredDeliveryException> returnMessages)
+ {
+ _messageStore = messageStore;
+ _storeContext = storeContext;
+ _returnMessages = returnMessages;
+ _transactionManager = transactionManager;
+ }
+
+ public void beginTranIfNecessary()
+ throws
+ AMQException
+ {
+ // begin the transaction and pass the XID through the context
+ Xid xid = new XidImpl(("branch" + _count++).getBytes(), 1, _txId);
+ try
+ {
+ _transactionManager.begin(xid);
+ _storeContext.setPayload(xid);
+ } catch (Exception e)
+ {
+ throw new AMQException("Problem during transaction begin", e);
+ }
+ }
+
+ public void commit()
+ throws
+ AMQException
+ {
+ try
+ {
+ _transactionManager.commit_one_phase((Xid) _storeContext.getPayload());
+ } catch (Exception e)
+ {
+ throw new AMQException("Problem during transaction commit", e);
+ }
+ }
+
+ public void rollback()
+ throws
+ AMQException
+ {
+ try
+ {
+ _transactionManager.rollback((Xid) _storeContext.getPayload());
+ } catch (Exception e)
+ {
+ throw new AMQException("Problem during transaction rollback", e);
+ }
+ }
+
+ public void messageFullyReceived(boolean persistent)
+ throws
+ AMQException
+ {
+ // The message is now fully received, we can stage it before enqueued if necessary
+ }
+
+ public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst)
+ throws
+ AMQException
+ {
+ try
+ {
+ //The message has been delivered to the queues
+ message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
+ // add a record in the transaction
+ _transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new EnqueueRecord(_storeContext, message, queue, deliverFirst));
+ } catch (Exception e)
+ {
+ throw new AMQException("Problem during transaction rollback", e);
+ }
+ }
+
+ public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag,
+ boolean multiple,
+ final UnacknowledgedMessageMap unacknowledgedMessageMap)
+ throws
+ AMQException
+ {
+ if (multiple)
+ {
+ if (deliveryTag == 0)
+ {
+ //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero,
+ // tells the server to acknowledge all outstanding mesages.
+ _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" +
+ unacknowledgedMessageMap.size());
+ unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ public boolean callback(UnacknowledgedMessage message)
+ throws
+ AMQException
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Discarding message: " + message.message.getMessageId());
+ }
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
+ dequeue(message);
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ unacknowledgedMessageMap.clear();
+ }
+ });
+ } else
+ {
+ if (!unacknowledgedMessageMap.contains(deliveryTag))
+ {
+ throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
+ }
+
+ LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
+ unacknowledgedMessageMap.drainTo(acked, deliveryTag);
+ for (UnacknowledgedMessage msg : acked)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Discarding message: " + msg.message.getMessageId());
+ }
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
+ dequeue(msg);
+ }
+ }
+ } else
+ {
+ UnacknowledgedMessage msg;
+ msg = unacknowledgedMessageMap.remove(deliveryTag);
+
+ if (msg == null)
+ {
+ _log.info("Single ack on delivery tag " + deliveryTag);
+ throw new AMQException("Single ack on delivery tag " + deliveryTag);
+ }
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Discarding message: " + msg.message.getMessageId());
+ }
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
+ dequeue(msg);
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
+ msg.message.getMessageId());
+ }
+ }
+ }
+
+ private void dequeue(UnacknowledgedMessage message)
+ throws
+ AMQException
+ {
+ // Dequeue the message from the strore
+ message.discard(_storeContext);
+ // Add a record
+ try
+ {
+ _transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new DequeueRecord());
+ } catch (Exception e)
+ {
+ throw new AMQException("Problem during message dequeue", e);
+ }
+ }
+
+
+ public void messageProcessed(AMQProtocolSession protocolSession)
+ throws
+ AMQException
+ {
+ // The message has been sent
+ }
+
+ public StoreContext getStoreContext()
+ {
+ return _storeContext;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java
new file mode 100644
index 0000000000..cdf209fb12
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java
@@ -0,0 +1,79 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.AMQException;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 25-Apr-2007
+ * Time: 17:01:06
+ */
+public class EnqueueRecord implements TransactionRecord
+{
+ private final StoreContext _storeContext;
+ private final AMQMessage _msg;
+ private final AMQQueue _queue;
+ private final boolean _first;
+
+ EnqueueRecord(StoreContext storeContext, AMQMessage msg, AMQQueue q, boolean firsr)
+ {
+ _storeContext = storeContext;
+ _msg = msg;
+ _queue = q;
+ _first = firsr;
+ }
+
+ public void commit(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException
+ {
+ try
+ {
+ _queue.process(_storeContext, _msg, _first);
+ } catch (AMQException e)
+ {
+ throw new InternalErrorException(e);
+ }
+ }
+
+ public void rollback(MessageStore store)
+ throws
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void prepare(MessageStore store)
+ throws
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java
new file mode 100644
index 0000000000..a132bcefe6
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java
@@ -0,0 +1,125 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.exception.*;
+
+import javax.transaction.xa.Xid;
+import java.util.Set;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 02-May-2007
+ * Time: 08:41:33
+ */
+public class MemoryTransactionManager implements TransactionManager
+{
+
+ public XAFlag begin(Xid xid)
+ throws
+ InternalErrorException,
+ InvalidXidException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public XAFlag prepare(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public XAFlag rollback(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public XAFlag commit(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException,
+ NotPreparedException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public XAFlag commit_one_phase(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void forget(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setTimeout(Xid xid, long timeout)
+ throws
+ InternalErrorException,
+ UnknownXidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getTimeout(Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Set<Xid> recover(boolean startscan, boolean endscan)
+ throws
+ InternalErrorException,
+ CommandInvalidException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void HeuristicOutcome(Xid xid)
+ throws
+ UnknownXidException,
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Transaction getTransaction(Xid xid)
+ throws
+ UnknownXidException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 181dfa3a80..857fb350a0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -31,7 +31,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.store.StoreContext;
/** @author Apache Software Foundation */
@@ -74,7 +74,7 @@ public class NonTransactionalContext implements TransactionalContext
{
if (!_inTran)
{
- _messageStore.beginTran(_storeContext);
+ // _messageStore.beginTran(_storeContext);
_inTran = true;
}
}
@@ -212,7 +212,7 @@ public class NonTransactionalContext implements TransactionalContext
{
if (persistent)
{
- _messageStore.commitTran(_storeContext);
+ // _messageStore.commitTran(_storeContext);
_inTran = false;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
new file mode 100644
index 0000000000..cd2f619f7e
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 25-Apr-2007
+ * Time: 14:08:39
+ */
+public interface Transaction
+{
+
+ /**
+ * Add an abstract record to this tx.
+ *
+ * @param record The record to be added
+ */
+ public void addRecord(TransactionRecord record);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java
new file mode 100644
index 0000000000..8047236985
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.exception.*;
+
+import javax.transaction.xa.Xid;
+import java.util.Set;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 13:29:31
+ */
+public interface TransactionManager
+{
+ /**
+ * Begin a transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to begin
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution.
+ * <li> <code>XAFlag.rbrollback</code>: The transaction branch was marked rollback-only for an unspecified reason.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws InvalidXidException The Xid is invalid
+ */
+ public XAFlag begin(Xid xid)
+ throws
+ InternalErrorException,
+ InvalidXidException;
+
+ /**
+ * Prepare the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to prepare
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution.
+ * <li> <code>XAFlag.rdonly</code>: The transaction branch was read-only and has been committed.
+ * <li> <code>XAFlag.rbrollback</code>: The transaction branch was marked rollback-only for an unspeci?ed reason.
+ * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Prepare has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public XAFlag prepare(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException;
+
+ /**
+ * Rollback the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to rollback
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution,
+ * <li> <code>XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed.
+ * <li> <code>XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was committed.
+ * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back.
+ * <li> <code>XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back.
+ * <li> <code>XAFlag.rbrollback</code>: The broker marked the transaction branch rollback-only for an unspeci?ed reason.
+ * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Rollback has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public XAFlag rollback(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException;
+
+ /**
+ * Commit the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to commit
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution,
+ * <li> <code>XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed.
+ * <li> <code>XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was committed.
+ * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back.
+ * <li> <code>XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Commit has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ * @throws NotPreparedException The branch was not prepared prior to commit
+ */
+ public XAFlag commit(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException,
+ NotPreparedException;
+
+ /**
+ * One phase commit the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to one phase commit
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution,
+ * <li> <code>XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed.
+ * <li> <code>XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was committed.
+ * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back.
+ * <li> <code>XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back.
+ * <li> <code>XAFlag.rbrollback</code>: The broker marked the transaction branch rollback-only for an unspeci?ed reason.
+ * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Commit has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public XAFlag commit_one_phase(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException;
+
+ /**
+ * Forget about the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to forget
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Forget has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public void forget(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException;
+
+ /**
+ * Set the transaction branch timeout value in seconds
+ *
+ * @param xid The xid of the branch to set timeout
+ * @param timeout Timeout value in seconds
+ * @throws InternalErrorException In case of internal problem
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public void setTimeout(Xid xid, long timeout)
+ throws
+ InternalErrorException,
+ UnknownXidException;
+
+ /**
+ * Get the transaction branch timeout
+ *
+ * @param xid The xid of the branch to get the timeout from
+ * @return The timeout associated with the branch identified with xid
+ * @throws InternalErrorException In case of internal problem
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public long getTimeout(Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException;
+
+ /**
+ * Get a set of Xids the RM has prepared or heuristically completed
+ *
+ * @param startscan Indicates that recovery scan should start
+ * @param endscan Indicates that the recovery scan should end after returning the Xids
+ * @return Set of Xids the RM has prepared or heuristically completed
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Recover has been call in an improper context
+ */
+ public Set<Xid> recover(boolean startscan, boolean endscan)
+ throws
+ InternalErrorException,
+ CommandInvalidException;
+
+
+ /**
+ * An error happened (for example the channel has been abruptly closed)
+ * with this Xid, TM must make a heuristical decision.
+ *
+ * @param xid The Xid of the transaction branch to be heuristically completed
+ * @throws UnknownXidException The Xid is unknown
+ * @throws InternalErrorException In case of internal problem
+ */
+ public void HeuristicOutcome(Xid xid)
+ throws
+ UnknownXidException,
+ InternalErrorException;
+
+ /**
+ * Get the Transaction corresponding to the provided Xid
+ * @param xid The Xid of the transaction to ger
+ * @return The transaction with the provided Xid
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public Transaction getTransaction(Xid xid)
+ throws
+ UnknownXidException;
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionRecord.java
new file mode 100644
index 0000000000..3f6f1d6b8e
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionRecord.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.messageStore.MessageStore;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 25-Apr-2007
+ * Time: 14:12:17
+ */
+public interface TransactionRecord
+{
+ /**
+ * Commit this record.
+ *
+ * @param store the store to be used during commit
+ * @param xid the xid of the tx branch
+ * @throws org.apache.qpid.server.exception.InternalErrorException in case of internal problem
+ * @throws org.apache.qpid.server.exception.QueueDoesntExistException the queue does not exist
+ * @throws org.apache.qpid.server.exception.InvalidXidException the xid is invalid
+ * @throws org.apache.qpid.server.exception.UnknownXidException the xid is unknonw
+ * @throws org.apache.qpid.server.exception.MessageDoesntExistException the message does not exist
+ */
+ public abstract void commit(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException;
+
+ /**
+ * rollback this record
+ *
+ * @param store the store to be used
+ * @throws InternalErrorException In case of internal error
+ */
+ public abstract void rollback(MessageStore store)
+ throws
+ InternalErrorException;
+
+ /**
+ * Prepare this record
+ *
+ * @param store the store to be used
+ * @throws InternalErrorException In case of internal error
+ */
+ public abstract void prepare(MessageStore store)
+ throws
+ InternalErrorException;
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/XAFlag.java b/java/broker/src/main/java/org/apache/qpid/server/txn/XAFlag.java
new file mode 100644
index 0000000000..8214ab7dac
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/XAFlag.java
@@ -0,0 +1,50 @@
+package org.apache.qpid.server.txn;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 14:57:01
+ */
+public enum XAFlag
+{
+ rbrollback(1, "XA-RBROLLBACK", "The rollback was caused by an unspecified reason"),
+ rbtimeout(2, "XA-RBTIMEOUT", "The transaction branch took too long"),
+ heurhaz(3, "XA-HEURHAZ", "The transaction branch may have been heuristically completed"),
+ heurcom(4, "XA-HEURCOM", "The transaction branch has been heuristically committed"),
+ heurrb(5, "XA-HEURRB", "The transaction branch has been heuristically rolled back"),
+ heurmix(6, "XA-HEURMIX", "The transaction branch has been heuristically committed and rolled back"),
+ rdonly(7, "XA-RDONLY", "The transaction branch was read-only and has been committed"),
+ ok(8, "XA-OK", "Normal execution");
+
+ private final int _code;
+
+ private final String _name;
+
+ private final String _description;
+
+ XAFlag(int code, String name, String description)
+ {
+ _code = code;
+ _name = name;
+ _description = description;
+ }
+
+ //==============================================
+ // Getter methods
+ //==============================================
+
+ public int getCode()
+ {
+ return _code;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public String getDescription()
+ {
+ return _description;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/XidImpl.java b/java/broker/src/main/java/org/apache/qpid/server/txn/XidImpl.java
new file mode 100644
index 0000000000..91db1d97c0
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/XidImpl.java
@@ -0,0 +1,210 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+
+import org.apache.log4j.Logger;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 03-Apr-2007
+ * Time: 20:32:55
+ */
+public class XidImpl implements Xid
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(XidImpl.class);
+
+ //========================================================================
+ // Instance Fields
+ //========================================================================
+
+ //the transaction branch identifier part of XID as an array of bytes
+ private byte[] m_branchQualifier;
+
+ // the format identifier part of the XID.
+ private int m_formatID;
+
+ // the global transaction identifier part of XID as an array of bytes.
+ private byte[] m_globalTransactionID;
+
+ //========================================================================
+ // Constructor(s)
+ //========================================================================
+
+ /**
+ * Create new Xid.
+ */
+ public XidImpl()
+ {
+
+ }
+
+ /**
+ * Create new XidImpl from an existing Xid.
+ * <p/>
+ * This is usually called when an application server provides some implementation
+ * of the Xid interface and we need to cast this into our own XidImpl.
+ *
+ * @param xid the xid to cloning
+ */
+ public XidImpl(Xid xid)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Cloning Xid");
+ }
+ m_branchQualifier = xid.getBranchQualifier();
+ m_formatID = xid.getFormatId();
+ m_globalTransactionID = xid.getGlobalTransactionId();
+ }
+
+ /**
+ * Create a new Xid.
+ *
+ * @param branchQualifier The transaction branch identifier part of XID as an array of bytes.
+ * @param format The format identifier part of the XID.
+ * @param globalTransactionID The global transaction identifier part of XID as an array of bytes.
+ */
+ public XidImpl(byte[] branchQualifier, int format, byte[] globalTransactionID)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("creating Xid");
+ }
+ m_branchQualifier = branchQualifier;
+ m_formatID = format;
+ m_globalTransactionID = globalTransactionID;
+ }
+
+//========================================================================
+
+ // Xid interface implementation
+ //========================================================================
+ /**
+ * Format identifier. O means the OSI CCR format.
+ *
+ * @return Global transaction identifier.
+ */
+ public byte[] getGlobalTransactionId()
+ {
+ return m_globalTransactionID;
+ }
+
+ /**
+ * Obtain the transaction branch identifier part of XID as an array of bytes.
+ *
+ * @return Branch identifier part of XID.
+ */
+ public byte[] getBranchQualifier()
+ {
+ return m_branchQualifier;
+ }
+
+ /**
+ * Obtain the format identifier part of the XID.
+ *
+ * @return Format identifier. O means the OSI CCR format.
+ */
+ public int getFormatId()
+ {
+ return m_formatID;
+ }
+
+//========================================================================
+// Object operations
+//========================================================================
+
+ /**
+ * Indicates whether some other Xid is "equal to" this one.
+ * <p/>
+ * Two Xids are equal if and only if their three elementary parts are equal
+ *
+ * @param o the object to compare this <code>XidImpl</code> against.
+ * @return code>true</code> if the <code>XidImpl</code> are equal; <code>false</code> otherwise.
+ */
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o instanceof XidImpl)
+ {
+ XidImpl other = (XidImpl) o;
+ if (m_formatID == other.getFormatId())
+ {
+ if (m_branchQualifier.length == other.getBranchQualifier().length)
+ {
+ for (int i = 0; i < m_branchQualifier.length; i++)
+ {
+ if (m_branchQualifier[i] != other.getBranchQualifier()[i])
+ {
+ return false;
+ }
+ }
+
+ if (m_globalTransactionID.length == other.getGlobalTransactionId().length)
+ {
+ for (int i = 0; i < m_globalTransactionID.length; i++)
+ {
+ if (m_globalTransactionID[i] != other.getGlobalTransactionId()[i])
+ {
+ return false;
+ }
+ }
+ // everithing is equal
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns a hash code for this Xid.
+ * <p/>
+ * As this object is used as a key entry in a hashMap it is necessary to provide an implementation
+ * of hashcode in order to fulfill the following aspect of the general contract of
+ * {@link Object#hashCode()} that is:
+ * <ul>
+ * <li>If two objects are equal according to the <tt>equals(Object)</tt>
+ * method, then calling the <code>hashCode</code> method on each of
+ * the two objects must produce the same integer result.
+ * </ul>
+ * <p/>
+ * The hash code for a
+ * <code>XidImpl</code> object is computed as
+ * <blockquote><pre>
+ * hashcode( globalTransactionID ) + hashcode( branchQualifier ) + formatID
+ * </pre></blockquote>
+ *
+ * @return a hash code value for this object.
+ */
+ public int hashCode()
+ {
+ return (new String(m_globalTransactionID)).hashCode() + (new String(m_branchQualifier)).hashCode() + m_formatID;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index 150b98b424..f7c578d9a0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -59,7 +59,8 @@ public class NullApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
- _configuration.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore");
+ _configuration.addProperty("store.class", "org.apache.qpid.server.messageStore.MemoryMessageStore");
+ _configuration.addProperty("txn.class", "org.apache.qpid.server.txn.MemoryTransactionManager");
Properties users = new Properties();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index b5c59dbbb7..7e329a5274 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -1,249 +1,272 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import javax.management.NotCompliantMBeanException;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.AMQBrokerManagerMBean;
-import org.apache.qpid.server.security.access.AccessManager;
-import org.apache.qpid.server.security.access.AccessManagerImpl;
-import org.apache.qpid.server.security.access.Accessable;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-
-public class VirtualHost implements Accessable
-{
- private static final Logger _logger = Logger.getLogger(VirtualHost.class);
-
-
- private final String _name;
-
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
-
- private MessageStore _messageStore;
-
- protected VirtualHostMBean _virtualHostMBean;
-
- private AMQBrokerManagerMBean _brokerMBean;
-
- private AuthenticationManager _authenticationManager;
-
- private AccessManager _accessManager;
-
-
- public void setAccessableName(String name)
- {
- _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
- + name + ") ignored remains :" + getAccessableName());
- }
-
- public String getAccessableName()
- {
- return _name;
- }
-
-
- /**
- * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
- * implementaion of an Exchange MBean should extend this class.
- */
- public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
- {
- public VirtualHostMBean() throws NotCompliantMBeanException
- {
- super(ManagedVirtualHost.class, "VirtualHost");
- }
-
- public String getObjectInstanceName()
- {
- return _name.toString();
- }
-
- public String getName()
- {
- return _name.toString();
- }
-
- public VirtualHost getVirtualHost()
- {
- return VirtualHost.this;
- }
-
-
- } // End of MBean class
-
-
- public VirtualHost(String name, MessageStore store) throws Exception
- {
- this(name, null, store);
- }
-
- public VirtualHost(String name, Configuration hostConfig) throws Exception
- {
- this(name, hostConfig, null);
- }
-
- private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
- {
- _name = name;
-
- _virtualHostMBean = new VirtualHostMBean();
- // This isn't needed to be registered
- //_virtualHostMBean.register();
-
- _queueRegistry = new DefaultQueueRegistry(this);
- _exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeRegistry = new DefaultExchangeRegistry(this);
-
- if (store != null)
- {
- _messageStore = store;
- }
- else
- {
- if (hostConfig == null)
- {
- throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
- }
- initialiseMessageStore(hostConfig);
- }
-
- _exchangeRegistry.initialise();
-
- _logger.warn("VirtualHost authentication Managers require spec change to be operational.");
- _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
-
- _accessManager = new AccessManagerImpl(name, hostConfig);
-
- _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
- _brokerMBean.register();
- }
-
- private void initialiseMessageStore(Configuration config) throws Exception
- {
- String messageStoreClass = config.getString("store.class");
-
- Class clazz = Class.forName(messageStoreClass);
- Object o = clazz.newInstance();
-
- if (!(o instanceof MessageStore))
- {
- throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
- " does not.");
- }
- _messageStore = (MessageStore) o;
- _messageStore.configure(this, "store", config);
- }
-
-
- public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
- {
- T instance;
- try
- {
- instance = instanceType.newInstance();
- }
- catch (Exception e)
- {
- _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
- throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
- }
- Configurator.configure(instance);
-
- return instance;
- }
-
-
- public String getName()
- {
- return _name;
- }
-
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
- }
-
- public ApplicationRegistry getApplicationRegistry()
- {
- throw new UnsupportedOperationException();
- }
-
- public MessageStore getMessageStore()
- {
- return _messageStore;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
- public AccessManager getAccessManager()
- {
- return _accessManager;
- }
-
- public void close() throws Exception
- {
- if (_messageStore != null)
- {
- _messageStore.close();
- }
- }
-
- public ManagedObject getBrokerMBean()
- {
- return _brokerMBean;
- }
-
- public ManagedObject getManagedObject()
- {
- return _virtualHostMBean;
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import javax.management.NotCompliantMBeanException;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.security.access.AccessManager;
+import org.apache.qpid.server.security.access.AccessManagerImpl;
+import org.apache.qpid.server.security.access.Accessable;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.messageStore.MessageStore;
+
+public class VirtualHost implements Accessable
+{
+ private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+
+
+ private final String _name;
+
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private MessageStore _messageStore;
+
+ private TransactionManager _transactionManager;
+
+ protected VirtualHostMBean _virtualHostMBean;
+
+ private AMQBrokerManagerMBean _brokerMBean;
+
+ private AuthenticationManager _authenticationManager;
+
+ private AccessManager _accessManager;
+
+
+ public void setAccessableName(String name)
+ {
+ _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
+ + name + ") ignored remains :" + getAccessableName());
+ }
+
+ public String getAccessableName()
+ {
+ return _name;
+ }
+
+
+ /**
+ * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
+ * implementaion of an Exchange MBean should extend this class.
+ */
+ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ {
+ public VirtualHostMBean() throws NotCompliantMBeanException
+ {
+ super(ManagedVirtualHost.class, "VirtualHost");
+ }
+
+ public String getObjectInstanceName()
+ {
+ return _name.toString();
+ }
+
+ public String getName()
+ {
+ return _name.toString();
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return VirtualHost.this;
+ }
+
+
+ } // End of MBean class
+
+
+ public VirtualHost(String name, MessageStore store) throws Exception
+ {
+ this(name, null, store);
+ }
+
+ public VirtualHost(String name, Configuration hostConfig) throws Exception
+ {
+ this(name, hostConfig, null);
+ }
+
+ private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
+ {
+ _name = name;
+
+ _virtualHostMBean = new VirtualHostMBean();
+ // This isn't needed to be registered
+ //_virtualHostMBean.register();
+
+ _queueRegistry = new DefaultQueueRegistry(this);
+ _exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeRegistry = new DefaultExchangeRegistry(this);
+
+ if (store != null)
+ {
+ _messageStore = store;
+ }
+ else
+ {
+ if (hostConfig == null)
+ {
+ throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
+ }
+ initialiseTransactionManager(hostConfig);
+ initialiseMessageStore(hostConfig);
+ }
+
+ _exchangeRegistry.initialise();
+
+ _logger.warn("VirtualHost authentication Managers require spec change to be operational.");
+ _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
+
+ _accessManager = new AccessManagerImpl(name, hostConfig);
+
+ _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
+ _brokerMBean.register();
+ }
+
+ private void initialiseMessageStore(Configuration config) throws Exception
+ {
+ String messageStoreClass = config.getString("store.class");
+
+ Class clazz = Class.forName(messageStoreClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof MessageStore))
+ {
+ throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ " does not.");
+ }
+ _messageStore = (MessageStore) o;
+ _messageStore.configure(this, _transactionManager, "store", config);
+ }
+
+ private void initialiseTransactionManager(Configuration config) throws Exception
+ {
+ String transactionManagerClass = config.getString("txn.class");
+ Class clazz = Class.forName(transactionManagerClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof TransactionManager))
+ {
+ throw new ClassCastException("Transaction Manager class must implement " + TransactionManager.class + ". Class " + clazz +
+ " does not.");
+ }
+ _transactionManager = (TransactionManager) o;
+ }
+
+
+ public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
+ {
+ T instance;
+ try
+ {
+ instance = instanceType.newInstance();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
+ }
+ Configurator.configure(instance);
+
+ return instance;
+ }
+
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ public TransactionManager getTransactionManager()
+ {
+ return _transactionManager;
+ }
+
+ public AuthenticationManager getAuthenticationManager()
+ {
+ return _authenticationManager;
+ }
+
+ public AccessManager getAccessManager()
+ {
+ return _accessManager;
+ }
+
+ public void close() throws Exception
+ {
+ if (_messageStore != null)
+ {
+ _messageStore.close();
+ }
+ }
+
+ public ManagedObject getBrokerMBean()
+ {
+ return _brokerMBean;
+ }
+
+ public ManagedObject getManagedObject()
+ {
+ return _virtualHostMBean;
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index a9496d0de1..a7e5c0d1d0 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -19,14 +19,16 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.MemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.txn.MemoryTransactionManager;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.TestMinaProtocolSession;
@@ -51,6 +53,8 @@ public class AMQQueueAlertTest extends TestCase
private VirtualHost _virtualHost;
private AMQMinaProtocolSession protocolSession = null;
private MessageStore _messageStore = new MemoryMessageStore();
+ private TransactionManager _txm = new MemoryTransactionManager();
+
private StoreContext _storeContext = new StoreContext();
private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
null,
@@ -170,7 +174,7 @@ public class AMQQueueAlertTest extends TestCase
public void testQueueDepthAlertWithSubscribers() throws Exception
{
protocolSession = new TestMinaProtocolSession();
- AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore, null);
+ AMQChannel channel = new AMQChannel(protocolSession, 2,_txm, _messageStore, null);
protocolSession.addChannel(channel);
// Create queue
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 551eb8f0a0..03137c9794 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -30,9 +30,11 @@ import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.txn.MemoryTransactionManager;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.messageStore.MemoryMessageStore;
import javax.management.JMException;
import java.util.LinkedList;
@@ -47,6 +49,7 @@ public class AMQQueueMBeanTest extends TestCase
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
private MessageStore _messageStore = new MemoryMessageStore();
+ private TransactionManager _txm = new MemoryTransactionManager();
private StoreContext _storeContext = new StoreContext();
private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
null,
@@ -80,7 +83,7 @@ public class AMQQueueMBeanTest extends TestCase
TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
- AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null);
+ AMQChannel channel = new AMQChannel(protocolSession, 1,_txm, _messageStore, null);
protocolSession.addChannel(channel);
_queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false);