diff options
Diffstat (limited to 'java/broker/src')
6 files changed, 108 insertions, 48 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 9fb3a5040b..4696ec4453 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -226,7 +226,7 @@ public class AMQChannel BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeaderBody.properties; //fixme: fudge for QPID-677 properties.getHeaders().keySet(); - + properties.setUserId(protocolSession.getAuthorizedID().getName()); } @@ -378,7 +378,14 @@ public class AMQChannel { _txnContext.rollback(); unsubscribeAllConsumers(session); - requeue(); + try + { + requeue(); + } + catch (AMQException e) + { + _log.error("Caught AMQException whilst attempting to reque:" + e); + } setClosing(true); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 8ede553464..1a9dc6673a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -53,7 +53,7 @@ public class DefaultExchangeFactory implements ExchangeFactory { _exchangeClassMap.put(type.getName(), type); } - + public Collection<ExchangeType<? extends Exchange>> getRegisteredTypes() { return _exchangeClassMap.values(); @@ -75,6 +75,12 @@ public class DefaultExchangeFactory implements ExchangeFactory public void initialise(Configuration hostConfig) { + + if (hostConfig == null) + { + return; + } + for(Object className : hostConfig.getList("custom-exchanges.class-name")) { try diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index fa9d83cd7e..543e043bed 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -165,7 +165,14 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter //fixme -- this can be null if (amqProtocolSession != null) { - amqProtocolSession.closeSession(); + try + { + amqProtocolSession.closeSession(); + } + catch (AMQException e) + { + _logger.error("Caught AMQException whilst closingSession:" + e); + } } } @@ -199,7 +206,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } else if (throwable instanceof IOException) { - _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable); + _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 22fa0fab23..455983c6d8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,15 +20,14 @@ */ package org.apache.qpid.server.registry; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.HashMap; +import java.util.Map; + /** * An abstract application registry that provides access to configuration information and handles the * construction and caching of configurable objects. @@ -59,24 +58,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public void run() { _logger.info("Shutting down application registries..."); - try - { - synchronized (ApplicationRegistry.class) - { - Iterator<IApplicationRegistry> keyIterator = _instanceMap.values().iterator(); - - while (keyIterator.hasNext()) - { - IApplicationRegistry instance = keyIterator.next(); - - instance.close(); - } - } - } - catch (Exception e) - { - _logger.error("Error shutting down message store: " + e, e); - } + removeAll(); } } @@ -116,6 +98,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } catch (Exception e) { + _logger.error("Error shutting down message store: " + e, e); } finally @@ -124,6 +107,14 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } + public static void removeAll() + { + Object[] keys = _instanceMap.keySet().toArray(); + for (Object k : keys) + { + remove((Integer) k); + } + } protected ApplicationRegistry(Configuration configuration) { @@ -154,7 +145,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry catch (Exception e) { _logger.error("Error configuring application: " + e, e); - //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID); + //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID); throw new RuntimeException("Unable to create Application Registry", e); } } @@ -167,7 +158,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public void close() throws Exception { - for(VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts()) + for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts()) { virtualHost.close(); } @@ -204,7 +195,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return instance; } - public static void setDefaultApplicationRegistry(String clazz) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 8ccb0be0a8..7a6e0b011f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -20,27 +20,26 @@ */ package org.apache.qpid.server.store; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.exchange.Exchange; -/** - * A simple message store that stores the messages in a threadsafe structure in memory. - */ +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** A simple message store that stores the messages in a threadsafe structure in memory. */ public class MemoryMessageStore implements MessageStore { private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); @@ -54,6 +53,7 @@ public class MemoryMessageStore implements MessageStore protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap; private final AtomicLong _messageId = new AtomicLong(1); + private AtomicBoolean _closed = new AtomicBoolean(false); public void configure() { @@ -77,6 +77,7 @@ public class MemoryMessageStore implements MessageStore public void close() throws Exception { + _closed.getAndSet(true); if (_metaDataMap != null) { _metaDataMap.clear(); @@ -89,8 +90,9 @@ public class MemoryMessageStore implements MessageStore } } - public void removeMessage(StoreContext context, Long messageId) + public void removeMessage(StoreContext context, Long messageId) throws AMQException { + checkNotClosed(); if (_log.isDebugEnabled()) { _log.debug("Removing message with id " + messageId); @@ -172,9 +174,10 @@ public class MemoryMessageStore implements MessageStore public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException { + checkNotClosed(); List<ContentChunk> bodyList = _contentBodyMap.get(messageId); - if(bodyList == null && lastContentBody) + if (bodyList == null && lastContentBody) { _contentBodyMap.put(messageId, Collections.singletonList(contentBody)); } @@ -193,17 +196,28 @@ public class MemoryMessageStore implements MessageStore public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException { + checkNotClosed(); _metaDataMap.put(messageId, messageMetaData); } - public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException { + checkNotClosed(); return _metaDataMap.get(messageId); } public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException { + checkNotClosed(); List<ContentChunk> bodyList = _contentBodyMap.get(messageId); return bodyList.get(index); } + + private void checkNotClosed() throws MessageStoreClosedException + { + if (_closed.get()) + { + throw new MessageStoreClosedException(); + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java new file mode 100644 index 0000000000..3d1538c7eb --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java @@ -0,0 +1,36 @@ +package org.apache.qpid.server.store; + +import org.apache.qpid.AMQException;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * NOTE: this class currently extends AMQException but + * we should be using AMQExceptions internally in the code base for Protocol errors hence + * the message store interface should throw a different super class which this should be + * moved to reflect + */ +public class MessageStoreClosedException extends AMQException +{ + public MessageStoreClosedException() + { + super("Message store closed"); + } +} |
