summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-04-24 17:49:03 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-04-24 17:49:03 +0000
commitd964eae817b538c532996af0b41993d128fa5a5c (patch)
treeb0f9a56bc8a7691bd4cf009cbc83cf0fc8aa3ffc /java/client/src
parent757d86d81e811f105f72fdfce5bc18d83aaa08d4 (diff)
downloadqpid-python-d964eae817b538c532996af0b41993d128fa5a5c.tar.gz
QPID-832 : Fix eol-style
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@651325 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java52
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java80
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java132
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java76
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java150
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java98
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java270
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java72
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java1056
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java310
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java170
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java1604
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java1104
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java64
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/Message.java56
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java208
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java442
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java288
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java178
19 files changed, 3205 insertions, 3205 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java b/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java
index 93f10761e2..7e257e0c20 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java
@@ -1,26 +1,26 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client;
-
-public interface AMQSessionAdapter
-{
- public AMQSession getSession();
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+public interface AMQSessionAdapter
+{
+ public AMQSession getSession();
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
index 768e2862b3..fa2afb3ee4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
@@ -1,40 +1,40 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client;
-
-import org.apache.qpid.framing.AMQShortString;
-
-public class AMQUndefinedDestination extends AMQDestination
-{
-
- private static final AMQShortString UNKNOWN_EXCHANGE_CLASS = new AMQShortString("unknown");
-
-
- public AMQUndefinedDestination(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
- {
- super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName);
- }
-
- public boolean isNameRequired()
- {
- return getAMQQueueName() == null;
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public class AMQUndefinedDestination extends AMQDestination
+{
+
+ private static final AMQShortString UNKNOWN_EXCHANGE_CLASS = new AMQShortString("unknown");
+
+
+ public AMQUndefinedDestination(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
+ {
+ super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName);
+ }
+
+ public boolean isNameRequired()
+ {
+ return getAMQQueueName() == null;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
index 8cb285c1ac..7cc548915c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
+++ b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
@@ -1,66 +1,66 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-
-import org.apache.qpid.framing.AMQShortString;
-
-public enum CustomJMSXProperty
-{
- JMS_AMQP_NULL,
- JMS_QPID_DESTTYPE,
- JMSXGroupID,
- JMSXGroupSeq,
- JMSXUserID;
-
-
- private final AMQShortString _nameAsShortString;
-
- CustomJMSXProperty()
- {
- _nameAsShortString = new AMQShortString(toString());
- }
-
- public AMQShortString getShortStringName()
- {
- return _nameAsShortString;
- }
-
- private static Enumeration _names;
-
- public static synchronized Enumeration asEnumeration()
- {
- if(_names == null)
- {
- CustomJMSXProperty[] properties = values();
- ArrayList<String> nameList = new ArrayList<String>(properties.length);
- for(CustomJMSXProperty property : properties)
- {
- nameList.add(property.toString());
- }
- _names = Collections.enumeration(nameList);
- }
- return _names;
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public enum CustomJMSXProperty
+{
+ JMS_AMQP_NULL,
+ JMS_QPID_DESTTYPE,
+ JMSXGroupID,
+ JMSXGroupSeq,
+ JMSXUserID;
+
+
+ private final AMQShortString _nameAsShortString;
+
+ CustomJMSXProperty()
+ {
+ _nameAsShortString = new AMQShortString(toString());
+ }
+
+ public AMQShortString getShortStringName()
+ {
+ return _nameAsShortString;
+ }
+
+ private static Enumeration _names;
+
+ public static synchronized Enumeration asEnumeration()
+ {
+ if(_names == null)
+ {
+ CustomJMSXProperty[] properties = values();
+ ArrayList<String> nameList = new ArrayList<String>(properties.length);
+ for(CustomJMSXProperty property : properties)
+ {
+ nameList.add(property.toString());
+ }
+ _names = Collections.enumeration(nameList);
+ }
+ return _names;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
index 03d25aa243..7f8e80c73a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
@@ -1,38 +1,38 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-
-package org.apache.qpid.client;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-
-/**
- * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
- * so that operations related to their "temporary-ness" can be abstracted out.
- */
-interface TemporaryDestination extends Destination
-{
-
- public void delete() throws JMSException;
- public AMQSession getSession();
- public boolean isDeleted();
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+
+package org.apache.qpid.client;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+/**
+ * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
+ * so that operations related to their "temporary-ness" can be abstracted out.
+ */
+interface TemporaryDestination extends Destination
+{
+
+ public void delete() throws JMSException;
+ public AMQSession getSession();
+ public boolean isDeleted();
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
index 1ec98efe0e..51cc94965a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
@@ -1,75 +1,75 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-
-package org.apache.qpid.client.failover;
-
-import org.apache.qpid.client.AMQConnection;
-
-/**
- * FailoverNoopSupport is a {@link FailoverSupport} implementation that does not really provide any failover support
- * at all. It wraps a {@link FailoverProtectedOperation} but should that operation throw {@link FailoverException} this
- * support class simply re-raises that exception as an IllegalStateException. This support wrapper should only be
- * used where the caller can be certain that the failover protected operation cannot acutally throw a failover exception,
- * for example, because the caller already holds a lock preventing that condition from arising.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Perform a fail-over protected operation raising providing no handling of fail-over conditions.
- * </table>
- */
-public class FailoverNoopSupport<T, E extends Exception> implements FailoverSupport<T, E>
-{
- /** The protected operation that is to be retried in the event of fail-over. */
- FailoverProtectedOperation<T, E> operation;
-
- /** The connection on which the fail-over protected operation is to be performed. */
- AMQConnection connection;
-
- /**
- * Creates an automatic retrying fail-over handler for the specified operation.
- *
- * @param operation The fail-over protected operation to wrap in this handler.
- */
- public FailoverNoopSupport(FailoverProtectedOperation<T, E> operation, AMQConnection con)
- {
- this.operation = operation;
- this.connection = con;
- }
-
- /**
- * Delegates to another continuation which is to be provided with fail-over handling.
- *
- * @return The return value from the delegated to continuation.
- * @throws E Any exception that the delegated to continuation may raise.
- */
- public T execute() throws E
- {
- try
- {
- return operation.execute();
- }
- catch (FailoverException e)
- {
- throw new IllegalStateException("Fail-over interupted no-op failover support. "
- + "No-op support should only be used where the caller is certain fail-over cannot occur.", e);
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+
+package org.apache.qpid.client.failover;
+
+import org.apache.qpid.client.AMQConnection;
+
+/**
+ * FailoverNoopSupport is a {@link FailoverSupport} implementation that does not really provide any failover support
+ * at all. It wraps a {@link FailoverProtectedOperation} but should that operation throw {@link FailoverException} this
+ * support class simply re-raises that exception as an IllegalStateException. This support wrapper should only be
+ * used where the caller can be certain that the failover protected operation cannot acutally throw a failover exception,
+ * for example, because the caller already holds a lock preventing that condition from arising.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Perform a fail-over protected operation raising providing no handling of fail-over conditions.
+ * </table>
+ */
+public class FailoverNoopSupport<T, E extends Exception> implements FailoverSupport<T, E>
+{
+ /** The protected operation that is to be retried in the event of fail-over. */
+ FailoverProtectedOperation<T, E> operation;
+
+ /** The connection on which the fail-over protected operation is to be performed. */
+ AMQConnection connection;
+
+ /**
+ * Creates an automatic retrying fail-over handler for the specified operation.
+ *
+ * @param operation The fail-over protected operation to wrap in this handler.
+ */
+ public FailoverNoopSupport(FailoverProtectedOperation<T, E> operation, AMQConnection con)
+ {
+ this.operation = operation;
+ this.connection = con;
+ }
+
+ /**
+ * Delegates to another continuation which is to be provided with fail-over handling.
+ *
+ * @return The return value from the delegated to continuation.
+ * @throws E Any exception that the delegated to continuation may raise.
+ */
+ public T execute() throws E
+ {
+ try
+ {
+ return operation.execute();
+ }
+ catch (FailoverException e)
+ {
+ throw new IllegalStateException("Fail-over interupted no-op failover support. "
+ + "No-op support should only be used where the caller is certain fail-over cannot occur.", e);
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java
index 9a7f43926e..e9c5f24791 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java
@@ -1,49 +1,49 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-
-package org.apache.qpid.client.failover;
-
-/**
- * FailoverProtectedOperation is a continuation for an operation that may throw a {@link FailoverException} because
- * it has been interrupted by the fail-over process. The {@link FailoverRetrySupport} class defines support wrappers
- * for failover protected operations, in order to provide different handling schemes when failovers occurr.
- *
- * <p/>The type of checked exception that the operation may perform has been generified, in order that fail over
- * protected operations can be defined that raise arbitrary exceptions. The actuall exception types used should not
- * be sub-classes of FailoverException, or else catching FailoverException in the {@link FailoverRetrySupport} classes
- * will mask the exception.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities
- * <tr><td> Perform an operation that may be interrupted by fail-over.
- * </table>
- */
-public interface FailoverProtectedOperation<T, E extends Exception>
-{
- /**
- * Performs the continuations work.
- *
- * @return Provdes scope for the continuation to return an arbitrary value.
- *
- * @throws FailoverException If the operation is interrupted by a fail-over notification.
- */
- public abstract T execute() throws E, FailoverException;
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+
+package org.apache.qpid.client.failover;
+
+/**
+ * FailoverProtectedOperation is a continuation for an operation that may throw a {@link FailoverException} because
+ * it has been interrupted by the fail-over process. The {@link FailoverRetrySupport} class defines support wrappers
+ * for failover protected operations, in order to provide different handling schemes when failovers occurr.
+ *
+ * <p/>The type of checked exception that the operation may perform has been generified, in order that fail over
+ * protected operations can be defined that raise arbitrary exceptions. The actuall exception types used should not
+ * be sub-classes of FailoverException, or else catching FailoverException in the {@link FailoverRetrySupport} classes
+ * will mask the exception.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Perform an operation that may be interrupted by fail-over.
+ * </table>
+ */
+public interface FailoverProtectedOperation<T, E extends Exception>
+{
+ /**
+ * Performs the continuations work.
+ *
+ * @return Provdes scope for the continuation to return an arbitrary value.
+ *
+ * @throws FailoverException If the operation is interrupted by a fail-over notification.
+ */
+ public abstract T execute() throws E, FailoverException;
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
index e756d7baf9..cf7e978c03 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
@@ -1,135 +1,135 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.failover;
-
-import org.apache.qpid.client.AMQConnection;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * FailoverRetrySupport is a continuation that wraps another continuation, delaying its execution until it is notified
- * that a blocking condition has been met, and executing the continuation within a mutex. If the continuation fails, due
- * to the original condition being broken, whilst the continuation is waiting for a reponse to a synchronous request,
- * FailoverRetrySupport automatcally rechecks the condition and re-acquires the mutex and re-runs the continution. This
- * automatic retrying is continued until the continuation succeeds, or throws an exception (different to
- * FailoverException, which is used to signal the failure of the original condition).
- *
- * <p/>The blocking condition used is that the connection is not currently failing over, and the mutex used is the
- * connection failover mutex, which guards against the fail-over process being run during fail-over vulnerable methods.
- * These are used like a lock and condition variable.
- *
- * <p/>The wrapped operation may throw a FailoverException, this is an exception that can be raised by a
- * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener}, in response to it being notified that a
- * fail-over wants to start whilst it was waiting. Methods that are vulnerable to fail-over are those that are
- * synchronous, where a failure will prevent them from getting the reply they are waiting for and asynchronous
- * methods that should not be attempted when a fail-over is in progress.
- *
- * <p/>Wrapping a synchronous method in a FailoverRetrySupport will have the effect that the operation will not be
- * started during fail-over, but be delayed until any current fail-over has completed. Should a fail-over process want
- * to start whilst waiting for the synchrnous reply, the FailoverRetrySupport will detect this and rety the operation
- * until it succeeds. Synchronous methods are usually coordinated with a
- * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener} which is notified when a fail-over process wants
- * to start and throws a FailoverException in response to this.
- *
- * <p/>Wrapping an asynchronous method in a FailoverRetrySupport will have the effect that the operation will not be
- * started during fail-over, but be delayed until any current fail-over has completed.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Provide a continuation synchronized on a fail-over lock and condition.
- * <tr><td> Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception.
- * </table>
- *
- * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see
- * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary
- * Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation.
- * Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type
- * to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that.
- *
- * @todo InterruptedException not handled well.
- */
-public class FailoverRetrySupport<T, E extends Exception> implements FailoverSupport<T, E>
-{
- /** Used for debugging. */
- private static final Logger _log = LoggerFactory.getLogger(FailoverRetrySupport.class);
-
- /** The protected operation that is to be retried in the event of fail-over. */
- FailoverProtectedOperation<T, E> operation;
-
- /** The connection on which the fail-over protected operation is to be performed. */
- AMQConnection connection;
-
- /**
- * Creates an automatic retrying fail-over handler for the specified operation.
- *
- * @param operation The fail-over protected operation to wrap in this handler.
- */
- public FailoverRetrySupport(FailoverProtectedOperation<T, E> operation, AMQConnection con)
- {
- this.operation = operation;
- this.connection = con;
- }
-
- /**
- * Delays a continuation until the "not failing over" condition is met on the specified connection. Repeats
- * until the operation throws AMQException or succeeds without being interrupted by fail-over.
- *
- * @return The result of executing the continuation.
- *
- * @throws E Any underlying exception is allowed to fall through.
- */
- public T execute() throws E
- {
- while (true)
- {
- try
- {
- connection.blockUntilNotFailingOver();
- }
- catch (InterruptedException e)
- {
- _log.debug("Interrupted: " + e, e);
-
- return null;
- }
-
- synchronized (connection.getFailoverMutex())
- {
- try
- {
- return operation.execute();
- }
- catch (FailoverException e)
- {
- _log.debug("Failover exception caught during operation: " + e, e);
- }
- catch (IllegalStateException e)
- {
- if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support")))
- {
- throw e;
- }
- }
- }
- }
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.failover;
+
+import org.apache.qpid.client.AMQConnection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FailoverRetrySupport is a continuation that wraps another continuation, delaying its execution until it is notified
+ * that a blocking condition has been met, and executing the continuation within a mutex. If the continuation fails, due
+ * to the original condition being broken, whilst the continuation is waiting for a reponse to a synchronous request,
+ * FailoverRetrySupport automatcally rechecks the condition and re-acquires the mutex and re-runs the continution. This
+ * automatic retrying is continued until the continuation succeeds, or throws an exception (different to
+ * FailoverException, which is used to signal the failure of the original condition).
+ *
+ * <p/>The blocking condition used is that the connection is not currently failing over, and the mutex used is the
+ * connection failover mutex, which guards against the fail-over process being run during fail-over vulnerable methods.
+ * These are used like a lock and condition variable.
+ *
+ * <p/>The wrapped operation may throw a FailoverException, this is an exception that can be raised by a
+ * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener}, in response to it being notified that a
+ * fail-over wants to start whilst it was waiting. Methods that are vulnerable to fail-over are those that are
+ * synchronous, where a failure will prevent them from getting the reply they are waiting for and asynchronous
+ * methods that should not be attempted when a fail-over is in progress.
+ *
+ * <p/>Wrapping a synchronous method in a FailoverRetrySupport will have the effect that the operation will not be
+ * started during fail-over, but be delayed until any current fail-over has completed. Should a fail-over process want
+ * to start whilst waiting for the synchrnous reply, the FailoverRetrySupport will detect this and rety the operation
+ * until it succeeds. Synchronous methods are usually coordinated with a
+ * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener} which is notified when a fail-over process wants
+ * to start and throws a FailoverException in response to this.
+ *
+ * <p/>Wrapping an asynchronous method in a FailoverRetrySupport will have the effect that the operation will not be
+ * started during fail-over, but be delayed until any current fail-over has completed.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide a continuation synchronized on a fail-over lock and condition.
+ * <tr><td> Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception.
+ * </table>
+ *
+ * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see
+ * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary
+ * Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation.
+ * Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type
+ * to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that.
+ *
+ * @todo InterruptedException not handled well.
+ */
+public class FailoverRetrySupport<T, E extends Exception> implements FailoverSupport<T, E>
+{
+ /** Used for debugging. */
+ private static final Logger _log = LoggerFactory.getLogger(FailoverRetrySupport.class);
+
+ /** The protected operation that is to be retried in the event of fail-over. */
+ FailoverProtectedOperation<T, E> operation;
+
+ /** The connection on which the fail-over protected operation is to be performed. */
+ AMQConnection connection;
+
+ /**
+ * Creates an automatic retrying fail-over handler for the specified operation.
+ *
+ * @param operation The fail-over protected operation to wrap in this handler.
+ */
+ public FailoverRetrySupport(FailoverProtectedOperation<T, E> operation, AMQConnection con)
+ {
+ this.operation = operation;
+ this.connection = con;
+ }
+
+ /**
+ * Delays a continuation until the "not failing over" condition is met on the specified connection. Repeats
+ * until the operation throws AMQException or succeeds without being interrupted by fail-over.
+ *
+ * @return The result of executing the continuation.
+ *
+ * @throws E Any underlying exception is allowed to fall through.
+ */
+ public T execute() throws E
+ {
+ while (true)
+ {
+ try
+ {
+ connection.blockUntilNotFailingOver();
+ }
+ catch (InterruptedException e)
+ {
+ _log.debug("Interrupted: " + e, e);
+
+ return null;
+ }
+
+ synchronized (connection.getFailoverMutex())
+ {
+ try
+ {
+ return operation.execute();
+ }
+ catch (FailoverException e)
+ {
+ _log.debug("Failover exception caught during operation: " + e, e);
+ }
+ catch (IllegalStateException e)
+ {
+ if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support")))
+ {
+ throw e;
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
index a150d1446a..5bd36aa88b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
@@ -1,36 +1,36 @@
-package org.apache.qpid.client.handler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.framing.*;
-import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQNoRouteException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQChannelClosedException;
-import org.apache.qpid.protocol.AMQConstant;
-
-public class AccessRequestOkMethodHandler implements StateAwareMethodListener<AccessRequestOkBody>
-{
- private static final Logger _logger = LoggerFactory.getLogger(AccessRequestOkMethodHandler.class);
-
- private static AccessRequestOkMethodHandler _handler = new AccessRequestOkMethodHandler();
-
- public static AccessRequestOkMethodHandler getInstance()
- {
- return _handler;
- }
-
- public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId)
- throws AMQException
- {
- _logger.debug("AccessRequestOk method received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
- session.setTicket(method.getTicket(), channelId);
-
- }
-}
+package org.apache.qpid.client.handler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class AccessRequestOkMethodHandler implements StateAwareMethodListener<AccessRequestOkBody>
+{
+ private static final Logger _logger = LoggerFactory.getLogger(AccessRequestOkMethodHandler.class);
+
+ private static AccessRequestOkMethodHandler _handler = new AccessRequestOkMethodHandler();
+
+ public static AccessRequestOkMethodHandler getInstance()
+ {
+ return _handler;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId)
+ throws AMQException
+ {
+ _logger.debug("AccessRequestOk method received");
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+ session.setTicket(method.getTicket(), channelId);
+
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
index de976b05bd..afb7517a12 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
@@ -1,528 +1,528 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.handler;
-
-import java.util.Map;
-import java.util.HashMap;
-
-import org.apache.qpid.framing.*;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.AMQMethodNotImplementedException;
-
-
-public class ClientMethodDispatcherImpl implements MethodDispatcher
-{
-
-
- private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance();
- private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance();
- private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
- private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
- private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
- private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
- private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
- private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
- private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance();
- private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance();
- private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance();
- private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance();
- private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance();
-
-
-
- private static interface DispatcherFactory
- {
- public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager);
- }
-
- private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
- new HashMap<ProtocolVersion, DispatcherFactory>();
-
- static
- {
- _dispatcherFactories.put(ProtocolVersion.v8_0,
- new DispatcherFactory()
- {
- public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
- {
- return new ClientMethodDispatcherImpl_8_0(stateManager);
- }
- });
-
- _dispatcherFactories.put(ProtocolVersion.v0_9,
- new DispatcherFactory()
- {
- public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
- {
- return new ClientMethodDispatcherImpl_0_9(stateManager);
- }
- });
-
- }
-
-
- public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager)
- {
- DispatcherFactory factory = _dispatcherFactories.get(version);
- return factory.createMethodDispatcher(stateManager);
- }
-
-
-
-
- private AMQStateManager _stateManager;
-
- public ClientMethodDispatcherImpl(AMQStateManager stateManager)
- {
- _stateManager = stateManager;
- }
-
-
- public AMQStateManager getStateManager()
- {
- return _stateManager;
- }
-
- public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
- {
- _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
- {
- _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
- {
- _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
- {
- _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
- {
- _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
- {
- _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
- {
- _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
- {
- _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
- {
- _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
- {
- _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
- {
- _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
- {
- _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
- {
- _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQMethodNotImplementedException;
+
+
+public class ClientMethodDispatcherImpl implements MethodDispatcher
+{
+
+
+ private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance();
+ private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance();
+ private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
+ private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
+ private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
+ private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
+ private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
+ private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
+ private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance();
+ private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance();
+ private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance();
+ private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance();
+ private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance();
+
+
+
+ private static interface DispatcherFactory
+ {
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager);
+ }
+
+ private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
+ new HashMap<ProtocolVersion, DispatcherFactory>();
+
+ static
+ {
+ _dispatcherFactories.put(ProtocolVersion.v8_0,
+ new DispatcherFactory()
+ {
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+ {
+ return new ClientMethodDispatcherImpl_8_0(stateManager);
+ }
+ });
+
+ _dispatcherFactories.put(ProtocolVersion.v0_9,
+ new DispatcherFactory()
+ {
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+ {
+ return new ClientMethodDispatcherImpl_0_9(stateManager);
+ }
+ });
+
+ }
+
+
+ public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager)
+ {
+ DispatcherFactory factory = _dispatcherFactories.get(version);
+ return factory.createMethodDispatcher(stateManager);
+ }
+
+
+
+
+ private AMQStateManager _stateManager;
+
+ public ClientMethodDispatcherImpl(AMQStateManager stateManager)
+ {
+ _stateManager = stateManager;
+ }
+
+
+ public AMQStateManager getStateManager()
+ {
+ return _stateManager;
+ }
+
+ public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
+ {
+ _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
+ {
+ _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
+ {
+ _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
+ {
+ _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
+ {
+ _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
+ {
+ _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
+ {
+ _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
+ {
+ _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
+ {
+ _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
+ {
+ _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
+ {
+ _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
+ {
+ _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
+ {
+ _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
index ae6d5e8283..e235368357 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
@@ -1,155 +1,155 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.handler;
-
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.AMQMethodNotImplementedException;
-
-
-public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9
-{
- public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager)
- {
- super(stateManager);
- }
-
-
- public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQMethodNotImplementedException;
+
+
+public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9
+{
+ public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+ {
+ super(stateManager);
+ }
+
+
+ public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
index 6bd6874cde..b0f003cd2d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
@@ -1,85 +1,85 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
- package org.apache.qpid.client.handler;
-
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.state.AMQStateManager;
-
-public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0
-{
- public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager)
- {
- super(stateManager);
- }
-
- public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ package org.apache.qpid.client.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.AMQStateManager;
+
+public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0
+{
+ public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager)
+ {
+ super(stateManager);
+ }
+
+ public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
index 5904131122..9bdde01bf3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
@@ -1,802 +1,802 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-
-package org.apache.qpid.client.message;
-
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-
-import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-
-/**
- * @author Apache Software Foundation
- */
-public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage
-{
-
- protected static final byte BOOLEAN_TYPE = (byte) 1;
-
- protected static final byte BYTE_TYPE = (byte) 2;
-
- protected static final byte BYTEARRAY_TYPE = (byte) 3;
-
- protected static final byte SHORT_TYPE = (byte) 4;
-
- protected static final byte CHAR_TYPE = (byte) 5;
-
- protected static final byte INT_TYPE = (byte) 6;
-
- protected static final byte LONG_TYPE = (byte) 7;
-
- protected static final byte FLOAT_TYPE = (byte) 8;
-
- protected static final byte DOUBLE_TYPE = (byte) 9;
-
- protected static final byte STRING_TYPE = (byte) 10;
-
- protected static final byte NULL_STRING_TYPE = (byte) 11;
-
- /**
- * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
- * a byte array in multiple chunks, hence this is used to track how much is left to be read
- */
- private int _byteArrayRemaining = -1;
-
- AbstractBytesTypedMessage()
- {
- this(null);
- }
-
- /**
- * Construct a stream message with existing data.
- *
- * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- * set to auto expand
- */
- AbstractBytesTypedMessage(ByteBuffer data)
- {
- super(data); // this instanties a content header
- }
-
-
- AbstractBytesTypedMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
- {
- super(messageNbr, contentHeader, exchange, routingKey, data);
- }
-
-
- protected byte readWireType() throws MessageFormatException, MessageEOFException,
- MessageNotReadableException
- {
- checkReadable();
- checkAvailable(1);
- return _data.get();
- }
-
- protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException
- {
- checkWritable();
- _data.put(type);
- _changedData = true;
- }
-
- protected boolean readBoolean() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- boolean result;
- try
- {
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Boolean.parseBoolean(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private boolean readBooleanImpl()
- {
- return _data.get() != 0;
- }
-
- protected byte readByte() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- byte result;
- try
- {
- switch (wireType)
- {
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Byte.parseByte(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- return result;
- }
-
- private byte readByteImpl()
- {
- return _data.get();
- }
-
- protected short readShort() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- short result;
- try
- {
- switch (wireType)
- {
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Short.parseShort(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a short");
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- return result;
- }
-
- private short readShortImpl()
- {
- return _data.getShort();
- }
-
- /**
- * Note that this method reads a unicode character as two bytes from the stream
- *
- * @return the character read from the stream
- * @throws javax.jms.JMSException
- */
- protected char readChar() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- try
- {
- if(wireType == NULL_STRING_TYPE){
- throw new NullPointerException();
- }
-
- if (wireType != CHAR_TYPE)
- {
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a char");
- }
- else
- {
- checkAvailable(2);
- return readCharImpl();
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private char readCharImpl()
- {
- return _data.getChar();
- }
-
- protected int readInt() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- int result;
- try
- {
- switch (wireType)
- {
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Integer.parseInt(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to an int");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- protected int readIntImpl()
- {
- return _data.getInt();
- }
-
- protected long readLong() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- long result;
- try
- {
- switch (wireType)
- {
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Long.parseLong(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a long");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private long readLongImpl()
- {
- return _data.getLong();
- }
-
- protected float readFloat() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- float result;
- try
- {
- switch (wireType)
- {
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Float.parseFloat(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a float");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private float readFloatImpl()
- {
- return _data.getFloat();
- }
-
- protected double readDouble() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- double result;
- try
- {
- switch (wireType)
- {
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Double.parseDouble(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a double");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private double readDoubleImpl()
- {
- return _data.getDouble();
- }
-
- protected String readString() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- String result;
- try
- {
- switch (wireType)
- {
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- case NULL_STRING_TYPE:
- result = null;
- throw new NullPointerException("data is null");
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = String.valueOf(readBooleanImpl());
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = String.valueOf(readLongImpl());
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readIntImpl());
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = String.valueOf(readShortImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = String.valueOf(readByteImpl());
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readFloatImpl());
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = String.valueOf(readDoubleImpl());
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = String.valueOf(readCharImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a String");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- protected String readStringImpl() throws JMSException
- {
- try
- {
- return _data.getString(Charset.forName("UTF-8").newDecoder());
- }
- catch (CharacterCodingException e)
- {
- JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
- je.setLinkedException(e);
- throw je;
- }
- }
-
- protected int readBytes(byte[] bytes) throws JMSException
- {
- if (bytes == null)
- {
- throw new IllegalArgumentException("byte array must not be null");
- }
- checkReadable();
- // first call
- if (_byteArrayRemaining == -1)
- {
- // type discriminator checked separately so you get a MessageFormatException rather than
- // an EOF even in the case where both would be applicable
- checkAvailable(1);
- byte wireType = readWireType();
- if (wireType != BYTEARRAY_TYPE)
- {
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
- }
- checkAvailable(4);
- int size = _data.getInt();
- // length of -1 indicates null
- if (size == -1)
- {
- return -1;
- }
- else
- {
- if (size > _data.remaining())
- {
- throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " +
- _data.remaining() + " bytes");
- }
- else
- {
- _byteArrayRemaining = size;
- }
- }
- }
- else if (_byteArrayRemaining == 0)
- {
- _byteArrayRemaining = -1;
- return -1;
- }
-
- int returnedSize = readBytesImpl(bytes);
- if (returnedSize < bytes.length)
- {
- _byteArrayRemaining = -1;
- }
- return returnedSize;
- }
-
- private int readBytesImpl(byte[] bytes)
- {
- int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
- _byteArrayRemaining -= count;
-
- if (count == 0)
- {
- return 0;
- }
- else
- {
- _data.get(bytes, 0, count);
- return count;
- }
- }
-
- protected Object readObject() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- Object result = null;
- try
- {
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case BYTEARRAY_TYPE:
- checkAvailable(4);
- int size = _data.getInt();
- if (size == -1)
- {
- result = null;
- }
- else
- {
- _byteArrayRemaining = size;
- byte[] bytesResult = new byte[size];
- readBytesImpl(bytesResult);
- result = bytesResult;
- }
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = readCharImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case NULL_STRING_TYPE:
- result = null;
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- protected void writeBoolean(boolean b) throws JMSException
- {
- writeTypeDiscriminator(BOOLEAN_TYPE);
- _data.put(b ? (byte) 1 : (byte) 0);
- }
-
- protected void writeByte(byte b) throws JMSException
- {
- writeTypeDiscriminator(BYTE_TYPE);
- _data.put(b);
- }
-
- protected void writeShort(short i) throws JMSException
- {
- writeTypeDiscriminator(SHORT_TYPE);
- _data.putShort(i);
- }
-
- protected void writeChar(char c) throws JMSException
- {
- writeTypeDiscriminator(CHAR_TYPE);
- _data.putChar(c);
- }
-
- protected void writeInt(int i) throws JMSException
- {
- writeTypeDiscriminator(INT_TYPE);
- writeIntImpl(i);
- }
-
- protected void writeIntImpl(int i)
- {
- _data.putInt(i);
- }
-
- protected void writeLong(long l) throws JMSException
- {
- writeTypeDiscriminator(LONG_TYPE);
- _data.putLong(l);
- }
-
- protected void writeFloat(float v) throws JMSException
- {
- writeTypeDiscriminator(FLOAT_TYPE);
- _data.putFloat(v);
- }
-
- protected void writeDouble(double v) throws JMSException
- {
- writeTypeDiscriminator(DOUBLE_TYPE);
- _data.putDouble(v);
- }
-
- protected void writeString(String string) throws JMSException
- {
- if (string == null)
- {
- writeTypeDiscriminator(NULL_STRING_TYPE);
- }
- else
- {
- writeTypeDiscriminator(STRING_TYPE);
- try
- {
- writeStringImpl(string);
- }
- catch (CharacterCodingException e)
- {
- JMSException ex = new JMSException("Unable to encode string: " + e);
- ex.setLinkedException(e);
- throw ex;
- }
- }
- }
-
- protected void writeStringImpl(String string)
- throws CharacterCodingException
- {
- _data.putString(string, Charset.forName("UTF-8").newEncoder());
- // we must write the null terminator ourselves
- _data.put((byte) 0);
- }
-
- protected void writeBytes(byte[] bytes) throws JMSException
- {
- writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
- }
-
- protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException
- {
- writeTypeDiscriminator(BYTEARRAY_TYPE);
- if (bytes == null)
- {
- _data.putInt(-1);
- }
- else
- {
- _data.putInt(length);
- _data.put(bytes, offset, length);
- }
- }
-
- protected void writeObject(Object object) throws JMSException
- {
- checkWritable();
- Class clazz;
-
- if (object == null)
- {
- // string handles the output of null values
- clazz = String.class;
- }
- else
- {
- clazz = object.getClass();
- }
-
- if (clazz == Byte.class)
- {
- writeByte((Byte) object);
- }
- else if (clazz == Boolean.class)
- {
- writeBoolean((Boolean) object);
- }
- else if (clazz == byte[].class)
- {
- writeBytes((byte[]) object);
- }
- else if (clazz == Short.class)
- {
- writeShort((Short) object);
- }
- else if (clazz == Character.class)
- {
- writeChar((Character) object);
- }
- else if (clazz == Integer.class)
- {
- writeInt((Integer) object);
- }
- else if (clazz == Long.class)
- {
- writeLong((Long) object);
- }
- else if (clazz == Float.class)
- {
- writeFloat((Float) object);
- }
- else if (clazz == Double.class)
- {
- writeDouble((Double) object);
- }
- else if (clazz == String.class)
- {
- writeString((String) object);
- }
- else
- {
- throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+
+package org.apache.qpid.client.message;
+
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+
+/**
+ * @author Apache Software Foundation
+ */
+public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage
+{
+
+ protected static final byte BOOLEAN_TYPE = (byte) 1;
+
+ protected static final byte BYTE_TYPE = (byte) 2;
+
+ protected static final byte BYTEARRAY_TYPE = (byte) 3;
+
+ protected static final byte SHORT_TYPE = (byte) 4;
+
+ protected static final byte CHAR_TYPE = (byte) 5;
+
+ protected static final byte INT_TYPE = (byte) 6;
+
+ protected static final byte LONG_TYPE = (byte) 7;
+
+ protected static final byte FLOAT_TYPE = (byte) 8;
+
+ protected static final byte DOUBLE_TYPE = (byte) 9;
+
+ protected static final byte STRING_TYPE = (byte) 10;
+
+ protected static final byte NULL_STRING_TYPE = (byte) 11;
+
+ /**
+ * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
+ * a byte array in multiple chunks, hence this is used to track how much is left to be read
+ */
+ private int _byteArrayRemaining = -1;
+
+ AbstractBytesTypedMessage()
+ {
+ this(null);
+ }
+
+ /**
+ * Construct a stream message with existing data.
+ *
+ * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
+ * set to auto expand
+ */
+ AbstractBytesTypedMessage(ByteBuffer data)
+ {
+ super(data); // this instanties a content header
+ }
+
+
+ AbstractBytesTypedMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
+ {
+ super(messageNbr, contentHeader, exchange, routingKey, data);
+ }
+
+
+ protected byte readWireType() throws MessageFormatException, MessageEOFException,
+ MessageNotReadableException
+ {
+ checkReadable();
+ checkAvailable(1);
+ return _data.get();
+ }
+
+ protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException
+ {
+ checkWritable();
+ _data.put(type);
+ _changedData = true;
+ }
+
+ protected boolean readBoolean() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ boolean result;
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Boolean.parseBoolean(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ private boolean readBooleanImpl()
+ {
+ return _data.get() != 0;
+ }
+
+ protected byte readByte() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ byte result;
+ try
+ {
+ switch (wireType)
+ {
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Byte.parseByte(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ return result;
+ }
+
+ private byte readByteImpl()
+ {
+ return _data.get();
+ }
+
+ protected short readShort() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ short result;
+ try
+ {
+ switch (wireType)
+ {
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Short.parseShort(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ return result;
+ }
+
+ private short readShortImpl()
+ {
+ return _data.getShort();
+ }
+
+ /**
+ * Note that this method reads a unicode character as two bytes from the stream
+ *
+ * @return the character read from the stream
+ * @throws javax.jms.JMSException
+ */
+ protected char readChar() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ try
+ {
+ if(wireType == NULL_STRING_TYPE){
+ throw new NullPointerException();
+ }
+
+ if (wireType != CHAR_TYPE)
+ {
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ }
+ else
+ {
+ checkAvailable(2);
+ return readCharImpl();
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ private char readCharImpl()
+ {
+ return _data.getChar();
+ }
+
+ protected int readInt() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ int result;
+ try
+ {
+ switch (wireType)
+ {
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Integer.parseInt(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ protected int readIntImpl()
+ {
+ return _data.getInt();
+ }
+
+ protected long readLong() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ long result;
+ try
+ {
+ switch (wireType)
+ {
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Long.parseLong(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ private long readLongImpl()
+ {
+ return _data.getLong();
+ }
+
+ protected float readFloat() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ float result;
+ try
+ {
+ switch (wireType)
+ {
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Float.parseFloat(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ private float readFloatImpl()
+ {
+ return _data.getFloat();
+ }
+
+ protected double readDouble() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ double result;
+ try
+ {
+ switch (wireType)
+ {
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Double.parseDouble(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ private double readDoubleImpl()
+ {
+ return _data.getDouble();
+ }
+
+ protected String readString() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ String result;
+ try
+ {
+ switch (wireType)
+ {
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ case NULL_STRING_TYPE:
+ result = null;
+ throw new NullPointerException("data is null");
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readBooleanImpl());
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readLongImpl());
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readIntImpl());
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readShortImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readByteImpl());
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readFloatImpl());
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readDoubleImpl());
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readCharImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ protected String readStringImpl() throws JMSException
+ {
+ try
+ {
+ return _data.getString(Charset.forName("UTF-8").newDecoder());
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+ je.setLinkedException(e);
+ throw je;
+ }
+ }
+
+ protected int readBytes(byte[] bytes) throws JMSException
+ {
+ if (bytes == null)
+ {
+ throw new IllegalArgumentException("byte array must not be null");
+ }
+ checkReadable();
+ // first call
+ if (_byteArrayRemaining == -1)
+ {
+ // type discriminator checked separately so you get a MessageFormatException rather than
+ // an EOF even in the case where both would be applicable
+ checkAvailable(1);
+ byte wireType = readWireType();
+ if (wireType != BYTEARRAY_TYPE)
+ {
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
+ }
+ checkAvailable(4);
+ int size = _data.getInt();
+ // length of -1 indicates null
+ if (size == -1)
+ {
+ return -1;
+ }
+ else
+ {
+ if (size > _data.remaining())
+ {
+ throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " +
+ _data.remaining() + " bytes");
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ }
+ }
+ }
+ else if (_byteArrayRemaining == 0)
+ {
+ _byteArrayRemaining = -1;
+ return -1;
+ }
+
+ int returnedSize = readBytesImpl(bytes);
+ if (returnedSize < bytes.length)
+ {
+ _byteArrayRemaining = -1;
+ }
+ return returnedSize;
+ }
+
+ private int readBytesImpl(byte[] bytes)
+ {
+ int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
+ _byteArrayRemaining -= count;
+
+ if (count == 0)
+ {
+ return 0;
+ }
+ else
+ {
+ _data.get(bytes, 0, count);
+ return count;
+ }
+ }
+
+ protected Object readObject() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ Object result = null;
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case BYTEARRAY_TYPE:
+ checkAvailable(4);
+ int size = _data.getInt();
+ if (size == -1)
+ {
+ result = null;
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ byte[] bytesResult = new byte[size];
+ readBytesImpl(bytesResult);
+ result = bytesResult;
+ }
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = readCharImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case NULL_STRING_TYPE:
+ result = null;
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ protected void writeBoolean(boolean b) throws JMSException
+ {
+ writeTypeDiscriminator(BOOLEAN_TYPE);
+ _data.put(b ? (byte) 1 : (byte) 0);
+ }
+
+ protected void writeByte(byte b) throws JMSException
+ {
+ writeTypeDiscriminator(BYTE_TYPE);
+ _data.put(b);
+ }
+
+ protected void writeShort(short i) throws JMSException
+ {
+ writeTypeDiscriminator(SHORT_TYPE);
+ _data.putShort(i);
+ }
+
+ protected void writeChar(char c) throws JMSException
+ {
+ writeTypeDiscriminator(CHAR_TYPE);
+ _data.putChar(c);
+ }
+
+ protected void writeInt(int i) throws JMSException
+ {
+ writeTypeDiscriminator(INT_TYPE);
+ writeIntImpl(i);
+ }
+
+ protected void writeIntImpl(int i)
+ {
+ _data.putInt(i);
+ }
+
+ protected void writeLong(long l) throws JMSException
+ {
+ writeTypeDiscriminator(LONG_TYPE);
+ _data.putLong(l);
+ }
+
+ protected void writeFloat(float v) throws JMSException
+ {
+ writeTypeDiscriminator(FLOAT_TYPE);
+ _data.putFloat(v);
+ }
+
+ protected void writeDouble(double v) throws JMSException
+ {
+ writeTypeDiscriminator(DOUBLE_TYPE);
+ _data.putDouble(v);
+ }
+
+ protected void writeString(String string) throws JMSException
+ {
+ if (string == null)
+ {
+ writeTypeDiscriminator(NULL_STRING_TYPE);
+ }
+ else
+ {
+ writeTypeDiscriminator(STRING_TYPE);
+ try
+ {
+ writeStringImpl(string);
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException ex = new JMSException("Unable to encode string: " + e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
+ }
+ }
+
+ protected void writeStringImpl(String string)
+ throws CharacterCodingException
+ {
+ _data.putString(string, Charset.forName("UTF-8").newEncoder());
+ // we must write the null terminator ourselves
+ _data.put((byte) 0);
+ }
+
+ protected void writeBytes(byte[] bytes) throws JMSException
+ {
+ writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
+ }
+
+ protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException
+ {
+ writeTypeDiscriminator(BYTEARRAY_TYPE);
+ if (bytes == null)
+ {
+ _data.putInt(-1);
+ }
+ else
+ {
+ _data.putInt(length);
+ _data.put(bytes, offset, length);
+ }
+ }
+
+ protected void writeObject(Object object) throws JMSException
+ {
+ checkWritable();
+ Class clazz;
+
+ if (object == null)
+ {
+ // string handles the output of null values
+ clazz = String.class;
+ }
+ else
+ {
+ clazz = object.getClass();
+ }
+
+ if (clazz == Byte.class)
+ {
+ writeByte((Byte) object);
+ }
+ else if (clazz == Boolean.class)
+ {
+ writeBoolean((Boolean) object);
+ }
+ else if (clazz == byte[].class)
+ {
+ writeBytes((byte[]) object);
+ }
+ else if (clazz == Short.class)
+ {
+ writeShort((Short) object);
+ }
+ else if (clazz == Character.class)
+ {
+ writeChar((Character) object);
+ }
+ else if (clazz == Integer.class)
+ {
+ writeInt((Integer) object);
+ }
+ else if (clazz == Long.class)
+ {
+ writeLong((Long) object);
+ }
+ else if (clazz == Float.class)
+ {
+ writeFloat((Float) object);
+ }
+ else if (clazz == Double.class)
+ {
+ writeDouble((Double) object);
+ }
+ else if (clazz == String.class)
+ {
+ writeString((String) object);
+ }
+ else
+ {
+ throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
index 39b4e1e27b..fec0117a03 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
@@ -1,552 +1,552 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.message;
-
-import java.util.Enumeration;
-
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-
-public final class JMSHeaderAdapter
-{
- private final FieldTable _headers;
-
- public JMSHeaderAdapter(FieldTable headers)
- {
- _headers = headers;
- }
-
-
- public FieldTable getHeaders()
- {
- return _headers;
- }
-
- public boolean getBoolean(String string) throws JMSException
- {
- checkPropertyName(string);
- Boolean b = getHeaders().getBoolean(string);
-
- if (b == null)
- {
- if (getHeaders().containsKey(string))
- {
- Object str = getHeaders().getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getBoolean can't use " + string + " item.");
- }
- else
- {
- return Boolean.valueOf((String) str);
- }
- }
- else
- {
- b = Boolean.valueOf(null);
- }
- }
-
- return b;
- }
-
- public boolean getBoolean(AMQShortString string) throws JMSException
- {
- checkPropertyName(string);
- Boolean b = getHeaders().getBoolean(string);
-
- if (b == null)
- {
- if (getHeaders().containsKey(string))
- {
- Object str = getHeaders().getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getBoolean can't use " + string + " item.");
- }
- else
- {
- return Boolean.valueOf((String) str);
- }
- }
- else
- {
- b = Boolean.valueOf(null);
- }
- }
-
- return b;
- }
-
- public char getCharacter(String string) throws JMSException
- {
- checkPropertyName(string);
- Character c = getHeaders().getCharacter(string);
-
- if (c == null)
- {
- if (getHeaders().isNullStringValue(string))
- {
- throw new NullPointerException("Cannot convert null char");
- }
- else
- {
- throw new MessageFormatException("getChar can't use " + string + " item.");
- }
- }
- else
- {
- return (char) c;
- }
- }
-
- public byte[] getBytes(String string) throws JMSException
- {
- return getBytes(new AMQShortString(string));
- }
-
- public byte[] getBytes(AMQShortString string) throws JMSException
- {
- checkPropertyName(string);
-
- byte[] bs = getHeaders().getBytes(string);
-
- if (bs == null)
- {
- throw new MessageFormatException("getBytes can't use " + string + " item.");
- }
- else
- {
- return bs;
- }
- }
-
- public byte getByte(String string) throws JMSException
- {
- checkPropertyName(string);
- Byte b = getHeaders().getByte(string);
- if (b == null)
- {
- if (getHeaders().containsKey(string))
- {
- Object str = getHeaders().getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getByte can't use " + string + " item.");
- }
- else
- {
- return Byte.valueOf((String) str);
- }
- }
- else
- {
- b = Byte.valueOf(null);
- }
- }
-
- return b;
- }
-
- public short getShort(String string) throws JMSException
- {
- checkPropertyName(string);
- Short s = getHeaders().getShort(string);
-
- if (s == null)
- {
- s = Short.valueOf(getByte(string));
- }
-
- return s;
- }
-
- public int getInteger(String string) throws JMSException
- {
- checkPropertyName(string);
- Integer i = getHeaders().getInteger(string);
-
- if (i == null)
- {
- i = Integer.valueOf(getShort(string));
- }
-
- return i;
- }
-
- public long getLong(String string) throws JMSException
- {
- checkPropertyName(string);
- Long l = getHeaders().getLong(string);
-
- if (l == null)
- {
- l = Long.valueOf(getInteger(string));
- }
-
- return l;
- }
-
- public float getFloat(String string) throws JMSException
- {
- checkPropertyName(string);
- Float f = getHeaders().getFloat(string);
-
- if (f == null)
- {
- if (getHeaders().containsKey(string))
- {
- Object str = getHeaders().getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getFloat can't use " + string + " item.");
- }
- else
- {
- return Float.valueOf((String) str);
- }
- }
- else
- {
- f = Float.valueOf(null);
- }
-
- }
-
- return f;
- }
-
- public double getDouble(String string) throws JMSException
- {
- checkPropertyName(string);
- Double d = getHeaders().getDouble(string);
-
- if (d == null)
- {
- d = Double.valueOf(getFloat(string));
- }
-
- return d;
- }
-
- public String getString(String string) throws JMSException
- {
- checkPropertyName(string);
- String s = getHeaders().getString(string);
-
- if (s == null)
- {
- if (getHeaders().containsKey(string))
- {
- Object o = getHeaders().getObject(string);
- if (o instanceof byte[])
- {
- throw new MessageFormatException("getObject couldn't find " + string + " item.");
- }
- else
- {
- if (o == null)
- {
- return null;
- }
- else
- {
- s = String.valueOf(o);
- }
- }
- }//else return s // null;
- }
-
- return s;
- }
-
- public Object getObject(String string) throws JMSException
- {
- checkPropertyName(string);
- return getHeaders().getObject(string);
- }
-
- public void setBoolean(AMQShortString string, boolean b) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setBoolean(string, b);
- }
-
- public void setBoolean(String string, boolean b) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setBoolean(string, b);
- }
-
- public void setChar(String string, char c) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setChar(string, c);
- }
-
- public Object setBytes(AMQShortString string, byte[] bytes)
- {
- checkPropertyName(string);
- return getHeaders().setBytes(string, bytes);
- }
-
- public Object setBytes(String string, byte[] bytes)
- {
- checkPropertyName(string);
- return getHeaders().setBytes(string, bytes);
- }
-
- public Object setBytes(String string, byte[] bytes, int start, int length)
- {
- checkPropertyName(string);
- return getHeaders().setBytes(string, bytes, start, length);
- }
-
- public void setByte(String string, byte b) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setByte(string, b);
- }
-
- public void setByte(AMQShortString string, byte b) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setByte(string, b);
- }
-
-
- public void setShort(String string, short i) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setShort(string, i);
- }
-
- public void setInteger(String string, int i) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setInteger(string, i);
- }
-
- public void setInteger(AMQShortString string, int i) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setInteger(string, i);
- }
-
- public void setLong(String string, long l) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setLong(string, l);
- }
-
- public void setFloat(String string, float v) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setFloat(string, v);
- }
-
- public void setDouble(String string, double v) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setDouble(string, v);
- }
-
- public void setString(String string, String string1) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setString(string, string1);
- }
-
- public void setString(AMQShortString string, String string1) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setString(string, string1);
- }
-
- public void setObject(String string, Object object) throws JMSException
- {
- checkPropertyName(string);
- try
- {
- getHeaders().setObject(string, object);
- }
- catch (AMQPInvalidClassException aice)
- {
- MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
- mfe.setLinkedException(aice);
- throw mfe;
- }
- }
-
- public boolean itemExists(String string) throws JMSException
- {
- checkPropertyName(string);
- return getHeaders().containsKey(string);
- }
-
- public Enumeration getPropertyNames()
- {
- return getHeaders().getPropertyNames();
- }
-
- public void clear()
- {
- getHeaders().clear();
- }
-
- public boolean propertyExists(AMQShortString propertyName)
- {
- checkPropertyName(propertyName);
- return getHeaders().propertyExists(propertyName);
- }
-
- public boolean propertyExists(String propertyName)
- {
- checkPropertyName(propertyName);
- return getHeaders().propertyExists(propertyName);
- }
-
- public Object put(Object key, Object value)
- {
- checkPropertyName(key.toString());
- return getHeaders().setObject(key.toString(), value);
- }
-
- public Object remove(AMQShortString propertyName)
- {
- checkPropertyName(propertyName);
- return getHeaders().remove(propertyName);
- }
-
- public Object remove(String propertyName)
- {
- checkPropertyName(propertyName);
- return getHeaders().remove(propertyName);
- }
-
- public boolean isEmpty()
- {
- return getHeaders().isEmpty();
- }
-
- public void writeToBuffer(ByteBuffer data)
- {
- getHeaders().writeToBuffer(data);
- }
-
- public Enumeration getMapNames()
- {
- return getPropertyNames();
- }
-
- protected static void checkPropertyName(CharSequence propertyName)
- {
- if (propertyName == null)
- {
- throw new IllegalArgumentException("Property name must not be null");
- }
- else if (propertyName.length() == 0)
- {
- throw new IllegalArgumentException("Property name must not be the empty string");
- }
-
- checkIdentiferFormat(propertyName);
- }
-
- protected static void checkIdentiferFormat(CharSequence propertyName)
- {
-// JMS requirements 3.5.1 Property Names
-// Identifiers:
-// - An identifier is an unlimited-length character sequence that must begin
-// with a Java identifier start character; all following characters must be Java
-// identifier part characters. An identifier start character is any character for
-// which the method Character.isJavaIdentifierStart returns true. This includes
-// '_' and '$'. An identifier part character is any character for which the
-// method Character.isJavaIdentifierPart returns true.
-// - Identifiers cannot be the names NULL, TRUE, or FALSE.
-// � Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
-// ESCAPE.
-// � Identifiers are either header field references or property references. The
-// type of a property value in a message selector corresponds to the type
-// used to set the property. If a property that does not exist in a message is
-// referenced, its value is NULL. The semantics of evaluating NULL values
-// in a selector are described in Section 3.8.1.2, �Null Values.�
-// � The conversions that apply to the get methods for properties do not
-// apply when a property is used in a message selector expression. For
-// example, suppose you set a property as a string value, as in the
-// following:
-// myMessage.setStringProperty("NumberOfOrders", "2");
-// The following expression in a message selector would evaluate to false,
-// because a string cannot be used in an arithmetic expression:
-// "NumberOfOrders > 1"
-// � Identifiers are case sensitive.
-// � Message header field references are restricted to JMSDeliveryMode,
-// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
-// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
-// null and if so are treated as a NULL value.
-
- if (Boolean.getBoolean("strict-jms"))
- {
- // JMS start character
- if (!(Character.isJavaIdentifierStart(propertyName.charAt(0))))
- {
- throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character");
- }
-
- // JMS part character
- int length = propertyName.length();
- for (int c = 1; c < length; c++)
- {
- if (!(Character.isJavaIdentifierPart(propertyName.charAt(c))))
- {
- throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character");
- }
- }
-
- // JMS invalid names
- if ((propertyName.equals("NULL")
- || propertyName.equals("TRUE")
- || propertyName.equals("FALSE")
- || propertyName.equals("NOT")
- || propertyName.equals("AND")
- || propertyName.equals("OR")
- || propertyName.equals("BETWEEN")
- || propertyName.equals("LIKE")
- || propertyName.equals("IN")
- || propertyName.equals("IS")
- || propertyName.equals("ESCAPE")))
- {
- throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
- }
- }
-
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import java.util.Enumeration;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+
+public final class JMSHeaderAdapter
+{
+ private final FieldTable _headers;
+
+ public JMSHeaderAdapter(FieldTable headers)
+ {
+ _headers = headers;
+ }
+
+
+ public FieldTable getHeaders()
+ {
+ return _headers;
+ }
+
+ public boolean getBoolean(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Boolean b = getHeaders().getBoolean(string);
+
+ if (b == null)
+ {
+ if (getHeaders().containsKey(string))
+ {
+ Object str = getHeaders().getObject(string);
+
+ if (str == null || !(str instanceof String))
+ {
+ throw new MessageFormatException("getBoolean can't use " + string + " item.");
+ }
+ else
+ {
+ return Boolean.valueOf((String) str);
+ }
+ }
+ else
+ {
+ b = Boolean.valueOf(null);
+ }
+ }
+
+ return b;
+ }
+
+ public boolean getBoolean(AMQShortString string) throws JMSException
+ {
+ checkPropertyName(string);
+ Boolean b = getHeaders().getBoolean(string);
+
+ if (b == null)
+ {
+ if (getHeaders().containsKey(string))
+ {
+ Object str = getHeaders().getObject(string);
+
+ if (str == null || !(str instanceof String))
+ {
+ throw new MessageFormatException("getBoolean can't use " + string + " item.");
+ }
+ else
+ {
+ return Boolean.valueOf((String) str);
+ }
+ }
+ else
+ {
+ b = Boolean.valueOf(null);
+ }
+ }
+
+ return b;
+ }
+
+ public char getCharacter(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Character c = getHeaders().getCharacter(string);
+
+ if (c == null)
+ {
+ if (getHeaders().isNullStringValue(string))
+ {
+ throw new NullPointerException("Cannot convert null char");
+ }
+ else
+ {
+ throw new MessageFormatException("getChar can't use " + string + " item.");
+ }
+ }
+ else
+ {
+ return (char) c;
+ }
+ }
+
+ public byte[] getBytes(String string) throws JMSException
+ {
+ return getBytes(new AMQShortString(string));
+ }
+
+ public byte[] getBytes(AMQShortString string) throws JMSException
+ {
+ checkPropertyName(string);
+
+ byte[] bs = getHeaders().getBytes(string);
+
+ if (bs == null)
+ {
+ throw new MessageFormatException("getBytes can't use " + string + " item.");
+ }
+ else
+ {
+ return bs;
+ }
+ }
+
+ public byte getByte(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Byte b = getHeaders().getByte(string);
+ if (b == null)
+ {
+ if (getHeaders().containsKey(string))
+ {
+ Object str = getHeaders().getObject(string);
+
+ if (str == null || !(str instanceof String))
+ {
+ throw new MessageFormatException("getByte can't use " + string + " item.");
+ }
+ else
+ {
+ return Byte.valueOf((String) str);
+ }
+ }
+ else
+ {
+ b = Byte.valueOf(null);
+ }
+ }
+
+ return b;
+ }
+
+ public short getShort(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Short s = getHeaders().getShort(string);
+
+ if (s == null)
+ {
+ s = Short.valueOf(getByte(string));
+ }
+
+ return s;
+ }
+
+ public int getInteger(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Integer i = getHeaders().getInteger(string);
+
+ if (i == null)
+ {
+ i = Integer.valueOf(getShort(string));
+ }
+
+ return i;
+ }
+
+ public long getLong(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Long l = getHeaders().getLong(string);
+
+ if (l == null)
+ {
+ l = Long.valueOf(getInteger(string));
+ }
+
+ return l;
+ }
+
+ public float getFloat(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Float f = getHeaders().getFloat(string);
+
+ if (f == null)
+ {
+ if (getHeaders().containsKey(string))
+ {
+ Object str = getHeaders().getObject(string);
+
+ if (str == null || !(str instanceof String))
+ {
+ throw new MessageFormatException("getFloat can't use " + string + " item.");
+ }
+ else
+ {
+ return Float.valueOf((String) str);
+ }
+ }
+ else
+ {
+ f = Float.valueOf(null);
+ }
+
+ }
+
+ return f;
+ }
+
+ public double getDouble(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Double d = getHeaders().getDouble(string);
+
+ if (d == null)
+ {
+ d = Double.valueOf(getFloat(string));
+ }
+
+ return d;
+ }
+
+ public String getString(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ String s = getHeaders().getString(string);
+
+ if (s == null)
+ {
+ if (getHeaders().containsKey(string))
+ {
+ Object o = getHeaders().getObject(string);
+ if (o instanceof byte[])
+ {
+ throw new MessageFormatException("getObject couldn't find " + string + " item.");
+ }
+ else
+ {
+ if (o == null)
+ {
+ return null;
+ }
+ else
+ {
+ s = String.valueOf(o);
+ }
+ }
+ }//else return s // null;
+ }
+
+ return s;
+ }
+
+ public Object getObject(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ return getHeaders().getObject(string);
+ }
+
+ public void setBoolean(AMQShortString string, boolean b) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setBoolean(string, b);
+ }
+
+ public void setBoolean(String string, boolean b) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setBoolean(string, b);
+ }
+
+ public void setChar(String string, char c) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setChar(string, c);
+ }
+
+ public Object setBytes(AMQShortString string, byte[] bytes)
+ {
+ checkPropertyName(string);
+ return getHeaders().setBytes(string, bytes);
+ }
+
+ public Object setBytes(String string, byte[] bytes)
+ {
+ checkPropertyName(string);
+ return getHeaders().setBytes(string, bytes);
+ }
+
+ public Object setBytes(String string, byte[] bytes, int start, int length)
+ {
+ checkPropertyName(string);
+ return getHeaders().setBytes(string, bytes, start, length);
+ }
+
+ public void setByte(String string, byte b) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setByte(string, b);
+ }
+
+ public void setByte(AMQShortString string, byte b) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setByte(string, b);
+ }
+
+
+ public void setShort(String string, short i) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setShort(string, i);
+ }
+
+ public void setInteger(String string, int i) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setInteger(string, i);
+ }
+
+ public void setInteger(AMQShortString string, int i) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setInteger(string, i);
+ }
+
+ public void setLong(String string, long l) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setLong(string, l);
+ }
+
+ public void setFloat(String string, float v) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setFloat(string, v);
+ }
+
+ public void setDouble(String string, double v) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setDouble(string, v);
+ }
+
+ public void setString(String string, String string1) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setString(string, string1);
+ }
+
+ public void setString(AMQShortString string, String string1) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setString(string, string1);
+ }
+
+ public void setObject(String string, Object object) throws JMSException
+ {
+ checkPropertyName(string);
+ try
+ {
+ getHeaders().setObject(string, object);
+ }
+ catch (AMQPInvalidClassException aice)
+ {
+ MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ mfe.setLinkedException(aice);
+ throw mfe;
+ }
+ }
+
+ public boolean itemExists(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ return getHeaders().containsKey(string);
+ }
+
+ public Enumeration getPropertyNames()
+ {
+ return getHeaders().getPropertyNames();
+ }
+
+ public void clear()
+ {
+ getHeaders().clear();
+ }
+
+ public boolean propertyExists(AMQShortString propertyName)
+ {
+ checkPropertyName(propertyName);
+ return getHeaders().propertyExists(propertyName);
+ }
+
+ public boolean propertyExists(String propertyName)
+ {
+ checkPropertyName(propertyName);
+ return getHeaders().propertyExists(propertyName);
+ }
+
+ public Object put(Object key, Object value)
+ {
+ checkPropertyName(key.toString());
+ return getHeaders().setObject(key.toString(), value);
+ }
+
+ public Object remove(AMQShortString propertyName)
+ {
+ checkPropertyName(propertyName);
+ return getHeaders().remove(propertyName);
+ }
+
+ public Object remove(String propertyName)
+ {
+ checkPropertyName(propertyName);
+ return getHeaders().remove(propertyName);
+ }
+
+ public boolean isEmpty()
+ {
+ return getHeaders().isEmpty();
+ }
+
+ public void writeToBuffer(ByteBuffer data)
+ {
+ getHeaders().writeToBuffer(data);
+ }
+
+ public Enumeration getMapNames()
+ {
+ return getPropertyNames();
+ }
+
+ protected static void checkPropertyName(CharSequence propertyName)
+ {
+ if (propertyName == null)
+ {
+ throw new IllegalArgumentException("Property name must not be null");
+ }
+ else if (propertyName.length() == 0)
+ {
+ throw new IllegalArgumentException("Property name must not be the empty string");
+ }
+
+ checkIdentiferFormat(propertyName);
+ }
+
+ protected static void checkIdentiferFormat(CharSequence propertyName)
+ {
+// JMS requirements 3.5.1 Property Names
+// Identifiers:
+// - An identifier is an unlimited-length character sequence that must begin
+// with a Java identifier start character; all following characters must be Java
+// identifier part characters. An identifier start character is any character for
+// which the method Character.isJavaIdentifierStart returns true. This includes
+// '_' and '$'. An identifier part character is any character for which the
+// method Character.isJavaIdentifierPart returns true.
+// - Identifiers cannot be the names NULL, TRUE, or FALSE.
+// � Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
+// ESCAPE.
+// � Identifiers are either header field references or property references. The
+// type of a property value in a message selector corresponds to the type
+// used to set the property. If a property that does not exist in a message is
+// referenced, its value is NULL. The semantics of evaluating NULL values
+// in a selector are described in Section 3.8.1.2, �Null Values.�
+// � The conversions that apply to the get methods for properties do not
+// apply when a property is used in a message selector expression. For
+// example, suppose you set a property as a string value, as in the
+// following:
+// myMessage.setStringProperty("NumberOfOrders", "2");
+// The following expression in a message selector would evaluate to false,
+// because a string cannot be used in an arithmetic expression:
+// "NumberOfOrders > 1"
+// � Identifiers are case sensitive.
+// � Message header field references are restricted to JMSDeliveryMode,
+// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
+// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
+// null and if so are treated as a NULL value.
+
+ if (Boolean.getBoolean("strict-jms"))
+ {
+ // JMS start character
+ if (!(Character.isJavaIdentifierStart(propertyName.charAt(0))))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character");
+ }
+
+ // JMS part character
+ int length = propertyName.length();
+ for (int c = 1; c < length; c++)
+ {
+ if (!(Character.isJavaIdentifierPart(propertyName.charAt(c))))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character");
+ }
+ }
+
+ // JMS invalid names
+ if ((propertyName.equals("NULL")
+ || propertyName.equals("TRUE")
+ || propertyName.equals("FALSE")
+ || propertyName.equals("NOT")
+ || propertyName.equals("AND")
+ || propertyName.equals("OR")
+ || propertyName.equals("BETWEEN")
+ || propertyName.equals("LIKE")
+ || propertyName.equals("IN")
+ || propertyName.equals("IS")
+ || propertyName.equals("ESCAPE")))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
+ }
+ }
+
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java
index 5185278eef..2c99b9a97b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java
@@ -1,32 +1,32 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.state;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.AMQException;
-
-public class AMQMethodNotImplementedException extends AMQException
-{
- public AMQMethodNotImplementedException(AMQMethodBody body)
- {
- super(null, "Unexpected Method Received: " + body.getClass().getName(), null);
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.state;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.AMQException;
+
+public class AMQMethodNotImplementedException extends AMQException
+{
+ public AMQMethodNotImplementedException(AMQMethodBody body)
+ {
+ super(null, "Unexpected Method Received: " + body.getClass().getName(), null);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/Message.java b/java/client/src/main/java/org/apache/qpid/jms/Message.java
index 6752ee616f..e65f9ad2f4 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/Message.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/Message.java
@@ -1,28 +1,28 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms;
-
-import javax.jms.JMSException;
-
-public interface Message extends javax.jms.Message
-{
- public void acknowledgeThis() throws JMSException;
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.JMSException;
+
+public interface Message extends javax.jms.Message
+{
+ public void acknowledgeThis() throws JMSException;
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java
index a5279a195b..1738db7239 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java
@@ -1,104 +1,104 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.test.unit.basic;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.testutil.QpidTestCase;
-
-import javax.jms.Session;
-import javax.jms.QueueSession;
-import javax.jms.Queue;
-import javax.jms.QueueSender;
-import javax.jms.TextMessage;
-import javax.jms.InvalidDestinationException;
-
-public class InvalidDestinationTest extends QpidTestCase
-{
- private AMQConnection _connection;
-
- protected void setUp() throws Exception
- {
- super.setUp();
- _connection = (AMQConnection) getConnection("guest", "guest");
- }
-
- protected void tearDown() throws Exception
- {
- _connection.close();
- super.tearDown();
- }
-
-
-
- public void testInvalidDestination() throws Exception
- {
- Queue invalidDestination = new AMQQueue("amq.direct","unknownQ");
- AMQQueue validDestination = new AMQQueue("amq.direct","knownQ");
- QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // This is the only easy way to create and bind a queue from the API :-(
- queueSession.createConsumer(validDestination);
-
- QueueSender sender = queueSession.createSender(invalidDestination);
- TextMessage msg = queueSession.createTextMessage("Hello");
- try
- {
- sender.send(msg);
- fail("Expected InvalidDestinationException");
- }
- catch (InvalidDestinationException ex)
- {
- // pass
- }
- sender.close();
-
- sender = queueSession.createSender(null);
- invalidDestination = new AMQQueue("amq.direct","unknownQ");
-
- try
- {
- sender.send(invalidDestination,msg);
- fail("Expected InvalidDestinationException");
- }
- catch (InvalidDestinationException ex)
- {
- // pass
- }
- sender.send(validDestination,msg);
- sender.close();
- validDestination = new AMQQueue("amq.direct","knownQ");
- sender = queueSession.createSender(validDestination);
- sender.send(msg);
-
-
-
-
- }
-
-
- public static junit.framework.Test suite()
- {
-
- return new junit.framework.TestSuite(InvalidDestinationTest.class);
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.testutil.QpidTestCase;
+
+import javax.jms.Session;
+import javax.jms.QueueSession;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+import javax.jms.TextMessage;
+import javax.jms.InvalidDestinationException;
+
+public class InvalidDestinationTest extends QpidTestCase
+{
+ private AMQConnection _connection;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _connection = (AMQConnection) getConnection("guest", "guest");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ super.tearDown();
+ }
+
+
+
+ public void testInvalidDestination() throws Exception
+ {
+ Queue invalidDestination = new AMQQueue("amq.direct","unknownQ");
+ AMQQueue validDestination = new AMQQueue("amq.direct","knownQ");
+ QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // This is the only easy way to create and bind a queue from the API :-(
+ queueSession.createConsumer(validDestination);
+
+ QueueSender sender = queueSession.createSender(invalidDestination);
+ TextMessage msg = queueSession.createTextMessage("Hello");
+ try
+ {
+ sender.send(msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.close();
+
+ sender = queueSession.createSender(null);
+ invalidDestination = new AMQQueue("amq.direct","unknownQ");
+
+ try
+ {
+ sender.send(invalidDestination,msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.send(validDestination,msg);
+ sender.close();
+ validDestination = new AMQQueue("amq.direct","knownQ");
+ sender = queueSession.createSender(validDestination);
+ sender.send(msg);
+
+
+
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+
+ return new junit.framework.TestSuite(InvalidDestinationTest.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
index 34197f2608..46b99fac8d 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
@@ -1,221 +1,221 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.test.unit.client.temporaryqueue;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-import javax.jms.TextMessage;
-import junit.framework.Assert;
-
-import org.apache.qpid.testutil.QpidTestCase;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.transport.TransportConnection;
-
-import java.util.List;
-import java.util.LinkedList;
-
-public class TemporaryQueueTest extends QpidTestCase
-{
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- protected Connection createConnection() throws Exception
- {
- return getConnection("guest", "guest");
- }
-
- public void testTempoaryQueue() throws Exception
- {
- Connection conn = createConnection();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TemporaryQueue queue = session.createTemporaryQueue();
- assertNotNull(queue);
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createConsumer(queue);
- conn.start();
- producer.send(session.createTextMessage("hello"));
- TextMessage tm = (TextMessage) consumer.receive(2000);
- assertNotNull(tm);
- assertEquals("hello", tm.getText());
-
- try
- {
- queue.delete();
- fail("Expected JMSException : should not be able to delete while there are active consumers");
- }
- catch (JMSException je)
- {
- ; //pass
- }
-
- consumer.close();
-
- try
- {
- queue.delete();
- }
- catch (JMSException je)
- {
- fail("Unexpected Exception: " + je.getMessage());
- }
-
- conn.close();
- }
-
- public void tUniqueness() throws Exception
- {
- int numProcs = Runtime.getRuntime().availableProcessors();
- final int threadsProc = 5;
-
- runUniqueness(1, 10);
- runUniqueness(numProcs * threadsProc, 10);
- runUniqueness(numProcs * threadsProc, 100);
- runUniqueness(numProcs * threadsProc, 500);
- }
-
- void runUniqueness(int makers, int queues) throws Exception
- {
- Connection connection = createConnection();
-
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>();
-
- //Create Makers
- for (int m = 0; m < makers; m++)
- {
- tqList.add(new TempQueueMaker(session, queues));
- }
-
-
- List<Thread> threadList = new LinkedList<Thread>();
-
- //Create Makers
- for (TempQueueMaker maker : tqList)
- {
- threadList.add(new Thread(maker));
- }
-
- //Start threads
- for (Thread thread : threadList)
- {
- thread.start();
- }
-
- // Join Threads
- for (Thread thread : threadList)
- {
- try
- {
- thread.join();
- }
- catch (InterruptedException e)
- {
- fail("Couldn't correctly join threads");
- }
- }
-
-
- List<AMQQueue> list = new LinkedList<AMQQueue>();
-
- // Test values
- for (TempQueueMaker maker : tqList)
- {
- check(maker, list);
- }
-
- Assert.assertEquals("Not enough queues made.", makers * queues, list.size());
-
- connection.close();
- }
-
- private void check(TempQueueMaker tq, List<AMQQueue> list)
- {
- for (AMQQueue q : tq.getList())
- {
- if (list.contains(q))
- {
- fail(q + " already exists.");
- }
- else
- {
- list.add(q);
- }
- }
- }
-
-
- class TempQueueMaker implements Runnable
- {
- List<AMQQueue> _queues;
- Session _session;
- private int _count;
-
-
- TempQueueMaker(Session session, int queues) throws JMSException
- {
- _queues = new LinkedList<AMQQueue>();
-
- _count = queues;
-
- _session = session;
- }
-
- public void run()
- {
- int i = 0;
- try
- {
- for (; i < _count; i++)
- {
- _queues.add((AMQQueue) _session.createTemporaryQueue());
- }
- }
- catch (JMSException jmse)
- {
- //stop
- }
- }
-
- List<AMQQueue> getList()
- {
- return _queues;
- }
- }
-
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TemporaryQueueTest.class);
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.test.unit.client.temporaryqueue;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+import junit.framework.Assert;
+
+import org.apache.qpid.testutil.QpidTestCase;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
+
+import java.util.List;
+import java.util.LinkedList;
+
+public class TemporaryQueueTest extends QpidTestCase
+{
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ protected Connection createConnection() throws Exception
+ {
+ return getConnection("guest", "guest");
+ }
+
+ public void testTempoaryQueue() throws Exception
+ {
+ Connection conn = createConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+ conn.start();
+ producer.send(session.createTextMessage("hello"));
+ TextMessage tm = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm);
+ assertEquals("hello", tm.getText());
+
+ try
+ {
+ queue.delete();
+ fail("Expected JMSException : should not be able to delete while there are active consumers");
+ }
+ catch (JMSException je)
+ {
+ ; //pass
+ }
+
+ consumer.close();
+
+ try
+ {
+ queue.delete();
+ }
+ catch (JMSException je)
+ {
+ fail("Unexpected Exception: " + je.getMessage());
+ }
+
+ conn.close();
+ }
+
+ public void tUniqueness() throws Exception
+ {
+ int numProcs = Runtime.getRuntime().availableProcessors();
+ final int threadsProc = 5;
+
+ runUniqueness(1, 10);
+ runUniqueness(numProcs * threadsProc, 10);
+ runUniqueness(numProcs * threadsProc, 100);
+ runUniqueness(numProcs * threadsProc, 500);
+ }
+
+ void runUniqueness(int makers, int queues) throws Exception
+ {
+ Connection connection = createConnection();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>();
+
+ //Create Makers
+ for (int m = 0; m < makers; m++)
+ {
+ tqList.add(new TempQueueMaker(session, queues));
+ }
+
+
+ List<Thread> threadList = new LinkedList<Thread>();
+
+ //Create Makers
+ for (TempQueueMaker maker : tqList)
+ {
+ threadList.add(new Thread(maker));
+ }
+
+ //Start threads
+ for (Thread thread : threadList)
+ {
+ thread.start();
+ }
+
+ // Join Threads
+ for (Thread thread : threadList)
+ {
+ try
+ {
+ thread.join();
+ }
+ catch (InterruptedException e)
+ {
+ fail("Couldn't correctly join threads");
+ }
+ }
+
+
+ List<AMQQueue> list = new LinkedList<AMQQueue>();
+
+ // Test values
+ for (TempQueueMaker maker : tqList)
+ {
+ check(maker, list);
+ }
+
+ Assert.assertEquals("Not enough queues made.", makers * queues, list.size());
+
+ connection.close();
+ }
+
+ private void check(TempQueueMaker tq, List<AMQQueue> list)
+ {
+ for (AMQQueue q : tq.getList())
+ {
+ if (list.contains(q))
+ {
+ fail(q + " already exists.");
+ }
+ else
+ {
+ list.add(q);
+ }
+ }
+ }
+
+
+ class TempQueueMaker implements Runnable
+ {
+ List<AMQQueue> _queues;
+ Session _session;
+ private int _count;
+
+
+ TempQueueMaker(Session session, int queues) throws JMSException
+ {
+ _queues = new LinkedList<AMQQueue>();
+
+ _count = queues;
+
+ _session = session;
+ }
+
+ public void run()
+ {
+ int i = 0;
+ try
+ {
+ for (; i < _count; i++)
+ {
+ _queues.add((AMQQueue) _session.createTemporaryQueue());
+ }
+ }
+ catch (JMSException jmse)
+ {
+ //stop
+ }
+ }
+
+ List<AMQQueue> getList()
+ {
+ return _queues;
+ }
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TemporaryQueueTest.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java
index d25986d991..54b2ee95f4 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java
@@ -1,144 +1,144 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.close;
-
-import junit.framework.Assert;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.testutil.QpidTestCase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.junit.concurrency.TestRunnable;
-import org.apache.qpid.junit.concurrency.ThreadTestCoordinator;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-/**
- * This test forces the situation where a session is closed whilst a message consumer is still in its onMessage method.
- * Running in AUTO_ACK mode, the close call ought to wait until the onMessage method completes, and the ack is sent
- * before closing the connection.
- *
- * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Check that
- * closing a connection whilst handling a message, blocks till completion of the handler. </table>
- */
-public class CloseBeforeAckTest extends QpidTestCase
-{
- private static final Logger log = LoggerFactory.getLogger(CloseBeforeAckTest.class);
-
- Connection connection;
- Session session;
- public static final String TEST_QUEUE_NAME = "TestQueue";
- private int TEST_COUNT = 25;
-
- class TestThread1 extends TestRunnable implements MessageListener
- {
- public void runWithExceptions() throws Exception
- {
- // Set this up to listen for message on the test session.
- session.createConsumer(session.createQueue(TEST_QUEUE_NAME)).setMessageListener(this);
- }
-
- public void onMessage(Message message)
- {
- // Give thread 2 permission to close the session.
- allow(new int[] { 1 });
-
- // Wait until thread 2 has closed the connection, or is blocked waiting for this to complete.
- waitFor(new int[] { 1 }, true);
- }
- }
-
- TestThread1 testThread1 = new TestThread1();
-
- TestRunnable testThread2 =
- new TestRunnable()
- {
- public void runWithExceptions() throws Exception
- {
- // Send a message to be picked up by thread 1.
- session.createProducer(null).send(session.createQueue(TEST_QUEUE_NAME),
- session.createTextMessage("Hi there thread 1!"));
-
- // Wait for thread 1 to pick up the message and give permission to continue.
- waitFor(new int[] { 0 }, false);
-
- // Close the connection.
- session.close();
-
- // Allow thread 1 to continue to completion, if it is erronously still waiting.
- allow(new int[] { 1 });
- }
- };
-
- public void testCloseBeforeAutoAck_QPID_397() throws Exception
- {
- // Create a session in auto acknowledge mode. This problem shows up in auto acknowledge if the client acks
- // message at the end of the onMessage method, after a close has been sent.
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- ThreadTestCoordinator tt = new ThreadTestCoordinator(2);
-
- tt.addTestThread(testThread1, 0);
- tt.addTestThread(testThread2, 1);
- tt.setDeadlockTimeout(500);
- tt.run();
-
- String errorMessage = tt.joinAndRetrieveMessages();
-
- // Print any error messages or exceptions.
- log.debug(errorMessage);
-
- if (!tt.getExceptions().isEmpty())
- {
- for (Exception e : tt.getExceptions())
- {
- log.debug("Exception thrown during test thread: ", e);
- }
- }
-
- Assert.assertTrue(errorMessage, "".equals(errorMessage));
- }
-
- public void closeBeforeAutoAckManyTimes() throws Exception
- {
- for (int i = 0; i < TEST_COUNT; i++)
- {
- testCloseBeforeAutoAck_QPID_397();
- }
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
- connection = getConnection("guest", "guest");
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.close;
+
+import junit.framework.Assert;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.testutil.QpidTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.junit.concurrency.TestRunnable;
+import org.apache.qpid.junit.concurrency.ThreadTestCoordinator;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+/**
+ * This test forces the situation where a session is closed whilst a message consumer is still in its onMessage method.
+ * Running in AUTO_ACK mode, the close call ought to wait until the onMessage method completes, and the ack is sent
+ * before closing the connection.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Check that
+ * closing a connection whilst handling a message, blocks till completion of the handler. </table>
+ */
+public class CloseBeforeAckTest extends QpidTestCase
+{
+ private static final Logger log = LoggerFactory.getLogger(CloseBeforeAckTest.class);
+
+ Connection connection;
+ Session session;
+ public static final String TEST_QUEUE_NAME = "TestQueue";
+ private int TEST_COUNT = 25;
+
+ class TestThread1 extends TestRunnable implements MessageListener
+ {
+ public void runWithExceptions() throws Exception
+ {
+ // Set this up to listen for message on the test session.
+ session.createConsumer(session.createQueue(TEST_QUEUE_NAME)).setMessageListener(this);
+ }
+
+ public void onMessage(Message message)
+ {
+ // Give thread 2 permission to close the session.
+ allow(new int[] { 1 });
+
+ // Wait until thread 2 has closed the connection, or is blocked waiting for this to complete.
+ waitFor(new int[] { 1 }, true);
+ }
+ }
+
+ TestThread1 testThread1 = new TestThread1();
+
+ TestRunnable testThread2 =
+ new TestRunnable()
+ {
+ public void runWithExceptions() throws Exception
+ {
+ // Send a message to be picked up by thread 1.
+ session.createProducer(null).send(session.createQueue(TEST_QUEUE_NAME),
+ session.createTextMessage("Hi there thread 1!"));
+
+ // Wait for thread 1 to pick up the message and give permission to continue.
+ waitFor(new int[] { 0 }, false);
+
+ // Close the connection.
+ session.close();
+
+ // Allow thread 1 to continue to completion, if it is erronously still waiting.
+ allow(new int[] { 1 });
+ }
+ };
+
+ public void testCloseBeforeAutoAck_QPID_397() throws Exception
+ {
+ // Create a session in auto acknowledge mode. This problem shows up in auto acknowledge if the client acks
+ // message at the end of the onMessage method, after a close has been sent.
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ThreadTestCoordinator tt = new ThreadTestCoordinator(2);
+
+ tt.addTestThread(testThread1, 0);
+ tt.addTestThread(testThread2, 1);
+ tt.setDeadlockTimeout(500);
+ tt.run();
+
+ String errorMessage = tt.joinAndRetrieveMessages();
+
+ // Print any error messages or exceptions.
+ log.debug(errorMessage);
+
+ if (!tt.getExceptions().isEmpty())
+ {
+ for (Exception e : tt.getExceptions())
+ {
+ log.debug("Exception thrown during test thread: ", e);
+ }
+ }
+
+ Assert.assertTrue(errorMessage, "".equals(errorMessage));
+ }
+
+ public void closeBeforeAutoAckManyTimes() throws Exception
+ {
+ for (int i = 0; i < TEST_COUNT; i++)
+ {
+ testCloseBeforeAutoAck_QPID_397();
+ }
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ connection = getConnection("guest", "guest");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
index 0b9d0bdc2d..131cbd5f68 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
@@ -1,89 +1,89 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.test.unit.message;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.testutil.QpidTestCase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-/**
- * @author Apache Software Foundation
- */
-public class JMSDestinationTest extends QpidTestCase
-{
- private static final Logger _logger = LoggerFactory.getLogger(JMSDestinationTest.class);
-
-
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- public void testJMSDestination() throws Exception
- {
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
- AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue =
- new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false,
- true);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
-
- Connection con2 = (AMQConnection) getConnection("guest", "guest");
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
-
- TextMessage sentMsg = producerSession.createTextMessage("hello");
- assertNull(sentMsg.getJMSDestination());
-
- producer.send(sentMsg);
-
- assertEquals(sentMsg.getJMSDestination(), queue);
-
- con2.close();
-
- con.start();
-
- TextMessage rm = (TextMessage) consumer.receive();
- assertNotNull(rm);
-
- assertEquals(rm.getJMSDestination(), queue);
- con.close();
- }
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.test.unit.message;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.testutil.QpidTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class JMSDestinationTest extends QpidTestCase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(JMSDestinationTest.class);
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public void testJMSDestination() throws Exception
+ {
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue =
+ new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false,
+ true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+ Connection con2 = (AMQConnection) getConnection("guest", "guest");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ TextMessage sentMsg = producerSession.createTextMessage("hello");
+ assertNull(sentMsg.getJMSDestination());
+
+ producer.send(sentMsg);
+
+ assertEquals(sentMsg.getJMSDestination(), queue);
+
+ con2.close();
+
+ con.start();
+
+ TextMessage rm = (TextMessage) consumer.receive();
+ assertNotNull(rm);
+
+ assertEquals(rm.getJMSDestination(), queue);
+ con.close();
+ }
+
+}