summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-06-27 15:49:51 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-06-27 15:49:51 +0000
commit9bac85e36d60df28fb6f86e3bbe9e3f46689aa04 (patch)
tree593c9d3d5cd46191099b558d0ef2d3ef012f968d /java/broker/src
parentf10117cd6464a107b086e0b7f7ea44a496b04c3d (diff)
downloadqpid-python-9bac85e36d60df28fb6f86e3bbe9e3f46689aa04.tar.gz
Merged revisions 550748-551121 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r550748 | ritchiem | 2007-06-26 10:20:17 +0100 (Tue, 26 Jun 2007) | 1 line Added xml file for logging during sustained tests ........ r550773 | rupertlssmith | 2007-06-26 12:03:04 +0100 (Tue, 26 Jun 2007) | 1 line Immediate and mandatory flag tests added. ........ r550849 | rupertlssmith | 2007-06-26 17:43:58 +0100 (Tue, 26 Jun 2007) | 1 line QPID-509 Mandatory messages not returned outside a transaction. They are now. ........ r551117 | ritchiem | 2007-06-27 11:51:34 +0100 (Wed, 27 Jun 2007) | 2 lines Update to the sustained test to ensure late joining occurs correctly and improved stabilisation. Additional system properties now documented on wiki. http://cwiki.apache.org/qpid/sustained-tests.html ........ r551118 | ritchiem | 2007-06-27 11:51:51 +0100 (Wed, 27 Jun 2007) | 1 line Added intelij files to ignore list ........ r551119 | ritchiem | 2007-06-27 11:55:34 +0100 (Wed, 27 Jun 2007) | 1 line POM update to add Apache content to built jars ........ r551120 | ritchiem | 2007-06-27 11:58:25 +0100 (Wed, 27 Jun 2007) | 1 line Updated default guest password as it was not correct. ........ r551121 | ritchiem | 2007-06-27 12:00:48 +0100 (Wed, 27 Jun 2007) | 1 line Added additional information to log message when available to aid the explination of a failed connection ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551207 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java94
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java71
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java139
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java14
7 files changed, 183 insertions, 164 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 43a04dbfa1..28a9e85489 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -20,18 +20,9 @@
*/
package org.apache.qpid.server;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
@@ -52,6 +43,16 @@ import org.apache.qpid.server.queue.Subscription;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
public class AMQChannel
{
public static final int DEFAULT_PREFETCH = 5000;
@@ -208,7 +209,8 @@ public class AMQChannel
_currentMessage.setPublisher(publisher);
}
- public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException
+ public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
+ throws AMQException
{
if (_currentMessage == null)
{
@@ -230,6 +232,7 @@ public class AMQChannel
// check and deliver if header says body length is zero
if (contentHeaderBody.bodySize == 0)
{
+ _txnContext.messageProcessed(protocolSession);
_currentMessage = null;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index 01242f90de..0dcceaddbb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -169,7 +169,7 @@ public class DestNameExchange extends AbstractExchange
if (queues == null || queues.isEmpty())
{
String msg = "Routing key " + routingKey + " is not known to " + this;
- if (info.isMandatory())
+ if (info.isMandatory() || info.isImmediate())
{
throw new NoRouteException(msg, payload, null);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index 222e341b1a..f6a95b5e55 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -20,13 +20,18 @@
*/
package org.apache.qpid.server.exchange;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.LinkedList;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -41,24 +46,21 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
public class DestWildExchange extends AbstractExchange
{
private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
- private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
- // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
+ private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues =
+ new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private static final String TOPIC_SEPARATOR = ".";
private static final String AMQP_STAR = "*";
private static final String AMQP_HASH = "#";
@@ -90,7 +92,7 @@ public class DestWildExchange extends AbstractExchange
queueList.add(q.getName().toString());
}
- Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])};
+ Object[] bindingItemValues = { key.toString(), queueList.toArray(new String[0]) };
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -118,7 +120,6 @@ public class DestWildExchange extends AbstractExchange
} // End of MBean class
-
public AMQShortString getType()
{
return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
@@ -140,6 +141,7 @@ public class DestWildExchange extends AbstractExchange
{
queueList = _routingKey2queues.get(routingKey);
}
+
if (!queueList.contains(queue))
{
queueList.add(queue);
@@ -165,8 +167,8 @@ public class DestWildExchange extends AbstractExchange
for (int index = 0; index < size; index++)
{
- //if there are more levels
- if (index + 1 < size)
+ // if there are more levels
+ if ((index + 1) < size)
{
if (_subscription.get(index).equals(AMQP_HASH))
{
@@ -175,7 +177,7 @@ public class DestWildExchange extends AbstractExchange
// we don't need #.# delete this one
_subscription.remove(index);
size--;
- //redo this normalisation
+ // redo this normalisation
index--;
}
@@ -186,7 +188,7 @@ public class DestWildExchange extends AbstractExchange
_subscription.add(index + 1, _subscription.remove(index));
}
}
- }//if we have more levels
+ } // if we have more levels
}
StringBuilder sb = new StringBuilder();
@@ -211,9 +213,9 @@ public class DestWildExchange extends AbstractExchange
List<AMQQueue> queues = getMatchedQueues(routingKey);
// if we have no registered queues we have nothing to do
// TODO: add support for the immediate flag
- if (queues == null || queues.size() == 0)
+ if ((queues == null) || queues.isEmpty())
{
- if (info.isMandatory())
+ if (info.isMandatory() || info.isImmediate())
{
String msg = "Topic " + routingKey + " is not known to " + this;
throw new NoRouteException(msg, payload, null);
@@ -222,6 +224,7 @@ public class DestWildExchange extends AbstractExchange
{
_logger.warn("No queues found for routing key " + routingKey);
_logger.warn("Routing map contains: " + _routingKey2queues);
+
return;
}
}
@@ -238,14 +241,15 @@ public class DestWildExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
{
List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
- return queues != null && queues.contains(queue);
- }
+ return (queues != null) && queues.contains(queue);
+ }
public boolean isBound(AMQShortString routingKey) throws AMQException
{
List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
- return queues != null && !queues.isEmpty();
+
+ return (queues != null) && !queues.isEmpty();
}
public boolean isBound(AMQQueue queue) throws AMQException
@@ -257,6 +261,7 @@ public class DestWildExchange extends AbstractExchange
return true;
}
}
+
return false;
}
@@ -279,12 +284,14 @@ public class DestWildExchange extends AbstractExchange
" with routing key " + routingKey + ". No queue was registered with that routing key", null);
}
+
boolean removedQ = queues.remove(queue);
if (!removedQ)
{
throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
" with routing key " + routingKey, null);
}
+
if (queues.isEmpty())
{
_routingKey2queues.remove(routingKey);
@@ -304,7 +311,6 @@ public class DestWildExchange extends AbstractExchange
}
}
-
private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
{
List<AMQQueue> list = new LinkedList<AMQQueue>();
@@ -334,7 +340,6 @@ public class DestWildExchange extends AbstractExchange
queueList.add(queTok.nextToken());
}
-
int depth = 0;
boolean matching = true;
boolean done = false;
@@ -343,25 +348,26 @@ public class DestWildExchange extends AbstractExchange
while (matching && !done)
{
- if (queueList.size() == depth + queueskip || routingkeyList.size() == depth + routingskip)
+ if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip)))
{
done = true;
// if it was the routing key that ran out of digits
- if (routingkeyList.size() == depth + routingskip)
+ if (routingkeyList.size() == (depth + routingskip))
{
if (queueList.size() > (depth + queueskip))
- { // a hash and it is the last entry
- matching = queueList.get(depth + queueskip).equals(AMQP_HASH) && queueList.size() == depth + queueskip + 1;
+ { // a hash and it is the last entry
+ matching =
+ queueList.get(depth + queueskip).equals(AMQP_HASH)
+ && (queueList.size() == (depth + queueskip + 1));
}
}
- else if (routingkeyList.size() > depth + routingskip)
+ else if (routingkeyList.size() > (depth + routingskip))
{
// There is still more routing key to check
matching = false;
}
-
continue;
}
@@ -377,27 +383,33 @@ public class DestWildExchange extends AbstractExchange
else if (queueList.get(depth + queueskip).equals(AMQP_HASH))
{
// Is this a # at the end
- if (queueList.size() == depth + queueskip + 1)
+ if (queueList.size() == (depth + queueskip + 1))
{
done = true;
+
continue;
}
// otherwise # in the middle
- while (routingkeyList.size() > depth + routingskip)
+ while (routingkeyList.size() > (depth + routingskip))
{
if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1)))
{
queueskip++;
depth++;
+
break;
}
+
routingskip++;
}
+
continue;
}
+
matching = false;
}
+
depth++;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index 6148fd4e1c..bf00eeb9d3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -1,27 +1,36 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
-
package org.apache.qpid.server.exchange;
-import java.util.concurrent.CopyOnWriteArraySet;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -36,16 +45,7 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
{
@@ -63,7 +63,7 @@ public class FanoutExchange extends AbstractExchange
private final class FanoutExchangeMBean extends ExchangeMBean
{
@MBeanConstructor("Creates an MBean for AMQ fanout exchange")
- public FanoutExchangeMBean() throws JMException
+ public FanoutExchangeMBean() throws JMException
{
super();
_exchangeType = "fanout";
@@ -79,9 +79,7 @@ public class FanoutExchange extends AbstractExchange
{
String queueName = queue.getName().toString();
-
-
- Object[] bindingItemValues = {queueName, new String[] {queueName}};
+ Object[] bindingItemValues = { queueName, new String[] { queueName } };
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -98,7 +96,7 @@ public class FanoutExchange extends AbstractExchange
}
try
- {
+ {
queue.bind(new AMQShortString(binding), null, FanoutExchange.this);
}
catch (AMQException ex)
@@ -107,8 +105,7 @@ public class FanoutExchange extends AbstractExchange
}
}
- }// End of MBean class
-
+ } // End of MBean class
protected ExchangeMBean createMBean() throws AMQException
{
@@ -147,7 +144,6 @@ public class FanoutExchange extends AbstractExchange
{
assert queue != null;
-
if (!_queues.remove(queue))
{
throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
@@ -159,10 +155,10 @@ public class FanoutExchange extends AbstractExchange
{
final MessagePublishInfo publishInfo = payload.getMessagePublishInfo();
final AMQShortString routingKey = publishInfo.getRoutingKey();
- if (_queues == null || _queues.isEmpty())
+ if ((_queues == null) || _queues.isEmpty())
{
String msg = "No queues bound to " + this;
- if (publishInfo.isMandatory())
+ if (publishInfo.isMandatory() || publishInfo.isImmediate())
{
throw new NoRouteException(msg, payload, null);
}
@@ -193,13 +189,12 @@ public class FanoutExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey) throws AMQException
{
- return _queues != null && !_queues.isEmpty();
+ return (_queues != null) && !_queues.isEmpty();
}
public boolean isBound(AMQQueue queue) throws AMQException
{
-
return _queues.contains(queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 8205924207..e86094e26f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -229,7 +229,7 @@ public class HeadersExchange extends AbstractExchange
String msg = "Exchange " + getName() + ": message not routable.";
- if (payload.getMessagePublishInfo().isMandatory())
+ if (payload.getMessagePublishInfo().isMandatory() || payload.getMessagePublishInfo().isImmediate())
{
throw new NoRouteException(msg, payload, null);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 82e969b496..c9f5e42286 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,24 +20,13 @@
*/
package org.apache.qpid.server.protocol;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.security.Principal;
-
-import javax.management.JMException;
-import javax.security.sasl.SaslServer;
-
import org.apache.log4j.Logger;
+
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
@@ -46,22 +35,34 @@ import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.*;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-public class AMQMinaProtocolSession implements AMQProtocolSession,
- Managable
+import javax.management.JMException;
+import javax.security.sasl.SaslServer;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
@@ -111,25 +112,20 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private ProtocolOutputConverter _protocolOutputConverter;
private Principal _authorizedID;
-
public ManagedObject getManagedObject()
{
return _managedObject;
}
-
- public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
- AMQCodecFactory codecFactory)
- throws AMQException
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
+ throws AMQException
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_minaProtocolSession = session;
session.setAttachment(this);
-
_codecFactory = codecFactory;
-
try
{
IoServiceConfig config = session.getServiceConfig();
@@ -140,16 +136,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
catch (RuntimeException e)
{
e.printStackTrace();
- // throw e;
+ // throw e;
}
-// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
+ // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
}
- public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
- AMQCodecFactory codecFactory, AMQStateManager stateManager)
- throws AMQException
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
+ AMQStateManager stateManager) throws AMQException
{
_stateManager = stateManager;
_minaProtocolSession = session;
@@ -182,8 +177,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return (AMQProtocolSession) minaProtocolSession.getAttachment();
}
- public void dataBlockReceived(AMQDataBlock message)
- throws Exception
+ public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
if (message instanceof ProtocolInitiation)
@@ -203,8 +197,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
- private void frameReceived(AMQFrame frame)
- throws AMQException
+ private void frameReceived(AMQFrame frame) throws AMQException
{
int channelId = frame.getChannel();
AMQBody body = frame.getBodyFrame();
@@ -252,13 +245,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
String locales = "en_US";
// Interfacing with generated code - be aware of possible changes to parameter order as versions change.
- AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- locales.getBytes(), // locales
- mechanisms.getBytes(), // mechanisms
- null, // serverProperties
- (short) getProtocolMajorVersion(), // versionMajor
- (short) getProtocolMinorVersion()); // versionMinor
+ AMQFrame response =
+ ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+ locales.getBytes(), // locales
+ mechanisms.getBytes(), // mechanisms
+ null, // serverProperties
+ (short) getProtocolMajorVersion(), // versionMajor
+ (short) getProtocolMinorVersion()); // versionMinor
_minaProtocolSession.write(response);
}
catch (AMQException e)
@@ -269,21 +262,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
// TODO: Close connection (but how to wait until message is sent?)
// ritchiem 2006-12-04 will this not do?
-// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
-// future.join();
-// close connection
+ // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
+ // future.join();
+ // close connection
}
}
-
private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
{
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId,
- methodBody);
+ final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
- //Check that this channel is not closing
+ // Check that this channel is not closing
if (channelAwaitingClosure(channelId))
{
if ((evt.getMethod() instanceof ChannelCloseOkBody))
@@ -299,11 +290,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.info("Channel[" + channelId + "] awaiting closure ignoring");
}
+
return;
}
}
-
try
{
try
@@ -315,10 +306,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
for (AMQMethodListener listener : _frameListeners)
{
- wasAnyoneInterested = listener.methodReceived(evt) ||
- wasAnyoneInterested;
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
}
}
+
if (!wasAnyoneInterested)
{
throw new AMQNoMethodHandlerException(evt, null);
@@ -332,6 +323,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.info("Closing channel due to: " + e.getMessage());
}
+
writeFrame(e.getCloseFrame(channelId));
closeChannel(channelId);
}
@@ -341,14 +333,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("ChannelException occured on non-existent channel:" + e.getMessage());
}
+
if (_logger.isInfoEnabled())
{
_logger.info("Closing connection due to: " + e.getMessage());
}
+
closeSession();
- AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
- AMQConstant.CHANNEL_ERROR.getName().toString());
+ AMQConnectionException ce =
+ evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+ AMQConstant.CHANNEL_ERROR.getName().toString());
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(ce.getCloseFrame(channelId));
@@ -360,6 +355,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.info("Closing connection due to: " + e.getMessage());
}
+
closeSession();
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(e.getCloseFrame(channelId));
@@ -372,17 +368,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
listener.error(e);
}
+
_minaProtocolSession.close();
}
}
-
private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
AMQChannel channel = getAndAssertChannel(channelId);
- channel.publishContentHeader(body);
+ channel.publishContentHeader(body, this);
}
@@ -427,15 +423,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId, null);
}
+
return channel;
}
public AMQChannel getChannel(int channelId) throws AMQException
{
- final AMQChannel channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId)
- ? _cachedChannels[channelId]
- : _channelMap.get(channelId);
- if (channel == null || channel.isClosing())
+ final AMQChannel channel =
+ ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
+ if ((channel == null) || channel.isClosing())
{
return null;
}
@@ -466,8 +462,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
if (_channelMap.size() == _maxNoOfChannels)
{
- String errorMessage = toString() + ": maximum number of channels has been reached (" +
- _maxNoOfChannels + "); can't create channel";
+ String errorMessage =
+ toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
+ + "); can't create channel";
_logger.error(errorMessage);
throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage, null);
}
@@ -480,6 +477,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_cachedChannels[channelId] = channel;
}
+
checkForNotification();
}
@@ -504,7 +502,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public void commitTransactions(AMQChannel channel) throws AMQException
{
- if (channel != null && channel.isTransactional())
+ if ((channel != null) && channel.isTransactional())
{
channel.commit();
}
@@ -512,7 +510,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public void rollbackTransactions(AMQChannel channel) throws AMQException
{
- if (channel != null && channel.isTransactional())
+ if ((channel != null) && channel.isTransactional())
{
channel.rollback();
}
@@ -597,6 +595,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
channel.close(this);
}
+
_channelMap.clear();
for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
{
@@ -615,6 +614,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_managedObject.unregister();
}
+
for (Task task : _taskList)
{
task.doTask(this);
@@ -687,6 +687,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
}
+
if (_clientProperties.getString(ClientProperties.version.toString()) != null)
{
_clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString()));
@@ -715,7 +716,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public boolean isProtocolVersion(byte major, byte minor)
{
- return getProtocolMajorVersion() == major && getProtocolMinorVersion() == minor;
+ return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
}
public VersionSpecificRegistry getRegistry()
@@ -723,13 +724,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return _registry;
}
-
public Object getClientIdentifier()
{
return _minaProtocolSession.getRemoteAddress();
}
-
public VirtualHost getVirtualHost()
{
return _virtualHost;
@@ -769,6 +768,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public String getClientVersion()
{
- return _clientVersion == null ? null : _clientVersion.toString();
+ return (_clientVersion == null) ? null : _clientVersion.toString();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index a803ef1227..6273ac997b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -49,6 +49,16 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import javax.management.JMException;
+
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
* fully in RFC 006.
@@ -607,7 +617,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
delete();
}
- public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
+ /*public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
// fixme not sure what this is doing. should we be passing deliverFirst through here?
// This code is not used so when it is perhaps it should
@@ -623,7 +633,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
// from the queue:
dequeue(storeContext, msg);
}
- }
+ }*/
// public DeliveryManager getDeliveryManager()
// {