diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-06-27 15:49:51 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-06-27 15:49:51 +0000 |
| commit | 9bac85e36d60df28fb6f86e3bbe9e3f46689aa04 (patch) | |
| tree | 593c9d3d5cd46191099b558d0ef2d3ef012f968d /java/broker/src | |
| parent | f10117cd6464a107b086e0b7f7ea44a496b04c3d (diff) | |
| download | qpid-python-9bac85e36d60df28fb6f86e3bbe9e3f46689aa04.tar.gz | |
Merged revisions 550748-551121 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r550748 | ritchiem | 2007-06-26 10:20:17 +0100 (Tue, 26 Jun 2007) | 1 line
Added xml file for logging during sustained tests
........
r550773 | rupertlssmith | 2007-06-26 12:03:04 +0100 (Tue, 26 Jun 2007) | 1 line
Immediate and mandatory flag tests added.
........
r550849 | rupertlssmith | 2007-06-26 17:43:58 +0100 (Tue, 26 Jun 2007) | 1 line
QPID-509 Mandatory messages not returned outside a transaction. They are now.
........
r551117 | ritchiem | 2007-06-27 11:51:34 +0100 (Wed, 27 Jun 2007) | 2 lines
Update to the sustained test to ensure late joining occurs correctly and improved stabilisation. Additional system properties now documented on wiki.
http://cwiki.apache.org/qpid/sustained-tests.html
........
r551118 | ritchiem | 2007-06-27 11:51:51 +0100 (Wed, 27 Jun 2007) | 1 line
Added intelij files to ignore list
........
r551119 | ritchiem | 2007-06-27 11:55:34 +0100 (Wed, 27 Jun 2007) | 1 line
POM update to add Apache content to built jars
........
r551120 | ritchiem | 2007-06-27 11:58:25 +0100 (Wed, 27 Jun 2007) | 1 line
Updated default guest password as it was not correct.
........
r551121 | ritchiem | 2007-06-27 12:00:48 +0100 (Wed, 27 Jun 2007) | 1 line
Added additional information to log message when available to aid the explination of a failed connection
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551207 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
7 files changed, 183 insertions, 164 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 43a04dbfa1..28a9e85489 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -20,18 +20,9 @@ */ package org.apache.qpid.server; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; @@ -52,6 +43,16 @@ import org.apache.qpid.server.queue.Subscription; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + public class AMQChannel { public static final int DEFAULT_PREFETCH = 5000; @@ -208,7 +209,8 @@ public class AMQChannel _currentMessage.setPublisher(publisher); } - public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException + public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession) + throws AMQException { if (_currentMessage == null) { @@ -230,6 +232,7 @@ public class AMQChannel // check and deliver if header says body length is zero if (contentHeaderBody.bodySize == 0) { + _txnContext.messageProcessed(protocolSession); _currentMessage = null; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index 01242f90de..0dcceaddbb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -169,7 +169,7 @@ public class DestNameExchange extends AbstractExchange if (queues == null || queues.isEmpty()) { String msg = "Routing key " + routingKey + " is not known to " + this; - if (info.isMandatory()) + if (info.isMandatory() || info.isImmediate()) { throw new NoRouteException(msg, payload, null); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 222e341b1a..f6a95b5e55 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -20,13 +20,18 @@ */ package org.apache.qpid.server.exchange; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.StringTokenizer; -import java.util.LinkedList; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; import javax.management.JMException; import javax.management.MBeanException; @@ -41,24 +46,21 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.StringTokenizer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; public class DestWildExchange extends AbstractExchange { private static final Logger _logger = Logger.getLogger(DestWildExchange.class); - private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); - // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>(); + private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = + new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>(); private static final String TOPIC_SEPARATOR = "."; private static final String AMQP_STAR = "*"; private static final String AMQP_HASH = "#"; @@ -90,7 +92,7 @@ public class DestWildExchange extends AbstractExchange queueList.add(q.getName().toString()); } - Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])}; + Object[] bindingItemValues = { key.toString(), queueList.toArray(new String[0]) }; CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); _bindingList.put(bindingData); } @@ -118,7 +120,6 @@ public class DestWildExchange extends AbstractExchange } // End of MBean class - public AMQShortString getType() { return ExchangeDefaults.TOPIC_EXCHANGE_CLASS; @@ -140,6 +141,7 @@ public class DestWildExchange extends AbstractExchange { queueList = _routingKey2queues.get(routingKey); } + if (!queueList.contains(queue)) { queueList.add(queue); @@ -165,8 +167,8 @@ public class DestWildExchange extends AbstractExchange for (int index = 0; index < size; index++) { - //if there are more levels - if (index + 1 < size) + // if there are more levels + if ((index + 1) < size) { if (_subscription.get(index).equals(AMQP_HASH)) { @@ -175,7 +177,7 @@ public class DestWildExchange extends AbstractExchange // we don't need #.# delete this one _subscription.remove(index); size--; - //redo this normalisation + // redo this normalisation index--; } @@ -186,7 +188,7 @@ public class DestWildExchange extends AbstractExchange _subscription.add(index + 1, _subscription.remove(index)); } } - }//if we have more levels + } // if we have more levels } StringBuilder sb = new StringBuilder(); @@ -211,9 +213,9 @@ public class DestWildExchange extends AbstractExchange List<AMQQueue> queues = getMatchedQueues(routingKey); // if we have no registered queues we have nothing to do // TODO: add support for the immediate flag - if (queues == null || queues.size() == 0) + if ((queues == null) || queues.isEmpty()) { - if (info.isMandatory()) + if (info.isMandatory() || info.isImmediate()) { String msg = "Topic " + routingKey + " is not known to " + this; throw new NoRouteException(msg, payload, null); @@ -222,6 +224,7 @@ public class DestWildExchange extends AbstractExchange { _logger.warn("No queues found for routing key " + routingKey); _logger.warn("Routing map contains: " + _routingKey2queues); + return; } } @@ -238,14 +241,15 @@ public class DestWildExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException { List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); - return queues != null && queues.contains(queue); - } + return (queues != null) && queues.contains(queue); + } public boolean isBound(AMQShortString routingKey) throws AMQException { List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); - return queues != null && !queues.isEmpty(); + + return (queues != null) && !queues.isEmpty(); } public boolean isBound(AMQQueue queue) throws AMQException @@ -257,6 +261,7 @@ public class DestWildExchange extends AbstractExchange return true; } } + return false; } @@ -279,12 +284,14 @@ public class DestWildExchange extends AbstractExchange " with routing key " + routingKey + ". No queue was registered with that routing key", null); } + boolean removedQ = queues.remove(queue); if (!removedQ) { throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() + " with routing key " + routingKey, null); } + if (queues.isEmpty()) { _routingKey2queues.remove(routingKey); @@ -304,7 +311,6 @@ public class DestWildExchange extends AbstractExchange } } - private List<AMQQueue> getMatchedQueues(AMQShortString routingKey) { List<AMQQueue> list = new LinkedList<AMQQueue>(); @@ -334,7 +340,6 @@ public class DestWildExchange extends AbstractExchange queueList.add(queTok.nextToken()); } - int depth = 0; boolean matching = true; boolean done = false; @@ -343,25 +348,26 @@ public class DestWildExchange extends AbstractExchange while (matching && !done) { - if (queueList.size() == depth + queueskip || routingkeyList.size() == depth + routingskip) + if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip))) { done = true; // if it was the routing key that ran out of digits - if (routingkeyList.size() == depth + routingskip) + if (routingkeyList.size() == (depth + routingskip)) { if (queueList.size() > (depth + queueskip)) - { // a hash and it is the last entry - matching = queueList.get(depth + queueskip).equals(AMQP_HASH) && queueList.size() == depth + queueskip + 1; + { // a hash and it is the last entry + matching = + queueList.get(depth + queueskip).equals(AMQP_HASH) + && (queueList.size() == (depth + queueskip + 1)); } } - else if (routingkeyList.size() > depth + routingskip) + else if (routingkeyList.size() > (depth + routingskip)) { // There is still more routing key to check matching = false; } - continue; } @@ -377,27 +383,33 @@ public class DestWildExchange extends AbstractExchange else if (queueList.get(depth + queueskip).equals(AMQP_HASH)) { // Is this a # at the end - if (queueList.size() == depth + queueskip + 1) + if (queueList.size() == (depth + queueskip + 1)) { done = true; + continue; } // otherwise # in the middle - while (routingkeyList.size() > depth + routingskip) + while (routingkeyList.size() > (depth + routingskip)) { if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1))) { queueskip++; depth++; + break; } + routingskip++; } + continue; } + matching = false; } + depth++; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 6148fd4e1c..bf00eeb9d3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -1,27 +1,36 @@ /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
-
package org.apache.qpid.server.exchange;
-import java.util.concurrent.CopyOnWriteArraySet;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -36,16 +45,7 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
{
@@ -63,7 +63,7 @@ public class FanoutExchange extends AbstractExchange private final class FanoutExchangeMBean extends ExchangeMBean
{
@MBeanConstructor("Creates an MBean for AMQ fanout exchange")
- public FanoutExchangeMBean() throws JMException
+ public FanoutExchangeMBean() throws JMException
{
super();
_exchangeType = "fanout";
@@ -79,9 +79,7 @@ public class FanoutExchange extends AbstractExchange {
String queueName = queue.getName().toString();
-
-
- Object[] bindingItemValues = {queueName, new String[] {queueName}};
+ Object[] bindingItemValues = { queueName, new String[] { queueName } };
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -98,7 +96,7 @@ public class FanoutExchange extends AbstractExchange }
try
- {
+ {
queue.bind(new AMQShortString(binding), null, FanoutExchange.this);
}
catch (AMQException ex)
@@ -107,8 +105,7 @@ public class FanoutExchange extends AbstractExchange }
}
- }// End of MBean class
-
+ } // End of MBean class
protected ExchangeMBean createMBean() throws AMQException
{
@@ -147,7 +144,6 @@ public class FanoutExchange extends AbstractExchange {
assert queue != null;
-
if (!_queues.remove(queue))
{
throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
@@ -159,10 +155,10 @@ public class FanoutExchange extends AbstractExchange {
final MessagePublishInfo publishInfo = payload.getMessagePublishInfo();
final AMQShortString routingKey = publishInfo.getRoutingKey();
- if (_queues == null || _queues.isEmpty())
+ if ((_queues == null) || _queues.isEmpty())
{
String msg = "No queues bound to " + this;
- if (publishInfo.isMandatory())
+ if (publishInfo.isMandatory() || publishInfo.isImmediate())
{
throw new NoRouteException(msg, payload, null);
}
@@ -193,13 +189,12 @@ public class FanoutExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey) throws AMQException
{
- return _queues != null && !_queues.isEmpty();
+ return (_queues != null) && !_queues.isEmpty();
}
public boolean isBound(AMQQueue queue) throws AMQException
{
-
return _queues.contains(queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 8205924207..e86094e26f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -229,7 +229,7 @@ public class HeadersExchange extends AbstractExchange String msg = "Exchange " + getName() + ": message not routable."; - if (payload.getMessagePublishInfo().isMandatory()) + if (payload.getMessagePublishInfo().isMandatory() || payload.getMessagePublishInfo().isImmediate()) { throw new NoRouteException(msg, payload, null); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 82e969b496..c9f5e42286 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,24 +20,13 @@ */ package org.apache.qpid.server.protocol; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.security.Principal; - -import javax.management.JMException; -import javax.security.sasl.SaslServer; - import org.apache.log4j.Logger; + import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.IoSession; import org.apache.mina.transport.vmpipe.VmPipeAddress; + import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; @@ -46,22 +35,34 @@ import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.*; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -public class AMQMinaProtocolSession implements AMQProtocolSession, - Managable +import javax.management.JMException; +import javax.security.sasl.SaslServer; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.security.Principal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; + +public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); @@ -111,25 +112,20 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private ProtocolOutputConverter _protocolOutputConverter; private Principal _authorizedID; - public ManagedObject getManagedObject() { return _managedObject; } - - public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, - AMQCodecFactory codecFactory) - throws AMQException + public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory) + throws AMQException { _stateManager = new AMQStateManager(virtualHostRegistry, this); _minaProtocolSession = session; session.setAttachment(this); - _codecFactory = codecFactory; - try { IoServiceConfig config = session.getServiceConfig(); @@ -140,16 +136,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, catch (RuntimeException e) { e.printStackTrace(); - // throw e; + // throw e; } -// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); + // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); } - public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, - AMQCodecFactory codecFactory, AMQStateManager stateManager) - throws AMQException + public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, + AMQStateManager stateManager) throws AMQException { _stateManager = stateManager; _minaProtocolSession = session; @@ -182,8 +177,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return (AMQProtocolSession) minaProtocolSession.getAttachment(); } - public void dataBlockReceived(AMQDataBlock message) - throws Exception + public void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; if (message instanceof ProtocolInitiation) @@ -203,8 +197,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } - private void frameReceived(AMQFrame frame) - throws AMQException + private void frameReceived(AMQFrame frame) throws AMQException { int channelId = frame.getChannel(); AMQBody body = frame.getBodyFrame(); @@ -252,13 +245,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, String locales = "en_US"; // Interfacing with generated code - be aware of possible changes to parameter order as versions change. - AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - locales.getBytes(), // locales - mechanisms.getBytes(), // mechanisms - null, // serverProperties - (short) getProtocolMajorVersion(), // versionMajor - (short) getProtocolMinorVersion()); // versionMinor + AMQFrame response = + ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + locales.getBytes(), // locales + mechanisms.getBytes(), // mechanisms + null, // serverProperties + (short) getProtocolMajorVersion(), // versionMajor + (short) getProtocolMinorVersion()); // versionMinor _minaProtocolSession.write(response); } catch (AMQException e) @@ -269,21 +262,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, // TODO: Close connection (but how to wait until message is sent?) // ritchiem 2006-12-04 will this not do? -// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()])); -// future.join(); -// close connection + // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()])); + // future.join(); + // close connection } } - private void methodFrameReceived(int channelId, AMQMethodBody methodBody) { - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, - methodBody); + final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody); - //Check that this channel is not closing + // Check that this channel is not closing if (channelAwaitingClosure(channelId)) { if ((evt.getMethod() instanceof ChannelCloseOkBody)) @@ -299,11 +290,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.info("Channel[" + channelId + "] awaiting closure ignoring"); } + return; } } - try { try @@ -315,10 +306,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { for (AMQMethodListener listener : _frameListeners) { - wasAnyoneInterested = listener.methodReceived(evt) || - wasAnyoneInterested; + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; } } + if (!wasAnyoneInterested) { throw new AMQNoMethodHandlerException(evt, null); @@ -332,6 +323,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.info("Closing channel due to: " + e.getMessage()); } + writeFrame(e.getCloseFrame(channelId)); closeChannel(channelId); } @@ -341,14 +333,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage()); } + if (_logger.isInfoEnabled()) { _logger.info("Closing connection due to: " + e.getMessage()); } + closeSession(); - AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, - AMQConstant.CHANNEL_ERROR.getName().toString()); + AMQConnectionException ce = + evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString()); _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(ce.getCloseFrame(channelId)); @@ -360,6 +355,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.info("Closing connection due to: " + e.getMessage()); } + closeSession(); _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(e.getCloseFrame(channelId)); @@ -372,17 +368,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { listener.error(e); } + _minaProtocolSession.close(); } } - private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { AMQChannel channel = getAndAssertChannel(channelId); - channel.publishContentHeader(body); + channel.publishContentHeader(body, this); } @@ -427,15 +423,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId, null); } + return channel; } public AMQChannel getChannel(int channelId) throws AMQException { - final AMQChannel channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId) - ? _cachedChannels[channelId] - : _channelMap.get(channelId); - if (channel == null || channel.isClosing()) + final AMQChannel channel = + ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); + if ((channel == null) || channel.isClosing()) { return null; } @@ -466,8 +462,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, if (_channelMap.size() == _maxNoOfChannels) { - String errorMessage = toString() + ": maximum number of channels has been reached (" + - _maxNoOfChannels + "); can't create channel"; + String errorMessage = + toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels + + "); can't create channel"; _logger.error(errorMessage); throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage, null); } @@ -480,6 +477,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _cachedChannels[channelId] = channel; } + checkForNotification(); } @@ -504,7 +502,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void commitTransactions(AMQChannel channel) throws AMQException { - if (channel != null && channel.isTransactional()) + if ((channel != null) && channel.isTransactional()) { channel.commit(); } @@ -512,7 +510,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void rollbackTransactions(AMQChannel channel) throws AMQException { - if (channel != null && channel.isTransactional()) + if ((channel != null) && channel.isTransactional()) { channel.rollback(); } @@ -597,6 +595,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { channel.close(this); } + _channelMap.clear(); for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++) { @@ -615,6 +614,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _managedObject.unregister(); } + for (Task task : _taskList) { task.doTask(this); @@ -687,6 +687,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE))); } + if (_clientProperties.getString(ClientProperties.version.toString()) != null) { _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString())); @@ -715,7 +716,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public boolean isProtocolVersion(byte major, byte minor) { - return getProtocolMajorVersion() == major && getProtocolMinorVersion() == minor; + return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor); } public VersionSpecificRegistry getRegistry() @@ -723,13 +724,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return _registry; } - public Object getClientIdentifier() { return _minaProtocolSession.getRemoteAddress(); } - public VirtualHost getVirtualHost() { return _virtualHost; @@ -769,6 +768,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public String getClientVersion() { - return _clientVersion == null ? null : _clientVersion.toString(); + return (_clientVersion == null) ? null : _clientVersion.toString(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a803ef1227..6273ac997b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -49,6 +49,16 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; +import javax.management.JMException; + +import java.text.MessageFormat; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + /** * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described * fully in RFC 006. @@ -607,7 +617,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue delete(); } - public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException + /*public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { // fixme not sure what this is doing. should we be passing deliverFirst through here? // This code is not used so when it is perhaps it should @@ -623,7 +633,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue // from the queue: dequeue(storeContext, msg); } - } + }*/ // public DeliveryManager getDeliveryManager() // { |
