diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-05-02 16:49:03 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-05-02 16:49:03 +0000 |
| commit | c98198c56248d16465a1871aa11e9c94c0f950db (patch) | |
| tree | 56272c427e9b36386a94b4f7bdddb402002bd1c0 /java/broker/src | |
| parent | 4fa14823a4110d82c26edcc1aaf0cd9d325a9dd4 (diff) | |
| download | qpid-python-c98198c56248d16465a1871aa11e9c94c0f950db.tar.gz | |
I am commiting the patch supplied by Arnaud Simon. This patch contains support for dtx.
Currently there is one test case failing. I will try to fix it, if not Arnuad will provide a patch soon
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@534541 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
42 files changed, 3346 insertions, 457 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index d31359b019..ae1cf43f6c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -58,8 +58,10 @@ import org.apache.qpid.server.management.ManagedBroker; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.messageStore.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exception.InternalErrorException; +import org.apache.qpid.server.exception.QueueAlreadyExistsException; /** * This MBean implements the broker management interface and exposes the @@ -100,7 +102,6 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr * @param exchangeName * @param type * @param durable - * @param autoDelete * @throws JMException */ public void createNewExchange(String exchangeName, String type, boolean durable) throws JMException @@ -158,7 +159,6 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr * @param queueName * @param durable * @param owner - * @param autoDelete * @throws JMException */ public void createNewQueue(String queueName, String owner, boolean durable) throws JMException @@ -180,7 +180,13 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr queue = new AMQQueue(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost()); if (queue.isDurable() && !queue.isAutoDelete()) { - _messageStore.createQueue(queue); + try + { + _messageStore.createQueue(queue); + } catch (Exception e) + { + throw new JMException("problem creating queue " + queue.getName()); + } } Configuration virtualHostDefaultQueueConfiguration = @@ -222,10 +228,9 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr try { queue.delete(); - _messageStore.removeQueue(new AMQShortString(queueName)); - + _messageStore.destroyQueue(queue); } - catch (AMQException ex) + catch (Exception ex) { JMException jme = new JMException(ex.getMessage()); jme.initCause(ex); diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 918b5fc176..9711bbf4d2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -47,11 +47,9 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.queue.Subscription; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.messageStore.MessageStore; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.txn.LocalTransactionalContext; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.txn.*; public class AMQChannel { @@ -93,6 +91,8 @@ public class AMQChannel private final MessageStore _messageStore; + private final TransactionManager _transactionManager; + private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); private final AtomicBoolean _suspended = new AtomicBoolean(false); @@ -116,7 +116,8 @@ public class AMQChannel //Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges) + + public AMQChannel(AMQProtocolSession session, int channelId, TransactionManager transactionManager, MessageStore messageStore, MessageRouter exchanges) throws AMQException { _session = session; @@ -125,6 +126,7 @@ public class AMQChannel _prefetch_HighWaterMark = DEFAULT_PREFETCH; _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; + _transactionManager = transactionManager; _exchanges = exchanges; // by default the session is non-transactional _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); @@ -133,7 +135,7 @@ public class AMQChannel /** Sets this channel to be part of a local transaction */ public void setLocalTransactional() { - _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages); + _txnContext = new DistributedTransactionalContext(_transactionManager, _messageStore, _storeContext, _returnMessages); } public boolean isTransactional() diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 8573902af4..e337b26b33 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -36,8 +36,10 @@ import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.messageStore.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exception.InternalErrorException; +import org.apache.qpid.server.exception.QueueAlreadyExistsException; public class VirtualHostConfiguration { @@ -48,7 +50,9 @@ public class VirtualHostConfiguration private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost."; - public VirtualHostConfiguration(String configFile) throws ConfigurationException + public VirtualHostConfiguration(String configFile) + throws + ConfigurationException { _logger.info("Loading Config file:" + configFile); @@ -57,24 +61,25 @@ public class VirtualHostConfiguration } - - private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException + private void configureVirtualHost(String virtualHostName, Configuration configuration) + throws + ConfigurationException, + AMQException { - _logger.debug("Loding configuration for virtaulhost: "+virtualHostName); + _logger.debug("Loding configuration for virtaulhost: " + virtualHostName); VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName); - - if(virtualHost == null) + if (virtualHost == null) { throw new ConfigurationException("Unknown virtual host: " + virtualHostName); } List exchangeNames = configuration.getList("exchanges.exchange.name"); - for(Object exchangeNameObj : exchangeNames) + for (Object exchangeNameObj : exchangeNames) { String exchangeName = String.valueOf(exchangeNameObj); configureExchange(virtualHost, exchangeName, configuration); @@ -83,7 +88,7 @@ public class VirtualHostConfiguration List queueNames = configuration.getList("queues.queue.name"); - for(Object queueNameObj : queueNames) + for (Object queueNameObj : queueNames) { String queueName = String.valueOf(queueNameObj); configureQueue(virtualHost, queueName, configuration); @@ -91,12 +96,14 @@ public class VirtualHostConfiguration } - private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration) throws AMQException + private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration) + throws + AMQException { CompositeConfiguration exchangeConfiguration = new CompositeConfiguration(); - exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString)); + exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange." + exchangeNameString)); exchangeConfiguration.addConfiguration(configuration.subset("exchanges")); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); @@ -110,18 +117,17 @@ public class VirtualHostConfiguration Exchange exchange; - synchronized (exchangeRegistry) { exchange = exchangeRegistry.getExchange(exchangeName); - if(exchange == null) + if (exchange == null) { - AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type","direct")); - boolean durable = exchangeConfiguration.getBoolean("durable",false); - boolean autodelete = exchangeConfiguration.getBoolean("autodelete",false); + AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type", "direct")); + boolean durable = exchangeConfiguration.getBoolean("durable", false); + boolean autodelete = exchangeConfiguration.getBoolean("autodelete", false); - Exchange newExchange = exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0); + Exchange newExchange = exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0); exchangeRegistry.registerExchange(newExchange); } @@ -149,11 +155,14 @@ public class VirtualHostConfiguration return queueConfiguration; } - private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException + private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) + throws + AMQException, + ConfigurationException { CompositeConfiguration queueConfiguration = new CompositeConfiguration(); - queueConfiguration.addConfiguration(configuration.subset("queues.queue."+ queueNameString)); + queueConfiguration.addConfiguration(configuration.subset("queues.queue." + queueNameString)); queueConfiguration.addConfiguration(configuration.subset("queues")); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); @@ -173,7 +182,7 @@ public class VirtualHostConfiguration { _logger.info("Creating queue '" + queueName + "' on virtual host " + virtualHost.getName()); - boolean durable = queueConfiguration.getBoolean("durable" ,false); + boolean durable = queueConfiguration.getBoolean("durable", false); boolean autodelete = queueConfiguration.getBoolean("autodelete", false); String owner = queueConfiguration.getString("owner", null); @@ -184,21 +193,32 @@ public class VirtualHostConfiguration if (queue.isDurable()) { - messageStore.createQueue(queue); + try + { + messageStore.createQueue(queue); + } catch (InternalErrorException e) + { + _logger.error("Problem when creating Queue '" + queueNameString + + "' on virtual host " + virtualHost.getName() + ", not creating."); + + } catch (QueueAlreadyExistsException e) + { + _logger.error("Queue '" + queueNameString + + "' already exists on virtual host " + virtualHost.getName() + ", not creating."); + } } queueRegistry.registerQueue(queue); - } - else + } else { - _logger.info("Queue '" + queueNameString + "' already exists on virtual host "+virtualHost.getName()+", not creating."); + _logger.info("Queue '" + queueNameString + "' already exists on virtual host " + virtualHost.getName() + ", not creating."); } String exchangeName = queueConfiguration.getString("exchange", null); Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName)); - if(exchange == null) + if (exchange == null) { exchange = virtualHost.getExchangeRegistry().getDefaultExchange(); } @@ -211,15 +231,15 @@ public class VirtualHostConfiguration synchronized (exchange) { List routingKeys = queueConfiguration.getList("routingKey"); - if(routingKeys == null || routingKeys.isEmpty()) + if (routingKeys == null || routingKeys.isEmpty()) { routingKeys = Collections.singletonList(queue.getName()); } - for(Object routingKeyNameObj : routingKeys) + for (Object routingKeyNameObj : routingKeys) { AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj)); - + queue.bind(routingKey, null, exchange); @@ -227,31 +247,33 @@ public class VirtualHostConfiguration _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'"); } - if(exchange != virtualHost.getExchangeRegistry().getDefaultExchange()) + if (exchange != virtualHost.getExchangeRegistry().getDefaultExchange()) { - queue.bind(queue.getName(), null, virtualHost.getExchangeRegistry().getDefaultExchange()); + queue.bind(queue.getName(), null, virtualHost.getExchangeRegistry().getDefaultExchange()); } } } - Configurator.configure(queue, queueConfiguration); } - public void performBindings() throws AMQException, ConfigurationException + public void performBindings() + throws + AMQException, + ConfigurationException { List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name"); String defaultVirtualHostName = _config.getString("default"); - if(defaultVirtualHostName != null) + if (defaultVirtualHostName != null) { - ApplicationRegistry.getInstance().getVirtualHostRegistry().setDefaultVirtualHostName(defaultVirtualHostName); + ApplicationRegistry.getInstance().getVirtualHostRegistry().setDefaultVirtualHostName(defaultVirtualHostName); } _logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames); - for(Object nameObject : virtualHostNames) + for (Object nameObject : virtualHostNames) { String name = String.valueOf(nameObject); configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name)); @@ -265,5 +287,4 @@ public class VirtualHostConfiguration } - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java new file mode 100644 index 0000000000..ae1b916144 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java @@ -0,0 +1,59 @@ +/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 15:52:29
+ */
+public class CommandInvalidException extends Exception
+{
+ /**
+ * Constructs a new CommandInvalidException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public CommandInvalidException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new CommandInvalidException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message .
+ * @param cause the cause.
+ */
+ public CommandInvalidException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new CommandInvalidException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public CommandInvalidException(Throwable cause)
+ {
+ super(cause);
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java new file mode 100644 index 0000000000..f5fcfeee8f --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java @@ -0,0 +1,57 @@ +/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 14:41:53
+ */
+public class InternalErrorException extends Exception
+{
+ /**
+ * Constructs a new InternalErrorException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public InternalErrorException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new InternalErrorException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message .
+ * @param cause the cause.
+ */
+ public InternalErrorException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new InternalErrorException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public InternalErrorException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java new file mode 100644 index 0000000000..3cae098403 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java @@ -0,0 +1,72 @@ +/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 14:12:27
+ */
+public class InvalidXidException extends Exception
+{
+ /**
+ * Constructs a newr InvalidXidException with a standard message
+ *
+ * @param xid The invalid xid.
+ */
+ public InvalidXidException(Xid xid)
+ {
+ super("The Xid: " + xid + " is invalid");
+ }
+
+ /**
+ * Constructs a newr InvalidXidException with a cause
+ *
+ * @param xid The invalid xid.
+ * @param cause The casue for the xid to be invalid
+ */
+ public InvalidXidException(Xid xid, Throwable cause)
+ {
+ super("The Xid: " + xid + " is invalid", cause);
+ }
+
+ /**
+ * Constructs a newr InvalidXidException with a reason message
+ *
+ * @param reason The reason why the xid is invalid
+ * @param xid The invalid xid.
+ */
+ public InvalidXidException(Xid xid, String reason)
+ {
+ super("The Xid: " + xid + " is invalid, The reason is: " + reason);
+ }
+
+ /**
+ * Constructs a newr InvalidXidException with a reason message and cause
+ *
+ * @param reason The reason why the xid is invalid
+ * @param xid The invalid xid.
+ * @param cause The casue for the xid to be invalid
+ */
+ public InvalidXidException(Xid xid, String reason, Throwable cause)
+ {
+ super("The Xid: " + xid + " is invalid, The reason is: " + reason, cause);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java new file mode 100644 index 0000000000..f95132a450 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java @@ -0,0 +1,58 @@ +/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 03-Apr-2007
+ * Time: 09:46:31
+ */
+public class MessageAlreadyStagedException extends Exception
+{
+ /**
+ * Constructs a new MessageAlreadyStagedException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public MessageAlreadyStagedException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new MessageAlreadyStagedException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message .
+ * @param cause the cause.
+ */
+ public MessageAlreadyStagedException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new MessageAlreadyStagedException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public MessageAlreadyStagedException(Throwable cause)
+ {
+ super(cause);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java new file mode 100644 index 0000000000..b8ffe91247 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java @@ -0,0 +1,58 @@ +/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 30-Mar-2007
+ * Time: 10:52:29
+ */
+public class MessageDoesntExistException extends Exception
+{
+ /**
+ * Constructs a new MessageDoesntExistException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public MessageDoesntExistException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new MessageDoesntExistException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message .
+ * @param cause the cause.
+ */
+ public MessageDoesntExistException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new MessageDoesntExistException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public MessageDoesntExistException(Throwable cause)
+ {
+ super(cause);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java new file mode 100644 index 0000000000..af8c7374bf --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java @@ -0,0 +1,58 @@ +/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 16:47:40
+ */
+public class NotPreparedException extends Exception
+{
+ /**
+ * Constructs a new NotPreparedException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public NotPreparedException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new NotPreparedException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message .
+ * @param cause the cause.
+ */
+ public NotPreparedException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new NotPreparedException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public NotPreparedException(Throwable cause)
+ {
+ super(cause);
+ }
+}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java new file mode 100644 index 0000000000..39751261e4 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java @@ -0,0 +1,58 @@ +/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 30-Mar-2007
+ * Time: 10:49:00
+ */
+public class QueueAlreadyExistsException extends Exception
+{
+ /**
+ * Constructs a new QueueAlreadyExistsException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public QueueAlreadyExistsException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new QueueAlreadyExistsException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message .
+ * @param cause the cause.
+ */
+ public QueueAlreadyExistsException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new QueueDoesntExistException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public QueueAlreadyExistsException(Throwable cause)
+ {
+ super(cause);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java new file mode 100644 index 0000000000..88dea864a5 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java @@ -0,0 +1,58 @@ +/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 17:38:24
+ */
+public class QueueDoesntExistException extends Exception
+{
+ /**
+ * Constructs a new QueueDoesntExistException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public QueueDoesntExistException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new QueueDoesntExistException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message .
+ * @param cause the cause.
+ */
+ public QueueDoesntExistException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new QueueDoesntExistException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public QueueDoesntExistException(Throwable cause)
+ {
+ super(cause);
+ }
+}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java b/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java new file mode 100644 index 0000000000..9c1a28413f --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java @@ -0,0 +1,72 @@ +/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 15:45:06
+ */
+public class UnknownXidException extends Exception
+{
+ /**
+ * Constructs a newr UnknownXidException with a standard message
+ *
+ * @param xid The unknown xid.
+ */
+ public UnknownXidException(Xid xid)
+ {
+ super("The Xid: " + xid + " is unknown");
+ }
+
+ /**
+ * Constructs a newr UnknownXidException with a cause
+ *
+ * @param xid The unknown xid.
+ * @param cause The casue for the xid to be unknown
+ */
+ public UnknownXidException(Xid xid, Throwable cause)
+ {
+ super("The Xid: " + xid + " is unknown", cause);
+ }
+
+ /**
+ * Constructs a newr UnknownXidException with a reason message
+ *
+ * @param reason The reason why the xid is unknown
+ * @param xid The unknown xid.
+ */
+ public UnknownXidException(Xid xid, String reason)
+ {
+ super("The Xid: " + xid + " is unknown, The reason is: " + reason);
+ }
+
+ /**
+ * Constructs a newr UnknownXidException with a reason message and cause
+ *
+ * @param reason The reason why the xid is unknown
+ * @param xid The unknown xid.
+ * @param cause The casue for the xid to be unknown
+ */
+ public UnknownXidException(Xid xid, String reason, Throwable cause)
+ {
+ super("The Xid: " + xid + " is unknown, The reason is: " + reason, cause);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 4774383642..9066af70d9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -29,7 +29,8 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.protocol.ExchangeInitialiser; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.qpid.server.exception.InternalErrorException; public class DefaultExchangeRegistry implements ExchangeRegistry { @@ -65,7 +66,13 @@ public class DefaultExchangeRegistry implements ExchangeRegistry _exchangeMap.put(exchange.getName(), exchange); if(exchange.isDurable()) { - getMessageStore().createExchange(exchange); + try + { + getMessageStore().createExchange(exchange); + } catch (InternalErrorException e) + { + throw new AMQException("problem registering excahgne " + exchange, e); + } } } @@ -87,7 +94,13 @@ public class DefaultExchangeRegistry implements ExchangeRegistry { if(e.isDurable()) { - getMessageStore().removeExchange(e); + try + { + getMessageStore().removeExchange(e); + } catch (InternalErrorException e1) + { + throw new AMQException("Problem unregistering Exchange " + name, e1); + } } e.close(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 03fc7a3926..e4abae4f28 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -49,7 +49,8 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getMessageStore(), + final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getTransactionManager(), + virtualHost.getMessageStore(), virtualHost.getExchangeRegistry()); session.addChannel(channel); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 2e697d4564..ec9041c309 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -42,9 +42,11 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.messageStore.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.exception.InternalErrorException; +import org.apache.qpid.server.exception.QueueAlreadyExistsException; import org.apache.commons.configuration.Configuration; public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody> @@ -103,7 +105,13 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar queue = createQueue(body, virtualHost, session); if (queue.isDurable() && !queue.isAutoDelete()) { - store.createQueue(queue); + try + { + store.createQueue(queue); + } catch (Exception e) + { + throw new AMQException("Problem when creating queue " + queue, e); + } } queueRegistry.registerQueue(queue); if (autoRegister) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index eb7089afdc..cfea4637ab 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -30,9 +30,11 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.messageStore.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.exception.InternalErrorException; +import org.apache.qpid.server.exception.QueueDoesntExistException; public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody> { @@ -107,7 +109,13 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB if (queue.isDurable()) { - store.removeQueue(queue.getName()); + try + { + store.destroyQueue(queue); + } catch (Exception e) + { + throw new AMQException("problem when destroying queue " + queue, e); + } } // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java new file mode 100644 index 0000000000..581bca2efe --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java @@ -0,0 +1,169 @@ +/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.commons.configuration.Configuration;
+
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 26-Apr-2007
+ * Time: 08:23:45
+ */
+public class MemoryMessageStore implements MessageStore
+{
+
+ public void removeExchange(Exchange exchange)
+ throws
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void createExchange(Exchange exchange)
+ throws
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
+ throws
+ InternalErrorException,
+ IllegalArgumentException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void close()
+ throws
+ InternalErrorException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void createQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueAlreadyExistsException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void destroyQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void stage(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageAlreadyStagedException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void appendContent(StorableMessage m, byte[] data, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public byte[] loadContent(StorableMessage m, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException
+ {
+ return new byte[0]; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void destroy(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Collection<StorableQueue> getAllQueues()
+ throws
+ InternalErrorException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Collection<StorableMessage> getAllMessages(StorableQueue queue)
+ throws
+ InternalErrorException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getNewMessageId()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java new file mode 100644 index 0000000000..c4b1d3182f --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java @@ -0,0 +1,270 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.commons.configuration.Configuration;
+
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 17:34:02
+ */
+public interface MessageStore
+{
+ /**
+ * Create a new exchange
+ *
+ * @param exchange the exchange to be persisted
+ * @throws InternalErrorException If an error occurs
+ */
+ public void createExchange(Exchange exchange)
+ throws
+ InternalErrorException;
+
+ /**
+ * Remove an exchange
+ * @param exchange The exchange to be removed
+ * @throws InternalErrorException If an error occurs
+ */
+ public void removeExchange(Exchange exchange) throws
+ InternalErrorException;
+
+ /**
+ * Bind a queue with an exchange given a routing key
+ *
+ * @param exchange The exchange to bind the queue with
+ * @param routingKey The routing key
+ * @param queue The queue to be bound
+ * @param args Args
+ * @throws InternalErrorException If an error occurs
+ */
+ public void bindQueue(Exchange exchange,
+ AMQShortString routingKey,
+ StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException;
+
+ /**
+ * Unbind a queue from an exchange
+ *
+ * @param exchange The exchange the queue was bound to
+ * @param routingKey The routing queue
+ * @param queue The queue to unbind
+ * @param args args
+ * @throws InternalErrorException If an error occurs
+ */
+ public void unbindQueue(Exchange exchange,
+ AMQShortString routingKey,
+ StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException;
+
+ /**
+ * Called after instantiation in order to configure the message store. A particular implementation can define
+ * whatever parameters it wants.
+ *
+ * @param virtualHost The virtual host using by this store
+ * @param tm The transaction manager implementation
+ * @param base The base element identifier from which all configuration items are relative. For example, if the base
+ * element is "store", the all elements used by concrete classes will be "store.foo" etc.
+ * @param config The apache commons configuration object
+ * @throws InternalErrorException If an error occurs that means the store is unable to configure itself
+ * @throws IllegalArgumentException If the configuration arguments are illegal
+ */
+ void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
+ throws
+ InternalErrorException,
+ IllegalArgumentException;
+
+ /**
+ * Called to close and cleanup any resources used by the message store.
+ *
+ * @throws InternalErrorException if close fails
+ */
+ void close()
+ throws
+ InternalErrorException;
+
+ /**
+ * Create a queue
+ *
+ * @param queue the queue to be created
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueAlreadyExistsException If the queue already exists in the store
+ */
+ public void createQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueAlreadyExistsException;
+
+ /**
+ * Destroy a queue
+ *
+ * @param queue The queue to be destroyed
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ */
+ public void destroyQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException;
+
+ /**
+ * Stage the message before effective enqueue
+ *
+ * @param m The message to stage
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageAlreadyStagedException If the message is already staged
+ */
+ public void stage(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageAlreadyStagedException;
+
+
+ /**
+ * Append more data with a previously staged message
+ *
+ * @param m The message to which data must be appended
+ * @param data Data to happen to the message
+ * @param offset The number of bytes from the beginning of the payload
+ * @param size The number of bytes to be written
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message has not been staged
+ */
+ public void appendContent(StorableMessage m, byte[] data, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Get the content of previously staged or enqueued message.
+ * The message headers are also set.
+ *
+ * @param m The message for which the content must be loaded
+ * @param offset The number of bytes from the beginning of the payload
+ * @param size The number of bytes to be loaded
+ * @return The message content
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message does not exist
+ */
+ public byte[] loadContent(StorableMessage m, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Destroy a previously staged message
+ *
+ * @param m the message to be destroyed
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message does not exist in the store
+ */
+ public void destroy(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Enqueue a message under the scope of the transaction branch
+ * identified by xid when specified.
+ * <p> This operation is propagated to the queue and the message.
+ * <p> A message that has been previously staged is assumed to have had
+ * its payload already added (see appendContent)
+ *
+ * @param xid The xid of the transaction branch under which the message must be enqueued.
+ * <p> It he xid is null then the message is enqueued outside the scope of any transaction.
+ * @param m The message to be enqueued
+ * @param queue The queue into which the message must be enqueued
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ * @throws InvalidXidException The transaction branch is invalid
+ * @throws UnknownXidException The transaction branch is unknown
+ * @throws MessageDoesntExistException If the Message does not exist
+ */
+ public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException;
+
+ /**
+ * Dequeue a message under the scope of the transaction branch identified by xid
+ * if specified.
+ * <p> This operation is propagated to the queue and the message.
+ *
+ * @param xid The xid of the transaction branch under which the message must be dequeued.
+ * <p> It he xid is null then the message is dequeued outside the scope of any transaction.
+ * @param m The message to be dequeued
+ * @param queue The queue from which the message must be dequeued
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ * @throws InvalidXidException The transaction branch is invalid
+ * @throws UnknownXidException The transaction branch is unknown
+ */
+ public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException;
+
+ //=========================================================
+ // Recovery specific methods
+ //=========================================================
+
+ /**
+ * List all the persistent queues
+ *
+ * @return All the persistent queues
+ * @throws InternalErrorException In case of internal message store problem
+ */
+ public Collection<StorableQueue> getAllQueues()
+ throws
+ InternalErrorException;
+
+ /**
+ * All enqueued messages of a given queue
+ *
+ * @param queue The queue where the message are retrieved from
+ * @return The list all enqueued messages of a given queue
+ * @throws InternalErrorException In case of internal message store problem
+ */
+ public Collection<StorableMessage> getAllMessages(StorableQueue queue)
+ throws
+ InternalErrorException;
+
+ /**
+ * Get a new message ID
+ *
+ * @return A new message ID
+ */
+ public long getNewMessageId();
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java new file mode 100644 index 0000000000..b228bb3027 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java @@ -0,0 +1,116 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+/**
+ * A storable message can be persisted in the message store.
+ *
+ * Created by Arnaud Simon
+ * Date: 03-Apr-2007
+ * Time: 08:56:48
+ */
+public interface StorableMessage
+{
+ /**
+ * The message ID is used by the store to identify a message.
+ *
+ * @return The message identifier
+ */
+ public long getMessageId();
+
+ /**
+ * Get the message header body that is saved when the message is staged.
+ *
+ * @return The message header body
+ */
+ public byte[] getHeaderBody();
+
+ /**
+ * Get the message header body size in bytes.
+ *
+ * @return The message header body size
+ */
+ public int getHeaderSize();
+
+ /**
+ * Get the message payload. This is required when the message is
+ * enqueued without any prior staging.
+ * <p> When the message is staged, the payload can be partial or even empty.
+ *
+ * @return The message payload
+ */
+ public byte[] getData();
+
+ /**
+ * Get the message payload size in bytes.
+ *
+ * @return The message payload size in bytes
+ */
+ public int getPayloadSize();
+
+ /**
+ * Specify whether this message has been enqueued
+ *
+ * @return true if this message is enqueued, false otherwise
+ */
+ public boolean isEnqueued();
+
+ /**
+ * This is called by the message store when this message is enqueued in the message store.
+ *
+ * @param queue The storable queue into which the message is enqueued
+ */
+ public void enqueue(StorableQueue queue);
+
+ /**
+ * This is called by the message store when this message is dequeued.
+ *
+ * @param queue The storable queue out of which the message is dequeued
+ */
+ public void dequeue(StorableQueue queue);
+
+ /**
+ * A message can be enqueued in several queues.
+ * The queue position represents the index of the provided queue within the ordered
+ * list of queues the message has been enqueued.
+ * <p>For example:
+ * <p> If the message is successively enqueued in queue Q1, Q2 and Q3 then
+ * the position of Q1 is 0, position of Q2 is 1 and position of Q3 is 2.
+ * <p> If the message is dequeud form Q2 then position of Q1 is stil 0 but position
+ * of Q3 becomes 1.
+ *
+ * @param queue The storable queue for which the position should be determined
+ *
+ * @return The position of the specified storable queue
+ */
+ public int getQueuePosition(StorableQueue queue);
+
+ /**
+ * Indicates whether this message has been staged.
+ *
+ * @return True if the message has been staged, false otherwise
+ */
+ public boolean isStaged();
+
+ /**
+ * Call by the message store when this message is staged.
+ */
+ public void staged();
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java b/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java new file mode 100644 index 0000000000..10a9a3b8b8 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java @@ -0,0 +1,75 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * A storable queue can store storable messages and can be persisted in the store.
+ * Created by Arnaud Simon
+ * Date: 03-Apr-2007
+ * Time: 08:52:18
+ */
+public interface StorableQueue
+{
+ /**
+ * Get This queue unique id.
+ *
+ * @return The queue ID
+ */
+ public int getQueueID();
+
+ /**
+ * Set the queue ID.
+ *
+ * @param id This queue ID
+ */
+ public void setQueueID(int id);
+
+ /**
+ * Get this queue owner.
+ *
+ * @return This queue owner
+ */
+ public AMQShortString getOwner();
+
+ /**
+ * Get this queue name.
+ *
+ * @return the name of this queue
+ */
+ public AMQShortString getName();
+
+ /**
+ * Signifies to this queue that a message is dequeued.
+ * This operation is called by the store.
+ *
+ * @param m The dequeued message
+ */
+ public void dequeue(StorableMessage m);
+
+ /**
+ * Signifies to this queue that a message is enqueued.
+ * This operation is called by the store.
+ *
+ * @param m The enqueued message
+ */
+ public void enqueue(StorableMessage m);
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 955aaa6acb..eefff090df 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -25,38 +25,45 @@ import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.messageStore.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.messageStore.StorableMessage; +import org.apache.qpid.server.messageStore.StorableQueue; /** Combines the information that make up a deliverable message into a more manageable form. */ import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -/** Combines the information that make up a deliverable message into a more manageable form. */ -public class AMQMessage +/** + * Combines the information that make up a deliverable message into a more manageable form. + */ +public class AMQMessage implements StorableMessage { private static final Logger _log = Logger.getLogger(AMQMessage.class); - /** Used in clustering */ + // The ordered list of queues into which this message is enqueued. + private List<StorableQueue> _queues = new LinkedList<StorableQueue>(); + // Indicates whether this message is staged + private boolean _isStaged = false; + + /** + * Used in clustering + */ private Set<Object> _tokens; - /** Only use in clustering - should ideally be removed? */ + /** + * Only use in clustering - should ideally be removed? + */ private AMQProtocolSession _publisher; private final Long _messageId; @@ -221,13 +228,14 @@ public class AMQMessage * @param messageId * @param store * @param factory - * * @throws AMQException */ - public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException + public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) + throws + AMQException { _messageId = messageId; - _messageHandle = factory.createMessageHandle(messageId, store, true); + _messageHandle = factory.createMessageHandle(store, this, true); _txnContext = txnConext; _transientMessageData = null; } @@ -241,7 +249,9 @@ public class AMQMessage * @param contentHeader */ public AMQMessage(Long messageId, MessagePublishInfo info, - TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException + TransactionalContext txnContext, ContentHeaderBody contentHeader) + throws + AMQException { this(messageId, info, txnContext); setContentHeaderBody(contentHeader); @@ -256,14 +266,15 @@ public class AMQMessage * @param contentHeader * @param destinationQueues * @param contentBodies - * * @throws AMQException */ public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext, ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext, - MessageHandleFactory messageHandleFactory) throws AMQException + MessageHandleFactory messageHandleFactory) + throws + AMQException { this(messageId, info, txnContext, contentHeader); _transientMessageData.setDestinationQueues(destinationQueues); @@ -274,7 +285,9 @@ public class AMQMessage } } - protected AMQMessage(AMQMessage msg) throws AMQException + protected AMQMessage(AMQMessage msg) + throws + AMQException { _messageId = msg._messageId; _messageHandle = msg._messageHandle; @@ -283,6 +296,98 @@ public class AMQMessage _transientMessageData = msg._transientMessageData; } + //======================================================================== + // Interface StorableMessage + //======================================================================== + + public long getMessageId() + { + return _messageId; + } + + public byte[] getHeaderBody() + { + byte[] result = null; + ContentHeaderBody headerBody; + ByteBuffer bufferedResult; + try + { + headerBody = _messageHandle.getContentHeaderBody(_txnContext.getStoreContext(), _messageId); + result = new byte[headerBody.getSize()]; + bufferedResult = ByteBuffer.wrap(result); + headerBody.writePayload(bufferedResult); + } catch (AMQException e) + { + _log.error("Error when getting message header", e); + } + return result; + } + + public int getHeaderSize() + { + int result = 0; + try + { + result = _messageHandle.getContentHeaderBody(_txnContext.getStoreContext(), _messageId).getSize(); + } catch (AMQException e) + { + _log.error("Error when getting message header size", e); + } + return result; + } + + public byte[] getData() + { + return _messageHandle.getMessagePayload(); + } + + public int getPayloadSize() + { + return _messageHandle.getMessagePayload().length; + } + + public boolean isEnqueued() + { + return _queues.size() > 0; + } + + public void enqueue(StorableQueue queue) + { + _queues.add(queue); + if (_log.isDebugEnabled()) + { + _log.debug("enqueued"); + } + } + + public void dequeue(StorableQueue queue) + { + _queues.remove(queue); + if (_log.isDebugEnabled()) + { + _log.debug("dequeued"); + } + } + + public int getQueuePosition(StorableQueue queue) + { + if (_log.isDebugEnabled()) + { + _log.debug("The queue position is " + _queues.indexOf(queue)); + } + return _queues.indexOf(queue); + } + + public boolean isStaged() + { + return _isStaged; + } + + public void staged() + { + _isStaged = true; + } + public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel) { return new BodyFrameIterator(protocolSession, channel); @@ -293,32 +398,36 @@ public class AMQMessage return new BodyContentIterator(); } - public ContentHeaderBody getContentHeaderBody() throws AMQException + public ContentHeaderBody getContentHeaderBody() + throws + AMQException { if (_transientMessageData != null) { return _transientMessageData.getContentHeaderBody(); - } - else + } else { return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId); } } public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) - throws AMQException + throws + AMQException { _transientMessageData.setContentHeaderBody(contentHeaderBody); } - public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory) throws AMQException + public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory) + throws + AMQException { final boolean persistent = isPersistent(); - _messageHandle = factory.createMessageHandle(_messageId, store, persistent); - if (persistent) - { + _messageHandle = factory.createMessageHandle(store, this, persistent); + //if (persistent) + // { _txnContext.beginTranIfNecessary(); - } + // } // enqueuing the messages ensure that if required the destinations are recorded to a // persistent store @@ -334,7 +443,9 @@ public class AMQMessage } } - public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) throws AMQException + public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) + throws + AMQException { _transientMessageData.addBodyLength(contentChunk.getSize()); final boolean allContentReceived = isAllContentReceived(); @@ -343,22 +454,19 @@ public class AMQMessage { deliver(storeContext); return true; - } - else + } else { return false; } } - public boolean isAllContentReceived() throws AMQException + public boolean isAllContentReceived() + throws + AMQException { return _transientMessageData.isAllContentReceived(); } - public long getMessageId() - { - return _messageId; - } /** * Creates a long-lived reference to this message, and increments the count of such references, as an atomic @@ -366,11 +474,13 @@ public class AMQMessage */ public AMQMessage takeReference() { - incrementReference();// _referenceCount.incrementAndGet(); + _referenceCount.incrementAndGet(); return this; } - /** Threadsafe. Increment the reference count on the message. */ + /** + * Threadsafe. Increment the reference count on the message. + */ protected void incrementReference() { _referenceCount.incrementAndGet(); @@ -385,11 +495,12 @@ public class AMQMessage * message store. * * @param storeContext - * * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that * failed */ - public void decrementReference(StoreContext storeContext) throws MessageCleanupException + public void decrementReference(StoreContext storeContext) + throws + MessageCleanupException { int count = _referenceCount.decrementAndGet(); @@ -419,8 +530,7 @@ public class AMQMessage incrementReference(); throw new MessageCleanupException(_messageId, e); } - } - else + } else { if (_log.isDebugEnabled()) { @@ -497,8 +607,7 @@ public class AMQMessage if (taken.getAndSet(true)) { return true; - } - else + } else { _takenMap.put(queue, taken); _takenBySubcriptionMap.put(queue, sub); @@ -524,8 +633,7 @@ public class AMQMessage if (taken == null) { taken = new AtomicBoolean(false); - } - else + } else { taken.set(false); } @@ -546,8 +654,7 @@ public class AMQMessage if (_tokens.contains(token)) { return true; - } - else + } else { _tokens.add(token); return false; @@ -560,26 +667,30 @@ public class AMQMessage * AMQMessageHandle implementation can be picked based on various criteria. * * @param queue the queue - * * @throws org.apache.qpid.AMQException if there is an error enqueuing the message */ - public void enqueue(AMQQueue queue) throws AMQException + public void enqueue(AMQQueue queue) + throws + AMQException { _transientMessageData.addDestinationQueue(queue); } - public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException + public void dequeue(StoreContext storeContext, AMQQueue queue) + throws + AMQException { _messageHandle.dequeue(storeContext, _messageId, queue); } - public boolean isPersistent() throws AMQException + public boolean isPersistent() + throws + AMQException { if (_transientMessageData != null) { return _transientMessageData.isPersistent(); - } - else + } else { return _messageHandle.isPersistent(getStoreContext(), _messageId); } @@ -591,7 +702,9 @@ public class AMQMessage * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered * to a consumer */ - public void checkDeliveredToConsumer() throws NoConsumersException + public void checkDeliveredToConsumer() + throws + NoConsumersException { if (_immediate && !_deliveredToConsumer) @@ -600,14 +713,15 @@ public class AMQMessage } } - public MessagePublishInfo getMessagePublishInfo() throws AMQException + public MessagePublishInfo getMessagePublishInfo() + throws + AMQException { MessagePublishInfo pb; if (_transientMessageData != null) { pb = _transientMessageData.getMessagePublishInfo(); - } - else + } else { pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId); } @@ -630,13 +744,17 @@ public class AMQMessage } - /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */ + /** + * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). + */ public void setDeliveredToConsumer() { _deliveredToConsumer = true; } - private void deliver(StoreContext storeContext) throws AMQException + private void deliver(StoreContext storeContext) + throws + AMQException { // we get a reference to the destination queues now so that we can clear the // transient message data as quickly as possible @@ -650,7 +768,7 @@ public class AMQMessage // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getMessagePublishInfo(), - _transientMessageData.getContentHeaderBody()); + _transientMessageData.getContentHeaderBody()); // we then allow the transactional context to do something with the message content // now that it has all been received, before we attempt delivery @@ -867,7 +985,9 @@ public class AMQMessage } - public void restoreTransientMessageData() throws AMQException + public void restoreTransientMessageData() + throws + AMQException { TransientMessageData transientMessageData = new TransientMessageData(); transientMessageData.setMessagePublishInfo(getMessagePublishInfo()); @@ -889,7 +1009,7 @@ public class AMQMessage // _taken + " by :" + _takenBySubcription; return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + - _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); } public Subscription getDeliveredSubscription(AMQQueue queue) @@ -911,8 +1031,7 @@ public class AMQMessage } _rejectedBy.add(subscription); - } - else + } else { _log.warn("Requesting rejection by null subscriber:" + debugIdentity()); } @@ -925,8 +1044,7 @@ public class AMQMessage if (rejected) // We have subscriptions that rejected this message { return _rejectedBy.contains(subscription); - } - else // This messasge hasn't been rejected yet. + } else // This messasge hasn't been rejected yet. { return rejected; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java index ede55b3bbf..296e61bfa9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -76,4 +76,7 @@ public interface AMQMessageHandle void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException; long getArrivalTime(); + + // added by Arnaud + byte[] getMessagePayload(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 0adf6153f8..fa3b34a634 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import java.text.MessageFormat; import java.util.List; +import java.util.Hashtable; +import java.util.Collection; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,6 +38,9 @@ import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.exception.InternalErrorException; +import org.apache.qpid.server.messageStore.StorableMessage; +import org.apache.qpid.server.messageStore.StorableQueue; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -47,9 +52,10 @@ import org.apache.qpid.server.virtualhost.VirtualHost; * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described * fully in RFC 006. */ -public class AMQQueue implements Managable, Comparable +public class AMQQueue implements Managable, Comparable, StorableQueue { + public static int s_queueID =0; public static final class ExistingExclusiveSubscription extends AMQException { @@ -76,15 +82,27 @@ public class AMQQueue implements Managable, Comparable private final AMQShortString _name; - /** null means shared */ + // The queueu ID + int _queueId; + // The list of enqueued messages. + Hashtable<Long, StorableMessage> _messages = new Hashtable<Long, StorableMessage>(); + + + /** + * null means shared + */ private final AMQShortString _owner; private final boolean _durable; - /** If true, this queue is deleted when the last subscriber is removed */ + /** + * If true, this queue is deleted when the last subscriber is removed + */ private final boolean _autoDelete; - /** Holds subscribers to the queue. */ + /** + * Holds subscribers to the queue. + */ private final SubscriptionSet _subscribers; private final SubscriptionFactory _subscriptionFactory; @@ -97,13 +115,19 @@ public class AMQQueue implements Managable, Comparable private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); - /** Manages message delivery. */ + /** + * Manages message delivery. + */ private final DeliveryManager _deliveryMgr; - /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */ + /** + * Used to track bindings to exchanges so that on deletion they can easily be cancelled. + */ private final ExchangeBindings _bindings = new ExchangeBindings(this); - /** Executor on which asynchronous delivery will be carriedout where required */ + /** + * Executor on which asynchronous delivery will be carriedout where required + */ private final Executor _asyncDelivery; private final AMQQueueMBean _managedObject; @@ -111,27 +135,39 @@ public class AMQQueue implements Managable, Comparable private final VirtualHost _virtualHost; - /** max allowed size(KB) of a single message */ + /** + * max allowed size(KB) of a single message + */ @Configured(path = "maximumMessageSize", defaultValue = "0") public long _maximumMessageSize; - /** max allowed number of messages on a queue. */ + /** + * max allowed number of messages on a queue. + */ @Configured(path = "maximumMessageCount", defaultValue = "0") public long _maximumMessageCount; - /** max queue depth for the queue */ + /** + * max queue depth for the queue + */ @Configured(path = "maximumQueueDepth", defaultValue = "0") public long _maximumQueueDepth; - /** maximum message age before alerts occur */ + /** + * maximum message age before alerts occur + */ @Configured(path = "maximumMessageAge", defaultValue = "0") public long _maximumMessageAge; - /** the minimum interval between sending out consequetive alerts of the same type */ + /** + * the minimum interval between sending out consequetive alerts of the same type + */ @Configured(path = "minimumAlertRepeatGap", defaultValue = "0") public long _minimumAlertRepeatGap; - /** total messages received by the queue since startup. */ + /** + * total messages received by the queue since startup. + */ public AtomicLong _totalMessagesReceived = new AtomicLong(); public int compareTo(Object o) @@ -141,26 +177,29 @@ public class AMQQueue implements Managable, Comparable public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) - throws AMQException + throws + AMQException { this(name, durable, owner, autoDelete, virtualHost, - AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionSet(), new SubscriptionImpl.Factory()); + AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionSet(), new SubscriptionImpl.Factory()); } protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost, SubscriptionSet subscribers) - throws AMQException + throws + AMQException { this(name, durable, owner, autoDelete, virtualHost, - AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, new SubscriptionImpl.Factory()); + AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, new SubscriptionImpl.Factory()); } protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory) - throws AMQException + throws + AMQException { if (name == null) { @@ -183,9 +222,12 @@ public class AMQQueue implements Managable, Comparable _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); + _queueId = s_queueID++; } - private AMQQueueMBean createMBean() throws AMQException + private AMQQueueMBean createMBean() + throws + AMQException { try { @@ -222,13 +264,17 @@ public class AMQQueue implements Managable, Comparable return _autoDelete; } - /** @return no of messages(undelivered) on the queue. */ + /** + * @return no of messages(undelivered) on the queue. + */ public int getMessageCount() { return _deliveryMgr.getQueueMessageCount(); } - /** @return List of messages(undelivered) on the queue. */ + /** + * @return List of messages(undelivered) on the queue. + */ public List<AMQMessage> getMessagesOnTheQueue() { return _deliveryMgr.getMessages(); @@ -239,7 +285,6 @@ public class AMQQueue implements Managable, Comparable * * @param fromMessageId * @param toMessageId - * * @return List of messages */ public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId) @@ -254,7 +299,6 @@ public class AMQQueue implements Managable, Comparable /** * @param messageId - * * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. */ public AMQMessage getMessageOnTheQueue(long messageId) @@ -321,7 +365,9 @@ public class AMQQueue implements Managable, Comparable _deliveryMgr.processAsync(_asyncDelivery); } - /** @return MBean object associated with this Queue */ + /** + * @return MBean object associated with this Queue + */ public ManagedObject getManagedObject() { return _managedObject; @@ -379,34 +425,58 @@ public class AMQQueue implements Managable, Comparable } - /** Removes the AMQMessage from the top of the queue. */ - public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException + /** + * Removes the AMQMessage from the top of the queue. + */ + public synchronized void deleteMessageFromTop(StoreContext storeContext) + throws + AMQException { _deliveryMgr.removeAMessageFromTop(storeContext); } - /** removes all the messages from the queue. */ - public synchronized long clearQueue(StoreContext storeContext) throws AMQException + /** + * removes all the messages from the queue. + */ + public synchronized long clearQueue(StoreContext storeContext) + throws + AMQException { return _deliveryMgr.clearAllMessages(storeContext); } - public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException + public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) + throws + AMQException { exchange.registerQueue(routingKey, this, arguments); if (isDurable() && exchange.isDurable()) { - _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments); + try + { + _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments); + } catch (InternalErrorException e) + { + throw new AMQException("Problem binding queue ", e); + } } _bindings.addBinding(routingKey, arguments, exchange); } - public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException + public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) + throws + AMQException { exchange.deregisterQueue(routingKey, this, arguments); if (isDurable() && exchange.isDurable()) { - _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments); + try + { + _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments); + } catch (InternalErrorException e) + { + throw new AMQException("problem unbinding queue", e); + } } _bindings.remove(routingKey, arguments, exchange); } @@ -414,7 +484,8 @@ public class AMQQueue implements Managable, Comparable public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal, boolean exclusive) - throws AMQException + throws + AMQException { if (incrementSubscriberCount() > 1) { @@ -422,15 +493,13 @@ public class AMQQueue implements Managable, Comparable { decrementSubscriberCount(); throw EXISTING_EXCLUSIVE; - } - else if (exclusive) + } else if (exclusive) { decrementSubscriberCount(); throw EXISTING_SUBSCRIPTION; } - } - else if (exclusive) + } else if (exclusive) { setExclusive(true); } @@ -438,11 +507,11 @@ public class AMQQueue implements Managable, Comparable if (_logger.isDebugEnabled()) { _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and " + - "consumer tag {2} with {3}", ps, channel, consumerTag, this)); + "consumer tag {2} with {3}", ps, channel, consumerTag, this)); } Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, - filters, noLocal, this); + filters, noLocal, this); if (subscription.filtersMessages()) { @@ -477,22 +546,24 @@ public class AMQQueue implements Managable, Comparable } - public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException + public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) + throws + AMQException { if (_logger.isDebugEnabled()) { _logger.debug(MessageFormat.format("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, - this)); + this)); } Subscription removedSubscription; if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, - ps, - consumerTag))) - == null) + ps, + consumerTag))) + == null) { throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag + - " and protocol session key " + ps.getKey() + " not registered with queue " + this); + " and protocol session key " + ps.getKey() + " not registered with queue " + this); } removedSubscription.close(); @@ -524,26 +595,28 @@ public class AMQQueue implements Managable, Comparable } - public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException + public int delete(boolean checkUnused, boolean checkEmpty) + throws + AMQException { if (checkUnused && !_subscribers.isEmpty()) { _logger.info("Will not delete " + this + " as it is in use."); return 0; - } - else if (checkEmpty && _deliveryMgr.hasQueuedMessages()) + } else if (checkEmpty && _deliveryMgr.hasQueuedMessages()) { _logger.info("Will not delete " + this + " as it is not empty."); return 0; - } - else + } else { delete(); return _deliveryMgr.getQueueMessageCount(); } } - public void delete() throws AMQException + public void delete() + throws + AMQException { if (!_deleted.getAndSet(true)) { @@ -559,7 +632,9 @@ public class AMQQueue implements Managable, Comparable } } - protected void autodelete() throws AMQException + protected void autodelete() + throws + AMQException { if (_logger.isDebugEnabled()) { @@ -568,7 +643,9 @@ public class AMQQueue implements Managable, Comparable delete(); } - public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException + public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) + throws + AMQException { //fixme not sure what this is doing. should we be passing deliverFirst through here? // This code is not used so when it is perhaps it should @@ -591,7 +668,9 @@ public class AMQQueue implements Managable, Comparable // return _deliveryMgr; // } - public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException + public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) + throws + AMQException { _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst); try @@ -607,7 +686,9 @@ public class AMQQueue implements Managable, Comparable } } - void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException + void dequeue(StoreContext storeContext, AMQMessage msg) + throws + FailedDequeueException { try { @@ -637,7 +718,9 @@ public class AMQQueue implements Managable, Comparable return _subscribers; } - protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException + protected void updateReceivedMessageCount(AMQMessage msg) + throws + AMQException { if (!msg.isRedelivered()) { @@ -680,7 +763,9 @@ public class AMQQueue implements Managable, Comparable return "Queue(" + _name + ")@" + System.identityHashCode(this); } - public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException + public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) + throws + AMQException { return _deliveryMgr.performGet(session, channel, acks); } @@ -697,7 +782,9 @@ public class AMQQueue implements Managable, Comparable public static interface Task { - public void doTask(AMQQueue queue) throws AMQException; + public void doTask(AMQQueue queue) + throws + AMQException; } public void addQueueDeleteTask(Task task) @@ -729,4 +816,53 @@ public class AMQQueue implements Managable, Comparable { _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg); } + + //======================================================================== + // Interface StorableQueue + //======================================================================== + + public int getQueueID() + { + return _queueId; + } + + public void setQueueID(int id) + { + _queueId = id; + } + + public void dequeue(StorableMessage m) + { + _messages.remove(m.getMessageId()); + } + + public void enqueue(StorableMessage m) + { + _messages.put(m.getMessageId(), m); + } + + //======================================================================== + // Used by the Store + //======================================================================== + + /** + * Get the list of enqueud messages + * + * @return The list of enqueud messages + */ + public Collection<StorableMessage> getAllEnqueuedMessages() + { + return _messages.values(); + } + + /** + * Get the enqueued message identified by messageID + * + * @param messageId the id of the enqueued message to recover + * @return The enqueued message with the specified id + */ + public StorableMessage getEnqueuedMessage(long messageId) + { + return _messages.get(messageId); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java index 630186991b..1d9f56669e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import java.util.LinkedList; import java.util.List; +import java.nio.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -46,11 +47,18 @@ public class InMemoryMessageHandle implements AMQMessageHandle private long _arrivalTime; + // the message payload + private byte[] _payload; + // a buffer to write the payload + ByteBuffer _buffer; + public InMemoryMessageHandle() { } - public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException + public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) + throws + AMQException { return _contentHeaderBody; } @@ -60,28 +68,36 @@ public class InMemoryMessageHandle implements AMQMessageHandle return _contentBodies.size(); } - public long getBodySize(StoreContext context, Long messageId) throws AMQException + public long getBodySize(StoreContext context, Long messageId) + throws + AMQException { return getContentHeaderBody(context, messageId).bodySize; } - public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException + public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) + throws + AMQException, + IllegalArgumentException { if (index > _contentBodies.size() - 1) { throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " + - (_contentBodies.size() - 1)); + (_contentBodies.size() - 1)); } return _contentBodies.get(index); } public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody) - throws AMQException + throws + AMQException { _contentBodies.add(contentBody); } - public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException + public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) + throws + AMQException { return _messagePublishInfo; } @@ -97,40 +113,50 @@ public class InMemoryMessageHandle implements AMQMessageHandle _redelivered = redelivered; } - public boolean isPersistent(StoreContext context, Long messageId) throws AMQException + public boolean isPersistent(StoreContext context, Long messageId) + throws + AMQException { //todo remove literal values to a constant file such as AMQConstants in common ContentHeaderBody chb = getContentHeaderBody(context, messageId); return chb.properties instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2; + ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2; } /** * This is called when all the content has been received. + * * @param messagePublishInfo * @param contentHeaderBody * @throws AMQException */ public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo, ContentHeaderBody contentHeaderBody) - throws AMQException + throws + AMQException { _messagePublishInfo = messagePublishInfo; _contentHeaderBody = contentHeaderBody; _arrivalTime = System.currentTimeMillis(); } - public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException + public void removeMessage(StoreContext storeContext, Long messageId) + throws + AMQException { // NO OP } - public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException + public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) + throws + AMQException { // NO OP } - public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException + public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) + throws + AMQException { // NO OP } @@ -140,4 +166,24 @@ public class InMemoryMessageHandle implements AMQMessageHandle return _arrivalTime; } + + // added by Arnaud + public byte[] getMessagePayload() + { + if (_payload == null) + { + int bodySize = (int) _contentHeaderBody.bodySize; + _buffer = ByteBuffer.allocate(bodySize); + _payload = new byte[bodySize]; + for (ContentChunk contentBody : _contentBodies) + { + int chunkSize = contentBody.getSize(); + byte[] chunk = new byte[chunkSize]; + contentBody.getData().get(chunk); + _buffer.put(chunk); + } + _buffer.get(_payload); + } + return _payload; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java index 94ab935115..69aaffa907 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.qpid.server.messageStore.StorableMessage; /** * Constructs a message handle based on the publish body, the content header and the queue to which the message @@ -31,12 +32,13 @@ import org.apache.qpid.server.store.MessageStore; public class MessageHandleFactory { - public AMQMessageHandle createMessageHandle(Long messageId, MessageStore store, boolean persistent) + public AMQMessageHandle createMessageHandle(MessageStore store, StorableMessage m, boolean persistent) { // just hardcoded for now if (persistent) { - return new WeakReferenceMessageHandle(store); + // return new WeakReferenceMessageHandle(store); + return new StorableMessageHandle(store, m); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java index 3b1b5acf3c..ed2101fd75 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.messageStore.StorableQueue; public interface QueueRegistry diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java new file mode 100644 index 0000000000..5978e3b10d --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java @@ -0,0 +1,215 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.qpid.server.messageStore.StorableMessage; +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; + +import javax.transaction.xa.Xid; +import java.util.List; +import java.util.LinkedList; +import java.nio.ByteBuffer; + +/** + * Created by Arnaud Simon + * Date: 25-Apr-2007 + * Time: 14:26:34 + */ +public class StorableMessageHandle implements AMQMessageHandle +{ + //======================================================================== + // Static Constants + //======================================================================== + // The logger for this class + private static final Logger _log = Logger.getLogger(StorableMessageHandle.class); + + //======================================================================== + // Instance Fields + //======================================================================== + // the message store + final private MessageStore _messageStore; + // A reference on the message itself + final private StorableMessage _message; + // the message payload + private byte[] _payload; + // a buffer to write the payload + ByteBuffer _buffer; + // the ContentHeaderBody + private ContentHeaderBody _contentHeaderBody; + // the arrival time + private long _arrivalTime; + // Specify if this messag is redelivered + private boolean _redelivered; + // MessagePublishInfo + private MessagePublishInfo _messagePublishInfo; + // list of chunks + private List<ContentChunk> _chunks = new LinkedList<ContentChunk>(); + + //======================================================================== + // Constructors + //======================================================================== + + public StorableMessageHandle(MessageStore messageStore, StorableMessage message) + { + _messageStore = messageStore; + _message = message; + } + + //======================================================================== + // Interface AMQMessageHandle + //======================================================================== + public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) + throws + AMQException + { + return _contentHeaderBody; + } + + public int getBodyCount(StoreContext context, Long messageId) + throws + AMQException + { + return _chunks.size(); + } + + public long getBodySize(StoreContext context, Long messageId) + throws + AMQException + { + return _payload.length; + } + + public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) + throws + IllegalArgumentException, + AMQException + { + return _chunks.get(index); + } + + public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody) + throws + AMQException + { + _chunks.add(contentBody); + // if rquired this message can be added to the store + //_messageStore.appendContent(_message, _payload, 0, 10); + + } + + public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) + throws + AMQException + { + return _messagePublishInfo; + } + + public boolean isRedelivered() + { + return _redelivered; + } + + public void setRedelivered(boolean redelivered) + { + _redelivered = redelivered; + } + + public boolean isPersistent(StoreContext context, Long messageId) + throws + AMQException + { + return _contentHeaderBody.properties instanceof BasicContentHeaderProperties && + ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2; + } + + public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, + MessagePublishInfo messagePublishInfo, + ContentHeaderBody contentHeaderBody) + throws + AMQException + { + _contentHeaderBody = contentHeaderBody; + _arrivalTime = System.currentTimeMillis(); + _messagePublishInfo = messagePublishInfo; + } + + public void removeMessage(StoreContext storeContext, Long messageId) + throws + AMQException + { + // This is already handled by the store but we can possibly do: + // _messageStore.destroy(_message); + } + + public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) + throws + AMQException + { + try + { + _messageStore.enqueue((Xid) storeContext.getPayload(), _message, queue); + } catch (Exception e) + { + throw new AMQException("PRoblem during message enqueue", e); + } + } + + public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) + throws + AMQException + { + try + { + _messageStore.dequeue((Xid) storeContext.getPayload(), _message, queue); + } catch (Exception e) + { + throw new AMQException("PRoblem during message dequeue", e); + } + } + + public long getArrivalTime() + { + return _arrivalTime; + } + + public byte[] getMessagePayload() + { + if (_payload == null) + { + int bodySize = (int) _contentHeaderBody.bodySize; + _buffer = ByteBuffer.allocate(bodySize); + _payload = new byte[bodySize]; + for (ContentChunk contentBody : _chunks) + { + int chunkSize = contentBody.getSize(); + byte[] chunk = new byte[chunkSize]; + contentBody.getData().get(chunk); + _buffer.put(chunk); + } + _buffer.get(_payload); + } + return _payload; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java index 373a64e2eb..64fb6c15d5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -224,4 +224,10 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle return _arrivalTime; } + + // added by Arnaud + public byte[] getMessagePayload() + { + return new byte[0]; //To change body of implemented methods use File | Settings | File Templates. + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java new file mode 100644 index 0000000000..0d25ab0e32 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java @@ -0,0 +1,59 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.qpid.server.exception.*; + +import javax.transaction.xa.Xid; + +/** + * Created by Arnaud Simon + * Date: 25-Apr-2007 + * Time: 17:13:07 + */ +public class DequeueRecord implements TransactionRecord +{ + + + public void commit(MessageStore store, Xid xid) + throws + InternalErrorException, + QueueDoesntExistException, + InvalidXidException, + UnknownXidException, + MessageDoesntExistException + { + // do nothing + } + + public void rollback(MessageStore store) + throws + InternalErrorException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void prepare(MessageStore store) + throws + InternalErrorException + { + //To change body of implemented methods use File | Settings | File Templates. + } +} + diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java new file mode 100644 index 0000000000..eff623ca7c --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java @@ -0,0 +1,248 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.ack.UnacknowledgedMessageMap; +import org.apache.qpid.server.ack.UnacknowledgedMessage; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.exception.InternalErrorException; +import org.apache.qpid.server.exception.InvalidXidException; +import org.apache.log4j.Logger; + +import javax.transaction.xa.Xid; +import java.util.List; +import java.util.LinkedList; + +/** + * Created by Arnaud Simon + * Date: 25-Apr-2007 + * Time: 15:58:07 + */ +public class DistributedTransactionalContext implements TransactionalContext +{ + //======================================================================== + // Static Constants + //======================================================================== + // The logger for this class + private static final Logger _log = Logger.getLogger(DistributedTransactionalContext.class); + + //======================================================================== + // Instance Fields + //======================================================================== + // the message store + final private MessageStore _messageStore; + // The transaction manager + final private TransactionManager _transactionManager; + // the store context + final private StoreContext _storeContext; + // the returned messages + final private List<RequiredDeliveryException> _returnMessages; + // for generating xids + private byte[] _txId = ("txid").getBytes(); + private int _count = 0; + + public DistributedTransactionalContext(TransactionManager transactionManager, MessageStore messageStore, StoreContext storeContext, + List<RequiredDeliveryException> returnMessages) + { + _messageStore = messageStore; + _storeContext = storeContext; + _returnMessages = returnMessages; + _transactionManager = transactionManager; + } + + public void beginTranIfNecessary() + throws + AMQException + { + // begin the transaction and pass the XID through the context + Xid xid = new XidImpl(("branch" + _count++).getBytes(), 1, _txId); + try + { + _transactionManager.begin(xid); + _storeContext.setPayload(xid); + } catch (Exception e) + { + throw new AMQException("Problem during transaction begin", e); + } + } + + public void commit() + throws + AMQException + { + try + { + _transactionManager.commit_one_phase((Xid) _storeContext.getPayload()); + } catch (Exception e) + { + throw new AMQException("Problem during transaction commit", e); + } + } + + public void rollback() + throws + AMQException + { + try + { + _transactionManager.rollback((Xid) _storeContext.getPayload()); + } catch (Exception e) + { + throw new AMQException("Problem during transaction rollback", e); + } + } + + public void messageFullyReceived(boolean persistent) + throws + AMQException + { + // The message is now fully received, we can stage it before enqueued if necessary + } + + public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) + throws + AMQException + { + try + { + //The message has been delivered to the queues + message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue); + // add a record in the transaction + _transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new EnqueueRecord(_storeContext, message, queue, deliverFirst)); + } catch (Exception e) + { + throw new AMQException("Problem during transaction rollback", e); + } + } + + public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, + boolean multiple, + final UnacknowledgedMessageMap unacknowledgedMessageMap) + throws + AMQException + { + if (multiple) + { + if (deliveryTag == 0) + { + //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, + // tells the server to acknowledge all outstanding mesages. + _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + + unacknowledgedMessageMap.size()); + unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + public boolean callback(UnacknowledgedMessage message) + throws + AMQException + { + if (_log.isDebugEnabled()) + { + _log.debug("Discarding message: " + message.message.getMessageId()); + } + + //Message has been ack so discard it. This will dequeue and decrement the reference. + dequeue(message); + return false; + } + + public void visitComplete() + { + unacknowledgedMessageMap.clear(); + } + }); + } else + { + if (!unacknowledgedMessageMap.contains(deliveryTag)) + { + throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); + } + + LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); + unacknowledgedMessageMap.drainTo(acked, deliveryTag); + for (UnacknowledgedMessage msg : acked) + { + if (_log.isDebugEnabled()) + { + _log.debug("Discarding message: " + msg.message.getMessageId()); + } + //Message has been ack so discard it. This will dequeue and decrement the reference. + dequeue(msg); + } + } + } else + { + UnacknowledgedMessage msg; + msg = unacknowledgedMessageMap.remove(deliveryTag); + + if (msg == null) + { + _log.info("Single ack on delivery tag " + deliveryTag); + throw new AMQException("Single ack on delivery tag " + deliveryTag); + } + + if (_log.isDebugEnabled()) + { + _log.debug("Discarding message: " + msg.message.getMessageId()); + } + + //Message has been ack so discard it. This will dequeue and decrement the reference. + dequeue(msg); + + if (_log.isDebugEnabled()) + { + _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + + msg.message.getMessageId()); + } + } + } + + private void dequeue(UnacknowledgedMessage message) + throws + AMQException + { + // Dequeue the message from the strore + message.discard(_storeContext); + // Add a record + try + { + _transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new DequeueRecord()); + } catch (Exception e) + { + throw new AMQException("Problem during message dequeue", e); + } + } + + + public void messageProcessed(AMQProtocolSession protocolSession) + throws + AMQException + { + // The message has been sent + } + + public StoreContext getStoreContext() + { + return _storeContext; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java new file mode 100644 index 0000000000..cdf209fb12 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java @@ -0,0 +1,79 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.qpid.server.exception.*; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.AMQException; + +import javax.transaction.xa.Xid; + +/** + * Created by Arnaud Simon + * Date: 25-Apr-2007 + * Time: 17:01:06 + */ +public class EnqueueRecord implements TransactionRecord +{ + private final StoreContext _storeContext; + private final AMQMessage _msg; + private final AMQQueue _queue; + private final boolean _first; + + EnqueueRecord(StoreContext storeContext, AMQMessage msg, AMQQueue q, boolean firsr) + { + _storeContext = storeContext; + _msg = msg; + _queue = q; + _first = firsr; + } + + public void commit(MessageStore store, Xid xid) + throws + InternalErrorException, + QueueDoesntExistException, + InvalidXidException, + UnknownXidException, + MessageDoesntExistException + { + try + { + _queue.process(_storeContext, _msg, _first); + } catch (AMQException e) + { + throw new InternalErrorException(e); + } + } + + public void rollback(MessageStore store) + throws + InternalErrorException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void prepare(MessageStore store) + throws + InternalErrorException + { + //To change body of implemented methods use File | Settings | File Templates. + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java new file mode 100644 index 0000000000..a132bcefe6 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java @@ -0,0 +1,125 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.server.exception.*; + +import javax.transaction.xa.Xid; +import java.util.Set; + +/** + * Created by Arnaud Simon + * Date: 02-May-2007 + * Time: 08:41:33 + */ +public class MemoryTransactionManager implements TransactionManager +{ + + public XAFlag begin(Xid xid) + throws + InternalErrorException, + InvalidXidException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public XAFlag prepare(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public XAFlag rollback(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public XAFlag commit(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException, + NotPreparedException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public XAFlag commit_one_phase(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void forget(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void setTimeout(Xid xid, long timeout) + throws + InternalErrorException, + UnknownXidException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public long getTimeout(Xid xid) + throws + InternalErrorException, + UnknownXidException + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public Set<Xid> recover(boolean startscan, boolean endscan) + throws + InternalErrorException, + CommandInvalidException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void HeuristicOutcome(Xid xid) + throws + UnknownXidException, + InternalErrorException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public Transaction getTransaction(Xid xid) + throws + UnknownXidException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 181dfa3a80..857fb350a0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -31,7 +31,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.NoConsumersException; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.messageStore.MessageStore; import org.apache.qpid.server.store.StoreContext; /** @author Apache Software Foundation */ @@ -74,7 +74,7 @@ public class NonTransactionalContext implements TransactionalContext { if (!_inTran) { - _messageStore.beginTran(_storeContext); + // _messageStore.beginTran(_storeContext); _inTran = true; } } @@ -212,7 +212,7 @@ public class NonTransactionalContext implements TransactionalContext { if (persistent) { - _messageStore.commitTran(_storeContext); + // _messageStore.commitTran(_storeContext); _inTran = false; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java new file mode 100644 index 0000000000..cd2f619f7e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +/** + * Created by Arnaud Simon + * Date: 25-Apr-2007 + * Time: 14:08:39 + */ +public interface Transaction +{ + + /** + * Add an abstract record to this tx. + * + * @param record The record to be added + */ + public void addRecord(TransactionRecord record); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java new file mode 100644 index 0000000000..8047236985 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.server.exception.*; + +import javax.transaction.xa.Xid; +import java.util.Set; + +/** + * Created by Arnaud Simon + * Date: 29-Mar-2007 + * Time: 13:29:31 + */ +public interface TransactionManager +{ + /** + * Begin a transaction branch identified by Xid + * + * @param xid The xid of the branch to begin + * @return <ul> + * <li> <code>XAFlag.ok</code>: Normal execution. + * <li> <code>XAFlag.rbrollback</code>: The transaction branch was marked rollback-only for an unspecified reason. + * </ul> + * @throws InternalErrorException In case of internal problem + * @throws InvalidXidException The Xid is invalid + */ + public XAFlag begin(Xid xid) + throws + InternalErrorException, + InvalidXidException; + + /** + * Prepare the transaction branch identified by Xid + * + * @param xid The xid of the branch to prepare + * @return <ul> + * <li> <code>XAFlag.ok</code>: Normal execution. + * <li> <code>XAFlag.rdonly</code>: The transaction branch was read-only and has been committed. + * <li> <code>XAFlag.rbrollback</code>: The transaction branch was marked rollback-only for an unspeci?ed reason. + * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long. + * </ul> + * @throws InternalErrorException In case of internal problem + * @throws CommandInvalidException Prepare has been call in an improper context + * @throws UnknownXidException The Xid is unknown + */ + public XAFlag prepare(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException; + + /** + * Rollback the transaction branch identified by Xid + * + * @param xid The xid of the branch to rollback + * @return <ul> + * <li> <code>XAFlag.ok</code>: Normal execution, + * <li> <code>XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed. + * <li> <code>XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was committed. + * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back. + * <li> <code>XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back. + * <li> <code>XAFlag.rbrollback</code>: The broker marked the transaction branch rollback-only for an unspeci?ed reason. + * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long. + * </ul> + * @throws InternalErrorException In case of internal problem + * @throws CommandInvalidException Rollback has been call in an improper context + * @throws UnknownXidException The Xid is unknown + */ + public XAFlag rollback(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException; + + /** + * Commit the transaction branch identified by Xid + * + * @param xid The xid of the branch to commit + * @return <ul> + * <li> <code>XAFlag.ok</code>: Normal execution, + * <li> <code>XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed. + * <li> <code>XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was committed. + * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back. + * <li> <code>XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back. + * </ul> + * @throws InternalErrorException In case of internal problem + * @throws CommandInvalidException Commit has been call in an improper context + * @throws UnknownXidException The Xid is unknown + * @throws NotPreparedException The branch was not prepared prior to commit + */ + public XAFlag commit(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException, + NotPreparedException; + + /** + * One phase commit the transaction branch identified by Xid + * + * @param xid The xid of the branch to one phase commit + * @return <ul> + * <li> <code>XAFlag.ok</code>: Normal execution, + * <li> <code>XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed. + * <li> <code>XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was committed. + * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back. + * <li> <code>XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back. + * <li> <code>XAFlag.rbrollback</code>: The broker marked the transaction branch rollback-only for an unspeci?ed reason. + * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long. + * </ul> + * @throws InternalErrorException In case of internal problem + * @throws CommandInvalidException Commit has been call in an improper context + * @throws UnknownXidException The Xid is unknown + */ + public XAFlag commit_one_phase(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException; + + /** + * Forget about the transaction branch identified by Xid + * + * @param xid The xid of the branch to forget + * @throws InternalErrorException In case of internal problem + * @throws CommandInvalidException Forget has been call in an improper context + * @throws UnknownXidException The Xid is unknown + */ + public void forget(Xid xid) + throws + InternalErrorException, + CommandInvalidException, + UnknownXidException; + + /** + * Set the transaction branch timeout value in seconds + * + * @param xid The xid of the branch to set timeout + * @param timeout Timeout value in seconds + * @throws InternalErrorException In case of internal problem + * @throws UnknownXidException The Xid is unknown + */ + public void setTimeout(Xid xid, long timeout) + throws + InternalErrorException, + UnknownXidException; + + /** + * Get the transaction branch timeout + * + * @param xid The xid of the branch to get the timeout from + * @return The timeout associated with the branch identified with xid + * @throws InternalErrorException In case of internal problem + * @throws UnknownXidException The Xid is unknown + */ + public long getTimeout(Xid xid) + throws + InternalErrorException, + UnknownXidException; + + /** + * Get a set of Xids the RM has prepared or heuristically completed + * + * @param startscan Indicates that recovery scan should start + * @param endscan Indicates that the recovery scan should end after returning the Xids + * @return Set of Xids the RM has prepared or heuristically completed + * @throws InternalErrorException In case of internal problem + * @throws CommandInvalidException Recover has been call in an improper context + */ + public Set<Xid> recover(boolean startscan, boolean endscan) + throws + InternalErrorException, + CommandInvalidException; + + + /** + * An error happened (for example the channel has been abruptly closed) + * with this Xid, TM must make a heuristical decision. + * + * @param xid The Xid of the transaction branch to be heuristically completed + * @throws UnknownXidException The Xid is unknown + * @throws InternalErrorException In case of internal problem + */ + public void HeuristicOutcome(Xid xid) + throws + UnknownXidException, + InternalErrorException; + + /** + * Get the Transaction corresponding to the provided Xid + * @param xid The Xid of the transaction to ger + * @return The transaction with the provided Xid + * @throws UnknownXidException The Xid is unknown + */ + public Transaction getTransaction(Xid xid) + throws + UnknownXidException; +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionRecord.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionRecord.java new file mode 100644 index 0000000000..3f6f1d6b8e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionRecord.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.server.exception.*; +import org.apache.qpid.server.messageStore.MessageStore; + +import javax.transaction.xa.Xid; + +/** + * Created by Arnaud Simon + * Date: 25-Apr-2007 + * Time: 14:12:17 + */ +public interface TransactionRecord +{ + /** + * Commit this record. + * + * @param store the store to be used during commit + * @param xid the xid of the tx branch + * @throws org.apache.qpid.server.exception.InternalErrorException in case of internal problem + * @throws org.apache.qpid.server.exception.QueueDoesntExistException the queue does not exist + * @throws org.apache.qpid.server.exception.InvalidXidException the xid is invalid + * @throws org.apache.qpid.server.exception.UnknownXidException the xid is unknonw + * @throws org.apache.qpid.server.exception.MessageDoesntExistException the message does not exist + */ + public abstract void commit(MessageStore store, Xid xid) + throws + InternalErrorException, + QueueDoesntExistException, + InvalidXidException, + UnknownXidException, + MessageDoesntExistException; + + /** + * rollback this record + * + * @param store the store to be used + * @throws InternalErrorException In case of internal error + */ + public abstract void rollback(MessageStore store) + throws + InternalErrorException; + + /** + * Prepare this record + * + * @param store the store to be used + * @throws InternalErrorException In case of internal error + */ + public abstract void prepare(MessageStore store) + throws + InternalErrorException; + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/XAFlag.java b/java/broker/src/main/java/org/apache/qpid/server/txn/XAFlag.java new file mode 100644 index 0000000000..8214ab7dac --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/XAFlag.java @@ -0,0 +1,50 @@ +package org.apache.qpid.server.txn; + +/** + * Created by Arnaud Simon + * Date: 29-Mar-2007 + * Time: 14:57:01 + */ +public enum XAFlag +{ + rbrollback(1, "XA-RBROLLBACK", "The rollback was caused by an unspecified reason"), + rbtimeout(2, "XA-RBTIMEOUT", "The transaction branch took too long"), + heurhaz(3, "XA-HEURHAZ", "The transaction branch may have been heuristically completed"), + heurcom(4, "XA-HEURCOM", "The transaction branch has been heuristically committed"), + heurrb(5, "XA-HEURRB", "The transaction branch has been heuristically rolled back"), + heurmix(6, "XA-HEURMIX", "The transaction branch has been heuristically committed and rolled back"), + rdonly(7, "XA-RDONLY", "The transaction branch was read-only and has been committed"), + ok(8, "XA-OK", "Normal execution"); + + private final int _code; + + private final String _name; + + private final String _description; + + XAFlag(int code, String name, String description) + { + _code = code; + _name = name; + _description = description; + } + + //============================================== + // Getter methods + //============================================== + + public int getCode() + { + return _code; + } + + public String getName() + { + return _name; + } + + public String getDescription() + { + return _description; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/XidImpl.java b/java/broker/src/main/java/org/apache/qpid/server/txn/XidImpl.java new file mode 100644 index 0000000000..91db1d97c0 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/XidImpl.java @@ -0,0 +1,210 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + + +import org.apache.log4j.Logger; + +import javax.transaction.xa.Xid; + +/** + * Created by Arnaud Simon + * Date: 03-Apr-2007 + * Time: 20:32:55 + */ +public class XidImpl implements Xid +{ + //======================================================================== + // Static Constants + //======================================================================== + // The logger for this class + private static final Logger _log = Logger.getLogger(XidImpl.class); + + //======================================================================== + // Instance Fields + //======================================================================== + + //the transaction branch identifier part of XID as an array of bytes + private byte[] m_branchQualifier; + + // the format identifier part of the XID. + private int m_formatID; + + // the global transaction identifier part of XID as an array of bytes. + private byte[] m_globalTransactionID; + + //======================================================================== + // Constructor(s) + //======================================================================== + + /** + * Create new Xid. + */ + public XidImpl() + { + + } + + /** + * Create new XidImpl from an existing Xid. + * <p/> + * This is usually called when an application server provides some implementation + * of the Xid interface and we need to cast this into our own XidImpl. + * + * @param xid the xid to cloning + */ + public XidImpl(Xid xid) + { + if (_log.isDebugEnabled()) + { + _log.debug("Cloning Xid"); + } + m_branchQualifier = xid.getBranchQualifier(); + m_formatID = xid.getFormatId(); + m_globalTransactionID = xid.getGlobalTransactionId(); + } + + /** + * Create a new Xid. + * + * @param branchQualifier The transaction branch identifier part of XID as an array of bytes. + * @param format The format identifier part of the XID. + * @param globalTransactionID The global transaction identifier part of XID as an array of bytes. + */ + public XidImpl(byte[] branchQualifier, int format, byte[] globalTransactionID) + { + if (_log.isDebugEnabled()) + { + _log.debug("creating Xid"); + } + m_branchQualifier = branchQualifier; + m_formatID = format; + m_globalTransactionID = globalTransactionID; + } + +//======================================================================== + + // Xid interface implementation + //======================================================================== + /** + * Format identifier. O means the OSI CCR format. + * + * @return Global transaction identifier. + */ + public byte[] getGlobalTransactionId() + { + return m_globalTransactionID; + } + + /** + * Obtain the transaction branch identifier part of XID as an array of bytes. + * + * @return Branch identifier part of XID. + */ + public byte[] getBranchQualifier() + { + return m_branchQualifier; + } + + /** + * Obtain the format identifier part of the XID. + * + * @return Format identifier. O means the OSI CCR format. + */ + public int getFormatId() + { + return m_formatID; + } + +//======================================================================== +// Object operations +//======================================================================== + + /** + * Indicates whether some other Xid is "equal to" this one. + * <p/> + * Two Xids are equal if and only if their three elementary parts are equal + * + * @param o the object to compare this <code>XidImpl</code> against. + * @return code>true</code> if the <code>XidImpl</code> are equal; <code>false</code> otherwise. + */ + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o instanceof XidImpl) + { + XidImpl other = (XidImpl) o; + if (m_formatID == other.getFormatId()) + { + if (m_branchQualifier.length == other.getBranchQualifier().length) + { + for (int i = 0; i < m_branchQualifier.length; i++) + { + if (m_branchQualifier[i] != other.getBranchQualifier()[i]) + { + return false; + } + } + + if (m_globalTransactionID.length == other.getGlobalTransactionId().length) + { + for (int i = 0; i < m_globalTransactionID.length; i++) + { + if (m_globalTransactionID[i] != other.getGlobalTransactionId()[i]) + { + return false; + } + } + // everithing is equal + return true; + } + } + } + } + return false; + } + + /** + * Returns a hash code for this Xid. + * <p/> + * As this object is used as a key entry in a hashMap it is necessary to provide an implementation + * of hashcode in order to fulfill the following aspect of the general contract of + * {@link Object#hashCode()} that is: + * <ul> + * <li>If two objects are equal according to the <tt>equals(Object)</tt> + * method, then calling the <code>hashCode</code> method on each of + * the two objects must produce the same integer result. + * </ul> + * <p/> + * The hash code for a + * <code>XidImpl</code> object is computed as + * <blockquote><pre> + * hashcode( globalTransactionID ) + hashcode( branchQualifier ) + formatID + * </pre></blockquote> + * + * @return a hash code value for this object. + */ + public int hashCode() + { + return (new String(m_globalTransactionID)).hashCode() + (new String(m_branchQualifier)).hashCode() + m_formatID; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java index 150b98b424..f7c578d9a0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -59,7 +59,8 @@ public class NullApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { - _configuration.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore"); + _configuration.addProperty("store.class", "org.apache.qpid.server.messageStore.MemoryMessageStore"); + _configuration.addProperty("txn.class", "org.apache.qpid.server.txn.MemoryTransactionManager"); Properties users = new Properties(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index b5c59dbbb7..7e329a5274 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -1,249 +1,272 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import javax.management.NotCompliantMBeanException;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.AMQBrokerManagerMBean;
-import org.apache.qpid.server.security.access.AccessManager;
-import org.apache.qpid.server.security.access.AccessManagerImpl;
-import org.apache.qpid.server.security.access.Accessable;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-
-public class VirtualHost implements Accessable
-{
- private static final Logger _logger = Logger.getLogger(VirtualHost.class);
-
-
- private final String _name;
-
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
-
- private MessageStore _messageStore;
-
- protected VirtualHostMBean _virtualHostMBean;
-
- private AMQBrokerManagerMBean _brokerMBean;
-
- private AuthenticationManager _authenticationManager;
-
- private AccessManager _accessManager;
-
-
- public void setAccessableName(String name)
- {
- _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
- + name + ") ignored remains :" + getAccessableName());
- }
-
- public String getAccessableName()
- {
- return _name;
- }
-
-
- /**
- * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
- * implementaion of an Exchange MBean should extend this class.
- */
- public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
- {
- public VirtualHostMBean() throws NotCompliantMBeanException
- {
- super(ManagedVirtualHost.class, "VirtualHost");
- }
-
- public String getObjectInstanceName()
- {
- return _name.toString();
- }
-
- public String getName()
- {
- return _name.toString();
- }
-
- public VirtualHost getVirtualHost()
- {
- return VirtualHost.this;
- }
-
-
- } // End of MBean class
-
-
- public VirtualHost(String name, MessageStore store) throws Exception
- {
- this(name, null, store);
- }
-
- public VirtualHost(String name, Configuration hostConfig) throws Exception
- {
- this(name, hostConfig, null);
- }
-
- private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
- {
- _name = name;
-
- _virtualHostMBean = new VirtualHostMBean();
- // This isn't needed to be registered
- //_virtualHostMBean.register();
-
- _queueRegistry = new DefaultQueueRegistry(this);
- _exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeRegistry = new DefaultExchangeRegistry(this);
-
- if (store != null)
- {
- _messageStore = store;
- }
- else
- {
- if (hostConfig == null)
- {
- throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
- }
- initialiseMessageStore(hostConfig);
- }
-
- _exchangeRegistry.initialise();
-
- _logger.warn("VirtualHost authentication Managers require spec change to be operational.");
- _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
-
- _accessManager = new AccessManagerImpl(name, hostConfig);
-
- _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
- _brokerMBean.register();
- }
-
- private void initialiseMessageStore(Configuration config) throws Exception
- {
- String messageStoreClass = config.getString("store.class");
-
- Class clazz = Class.forName(messageStoreClass);
- Object o = clazz.newInstance();
-
- if (!(o instanceof MessageStore))
- {
- throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
- " does not.");
- }
- _messageStore = (MessageStore) o;
- _messageStore.configure(this, "store", config);
- }
-
-
- public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
- {
- T instance;
- try
- {
- instance = instanceType.newInstance();
- }
- catch (Exception e)
- {
- _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
- throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
- }
- Configurator.configure(instance);
-
- return instance;
- }
-
-
- public String getName()
- {
- return _name;
- }
-
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
- }
-
- public ApplicationRegistry getApplicationRegistry()
- {
- throw new UnsupportedOperationException();
- }
-
- public MessageStore getMessageStore()
- {
- return _messageStore;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
- public AccessManager getAccessManager()
- {
- return _accessManager;
- }
-
- public void close() throws Exception
- {
- if (_messageStore != null)
- {
- _messageStore.close();
- }
- }
-
- public ManagedObject getBrokerMBean()
- {
- return _brokerMBean;
- }
-
- public ManagedObject getManagedObject()
- {
- return _virtualHostMBean;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import javax.management.NotCompliantMBeanException; + +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.server.AMQBrokerManagerMBean; +import org.apache.qpid.server.txn.TransactionManager; +import org.apache.qpid.server.security.access.AccessManager; +import org.apache.qpid.server.security.access.AccessManagerImpl; +import org.apache.qpid.server.security.access.Accessable; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.exchange.DefaultExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.messageStore.MessageStore; + +public class VirtualHost implements Accessable +{ + private static final Logger _logger = Logger.getLogger(VirtualHost.class); + + + private final String _name; + + private QueueRegistry _queueRegistry; + + private ExchangeRegistry _exchangeRegistry; + + private ExchangeFactory _exchangeFactory; + + private MessageStore _messageStore; + + private TransactionManager _transactionManager; + + protected VirtualHostMBean _virtualHostMBean; + + private AMQBrokerManagerMBean _brokerMBean; + + private AuthenticationManager _authenticationManager; + + private AccessManager _accessManager; + + + public void setAccessableName(String name) + { + _logger.warn("Setting Accessable Name for VirualHost is not allowed. (" + + name + ") ignored remains :" + getAccessableName()); + } + + public String getAccessableName() + { + return _name; + } + + + /** + * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any + * implementaion of an Exchange MBean should extend this class. + */ + public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost + { + public VirtualHostMBean() throws NotCompliantMBeanException + { + super(ManagedVirtualHost.class, "VirtualHost"); + } + + public String getObjectInstanceName() + { + return _name.toString(); + } + + public String getName() + { + return _name.toString(); + } + + public VirtualHost getVirtualHost() + { + return VirtualHost.this; + } + + + } // End of MBean class + + + public VirtualHost(String name, MessageStore store) throws Exception + { + this(name, null, store); + } + + public VirtualHost(String name, Configuration hostConfig) throws Exception + { + this(name, hostConfig, null); + } + + private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception + { + _name = name; + + _virtualHostMBean = new VirtualHostMBean(); + // This isn't needed to be registered + //_virtualHostMBean.register(); + + _queueRegistry = new DefaultQueueRegistry(this); + _exchangeFactory = new DefaultExchangeFactory(this); + _exchangeRegistry = new DefaultExchangeRegistry(this); + + if (store != null) + { + _messageStore = store; + } + else + { + if (hostConfig == null) + { + throw new IllegalAccessException("HostConfig and MessageStore cannot be null"); + } + initialiseTransactionManager(hostConfig); + initialiseMessageStore(hostConfig); + } + + _exchangeRegistry.initialise(); + + _logger.warn("VirtualHost authentication Managers require spec change to be operational."); + _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig); + + _accessManager = new AccessManagerImpl(name, hostConfig); + + _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); + _brokerMBean.register(); + } + + private void initialiseMessageStore(Configuration config) throws Exception + { + String messageStoreClass = config.getString("store.class"); + + Class clazz = Class.forName(messageStoreClass); + Object o = clazz.newInstance(); + + if (!(o instanceof MessageStore)) + { + throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + + " does not."); + } + _messageStore = (MessageStore) o; + _messageStore.configure(this, _transactionManager, "store", config); + } + + private void initialiseTransactionManager(Configuration config) throws Exception + { + String transactionManagerClass = config.getString("txn.class"); + Class clazz = Class.forName(transactionManagerClass); + Object o = clazz.newInstance(); + + if (!(o instanceof TransactionManager)) + { + throw new ClassCastException("Transaction Manager class must implement " + TransactionManager.class + ". Class " + clazz + + " does not."); + } + _transactionManager = (TransactionManager) o; + } + + + public <T> T getConfiguredObject(Class<T> instanceType, Configuration config) + { + T instance; + try + { + instance = instanceType.newInstance(); + } + catch (Exception e) + { + _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor"); + throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e); + } + Configurator.configure(instance); + + return instance; + } + + + public String getName() + { + return _name; + } + + public QueueRegistry getQueueRegistry() + { + return _queueRegistry; + } + + public ExchangeRegistry getExchangeRegistry() + { + return _exchangeRegistry; + } + + public ExchangeFactory getExchangeFactory() + { + return _exchangeFactory; + } + + public ApplicationRegistry getApplicationRegistry() + { + throw new UnsupportedOperationException(); + } + + public MessageStore getMessageStore() + { + return _messageStore; + } + + public TransactionManager getTransactionManager() + { + return _transactionManager; + } + + public AuthenticationManager getAuthenticationManager() + { + return _authenticationManager; + } + + public AccessManager getAccessManager() + { + return _accessManager; + } + + public void close() throws Exception + { + if (_messageStore != null) + { + _messageStore.close(); + } + } + + public ManagedObject getBrokerMBean() + { + return _brokerMBean; + } + + public ManagedObject getManagedObject() + { + return _virtualHostMBean; + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index a9496d0de1..a7e5c0d1d0 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -19,14 +19,16 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.qpid.server.messageStore.MemoryMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionManager; +import org.apache.qpid.server.txn.MemoryTransactionManager; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.TestMinaProtocolSession; @@ -51,6 +53,8 @@ public class AMQQueueAlertTest extends TestCase private VirtualHost _virtualHost; private AMQMinaProtocolSession protocolSession = null; private MessageStore _messageStore = new MemoryMessageStore(); + private TransactionManager _txm = new MemoryTransactionManager(); + private StoreContext _storeContext = new StoreContext(); private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, null, @@ -170,7 +174,7 @@ public class AMQQueueAlertTest extends TestCase public void testQueueDepthAlertWithSubscribers() throws Exception { protocolSession = new TestMinaProtocolSession(); - AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore, null); + AMQChannel channel = new AMQChannel(protocolSession, 2,_txm, _messageStore, null); protocolSession.addChannel(channel); // Create queue diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 551eb8f0a0..03137c9794 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -30,9 +30,11 @@ import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.TransactionManager; +import org.apache.qpid.server.txn.MemoryTransactionManager; +import org.apache.qpid.server.messageStore.MessageStore; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.messageStore.MemoryMessageStore; import javax.management.JMException; import java.util.LinkedList; @@ -47,6 +49,7 @@ public class AMQQueueMBeanTest extends TestCase private AMQQueue _queue; private AMQQueueMBean _queueMBean; private MessageStore _messageStore = new MemoryMessageStore(); + private TransactionManager _txm = new MemoryTransactionManager(); private StoreContext _storeContext = new StoreContext(); private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, null, @@ -80,7 +83,7 @@ public class AMQQueueMBeanTest extends TestCase TestMinaProtocolSession protocolSession = new TestMinaProtocolSession(); - AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null); + AMQChannel channel = new AMQChannel(protocolSession, 1,_txm, _messageStore, null); protocolSession.addChannel(channel); _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false); |
