diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2008-04-24 17:49:03 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2008-04-24 17:49:03 +0000 |
| commit | d964eae817b538c532996af0b41993d128fa5a5c (patch) | |
| tree | b0f9a56bc8a7691bd4cf009cbc83cf0fc8aa3ffc /java/client/src | |
| parent | 757d86d81e811f105f72fdfce5bc18d83aaa08d4 (diff) | |
| download | qpid-python-d964eae817b538c532996af0b41993d128fa5a5c.tar.gz | |
QPID-832 : Fix eol-style
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@651325 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
19 files changed, 3205 insertions, 3205 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java b/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java index 93f10761e2..7e257e0c20 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java @@ -1,26 +1,26 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client;
-
-public interface AMQSessionAdapter
-{
- public AMQSession getSession();
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +public interface AMQSessionAdapter +{ + public AMQSession getSession(); +} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java index 768e2862b3..fa2afb3ee4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java @@ -1,40 +1,40 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client;
-
-import org.apache.qpid.framing.AMQShortString;
-
-public class AMQUndefinedDestination extends AMQDestination
-{
-
- private static final AMQShortString UNKNOWN_EXCHANGE_CLASS = new AMQShortString("unknown");
-
-
- public AMQUndefinedDestination(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
- {
- super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName);
- }
-
- public boolean isNameRequired()
- {
- return getAMQQueueName() == null;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.framing.AMQShortString; + +public class AMQUndefinedDestination extends AMQDestination +{ + + private static final AMQShortString UNKNOWN_EXCHANGE_CLASS = new AMQShortString("unknown"); + + + public AMQUndefinedDestination(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName) + { + super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName); + } + + public boolean isNameRequired() + { + return getAMQQueueName() == null; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java index 8cb285c1ac..7cc548915c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java +++ b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java @@ -1,66 +1,66 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-
-import org.apache.qpid.framing.AMQShortString;
-
-public enum CustomJMSXProperty
-{
- JMS_AMQP_NULL,
- JMS_QPID_DESTTYPE,
- JMSXGroupID,
- JMSXGroupSeq,
- JMSXUserID;
-
-
- private final AMQShortString _nameAsShortString;
-
- CustomJMSXProperty()
- {
- _nameAsShortString = new AMQShortString(toString());
- }
-
- public AMQShortString getShortStringName()
- {
- return _nameAsShortString;
- }
-
- private static Enumeration _names;
-
- public static synchronized Enumeration asEnumeration()
- {
- if(_names == null)
- {
- CustomJMSXProperty[] properties = values();
- ArrayList<String> nameList = new ArrayList<String>(properties.length);
- for(CustomJMSXProperty property : properties)
- {
- nameList.add(property.toString());
- }
- _names = Collections.enumeration(nameList);
- }
- return _names;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; + +import org.apache.qpid.framing.AMQShortString; + +public enum CustomJMSXProperty +{ + JMS_AMQP_NULL, + JMS_QPID_DESTTYPE, + JMSXGroupID, + JMSXGroupSeq, + JMSXUserID; + + + private final AMQShortString _nameAsShortString; + + CustomJMSXProperty() + { + _nameAsShortString = new AMQShortString(toString()); + } + + public AMQShortString getShortStringName() + { + return _nameAsShortString; + } + + private static Enumeration _names; + + public static synchronized Enumeration asEnumeration() + { + if(_names == null) + { + CustomJMSXProperty[] properties = values(); + ArrayList<String> nameList = new ArrayList<String>(properties.length); + for(CustomJMSXProperty property : properties) + { + nameList.add(property.toString()); + } + _names = Collections.enumeration(nameList); + } + return _names; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java index 03d25aa243..7f8e80c73a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java @@ -1,38 +1,38 @@ -/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-
-package org.apache.qpid.client;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-
-/**
- * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
- * so that operations related to their "temporary-ness" can be abstracted out.
- */
-interface TemporaryDestination extends Destination
-{
-
- public void delete() throws JMSException;
- public AMQSession getSession();
- public boolean isDeleted();
-
-}
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.client; + +import javax.jms.Destination; +import javax.jms.JMSException; + +/** + * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue + * so that operations related to their "temporary-ness" can be abstracted out. + */ +interface TemporaryDestination extends Destination +{ + + public void delete() throws JMSException; + public AMQSession getSession(); + public boolean isDeleted(); + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java index 1ec98efe0e..51cc94965a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java @@ -1,75 +1,75 @@ -/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-
-package org.apache.qpid.client.failover;
-
-import org.apache.qpid.client.AMQConnection;
-
-/**
- * FailoverNoopSupport is a {@link FailoverSupport} implementation that does not really provide any failover support
- * at all. It wraps a {@link FailoverProtectedOperation} but should that operation throw {@link FailoverException} this
- * support class simply re-raises that exception as an IllegalStateException. This support wrapper should only be
- * used where the caller can be certain that the failover protected operation cannot acutally throw a failover exception,
- * for example, because the caller already holds a lock preventing that condition from arising.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Perform a fail-over protected operation raising providing no handling of fail-over conditions.
- * </table>
- */
-public class FailoverNoopSupport<T, E extends Exception> implements FailoverSupport<T, E>
-{
- /** The protected operation that is to be retried in the event of fail-over. */
- FailoverProtectedOperation<T, E> operation;
-
- /** The connection on which the fail-over protected operation is to be performed. */
- AMQConnection connection;
-
- /**
- * Creates an automatic retrying fail-over handler for the specified operation.
- *
- * @param operation The fail-over protected operation to wrap in this handler.
- */
- public FailoverNoopSupport(FailoverProtectedOperation<T, E> operation, AMQConnection con)
- {
- this.operation = operation;
- this.connection = con;
- }
-
- /**
- * Delegates to another continuation which is to be provided with fail-over handling.
- *
- * @return The return value from the delegated to continuation.
- * @throws E Any exception that the delegated to continuation may raise.
- */
- public T execute() throws E
- {
- try
- {
- return operation.execute();
- }
- catch (FailoverException e)
- {
- throw new IllegalStateException("Fail-over interupted no-op failover support. "
- + "No-op support should only be used where the caller is certain fail-over cannot occur.", e);
- }
- }
-}
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.client.failover; + +import org.apache.qpid.client.AMQConnection; + +/** + * FailoverNoopSupport is a {@link FailoverSupport} implementation that does not really provide any failover support + * at all. It wraps a {@link FailoverProtectedOperation} but should that operation throw {@link FailoverException} this + * support class simply re-raises that exception as an IllegalStateException. This support wrapper should only be + * used where the caller can be certain that the failover protected operation cannot acutally throw a failover exception, + * for example, because the caller already holds a lock preventing that condition from arising. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Perform a fail-over protected operation raising providing no handling of fail-over conditions. + * </table> + */ +public class FailoverNoopSupport<T, E extends Exception> implements FailoverSupport<T, E> +{ + /** The protected operation that is to be retried in the event of fail-over. */ + FailoverProtectedOperation<T, E> operation; + + /** The connection on which the fail-over protected operation is to be performed. */ + AMQConnection connection; + + /** + * Creates an automatic retrying fail-over handler for the specified operation. + * + * @param operation The fail-over protected operation to wrap in this handler. + */ + public FailoverNoopSupport(FailoverProtectedOperation<T, E> operation, AMQConnection con) + { + this.operation = operation; + this.connection = con; + } + + /** + * Delegates to another continuation which is to be provided with fail-over handling. + * + * @return The return value from the delegated to continuation. + * @throws E Any exception that the delegated to continuation may raise. + */ + public T execute() throws E + { + try + { + return operation.execute(); + } + catch (FailoverException e) + { + throw new IllegalStateException("Fail-over interupted no-op failover support. " + + "No-op support should only be used where the caller is certain fail-over cannot occur.", e); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java index 9a7f43926e..e9c5f24791 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java @@ -1,49 +1,49 @@ -/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-
-package org.apache.qpid.client.failover;
-
-/**
- * FailoverProtectedOperation is a continuation for an operation that may throw a {@link FailoverException} because
- * it has been interrupted by the fail-over process. The {@link FailoverRetrySupport} class defines support wrappers
- * for failover protected operations, in order to provide different handling schemes when failovers occurr.
- *
- * <p/>The type of checked exception that the operation may perform has been generified, in order that fail over
- * protected operations can be defined that raise arbitrary exceptions. The actuall exception types used should not
- * be sub-classes of FailoverException, or else catching FailoverException in the {@link FailoverRetrySupport} classes
- * will mask the exception.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities
- * <tr><td> Perform an operation that may be interrupted by fail-over.
- * </table>
- */
-public interface FailoverProtectedOperation<T, E extends Exception>
-{
- /**
- * Performs the continuations work.
- *
- * @return Provdes scope for the continuation to return an arbitrary value.
- *
- * @throws FailoverException If the operation is interrupted by a fail-over notification.
- */
- public abstract T execute() throws E, FailoverException;
-}
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.client.failover; + +/** + * FailoverProtectedOperation is a continuation for an operation that may throw a {@link FailoverException} because + * it has been interrupted by the fail-over process. The {@link FailoverRetrySupport} class defines support wrappers + * for failover protected operations, in order to provide different handling schemes when failovers occurr. + * + * <p/>The type of checked exception that the operation may perform has been generified, in order that fail over + * protected operations can be defined that raise arbitrary exceptions. The actuall exception types used should not + * be sub-classes of FailoverException, or else catching FailoverException in the {@link FailoverRetrySupport} classes + * will mask the exception. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities + * <tr><td> Perform an operation that may be interrupted by fail-over. + * </table> + */ +public interface FailoverProtectedOperation<T, E extends Exception> +{ + /** + * Performs the continuations work. + * + * @return Provdes scope for the continuation to return an arbitrary value. + * + * @throws FailoverException If the operation is interrupted by a fail-over notification. + */ + public abstract T execute() throws E, FailoverException; +} diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java index e756d7baf9..cf7e978c03 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java @@ -1,135 +1,135 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.failover;
-
-import org.apache.qpid.client.AMQConnection;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * FailoverRetrySupport is a continuation that wraps another continuation, delaying its execution until it is notified
- * that a blocking condition has been met, and executing the continuation within a mutex. If the continuation fails, due
- * to the original condition being broken, whilst the continuation is waiting for a reponse to a synchronous request,
- * FailoverRetrySupport automatcally rechecks the condition and re-acquires the mutex and re-runs the continution. This
- * automatic retrying is continued until the continuation succeeds, or throws an exception (different to
- * FailoverException, which is used to signal the failure of the original condition).
- *
- * <p/>The blocking condition used is that the connection is not currently failing over, and the mutex used is the
- * connection failover mutex, which guards against the fail-over process being run during fail-over vulnerable methods.
- * These are used like a lock and condition variable.
- *
- * <p/>The wrapped operation may throw a FailoverException, this is an exception that can be raised by a
- * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener}, in response to it being notified that a
- * fail-over wants to start whilst it was waiting. Methods that are vulnerable to fail-over are those that are
- * synchronous, where a failure will prevent them from getting the reply they are waiting for and asynchronous
- * methods that should not be attempted when a fail-over is in progress.
- *
- * <p/>Wrapping a synchronous method in a FailoverRetrySupport will have the effect that the operation will not be
- * started during fail-over, but be delayed until any current fail-over has completed. Should a fail-over process want
- * to start whilst waiting for the synchrnous reply, the FailoverRetrySupport will detect this and rety the operation
- * until it succeeds. Synchronous methods are usually coordinated with a
- * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener} which is notified when a fail-over process wants
- * to start and throws a FailoverException in response to this.
- *
- * <p/>Wrapping an asynchronous method in a FailoverRetrySupport will have the effect that the operation will not be
- * started during fail-over, but be delayed until any current fail-over has completed.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Provide a continuation synchronized on a fail-over lock and condition.
- * <tr><td> Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception.
- * </table>
- *
- * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see
- * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary
- * Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation.
- * Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type
- * to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that.
- *
- * @todo InterruptedException not handled well.
- */
-public class FailoverRetrySupport<T, E extends Exception> implements FailoverSupport<T, E>
-{
- /** Used for debugging. */
- private static final Logger _log = LoggerFactory.getLogger(FailoverRetrySupport.class);
-
- /** The protected operation that is to be retried in the event of fail-over. */
- FailoverProtectedOperation<T, E> operation;
-
- /** The connection on which the fail-over protected operation is to be performed. */
- AMQConnection connection;
-
- /**
- * Creates an automatic retrying fail-over handler for the specified operation.
- *
- * @param operation The fail-over protected operation to wrap in this handler.
- */
- public FailoverRetrySupport(FailoverProtectedOperation<T, E> operation, AMQConnection con)
- {
- this.operation = operation;
- this.connection = con;
- }
-
- /**
- * Delays a continuation until the "not failing over" condition is met on the specified connection. Repeats
- * until the operation throws AMQException or succeeds without being interrupted by fail-over.
- *
- * @return The result of executing the continuation.
- *
- * @throws E Any underlying exception is allowed to fall through.
- */
- public T execute() throws E
- {
- while (true)
- {
- try
- {
- connection.blockUntilNotFailingOver();
- }
- catch (InterruptedException e)
- {
- _log.debug("Interrupted: " + e, e);
-
- return null;
- }
-
- synchronized (connection.getFailoverMutex())
- {
- try
- {
- return operation.execute();
- }
- catch (FailoverException e)
- {
- _log.debug("Failover exception caught during operation: " + e, e);
- }
- catch (IllegalStateException e)
- {
- if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support")))
- {
- throw e;
- }
- }
- }
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.failover; + +import org.apache.qpid.client.AMQConnection; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * FailoverRetrySupport is a continuation that wraps another continuation, delaying its execution until it is notified + * that a blocking condition has been met, and executing the continuation within a mutex. If the continuation fails, due + * to the original condition being broken, whilst the continuation is waiting for a reponse to a synchronous request, + * FailoverRetrySupport automatcally rechecks the condition and re-acquires the mutex and re-runs the continution. This + * automatic retrying is continued until the continuation succeeds, or throws an exception (different to + * FailoverException, which is used to signal the failure of the original condition). + * + * <p/>The blocking condition used is that the connection is not currently failing over, and the mutex used is the + * connection failover mutex, which guards against the fail-over process being run during fail-over vulnerable methods. + * These are used like a lock and condition variable. + * + * <p/>The wrapped operation may throw a FailoverException, this is an exception that can be raised by a + * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener}, in response to it being notified that a + * fail-over wants to start whilst it was waiting. Methods that are vulnerable to fail-over are those that are + * synchronous, where a failure will prevent them from getting the reply they are waiting for and asynchronous + * methods that should not be attempted when a fail-over is in progress. + * + * <p/>Wrapping a synchronous method in a FailoverRetrySupport will have the effect that the operation will not be + * started during fail-over, but be delayed until any current fail-over has completed. Should a fail-over process want + * to start whilst waiting for the synchrnous reply, the FailoverRetrySupport will detect this and rety the operation + * until it succeeds. Synchronous methods are usually coordinated with a + * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener} which is notified when a fail-over process wants + * to start and throws a FailoverException in response to this. + * + * <p/>Wrapping an asynchronous method in a FailoverRetrySupport will have the effect that the operation will not be + * started during fail-over, but be delayed until any current fail-over has completed. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Provide a continuation synchronized on a fail-over lock and condition. + * <tr><td> Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception. + * </table> + * + * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see + * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary + * Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation. + * Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type + * to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that. + * + * @todo InterruptedException not handled well. + */ +public class FailoverRetrySupport<T, E extends Exception> implements FailoverSupport<T, E> +{ + /** Used for debugging. */ + private static final Logger _log = LoggerFactory.getLogger(FailoverRetrySupport.class); + + /** The protected operation that is to be retried in the event of fail-over. */ + FailoverProtectedOperation<T, E> operation; + + /** The connection on which the fail-over protected operation is to be performed. */ + AMQConnection connection; + + /** + * Creates an automatic retrying fail-over handler for the specified operation. + * + * @param operation The fail-over protected operation to wrap in this handler. + */ + public FailoverRetrySupport(FailoverProtectedOperation<T, E> operation, AMQConnection con) + { + this.operation = operation; + this.connection = con; + } + + /** + * Delays a continuation until the "not failing over" condition is met on the specified connection. Repeats + * until the operation throws AMQException or succeeds without being interrupted by fail-over. + * + * @return The result of executing the continuation. + * + * @throws E Any underlying exception is allowed to fall through. + */ + public T execute() throws E + { + while (true) + { + try + { + connection.blockUntilNotFailingOver(); + } + catch (InterruptedException e) + { + _log.debug("Interrupted: " + e, e); + + return null; + } + + synchronized (connection.getFailoverMutex()) + { + try + { + return operation.execute(); + } + catch (FailoverException e) + { + _log.debug("Failover exception caught during operation: " + e, e); + } + catch (IllegalStateException e) + { + if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support"))) + { + throw e; + } + } + } + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java index a150d1446a..5bd36aa88b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java @@ -1,36 +1,36 @@ -package org.apache.qpid.client.handler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.framing.*;
-import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQNoRouteException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQChannelClosedException;
-import org.apache.qpid.protocol.AMQConstant;
-
-public class AccessRequestOkMethodHandler implements StateAwareMethodListener<AccessRequestOkBody>
-{
- private static final Logger _logger = LoggerFactory.getLogger(AccessRequestOkMethodHandler.class);
-
- private static AccessRequestOkMethodHandler _handler = new AccessRequestOkMethodHandler();
-
- public static AccessRequestOkMethodHandler getInstance()
- {
- return _handler;
- }
-
- public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId)
- throws AMQException
- {
- _logger.debug("AccessRequestOk method received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
- session.setTicket(method.getTicket(), channelId);
-
- }
-}
+package org.apache.qpid.client.handler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.framing.*; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.protocol.AMQConstant; + +public class AccessRequestOkMethodHandler implements StateAwareMethodListener<AccessRequestOkBody> +{ + private static final Logger _logger = LoggerFactory.getLogger(AccessRequestOkMethodHandler.class); + + private static AccessRequestOkMethodHandler _handler = new AccessRequestOkMethodHandler(); + + public static AccessRequestOkMethodHandler getInstance() + { + return _handler; + } + + public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId) + throws AMQException + { + _logger.debug("AccessRequestOk method received"); + final AMQProtocolSession session = stateManager.getProtocolSession(); + session.setTicket(method.getTicket(), channelId); + + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index de976b05bd..afb7517a12 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -1,528 +1,528 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.handler;
-
-import java.util.Map;
-import java.util.HashMap;
-
-import org.apache.qpid.framing.*;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.AMQMethodNotImplementedException;
-
-
-public class ClientMethodDispatcherImpl implements MethodDispatcher
-{
-
-
- private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance();
- private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance();
- private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
- private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
- private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
- private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
- private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
- private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
- private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance();
- private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance();
- private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance();
- private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance();
- private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance();
-
-
-
- private static interface DispatcherFactory
- {
- public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager);
- }
-
- private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
- new HashMap<ProtocolVersion, DispatcherFactory>();
-
- static
- {
- _dispatcherFactories.put(ProtocolVersion.v8_0,
- new DispatcherFactory()
- {
- public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
- {
- return new ClientMethodDispatcherImpl_8_0(stateManager);
- }
- });
-
- _dispatcherFactories.put(ProtocolVersion.v0_9,
- new DispatcherFactory()
- {
- public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
- {
- return new ClientMethodDispatcherImpl_0_9(stateManager);
- }
- });
-
- }
-
-
- public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager)
- {
- DispatcherFactory factory = _dispatcherFactories.get(version);
- return factory.createMethodDispatcher(stateManager);
- }
-
-
-
-
- private AMQStateManager _stateManager;
-
- public ClientMethodDispatcherImpl(AMQStateManager stateManager)
- {
- _stateManager = stateManager;
- }
-
-
- public AMQStateManager getStateManager()
- {
- return _stateManager;
- }
-
- public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
- {
- _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
- {
- _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
- {
- _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
- {
- _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
- {
- _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
- {
- _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
- {
- _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
- {
- _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
- {
- _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
- {
- _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
- {
- _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
- {
- _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
- {
- _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId);
- return true;
- }
-
- public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler; + +import java.util.Map; +import java.util.HashMap; + +import org.apache.qpid.framing.*; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.AMQMethodNotImplementedException; + + +public class ClientMethodDispatcherImpl implements MethodDispatcher +{ + + + private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance(); + private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance(); + private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance(); + private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance(); + private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance(); + private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); + private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance(); + private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance(); + private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance(); + private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance(); + private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance(); + private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance(); + private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance(); + + + + private static interface DispatcherFactory + { + public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager); + } + + private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories = + new HashMap<ProtocolVersion, DispatcherFactory>(); + + static + { + _dispatcherFactories.put(ProtocolVersion.v8_0, + new DispatcherFactory() + { + public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager) + { + return new ClientMethodDispatcherImpl_8_0(stateManager); + } + }); + + _dispatcherFactories.put(ProtocolVersion.v0_9, + new DispatcherFactory() + { + public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager) + { + return new ClientMethodDispatcherImpl_0_9(stateManager); + } + }); + + } + + + public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager) + { + DispatcherFactory factory = _dispatcherFactories.get(version); + return factory.createMethodDispatcher(stateManager); + } + + + + + private AMQStateManager _stateManager; + + public ClientMethodDispatcherImpl(AMQStateManager stateManager) + { + _stateManager = stateManager; + } + + + public AMQStateManager getStateManager() + { + return _stateManager; + } + + public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException + { + _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException + { + _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException + { + _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException + { + _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException + { + _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException + { + _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException + { + _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException + { + _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException + { + _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException + { + _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException + { + _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException + { + _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException + { + _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException + { + return false; + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java index ae6d5e8283..e235368357 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java @@ -1,155 +1,155 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.handler;
-
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.AMQMethodNotImplementedException;
-
-
-public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9
-{
- public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager)
- {
- super(stateManager);
- }
-
-
- public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
-
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler; + +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.AMQMethodNotImplementedException; + + +public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9 +{ + public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager) + { + super(stateManager); + } + + + public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException + { + return false; + } + + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java index 6bd6874cde..b0f003cd2d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java @@ -1,85 +1,85 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
- package org.apache.qpid.client.handler;
-
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.state.AMQStateManager;
-
-public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0
-{
- public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager)
- {
- super(stateManager);
- }
-
- public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + package org.apache.qpid.client.handler; + +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.state.AMQStateManager; + +public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0 +{ + public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager) + { + super(stateManager); + } + + public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException + { + return false; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java index 5904131122..9bdde01bf3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -1,802 +1,802 @@ -/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-
-package org.apache.qpid.client.message;
-
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-
-import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-
-/**
- * @author Apache Software Foundation
- */
-public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage
-{
-
- protected static final byte BOOLEAN_TYPE = (byte) 1;
-
- protected static final byte BYTE_TYPE = (byte) 2;
-
- protected static final byte BYTEARRAY_TYPE = (byte) 3;
-
- protected static final byte SHORT_TYPE = (byte) 4;
-
- protected static final byte CHAR_TYPE = (byte) 5;
-
- protected static final byte INT_TYPE = (byte) 6;
-
- protected static final byte LONG_TYPE = (byte) 7;
-
- protected static final byte FLOAT_TYPE = (byte) 8;
-
- protected static final byte DOUBLE_TYPE = (byte) 9;
-
- protected static final byte STRING_TYPE = (byte) 10;
-
- protected static final byte NULL_STRING_TYPE = (byte) 11;
-
- /**
- * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
- * a byte array in multiple chunks, hence this is used to track how much is left to be read
- */
- private int _byteArrayRemaining = -1;
-
- AbstractBytesTypedMessage()
- {
- this(null);
- }
-
- /**
- * Construct a stream message with existing data.
- *
- * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- * set to auto expand
- */
- AbstractBytesTypedMessage(ByteBuffer data)
- {
- super(data); // this instanties a content header
- }
-
-
- AbstractBytesTypedMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
- {
- super(messageNbr, contentHeader, exchange, routingKey, data);
- }
-
-
- protected byte readWireType() throws MessageFormatException, MessageEOFException,
- MessageNotReadableException
- {
- checkReadable();
- checkAvailable(1);
- return _data.get();
- }
-
- protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException
- {
- checkWritable();
- _data.put(type);
- _changedData = true;
- }
-
- protected boolean readBoolean() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- boolean result;
- try
- {
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Boolean.parseBoolean(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private boolean readBooleanImpl()
- {
- return _data.get() != 0;
- }
-
- protected byte readByte() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- byte result;
- try
- {
- switch (wireType)
- {
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Byte.parseByte(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- return result;
- }
-
- private byte readByteImpl()
- {
- return _data.get();
- }
-
- protected short readShort() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- short result;
- try
- {
- switch (wireType)
- {
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Short.parseShort(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a short");
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- return result;
- }
-
- private short readShortImpl()
- {
- return _data.getShort();
- }
-
- /**
- * Note that this method reads a unicode character as two bytes from the stream
- *
- * @return the character read from the stream
- * @throws javax.jms.JMSException
- */
- protected char readChar() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- try
- {
- if(wireType == NULL_STRING_TYPE){
- throw new NullPointerException();
- }
-
- if (wireType != CHAR_TYPE)
- {
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a char");
- }
- else
- {
- checkAvailable(2);
- return readCharImpl();
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private char readCharImpl()
- {
- return _data.getChar();
- }
-
- protected int readInt() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- int result;
- try
- {
- switch (wireType)
- {
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Integer.parseInt(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to an int");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- protected int readIntImpl()
- {
- return _data.getInt();
- }
-
- protected long readLong() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- long result;
- try
- {
- switch (wireType)
- {
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Long.parseLong(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a long");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private long readLongImpl()
- {
- return _data.getLong();
- }
-
- protected float readFloat() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- float result;
- try
- {
- switch (wireType)
- {
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Float.parseFloat(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a float");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private float readFloatImpl()
- {
- return _data.getFloat();
- }
-
- protected double readDouble() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- double result;
- try
- {
- switch (wireType)
- {
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Double.parseDouble(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a double");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private double readDoubleImpl()
- {
- return _data.getDouble();
- }
-
- protected String readString() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- String result;
- try
- {
- switch (wireType)
- {
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- case NULL_STRING_TYPE:
- result = null;
- throw new NullPointerException("data is null");
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = String.valueOf(readBooleanImpl());
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = String.valueOf(readLongImpl());
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readIntImpl());
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = String.valueOf(readShortImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = String.valueOf(readByteImpl());
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readFloatImpl());
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = String.valueOf(readDoubleImpl());
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = String.valueOf(readCharImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a String");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- protected String readStringImpl() throws JMSException
- {
- try
- {
- return _data.getString(Charset.forName("UTF-8").newDecoder());
- }
- catch (CharacterCodingException e)
- {
- JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
- je.setLinkedException(e);
- throw je;
- }
- }
-
- protected int readBytes(byte[] bytes) throws JMSException
- {
- if (bytes == null)
- {
- throw new IllegalArgumentException("byte array must not be null");
- }
- checkReadable();
- // first call
- if (_byteArrayRemaining == -1)
- {
- // type discriminator checked separately so you get a MessageFormatException rather than
- // an EOF even in the case where both would be applicable
- checkAvailable(1);
- byte wireType = readWireType();
- if (wireType != BYTEARRAY_TYPE)
- {
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
- }
- checkAvailable(4);
- int size = _data.getInt();
- // length of -1 indicates null
- if (size == -1)
- {
- return -1;
- }
- else
- {
- if (size > _data.remaining())
- {
- throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " +
- _data.remaining() + " bytes");
- }
- else
- {
- _byteArrayRemaining = size;
- }
- }
- }
- else if (_byteArrayRemaining == 0)
- {
- _byteArrayRemaining = -1;
- return -1;
- }
-
- int returnedSize = readBytesImpl(bytes);
- if (returnedSize < bytes.length)
- {
- _byteArrayRemaining = -1;
- }
- return returnedSize;
- }
-
- private int readBytesImpl(byte[] bytes)
- {
- int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
- _byteArrayRemaining -= count;
-
- if (count == 0)
- {
- return 0;
- }
- else
- {
- _data.get(bytes, 0, count);
- return count;
- }
- }
-
- protected Object readObject() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- Object result = null;
- try
- {
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case BYTEARRAY_TYPE:
- checkAvailable(4);
- int size = _data.getInt();
- if (size == -1)
- {
- result = null;
- }
- else
- {
- _byteArrayRemaining = size;
- byte[] bytesResult = new byte[size];
- readBytesImpl(bytesResult);
- result = bytesResult;
- }
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = readCharImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case NULL_STRING_TYPE:
- result = null;
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- protected void writeBoolean(boolean b) throws JMSException
- {
- writeTypeDiscriminator(BOOLEAN_TYPE);
- _data.put(b ? (byte) 1 : (byte) 0);
- }
-
- protected void writeByte(byte b) throws JMSException
- {
- writeTypeDiscriminator(BYTE_TYPE);
- _data.put(b);
- }
-
- protected void writeShort(short i) throws JMSException
- {
- writeTypeDiscriminator(SHORT_TYPE);
- _data.putShort(i);
- }
-
- protected void writeChar(char c) throws JMSException
- {
- writeTypeDiscriminator(CHAR_TYPE);
- _data.putChar(c);
- }
-
- protected void writeInt(int i) throws JMSException
- {
- writeTypeDiscriminator(INT_TYPE);
- writeIntImpl(i);
- }
-
- protected void writeIntImpl(int i)
- {
- _data.putInt(i);
- }
-
- protected void writeLong(long l) throws JMSException
- {
- writeTypeDiscriminator(LONG_TYPE);
- _data.putLong(l);
- }
-
- protected void writeFloat(float v) throws JMSException
- {
- writeTypeDiscriminator(FLOAT_TYPE);
- _data.putFloat(v);
- }
-
- protected void writeDouble(double v) throws JMSException
- {
- writeTypeDiscriminator(DOUBLE_TYPE);
- _data.putDouble(v);
- }
-
- protected void writeString(String string) throws JMSException
- {
- if (string == null)
- {
- writeTypeDiscriminator(NULL_STRING_TYPE);
- }
- else
- {
- writeTypeDiscriminator(STRING_TYPE);
- try
- {
- writeStringImpl(string);
- }
- catch (CharacterCodingException e)
- {
- JMSException ex = new JMSException("Unable to encode string: " + e);
- ex.setLinkedException(e);
- throw ex;
- }
- }
- }
-
- protected void writeStringImpl(String string)
- throws CharacterCodingException
- {
- _data.putString(string, Charset.forName("UTF-8").newEncoder());
- // we must write the null terminator ourselves
- _data.put((byte) 0);
- }
-
- protected void writeBytes(byte[] bytes) throws JMSException
- {
- writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
- }
-
- protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException
- {
- writeTypeDiscriminator(BYTEARRAY_TYPE);
- if (bytes == null)
- {
- _data.putInt(-1);
- }
- else
- {
- _data.putInt(length);
- _data.put(bytes, offset, length);
- }
- }
-
- protected void writeObject(Object object) throws JMSException
- {
- checkWritable();
- Class clazz;
-
- if (object == null)
- {
- // string handles the output of null values
- clazz = String.class;
- }
- else
- {
- clazz = object.getClass();
- }
-
- if (clazz == Byte.class)
- {
- writeByte((Byte) object);
- }
- else if (clazz == Boolean.class)
- {
- writeBoolean((Boolean) object);
- }
- else if (clazz == byte[].class)
- {
- writeBytes((byte[]) object);
- }
- else if (clazz == Short.class)
- {
- writeShort((Short) object);
- }
- else if (clazz == Character.class)
- {
- writeChar((Character) object);
- }
- else if (clazz == Integer.class)
- {
- writeInt((Integer) object);
- }
- else if (clazz == Long.class)
- {
- writeLong((Long) object);
- }
- else if (clazz == Float.class)
- {
- writeFloat((Float) object);
- }
- else if (clazz == Double.class)
- {
- writeDouble((Double) object);
- }
- else if (clazz == String.class)
- {
- writeString((String) object);
- }
- else
- {
- throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
- }
- }
-}
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.client.message; + +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; + +/** + * @author Apache Software Foundation + */ +public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage +{ + + protected static final byte BOOLEAN_TYPE = (byte) 1; + + protected static final byte BYTE_TYPE = (byte) 2; + + protected static final byte BYTEARRAY_TYPE = (byte) 3; + + protected static final byte SHORT_TYPE = (byte) 4; + + protected static final byte CHAR_TYPE = (byte) 5; + + protected static final byte INT_TYPE = (byte) 6; + + protected static final byte LONG_TYPE = (byte) 7; + + protected static final byte FLOAT_TYPE = (byte) 8; + + protected static final byte DOUBLE_TYPE = (byte) 9; + + protected static final byte STRING_TYPE = (byte) 10; + + protected static final byte NULL_STRING_TYPE = (byte) 11; + + /** + * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read + * a byte array in multiple chunks, hence this is used to track how much is left to be read + */ + private int _byteArrayRemaining = -1; + + AbstractBytesTypedMessage() + { + this(null); + } + + /** + * Construct a stream message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + AbstractBytesTypedMessage(ByteBuffer data) + { + super(data); // this instanties a content header + } + + + AbstractBytesTypedMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException + { + super(messageNbr, contentHeader, exchange, routingKey, data); + } + + + protected byte readWireType() throws MessageFormatException, MessageEOFException, + MessageNotReadableException + { + checkReadable(); + checkAvailable(1); + return _data.get(); + } + + protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException + { + checkWritable(); + _data.put(type); + _changedData = true; + } + + protected boolean readBoolean() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + boolean result; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private boolean readBooleanImpl() + { + return _data.get() != 0; + } + + protected byte readByte() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + byte result; + try + { + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + private byte readByteImpl() + { + return _data.get(); + } + + protected short readShort() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + short result; + try + { + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + private short readShortImpl() + { + return _data.getShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws javax.jms.JMSException + */ + protected char readChar() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + try + { + if(wireType == NULL_STRING_TYPE){ + throw new NullPointerException(); + } + + if (wireType != CHAR_TYPE) + { + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private char readCharImpl() + { + return _data.getChar(); + } + + protected int readInt() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + int result; + try + { + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected int readIntImpl() + { + return _data.getInt(); + } + + protected long readLong() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + long result; + try + { + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private long readLongImpl() + { + return _data.getLong(); + } + + protected float readFloat() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + float result; + try + { + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private float readFloatImpl() + { + return _data.getFloat(); + } + + protected double readDouble() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + double result; + try + { + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private double readDoubleImpl() + { + return _data.getDouble(); + } + + protected String readString() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + String result; + try + { + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case NULL_STRING_TYPE: + result = null; + throw new NullPointerException("data is null"); + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected String readStringImpl() throws JMSException + { + try + { + return _data.getString(Charset.forName("UTF-8").newDecoder()); + } + catch (CharacterCodingException e) + { + JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + je.setLinkedException(e); + throw je; + } + } + + protected int readBytes(byte[] bytes) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + checkReadable(); + // first call + if (_byteArrayRemaining == -1) + { + // type discriminator checked separately so you get a MessageFormatException rather than + // an EOF even in the case where both would be applicable + checkAvailable(1); + byte wireType = readWireType(); + if (wireType != BYTEARRAY_TYPE) + { + throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); + } + checkAvailable(4); + int size = _data.getInt(); + // length of -1 indicates null + if (size == -1) + { + return -1; + } + else + { + if (size > _data.remaining()) + { + throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " + + _data.remaining() + " bytes"); + } + else + { + _byteArrayRemaining = size; + } + } + } + else if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + return -1; + } + + int returnedSize = readBytesImpl(bytes); + if (returnedSize < bytes.length) + { + _byteArrayRemaining = -1; + } + return returnedSize; + } + + private int readBytesImpl(byte[] bytes) + { + int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); + _byteArrayRemaining -= count; + + if (count == 0) + { + return 0; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + protected Object readObject() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + Object result = null; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + byte[] bytesResult = new byte[size]; + readBytesImpl(bytesResult); + result = bytesResult; + } + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case CHAR_TYPE: + checkAvailable(2); + result = readCharImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case NULL_STRING_TYPE: + result = null; + break; + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected void writeBoolean(boolean b) throws JMSException + { + writeTypeDiscriminator(BOOLEAN_TYPE); + _data.put(b ? (byte) 1 : (byte) 0); + } + + protected void writeByte(byte b) throws JMSException + { + writeTypeDiscriminator(BYTE_TYPE); + _data.put(b); + } + + protected void writeShort(short i) throws JMSException + { + writeTypeDiscriminator(SHORT_TYPE); + _data.putShort(i); + } + + protected void writeChar(char c) throws JMSException + { + writeTypeDiscriminator(CHAR_TYPE); + _data.putChar(c); + } + + protected void writeInt(int i) throws JMSException + { + writeTypeDiscriminator(INT_TYPE); + writeIntImpl(i); + } + + protected void writeIntImpl(int i) + { + _data.putInt(i); + } + + protected void writeLong(long l) throws JMSException + { + writeTypeDiscriminator(LONG_TYPE); + _data.putLong(l); + } + + protected void writeFloat(float v) throws JMSException + { + writeTypeDiscriminator(FLOAT_TYPE); + _data.putFloat(v); + } + + protected void writeDouble(double v) throws JMSException + { + writeTypeDiscriminator(DOUBLE_TYPE); + _data.putDouble(v); + } + + protected void writeString(String string) throws JMSException + { + if (string == null) + { + writeTypeDiscriminator(NULL_STRING_TYPE); + } + else + { + writeTypeDiscriminator(STRING_TYPE); + try + { + writeStringImpl(string); + } + catch (CharacterCodingException e) + { + JMSException ex = new JMSException("Unable to encode string: " + e); + ex.setLinkedException(e); + throw ex; + } + } + } + + protected void writeStringImpl(String string) + throws CharacterCodingException + { + _data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must write the null terminator ourselves + _data.put((byte) 0); + } + + protected void writeBytes(byte[] bytes) throws JMSException + { + writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); + } + + protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + writeTypeDiscriminator(BYTEARRAY_TYPE); + if (bytes == null) + { + _data.putInt(-1); + } + else + { + _data.putInt(length); + _data.put(bytes, offset, length); + } + } + + protected void writeObject(Object object) throws JMSException + { + checkWritable(); + Class clazz; + + if (object == null) + { + // string handles the output of null values + clazz = String.class; + } + else + { + clazz = object.getClass(); + } + + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeString((String) object); + } + else + { + throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java index 39b4e1e27b..fec0117a03 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java @@ -1,552 +1,552 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.message;
-
-import java.util.Enumeration;
-
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-
-public final class JMSHeaderAdapter
-{
- private final FieldTable _headers;
-
- public JMSHeaderAdapter(FieldTable headers)
- {
- _headers = headers;
- }
-
-
- public FieldTable getHeaders()
- {
- return _headers;
- }
-
- public boolean getBoolean(String string) throws JMSException
- {
- checkPropertyName(string);
- Boolean b = getHeaders().getBoolean(string);
-
- if (b == null)
- {
- if (getHeaders().containsKey(string))
- {
- Object str = getHeaders().getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getBoolean can't use " + string + " item.");
- }
- else
- {
- return Boolean.valueOf((String) str);
- }
- }
- else
- {
- b = Boolean.valueOf(null);
- }
- }
-
- return b;
- }
-
- public boolean getBoolean(AMQShortString string) throws JMSException
- {
- checkPropertyName(string);
- Boolean b = getHeaders().getBoolean(string);
-
- if (b == null)
- {
- if (getHeaders().containsKey(string))
- {
- Object str = getHeaders().getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getBoolean can't use " + string + " item.");
- }
- else
- {
- return Boolean.valueOf((String) str);
- }
- }
- else
- {
- b = Boolean.valueOf(null);
- }
- }
-
- return b;
- }
-
- public char getCharacter(String string) throws JMSException
- {
- checkPropertyName(string);
- Character c = getHeaders().getCharacter(string);
-
- if (c == null)
- {
- if (getHeaders().isNullStringValue(string))
- {
- throw new NullPointerException("Cannot convert null char");
- }
- else
- {
- throw new MessageFormatException("getChar can't use " + string + " item.");
- }
- }
- else
- {
- return (char) c;
- }
- }
-
- public byte[] getBytes(String string) throws JMSException
- {
- return getBytes(new AMQShortString(string));
- }
-
- public byte[] getBytes(AMQShortString string) throws JMSException
- {
- checkPropertyName(string);
-
- byte[] bs = getHeaders().getBytes(string);
-
- if (bs == null)
- {
- throw new MessageFormatException("getBytes can't use " + string + " item.");
- }
- else
- {
- return bs;
- }
- }
-
- public byte getByte(String string) throws JMSException
- {
- checkPropertyName(string);
- Byte b = getHeaders().getByte(string);
- if (b == null)
- {
- if (getHeaders().containsKey(string))
- {
- Object str = getHeaders().getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getByte can't use " + string + " item.");
- }
- else
- {
- return Byte.valueOf((String) str);
- }
- }
- else
- {
- b = Byte.valueOf(null);
- }
- }
-
- return b;
- }
-
- public short getShort(String string) throws JMSException
- {
- checkPropertyName(string);
- Short s = getHeaders().getShort(string);
-
- if (s == null)
- {
- s = Short.valueOf(getByte(string));
- }
-
- return s;
- }
-
- public int getInteger(String string) throws JMSException
- {
- checkPropertyName(string);
- Integer i = getHeaders().getInteger(string);
-
- if (i == null)
- {
- i = Integer.valueOf(getShort(string));
- }
-
- return i;
- }
-
- public long getLong(String string) throws JMSException
- {
- checkPropertyName(string);
- Long l = getHeaders().getLong(string);
-
- if (l == null)
- {
- l = Long.valueOf(getInteger(string));
- }
-
- return l;
- }
-
- public float getFloat(String string) throws JMSException
- {
- checkPropertyName(string);
- Float f = getHeaders().getFloat(string);
-
- if (f == null)
- {
- if (getHeaders().containsKey(string))
- {
- Object str = getHeaders().getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getFloat can't use " + string + " item.");
- }
- else
- {
- return Float.valueOf((String) str);
- }
- }
- else
- {
- f = Float.valueOf(null);
- }
-
- }
-
- return f;
- }
-
- public double getDouble(String string) throws JMSException
- {
- checkPropertyName(string);
- Double d = getHeaders().getDouble(string);
-
- if (d == null)
- {
- d = Double.valueOf(getFloat(string));
- }
-
- return d;
- }
-
- public String getString(String string) throws JMSException
- {
- checkPropertyName(string);
- String s = getHeaders().getString(string);
-
- if (s == null)
- {
- if (getHeaders().containsKey(string))
- {
- Object o = getHeaders().getObject(string);
- if (o instanceof byte[])
- {
- throw new MessageFormatException("getObject couldn't find " + string + " item.");
- }
- else
- {
- if (o == null)
- {
- return null;
- }
- else
- {
- s = String.valueOf(o);
- }
- }
- }//else return s // null;
- }
-
- return s;
- }
-
- public Object getObject(String string) throws JMSException
- {
- checkPropertyName(string);
- return getHeaders().getObject(string);
- }
-
- public void setBoolean(AMQShortString string, boolean b) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setBoolean(string, b);
- }
-
- public void setBoolean(String string, boolean b) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setBoolean(string, b);
- }
-
- public void setChar(String string, char c) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setChar(string, c);
- }
-
- public Object setBytes(AMQShortString string, byte[] bytes)
- {
- checkPropertyName(string);
- return getHeaders().setBytes(string, bytes);
- }
-
- public Object setBytes(String string, byte[] bytes)
- {
- checkPropertyName(string);
- return getHeaders().setBytes(string, bytes);
- }
-
- public Object setBytes(String string, byte[] bytes, int start, int length)
- {
- checkPropertyName(string);
- return getHeaders().setBytes(string, bytes, start, length);
- }
-
- public void setByte(String string, byte b) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setByte(string, b);
- }
-
- public void setByte(AMQShortString string, byte b) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setByte(string, b);
- }
-
-
- public void setShort(String string, short i) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setShort(string, i);
- }
-
- public void setInteger(String string, int i) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setInteger(string, i);
- }
-
- public void setInteger(AMQShortString string, int i) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setInteger(string, i);
- }
-
- public void setLong(String string, long l) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setLong(string, l);
- }
-
- public void setFloat(String string, float v) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setFloat(string, v);
- }
-
- public void setDouble(String string, double v) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setDouble(string, v);
- }
-
- public void setString(String string, String string1) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setString(string, string1);
- }
-
- public void setString(AMQShortString string, String string1) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setString(string, string1);
- }
-
- public void setObject(String string, Object object) throws JMSException
- {
- checkPropertyName(string);
- try
- {
- getHeaders().setObject(string, object);
- }
- catch (AMQPInvalidClassException aice)
- {
- MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
- mfe.setLinkedException(aice);
- throw mfe;
- }
- }
-
- public boolean itemExists(String string) throws JMSException
- {
- checkPropertyName(string);
- return getHeaders().containsKey(string);
- }
-
- public Enumeration getPropertyNames()
- {
- return getHeaders().getPropertyNames();
- }
-
- public void clear()
- {
- getHeaders().clear();
- }
-
- public boolean propertyExists(AMQShortString propertyName)
- {
- checkPropertyName(propertyName);
- return getHeaders().propertyExists(propertyName);
- }
-
- public boolean propertyExists(String propertyName)
- {
- checkPropertyName(propertyName);
- return getHeaders().propertyExists(propertyName);
- }
-
- public Object put(Object key, Object value)
- {
- checkPropertyName(key.toString());
- return getHeaders().setObject(key.toString(), value);
- }
-
- public Object remove(AMQShortString propertyName)
- {
- checkPropertyName(propertyName);
- return getHeaders().remove(propertyName);
- }
-
- public Object remove(String propertyName)
- {
- checkPropertyName(propertyName);
- return getHeaders().remove(propertyName);
- }
-
- public boolean isEmpty()
- {
- return getHeaders().isEmpty();
- }
-
- public void writeToBuffer(ByteBuffer data)
- {
- getHeaders().writeToBuffer(data);
- }
-
- public Enumeration getMapNames()
- {
- return getPropertyNames();
- }
-
- protected static void checkPropertyName(CharSequence propertyName)
- {
- if (propertyName == null)
- {
- throw new IllegalArgumentException("Property name must not be null");
- }
- else if (propertyName.length() == 0)
- {
- throw new IllegalArgumentException("Property name must not be the empty string");
- }
-
- checkIdentiferFormat(propertyName);
- }
-
- protected static void checkIdentiferFormat(CharSequence propertyName)
- {
-// JMS requirements 3.5.1 Property Names
-// Identifiers:
-// - An identifier is an unlimited-length character sequence that must begin
-// with a Java identifier start character; all following characters must be Java
-// identifier part characters. An identifier start character is any character for
-// which the method Character.isJavaIdentifierStart returns true. This includes
-// '_' and '$'. An identifier part character is any character for which the
-// method Character.isJavaIdentifierPart returns true.
-// - Identifiers cannot be the names NULL, TRUE, or FALSE.
-// � Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
-// ESCAPE.
-// � Identifiers are either header field references or property references. The
-// type of a property value in a message selector corresponds to the type
-// used to set the property. If a property that does not exist in a message is
-// referenced, its value is NULL. The semantics of evaluating NULL values
-// in a selector are described in Section 3.8.1.2, �Null Values.�
-// � The conversions that apply to the get methods for properties do not
-// apply when a property is used in a message selector expression. For
-// example, suppose you set a property as a string value, as in the
-// following:
-// myMessage.setStringProperty("NumberOfOrders", "2");
-// The following expression in a message selector would evaluate to false,
-// because a string cannot be used in an arithmetic expression:
-// "NumberOfOrders > 1"
-// � Identifiers are case sensitive.
-// � Message header field references are restricted to JMSDeliveryMode,
-// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
-// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
-// null and if so are treated as a NULL value.
-
- if (Boolean.getBoolean("strict-jms"))
- {
- // JMS start character
- if (!(Character.isJavaIdentifierStart(propertyName.charAt(0))))
- {
- throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character");
- }
-
- // JMS part character
- int length = propertyName.length();
- for (int c = 1; c < length; c++)
- {
- if (!(Character.isJavaIdentifierPart(propertyName.charAt(c))))
- {
- throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character");
- }
- }
-
- // JMS invalid names
- if ((propertyName.equals("NULL")
- || propertyName.equals("TRUE")
- || propertyName.equals("FALSE")
- || propertyName.equals("NOT")
- || propertyName.equals("AND")
- || propertyName.equals("OR")
- || propertyName.equals("BETWEEN")
- || propertyName.equals("LIKE")
- || propertyName.equals("IN")
- || propertyName.equals("IS")
- || propertyName.equals("ESCAPE")))
- {
- throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
- }
- }
-
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import java.util.Enumeration; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQPInvalidClassException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + + +public final class JMSHeaderAdapter +{ + private final FieldTable _headers; + + public JMSHeaderAdapter(FieldTable headers) + { + _headers = headers; + } + + + public FieldTable getHeaders() + { + return _headers; + } + + public boolean getBoolean(String string) throws JMSException + { + checkPropertyName(string); + Boolean b = getHeaders().getBoolean(string); + + if (b == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getBoolean can't use " + string + " item."); + } + else + { + return Boolean.valueOf((String) str); + } + } + else + { + b = Boolean.valueOf(null); + } + } + + return b; + } + + public boolean getBoolean(AMQShortString string) throws JMSException + { + checkPropertyName(string); + Boolean b = getHeaders().getBoolean(string); + + if (b == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getBoolean can't use " + string + " item."); + } + else + { + return Boolean.valueOf((String) str); + } + } + else + { + b = Boolean.valueOf(null); + } + } + + return b; + } + + public char getCharacter(String string) throws JMSException + { + checkPropertyName(string); + Character c = getHeaders().getCharacter(string); + + if (c == null) + { + if (getHeaders().isNullStringValue(string)) + { + throw new NullPointerException("Cannot convert null char"); + } + else + { + throw new MessageFormatException("getChar can't use " + string + " item."); + } + } + else + { + return (char) c; + } + } + + public byte[] getBytes(String string) throws JMSException + { + return getBytes(new AMQShortString(string)); + } + + public byte[] getBytes(AMQShortString string) throws JMSException + { + checkPropertyName(string); + + byte[] bs = getHeaders().getBytes(string); + + if (bs == null) + { + throw new MessageFormatException("getBytes can't use " + string + " item."); + } + else + { + return bs; + } + } + + public byte getByte(String string) throws JMSException + { + checkPropertyName(string); + Byte b = getHeaders().getByte(string); + if (b == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getByte can't use " + string + " item."); + } + else + { + return Byte.valueOf((String) str); + } + } + else + { + b = Byte.valueOf(null); + } + } + + return b; + } + + public short getShort(String string) throws JMSException + { + checkPropertyName(string); + Short s = getHeaders().getShort(string); + + if (s == null) + { + s = Short.valueOf(getByte(string)); + } + + return s; + } + + public int getInteger(String string) throws JMSException + { + checkPropertyName(string); + Integer i = getHeaders().getInteger(string); + + if (i == null) + { + i = Integer.valueOf(getShort(string)); + } + + return i; + } + + public long getLong(String string) throws JMSException + { + checkPropertyName(string); + Long l = getHeaders().getLong(string); + + if (l == null) + { + l = Long.valueOf(getInteger(string)); + } + + return l; + } + + public float getFloat(String string) throws JMSException + { + checkPropertyName(string); + Float f = getHeaders().getFloat(string); + + if (f == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getFloat can't use " + string + " item."); + } + else + { + return Float.valueOf((String) str); + } + } + else + { + f = Float.valueOf(null); + } + + } + + return f; + } + + public double getDouble(String string) throws JMSException + { + checkPropertyName(string); + Double d = getHeaders().getDouble(string); + + if (d == null) + { + d = Double.valueOf(getFloat(string)); + } + + return d; + } + + public String getString(String string) throws JMSException + { + checkPropertyName(string); + String s = getHeaders().getString(string); + + if (s == null) + { + if (getHeaders().containsKey(string)) + { + Object o = getHeaders().getObject(string); + if (o instanceof byte[]) + { + throw new MessageFormatException("getObject couldn't find " + string + " item."); + } + else + { + if (o == null) + { + return null; + } + else + { + s = String.valueOf(o); + } + } + }//else return s // null; + } + + return s; + } + + public Object getObject(String string) throws JMSException + { + checkPropertyName(string); + return getHeaders().getObject(string); + } + + public void setBoolean(AMQShortString string, boolean b) throws JMSException + { + checkPropertyName(string); + getHeaders().setBoolean(string, b); + } + + public void setBoolean(String string, boolean b) throws JMSException + { + checkPropertyName(string); + getHeaders().setBoolean(string, b); + } + + public void setChar(String string, char c) throws JMSException + { + checkPropertyName(string); + getHeaders().setChar(string, c); + } + + public Object setBytes(AMQShortString string, byte[] bytes) + { + checkPropertyName(string); + return getHeaders().setBytes(string, bytes); + } + + public Object setBytes(String string, byte[] bytes) + { + checkPropertyName(string); + return getHeaders().setBytes(string, bytes); + } + + public Object setBytes(String string, byte[] bytes, int start, int length) + { + checkPropertyName(string); + return getHeaders().setBytes(string, bytes, start, length); + } + + public void setByte(String string, byte b) throws JMSException + { + checkPropertyName(string); + getHeaders().setByte(string, b); + } + + public void setByte(AMQShortString string, byte b) throws JMSException + { + checkPropertyName(string); + getHeaders().setByte(string, b); + } + + + public void setShort(String string, short i) throws JMSException + { + checkPropertyName(string); + getHeaders().setShort(string, i); + } + + public void setInteger(String string, int i) throws JMSException + { + checkPropertyName(string); + getHeaders().setInteger(string, i); + } + + public void setInteger(AMQShortString string, int i) throws JMSException + { + checkPropertyName(string); + getHeaders().setInteger(string, i); + } + + public void setLong(String string, long l) throws JMSException + { + checkPropertyName(string); + getHeaders().setLong(string, l); + } + + public void setFloat(String string, float v) throws JMSException + { + checkPropertyName(string); + getHeaders().setFloat(string, v); + } + + public void setDouble(String string, double v) throws JMSException + { + checkPropertyName(string); + getHeaders().setDouble(string, v); + } + + public void setString(String string, String string1) throws JMSException + { + checkPropertyName(string); + getHeaders().setString(string, string1); + } + + public void setString(AMQShortString string, String string1) throws JMSException + { + checkPropertyName(string); + getHeaders().setString(string, string1); + } + + public void setObject(String string, Object object) throws JMSException + { + checkPropertyName(string); + try + { + getHeaders().setObject(string, object); + } + catch (AMQPInvalidClassException aice) + { + MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass()); + mfe.setLinkedException(aice); + throw mfe; + } + } + + public boolean itemExists(String string) throws JMSException + { + checkPropertyName(string); + return getHeaders().containsKey(string); + } + + public Enumeration getPropertyNames() + { + return getHeaders().getPropertyNames(); + } + + public void clear() + { + getHeaders().clear(); + } + + public boolean propertyExists(AMQShortString propertyName) + { + checkPropertyName(propertyName); + return getHeaders().propertyExists(propertyName); + } + + public boolean propertyExists(String propertyName) + { + checkPropertyName(propertyName); + return getHeaders().propertyExists(propertyName); + } + + public Object put(Object key, Object value) + { + checkPropertyName(key.toString()); + return getHeaders().setObject(key.toString(), value); + } + + public Object remove(AMQShortString propertyName) + { + checkPropertyName(propertyName); + return getHeaders().remove(propertyName); + } + + public Object remove(String propertyName) + { + checkPropertyName(propertyName); + return getHeaders().remove(propertyName); + } + + public boolean isEmpty() + { + return getHeaders().isEmpty(); + } + + public void writeToBuffer(ByteBuffer data) + { + getHeaders().writeToBuffer(data); + } + + public Enumeration getMapNames() + { + return getPropertyNames(); + } + + protected static void checkPropertyName(CharSequence propertyName) + { + if (propertyName == null) + { + throw new IllegalArgumentException("Property name must not be null"); + } + else if (propertyName.length() == 0) + { + throw new IllegalArgumentException("Property name must not be the empty string"); + } + + checkIdentiferFormat(propertyName); + } + + protected static void checkIdentiferFormat(CharSequence propertyName) + { +// JMS requirements 3.5.1 Property Names +// Identifiers: +// - An identifier is an unlimited-length character sequence that must begin +// with a Java identifier start character; all following characters must be Java +// identifier part characters. An identifier start character is any character for +// which the method Character.isJavaIdentifierStart returns true. This includes +// '_' and '$'. An identifier part character is any character for which the +// method Character.isJavaIdentifierPart returns true. +// - Identifiers cannot be the names NULL, TRUE, or FALSE. +// � Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or +// ESCAPE. +// � Identifiers are either header field references or property references. The +// type of a property value in a message selector corresponds to the type +// used to set the property. If a property that does not exist in a message is +// referenced, its value is NULL. The semantics of evaluating NULL values +// in a selector are described in Section 3.8.1.2, �Null Values.� +// � The conversions that apply to the get methods for properties do not +// apply when a property is used in a message selector expression. For +// example, suppose you set a property as a string value, as in the +// following: +// myMessage.setStringProperty("NumberOfOrders", "2"); +// The following expression in a message selector would evaluate to false, +// because a string cannot be used in an arithmetic expression: +// "NumberOfOrders > 1" +// � Identifiers are case sensitive. +// � Message header field references are restricted to JMSDeliveryMode, +// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and +// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be +// null and if so are treated as a NULL value. + + if (Boolean.getBoolean("strict-jms")) + { + // JMS start character + if (!(Character.isJavaIdentifierStart(propertyName.charAt(0)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character"); + } + + // JMS part character + int length = propertyName.length(); + for (int c = 1; c < length; c++) + { + if (!(Character.isJavaIdentifierPart(propertyName.charAt(c)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character"); + } + } + + // JMS invalid names + if ((propertyName.equals("NULL") + || propertyName.equals("TRUE") + || propertyName.equals("FALSE") + || propertyName.equals("NOT") + || propertyName.equals("AND") + || propertyName.equals("OR") + || propertyName.equals("BETWEEN") + || propertyName.equals("LIKE") + || propertyName.equals("IN") + || propertyName.equals("IS") + || propertyName.equals("ESCAPE"))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS"); + } + } + + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java index 5185278eef..2c99b9a97b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java @@ -1,32 +1,32 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.state;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.AMQException;
-
-public class AMQMethodNotImplementedException extends AMQException
-{
- public AMQMethodNotImplementedException(AMQMethodBody body)
- {
- super(null, "Unexpected Method Received: " + body.getClass().getName(), null);
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.state; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.AMQException; + +public class AMQMethodNotImplementedException extends AMQException +{ + public AMQMethodNotImplementedException(AMQMethodBody body) + { + super(null, "Unexpected Method Received: " + body.getClass().getName(), null); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/jms/Message.java b/java/client/src/main/java/org/apache/qpid/jms/Message.java index 6752ee616f..e65f9ad2f4 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/Message.java +++ b/java/client/src/main/java/org/apache/qpid/jms/Message.java @@ -1,28 +1,28 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.jms;
-
-import javax.jms.JMSException;
-
-public interface Message extends javax.jms.Message
-{
- public void acknowledgeThis() throws JMSException;
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jms; + +import javax.jms.JMSException; + +public interface Message extends javax.jms.Message +{ + public void acknowledgeThis() throws JMSException; +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java index a5279a195b..1738db7239 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java @@ -1,104 +1,104 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.test.unit.basic;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.testutil.QpidTestCase;
-
-import javax.jms.Session;
-import javax.jms.QueueSession;
-import javax.jms.Queue;
-import javax.jms.QueueSender;
-import javax.jms.TextMessage;
-import javax.jms.InvalidDestinationException;
-
-public class InvalidDestinationTest extends QpidTestCase
-{
- private AMQConnection _connection;
-
- protected void setUp() throws Exception
- {
- super.setUp();
- _connection = (AMQConnection) getConnection("guest", "guest");
- }
-
- protected void tearDown() throws Exception
- {
- _connection.close();
- super.tearDown();
- }
-
-
-
- public void testInvalidDestination() throws Exception
- {
- Queue invalidDestination = new AMQQueue("amq.direct","unknownQ");
- AMQQueue validDestination = new AMQQueue("amq.direct","knownQ");
- QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // This is the only easy way to create and bind a queue from the API :-(
- queueSession.createConsumer(validDestination);
-
- QueueSender sender = queueSession.createSender(invalidDestination);
- TextMessage msg = queueSession.createTextMessage("Hello");
- try
- {
- sender.send(msg);
- fail("Expected InvalidDestinationException");
- }
- catch (InvalidDestinationException ex)
- {
- // pass
- }
- sender.close();
-
- sender = queueSession.createSender(null);
- invalidDestination = new AMQQueue("amq.direct","unknownQ");
-
- try
- {
- sender.send(invalidDestination,msg);
- fail("Expected InvalidDestinationException");
- }
- catch (InvalidDestinationException ex)
- {
- // pass
- }
- sender.send(validDestination,msg);
- sender.close();
- validDestination = new AMQQueue("amq.direct","knownQ");
- sender = queueSession.createSender(validDestination);
- sender.send(msg);
-
-
-
-
- }
-
-
- public static junit.framework.Test suite()
- {
-
- return new junit.framework.TestSuite(InvalidDestinationTest.class);
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.test.unit.basic; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.testutil.QpidTestCase; + +import javax.jms.Session; +import javax.jms.QueueSession; +import javax.jms.Queue; +import javax.jms.QueueSender; +import javax.jms.TextMessage; +import javax.jms.InvalidDestinationException; + +public class InvalidDestinationTest extends QpidTestCase +{ + private AMQConnection _connection; + + protected void setUp() throws Exception + { + super.setUp(); + _connection = (AMQConnection) getConnection("guest", "guest"); + } + + protected void tearDown() throws Exception + { + _connection.close(); + super.tearDown(); + } + + + + public void testInvalidDestination() throws Exception + { + Queue invalidDestination = new AMQQueue("amq.direct","unknownQ"); + AMQQueue validDestination = new AMQQueue("amq.direct","knownQ"); + QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + // This is the only easy way to create and bind a queue from the API :-( + queueSession.createConsumer(validDestination); + + QueueSender sender = queueSession.createSender(invalidDestination); + TextMessage msg = queueSession.createTextMessage("Hello"); + try + { + sender.send(msg); + fail("Expected InvalidDestinationException"); + } + catch (InvalidDestinationException ex) + { + // pass + } + sender.close(); + + sender = queueSession.createSender(null); + invalidDestination = new AMQQueue("amq.direct","unknownQ"); + + try + { + sender.send(invalidDestination,msg); + fail("Expected InvalidDestinationException"); + } + catch (InvalidDestinationException ex) + { + // pass + } + sender.send(validDestination,msg); + sender.close(); + validDestination = new AMQQueue("amq.direct","knownQ"); + sender = queueSession.createSender(validDestination); + sender.send(msg); + + + + + } + + + public static junit.framework.Test suite() + { + + return new junit.framework.TestSuite(InvalidDestinationTest.class); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java index 34197f2608..46b99fac8d 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java @@ -1,221 +1,221 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.test.unit.client.temporaryqueue;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-import javax.jms.TextMessage;
-import junit.framework.Assert;
-
-import org.apache.qpid.testutil.QpidTestCase;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.transport.TransportConnection;
-
-import java.util.List;
-import java.util.LinkedList;
-
-public class TemporaryQueueTest extends QpidTestCase
-{
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- protected Connection createConnection() throws Exception
- {
- return getConnection("guest", "guest");
- }
-
- public void testTempoaryQueue() throws Exception
- {
- Connection conn = createConnection();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TemporaryQueue queue = session.createTemporaryQueue();
- assertNotNull(queue);
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createConsumer(queue);
- conn.start();
- producer.send(session.createTextMessage("hello"));
- TextMessage tm = (TextMessage) consumer.receive(2000);
- assertNotNull(tm);
- assertEquals("hello", tm.getText());
-
- try
- {
- queue.delete();
- fail("Expected JMSException : should not be able to delete while there are active consumers");
- }
- catch (JMSException je)
- {
- ; //pass
- }
-
- consumer.close();
-
- try
- {
- queue.delete();
- }
- catch (JMSException je)
- {
- fail("Unexpected Exception: " + je.getMessage());
- }
-
- conn.close();
- }
-
- public void tUniqueness() throws Exception
- {
- int numProcs = Runtime.getRuntime().availableProcessors();
- final int threadsProc = 5;
-
- runUniqueness(1, 10);
- runUniqueness(numProcs * threadsProc, 10);
- runUniqueness(numProcs * threadsProc, 100);
- runUniqueness(numProcs * threadsProc, 500);
- }
-
- void runUniqueness(int makers, int queues) throws Exception
- {
- Connection connection = createConnection();
-
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>();
-
- //Create Makers
- for (int m = 0; m < makers; m++)
- {
- tqList.add(new TempQueueMaker(session, queues));
- }
-
-
- List<Thread> threadList = new LinkedList<Thread>();
-
- //Create Makers
- for (TempQueueMaker maker : tqList)
- {
- threadList.add(new Thread(maker));
- }
-
- //Start threads
- for (Thread thread : threadList)
- {
- thread.start();
- }
-
- // Join Threads
- for (Thread thread : threadList)
- {
- try
- {
- thread.join();
- }
- catch (InterruptedException e)
- {
- fail("Couldn't correctly join threads");
- }
- }
-
-
- List<AMQQueue> list = new LinkedList<AMQQueue>();
-
- // Test values
- for (TempQueueMaker maker : tqList)
- {
- check(maker, list);
- }
-
- Assert.assertEquals("Not enough queues made.", makers * queues, list.size());
-
- connection.close();
- }
-
- private void check(TempQueueMaker tq, List<AMQQueue> list)
- {
- for (AMQQueue q : tq.getList())
- {
- if (list.contains(q))
- {
- fail(q + " already exists.");
- }
- else
- {
- list.add(q);
- }
- }
- }
-
-
- class TempQueueMaker implements Runnable
- {
- List<AMQQueue> _queues;
- Session _session;
- private int _count;
-
-
- TempQueueMaker(Session session, int queues) throws JMSException
- {
- _queues = new LinkedList<AMQQueue>();
-
- _count = queues;
-
- _session = session;
- }
-
- public void run()
- {
- int i = 0;
- try
- {
- for (; i < _count; i++)
- {
- _queues.add((AMQQueue) _session.createTemporaryQueue());
- }
- }
- catch (JMSException jmse)
- {
- //stop
- }
- }
-
- List<AMQQueue> getList()
- {
- return _queues;
- }
- }
-
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TemporaryQueueTest.class);
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.test.unit.client.temporaryqueue; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TextMessage; +import junit.framework.Assert; + +import org.apache.qpid.testutil.QpidTestCase; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.transport.TransportConnection; + +import java.util.List; +import java.util.LinkedList; + +public class TemporaryQueueTest extends QpidTestCase +{ + protected void setUp() throws Exception + { + super.setUp(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + protected Connection createConnection() throws Exception + { + return getConnection("guest", "guest"); + } + + public void testTempoaryQueue() throws Exception + { + Connection conn = createConnection(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue queue = session.createTemporaryQueue(); + assertNotNull(queue); + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + conn.start(); + producer.send(session.createTextMessage("hello")); + TextMessage tm = (TextMessage) consumer.receive(2000); + assertNotNull(tm); + assertEquals("hello", tm.getText()); + + try + { + queue.delete(); + fail("Expected JMSException : should not be able to delete while there are active consumers"); + } + catch (JMSException je) + { + ; //pass + } + + consumer.close(); + + try + { + queue.delete(); + } + catch (JMSException je) + { + fail("Unexpected Exception: " + je.getMessage()); + } + + conn.close(); + } + + public void tUniqueness() throws Exception + { + int numProcs = Runtime.getRuntime().availableProcessors(); + final int threadsProc = 5; + + runUniqueness(1, 10); + runUniqueness(numProcs * threadsProc, 10); + runUniqueness(numProcs * threadsProc, 100); + runUniqueness(numProcs * threadsProc, 500); + } + + void runUniqueness(int makers, int queues) throws Exception + { + Connection connection = createConnection(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>(); + + //Create Makers + for (int m = 0; m < makers; m++) + { + tqList.add(new TempQueueMaker(session, queues)); + } + + + List<Thread> threadList = new LinkedList<Thread>(); + + //Create Makers + for (TempQueueMaker maker : tqList) + { + threadList.add(new Thread(maker)); + } + + //Start threads + for (Thread thread : threadList) + { + thread.start(); + } + + // Join Threads + for (Thread thread : threadList) + { + try + { + thread.join(); + } + catch (InterruptedException e) + { + fail("Couldn't correctly join threads"); + } + } + + + List<AMQQueue> list = new LinkedList<AMQQueue>(); + + // Test values + for (TempQueueMaker maker : tqList) + { + check(maker, list); + } + + Assert.assertEquals("Not enough queues made.", makers * queues, list.size()); + + connection.close(); + } + + private void check(TempQueueMaker tq, List<AMQQueue> list) + { + for (AMQQueue q : tq.getList()) + { + if (list.contains(q)) + { + fail(q + " already exists."); + } + else + { + list.add(q); + } + } + } + + + class TempQueueMaker implements Runnable + { + List<AMQQueue> _queues; + Session _session; + private int _count; + + + TempQueueMaker(Session session, int queues) throws JMSException + { + _queues = new LinkedList<AMQQueue>(); + + _count = queues; + + _session = session; + } + + public void run() + { + int i = 0; + try + { + for (; i < _count; i++) + { + _queues.add((AMQQueue) _session.createTemporaryQueue()); + } + } + catch (JMSException jmse) + { + //stop + } + } + + List<AMQQueue> getList() + { + return _queues; + } + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(TemporaryQueueTest.class); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java index d25986d991..54b2ee95f4 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java @@ -1,144 +1,144 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.close;
-
-import junit.framework.Assert;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.testutil.QpidTestCase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.junit.concurrency.TestRunnable;
-import org.apache.qpid.junit.concurrency.ThreadTestCoordinator;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-/**
- * This test forces the situation where a session is closed whilst a message consumer is still in its onMessage method.
- * Running in AUTO_ACK mode, the close call ought to wait until the onMessage method completes, and the ack is sent
- * before closing the connection.
- *
- * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Check that
- * closing a connection whilst handling a message, blocks till completion of the handler. </table>
- */
-public class CloseBeforeAckTest extends QpidTestCase
-{
- private static final Logger log = LoggerFactory.getLogger(CloseBeforeAckTest.class);
-
- Connection connection;
- Session session;
- public static final String TEST_QUEUE_NAME = "TestQueue";
- private int TEST_COUNT = 25;
-
- class TestThread1 extends TestRunnable implements MessageListener
- {
- public void runWithExceptions() throws Exception
- {
- // Set this up to listen for message on the test session.
- session.createConsumer(session.createQueue(TEST_QUEUE_NAME)).setMessageListener(this);
- }
-
- public void onMessage(Message message)
- {
- // Give thread 2 permission to close the session.
- allow(new int[] { 1 });
-
- // Wait until thread 2 has closed the connection, or is blocked waiting for this to complete.
- waitFor(new int[] { 1 }, true);
- }
- }
-
- TestThread1 testThread1 = new TestThread1();
-
- TestRunnable testThread2 =
- new TestRunnable()
- {
- public void runWithExceptions() throws Exception
- {
- // Send a message to be picked up by thread 1.
- session.createProducer(null).send(session.createQueue(TEST_QUEUE_NAME),
- session.createTextMessage("Hi there thread 1!"));
-
- // Wait for thread 1 to pick up the message and give permission to continue.
- waitFor(new int[] { 0 }, false);
-
- // Close the connection.
- session.close();
-
- // Allow thread 1 to continue to completion, if it is erronously still waiting.
- allow(new int[] { 1 });
- }
- };
-
- public void testCloseBeforeAutoAck_QPID_397() throws Exception
- {
- // Create a session in auto acknowledge mode. This problem shows up in auto acknowledge if the client acks
- // message at the end of the onMessage method, after a close has been sent.
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- ThreadTestCoordinator tt = new ThreadTestCoordinator(2);
-
- tt.addTestThread(testThread1, 0);
- tt.addTestThread(testThread2, 1);
- tt.setDeadlockTimeout(500);
- tt.run();
-
- String errorMessage = tt.joinAndRetrieveMessages();
-
- // Print any error messages or exceptions.
- log.debug(errorMessage);
-
- if (!tt.getExceptions().isEmpty())
- {
- for (Exception e : tt.getExceptions())
- {
- log.debug("Exception thrown during test thread: ", e);
- }
- }
-
- Assert.assertTrue(errorMessage, "".equals(errorMessage));
- }
-
- public void closeBeforeAutoAckManyTimes() throws Exception
- {
- for (int i = 0; i < TEST_COUNT; i++)
- {
- testCloseBeforeAutoAck_QPID_397();
- }
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
- connection = getConnection("guest", "guest");
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.close; + +import junit.framework.Assert; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.testutil.QpidTestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.junit.concurrency.TestRunnable; +import org.apache.qpid.junit.concurrency.ThreadTestCoordinator; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; + +/** + * This test forces the situation where a session is closed whilst a message consumer is still in its onMessage method. + * Running in AUTO_ACK mode, the close call ought to wait until the onMessage method completes, and the ack is sent + * before closing the connection. + * + * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Check that + * closing a connection whilst handling a message, blocks till completion of the handler. </table> + */ +public class CloseBeforeAckTest extends QpidTestCase +{ + private static final Logger log = LoggerFactory.getLogger(CloseBeforeAckTest.class); + + Connection connection; + Session session; + public static final String TEST_QUEUE_NAME = "TestQueue"; + private int TEST_COUNT = 25; + + class TestThread1 extends TestRunnable implements MessageListener + { + public void runWithExceptions() throws Exception + { + // Set this up to listen for message on the test session. + session.createConsumer(session.createQueue(TEST_QUEUE_NAME)).setMessageListener(this); + } + + public void onMessage(Message message) + { + // Give thread 2 permission to close the session. + allow(new int[] { 1 }); + + // Wait until thread 2 has closed the connection, or is blocked waiting for this to complete. + waitFor(new int[] { 1 }, true); + } + } + + TestThread1 testThread1 = new TestThread1(); + + TestRunnable testThread2 = + new TestRunnable() + { + public void runWithExceptions() throws Exception + { + // Send a message to be picked up by thread 1. + session.createProducer(null).send(session.createQueue(TEST_QUEUE_NAME), + session.createTextMessage("Hi there thread 1!")); + + // Wait for thread 1 to pick up the message and give permission to continue. + waitFor(new int[] { 0 }, false); + + // Close the connection. + session.close(); + + // Allow thread 1 to continue to completion, if it is erronously still waiting. + allow(new int[] { 1 }); + } + }; + + public void testCloseBeforeAutoAck_QPID_397() throws Exception + { + // Create a session in auto acknowledge mode. This problem shows up in auto acknowledge if the client acks + // message at the end of the onMessage method, after a close has been sent. + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ThreadTestCoordinator tt = new ThreadTestCoordinator(2); + + tt.addTestThread(testThread1, 0); + tt.addTestThread(testThread2, 1); + tt.setDeadlockTimeout(500); + tt.run(); + + String errorMessage = tt.joinAndRetrieveMessages(); + + // Print any error messages or exceptions. + log.debug(errorMessage); + + if (!tt.getExceptions().isEmpty()) + { + for (Exception e : tt.getExceptions()) + { + log.debug("Exception thrown during test thread: ", e); + } + } + + Assert.assertTrue(errorMessage, "".equals(errorMessage)); + } + + public void closeBeforeAutoAckManyTimes() throws Exception + { + for (int i = 0; i < TEST_COUNT; i++) + { + testCloseBeforeAutoAck_QPID_397(); + } + } + + protected void setUp() throws Exception + { + super.setUp(); + connection = getConnection("guest", "guest"); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java index 0b9d0bdc2d..131cbd5f68 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java @@ -1,89 +1,89 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.test.unit.message;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.testutil.QpidTestCase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-/**
- * @author Apache Software Foundation
- */
-public class JMSDestinationTest extends QpidTestCase
-{
- private static final Logger _logger = LoggerFactory.getLogger(JMSDestinationTest.class);
-
-
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- public void testJMSDestination() throws Exception
- {
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
- AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue =
- new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false,
- true);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
-
- Connection con2 = (AMQConnection) getConnection("guest", "guest");
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
-
- TextMessage sentMsg = producerSession.createTextMessage("hello");
- assertNull(sentMsg.getJMSDestination());
-
- producer.send(sentMsg);
-
- assertEquals(sentMsg.getJMSDestination(), queue);
-
- con2.close();
-
- con.start();
-
- TextMessage rm = (TextMessage) consumer.receive();
- assertNotNull(rm);
-
- assertEquals(rm.getJMSDestination(), queue);
- con.close();
- }
-
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.test.unit.message; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.testutil.QpidTestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * @author Apache Software Foundation + */ +public class JMSDestinationTest extends QpidTestCase +{ + private static final Logger _logger = LoggerFactory.getLogger(JMSDestinationTest.class); + + + protected void setUp() throws Exception + { + super.setUp(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + public void testJMSDestination() throws Exception + { + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = + new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, + true); + MessageConsumer consumer = consumerSession.createConsumer(queue); + + Connection con2 = (AMQConnection) getConnection("guest", "guest"); + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + + TextMessage sentMsg = producerSession.createTextMessage("hello"); + assertNull(sentMsg.getJMSDestination()); + + producer.send(sentMsg); + + assertEquals(sentMsg.getJMSDestination(), queue); + + con2.close(); + + con.start(); + + TextMessage rm = (TextMessage) consumer.receive(); + assertNotNull(rm); + + assertEquals(rm.getJMSDestination(), queue); + con.close(); + } + +} |
