From 9bac85e36d60df28fb6f86e3bbe9e3f46689aa04 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 27 Jun 2007 15:49:51 +0000 Subject: Merged revisions 550748-551121 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r550748 | ritchiem | 2007-06-26 10:20:17 +0100 (Tue, 26 Jun 2007) | 1 line Added xml file for logging during sustained tests ........ r550773 | rupertlssmith | 2007-06-26 12:03:04 +0100 (Tue, 26 Jun 2007) | 1 line Immediate and mandatory flag tests added. ........ r550849 | rupertlssmith | 2007-06-26 17:43:58 +0100 (Tue, 26 Jun 2007) | 1 line QPID-509 Mandatory messages not returned outside a transaction. They are now. ........ r551117 | ritchiem | 2007-06-27 11:51:34 +0100 (Wed, 27 Jun 2007) | 2 lines Update to the sustained test to ensure late joining occurs correctly and improved stabilisation. Additional system properties now documented on wiki. http://cwiki.apache.org/qpid/sustained-tests.html ........ r551118 | ritchiem | 2007-06-27 11:51:51 +0100 (Wed, 27 Jun 2007) | 1 line Added intelij files to ignore list ........ r551119 | ritchiem | 2007-06-27 11:55:34 +0100 (Wed, 27 Jun 2007) | 1 line POM update to add Apache content to built jars ........ r551120 | ritchiem | 2007-06-27 11:58:25 +0100 (Wed, 27 Jun 2007) | 1 line Updated default guest password as it was not correct. ........ r551121 | ritchiem | 2007-06-27 12:00:48 +0100 (Wed, 27 Jun 2007) | 1 line Added additional information to log message when available to aid the explination of a failed connection ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551207 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 25 ++-- .../qpid/server/exchange/DestNameExchange.java | 2 +- .../qpid/server/exchange/DestWildExchange.java | 94 ++++++++------ .../qpid/server/exchange/FanoutExchange.java | 71 +++++------ .../qpid/server/exchange/HeadersExchange.java | 2 +- .../server/protocol/AMQMinaProtocolSession.java | 139 ++++++++++----------- .../org/apache/qpid/server/queue/AMQQueue.java | 14 ++- 7 files changed, 183 insertions(+), 164 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 43a04dbfa1..28a9e85489 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -20,18 +20,9 @@ */ package org.apache.qpid.server; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; @@ -52,6 +43,16 @@ import org.apache.qpid.server.queue.Subscription; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + public class AMQChannel { public static final int DEFAULT_PREFETCH = 5000; @@ -208,7 +209,8 @@ public class AMQChannel _currentMessage.setPublisher(publisher); } - public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException + public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession) + throws AMQException { if (_currentMessage == null) { @@ -230,6 +232,7 @@ public class AMQChannel // check and deliver if header says body length is zero if (contentHeaderBody.bodySize == 0) { + _txnContext.messageProcessed(protocolSession); _currentMessage = null; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index 01242f90de..0dcceaddbb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -169,7 +169,7 @@ public class DestNameExchange extends AbstractExchange if (queues == null || queues.isEmpty()) { String msg = "Routing key " + routingKey + " is not known to " + this; - if (info.isMandatory()) + if (info.isMandatory() || info.isImmediate()) { throw new NoRouteException(msg, payload, null); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 222e341b1a..f6a95b5e55 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -20,13 +20,18 @@ */ package org.apache.qpid.server.exchange; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.StringTokenizer; -import java.util.LinkedList; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; import javax.management.JMException; import javax.management.MBeanException; @@ -41,24 +46,21 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.StringTokenizer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; public class DestWildExchange extends AbstractExchange { private static final Logger _logger = Logger.getLogger(DestWildExchange.class); - private ConcurrentHashMap> _routingKey2queues = new ConcurrentHashMap>(); - // private ConcurrentHashMap _routingKey2queue = new ConcurrentHashMap(); + private ConcurrentHashMap> _routingKey2queues = + new ConcurrentHashMap>(); + // private ConcurrentHashMap _routingKey2queue = new ConcurrentHashMap(); private static final String TOPIC_SEPARATOR = "."; private static final String AMQP_STAR = "*"; private static final String AMQP_HASH = "#"; @@ -90,7 +92,7 @@ public class DestWildExchange extends AbstractExchange queueList.add(q.getName().toString()); } - Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])}; + Object[] bindingItemValues = { key.toString(), queueList.toArray(new String[0]) }; CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); _bindingList.put(bindingData); } @@ -118,7 +120,6 @@ public class DestWildExchange extends AbstractExchange } // End of MBean class - public AMQShortString getType() { return ExchangeDefaults.TOPIC_EXCHANGE_CLASS; @@ -140,6 +141,7 @@ public class DestWildExchange extends AbstractExchange { queueList = _routingKey2queues.get(routingKey); } + if (!queueList.contains(queue)) { queueList.add(queue); @@ -165,8 +167,8 @@ public class DestWildExchange extends AbstractExchange for (int index = 0; index < size; index++) { - //if there are more levels - if (index + 1 < size) + // if there are more levels + if ((index + 1) < size) { if (_subscription.get(index).equals(AMQP_HASH)) { @@ -175,7 +177,7 @@ public class DestWildExchange extends AbstractExchange // we don't need #.# delete this one _subscription.remove(index); size--; - //redo this normalisation + // redo this normalisation index--; } @@ -186,7 +188,7 @@ public class DestWildExchange extends AbstractExchange _subscription.add(index + 1, _subscription.remove(index)); } } - }//if we have more levels + } // if we have more levels } StringBuilder sb = new StringBuilder(); @@ -211,9 +213,9 @@ public class DestWildExchange extends AbstractExchange List queues = getMatchedQueues(routingKey); // if we have no registered queues we have nothing to do // TODO: add support for the immediate flag - if (queues == null || queues.size() == 0) + if ((queues == null) || queues.isEmpty()) { - if (info.isMandatory()) + if (info.isMandatory() || info.isImmediate()) { String msg = "Topic " + routingKey + " is not known to " + this; throw new NoRouteException(msg, payload, null); @@ -222,6 +224,7 @@ public class DestWildExchange extends AbstractExchange { _logger.warn("No queues found for routing key " + routingKey); _logger.warn("Routing map contains: " + _routingKey2queues); + return; } } @@ -238,14 +241,15 @@ public class DestWildExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException { List queues = _routingKey2queues.get(normalize(routingKey)); - return queues != null && queues.contains(queue); - } + return (queues != null) && queues.contains(queue); + } public boolean isBound(AMQShortString routingKey) throws AMQException { List queues = _routingKey2queues.get(normalize(routingKey)); - return queues != null && !queues.isEmpty(); + + return (queues != null) && !queues.isEmpty(); } public boolean isBound(AMQQueue queue) throws AMQException @@ -257,6 +261,7 @@ public class DestWildExchange extends AbstractExchange return true; } } + return false; } @@ -279,12 +284,14 @@ public class DestWildExchange extends AbstractExchange " with routing key " + routingKey + ". No queue was registered with that routing key", null); } + boolean removedQ = queues.remove(queue); if (!removedQ) { throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() + " with routing key " + routingKey, null); } + if (queues.isEmpty()) { _routingKey2queues.remove(routingKey); @@ -304,7 +311,6 @@ public class DestWildExchange extends AbstractExchange } } - private List getMatchedQueues(AMQShortString routingKey) { List list = new LinkedList(); @@ -334,7 +340,6 @@ public class DestWildExchange extends AbstractExchange queueList.add(queTok.nextToken()); } - int depth = 0; boolean matching = true; boolean done = false; @@ -343,25 +348,26 @@ public class DestWildExchange extends AbstractExchange while (matching && !done) { - if (queueList.size() == depth + queueskip || routingkeyList.size() == depth + routingskip) + if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip))) { done = true; // if it was the routing key that ran out of digits - if (routingkeyList.size() == depth + routingskip) + if (routingkeyList.size() == (depth + routingskip)) { if (queueList.size() > (depth + queueskip)) - { // a hash and it is the last entry - matching = queueList.get(depth + queueskip).equals(AMQP_HASH) && queueList.size() == depth + queueskip + 1; + { // a hash and it is the last entry + matching = + queueList.get(depth + queueskip).equals(AMQP_HASH) + && (queueList.size() == (depth + queueskip + 1)); } } - else if (routingkeyList.size() > depth + routingskip) + else if (routingkeyList.size() > (depth + routingskip)) { // There is still more routing key to check matching = false; } - continue; } @@ -377,27 +383,33 @@ public class DestWildExchange extends AbstractExchange else if (queueList.get(depth + queueskip).equals(AMQP_HASH)) { // Is this a # at the end - if (queueList.size() == depth + queueskip + 1) + if (queueList.size() == (depth + queueskip + 1)) { done = true; + continue; } // otherwise # in the middle - while (routingkeyList.size() > depth + routingskip) + while (routingkeyList.size() > (depth + routingskip)) { if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1))) { queueskip++; depth++; + break; } + routingskip++; } + continue; } + matching = false; } + depth++; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 6148fd4e1c..bf00eeb9d3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -1,27 +1,36 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ - package org.apache.qpid.server.exchange; -import java.util.concurrent.CopyOnWriteArraySet; +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; import javax.management.JMException; import javax.management.MBeanException; @@ -36,16 +45,7 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; +import java.util.concurrent.CopyOnWriteArraySet; public class FanoutExchange extends AbstractExchange { @@ -63,7 +63,7 @@ public class FanoutExchange extends AbstractExchange private final class FanoutExchangeMBean extends ExchangeMBean { @MBeanConstructor("Creates an MBean for AMQ fanout exchange") - public FanoutExchangeMBean() throws JMException + public FanoutExchangeMBean() throws JMException { super(); _exchangeType = "fanout"; @@ -79,9 +79,7 @@ public class FanoutExchange extends AbstractExchange { String queueName = queue.getName().toString(); - - - Object[] bindingItemValues = {queueName, new String[] {queueName}}; + Object[] bindingItemValues = { queueName, new String[] { queueName } }; CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); _bindingList.put(bindingData); } @@ -98,7 +96,7 @@ public class FanoutExchange extends AbstractExchange } try - { + { queue.bind(new AMQShortString(binding), null, FanoutExchange.this); } catch (AMQException ex) @@ -107,8 +105,7 @@ public class FanoutExchange extends AbstractExchange } } - }// End of MBean class - + } // End of MBean class protected ExchangeMBean createMBean() throws AMQException { @@ -147,7 +144,6 @@ public class FanoutExchange extends AbstractExchange { assert queue != null; - if (!_queues.remove(queue)) { throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() + @@ -159,10 +155,10 @@ public class FanoutExchange extends AbstractExchange { final MessagePublishInfo publishInfo = payload.getMessagePublishInfo(); final AMQShortString routingKey = publishInfo.getRoutingKey(); - if (_queues == null || _queues.isEmpty()) + if ((_queues == null) || _queues.isEmpty()) { String msg = "No queues bound to " + this; - if (publishInfo.isMandatory()) + if (publishInfo.isMandatory() || publishInfo.isImmediate()) { throw new NoRouteException(msg, payload, null); } @@ -193,13 +189,12 @@ public class FanoutExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey) throws AMQException { - return _queues != null && !_queues.isEmpty(); + return (_queues != null) && !_queues.isEmpty(); } public boolean isBound(AMQQueue queue) throws AMQException { - return _queues.contains(queue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 8205924207..e86094e26f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -229,7 +229,7 @@ public class HeadersExchange extends AbstractExchange String msg = "Exchange " + getName() + ": message not routable."; - if (payload.getMessagePublishInfo().isMandatory()) + if (payload.getMessagePublishInfo().isMandatory() || payload.getMessagePublishInfo().isImmediate()) { throw new NoRouteException(msg, payload, null); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 82e969b496..c9f5e42286 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,24 +20,13 @@ */ package org.apache.qpid.server.protocol; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.security.Principal; - -import javax.management.JMException; -import javax.security.sasl.SaslServer; - import org.apache.log4j.Logger; + import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.IoSession; import org.apache.mina.transport.vmpipe.VmPipeAddress; + import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; @@ -46,22 +35,34 @@ import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.*; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -public class AMQMinaProtocolSession implements AMQProtocolSession, - Managable +import javax.management.JMException; +import javax.security.sasl.SaslServer; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.security.Principal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; + +public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); @@ -111,25 +112,20 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private ProtocolOutputConverter _protocolOutputConverter; private Principal _authorizedID; - public ManagedObject getManagedObject() { return _managedObject; } - - public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, - AMQCodecFactory codecFactory) - throws AMQException + public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory) + throws AMQException { _stateManager = new AMQStateManager(virtualHostRegistry, this); _minaProtocolSession = session; session.setAttachment(this); - _codecFactory = codecFactory; - try { IoServiceConfig config = session.getServiceConfig(); @@ -140,16 +136,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, catch (RuntimeException e) { e.printStackTrace(); - // throw e; + // throw e; } -// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); + // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); } - public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, - AMQCodecFactory codecFactory, AMQStateManager stateManager) - throws AMQException + public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, + AMQStateManager stateManager) throws AMQException { _stateManager = stateManager; _minaProtocolSession = session; @@ -182,8 +177,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return (AMQProtocolSession) minaProtocolSession.getAttachment(); } - public void dataBlockReceived(AMQDataBlock message) - throws Exception + public void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; if (message instanceof ProtocolInitiation) @@ -203,8 +197,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } - private void frameReceived(AMQFrame frame) - throws AMQException + private void frameReceived(AMQFrame frame) throws AMQException { int channelId = frame.getChannel(); AMQBody body = frame.getBodyFrame(); @@ -252,13 +245,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, String locales = "en_US"; // Interfacing with generated code - be aware of possible changes to parameter order as versions change. - AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - locales.getBytes(), // locales - mechanisms.getBytes(), // mechanisms - null, // serverProperties - (short) getProtocolMajorVersion(), // versionMajor - (short) getProtocolMinorVersion()); // versionMinor + AMQFrame response = + ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + locales.getBytes(), // locales + mechanisms.getBytes(), // mechanisms + null, // serverProperties + (short) getProtocolMajorVersion(), // versionMajor + (short) getProtocolMinorVersion()); // versionMinor _minaProtocolSession.write(response); } catch (AMQException e) @@ -269,21 +262,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, // TODO: Close connection (but how to wait until message is sent?) // ritchiem 2006-12-04 will this not do? -// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()])); -// future.join(); -// close connection + // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()])); + // future.join(); + // close connection } } - private void methodFrameReceived(int channelId, AMQMethodBody methodBody) { - final AMQMethodEvent evt = new AMQMethodEvent(channelId, - methodBody); + final AMQMethodEvent evt = new AMQMethodEvent(channelId, methodBody); - //Check that this channel is not closing + // Check that this channel is not closing if (channelAwaitingClosure(channelId)) { if ((evt.getMethod() instanceof ChannelCloseOkBody)) @@ -299,11 +290,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.info("Channel[" + channelId + "] awaiting closure ignoring"); } + return; } } - try { try @@ -315,10 +306,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { for (AMQMethodListener listener : _frameListeners) { - wasAnyoneInterested = listener.methodReceived(evt) || - wasAnyoneInterested; + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; } } + if (!wasAnyoneInterested) { throw new AMQNoMethodHandlerException(evt, null); @@ -332,6 +323,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.info("Closing channel due to: " + e.getMessage()); } + writeFrame(e.getCloseFrame(channelId)); closeChannel(channelId); } @@ -341,14 +333,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage()); } + if (_logger.isInfoEnabled()) { _logger.info("Closing connection due to: " + e.getMessage()); } + closeSession(); - AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, - AMQConstant.CHANNEL_ERROR.getName().toString()); + AMQConnectionException ce = + evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString()); _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(ce.getCloseFrame(channelId)); @@ -360,6 +355,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.info("Closing connection due to: " + e.getMessage()); } + closeSession(); _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(e.getCloseFrame(channelId)); @@ -372,17 +368,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { listener.error(e); } + _minaProtocolSession.close(); } } - private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { AMQChannel channel = getAndAssertChannel(channelId); - channel.publishContentHeader(body); + channel.publishContentHeader(body, this); } @@ -427,15 +423,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId, null); } + return channel; } public AMQChannel getChannel(int channelId) throws AMQException { - final AMQChannel channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId) - ? _cachedChannels[channelId] - : _channelMap.get(channelId); - if (channel == null || channel.isClosing()) + final AMQChannel channel = + ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); + if ((channel == null) || channel.isClosing()) { return null; } @@ -466,8 +462,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, if (_channelMap.size() == _maxNoOfChannels) { - String errorMessage = toString() + ": maximum number of channels has been reached (" + - _maxNoOfChannels + "); can't create channel"; + String errorMessage = + toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels + + "); can't create channel"; _logger.error(errorMessage); throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage, null); } @@ -480,6 +477,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _cachedChannels[channelId] = channel; } + checkForNotification(); } @@ -504,7 +502,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void commitTransactions(AMQChannel channel) throws AMQException { - if (channel != null && channel.isTransactional()) + if ((channel != null) && channel.isTransactional()) { channel.commit(); } @@ -512,7 +510,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void rollbackTransactions(AMQChannel channel) throws AMQException { - if (channel != null && channel.isTransactional()) + if ((channel != null) && channel.isTransactional()) { channel.rollback(); } @@ -597,6 +595,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { channel.close(this); } + _channelMap.clear(); for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++) { @@ -615,6 +614,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _managedObject.unregister(); } + for (Task task : _taskList) { task.doTask(this); @@ -687,6 +687,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE))); } + if (_clientProperties.getString(ClientProperties.version.toString()) != null) { _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString())); @@ -715,7 +716,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public boolean isProtocolVersion(byte major, byte minor) { - return getProtocolMajorVersion() == major && getProtocolMinorVersion() == minor; + return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor); } public VersionSpecificRegistry getRegistry() @@ -723,13 +724,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return _registry; } - public Object getClientIdentifier() { return _minaProtocolSession.getRemoteAddress(); } - public VirtualHost getVirtualHost() { return _virtualHost; @@ -769,6 +768,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public String getClientVersion() { - return _clientVersion == null ? null : _clientVersion.toString(); + return (_clientVersion == null) ? null : _clientVersion.toString(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a803ef1227..6273ac997b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -49,6 +49,16 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; +import javax.management.JMException; + +import java.text.MessageFormat; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + /** * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described * fully in RFC 006. @@ -607,7 +617,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue delete(); } - public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException + /*public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { // fixme not sure what this is doing. should we be passing deliverFirst through here? // This code is not used so when it is perhaps it should @@ -623,7 +633,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue // from the queue: dequeue(storeContext, msg); } - } + }*/ // public DeliveryManager getDeliveryManager() // { -- cgit v1.2.1